From 84e0f039cba8ed93994eaeb63d2a242e2e52abb613d7fd47148ab4e7878f9edf Mon Sep 17 00:00:00 2001 From: AidarKC Date: Thu, 25 Jun 2026 17:58:07 +0400 Subject: [PATCH] =?UTF-8?q?=D0=94=D0=BE=D0=B1=D0=B0=D0=B2=D0=B8=D1=82?= =?UTF-8?q?=D1=8C=20=D0=BF=D0=B5=D1=80=D0=B8=D0=BE=D0=B4=D0=B8=D1=87=D0=B5?= =?UTF-8?q?=D1=81=D0=BA=D0=B8=D0=B9=20sync=20=D0=B1=D0=BB=D0=BE=D0=BA?= =?UTF-8?q?=D1=87=D0=B5=D0=B9=D0=BD=D0=BE=D0=B2=20=D0=BA=D0=B0=D0=B6=D0=B4?= =?UTF-8?q?=D1=8B=D0=B5=2012=20=D1=87=D0=B0=D1=81=D0=BE=D0=B2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../API/04_Add_Block_to_Blockchain_API.md | 66 ++++- Dev_Docs/API/09_Operations_Index.md | 1 + ...026-06-25_1200_periodic_blockchain_sync.md | 23 ++ .../ws_protocol/JSON/JsonHandlerRegistry.java | 4 + .../Net_GetBlockchainBlock_Handler.java | 80 ++++++ .../Net_GetBlockchainBlock_Request.java | 15 ++ .../Net_GetBlockchainBlock_Response.java | 23 ++ .../sync/RemoteBlockchainSyncClient.java | 228 ++++++++++++++++ .../sync/PeriodicBlockchainSyncService.java | 244 ++++++++++++++++++ .../src/main/java/server/ws/WsServer.java | 6 + VERSION.properties | 4 +- 11 files changed, 690 insertions(+), 4 deletions(-) create mode 100644 Dev_Docs/Pending_Features/2026-06-25_1200_periodic_blockchain_sync.md create mode 100644 SHiNE-server/shine-server-net-protocol/src/main/java/server/logic/ws_protocol/JSON/handlers/blockchain/Net_GetBlockchainBlock_Handler.java create mode 100644 SHiNE-server/shine-server-net-protocol/src/main/java/server/logic/ws_protocol/JSON/handlers/blockchain/entyties/Net_GetBlockchainBlock_Request.java create mode 100644 SHiNE-server/shine-server-net-protocol/src/main/java/server/logic/ws_protocol/JSON/handlers/blockchain/entyties/Net_GetBlockchainBlock_Response.java create mode 100644 SHiNE-server/shine-server-net-protocol/src/main/java/server/sync/RemoteBlockchainSyncClient.java create mode 100644 SHiNE-server/src/main/java/server/sync/PeriodicBlockchainSyncService.java diff --git a/Dev_Docs/API/04_Add_Block_to_Blockchain_API.md b/Dev_Docs/API/04_Add_Block_to_Blockchain_API.md index cc3ae4c..f914822 100644 --- a/Dev_Docs/API/04_Add_Block_to_Blockchain_API.md +++ b/Dev_Docs/API/04_Add_Block_to_Blockchain_API.md @@ -1,6 +1,11 @@ -# API для разработчиков: 04 — Добавление блока в блокчейн (AddBlock) +# API для разработчиков: 04 — Запись и чтение блока блокчейна -Документ описывает **текущий рабочий формат** сетевого вызова `AddBlock`, который используется для записи **любого** блока в блокчейн пользователя. +Документ описывает **текущий рабочий формат** сетевых вызовов: + +- `AddBlock` — запись любого блока в блокчейн пользователя; +- `GetBlockchainBlock` — публичное чтение одного конкретного блока по имени цепочки и номеру. + +`GetBlockchainBlock` нужен в том числе для межсерверной синхронизации и для открытого чтения публичного блокчейна по одному блоку. > Важный принцип: на уровне JSON API сейчас есть **один универсальный метод** записи — `AddBlock`. > Конкретный смысл записи задаётся типом самого бинарного блока (`type/subType/version` в заголовке блока). @@ -174,3 +179,60 @@ - сейчас нет серверной ACL-политики чтения параметров (в MVP их может читать любой клиент, который знает `login`); - нет валидации формата значений для конкретных ключей (телефон, URL и т.д. проверяются только на стороне клиента); - нет отдельного индекса/поиска по этим полям — только точечное чтение и listing по `login`. + +--- + +## 9. `GetBlockchainBlock` + +### Назначение + +Публичное чтение одного конкретного блока из цепочки. + +Нужно для: + +- открытого чтения блокчейна по одному блоку; +- межсерверной синхронизации; +- восстановления/докачки отсутствующего хвоста цепочки. + +### JSON формат запроса + +`op = "GetBlockchainBlock"`. + +```json +{ + "op": "GetBlockchainBlock", + "requestId": "req-2001", + "payload": { + "blockchainName": "alice-001", + "blockNumber": 12 + } +} +``` + +Поля `payload`: + +- `blockchainName` — обязательно, формат `login-NNN`. +- `blockNumber` — обязательно, номер блока в цепочке, `>= 0`. + +### Успешный ответ + +```json +{ + "op": "GetBlockchainBlock", + "requestId": "req-2001", + "status": 200, + "ok": true, + "payload": { + "blockchainName": "alice-001", + "blockNumber": 12, + "blockHash": "9f0eaabbccddeeff00112233445566778899aabbccddeeff0011223344556677", + "blockBytesB64": "AAAB..." + } +} +``` + +### Ошибки + +- `400 / BAD_FIELDS` — некорректные `blockchainName` или `blockNumber`. +- `404 / BLOCK_NOT_FOUND` — такого блока нет. +- `500 / INTERNAL_ERROR` — внутренняя ошибка сервера. diff --git a/Dev_Docs/API/09_Operations_Index.md b/Dev_Docs/API/09_Operations_Index.md index 8c17ff3..dc1fd25 100644 --- a/Dev_Docs/API/09_Operations_Index.md +++ b/Dev_Docs/API/09_Operations_Index.md @@ -32,6 +32,7 @@ | `ListSessions` | `03_Session_Management_API.md` | список активных сессий | | `CloseActiveSession` | `03_Session_Management_API.md` | закрытие активной сессии | | `AddBlock` | `04_Add_Block_to_Blockchain_API.md` | добавление блока в блокчейн | +| `GetBlockchainBlock` | `04_Add_Block_to_Blockchain_API.md` | чтение одного блока блокчейна | | `Ping` | `05_Technical_Requests_API.md` | keep-alive | | `GetServerInfo` | `05_Technical_Requests_API.md` | публичная информация о сервере | | `ListBlockchainHeads` | `05_Technical_Requests_API.md` | список heads всех локальных блокчейнов | diff --git a/Dev_Docs/Pending_Features/2026-06-25_1200_periodic_blockchain_sync.md b/Dev_Docs/Pending_Features/2026-06-25_1200_periodic_blockchain_sync.md new file mode 100644 index 0000000..8ef4241 --- /dev/null +++ b/Dev_Docs/Pending_Features/2026-06-25_1200_periodic_blockchain_sync.md @@ -0,0 +1,23 @@ +# Периодическая межсерверная синхронизация блокчейнов + +- Краткое описание: + - Добавлен публичный `GetBlockchainBlock` для чтения одного блока. + - Добавлен плановый sync блокчейнов при старте сервера и затем каждые `12` часов. + - Синхронизация пока умеет только докачивать отсутствующий хвост цепочки. + - Случай рассинхрона цепочек пока не исправляется автоматически: он только логируется как не реализованный сценарий. + +- Что именно проверять: + - После старта сервера в логах появляется запуск периодического sync. + - Сервер может запросить у партнёра `ListBlockchainHeads`. + - Сервер может запросить у партнёра `GetBlockchainBlock` и локально применить блок через существующий `AddBlock`. + - На чистом тестовом сервере после удаления БД и файлов блокчейнов сервер сам подтягивает блоки при старте. + - После первичного старта новые блоки продолжают догоняться без ручного вмешательства. + - При рассинхроне цепочек в логах появляется явное сообщение, что reconciliation пока не реализован. + +- Ожидаемый результат: + - Чистый сервер после старта сам восстанавливает локальные цепочки от партнёра синхронизации. + - Периодический sync не мешает обычной работе сервера и не ломает локальный `AddBlock`. + - Нереализованный случай рассинхрона не приводит к падению сервера и явно отражается в логах. + +- Статус: + - `pending` diff --git a/SHiNE-server/shine-server-net-protocol/src/main/java/server/logic/ws_protocol/JSON/JsonHandlerRegistry.java b/SHiNE-server/shine-server-net-protocol/src/main/java/server/logic/ws_protocol/JSON/JsonHandlerRegistry.java index 28133fe..70b870e 100644 --- a/SHiNE-server/shine-server-net-protocol/src/main/java/server/logic/ws_protocol/JSON/JsonHandlerRegistry.java +++ b/SHiNE-server/shine-server-net-protocol/src/main/java/server/logic/ws_protocol/JSON/JsonHandlerRegistry.java @@ -39,7 +39,9 @@ import server.logic.ws_protocol.JSON.handlers.auth.entyties.Net_StartEspPairing_ import server.logic.ws_protocol.JSON.handlers.auth.entyties.Net_UpsertEspPairingSettings_Request; import server.logic.ws_protocol.JSON.handlers.blockchain.Net_AddBlock_Handler; +import server.logic.ws_protocol.JSON.handlers.blockchain.Net_GetBlockchainBlock_Handler; import server.logic.ws_protocol.JSON.handlers.blockchain.entyties.Net_AddBlock_Request; +import server.logic.ws_protocol.JSON.handlers.blockchain.entyties.Net_GetBlockchainBlock_Request; import server.logic.ws_protocol.JSON.handlers.tempToTest.Net_AddUser_Handler; import server.logic.ws_protocol.JSON.handlers.tempToTest.entyties.Net_AddUser_Request; @@ -163,6 +165,7 @@ public final class JsonHandlerRegistry { // --- blockchain --- Map.entry("AddBlock", new Net_AddBlock_Handler()), + Map.entry("GetBlockchainBlock", new Net_GetBlockchainBlock_Handler()), // --- userParams --- Map.entry("UpsertUserParam", new Net_UpsertUserParam_Handler()), @@ -239,6 +242,7 @@ public final class JsonHandlerRegistry { // --- blockchain --- Map.entry("AddBlock", Net_AddBlock_Request.class), + Map.entry("GetBlockchainBlock", Net_GetBlockchainBlock_Request.class), // --- userParams --- Map.entry("UpsertUserParam", Net_UpsertUserParam_Request.class), diff --git a/SHiNE-server/shine-server-net-protocol/src/main/java/server/logic/ws_protocol/JSON/handlers/blockchain/Net_GetBlockchainBlock_Handler.java b/SHiNE-server/shine-server-net-protocol/src/main/java/server/logic/ws_protocol/JSON/handlers/blockchain/Net_GetBlockchainBlock_Handler.java new file mode 100644 index 0000000..05bee04 --- /dev/null +++ b/SHiNE-server/shine-server-net-protocol/src/main/java/server/logic/ws_protocol/JSON/handlers/blockchain/Net_GetBlockchainBlock_Handler.java @@ -0,0 +1,80 @@ +package server.logic.ws_protocol.JSON.handlers.blockchain; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import server.logic.ws_protocol.Base64Ws; +import server.logic.ws_protocol.JSON.ConnectionContext; +import server.logic.ws_protocol.JSON.entyties.Net_Request; +import server.logic.ws_protocol.JSON.entyties.Net_Response; +import server.logic.ws_protocol.JSON.handlers.JsonMessageHandler; +import server.logic.ws_protocol.JSON.handlers.blockchain.entyties.Net_GetBlockchainBlock_Request; +import server.logic.ws_protocol.JSON.handlers.blockchain.entyties.Net_GetBlockchainBlock_Response; +import server.logic.ws_protocol.JSON.utils.NetExceptionResponseFactory; +import server.logic.ws_protocol.WireCodes; +import shine.db.dao.BlocksDAO; +import shine.db.entities.BlockEntry; + +/** + * GetBlockchainBlock — публичное чтение одного конкретного блока по имени цепочки и номеру. + */ +public final class Net_GetBlockchainBlock_Handler implements JsonMessageHandler { + + private static final Logger log = LoggerFactory.getLogger(Net_GetBlockchainBlock_Handler.class); + private final BlocksDAO blocksDAO = BlocksDAO.getInstance(); + + @Override + public Net_Response handle(Net_Request baseRequest, ConnectionContext ctx) { + Net_GetBlockchainBlock_Request req = (Net_GetBlockchainBlock_Request) baseRequest; + + String blockchainName = req.getBlockchainName() == null ? "" : req.getBlockchainName().trim(); + if (blockchainName.isEmpty() || req.getBlockNumber() < 0) { + return NetExceptionResponseFactory.error( + req, + WireCodes.Status.BAD_REQUEST, + "BAD_FIELDS", + "Некорректные поля: blockchainName, blockNumber" + ); + } + + try { + BlockEntry block = blocksDAO.getByNumber(blockchainName, req.getBlockNumber()); + if (block == null || block.getBlockBytes() == null) { + return NetExceptionResponseFactory.error( + req, + WireCodes.Status.NOT_FOUND, + "BLOCK_NOT_FOUND", + "Блок не найден" + ); + } + + Net_GetBlockchainBlock_Response resp = new Net_GetBlockchainBlock_Response(); + resp.setOp(req.getOp()); + resp.setRequestId(req.getRequestId()); + resp.setStatus(WireCodes.Status.OK); + resp.setBlockchainName(block.getBchName()); + resp.setBlockNumber(block.getBlockNumber()); + resp.setBlockHash(toHex(block.getBlockHash())); + resp.setBlockBytesB64(Base64Ws.encode(block.getBlockBytes())); + return resp; + } catch (Exception e) { + log.error("❌ Internal error GetBlockchainBlock blockchainName={} blockNumber={}", + blockchainName, req.getBlockNumber(), e); + return NetExceptionResponseFactory.error( + req, + WireCodes.Status.INTERNAL_ERROR, + "INTERNAL_ERROR", + NetExceptionResponseFactory.detailedMessage("Внутренняя ошибка сервера при GetBlockchainBlock", e) + ); + } + } + + private static String toHex(byte[] bytes) { + if (bytes == null) return ""; + StringBuilder sb = new StringBuilder(bytes.length * 2); + for (byte b : bytes) { + sb.append(Character.forDigit((b >>> 4) & 0xF, 16)); + sb.append(Character.forDigit(b & 0xF, 16)); + } + return sb.toString(); + } +} diff --git a/SHiNE-server/shine-server-net-protocol/src/main/java/server/logic/ws_protocol/JSON/handlers/blockchain/entyties/Net_GetBlockchainBlock_Request.java b/SHiNE-server/shine-server-net-protocol/src/main/java/server/logic/ws_protocol/JSON/handlers/blockchain/entyties/Net_GetBlockchainBlock_Request.java new file mode 100644 index 0000000..d2aa26c --- /dev/null +++ b/SHiNE-server/shine-server-net-protocol/src/main/java/server/logic/ws_protocol/JSON/handlers/blockchain/entyties/Net_GetBlockchainBlock_Request.java @@ -0,0 +1,15 @@ +package server.logic.ws_protocol.JSON.handlers.blockchain.entyties; + +import server.logic.ws_protocol.JSON.entyties.Net_Request; + +public final class Net_GetBlockchainBlock_Request extends Net_Request { + + private String blockchainName; + private int blockNumber; + + public String getBlockchainName() { return blockchainName; } + public void setBlockchainName(String blockchainName) { this.blockchainName = blockchainName; } + + public int getBlockNumber() { return blockNumber; } + public void setBlockNumber(int blockNumber) { this.blockNumber = blockNumber; } +} diff --git a/SHiNE-server/shine-server-net-protocol/src/main/java/server/logic/ws_protocol/JSON/handlers/blockchain/entyties/Net_GetBlockchainBlock_Response.java b/SHiNE-server/shine-server-net-protocol/src/main/java/server/logic/ws_protocol/JSON/handlers/blockchain/entyties/Net_GetBlockchainBlock_Response.java new file mode 100644 index 0000000..26036a8 --- /dev/null +++ b/SHiNE-server/shine-server-net-protocol/src/main/java/server/logic/ws_protocol/JSON/handlers/blockchain/entyties/Net_GetBlockchainBlock_Response.java @@ -0,0 +1,23 @@ +package server.logic.ws_protocol.JSON.handlers.blockchain.entyties; + +import server.logic.ws_protocol.JSON.entyties.Net_Response; + +public final class Net_GetBlockchainBlock_Response extends Net_Response { + + private String blockchainName; + private int blockNumber; + private String blockHash; + private String blockBytesB64; + + public String getBlockchainName() { return blockchainName; } + public void setBlockchainName(String blockchainName) { this.blockchainName = blockchainName; } + + public int getBlockNumber() { return blockNumber; } + public void setBlockNumber(int blockNumber) { this.blockNumber = blockNumber; } + + public String getBlockHash() { return blockHash; } + public void setBlockHash(String blockHash) { this.blockHash = blockHash; } + + public String getBlockBytesB64() { return blockBytesB64; } + public void setBlockBytesB64(String blockBytesB64) { this.blockBytesB64 = blockBytesB64; } +} diff --git a/SHiNE-server/shine-server-net-protocol/src/main/java/server/sync/RemoteBlockchainSyncClient.java b/SHiNE-server/shine-server-net-protocol/src/main/java/server/sync/RemoteBlockchainSyncClient.java new file mode 100644 index 0000000..f987a69 --- /dev/null +++ b/SHiNE-server/shine-server-net-protocol/src/main/java/server/sync/RemoteBlockchainSyncClient.java @@ -0,0 +1,228 @@ +package server.sync; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.WebSocket; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * Минимальный клиент для межсерверных JSON-op запросов по WSS. + */ +public final class RemoteBlockchainSyncClient { + + private static final Logger log = LoggerFactory.getLogger(RemoteBlockchainSyncClient.class); + private static final ObjectMapper MAPPER = new ObjectMapper(); + private static final HttpClient HTTP = HttpClient.newBuilder() + .connectTimeout(Duration.ofSeconds(6)) + .build(); + + public List listBlockchainHeads(String serverAddressRaw) throws Exception { + JsonNode response = send(serverAddressRaw, """ + { + "op":"ListBlockchainHeads", + "requestId":%s, + "payload":{} + } + """); + + int status = response.path("status").asInt(500); + if (status < 200 || status >= 300) { + throw new IllegalStateException("ListBlockchainHeads failed: status=" + status + " code=" + errorCode(response)); + } + + List result = new ArrayList<>(); + JsonNode items = response.path("payload").path("blockchains"); + if (!items.isArray()) { + return result; + } + for (JsonNode item : items) { + result.add(new RemoteBlockchainHead( + item.path("blockchainName").asText(""), + item.path("lastBlockNumber").asInt(-1), + item.path("lastBlockHash").asText(""), + item.path("fileSizeBytes").asLong(0L) + )); + } + return result; + } + + public RemoteBlockchainBlock getBlockchainBlock(String serverAddressRaw, String blockchainName, int blockNumber) throws Exception { + String safeBlockchainName = MAPPER.writeValueAsString(blockchainName); + JsonNode response = send(serverAddressRaw, """ + { + "op":"GetBlockchainBlock", + "requestId":%s, + "payload":{ + "blockchainName":%s, + "blockNumber":%d + } + } + """.formatted("%s", safeBlockchainName, blockNumber)); + + int status = response.path("status").asInt(500); + if (status == 404) { + return null; + } + if (status < 200 || status >= 300) { + throw new IllegalStateException("GetBlockchainBlock failed: status=" + status + " code=" + errorCode(response)); + } + + JsonNode payload = response.path("payload"); + return new RemoteBlockchainBlock( + payload.path("blockchainName").asText(blockchainName), + payload.path("blockNumber").asInt(blockNumber), + payload.path("blockHash").asText(""), + payload.path("blockBytesB64").asText("") + ); + } + + private JsonNode send(String serverAddressRaw, String jsonTemplate) throws Exception { + String requestId = MAPPER.writeValueAsString("sync-" + UUID.randomUUID()); + String json = jsonTemplate.formatted(requestId); + String wsUrl = buildWsUrl(serverAddressRaw); + if (wsUrl == null) { + throw new IllegalArgumentException("Invalid server address: " + serverAddressRaw); + } + + CompletableFuture responseFuture = new CompletableFuture<>(); + CountDownLatch openLatch = new CountDownLatch(1); + SyncWsListener listener = new SyncWsListener(responseFuture, openLatch); + + WebSocket webSocket = HTTP.newWebSocketBuilder() + .connectTimeout(Duration.ofSeconds(6)) + .buildAsync(URI.create(wsUrl), listener) + .get(8, TimeUnit.SECONDS); + + if (!openLatch.await(8, TimeUnit.SECONDS)) { + tryAbort(webSocket); + throw new TimeoutException("WS open timeout"); + } + + webSocket.sendText(json, true).get(8, TimeUnit.SECONDS); + String responseJson = responseFuture.get(12, TimeUnit.SECONDS); + tryAbort(webSocket); + return MAPPER.readTree(responseJson); + } + + private static String errorCode(JsonNode response) { + String code = response.path("code").asText(""); + if (!code.isBlank()) return code; + return response.path("error").asText(""); + } + + static String buildWsUrl(String serverAddressRaw) { + String host = normalizeHostLike(serverAddressRaw); + if (host == null) return null; + return "wss://" + host + "/ws"; + } + + private static String normalizeHostLike(String value) { + if (value == null) return null; + String raw = value.trim(); + if (raw.isEmpty()) return null; + try { + String withScheme = raw.matches("^[a-zA-Z]+://.*$") ? raw : "https://" + raw; + URI uri = URI.create(withScheme); + String host = uri.getHost(); + if (host == null || host.isBlank()) return null; + return host.trim().toLowerCase(Locale.ROOT); + } catch (Exception e) { + String cleaned = raw + .replaceFirst("^[a-zA-Z]+://", "") + .replaceFirst("/.*$", "") + .trim() + .toLowerCase(Locale.ROOT); + return cleaned.isEmpty() ? null : cleaned; + } + } + + private static void tryAbort(WebSocket webSocket) { + try { + webSocket.sendClose(WebSocket.NORMAL_CLOSURE, "ok"); + } catch (Exception ignored) { + } + try { + webSocket.abort(); + } catch (Exception ignored) { + } + } + + public record RemoteBlockchainHead( + String blockchainName, + int lastBlockNumber, + String lastBlockHash, + long fileSizeBytes + ) {} + + public record RemoteBlockchainBlock( + String blockchainName, + int blockNumber, + String blockHash, + String blockBytesB64 + ) {} + + private static final class SyncWsListener implements WebSocket.Listener { + private final CompletableFuture responseFuture; + private final CountDownLatch openLatch; + private final StringBuilder textBuffer = new StringBuilder(); + + private SyncWsListener(CompletableFuture responseFuture, CountDownLatch openLatch) { + this.responseFuture = responseFuture; + this.openLatch = openLatch; + } + + @Override + public void onOpen(WebSocket webSocket) { + openLatch.countDown(); + webSocket.request(1); + } + + @Override + public CompletionStage onText(WebSocket webSocket, CharSequence data, boolean last) { + textBuffer.append(data); + if (last && !responseFuture.isDone()) { + responseFuture.complete(textBuffer.toString()); + } + webSocket.request(1); + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletionStage onBinary(WebSocket webSocket, ByteBuffer data, boolean last) { + webSocket.request(1); + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletionStage onClose(WebSocket webSocket, int statusCode, String reason) { + if (!responseFuture.isDone()) { + responseFuture.completeExceptionally(new IllegalStateException("WS closed before response: " + statusCode + " " + reason)); + } + return CompletableFuture.completedFuture(null); + } + + @Override + public void onError(WebSocket webSocket, Throwable error) { + log.warn("Remote sync websocket error: {}", String.valueOf(error)); + if (!responseFuture.isDone()) { + responseFuture.completeExceptionally(error); + } + openLatch.countDown(); + } + } +} diff --git a/SHiNE-server/src/main/java/server/sync/PeriodicBlockchainSyncService.java b/SHiNE-server/src/main/java/server/sync/PeriodicBlockchainSyncService.java new file mode 100644 index 0000000..ffcbc6a --- /dev/null +++ b/SHiNE-server/src/main/java/server/sync/PeriodicBlockchainSyncService.java @@ -0,0 +1,244 @@ +package server.sync; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import server.logic.ws_protocol.JSON.entyties.Net_Exception_Response; +import server.logic.ws_protocol.JSON.entyties.Net_Response; +import server.logic.ws_protocol.JSON.handlers.auth.SolanaUserPdaImportService; +import server.logic.ws_protocol.JSON.handlers.blockchain.Net_AddBlock_Handler; +import server.logic.ws_protocol.JSON.handlers.blockchain.entyties.Net_AddBlock_Request; +import shine.db.dao.BlockchainStateDAO; +import shine.db.dao.SyncServersDAO; +import shine.db.entities.BlockchainStateEntry; +import shine.db.entities.SyncServerEntry; +import utils.blockchain.BlockchainNameUtil; + +import java.util.List; +import java.util.Locale; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Плановый межсерверный sync блокчейнов. + * Сейчас реализует только догоняющую синхронизацию отсутствующего хвоста. + * Случай рассинхрона цепочек пока только логируется и пропускается. + */ +public final class PeriodicBlockchainSyncService { + + private static final Logger log = LoggerFactory.getLogger(PeriodicBlockchainSyncService.class); + private static final long PERIOD_HOURS = 12L; + private static final AtomicBoolean STARTED = new AtomicBoolean(false); + private static final ScheduledExecutorService EXECUTOR = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r, "periodic-blockchain-sync"); + t.setDaemon(true); + return t; + } + }); + + private static final RemoteBlockchainSyncClient REMOTE = new RemoteBlockchainSyncClient(); + private static final Net_AddBlock_Handler ADD_BLOCK_HANDLER = new Net_AddBlock_Handler(); + private static final BlockchainStateDAO STATE_DAO = BlockchainStateDAO.getInstance(); + private static final SyncServersDAO SYNC_SERVERS_DAO = SyncServersDAO.getInstance(); + + private PeriodicBlockchainSyncService() {} + + public static void startOrLog() { + if (!STARTED.compareAndSet(false, true)) { + return; + } + EXECUTOR.scheduleWithFixedDelay( + PeriodicBlockchainSyncService::runCycleSafe, + 0L, + PERIOD_HOURS, + TimeUnit.HOURS + ); + log.info("Periodic blockchain sync scheduled: startup + every {} hours", PERIOD_HOURS); + } + + private static void runCycleSafe() { + try { + runCycle(); + } catch (Exception e) { + log.error("Periodic blockchain sync failed unexpectedly", e); + } + } + + private static void runCycle() throws Exception { + SyncServersBootstrapService.refreshFromSolanaOrLog(); + List partners = SYNC_SERVERS_DAO.listAll(); + if (partners.isEmpty()) { + log.info("Periodic blockchain sync skipped: sync_servers is empty"); + return; + } + + for (SyncServerEntry partner : partners) { + if (partner == null) continue; + try { + syncPartner(partner); + } catch (Exception e) { + log.warn("Periodic blockchain sync partner failed: login={} reason={}", + partner.getLogin(), String.valueOf(e)); + } + } + } + + private static void syncPartner(SyncServerEntry partner) throws Exception { + String partnerLogin = normalize(partner.getLogin()); + if (partnerLogin == null) return; + + List remoteHeads = + REMOTE.listBlockchainHeads(partner.getServerAddress()); + + for (RemoteBlockchainSyncClient.RemoteBlockchainHead remoteHead : remoteHeads) { + if (remoteHead == null || remoteHead.blockchainName() == null || remoteHead.blockchainName().isBlank()) { + continue; + } + + BlockchainStateEntry localState = STATE_DAO.getByBlockchainName(remoteHead.blockchainName()); + if (localState == null) { + syncMissingTail(partner, remoteHead, -1, ""); + continue; + } + + int localLast = localState.getLastBlockNumber(); + String localHash = toHex32(localState.getLastBlockHash()); + + if (localLast < remoteHead.lastBlockNumber()) { + syncMissingTail(partner, remoteHead, localLast, localHash); + continue; + } + + if (localLast == remoteHead.lastBlockNumber()) { + if (localHash.equalsIgnoreCase(remoteHead.lastBlockHash())) { + continue; + } + log.warn("Periodic blockchain sync: divergence detected but not implemented yet. partner={} blockchainName={} localLast={} localHash={} remoteHash={} localSize={} remoteSize={}", + partnerLogin, + remoteHead.blockchainName(), + localLast, + localHash, + remoteHead.lastBlockHash(), + localState.getFileSizeBytes(), + remoteHead.fileSizeBytes()); + continue; + } + + log.info("Periodic blockchain sync skipped: local chain is not weaker. partner={} blockchainName={} localLast={} remoteLast={}", + partnerLogin, remoteHead.blockchainName(), localLast, remoteHead.lastBlockNumber()); + } + } + + private static void syncMissingTail( + SyncServerEntry partner, + RemoteBlockchainSyncClient.RemoteBlockchainHead remoteHead, + int localLast, + String localHash + ) throws Exception { + String partnerLogin = normalize(partner.getLogin()); + if (!ensureLocalChainExists(remoteHead.blockchainName())) { + log.warn("Periodic blockchain sync: cannot prepare local chain. partner={} blockchainName={}", + partnerLogin, remoteHead.blockchainName()); + return; + } + + int fromBlockNumber = Math.max(localLast + 1, 0); + for (int blockNumber = fromBlockNumber; blockNumber <= remoteHead.lastBlockNumber(); blockNumber++) { + RemoteBlockchainSyncClient.RemoteBlockchainBlock remoteBlock = + REMOTE.getBlockchainBlock(partner.getServerAddress(), remoteHead.blockchainName(), blockNumber); + if (remoteBlock == null) { + log.warn("Periodic blockchain sync: remote block not found. partner={} blockchainName={} blockNumber={}", + partnerLogin, remoteHead.blockchainName(), blockNumber); + return; + } + + LocalAddBlockApplyResult result = applyBlockLocally(remoteBlock, blockNumber == 0 ? "" : localHash); + if (!result.ok()) { + if ("bad_prev_hash".equalsIgnoreCase(result.code()) || "bad_block_number".equalsIgnoreCase(result.code())) { + log.warn("Periodic blockchain sync: divergence detected during replay, but reconciliation is not implemented yet. partner={} blockchainName={} blockNumber={} code={}", + partnerLogin, remoteHead.blockchainName(), blockNumber, result.code()); + } else { + log.warn("Periodic blockchain sync: local AddBlock rejected remote block. partner={} blockchainName={} blockNumber={} code={} message={}", + partnerLogin, remoteHead.blockchainName(), blockNumber, result.code(), result.message()); + } + return; + } + + localHash = result.serverLastHash(); + } + + log.info("Periodic blockchain sync ok: partner={} blockchainName={} from={} to={}", + partnerLogin, remoteHead.blockchainName(), fromBlockNumber, remoteHead.lastBlockNumber()); + } + + private static LocalAddBlockApplyResult applyBlockLocally( + RemoteBlockchainSyncClient.RemoteBlockchainBlock remoteBlock, + String prevHash + ) { + Net_AddBlock_Request req = new Net_AddBlock_Request(); + req.setOp("AddBlock"); + req.setRequestId("periodic-sync-local"); + req.setBlockchainName(remoteBlock.blockchainName()); + req.setBlockNumber(remoteBlock.blockNumber()); + req.setPrevBlockHash(prevHash == null ? "" : prevHash); + req.setBlockBytesB64(remoteBlock.blockBytesB64()); + + Net_Response response = ADD_BLOCK_HANDLER.handle(req, null); + if (response.getStatus() >= 200 && response.getStatus() < 300) { + server.logic.ws_protocol.JSON.handlers.blockchain.entyties.Net_AddBlock_Response ok = + (server.logic.ws_protocol.JSON.handlers.blockchain.entyties.Net_AddBlock_Response) response; + return new LocalAddBlockApplyResult(true, "", "", ok.getServerLastGlobalHash()); + } + + Net_Exception_Response error = (Net_Exception_Response) response; + return new LocalAddBlockApplyResult(false, error.getCode(), error.getMessage(), ""); + } + + private static boolean ensureLocalChainExists(String blockchainName) { + try { + if (STATE_DAO.getByBlockchainName(blockchainName) != null) { + return true; + } + String login = BlockchainNameUtil.loginFromBlockchainName(blockchainName); + if (login == null || login.isBlank()) { + return false; + } + SolanaUserPdaImportService.findOrImportByLogin(login); + return STATE_DAO.getByBlockchainName(blockchainName) != null; + } catch (Exception e) { + log.warn("Periodic blockchain sync: failed to ensure local chain exists for blockchainName={} reason={}", + blockchainName, String.valueOf(e)); + return false; + } + } + + private static String normalize(String value) { + if (value == null) return null; + String s = value.trim().toLowerCase(Locale.ROOT); + return s.isEmpty() ? null : s; + } + + private static String toHex32(byte[] bytes32) { + byte[] bytes = bytes32; + if (bytes == null || bytes.length != 32) { + bytes = new byte[32]; + } + StringBuilder sb = new StringBuilder(64); + for (byte b : bytes) { + sb.append(Character.forDigit((b >>> 4) & 0xF, 16)); + sb.append(Character.forDigit(b & 0xF, 16)); + } + return sb.toString(); + } + + private record LocalAddBlockApplyResult( + boolean ok, + String code, + String message, + String serverLastHash + ) {} +} diff --git a/SHiNE-server/src/main/java/server/ws/WsServer.java b/SHiNE-server/src/main/java/server/ws/WsServer.java index 3c5815b..519eea8 100644 --- a/SHiNE-server/src/main/java/server/ws/WsServer.java +++ b/SHiNE-server/src/main/java/server/ws/WsServer.java @@ -6,6 +6,7 @@ import org.eclipse.jetty.websocket.server.config.JettyWebSocketServletContainerI import org.slf4j.Logger; import org.slf4j.LoggerFactory; import server.debug.DebugApiConfigurator; +import server.sync.PeriodicBlockchainSyncService; import server.sync.SyncServersBootstrapService; import utils.config.AppConfig; @@ -56,6 +57,11 @@ public final class WsServer { // ============================================================ SyncServersBootstrapService.refreshFromSolanaOrLog(); + // ============================================================ + // 1.2) Плановый межсерверный sync блокчейнов + // ============================================================ + PeriodicBlockchainSyncService.startOrLog(); + // ============================================================ // 2) Запуск Jetty WS // ============================================================ diff --git a/VERSION.properties b/VERSION.properties index f8866a3..98b7f68 100644 --- a/VERSION.properties +++ b/VERSION.properties @@ -1,2 +1,2 @@ -client.version=1.2.270 -server.version=1.2.250 +client.version=1.2.271 +server.version=1.2.251