Документировать API и сервис агента-кодера

This commit is contained in:
AidarKC 2026-05-24 08:04:44 +03:00
parent aa2644d812
commit 4b371e142d
53 changed files with 4317 additions and 18 deletions

View File

@ -8,6 +8,11 @@
## Примечание ## Примечание
- Если внешний инструмент/интеграция требует английский формат, допускается английский, но рядом желательно дать краткое пояснение на русском. - Если внешний инструмент/интеграция требует английский формат, допускается английский, но рядом желательно дать краткое пояснение на русском.
## Сервис агента-кодера
- В проекте есть локальный Telegram-бот-сервис агента-кодера в папке `SHiNE-agent-bot-coder/`.
- Сервис принимает сообщения из Telegram, ведёт историю диалога, ставит задачи в очередь и вызывает Codex CLI для обработки запросов по проекту.
- Подробные правила работы сервиса, его очередь, история, systemd-запуск и особенности ответов описывать в `SHiNE-agent-bot-coder/AGENT.md`.
## Документация блокчейна ## Документация блокчейна
- Актуальная документация по форматам блокчейна находится в `Dev_Docs/Blockchain/README.md`. - Актуальная документация по форматам блокчейна находится в `Dev_Docs/Blockchain/README.md`.
- Это точка входа (оглавление), рядом расположены детальные файлы по форматам, типам каналов и командным сообщениям. - Это точка входа (оглавление), рядом расположены детальные файлы по форматам, типам каналов и командным сообщениям.
@ -24,6 +29,13 @@
- Логика личных сообщений в коде должна всегда соответствовать `Dev_Docs/Personal_Messages/README.md`. - Логика личных сообщений в коде должна всегда соответствовать `Dev_Docs/Personal_Messages/README.md`.
- Документ по личным сообщениям обязан поддерживаться в актуальном состоянии. - Документ по личным сообщениям обязан поддерживаться в актуальном состоянии.
## Документация API сервера
- Актуальная документация по публичному JSON/WebSocket API сервера находится в `Dev_Docs/API/`.
- При любом изменении серверного API/эндпоинтов/операций `op` обязательно обновлять соответствующие документы в `Dev_Docs/API/`.
- Перед изменением самого серверного API обязательно явно предупредить пользователя, какие операции, поля запросов/ответов или коды ошибок будут изменены, и запросить отдельное подтверждение.
- Без явного подтверждения пользователя формат серверного API не менять; допускается только приведение документации в соответствие уже существующему коду.
- Если добавляется новая операция `op`, нужно обновить общий список операций в `Dev_Docs/API/09_Operations_Index.md` или создать его, если файла ещё нет.
## Известная проблема (временная пометка) ## Известная проблема (временная пометка)
- Мнения по связям пользователя (`known_person`, `shine_confirmed`, `shine_seen`) в UI могут отображаться нестабильно. - Мнения по связям пользователя (`known_person`, `shine_confirmed`, `shine_seen`) в UI могут отображаться нестабильно.
- Требуется отдельная доработка и финальная проверка end-to-end: запись мнения в блокчейн → обновление `connections_state` → ответ `GetUserConnectionsGraph` → отображение в UI. - Требуется отдельная доработка и финальная проверка end-to-end: запись мнения в блокчейн → обновление `connections_state` → ответ `GetUserConnectionsGraph` → отображение в UI.
@ -77,10 +89,6 @@
- После подтверждения, что фича проверена и работает корректно, соответствующий файл удалять. - После подтверждения, что фича проверена и работает корректно, соответствующий файл удалять.
- В `Dev_Docs/Pending_Features/README.md` вести краткий регламент и поддерживать актуальность. - В `Dev_Docs/Pending_Features/README.md` вести краткий регламент и поддерживать актуальность.
## Коммуникация в начале нового чата
- В начале каждого нового чата (в первом ответе пользователю) дополнительно сообщать, сколько сейчас недопроверенных фич лежит в `Dev_Docs/Pending_Features/` (без учёта `README.md`).
- В том же первом ответе обязательно уточнять у пользователя, проверил ли он эти фичи и можно ли пометить их как завершённые (удалить соответствующие файлы).
## Коммуникация по новым задачам (обязательно) ## Коммуникация по новым задачам (обязательно)
- При получении нового задания сначала кратко пересказать задачу своими словами. - При получении нового задания сначала кратко пересказать задачу своими словами.
- До начала реализации задать недостающие уточняющие вопросы (если они есть). - До начала реализации задать недостающие уточняющие вопросы (если они есть).

View File

@ -123,7 +123,17 @@
--- ---
## 8. Короткое резюме ## 8. Источник истины по списку операций
Фактический список публичных WebSocket-операций берётся из:
- `shine-server-net-protocol/src/main/java/server/logic/ws_protocol/JSON/JsonHandlerRegistry.java`.
Если операция зарегистрирована в `HANDLERS` и `REQUEST_TYPES`, она считается доступной через JSON/WebSocket API. Общий актуальный индекс таких операций поддерживается в `Dev_Docs/API/09_Operations_Index.md`.
---
## 9. Короткое резюме
- Запросы всегда идут как `op + requestId + payload`. - Запросы всегда идут как `op + requestId + payload`.
- Ответы всегда идут как `op + requestId + status + ok + payload`. - Ответы всегда идут как `op + requestId + status + ok + payload`.

View File

@ -2,10 +2,11 @@
Этот файл описывает временный раздел API, связанный с заведением пользователя на сервере и проверкой, существует ли пользователь. Этот файл описывает временный раздел API, связанный с заведением пользователя на сервере и проверкой, существует ли пользователь.
Сейчас здесь два метода: Сейчас здесь три метода:
- `AddUser` — временная серверная регистрация пользователя; - `AddUser` — временная серверная регистрация пользователя;
- `GetUser` — временная серверная проверка существования пользователя и чтение его базовых данных. - `GetUser` — временная серверная проверка существования пользователя и чтение его базовых данных;
- `SearchUsers` — dev/test поиск логинов по префиксу.
Их логика пока вспомогательная и dev-oriented: сервер сам хранит эти данные локально и сам отвечает на existence-check. В будущем оба сценария должны быть заменены на нормальную работу напрямую через Solana, но пока этот контракт нужен клиентам для разработки и интеграции. Их логика пока вспомогательная и dev-oriented: сервер сам хранит эти данные локально и сам отвечает на existence-check. В будущем оба сценария должны быть заменены на нормальную работу напрямую через Solana, но пока этот контракт нужен клиентам для разработки и интеграции.
@ -166,8 +167,49 @@
--- ---
## 3. Короткое резюме ## 3. Операция `SearchUsers`
### Назначение
Поиск пользователей по префиксу логина. Операция зарегистрирована в серверном API и используется как вспомогательная dev/test операция.
### Запрос
```json
{
"op": "SearchUsers",
"requestId": "search-001",
"payload": {
"prefix": "an"
}
}
```
### Успешный ответ
```json
{
"op": "SearchUsers",
"requestId": "search-001",
"status": 200,
"ok": true,
"payload": {
"logins": ["anya", "andrey"]
}
}
```
### Специфические коды ошибок `SearchUsers`
- `400 / BAD_FIELDS` — некорректный или пустой `prefix`.
- `501 / DB_ERROR` — ошибка БД при поиске.
- `500 / INTERNAL_ERROR` — непредвиденная внутренняя ошибка сервера.
---
## 4. Короткое резюме
- `AddUser` — временная регистрация пользователя на сервере. - `AddUser` — временная регистрация пользователя на сервере.
- `GetUser` — временная проверка существования пользователя на сервере. - `GetUser` — временная проверка существования пользователя на сервере.
- `SearchUsers` — временный поиск пользователей по префиксу.
- И регистрация, и existence-check позже должны быть переведены на Solana. - И регистрация, и existence-check позже должны быть переведены на Solana.

View File

@ -1,11 +1,15 @@
# API для разработчиков: Технические запросы # API для разработчиков: Технические запросы
Этот файл описывает технические запросы, которые не требуют авторизации и нужны для служебной работы клиента с сервером. Этот файл описывает технические WebSocket-запросы, которые нужны для служебной работы клиента с сервером. Часть операций доступна без авторизации, часть требует успешной авторизованной сессии.
Сейчас здесь два метода: Сейчас здесь шесть методов:
- `Ping` — keep-alive запрос для поддержания живого WebSocket-соединения; - `Ping` — keep-alive запрос для поддержания живого WebSocket-соединения;
- `GetServerInfo` — запрос базовой публичной информации о сервере для выбора узла в децентрализованной сети. - `GetServerInfo` — запрос базовой публичной информации о сервере для выбора узла в децентрализованной сети;
- `GetCallIceConfig` — выдача STUN/TURN конфигурации для звонков;
- `ClientErrorLog` — отправка клиентской ошибки в серверный лог;
- `ClientDebugLog` — отправка клиентского debug-события в серверный буфер;
- `CallDeliveryReport` — диагностический отчёт клиента о доставке/установке звонка.
Логика раздела такая: Логика раздела такая:
@ -130,14 +134,186 @@
--- ---
## 3. Короткое резюме ## 3. `GetCallIceConfig`
Доступно только после успешной авторизации.
### Запрос
```json
{
"op": "GetCallIceConfig",
"requestId": "ice-001",
"payload": {
}
}
```
### Успешный ответ
```json
{
"op": "GetCallIceConfig",
"requestId": "ice-001",
"status": 200,
"ok": true,
"payload": {
"stunUrls": ["stun:stun.example.org:3478"],
"turnUrls": ["turn:turn.example.org:3478?transport=udp"],
"turnUsername": "user",
"turnPassword": "password",
"turnServers": [
{
"id": "primary",
"urls": ["turn:turn.example.org:3478?transport=udp"],
"username": "user",
"password": "password"
}
],
"turnEnabled": true,
"generatedAtMs": 1774700000123,
"expiresAtMs": 1774700300123,
"ttlSec": 300
}
}
```
### Специфические коды ошибок `GetCallIceConfig`
- `422 / NOT_AUTHENTICATED` — требуется авторизация.
---
## 4. `ClientErrorLog`
### Запрос
```json
{
"op": "ClientErrorLog",
"requestId": "err-001",
"payload": {
"kind": "global_error",
"message": "TypeError: failed",
"stack": "...",
"sourceUrl": "https://shineup.me/app.js",
"lineNumber": 10,
"columnNumber": 20,
"route": "#/channel-view/own-0",
"href": "https://shineup.me/#/channel-view/own-0",
"userAgent": "...",
"clientTs": 1774700000123,
"requestOp": "GetChannelMessages",
"requestIdRef": "GetChannelMessages-123",
"contextJson": "{\"screen\":\"channels\"}"
}
}
```
### Успешный ответ
```json
{
"op": "ClientErrorLog",
"requestId": "err-001",
"status": 200,
"ok": true,
"payload": {
"serverTs": 1774700000456,
"accepted": true
}
}
```
### Специфические коды ошибок `ClientErrorLog`
- `400 / BAD_FIELDS` — обязательные поля ошибки не заполнены.
---
## 5. `ClientDebugLog`
### Запрос
```json
{
"op": "ClientDebugLog",
"requestId": "dbg-001",
"payload": {
"runId": "ui-run-1",
"level": "info",
"message": "opened channels tab",
"details": "{\"route\":\"#/channels\"}"
}
}
```
### Успешный ответ
```json
{
"op": "ClientDebugLog",
"requestId": "dbg-001",
"status": 200,
"ok": true,
"payload": {
"accepted": true,
"serverTs": 1774700000456
}
}
```
### Специфические коды ошибок `ClientDebugLog`
- `400 / BAD_FIELDS` — поле `message` не заполнено.
---
## 6. `CallDeliveryReport`
### Запрос
```json
{
"op": "CallDeliveryReport",
"requestId": "call-report-001",
"payload": {
"type": "outgoing_failed",
"value": "{\"reason\":\"ice_failed\",\"callId\":\"call-1\"}"
}
}
```
### Успешный ответ
```json
{
"op": "CallDeliveryReport",
"requestId": "call-report-001",
"status": 200,
"ok": true,
"payload": {
"serverTs": 1774700000456,
"accepted": true
}
}
```
### Специфические коды ошибок `CallDeliveryReport`
- `400 / BAD_FIELDS` — поле `type` не заполнено.
---
## 7. Короткое резюме
- `Ping` нужен для keep-alive и проверки, что WebSocket-соединение живо. - `Ping` нужен для keep-alive и проверки, что WebSocket-соединение живо.
- `GetServerInfo` нужен для выбора сервера в сети и показа публичной информации об узле. - `GetServerInfo` нужен для выбора сервера в сети и показа публичной информации об узле.
- Оба запроса доступны без авторизации. - `GetCallIceConfig` нужен для WebRTC-звонков и требует авторизации.
- `ClientErrorLog`, `ClientDebugLog`, `CallDeliveryReport` используются для диагностики клиента и звонков.
## 4. Прямое техническое сообщение в конкретную сессию ## 8. Прямое техническое сообщение в конкретную сессию
На текущий момент в публичном JSON API этого документа **нет отдельного RPC** для отправки произвольного технического сообщения в конкретную сессию пользователя (по `sessionId`). На текущий момент в публичном JSON API этого документа **нет отдельного RPC** для отправки произвольного технического сообщения в конкретную сессию пользователя (по `sessionId`).

View File

@ -1,7 +1,7 @@
# 06. Channels Read API # 06. Channels Read API
## Человеко-читаемое объяснение ## Человеко-читаемое объяснение
Эти 3 функции — это **чтение данных каналов** для UI: Эти функции — это **чтение данных каналов** для UI:
1. `ListSubscriptionsFeed` — отдает данные для экрана списка каналов: 1. `ListSubscriptionsFeed` — отдает данные для экрана списка каналов:
- ваши каналы (личный + созданные вами), - ваши каналы (личный + созданные вами),
@ -14,6 +14,12 @@
3. `GetMessageThread` — отдает дерево обсуждения вокруг конкретного сообщения: 3. `GetMessageThread` — отдает дерево обсуждения вокруг конкретного сообщения:
предки, фокус-сообщение, потомки. предки, фокус-сообщение, потомки.
4. `GetChannelsCounters` — отдает счетчики разделов каналов для пользователя.
5. `ListGroupChats200` — отдает список групповых чатов типа `200`.
6. `GetGroupDialog` — отдает сообщения конкретного группового чата типа `200`.
> На первом этапе мы **не используем курсоры** (`nextCursor`) и загружаем полные списки. > На первом этапе мы **не используем курсоры** (`nextCursor`) и загружаем полные списки.
--- ---
@ -198,6 +204,126 @@
--- ---
## 4) GetChannelsCounters
### Request
```json
{
"op": "GetChannelsCounters",
"requestId": "req-4",
"payload": {
"login": "Alice"
}
}
```
### Response (success)
```json
{
"op": "GetChannelsCounters",
"requestId": "req-4",
"status": 200,
"ok": true,
"payload": {
"login": "Alice",
"feedCount": 12,
"dialogs100Count": 3,
"groupChats200Count": 4,
"myChannelsCount": 2
}
}
```
---
## 5) ListGroupChats200
### Request
```json
{
"op": "ListGroupChats200",
"requestId": "req-5",
"payload": {
"login": "Alice"
}
}
```
### Response (success)
```json
{
"op": "ListGroupChats200",
"requestId": "req-5",
"status": 200,
"ok": true,
"payload": {
"login": "Alice",
"chats": [
{
"ownerLogin": "Alice",
"ownerBlockchainName": "alice-001",
"channelRootBlockNumber": 123,
"channelRootBlockHash": "...",
"channelName": "team",
"chatTitle": "Team chat",
"membersCount": 3,
"updatedAtMs": 1760000000000
}
]
}
}
```
---
## 6) GetGroupDialog
### Request
```json
{
"op": "GetGroupDialog",
"requestId": "req-6",
"payload": {
"login": "Alice",
"group": {
"ownerBlockchainName": "alice-001",
"channelRootBlockNumber": 123
}
}
}
```
### Response (success)
```json
{
"op": "GetGroupDialog",
"requestId": "req-6",
"status": 200,
"ok": true,
"payload": {
"group": {
"ownerLogin": "Alice",
"ownerBlockchainName": "alice-001",
"channelRootBlockNumber": 123,
"channelName": "team",
"chatTitle": "Team chat"
},
"messages": [
{
"authorLogin": "Bob",
"authorBlockchainName": "bob-001",
"blockNumber": 140,
"blockHash": "...",
"createdAtMs": 1760000000000,
"text": "Привет"
}
]
}
}
```
---
## Reason codes ## Reason codes
- `bad_fields` - `bad_fields`
- `user_not_found` - `user_not_found`

View File

