package server.logic;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import server.logic.ws_protocol.binary.handlers.*;
import server.logic.ws_protocol.WireCodes;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Map;

/**
 * Обработчик входящих сообщение на сервер.
 * По коду сообщения (первые 4 байта сообщения) находи нужный хэндлер и передаёт в него сообщение
 * Получает и возвращает ответ от хэндлера
 */
public final class InboundMessageProcessor {
    private static final Logger log = LoggerFactory.getLogger(InboundMessageProcessor.class);

    private static final Map<Integer, MessageHandler> HANDLERS = Map.of(
//            WireCodes.Op.PING,          new PingHandler()
//            WireCodes.Op.ADD_BLOCK,     new AddBlockHandler(),
//            WireCodes.Op.GET_BLOCKCHAIN,new GetBlockchainHandler()
//            WireCodes.Op.SEARCH_USERS,  new SearchUsersHandler(),
//            WireCodes.Op.GET_LAST_BLOCK_INFO,new GetLastBlockInfoHandler()

    );

    private InboundMessageProcessor() {}

    public static byte[] process(byte[] msg) {
        if (msg == null || msg.length < 4)
            return intTo4Bytes(WireCodes.Status.BAD_REQUEST);

        int op = first4ToInt(msg);
        MessageHandler h = HANDLERS.get(op);
        if (h == null) {
            log.warn("Неизвестная операция: {}", op);
            return intTo4Bytes(WireCodes.Status.BAD_REQUEST);
        }

        try {
            return h.handle(msg);
        } catch (Exception e) {
            log.error("Ошибка при обработке операции {}", op, e);
            return intTo4Bytes(WireCodes.Status.INTERNAL_ERROR);
        }
    }

    private static int first4ToInt(byte[] msg) {
        return ByteBuffer.wrap(msg, 0, 4)
                .order(ByteOrder.BIG_ENDIAN)
                .getInt();
    }

    public static byte[] intTo4Bytes(int code) {
        return ByteBuffer.allocate(4)
                .order(ByteOrder.BIG_ENDIAN)
                .putInt(code)
                .array();
    }



}


package server.logic.ws_protocol.binary.handlers;

/**
 * Общий интерфейс для всех обработчиков входящих сообщений.
 */
public interface MessageHandler {
    /**
     * Обработать входящее сообщение и вернуть бинарный ответ.
     */
    byte[] handle(byte[] msg);
}

package server.ws;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import shine.db.dao.BlockchainStateDAO;
import shine.db.entities.BlockchainStateEntry;
import utils.files.FileStoreUtil;
import shine.log.BlockchainAdminNotifier;

import java.io.IOException;
import java.nio.file.*;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

/**
 * ===============================================================
 * BlockchainTmpRecoveryOnStartup — восстановление консистентности
 * blockchain файлов при старте сервера.
 *
 * Сценарий проблемы:
 *  - при добавлении блока сначала пишется <name>.tmp_bch
 *  - потом коммитится БД (state.fileSizeBytes)
 *  - потом tmp переименовывается поверх <name>.bch (атомарно, если возможно)
 *
 * Если сервер упал в середине, может остаться tmp:
 *  - tmp есть, а основной .bch остался старым
 *  - tmp есть, а основной .bch уже удалили/заменить не успели
 *  - tmp есть, а БД успела/не успела обновиться
 *
 * Этот класс при старте:
 *  - ищет все *.tmp_bch в data/
 *  - сравнивает размеры:
 *      - tmp
 *      - main (если есть)
 *      - state.fileSizeBytes (если есть)
 *
 * Правила:
 *
 * A) state есть:
 *   - если stateSize == mainSize => tmp удаляем
 *   - если stateSize == tmpSize  => tmp ставим на место main (atomicReplaceBlockchainFile)
 *   - иначе => КРИТИЧЕСКАЯ ОШИБКА: сервер останавливаем + уведомление администратору
 *
 * B) state НЕТ:
 *   - если main НЕТ и tmp ЕСТЬ => tmp удаляем (мусор после падения/неуспешной транзакции)
 *   - если main ЕСТЬ и tmp ЕСТЬ => КРИТИЧЕСКАЯ ОШИБКА: уведомление администратору + стоп сервера
 *
 * Логирование:
 *  - обо всех восстановленных/удалённых tmp пишем в лог
 *  - если tmp-файлов нет — тоже пишем в лог
 * ===============================================================
 */
