Почта v2: ReceiveOutcomingMessage без авторизации и атомарная вставка пары

This commit is contained in:
AidarKC 2026-05-02 16:46:22 +03:00
parent e73328461e
commit b7e6cf7437
6 changed files with 123 additions and 18 deletions

View File

@ -1,2 +1,2 @@
client.version=1.2.33
server.version=1.2.27
client.version=1.2.34
server.version=1.2.28

View File

@ -1325,10 +1325,17 @@ export class AuthService {
}
async sendMessagePair({ incomingBlobB64, outgoingBlobB64 }) {
const response = await this.ws.request('SendMessagePair', { incomingBlobB64, outgoingBlobB64 });
const body = { incomingBlobB64, outgoingBlobB64 };
const primaryOp = 'ReceiveOutcomingMessage';
let response = await this.ws.request(primaryOp, body);
if (response.status === 404) {
response = await this.ws.request('SendMessagePair', body);
if (response.status !== 200) throw opError('SendMessagePair', response);
return response.payload || {};
}
if (response.status !== 200) throw opError(primaryOp, response);
return response.payload || {};
}
async sendDirectMessage({ login, toLogin, text, storagePwd }) {
const cleanFromLogin = String(login || '').trim();

View File

@ -6,6 +6,7 @@ import shine.db.entities.SignedMessageV2Entry;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
@ -54,6 +55,69 @@ public final class SignedMessagesV2DAO {
}
}
/**
* Атомарная вставка пары блоков: либо вставляются оба, либо не вставляется ни один.
* Возвращает true только если обе записи добавлены в БД.
* Если хотя бы одна запись уже существует (или конфликтует по уникальности), возвращает false.
*/
public boolean insertPairBothOrNothing(SignedMessageV2Entry first, SignedMessageV2Entry second) throws Exception {
try (Connection c = db.getConnection()) {
boolean prevAutoCommit = c.getAutoCommit();
c.setAutoCommit(false);
try {
int insertedFirst = insertStrict(c, first);
int insertedSecond = insertStrict(c, second);
if (insertedFirst == 1 && insertedSecond == 1) {
c.commit();
return true;
}
c.rollback();
return false;
} catch (SQLException sqlEx) {
try { c.rollback(); } catch (Exception ignored) {}
if (isConstraintViolation(sqlEx)) {
return false;
}
throw sqlEx;
} finally {
c.setAutoCommit(prevAutoCommit);
}
}
}
private int insertStrict(Connection c, SignedMessageV2Entry e) throws SQLException {
String sql = """
INSERT INTO signed_messages_v2 (
message_key, base_key, target_login, from_login, to_login,
time_ms, nonce, message_type, raw_block, created_at_ms,
source_api, origin_session_id, receipt_ref_base_key, receipt_ref_type
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""";
try (PreparedStatement ps = c.prepareStatement(sql)) {
ps.setString(1, e.getMessageKey());
ps.setString(2, e.getBaseKey());
ps.setString(3, e.getTargetLogin());
ps.setString(4, e.getFromLogin());
ps.setString(5, e.getToLogin());
ps.setLong(6, e.getTimeMs());
ps.setLong(7, e.getNonce());
ps.setInt(8, e.getMessageType());
ps.setBytes(9, e.getRawBlock());
ps.setLong(10, e.getCreatedAtMs());
ps.setString(11, e.getSourceApi());
ps.setString(12, e.getOriginSessionId());
ps.setString(13, e.getReceiptRefBaseKey());
if (e.getReceiptRefType() == null) ps.setObject(14, null);
else ps.setInt(14, e.getReceiptRefType());
return ps.executeUpdate();
}
}
private boolean isConstraintViolation(SQLException ex) {
String msg = String.valueOf(ex.getMessage()).toLowerCase();
return msg.contains("constraint") || msg.contains("unique") || msg.contains("primary key");
}
public SignedMessageV2Entry getByMessageKey(String messageKey) throws Exception {
try (Connection c = db.getConnection()) {
String sql = """

View File

@ -143,6 +143,7 @@ public final class JsonHandlerRegistry {
Map.entry("SendTestWebPush", new Net_SendTestWebPush_Handler()),
Map.entry("SendDirectMessage", new Net_SendDirectMessage_Handler()),
Map.entry("SendMessagePair", new Net_SendMessagePair_Handler()),
Map.entry("ReceiveOutcomingMessage", new Net_SendMessagePair_Handler()),
Map.entry("ReceiveIncomingMessage", new Net_ReceiveIncomingMessage_Handler()),
Map.entry("AckIncomingMessage", new Net_AckIncomingMessage_Handler()),
Map.entry("AckSessionDelivery", new Net_AckSessionDelivery_Handler()),
@ -199,6 +200,7 @@ public final class JsonHandlerRegistry {
Map.entry("SendTestWebPush", Net_SendTestWebPush_Request.class),
Map.entry("SendDirectMessage", Net_SendDirectMessage_Request.class),
Map.entry("SendMessagePair", Net_SendMessagePair_Request.class),
Map.entry("ReceiveOutcomingMessage", Net_SendMessagePair_Request.class),
Map.entry("ReceiveIncomingMessage", Net_ReceiveIncomingMessage_Request.class),
Map.entry("AckIncomingMessage", Net_AckIncomingMessage_Request.class),
Map.entry("AckSessionDelivery", Net_AckSessionDelivery_Request.class),

View File

@ -8,15 +8,13 @@ import server.logic.ws_protocol.JSON.messages.entyties.Net_SendMessagePair_Reque
import server.logic.ws_protocol.JSON.messages.entyties.Net_SendMessagePair_Response;
import server.logic.ws_protocol.JSON.utils.NetExceptionResponseFactory;
import server.logic.ws_protocol.WireCodes;
import shine.db.dao.SignedMessagesV2DAO;
import shine.db.entities.SignedMessageV2Entry;
public class Net_SendMessagePair_Handler implements JsonMessageHandler {
@Override
public Net_Response handle(Net_Request baseRequest, ConnectionContext ctx) throws Exception {
Net_SendMessagePair_Request req = (Net_SendMessagePair_Request) baseRequest;
if (ctx == null || !ctx.isAuthenticatedUser()) {
return NetExceptionResponseFactory.error(req, WireCodes.Status.UNVERIFIED, "NOT_AUTHENTICATED", "Требуется авторизация");
}
if (isBlank(req.getIncomingBlobB64()) || isBlank(req.getOutgoingBlobB64())) {
return NetExceptionResponseFactory.error(req, WireCodes.Status.BAD_REQUEST, "BAD_FIELDS", "incomingBlobB64/outgoingBlobB64 обязательны");
}
@ -31,10 +29,6 @@ public class Net_SendMessagePair_Handler implements JsonMessageHandler {
return NetExceptionResponseFactory.error(req, WireCodes.Status.BAD_REQUEST, ex.getMessage(), "Некорректный формат пары сообщений");
}
if (!incoming.fromLogin.equalsIgnoreCase(ctx.getLogin())) {
return NetExceptionResponseFactory.error(req, WireCodes.Status.UNVERIFIED, "SENDER_MISMATCH", "fromLogin должен совпадать с авторизованной сессией");
}
try {
SignedMessagesCore.verifyUsersAndSignature(incoming);
SignedMessagesCore.verifyUsersAndSignature(outgoing);
@ -47,23 +41,27 @@ public class Net_SendMessagePair_Handler implements JsonMessageHandler {
SignedMessageV2Entry incomingEntry;
SignedMessageV2Entry outgoingEntry;
try {
incomingEntry = SignedMessagesCore.toEntry(incoming, "SendMessagePair", ctx.getSessionId());
outgoingEntry = SignedMessagesCore.toEntry(outgoing, "SendMessagePair", ctx.getSessionId());
String sourceApi = "ReceiveOutcomingMessage";
String originSessionId = (ctx != null && !isBlank(ctx.getSessionId())) ? ctx.getSessionId() : null;
incomingEntry = SignedMessagesCore.toEntry(incoming, sourceApi, originSessionId);
outgoingEntry = SignedMessagesCore.toEntry(outgoing, sourceApi, originSessionId);
} catch (IllegalArgumentException ex) {
return NetExceptionResponseFactory.error(req, WireCodes.Status.BAD_REQUEST, ex.getMessage(), "Некорректный payload подтверждения");
}
boolean incomingInserted = SignedMessagesCore.saveIfAbsent(incomingEntry);
boolean outgoingInserted = SignedMessagesCore.saveIfAbsent(outgoingEntry);
boolean pairInserted = SignedMessagesV2DAO.getInstance().insertPairBothOrNothing(incomingEntry, outgoingEntry);
SignedMessagesRealtime.DeliveryCounters inCounters = new SignedMessagesRealtime.DeliveryCounters();
if (incomingInserted) {
if (pairInserted) {
inCounters = SignedMessagesRealtime.deliverToTargetSessions(incomingEntry, null);
}
String excludeSessionId = outgoingEntry.getTargetLogin().equalsIgnoreCase(ctx.getLogin()) ? ctx.getSessionId() : null;
String excludeSessionId = null;
if (ctx != null && !isBlank(ctx.getLogin()) && outgoingEntry.getTargetLogin().equalsIgnoreCase(ctx.getLogin())) {
excludeSessionId = ctx.getSessionId();
}
SignedMessagesRealtime.DeliveryCounters outCounters = new SignedMessagesRealtime.DeliveryCounters();
if (outgoingInserted) {
if (pairInserted) {
outCounters = SignedMessagesRealtime.deliverToTargetSessions(outgoingEntry, excludeSessionId);
}

View File

@ -0,0 +1,34 @@
# Логика доставки почты (Signed Messages v2)
## Что отправляет клиент
- Клиент формирует **пару подписанных блоков** с одинаковой базой (`baseKey`):
- `incoming` — сообщение для получателя (`targetLogin = toLogin`).
- `outgoing` — копия для отправителя (`targetLogin = fromLogin`).
- Пара отправляется методом `ReceiveOutcomingMessage` (старое имя `SendMessagePair` оставлено для совместимости).
## Что делает сервер при `ReceiveOutcomingMessage`
1. Валидирует поля запроса (`incomingBlobB64`, `outgoingBlobB64`).
2. Разбирает оба блока и проверяет, что это корректная пара.
3. Проверяет пользователей и криптоподписи по каждому блоку.
4. Пытается сохранить обе записи **одной транзакцией**:
- либо добавляются **обе** записи,
- либо при дубле/конфликте не добавляется **ни одна**.
5. Если пара реально добавилась в БД, сервер запускает realtime-доставку в активные сессии целевых пользователей.
6. Если это дубль, дальнейшая доставка не выполняется (повтор не разгоняется).
## Почему допускаются дубли сети
- В модели с несколькими серверами возможны повторные пересылки одного и того же сообщения.
- Дедупликация делается на уровне БД по ключам записи.
- За счёт этого «шторм» затухает: сервер, который уже видел сообщение, больше его не разгоняет.
## Будущая мультисерверная схема (цель)
- Клиент может отправить `ReceiveOutcomingMessage` на любой из своих серверов.
- Сервер-источник:
- сохраняет пару,
- рассылает исходящее по серверам пользователя A,
- рассылает входящее по серверам пользователя B.
- Остальные серверы повторяют тот же принцип, но благодаря дедупликации повторная пересылка быстро прекращается.
## Важные текущие ограничения
- **A) Реальной мультисерверности пока нет.** Сейчас фактически предполагается один сервер на пользователя.
- **B) Нет полноценного graceful shutdown для очереди пересылки.** Возможен сценарий: запись уже сохранена в БД, но сервер перезагрузился до пересылки дальше. Это нужно доработать отдельно.