@ -0,0 +1,58 @@
# API для разработчиков: индекс операций
Этот файл фиксирует полный список публичных JSON/WebSocket операций, зарегистрированных в коде сервера.
Источник истины на момент актуализации:
- `shine-server-net-protocol/src/main/java/server/logic/ws_protocol/JSON/JsonHandlerRegistry.java`.
Если операция есть в `HANDLERS` и `REQUEST_TYPES`, клиент может отправлять её как `op` в общем JSON-конверте из `00_Common_API_Format.md`.
## Актуальные операции
| Операция | Раздел документации | Кратко |
| --- | --- | --- |
| `AddUser` | `01_User_Registration_API.md` | временная регистрация пользователя |
| `GetUser` | `01_User_Registration_API.md` | чтение/проверка пользователя |
| `SearchUsers` | `01_User_Registration_API.md` | поиск логинов по префиксу |
| `AuthChallenge` | `02_Authentication_API.md` | challenge для создания новой сессии |
| `CreateAuthSession` | `02_Authentication_API.md` | создание новой авторизованной сессии |
| `SessionChallenge` | `02_Authentication_API.md` | challenge для входа в существующую сессию |
| `SessionLogin` | `02_Authentication_API.md` | вход в существующую сессию |
| `ListSessions` | `03_Session_Management_API.md` | список активных сессий |
| `CloseActiveSession` | `03_Session_Management_API.md` | закрытие активной сессии |
| `AddBlock` | `04_Add_Block_to_Blockchain_API.md` | добавление блока в блокчейн |
| `Ping` | `05_Technical_Requests_API.md` | keep-alive |
| `GetServerInfo` | `05_Technical_Requests_API.md` | публичная информация о сервере |
| `GetCallIceConfig` | `05_Technical_Requests_API.md` | STUN/TURN конфигурация звонков |
| `ClientErrorLog` | `05_Technical_Requests_API.md` | логирование клиентской ошибки |
| `ClientDebugLog` | `05_Technical_Requests_API.md` | клиентский debug-лог |
| `CallDeliveryReport` | `05_Technical_Requests_API.md` | диагностика доставки/установки звонков |
| `ListSubscriptionsFeed` | `06_Channels_Read_API.md` | лента каналов/подписок |
| `GetChannelMessages` | `06_Channels_Read_API.md` | сообщения канала |
| `GetMessageThread` | `06_Channels_Read_API.md` | тред сообщения |
| `GetChannelsCounters` | `06_Channels_Read_API.md` | счетчики разделов каналов |
| `ListGroupChats200` | `06_Channels_Read_API.md` | список групповых чатов типа `200` |
| `GetGroupDialog` | `06_Channels_Read_API.md` | сообщения группового чата типа `200` |
| `UpsertUserParam` | `10_User_Params_API.md` | запись параметра пользователя |
| `GetUserParam` | `10_User_Params_API.md` | чтение одного параметра пользователя |
| `ListUserParams` | `10_User_Params_API.md` | список параметров пользователя |
| `GetFriendsLists` | `11_Connections_API.md` | входящие/исходящие друзья |
| `ListContacts` | `11_Connections_API.md` | контакты текущего пользователя |
| `GetUserConnectionsGraph` | `11_Connections_API.md` | граф связей пользователя |
| `AddCloseFriend` | `11_Connections_API.md` | добавить близкого друга |
| `UpsertPushToken` | `12_Direct_Messages_Push_Calls_API.md` | регистрация WebPush-токена |
| `SendTestWebPush` | `12_Direct_Messages_Push_Calls_API.md` | тестовая push-доставка |
| `SendDirectMessage` | `12_Direct_Messages_Push_Calls_API.md` | отправка подписанного DM-пакета |
| `SendMessagePair` | `12_Direct_Messages_Push_Calls_API.md` | отправка пары входящий/исходящий DM |
| `ReceiveOutcomingMessage` | `12_Direct_Messages_Push_Calls_API.md` | алиас `SendMessagePair` |
| `ReceiveIncomingMessage` | `12_Direct_Messages_Push_Calls_API.md` | прием входящего DM-блока |
| `AckSessionDelivery` | `12_Direct_Messages_Push_Calls_API.md` | подтверждение доставки в сессию |
| `CallInviteBroadcast` | `12_Direct_Messages_Push_Calls_API.md` | broadcast приглашения к звонку |
| `CallSignalToSession` | `12_Direct_Messages_Push_Calls_API.md` | сигнал звонка в конкретную сессию |
## Важные замечания
- `ReceiveOutcomingMessage` сейчас зарегистрирован как алиас того же handler/request-класса, что и `SendMessagePair`.
- Классы `Net_MarkChannelMessagesSeen_*` существуют в коде, но операция `MarkChannelMessagesSeen` не зарегистрирована в `JsonHandlerRegistry`, поэтому в публичный список API не входит.
- HTTP debug endpoints из `src/main/java/server/debug/` не входят в этот индекс WebSocket `op`; они описаны отдельно в `13_HTTP_Debug_API.md`.

View File

@ -0,0 +1,129 @@
# API для разработчиков: параметры пользователя
Документ описывает операции для записи и чтения пользовательских параметров.
Текущие операции:
- `UpsertUserParam`
- `GetUserParam`
- `ListUserParams`
## 1. `UpsertUserParam`
### Запрос
```json
{
"op": "UpsertUserParam",
"requestId": "param-upsert-001",
"payload": {
"login": "alice",
"param": "display_name",
"time_ms": 1774700000123,
"value": "Alice",
"device_key": "BASE64_DEVICE_PUBLIC_KEY",
"signature": "BASE64_SIGNATURE"
}
}
```
### Успешный ответ
```json
{
"op": "UpsertUserParam",
"requestId": "param-upsert-001",
"status": 200,
"ok": true,
"payload": {
}
}
```
### Типовые ошибки
- `400 / BAD_FIELDS` — некорректные обязательные поля.
- `422 / BAD_SIGNATURE` — подпись не прошла проверку.
- `501 / DB_ERROR` — ошибка БД.
---
## 2. `GetUserParam`
### Запрос
```json
{
"op": "GetUserParam",
"requestId": "param-get-001",
"payload": {
"login": "alice",
"param": "display_name"
}
}
```
### Успешный ответ
```json
{
"op": "GetUserParam",
"requestId": "param-get-001",
"status": 200,
"ok": true,
"payload": {
"login": "alice",
"param": "display_name",
"time_ms": 1774700000123,
"value": "Alice",
"device_key": "BASE64_DEVICE_PUBLIC_KEY",
"signature": "BASE64_SIGNATURE"
}
}
```
Если параметр не найден, сервер возвращает `404` с пустым `payload`; отдельный прикладной код ошибки текущий handler не задаёт.
---
## 3. `ListUserParams`
### Запрос
```json
{
"op": "ListUserParams",
"requestId": "param-list-001",
"payload": {
"login": "alice"
}
}
```
### Успешный ответ
```json
{
"op": "ListUserParams",
"requestId": "param-list-001",
"status": 200,
"ok": true,
"payload": {
"login": "alice",
"params": [
{
"login": "alice",
"param": "display_name",
"time_ms": 1774700000123,
"value": "Alice",
"device_key": "BASE64_DEVICE_PUBLIC_KEY",
"signature": "BASE64_SIGNATURE"
}
]
}
}
```
## Примечание
Имена JSON-полей `time_ms` и `device_key` сейчас соответствуют Java-модели ответа/запроса и должны передаваться именно в таком виде.

View File

@ -0,0 +1,174 @@
# API для разработчиков: связи пользователей
Документ описывает операции чтения и записи пользовательских связей.
Текущие операции:
- `GetFriendsLists`
- `ListContacts`
- `GetUserConnectionsGraph`
- `AddCloseFriend`
## 1. `GetFriendsLists`
### Запрос
```json
{
"op": "GetFriendsLists",
"requestId": "friends-001",
"payload": {
"login": "alice"
}
}
```
### Успешный ответ
```json
{
"op": "GetFriendsLists",
"requestId": "friends-001",
"status": 200,
"ok": true,
"payload": {
"login": "Alice",
"out_friends": ["Bob"],
"in_friends": ["Kate"]
}
}
```
---
## 2. `ListContacts`
`ListContacts` использует текущую авторизованную сессию. В payload нет дополнительных полей.
### Запрос
```json
{
"op": "ListContacts",
"requestId": "contacts-001",
"payload": {
}
}
```
### Успешный ответ
```json
{
"op": "ListContacts",
"requestId": "contacts-001",
"status": 200,
"ok": true,
"payload": {
"login": "Alice",
"contacts": ["Bob", "Kate"]
}
}
```
---
## 3. `GetUserConnectionsGraph`
### Запрос
```json
{
"op": "GetUserConnectionsGraph",
"requestId": "graph-001",
"payload": {
"login": "alice"
}
}
```
### Успешный ответ
```json
{
"op": "GetUserConnectionsGraph",
"requestId": "graph-001",
"status": 200,
"ok": true,
"payload": {
"login": "Alice",
"outFriends": ["Bob"],
"inFriends": ["Kate"],
"outContacts": [],
"inContacts": [],
"outFollows": [],
"inFollows": [],
"outSpouses": [],
"inSpouses": [],
"outParents": [],
"inParents": [],
"outChildren": [],
"inChildren": [],
"outSiblings": [],
"inSiblings": [],
"outKnownPersons": [],
"inKnownPersons": [],
"outShineConfirmed": [],
"inShineConfirmed": [],
"outShineSeen": [],
"inShineSeen": [],
"parents": [],
"children": [],
"siblings": [],
"spouses": [],
"allUsers": [
{
"login": "Bob",
"official": false,
"shine": true,
"officialLabel": "",
"shineLabel": "shine",
"avatar": { "ar": "..." }
}
]
}
}
```
### Примечание
Поля `known_person`, `shine_confirmed`, `shine_seen` в UI считаются недопроверенной зоной проекта; при изменениях этой логики нужна ручная end-to-end проверка.
---
## 4. `AddCloseFriend`
`AddCloseFriend` использует текущую авторизованную сессию как источник `login`.
### Запрос
```json
{
"op": "AddCloseFriend",
"requestId": "close-friend-001",
"payload": {
"toLogin": "bob"
}
}
```
### Успешный ответ
```json
{
"op": "AddCloseFriend",
"requestId": "close-friend-001",
"status": 200,
"ok": true,
"payload": {
"login": "Alice",
"toLogin": "Bob",
"relation": "close_friend"
}
}
```

View File

@ -0,0 +1,306 @@
# API для разработчиков: DM, push и сигналы звонков
Документ описывает WebSocket-операции для подписанных личных сообщений, WebPush и realtime-сигналов звонков.
Логика личных сообщений дополнительно описана в `Dev_Docs/Personal_Messages/README.md`; этот файл фиксирует именно публичные `op`, поля запросов и поля ответов.
## 1. `UpsertPushToken`
Требует авторизации.
### Запрос
```json
{
"op": "UpsertPushToken",
"requestId": "push-upsert-001",
"payload": {
"sessionId": "SESSION_ID",
"endpoint": "https://push.example/...",
"p256dhKey": "BASE64",
"authKey": "BASE64",
"platform": "web",
"userAgent": "Mozilla/5.0 ..."
}
}
```
### Успешный ответ
```json
{
"op": "UpsertPushToken",
"requestId": "push-upsert-001",
"status": 200,
"ok": true,
"payload": {
"tokenId": "token-1",
"updatedAtMs": 1774700000123
}
}
```
---
## 2. `SendTestWebPush`
Требует авторизации. Если `login` передан, он должен совпадать с логином текущей сессии.
### Запрос
```json
{
"op": "SendTestWebPush",
"requestId": "push-test-001",
"payload": {
"login": "alice",
"sessionId": "SESSION_ID",
"title": "Test",
"text": "Push body"
}
}
```
### Успешный ответ
```json
{
"op": "SendTestWebPush",
"requestId": "push-test-001",
"status": 200,
"ok": true,
"payload": {
"targetLogin": "alice",
"attemptedSessions": 1,
"sessionsWithPushConfig": 1,
"delivered": 1,
"failed": 0,
"sentAtMs": 1774700000123
}
}
```
---
## 3. `SendDirectMessage`
Отправляет один подписанный DM-пакет.
### Запрос
```json
{
"op": "SendDirectMessage",
"requestId": "dm-001",
"payload": {
"blobB64": "BASE64_SIGNED_DM_PACKET"
}
}
```
### Успешный ответ
```json
{
"op": "SendDirectMessage",
"requestId": "dm-001",
"status": 200,
"ok": true,
"payload": {
"messageId": "dm-1",
"deliveredWsSessions": 1,
"deliveredWebPushSessions": 0,
"sessionNotFound": false
}
}
```
---
## 4. `SendMessagePair` и `ReceiveOutcomingMessage`
`ReceiveOutcomingMessage` сейчас является алиасом `SendMessagePair` и использует тот же request/handler.
### Запрос
```json
{
"op": "SendMessagePair",
"requestId": "dm-pair-001",
"payload": {
"incomingBlobB64": "BASE64_INCOMING_SIGNED_BLOCK",
"outgoingBlobB64": "BASE64_OUTGOING_SIGNED_BLOCK"
}
}
```
### Успешный ответ
```json
{
"op": "SendMessagePair",
"requestId": "dm-pair-001",
"status": 200,
"ok": true,
"payload": {
"baseKey": "base-key",
"incomingKey": "incoming-key",
"outgoingKey": "outgoing-key",
"deliveredWsSessions": 1,
"deliveredWebPushSessions": 0
}
}
```
---
## 5. `ReceiveIncomingMessage`
Принимает входящий подписанный DM-блок.
### Запрос
```json
{
"op": "ReceiveIncomingMessage",
"requestId": "dm-in-001",
"payload": {
"incomingBlobB64": "BASE64_INCOMING_SIGNED_BLOCK"
}
}
```
### Успешный ответ
```json
{
"op": "ReceiveIncomingMessage",
"requestId": "dm-in-001",
"status": 200,
"ok": true,
"payload": {
"messageKey": "incoming-key",
"baseKey": "base-key",
"deliveredWsSessions": 1,
"deliveredWebPushSessions": 0
}
}
```
---
## 6. `AckSessionDelivery`
Требует авторизации. Подтверждает доставку сообщения в текущую сессию.
### Запрос
```json
{
"op": "AckSessionDelivery",
"requestId": "ack-001",
"payload": {
"messageKey": "incoming-key"
}
}
```
### Успешный ответ
```json
{
"op": "AckSessionDelivery",
"requestId": "ack-001",
"status": 200,
"ok": true,
"payload": {
"messageKey": "incoming-key"
}
}
```
---
## 7. `CallInviteBroadcast`
Требует авторизации. Отправляет приглашение к звонку на активные сессии пользователя `toLogin`.
### Запрос
```json
{
"op": "CallInviteBroadcast",
"requestId": "call-invite-001",
"payload": {
"toLogin": "bob",
"callId": "call-1",
"type": 100
}
}
```
### Успешный ответ
```json
{
"op": "CallInviteBroadcast",
"requestId": "call-invite-001",
"status": 200,
"ok": true,
"payload": {
"callId": "call-1",
"deliveredWsSessions": 1,
"deliveredFcmSessions": 0,
"deliveredWebPushSessions": 0
}
}
```
---
## 8. `CallSignalToSession`
Требует авторизации. Отправляет сигнал звонка в конкретную сессию получателя.
### Запрос
```json
{
"op": "CallSignalToSession",
"requestId": "call-signal-001",
"payload": {
"toLogin": "bob",
"targetSessionId": "SESSION_ID",
"callId": "call-1",
"type": 101,
"data": "{\"sdp\":\"...\"}"
}
}
```
### Успешный ответ
```json
{
"op": "CallSignalToSession",
"requestId": "call-signal-001",
"status": 200,
"ok": true,
"payload": {
"delivered": true
}
}
```
Если целевая сессия не найдена или доставка не удалась, сервер может вернуть `404`.
## Типовые ошибки
- `422 / NOT_AUTHENTICATED` — требуется авторизация.
- `400 / BAD_FIELDS` — не заполнены обязательные поля.
- `404 / USER_NOT_FOUND` — пользователь не найден.
- `404 / SESSION_NOT_FOUND` — сессия не найдена.
- `422 / BAD_SIGNATURE` — подпись DM не прошла проверку.
- `422 / BAD_DEVICE_KEY` — некорректный device key отправителя.
- `422 / BAD_TIME_WINDOW` — время подписанного сообщения вне допустимого окна.
- `422 / REPLAY` — повторное сообщение заблокировано.

View File