public final class BlockchainTmpRecoveryOnStartup {

    private static final Logger log = LoggerFactory.getLogger(BlockchainTmpRecoveryOnStartup.class);

    private BlockchainTmpRecoveryOnStartup() {}

    /**
     * Запуск восстановления.
     * Если обнаружена ситуация, когда размеры не совпали и сервер сам не может чинить — бросаем исключение.
     */
    public static void runRecoveryOrThrow() {
        FileStoreUtil fs = FileStoreUtil.getInstance();
        BlockchainStateDAO stateDAO = BlockchainStateDAO.getInstance();

        Path dataDir = Paths.get(FileStoreUtil.DATA_DIR_NAME);
        ensureDirExists(dataDir);

        List<Path> tmpFiles = listTmpFiles(dataDir);

        if (tmpFiles.isEmpty()) {
            log.info("🟢 BlockchainTmpRecovery: временных *.tmp_bch файлов не найдено — восстановление не требуется.");
            return;
        }

        log.warn("🟡 BlockchainTmpRecovery: найдено временных файлов: {}", tmpFiles.size());

        for (Path tmpPath : tmpFiles) {
            String fileName = tmpPath.getFileName().toString();
            String blockchainName = extractBlockchainNameFromTmp(fileName);

            if (blockchainName == null || blockchainName.isBlank()) {
                // странное имя — не трогаем автоматически, но это уже повод дернуть админа
                BlockchainAdminNotifier.critical(
                        "НАЙДЕН TMP-ФАЙЛ С НЕОЖИДАННЫМ ИМЕНЕМ: " + fileName + " (не могу определить blockchainName).",
                        null
                );
                throw new IllegalStateException("Bad tmp file name: " + fileName);
            }

            Path mainPath = dataDir.resolve(fs.buildBlockchainFileName(blockchainName));

            long tmpSize = safeSize(tmpPath);
            boolean mainExists = Files.exists(mainPath);
            long mainSize = mainExists ? safeSize(mainPath) : -1L;

            BlockchainStateEntry st = null;
            try {
                st = stateDAO.getByBlockchainName(blockchainName);
            } catch (SQLException e) {
                BlockchainAdminNotifier.critical(
                        "ОШИБКА БД ПРИ ВОССТАНОВЛЕНИИ TMP: blockchainName=" + blockchainName + " (сервер остановлен).",
                        e
                );
                throw new IllegalStateException("DB error during tmp recovery for " + blockchainName, e);
            }

            // ============================================================
            // CASE B) state НЕТ
            // ============================================================
            if (st == null) {

                if (!mainExists) {
                    // НЕТ state, НЕТ main, есть tmp => удаляем tmp
                    log.warn("🟠 BlockchainTmpRecovery: state отсутствует и main отсутствует, но tmp найден => удаляем tmp. blockchainName={}, tmpSize={}",
                            blockchainName, tmpSize);
                    safeDelete(tmpPath);
                    continue;
                }

                // НЕТ state, но main есть и tmp есть => это уже подозрительно
                BlockchainAdminNotifier.critical(
                        "НЕСОГЛАСОВАННОСТЬ: ЕСТЬ main И tmp, НО НЕТ state В БД. " +
                                "blockchainName=" + blockchainName +
                                ", mainSize=" + mainSize +
                                ", tmpSize=" + tmpSize +
                                ". СЕРВЕР ОСТАНОВЛЕН. " +
                                "ПОДОЗРЕНИЕ: файлы могли быть изменены вне сервера.",
                        null
                );
                throw new IllegalStateException("State missing but both main and tmp exist for " + blockchainName);
            }

            // ============================================================
            // CASE A) state ЕСТЬ
            // ============================================================
            long stateSize = st.getFileSizeBytes();

            // 1) stateSize == mainSize => tmp мусор
            if (mainExists && mainSize == stateSize) {
                log.info("🟢 BlockchainTmpRecovery: stateSize совпадает с main => tmp удаляем. blockchainName={}, stateSize={}, mainSize={}, tmpSize={}",
                        blockchainName, stateSize, mainSize, tmpSize);
                safeDelete(tmpPath);
                continue;
            }

            // 2) stateSize == tmpSize => tmp это актуальная версия, ставим на место main
            if (tmpSize == stateSize) {
                log.warn("🟡 BlockchainTmpRecovery: stateSize совпадает с tmp => восстанавливаем main из tmp. blockchainName={}, stateSize={}, mainSize={}, tmpSize={}",
                        blockchainName, stateSize, mainSize, tmpSize);

                try {
                    // метод уже есть и делает move tmp->main с попыткой ATOMIC_MOVE
                    fs.atomicReplaceBlockchainFile(blockchainName);

                    // после move tmp должен исчезнуть сам (перемещён)
                    log.info("✅ BlockchainTmpRecovery: восстановление выполнено. blockchainName={}, newMainSize={}",
                            blockchainName, safeSize(mainPath));

                } catch (Exception e) {
                    BlockchainAdminNotifier.critical(
                            "НЕ УДАЛОСЬ ВОССТАНОВИТЬ main ИЗ tmp (move failed). " +
                                    "blockchainName=" + blockchainName +
                                    ", stateSize=" + stateSize +
                                    ", mainSize=" + mainSize +
                                    ", tmpSize=" + tmpSize +
                                    ". СЕРВЕР ОСТАНОВЛЕН.",
                            e
                    );
                    throw new IllegalStateException("Cannot replace main from tmp for " + blockchainName, e);
                }
                continue;
            }

            // 3) НИЧЕГО НЕ СОВПАЛО => критическая ситуация
            BlockchainAdminNotifier.critical(
                    "ФАТАЛЬНАЯ НЕСОГЛАСОВАННОСТЬ BLOCKCHAIN ФАЙЛОВ. " +
                            "blockchainName=" + blockchainName +
                            ", stateSize=" + stateSize +
                            ", mainExists=" + mainExists +
                            ", mainSize=" + mainSize +
                            ", tmpSize=" + tmpSize +
                            ". СЕРВЕР ОСТАНОВЛЕН. " +
                            "ТУТ НУЖНО УВЕДОМЛЕНИЕ АДМИНИСТРАТОРУ: возможно файлы изменены вручную/другой программой.",
                    null
            );
            throw new IllegalStateException("Blockchain files mismatch for " + blockchainName);
        }

        log.info("✅ BlockchainTmpRecovery: обработка tmp-файлов завершена.");
    }

