Добавить startup recovery для resync цепочек
This commit is contained in:
parent
be4f76834a
commit
c048347f2e
@ -82,7 +82,52 @@
|
||||
- цепочка подтягивается заново с `0` через `GetBlockchainBlock`.
|
||||
- обычный `AddBlock` на эту цепочку в этот момент возвращает `chain_resync_in_progress`.
|
||||
|
||||
### 4.4 Зачем понадобился `GetSyncUserProfile`
|
||||
### 4.4 Как именно работает full resync
|
||||
|
||||
Full resync запускается только тогда, когда:
|
||||
|
||||
- локальная chain отстаёт и обычная докачка хвоста упирается в `bad_prev_hash` или `bad_block_number`;
|
||||
- либо высота цепочек одинаковая, но удалённая версия сильнее по правилу:
|
||||
- `lastBlockNumber`;
|
||||
- `fileSizeBytes`;
|
||||
- `lastBlockHash`.
|
||||
|
||||
Порядок действий:
|
||||
|
||||
1. Ставится in-memory guard на `blockchainName`.
|
||||
2. Создаётся marker-file `<blockchainName>.resync_pending`.
|
||||
3. Обычный `AddBlock` на эту chain временно получает `chain_resync_in_progress`.
|
||||
4. Вызывается атомарный SQL cleanup одной chain:
|
||||
- уменьшаются чужие `likes_count` и `replies_count`;
|
||||
- удаляются локальные derived-state записи этой chain;
|
||||
- удаляются `blocks` и `blockchain_state` этой chain.
|
||||
5. Удаляются файлы `<blockchainName>.bch` и `<blockchainName>.tmp_bch`.
|
||||
6. Локальная chain создаётся заново через `GetSyncUserProfile` или через Solana import, если `sync.importUserProfileFromPartner.enabled=false`.
|
||||
7. Chain replay-ится с `0` через `GetBlockchainBlock`.
|
||||
8. Если всё прошло успешно, marker-file удаляется.
|
||||
9. Если на любом шаге произошёл сбой, marker-file остаётся на диске, и сервер добивает эту chain при следующем старте.
|
||||
|
||||
Важно:
|
||||
|
||||
- full resync не делает умный rollback по одному блоку;
|
||||
- full resync не трогает DM-таблицы и `solana_users`;
|
||||
- висячие cross-chain ссылки считаются допустимым поведением системы.
|
||||
|
||||
### 4.5 Startup recovery по marker-file
|
||||
|
||||
При старте сервер идёт в таком порядке:
|
||||
|
||||
1. `BlockchainTmpRecoveryOnStartup` для `*.tmp_bch`;
|
||||
2. `BlockchainResyncRecoveryOnStartup` для `*.resync_pending`;
|
||||
3. только потом поднимается обычный сервер и запускается `PeriodicBlockchainSyncService`.
|
||||
|
||||
Если marker-file существует:
|
||||
|
||||
- сервер не должен начинать обычную работу поверх этой chain;
|
||||
- recovery снова выполняет cleanup и replay с нуля;
|
||||
- если recovery не завершился, marker остаётся, и сервер не переходит к обычному режиму для этой chain.
|
||||
|
||||
### 4.6 Зачем понадобился `GetSyncUserProfile`
|
||||
|
||||
Изначально подготовка локальной цепочки делалась через Solana:
|
||||
|
||||
@ -100,7 +145,7 @@
|
||||
|
||||
Это временная практическая заплатка, чтобы clean-start sync не зависел от rate limit внешнего Solana endpoint.
|
||||
|
||||
### 4.5 Что делает настройка `sync.importUserProfileFromPartner.enabled`
|
||||
### 4.7 Что делает настройка `sync.importUserProfileFromPartner.enabled`
|
||||
|
||||
- `false` — стандартный режим, подготовка локального пользователя идёт через Solana PDA;
|
||||
- `true` — sync-режим обхода Solana, локальный пользователь создаётся по server-to-server `GetSyncUserProfile`.
|
||||
@ -169,11 +214,12 @@
|
||||
| Push блоков блокчейна партнёрам | ✅ Реализована базовая one-shot версия |
|
||||
| Periodic backfill отсутствующего хвоста | ✅ Реализовано |
|
||||
| Разрешение рассинхрона / divergence | ✅ Реализована базовая full-resync схема во время periodic sync |
|
||||
| Startup recovery по `*.resync_pending` marker-file | ✅ Реализовано |
|
||||
| Маршрутизация DM через access_servers | Нужна реализация (заглушка) |
|
||||
|
||||
Текущая версия сервера уже умеет базовую синхронизацию блокчейнов между партнёрами.
|
||||
Не реализованы ещё DM-sync, постоянные server-to-server соединения и recovery при старте по marker-file для resync.
|
||||
Не реализованы ещё DM-sync и постоянные server-to-server соединения.
|
||||
|
||||
Следующие отдельные шаги после текущего этапа:
|
||||
- добавить startup recovery по marker-file для resync-цепочек;
|
||||
- вернуть обычному `AddBlock` настоящую `tmp_bch`-схему записи и recovery при резком рестарте.
|
||||
- отдельно проверить full-resync и startup-recovery на реальном тестовом прогоне после ручного удаления БД/файлов.
|
||||
|
||||
@ -0,0 +1,17 @@
|
||||
# Startup recovery для marker-file resync цепочек
|
||||
|
||||
- Краткое описание:
|
||||
- При старте сервер сканирует `data/*.resync_pending` и добивает незавершённые full-resync цепочки перед тем, как перейти к обычной работе.
|
||||
- Пока marker-file не убран, обычный `AddBlock` в эту цепочку возвращает `chain_resync_in_progress`.
|
||||
|
||||
- Что именно проверять:
|
||||
- Сервер не начинает обычный `periodic sync`, пока не отработают все marker-file recovery.
|
||||
- Если marker-файлы есть, startup-blocking реально останавливает нормальный старт до их обработки.
|
||||
- Если восстановление одной цепочки падает, marker остаётся на диске и сервер не переходит в обычный режим.
|
||||
- После успешного восстановления marker удаляется.
|
||||
|
||||
- Ожидаемый результат:
|
||||
- После внезапной перезагрузки сервер не продолжает обычную работу с поломанной цепочкой, а сначала добивает незавершённый resync.
|
||||
|
||||
- Статус:
|
||||
- `pending`
|
||||
@ -171,6 +171,10 @@ public final class FileStoreUtil {
|
||||
deleteIfExists(resolveBlockchainResyncMarkerPath(blockchainName));
|
||||
}
|
||||
|
||||
public boolean existsBlockchainResyncMarker(String blockchainName) {
|
||||
return exists(buildBlockchainResyncMarkerFileName(blockchainName));
|
||||
}
|
||||
|
||||
/**
|
||||
* Атомарно заменить основной файл блокчейна временным:
|
||||
* <name>.tmp_bch -> <name>.bch
|
||||
|
||||
@ -4,6 +4,8 @@ import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import utils.files.FileStoreUtil;
|
||||
|
||||
/**
|
||||
* In-memory guard для цепочек, которые сейчас находятся в полном resync.
|
||||
*
|
||||
@ -34,7 +36,13 @@ public final class BlockchainResyncGuard {
|
||||
public static boolean isBlockedForExternalAddBlock(String blockchainName) {
|
||||
String key = normalize(blockchainName);
|
||||
if (key == null) return false;
|
||||
return ACTIVE.contains(key) && !isBypassed(key);
|
||||
if (isBypassed(key)) {
|
||||
return false;
|
||||
}
|
||||
if (ACTIVE.contains(key)) {
|
||||
return true;
|
||||
}
|
||||
return FileStoreUtil.getInstance().existsBlockchainResyncMarker(key);
|
||||
}
|
||||
|
||||
public static <T> T withBypass(String blockchainName, ThrowingSupplier<T> supplier) throws Exception {
|
||||
|
||||
@ -0,0 +1,149 @@
|
||||
package server.sync;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import shine.db.entities.SyncServerEntry;
|
||||
import utils.files.FileStoreUtil;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.DirectoryStream;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Startup-recovery для цепочек, которые были помечены как resync-pending.
|
||||
*
|
||||
* Правило простое:
|
||||
* - marker-file означает, что chain должен быть пересобран заново;
|
||||
* - мы не пытаемся продолжать с середины;
|
||||
* - если recovery не завершён, обычная работа сервера не стартует.
|
||||
*/
|
||||
public final class BlockchainResyncRecoveryOnStartup {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(BlockchainResyncRecoveryOnStartup.class);
|
||||
private static final RemoteBlockchainSyncClient REMOTE = new RemoteBlockchainSyncClient();
|
||||
|
||||
private BlockchainResyncRecoveryOnStartup() {}
|
||||
|
||||
public static void runRecoveryOrThrow() {
|
||||
Path dataDir = Paths.get(FileStoreUtil.DATA_DIR_NAME);
|
||||
ensureDirExists(dataDir);
|
||||
|
||||
List<Path> markers = listMarkerFiles(dataDir);
|
||||
if (markers.isEmpty()) {
|
||||
log.info("🟢 BlockchainResyncRecovery: resync marker-файлы не найдены.");
|
||||
return;
|
||||
}
|
||||
|
||||
log.warn("🟡 BlockchainResyncRecovery: найдено marker-файлов: {}", markers.size());
|
||||
for (Path marker : markers) {
|
||||
recoverSingleMarkerOrThrow(marker);
|
||||
}
|
||||
log.info("✅ BlockchainResyncRecovery: все marker-файлы обработаны.");
|
||||
}
|
||||
|
||||
private static void recoverSingleMarkerOrThrow(Path markerPath) {
|
||||
String fileName = markerPath.getFileName().toString();
|
||||
String blockchainName = extractBlockchainName(fileName);
|
||||
if (blockchainName == null || blockchainName.isBlank()) {
|
||||
throw new IllegalStateException("Bad resync marker name: " + fileName);
|
||||
}
|
||||
|
||||
Map<String, String> meta = parseMarker(markerPath);
|
||||
String partnerLogin = normalize(meta.get("partnerLogin"));
|
||||
String partnerAddress = normalize(meta.get("partnerAddress"));
|
||||
if (partnerAddress == null) {
|
||||
throw new IllegalStateException("Resync marker has no partnerAddress for blockchainName=" + blockchainName);
|
||||
}
|
||||
|
||||
log.warn("🔁 BlockchainResyncRecovery: processing marker blockchainName={} partnerLogin={} partnerAddress={}",
|
||||
blockchainName, partnerLogin, partnerAddress);
|
||||
|
||||
try {
|
||||
List<RemoteBlockchainSyncClient.RemoteBlockchainHead> heads = REMOTE.listBlockchainHeads(partnerAddress);
|
||||
RemoteBlockchainSyncClient.RemoteBlockchainHead remoteHead = heads.stream()
|
||||
.filter(h -> h != null && blockchainName.equals(h.blockchainName()))
|
||||
.findFirst()
|
||||
.orElseThrow(() -> new IllegalStateException(
|
||||
"Partner does not expose blockchainName=" + blockchainName + " address=" + partnerAddress));
|
||||
|
||||
SyncServerEntry partner = new SyncServerEntry(
|
||||
partnerLogin == null ? "" : partnerLogin,
|
||||
partnerAddress,
|
||||
System.currentTimeMillis()
|
||||
);
|
||||
|
||||
PeriodicBlockchainSyncService.resyncFromScratch(partner, remoteHead);
|
||||
} catch (Exception e) {
|
||||
throw new IllegalStateException(
|
||||
"Не удалось восстановить resync-помеченную цепочку blockchainName=" + blockchainName,
|
||||
e
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
private static List<Path> listMarkerFiles(Path dataDir) {
|
||||
try (DirectoryStream<Path> ds = Files.newDirectoryStream(dataDir, "*" + FileStoreUtil.BLOCKCHAIN_RESYNC_MARKER_EXTENSION)) {
|
||||
return stream(ds).toList();
|
||||
} catch (IOException e) {
|
||||
throw new IllegalStateException("Cannot list resync markers in: " + dataDir, e);
|
||||
}
|
||||
}
|
||||
|
||||
private static java.util.stream.Stream<Path> stream(DirectoryStream<Path> ds) {
|
||||
return java.util.stream.StreamSupport.stream(ds.spliterator(), false)
|
||||
.filter(Files::isRegularFile);
|
||||
}
|
||||
|
||||
private static Map<String, String> parseMarker(Path path) {
|
||||
try {
|
||||
List<String> lines = Files.readAllLines(path, StandardCharsets.UTF_8);
|
||||
Map<String, String> result = new HashMap<>();
|
||||
for (String line : lines) {
|
||||
if (line == null) continue;
|
||||
String s = line.trim();
|
||||
if (s.isEmpty() || s.startsWith("#")) continue;
|
||||
int idx = s.indexOf('=');
|
||||
if (idx <= 0) continue;
|
||||
String key = s.substring(0, idx).trim();
|
||||
String value = s.substring(idx + 1).trim();
|
||||
result.put(key, value);
|
||||
}
|
||||
return result;
|
||||
} catch (IOException e) {
|
||||
throw new IllegalStateException("Cannot read resync marker: " + path, e);
|
||||
}
|
||||
}
|
||||
|
||||
private static void ensureDirExists(Path dir) {
|
||||
try {
|
||||
if (!Files.exists(dir)) {
|
||||
Files.createDirectories(dir);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new IllegalStateException("Cannot create data dir: " + dir, e);
|
||||
}
|
||||
}
|
||||
|
||||
private static String extractBlockchainName(String fileName) {
|
||||
if (fileName == null) return null;
|
||||
String s = fileName.trim();
|
||||
if (!s.endsWith(FileStoreUtil.BLOCKCHAIN_RESYNC_MARKER_EXTENSION)) {
|
||||
return null;
|
||||
}
|
||||
return s.substring(0, s.length() - FileStoreUtil.BLOCKCHAIN_RESYNC_MARKER_EXTENSION.length());
|
||||
}
|
||||
|
||||
private static String normalize(String value) {
|
||||
if (value == null) return null;
|
||||
String s = value.trim();
|
||||
return s.isEmpty() ? null : s.toLowerCase(Locale.ROOT);
|
||||
}
|
||||
}
|
||||
@ -199,7 +199,7 @@ public final class PeriodicBlockchainSyncService {
|
||||
partnerLogin, remoteHead.blockchainName(), fromBlockNumber, remoteHead.lastBlockNumber());
|
||||
}
|
||||
|
||||
private static void resyncFromScratch(
|
||||
static void resyncFromScratch(
|
||||
SyncServerEntry partner,
|
||||
RemoteBlockchainSyncClient.RemoteBlockchainHead remoteHead
|
||||
) throws Exception {
|
||||
@ -237,6 +237,7 @@ public final class PeriodicBlockchainSyncService {
|
||||
|
||||
ReentrantLock chainLock = BlockchainLocks.lockFor(blockchainName);
|
||||
chainLock.lock();
|
||||
boolean success = false;
|
||||
try {
|
||||
BlockchainResyncCleanupDAO.CleanupResult cleanup =
|
||||
RESYNC_CLEANUP_DAO.cleanupBlockchainForFullResync(blockchainName);
|
||||
@ -252,25 +253,26 @@ public final class PeriodicBlockchainSyncService {
|
||||
FILE_STORE.deleteBlockchainTmpFileIfExists(blockchainName);
|
||||
|
||||
if (!ensureLocalChainExists(partner, blockchainName)) {
|
||||
log.warn("Blockchain resync aborted: failed to recreate local chain state. partner={} blockchainName={}",
|
||||
partnerLogin, blockchainName);
|
||||
return;
|
||||
throw new IllegalStateException("failed to recreate local chain state for " + blockchainName);
|
||||
}
|
||||
|
||||
boolean replayOk = replayRemoteChainFromStart(partner, remoteHead);
|
||||
if (replayOk) {
|
||||
log.info("Blockchain resync completed: partner={} blockchainName={} blocks=0..{}",
|
||||
partnerLogin, blockchainName, remoteHead.lastBlockNumber());
|
||||
if (!replayOk) {
|
||||
throw new IllegalStateException("failed to replay remote chain for " + blockchainName);
|
||||
}
|
||||
|
||||
FILE_STORE.deleteBlockchainResyncMarkerIfExists(blockchainName);
|
||||
success = true;
|
||||
log.info("Blockchain resync completed: partner={} blockchainName={} blocks=0..{}",
|
||||
partnerLogin, blockchainName, remoteHead.lastBlockNumber());
|
||||
} finally {
|
||||
try {
|
||||
FILE_STORE.deleteBlockchainResyncMarkerIfExists(blockchainName);
|
||||
} catch (Exception e) {
|
||||
log.warn("Blockchain resync: failed to delete marker file blockchainName={}", blockchainName, e);
|
||||
}
|
||||
chainLock.unlock();
|
||||
BlockchainResyncGuard.end(blockchainName);
|
||||
}
|
||||
|
||||
if (!success) {
|
||||
throw new IllegalStateException("Blockchain resync did not complete for " + blockchainName);
|
||||
}
|
||||
}
|
||||
|
||||
private static boolean replayRemoteChainFromStart(
|
||||
|
||||
@ -6,6 +6,7 @@ import org.eclipse.jetty.websocket.server.config.JettyWebSocketServletContainerI
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import server.debug.DebugApiConfigurator;
|
||||
import server.sync.BlockchainResyncRecoveryOnStartup;
|
||||
import server.sync.PeriodicBlockchainSyncService;
|
||||
import server.sync.SyncServersBootstrapService;
|
||||
import utils.config.AppConfig;
|
||||
@ -38,6 +39,16 @@ public final class WsServer {
|
||||
throw e; // останавливаем запуск
|
||||
}
|
||||
|
||||
// ============================================================
|
||||
// 0.1) Восстановление цепочек, зависших на full resync
|
||||
// ============================================================
|
||||
try {
|
||||
BlockchainResyncRecoveryOnStartup.runRecoveryOrThrow();
|
||||
} catch (Exception e) {
|
||||
log.error("❌ Сервер НЕ будет запущен: критическая ошибка восстановления blockchain resync-маркеров.", e);
|
||||
throw e;
|
||||
}
|
||||
|
||||
// ============================================================
|
||||
// 1) Настройки порта
|
||||
// ============================================================
|
||||
|
||||
@ -1,2 +1,2 @@
|
||||
client.version=1.2.274
|
||||
server.version=1.2.254
|
||||
client.version=1.2.275
|
||||
server.version=1.2.255
|
||||
|
||||
Loading…
Reference in New Issue
Block a user