@ -0,0 +1,190 @@
# API для разработчиков: HTTP debug endpoints
Этот файл описывает отдельный HTTP debug API сервера. Он не использует WebSocket-конверт `op/requestId/payload` и включается только настройкой:
- `debug.tempApi.enabled=true`
Источник истины:
- `src/main/java/server/debug/DebugApiConfigurator.java`
- `src/main/java/server/debug/*Servlet.java`
Если `.debug-token` отсутствует или пуст, endpoints возвращают `503 / DEBUG_DISABLED`.
## Авторизация
Для большинства debug endpoints используется Bearer token из `.debug-token`:
```http
Authorization: Bearer <token>
```
Для `POST /debug/ws/ui-reload-all` поддерживается заголовок:
```http
X-Debug-Token: <token>
```
При неверном токене сервер возвращает `401 / UNAUTHORIZED`.
## Формат успешного HTTP-ответа
```json
{
"ok": true,
"payload": {
}
}
```
## Формат HTTP-ошибки
```json
{
"ok": false,
"code": "BAD_JSON",
"message": "Тело запроса должно быть JSON"
}
```
## 1. `GET /debug/ws/clients`
Возвращает список активных WebSocket-клиентов.
### Успешный ответ
```json
{
"ok": true,
"payload": {
"count": 1,
"clients": [
{
"sessionId": "SESSION_ID",
"login": "alice",
"authStatus": 2,
"wsOpen": true,
"remoteAddress": "127.0.0.1",
"ip": "127.0.0.1",
"userAgent": "Mozilla/5.0 ...",
"clientInfoFromClient": "...",
"clientInfoFromRequest": "...",
"userLanguage": "ru",
"sessionCreatedAtMs": 1774700000123
}
]
}
}
```
## 2. `POST /debug/ws/connect`
Запускает debug-сценарий соединения двух активных WS-сессий и отправляет им события:
- `DebugConnectPrepareResponder`
- `DebugConnectStartInitiator`
### Запрос
```json
{
"initiatorSessionId": "SESSION_ID_1",
"responderSessionId": "SESSION_ID_2",
"clearDebugLog": true
}
```
### Успешный ответ
```json
{
"ok": true,
"payload": {
"runId": "dbg-...",
"callId": "debug-call-...",
"accepted": true,
"initiatorSessionId": "SESSION_ID_1",
"responderSessionId": "SESSION_ID_2",
"initiatorLogin": "alice",
"responderLogin": "bob",
"mode": "cross-login"
}
}
```
### Ошибки
- `400 / BAD_JSON` — тело запроса не JSON.
- `400 / BAD_FIELDS` — не заполнены sessionId или переданы одинаковые sessionId.
- `404 / INITIATOR_NOT_FOUND` — сессия инициатора не найдена или неактивна.
- `404 / RESPONDER_NOT_FOUND` — сессия получателя не найдена или неактивна.
## 3. `GET /debug/ws/logs`
Возвращает tail debug-логов из `DebugRunLogBuffer`.
### Query-параметры
- `limit` — количество записей.
- `runId` — фильтр по runId.
### Успешный ответ
```json
{
"ok": true,
"payload": {
"count": 1,
"limit": 100,
"runIdFilter": "ui-run-1",
"logs": [
{
"ts": 1774700000123,
"level": "info",
"runId": "ui-run-1",
"source": "debug-connect",
"sessionId": "SESSION_ID",
"login": "alice",
"message": "opened channels tab",
"details": "{}"
}
]
}
}
```
## 4. `POST /debug/ws/ui-reload-all`
Рассылает активным UI-сессиям debug-событие на reload.
### Запрос
```json
{
"reason": "manual_debug_api",
"reloadAfterMs": 700
}
```
Если `reason` не передан или пустой, сервер использует `manual_debug_api`. `reloadAfterMs` ограничивается диапазоном `100..15000`.
### Успешный ответ
```json
{
"ok": true,
"payload": {
"accepted": true,
"reason": "manual_debug_api",
"reloadAfterMs": 700,
"issuedAtMs": 1774700000123,
"totalConnections": 2,
"sentCount": 2,
"skippedCount": 0
}
}
```
### Ошибки
- `400 / BAD_JSON` — тело запроса не JSON.

View File

@ -0,0 +1,18 @@
# SHiNE-agent-bot-coder: очередь, voice, codex, systemd
- краткое описание фичи:
Добавлен новый сервис `SHiNE-agent-bot-coder` (Java), который обрабатывает сообщения от `@AidarKC`, ведёт JSONL-историю, использует файловую очередь, распознаёт voice через OpenAI, вызывает Codex CLI и поддерживает запуск как `systemd`-сервис.
- что именно проверять:
1. Бот принимает текст от `@AidarKC`, ставит задачу в очередь и отправляет ответ от Codex.
2. Бот принимает voice, отправляет текст распознавания и затем ответ от Codex.
3. Одновременные сообщения обрабатываются строго по одному (без параллельных запусков Codex).
4. После рестарта сервиса незавершённая активная задача повторно уходит в обработку.
5. Команда `/new` архивирует текущую историю и создаёт новую.
6. `systemd`-сервис стартует и автоматически перезапускается.
- ожидаемый результат:
Все пункты выше отрабатывают без потери сообщений, с корректным обновлением `data/queue.jsonl`, `data/state.json` и `data/history/*.jsonl`.
- статус:
`pending`

View File

@ -0,0 +1,22 @@
# Агент-бот coder: устранение дублей и зависаний
- краткое описание фичи:
- добавлен lock-файл `data/app.lock`, чтобы гарантировать единственный инстанс бота;
- доработано завершение worker/codex при stop/restart, чтобы не было ложных retry после штатной остановки;
- синхронизирован systemd-профиль под `--user` для `ai`;
- улучшена отправка промежуточных статусов по событиям `codex --json`.
- что проверять:
- при запущенном сервисе повторный запуск jar вручную завершается сразу с сообщением про занятый lock;
- команда `/status` отвечает один раз (без дублей);
- тестовая текстовая задача от `@AidarKC` обрабатывается и возвращает ответ;
- при `/stop` активная задача завершается без последующего спама retry-ошибками;
- в логах нет `409 Conflict` из-за второго poller после чистого перезапуска.
- ожидаемый результат:
- одновременно работает только один процесс бота;
- бот не дублирует ответы;
- очередь не застревает на interrupted после штатного stop/restart.
- статус:
- pending

View File

@ -0,0 +1,21 @@
# Python-обвязка Telegram → Codex (упрощённый сервис)
- краткое описание фичи:
- добавлен новый упрощённый сервис `SHiNE-agent-bot-coder/py_bot_service.py`;
- сервис работает через long-polling Telegram, принимает только текст, ведёт историю в `JSONL`;
- добавлены команды `/status`, `/queue`, `/stop`, `/cancel`, `/new`, `/help`;
- systemd unit переключён на запуск Python-сервиса.
- что проверять:
- `systemctl --user status shine-agent-bot-coder` показывает `active (running)`;
- бот отвечает на `/status` одним сообщением без дублей;
- тестовый текстовый запрос получает финальный ответ от Codex;
- `/stop` корректно останавливает текущую задачу;
- `/new` переносит текущую историю в `data/history/archive`.
- ожидаемый результат:
- стабильная обработка текстовых задач без зависаний и двойной обработки;
- только один инстанс сервиса (через lock `data/py_app.lock`).
- статус:
- pending

View File

@ -0,0 +1,36 @@
# Локальный деплой SHiNE-agent-bot-coder (systemd, пользователь ai)
## Где находится сервис
- Папка сервиса: `SHiNE-agent-bot-coder/`
- Systemd unit: `SHiNE-agent-bot-coder/scripts/systemd/shine-agent-bot-coder.service`
- Скрипт установки: `SHiNE-agent-bot-coder/scripts/systemd/install-local-systemd.sh`
## Предусловия
1. Заполнен `.env` на основе `.env.example`.
2. Доступен рабочий Codex CLI:
- `/home/ai/.cache/JetBrains/IntelliJIdea2026.1/aia/codex/bin/codex-x86_64-unknown-linux-musl`
3. На машине установлен `systemd --user`.
## Установка
Из корня репозитория:
```bash
bash SHiNE-agent-bot-coder/scripts/systemd/install-local-systemd.sh
```
Скрипт:
1. проверяет наличие `python3`;
2. копирует unit в `~/.config/systemd/user/`;
3. делает `systemctl --user daemon-reload`;
4. включает автозапуск и стартует сервис.
## Проверка
```bash
systemctl --user status shine-agent-bot-coder --no-pager
journalctl --user -u shine-agent-bot-coder -f
```
## Перезапуск после изменений
```bash
systemctl --user restart shine-agent-bot-coder
```

View File

@ -0,0 +1,119 @@
# ESP32-S3-Touch-AMOLED-2.16 Codex Guide
Этот файл переносится в другие проекты как готовая инструкция для Codex по этой плате.
## 1) Что это за плата
- Модель: `Waveshare ESP32-S3-Touch-AMOLED-2.16`
- MCU: `ESP32-S3` (flash 16MB, PSRAM 8MB)
- Экран: AMOLED, физически 480x480, углы скруглены (часть крайних пикселей может быть невидима)
- Touch: CST92xx
- IMU: QMI8658
- Аудио:
- DAC/вывод (динамик): ES8311
- ADC/вход (микрофоны): ES7210
## 2) Что уже установлено в этой среде
- Ubuntu
- `arduino-cli 1.4.0`
- `esp32:esp32` core `3.3.5`
- `esptool` из `~/.arduino15/packages/esp32/tools/esptool_py/5.1.0/esptool`
- USB порт платы: обычно `/dev/ttyACM0`
Проверка:
```bash
arduino-cli version
arduino-cli core list
arduino-cli board list
ls -l /dev/ttyACM0
```
## 3) Структура подпроекта (эталон)
- `official-demo/` — официальный repo Waveshare (примеры+библиотеки)
- `original-firmware/` — backup/restore заводской прошивки
- `test-device/` — прошивки и `burn.sh`
- `reference/` — заметки и ссылки
## 4) Бэкап перед любыми экспериментами
```bash
cd ESP32-S3-Touch-AMOLED-2.16/original-firmware
./backup_factory.sh
```
Ожидаемый результат:
- `factory-full-16mb.bin`
- `factory-full-16mb.bin.sha256`
Восстановление:
```bash
./restore_factory_backup.sh
```
## 5) Деплой (прошивка) — стандарт
Главный скрипт:
```bash
cd ESP32-S3-Touch-AMOLED-2.16/test-device
./burn.sh <mode>
```
Режимы:
- `hello` — базовый экран
- `widgets` — экран+touch+IMU (официальный пример)
- `audio` — тест аудио тракта
- `simple` — кастомный интеграционный тест (экран, touch, запись/воспроизведение, VU, tilt)
## 6) Как писать код под эту плату (важно)
1. **Экран**
- Рабочее разрешение использовать `480x480`.
- Не рисовать критичный текст/кнопки впритык к краю; держать safe margin (`~20px+`) из-за скругленных углов.
- Не делать полный `fillScreen` в каждом loop: только частичные обновления (`fillRect`/локальные перерисовки), иначе мерцание.
2. **Touch**
- Настройка CST:
- `setMaxCoordinates(480, 480)`
- `setSwapXY(true)`
- `setMirrorXY(true, false)`
- Обрабатывать touch по IRQ + `getPoint`.
- После смещения UI обязательно пересчитывать hitbox кнопок.
3. **Аудио**
- Для динамика инициализировать `ES8311`.
- Для микрофона обязательно инициализировать `ES7210`; без этого запись может быть пустой.
- Для отладки записи показывать VU/peak на экране во время `RECORD`.
- Для быстрой проверки тракта всегда держать кнопку `BEEP` (тон), чтобы отделить проблему динамика от проблемы микрофона.
4. **IMU**
- QMI8658 обновлять с ограниченной частотой (например 80150 мс для UI-строки), чтобы не шуметь перерисовками.
5. **Стабильность UI**
- Статика: рисуется один раз в setup.
- Динамика: отдельная зона, перерисовывать только по изменению данных.
## 7) Рекомендуемый workflow для Codex
1. Проверить порт и инструменты.
2. Если новая плата/первый запуск — сделать backup flash.
3. Собрать и залить `simple`.
4. Пройти ручной чек:
- экран отображает текст без обрезки,
- touch срабатывает по кнопкам,
- `BEEP` слышно,
- VU двигается во время записи,
- `PLAY` воспроизводит записанное,
- `Tilt` меняется при повороте.
5. Только после этого усложнять приложение.
## 8) Ссылки
- Product page: https://www.waveshare.com/product/arduino/boards-kits/esp32-s3/esp32-s3-touch-amoled-2.16.htm
- Docs: https://docs.waveshare.com/ESP32-S3-Touch-AMOLED-2.16
- Arduino setup: https://docs.waveshare.com/ESP32-S3-Touch-AMOLED-2.16/Development-Environment-Setup-Arduino
- Official examples: https://github.com/waveshareteam/ESP32-S3-Touch-AMOLED-2.16

View File

@ -0,0 +1,10 @@
TELEGRAM_BOT_TOKEN=replace_me
OPENAI_API_KEY=replace_me
ALLOWED_TELEGRAM_USERNAME=AidarKC
BOT_USERNAME=aidar_su_bot
OPENAI_TRANSCRIBE_MODEL=gpt-4o-mini-transcribe
CODEX_BIN=/home/ai/.cache/JetBrains/IntelliJIdea2026.1/aia/codex/bin/codex-x86_64-unknown-linux-musl
CODEX_WORKDIR=/home/ai/work/SHiNE/SHiNE-server-sha256
CODEX_TIMEOUT_SECONDS=900
MAX_RETRIES=3
DATA_DIR=./data

5
SHiNE-agent-bot-coder/.gitignore vendored Normal file
View File

@ -0,0 +1,5 @@
.env
data/
logs/
run/
__pycache__/

View File

@ -0,0 +1,31 @@
# AGENT.md для SHiNE-agent-bot-coder
Ты запущен как обработчик входящего Telegram-сообщения от пользователя.
## Контекст
- `SHiNE-agent-bot-coder` — локальный Telegram-бот-сервис агента-кодера для работы с этим проектом.
- Сервис принимает входящие сообщения от пользователя Telegram, сохраняет историю, ставит задачи в очередь и последовательно запускает Codex CLI в рабочем проекте.
- Текстовые сообщения обрабатываются напрямую, voice/audio сначала распознаются через OpenAI transcription, затем передаются как текстовая задача.
- История диалога хранится в JSONL-файле, путь передаётся в промпте.
- Сообщение может быть текстом или результатом распознавания голосового.
- Ответ пойдёт пользователю в Telegram как обычное текстовое сообщение.
## Очередь и состояние
- Входящие задачи записываются в файловую очередь и обрабатываются строго по одной, чтобы не смешивать изменения в проекте.
- Сервис ведёт состояние активной задачи и текущего файла истории, а после рестарта продолжает незавершённую обработку с учётом сохранённого состояния.
- Истории диалогов хранятся в JSONL; после команды `/new` старая история архивируется, а новая начинается отдельно.
- Дедупликация входящих Telegram update нужна, чтобы одно сообщение не попало в обработку повторно.
## Локальный запуск и systemd
- Основной запуск сервиса выполняется Python-скриптом `py_bot_service.py` из папки `SHiNE-agent-bot-coder/`.
- Локальные секреты и параметры должны храниться в `.env`, этот файл не коммитится.
- Для проверки Codex без Telegram можно использовать self-test режим сервиса.
- Для постоянного локального запуска используется user-level systemd service `shine-agent-bot-coder`; скрипты установки лежат в `SHiNE-agent-bot-coder/scripts/systemd/`.
- Если меняется логика сервиса, после изменений нужно проверить запуск локально и при необходимости перезапустить user systemd service.
## Правила ответа
- Пиши содержательно и коротко.
- Не упоминай внутренние служебные детали, файловую систему и технические логи.
- Если запрос требует действий с кодом/проектом, выполняй их в рабочей директории.
- Если для ответа данных недостаточно, задай ровно один уточняющий вопрос.
- Если была ошибка предыдущего запуска, в промпте будет пометка retry — учти это и продолжи с учётом текущего состояния проекта.

View File

@ -0,0 +1,44 @@
# SHiNE-agent-bot-coder
Локальный Telegram-бот-сервис для пользователя `ai`:
- принимает сообщения от `@AidarKC`;
- ведёт историю диалога в `JSONL`;
- ставит задачи в файловую очередь;
- обрабатывает задачи строго последовательно;
- поддерживает текстовые и голосовые сообщения (voice/audio через OpenAI transcription);
- вызывает Codex CLI и отправляет ответ в Telegram;
- при рестарте восстанавливает незавершённые задачи.
## Структура
- `.env` — локальные секреты и параметры запуска (не коммитится);
- `data/queue.jsonl` — очередь задач;
- `data/state.json` — текущее состояние (active job + текущий history-файл);
- `data/py_queue.jsonl` — очередь Python-сервиса;
- `data/py_state.json` — текущее состояние Python-сервиса;
- `data/py_processed_updates.log` — дедуп входящих update;
- `data/history/*.jsonl` — активные истории;
- `data/history/archive/*.jsonl` — архив историй после `/new`.
## Локальный запуск
1. Скопировать пример:
- `cp .env.example .env`
2. Заполнить секреты в `.env`.
3. Запуск:
- `python3 SHiNE-agent-bot-coder/py_bot_service.py`
## Быстрый self-test Codex (без Telegram)
```bash
python3 SHiNE-agent-bot-coder/py_bot_service.py --selftest-codex "Ответь одной строкой: Codex работает"
```
## Запуск как systemd-сервис
Файлы для установки:
- `scripts/systemd/shine-agent-bot-coder.service`
- `scripts/systemd/install-local-systemd.sh`
Установка:
- `bash SHiNE-agent-bot-coder/scripts/systemd/install-local-systemd.sh`
Проверка:
- `systemctl --user status shine-agent-bot-coder --no-pager`
- `journalctl --user -u shine-agent-bot-coder -f`