    /* ===================================================================== */
    /* =============================== Helpers ============================== */
    /* ===================================================================== */

    private static void ensureDirExists(Path dir) {
        try {
            if (!Files.exists(dir)) {
                Files.createDirectories(dir);
            }
        } catch (IOException e) {
            throw new IllegalStateException("Cannot create data dir: " + dir, e);
        }
    }

    private static List<Path> listTmpFiles(Path dataDir) {
        List<Path> out = new ArrayList<>();
        try (DirectoryStream<Path> ds = Files.newDirectoryStream(dataDir, "*" + FileStoreUtil.BLOCKCHAIN_TMP_EXTENSION)) {
            for (Path p : ds) {
                if (Files.isRegularFile(p)) out.add(p);
            }
        } catch (IOException e) {
            throw new IllegalStateException("Cannot list tmp files in: " + dataDir, e);
        }
        return out;
    }

    /**
     * Из "anya0001.tmp_bch" -> "anya0001"
     */
    private static String extractBlockchainNameFromTmp(String tmpFileName) {
        if (tmpFileName == null) return null;
        if (!tmpFileName.endsWith(FileStoreUtil.BLOCKCHAIN_TMP_EXTENSION)) return null;

        String base = tmpFileName.substring(0, tmpFileName.length() - FileStoreUtil.BLOCKCHAIN_TMP_EXTENSION.length());

        // базовая защита: не допускаем слэши/.. даже если кто-то подложил файл
        if (base.isBlank()) return null;
        if (base.contains("/") || base.contains("\\") || base.contains("..")) return null;

        return base;
    }

