package server.logic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import server.logic.ws_protocol.binary.handlers.*; import server.logic.ws_protocol.WireCodes; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.util.Map; /** * Обработчик входящих сообщение на сервер. * По коду сообщения (первые 4 байта сообщения) находи нужный хэндлер и передаёт в него сообщение * Получает и возвращает ответ от хэндлера */ public final class InboundMessageProcessor { private static final Logger log = LoggerFactory.getLogger(InboundMessageProcessor.class); private static final Map HANDLERS = Map.of( // WireCodes.Op.PING, new PingHandler() // WireCodes.Op.ADD_BLOCK, new AddBlockHandler(), // WireCodes.Op.GET_BLOCKCHAIN,new GetBlockchainHandler() // WireCodes.Op.SEARCH_USERS, new SearchUsersHandler(), // WireCodes.Op.GET_LAST_BLOCK_INFO,new GetLastBlockInfoHandler() ); private InboundMessageProcessor() {} public static byte[] process(byte[] msg) { if (msg == null || msg.length < 4) return intTo4Bytes(WireCodes.Status.BAD_REQUEST); int op = first4ToInt(msg); MessageHandler h = HANDLERS.get(op); if (h == null) { log.warn("Неизвестная операция: {}", op); return intTo4Bytes(WireCodes.Status.BAD_REQUEST); } try { return h.handle(msg); } catch (Exception e) { log.error("Ошибка при обработке операции {}", op, e); return intTo4Bytes(WireCodes.Status.INTERNAL_ERROR); } } private static int first4ToInt(byte[] msg) { return ByteBuffer.wrap(msg, 0, 4) .order(ByteOrder.BIG_ENDIAN) .getInt(); } public static byte[] intTo4Bytes(int code) { return ByteBuffer.allocate(4) .order(ByteOrder.BIG_ENDIAN) .putInt(code) .array(); } } package server.logic.ws_protocol.binary.handlers; /** * Общий интерфейс для всех обработчиков входящих сообщений. */ public interface MessageHandler { /** * Обработать входящее сообщение и вернуть бинарный ответ. */ byte[] handle(byte[] msg); } package server.ws; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import shine.db.dao.BlockchainStateDAO; import shine.db.entities.BlockchainStateEntry; import utils.files.FileStoreUtil; import shine.log.BlockchainAdminNotifier; import java.io.IOException; import java.nio.file.*; import java.sql.SQLException; import java.util.ArrayList; import java.util.List; /** * =============================================================== * BlockchainTmpRecoveryOnStartup — восстановление консистентности * blockchain файлов при старте сервера. * * Сценарий проблемы: * - при добавлении блока сначала пишется .tmp_bch * - потом коммитится БД (state.fileSizeBytes) * - потом tmp переименовывается поверх .bch (атомарно, если возможно) * * Если сервер упал в середине, может остаться tmp: * - tmp есть, а основной .bch остался старым * - tmp есть, а основной .bch уже удалили/заменить не успели * - tmp есть, а БД успела/не успела обновиться * * Этот класс при старте: * - ищет все *.tmp_bch в data/ * - сравнивает размеры: * - tmp * - main (если есть) * - state.fileSizeBytes (если есть) * * Правила: * * A) state есть: * - если stateSize == mainSize => tmp удаляем * - если stateSize == tmpSize => tmp ставим на место main (atomicReplaceBlockchainFile) * - иначе => КРИТИЧЕСКАЯ ОШИБКА: сервер останавливаем + уведомление администратору * * B) state НЕТ: * - если main НЕТ и tmp ЕСТЬ => tmp удаляем (мусор после падения/неуспешной транзакции) * - если main ЕСТЬ и tmp ЕСТЬ => КРИТИЧЕСКАЯ ОШИБКА: уведомление администратору + стоп сервера * * Логирование: * - обо всех восстановленных/удалённых tmp пишем в лог * - если tmp-файлов нет — тоже пишем в лог * =============================================================== */ public final class BlockchainTmpRecoveryOnStartup { private static final Logger log = LoggerFactory.getLogger(BlockchainTmpRecoveryOnStartup.class); private BlockchainTmpRecoveryOnStartup() {} /** * Запуск восстановления. * Если обнаружена ситуация, когда размеры не совпали и сервер сам не может чинить — бросаем исключение. */ public static void runRecoveryOrThrow() { FileStoreUtil fs = FileStoreUtil.getInstance(); BlockchainStateDAO stateDAO = BlockchainStateDAO.getInstance(); Path dataDir = Paths.get(FileStoreUtil.DATA_DIR_NAME); ensureDirExists(dataDir); List tmpFiles = listTmpFiles(dataDir); if (tmpFiles.isEmpty()) { log.info("🟢 BlockchainTmpRecovery: временных *.tmp_bch файлов не найдено — восстановление не требуется."); return; } log.warn("🟡 BlockchainTmpRecovery: найдено временных файлов: {}", tmpFiles.size()); for (Path tmpPath : tmpFiles) { String fileName = tmpPath.getFileName().toString(); String blockchainName = extractBlockchainNameFromTmp(fileName); if (blockchainName == null || blockchainName.isBlank()) { // странное имя — не трогаем автоматически, но это уже повод дернуть админа BlockchainAdminNotifier.critical( "НАЙДЕН TMP-ФАЙЛ С НЕОЖИДАННЫМ ИМЕНЕМ: " + fileName + " (не могу определить blockchainName).", null ); throw new IllegalStateException("Bad tmp file name: " + fileName); } Path mainPath = dataDir.resolve(fs.buildBlockchainFileName(blockchainName)); long tmpSize = safeSize(tmpPath); boolean mainExists = Files.exists(mainPath); long mainSize = mainExists ? safeSize(mainPath) : -1L; BlockchainStateEntry st = null; try { st = stateDAO.getByBlockchainName(blockchainName); } catch (SQLException e) { BlockchainAdminNotifier.critical( "ОШИБКА БД ПРИ ВОССТАНОВЛЕНИИ TMP: blockchainName=" + blockchainName + " (сервер остановлен).", e ); throw new IllegalStateException("DB error during tmp recovery for " + blockchainName, e); } // ============================================================ // CASE B) state НЕТ // ============================================================ if (st == null) { if (!mainExists) { // НЕТ state, НЕТ main, есть tmp => удаляем tmp log.warn("🟠 BlockchainTmpRecovery: state отсутствует и main отсутствует, но tmp найден => удаляем tmp. blockchainName={}, tmpSize={}", blockchainName, tmpSize); safeDelete(tmpPath); continue; } // НЕТ state, но main есть и tmp есть => это уже подозрительно BlockchainAdminNotifier.critical( "НЕСОГЛАСОВАННОСТЬ: ЕСТЬ main И tmp, НО НЕТ state В БД. " + "blockchainName=" + blockchainName + ", mainSize=" + mainSize + ", tmpSize=" + tmpSize + ". СЕРВЕР ОСТАНОВЛЕН. " + "ПОДОЗРЕНИЕ: файлы могли быть изменены вне сервера.", null ); throw new IllegalStateException("State missing but both main and tmp exist for " + blockchainName); } // ============================================================ // CASE A) state ЕСТЬ // ============================================================ long stateSize = st.getFileSizeBytes(); // 1) stateSize == mainSize => tmp мусор if (mainExists && mainSize == stateSize) { log.info("🟢 BlockchainTmpRecovery: stateSize совпадает с main => tmp удаляем. blockchainName={}, stateSize={}, mainSize={}, tmpSize={}", blockchainName, stateSize, mainSize, tmpSize); safeDelete(tmpPath); continue; } // 2) stateSize == tmpSize => tmp это актуальная версия, ставим на место main if (tmpSize == stateSize) { log.warn("🟡 BlockchainTmpRecovery: stateSize совпадает с tmp => восстанавливаем main из tmp. blockchainName={}, stateSize={}, mainSize={}, tmpSize={}", blockchainName, stateSize, mainSize, tmpSize); try { // метод уже есть и делает move tmp->main с попыткой ATOMIC_MOVE fs.atomicReplaceBlockchainFile(blockchainName); // после move tmp должен исчезнуть сам (перемещён) log.info("✅ BlockchainTmpRecovery: восстановление выполнено. blockchainName={}, newMainSize={}", blockchainName, safeSize(mainPath)); } catch (Exception e) { BlockchainAdminNotifier.critical( "НЕ УДАЛОСЬ ВОССТАНОВИТЬ main ИЗ tmp (move failed). " + "blockchainName=" + blockchainName + ", stateSize=" + stateSize + ", mainSize=" + mainSize + ", tmpSize=" + tmpSize + ". СЕРВЕР ОСТАНОВЛЕН.", e ); throw new IllegalStateException("Cannot replace main from tmp for " + blockchainName, e); } continue; } // 3) НИЧЕГО НЕ СОВПАЛО => критическая ситуация BlockchainAdminNotifier.critical( "ФАТАЛЬНАЯ НЕСОГЛАСОВАННОСТЬ BLOCKCHAIN ФАЙЛОВ. " + "blockchainName=" + blockchainName + ", stateSize=" + stateSize + ", mainExists=" + mainExists + ", mainSize=" + mainSize + ", tmpSize=" + tmpSize + ". СЕРВЕР ОСТАНОВЛЕН. " + "ТУТ НУЖНО УВЕДОМЛЕНИЕ АДМИНИСТРАТОРУ: возможно файлы изменены вручную/другой программой.", null ); throw new IllegalStateException("Blockchain files mismatch for " + blockchainName); } log.info("✅ BlockchainTmpRecovery: обработка tmp-файлов завершена."); } /* ===================================================================== */ /* =============================== Helpers ============================== */ /* ===================================================================== */ private static void ensureDirExists(Path dir) { try { if (!Files.exists(dir)) { Files.createDirectories(dir); } } catch (IOException e) { throw new IllegalStateException("Cannot create data dir: " + dir, e); } } private static List listTmpFiles(Path dataDir) { List out = new ArrayList<>(); try (DirectoryStream ds = Files.newDirectoryStream(dataDir, "*" + FileStoreUtil.BLOCKCHAIN_TMP_EXTENSION)) { for (Path p : ds) { if (Files.isRegularFile(p)) out.add(p); } } catch (IOException e) { throw new IllegalStateException("Cannot list tmp files in: " + dataDir, e); } return out; } /** * Из "anya0001.tmp_bch" -> "anya0001" */ private static String extractBlockchainNameFromTmp(String tmpFileName) { if (tmpFileName == null) return null; if (!tmpFileName.endsWith(FileStoreUtil.BLOCKCHAIN_TMP_EXTENSION)) return null; String base = tmpFileName.substring(0, tmpFileName.length() - FileStoreUtil.BLOCKCHAIN_TMP_EXTENSION.length()); // базовая защита: не допускаем слэши/.. даже если кто-то подложил файл if (base.isBlank()) return null; if (base.contains("/") || base.contains("\\") || base.contains("..")) return null; return base; } private static long safeSize(Path p) { try { return Files.size(p); } catch (IOException e) { throw new IllegalStateException("Cannot read file size: " + p, e); } } private static void safeDelete(Path p) { try { Files.deleteIfExists(p); } catch (IOException e) { throw new IllegalStateException("Cannot delete file: " + p, e); } } } package server.ws; import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.WriteCallback; import org.eclipse.jetty.websocket.api.annotations.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import server.logic.InboundMessageProcessor; import server.logic.ws_protocol.JSON.ActiveConnectionsRegistry; import server.logic.ws_protocol.JSON.ConnectionContext; import server.logic.ws_protocol.JSON.JsonInboundProcessor; import java.nio.ByteBuffer; import java.util.concurrent.CompletableFuture; @WebSocket public class BlockchainWsEndpoint { private static final Logger log = LoggerFactory.getLogger(BlockchainWsEndpoint.class); private Session session; /** Контекст для текущего WebSocket-соединения. */ private final ConnectionContext connectionContext = new ConnectionContext(); @OnWebSocketConnect public void onConnect(Session session) { this.session = session; // Привязываем WebSocket-сессию к ConnectionContext connectionContext.setWsSession(session); log.info("WS connected: {}", session.getRemoteAddress()); } @OnWebSocketMessage public void onBinary(byte[] payload, int offset, int length) { byte[] msg = new byte[length]; System.arraycopy(payload, offset, msg, 0, length); // Асинхронно обрабатываем входящее бинарное сообщение CompletableFuture .supplyAsync(() -> InboundMessageProcessor.process(msg)) .thenAccept(resp -> { if (resp != null && session != null && session.isOpen()) { session.getRemote().sendBytes(ByteBuffer.wrap(resp), new WriteCallback() { @Override public void writeFailed(Throwable x) { log.warn("Failed to send response", x); } @Override public void writeSuccess() { log.debug("Response sent successfully"); } }); } }) .exceptionally(ex -> { log.error("Processing failed", ex); trySendCode(500); return null; }); } private void trySendCode(int code) { if (session != null && session.isOpen()) { byte[] resp = InboundMessageProcessor.intTo4Bytes(code); session.getRemote().sendBytes(ByteBuffer.wrap(resp), new WriteCallback() { @Override public void writeFailed(Throwable x) { log.warn("Failed to send error code", x); } @Override public void writeSuccess() { log.debug("Error code {} sent", code); } }); } } @OnWebSocketClose public void onClose(int statusCode, String reason) { log.info("WS closed: {} {}", statusCode, reason); // Удаляем это подключение из реестра активных соединений ActiveConnectionsRegistry.getInstance().remove(connectionContext); // На всякий случай очищаем контекст connectionContext.reset(); } @OnWebSocketError public void onError(Throwable cause) { log.error("WS error", cause); } // Обработка текстовых JSON-запросов @OnWebSocketMessage public void onText(String message) { log.info("📥 Получено TEXT-сообщение от клиента: {}", message); CompletableFuture .supplyAsync(() -> JsonInboundProcessor.processJson(message, connectionContext)) .thenAccept(respJson -> { if (respJson != null && session != null && session.isOpen()) { log.info("📤 Отправляем ответ клиенту: {}", respJson); session.getRemote().sendString(respJson, new WriteCallback() { @Override public void writeFailed(Throwable x) { log.warn("⚠️ Не удалось отправить JSON-ответ клиенту: {}", x.toString()); } @Override public void writeSuccess() { log.debug("✔ JSON-ответ успешно отправлен"); } }); } }) .exceptionally(ex -> { log.error("❌ Ошибка при обработке JSON-сообщения", ex); trySendJsonError(); return null; }); } private void trySendJsonError() { if (session != null && session.isOpen()) { String resp = "{\"op\":null,\"requestId\":null,\"status\":500," + "\"payload\":{\"code\":\"INTERNAL_ERROR\",\"message\":\"Ошибка сервера\"}}"; log.info("📤 Отправляем клиенту ошибку JSON: {}", resp); session.getRemote().sendString(resp, new WriteCallback() { @Override public void writeFailed(Throwable x) { log.warn("⚠️ Не удалось отправить JSON-ответ клиенту: {}", x.toString()); } @Override public void writeSuccess() { log.debug("✔ JSON-ошибка успешно отправлена"); } }); } } } package server.ws; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.websocket.server.config.JettyWebSocketServletContainerInitializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import utils.config.AppConfig; import java.time.Duration; /** * WsServer — поднимает Jetty WS на /ws. * * ВАЖНО: * - перед стартом сервера выполняем recovery tmp-блокчейнов. * - если обнаружена несогласованность, которую сервер сам чинить не может — * recovery бросает исключение и сервер не стартует. */ public final class WsServer { private static final Logger log = LoggerFactory.getLogger(WsServer.class); public static void main(String[] args) throws Exception { // ============================================================ // 0) Восстановление консистентности blockchain файлов // ============================================================ try { BlockchainTmpRecoveryOnStartup.runRecoveryOrThrow(); } catch (Exception e) { // Уже должно быть “большое” уведомление через BlockchainAdminNotifier, // но на всякий случай логируем ещё раз. log.error("❌ Сервер НЕ будет запущен: критическая ошибка восстановления blockchain tmp-файлов.", e); throw e; // останавливаем запуск } // ============================================================ // 1) Настройки порта // ============================================================ AppConfig config = AppConfig.getInstance(); int port = 7070; try { String portStr = config.getParam("server.port"); if (portStr != null && !portStr.isBlank()) { port = Integer.parseInt(portStr.trim()); } } catch (Exception e) { log.info("Не удалось прочитать параметр server.port, используем порт по умолчанию {}", port); } // ============================================================ // 2) Запуск Jetty WS // ============================================================ Server server = new Server(port); ServletContextHandler context = new ServletContextHandler(); context.setContextPath("/"); server.setHandler(context); // Инициализация контейнера WebSocket JettyWebSocketServletContainerInitializer.configure(context, (servletContext, wsContainer) -> { // Таймаут простоя соединения (Jetty 11 синтаксис) wsContainer.setIdleTimeout(Duration.ofMinutes(5)); // Маппинг эндпоинта wsContainer.addMapping("/ws", (req, resp) -> new BlockchainWsEndpoint()); }); server.start(); log.info("✅ WS сервер запущен на ws://localhost:{}/ws", port); server.join(); } }