View File

@ -0,0 +1,50 @@
plugins {
id 'java'
id 'application'
id 'com.github.johnrengelman.shadow' version '8.1.1'
}
group = 'shine.agent'
version = '1.0.0'
repositories {
mavenCentral()
}
dependencies {
implementation 'org.telegram:telegrambots:6.9.7.1'
implementation 'com.fasterxml.jackson.core:jackson-databind:2.17.1'
implementation 'org.slf4j:slf4j-api:2.0.16'
runtimeOnly 'org.slf4j:slf4j-simple:2.0.16'
implementation 'org.apache.httpcomponents:httpclient:4.5.14'
implementation 'org.apache.httpcomponents:httpcore:4.4.16'
implementation 'commons-codec:commons-codec:1.17.0'
testImplementation platform('org.junit:junit-bom:5.10.2')
testImplementation 'org.junit.jupiter:junit-jupiter'
}
java {
toolchain {
languageVersion = JavaLanguageVersion.of(17)
}
}
application {
mainClass = 'shine.agent.botcoder.BotCoderApplication'
}
tasks.named('jar') {
enabled = false
}
shadowJar {
archiveBaseName.set('shine-agent-bot-coder')
archiveClassifier.set('')
archiveVersion.set('')
mergeServiceFiles()
}
tasks.named('test') {
useJUnitPlatform()
}

View File