    private static long safeSize(Path p) {
        try {
            return Files.size(p);
        } catch (IOException e) {
            throw new IllegalStateException("Cannot read file size: " + p, e);
        }
    }

    private static void safeDelete(Path p) {
        try {
            Files.deleteIfExists(p);
        } catch (IOException e) {
            throw new IllegalStateException("Cannot delete file: " + p, e);
        }
    }
}
package server.ws;

import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.annotations.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import server.logic.InboundMessageProcessor;
import server.logic.ws_protocol.JSON.ActiveConnectionsRegistry;
import server.logic.ws_protocol.JSON.ConnectionContext;
import server.logic.ws_protocol.JSON.JsonInboundProcessor;

import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;

@WebSocket
public class BlockchainWsEndpoint {
    private static final Logger log = LoggerFactory.getLogger(BlockchainWsEndpoint.class);

    private Session session;

    /** Контекст для текущего WebSocket-соединения. */
    private final ConnectionContext connectionContext = new ConnectionContext();

    @OnWebSocketConnect
    public void onConnect(Session session) {
        this.session = session;
        // Привязываем WebSocket-сессию к ConnectionContext
        connectionContext.setWsSession(session);
        log.info("WS connected: {}", session.getRemoteAddress());
    }

    @OnWebSocketMessage
    public void onBinary(byte[] payload, int offset, int length) {
        byte[] msg = new byte[length];
        System.arraycopy(payload, offset, msg, 0, length);

        // Асинхронно обрабатываем входящее бинарное сообщение
        CompletableFuture
                .supplyAsync(() -> InboundMessageProcessor.process(msg))
                .thenAccept(resp -> {
                    if (resp != null && session != null && session.isOpen()) {
                        session.getRemote().sendBytes(ByteBuffer.wrap(resp), new WriteCallback() {
                            @Override
                            public void writeFailed(Throwable x) {
                                log.warn("Failed to send response", x);
                            }

                            @Override
                            public void writeSuccess() {
                                log.debug("Response sent successfully");
                            }
                        });
                    }
                })
                .exceptionally(ex -> {
                    log.error("Processing failed", ex);
                    trySendCode(500);
                    return null;
                });
    }

    private void trySendCode(int code) {
        if (session != null && session.isOpen()) {
            byte[] resp = InboundMessageProcessor.intTo4Bytes(code);
            session.getRemote().sendBytes(ByteBuffer.wrap(resp), new WriteCallback() {
                @Override
                public void writeFailed(Throwable x) {
                    log.warn("Failed to send error code", x);
                }

                @Override
                public void writeSuccess() {
                    log.debug("Error code {} sent", code);
                }
            });
        }
    }

    @OnWebSocketClose
    public void onClose(int statusCode, String reason) {
        log.info("WS closed: {} {}", statusCode, reason);
        // Удаляем это подключение из реестра активных соединений
        ActiveConnectionsRegistry.getInstance().remove(connectionContext);
        // На всякий случай очищаем контекст
        connectionContext.reset();
    }

    @OnWebSocketError
    public void onError(Throwable cause) {
        log.error("WS error", cause);
    }

