diff --git a/Dev_Docs/API/04_Add_Block_to_Blockchain_API.md b/Dev_Docs/API/04_Add_Block_to_Blockchain_API.md index f914822..b0ec501 100644 --- a/Dev_Docs/API/04_Add_Block_to_Blockchain_API.md +++ b/Dev_Docs/API/04_Add_Block_to_Blockchain_API.md @@ -86,6 +86,7 @@ - `bad_signature`, `signature_verify_failed` - `prev_line_block_not_found`, `bad_prev_line_hash` - `limit_exceeded` +- `chain_resync_in_progress` — цепочка временно заблокирована полным resync - `repost_disabled` — репосты временно отключены до будущей реализации - `internal_error` diff --git a/Dev_Docs/Blockchain/sync-between-servers.md b/Dev_Docs/Blockchain/sync-between-servers.md index 3d908c4..b1cebed 100644 --- a/Dev_Docs/Blockchain/sync-between-servers.md +++ b/Dev_Docs/Blockchain/sync-between-servers.md @@ -74,6 +74,13 @@ 3. если локальная цепочка слабее, сервер по одному блоку вызывает `GetBlockchainBlock`; 4. каждый скачанный блок локально применяется через существующий `AddBlock`; 5. если у сервера ещё нет локальной записи пользователя/цепочки, перед этим подготавливается локальный `solana_users + blockchain_state`. +6. если во время replay обнаруживается рассинхрон или на одинаковой высоте удалённая цепочка сильнее, запускается полный resync: + - цепочка помечается in-memory как `resync in progress`; + - создаётся marker-file в `data/`; + - в одной SQL-транзакции очищаются локальные данные цепочки и корректируются чужие счётчики; + - удаляются `.bch` и `.tmp_bch`; + - цепочка подтягивается заново с `0` через `GetBlockchainBlock`. + - обычный `AddBlock` на эту цепочку в этот момент возвращает `chain_resync_in_progress`. ### 4.4 Зачем понадобился `GetSyncUserProfile` @@ -161,8 +168,12 @@ | Push новых DM партнёрам | Нужна реализация | | Push блоков блокчейна партнёрам | ✅ Реализована базовая one-shot версия | | Periodic backfill отсутствующего хвоста | ✅ Реализовано | -| Разрешение рассинхрона / divergence | Нужна реализация | +| Разрешение рассинхрона / divergence | ✅ Реализована базовая full-resync схема во время periodic sync | | Маршрутизация DM через access_servers | Нужна реализация (заглушка) | Текущая версия сервера уже умеет базовую синхронизацию блокчейнов между партнёрами. -Не реализованы ещё DM-sync, постоянные server-to-server соединения и автоматическое исправление рассинхрона цепочек. +Не реализованы ещё DM-sync, постоянные server-to-server соединения и recovery при старте по marker-file для resync. + +Следующие отдельные шаги после текущего этапа: +- добавить startup recovery по marker-file для resync-цепочек; +- вернуть обычному `AddBlock` настоящую `tmp_bch`-схему записи и recovery при резком рестарте. diff --git a/Dev_Docs/Future_Features/README.md b/Dev_Docs/Future_Features/README.md index 68bccfd..e1b5cdd 100644 --- a/Dev_Docs/Future_Features/README.md +++ b/Dev_Docs/Future_Features/README.md @@ -30,6 +30,7 @@ - `near/2026-05-25_1106_telegram_agent_players.md` - разрешённые пользователи Telegram для агента, отдельные папки игроков, персональные истории и публикация краткого вопроса/ответа в общий канал. - `near/2026-05-25_1106_wallet_topup_solana_arweave.md` - пополнение Solana и Arweave через внешний сервис покупки с подсказкой и копированием адреса. +- `near/2026-06-26_1215_tmp_bch_для_обычного_addblock.md` - вернуть обычному `AddBlock` настоящую crash-safe схему через `.tmp_bch` и привести writer к модели startup-recovery. ### Среднесрочные diff --git a/Dev_Docs/Future_Features/near/2026-06-26_1215_tmp_bch_для_обычного_addblock.md b/Dev_Docs/Future_Features/near/2026-06-26_1215_tmp_bch_для_обычного_addblock.md new file mode 100644 index 0000000..7a0d995 --- /dev/null +++ b/Dev_Docs/Future_Features/near/2026-06-26_1215_tmp_bch_для_обычного_addblock.md @@ -0,0 +1,45 @@ +# Вернуть настоящую tmp_bch-схему для обычного AddBlock + +## Зачем нужна эта доработка + +Сейчас обычный `AddBlock` пишет данные так: + +1. в SQL-транзакции добавляет блок в БД и обновляет `blockchain_state`; +2. делает `commit`; +3. только после этого дописывает основной файл `.bch`. + +Это рабочая схема, но она не идеально crash-safe. Если сервер резко упадёт между `commit` БД и записью файла, можно получить состояние: + +- в БД новый блок уже есть; +- в `.bch` файла этого блока ещё нет. + +При этом в проекте уже есть логика startup-recovery через `*.tmp_bch`, но текущий `BlockchainWriter` её полноценно не использует. Из-за этого writer и recovery сейчас живут в разных моделях. + +## Что планируем сделать + +Нужно вернуть единую и понятную схему: + +1. обычный `AddBlock` работает через временный файл `.tmp_bch`; +2. после успешной подготовки нового содержимого выполняется атомарная подмена `tmp -> main`; +3. `BlockchainTmpRecoveryOnStartup` и `BlockchainWriter` используют одну и ту же модель; +4. при резкой перезагрузке сервер на старте может корректно добрать или откатить незавершённый файловый шаг. + +## Что уже есть в коде + +- recovery-класс: `SHiNE-server/src/main/java/server/ws/BlockchainTmpRecoveryOnStartup.java` +- файловые helper-методы под `tmp_bch`: `SHiNE-server/shine-server-blockchain/src/main/java/utils/files/FileStoreUtil.java` +- текущий writer: `SHiNE-server/shine-server-net-protocol/src/main/java/server/logic/ws_protocol/JSON/handlers/blockchain/Net_AddBlock_Handler_utils/BlockchainWriter.java` + +## Что надо будет изменить + +1. Переписать `BlockchainWriter`, чтобы он больше не писал сразу в основной `.bch`. +2. Привести порядок записи и замены файла в соответствие с `BlockchainTmpRecoveryOnStartup`. +3. Проверить сценарии: + - обычный `AddBlock`; + - резкий restart до commit БД; + - резкий restart после commit БД, но до замены файла; + - recovery на старте при наличии `*.tmp_bch`. + +## Почему это отложено отдельно + +Это отдельная задача от server-to-server resync. Для divergence-resync сейчас важнее сначала закончить безопасный SQL cleanup одной цепочки, а усиление crash-safety обычного `AddBlock` делать следующим самостоятельным шагом. diff --git a/Dev_Docs/Pending_Features/2026-06-26_1100_blockchain_resync_cleanup_dao.md b/Dev_Docs/Pending_Features/2026-06-26_1100_blockchain_resync_cleanup_dao.md new file mode 100644 index 0000000..133ecf1 --- /dev/null +++ b/Dev_Docs/Pending_Features/2026-06-26_1100_blockchain_resync_cleanup_dao.md @@ -0,0 +1,19 @@ +# DAO очистки одной blockchain-цепочки перед полным resync + +- Краткое описание: + - Добавлен отдельный DAO-метод, который в одной SQL-транзакции очищает одну blockchain-цепочку перед её полной повторной загрузкой. + - Внутри транзакции сначала уменьшаются чужие `likes_count` и `replies_count`, которые были увеличены блоками удаляемой цепочки. + - Затем удаляются локальные записи цепочки и её derived-state. + - Файлы `.bch` / `.tmp_bch` этот DAO не удаляет: файловый шаг будет отдельной следующей задачей. + - DAO уже используется в periodic full-resync flow, но это ещё не было вручную проверено на прод-цикле. + +- Что именно проверять: + - Метод корректно компилируется и не ломает сборку. + - Метод подключён в рабочий periodic full-resync path, но manual e2e verification ещё нужна. + - Комментарии в коде понятны и соответствуют реальному порядку SQL-шагов. + +- Ожидаемый результат: + - Появилась изолированная транзакционная точка очистки одной цепочки, на которую можно безопасно опереться при следующем шаге реализации divergence-resync. + +- Статус: + - `pending` diff --git a/SHiNE-server/shine-server-blockchain/src/main/java/utils/files/FileStoreUtil.java b/SHiNE-server/shine-server-blockchain/src/main/java/utils/files/FileStoreUtil.java index 9872c88..f618e52 100644 --- a/SHiNE-server/shine-server-blockchain/src/main/java/utils/files/FileStoreUtil.java +++ b/SHiNE-server/shine-server-blockchain/src/main/java/utils/files/FileStoreUtil.java @@ -29,6 +29,9 @@ public final class FileStoreUtil { /** Расширение временного файла (старое+новое). */ public static final String BLOCKCHAIN_TMP_EXTENSION = ".tmp_bch"; + /** Маркер того, что chain сейчас в процессе полного resync. */ + public static final String BLOCKCHAIN_RESYNC_MARKER_EXTENSION = ".resync_pending"; + private static final FileStoreUtil INSTANCE = new FileStoreUtil(); private final Path dataDirPath; @@ -130,6 +133,44 @@ public final class FileStoreUtil { newFile(buildBlockchainTmpFileName(blockchainName), data); } + /** .resync_pending */ + public String buildBlockchainResyncMarkerFileName(String blockchainName) { + validateSimpleFileName(blockchainName); + return blockchainName + BLOCKCHAIN_RESYNC_MARKER_EXTENSION; + } + + public Path resolveBlockchainResyncMarkerPath(String blockchainName) { + return resolveSafe(buildBlockchainResyncMarkerFileName(blockchainName)); + } + + public void writeBlockchainResyncMarker(String blockchainName, String markerContent) { + byte[] data = markerContent == null ? new byte[0] : markerContent.getBytes(java.nio.charset.StandardCharsets.UTF_8); + newFile(buildBlockchainResyncMarkerFileName(blockchainName), data); + } + + public void deleteIfExists(Path path) { + if (path == null) { + return; + } + try { + Files.deleteIfExists(path); + } catch (IOException e) { + throw new IllegalStateException("Не удалось удалить файл: " + path, e); + } + } + + public void deleteBlockchainFileIfExists(String blockchainName) { + deleteIfExists(resolveBlockchainPath(blockchainName)); + } + + public void deleteBlockchainTmpFileIfExists(String blockchainName) { + deleteIfExists(resolveBlockchainTmpPath(blockchainName)); + } + + public void deleteBlockchainResyncMarkerIfExists(String blockchainName) { + deleteIfExists(resolveBlockchainResyncMarkerPath(blockchainName)); + } + /** * Атомарно заменить основной файл блокчейна временным: * .tmp_bch -> .bch @@ -202,4 +243,4 @@ public final class FileStoreUtil { throw new IllegalArgumentException("Недопустимое имя файла: " + fileName); } } -} \ No newline at end of file +} diff --git a/SHiNE-server/shine-server-db/src/main/java/shine/db/dao/BlockchainResyncCleanupDAO.java b/SHiNE-server/shine-server-db/src/main/java/shine/db/dao/BlockchainResyncCleanupDAO.java new file mode 100644 index 0000000..9fe04ae --- /dev/null +++ b/SHiNE-server/shine-server-db/src/main/java/shine/db/dao/BlockchainResyncCleanupDAO.java @@ -0,0 +1,394 @@ +package shine.db.dao; + +import shine.db.DatabaseInitializer; +import shine.db.SqliteDbController; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; + +/** + * BlockchainResyncCleanupDAO — подготовительный "жёсткий reset" одной blockchain-цепочки + * перед её полной повторной загрузкой от сервера-партнёра. + * + * Что делает этот DAO: + * 1) в ОДНОЙ SQL-транзакции сначала аккуратно уменьшает чужие агрегаты, + * которые были увеличены блоками удаляемой цепочки: + * - likes_count + * - replies_count + * 2) затем удаляет все локальные записи самой цепочки и её производные состояния. + * + * Почему это вынесено в отдельный DAO-метод, а не в триггеры DELETE: + * - нам нужен один понятный "блок операции", который можно вызвать из resync-flow; + * - эта схема проще и прозрачнее, чем много обратных триггеров по разным таблицам; + * - если любой шаг не удался, делаем rollback и БД остаётся в исходном состоянии; + * - файловые действия (.bch / .tmp_bch) сознательно НЕ входят в эту транзакцию: + * SQLite не может атомарно закоммитить и SQL, и файловую систему сразу; + * поэтому БД-чистка делается здесь, а файловая чистка будет следующим шагом + * отдельным recovery/resync-слоем после успешного commit. + * + * Важный смысл текущей реализации: + * - мы НЕ трогаем identity-слой (`solana_users`) и НЕ трогаем DM-таблицы; + * - мы очищаем только блокчейн пользователя и derived-state, который строится из неё; + * - висячие cross-chain ссылки в чужих blocks допускаются как нормальное поведение системы. + */ +public final class BlockchainResyncCleanupDAO { + + private static final int BLOCKCHAIN_LOGIN_SUFFIX_LEN = 4; // "-001" + + private static volatile BlockchainResyncCleanupDAO instance; + + private final SqliteDbController db = SqliteDbController.getInstance(); + + private BlockchainResyncCleanupDAO() {} + + public static BlockchainResyncCleanupDAO getInstance() { + if (instance == null) { + synchronized (BlockchainResyncCleanupDAO.class) { + if (instance == null) instance = new BlockchainResyncCleanupDAO(); + } + } + return instance; + } + + /** + * Полностью очищает одну blockchain-цепочку и локальные derived-state, собранные из неё. + * + * Порядок внутри транзакции намеренно такой: + * 1. Сначала уменьшаем чужие likes_count для тех целей, где финальное состояние + * реакции этой цепочки было LIKE. + * 2. Сначала уменьшаем чужие replies_count для reply-блоков этой цепочки. + * 3. После этого удаляем локальные derived-state самой цепочки. + * 4. В конце удаляем blocks и blockchain_state. + * + * Это правильно потому, что агрегаты (`message_stats`) должны видеть исходные blocks + * и reactions_state на момент пересчёта. Если удалить blocks раньше, мы потеряем + * источник правды для корректного уменьшения счётчиков. + * + * Метод идемпотентен по смыслу: + * - если часть данных уже удалена раньше, повторный вызов просто удалит "0 строк"; + * - если blockchain_state уже отсутствует, login берём из blockchainName. + * + * Отдельно важно: + * - здесь НЕТ удаления .bch/.tmp_bch; + * - здесь НЕТ повторной загрузки цепочки; + * - это только атомарная SQL-очистка БД, на которую потом будет опираться resync-flow. + */ + public CleanupResult cleanupBlockchainForFullResync(String blockchainName) throws SQLException { + if (blockchainName == null || blockchainName.isBlank()) { + throw new IllegalArgumentException("blockchainName is blank"); + } + + try (Connection c = db.getConnection()) { + boolean oldAutoCommit = c.getAutoCommit(); + c.setAutoCommit(false); + try { + String login = resolveLoginForCleanup(c, blockchainName); + + int likesAdjusted = decreaseForeignLikesCount(c, blockchainName); + int repliesAdjusted = decreaseForeignRepliesCount(c, blockchainName); + + int deletedMessageStats = deleteMessageStatsForOwnTargets(c, blockchainName); + int deletedReactionsState = deleteReactionsStateForActorChain(c, blockchainName); + int deletedConnectionsState = deleteConnectionsStateForLogin(c, login); + int deletedUsersParams = deleteUsersParamsForLogin(c, login); + int deletedChannelNames = deleteChannelNamesForOwnerChain(c, blockchainName); + int deletedChat200State = deleteChat200StateForOwnerChain(c, blockchainName); + int deletedChat200Members = deleteChat200MembersForOwnerChain(c, blockchainName); + int deletedBlocks = deleteBlocksForChain(c, blockchainName); + int deletedBlockchainState = deleteBlockchainStateForChain(c, blockchainName); + + c.commit(); + + return new CleanupResult( + login, + likesAdjusted, + repliesAdjusted, + deletedMessageStats, + deletedReactionsState, + deletedConnectionsState, + deletedUsersParams, + deletedChannelNames, + deletedChat200State, + deletedChat200Members, + deletedBlocks, + deletedBlockchainState + ); + } catch (Exception e) { + try { c.rollback(); } catch (Exception ignored) {} + if (e instanceof SQLException sqlEx) throw sqlEx; + throw new SQLException("Не удалось очистить blockchain для полного resync: " + blockchainName, e); + } finally { + try { c.setAutoCommit(oldAutoCommit); } catch (Exception ignored) {} + } + } + } + + private String resolveLoginForCleanup(Connection c, String blockchainName) throws SQLException { + String sql = """ + SELECT login + FROM blockchain_state + WHERE blockchain_name = ? + LIMIT 1 + """; + try (PreparedStatement ps = c.prepareStatement(sql)) { + ps.setString(1, blockchainName); + try (ResultSet rs = ps.executeQuery()) { + if (rs.next()) { + String login = rs.getString("login"); + if (login != null && !login.isBlank()) { + return login; + } + } + } + } + + String loginFromName = loginFromBlockchainName(blockchainName); + if (loginFromName == null || loginFromName.isBlank()) { + throw new IllegalArgumentException("Cannot derive login from blockchainName: " + blockchainName); + } + return loginFromName; + } + + /** + * DAO остаётся в модуле БД и не тянет зависимость на blockchain-utils модуль. + * Поэтому здесь локально повторяем минимальное правило имени chain: + * login + "-NNN". + */ + private String loginFromBlockchainName(String blockchainName) { + if (blockchainName == null) return null; + + String s = blockchainName.trim(); + if (s.length() <= BLOCKCHAIN_LOGIN_SUFFIX_LEN) return null; + + int dashPos = s.length() - BLOCKCHAIN_LOGIN_SUFFIX_LEN; + if (s.charAt(dashPos) != '-') return null; + + for (int i = dashPos + 1; i < s.length(); i++) { + char ch = s.charAt(i); + if (ch < '0' || ch > '9') return null; + } + return s.substring(0, dashPos); + } + + /** + * Уменьшаем likes_count только для ЧУЖИХ целей. + * + * Логика: + * - если у удаляемой цепочки финальное состояние реакции на цель = LIKE, + * значит при полном удалении цепочки этот активный лайк исчезает; + * - значит у message_stats этой чужой цели нужно сделать -1; + * - для целей внутри этой же chain этого делать не нужно, потому что сами цели + * тоже будут удалены вместе с цепочкой. + */ + private int decreaseForeignLikesCount(Connection c, String blockchainName) throws SQLException { + String sql = """ + UPDATE message_stats + SET likes_count = MAX( + 0, + likes_count - ( + SELECT COUNT(*) + FROM reactions_state rs + WHERE rs.from_bch_name = ? + AND rs.reaction_type = ? + AND rs.last_sub_type = ? + AND rs.to_login = message_stats.to_login + AND rs.to_bch_name = message_stats.to_bch_name + AND rs.to_block_number = message_stats.to_block_number + AND rs.to_block_hash = message_stats.to_block_hash + AND rs.to_bch_name <> ? + ) + ) + WHERE EXISTS ( + SELECT 1 + FROM reactions_state rs + WHERE rs.from_bch_name = ? + AND rs.reaction_type = ? + AND rs.last_sub_type = ? + AND rs.to_login = message_stats.to_login + AND rs.to_bch_name = message_stats.to_bch_name + AND rs.to_block_number = message_stats.to_block_number + AND rs.to_block_hash = message_stats.to_block_hash + AND rs.to_bch_name <> ? + ) + """; + try (PreparedStatement ps = c.prepareStatement(sql)) { + int i = 1; + ps.setString(i++, blockchainName); + ps.setInt(i++, DatabaseInitializer.REACTION_LIKE); + ps.setInt(i++, DatabaseInitializer.REACTION_LIKE); + ps.setString(i++, blockchainName); + ps.setString(i++, blockchainName); + ps.setInt(i++, DatabaseInitializer.REACTION_LIKE); + ps.setInt(i++, DatabaseInitializer.REACTION_LIKE); + ps.setString(i++, blockchainName); + return ps.executeUpdate(); + } + } + + /** + * Уменьшаем replies_count только для ЧУЖИХ целей. + * + * Если reply этой цепочки ссылался на сообщение из другой цепочки, + * значит после удаления blocks этой цепочки чужой replies_count должен уменьшиться. + * Reply на собственные сообщения здесь игнорируем: целевая цепочка тоже будет удалена. + */ + private int decreaseForeignRepliesCount(Connection c, String blockchainName) throws SQLException { + String sql = """ + UPDATE message_stats + SET replies_count = MAX( + 0, + replies_count - COALESCE(( + SELECT COUNT(*) + FROM blocks b + WHERE b.bch_name = ? + AND b.msg_type = 1 + AND b.msg_sub_type = ? + AND b.to_login = message_stats.to_login + AND b.to_bch_name = message_stats.to_bch_name + AND b.to_block_number = message_stats.to_block_number + AND b.to_block_hash = message_stats.to_block_hash + AND b.to_bch_name <> ? + ), 0) + ) + WHERE EXISTS ( + SELECT 1 + FROM blocks b + WHERE b.bch_name = ? + AND b.msg_type = 1 + AND b.msg_sub_type = ? + AND b.to_login = message_stats.to_login + AND b.to_bch_name = message_stats.to_bch_name + AND b.to_block_number = message_stats.to_block_number + AND b.to_block_hash = message_stats.to_block_hash + AND b.to_bch_name <> ? + ) + """; + try (PreparedStatement ps = c.prepareStatement(sql)) { + int i = 1; + ps.setString(i++, blockchainName); + ps.setInt(i++, DatabaseInitializer.TEXT_REPLY); + ps.setString(i++, blockchainName); + ps.setString(i++, blockchainName); + ps.setInt(i++, DatabaseInitializer.TEXT_REPLY); + ps.setString(i++, blockchainName); + return ps.executeUpdate(); + } + } + + /** + * Статистика сообщений самой удаляемой цепочки после reset не нужна, + * потому что её цели исчезают вместе с chain source data. + */ + private int deleteMessageStatsForOwnTargets(Connection c, String blockchainName) throws SQLException { + return executeDelete(c, """ + DELETE FROM message_stats + WHERE to_bch_name = ? + """, blockchainName); + } + + /** + * reactions_state хранит финальное состояние реакций АКТОРА. + * После удаления всей цепочки актор этой цепочки исчезает, поэтому + * достаточно удалить все строки по from_bch_name. + */ + private int deleteReactionsStateForActorChain(Connection c, String blockchainName) throws SQLException { + return executeDelete(c, """ + DELETE FROM reactions_state + WHERE from_bch_name = ? + """, blockchainName); + } + + /** + * connections_state — текущее состояние связей, выставленных этим login. + * Чистим по владельцу состояния. + */ + private int deleteConnectionsStateForLogin(Connection c, String login) throws SQLException { + return executeDelete(c, """ + DELETE FROM connections_state + WHERE login = ? COLLATE NOCASE + """, login); + } + + /** + * users_params — актуальные параметры, собранные из блоков пользователя. + */ + private int deleteUsersParamsForLogin(Connection c, String login) throws SQLException { + return executeDelete(c, """ + DELETE FROM users_params + WHERE login = ? COLLATE NOCASE + """, login); + } + + /** + * Каналы принадлежат owner_bch_name. + */ + private int deleteChannelNamesForOwnerChain(Connection c, String blockchainName) throws SQLException { + return executeDelete(c, """ + DELETE FROM channel_names_state + WHERE owner_bch_name = ? + """, blockchainName); + } + + private int deleteChat200StateForOwnerChain(Connection c, String blockchainName) throws SQLException { + return executeDelete(c, """ + DELETE FROM chat200_state + WHERE owner_bch_name = ? + """, blockchainName); + } + + private int deleteChat200MembersForOwnerChain(Connection c, String blockchainName) throws SQLException { + return executeDelete(c, """ + DELETE FROM chat200_members_state + WHERE owner_bch_name = ? + """, blockchainName); + } + + /** + * blocks удаляем в конце, потому что до этого шага они нужны как источник правды + * для уменьшения replies_count. + */ + private int deleteBlocksForChain(Connection c, String blockchainName) throws SQLException { + return executeDelete(c, """ + DELETE FROM blocks + WHERE bch_name = ? + """, blockchainName); + } + + /** + * blockchain_state удаляем после blocks, чтобы не нарушать FK-связь blocks -> blockchain_state. + */ + private int deleteBlockchainStateForChain(Connection c, String blockchainName) throws SQLException { + return executeDelete(c, """ + DELETE FROM blockchain_state + WHERE blockchain_name = ? + """, blockchainName); + } + + private int executeDelete(Connection c, String sql, String value) throws SQLException { + try (PreparedStatement ps = c.prepareStatement(sql)) { + ps.setString(1, value); + return ps.executeUpdate(); + } + } + + /** + * Технический результат cleanup-операции. + * Нужен для будущего логирования и ручной диагностики resync-flow. + */ + public record CleanupResult( + String login, + int likesAdjustedRows, + int repliesAdjustedRows, + int deletedMessageStatsRows, + int deletedReactionsStateRows, + int deletedConnectionsStateRows, + int deletedUsersParamsRows, + int deletedChannelNamesRows, + int deletedChat200StateRows, + int deletedChat200MembersRows, + int deletedBlocksRows, + int deletedBlockchainStateRows + ) {} +} diff --git a/SHiNE-server/shine-server-net-protocol/src/main/java/server/logic/ws_protocol/JSON/handlers/blockchain/Net_AddBlock_Handler.java b/SHiNE-server/shine-server-net-protocol/src/main/java/server/logic/ws_protocol/JSON/handlers/blockchain/Net_AddBlock_Handler.java index bc77b07..02df74a 100644 --- a/SHiNE-server/shine-server-net-protocol/src/main/java/server/logic/ws_protocol/JSON/handlers/blockchain/Net_AddBlock_Handler.java +++ b/SHiNE-server/shine-server-net-protocol/src/main/java/server/logic/ws_protocol/JSON/handlers/blockchain/Net_AddBlock_Handler.java @@ -17,6 +17,7 @@ import server.logic.ws_protocol.JSON.entyties.Net_Request; import server.logic.ws_protocol.JSON.entyties.Net_Response; import server.logic.ws_protocol.JSON.handlers.JsonMessageHandler; import server.logic.ws_protocol.JSON.handlers.blockchain.Net_AddBlock_Handler_utils.BlockchainLocks; +import server.sync.BlockchainResyncGuard; import server.logic.ws_protocol.JSON.handlers.blockchain.Net_AddBlock_Handler_utils.BlockchainWriter; import server.logic.ws_protocol.JSON.handlers.blockchain.entyties.Net_AddBlock_Request; import server.logic.ws_protocol.JSON.handlers.blockchain.entyties.Net_AddBlock_Response; @@ -70,6 +71,21 @@ public final class Net_AddBlock_Handler implements JsonMessageHandler { Net_AddBlock_Request req = (Net_AddBlock_Request) baseReq; String blockchainName = req.getBlockchainName(); + if (blockchainName == null || blockchainName.isBlank()) { + return error(req, WireCodes.Status.BAD_REQUEST, "empty_blockchain_name", 0, ""); + } + if (BlockchainResyncGuard.isBlockedForExternalAddBlock(blockchainName)) { + BlockchainStateEntry currentState = null; + try { + currentState = stateDAO.getByBlockchainName(blockchainName); + } catch (Exception ignored) { + } + int lastNum = currentState != null ? currentState.getLastBlockNumber() : -1; + String lastHash = currentState != null ? toHex(currentState.getLastBlockHash()) : ""; + return error(req, 423, "chain_resync_in_progress", + lastNum, + lastHash); + } ReentrantLock lock = BlockchainLocks.lockFor(blockchainName); lock.lock(); try { @@ -126,7 +142,8 @@ public final class Net_AddBlock_Handler implements JsonMessageHandler { } private static String humanMessage(String code) { - if (code == null) return "Ошибка добавления блока"; return switch (code) { + if (code == null) return "Ошибка добавления блока"; + return switch (code) { case "empty_blockchain_name" -> "Пустое имя блокчейна"; case "bad_blockchain_name" -> "Некорректное имя блокчейна"; case "db_error" -> "Ошибка базы данных"; @@ -150,6 +167,7 @@ public final class Net_AddBlock_Handler implements JsonMessageHandler { case "channel_name_already_exists" -> "Такое название канала уже занято"; case "repost_disabled" -> "Репосты временно отключены до будущей реализации"; case "internal_error" -> "Внутренняя ошибка сервера при записи блока"; + case "chain_resync_in_progress" -> "Цепочка сейчас пересинхронизируется"; default -> "Ошибка: " + code; }; } diff --git a/SHiNE-server/shine-server-net-protocol/src/main/java/server/sync/BlockchainResyncGuard.java b/SHiNE-server/shine-server-net-protocol/src/main/java/server/sync/BlockchainResyncGuard.java new file mode 100644 index 0000000..b94956e --- /dev/null +++ b/SHiNE-server/shine-server-net-protocol/src/main/java/server/sync/BlockchainResyncGuard.java @@ -0,0 +1,84 @@ +package server.sync; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * In-memory guard для цепочек, которые сейчас находятся в полном resync. + * + * Задача guard-а: + * - не давать обычному AddBlock писать в цепочку, пока она пересобирается; + * - позволять внутреннему resync-потоку безопасно вызывать тот же AddBlock-путь + * через thread-local bypass. + */ +public final class BlockchainResyncGuard { + + private static final Set ACTIVE = ConcurrentHashMap.newKeySet(); + private static final ThreadLocal> BYPASS = ThreadLocal.withInitial(HashSet::new); + + private BlockchainResyncGuard() {} + + public static boolean tryBegin(String blockchainName) { + String key = normalize(blockchainName); + if (key == null) return false; + return ACTIVE.add(key); + } + + public static void end(String blockchainName) { + String key = normalize(blockchainName); + if (key == null) return; + ACTIVE.remove(key); + } + + public static boolean isBlockedForExternalAddBlock(String blockchainName) { + String key = normalize(blockchainName); + if (key == null) return false; + return ACTIVE.contains(key) && !isBypassed(key); + } + + public static T withBypass(String blockchainName, ThrowingSupplier supplier) throws Exception { + String key = normalize(blockchainName); + if (key == null) { + return supplier.get(); + } + + Set bypassSet = BYPASS.get(); + bypassSet.add(key); + try { + return supplier.get(); + } finally { + bypassSet.remove(key); + if (bypassSet.isEmpty()) { + BYPASS.remove(); + } + } + } + + public static void withBypass(String blockchainName, ThrowingRunnable runnable) throws Exception { + withBypass(blockchainName, () -> { + runnable.run(); + return null; + }); + } + + private static boolean isBypassed(String blockchainName) { + return BYPASS.get().contains(blockchainName); + } + + private static String normalize(String value) { + if (value == null) return null; + String s = value.trim(); + return s.isEmpty() ? null : s; + } + + @FunctionalInterface + public interface ThrowingSupplier { + T get() throws Exception; + } + + @FunctionalInterface + public interface ThrowingRunnable { + void run() throws Exception; + } +} diff --git a/SHiNE-server/src/main/java/server/sync/PeriodicBlockchainSyncService.java b/SHiNE-server/src/main/java/server/sync/PeriodicBlockchainSyncService.java index ae903ff..18de07b 100644 --- a/SHiNE-server/src/main/java/server/sync/PeriodicBlockchainSyncService.java +++ b/SHiNE-server/src/main/java/server/sync/PeriodicBlockchainSyncService.java @@ -6,15 +6,20 @@ import server.logic.ws_protocol.JSON.entyties.Net_Exception_Response; import server.logic.ws_protocol.JSON.entyties.Net_Response; import server.logic.ws_protocol.JSON.handlers.auth.SolanaUserPdaImportService; import server.logic.ws_protocol.JSON.handlers.blockchain.Net_AddBlock_Handler; +import server.logic.ws_protocol.JSON.handlers.blockchain.Net_AddBlock_Handler_utils.BlockchainLocks; import server.logic.ws_protocol.JSON.handlers.blockchain.entyties.Net_AddBlock_Request; +import shine.db.dao.BlockchainResyncCleanupDAO; import shine.db.dao.BlockchainStateDAO; import shine.db.dao.SyncServersDAO; import shine.db.dao.UserCreateDAO; import shine.db.entities.BlockchainStateEntry; import shine.db.entities.SyncServerEntry; +import server.sync.BlockchainResyncGuard; +import utils.files.FileStoreUtil; import utils.blockchain.BlockchainNameUtil; import utils.config.AppConfig; +import java.util.concurrent.locks.ReentrantLock; import java.util.List; import java.util.Locale; import java.util.concurrent.Executors; @@ -25,8 +30,9 @@ import java.util.concurrent.atomic.AtomicBoolean; /** * Плановый межсерверный sync блокчейнов. - * Сейчас реализует только догоняющую синхронизацию отсутствующего хвоста. - * Случай рассинхрона цепочек пока только логируется и пропускается. + * Сейчас реализует: + * - догоняющую синхронизацию отсутствующего хвоста; + * - базовый full-resync при divergence, если удалённая цепочка сильнее. */ public final class PeriodicBlockchainSyncService { @@ -47,6 +53,8 @@ public final class PeriodicBlockchainSyncService { private static final BlockchainStateDAO STATE_DAO = BlockchainStateDAO.getInstance(); private static final SyncServersDAO SYNC_SERVERS_DAO = SyncServersDAO.getInstance(); private static final UserCreateDAO USER_CREATE_DAO = UserCreateDAO.getInstance(); + private static final BlockchainResyncCleanupDAO RESYNC_CLEANUP_DAO = BlockchainResyncCleanupDAO.getInstance(); + private static final FileStoreUtil FILE_STORE = FileStoreUtil.getInstance(); private static final String CONFIG_IMPORT_PROFILE_FROM_PARTNER = "sync.importUserProfileFromPartner.enabled"; private PeriodicBlockchainSyncService() {} @@ -121,14 +129,25 @@ public final class PeriodicBlockchainSyncService { if (localHash.equalsIgnoreCase(remoteHead.lastBlockHash())) { continue; } - log.warn("Periodic blockchain sync: divergence detected but not implemented yet. partner={} blockchainName={} localLast={} localHash={} remoteHash={} localSize={} remoteSize={}", - partnerLogin, - remoteHead.blockchainName(), - localLast, - localHash, - remoteHead.lastBlockHash(), - localState.getFileSizeBytes(), - remoteHead.fileSizeBytes()); + if (isRemoteStronger(localState, remoteHead)) { + log.warn("Periodic blockchain sync: divergence detected, remote chain is stronger, starting full resync. partner={} blockchainName={} localLast={} localHash={} remoteHash={} localSize={} remoteSize={}", + partnerLogin, + remoteHead.blockchainName(), + localLast, + localHash, + remoteHead.lastBlockHash(), + localState.getFileSizeBytes(), + remoteHead.fileSizeBytes()); + resyncFromScratch(partner, remoteHead); + } else { + log.info("Periodic blockchain sync skipped: local chain is stronger or equal. partner={} blockchainName={} localLast={} remoteLast={} localSize={} remoteSize={}", + partnerLogin, + remoteHead.blockchainName(), + localLast, + remoteHead.lastBlockNumber(), + localState.getFileSizeBytes(), + remoteHead.fileSizeBytes()); + } continue; } @@ -163,8 +182,9 @@ public final class PeriodicBlockchainSyncService { LocalAddBlockApplyResult result = applyBlockLocally(remoteBlock, blockNumber == 0 ? "" : localHash); if (!result.ok()) { if ("bad_prev_hash".equalsIgnoreCase(result.code()) || "bad_block_number".equalsIgnoreCase(result.code())) { - log.warn("Periodic blockchain sync: divergence detected during replay, but reconciliation is not implemented yet. partner={} blockchainName={} blockNumber={} code={}", + log.warn("Periodic blockchain sync: divergence detected during replay, starting full resync. partner={} blockchainName={} blockNumber={} code={}", partnerLogin, remoteHead.blockchainName(), blockNumber, result.code()); + resyncFromScratch(partner, remoteHead); } else { log.warn("Periodic blockchain sync: local AddBlock rejected remote block. partner={} blockchainName={} blockNumber={} code={} message={}", partnerLogin, remoteHead.blockchainName(), blockNumber, result.code(), result.message()); @@ -179,6 +199,132 @@ public final class PeriodicBlockchainSyncService { partnerLogin, remoteHead.blockchainName(), fromBlockNumber, remoteHead.lastBlockNumber()); } + private static void resyncFromScratch( + SyncServerEntry partner, + RemoteBlockchainSyncClient.RemoteBlockchainHead remoteHead + ) throws Exception { + if (partner == null || remoteHead == null || remoteHead.blockchainName() == null || remoteHead.blockchainName().isBlank()) { + return; + } + + String blockchainName = remoteHead.blockchainName(); + String partnerLogin = normalize(partner.getLogin()); + + if (!BlockchainResyncGuard.tryBegin(blockchainName)) { + log.warn("Blockchain resync skipped: already in progress for blockchainName={}", blockchainName); + return; + } + + String markerContent = """ + blockchainName=%s + partnerLogin=%s + partnerAddress=%s + remoteLastBlockNumber=%d + remoteLastBlockHash=%s + remoteFileSizeBytes=%d + startedAtMs=%d + """.formatted( + blockchainName, + partnerLogin == null ? "" : partnerLogin, + partner.getServerAddress() == null ? "" : partner.getServerAddress(), + remoteHead.lastBlockNumber(), + remoteHead.lastBlockHash(), + remoteHead.fileSizeBytes(), + System.currentTimeMillis() + ); + + FILE_STORE.writeBlockchainResyncMarker(blockchainName, markerContent); + + ReentrantLock chainLock = BlockchainLocks.lockFor(blockchainName); + chainLock.lock(); + try { + BlockchainResyncCleanupDAO.CleanupResult cleanup = + RESYNC_CLEANUP_DAO.cleanupBlockchainForFullResync(blockchainName); + log.info("Blockchain resync cleanup finished: blockchainName={} login={} likesAdjusted={} repliesAdjusted={} deletedBlocks={} deletedState={}", + blockchainName, + cleanup.login(), + cleanup.likesAdjustedRows(), + cleanup.repliesAdjustedRows(), + cleanup.deletedBlocksRows(), + cleanup.deletedBlockchainStateRows()); + + FILE_STORE.deleteBlockchainFileIfExists(blockchainName); + FILE_STORE.deleteBlockchainTmpFileIfExists(blockchainName); + + if (!ensureLocalChainExists(partner, blockchainName)) { + log.warn("Blockchain resync aborted: failed to recreate local chain state. partner={} blockchainName={}", + partnerLogin, blockchainName); + return; + } + + boolean replayOk = replayRemoteChainFromStart(partner, remoteHead); + if (replayOk) { + log.info("Blockchain resync completed: partner={} blockchainName={} blocks=0..{}", + partnerLogin, blockchainName, remoteHead.lastBlockNumber()); + } + } finally { + try { + FILE_STORE.deleteBlockchainResyncMarkerIfExists(blockchainName); + } catch (Exception e) { + log.warn("Blockchain resync: failed to delete marker file blockchainName={}", blockchainName, e); + } + chainLock.unlock(); + BlockchainResyncGuard.end(blockchainName); + } + } + + private static boolean replayRemoteChainFromStart( + SyncServerEntry partner, + RemoteBlockchainSyncClient.RemoteBlockchainHead remoteHead + ) throws Exception { + String blockchainName = remoteHead.blockchainName(); + String partnerLogin = normalize(partner.getLogin()); + + return BlockchainResyncGuard.withBypass(blockchainName, () -> { + String localPrevHash = ""; + for (int blockNumber = 0; blockNumber <= remoteHead.lastBlockNumber(); blockNumber++) { + RemoteBlockchainSyncClient.RemoteBlockchainBlock remoteBlock = + REMOTE.getBlockchainBlock(partner.getServerAddress(), blockchainName, blockNumber); + if (remoteBlock == null) { + log.warn("Blockchain resync: remote block not found. partner={} blockchainName={} blockNumber={}", + partnerLogin, blockchainName, blockNumber); + return false; + } + + LocalAddBlockApplyResult result = applyBlockLocally(remoteBlock, localPrevHash); + if (!result.ok()) { + log.warn("Blockchain resync: AddBlock rejected replay block. partner={} blockchainName={} blockNumber={} code={} message={}", + partnerLogin, blockchainName, blockNumber, result.code(), result.message()); + return false; + } + + localPrevHash = result.serverLastHash(); + } + return true; + }); + } + + private static boolean isRemoteStronger(BlockchainStateEntry localState, + RemoteBlockchainSyncClient.RemoteBlockchainHead remoteHead) { + if (localState == null || remoteHead == null) return false; + + int localLast = localState.getLastBlockNumber(); + int remoteLast = remoteHead.lastBlockNumber(); + if (remoteLast != localLast) { + return remoteLast > localLast; + } + + long localSize = localState.getFileSizeBytes(); + long remoteSize = remoteHead.fileSizeBytes(); + if (remoteSize != localSize) { + return remoteSize > localSize; + } + + String localHash = toHex32(localState.getLastBlockHash()).toLowerCase(Locale.ROOT); + String remoteHash = normalizeHex64(remoteHead.lastBlockHash()); + return remoteHash.compareTo(localHash) > 0; + } + private static LocalAddBlockApplyResult applyBlockLocally( RemoteBlockchainSyncClient.RemoteBlockchainBlock remoteBlock, String prevHash @@ -273,6 +419,12 @@ public final class PeriodicBlockchainSyncService { return sb.toString(); } + private static String normalizeHex64(String value) { + if (value == null) return ""; + String s = value.trim().toLowerCase(Locale.ROOT); + return s.length() == 64 ? s : ""; + } + private record LocalAddBlockApplyResult( boolean ok, String code, diff --git a/VERSION.properties b/VERSION.properties index 5f18382..3e4cf12 100644 --- a/VERSION.properties +++ b/VERSION.properties @@ -1,2 +1,2 @@ -client.version=1.2.273 -server.version=1.2.253 +client.version=1.2.274 +server.version=1.2.254