@ -0,0 +1,928 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import datetime as dt
import fcntl
import json
import mimetypes
import os
import random
import string
import subprocess
import tempfile
import threading
import time
import uuid
from pathlib import Path
from typing import Any
from urllib import error, request
def now_iso() -> str:
return dt.datetime.now(dt.timezone.utc).isoformat()
def normalize_username(value: str | None) -> str:
if not value:
return ""
value = value.strip()
if value.startswith("@"):
value = value[1:]
return value.lower()
def split_long_text(text: str, chunk_size: int = 3500) -> list[str]:
text = (text or "").strip()
if not text:
return ["(пустой ответ)"]
return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]
def read_env_file(path: Path) -> dict[str, str]:
result: dict[str, str] = {}
if not path.exists():
return result
for raw_line in path.read_text(encoding="utf-8").splitlines():
line = raw_line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
key, value = line.split("=", 1)
key = key.strip()
value = value.strip().strip('"').strip("'")
result[key] = value
return result
class JsonLineStore:
@staticmethod
def load(path: Path) -> list[dict[str, Any]]:
if not path.exists():
return []
items: list[dict[str, Any]] = []
for line in path.read_text(encoding="utf-8").splitlines():
line = line.strip()
if not line:
continue
items.append(json.loads(line))
return items
@staticmethod
def save(path: Path, items: list[dict[str, Any]]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
tmp = path.with_suffix(path.suffix + ".tmp")
with tmp.open("w", encoding="utf-8") as f:
for item in items:
f.write(json.dumps(item, ensure_ascii=False) + "\n")
tmp.replace(path)
@staticmethod
def append(path: Path, item: dict[str, Any]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("a", encoding="utf-8") as f:
f.write(json.dumps(item, ensure_ascii=False) + "\n")
class TelegramApi:
def __init__(self, token: str):
self.base = f"https://api.telegram.org/bot{token}/"
def call(self, method: str, payload: dict[str, Any] | None = None, timeout: int = 60) -> dict[str, Any]:
data = None
headers = {}
if payload is not None:
data = json.dumps(payload).encode("utf-8")
headers["Content-Type"] = "application/json"
req = request.Request(self.base + method, data=data, headers=headers, method="POST")
try:
with request.urlopen(req, timeout=timeout) as resp:
raw = resp.read().decode("utf-8")
except error.HTTPError as e:
body = e.read().decode("utf-8", errors="replace")
raise RuntimeError(f"Telegram HTTP {e.code}: {body}") from e
except Exception as e:
raise RuntimeError(f"Telegram request failed: {e}") from e
result = json.loads(raw)
if not result.get("ok"):
raise RuntimeError(f"Telegram API error: {result}")
return result
def get_updates(self, offset: int | None, timeout_sec: int) -> list[dict[str, Any]]:
payload: dict[str, Any] = {"timeout": timeout_sec, "allowed_updates": ["message"]}
if offset is not None:
payload["offset"] = offset
result = self.call("getUpdates", payload=payload, timeout=timeout_sec + 15)
return result.get("result", [])
def send_message(self, chat_id: int, text: str, reply_to_message_id: int | None = None) -> None:
payload: dict[str, Any] = {"chat_id": chat_id, "text": text}
if reply_to_message_id is not None:
payload["reply_to_message_id"] = reply_to_message_id
self.call("sendMessage", payload=payload, timeout=30)
def delete_webhook(self) -> None:
self.call("deleteWebhook", payload={"drop_pending_updates": False}, timeout=30)
class BotConfig:
def __init__(self, root_dir: Path):
env = dict(os.environ)
env.update(read_env_file(root_dir / ".env"))
self.root_dir = root_dir
self.telegram_bot_token = self._required(env, "TELEGRAM_BOT_TOKEN")
self.allowed_username = normalize_username(env.get("ALLOWED_TELEGRAM_USERNAME", "AidarKC"))
self.bot_username = env.get("BOT_USERNAME", "aidar_su_bot")
self.openai_api_key = env.get("OPENAI_API_KEY", "").strip()
self.openai_transcribe_model = env.get("OPENAI_TRANSCRIBE_MODEL", "gpt-4o-mini-transcribe")
self.codex_bin = Path(env.get(
"CODEX_BIN",
"/home/ai/.cache/JetBrains/IntelliJIdea2026.1/aia/codex/bin/codex-x86_64-unknown-linux-musl"
))
self.codex_workdir = Path(env.get("CODEX_WORKDIR", "/home/ai/work/SHiNE/SHiNE-server-sha256"))
self.codex_timeout_seconds = int(env.get("CODEX_TIMEOUT_SECONDS", "900"))
self.max_retries = max(1, int(env.get("MAX_RETRIES", "3")))
self.data_dir = (root_dir / env.get("DATA_DIR", "./data")).resolve()
self.agent_instructions_file = (root_dir / "AGENT.md").resolve()
@staticmethod
def _required(env: dict[str, str], key: str) -> str:
value = env.get(key, "").strip()
if not value:
raise RuntimeError(f"Не задан обязательный параметр: {key}")
return value
class ShinePyBotService:
def __init__(self, config: BotConfig):
self.cfg = config
self.telegram = TelegramApi(config.telegram_bot_token)
self.queue_file = config.data_dir / "py_queue.jsonl"
self.state_file = config.data_dir / "py_state.json"
self.processed_updates_file = config.data_dir / "py_processed_updates.log"
self.lock_file = config.data_dir / "py_app.lock"
self.history_dir = config.data_dir / "history"
self.history_archive_dir = self.history_dir / "archive"
self.max_processed_updates = 5000
self.queue_lock = threading.RLock()
self.stop_event = threading.Event()
self.worker = threading.Thread(target=self._worker_loop, name="shine-py-bot-worker", daemon=True)
self.queue: list[dict[str, Any]] = []
self.state: dict[str, Any] = {}
self.processed_updates: list[str] = []
self.active_job_id: str | None = None
self.active_job_started_at: float | None = None
self.active_process: subprocess.Popen[str] | None = None
self.active_process_lock = threading.Lock()
self.stop_current_job = False
self.lock_fd = None
self.last_heartbeat_at: float = 0.0
def run(self) -> None:
self._ensure_dirs()
self._acquire_single_instance_lock()
self._load_state()
self._load_queue()
self._load_processed_updates()
self._recover_active_jobs_after_restart()
self.telegram.delete_webhook()
self._init_offset_if_missing()
self.worker.start()
self._append_history_event("service_started", {"allowedUsername": self.cfg.allowed_username})
print(f"[py-bot] Запущен. allowed user: @{self.cfg.allowed_username}", flush=True)
try:
while not self.stop_event.is_set():
try:
offset = self.state.get("offset")
updates = self.telegram.get_updates(offset=offset, timeout_sec=25)
except Exception as e:
print(f"[py-bot] Ошибка getUpdates: {e}", flush=True)
time.sleep(2)
continue
for update in updates:
update_id = update.get("update_id")
if isinstance(update_id, int):
self.state["offset"] = update_id + 1
self._persist_state()
self._handle_update(update)
finally:
self.shutdown()
def shutdown(self) -> None:
if self.stop_event.is_set():
pass
self.stop_event.set()
self._stop_active_codex_process()
if self.worker.is_alive():
self.worker.join(timeout=10)
if self.lock_fd is not None:
try:
fcntl.flock(self.lock_fd, fcntl.LOCK_UN)
finally:
self.lock_fd.close()
self.lock_fd = None
self._append_history_event("service_stopped", {})
def _ensure_dirs(self) -> None:
self.cfg.data_dir.mkdir(parents=True, exist_ok=True)
self.history_dir.mkdir(parents=True, exist_ok=True)
self.history_archive_dir.mkdir(parents=True, exist_ok=True)
def _acquire_single_instance_lock(self) -> None:
self.lock_file.parent.mkdir(parents=True, exist_ok=True)
self.lock_fd = self.lock_file.open("a+")
try:
fcntl.flock(self.lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
except BlockingIOError:
raise RuntimeError(f"Уже запущен другой инстанс (lock: {self.lock_file})")
def _load_state(self) -> None:
if self.state_file.exists():
self.state = json.loads(self.state_file.read_text(encoding="utf-8"))
else:
self.state = {}
if not self.state.get("current_history_file"):
history_file = self._create_new_history_file("initial")
self.state["current_history_file"] = str(history_file)
if not isinstance(self.state.get("next_job_number"), int):
self.state["next_job_number"] = 1
self.state["updated_at"] = now_iso()
self._persist_state()
def _persist_state(self) -> None:
self.state["updated_at"] = now_iso()
tmp = self.state_file.with_suffix(".tmp")
tmp.write_text(json.dumps(self.state, ensure_ascii=False, indent=2), encoding="utf-8")
tmp.replace(self.state_file)
def _load_queue(self) -> None:
self.queue = JsonLineStore.load(self.queue_file)
def _persist_queue(self) -> None:
JsonLineStore.save(self.queue_file, self.queue)
def _load_processed_updates(self) -> None:
if not self.processed_updates_file.exists():
self.processed_updates = []
return
lines = [x.strip() for x in self.processed_updates_file.read_text(encoding="utf-8").splitlines() if x.strip()]
if len(lines) > self.max_processed_updates:
lines = lines[-self.max_processed_updates:]
self.processed_updates_file.write_text("\n".join(lines) + "\n", encoding="utf-8")
self.processed_updates = lines
def _mark_processed_update(self, update_key: str) -> bool:
if update_key in self.processed_updates:
return True
self.processed_updates.append(update_key)
if len(self.processed_updates) > self.max_processed_updates:
self.processed_updates = self.processed_updates[-self.max_processed_updates:]
self.processed_updates_file.write_text("\n".join(self.processed_updates) + "\n", encoding="utf-8")
else:
with self.processed_updates_file.open("a", encoding="utf-8") as f:
f.write(update_key + "\n")
return False
def _recover_active_jobs_after_restart(self) -> None:
recovered_ids: list[str] = []
for job in self.queue:
if job.get("status") == "active":
job["status"] = "pending"
job["retry_reason"] = "service_restart_recovery"
job["updated_at"] = now_iso()
recovered_ids.append(job.get("id", ""))
if recovered_ids:
self._persist_queue()
self._append_history_event("active_jobs_recovered", {"jobIds": recovered_ids})
def _init_offset_if_missing(self) -> None:
if self.state.get("offset") is not None:
return
try:
updates = self.telegram.get_updates(offset=None, timeout_sec=0)
if updates:
self.state["offset"] = int(updates[-1]["update_id"]) + 1
else:
self.state["offset"] = 0
self._persist_state()
except Exception as e:
print(f"[py-bot] Не удалось инициализировать offset: {e}", flush=True)
self.state["offset"] = 0
self._persist_state()
def _current_history_file(self) -> Path:
return Path(self.state["current_history_file"])
def _create_new_history_file(self, reason: str) -> Path:
ts = dt.datetime.now().strftime("%Y-%m-%d_%H%M%S")
rnd = "".join(random.choices(string.hexdigits.lower(), k=8))
path = self.history_dir / f"{ts}_{rnd}.jsonl"
JsonLineStore.append(path, {"ts": now_iso(), "type": "history_created", "reason": reason})
return path
def _rotate_history(self, reason: str, username: str) -> Path:
current = self._current_history_file()
if current.exists():
archived = self.history_archive_dir / current.name
current.replace(archived)
else:
archived = self.history_archive_dir / "(empty)"
new_file = self._create_new_history_file(reason)
self.state["current_history_file"] = str(new_file)
self._persist_state()
self._append_history_event("history_rotated", {"reason": reason, "username": username, "archived": str(archived)})
return archived
def _append_history(self, history_path: Path, event_type: str, payload: dict[str, Any]) -> None:
row = {"ts": now_iso(), "type": event_type}
row.update(payload)
JsonLineStore.append(history_path, row)
def _append_history_event(self, event_type: str, payload: dict[str, Any]) -> None:
history_path = self._current_history_file()
self._append_history(history_path, "system_event", {"event": event_type, **payload})
def _handle_update(self, update: dict[str, Any]) -> None:
message = update.get("message")
if not isinstance(message, dict):
return
chat = message.get("chat") or {}
chat_id = chat.get("id")
message_id = message.get("message_id")
sender = message.get("from") or {}
username = normalize_username(sender.get("username"))
if not isinstance(chat_id, int) or not isinstance(message_id, int):
return
update_key = f"{chat_id}:{message_id}"
if self._mark_processed_update(update_key):
return
if username != self.cfg.allowed_username:
return
text = (message.get("text") or "").strip()
if not text:
if message.get("voice"):
self._enqueue_voice_job(chat_id, message_id, username, message["voice"].get("file_id"))
return
if message.get("audio"):
self._enqueue_voice_job(chat_id, message_id, username, message["audio"].get("file_id"))
return
self._safe_send(chat_id, "Поддерживаются текст, voice и audio.", reply_to=message_id)
return
if text.startswith("/"):
self._handle_command(chat_id, message_id, username, text)
return
history_path = self._current_history_file()
self._append_history(history_path, "incoming_text", {
"chatId": chat_id, "messageId": message_id, "username": username, "text": text
})
job = self._build_job_base(chat_id, message_id, username, str(history_path))
job["type"] = "text"
job["text"] = text
with self.queue_lock:
self.queue.append(job)
self._persist_queue()
self._safe_send(chat_id, f"Принял задачу #{job['num']}", reply_to=message_id)
def _enqueue_voice_job(self, chat_id: int, message_id: int, username: str, file_id: str | None) -> None:
if not file_id:
self._safe_send(chat_id, "Не удалось прочитать file_id голосового.", reply_to=message_id)
return
history_path = self._current_history_file()
self._append_history(history_path, "incoming_voice", {
"chatId": chat_id, "messageId": message_id, "username": username, "fileId": file_id
})
job = self._build_job_base(chat_id, message_id, username, str(history_path))
job["type"] = "voice"
job["telegram_file_id"] = file_id
with self.queue_lock:
self.queue.append(job)
self._persist_queue()
self._safe_send(chat_id, f"Принял voice в задачу #{job['num']}", reply_to=message_id)
def _build_job_base(self, chat_id: int, message_id: int, username: str, history_file: str) -> dict[str, Any]:
with self.queue_lock:
num = int(self.state.get("next_job_number", 1))
self.state["next_job_number"] = num + 1
self._persist_state()
return {
"id": str(uuid.uuid4()),
"num": num,
"status": "pending",
"type": "text",
"chat_id": chat_id,
"message_id": message_id,
"username": username,
"text": "",
"telegram_file_id": "",
"history_file": history_file,
"attempts": 0,
"retry_reason": "",
"last_error": "",
"created_at": now_iso(),
"updated_at": now_iso(),
"active_since": None,
}
def _handle_command(self, chat_id: int, message_id: int, username: str, text: str) -> None:
lower = text.lower()
if lower in ("/start", "/help"):
self._safe_send(chat_id, self._help_text(), reply_to=message_id)
return
if lower == "/status":
self._safe_send(chat_id, self._status_text(), reply_to=message_id)
return
if lower == "/queue":
self._safe_send(chat_id, self._queue_text(), reply_to=message_id)
return
if lower == "/new":
archived = self._rotate_history("command_new", username)
self._safe_send(chat_id, f"История очищена. Новый диалог начат.\nАрхив: {archived.name}", reply_to=message_id)
return
if lower == "/stop":
stopped = self._cancel_active_job("stopped_by_user")
if stopped:
self._safe_send(chat_id, "Текущая задача остановлена и удалена из очереди.", reply_to=message_id)
else:
self._safe_send(chat_id, "Сейчас нет активной задачи.", reply_to=message_id)
return
if lower.startswith("/cancel"):
parts = text.split(maxsplit=1)
if len(parts) < 2:
self._safe_send(chat_id, "Использование: /cancel <id|all>", reply_to=message_id)
return
arg = parts[1].strip()
if arg.lower() == "all":
with self.queue_lock:
self.stop_current_job = True
self._stop_active_codex_process()
count = len(self.queue)
self.queue = []
self._persist_queue()
self._safe_send(chat_id, f"Удалено задач из очереди: {count}", reply_to=message_id)
return
cancelled = self._cancel_by_id_prefix(arg)
self._safe_send(chat_id, f"Задача удалена: {arg}" if cancelled else f"Задача не найдена: {arg}", reply_to=message_id)
return
def _help_text(self) -> str:
return (
"Доступные команды:\n"
"/status — активная задача и размер очереди\n"
"/queue — список задач в очереди\n"
"/stop — остановить текущую задачу\n"
"/cancel <id|all> — удалить задачу по id (префикс) или все\n"
"/new — архивировать историю и начать новую\n"
"/help — эта справка"
)
def _status_text(self) -> str:
with self.queue_lock:
active = next((j for j in self.queue if j.get("status") == "active"), None)
pending = sum(1 for j in self.queue if j.get("status") == "pending")
if not active:
return f"Статус: активной задачи нет.\nВ очереди pending: {pending}"
elapsed = int(time.time() - (self.active_job_started_at or time.time()))
return (
f"Статус: активная задача #{active.get('num', '?')}\n"
f"Тип: {active.get('type', 'text')}\n"
f"Попытка: {int(active.get('attempts', 0)) + 1}/{self.cfg.max_retries}\n"
f"Выполняется: {elapsed}с\n"
f"Pending: {pending}"
)
def _queue_text(self) -> str:
with self.queue_lock:
items = list(self.queue)
if not items:
return "Очередь пуста."
lines = [f"Очередь: {len(items)}"]
for i, job in enumerate(items[:10], start=1):
lines.append(
f"{i}) #{job.get('num', '?')} [{job.get('status')}] {job.get('type')} attempts={job.get('attempts', 0)}"
)
if len(items) > 10:
lines.append(f"...и ещё {len(items) - 10} задач")
return "\n".join(lines)
def _cancel_active_job(self, reason: str) -> bool:
with self.queue_lock:
active = next((j for j in self.queue if j.get("status") == "active"), None)
if not active:
return False
self.stop_current_job = True
self._stop_active_codex_process()
self.queue = [j for j in self.queue if j.get("id") != active.get("id")]
self._persist_queue()
self._append_history_event("job_stopped_by_user", {"jobId": active.get("id"), "reason": reason})
return True
def _cancel_by_id_prefix(self, prefix: str) -> bool:
prefix = prefix.strip().lower()
normalized_num = prefix.lstrip("#")
with self.queue_lock:
target = next(
(
j for j in self.queue
if str(j.get("id", "")).lower().startswith(prefix)
or str(j.get("num", "")).lower() == normalized_num
),
None
)
if not target:
return False
if target.get("status") == "active":
self.stop_current_job = True
self._stop_active_codex_process()
self.queue = [j for j in self.queue if j.get("id") != target.get("id")]
self._persist_queue()
return True
def _worker_loop(self) -> None:
while not self.stop_event.is_set():
job = None
with self.queue_lock:
for item in self.queue:
if item.get("status") == "pending":
item["status"] = "active"
item["active_since"] = now_iso()
item["updated_at"] = now_iso()
self.active_job_id = item.get("id")
self.active_job_started_at = time.time()
job = dict(item)
self._persist_queue()
break
if not job:
time.sleep(0.5)
continue
self.stop_current_job = False
self._process_job(job)
self.active_job_id = None
self.active_job_started_at = None
def _process_job(self, job: dict[str, Any]) -> None:
job_id = job["id"]
job_num = job.get("num", "?")
chat_id = int(job["chat_id"])
message_id = int(job["message_id"])
history_path = Path(job["history_file"])
self._safe_send(chat_id, f"Задача #{job_num} в работе.", reply_to=message_id)
try:
if job.get("type") == "voice":
self._safe_send(chat_id, f"#{job_num}: распознаю голосовое...", reply_to=message_id)
recognized = self._transcribe_voice_job(job)
job["text"] = recognized
self._append_history(history_path, "voice_transcription", {"jobId": job_id, "jobNum": job_num, "text": recognized})
preview = recognized.strip()
if len(preview) > 1200:
preview = preview[:1200] + " ...[обрезано]"
self._safe_send(chat_id, f"#{job_num}: распознано:\n{preview}", reply_to=message_id)
self._safe_send(chat_id, f"#{job_num}: распознано, отправляю в Codex.", reply_to=message_id)
prompt = self._build_prompt(job)
self._append_history(history_path, "codex_request", {"jobId": job_id, "prompt": prompt})
answer = self._run_codex(prompt, chat_id, message_id, job_id, job_num)
for chunk in split_long_text(answer):
self._safe_send(chat_id, chunk, reply_to=message_id)
self._safe_send(chat_id, f"Готово #{job_num}.", reply_to=message_id)
self._append_history(history_path, "codex_response", {"jobId": job_id, "text": answer})
self._mark_job_done(job_id)
except Exception as e:
if self.stop_current_job:
self._append_history(history_path, "job_stopped", {"jobId": job_id, "reason": str(e)})
self._safe_send(chat_id, f"Задача #{job_num} остановлена.", reply_to=message_id)
self._mark_job_removed(job_id)
self.stop_current_job = False
return
self._handle_job_failure(job, e)
def _build_prompt(self, job: dict[str, Any]) -> str:
retry_block = ""
retry_reason = (job.get("retry_reason") or "").strip()
if retry_reason:
retry_block = f"\n\nПометка retry: {retry_reason}"
return (
"Пришло сообщение в Telegram.\n"
f"Тип: {job.get('type')}\n"
f"Username отправителя: @{job.get('username')}\n"
"Текст для обработки:\n"
f"{job.get('text')}\n\n"
f"История диалога (JSONL): {job.get('history_file')}\n"
f"Инструкции агента: {self.cfg.agent_instructions_file}\n"
f"Работай в рабочем проекте аккуратно и верни только текст ответа пользователю.{retry_block}"
)
def _run_codex(self, prompt: str, chat_id: int, message_id: int, job_id: str, job_num: Any) -> str:
output_lines: list[str] = []
with tempfile.NamedTemporaryFile(prefix="shine-codex-last-message-", suffix=".txt", delete=False) as tmp:
output_file = Path(tmp.name)
cmd = [
str(self.cfg.codex_bin),
"exec",
"--dangerously-bypass-approvals-and-sandbox",
"--json",
"-C", str(self.cfg.codex_workdir),
"-o", str(output_file),
prompt,
]
print(f"[py-bot] codex exec start job={job_id[:8]}", flush=True)
process = subprocess.Popen(
cmd,
stdin=subprocess.DEVNULL,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
encoding="utf-8",
errors="replace",
bufsize=1,
)
with self.active_process_lock:
self.active_process = process
self.last_heartbeat_at = 0.0
last_user_note = ""
last_user_note_at = 0.0
def on_line(line: str) -> None:
nonlocal last_user_note, last_user_note_at
output_lines.append(line)
note = self._extract_codex_user_note(line)
now = time.time()
if note and note != last_user_note and now - last_user_note_at > 8:
self._safe_send(chat_id, f"#{job_num}: {note}", reply_to=message_id)
last_user_note = note
last_user_note_at = now
if now - self.last_heartbeat_at > 60:
self._safe_send(chat_id, f"#{job_num}: всё ещё выполняется...", reply_to=message_id)
self.last_heartbeat_at = now
reader_done = threading.Event()
def reader() -> None:
if not process.stdout:
reader_done.set()
return
for line in process.stdout:
on_line(line.rstrip("\n"))
reader_done.set()
t = threading.Thread(target=reader, name=f"codex-reader-{job_id[:8]}", daemon=True)
t.start()
try:
return_code = process.wait(timeout=self.cfg.codex_timeout_seconds)
except subprocess.TimeoutExpired:
process.kill()
t.join(timeout=2)
raise RuntimeError(f"Codex timeout after {self.cfg.codex_timeout_seconds}s")
finally:
with self.active_process_lock:
self.active_process = None
reader_done.wait(timeout=2)
if return_code != 0:
tail = "\n".join(output_lines[-40:])
raise RuntimeError(f"Codex exited with code {return_code}. Output tail:\n{tail}")
if output_file.exists():
answer = output_file.read_text(encoding="utf-8").strip()
try:
output_file.unlink(missing_ok=True)
except Exception:
pass
if answer:
return answer
fallback = self._extract_fallback_message(output_lines)
if not fallback:
raise RuntimeError("Codex returned empty response")
return fallback
def _stop_active_codex_process(self) -> bool:
with self.active_process_lock:
process = self.active_process
if process is None:
return False
if process.poll() is not None:
return False
process.terminate()
try:
process.wait(timeout=2)
except subprocess.TimeoutExpired:
process.kill()
return True
def _handle_job_failure(self, job: dict[str, Any], err: Exception) -> None:
job_id = job["id"]
job_num = job.get("num", "?")
chat_id = int(job["chat_id"])
message_id = int(job["message_id"])
error_text = str(err).strip() or err.__class__.__name__
print(f"[py-bot] Ошибка job={job_id[:8]}: {error_text}", flush=True)
with self.queue_lock:
target = next((j for j in self.queue if j.get("id") == job_id), None)
if not target:
return
attempts = int(target.get("attempts", 0)) + 1
target["attempts"] = attempts
target["last_error"] = error_text[:1000]
target["updated_at"] = now_iso()
if attempts < self.cfg.max_retries:
target["status"] = "pending"
target["retry_reason"] = error_text[:200]
self._persist_queue()
will_retry = True
else:
self.queue = [j for j in self.queue if j.get("id") != job_id]
self._persist_queue()
will_retry = False
if will_retry:
self._safe_send(chat_id, f"Ошибка задачи #{job_num}, повтор: {attempts}/{self.cfg.max_retries}", reply_to=message_id)
else:
self._safe_send(chat_id, f"Ошибка задачи #{job_num}. Лимит попыток исчерпан.", reply_to=message_id)
def _mark_job_done(self, job_id: str) -> None:
with self.queue_lock:
self.queue = [j for j in self.queue if j.get("id") != job_id]
self._persist_queue()
def _mark_job_removed(self, job_id: str) -> None:
with self.queue_lock:
self.queue = [j for j in self.queue if j.get("id") != job_id]
self._persist_queue()
def _safe_send(self, chat_id: int, text: str, reply_to: int | None = None) -> None:
text = (text or "").strip()
if not text:
return
if len(text) > 3900:
text = text[:3900] + "\n...[обрезано]"
try:
self.telegram.send_message(chat_id, text, reply_to_message_id=reply_to)
except Exception as e:
print(f"[py-bot] sendMessage error: {e}", flush=True)
def _transcribe_voice_job(self, job: dict[str, Any]) -> str:
if not self.cfg.openai_api_key:
raise RuntimeError("Не задан OPENAI_API_KEY для распознавания voice")
file_id = (job.get("telegram_file_id") or "").strip()
if not file_id:
raise RuntimeError("Пустой telegram_file_id")
file_bytes, filename = self._download_telegram_file(file_id)
text = self._openai_transcribe(file_bytes, filename).strip()
if not text:
raise RuntimeError("Распознавание вернуло пустой текст")
return text
def _download_telegram_file(self, file_id: str) -> tuple[bytes, str]:
result = self.telegram.call("getFile", {"file_id": file_id}, timeout=60)
info = result.get("result") or {}
file_path = info.get("file_path")
if not file_path:
raise RuntimeError("Telegram getFile не вернул file_path")
file_url = f"https://api.telegram.org/file/bot{self.cfg.telegram_bot_token}/{file_path}"
req = request.Request(file_url, method="GET")
with request.urlopen(req, timeout=120) as resp:
data = resp.read()
original_name = Path(file_path).name or "audio.ogg"
lower = original_name.lower()
# OpenAI transcription может не принимать расширение .oga, нормализуем в .ogg.
if lower.endswith(".oga"):
base = original_name[:-4] if len(original_name) > 4 else "audio"
normalized = f"{base}.ogg"
else:
normalized = original_name
return data, normalized
def _openai_transcribe(self, file_bytes: bytes, filename: str) -> str:
boundary = "----shine-boundary-" + "".join(random.choices("abcdef0123456789", k=16))
mime = mimetypes.guess_type(filename)[0] or "application/octet-stream"
def text_part(name: str, value: str) -> bytes:
return (
f"--{boundary}\r\n"
f'Content-Disposition: form-data; name="{name}"\r\n\r\n'
f"{value}\r\n"
).encode("utf-8")
body = bytearray()
body.extend(text_part("model", self.cfg.openai_transcribe_model))
body.extend(text_part("response_format", "text"))
body.extend(
(
f"--{boundary}\r\n"
f'Content-Disposition: form-data; name="file"; filename="{filename}"\r\n'
f"Content-Type: {mime}\r\n\r\n"
).encode("utf-8")
)
body.extend(file_bytes)
body.extend(b"\r\n")
body.extend(f"--{boundary}--\r\n".encode("utf-8"))
req = request.Request("https://api.openai.com/v1/audio/transcriptions", method="POST", data=bytes(body))
req.add_header("Authorization", f"Bearer {self.cfg.openai_api_key}")
req.add_header("Content-Type", f"multipart/form-data; boundary={boundary}")
try:
with request.urlopen(req, timeout=240) as resp:
return resp.read().decode("utf-8", errors="replace")
except error.HTTPError as e:
detail = e.read().decode("utf-8", errors="replace")
raise RuntimeError(f"OpenAI transcribe HTTP {e.code}: {detail}") from e
@staticmethod
def _extract_codex_user_note(line: str) -> str | None:
s = (line or "").strip()
if not s.startswith("{"):
return None
try:
obj = json.loads(s)
except Exception:
return None
if obj.get("type") != "item.completed":
return None
item = obj.get("item") or {}
if item.get("type") != "agent_message":
return None
text = (item.get("text") or "").strip()
if not text:
return None
if len(text) > 220:
return text[:220] + "..."
return text
@staticmethod
def _extract_fallback_message(lines: list[str]) -> str:
for line in reversed(lines):
line = line.strip()
if not line:
continue
if line.startswith("{") and '"type":' in line:
continue
if line.startswith("mcp:") or line.startswith("OpenAI Codex"):
continue
return line
return ""
def run_selftest(config: BotConfig, prompt: str) -> int:
cmd = [
str(config.codex_bin),
"exec",
"--dangerously-bypass-approvals-and-sandbox",
"--json",
"-C", str(config.codex_workdir),
prompt,
]
proc = subprocess.run(
cmd,
stdin=subprocess.DEVNULL,
capture_output=True,
text=True,
encoding="utf-8",
errors="replace",
)
print(proc.stdout)
if proc.stderr:
print(proc.stderr)
return proc.returncode
def main() -> int:
parser = argparse.ArgumentParser(description="SHiNE Python Telegram bot wrapper for Codex CLI")
parser.add_argument("--selftest-codex", default="", help="Выполнить только codex exec с этим prompt и выйти")
args = parser.parse_args()
root = Path(__file__).resolve().parent
cfg = BotConfig(root)
if args.selftest_codex:
return run_selftest(cfg, args.selftest_codex)
service = ShinePyBotService(cfg)
try:
service.run()
except KeyboardInterrupt:
service.shutdown()
return 0
except Exception as e:
print(f"[py-bot] FATAL: {e}", flush=True)
return 1
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@ -0,0 +1,28 @@
#!/usr/bin/env bash
set -euo pipefail
ROOT_DIR="/home/ai/work/SHiNE/SHiNE-server-sha256"
SERVICE_DIR="${ROOT_DIR}/SHiNE-agent-bot-coder"
UNIT_SRC="${SERVICE_DIR}/scripts/systemd/shine-agent-bot-coder.service"
UNIT_DST="${HOME}/.config/systemd/user/shine-agent-bot-coder.service"
echo "[1/6] Проверка python3..."
command -v python3 >/dev/null 2>&1 || { echo "python3 не найден"; exit 1; }
echo "[2/6] Подготовка папки логов..."
mkdir -p "${SERVICE_DIR}/logs"
echo "[3/6] Копирование user systemd unit..."
mkdir -p "$(dirname "${UNIT_DST}")"
cp "${UNIT_SRC}" "${UNIT_DST}"
echo "[4/6] daemon-reload..."
systemctl --user daemon-reload
echo "[5/6] enable + start..."
systemctl --user enable --now shine-agent-bot-coder
echo "[6/6] Статус:"
systemctl --user status shine-agent-bot-coder --no-pager
echo "Готово. Логи: journalctl --user -u shine-agent-bot-coder -f"

View File

@ -0,0 +1,19 @@
[Unit]
Description=SHiNE Agent Bot Coder (Telegram + Codex queue worker)
After=network-online.target
Wants=network-online.target
[Service]
Type=simple
WorkingDirectory=/home/ai/work/SHiNE/SHiNE-server-sha256/SHiNE-agent-bot-coder
EnvironmentFile=/home/ai/work/SHiNE/SHiNE-server-sha256/SHiNE-agent-bot-coder/.env
ExecStart=/usr/bin/python3 /home/ai/work/SHiNE/SHiNE-server-sha256/SHiNE-agent-bot-coder/py_bot_service.py
Restart=always
RestartSec=5
TimeoutStopSec=20
SuccessExitStatus=143 0
StandardOutput=append:/home/ai/work/SHiNE/SHiNE-server-sha256/SHiNE-agent-bot-coder/logs/service.log
StandardError=append:/home/ai/work/SHiNE/SHiNE-server-sha256/SHiNE-agent-bot-coder/logs/service.log
[Install]
WantedBy=default.target

View File

@ -0,0 +1,76 @@
package shine.agent.botcoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.telegram.telegrambots.meta.TelegramBotsApi;
import org.telegram.telegrambots.updatesreceivers.DefaultBotSession;
import shine.agent.botcoder.codex.CodexClient;
import shine.agent.botcoder.config.AppConfig;
import shine.agent.botcoder.history.HistoryManager;
import shine.agent.botcoder.openai.OpenAiTranscriber;
import shine.agent.botcoder.queue.QueueStore;
import shine.agent.botcoder.state.RuntimeStateStore;
import shine.agent.botcoder.state.SingleInstanceLock;
import shine.agent.botcoder.telegram.ProcessedUpdatesStore;
import shine.agent.botcoder.telegram.ShineAgentBot;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class BotCoderApplication {
private static final Logger log = LoggerFactory.getLogger(BotCoderApplication.class);
public static void main(String[] args) throws Exception {
Path serviceRoot = Path.of("").toAbsolutePath().normalize();
AppConfig config = AppConfig.load(serviceRoot);
Files.createDirectories(config.dataDir());
SingleInstanceLock appLock = SingleInstanceLock.tryAcquire(config.dataDir().resolve("app.lock"));
if (appLock == null) {
log.error("SHiNE-agent-bot-coder уже запущен: lock занят {}", config.dataDir().resolve("app.lock"));
return;
}
RuntimeStateStore stateStore = new RuntimeStateStore(config.dataDir().resolve("state.json"));
QueueStore queueStore = new QueueStore(config.dataDir().resolve("queue.jsonl"), stateStore);
HistoryManager historyManager = new HistoryManager(
config.dataDir().resolve("history"),
config.dataDir().resolve("history").resolve("archive"),
stateStore
);
List<String> recovered = queueStore.recoverActiveJobs();
if (!recovered.isEmpty()) {
historyManager.appendSystemEvent("active_jobs_recovered", java.util.Map.of("jobIds", recovered));
}
OpenAiTranscriber transcriber = new OpenAiTranscriber(config.openAiApiKey(), config.openAiTranscribeModel());
CodexClient codexClient = new CodexClient(config.codexBin(), config.codexWorkDir(), config.codexTimeoutSeconds());
ProcessedUpdatesStore processedUpdatesStore = new ProcessedUpdatesStore(
config.dataDir().resolve("processed_updates.log"),
5000
);
ShineAgentBot bot = new ShineAgentBot(config, queueStore, historyManager, transcriber, codexClient, processedUpdatesStore);
bot.startWorkers();
TelegramBotsApi botsApi = new TelegramBotsApi(DefaultBotSession.class);
botsApi.registerBot(bot);
log.info("SHiNE-agent-bot-coder запущен. allowed user: @{}", config.allowedTelegramUsername());
CountDownLatch latch = new CountDownLatch(1);
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
bot.shutdown();
try {
appLock.close();
} catch (Exception e) {
log.warn("Не удалось закрыть lock-файл", e);
}
latch.countDown();
}, "shine-agent-bot-shutdown"));
latch.await();
}
}

