Звонки: WebPush incoming/stop, actions и TTL; обновлена логика

This commit is contained in:
AidarKC 2026-05-02 18:25:44 +03:00
parent 310863faec
commit c0c29b74ab
10 changed files with 711 additions and 208 deletions

View File

@ -0,0 +1,41 @@
# TODO: Звонки и межсерверность
## Текущее ограничение
- Текущая реализация звонков фактически работает в одном сигнальном контуре (один сервер/единый кластер, где обе стороны уже присутствуют).
- Если пользователь A подключён к серверу A, а пользователь B к серверу B (и между ними нет общего сигнального слоя), `CallInviteBroadcast`/`CallSignalToSession` не смогут полноценно провести звонок между ними.
## Почему так сейчас
- Сигналинг звонка привязан к активным сессиям и событиям на конкретном сервере.
- Выбор целевой сессии (`sessionId`) и обмен `OFFER/ANSWER/ICE` происходит в рамках текущего сигнального контура.
- Push решает только «разбудить/уведомить», но не заменяет межсерверный сигнальный канал.
## Что можно сделать дальше
- Добавить временное межсерверное подключение именно для старта и ведения звонка:
- инициатор получает short-lived access на сервер callee (или через доверенный межсерверный gateway),
- в рамках короткой сессии отправляет invite/signal для конкретного `callId`,
- после завершения звонка временная сессия закрывается автоматически.
## Что нужно доработать для этого
1. Межсерверная доверенная модель:
- подпись/верификация межсерверных вызовов,
- allowlist доверенных серверов и ротация ключей.
2. Короткоживущая «call-only» авторизация:
- отдельный тип токена/сессии с TTL (например 13 минуты),
- минимальные права только на `CallInviteBroadcast/CallSignalToSession`.
3. Маршрутизация сессий пользователя между серверами:
- где находится активная сессия callee,
- как доставлять `stop_call` и terminal-сигналы на все устройства callee.
4. Идемпотентность и дедупликация:
- защита от повторов межсерверных сигналов по `callId + eventId`,
- корректная обработка out-of-order событий.
5. Наблюдаемость:
- метрики межсерверной доставки сигналов,
- диагностика по стадиям звонка и причинам срыва.
## Временный рабочий подход (до межсерверности)
- Держать звонки в одном сигнальном контуре.
- Использовать WebPush как fallback-уведомление (`incoming_call`/`stop_call`) для офлайн-сессий.

View File

@ -1,2 +1,2 @@
client.version=1.2.37
server.version=1.2.31
client.version=1.2.38
server.version=1.2.32

View File

