From 4430615117d146e8d0b0d11262ded1c813bc30a7ef7fd689303e74cf37ad81f0 Mon Sep 17 00:00:00 2001 From: AidarKC Date: Fri, 23 Jan 2026 21:52:45 +0300 Subject: [PATCH] 23 01 25 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Промежуточный комит. Ужас как устал сегодня, узнал что и запросы у меня постоянно закрывают сессию. Надо переделать --- .../auth/Net_RefreshSession_Handler.java | 30 --- .../entyties/Net_RefreshSession_Request.java | 36 ---- .../entyties/Net_RefreshSession_Response.java | 23 --- .../java/server/ws/WsConnectionUtils.java | 155 +++++++++++++--- .../server/logic/InboundMessageProcessor.java | 66 ------- .../binary/handlers/MessageHandler.java | 11 -- .../java/server/ws/BlockchainWsEndpoint.java | 171 +++++++++--------- 7 files changed, 217 insertions(+), 275 deletions(-) delete mode 100644 shine-server-net-protocol/src/main/java/server/logic/ws_protocol/JSON/handlers/auth/Net_RefreshSession_Handler.java delete mode 100644 shine-server-net-protocol/src/main/java/server/logic/ws_protocol/JSON/handlers/auth/entyties/Net_RefreshSession_Request.java delete mode 100644 shine-server-net-protocol/src/main/java/server/logic/ws_protocol/JSON/handlers/auth/entyties/Net_RefreshSession_Response.java delete mode 100644 src/main/java/server/logic/InboundMessageProcessor.java delete mode 100644 src/main/java/server/logic/ws_protocol/binary/handlers/MessageHandler.java diff --git a/shine-server-net-protocol/src/main/java/server/logic/ws_protocol/JSON/handlers/auth/Net_RefreshSession_Handler.java b/shine-server-net-protocol/src/main/java/server/logic/ws_protocol/JSON/handlers/auth/Net_RefreshSession_Handler.java deleted file mode 100644 index 2b4c8ab..0000000 --- a/shine-server-net-protocol/src/main/java/server/logic/ws_protocol/JSON/handlers/auth/Net_RefreshSession_Handler.java +++ /dev/null @@ -1,30 +0,0 @@ -//package server.logic.ws_protocol.JSON.handlers.auth; -// -//import server.logic.ws_protocol.JSON.ConnectionContext; -//import server.logic.ws_protocol.JSON.entyties.Net_Request; -//import server.logic.ws_protocol.JSON.entyties.Net_Response; -//import server.logic.ws_protocol.JSON.handlers.JsonMessageHandler; -//import server.logic.ws_protocol.JSON.handlers.auth.entyties.Net_RefreshSession_Request; -//import server.logic.ws_protocol.JSON.utils.NetExceptionResponseFactory; -//import server.logic.ws_protocol.WireCodes; -// -///** -// * RefreshSession (v2) — ОТКЛЮЧЕН. -// * -// * Раньше это был "короткий вход" (1 запрос) по sessionId+sessionPwd. -// * Теперь вход всегда 2 шага: SessionChallenge -> SessionLogin (подпись sessionKey). -// */ -//public class Net_RefreshSession_Handler implements JsonMessageHandler { -// -// @Override -// public Net_Response handle(Net_Request request, ConnectionContext ctx) throws Exception { -// Net_RefreshSession_Request req = (Net_RefreshSession_Request) request; -// -// return NetExceptionResponseFactory.error( -// req, -// WireCodes.Status.GONE, // 410 -// "DISABLED_V2", -// "RefreshSession отключён в v2. Используй SessionChallenge + SessionLogin." -// ); -// } -//} \ No newline at end of file diff --git a/shine-server-net-protocol/src/main/java/server/logic/ws_protocol/JSON/handlers/auth/entyties/Net_RefreshSession_Request.java b/shine-server-net-protocol/src/main/java/server/logic/ws_protocol/JSON/handlers/auth/entyties/Net_RefreshSession_Request.java deleted file mode 100644 index bab1f1d..0000000 --- a/shine-server-net-protocol/src/main/java/server/logic/ws_protocol/JSON/handlers/auth/entyties/Net_RefreshSession_Request.java +++ /dev/null @@ -1,36 +0,0 @@ -//package server.logic.ws_protocol.JSON.handlers.auth.entyties; -// -//import server.logic.ws_protocol.JSON.entyties.Net_Request; -// -///** -// * Запрос RefreshSession. -// * -// * В новой версии (v2) RefreshSession ОТКЛЮЧЕН. -// * Оставлен временно для совместимости, handler вернёт 410 GONE. -// */ -//public class Net_RefreshSession_Request extends Net_Request { -// -// private String sessionId; -// private String sessionPwd; -// private String clientInfo; -// -// public String getSessionId() { -// return sessionId; -// } -// -// public void setSessionId(String sessionId) { -// this.sessionId = sessionId; -// } -// -// public String getSessionPwd() { -// return sessionPwd; -// } -// -// public void setSessionPwd(String sessionPwd) { -// this.sessionPwd = sessionPwd; -// } -// -// public String getClientInfo() { return clientInfo; } -// -// public void setClientInfo(String clientInfo) { this.clientInfo = clientInfo; } -//} \ No newline at end of file diff --git a/shine-server-net-protocol/src/main/java/server/logic/ws_protocol/JSON/handlers/auth/entyties/Net_RefreshSession_Response.java b/shine-server-net-protocol/src/main/java/server/logic/ws_protocol/JSON/handlers/auth/entyties/Net_RefreshSession_Response.java deleted file mode 100644 index ae63df6..0000000 --- a/shine-server-net-protocol/src/main/java/server/logic/ws_protocol/JSON/handlers/auth/entyties/Net_RefreshSession_Response.java +++ /dev/null @@ -1,23 +0,0 @@ -//package server.logic.ws_protocol.JSON.handlers.auth.entyties; -// -//import server.logic.ws_protocol.JSON.entyties.Net_Response; -// -///** -// * Ответ на RefreshSession. -// * -// * В новой версии (v2) RefreshSession ОТКЛЮЧЕН. -// * Этот класс можно оставить временно для совместимости сериализации, -// * но handler будет возвращать 410 GONE. -// */ -//public class Net_RefreshSession_Response extends Net_Response { -// -// private String storagePwd; -// -// public String getStoragePwd() { -// return storagePwd; -// } -// -// public void setStoragePwd(String storagePwd) { -// this.storagePwd = storagePwd; -// } -//} \ No newline at end of file diff --git a/shine-server-net-protocol/src/main/java/server/ws/WsConnectionUtils.java b/shine-server-net-protocol/src/main/java/server/ws/WsConnectionUtils.java index 8fbe79e..0a8ec17 100644 --- a/shine-server-net-protocol/src/main/java/server/ws/WsConnectionUtils.java +++ b/shine-server-net-protocol/src/main/java/server/ws/WsConnectionUtils.java @@ -5,52 +5,151 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import server.logic.ws_protocol.JSON.ActiveConnectionsRegistry; import server.logic.ws_protocol.JSON.ConnectionContext; +import shine.db.entities.SolanaUserEntry; + +import java.net.SocketAddress; +import java.util.concurrent.atomic.AtomicLong; /** * Утилита для работы с WebSocket-подключениями. + * + * Цель этой версии: + * - всегда логировать "кто закрыл" / "что закрывали" / "в каком состоянии был WS"; + * - логировать исключения так, чтобы было видно первопричину; + * - не терять контекст из-за ctx.reset() (сначала снимаем "снимок" полей). */ public final class WsConnectionUtils { private static final Logger log = LoggerFactory.getLogger(WsConnectionUtils.class); + /** Счётчик событий закрытия (удобно коррелировать логи). */ + private static final AtomicLong CLOSE_SEQ = new AtomicLong(0); + private WsConnectionUtils() { // utility } - /** - * Корректно закрывает WebSocket-соединение: - * - удаляет контекст из ActiveConnectionsRegistry; - * - очищает ConnectionContext; - * - закрывает сам WebSocket (если ещё открыт). - * - * @param ctx контекст соединения - * @param statusCode код закрытия WebSocket (например, 1000, 4001) - * @param reason причина закрытия (для логов/клиента) - */ public static void closeConnection(ConnectionContext ctx, int statusCode, String reason) { - if (ctx == null) { - return; + closeConnection(ctx, statusCode, reason, null, "UNKNOWN"); + } + + /** + * Расширенное закрытие с указанием инициатора и причины (Throwable). + * + * @param ctx контекст + * @param statusCode код закрытия + * @param reason причина (пойдёт в close frame + логи) + * @param cause исключение/первопричина (если закрываем из catch) + * @param initiator строка "кто инициировал" (handler/op/requestId/etc.) + */ + public static void closeConnection(ConnectionContext ctx, + int statusCode, + String reason, + Throwable cause, + String initiator) { + if (ctx == null) return; + + final long closeId = CLOSE_SEQ.incrementAndGet(); + + // --- СНИМОК КОНТЕКСТА ДО reset() --- + final Session ws = ctx.getWsSession(); + + final String sessionId = safeString(ctx.getSessionId()); + final int authStatus = safeAuthStatus(ctx); + + final SolanaUserEntry user = ctx.getSolanaUser(); + final String login = (user != null ? safeString(user.getLogin()) : ""); + + final String activeSessionId = + (ctx.getActiveSession() != null ? safeString(ctx.getActiveSession().getSessionId()) : ""); + + final boolean wsPresent = (ws != null); + final boolean wsOpen = (ws != null && safeIsOpen(ws)); + final String wsInfo = formatWsInfo(ws); + + final String threadName = Thread.currentThread().getName(); + final int ctxId = System.identityHashCode(ctx); + + // Логируем "начало закрытия" всегда, чтобы видеть даже случаи "ws уже закрыт" + if (cause != null) { + log.warn("WS_CLOSE#{} BEGIN initiator={} thread={} ctxId={} login={} sessionId={} activeSessionId={} authStatus={} statusCode={} reason={} wsPresent={} wsOpen={} wsInfo={}", + closeId, initiator, threadName, ctxId, login, sessionId, activeSessionId, authStatus, statusCode, reason, wsPresent, wsOpen, wsInfo, cause); + } else { + log.info("WS_CLOSE#{} BEGIN initiator={} thread={} ctxId={} login={} sessionId={} activeSessionId={} authStatus={} statusCode={} reason={} wsPresent={} wsOpen={} wsInfo={}", + closeId, initiator, threadName, ctxId, login, sessionId, activeSessionId, authStatus, statusCode, reason, wsPresent, wsOpen, wsInfo); } - Session ws = ctx.getWsSession(); - + // --- ШАГ 1: убрать из реестра (чтобы новые сообщения не шли в мёртвый контекст) --- try { - // Удаляем контекст из реестра активных соединений ActiveConnectionsRegistry.getInstance().remove(ctx); - - // Чистим контекст - ctx.reset(); - - // Закрываем WebSocket-сессию - if (ws != null && ws.isOpen()) { - try { - ws.close(statusCode, reason); - } catch (Exception e) { - log.warn("Не удалось закрыть WebSocket-сессию (statusCode={}, reason={})", statusCode, reason, e); - } - } + log.debug("WS_CLOSE#{} registry.remove OK ctxId={} sessionId={} login={}", closeId, ctxId, sessionId, login); } catch (Exception e) { - log.warn("Ошибка при закрытии WebSocket-соединения", e); + log.warn("WS_CLOSE#{} registry.remove FAIL ctxId={} sessionId={} login={}", closeId, ctxId, sessionId, login, e); + } + + // --- ШАГ 2: закрыть WS (если открыт) --- + if (ws != null) { + if (safeIsOpen(ws)) { + try { + ws.close(statusCode, safeString(reason)); + log.info("WS_CLOSE#{} ws.close OK ctxId={} sessionId={} login={} statusCode={} reason={}", + closeId, ctxId, sessionId, login, statusCode, reason); + } catch (Exception e) { + log.warn("WS_CLOSE#{} ws.close FAIL ctxId={} sessionId={} login={} statusCode={} reason={} wsInfo={}", + closeId, ctxId, sessionId, login, statusCode, reason, wsInfo, e); + } + } else { + log.info("WS_CLOSE#{} ws already closed ctxId={} sessionId={} login={} wsInfo={}", + closeId, ctxId, sessionId, login, wsInfo); + } + } + + // --- ШАГ 3: очистить контекст (в конце, чтобы не потерять поля в логах выше) --- + try { + ctx.reset(); + log.debug("WS_CLOSE#{} ctx.reset OK ctxId={} (was sessionId={}, login={})", closeId, ctxId, sessionId, login); + } catch (Exception e) { + log.warn("WS_CLOSE#{} ctx.reset FAIL ctxId={} (was sessionId={}, login={})", closeId, ctxId, sessionId, login, e); + } + + log.info("WS_CLOSE#{} END initiator={} ctxId={} sessionId={} login={}", closeId, initiator, ctxId, sessionId, login); + } + + private static String safeString(String s) { + return (s == null ? "" : s); + } + + private static int safeAuthStatus(ConnectionContext ctx) { + try { + return ctx.getAuthenticationStatus(); + } catch (Exception e) { + return -999; } } + + private static boolean safeIsOpen(Session ws) { + try { + return ws.isOpen(); + } catch (Exception e) { + return false; + } + } + + private static String formatWsInfo(Session ws) { + if (ws == null) return "null"; + + String remote = ""; + String local = ""; + try { + SocketAddress ra = ws.getRemoteAddress(); + remote = (ra != null ? ra.toString() : ""); + } catch (Exception ignored) { } + + try { + SocketAddress la = ws.getLocalAddress(); + local = (la != null ? la.toString() : ""); + } catch (Exception ignored) { } + + return "remote=" + remote + ", local=" + local; + } } \ No newline at end of file diff --git a/src/main/java/server/logic/InboundMessageProcessor.java b/src/main/java/server/logic/InboundMessageProcessor.java deleted file mode 100644 index 9899932..0000000 --- a/src/main/java/server/logic/InboundMessageProcessor.java +++ /dev/null @@ -1,66 +0,0 @@ -package server.logic; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import server.logic.ws_protocol.binary.handlers.*; -import server.logic.ws_protocol.WireCodes; - -import java.nio.ByteBuffer; -import java.nio.ByteOrder; -import java.util.Map; - -/** - * Обработчик входящих сообщение на сервер. - * По коду сообщения (первые 4 байта сообщения) находи нужный хэндлер и передаёт в него сообщение - * Получает и возвращает ответ от хэндлера - */ -public final class InboundMessageProcessor { - private static final Logger log = LoggerFactory.getLogger(InboundMessageProcessor.class); - - private static final Map HANDLERS = Map.of( -// WireCodes.Op.PING, new PingHandler() -// WireCodes.Op.ADD_BLOCK, new AddBlockHandler(), -// WireCodes.Op.GET_BLOCKCHAIN,new GetBlockchainHandler() -// WireCodes.Op.SEARCH_USERS, new SearchUsersHandler(), -// WireCodes.Op.GET_LAST_BLOCK_INFO,new GetLastBlockInfoHandler() - - ); - - private InboundMessageProcessor() {} - - public static byte[] process(byte[] msg) { - if (msg == null || msg.length < 4) - return intTo4Bytes(WireCodes.Status.BAD_REQUEST); - - int op = first4ToInt(msg); - MessageHandler h = HANDLERS.get(op); - if (h == null) { - log.warn("Неизвестная операция: {}", op); - return intTo4Bytes(WireCodes.Status.BAD_REQUEST); - } - - try { - return h.handle(msg); - } catch (Exception e) { - log.error("Ошибка при обработке операции {}", op, e); - return intTo4Bytes(WireCodes.Status.INTERNAL_ERROR); - } - } - - private static int first4ToInt(byte[] msg) { - return ByteBuffer.wrap(msg, 0, 4) - .order(ByteOrder.BIG_ENDIAN) - .getInt(); - } - - public static byte[] intTo4Bytes(int code) { - return ByteBuffer.allocate(4) - .order(ByteOrder.BIG_ENDIAN) - .putInt(code) - .array(); - } - - - -} - diff --git a/src/main/java/server/logic/ws_protocol/binary/handlers/MessageHandler.java b/src/main/java/server/logic/ws_protocol/binary/handlers/MessageHandler.java deleted file mode 100644 index ba4fb05..0000000 --- a/src/main/java/server/logic/ws_protocol/binary/handlers/MessageHandler.java +++ /dev/null @@ -1,11 +0,0 @@ -package server.logic.ws_protocol.binary.handlers; - -/** - * Общий интерфейс для всех обработчиков входящих сообщений. - */ -public interface MessageHandler { - /** - * Обработать входящее сообщение и вернуть бинарный ответ. - */ - byte[] handle(byte[] msg); -} diff --git a/src/main/java/server/ws/BlockchainWsEndpoint.java b/src/main/java/server/ws/BlockchainWsEndpoint.java index affdf19..cf26ecb 100644 --- a/src/main/java/server/ws/BlockchainWsEndpoint.java +++ b/src/main/java/server/ws/BlockchainWsEndpoint.java @@ -5,85 +5,128 @@ import org.eclipse.jetty.websocket.api.WriteCallback; import org.eclipse.jetty.websocket.api.annotations.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import server.logic.InboundMessageProcessor; import server.logic.ws_protocol.JSON.ActiveConnectionsRegistry; import server.logic.ws_protocol.JSON.ConnectionContext; import server.logic.ws_protocol.JSON.JsonInboundProcessor; -import java.nio.ByteBuffer; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; @WebSocket public class BlockchainWsEndpoint { private static final Logger log = LoggerFactory.getLogger(BlockchainWsEndpoint.class); + /** + * Общий пул для обработки ВСЕХ входящих сообщений. + * Важно: не commonPool, чтобы под нагрузкой всё было предсказуемо. + */ + private static final ExecutorService WS_EXECUTOR = new ThreadPoolExecutor( + Math.max(2, Runtime.getRuntime().availableProcessors()), + Math.max(2, Runtime.getRuntime().availableProcessors()), + 60L, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(10_000), + new ThreadFactory() { + private final AtomicLong n = new AtomicLong(1); + @Override public Thread newThread(Runnable r) { + Thread t = new Thread(r, "ws-worker-" + n.getAndIncrement()); + t.setDaemon(true); + return t; + } + }, + new ThreadPoolExecutor.AbortPolicy() + ); + private Session session; /** Контекст для текущего WebSocket-соединения. */ private final ConnectionContext connectionContext = new ConnectionContext(); + /** + * Хвост очереди per-session: гарантирует строгую последовательность. + * Каждое новое сообщение добавляется в цепочку. + */ + private CompletableFuture queueTail = CompletableFuture.completedFuture(null); + + /** Защита от гонки при обновлении queueTail. */ + private final Object queueLock = new Object(); + @OnWebSocketConnect public void onConnect(Session session) { this.session = session; - // Привязываем WebSocket-сессию к ConnectionContext connectionContext.setWsSession(session); log.info("WS connected: {}", session.getRemoteAddress()); } + // JSON only @OnWebSocketMessage - public void onBinary(byte[] payload, int offset, int length) { - byte[] msg = new byte[length]; - System.arraycopy(payload, offset, msg, 0, length); + public void onText(String message) { + // Быстро отфильтруем мусор + if (message == null || message.isBlank()) return; - // Асинхронно обрабатываем входящее бинарное сообщение - CompletableFuture - .supplyAsync(() -> InboundMessageProcessor.process(msg)) - .thenAccept(resp -> { - if (resp != null && session != null && session.isOpen()) { - session.getRemote().sendBytes(ByteBuffer.wrap(resp), new WriteCallback() { - @Override - public void writeFailed(Throwable x) { - log.warn("Failed to send response", x); - } - - @Override - public void writeSuccess() { - log.debug("Response sent successfully"); - } - }); - } - }) - .exceptionally(ex -> { - log.error("Processing failed", ex); - trySendCode(500); - return null; - }); + // Добавляем обработку в очередь данного соединения (строго по порядку) + enqueue(() -> processJsonAndReply(message)); } - private void trySendCode(int code) { - if (session != null && session.isOpen()) { - byte[] resp = InboundMessageProcessor.intTo4Bytes(code); - session.getRemote().sendBytes(ByteBuffer.wrap(resp), new WriteCallback() { - @Override - public void writeFailed(Throwable x) { - log.warn("Failed to send error code", x); + private void enqueue(Runnable task) { + synchronized (queueLock) { + queueTail = queueTail.thenRunAsync(() -> { + try { + task.run(); + } catch (Throwable t) { + // Нельзя дать цепочке "сломаться", иначе очередь остановится навсегда + log.error("❌ Unhandled error in ws task", t); + trySendJsonError(); } - - @Override - public void writeSuccess() { - log.debug("Error code {} sent", code); - } - }); + }, WS_EXECUTOR); } } + private void processJsonAndReply(String message) { + if (session == null || !session.isOpen()) return; + + log.info("📥 Получено TEXT-сообщение от клиента: {}", message); + + String respJson; + try { + respJson = JsonInboundProcessor.processJson(message, connectionContext); + } catch (Exception ex) { + log.error("❌ Ошибка при обработке JSON-сообщения", ex); + trySendJsonError(); + return; + } + + if (respJson == null) return; + if (session == null || !session.isOpen()) return; + + log.info("📤 Отправляем ответ клиенту: {}", respJson); + + session.getRemote().sendString(respJson, new WriteCallback() { + @Override + public void writeFailed(Throwable x) { + log.warn("⚠️ Не удалось отправить JSON-ответ клиенту: {}", x.toString()); + } + + @Override + public void writeSuccess() { + log.debug("✔ JSON-ответ успешно отправлен"); + } + }); + } + @OnWebSocketClose public void onClose(int statusCode, String reason) { log.info("WS closed: {} {}", statusCode, reason); - // Удаляем это подключение из реестра активных соединений + ActiveConnectionsRegistry.getInstance().remove(connectionContext); - // На всякий случай очищаем контекст connectionContext.reset(); + + // “Обрываем” очередь: новые задачи всё равно не исполнятся из-за проверки session.isOpen(), + // но можно и явно завершить хвост. + synchronized (queueLock) { + queueTail = CompletableFuture.completedFuture(null); + } + + this.session = null; } @OnWebSocketError @@ -91,49 +134,15 @@ public class BlockchainWsEndpoint { log.error("WS error", cause); } - // Обработка текстовых JSON-запросов - @OnWebSocketMessage - public void onText(String message) { - log.info("📥 Получено TEXT-сообщение от клиента: {}", message); - - CompletableFuture - .supplyAsync(() -> JsonInboundProcessor.processJson(message, connectionContext)) - .thenAccept(respJson -> { - if (respJson != null && session != null && session.isOpen()) { - - log.info("📤 Отправляем ответ клиенту: {}", respJson); - - session.getRemote().sendString(respJson, new WriteCallback() { - @Override - public void writeFailed(Throwable x) { - log.warn("⚠️ Не удалось отправить JSON-ответ клиенту: {}", x.toString()); - } - - @Override - public void writeSuccess() { - log.debug("✔ JSON-ответ успешно отправлен"); - } - }); - } - }) - .exceptionally(ex -> { - log.error("❌ Ошибка при обработке JSON-сообщения", ex); - trySendJsonError(); - return null; - }); - } - private void trySendJsonError() { if (session != null && session.isOpen()) { String resp = "{\"op\":null,\"requestId\":null,\"status\":500," + "\"payload\":{\"code\":\"INTERNAL_ERROR\",\"message\":\"Ошибка сервера\"}}"; - log.info("📤 Отправляем клиенту ошибку JSON: {}", resp); - session.getRemote().sendString(resp, new WriteCallback() { @Override public void writeFailed(Throwable x) { - log.warn("⚠️ Не удалось отправить JSON-ответ клиенту: {}", x.toString()); + log.warn("⚠️ Не удалось отправить JSON-ошибку клиенту: {}", x.toString()); } @Override @@ -143,4 +152,4 @@ public class BlockchainWsEndpoint { }); } } -} +} \ No newline at end of file