Запрос подписок, но это версия уже не актуальна тк дальше буду переделывать блоки под новый формат
This commit is contained in:
AidarKC 2026-01-13 12:44:45 +03:00
parent 973a632b85
commit b7025dde59
5 changed files with 486 additions and 3 deletions

View File

@ -10,7 +10,7 @@ import java.util.Arrays;
import java.util.Objects; import java.util.Objects;
/** /**
* старый формат * старый формат -его надо поменять на новый формат
* *
* RAW (BigEndian): * RAW (BigEndian):
* [4] recordSize (int) = размер RAW (включая этот заголовок), БЕЗ signature+hash * [4] recordSize (int) = размер RAW (включая этот заголовок), БЕЗ signature+hash
@ -24,7 +24,7 @@ import java.util.Objects;
* [64] signature64 (Ed25519) * [64] signature64 (Ed25519)
* [32] hash32 (SHA-256) * [32] hash32 (SHA-256)
*/ */
СМОТРИ файл : "!!! TODO что бы не забыть"
/** /**
* BchBlockEntry универсальный блок нового формата. * BchBlockEntry универсальный блок нового формата.
* *
@ -35,12 +35,16 @@ import java.util.Objects;
* [4] blockNumber (int) номер блока * [4] blockNumber (int) номер блока
* [8] timestamp (long) unix seconds * [8] timestamp (long) unix seconds
* Само сообщение *
* [2] type - тип соощения * [2] type - тип соощения
* [2] Sиbtype - субтип сообщения * [2] Sиbtype - субтип сообщения
* [2] version - версия формата соощения * [2] version - версия формата соощения
*
*
* Дальше Само сообщение (может быть разным)
* [4] prevLineNumber НОМЕР ПРИВЕДУЩЕГО СООБЩЕНИЯ В ЛИНИИ - может быть а может и небыть в зависимости от типа сообщения * [4] prevLineNumber НОМЕР ПРИВЕДУЩЕГО СООБЩЕНИЯ В ЛИНИИ - может быть а может и небыть в зависимости от типа сообщения
* [32] prevLineHash ХЭШ ПРИВЕДУЩЕГО СООБЩЕНИЯ В ЛИНИИ - может быть а может и небыть в зависимости от типа сообщения * [32] prevLineHash ХЭШ ПРИВЕДУЩЕГО СООБЩЕНИЯ В ЛИНИИ - может быть а может и небыть в зависимости от типа сообщения
* [4] номер самого сообщения в этой линии
* [N] bodyBytes (ОСТАЛЬНЫЕ БАЙТЫ]) * [N] bodyBytes (ОСТАЛЬНЫЕ БАЙТЫ])
* TAIL (НЕ входит в recordSize): * TAIL (НЕ входит в recordSize):

View File

@ -0,0 +1,251 @@
package shine.db.dao;
import shine.db.MsgSubType;
import shine.db.SqliteDbController;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
/**
* SubscriptionsDAO агрегатный DAO для "каналов" (подписок).
*
* Возвращает по каждой активной подписке (FOLLOW) + "сам на себя":
* - login цели (channelLogin)
* - blockchainName цели (channelBchName)
* - count публикаций (TEXT_NEW)
* - last publication: bytes оригинального блока (для timestamp)
* - last publication: bytes актуального блока (edit или orig) для текста превью
*
* Важно:
* - это НЕ таблица => сущность результата хранится вложенным классом.
* - методы с Connection НЕ закрывают соединение
* - методы без Connection сами открывают и закрывают соединение
*/
public final class SubscriptionsDAO {
private static volatile SubscriptionsDAO instance;
private final SqliteDbController db = SqliteDbController.getInstance();
private SubscriptionsDAO() {}
public static SubscriptionsDAO getInstance() {
if (instance == null) {
synchronized (SubscriptionsDAO.class) {
if (instance == null) instance = new SubscriptionsDAO();
}
}
return instance;
}
/** Результат одной строки ("канал") для подписок. */
public static final class ChannelRow {
private final String channelLogin;
private final String channelBchName;
private final int publicationsCount;
/** Последняя публикация: global number (nullable если публикаций нет). */
private final Integer lastPublicationGlobalNumber;
/** Байты оригинальной публикации (FULL bytes блока) — для timestamp (nullable). */
private final byte[] lastPublicationBlockBytes;
/** Если публикация редактировалась: global number edit-блока (nullable). */
private final Integer lastEditGlobalNumber;
/** Байты edit-блока (FULL bytes блока) (nullable). */
private final byte[] lastEditBlockBytes;
public ChannelRow(String channelLogin,
String channelBchName,
int publicationsCount,
Integer lastPublicationGlobalNumber,
byte[] lastPublicationBlockBytes,
Integer lastEditGlobalNumber,
byte[] lastEditBlockBytes) {
this.channelLogin = channelLogin;
this.channelBchName = channelBchName;
this.publicationsCount = publicationsCount;
this.lastPublicationGlobalNumber = lastPublicationGlobalNumber;
this.lastPublicationBlockBytes = lastPublicationBlockBytes;
this.lastEditGlobalNumber = lastEditGlobalNumber;
this.lastEditBlockBytes = lastEditBlockBytes;
}
public String getChannelLogin() { return channelLogin; }
public String getChannelBchName() { return channelBchName; }
public int getPublicationsCount() { return publicationsCount; }
public Integer getLastPublicationGlobalNumber() { return lastPublicationGlobalNumber; }
public byte[] getLastPublicationBlockBytes() { return lastPublicationBlockBytes; }
public Integer getLastEditGlobalNumber() { return lastEditGlobalNumber; }
public byte[] getLastEditBlockBytes() { return lastEditBlockBytes; }
}
// В проекте msg_type=1 означает TEXT (у тебя это уже зафиксировано).
private static final int MSG_TYPE_TEXT = 1;
/**
* Получить список подписок (активные FOLLOW) + "сам на себя" и по каждой:
* - count публикаций (TEXT_NEW)
* - последнюю публикацию (orig bytes) + её edit (если есть)
*
* Поведение при 0 публикаций:
* - publications_count = 0
* - last_pub_* = NULL
* - last_edit_* = NULL
*/
public List<ChannelRow> getSubscribedChannels(Connection c, String requesterLogin) throws SQLException {
String sql = """
WITH subs AS (
-- 1) FOLLOW-каналы
SELECT
cs.to_login AS channel_login,
cs.to_bch_name AS channel_bch_name
FROM connections_state cs
WHERE cs.login = ?
AND cs.rel_type = ?
UNION
-- 2) self: все блокчейны пользователя (если их несколько)
SELECT
bs.login AS channel_login,
bs.blockchain_name AS channel_bch_name
FROM blockchain_state bs
WHERE bs.login = ?
),
pub_counts AS (
SELECT
b.login AS channel_login,
b.bch_name AS channel_bch_name,
COUNT(*) AS publications_count
FROM blocks b
JOIN subs s
ON s.channel_login = b.login
AND s.channel_bch_name = b.bch_name
WHERE b.msg_type = ?
AND b.msg_sub_type = ?
GROUP BY b.login, b.bch_name
),
last_pub AS (
SELECT
b.login AS channel_login,
b.bch_name AS channel_bch_name,
MAX(b.block_global_number) AS last_pub_global_number
FROM blocks b
JOIN subs s
ON s.channel_login = b.login
AND s.channel_bch_name = b.bch_name
WHERE b.msg_type = ?
AND b.msg_sub_type = ?
GROUP BY b.login, b.bch_name
),
last_pub_block AS (
SELECT
b.login AS channel_login,
b.bch_name AS channel_bch_name,
b.block_global_number AS last_pub_global_number,
b.block_byte AS last_pub_block_bytes,
b.edited_by_block_global_number AS last_edit_global_number
FROM blocks b
JOIN last_pub lp
ON lp.channel_login = b.login
AND lp.channel_bch_name = b.bch_name
AND lp.last_pub_global_number = b.block_global_number
),
last_edit_block AS (
SELECT
e.login AS channel_login,
e.bch_name AS channel_bch_name,
e.block_global_number AS last_edit_global_number,
e.block_byte AS last_edit_block_bytes
FROM blocks e
JOIN last_pub_block p
ON p.channel_login = e.login
AND p.channel_bch_name = e.bch_name
AND p.last_edit_global_number = e.block_global_number
)
SELECT
s.channel_login,
s.channel_bch_name,
COALESCE(pc.publications_count, 0) AS publications_count,
p.last_pub_global_number,
p.last_pub_block_bytes,
p.last_edit_global_number,
e.last_edit_block_bytes
FROM subs s
LEFT JOIN pub_counts pc
ON pc.channel_login = s.channel_login
AND pc.channel_bch_name = s.channel_bch_name
LEFT JOIN last_pub_block p
ON p.channel_login = s.channel_login
AND p.channel_bch_name = s.channel_bch_name
LEFT JOIN last_edit_block e
ON e.channel_login = s.channel_login
AND e.channel_bch_name = s.channel_bch_name
ORDER BY s.channel_login, s.channel_bch_name
""";
List<ChannelRow> out = new ArrayList<>();
try (PreparedStatement ps = c.prepareStatement(sql)) {
int i = 1;
// FOLLOW
ps.setString(i++, requesterLogin);
ps.setInt(i++, (int) MsgSubType.CONNECTION_FOLLOW);
// self
ps.setString(i++, requesterLogin);
// pub_counts
ps.setInt(i++, MSG_TYPE_TEXT);
ps.setInt(i++, (int) MsgSubType.TEXT_NEW);
// last_pub
ps.setInt(i++, MSG_TYPE_TEXT);
ps.setInt(i++, (int) MsgSubType.TEXT_NEW);
try (ResultSet rs = ps.executeQuery()) {
while (rs.next()) {
String channelLogin = rs.getString("channel_login");
String channelBchName = rs.getString("channel_bch_name");
int publicationsCount = rs.getInt("publications_count");
Integer lastPubGn = (Integer) rs.getObject("last_pub_global_number");
byte[] lastPubBytes = rs.getBytes("last_pub_block_bytes");
Integer lastEditGn = (Integer) rs.getObject("last_edit_global_number");
byte[] lastEditBytes = rs.getBytes("last_edit_block_bytes");
out.add(new ChannelRow(
channelLogin,
channelBchName,
publicationsCount,
lastPubGn,
lastPubBytes,
lastEditGn,
lastEditBytes
));
}
}
}
return out;
}
/** Вариант без внешнего соединения. Сам открывает/закрывает. */
public List<ChannelRow> getSubscribedChannels(String requesterLogin) throws SQLException {
try (Connection c = db.getConnection()) {
return getSubscribedChannels(c, requesterLogin);
}
}
}