@ -1,5 +1,7 @@
self.addEventListener('install', () => self.skipWaiting());
self.addEventListener('activate', (event) => event.waitUntil(self.clients.claim()));
self.__shineStoppedCalls = self.__shineStoppedCalls || new Map();
self.addEventListener('message', (event) => {
const data = event?.data || {};
if (data.type === 'SKIP_WAITING') {
@ -17,6 +19,61 @@ async function broadcastToClients(payload) {
});
}
async function broadcastCallActionToClients(action, payload) {
const clients = await self.clients.matchAll({ type: 'window', includeUncontrolled: true });
clients.forEach((client) => {
client.postMessage({
type: 'SHINE_CALL_PUSH_ACTION',
action,
payload,
});
});
}
function rememberStoppedCall(callId, sentAtMs = 0) {
if (!callId) return;
const now = Date.now();
const markAtMs = Number.isFinite(Number(sentAtMs)) ? Number(sentAtMs) : now;
self.__shineStoppedCalls.set(callId, Math.max(now, markAtMs));
const cutoff = now - 10 * 60 * 1000;
for (const [id, ts] of self.__shineStoppedCalls.entries()) {
if (Number(ts || 0) < cutoff) self.__shineStoppedCalls.delete(id);
}
}
function isCallStopped(callId, sentAtMs = 0) {
if (!callId) return false;
const stoppedAt = Number(self.__shineStoppedCalls.get(callId) || 0);
if (!stoppedAt) return false;
const incomingAt = Number.isFinite(Number(sentAtMs)) ? Number(sentAtMs) : 0;
return incomingAt <= 0 || incomingAt <= stoppedAt;
}
async function closeCallNotification(callId) {
if (!callId) return;
const list = await self.registration.getNotifications({ tag: callId });
list.forEach((n) => {
try { n.close(); } catch {}
});
}
function decodePushJson(rawText) {
try {
if (!rawText) return {};
return JSON.parse(rawText);
} catch {
return {};
}
}
function encodeCallPushPayloadForUrl(payload) {
try {
return encodeURIComponent(JSON.stringify(payload || {}));
} catch {
return '';
}
}
self.addEventListener('push', (event) => {
let body = '';
let rawText = '';
@ -27,13 +84,12 @@ self.addEventListener('push', (event) => {
if (event.data) {
const text = event.data.text();
rawText = text || '';
try {
const json = JSON.parse(rawText || '{}');
const json = decodePushJson(rawText);
kind = String(json.kind || '');
title = String(json.title || '');
body = String(json.text || '');
fromLogin = String(json.fromLogin || '');
} catch {
if (!kind && rawText) {
body = rawText || '';
}
}
@ -41,34 +97,106 @@ self.addEventListener('push', (event) => {
// ignore
}
const shouldNotify = kind === 'new_message' || kind === 'test_push' || (!kind && body);
const json = decodePushJson(rawText);
const callId = String(json.callId || '').trim();
const fromSessionId = String(json.fromSessionId || '').trim();
const toLogin = String(json.toLogin || '').trim();
const reason = String(json.reason || '').trim();
const sentAtMs = Number(json.sentAtMs || 0);
const expiresAtMs = Number(json.expiresAtMs || 0);
const nowMs = Date.now();
if (kind === 'stop_call' && callId) {
rememberStoppedCall(callId, sentAtMs || nowMs);
}
const isExpiredIncomingCall = kind === 'incoming_call'
&& Number.isFinite(expiresAtMs)
&& expiresAtMs > 0
&& nowMs > expiresAtMs;
const isIncomingCallAlreadyStopped = kind === 'incoming_call' && callId && isCallStopped(callId, sentAtMs || nowMs);
const shouldNotify = (
kind === 'new_message'
|| kind === 'test_push'
|| (kind === 'incoming_call' && !isExpiredIncomingCall && !isIncomingCallAlreadyStopped)
|| (!kind && body)
);
const notificationTitle = kind === 'test_push'
? (title || 'SHiNE: тестовый push')
: 'SHiNE: входящее сообщение';
: (kind === 'incoming_call'
? 'SHiNE: входящий звонок'
: 'SHiNE: входящее сообщение');
const notifyPromise = shouldNotify
? self.registration.showNotification(notificationTitle, {
body: body || (fromLogin ? `Вам пришло сообщение от ${fromLogin}` : 'Вам пришло сообщение'),
tag: kind === 'test_push' ? 'shine-test-push' : 'shine-direct-message',
tag: callId || (kind === 'test_push' ? 'shine-test-push' : 'shine-direct-message'),
renotify: true,
requireInteraction: kind === 'incoming_call',
data: {
kind,
callId,
fromLogin,
fromSessionId,
toLogin,
sentAtMs,
expiresAtMs,
reason,
},
actions: kind === 'incoming_call'
? [
{ action: 'accept', title: 'Ответить' },
{ action: 'decline', title: 'Сбросить' },
]
: [],
})
: Promise.resolve();
const closeOnStopPromise = kind === 'stop_call' && callId
? closeCallNotification(callId)
: Promise.resolve();
event.waitUntil(Promise.all([
notifyPromise,
closeOnStopPromise,
broadcastToClients({
kind,
body,
fromLogin,
fromSessionId,
toLogin,
callId,
sentAtMs,
expiresAtMs,
reason,
stale: isExpiredIncomingCall || isIncomingCallAlreadyStopped,
rawText,
receivedAt: Date.now(),
receivedAt: nowMs,
}),
]));
});
self.addEventListener('notificationclick', (event) => {
event.notification?.close();
const action = String(event?.action || '').trim().toLowerCase();
const data = event?.notification?.data || {};
const payload = {
kind: String(data.kind || '').trim(),
callId: String(data.callId || '').trim(),
fromLogin: String(data.fromLogin || '').trim(),
fromSessionId: String(data.fromSessionId || '').trim(),
toLogin: String(data.toLogin || '').trim(),
sentAtMs: Number(data.sentAtMs || 0),
expiresAtMs: Number(data.expiresAtMs || 0),
reason: String(data.reason || '').trim(),
};
event.waitUntil((async () => {
if ((action === 'accept' || action === 'decline') && payload.callId) {
await broadcastCallActionToClients(action, payload);
}
const allClients = await self.clients.matchAll({ type: 'window', includeUncontrolled: true });
const existing = allClients.find((client) => {
try {
@ -78,11 +206,26 @@ self.addEventListener('notificationclick', (event) => {
}
});
const openUrlBase = './index.html';
const encodedPayload = encodeCallPushPayloadForUrl(payload);
const openUrl = (action === 'accept' || action === 'decline')
? `${openUrlBase}?callPushAction=${encodeURIComponent(action)}&callPushPayload=${encodedPayload}`
: openUrlBase;
if (existing) {
try {
if (action === 'accept' || action === 'decline') {
existing.postMessage({
type: 'SHINE_CALL_PUSH_ACTION',
action,
payload,
});
}
} catch {}
await existing.focus();
return;
}
await self.clients.openWindow('./index.html');
await self.clients.openWindow(openUrl);
})());
});

View File

@ -5,8 +5,11 @@ import { initPwaInstallPromptHandling } from './services/pwa-install-service.js'
import { initPwaPush } from './services/pwa-push-service.js';
import { initCallUiOverlay } from './services/call-ui-service.js';
import {
handleCallPushAction,
handleIncomingCallInvite,
handleIncomingCallPush,
handleIncomingCallSignal,
handleStopCallPush,
setCallDebugReporter,
startDebugConnectionAsInitiator,
startDebugConnectionAsResponder,
@ -127,6 +130,7 @@ let uiUpdateReloadScheduled = false;
let pwaUpdateCheckAttempted = false;
let uiVersionCheckInFlight = false;
let uiVersionPeriodicIntervalId = null;
const CALL_PUSH_PENDING_ACTION_KEY = 'shine-ui-call-push-pending-action-v1';
setClientErrorTransport((payload) => authService.reportClientError(payload));
initPwaInstallPromptHandling();
@ -220,6 +224,85 @@ function startConnectionCountdown() {
}, 1000);
}
function savePendingCallPushAction(action, payload = {}) {
try {
const item = {
action: String(action || '').trim().toLowerCase(),
payload: payload || {},
savedAtMs: Date.now(),
};
localStorage.setItem(CALL_PUSH_PENDING_ACTION_KEY, JSON.stringify(item));
} catch {
// ignore localStorage errors
}
}
function loadPendingCallPushAction() {
try {
const raw = localStorage.getItem(CALL_PUSH_PENDING_ACTION_KEY);
if (!raw) return null;
const parsed = JSON.parse(raw);
const action = String(parsed?.action || '').trim().toLowerCase();
if (action !== 'accept' && action !== 'decline') return null;
return {
action,
payload: parsed?.payload || {},
};
} catch {
return null;
}
}
function clearPendingCallPushAction() {
try {
localStorage.removeItem(CALL_PUSH_PENDING_ACTION_KEY);
} catch {
// ignore localStorage errors
}
}
function consumeCallPushActionFromUrlIfAny() {
try {
const params = new URLSearchParams(window.location.search || '');
const action = String(params.get('callPushAction') || '').trim().toLowerCase();
const rawPayload = String(params.get('callPushPayload') || '');
if (action !== 'accept' && action !== 'decline') return;
let payload = {};
if (rawPayload) {
try {
payload = JSON.parse(decodeURIComponent(rawPayload));
} catch {
payload = {};
}
}
savePendingCallPushAction(action, payload);
params.delete('callPushAction');
params.delete('callPushPayload');
const nextQuery = params.toString();
const nextUrl = `${window.location.pathname}${nextQuery ? `?${nextQuery}` : ''}${window.location.hash || ''}`;
window.history.replaceState({}, '', nextUrl);
} catch {
// ignore URL parsing errors
}
}
async function processPendingCallPushActionIfPossible() {
if (!state.session.isAuthorized) return;
const pending = loadPendingCallPushAction();
if (!pending) return;
clearPendingCallPushAction();
try {
await handleCallPushAction(pending.action, pending.payload || {});
} catch (error) {
addAppLogEntry({
level: 'warn',
source: 'web-push',
message: 'Не удалось выполнить действие звонка из push',
details: { action: pending.action, error: error?.message || 'unknown' },
});
}
}
function setConnectionStatus(nextState, text = '') {
const state = String(nextState || '').trim();
if (!state) return;
@ -677,9 +760,13 @@ async function ensureSessionRuntimeStarted() {
});
}
}, 15_000);
await processPendingCallPushActionIfPossible();
}
async function init() {
consumeCallPushActionFromUrlIfAny();
addAppLogEntry({
level: 'info',
source: 'app',
@ -698,12 +785,23 @@ async function init() {
setSessionAuthorizedHandler(() => {
void ensureSessionRuntimeStarted();
void processPendingCallPushActionIfPossible();
});
if ('serviceWorker' in navigator) {
navigator.serviceWorker.addEventListener('message', (event) => {
const data = event?.data || {};
if (data.type === 'SHINE_CALL_PUSH_ACTION') {
const action = String(data.action || '').trim().toLowerCase();
const payload = data.payload || {};
if (action === 'accept' || action === 'decline') {
savePendingCallPushAction(action, payload);
void processPendingCallPushActionIfPossible();
}
return;
}
if (data.type !== 'SHINE_WEB_PUSH_EVENT') return;
const payload = data.payload || {};
const kind = String(payload.kind || '').trim();
const now = Date.now();
@ -723,6 +821,11 @@ async function init() {
message: 'Получено push-событие в service worker',
details: payload,
});
if (kind === 'incoming_call' && !payload.stale && state.session.isAuthorized) {
void handleIncomingCallPush(payload);
} else if (kind === 'stop_call' && state.session.isAuthorized) {
void handleStopCallPush(payload);
}
window.dispatchEvent(new CustomEvent('shine-push-diagnostics-update', { detail: payload }));
});
}

View File

@ -1064,6 +1064,84 @@ function ensureIncomingNotification(peerLogin) {
} catch {}
}
function isIncomingCallPushFresh(payload) {
const expiresAtMs = Number(payload?.expiresAtMs || 0);
if (Number.isFinite(expiresAtMs) && expiresAtMs > 0 && Date.now() > expiresAtMs) {
return false;
}
return true;
}
async function handleIncomingInvitePayload(payload, { source = 'ws' } = {}) {
const callId = String(payload?.callId || '').trim();
const fromLogin = String(payload?.fromLogin || '').trim();
const fromSessionId = String(payload?.fromSessionId || '').trim();
if (!callId || !fromLogin || !fromSessionId) return null;
if (activeCallId && activeCallId !== callId) {
try {
await authService.callSignalToSession({
toLogin: fromLogin,
targetSessionId: fromSessionId,
callId,
type: TYPES.DECLINE_BUSY,
data: 'busy',
});
} catch {}
return null;
}
let call = getCall(callId);
if (!call) {
call = {
callId,
peerLogin: fromLogin,
direction: 'in',
phase: 'incoming',
statusText: `Вам звонит ${fromLogin}`,
remoteSessionId: fromSessionId,
timers: {},
startedAtMs: nowMs(),
connectedAtMs: 0,
pc: null,
localStream: null,
audioSenders: [],
muted: false,
connectionRouteLabel: '',
reconnectInProgress: false,
reconnectAttempts: 0,
debugMode: false,
debugRunId: '',
debugRole: '',
pendingRemoteIceCandidates: [],
initialOfferInProgress: false,
initialOfferSent: false,
};
calls.set(callId, call);
} else if (!call.remoteSessionId && fromSessionId) {
call.remoteSessionId = fromSessionId;
}
activeCallId = callId;
setStatus(call, `Вам звонит ${fromLogin}`, 'incoming');
ensureIncomingNotification(fromLogin);
try {
await sendSignal(call, TYPES.RINGING, `ringing:${source}`);
} catch {}
if (!call.timers.incoming20s) {
call.timers.incoming20s = setTimeout(async () => {
if (!calls.has(callId)) return;
try {
await sendSignal(call, TYPES.TIMEOUT, 'timeout_20s');
} catch {}
await finalizeCall(call, { localReasonCode: 'no_answer', debugReason: 'incoming_timeout_20s' });
}, 20000);
}
return call;
}
export function setCallDebugReporter(fn) {
debugReporter = typeof fn === 'function' ? fn : null;
}
@ -1272,69 +1350,7 @@ export async function startOutgoingCall(peerLogin) {
}
export async function handleIncomingCallInvite(evt) {
const payload = evt?.payload || {};
const callId = String(payload.callId || '').trim();
const fromLogin = String(payload.fromLogin || '').trim();
const fromSessionId = String(payload.fromSessionId || '').trim();
if (!callId || !fromLogin || !fromSessionId) return;
if (activeCallId && activeCallId !== callId) {
try {
await authService.callSignalToSession({
toLogin: fromLogin,
targetSessionId: fromSessionId,
callId,
type: TYPES.DECLINE_BUSY,
data: 'busy',
});
} catch {}
return;
}
let call = getCall(callId);
if (!call) {
call = {
callId,
peerLogin: fromLogin,
direction: 'in',
phase: 'incoming',
statusText: `Вам звонит ${fromLogin}`,
remoteSessionId: fromSessionId,
timers: {},
startedAtMs: nowMs(),
connectedAtMs: 0,
pc: null,
localStream: null,
audioSenders: [],
muted: false,
connectionRouteLabel: '',
reconnectInProgress: false,
reconnectAttempts: 0,
debugMode: false,
debugRunId: '',
debugRole: '',
pendingRemoteIceCandidates: [],
initialOfferInProgress: false,
initialOfferSent: false,
};
calls.set(callId, call);
}
activeCallId = callId;
setStatus(call, `Вам звонит ${fromLogin}`, 'incoming');
ensureIncomingNotification(fromLogin);
try {
await sendSignal(call, TYPES.RINGING, 'ringing');
} catch {}
call.timers.incoming20s = setTimeout(async () => {
if (!calls.has(callId)) return;
try {
await sendSignal(call, TYPES.TIMEOUT, 'timeout_20s');
} catch {}
await finalizeCall(call, { localReasonCode: 'no_answer', debugReason: 'incoming_timeout_20s' });
}, 20000);
await handleIncomingInvitePayload(evt?.payload || {}, { source: 'ws' });
}
export async function acceptIncomingCall() {
@ -1534,3 +1550,32 @@ export async function hangupActiveCall() {
notifyRemoteHangup: true,
});
}
export async function handleIncomingCallPush(payload = {}) {
if (!isIncomingCallPushFresh(payload)) return;
await handleIncomingInvitePayload(payload, { source: 'push' });
}
export async function handleStopCallPush(payload = {}) {
const callId = String(payload?.callId || '').trim();
if (!callId) return;
const call = getCall(callId);
if (!call) return;
const reason = String(payload?.reason || 'stop_call_push').trim() || 'stop_call_push';
await finalizeCall(call, {
localReasonCode: call.connectedAtMs ? 'completed' : 'no_answer',
debugReason: `stop_call_push:${reason}`,
});
}
export async function handleCallPushAction(action, payload = {}) {
const normalized = String(action || '').trim().toLowerCase();
if (normalized !== 'accept' && normalized !== 'decline') return;
if (!isIncomingCallPushFresh(payload)) return;
await handleIncomingCallPush(payload);
if (normalized === 'accept') {
await acceptIncomingCall();
return;
}
await declineIncomingCall();
}

View File

@ -9,14 +9,14 @@ import server.logic.ws_protocol.JSON.entyties.Net_Response;
import server.logic.ws_protocol.JSON.handlers.JsonMessageHandler;
import server.logic.ws_protocol.JSON.messages.entyties.Net_CallInviteBroadcast_Request;
import server.logic.ws_protocol.JSON.messages.entyties.Net_CallInviteBroadcast_Response;
import server.logic.ws_protocol.JSON.push.FcmPushSender;
import server.logic.ws_protocol.JSON.push.WebPushSender;
import server.logic.ws_protocol.JSON.push.WsEventSender;
import server.logic.ws_protocol.JSON.utils.NetExceptionResponseFactory;
import server.logic.ws_protocol.JSON.utils.NetIdGenerator;
import server.logic.ws_protocol.WireCodes;
import shine.db.dao.PushTokensDAO;
import shine.db.dao.ActiveSessionsDAO;
import shine.db.dao.SolanaUsersDAO;
import shine.db.entities.PushTokenEntry;
import shine.db.entities.ActiveSessionEntry;
import shine.db.entities.SolanaUserEntry;
import java.util.HashSet;
@ -26,6 +26,7 @@ import java.util.Set;
public class Net_CallInviteBroadcast_Handler implements JsonMessageHandler {
private static final ObjectMapper MAPPER = new ObjectMapper();
private static final int TYPE_INVITE = 100;
private static final long PUSH_CALL_TTL_MS = 10_000L;
@Override
public Net_Response handle(Net_Request baseRequest, ConnectionContext ctx) throws Exception {
@ -49,12 +50,13 @@ public class Net_CallInviteBroadcast_Handler implements JsonMessageHandler {
String from = ctx.getLogin();
String to = targetUser.getLogin();
long timeMs = System.currentTimeMillis();
long expiresAtMs = timeMs + PUSH_CALL_TTL_MS;
Set<ConnectionContext> activeSessions = ActiveConnectionsRegistry.getInstance().getByLogin(to);
List<PushTokenEntry> tokens = PushTokensDAO.getInstance().listByLogin(to);
List<ActiveSessionEntry> allTargetSessions = ActiveSessionsDAO.getInstance().getByLogin(to);
int wsDelivered = 0;
int fcmDelivered = 0;
int webPushDelivered = 0;
Set<String> activeSessionIds = new HashSet<>();
for (ConnectionContext targetCtx : activeSessions) {
@ -74,14 +76,31 @@ public class Net_CallInviteBroadcast_Handler implements JsonMessageHandler {
if (sent) wsDelivered++;
}
for (PushTokenEntry token : tokens) {
boolean pushed = FcmPushSender.sendNotification(
token.getToken(),
"Входящий звонок",
from + " пытается дозвониться",
callId
for (ActiveSessionEntry session : allTargetSessions) {
String sessionId = String.valueOf(session.getSessionId() == null ? "" : session.getSessionId()).trim();
if (!sessionId.isBlank() && activeSessionIds.contains(sessionId)) {
continue;
}
if (isBlank(session.getPushEndpoint()) || isBlank(session.getPushP256dhKey()) || isBlank(session.getPushAuthKey())) {
continue;
}
String payload = "{\"kind\":\"incoming_call\""
+ ",\"title\":\"SHiNE: входящий звонок\""
+ ",\"text\":\"Вам звонит " + jsonEscape(from) + "\""
+ ",\"fromLogin\":\"" + jsonEscape(from) + "\""
+ ",\"fromSessionId\":\"" + jsonEscape(ctx.getSessionId()) + "\""
+ ",\"toLogin\":\"" + jsonEscape(to) + "\""
+ ",\"callId\":\"" + jsonEscape(callId) + "\""
+ ",\"sentAtMs\":" + timeMs
+ ",\"expiresAtMs\":" + expiresAtMs
+ "}";
boolean pushed = WebPushSender.sendBase64Payload(
session.getPushEndpoint(),
session.getPushP256dhKey(),
session.getPushAuthKey(),
payload
);
if (pushed) fcmDelivered++;
if (pushed) webPushDelivered++;
}
Net_CallInviteBroadcast_Response resp = new Net_CallInviteBroadcast_Response();
@ -90,7 +109,27 @@ public class Net_CallInviteBroadcast_Handler implements JsonMessageHandler {
resp.setStatus(WireCodes.Status.OK);
resp.setCallId(callId);
resp.setDeliveredWsSessions(wsDelivered);
resp.setDeliveredFcmSessions(fcmDelivered);
resp.setDeliveredFcmSessions(webPushDelivered);
resp.setDeliveredWebPushSessions(webPushDelivered);
return resp;
}
private boolean isBlank(String s) {
return s == null || s.isBlank();
}
private static String jsonEscape(String s) {
if (s == null) return "";
StringBuilder out = new StringBuilder();
for (int i = 0; i < s.length(); i++) {
char c = s.charAt(i);
if (c == '\\') out.append("\\\\");
else if (c == '"') out.append("\\\"");
else if (c == '\n') out.append("\\n");
else if (c == '\r') out.append("\\r");
else if (c == '\t') out.append("\\t");
else out.append(c);
}
return out.toString();
}
}

View File

@ -9,18 +9,25 @@ import server.logic.ws_protocol.JSON.entyties.Net_Response;
import server.logic.ws_protocol.JSON.handlers.JsonMessageHandler;
import server.logic.ws_protocol.JSON.messages.entyties.Net_CallSignalToSession_Request;
import server.logic.ws_protocol.JSON.messages.entyties.Net_CallSignalToSession_Response;
import server.logic.ws_protocol.JSON.push.WebPushSender;
import server.logic.ws_protocol.JSON.push.WsEventSender;
import server.logic.ws_protocol.JSON.utils.NetExceptionResponseFactory;
import server.logic.ws_protocol.JSON.utils.NetIdGenerator;
import server.logic.ws_protocol.WireCodes;
import shine.db.dao.ActiveSessionsDAO;
import shine.db.dao.SolanaUsersDAO;
import shine.db.entities.ActiveSessionEntry;
import shine.db.entities.SolanaUserEntry;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
public class Net_CallSignalToSession_Handler implements JsonMessageHandler {
private static final ObjectMapper MAPPER = new ObjectMapper();
private static final int TYPE_ACCEPT = 120;
private static final int TYPE_DECLINE_BUSY = 130;
private static final int TYPE_TIMEOUT = 140;
private static final int TYPE_HANGUP = 150;
@Override
@ -72,7 +79,34 @@ public class Net_CallSignalToSession_Handler implements JsonMessageHandler {
boolean delivered = WsEventSender.sendEvent(targetCtx, "IncomingCallSignal", eventId, payload);
if (type == TYPE_ACCEPT) {
notifyAcceptedOnOtherSessions(ctx, callId);
notifyStopOnOtherSessions(
ctx.getLogin(),
ctx.getSessionId(),
ctx.getLogin(),
ctx.getSessionId(),
callId,
"accepted_on_other_device"
);
}
if (type == TYPE_DECLINE_BUSY || type == TYPE_TIMEOUT || type == TYPE_HANGUP) {
String reason = "terminal_call_signal_" + type;
notifyStopOnOtherSessions(
ctx.getLogin(),
ctx.getSessionId(),
ctx.getLogin(),
ctx.getSessionId(),
callId,
reason
);
notifyStopOnOtherSessions(
to,
targetCtx.getSessionId(),
ctx.getLogin(),
ctx.getSessionId(),
callId,
reason
);
}
Net_CallSignalToSession_Response resp = new Net_CallSignalToSession_Response();
@ -83,31 +117,81 @@ public class Net_CallSignalToSession_Handler implements JsonMessageHandler {
return resp;
}
private void notifyAcceptedOnOtherSessions(ConnectionContext accepterCtx, String callId) {
if (accepterCtx == null) return;
String login = accepterCtx.getLogin();
String acceptedSessionId = accepterCtx.getSessionId();
if (login == null || login.isBlank() || acceptedSessionId == null || acceptedSessionId.isBlank() || callId == null || callId.isBlank()) {
private void notifyStopOnOtherSessions(
String targetLogin,
String excludeSessionId,
String fromLogin,
String fromSessionId,
String callId,
String reason
) throws Exception {
if (isBlank(targetLogin) || isBlank(callId)) {
return;
}
Set<ConnectionContext> sameUserSessions = ActiveConnectionsRegistry.getInstance().getByLogin(login);
Set<String> onlineSessionIds = new HashSet<>();
Set<ConnectionContext> sameUserSessions = ActiveConnectionsRegistry.getInstance().getByLogin(targetLogin);
for (ConnectionContext siblingCtx : sameUserSessions) {
if (siblingCtx == null || siblingCtx.getWsSession() == null || !siblingCtx.getWsSession().isOpen()) continue;
if (acceptedSessionId.equals(siblingCtx.getSessionId())) continue;
onlineSessionIds.add(String.valueOf(siblingCtx.getSessionId() == null ? "" : siblingCtx.getSessionId()).trim());
if (!isBlank(excludeSessionId) && excludeSessionId.equals(siblingCtx.getSessionId())) continue;
String siblingEventId = NetIdGenerator.eventId("evt");
ObjectNode siblingPayload = MAPPER.createObjectNode();
siblingPayload.put("eventId", siblingEventId);
siblingPayload.put("fromLogin", login);
siblingPayload.put("fromSessionId", acceptedSessionId);
siblingPayload.put("toLogin", login);
siblingPayload.put("fromLogin", fromLogin);
siblingPayload.put("fromSessionId", fromSessionId);
siblingPayload.put("toLogin", targetLogin);
siblingPayload.put("callId", callId);
siblingPayload.put("type", TYPE_HANGUP);
siblingPayload.put("data", "accepted_on_other_device");
siblingPayload.put("data", reason);
siblingPayload.put("timeMs", System.currentTimeMillis());
WsEventSender.sendEvent(siblingCtx, "IncomingCallSignal", siblingEventId, siblingPayload);
}
List<ActiveSessionEntry> persistedSessions = ActiveSessionsDAO.getInstance().getByLogin(targetLogin);
long sentAtMs = System.currentTimeMillis();
for (ActiveSessionEntry session : persistedSessions) {
String sessionId = String.valueOf(session.getSessionId() == null ? "" : session.getSessionId()).trim();
if (!isBlank(excludeSessionId) && excludeSessionId.equals(sessionId)) continue;
if (!sessionId.isBlank() && onlineSessionIds.contains(sessionId)) continue;
if (isBlank(session.getPushEndpoint()) || isBlank(session.getPushP256dhKey()) || isBlank(session.getPushAuthKey())) {
continue;
}
String pushPayload = "{\"kind\":\"stop_call\""
+ ",\"callId\":\"" + jsonEscape(callId) + "\""
+ ",\"reason\":\"" + jsonEscape(reason) + "\""
+ ",\"fromLogin\":\"" + jsonEscape(fromLogin) + "\""
+ ",\"fromSessionId\":\"" + jsonEscape(fromSessionId) + "\""
+ ",\"toLogin\":\"" + jsonEscape(targetLogin) + "\""
+ ",\"sentAtMs\":" + sentAtMs
+ "}";
WebPushSender.sendBase64Payload(
session.getPushEndpoint(),
session.getPushP256dhKey(),
session.getPushAuthKey(),
pushPayload
);
}
}
private boolean isBlank(String s) {
return s == null || s.isBlank();
}
private static String jsonEscape(String s) {
if (s == null) return "";
StringBuilder out = new StringBuilder();
for (int i = 0; i < s.length(); i++) {
char c = s.charAt(i);
if (c == '\\') out.append("\\\\");
else if (c == '"') out.append("\\\"");
else if (c == '\n') out.append("\\n");
else if (c == '\r') out.append("\\r");
else if (c == '\t') out.append("\\t");
else out.append(c);
}
return out.toString();
}
}

View File

@ -6,6 +6,7 @@ public class Net_CallInviteBroadcast_Response extends Net_Response {
private String callId;
private int deliveredWsSessions;
private int deliveredFcmSessions;
private int deliveredWebPushSessions;
public String getCallId() { return callId; }
public void setCallId(String callId) { this.callId = callId; }
@ -15,4 +16,7 @@ public class Net_CallInviteBroadcast_Response extends Net_Response {
public int getDeliveredFcmSessions() { return deliveredFcmSessions; }
public void setDeliveredFcmSessions(int deliveredFcmSessions) { this.deliveredFcmSessions = deliveredFcmSessions; }
public int getDeliveredWebPushSessions() { return deliveredWebPushSessions; }
public void setDeliveredWebPushSessions(int deliveredWebPushSessions) { this.deliveredWebPushSessions = deliveredWebPushSessions; }
}

View File

@ -1,113 +1,44 @@
# Логика установки соединения через сервер
# Логика установки звонка через сервер (актуальная)
Ниже описан фактический flow звонка в текущей реализации SHiNE с опорой на сообщения, которые проходят через сервер.
## 1) Ключевые API
- `CallInviteBroadcast` — широковещательный старт входящего звонка (`type=100`) по пользователю.
- `CallSignalToSession` — точечный сигнал в конкретную сессию (`RINGING/ACCEPT/DECLINE/TIMEOUT/HANGUP/OFFER/ANSWER/ICE`).
## 1) Основные операции и типы сообщений
## 2) Базовый поток звонка
1. Инициатор отправляет `CallInviteBroadcast(toLogin, callId, type=100)`.
2. Сервер отправляет онлайн-сессиям callee событие `IncomingCallInvite` по WS.
3. Офлайн-сессиям callee сервер отправляет WebPush `incoming_call` (с TTL).
4. Любая сессия callee, получив invite, может отправить `RINGING`.
5. Инициатор после первого `RINGING` показывает «Вызываем…».
6. При `ACCEPT` выбирается одна целевая сессия, остальные получают `HANGUP/stop_call` и закрывают экран входящего.
7. Далее только выбранная пара сессий обменивается `OFFER/ANSWER/ICE`.
Клиентские WS-операции:
- `CallInviteBroadcast` — широковещательный входящий вызов пользователю.
- `CallSignalToSession` — точечный сигнал в конкретную сессию.
## 3) WebPush по звонкам
### `incoming_call`
- Используется как fallback для офлайн-сессий.
- Поля: `kind`, `callId`, `fromLogin`, `fromSessionId`, `toLogin`, `sentAtMs`, `expiresAtMs`.
- TTL сейчас: **10 секунд**.
- Если push пришёл после `expiresAtMs`, уведомление не показывается.
Серверные события клиенту:
- `IncomingCallInvite` — уведомление о входящем вызове.
- `IncomingCallSignal` — сигнал по активному `callId`.
### `stop_call`
- Отправляется при завершении/отмене/принятии на другом устройстве.
- Поля: `kind`, `callId`, `reason`, `fromLogin`, `fromSessionId`, `toLogin`, `sentAtMs`.
- Service Worker закрывает уведомление по `tag=callId`.
Коды `type` в `IncomingCallSignal`:
- `100``INVITE`
- `110``RINGING`
- `120``ACCEPT`
- `130``DECLINE_BUSY`
- `140``TIMEOUT`
- `150``HANGUP`
- `200``OFFER`
- `210``ANSWER`
- `220``ICE`
## 4) Кнопки в push-уведомлении
- Для `incoming_call` есть actions:
- `accept` — «Ответить»
- `decline` — «Сбросить»
- По нажатию action:
- Service Worker шлёт событие в открытые вкладки,
- и открывает/фокусит UI с параметрами действия.
- UI пытается выполнить действие сразу (принять/отклонить), если сессия уже авторизована.
---
## 5) Важные ограничения
- Источник истины по звонку — серверный сигналинг (WS + серверные сигналы), push только вспомогательный канал.
- Push может прийти с задержкой и не по порядку, поэтому клиент фильтрует устаревшие события (`expiresAtMs` + `callId` + `stop_call`).
- Полноценный «автоответ в фоне без открытия UI» в web/PWA не гарантируется платформой.
## 2) Старт исходящего звонка
1. Инициатор создаёт `callId` и отправляет:
- `CallInviteBroadcast(toLogin, callId, type=100)`.
2. Сервер находит все активные WS-сессии целевого логина и шлёт им `IncomingCallInvite`.
3. На устройствах callee появляется экран входящего вызова.
---
## 3) Ранние статусы до поднятия трубки
Каждое устройство callee может отправить инициатору:
- `RINGING (110)` — «звонок идёт».
Для исходящего звонка инициатор фиксирует выбранную `remoteSessionId`:
- первый валидный `RINGING`/`ACCEPT` выбирает сессию,
- сигналы с тем же `callId`, но от других сессий этого же пользователя, игнорируются.
Это защищает от гонок мультидевайса.
---
## 4) Принятие звонка на одном устройстве callee
1. На выбранном устройстве callee пользователь нажимает «Поднять»:
- отправляется `ACCEPT (120)` в выбранную сессию инициатора.
2. Сервер дополнительно рассылает на **другие** сессии этого же callee:
- `HANGUP (150)` с `data=accepted_on_other_device`.
3. Остальные устройства callee закрывают экран входящего вызова.
Итог: активным остаётся один путь «инициатор ↔ выбранная сессия callee».
---
## 5) Обмен SDP (OFFER/ANSWER)
После `ACCEPT`:
1. Инициатор формирует `RTCPeerConnection`, `createOffer()`, отправляет:
- `OFFER (200)`.
2. Calee применяет `setRemoteDescription(offer)`, делает `createAnswer()`, отправляет:
- `ANSWER (210)`.
3. Инициатор применяет `setRemoteDescription(answer)`.
Защиты:
- повторный `ACCEPT`/повторный старт `offer` игнорируется;
- `ANSWER` обрабатывается только для исходящего звонка;
- `ANSWER` без локального `offer` или дубликат в `stable` игнорируется.
---
## 6) Обмен ICE-кандидатами
`ICE (220)` может приходить раньше SDP. Поэтому:
- если `pc` ещё не создан — ICE кладётся в очередь;
- если `pc` есть, но `remoteDescription` ещё нет — ICE тоже в очередь;
- после установки `remoteDescription` очередь применяется (`addIceCandidate`).
Это устраняет race «remote description was null».
---
## 7) Завершение и ошибки
Нормальное завершение:
- `HANGUP (150)` от одной стороны → вторая завершает звонок.
Неуспех установки:
- сторона, у которой setup не удался, шлёт `HANGUP (150)` с `data=setup_failed:...`,
- вторая сторона сразу закрывает экран ожидания.
Также пишутся `CallDeliveryReport`:
- `call_connected`, `incoming_failed`, `outgoing_failed`, `unknown_error` и расширенная диагностика ICE/SDP.
---
## 8) Почему схема устойчива сейчас
Текущая устойчивость обеспечивается тремя правилами:
1. **Одна выбранная сессия callee** для исходящего звонка.
2. **Принятие на одном устройстве закрывает входящий на остальных** через сервер.
3. **ICE буферизуется до готовности SDP/PC**, а не ломает handshake.
Именно комбинация этих трёх пунктов закрывает основные причины «иногда дозванивается, иногда нет» при мультидевайсе.
## 6) Ограничение текущей архитектуры
- Сейчас звонки надёжно работают в рамках одного сигнального контура (когда обе стороны подключены к совместимому серверу/кластеру).
- Межсерверный сценарий «A и B на полностью разных серверах» пока не завершён и вынесен в TODO.

View File

@ -0,0 +1,113 @@
# Логика установки соединения через сервер
Ниже описан фактический flow звонка в текущей реализации SHiNE с опорой на сообщения, которые проходят через сервер.
## 1) Основные операции и типы сообщений
Клиентские WS-операции:
- `CallInviteBroadcast` — широковещательный входящий вызов пользователю.
- `CallSignalToSession` — точечный сигнал в конкретную сессию.
Серверные события клиенту:
- `IncomingCallInvite` — уведомление о входящем вызове.
- `IncomingCallSignal` — сигнал по активному `callId`.
Коды `type` в `IncomingCallSignal`:
- `100``INVITE`
- `110``RINGING`
- `120``ACCEPT`
- `130``DECLINE_BUSY`
- `140``TIMEOUT`
- `150``HANGUP`
- `200``OFFER`
- `210``ANSWER`
- `220``ICE`
---
## 2) Старт исходящего звонка
1. Инициатор создаёт `callId` и отправляет:
- `CallInviteBroadcast(toLogin, callId, type=100)`.
2. Сервер находит все активные WS-сессии целевого логина и шлёт им `IncomingCallInvite`.
3. На устройствах callee появляется экран входящего вызова.
---
## 3) Ранние статусы до поднятия трубки
Каждое устройство callee может отправить инициатору:
- `RINGING (110)` — «звонок идёт».
Для исходящего звонка инициатор фиксирует выбранную `remoteSessionId`:
- первый валидный `RINGING`/`ACCEPT` выбирает сессию,
- сигналы с тем же `callId`, но от других сессий этого же пользователя, игнорируются.
Это защищает от гонок мультидевайса.
---
## 4) Принятие звонка на одном устройстве callee
1. На выбранном устройстве callee пользователь нажимает «Поднять»:
- отправляется `ACCEPT (120)` в выбранную сессию инициатора.
2. Сервер дополнительно рассылает на **другие** сессии этого же callee:
- `HANGUP (150)` с `data=accepted_on_other_device`.
3. Остальные устройства callee закрывают экран входящего вызова.
Итог: активным остаётся один путь «инициатор ↔ выбранная сессия callee».
---
## 5) Обмен SDP (OFFER/ANSWER)
После `ACCEPT`:
1. Инициатор формирует `RTCPeerConnection`, `createOffer()`, отправляет:
- `OFFER (200)`.
2. Calee применяет `setRemoteDescription(offer)`, делает `createAnswer()`, отправляет:
- `ANSWER (210)`.
3. Инициатор применяет `setRemoteDescription(answer)`.
Защиты:
- повторный `ACCEPT`/повторный старт `offer` игнорируется;
- `ANSWER` обрабатывается только для исходящего звонка;
- `ANSWER` без локального `offer` или дубликат в `stable` игнорируется.
---
## 6) Обмен ICE-кандидатами
`ICE (220)` может приходить раньше SDP. Поэтому:
- если `pc` ещё не создан — ICE кладётся в очередь;
- если `pc` есть, но `remoteDescription` ещё нет — ICE тоже в очередь;
- после установки `remoteDescription` очередь применяется (`addIceCandidate`).
Это устраняет race «remote description was null».
---
## 7) Завершение и ошибки
Нормальное завершение:
- `HANGUP (150)` от одной стороны → вторая завершает звонок.
Неуспех установки:
- сторона, у которой setup не удался, шлёт `HANGUP (150)` с `data=setup_failed:...`,
- вторая сторона сразу закрывает экран ожидания.
Также пишутся `CallDeliveryReport`:
- `call_connected`, `incoming_failed`, `outgoing_failed`, `unknown_error` и расширенная диагностика ICE/SDP.
---
## 8) Почему схема устойчива сейчас
Текущая устойчивость обеспечивается тремя правилами:
1. **Одна выбранная сессия callee** для исходящего звонка.
2. **Принятие на одном устройстве закрывает входящий на остальных** через сервер.
3. **ICE буферизуется до готовности SDP/PC**, а не ломает handshake.
Именно комбинация этих трёх пунктов закрывает основные причины «иногда дозванивается, иногда нет» при мультидевайсе.