View File

@ -0,0 +1,225 @@
package shine.agent.botcoder.codex;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CodexClient {
private static final Logger log = LoggerFactory.getLogger(CodexClient.class);
private final Path codexBin;
private final Path codexWorkDir;
private final int timeoutSeconds;
private final AtomicReference<Process> activeProcess = new AtomicReference<>();
public CodexClient(Path codexBin, Path codexWorkDir, int timeoutSeconds) {
this.codexBin = codexBin;
this.codexWorkDir = codexWorkDir;
this.timeoutSeconds = timeoutSeconds;
}
public String executePrompt(String prompt, CodexStatusListener statusListener) throws IOException, InterruptedException {
Path lastMessageFile = Files.createTempFile("shine-codex-last-message-", ".txt");
List<String> command = new ArrayList<>();
command.add(codexBin.toString());
command.add("exec");
command.add("--dangerously-bypass-approvals-and-sandbox");
command.add("--json");
command.add("-C");
command.add(codexWorkDir.toString());
command.add("-o");
command.add(lastMessageFile.toString());
command.add(prompt);
log.info("Запуск codex exec, bin={}, workdir={}", codexBin, codexWorkDir);
ProcessBuilder builder = new ProcessBuilder(command);
builder.redirectErrorStream(true);
Process process = builder.start();
activeProcess.set(process);
if (statusListener != null) {
statusListener.onStatus("Codex запущен");
}
StringBuilder output = new StringBuilder();
Thread outputThread = new Thread(() -> readOutput(process, output, statusListener));
outputThread.setDaemon(true);
outputThread.start();
boolean finished;
try {
finished = process.waitFor(timeoutSeconds, TimeUnit.SECONDS);
} catch (InterruptedException interrupted) {
process.destroyForcibly();
joinOutputThread(outputThread);
activeProcess.compareAndSet(process, null);
Thread.currentThread().interrupt();
throw interrupted;
}
try {
if (!finished) {
process.destroyForcibly();
joinOutputThread(outputThread);
log.error("Codex timeout after {}s", timeoutSeconds);
throw new IOException("Codex timeout after " + timeoutSeconds + "s");
}
joinOutputThread(outputThread);
int exitCode = process.exitValue();
String lastMessage = "";
if (Files.exists(lastMessageFile)) {
lastMessage = Files.readString(lastMessageFile, StandardCharsets.UTF_8).trim();
}
if (exitCode != 0) {
log.error("Codex exit code={}, outputTail={}", exitCode, tail(output.toString(), 500));
throw new IOException("Codex exited with code " + exitCode + ". Output: " + tail(output.toString(), 1800));
}
if (!lastMessage.isBlank()) {
return lastMessage;
}
String fallback = extractFallbackMessage(output.toString());
if (fallback.isBlank()) {
throw new IOException("Codex returned empty response");
}
return fallback;
} finally {
activeProcess.compareAndSet(process, null);
try {
Files.deleteIfExists(lastMessageFile);
} catch (IOException ignored) {
}
}
}
public boolean stopActiveProcess() {
Process process = activeProcess.getAndSet(null);
if (process == null) {
return false;
}
process.destroy();
try {
if (!process.waitFor(2, TimeUnit.SECONDS)) {
process.destroyForcibly();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
process.destroyForcibly();
}
return true;
}
private void readOutput(Process process, StringBuilder output, CodexStatusListener statusListener) {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
String line;
while ((line = reader.readLine()) != null) {
output.append(line).append('\n');
String status = normalizeStatusLine(line);
if (status != null && statusListener != null) {
statusListener.onStatus(status);
}
}
} catch (Exception ignored) {
}
}
private String normalizeStatusLine(String line) {
String trimmed = line == null ? "" : line.trim();
if (trimmed.isEmpty()) {
return null;
}
if (trimmed.contains("\"type\":\"thread.started\"")) {
return "Codex: инициализировал сессию";
}
if (trimmed.contains("\"type\":\"turn.started\"")) {
return "Codex: начал обработку запроса";
}
if (trimmed.contains("\"type\":\"item.completed\"") && trimmed.contains("\"type\":\"agent_message\"")) {
return "Codex: формирует финальный ответ";
}
if (trimmed.contains("\"type\":\"turn.completed\"")) {
return "Codex: завершил шаг";
}
if (trimmed.contains("\"type\":\"error\"")) {
return "Codex: ошибка выполнения";
}
if (trimmed.contains("\"type\":\"reasoning\"")) {
return "Codex: анализирует задачу";
}
if (trimmed.contains("\"type\":\"function_call\"")) {
return "Codex: вызывает инструмент";
}
if (trimmed.contains("\"type\":\"function_call_output\"")) {
return "Codex: получил результат инструмента";
}
if (trimmed.contains("\"type\":\"message\"") && trimmed.contains("\"role\":\"assistant\"")) {
return "Codex: формирует ответ";
}
if (trimmed.startsWith("mcp")) {
return "Codex: инициализирует MCP";
}
if (trimmed.startsWith("tokens used")) {
return "Codex: завершает обработку";
}
if (trimmed.startsWith("ERROR:")) {
return "Codex: ошибка выполнения";
}
return null;
}
private void joinOutputThread(Thread outputThread) throws InterruptedException {
try {
outputThread.join(Duration.ofSeconds(2).toMillis());
} catch (InterruptedException interrupted) {
Thread.currentThread().interrupt();
throw interrupted;
}
}
private String extractFallbackMessage(String rawOutput) {
String[] lines = rawOutput.split("\\R");
for (int i = lines.length - 1; i >= 0; i--) {
String line = lines[i].trim();
if (line.isEmpty()) {
continue;
}
if (line.startsWith("tokens used")) {
continue;
}
if (line.startsWith("OpenAI Codex")) {
continue;
}
if (line.startsWith("workdir:") || line.startsWith("model:") || line.startsWith("provider:")) {
continue;
}
if (line.startsWith("approval:") || line.startsWith("sandbox:") || line.startsWith("reasoning")) {
continue;
}
if (line.equals("user") || line.equals("exec") || line.equals("--------")) {
continue;
}
return line;
}
return "";
}
private String tail(String value, int maxLen) {
if (value.length() <= maxLen) {
return value;
}
return value.substring(value.length() - maxLen);
}
}

View File

@ -0,0 +1,5 @@
package shine.agent.botcoder.codex;
public interface CodexStatusListener {
void onStatus(String message);
}

View File

@ -0,0 +1,88 @@
package shine.agent.botcoder.config;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Map;
public record AppConfig(
String telegramBotToken,
String botUsername,
String allowedTelegramUsername,
String openAiApiKey,
String openAiTranscribeModel,
Path codexBin,
Path codexWorkDir,
int codexTimeoutSeconds,
int maxRetries,
Path dataDir,
Path agentInstructionsFile
) {
public static AppConfig load(Path serviceRoot) throws IOException {
Map<String, String> env = EnvLoader.load(serviceRoot.resolve(".env"));
String telegramBotToken = required(env, "TELEGRAM_BOT_TOKEN");
String openAiApiKey = required(env, "OPENAI_API_KEY");
String botUsername = env.getOrDefault("BOT_USERNAME", "aidar_su_bot");
String allowed = normalizeUsername(env.getOrDefault("ALLOWED_TELEGRAM_USERNAME", "AidarKC"));
String transcribeModel = env.getOrDefault("OPENAI_TRANSCRIBE_MODEL", "gpt-4o-mini-transcribe");
Path codexBin = Path.of(env.getOrDefault(
"CODEX_BIN",
"/home/ai/.cache/JetBrains/IntelliJIdea2026.1/aia/codex/bin/codex-x86_64-unknown-linux-musl"
));
Path codexWorkDir = Path.of(env.getOrDefault(
"CODEX_WORKDIR",
"/home/ai/work/SHiNE/SHiNE-server-sha256"
));
int timeout = parseInt(env.getOrDefault("CODEX_TIMEOUT_SECONDS", "900"), 900);
int retries = parseInt(env.getOrDefault("MAX_RETRIES", "3"), 3);
if (retries < 1) {
retries = 1;
}
Path dataDir = serviceRoot.resolve(env.getOrDefault("DATA_DIR", "./data")).normalize();
Path agentInstructions = serviceRoot.resolve("AGENT.md").normalize();
return new AppConfig(
telegramBotToken,
botUsername,
allowed,
openAiApiKey,
transcribeModel,
codexBin,
codexWorkDir,
timeout,
retries,
dataDir,
agentInstructions
);
}
public static String normalizeUsername(String value) {
if (value == null) {
return "";
}
String normalized = value.trim();
if (normalized.startsWith("@")) {
normalized = normalized.substring(1);
}
return normalized.toLowerCase();
}
private static String required(Map<String, String> env, String key) {
String value = env.get(key);
if (value == null || value.isBlank()) {
throw new IllegalArgumentException("Не задан обязательный параметр: " + key);
}
return value.trim();
}
private static int parseInt(String value, int fallback) {
try {
return Integer.parseInt(value.trim());
} catch (Exception ignored) {
return fallback;
}
}
}

View File

@ -0,0 +1,45 @@
package shine.agent.botcoder.config;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
public final class EnvLoader {
private EnvLoader() {
}
public static Map<String, String> load(Path envFile) throws IOException {
Map<String, String> values = new HashMap<>();
if (Files.exists(envFile)) {
for (String rawLine : Files.readAllLines(envFile)) {
String line = rawLine.trim();
if (line.isEmpty() || line.startsWith("#")) {
continue;
}
int idx = line.indexOf('=');
if (idx <= 0) {
continue;
}
String key = line.substring(0, idx).trim();
String value = line.substring(idx + 1).trim();
values.put(key, stripQuotes(value));
}
}
System.getenv().forEach(values::put);
return values;
}
private static String stripQuotes(String value) {
if (value.length() >= 2) {
char first = value.charAt(0);
char last = value.charAt(value.length() - 1);
if ((first == '"' && last == '"') || (first == '\'' && last == '\'')) {
return value.substring(1, value.length() - 1);
}
}
return value;
}
}

View File

@ -0,0 +1,162 @@
package shine.agent.botcoder.history;
import com.fasterxml.jackson.databind.ObjectMapper;
import shine.agent.botcoder.state.RuntimeState;
import shine.agent.botcoder.state.RuntimeStateStore;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
public class HistoryManager {
private static final DateTimeFormatter FILE_TS = DateTimeFormatter.ofPattern("yyyy-MM-dd_HHmmss");
private final Path historyDir;
private final Path archiveDir;
private final RuntimeStateStore stateStore;
private final ObjectMapper mapper;
public HistoryManager(Path historyDir, Path archiveDir, RuntimeStateStore stateStore) throws IOException {
this.historyDir = historyDir;
this.archiveDir = archiveDir;
this.stateStore = stateStore;
this.mapper = new ObjectMapper();
Files.createDirectories(historyDir);
Files.createDirectories(archiveDir);
ensureCurrentFile();
}
public synchronized Path currentHistoryFile() throws IOException {
return ensureCurrentFile();
}
public synchronized Path rotateHistory(String reason, String requestedBy) throws IOException {
Path current = ensureCurrentFile();
Path archived = archiveDir.resolve(current.getFileName().toString());
Files.move(current, archived, StandardCopyOption.REPLACE_EXISTING);
Path next = newHistoryFile();
appendSystemEvent(
"history_rotated",
Map.of(
"reason", reason,
"requestedBy", requestedBy,
"archivedFile", archived.toString()
)
);
return next;
}
public synchronized void appendIncomingText(long chatId, int messageId, String username, String text) throws IOException {
Map<String, Object> payload = basePayload("incoming_text");
payload.put("chatId", chatId);
payload.put("messageId", messageId);
payload.put("username", username);
payload.put("text", text);
append(payload);
}
public synchronized void appendIncomingVoice(long chatId, int messageId, String username, String fileId) throws IOException {
Map<String, Object> payload = basePayload("incoming_voice");
payload.put("chatId", chatId);
payload.put("messageId", messageId);
payload.put("username", username);
payload.put("telegramFileId", fileId);
append(payload);
}
public synchronized void appendTranscription(String jobId, String text) throws IOException {
Map<String, Object> payload = basePayload("voice_transcription");
payload.put("jobId", jobId);
payload.put("text", text);
append(payload);
}
public synchronized void appendCodexRequest(String jobId, String prompt) throws IOException {
Map<String, Object> payload = basePayload("codex_request");
payload.put("jobId", jobId);
payload.put("prompt", prompt);
append(payload);
}
public synchronized void appendCodexResponse(String jobId, String response) throws IOException {
Map<String, Object> payload = basePayload("codex_response");
payload.put("jobId", jobId);
payload.put("response", response);
append(payload);
}
public synchronized void appendOutgoingMessage(String jobId, long chatId, String text) throws IOException {
Map<String, Object> payload = basePayload("outgoing_message");
payload.put("jobId", jobId);
payload.put("chatId", chatId);
payload.put("text", text);
append(payload);
}
public synchronized void appendJobError(String jobId, String error, boolean willRetry, int attempts, int maxRetries) throws IOException {
Map<String, Object> payload = basePayload("job_error");
payload.put("jobId", jobId);
payload.put("error", error);
payload.put("willRetry", willRetry);
payload.put("attempts", attempts);
payload.put("maxRetries", maxRetries);
append(payload);
}
public synchronized void appendSystemEvent(String event, Map<String, Object> fields) throws IOException {
Map<String, Object> payload = basePayload(event);
payload.putAll(fields);
append(payload);
}
private Map<String, Object> basePayload(String type) {
Map<String, Object> payload = new HashMap<>();
payload.put("timestamp", Instant.now().toString());
payload.put("type", type);
return payload;
}
private void append(Map<String, Object> payload) throws IOException {
Path current = ensureCurrentFile();
Files.writeString(
current,
mapper.writeValueAsString(payload) + System.lineSeparator(),
StandardCharsets.UTF_8,
StandardOpenOption.CREATE,
StandardOpenOption.APPEND
);
}
private Path ensureCurrentFile() throws IOException {
RuntimeState snapshot = stateStore.snapshot();
if (snapshot.currentHistoryFile != null && !snapshot.currentHistoryFile.isBlank()) {
Path configured = Path.of(snapshot.currentHistoryFile);
if (!configured.isAbsolute()) {
configured = historyDir.resolve(snapshot.currentHistoryFile).normalize();
}
Files.createDirectories(configured.getParent());
if (!Files.exists(configured)) {
Files.createFile(configured);
}
return configured;
}
return newHistoryFile();
}
private Path newHistoryFile() throws IOException {
String name = FILE_TS.format(LocalDateTime.now()) + "_" + UUID.randomUUID().toString().substring(0, 8) + ".jsonl";
Path file = historyDir.resolve(name);
Files.createFile(file);
stateStore.setCurrentHistoryFile(file.toString());
return file;
}
}