View File

@ -0,0 +1,147 @@
package server.logic.ws_protocol.JSON.handlers.subscriptions;
import blockchain.BchBlockEntry;
import blockchain.body.TextBody;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import server.logic.ws_protocol.JSON.ConnectionContext;
import server.logic.ws_protocol.JSON.entyties.Net_Request;
import server.logic.ws_protocol.JSON.entyties.Net_Response;
import server.logic.ws_protocol.JSON.handlers.JsonMessageHandler;
import server.logic.ws_protocol.JSON.handlers.subscriptions.entyties.Net_GetSubscribedChannels_Request;
import server.logic.ws_protocol.JSON.handlers.subscriptions.entyties.Net_GetSubscribedChannels_Response;
import server.logic.ws_protocol.JSON.utils.NetExceptionResponseFactory;
import server.logic.ws_protocol.WireCodes;
import shine.db.SqliteDbController;
import shine.db.dao.SubscriptionsDAO;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
/**
* Handler: GetSubscribedChannels
*
* Логика:
* - DAO возвращает last publication orig bytes (+ edit bytes если есть)
* - Handler парсит FULL bytes блока:
* timestamp берём из ОРИГИНАЛА (publication)
* текст берём из EDIT (если есть) иначе из оригинала
* - формируем превью первых 50 символов
*/
public class Net_GetSubscribedChannels_Handler implements JsonMessageHandler {
private static final Logger log = LoggerFactory.getLogger(Net_GetSubscribedChannels_Handler.class);
@Override
public Net_Response handle(Net_Request baseRequest, ConnectionContext ctx) {
Net_GetSubscribedChannels_Request req = (Net_GetSubscribedChannels_Request) baseRequest;
if (req.getLogin() == null || req.getLogin().isBlank()) {
return NetExceptionResponseFactory.error(
req,
WireCodes.Status.BAD_REQUEST,
"BAD_FIELDS",
"Некорректное поле: login"
);
}
// Если хочешь жёстче:
// if (!req.getLogin().matches("^[A-Za-z0-9_]+$")) ...
SubscriptionsDAO dao = SubscriptionsDAO.getInstance();
SqliteDbController db = SqliteDbController.getInstance();
try (Connection c = db.getConnection()) {
List<SubscriptionsDAO.ChannelRow> rows = dao.getSubscribedChannels(c, req.getLogin());
List<Net_GetSubscribedChannels_Response.ChannelInfo> out = new ArrayList<>(rows.size());
for (SubscriptionsDAO.ChannelRow r : rows) {
Net_GetSubscribedChannels_Response.ChannelInfo dto =
new Net_GetSubscribedChannels_Response.ChannelInfo();
dto.setChannelLogin(r.getChannelLogin());
dto.setChannelBchName(r.getChannelBchName());
dto.setPublicationsCount(r.getPublicationsCount());
byte[] pubBytes = r.getLastPublicationBlockBytes();
byte[] editBytes = r.getLastEditBlockBytes();
if (pubBytes == null || pubBytes.length == 0) {
dto.setLastPublicationTimestampSec(null);
dto.setLastTextPreview(null);
out.add(dto);
continue;
}
// 1) timestamp берём из ОРИГИНАЛЬНОЙ публикации
BchBlockEntry pubBlock = new BchBlockEntry(pubBytes);
dto.setLastPublicationTimestampSec(pubBlock.timestamp);
// 2) текст из EDIT (если есть) иначе из оригинала
byte[] actualBytes = (editBytes != null && editBytes.length > 0) ? editBytes : pubBytes;
BchBlockEntry actualBlock = new BchBlockEntry(actualBytes);
if (!(actualBlock.body instanceof TextBody)) {
// Это уже нарушение данных: last publication должен быть текстовым блоком.
throw new IllegalStateException("Last publication is not TextBody: type="
+ (actualBlock.body == null ? "null" : (actualBlock.body.type() & 0xFFFF)));
}
String msg = ((TextBody) actualBlock.body).message;
dto.setLastTextPreview(firstNCharsSafe(msg, 50));
out.add(dto);
}
Net_GetSubscribedChannels_Response resp = new Net_GetSubscribedChannels_Response();
resp.setOp(req.getOp());
resp.setRequestId(req.getRequestId());
resp.setStatus(WireCodes.Status.OK);
resp.setChannels(out);
return resp;
} catch (SQLException e) {
log.error("❌ DB error GetSubscribedChannels", e);
return NetExceptionResponseFactory.error(
req,
WireCodes.Status.SERVER_DATA_ERROR,
"DB_ERROR",
"Ошибка БД"
);
} catch (IllegalArgumentException e) {
// сюда попадёт, например, если BchBlockEntry не смог распарсить block_byte
log.error("❌ Bad block bytes in DB (cannot parse BchBlockEntry)", e);
return NetExceptionResponseFactory.error(
req,
WireCodes.Status.SERVER_DATA_ERROR,
"BAD_BLOCK_BYTES",
"В БД обнаружен повреждённый блок"
);
} catch (Exception e) {
log.error("❌ Internal error GetSubscribedChannels", e);
return NetExceptionResponseFactory.error(
req,
WireCodes.Status.INTERNAL_ERROR,
"INTERNAL_ERROR",
"Внутренняя ошибка сервера"
);
}
}
/**
* Берём первые N "символов" безопасно для emoji/суррогатных пар:
* режем по code points.
*/
private static String firstNCharsSafe(String s, int n) {
if (s == null) return null;
if (n <= 0) return "";
int cp = s.codePointCount(0, s.length());
if (cp <= n) return s;
int end = s.offsetByCodePoints(0, n);
return s.substring(0, end);
}
}