    // Обработка текстовых JSON-запросов
    @OnWebSocketMessage
    public void onText(String message) {
        log.info("📥 Получено TEXT-сообщение от клиента: {}", message);

        CompletableFuture
                .supplyAsync(() -> JsonInboundProcessor.processJson(message, connectionContext))
                .thenAccept(respJson -> {
                    if (respJson != null && session != null && session.isOpen()) {

                        log.info("📤 Отправляем ответ клиенту: {}", respJson);

                        session.getRemote().sendString(respJson, new WriteCallback() {
                            @Override
                            public void writeFailed(Throwable x) {
                                log.warn("⚠️ Не удалось отправить JSON-ответ клиенту: {}", x.toString());
                            }

                            @Override
                            public void writeSuccess() {
                                log.debug("✔ JSON-ответ успешно отправлен");
                            }
                        });
                    }
                })
                .exceptionally(ex -> {
                    log.error("❌ Ошибка при обработке JSON-сообщения", ex);
                    trySendJsonError();
                    return null;
                });
    }

    private void trySendJsonError() {
        if (session != null && session.isOpen()) {
            String resp = "{\"op\":null,\"requestId\":null,\"status\":500,"
                    + "\"payload\":{\"code\":\"INTERNAL_ERROR\",\"message\":\"Ошибка сервера\"}}";

            log.info("📤 Отправляем клиенту ошибку JSON: {}", resp);

            session.getRemote().sendString(resp, new WriteCallback() {
                @Override
                public void writeFailed(Throwable x) {
                    log.warn("⚠️ Не удалось отправить JSON-ответ клиенту: {}", x.toString());
                }

                @Override
                public void writeSuccess() {
                    log.debug("✔ JSON-ошибка успешно отправлена");
                }
            });
        }
    }
}

package server.ws;

import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.websocket.server.config.JettyWebSocketServletContainerInitializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import utils.config.AppConfig;

import java.time.Duration;

/**
 * WsServer — поднимает Jetty WS на /ws.
 *
 * ВАЖНО:
 *  - перед стартом сервера выполняем recovery tmp-блокчейнов.
 *  - если обнаружена несогласованность, которую сервер сам чинить не может —
 *    recovery бросает исключение и сервер не стартует.
 */
public final class WsServer {

    private static final Logger log = LoggerFactory.getLogger(WsServer.class);

    public static void main(String[] args) throws Exception {

        // ============================================================
        // 0) Восстановление консистентности blockchain файлов
        // ============================================================
        try {
            BlockchainTmpRecoveryOnStartup.runRecoveryOrThrow();
        } catch (Exception e) {
            // Уже должно быть “большое” уведомление через BlockchainAdminNotifier,
            // но на всякий случай логируем ещё раз.
            log.error("❌ Сервер НЕ будет запущен: критическая ошибка восстановления blockchain tmp-файлов.", e);
            throw e; // останавливаем запуск
        }

        // ============================================================
        // 1) Настройки порта
        // ============================================================
        AppConfig config = AppConfig.getInstance();
        int port = 7070;
        try {
            String portStr = config.getParam("server.port");
            if (portStr != null && !portStr.isBlank()) {
                port = Integer.parseInt(portStr.trim());
            }
        } catch (Exception e) {
            log.info("Не удалось прочитать параметр server.port, используем порт по умолчанию {}", port);
        }

        // ============================================================
        // 2) Запуск Jetty WS
        // ============================================================
        Server server = new Server(port);

        ServletContextHandler context = new ServletContextHandler();
        context.setContextPath("/");
        server.setHandler(context);

        // Инициализация контейнера WebSocket
        JettyWebSocketServletContainerInitializer.configure(context, (servletContext, wsContainer) -> {
            // Таймаут простоя соединения (Jetty 11 синтаксис)
            wsContainer.setIdleTimeout(Duration.ofMinutes(5));

            // Маппинг эндпоинта
            wsContainer.addMapping("/ws", (req, resp) -> new BlockchainWsEndpoint());
        });

        server.start();
        log.info("✅ WS сервер запущен на ws://localhost:{}/ws", port);
        server.join();
    }
}