View File

@ -0,0 +1,81 @@
package shine.agent.botcoder.openai;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.UUID;
public class OpenAiTranscriber {
private final HttpClient httpClient;
private final ObjectMapper mapper;
private final String apiKey;
private final String model;
public OpenAiTranscriber(String apiKey, String model) {
this.apiKey = apiKey;
this.model = model;
this.httpClient = HttpClient.newBuilder()
.connectTimeout(Duration.ofSeconds(20))
.build();
this.mapper = new ObjectMapper();
}
public String transcribe(byte[] audioBytes, String fileName) throws IOException, InterruptedException {
String boundary = "----shine-boundary-" + UUID.randomUUID();
byte[] body = buildMultipartBody(boundary, audioBytes, fileName);
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create("https://api.openai.com/v1/audio/transcriptions"))
.timeout(Duration.ofSeconds(120))
.header("Authorization", "Bearer " + apiKey)
.header("Content-Type", "multipart/form-data; boundary=" + boundary)
.POST(HttpRequest.BodyPublishers.ofByteArray(body))
.build();
HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString(StandardCharsets.UTF_8));
if (response.statusCode() >= 300) {
throw new IOException("OpenAI transcription error HTTP " + response.statusCode() + ": " + response.body());
}
JsonNode root = mapper.readTree(response.body());
JsonNode text = root.get("text");
if (text == null || text.asText().isBlank()) {
throw new IOException("OpenAI transcription returned empty text");
}
return text.asText().trim();
}
private byte[] buildMultipartBody(String boundary, byte[] audioBytes, String fileName) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
String lineEnd = "\r\n";
String prefix = "--" + boundary + lineEnd;
out.write(prefix.getBytes(StandardCharsets.UTF_8));
out.write(("Content-Disposition: form-data; name=\"model\"" + lineEnd + lineEnd).getBytes(StandardCharsets.UTF_8));
out.write(model.getBytes(StandardCharsets.UTF_8));
out.write(lineEnd.getBytes(StandardCharsets.UTF_8));
out.write(prefix.getBytes(StandardCharsets.UTF_8));
out.write(("Content-Disposition: form-data; name=\"language\"" + lineEnd + lineEnd).getBytes(StandardCharsets.UTF_8));
out.write("ru".getBytes(StandardCharsets.UTF_8));
out.write(lineEnd.getBytes(StandardCharsets.UTF_8));
out.write(prefix.getBytes(StandardCharsets.UTF_8));
out.write(("Content-Disposition: form-data; name=\"file\"; filename=\"" + fileName + "\"" + lineEnd).getBytes(StandardCharsets.UTF_8));
out.write(("Content-Type: audio/ogg" + lineEnd + lineEnd).getBytes(StandardCharsets.UTF_8));
out.write(audioBytes);
out.write(lineEnd.getBytes(StandardCharsets.UTF_8));
out.write(("--" + boundary + "--" + lineEnd).getBytes(StandardCharsets.UTF_8));
return out.toByteArray();
}
}

View File

@ -0,0 +1,4 @@
package shine.agent.botcoder.queue;
public record FailureResult(boolean willRetry, int attempts, int maxRetries) {
}

View File

@ -0,0 +1,54 @@
package shine.agent.botcoder.queue;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import java.time.Instant;
import java.util.UUID;
@JsonIgnoreProperties(ignoreUnknown = true)
public class QueueJob {
public String id;
public QueueStatus status;
public String type;
public long chatId;
public int messageId;
public String username;
public String text;
public String telegramFileId;
public String historyFile;
public String createdAt;
public String updatedAt;
public String activeSince;
public int attempts;
public String retryReason;
public String lastError;
public static QueueJob textJob(long chatId, int messageId, String username, String text, String historyFile) {
QueueJob job = baseJob(chatId, messageId, username, historyFile);
job.type = "text";
job.text = text;
return job;
}
public static QueueJob voiceJob(long chatId, int messageId, String username, String fileId, String historyFile) {
QueueJob job = baseJob(chatId, messageId, username, historyFile);
job.type = "voice";
job.telegramFileId = fileId;
return job;
}
private static QueueJob baseJob(long chatId, int messageId, String username, String historyFile) {
QueueJob job = new QueueJob();
String now = Instant.now().toString();
job.id = UUID.randomUUID().toString();
job.status = QueueStatus.PENDING;
job.chatId = chatId;
job.messageId = messageId;
job.username = username;
job.historyFile = historyFile;
job.createdAt = now;
job.updatedAt = now;
job.attempts = 0;
return job;
}
}

View File

@ -0,0 +1,6 @@
package shine.agent.botcoder.queue;
public enum QueueStatus {
PENDING,
ACTIVE
}

View File

@ -0,0 +1,203 @@
package shine.agent.botcoder.queue;
import com.fasterxml.jackson.databind.ObjectMapper;
import shine.agent.botcoder.state.RuntimeStateStore;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
public class QueueStore {
private final Path queueFile;
private final RuntimeStateStore stateStore;
private final ObjectMapper mapper;
private final List<QueueJob> jobs;
public QueueStore(Path queueFile, RuntimeStateStore stateStore) throws IOException {
this.queueFile = queueFile;
this.stateStore = stateStore;
this.mapper = new ObjectMapper();
Path parent = queueFile.getParent();
if (parent != null) {
Files.createDirectories(parent);
}
this.jobs = loadQueue();
persistQueue();
}
public synchronized void enqueue(QueueJob job) throws IOException {
jobs.add(job);
persistQueue();
}
public synchronized List<String> recoverActiveJobs() throws IOException {
List<String> recovered = new ArrayList<>();
String now = Instant.now().toString();
for (QueueJob job : jobs) {
if (job.status == QueueStatus.ACTIVE) {
job.status = QueueStatus.PENDING;
job.retryReason = "service_restart_recovery";
job.updatedAt = now;
recovered.add(job.id);
}
}
stateStore.setActiveJobId(null);
persistQueue();
return recovered;
}
public synchronized Optional<QueueJob> activateNext() throws IOException {
for (QueueJob job : jobs) {
if (job.status == QueueStatus.PENDING) {
job.status = QueueStatus.ACTIVE;
job.activeSince = Instant.now().toString();
job.updatedAt = job.activeSince;
stateStore.setActiveJobId(job.id);
persistQueue();
return Optional.of(job);
}
}
return Optional.empty();
}
public synchronized void markDone(String jobId) throws IOException {
Iterator<QueueJob> iterator = jobs.iterator();
while (iterator.hasNext()) {
QueueJob job = iterator.next();
if (job.id.equals(jobId)) {
iterator.remove();
break;
}
}
stateStore.setActiveJobId(null);
persistQueue();
}
public synchronized Optional<QueueJob> getActiveJob() {
return jobs.stream().filter(j -> j.status == QueueStatus.ACTIVE).findFirst();
}
public synchronized int pendingCount() {
int count = 0;
for (QueueJob job : jobs) {
if (job.status == QueueStatus.PENDING) {
count++;
}
}
return count;
}
public synchronized int totalCount() {
return jobs.size();
}
public synchronized List<QueueJob> snapshot() {
return new ArrayList<>(jobs);
}
public synchronized boolean cancelActiveJob(String reason) throws IOException {
for (Iterator<QueueJob> iterator = jobs.iterator(); iterator.hasNext(); ) {
QueueJob job = iterator.next();
if (job.status == QueueStatus.ACTIVE) {
iterator.remove();
stateStore.setActiveJobId(null);
persistQueue();
return true;
}
}
return false;
}
public synchronized int cancelAll(String reason) throws IOException {
int size = jobs.size();
if (size == 0) {
return 0;
}
jobs.clear();
stateStore.setActiveJobId(null);
persistQueue();
return size;
}
public synchronized boolean cancelByIdPrefix(String idPrefix) throws IOException {
if (idPrefix == null || idPrefix.isBlank()) {
return false;
}
String normalized = idPrefix.trim().toLowerCase();
for (Iterator<QueueJob> iterator = jobs.iterator(); iterator.hasNext(); ) {
QueueJob job = iterator.next();
if (job.id != null && job.id.toLowerCase().startsWith(normalized)) {
if (job.status == QueueStatus.ACTIVE) {
stateStore.setActiveJobId(null);
}
iterator.remove();
persistQueue();
return true;
}
}
return false;
}
public synchronized FailureResult markFailed(String jobId, String error, int maxRetries) throws IOException {
for (Iterator<QueueJob> it = jobs.iterator(); it.hasNext(); ) {
QueueJob job = it.next();
if (!job.id.equals(jobId)) {
continue;
}
job.attempts = job.attempts + 1;
job.lastError = error;
job.updatedAt = Instant.now().toString();
stateStore.setActiveJobId(null);
if (job.attempts < maxRetries) {
job.status = QueueStatus.PENDING;
job.retryReason = error;
persistQueue();
return new FailureResult(true, job.attempts, maxRetries);
}
it.remove();
persistQueue();
return new FailureResult(false, job.attempts, maxRetries);
}
stateStore.setActiveJobId(null);
persistQueue();
return new FailureResult(false, maxRetries, maxRetries);
}
private List<QueueJob> loadQueue() throws IOException {
List<QueueJob> loaded = new ArrayList<>();
if (!Files.exists(queueFile)) {
return loaded;
}
for (String line : Files.readAllLines(queueFile, StandardCharsets.UTF_8)) {
String trimmed = line.trim();
if (trimmed.isEmpty()) {
continue;
}
loaded.add(mapper.readValue(trimmed, QueueJob.class));
}
return loaded;
}
private void persistQueue() throws IOException {
Files.writeString(queueFile, "", StandardCharsets.UTF_8);
for (QueueJob job : jobs) {
Files.writeString(
queueFile,
mapper.writeValueAsString(job) + System.lineSeparator(),
StandardCharsets.UTF_8,
StandardOpenOption.APPEND
);
}
}
}

View File

@ -0,0 +1,10 @@
package shine.agent.botcoder.state;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
@JsonIgnoreProperties(ignoreUnknown = true)
public class RuntimeState {
public String activeJobId;
public String currentHistoryFile;
public String updatedAt;
}

View File

@ -0,0 +1,75 @@
package shine.agent.botcoder.state;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;
public class RuntimeStateStore {
private final Path stateFile;
private final ObjectMapper mapper;
private RuntimeState state;
public RuntimeStateStore(Path stateFile) throws IOException {
this.stateFile = stateFile;
this.mapper = new ObjectMapper().enable(SerializationFeature.INDENT_OUTPUT);
Path parent = stateFile.getParent();
if (parent != null) {
Files.createDirectories(parent);
}
this.state = loadOrCreate();
persist();
}
public synchronized RuntimeState snapshot() {
RuntimeState copy = new RuntimeState();
copy.activeJobId = state.activeJobId;
copy.currentHistoryFile = state.currentHistoryFile;
copy.updatedAt = state.updatedAt;
return copy;
}
public synchronized void setActiveJobId(String activeJobId) throws IOException {
state.activeJobId = activeJobId;
state.updatedAt = Instant.now().toString();
persist();
}
public synchronized void setCurrentHistoryFile(String historyFile) throws IOException {
state.currentHistoryFile = historyFile;
state.updatedAt = Instant.now().toString();
persist();
}
private RuntimeState loadOrCreate() throws IOException {
if (!Files.exists(stateFile)) {
RuntimeState created = new RuntimeState();
created.updatedAt = Instant.now().toString();
return created;
}
String raw = Files.readString(stateFile, StandardCharsets.UTF_8).trim();
if (raw.isEmpty()) {
RuntimeState created = new RuntimeState();
created.updatedAt = Instant.now().toString();
return created;
}
RuntimeState loaded = mapper.readValue(raw, RuntimeState.class);
if (loaded.updatedAt == null) {
loaded.updatedAt = Instant.now().toString();
}
return loaded;
}
private void persist() throws IOException {
Files.writeString(
stateFile,
mapper.writeValueAsString(state),
StandardCharsets.UTF_8
);
}
}

View File

@ -0,0 +1,50 @@
package shine.agent.botcoder.state;
import java.io.Closeable;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
public final class SingleInstanceLock implements Closeable {
private final FileChannel channel;
private final FileLock lock;
private final Path path;
private SingleInstanceLock(FileChannel channel, FileLock lock, Path path) {
this.channel = channel;
this.lock = lock;
this.path = path;
}
public static SingleInstanceLock tryAcquire(Path lockFile) throws IOException {
FileChannel channel = FileChannel.open(
lockFile,
StandardOpenOption.CREATE,
StandardOpenOption.WRITE
);
FileLock lock = channel.tryLock();
if (lock == null) {
channel.close();
return null;
}
return new SingleInstanceLock(channel, lock, lockFile);
}
public Path path() {
return path;
}
@Override
public void close() throws IOException {
try {
if (lock != null && lock.isValid()) {
lock.release();
}
} finally {
channel.close();
}
}
}

View File

@ -0,0 +1,75 @@
package shine.agent.botcoder.telegram;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Iterator;
public class ProcessedUpdatesStore {
private final Path file;
private final LinkedHashSet<String> ids = new LinkedHashSet<>();
private final int maxEntries;
public ProcessedUpdatesStore(Path file, int maxEntries) throws IOException {
this.file = file;
this.maxEntries = Math.max(100, maxEntries);
Path parent = file.getParent();
if (parent != null) {
Files.createDirectories(parent);
}
if (Files.exists(file)) {
List<String> lines = Files.readAllLines(file, StandardCharsets.UTF_8);
for (String line : lines) {
String id = line.trim();
if (!id.isEmpty()) {
ids.add(id);
}
}
}
trimIfNeeded();
persistAll();
}
public synchronized boolean isDuplicateAndMark(String id) throws IOException {
if (id == null || id.isBlank()) {
return false;
}
String normalized = id.trim();
if (ids.contains(normalized)) {
return true;
}
ids.add(normalized);
trimIfNeeded();
Files.writeString(file, normalized + System.lineSeparator(), StandardCharsets.UTF_8,
StandardOpenOption.CREATE, StandardOpenOption.APPEND);
return false;
}
private void trimIfNeeded() throws IOException {
if (ids.size() <= maxEntries) {
return;
}
int toRemove = ids.size() - maxEntries;
int removed = 0;
Iterator<String> it = ids.iterator();
while (it.hasNext() && removed < toRemove) {
it.next();
it.remove();
removed++;
}
persistAll();
}
private void persistAll() throws IOException {
StringBuilder sb = new StringBuilder();
for (String id : ids) {
sb.append(id).append(System.lineSeparator());
}
Files.writeString(file, sb.toString(), StandardCharsets.UTF_8);
}
}

View File