View File

@ -0,0 +1,23 @@
package server.logic.ws_protocol.JSON.handlers.subscriptions.entyties;
import server.logic.ws_protocol.JSON.entyties.Net_Request;
/**
* Запрос GetSubscribedChannels.
*
* Клиент отправляет:
* {
* "op": "GetSubscribedChannels",
* "requestId": "....",
* "payload": {
* "login": "anya"
* }
* }
*/
public class Net_GetSubscribedChannels_Request extends Net_Request {
private String login;
public String getLogin() { return login; }
public void setLogin(String login) { this.login = login; }
}

View File

@ -0,0 +1,58 @@
package server.logic.ws_protocol.JSON.handlers.subscriptions.entyties;
import server.logic.ws_protocol.JSON.entyties.Net_Response;
import java.util.List;
/**
* Ответ GetSubscribedChannels.
*
* payload:
* {
* "channels": [
* {
* "channelLogin": "dima",
* "channelBchName": "dima-001",
* "publicationsCount": 123,
* "lastPublicationTimestampSec": 1736371200,
* "lastTextPreview": "...."
* }
* ]
* }
*/
public class Net_GetSubscribedChannels_Response extends Net_Response {
private List<ChannelInfo> channels;
public List<ChannelInfo> getChannels() { return channels; }
public void setChannels(List<ChannelInfo> channels) { this.channels = channels; }
public static class ChannelInfo {
private String channelLogin;
private String channelBchName;
private Integer publicationsCount;
/** Unix seconds времени ПУБЛИКАЦИИ (оригинального TEXT_NEW). Nullable, если публикаций нет. */
private Long lastPublicationTimestampSec;
/** Первые 50 символов актуального текста (edit или orig). Nullable, если публикаций нет. */
private String lastTextPreview;
public String getChannelLogin() { return channelLogin; }
public void setChannelLogin(String channelLogin) { this.channelLogin = channelLogin; }
public String getChannelBchName() { return channelBchName; }
public void setChannelBchName(String channelBchName) { this.channelBchName = channelBchName; }
public Integer getPublicationsCount() { return publicationsCount; }
public void setPublicationsCount(Integer publicationsCount) { this.publicationsCount = publicationsCount; }
public Long getLastPublicationTimestampSec() { return lastPublicationTimestampSec; }
public void setLastPublicationTimestampSec(Long lastPublicationTimestampSec) { this.lastPublicationTimestampSec = lastPublicationTimestampSec; }
public String getLastTextPreview() { return lastTextPreview; }
public void setLastTextPreview(String lastTextPreview) { this.lastTextPreview = lastTextPreview; }
}
}