@ -0,0 +1,589 @@
package shine.agent.botcoder.telegram;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.telegram.telegrambots.bots.TelegramLongPollingBot;
import org.telegram.telegrambots.meta.api.methods.GetFile;
import org.telegram.telegrambots.meta.api.methods.send.SendMessage;
import org.telegram.telegrambots.meta.api.objects.Message;
import org.telegram.telegrambots.meta.api.objects.Update;
import org.telegram.telegrambots.meta.api.objects.User;
import org.telegram.telegrambots.meta.exceptions.TelegramApiException;
import shine.agent.botcoder.codex.CodexStatusListener;
import shine.agent.botcoder.codex.CodexClient;
import shine.agent.botcoder.config.AppConfig;
import shine.agent.botcoder.history.HistoryManager;
import shine.agent.botcoder.openai.OpenAiTranscriber;
import shine.agent.botcoder.queue.FailureResult;
import shine.agent.botcoder.queue.QueueJob;
import shine.agent.botcoder.queue.QueueStore;
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
public class ShineAgentBot extends TelegramLongPollingBot {
private static final Logger log = LoggerFactory.getLogger(ShineAgentBot.class);
private final AppConfig config;
private final QueueStore queueStore;
private final HistoryManager historyManager;
private final OpenAiTranscriber transcriber;
private final CodexClient codexClient;
private final ExecutorService worker;
private final ExecutorService notifier;
private final AtomicBoolean running;
private final HttpClient httpClient;
private final ProcessedUpdatesStore processedUpdatesStore;
private final AtomicReference<QueueJob> activeJobRef = new AtomicReference<>();
private final AtomicLong activeJobStartedAt = new AtomicLong(0L);
private final ScheduledExecutorService heartbeatScheduler = Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = new Thread(r, "shine-agent-heartbeat");
t.setDaemon(true);
return t;
});
public ShineAgentBot(
AppConfig config,
QueueStore queueStore,
HistoryManager historyManager,
OpenAiTranscriber transcriber,
CodexClient codexClient,
ProcessedUpdatesStore processedUpdatesStore
) {
this.config = config;
this.queueStore = queueStore;
this.historyManager = historyManager;
this.transcriber = transcriber;
this.codexClient = codexClient;
this.processedUpdatesStore = processedUpdatesStore;
this.worker = Executors.newSingleThreadExecutor(r -> {
Thread thread = new Thread(r, "shine-agent-bot-worker");
thread.setDaemon(true);
return thread;
});
this.notifier = Executors.newSingleThreadExecutor(r -> {
Thread thread = new Thread(r, "shine-agent-bot-notifier");
thread.setDaemon(true);
return thread;
});
this.running = new AtomicBoolean(true);
this.httpClient = HttpClient.newBuilder()
.connectTimeout(Duration.ofSeconds(20))
.build();
}
public void startWorkers() {
worker.submit(this::processLoop);
}
public void shutdown() {
running.set(false);
codexClient.stopActiveProcess();
worker.shutdown();
notifier.shutdown();
heartbeatScheduler.shutdownNow();
try {
if (!worker.awaitTermination(10, TimeUnit.SECONDS)) {
worker.shutdownNow();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
worker.shutdownNow();
}
try {
if (!notifier.awaitTermination(5, TimeUnit.SECONDS)) {
notifier.shutdownNow();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
notifier.shutdownNow();
}
}
@Override
public String getBotUsername() {
return config.botUsername();
}
@Override
public String getBotToken() {
return config.telegramBotToken();
}
@Override
public void onUpdateReceived(Update update) {
if (update == null || !update.hasMessage()) {
return;
}
Message message = update.getMessage();
try {
String updateId = message.getChatId() + ":" + message.getMessageId();
if (processedUpdatesStore.isDuplicateAndMark(updateId)) {
log.info("Дубликат update пропущен: {}", updateId);
return;
}
} catch (IOException e) {
log.error("Не удалось проверить дубликат update", e);
}
User from = message.getFrom();
if (from == null) {
return;
}
String username = AppConfig.normalizeUsername(from.getUserName());
if (!username.equals(config.allowedTelegramUsername())) {
return;
}
try {
if (message.hasText() && "/new".equalsIgnoreCase(message.getText().trim())) {
handleNewCommand(message, username);
return;
}
if (message.hasText() && handleControlCommand(message)) {
return;
}
if (message.hasText() && !message.getText().isBlank()) {
enqueueText(message, username);
return;
}
if (message.hasVoice()) {
enqueueVoice(message, username);
return;
}
if (message.hasAudio()) {
enqueueAudio(message, username);
}
} catch (Exception e) {
log.error("Ошибка обработки update", e);
safeSendText(message.getChatId(), "Ошибка обработки входящего сообщения: " + shortError(e), message.getMessageId());
}
}
private void handleNewCommand(Message message, String username) throws IOException {
historyManager.appendSystemEvent(
"command_new_received",
Map.of(
"chatId", message.getChatId(),
"messageId", message.getMessageId(),
"username", username
)
);
var archived = historyManager.rotateHistory("command_new", username);
String response = "История очищена. Новый диалог начат.\nАрхив: " + archived.getFileName();
safeSendText(message.getChatId(), response, message.getMessageId());
}
private void enqueueText(Message message, String username) throws IOException {
QueueJob job = QueueJob.textJob(
message.getChatId(),
message.getMessageId(),
username,
message.getText(),
historyManager.currentHistoryFile().toString()
);
historyManager.appendIncomingText(message.getChatId(), message.getMessageId(), username, message.getText());
queueStore.enqueue(job);
safeSendText(message.getChatId(), "Принял в очередь: " + shortId(job.id), message.getMessageId());
}
private void enqueueVoice(Message message, String username) throws IOException {
String fileId = message.getVoice().getFileId();
QueueJob job = QueueJob.voiceJob(
message.getChatId(),
message.getMessageId(),
username,
fileId,
historyManager.currentHistoryFile().toString()
);
historyManager.appendIncomingVoice(message.getChatId(), message.getMessageId(), username, fileId);
queueStore.enqueue(job);
safeSendText(message.getChatId(), "Голосовое принято в очередь: " + shortId(job.id), message.getMessageId());
}
private void enqueueAudio(Message message, String username) throws IOException {
String fileId = message.getAudio().getFileId();
QueueJob job = QueueJob.voiceJob(
message.getChatId(),
message.getMessageId(),
username,
fileId,
historyManager.currentHistoryFile().toString()
);
historyManager.appendIncomingVoice(message.getChatId(), message.getMessageId(), username, fileId);
queueStore.enqueue(job);
safeSendText(message.getChatId(), "Аудио принято в очередь: " + shortId(job.id), message.getMessageId());
}
private void processLoop() {
while (running.get()) {
try {
Optional<QueueJob> next = queueStore.activateNext();
if (next.isEmpty()) {
Thread.sleep(500);
continue;
}
processJob(next.get());
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
return;
} catch (Exception e) {
log.error("Ошибка worker loop", e);
try {
Thread.sleep(1000);
} catch (InterruptedException interrupted) {
Thread.currentThread().interrupt();
return;
}
}
}
}
private void processJob(QueueJob job) {
ScheduledFuture<?> heartbeat = null;
try {
log.info("Начало обработки jobId={}, type={}, chatId={}, attempts={}", job.id, job.type, job.chatId, job.attempts);
activeJobRef.set(job);
activeJobStartedAt.set(System.currentTimeMillis());
safeSendText(job.chatId, "Задача " + shortId(job.id) + " взята в работу.", job.messageId);
String userText = resolveUserText(job);
String prompt = buildPrompt(job, userText);
historyManager.appendCodexRequest(job.id, prompt);
log.info("Вызов Codex для jobId={}", job.id);
heartbeat = heartbeatScheduler.scheduleAtFixedRate(
() -> notifier.submit(() ->
safeSendText(job.chatId, "Статус " + shortId(job.id) + ": в работе " + elapsedSeconds() + "с", job.messageId)
),
30, 30, TimeUnit.SECONDS
);
String answer;
answer = codexClient.executePrompt(prompt, buildStatusListener(job));
log.info("Codex завершился для jobId={}, длина ответа={}", job.id, answer.length());
safeSendText(job.chatId, "Codex завершил обработку, отправляю результат.", job.messageId);
sendLongMessage(job.chatId, answer, job.messageId);
historyManager.appendCodexResponse(job.id, answer);
historyManager.appendOutgoingMessage(job.id, job.chatId, answer);
queueStore.markDone(job.id);
log.info("Задача завершена jobId={}", job.id);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
handleInterruptedJob(job, e);
} catch (Exception e) {
handleFailedJob(job, e);
} finally {
if (heartbeat != null) {
heartbeat.cancel(true);
}
activeJobRef.set(null);
activeJobStartedAt.set(0L);
}
}
private void handleInterruptedJob(QueueJob job, InterruptedException e) {
if (!running.get()) {
log.info("Обработка jobId={} прервана из-за остановки сервиса", job.id);
try {
historyManager.appendSystemEvent("job_interrupted_on_shutdown", Map.of(
"jobId", job.id,
"reason", shortError(e)
));
} catch (Exception ignored) {
}
return;
}
handleFailedJob(job, e);
}
private void handleFailedJob(QueueJob job, Exception e) {
String error = shortError(e);
log.error("Ошибка обработки jobId={}: {}", job.id, error, e);
try {
if (!isJobStillInQueue(job.id)) {
log.info("Задача {} уже удалена из очереди, ошибка не ретраится", job.id);
return;
}
FailureResult failure = queueStore.markFailed(job.id, error, config.maxRetries());
historyManager.appendJobError(job.id, error, failure.willRetry(), failure.attempts(), failure.maxRetries());
String message = failure.willRetry()
? "Ошибка выполнения задачи " + shortId(job.id) + ", повтор: " + failure.attempts() + "/" + failure.maxRetries()
: "Ошибка выполнения задачи " + shortId(job.id) + ". Лимит попыток исчерпан.";
safeSendText(job.chatId, message, job.messageId);
} catch (Exception inner) {
log.error("Не удалось зафиксировать ошибку задачи {}", job.id, inner);
}
}
private boolean isJobStillInQueue(String jobId) {
for (QueueJob item : queueStore.snapshot()) {
if (jobId.equals(item.id)) {
return true;
}
}
return false;
}
private CodexStatusListener buildStatusListener(QueueJob job) {
AtomicReference<String> lastStatus = new AtomicReference<>("");
AtomicLong lastSentAt = new AtomicLong(0L);
return status -> {
long now = System.currentTimeMillis();
String prev = lastStatus.get();
boolean changed = !status.equals(prev);
boolean heartbeatDue = now - lastSentAt.get() > 30_000;
if (changed || heartbeatDue) {
String text = changed
? "Статус " + shortId(job.id) + ": " + status
: "Статус " + shortId(job.id) + ": в работе " + elapsedSeconds() + "с";
notifier.submit(() -> safeSendText(job.chatId, text, job.messageId));
lastStatus.set(status);
lastSentAt.set(now);
}
};
}
private String resolveUserText(QueueJob job) throws IOException, InterruptedException, TelegramApiException {
if (!"voice".equals(job.type)) {
return job.text;
}
byte[] audio = downloadTelegramFile(job.telegramFileId);
String transcription = transcriber.transcribe(audio, job.id + ".ogg");
historyManager.appendTranscription(job.id, transcription);
safeSendText(job.chatId, "Распознано:\n" + transcription, job.messageId);
return transcription;
}
private byte[] downloadTelegramFile(String fileId) throws IOException, InterruptedException, TelegramApiException {
GetFile getFile = new GetFile(fileId);
org.telegram.telegrambots.meta.api.objects.File tgFile = execute(getFile);
String fileUrl = tgFile.getFileUrl(getBotToken());
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(fileUrl))
.timeout(Duration.ofSeconds(60))
.GET()
.build();
HttpResponse<byte[]> response = httpClient.send(request, HttpResponse.BodyHandlers.ofByteArray());
if (response.statusCode() >= 300) {
throw new IOException("Telegram file download failed HTTP " + response.statusCode());
}
return response.body();
}
private String buildPrompt(QueueJob job, String text) {
String retryBlock = "";
if (job.retryReason != null && !job.retryReason.isBlank()) {
retryBlock = "\n\ометка retry: " + job.retryReason;
}
return """
Пришло сообщение в Telegram.
Тип: %s
Username отправителя: @%s
Текст для обработки:
%s
История диалога (JSONL): %s
Инструкции агента: %s
Работай в рабочем проекте аккуратно и верни только текст ответа пользователю.%s
""".formatted(
job.type,
job.username,
text,
job.historyFile,
config.agentInstructionsFile(),
retryBlock
);
}
private boolean handleControlCommand(Message message) throws IOException {
String text = message.getText().trim();
String lower = text.toLowerCase();
if ("/start".equals(lower) || "/help".equals(lower)) {
safeSendText(message.getChatId(), helpText(), message.getMessageId());
return true;
}
if ("/status".equals(lower)) {
safeSendText(message.getChatId(), buildStatusText(), message.getMessageId());
return true;
}
if ("/queue".equals(lower)) {
safeSendText(message.getChatId(), buildQueueText(), message.getMessageId());
return true;
}
if ("/stop".equals(lower)) {
boolean stopped = codexClient.stopActiveProcess();
if (stopped) {
queueStore.cancelActiveJob("stopped_by_user");
historyManager.appendSystemEvent("job_stopped_by_user", Map.of(
"timestamp", Instant.now().toString()
));
safeSendText(message.getChatId(), "Текущая задача остановлена и удалена из очереди.", message.getMessageId());
} else {
safeSendText(message.getChatId(), "Сейчас нет активной задачи.", message.getMessageId());
}
return true;
}
if (lower.startsWith("/cancel")) {
String[] parts = text.split("\\s+", 2);
if (parts.length < 2) {
safeSendText(message.getChatId(), "Использование: /cancel <id|all>", message.getMessageId());
return true;
}
String arg = parts[1].trim();
if ("all".equalsIgnoreCase(arg)) {
codexClient.stopActiveProcess();
int cancelled = queueStore.cancelAll("cancel_all_by_user");
safeSendText(message.getChatId(), "Удалено задач из очереди: " + cancelled, message.getMessageId());
return true;
}
Optional<QueueJob> active = queueStore.getActiveJob();
if (active.isPresent() && active.get().id != null
&& active.get().id.toLowerCase().startsWith(arg.toLowerCase())) {
codexClient.stopActiveProcess();
}
boolean cancelled = queueStore.cancelByIdPrefix(arg);
safeSendText(message.getChatId(),
cancelled ? "Задача удалена: " + arg : "Задача не найдена: " + arg,
message.getMessageId());
return true;
}
return false;
}
private String buildStatusText() {
Optional<QueueJob> active = queueStore.getActiveJob();
int pending = queueStore.pendingCount();
if (active.isEmpty()) {
return "Статус: активной задачи нет.\nВ очереди pending: " + pending;
}
QueueJob job = active.get();
return "Статус: активная задача " + shortId(job.id) +
"\nТип: " + job.type +
"\опытка: " + (job.attempts + 1) + "/" + config.maxRetries() +
"\nВыполняется: " + elapsedSeconds() + "с" +
"\nPending: " + pending;
}
private String buildQueueText() {
List<QueueJob> jobs = queueStore.snapshot();
if (jobs.isEmpty()) {
return "Очередь пуста.";
}
StringBuilder sb = new StringBuilder();
sb.append("Очередь: ").append(jobs.size()).append('\n');
int limit = Math.min(jobs.size(), 10);
for (int i = 0; i < limit; i++) {
QueueJob job = jobs.get(i);
sb.append(i + 1).append(") ")
.append(shortId(job.id))
.append(" [").append(job.status).append("] ")
.append(job.type)
.append(" attempts=").append(job.attempts)
.append('\n');
}
if (jobs.size() > limit) {
sb.append("...и ещё ").append(jobs.size() - limit).append(" задач");
}
return sb.toString().trim();
}
private String helpText() {
return """
Доступные команды:
/status активная задача и размер очереди
/queue список задач в очереди
/stop остановить текущую задачу
/cancel <id|all> удалить задачу по id (префикс) или все
/new архивировать историю и начать новую
/help эта справка
""";
}
private void safeSendText(long chatId, String text, Integer replyToMessageId) {
try {
SendMessage message = new SendMessage();
message.setChatId(String.valueOf(chatId));
message.setText(trimForTelegram(text));
if (replyToMessageId != null) {
message.setReplyToMessageId(replyToMessageId);
}
execute(message);
} catch (Exception e) {
log.error("Не удалось отправить сообщение в Telegram", e);
}
}
private void sendLongMessage(long chatId, String text, Integer replyToMessageId) {
String normalized = text == null ? "" : text.strip();
if (normalized.isEmpty()) {
safeSendText(chatId, "(пустой ответ)", replyToMessageId);
return;
}
int max = 3500;
int start = 0;
while (start < normalized.length()) {
int end = Math.min(start + max, normalized.length());
String part = normalized.substring(start, end);
safeSendText(chatId, part, replyToMessageId);
start = end;
}
}
private String trimForTelegram(String value) {
if (value == null) {
return "";
}
String text = value.strip();
int max = 3900;
if (text.length() <= max) {
return text;
}
return text.substring(0, max) + "\n...[обрезано]";
}
private String shortId(String id) {
if (id == null || id.length() < 8) {
return id;
}
return id.substring(0, 8);
}
private long elapsedSeconds() {
long started = activeJobStartedAt.get();
if (started <= 0) {
return 0;
}
return (System.currentTimeMillis() - started) / 1000L;
}
private String shortError(Throwable e) {
String message = e.getMessage();
if (message == null || message.isBlank()) {
return e.getClass().getSimpleName();
}
String normalized = message.replace('\n', ' ').replace('\r', ' ').trim();
if (normalized.length() > 600) {
return normalized.substring(0, 600);
}
return normalized;
}
}

View File

@ -1,2 +1,2 @@
client.version=1.2.83 client.version=1.2.84
server.version=1.2.77 server.version=1.2.78

View File

@ -7,4 +7,5 @@ include 'shine-server-crypto'
include 'shine-server-blockchain' include 'shine-server-blockchain'
include 'shine-server-db' include 'shine-server-db'
include 'shine-server-net-protocol' include 'shine-server-net-protocol'
include 'shine-server-net-server' include 'shine-server-net-server'
include 'SHiNE-agent-bot-coder'