From b16a23243ee8fd60e73a0e1a6dc0e3f4488cd3d3e324a957d728c4f48c563a20 Mon Sep 17 00:00:00 2001 From: AidarKC Date: Thu, 18 Jun 2026 15:18:07 +0400 Subject: [PATCH] =?UTF-8?q?=D0=94=D0=BE=D0=B1=D0=B0=D0=B2=D0=B8=D1=82?= =?UTF-8?q?=D1=8C=20=D0=BF=D0=B5=D1=80=D0=B5=D0=BD=D0=BE=D1=81=D0=B8=D0=BC?= =?UTF-8?q?=D1=8B=D0=B9=20=D1=88=D0=B0=D0=B1=D0=BB=D0=BE=D0=BD=20codex-age?= =?UTF-8?q?nt-VPS?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- VERSION.properties | 4 +- codex-agent-VPS/.env.example | 31 + codex-agent-VPS/.gitignore | 5 + codex-agent-VPS/AGENT.md | 51 + codex-agent-VPS/AGENTS.md | 86 + codex-agent-VPS/py_bot_service.py | 2960 +++++++++++++++++ .../systemd/shine-agent-bot-coder.service | 23 + 7 files changed, 3158 insertions(+), 2 deletions(-) create mode 100644 codex-agent-VPS/.env.example create mode 100644 codex-agent-VPS/.gitignore create mode 100644 codex-agent-VPS/AGENT.md create mode 100644 codex-agent-VPS/AGENTS.md create mode 100644 codex-agent-VPS/py_bot_service.py create mode 100644 codex-agent-VPS/scripts/systemd/shine-agent-bot-coder.service diff --git a/VERSION.properties b/VERSION.properties index 7a06810..fb53565 100644 --- a/VERSION.properties +++ b/VERSION.properties @@ -1,2 +1,2 @@ -client.version=1.2.213 -server.version=1.2.201 +client.version=1.2.214 +server.version=1.2.202 diff --git a/codex-agent-VPS/.env.example b/codex-agent-VPS/.env.example new file mode 100644 index 0000000..9b15d6d --- /dev/null +++ b/codex-agent-VPS/.env.example @@ -0,0 +1,31 @@ +TELEGRAM_BOT_TOKEN=replace_me +OPENAI_API_KEY= +ALLOWED_TELEGRAM_USERNAME=owner_username +ALLOWED_TELEGRAM_PLAYERS=user_one:User One,user_two:User Two +ALLOWED_TELEGRAM_CHANNEL_USERNAME= +BOT_USERNAME=your_bot_username +TELEGRAM_API_BASE_URL=https://api.telegram.org +OPENAI_TRANSCRIBE_MODEL=gpt-4o-mini-transcribe +TELEGRAM_FILE_DOWNLOAD_TIMEOUT_SECONDS=300 +OPENAI_TRANSCRIBE_TIMEOUT_SECONDS=900 +OPENAI_TRANSCRIBE_MAX_UPLOAD_BYTES=25165824 +OPENAI_TRANSCRIBE_MAX_CHUNK_SECONDS=900 +OPENAI_TRANSCRIBE_OVERLAP_SECONDS=2 +OPENAI_TRANSCRIBE_REENCODE_BITRATE_KBPS=24 +OPENAI_TRANSCRIBE_FFMPEG_TIMEOUT_SECONDS=1800 +FFMPEG_BIN=ffmpeg +FFPROBE_BIN=ffprobe +OPENAI_TTS_MODEL=gpt-4o-mini-tts +OPENAI_TTS_VOICE=alloy +OPENAI_TTS_RESPONSE_FORMAT=opus +OPENAI_TTS_TIMEOUT_SECONDS=180 +OPENAI_TTS_CHUNK_CHARS=3500 +OPENAI_VOICE_REWRITE_MODEL=gpt-4.1-nano +OPENAI_VOICE_REWRITE_TIMEOUT_SECONDS=90 +OPENAI_VOICE_REWRITE_MAX_INPUT_CHARS=12000 +OPENAI_VOICE_REWRITE_MAX_OUTPUT_TOKENS=900 +CODEX_BIN=/home/your_user/.local/bin/codex +CODEX_WORKDIR=/home/your_user +CODEX_TIMEOUT_SECONDS=900 +MAX_RETRIES=3 +DATA_DIR=./data diff --git a/codex-agent-VPS/.gitignore b/codex-agent-VPS/.gitignore new file mode 100644 index 0000000..2af457e --- /dev/null +++ b/codex-agent-VPS/.gitignore @@ -0,0 +1,5 @@ +.env +data/ +logs/ +run/ +__pycache__/ diff --git a/codex-agent-VPS/AGENT.md b/codex-agent-VPS/AGENT.md new file mode 100644 index 0000000..8831ff1 --- /dev/null +++ b/codex-agent-VPS/AGENT.md @@ -0,0 +1,51 @@ +# AGENT.md для codex-agent-VPS + +Ты запущен как обработчик входящего Telegram-сообщения от пользователя. + +## Контекст +- `codex-agent-VPS` — Telegram-бот, который принимает сообщения, ведёт историю, ставит задачи в очередь и последовательно запускает `codex` CLI на VPS. +- Текстовые сообщения обрабатываются напрямую. +- Voice и audio сначала распознаются через OpenAI transcription, затем передаются как текстовая задача. +- История диалога хранится в JSONL-файле, путь передаётся в промпте. +- Ответ пойдёт пользователю в Telegram как обычное текстовое сообщение. +- Основная реализация сервиса — Python-скрипт `py_bot_service.py`. + +## Пользователи и доступ +- Разрешённые пользователи задаются через `ALLOWED_TELEGRAM_USERNAME` и `ALLOWED_TELEGRAM_PLAYERS`. +- Все разрешённые пользователи считаются полноправными. +- Для неизвестных пользователей в личном чате сервис отвечает вежливым отказом. +- Все входящие задачи попадают в одну общую очередь и выполняются строго по одной. + +## Очередь и состояние +- Сервис ведёт состояние активной задачи и текущего файла истории. +- После рестарта сервис продолжает незавершённую обработку с учётом сохранённого состояния. +- Истории диалогов хранятся отдельно по username: `data/history//`. +- Архив истории после `/new`: `data/history//archive/`. +- После `/new` для этого же пользователя должен сбрасываться и контекст продолжения Codex-сессии; следующий запрос запускается как новая сессия, не через resume. +- Дедупликация Telegram update обязательна, чтобы одно сообщение не обрабатывалось повторно. +- Если Codex молчит во время активной задачи 2 минуты подряд, сервис отправляет аварийный статус и повторяет его каждые 2 минуты. + +## Голосовые ответы +- Озвучивание финальных ответов настраивается персонально командами `/voice_on` и `/voice_off`. +- Для новых пользователей озвучивание включено по умолчанию. +- Адаптация текста перед озвучкой настраивается командами `/voice_rewrite_on` и `/voice_rewrite_off`. +- Если озвучивание включено, после текстового финального ответа сервис дополнительно отправляет voice-файл через OpenAI TTS. +- Промежуточные статусы озвучивать не нужно. + +## Команды +- `/status` — состояние очереди и персональных настроек. +- `/settings` — текущие пользовательские настройки. +- `/queue` — список задач в очереди. +- `/tasks` — список задач и предложений пользователя. +- `/new` — архивировать историю и начать новую Codex-сессию. +- `/stop` — остановить текущую задачу. +- `/cancel ` — удалить задачу по id или очистить очередь. +- `/restart` и `/restart_service` — отложенный рестарт после текущей задачи. +- `/restart_hard`, `/restart_now`, `/restart_force` — жёсткий рестарт прямо сейчас. + +## Правила ответа +- Пиши содержательно и коротко. +- Не упоминай внутренние служебные детали, файловую систему и технические логи, если это не нужно пользователю. +- Если запрос требует действий с кодом или файлами, выполняй их в рабочей директории `CODEX_WORKDIR`. +- Если данных недостаточно, задай ровно один уточняющий вопрос. +- Если в промпте есть пометка retry, учитывай текущее состояние и продолжай аккуратно, а не начинай заново без причины. diff --git a/codex-agent-VPS/AGENTS.md b/codex-agent-VPS/AGENTS.md new file mode 100644 index 0000000..9bebd5d --- /dev/null +++ b/codex-agent-VPS/AGENTS.md @@ -0,0 +1,86 @@ +# AGENTS + +## Назначение +- `codex-agent-VPS` — переносимая версия Telegram-бота для запуска `codex` CLI на VPS. +- Папку можно ставить в любое место на Linux-сервере, если там есть `python3`, `systemd`, `codex` и доступ в интернет. +- Конфигурация делается через `.env`. + +## Состав папки +- `py_bot_service.py` — основная реализация сервиса. +- `AGENT.md` — инструкции, которые бот передаёт в промпт Codex. +- `.env.example` — пример конфигурации. +- `scripts/systemd/shine-agent-bot-coder.service` — шаблон systemd unit. + +## Требования к VPS +- Linux-сервер с `systemd`. +- Установленные `python3`, `curl`, `ffmpeg`. +- Установленный `codex` CLI. +- Выполненный `codex login` под тем пользователем, от которого будет работать сервис. +- Telegram bot token. +- Telegram usernames разрешённых пользователей. + +## Установка через Codex +1. Скопировать папку `codex-agent-VPS` на сервер в нужное место, например: + - `/home/your_user/codex-agent` +2. Установить `codex` CLI под рабочим пользователем. +3. Выполнить под этим же пользователем: + - `codex login` +4. Установить системные зависимости: + - `python3` + - `ffmpeg` +5. Скопировать `.env.example` в `.env`. +6. В `.env` заполнить: + - `TELEGRAM_BOT_TOKEN` + - `ALLOWED_TELEGRAM_USERNAME` + - `ALLOWED_TELEGRAM_PLAYERS` + - `BOT_USERNAME` + - `CODEX_BIN` + - `CODEX_WORKDIR` +7. Если нужны voice/audio и голосовые ответы, дополнительно задать: + - `OPENAI_API_KEY` +8. В `scripts/systemd/shine-agent-bot-coder.service` заменить: + - `your_user` + - `/home/your_user/codex-agent` + на реальные значения. +9. Скопировать unit в: + - `/etc/systemd/system/shine-agent-bot-coder.service` +10. Выполнить: + - `sudo systemctl daemon-reload` + - `sudo systemctl enable --now shine-agent-bot-coder` +11. Проверить: + - `sudo systemctl status shine-agent-bot-coder --no-pager` + - `sudo journalctl -u shine-agent-bot-coder -f` + +## Настройка доступа +- `ALLOWED_TELEGRAM_USERNAME` — основной разрешённый пользователь. +- `ALLOWED_TELEGRAM_PLAYERS` — дополнительные разрешённые пользователи: + - `username1:Имя 1,username2:Имя 2` +- Все пользователи из whitelist в этой версии считаются полноправными. +- Все входящие задачи попадают в одну общую очередь и выполняются строго последовательно. + +## Поведение агента +- Бот принимает текст, voice и audio. +- Для каждого пользователя ведётся отдельная история. +- Все задачи запускаются через `codex exec`. +- Рабочая директория задаётся через `CODEX_WORKDIR`. +- Вызов идёт без sandbox/approval ограничений: `--dangerously-bypass-approvals-and-sandbox`. + +## Что обычно меняют при переносе +- `.env` +- `scripts/systemd/shine-agent-bot-coder.service` +- при необходимости `AGENT.md` + +## Полезные команды +- Проверка установки Codex: + - `codex --version` + - `codex doctor` +- Self-test без Telegram: + - `python3 py_bot_service.py --selftest-codex "Ответь одной строкой: Codex работает"` +- Проверка сервиса: + - `sudo systemctl status shine-agent-bot-coder --no-pager` + - `sudo journalctl -u shine-agent-bot-coder -f` + +## Примечания +- Если `codex doctor` пишет, что credentials не найдены, нужно выполнить `codex login`. +- Если `OPENAI_API_KEY` пустой, текстовые задачи через `codex` будут работать, а voice/audio и TTS-функции — нет. +- Если у пользователя в Telegram нет username, whitelist по username его не пропустит. diff --git a/codex-agent-VPS/py_bot_service.py b/codex-agent-VPS/py_bot_service.py new file mode 100644 index 0000000..e96dee3 --- /dev/null +++ b/codex-agent-VPS/py_bot_service.py @@ -0,0 +1,2960 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import argparse +import datetime as dt +import fcntl +import json +import mimetypes +import os +import random +import re +import shutil +import string +import subprocess +import tempfile +import threading +import time +import traceback +import uuid +from pathlib import Path +from typing import Any, Callable +from urllib import error, request + +DEFAULT_ALLOWED_PLAYERS = ",".join([ + "user_one:User One", + "user_two:User Two", +]) + +TASK_STATUS_LABELS = { + "new": "новая", + "approved": "одобрена", + "rejected": "отклонена", + "needs_work": "на доработку", + "done": "сделана", +} + + +def now_iso() -> str: + return dt.datetime.now(dt.timezone.utc).isoformat() + + +def normalize_username(value: str | None) -> str: + if not value: + return "" + value = value.strip() + if value.startswith("@"): + value = value[1:] + return value.lower() + + +def parse_allowed_players(raw: str) -> dict[str, str]: + players: dict[str, str] = {} + for item in (raw or "").split(","): + part = item.strip() + if not part: + continue + username_part, sep, name_part = part.partition(":") + username = normalize_username(username_part) + if not username: + continue + player_name = (name_part if sep else username_part).strip() or username + players[username] = player_name + return players + + +def compact_spaces(text: str) -> str: + return re.sub(r"\s+", " ", (text or "").strip()) + + +def split_long_text(text: str, chunk_size: int = 3500) -> list[str]: + text = (text or "").strip() + if not text: + return ["(пустой ответ)"] + return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)] + + +def split_final_private_text(text: str, first_chunk_size: int = 3900, second_chunk_size: int = 3900) -> list[str]: + text = (text or "").strip() + if not text: + return ["(пустой ответ)"] + if len(text) <= first_chunk_size: + return [text] + first = text[:first_chunk_size].rstrip() + rest = text[first_chunk_size:].lstrip() + if len(rest) <= second_chunk_size: + return [first, rest] + second = rest[:second_chunk_size].rstrip() + if len(second) > 40: + second = second[:-40].rstrip() + second = second.rstrip() + "\n...[ответ обрезан]" + return [first, second] + + +def split_text_for_tts(text: str, chunk_size: int) -> list[str]: + text = (text or "").strip() + if not text: + return [] + chunks: list[str] = [] + current = "" + paragraphs = re.split(r"\n\s*\n", text) + for paragraph in paragraphs: + paragraph = paragraph.strip() + if not paragraph: + continue + if len(paragraph) > chunk_size: + if current: + chunks.append(current) + current = "" + for i in range(0, len(paragraph), chunk_size): + part = paragraph[i:i + chunk_size].strip() + if part: + chunks.append(part) + continue + candidate = paragraph if not current else f"{current}\n\n{paragraph}" + if len(candidate) <= chunk_size: + current = candidate + else: + if current: + chunks.append(current) + current = paragraph + if current: + chunks.append(current) + return chunks + + +def read_env_file(path: Path) -> dict[str, str]: + result: dict[str, str] = {} + if not path.exists(): + return result + for raw_line in path.read_text(encoding="utf-8").splitlines(): + line = raw_line.strip() + if not line or line.startswith("#") or "=" not in line: + continue + key, value = line.split("=", 1) + key = key.strip() + value = value.strip().strip('"').strip("'") + result[key] = value + return result + + +class VoiceTranscriptionError(RuntimeError): + def __init__( + self, + user_message: str, + *, + stage: str, + retryable: bool = True, + detail: str = "", + ): + super().__init__(user_message) + self.user_message = user_message + self.stage = stage + self.retryable = retryable + self.detail = detail + + def log_text(self) -> str: + if self.detail and self.detail != self.user_message: + return f"{self.user_message} stage={self.stage} retryable={self.retryable} detail={self.detail}" + return f"{self.user_message} stage={self.stage} retryable={self.retryable}" + + +class VoiceReplyError(RuntimeError): + pass + + +class JsonLineStore: + @staticmethod + def load(path: Path) -> list[dict[str, Any]]: + if not path.exists(): + return [] + items: list[dict[str, Any]] = [] + for line in path.read_text(encoding="utf-8").splitlines(): + line = line.strip() + if not line: + continue + items.append(json.loads(line)) + return items + + @staticmethod + def save(path: Path, items: list[dict[str, Any]]) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + tmp = path.with_suffix(path.suffix + ".tmp") + with tmp.open("w", encoding="utf-8") as f: + for item in items: + f.write(json.dumps(item, ensure_ascii=False) + "\n") + tmp.replace(path) + + @staticmethod + def append(path: Path, item: dict[str, Any]) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + with path.open("a", encoding="utf-8") as f: + f.write(json.dumps(item, ensure_ascii=False) + "\n") + + +class TelegramApi: + def __init__(self, token: str, base_url: str = "https://api.telegram.org"): + self.token = token + self.api_root = (base_url or "https://api.telegram.org").rstrip("/") + self.base = f"{self.api_root}/bot{token}/" + self.file_base = f"{self.api_root}/file/bot{token}/" + + def call(self, method: str, payload: dict[str, Any] | None = None, timeout: int = 60) -> dict[str, Any]: + data = None + headers = {} + if payload is not None: + data = json.dumps(payload).encode("utf-8") + headers["Content-Type"] = "application/json" + req = request.Request(self.base + method, data=data, headers=headers, method="POST") + try: + with request.urlopen(req, timeout=timeout) as resp: + raw = resp.read().decode("utf-8") + except error.HTTPError as e: + body = e.read().decode("utf-8", errors="replace") + raise RuntimeError(f"Telegram HTTP {e.code}: {body}") from e + except Exception as e: + raise RuntimeError(f"Telegram request failed: {e}") from e + + result = json.loads(raw) + if not result.get("ok"): + raise RuntimeError(f"Telegram API error: {result}") + return result + + def call_multipart( + self, + method: str, + fields: dict[str, Any], + files: dict[str, tuple[str, bytes, str]], + timeout: int = 120, + ) -> dict[str, Any]: + boundary = "----shine-tg-boundary-" + "".join(random.choices("abcdef0123456789", k=16)) + body = bytearray() + for name, value in fields.items(): + if value is None: + continue + body.extend( + ( + f"--{boundary}\r\n" + f'Content-Disposition: form-data; name="{name}"\r\n\r\n' + f"{value}\r\n" + ).encode("utf-8") + ) + for name, (filename, data, mime) in files.items(): + body.extend( + ( + f"--{boundary}\r\n" + f'Content-Disposition: form-data; name="{name}"; filename="{filename}"\r\n' + f"Content-Type: {mime}\r\n\r\n" + ).encode("utf-8") + ) + body.extend(data) + body.extend(b"\r\n") + body.extend(f"--{boundary}--\r\n".encode("utf-8")) + + req = request.Request(self.base + method, data=bytes(body), method="POST") + req.add_header("Content-Type", f"multipart/form-data; boundary={boundary}") + try: + with request.urlopen(req, timeout=timeout) as resp: + raw = resp.read().decode("utf-8") + except error.HTTPError as e: + body_text = e.read().decode("utf-8", errors="replace") + raise RuntimeError(f"Telegram HTTP {e.code}: {body_text}") from e + except Exception as e: + raise RuntimeError(f"Telegram multipart request failed: {e}") from e + + result = json.loads(raw) + if not result.get("ok"): + raise RuntimeError(f"Telegram API error: {result}") + return result + + def get_updates(self, offset: int | None, timeout_sec: int) -> list[dict[str, Any]]: + payload: dict[str, Any] = {"timeout": timeout_sec, "allowed_updates": ["message", "channel_post"]} + if offset is not None: + payload["offset"] = offset + result = self.call("getUpdates", payload=payload, timeout=timeout_sec + 15) + return result.get("result", []) + + def send_message(self, chat_id: int | str, text: str, reply_to_message_id: int | None = None) -> dict[str, Any]: + payload: dict[str, Any] = {"chat_id": chat_id, "text": text} + if reply_to_message_id is not None: + payload["reply_to_message_id"] = reply_to_message_id + return self.call("sendMessage", payload=payload, timeout=30) + + def edit_message_text(self, chat_id: int | str, message_id: int, text: str) -> dict[str, Any]: + payload: dict[str, Any] = {"chat_id": chat_id, "message_id": message_id, "text": text} + return self.call("editMessageText", payload=payload, timeout=30) + + def send_voice( + self, + chat_id: int | str, + voice: str, + caption: str = "", + reply_to_message_id: int | None = None, + ) -> dict[str, Any]: + payload: dict[str, Any] = {"chat_id": chat_id, "voice": voice} + if caption: + payload["caption"] = caption + if reply_to_message_id is not None: + payload["reply_to_message_id"] = reply_to_message_id + return self.call("sendVoice", payload=payload, timeout=60) + + def send_audio( + self, + chat_id: int | str, + audio: str, + caption: str = "", + reply_to_message_id: int | None = None, + ) -> dict[str, Any]: + payload: dict[str, Any] = {"chat_id": chat_id, "audio": audio} + if caption: + payload["caption"] = caption + if reply_to_message_id is not None: + payload["reply_to_message_id"] = reply_to_message_id + return self.call("sendAudio", payload=payload, timeout=60) + + def send_voice_upload( + self, + chat_id: int | str, + voice_bytes: bytes, + filename: str, + caption: str = "", + reply_to_message_id: int | None = None, + ) -> dict[str, Any]: + fields: dict[str, Any] = {"chat_id": chat_id} + if caption: + fields["caption"] = caption + if reply_to_message_id is not None: + fields["reply_to_message_id"] = reply_to_message_id + return self.call_multipart( + "sendVoice", + fields=fields, + files={"voice": (filename, voice_bytes, "audio/ogg")}, + timeout=180, + ) + + def delete_webhook(self) -> None: + self.call("deleteWebhook", payload={"drop_pending_updates": False}, timeout=30) + + +class BotConfig: + def __init__(self, root_dir: Path): + env = dict(os.environ) + env.update(read_env_file(root_dir / ".env")) + + self.root_dir = root_dir + self.telegram_bot_token = self._required(env, "TELEGRAM_BOT_TOKEN") + self.allowed_username = normalize_username(env.get("ALLOWED_TELEGRAM_USERNAME", "AidarKC")) + self.allowed_players = parse_allowed_players(env.get("ALLOWED_TELEGRAM_PLAYERS", DEFAULT_ALLOWED_PLAYERS)) + self.allowed_channel_username = normalize_username(env.get("ALLOWED_TELEGRAM_CHANNEL_USERNAME", "shine_writing")) + self.bot_username = env.get("BOT_USERNAME", "aidar_su_bot") + self.telegram_api_base_url = env.get("TELEGRAM_API_BASE_URL", "https://api.telegram.org").strip() or "https://api.telegram.org" + self.openai_api_key = env.get("OPENAI_API_KEY", "").strip() + self.openai_transcribe_model = env.get("OPENAI_TRANSCRIBE_MODEL", "gpt-4o-mini-transcribe") + self.telegram_file_download_timeout_seconds = int(env.get("TELEGRAM_FILE_DOWNLOAD_TIMEOUT_SECONDS", "300")) + self.openai_transcribe_timeout_seconds = int(env.get("OPENAI_TRANSCRIBE_TIMEOUT_SECONDS", "900")) + self.openai_transcribe_max_upload_bytes = max(1_000_000, int(env.get("OPENAI_TRANSCRIBE_MAX_UPLOAD_BYTES", str(24 * 1024 * 1024)))) + self.openai_transcribe_max_chunk_seconds = max(60, int(env.get("OPENAI_TRANSCRIBE_MAX_CHUNK_SECONDS", "900"))) + self.openai_transcribe_overlap_seconds = max(0, int(env.get("OPENAI_TRANSCRIBE_OVERLAP_SECONDS", "2"))) + self.openai_transcribe_reencode_bitrate_kbps = max(12, int(env.get("OPENAI_TRANSCRIBE_REENCODE_BITRATE_KBPS", "24"))) + self.openai_transcribe_ffmpeg_timeout_seconds = max(30, int(env.get("OPENAI_TRANSCRIBE_FFMPEG_TIMEOUT_SECONDS", "1800"))) + self.ffmpeg_bin = env.get("FFMPEG_BIN", "ffmpeg").strip() or "ffmpeg" + self.ffprobe_bin = env.get("FFPROBE_BIN", "ffprobe").strip() or "ffprobe" + self.openai_tts_model = env.get("OPENAI_TTS_MODEL", "gpt-4o-mini-tts") + self.openai_tts_voice = env.get("OPENAI_TTS_VOICE", "alloy") + self.openai_tts_response_format = env.get("OPENAI_TTS_RESPONSE_FORMAT", "opus") + self.openai_tts_timeout_seconds = int(env.get("OPENAI_TTS_TIMEOUT_SECONDS", "180")) + self.openai_tts_chunk_chars = max(500, int(env.get("OPENAI_TTS_CHUNK_CHARS", "3500"))) + self.openai_voice_rewrite_model = env.get("OPENAI_VOICE_REWRITE_MODEL", "gpt-4.1-nano") + self.openai_voice_rewrite_timeout_seconds = int(env.get("OPENAI_VOICE_REWRITE_TIMEOUT_SECONDS", "90")) + self.openai_voice_rewrite_max_input_chars = max(1000, int(env.get("OPENAI_VOICE_REWRITE_MAX_INPUT_CHARS", "12000"))) + self.openai_voice_rewrite_max_output_tokens = max(200, int(env.get("OPENAI_VOICE_REWRITE_MAX_OUTPUT_TOKENS", "900"))) + self.codex_bin = Path(env.get( + "CODEX_BIN", + str(Path.home() / ".local/bin/codex") + )) + self.codex_workdir = Path(env.get("CODEX_WORKDIR", str(Path.home()))) + self.codex_timeout_seconds = int(env.get("CODEX_TIMEOUT_SECONDS", "900")) + self.max_retries = max(1, int(env.get("MAX_RETRIES", "3"))) + self.data_dir = (root_dir / env.get("DATA_DIR", "./data")).resolve() + self.agent_instructions_file = (root_dir / "AGENT.md").resolve() + + @staticmethod + def _required(env: dict[str, str], key: str) -> str: + value = env.get(key, "").strip() + if not value: + raise RuntimeError(f"Не задан обязательный параметр: {key}") + return value + + +class ShinePyBotService: + def __init__(self, config: BotConfig): + self.cfg = config + self.telegram = TelegramApi(config.telegram_bot_token, config.telegram_api_base_url) + + self.queue_file = config.data_dir / "py_queue.jsonl" + self.state_file = config.data_dir / "py_state.json" + self.processed_updates_file = config.data_dir / "py_processed_updates.log" + self.lock_file = config.data_dir / "py_app.lock" + self.history_dir = config.data_dir / "history" + self.history_archive_dir = self.history_dir / "archive" + self.task_center_dir = config.data_dir / "task_center" + self.task_center_file = self.task_center_dir / "items.json" + self.max_processed_updates = 5000 + + self.queue_lock = threading.RLock() + self.task_center_lock = threading.RLock() + self.stop_event = threading.Event() + self.worker = threading.Thread(target=self._worker_loop, name="shine-py-bot-worker", daemon=True) + + self.queue: list[dict[str, Any]] = [] + self.state: dict[str, Any] = {} + self.processed_updates: list[str] = [] + self.active_job_id: str | None = None + self.active_job_started_at: float | None = None + self.active_process: subprocess.Popen[str] | None = None + self.active_process_lock = threading.Lock() + self.stop_current_job = False + self.lock_fd = None + self.last_heartbeat_at: float = 0.0 + self.restart_requested = False + + def _is_owner(self, username: str) -> bool: + return self._is_allowed_user(username) + + def _is_allowed_player(self, username: str) -> bool: + return False + + def _is_allowed_user(self, username: str) -> bool: + uname = normalize_username(username) + return uname == self.cfg.allowed_username or uname in self.cfg.allowed_players + + def _player_name(self, username: str) -> str: + uname = normalize_username(username) + return self.cfg.allowed_players.get(uname, uname) + + def _display_name(self, username: str) -> str: + uname = normalize_username(username) + if uname == self.cfg.allowed_username: + return "Айдар" + return self._player_name(uname) + + def _known_usernames(self) -> dict[str, str]: + users = {self.cfg.allowed_username: "Айдар"} + users.update(self.cfg.allowed_players) + return users + + def _find_user_by_text(self, text: str) -> str: + source = normalize_username(text) + if source in self._known_usernames(): + return source + source_lower = (text or "").strip().lower() + aliases = { + "айдар": self.cfg.allowed_username, + "айдару": self.cfg.allowed_username, + "айдара": self.cfg.allowed_username, + "милана": "malvviiina", + "милане": "malvviiina", + "милану": "malvviiina", + "миланы": "malvviiina", + "сергей": "zodiaktechnika32", + "сергею": "zodiaktechnika32", + "сергея": "zodiaktechnika32", + "иван": "oidasyda", + "ивану": "oidasyda", + "ивана": "oidasyda", + "ворон": "blackbyrd1", + "ворону": "blackbyrd1", + "ворона": "blackbyrd1", + "дима": "dimasol1", + "диме": "dimasol1", + "диму": "dimasol1", + "димы": "dimasol1", + } + for alias, username in aliases.items(): + if re.search(rf"(^|\W){re.escape(alias)}($|\W)", source_lower, flags=re.IGNORECASE): + return username + for username, name in self._known_usernames().items(): + if username and username in source_lower: + return username + if name and name.lower() in source_lower: + return username + return "" + + def run(self) -> None: + self._ensure_dirs() + self._acquire_single_instance_lock() + self._load_state() + self._load_queue() + self._load_processed_updates() + self._recover_active_jobs_after_restart() + self.telegram.delete_webhook() + self._init_offset_if_missing() + self.worker.start() + self._append_history_event("service_started", {"allowedUsername": self.cfg.allowed_username}) + print(f"[py-bot] Запущен. allowed user: @{self.cfg.allowed_username}", flush=True) + + try: + while not self.stop_event.is_set(): + try: + offset = self.state.get("offset") + updates = self.telegram.get_updates(offset=offset, timeout_sec=25) + except Exception as e: + print(f"[py-bot] Ошибка getUpdates: {e}", flush=True) + time.sleep(2) + continue + + for update in updates: + update_id = update.get("update_id") + if isinstance(update_id, int): + self.state["offset"] = update_id + 1 + self._persist_state() + self._handle_update(update) + finally: + self.shutdown() + + def shutdown(self) -> None: + if self.stop_event.is_set(): + pass + self.stop_event.set() + self._stop_active_codex_process() + if self.worker.is_alive(): + self.worker.join(timeout=10) + if self.lock_fd is not None: + try: + fcntl.flock(self.lock_fd, fcntl.LOCK_UN) + finally: + self.lock_fd.close() + self.lock_fd = None + self._append_history_event("service_stopped", {}) + + def _ensure_dirs(self) -> None: + self.cfg.data_dir.mkdir(parents=True, exist_ok=True) + self.history_dir.mkdir(parents=True, exist_ok=True) + self.history_archive_dir.mkdir(parents=True, exist_ok=True) + self.task_center_dir.mkdir(parents=True, exist_ok=True) + + def _acquire_single_instance_lock(self) -> None: + self.lock_file.parent.mkdir(parents=True, exist_ok=True) + self.lock_fd = self.lock_file.open("a+") + try: + fcntl.flock(self.lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB) + except BlockingIOError: + raise RuntimeError(f"Уже запущен другой инстанс (lock: {self.lock_file})") + + def _load_state(self) -> None: + if self.state_file.exists(): + self.state = json.loads(self.state_file.read_text(encoding="utf-8")) + else: + self.state = {} + sessions = self.state.get("user_sessions") + if not isinstance(sessions, dict): + sessions = {} + self.state["user_sessions"] = sessions + user_settings = self.state.get("user_settings") + if not isinstance(user_settings, dict): + user_settings = {} + self.state["user_settings"] = user_settings + if not self.state.get("current_history_file"): + history_file = self._create_new_history_file("initial", self.cfg.allowed_username) + self.state["current_history_file"] = str(history_file) + sessions[self.cfg.allowed_username] = {"current_history_file": str(history_file)} + elif self.cfg.allowed_username not in sessions: + sessions[self.cfg.allowed_username] = {"current_history_file": str(self.state["current_history_file"])} + if not isinstance(self.state.get("next_job_number"), int): + self.state["next_job_number"] = 1 + self.state["updated_at"] = now_iso() + self._persist_state() + + def _persist_state(self) -> None: + self.state["updated_at"] = now_iso() + tmp = self.state_file.with_suffix(".tmp") + tmp.write_text(json.dumps(self.state, ensure_ascii=False, indent=2), encoding="utf-8") + tmp.replace(self.state_file) + + def _load_queue(self) -> None: + self.queue = JsonLineStore.load(self.queue_file) + + def _persist_queue(self) -> None: + JsonLineStore.save(self.queue_file, self.queue) + + def _load_processed_updates(self) -> None: + if not self.processed_updates_file.exists(): + self.processed_updates = [] + return + lines = [x.strip() for x in self.processed_updates_file.read_text(encoding="utf-8").splitlines() if x.strip()] + if len(lines) > self.max_processed_updates: + lines = lines[-self.max_processed_updates:] + self.processed_updates_file.write_text("\n".join(lines) + "\n", encoding="utf-8") + self.processed_updates = lines + + def _mark_processed_update(self, update_key: str) -> bool: + if update_key in self.processed_updates: + return True + self.processed_updates.append(update_key) + if len(self.processed_updates) > self.max_processed_updates: + self.processed_updates = self.processed_updates[-self.max_processed_updates:] + self.processed_updates_file.write_text("\n".join(self.processed_updates) + "\n", encoding="utf-8") + else: + with self.processed_updates_file.open("a", encoding="utf-8") as f: + f.write(update_key + "\n") + return False + + def _recover_active_jobs_after_restart(self) -> None: + recovered_ids: list[str] = [] + for job in self.queue: + if job.get("status") == "active": + job["status"] = "pending" + job["retry_reason"] = "service_restart_recovery" + job["updated_at"] = now_iso() + recovered_ids.append(job.get("id", "")) + if recovered_ids: + self._persist_queue() + self._append_history_event("active_jobs_recovered", {"jobIds": recovered_ids}) + + def _init_offset_if_missing(self) -> None: + if self.state.get("offset") is not None: + return + try: + updates = self.telegram.get_updates(offset=None, timeout_sec=0) + if updates: + self.state["offset"] = int(updates[-1]["update_id"]) + 1 + else: + self.state["offset"] = 0 + self._persist_state() + except Exception as e: + print(f"[py-bot] Не удалось инициализировать offset: {e}", flush=True) + self.state["offset"] = 0 + self._persist_state() + + def _current_history_file(self) -> Path: + return self._current_history_file_for_user(self.cfg.allowed_username) + + def _history_dirs_for_user(self, username: str) -> tuple[Path, Path]: + uname = normalize_username(username) or "unknown" + history_dir = self.history_dir / uname + archive_dir = history_dir / "archive" + history_dir.mkdir(parents=True, exist_ok=True) + archive_dir.mkdir(parents=True, exist_ok=True) + return history_dir, archive_dir + + def _ensure_user_session(self, username: str) -> None: + uname = normalize_username(username) or self.cfg.allowed_username + sessions = self.state.setdefault("user_sessions", {}) + if not isinstance(sessions, dict): + sessions = {} + self.state["user_sessions"] = sessions + self._user_settings(uname) + session = sessions.get(uname) + if isinstance(session, dict) and session.get("current_history_file"): + return + history_file = self._create_new_history_file("initial", uname) + sessions[uname] = {"current_history_file": str(history_file)} + if uname == self.cfg.allowed_username: + self.state["current_history_file"] = str(history_file) + self._persist_state() + + def _user_session_state(self, username: str) -> dict[str, Any]: + uname = normalize_username(username) or self.cfg.allowed_username + self._ensure_user_session(uname) + sessions = self.state.get("user_sessions") or {} + session = sessions.get(uname) + if not isinstance(session, dict): + session = {} + sessions[uname] = session + return session + + def _current_history_file_for_user(self, username: str) -> Path: + session = self._user_session_state(username) + return Path(session["current_history_file"]) + + def _codex_thread_id_for_user(self, username: str) -> str: + thread_id = (self._user_session_state(username).get("codex_thread_id") or "").strip() + return thread_id + + def _set_codex_thread_id_for_user(self, username: str, thread_id: str) -> None: + session = self._user_session_state(username) + normalized = (thread_id or "").strip() + if normalized: + session["codex_thread_id"] = normalized + else: + session.pop("codex_thread_id", None) + self._persist_state() + + def _create_new_history_file(self, reason: str, username: str) -> Path: + ts = dt.datetime.now().strftime("%Y-%m-%d_%H%M%S") + rnd = "".join(random.choices(string.hexdigits.lower(), k=8)) + history_dir, _ = self._history_dirs_for_user(username) + path = history_dir / f"{ts}_{rnd}.jsonl" + JsonLineStore.append(path, { + "ts": now_iso(), + "type": "history_created", + "reason": reason, + "username": normalize_username(username), + }) + return path + + def _rotate_history(self, reason: str, username: str) -> Path: + uname = normalize_username(username) or self.cfg.allowed_username + current = self._current_history_file_for_user(uname) + _, archive_dir = self._history_dirs_for_user(uname) + if current.exists(): + archived = archive_dir / current.name + current.replace(archived) + else: + archived = archive_dir / "(empty)" + new_file = self._create_new_history_file(reason, uname) + sessions = self.state.setdefault("user_sessions", {}) + if not isinstance(sessions, dict): + sessions = {} + self.state["user_sessions"] = sessions + previous = sessions.get(uname) if isinstance(sessions.get(uname), dict) else {} + sessions[uname] = {"current_history_file": str(new_file)} + if reason != "command_new" and isinstance(previous, dict): + thread_id = (previous.get("codex_thread_id") or "").strip() + if thread_id: + sessions[uname]["codex_thread_id"] = thread_id + if uname == self.cfg.allowed_username: + self.state["current_history_file"] = str(new_file) + self._persist_state() + self._append_history_event("history_rotated", {"reason": reason, "username": uname, "archived": str(archived)}, username=uname) + return archived + + def _user_settings(self, username: str) -> dict[str, Any]: + uname = normalize_username(username) or self.cfg.allowed_username + settings = self.state.get("user_settings") + if not isinstance(settings, dict): + settings = {} + self.state["user_settings"] = settings + user_settings = settings.get(uname) + if not isinstance(user_settings, dict): + user_settings = {} + settings[uname] = user_settings + if not isinstance(user_settings.get("voice_replies_enabled"), bool): + user_settings["voice_replies_enabled"] = True + if not isinstance(user_settings.get("voice_rewrite_enabled"), bool): + user_settings["voice_rewrite_enabled"] = True + if not isinstance(user_settings.get("single_status_message_enabled"), bool): + user_settings["single_status_message_enabled"] = True + return user_settings + + def _voice_replies_enabled(self, username: str) -> bool: + return bool(self._user_settings(username).get("voice_replies_enabled")) + + def _set_voice_replies_enabled(self, username: str, enabled: bool) -> None: + self._user_settings(username)["voice_replies_enabled"] = enabled + self._persist_state() + + def _voice_rewrite_enabled(self, username: str) -> bool: + return bool(self._user_settings(username).get("voice_rewrite_enabled")) + + def _set_voice_rewrite_enabled(self, username: str, enabled: bool) -> None: + self._user_settings(username)["voice_rewrite_enabled"] = enabled + self._persist_state() + + def _single_status_message_enabled(self, username: str) -> bool: + return bool(self._user_settings(username).get("single_status_message_enabled")) + + def _set_single_status_message_enabled(self, username: str, enabled: bool) -> None: + self._user_settings(username)["single_status_message_enabled"] = enabled + self._persist_state() + + def _remember_private_chat(self, username: str, chat_id: int) -> None: + uname = normalize_username(username) + if not uname: + return + private_chats = self.state.get("private_chat_ids") + if not isinstance(private_chats, dict): + private_chats = {} + self.state["private_chat_ids"] = private_chats + if private_chats.get(uname) == chat_id: + return + private_chats[uname] = chat_id + self._persist_state() + + def _private_chat_id_for_user(self, username: str) -> int | None: + private_chats = self.state.get("private_chat_ids") + if not isinstance(private_chats, dict): + return None + chat_id = private_chats.get(normalize_username(username)) + return self._resolve_chat_id(chat_id) if isinstance(chat_id, int) else None + + def _append_history(self, history_path: Path, event_type: str, payload: dict[str, Any]) -> None: + row = {"ts": now_iso(), "type": event_type} + row.update(payload) + JsonLineStore.append(history_path, row) + + def _append_history_event(self, event_type: str, payload: dict[str, Any], username: str | None = None) -> None: + history_path = self._current_history_file_for_user(username or self.cfg.allowed_username) + self._append_history(history_path, "system_event", {"event": event_type, **payload}) + + def _load_task_items(self) -> list[dict[str, Any]]: + with self.task_center_lock: + if not self.task_center_file.exists(): + return [] + try: + data = json.loads(self.task_center_file.read_text(encoding="utf-8")) + except Exception: + return [] + return data if isinstance(data, list) else [] + + def _save_task_items(self, items: list[dict[str, Any]]) -> None: + with self.task_center_lock: + self.task_center_file.parent.mkdir(parents=True, exist_ok=True) + tmp = self.task_center_file.with_suffix(".tmp") + tmp.write_text(json.dumps(items, ensure_ascii=False, indent=2), encoding="utf-8") + tmp.replace(self.task_center_file) + + def _next_task_item_id(self, items: list[dict[str, Any]]) -> str: + max_num = 0 + for item in items: + raw = str(item.get("id") or "") + match = re.match(r"TC-(\d+)$", raw) + if match: + max_num = max(max_num, int(match.group(1))) + return f"TC-{max_num + 1:04d}" + + def _create_task_item( + self, + *, + kind: str, + title: str, + text: str, + source_username: str, + target_username: str, + source_message_id: int | None = None, + source_chat_id: int | None = None, + opinion: str = "", + ) -> dict[str, Any]: + items = self._load_task_items() + item = { + "id": self._next_task_item_id(items), + "kind": kind, + "status": "new", + "title": compact_spaces(title)[:160] or ("Предложение" if kind == "proposal" else "Задача"), + "text": (text or "").strip(), + "opinion": (opinion or "").strip(), + "source_username": normalize_username(source_username), + "source_name": self._display_name(source_username), + "target_username": normalize_username(target_username), + "target_name": self._display_name(target_username), + "source_chat_id": source_chat_id, + "source_message_id": source_message_id, + "created_at": now_iso(), + "updated_at": now_iso(), + } + items.append(item) + self._save_task_items(items) + self._append_history_event("task_center_item_created", { + "itemId": item["id"], + "kind": kind, + "sourceUsername": item["source_username"], + "targetUsername": item["target_username"], + "title": item["title"], + }, username=source_username) + return item + + def _task_items_for_user(self, username: str, *, include_done: bool = False) -> list[dict[str, Any]]: + uname = normalize_username(username) + items = self._load_task_items() + result = [ + item for item in items + if normalize_username(item.get("target_username")) == uname + and (include_done or item.get("status") != "done") + ] + return sorted(result, key=lambda x: str(x.get("created_at") or "")) + + def _task_center_counts_text(self, username: str) -> str: + counts: dict[str, int] = {} + for item in self._task_items_for_user(username): + status = str(item.get("status") or "new") + counts[status] = counts.get(status, 0) + 1 + active_total = sum(counts.values()) + if active_total <= 0: + return "" + parts = [] + for status in ("new", "approved", "needs_work", "rejected"): + count = counts.get(status, 0) + if count: + parts.append(f"{TASK_STATUS_LABELS.get(status, status)}: {count}") + return f"Напоминание по задачам: всего активных {active_total}; " + ", ".join(parts) + "." + + def _format_task_items(self, username: str, *, include_done: bool = False) -> str: + items = self._task_items_for_user(username, include_done=include_done) + if not items: + return f"Для {self._display_name(username)} активных задач и предложений нет." + lines = [f"Задачи и предложения для {self._display_name(username)}:"] + for item in items[:15]: + kind = "предложение" if item.get("kind") == "proposal" else "задача" + status = TASK_STATUS_LABELS.get(str(item.get("status") or "new"), str(item.get("status") or "new")) + source = item.get("source_name") or item.get("source_username") or "неизвестно" + title = item.get("title") or "(без названия)" + lines.append(f"{item.get('id')} [{status}] {kind} от {source}: {title}") + if len(items) > 15: + lines.append(f"...и ещё {len(items) - 15}") + return "\n".join(lines) + + def _update_task_item_status(self, item_id: str, status: str) -> dict[str, Any] | None: + item_id = (item_id or "").strip().upper() + items = self._load_task_items() + updated = None + for item in items: + if str(item.get("id") or "").upper() == item_id: + item["status"] = status + item["updated_at"] = now_iso() + updated = item + break + if updated is not None: + self._save_task_items(items) + return updated + + def _find_first_task_item(self, *, source_username: str = "", target_username: str = "", kind: str = "") -> dict[str, Any] | None: + source = normalize_username(source_username) + target = normalize_username(target_username) + for item in self._load_task_items(): + if item.get("status") == "done": + continue + if source and normalize_username(item.get("source_username")) != source: + continue + if target and normalize_username(item.get("target_username")) != target: + continue + if kind and item.get("kind") != kind: + continue + return item + return None + + def _notify_user_about_task_item(self, username: str, item: dict[str, Any]) -> None: + chat_id = self._private_chat_id_for_user(username) + if chat_id is None: + return + kind = "предложение" if item.get("kind") == "proposal" else "задача" + source = item.get("source_name") or item.get("source_username") or "кто-то" + title = item.get("title") or "(без названия)" + status = TASK_STATUS_LABELS.get(str(item.get("status") or "new"), str(item.get("status") or "new")) + if item.get("status") == "new": + text = f"У тебя новое {kind} от {source}: {title}\nID: {item.get('id')}" + else: + text = f"Обновление по {kind} {item.get('id')}: статус «{status}».\n{title}" + self._safe_send(chat_id, text) + + def _send_player_welcome_once(self, chat_id: int, message_id: int, username: str) -> None: + uname = normalize_username(username) + sent = self.state.get("player_welcome_sent") + if not isinstance(sent, dict): + sent = {} + self.state["player_welcome_sent"] = sent + if sent.get(uname): + return + player_name = self._player_name(uname) + text = ( + f"Привет, {player_name}.\n" + "Можно задавать вопросы по проекту, просить анализ, идеи и подготовку готового ТЗ.\n" + "Команда /new начинает новую Codex-сессию и архивирует текущую историю." + ) + reminder = self._task_center_counts_text(uname) + if reminder: + text = f"{text}\n\n{reminder}\nКоманда /tasks покажет список." + self._safe_send(chat_id, text, reply_to=message_id) + sent[uname] = now_iso() + self._persist_state() + + def _resolve_chat_id(self, chat_id: int) -> int: + migrations = self.state.get("chat_id_migrations") + if not isinstance(migrations, dict): + return chat_id + current = chat_id + visited: set[int] = set() + while current not in visited: + visited.add(current) + next_chat_id = migrations.get(str(current)) + if not isinstance(next_chat_id, int): + break + current = next_chat_id + return current + + def _remember_chat_migration(self, old_chat_id: int, new_chat_id: int, source: str) -> None: + if old_chat_id == new_chat_id: + return + migrations = self.state.get("chat_id_migrations") + if not isinstance(migrations, dict): + migrations = {} + self.state["chat_id_migrations"] = migrations + if migrations.get(str(old_chat_id)) == new_chat_id: + return + migrations[str(old_chat_id)] = new_chat_id + self._persist_state() + with self.queue_lock: + changed = False + for job in self.queue: + if job.get("chat_id") == old_chat_id: + job["chat_id"] = new_chat_id + job["updated_at"] = now_iso() + changed = True + if changed: + self._persist_queue() + self._append_history_event("chat_migrated_to_supergroup", { + "oldChatId": old_chat_id, + "newChatId": new_chat_id, + "source": source, + }) + + @staticmethod + def _extract_migrate_to_chat_id(error_text: str) -> int | None: + match = re.search(r'"migrate_to_chat_id"\s*:\s*(-?\d+)', error_text) + if match: + return int(match.group(1)) + match = re.search(r"'migrate_to_chat_id'\s*:\s*(-?\d+)", error_text) + if match: + return int(match.group(1)) + return None + + def _handle_update(self, update: dict[str, Any]) -> None: + message = update.get("message") + update_type = "message" + if not isinstance(message, dict): + message = update.get("channel_post") + update_type = "channel_post" + if not isinstance(message, dict): + return + chat = message.get("chat") or {} + chat_id = chat.get("id") + message_id = message.get("message_id") + chat_type = str(chat.get("type") or "") + chat_username = normalize_username(chat.get("username")) + chat_title = str(chat.get("title") or "") + sender = message.get("from") or {} + username = normalize_username(sender.get("username")) + author_signature = str(message.get("author_signature") or "").strip() + author_username = username or normalize_username(author_signature) + if not isinstance(chat_id, int) or not isinstance(message_id, int): + return + + migrate_to_chat_id = message.get("migrate_to_chat_id") + if isinstance(migrate_to_chat_id, int): + self._remember_chat_migration(chat_id, migrate_to_chat_id, "telegram_message") + return + + update_key = f"{chat_id}:{message_id}" + if self._mark_processed_update(update_key): + return + + is_channel_post = update_type == "channel_post" or chat_type == "channel" + is_group_message = update_type == "message" and chat_type in ("group", "supergroup") + is_allowed_channel = ( + not is_channel_post + or not self.cfg.allowed_channel_username + or chat_username == self.cfg.allowed_channel_username + ) + if is_channel_post and not is_allowed_channel: + return + if chat_username and chat_username == self.cfg.allowed_channel_username: + self._remember_public_report_chat(chat_id) + + # Игнорируем системные сообщения о входе/выходе и смене заголовка/фото. + if message.get("new_chat_members") or message.get("left_chat_member"): + return + if message.get("group_chat_created") or message.get("supergroup_chat_created") or message.get("channel_chat_created"): + return + + text = (message.get("text") or message.get("caption") or "").strip() + actor_username = normalize_username(author_username) + is_allowed = self._is_allowed_user(actor_username) + is_private = chat_type == "private" + if not is_allowed: + if is_private: + self._safe_send(chat_id, "Извините, доступ к этому агенту пока не выдан. Обратитесь к Айдару.", reply_to=message_id) + return + if self._is_allowed_player(actor_username) and not is_private: + return + + self._ensure_user_session(actor_username) + if is_private: + self._remember_private_chat(actor_username, chat_id) + history_path = self._current_history_file_for_user(actor_username) + if self._is_allowed_player(actor_username): + self._send_player_welcome_once(chat_id, message_id, actor_username) + + if not text: + if message.get("voice"): + self._enqueue_voice_job( + chat_id, + message_id, + actor_username, + message["voice"].get("file_id"), + duration_seconds=message["voice"].get("duration"), + telegram_file_size=message["voice"].get("file_size"), + media_type="voice", + update_type=update_type, + chat_username=chat_username, + chat_title=chat_title, + author_signature=author_signature, + chat_type=chat_type, + ) + return + if message.get("audio"): + self._enqueue_voice_job( + chat_id, + message_id, + actor_username, + message["audio"].get("file_id"), + duration_seconds=message["audio"].get("duration"), + telegram_file_size=message["audio"].get("file_size"), + media_type="audio", + update_type=update_type, + chat_username=chat_username, + chat_title=chat_title, + author_signature=author_signature, + chat_type=chat_type, + ) + return + self._safe_send(chat_id, "Поддерживаются текст, voice и audio.", reply_to=message_id) + return + + if text.startswith("/"): + self._handle_command(chat_id, message_id, actor_username, text) + return + + if self._handle_task_center_text(chat_id, message_id, actor_username, text): + return + + self._append_history(history_path, "incoming_text", { + "chatId": chat_id, + "messageId": message_id, + "updateType": update_type, + "chatType": chat_type, + "chatUsername": chat_username, + "chatTitle": chat_title, + "username": actor_username, + "authorSignature": author_signature, + "text": text, + }) + job = self._build_job_base(chat_id, message_id, actor_username, str(history_path)) + job["type"] = "text" + job["text"] = text + job["update_type"] = update_type + job["chat_type"] = chat_type + job["chat_username"] = chat_username + job["chat_title"] = chat_title + job["author_signature"] = author_signature + job["role"] = "owner" if self._is_owner(actor_username) else "player" + job["player_name"] = self._player_name(actor_username) if job["role"] == "player" else "" + with self.queue_lock: + self.queue.append(job) + self._persist_queue() + if chat_type == "private": + self._ensure_job_status_message( + job["id"], + chat_id, + message_id, + f"Задача #{job['num']} получена.\nСтатус: в очереди.", + ) + else: + self._safe_send(chat_id, f"Принял задачу #{job['num']}", reply_to=message_id) + + def _enqueue_voice_job( + self, + chat_id: int, + message_id: int, + username: str, + file_id: str | None, + *, + duration_seconds: int | None = None, + telegram_file_size: int | None = None, + media_type: str = "voice", + update_type: str = "message", + chat_username: str = "", + chat_title: str = "", + author_signature: str = "", + chat_type: str = "", + ) -> None: + if not file_id: + self._safe_send(chat_id, "Не удалось прочитать file_id голосового.", reply_to=message_id) + return + history_path = self._current_history_file_for_user(username) + self._append_history(history_path, "incoming_voice", { + "chatId": chat_id, + "messageId": message_id, + "updateType": update_type, + "chatType": chat_type, + "chatUsername": chat_username, + "chatTitle": chat_title, + "username": username, + "authorSignature": author_signature, + "fileId": file_id, + "mediaType": media_type, + "durationSeconds": duration_seconds, + "fileSize": telegram_file_size, + }) + job = self._build_job_base(chat_id, message_id, username, str(history_path)) + job["type"] = "voice" + job["telegram_file_id"] = file_id + job["telegram_media_type"] = media_type + job["telegram_duration_seconds"] = duration_seconds or 0 + job["telegram_file_size"] = telegram_file_size or 0 + job["update_type"] = update_type + job["chat_type"] = chat_type + job["chat_username"] = chat_username + job["chat_title"] = chat_title + job["author_signature"] = author_signature + job["role"] = "owner" if self._is_owner(username) else "player" + job["player_name"] = self._player_name(username) if job["role"] == "player" else "" + with self.queue_lock: + self.queue.append(job) + self._persist_queue() + if chat_type == "private": + self._ensure_job_status_message( + job["id"], + chat_id, + message_id, + f"Voice для задачи #{job['num']} получен.\nСтатус: в очереди.", + ) + else: + self._safe_send(chat_id, f"Принял voice в задачу #{job['num']}", reply_to=message_id) + + def _build_job_base(self, chat_id: int, message_id: int, username: str, history_file: str) -> dict[str, Any]: + with self.queue_lock: + num = int(self.state.get("next_job_number", 1)) + self.state["next_job_number"] = num + 1 + self._persist_state() + return { + "id": str(uuid.uuid4()), + "num": num, + "status": "pending", + "type": "text", + "chat_id": chat_id, + "message_id": message_id, + "username": username, + "update_type": "message", + "chat_type": "", + "chat_username": "", + "chat_title": "", + "author_signature": "", + "role": "owner", + "player_name": "", + "text": "", + "telegram_file_id": "", + "telegram_media_type": "", + "history_file": history_file, + "attempts": 0, + "retry_reason": "", + "last_error": "", + "created_at": now_iso(), + "updated_at": now_iso(), + "active_since": None, + "status_message_id": None, + "status_message_text": "", + } + + def _handle_task_center_text(self, chat_id: int, message_id: int, username: str, text: str) -> bool: + source_text = (text or "").strip() + lower = source_text.lower() + is_owner = self._is_owner(username) + + if re.search(r"\b(покажи|список|какие)\b.*\b(задач|задачи|предложени)", lower): + target = username + explicit_target = self._find_user_by_text(source_text) + if is_owner and explicit_target: + target = explicit_target + self._safe_send(chat_id, self._format_task_items(target), reply_to=message_id) + return True + + if is_owner: + assign_match = re.search( + r"(?:поставь|добавь|создай|запиши)\s+(?:задачу|задание)\s+(.+?)(?::|\s+-\s+|\s+—\s+)(.+)", + source_text, + flags=re.IGNORECASE | re.DOTALL, + ) + if assign_match: + target = self._find_user_by_text(assign_match.group(1)) + body = assign_match.group(2).strip() + if target and body: + item = self._create_task_item( + kind="task", + title=body, + text=body, + source_username=username, + target_username=target, + source_message_id=message_id, + source_chat_id=chat_id, + ) + self._notify_user_about_task_item(target, item) + self._safe_send( + chat_id, + f"Задача добавлена для {self._display_name(target)}: {item['id']} — {item['title']}", + reply_to=message_id, + ) + return True + + status_match = re.search(r"\b(одобрить|отклонить|доработать|закрыть|сделано|закрыта)\b(?:\s+(.+))?", lower, flags=re.IGNORECASE) + if status_match and ("задач" in lower or "предложени" in lower or re.search(r"tc-\d+", lower, flags=re.IGNORECASE)): + action = status_match.group(1) + tail = status_match.group(2) or "" + status = { + "одобрить": "approved", + "отклонить": "rejected", + "доработать": "needs_work", + "закрыть": "done", + "сделано": "done", + "закрыта": "done", + }.get(action, "new") + id_match = re.search(r"tc-\d+", source_text, flags=re.IGNORECASE) + item = None + if id_match: + item = self._update_task_item_status(id_match.group(0), status) + else: + source_user = self._find_user_by_text(tail) + item = self._find_first_task_item( + source_username=source_user, + target_username=username, + kind="proposal" if "предложени" in lower else "", + ) + if item: + item = self._update_task_item_status(str(item.get("id")), status) + if item: + label = TASK_STATUS_LABELS.get(status, status) + self._safe_send(chat_id, f"{item.get('id')} обновлена: {label}.", reply_to=message_id) + source_user = normalize_username(item.get("source_username")) + if source_user and source_user != username: + self._notify_user_about_task_item(source_user, item) + return True + + if not is_owner: + proposal_match = re.match(r"\s*(?:предложение|идея|заявка)\s*[::-]\s*(.+)", source_text, flags=re.IGNORECASE | re.DOTALL) + if proposal_match: + body = proposal_match.group(1).strip() + if body: + item = self._create_task_item( + kind="proposal", + title=body, + text=body, + opinion="Нужно решение Айдара: одобрить, отклонить или отправить на доработку.", + source_username=username, + target_username=self.cfg.allowed_username, + source_message_id=message_id, + source_chat_id=chat_id, + ) + self._notify_user_about_task_item(self.cfg.allowed_username, item) + self._safe_send( + chat_id, + f"Предложение отправлено Айдару как {item['id']}. Статус: новая.", + reply_to=message_id, + ) + return True + + return False + + def _handle_command(self, chat_id: int, message_id: int, username: str, text: str) -> None: + lower = text.lower() + command = lower.split(maxsplit=1)[0].split("@", 1)[0] + is_owner = self._is_owner(username) + if command in ("/start", "/help"): + self._safe_send(chat_id, self._help_text(is_owner=is_owner), reply_to=message_id) + return + if command == "/settings": + self._safe_send(chat_id, self._settings_text(username), reply_to=message_id) + return + if command == "/status": + self._safe_send(chat_id, self._status_text(username), reply_to=message_id) + return + if command == "/queue": + self._safe_send(chat_id, self._queue_text(), reply_to=message_id) + return + if command in ("/tasks", "/my_tasks"): + parts = text.split(maxsplit=1) + target = username + if is_owner and len(parts) > 1: + parsed = self._find_user_by_text(parts[1]) + if parsed: + target = parsed + self._safe_send(chat_id, self._format_task_items(target), reply_to=message_id) + return + if command == "/voice_on": + self._set_voice_replies_enabled(username, True) + self._append_history_event("voice_replies_enabled", {"username": normalize_username(username)}, username=username) + self._safe_send(chat_id, "Озвучивание финальных ответов включено для вашего пользователя.", reply_to=message_id) + return + if command == "/voice_off": + self._set_voice_replies_enabled(username, False) + self._append_history_event("voice_replies_disabled", {"username": normalize_username(username)}, username=username) + self._safe_send(chat_id, "Озвучивание финальных ответов выключено для вашего пользователя.", reply_to=message_id) + return + if command in ("/voice_status", "/voice_rewrite_status"): + self._safe_send(chat_id, self._status_text(username), reply_to=message_id) + return + if command == "/voice_rewrite_on": + self._set_voice_rewrite_enabled(username, True) + self._append_history_event("voice_rewrite_enabled", {"username": normalize_username(username)}, username=username) + self._safe_send(chat_id, "Адаптация текста перед озвучкой включена для вашего пользователя.", reply_to=message_id) + return + if command == "/voice_rewrite_off": + self._set_voice_rewrite_enabled(username, False) + self._append_history_event("voice_rewrite_disabled", {"username": normalize_username(username)}, username=username) + self._safe_send(chat_id, "Адаптация текста перед озвучкой выключена для вашего пользователя.", reply_to=message_id) + return + if command == "/single_message_on": + self._set_single_status_message_enabled(username, True) + self._append_history_event("single_status_message_enabled", {"username": normalize_username(username)}, username=username) + self._safe_send(chat_id, "Режим одного редактируемого сообщения в личке включён для вашего пользователя.", reply_to=message_id) + return + if command == "/single_message_off": + self._set_single_status_message_enabled(username, False) + self._append_history_event("single_status_message_disabled", {"username": normalize_username(username)}, username=username) + self._safe_send(chat_id, "Режим одного редактируемого сообщения в личке выключен. Бот будет отправлять отдельные сообщения по этапам.", reply_to=message_id) + return + if command == "/new": + archived = self._rotate_history("command_new", username) + self._safe_send(chat_id, f"История очищена. Новый диалог начат.\nАрхив: {archived.name}", reply_to=message_id) + return + if command in ("/restart_service", "/restart"): + if not is_owner: + self._safe_send(chat_id, "Команда недоступна.", reply_to=message_id) + return + self._append_history_event("restart_service_deferred_requested", { + "chatId": chat_id, + "messageId": message_id, + "username": username, + }, username=username) + self._safe_send( + chat_id, + "Отложенный рестарт принят. Если задача сейчас выполняется, сервис перезапустится после её завершения и до следующей задачи.", + reply_to=message_id, + ) + self._request_deferred_restart() + return + if command in ("/restart_hard", "/restart_now", "/restart_force"): + if not is_owner: + self._safe_send(chat_id, "Команда недоступна.", reply_to=message_id) + return + self._append_history_event("restart_service_hard_requested", { + "chatId": chat_id, + "messageId": message_id, + "username": username, + }, username=username) + self._safe_send( + chat_id, + "Выполняю жёсткий рестарт сервиса прямо сейчас. Активная задача, если есть, будет прервана и после старта вернётся в очередь.", + reply_to=message_id, + ) + self._schedule_self_restart("hard_restart_requested", force=True) + return + if command == "/stop": + stopped = self._cancel_active_job("stopped_by_user") + if stopped: + self._safe_send(chat_id, "Текущая задача остановлена и удалена из очереди.", reply_to=message_id) + else: + self._safe_send(chat_id, "Сейчас нет активной задачи.", reply_to=message_id) + return + if command == "/cancel": + parts = text.split(maxsplit=1) + if len(parts) < 2: + self._safe_send(chat_id, "Использование: /cancel ", reply_to=message_id) + return + arg = parts[1].strip() + if arg.lower() == "all": + with self.queue_lock: + self.stop_current_job = True + self._stop_active_codex_process() + count = len(self.queue) + self.queue = [] + self._persist_queue() + self._safe_send(chat_id, f"Удалено задач из очереди: {count}", reply_to=message_id) + return + cancelled = self._cancel_by_id_prefix(arg) + self._safe_send(chat_id, f"Задача удалена: {arg}" if cancelled else f"Задача не найдена: {arg}", reply_to=message_id) + return + + def _help_text(self, *, is_owner: bool) -> str: + lines = [ + "Доступные команды:", + "/status — активная задача и размер очереди", + "/settings — текущие настройки и команды для их изменения", + "/queue — список задач в очереди", + "/tasks — список ваших задач и предложений", + "/stop — остановить текущую задачу", + "/cancel — удалить задачу по id (префикс) или все", + "/new — архивировать историю и начать новую Codex-сессию", + "/help — эта справка", + ] + if is_owner: + lines.insert(-1, "/tasks <пользователь> — список задач пользователя") + lines.insert(-1, "/restart — отложенный рестарт после текущей задачи") + lines.insert(-1, "/restart_hard — жёсткий рестарт прямо сейчас") + return "\n".join(lines) + + def _settings_text(self, username: str) -> str: + voice_status = "включено" if self._voice_replies_enabled(username) else "выключено" + rewrite_status = "включена" if self._voice_rewrite_enabled(username) else "выключена" + single_message_status = "включён" if self._single_status_message_enabled(username) else "выключен" + lines = [ + "Текущие настройки:", + f"Озвучивание финальных ответов: {voice_status}", + f"Адаптация текста перед озвучкой: {rewrite_status}", + f"Режим одного редактируемого сообщения в личке: {single_message_status}", + "", + "Команды настроек:", + "/voice_on — включить озвучивание", + "/voice_off — выключить озвучивание", + "/voice_rewrite_on — адаптировать текст перед озвучкой", + "/voice_rewrite_off — озвучивать обычный текст без адаптации", + "/single_message_on — один редактируемый ответ в личке", + "/single_message_off — отдельные сообщения по этапам и финалу", + ] + return "\n".join(lines) + + def _status_text(self, username: str) -> str: + with self.queue_lock: + active = next((j for j in self.queue if j.get("status") == "active"), None) + pending = sum(1 for j in self.queue if j.get("status") == "pending") + voice_status = "включено" if self._voice_replies_enabled(username) else "выключено" + rewrite_status = "включена" if self._voice_rewrite_enabled(username) else "выключена" + single_message_status = "включён" if self._single_status_message_enabled(username) else "выключен" + settings_text = ( + f"Голосовые ответы: {voice_status}\n" + f"Адаптация текста перед озвучкой: {rewrite_status}\n" + f"Режим одного сообщения в личке: {single_message_status}" + ) + restart_text = "\nОтложенный рестарт: ожидает завершения текущей задачи" if self.restart_requested else "" + if not active: + return f"Статус: активной задачи нет.\nВ очереди pending: {pending}\n{settings_text}{restart_text}" + elapsed = int(time.time() - (self.active_job_started_at or time.time())) + return ( + f"Статус: активная задача #{active.get('num', '?')}\n" + f"Тип: {active.get('type', 'text')}\n" + f"Попытка: {int(active.get('attempts', 0)) + 1}/{self.cfg.max_retries}\n" + f"Выполняется: {elapsed}с\n" + f"Pending: {pending}\n" + f"{settings_text}{restart_text}" + ) + + def _queue_text(self) -> str: + with self.queue_lock: + items = list(self.queue) + if not items: + return "Очередь пуста." + lines = [f"Очередь: {len(items)}"] + for i, job in enumerate(items[:10], start=1): + lines.append( + f"{i}) #{job.get('num', '?')} [{job.get('status')}] {job.get('type')} attempts={job.get('attempts', 0)}" + ) + if len(items) > 10: + lines.append(f"...и ещё {len(items) - 10} задач") + return "\n".join(lines) + + def _cancel_active_job(self, reason: str) -> bool: + with self.queue_lock: + active = next((j for j in self.queue if j.get("status") == "active"), None) + if not active: + return False + self.stop_current_job = True + self._stop_active_codex_process() + self.queue = [j for j in self.queue if j.get("id") != active.get("id")] + self._persist_queue() + self._append_history_event("job_stopped_by_user", {"jobId": active.get("id"), "reason": reason}) + return True + + def _cancel_by_id_prefix(self, prefix: str) -> bool: + prefix = prefix.strip().lower() + normalized_num = prefix.lstrip("#") + with self.queue_lock: + target = next( + ( + j for j in self.queue + if str(j.get("id", "")).lower().startswith(prefix) + or str(j.get("num", "")).lower() == normalized_num + ), + None + ) + if not target: + return False + if target.get("status") == "active": + self.stop_current_job = True + self._stop_active_codex_process() + self.queue = [j for j in self.queue if j.get("id") != target.get("id")] + self._persist_queue() + return True + + def _worker_loop(self) -> None: + while not self.stop_event.is_set(): + if self.restart_requested: + self._exit_for_restart("deferred_restart_before_next_job") + return + job = None + with self.queue_lock: + for item in self.queue: + if item.get("status") == "pending": + item["status"] = "active" + item["active_since"] = now_iso() + item["updated_at"] = now_iso() + self.active_job_id = item.get("id") + self.active_job_started_at = time.time() + job = dict(item) + self._persist_queue() + break + if not job: + time.sleep(0.5) + continue + + self.stop_current_job = False + self._process_job(job) + self.active_job_id = None + self.active_job_started_at = None + if self.restart_requested: + self._exit_for_restart("deferred_restart_after_job") + return + + def _process_job(self, job: dict[str, Any]) -> None: + job_id = job["id"] + job_num = job.get("num", "?") + chat_id = int(job["chat_id"]) + message_id = int(job["message_id"]) + history_path = Path(job["history_file"]) + private_single_message = ( + (job.get("chat_type") or "") == "private" + and self._single_status_message_enabled(job.get("username") or "") + ) + self._set_job_status_text(job, f"Задача #{job_num} в работе.\nСтатус: выполняется.") + try: + if job.get("type") == "voice": + self._set_job_status_text(job, f"Задача #{job_num} в работе.\nСтатус: распознаю voice.") + recognized = self._transcribe_voice_job( + job, + status_cb=lambda note: self._set_job_status_text( + job, + f"Задача #{job_num} в работе.\nСтатус: {note}", + ), + ) + job["text"] = recognized + self._append_history(history_path, "voice_transcription", {"jobId": job_id, "jobNum": job_num, "text": recognized}) + preview = recognized.strip() + if len(preview) > 800: + preview = preview[:800].rstrip() + " ...[обрезано]" + self._set_job_status_text( + job, + f"Задача #{job_num} в работе.\nСтатус: voice распознан, отправляю в Codex.\n\nТекст:\n{preview}", + ) + + prompt = self._build_prompt(job) + self._append_history(history_path, "codex_request", {"jobId": job_id, "prompt": prompt}) + self._set_job_status_text(job, f"Задача #{job_num} в работе.\nСтатус: выполняю через Codex.") + answer = self._run_codex(prompt, job) + if private_single_message: + parts = split_final_private_text(answer) + self._set_job_status_text(job, parts[0]) + if len(parts) > 1: + self._safe_send(chat_id, parts[1], reply_to=message_id) + else: + for chunk in split_long_text(answer): + self._safe_send(chat_id, chunk, reply_to=message_id) + self._append_history(history_path, "codex_response", {"jobId": job_id, "text": answer}) + self._send_private_job_public_report(job, answer) + self._send_task_center_reminder(job) + if self._voice_replies_enabled(job.get("username") or ""): + self._send_voice_reply_for_answer(job, answer, history_path, job_id) + self._mark_job_done(job_id) + except Exception as e: + if self.stop_current_job: + self._append_history(history_path, "job_stopped", {"jobId": job_id, "reason": str(e)}) + self._set_job_status_text(job, f"Задача #{job_num} остановлена.") + self._mark_job_removed(job_id) + self.stop_current_job = False + return + if isinstance(e, VoiceTranscriptionError): + self._append_history(history_path, "voice_transcription_failed", { + "jobId": job_id, + "jobNum": job_num, + "stage": e.stage, + "retryable": e.retryable, + "error": e.user_message, + "detail": e.detail, + }) + self._handle_job_failure(job, e) + + def _build_prompt(self, job: dict[str, Any]) -> str: + retry_block = "" + retry_reason = (job.get("retry_reason") or "").strip() + if retry_reason: + retry_block = f"\n\nПометка retry: {retry_reason}" + return ( + "Пришло сообщение в Telegram.\n" + f"Тип: {job.get('type')}\n" + f"Источник Telegram: {job.get('update_type', 'message')}\n" + f"Тип чата: {job.get('chat_type') or ''}\n" + f"Канал/чат: @{job.get('chat_username') or ''} {job.get('chat_title') or ''}\n" + f"Username отправителя: @{job.get('username')}\n" + f"Подпись автора в Telegram: {job.get('author_signature') or ''}\n" + "Текст для обработки:\n" + f"{job.get('text')}\n\n" + f"История диалога (JSONL): {job.get('history_file')}\n" + f"Инструкции агента: {self.cfg.agent_instructions_file}\n" + f"Работай в рабочем проекте аккуратно и верни только текст ответа пользователю.{retry_block}" + ) + + def _run_codex(self, prompt: str, job: dict[str, Any]) -> str: + username = job.get("username") or self.cfg.allowed_username + thread_id = self._codex_thread_id_for_user(username) + try: + return self._run_codex_once(prompt, job, thread_id=thread_id) + except RuntimeError as e: + if not thread_id or not self._is_missing_codex_session_error(str(e)): + raise + self._set_codex_thread_id_for_user(username, "") + self._append_history( + Path(job["history_file"]), + "system_event", + { + "event": "codex_thread_reset", + "reason": "missing_session", + "username": normalize_username(username), + "oldThreadId": thread_id, + }, + ) + return self._run_codex_once(prompt, job, thread_id="") + + def _run_codex_once(self, prompt: str, job: dict[str, Any], *, thread_id: str) -> str: + output_lines: list[str] = [] + job_id = str(job["id"]) + job_num = job.get("num", "?") + username = job.get("username") or self.cfg.allowed_username + with tempfile.NamedTemporaryFile(prefix="shine-codex-last-message-", suffix=".txt", delete=False) as tmp: + output_file = Path(tmp.name) + + cmd = [ + str(self.cfg.codex_bin), + "exec", + "--dangerously-bypass-approvals-and-sandbox", + "--json", + "-C", str(self.cfg.codex_workdir), + "-o", str(output_file), + ] + if thread_id: + cmd.extend(["resume", thread_id]) + cmd.append(prompt) + mode = f"resume {thread_id}" if thread_id else "new" + print(f"[py-bot] codex exec start job={job_id[:8]} mode={mode}", flush=True) + process = subprocess.Popen( + cmd, + stdin=subprocess.DEVNULL, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + encoding="utf-8", + errors="replace", + bufsize=1, + ) + with self.active_process_lock: + self.active_process = process + + self.last_heartbeat_at = 0.0 + last_user_note = "" + last_user_note_at = 0.0 + codex_started_at = time.time() + last_job_message_at = codex_started_at + seen_thread_id = "" + + def on_line(line: str) -> None: + nonlocal last_user_note, last_user_note_at, last_job_message_at, seen_thread_id + output_lines.append(line) + current_thread_id = self._extract_codex_thread_id(line) + if current_thread_id: + seen_thread_id = current_thread_id + note = self._extract_codex_user_note(line) + now = time.time() + if note and note != last_user_note and now - last_user_note_at > 8: + self._set_job_status_text(job, f"Задача #{job_num} в работе.\nСтатус: {note}") + last_user_note = note + last_user_note_at = now + last_job_message_at = now + + reader_done = threading.Event() + + def reader() -> None: + if not process.stdout: + reader_done.set() + return + for line in process.stdout: + on_line(line.rstrip("\n")) + reader_done.set() + + t = threading.Thread(target=reader, name=f"codex-reader-{job_id[:8]}", daemon=True) + t.start() + + try: + deadline = time.time() + self.cfg.codex_timeout_seconds + return_code = None + while return_code is None: + return_code = process.poll() + now = time.time() + if return_code is not None: + break + if now >= deadline: + process.kill() + t.join(timeout=2) + raise RuntimeError(f"Codex timeout after {self.cfg.codex_timeout_seconds}s") + if now - codex_started_at >= 120 and now - last_job_message_at >= 120: + elapsed = self._format_duration(int(now - codex_started_at)) + self._set_job_status_text( + job, + f"Задача #{job_num} в работе.\nСтатус: выполняется уже {elapsed}, от Codex давно нет новых сообщений.", + ) + last_job_message_at = now + self.last_heartbeat_at = now + time.sleep(1) + finally: + with self.active_process_lock: + self.active_process = None + + reader_done.wait(timeout=2) + + if return_code != 0: + tail = "\n".join(output_lines[-40:]) + raise RuntimeError(f"Codex exited with code {return_code}. Output tail:\n{tail}") + + if seen_thread_id and seen_thread_id != thread_id: + self._set_codex_thread_id_for_user(username, seen_thread_id) + + if output_file.exists(): + answer = output_file.read_text(encoding="utf-8").strip() + try: + output_file.unlink(missing_ok=True) + except Exception: + pass + if answer: + return answer + + fallback = self._extract_fallback_message(output_lines) + if not fallback: + raise RuntimeError("Codex returned empty response") + return fallback + + def _stop_active_codex_process(self) -> bool: + with self.active_process_lock: + process = self.active_process + if process is None: + return False + if process.poll() is not None: + return False + process.terminate() + try: + process.wait(timeout=2) + except subprocess.TimeoutExpired: + process.kill() + return True + + def _handle_job_failure(self, job: dict[str, Any], err: Exception) -> None: + job_id = job["id"] + job_num = job.get("num", "?") + error_text = str(err).strip() or err.__class__.__name__ + user_error_text = self._user_error_text(err) + retryable = not isinstance(err, VoiceTranscriptionError) or err.retryable + log_error_text = err.log_text() if isinstance(err, VoiceTranscriptionError) else error_text + print(f"[py-bot] Ошибка job={job_id[:8]}: {log_error_text}", flush=True) + print(traceback.format_exc(), flush=True) + + with self.queue_lock: + target = next((j for j in self.queue if j.get("id") == job_id), None) + if not target: + return + attempts = int(target.get("attempts", 0)) + 1 + target["attempts"] = attempts + target["last_error"] = error_text[:1000] + target["updated_at"] = now_iso() + if retryable and attempts < self.cfg.max_retries: + target["status"] = "pending" + target["retry_reason"] = error_text[:200] + self._persist_queue() + will_retry = True + else: + self.queue = [j for j in self.queue if j.get("id") != job_id] + self._persist_queue() + will_retry = False + + if will_retry: + self._set_job_status_text( + job, + f"{user_error_text}\nПовторю задачу #{job_num}: попытка {attempts + 1}/{self.cfg.max_retries}.", + ) + else: + self._set_job_status_text(job, f"{user_error_text}\nЗадача #{job_num} остановлена.") + + def _user_error_text(self, err: Exception) -> str: + if isinstance(err, VoiceTranscriptionError): + return f"Не удалось распознать голосовое: {err.user_message}" + error_text = str(err).strip() or err.__class__.__name__ + return f"Ошибка выполнения задачи: {error_text}" + + def _mark_job_done(self, job_id: str) -> None: + with self.queue_lock: + self.queue = [j for j in self.queue if j.get("id") != job_id] + self._persist_queue() + + def _mark_job_removed(self, job_id: str) -> None: + with self.queue_lock: + self.queue = [j for j in self.queue if j.get("id") != job_id] + self._persist_queue() + + def _send_task_center_reminder(self, job: dict[str, Any]) -> None: + if job.get("chat_type") != "private": + return + username = job.get("username") or "" + reminder = self._task_center_counts_text(username) + if not reminder: + return + self._safe_send(int(job["chat_id"]), reminder, reply_to=int(job["message_id"])) + + def _remember_public_report_chat(self, chat_id: int) -> None: + if self.state.get("public_report_chat_id") == chat_id: + return + self.state["public_report_chat_id"] = chat_id + self.state["updated_at"] = now_iso() + self._persist_state() + + def _public_report_chat_id(self) -> int | str | None: + chat_id = self.state.get("public_report_chat_id") + if isinstance(chat_id, int): + return self._resolve_chat_id(chat_id) + if self.cfg.allowed_channel_username: + return f"@{self.cfg.allowed_channel_username}" + return None + + def _send_private_job_public_report(self, job: dict[str, Any], answer: str) -> None: + if job.get("chat_type") != "private": + return + report_chat_id = self._public_report_chat_id() + if report_chat_id is None: + return + + job_num = job.get("num", "?") + source_text = (job.get("text") or "").strip() + if not source_text: + source_text = "(пустой текст запроса)" + role = (job.get("role") or "owner").strip().lower() + author_label = "Айдар" + if role == "player": + player_name = (job.get("player_name") or "").strip() or job.get("username") or "Игрок" + author_label = f"{player_name} (@{job.get('username')})" + if job.get("type") == "voice": + voice_file_id = (job.get("telegram_file_id") or "").strip() + media_type = (job.get("telegram_media_type") or "voice").strip() + request_caption = self._trim_telegram_caption( + f"{author_label} сделал {media_type}-запрос, задача #{job_num}.\n\n" + f"Распознанный текст:\n{source_text}" + ) + request_message_id = None + if voice_file_id: + request_message_id = self._safe_send_telegram_file( + report_chat_id, + voice_file_id, + media_type=media_type, + caption=request_caption, + ) + if request_message_id is None: + request_message_id = self._safe_send(report_chat_id, request_caption) + else: + request_report = ( + f"{author_label} сделал запрос, задача #{job_num}.\n\n" + f"{source_text}" + ) + request_message_id = self._safe_send(report_chat_id, request_report) + if request_message_id is None: + return + + answer_text = (answer or "").strip() or "(пустой ответ)" + answer_chunks = split_long_text(f"Ответ на задачу #{job_num}:\n\n{answer_text}") + for chunk in answer_chunks: + self._safe_send(report_chat_id, chunk, reply_to=request_message_id) + + @staticmethod + def _trim_telegram_caption(text: str, limit: int = 1000) -> str: + text = (text or "").strip() + if len(text) <= limit: + return text + return text[:limit].rstrip() + "\n...[обрезано]" + + def _safe_send_telegram_file( + self, + chat_id: int | str, + file_id: str, + *, + media_type: str = "voice", + caption: str = "", + reply_to: int | None = None, + ) -> int | None: + file_id = (file_id or "").strip() + if not file_id: + return None + caption = self._trim_telegram_caption(caption) + resolved_chat_id: int | str = self._resolve_chat_id(chat_id) if isinstance(chat_id, int) else chat_id + resolved_reply_to = reply_to if resolved_chat_id == chat_id or isinstance(chat_id, str) else None + + def send(target_chat_id: int | str, target_reply_to: int | None) -> dict[str, Any]: + if media_type == "audio": + return self.telegram.send_audio(target_chat_id, file_id, caption=caption, reply_to_message_id=target_reply_to) + return self.telegram.send_voice(target_chat_id, file_id, caption=caption, reply_to_message_id=target_reply_to) + + try: + sent = send(resolved_chat_id, resolved_reply_to) + result = sent.get("result") or {} + message_id = result.get("message_id") + return message_id if isinstance(message_id, int) else None + except Exception as e: + migrate_to_chat_id = self._extract_migrate_to_chat_id(str(e)) + if migrate_to_chat_id is not None: + if isinstance(resolved_chat_id, int): + self._remember_chat_migration(resolved_chat_id, migrate_to_chat_id, "send_file_error") + try: + sent = send(migrate_to_chat_id, None) + result = sent.get("result") or {} + message_id = result.get("message_id") + return message_id if isinstance(message_id, int) else None + except Exception as retry_error: + print(f"[py-bot] sendFile retry after migration error: {retry_error}", flush=True) + return None + print(f"[py-bot] sendFile error: {e}", flush=True) + return None + + def _safe_send_voice_upload( + self, + chat_id: int | str, + voice_bytes: bytes, + filename: str, + *, + caption: str = "", + reply_to: int | None = None, + ) -> int | None: + if not voice_bytes: + return None + caption = self._trim_telegram_caption(caption) + resolved_chat_id: int | str = self._resolve_chat_id(chat_id) if isinstance(chat_id, int) else chat_id + resolved_reply_to = reply_to if resolved_chat_id == chat_id or isinstance(chat_id, str) else None + + def send(target_chat_id: int | str, target_reply_to: int | None) -> dict[str, Any]: + return self.telegram.send_voice_upload( + target_chat_id, + voice_bytes, + filename, + caption=caption, + reply_to_message_id=target_reply_to, + ) + + try: + sent = send(resolved_chat_id, resolved_reply_to) + result = sent.get("result") or {} + message_id = result.get("message_id") + return message_id if isinstance(message_id, int) else None + except Exception as e: + migrate_to_chat_id = self._extract_migrate_to_chat_id(str(e)) + if migrate_to_chat_id is not None: + if isinstance(resolved_chat_id, int): + self._remember_chat_migration(resolved_chat_id, migrate_to_chat_id, "send_voice_upload_error") + try: + sent = send(migrate_to_chat_id, None) + result = sent.get("result") or {} + message_id = result.get("message_id") + return message_id if isinstance(message_id, int) else None + except Exception as retry_error: + print(f"[py-bot] sendVoiceUpload retry after migration error: {retry_error}", flush=True) + return None + print(f"[py-bot] sendVoiceUpload error: {e}", flush=True) + return None + + def _safe_send(self, chat_id: int | str, text: str, reply_to: int | None = None) -> int | None: + text = (text or "").strip() + if not text: + return None + if len(text) > 3900: + text = text[:3900] + "\n...[обрезано]" + resolved_chat_id: int | str = self._resolve_chat_id(chat_id) if isinstance(chat_id, int) else chat_id + resolved_reply_to = reply_to if resolved_chat_id == chat_id or isinstance(chat_id, str) else None + try: + sent = self.telegram.send_message(resolved_chat_id, text, reply_to_message_id=resolved_reply_to) + result = sent.get("result") or {} + message_id = result.get("message_id") + return message_id if isinstance(message_id, int) else None + except Exception as e: + migrate_to_chat_id = self._extract_migrate_to_chat_id(str(e)) + if migrate_to_chat_id is not None: + if isinstance(resolved_chat_id, int): + self._remember_chat_migration(resolved_chat_id, migrate_to_chat_id, "send_message_error") + try: + sent = self.telegram.send_message(migrate_to_chat_id, text, reply_to_message_id=None) + result = sent.get("result") or {} + message_id = result.get("message_id") + return message_id if isinstance(message_id, int) else None + except Exception as retry_error: + print(f"[py-bot] sendMessage retry after migration error: {retry_error}", flush=True) + return None + print(f"[py-bot] sendMessage error: {e}", flush=True) + return None + + def _safe_edit(self, chat_id: int | str, message_id: int | None, text: str) -> bool: + text = (text or "").strip() + if not text or not message_id: + return False + if len(text) > 3900: + text = text[:3900] + "\n...[обрезано]" + resolved_chat_id: int | str = self._resolve_chat_id(chat_id) if isinstance(chat_id, int) else chat_id + try: + self.telegram.edit_message_text(resolved_chat_id, message_id, text) + return True + except Exception as e: + error_text = str(e) + if "message is not modified" in error_text: + return True + migrate_to_chat_id = self._extract_migrate_to_chat_id(error_text) + if migrate_to_chat_id is not None: + if isinstance(resolved_chat_id, int): + self._remember_chat_migration(resolved_chat_id, migrate_to_chat_id, "edit_message_error") + try: + self.telegram.edit_message_text(migrate_to_chat_id, message_id, text) + return True + except Exception as retry_error: + print(f"[py-bot] editMessageText retry after migration error: {retry_error}", flush=True) + return False + print(f"[py-bot] editMessageText error: {e}", flush=True) + return False + + def _ensure_job_status_message(self, job_id: str, chat_id: int, reply_to_message_id: int, text: str) -> int | None: + text = (text or "").strip() + if not text: + return None + with self.queue_lock: + target = next((j for j in self.queue if j.get("id") == job_id), None) + if target: + existing_id = target.get("status_message_id") + existing_text = (target.get("status_message_text") or "").strip() + else: + existing_id = None + existing_text = "" + if existing_id and existing_text == text and self._safe_edit(chat_id, int(existing_id), text): + return int(existing_id) + if existing_id and self._safe_edit(chat_id, int(existing_id), text): + with self.queue_lock: + target = next((j for j in self.queue if j.get("id") == job_id), None) + if target: + target["status_message_text"] = text + target["updated_at"] = now_iso() + self._persist_queue() + return int(existing_id) + message_id = self._safe_send(chat_id, text, reply_to=reply_to_message_id) + if message_id is not None: + with self.queue_lock: + target = next((j for j in self.queue if j.get("id") == job_id), None) + if target: + target["status_message_id"] = message_id + target["status_message_text"] = text + target["updated_at"] = now_iso() + self._persist_queue() + return message_id + + def _set_job_status_text(self, job: dict[str, Any], text: str) -> None: + if (job.get("chat_type") or "") != "private": + self._safe_send(int(job["chat_id"]), text, reply_to=int(job["message_id"])) + return + if not self._single_status_message_enabled(job.get("username") or ""): + self._safe_send(int(job["chat_id"]), text, reply_to=int(job["message_id"])) + return + message_id = self._ensure_job_status_message( + job["id"], + int(job["chat_id"]), + int(job["message_id"]), + text, + ) + if message_id is not None: + job["status_message_id"] = message_id + job["status_message_text"] = text + + def _request_deferred_restart(self) -> None: + if self.restart_requested: + return + self.restart_requested = True + self._append_history_event("restart_service_deferred_scheduled", {}) + with self.queue_lock: + has_active = any(j.get("status") == "active" for j in self.queue) + if not has_active: + threading.Thread( + target=lambda: self._exit_for_restart("deferred_restart_no_active_job"), + name="shine-py-bot-deferred-restart", + daemon=True, + ).start() + + def _exit_for_restart(self, reason: str) -> None: + print(f"[py-bot] restart now: {reason}", flush=True) + self._append_history_event("restart_service_executing", {"reason": reason}) + time.sleep(0.5) + os._exit(0) + + def _schedule_self_restart(self, reason: str = "restart_requested", *, force: bool = False) -> None: + if self.restart_requested and not force: + return + self.restart_requested = True + + def restart() -> None: + time.sleep(1.5) + print(f"[py-bot] restart requested by Telegram command: {reason}", flush=True) + if force: + self._stop_active_codex_process() + os._exit(0) + + threading.Thread(target=restart, name="shine-py-bot-self-restart", daemon=True).start() + + def _voice_reply_targets(self, job: dict[str, Any]) -> list[tuple[int | str, int | None, str]]: + chat_id = int(job["chat_id"]) + message_id = int(job["message_id"]) + targets: list[tuple[int | str, int | None, str]] = [(chat_id, message_id, "source")] + username = job.get("username") or "" + + private_chat_id = self._private_chat_id_for_user(username) + if private_chat_id is not None and private_chat_id != self._resolve_chat_id(chat_id): + targets.append((private_chat_id, None, "private")) + + report_chat_id = self._public_report_chat_id() + if report_chat_id is not None: + resolved_report_chat_id = self._resolve_chat_id(report_chat_id) if isinstance(report_chat_id, int) else report_chat_id + resolved_current_chat_id: int | str = self._resolve_chat_id(chat_id) + if resolved_report_chat_id != resolved_current_chat_id: + targets.append((report_chat_id, None, "public")) + + deduped: list[tuple[int | str, int | None, str]] = [] + seen: set[str] = set() + for target_chat_id, target_reply_to, label in targets: + resolved: int | str = self._resolve_chat_id(target_chat_id) if isinstance(target_chat_id, int) else target_chat_id + key = str(resolved) + if key in seen: + continue + seen.add(key) + deduped.append((target_chat_id, target_reply_to, label)) + return deduped + + def _send_voice_reply_for_answer( + self, + job: dict[str, Any], + answer: str, + history_path: Path, + job_id: str, + ) -> None: + chat_id = int(job["chat_id"]) + message_id = int(job["message_id"]) + job_num = job.get("num", "?") + username = job.get("username") or "" + if not self.cfg.openai_api_key: + note = "не настроен ключ OpenAI для озвучивания." + self._append_history(history_path, "voice_reply_failed", {"jobId": job_id, "jobNum": job_num, "error": note}) + self._safe_send(chat_id, f"Озвучивание включено, но {note}", reply_to=message_id) + return + + voice_text = answer + rewrite_enabled = self._voice_rewrite_enabled(username) + if rewrite_enabled: + try: + voice_text = self._openai_rewrite_text_for_voice(answer) + self._append_history(history_path, "voice_rewrite_done", { + "jobId": job_id, + "jobNum": job_num, + "model": self.cfg.openai_voice_rewrite_model, + "sourceChars": len(answer or ""), + "resultChars": len(voice_text or ""), + }) + except VoiceReplyError as e: + self._append_history(history_path, "voice_rewrite_failed", { + "jobId": job_id, + "jobNum": job_num, + "model": self.cfg.openai_voice_rewrite_model, + "error": str(e), + }) + self._safe_send( + chat_id, + f"Не удалось адаптировать ответ #{job_num} для озвучки, озвучиваю обычный текст: {e}", + reply_to=message_id, + ) + voice_text = answer + + chunks = split_text_for_tts(voice_text, self.cfg.openai_tts_chunk_chars) + if not chunks: + return + + sent_count = 0 + total = len(chunks) + targets = self._voice_reply_targets(job) + print(f"[py-bot] tts start job={str(job_id)[:8]} chunks={total} targets={len(targets)} rewrite={rewrite_enabled}", flush=True) + for index, chunk in enumerate(chunks, start=1): + try: + audio = self._openai_tts(chunk) + except VoiceReplyError as e: + self._append_history(history_path, "voice_reply_failed", { + "jobId": job_id, + "jobNum": job_num, + "part": index, + "parts": total, + "error": str(e), + }) + self._safe_send(chat_id, f"Не удалось озвучить ответ #{job_num}: {e}", reply_to=message_id) + return + caption = f"Озвучка ответа #{job_num}" + if total > 1: + caption += f", часть {index}/{total}" + for target_chat_id, target_reply_to, target_label in targets: + message_sent = self._safe_send_voice_upload( + target_chat_id, + audio, + f"shine-answer-{job_num}-{index}.ogg", + caption=caption, + reply_to=target_reply_to, + ) + if message_sent is None: + self._append_history(history_path, "voice_reply_failed", { + "jobId": job_id, + "jobNum": job_num, + "part": index, + "parts": total, + "target": target_label, + "error": "Telegram не принял voice-файл озвучки.", + }) + if target_label == "source": + self._safe_send(chat_id, f"Озвучка ответа #{job_num} создана, но Telegram не принял voice-файл.", reply_to=message_id) + continue + sent_count += 1 + self._append_history(history_path, "voice_reply_sent", { + "jobId": job_id, + "jobNum": job_num, + "parts": total, + "messages": sent_count, + "targets": len(targets), + "rewriteEnabled": rewrite_enabled, + }) + print(f"[py-bot] tts done job={str(job_id)[:8]} sent={sent_count}", flush=True) + + def _openai_rewrite_text_for_voice(self, text: str) -> str: + source = (text or "").strip() + if not source: + return "" + if len(source) > self.cfg.openai_voice_rewrite_max_input_chars: + source = source[:self.cfg.openai_voice_rewrite_max_input_chars].rstrip() + "\n\n...[текстовый ответ был длиннее и обрезан для голосовой версии]" + payload = { + "model": self.cfg.openai_voice_rewrite_model, + "messages": [ + { + "role": "system", + "content": ( + "Ты готовишь русскую версию финального ответа технического агента для озвучивания. " + "Не пересказывай заново и не меняй смысл: сохрани порядок мыслей, итог, предупреждения, статусы и важные действия. " + "Мягко убери только то, что плохо воспринимается на слух: длинные пути, хэши, ID, команды, JSON, " + "длинные списки файлов, точные размеры и счётчики символов. Если деталь важна, замени её коротким описанием. " + "Не добавляй новых фактов. Пиши естественно, без markdown, близко к исходному тексту." + ), + }, + { + "role": "user", + "content": f"Переделай этот финальный текст в вариант для озвучки:\n\n{source}", + }, + ], + "temperature": 0.2, + "max_tokens": self.cfg.openai_voice_rewrite_max_output_tokens, + } + data = json.dumps(payload, ensure_ascii=False).encode("utf-8") + req = request.Request("https://api.openai.com/v1/chat/completions", method="POST", data=data) + req.add_header("Authorization", f"Bearer {self.cfg.openai_api_key}") + req.add_header("Content-Type", "application/json") + try: + with request.urlopen(req, timeout=self.cfg.openai_voice_rewrite_timeout_seconds) as resp: + raw = resp.read().decode("utf-8", errors="replace") + except TimeoutError as e: + raise VoiceReplyError( + f"OpenAI не успел адаптировать текст за {self.cfg.openai_voice_rewrite_timeout_seconds} секунд." + ) from e + except error.HTTPError as e: + detail = e.read().decode("utf-8", errors="replace") + if e.code == 401: + message = "OpenAI отклонил ключ API для адаптации текста." + elif e.code == 429: + message = "OpenAI временно ограничил адаптацию текста из-за лимита запросов." + elif e.code >= 500: + message = "OpenAI временно не смог адаптировать текст." + else: + message = f"OpenAI вернул ошибку HTTP {e.code} при адаптации текста." + if detail: + message = f"{message} Детали: {detail[:500]}" + raise VoiceReplyError(message) from e + except error.URLError as e: + raise VoiceReplyError(f"не удалось отправить текст в OpenAI для адаптации из-за сетевой ошибки: {e.reason}") from e + + try: + body = json.loads(raw) + content = (((body.get("choices") or [{}])[0].get("message") or {}).get("content") or "").strip() + except Exception as e: + raise VoiceReplyError("OpenAI вернул неразборчивый ответ при адаптации текста.") from e + if not content: + raise VoiceReplyError("OpenAI вернул пустой текст адаптации.") + return content + + def _openai_tts(self, text: str) -> bytes: + payload = { + "model": self.cfg.openai_tts_model, + "voice": self.cfg.openai_tts_voice, + "input": text, + "response_format": self.cfg.openai_tts_response_format, + } + data = json.dumps(payload, ensure_ascii=False).encode("utf-8") + req = request.Request("https://api.openai.com/v1/audio/speech", method="POST", data=data) + req.add_header("Authorization", f"Bearer {self.cfg.openai_api_key}") + req.add_header("Content-Type", "application/json") + try: + with request.urlopen(req, timeout=self.cfg.openai_tts_timeout_seconds) as resp: + audio = resp.read() + except TimeoutError as e: + raise VoiceReplyError(f"OpenAI не успел сгенерировать речь за {self.cfg.openai_tts_timeout_seconds} секунд.") from e + except error.HTTPError as e: + detail = e.read().decode("utf-8", errors="replace") + if e.code == 401: + message = "OpenAI отклонил ключ API для озвучивания." + elif e.code == 429: + message = "OpenAI временно ограничил озвучивание из-за лимита запросов." + elif e.code >= 500: + message = "OpenAI временно не смог сгенерировать речь." + else: + message = f"OpenAI вернул ошибку HTTP {e.code} при озвучивании." + if detail: + message = f"{message} Детали: {detail[:500]}" + raise VoiceReplyError(message) from e + except error.URLError as e: + raise VoiceReplyError(f"не удалось отправить текст в OpenAI TTS из-за сетевой ошибки: {e.reason}") from e + if not audio: + raise VoiceReplyError("OpenAI вернул пустой аудиофайл.") + return audio + + def _transcribe_voice_job( + self, + job: dict[str, Any], + *, + status_cb: Callable[[str], Any] | None = None, + ) -> str: + if not self.cfg.openai_api_key: + raise VoiceTranscriptionError( + "не настроен ключ OpenAI для распознавания.", + stage="config", + retryable=False, + ) + file_id = (job.get("telegram_file_id") or "").strip() + if not file_id: + raise VoiceTranscriptionError( + "Telegram не передал идентификатор файла.", + stage="telegram_file_id", + retryable=False, + ) + job_id = str(job.get("id") or "")[:8] + job_num = job.get("num", "?") + media_type = (job.get("telegram_media_type") or "voice").strip() + duration_seconds = int(job.get("telegram_duration_seconds") or 0) + telegram_file_size = int(job.get("telegram_file_size") or 0) + file_looks_big_for_cloud = self._telegram_cloud_download_is_likely_too_big(telegram_file_size) + if file_looks_big_for_cloud and status_cb is not None: + status_cb( + "файл большой, всё равно пробую скачать его из Telegram. " + f"Предварительный размер около {self._bytes_to_mb(telegram_file_size)} MB." + ) + started_at = time.time() + print(f"[py-bot] transcribe start job={job_id} num={job_num} media={media_type}", flush=True) + file_bytes, filename = self._download_telegram_file(file_id) + print( + f"[py-bot] transcribe downloaded job={job_id} filename={filename} size={len(file_bytes)} bytes", + flush=True, + ) + if file_looks_big_for_cloud and status_cb is not None: + status_cb( + "скачивание из Telegram прошло успешно. " + f"Фактический размер около {self._bytes_to_mb(len(file_bytes))} MB, дальше готовлю аудио и отправляю в OpenAI." + ) + prepared_parts = self._prepare_audio_parts_for_transcription( + file_bytes, + filename, + duration_seconds=duration_seconds, + job_id=job_id, + job_num=job_num, + ) + print( + f"[py-bot] transcribe prepared job={job_id} parts={len(prepared_parts)} duration={duration_seconds}s", + flush=True, + ) + parts_text: list[str] = [] + prompt_tail = "" + for index, (part_bytes, part_name) in enumerate(prepared_parts, start=1): + print( + f"[py-bot] transcribe part job={job_id} index={index}/{len(prepared_parts)} filename={part_name} size={len(part_bytes)} bytes", + flush=True, + ) + part_text = self._openai_transcribe(part_bytes, part_name, prompt=prompt_tail).strip() + if part_text: + parts_text.append(part_text) + prompt_tail = self._transcription_prompt_tail("\n".join(parts_text)) + text = "\n".join(parts_text).strip() + if not text: + raise VoiceTranscriptionError( + "сервис распознавания вернул пустой текст. Возможно, в записи нет слышимой речи или качество звука слишком низкое.", + stage="empty_text", + retryable=False, + ) + elapsed = self._format_duration(int(time.time() - started_at)) + print(f"[py-bot] transcribe done job={job_id} chars={len(text)} elapsed={elapsed}", flush=True) + return text + + def _download_telegram_file(self, file_id: str) -> tuple[bytes, str]: + try: + result = self.telegram.call("getFile", {"file_id": file_id}, timeout=120) + except TimeoutError as e: + raise VoiceTranscriptionError( + "Telegram долго не отдавал информацию о файле.", + stage="telegram_get_file_timeout", + detail=str(e), + ) from e + except Exception as e: + detail = str(e) + if "file is too big" in detail.lower(): + raise VoiceTranscriptionError( + "Файл большой: я попробовал скачать его через текущий Telegram Bot API, " + "но Telegram не дал это сделать. Для такого аудио нужен локальный `telegram-bot-api` " + "сервер или другой способ передать файл боту.", + stage="telegram_get_file_too_big", + retryable=False, + detail=detail, + ) from e + raise VoiceTranscriptionError( + "не удалось получить информацию о файле из Telegram.", + stage="telegram_get_file", + detail=detail, + ) from e + info = result.get("result") or {} + file_path = info.get("file_path") + if not file_path: + raise VoiceTranscriptionError( + "Telegram не вернул путь к файлу.", + stage="telegram_file_path", + retryable=True, + detail=json.dumps(info, ensure_ascii=False)[:1000], + ) + file_url = self.telegram.file_base + file_path.lstrip("/") + req = request.Request(file_url, method="GET") + try: + with request.urlopen(req, timeout=self.cfg.telegram_file_download_timeout_seconds) as resp: + data = resp.read() + except TimeoutError as e: + raise VoiceTranscriptionError( + f"Telegram не успел отдать аудиофайл за {self.cfg.telegram_file_download_timeout_seconds} секунд.", + stage="telegram_download_timeout", + detail=str(e), + ) from e + except error.HTTPError as e: + detail = e.read().decode("utf-8", errors="replace") + raise VoiceTranscriptionError( + f"Telegram вернул ошибку HTTP {e.code} при скачивании аудио.", + stage="telegram_download_http", + retryable=e.code >= 500 or e.code == 429, + detail=detail[:1000], + ) from e + except error.URLError as e: + raise VoiceTranscriptionError( + "не удалось скачать аудиофайл из Telegram из-за сетевой ошибки.", + stage="telegram_download_network", + detail=str(e.reason), + ) from e + if not data: + raise VoiceTranscriptionError( + "Telegram отдал пустой аудиофайл.", + stage="telegram_download_empty", + retryable=True, + ) + original_name = Path(file_path).name or "audio.ogg" + lower = original_name.lower() + # OpenAI transcription может не принимать расширение .oga, нормализуем в .ogg. + if lower.endswith(".oga"): + base = original_name[:-4] if len(original_name) > 4 else "audio" + normalized = f"{base}.ogg" + else: + normalized = original_name + return data, normalized + + def _prepare_audio_parts_for_transcription( + self, + file_bytes: bytes, + filename: str, + *, + duration_seconds: int, + job_id: str, + job_num: Any, + ) -> list[tuple[bytes, str]]: + needs_duration_chunking = duration_seconds > self.cfg.openai_transcribe_max_chunk_seconds + if len(file_bytes) <= self.cfg.openai_transcribe_max_upload_bytes and not needs_duration_chunking: + return [(file_bytes, filename)] + ffmpeg_path = shutil.which(self.cfg.ffmpeg_bin) + ffprobe_path = shutil.which(self.cfg.ffprobe_bin) + if not ffmpeg_path or not ffprobe_path: + raise VoiceTranscriptionError( + "для длинного аудио нужен локальный `ffmpeg`/`ffprobe`, но они не найдены в системе.", + stage="audio_prepare_tools_missing", + retryable=False, + ) + with tempfile.TemporaryDirectory(prefix="shine-audio-") as tmpdir: + tmp = Path(tmpdir) + input_suffix = Path(filename).suffix or ".ogg" + input_path = tmp / f"source{input_suffix}" + input_path.write_bytes(file_bytes) + prepared_path = tmp / "prepared.ogg" + self._ffmpeg_reencode_audio(input_path, prepared_path) + prepared_bytes = prepared_path.read_bytes() + prepared_duration = self._ffprobe_duration_seconds(prepared_path) + if ( + len(prepared_bytes) <= self.cfg.openai_transcribe_max_upload_bytes + and prepared_duration <= self.cfg.openai_transcribe_max_chunk_seconds + ): + return [(prepared_bytes, prepared_path.name)] + chunk_length = self._choose_transcription_chunk_seconds(prepared_duration, len(prepared_bytes)) + print( + f"[py-bot] audio chunking job={job_id} num={job_num} duration={prepared_duration:.1f}s total_bytes={len(prepared_bytes)} chunk_seconds={chunk_length}", + flush=True, + ) + chunks: list[tuple[bytes, str]] = [] + offset = 0 + index = 1 + total_duration = max(1, int(prepared_duration + 0.999)) + while offset < total_duration: + chunk_path = tmp / f"chunk_{index:03d}.ogg" + self._ffmpeg_extract_audio_chunk(prepared_path, chunk_path, offset, chunk_length) + chunk_bytes = chunk_path.read_bytes() + if not chunk_bytes: + break + if len(chunk_bytes) > self.cfg.openai_transcribe_max_upload_bytes: + raise VoiceTranscriptionError( + "локальная нарезка аудио дала слишком большой кусок для OpenAI; нужно уменьшить размер чанка.", + stage="audio_chunk_too_large", + retryable=False, + ) + chunks.append((chunk_bytes, chunk_path.name)) + step = max(1, chunk_length - self.cfg.openai_transcribe_overlap_seconds) + offset += step + index += 1 + if not chunks: + raise VoiceTranscriptionError( + "не удалось подготовить куски аудио для распознавания.", + stage="audio_chunk_empty", + retryable=False, + ) + return chunks + + def _ffmpeg_reencode_audio(self, input_path: Path, output_path: Path) -> None: + cmd = [ + self.cfg.ffmpeg_bin, + "-y", + "-i", + str(input_path), + "-vn", + "-ac", + "1", + "-ar", + "16000", + "-c:a", + "libopus", + "-b:a", + f"{self.cfg.openai_transcribe_reencode_bitrate_kbps}k", + str(output_path), + ] + self._run_subprocess_checked(cmd, "audio_reencode_ffmpeg") + + def _ffmpeg_extract_audio_chunk(self, input_path: Path, output_path: Path, offset_seconds: int, chunk_seconds: int) -> None: + cmd = [ + self.cfg.ffmpeg_bin, + "-y", + "-ss", + str(offset_seconds), + "-t", + str(chunk_seconds), + "-i", + str(input_path), + "-vn", + "-acodec", + "copy", + str(output_path), + ] + self._run_subprocess_checked(cmd, "audio_chunk_ffmpeg") + + def _ffprobe_duration_seconds(self, audio_path: Path) -> float: + cmd = [ + self.cfg.ffprobe_bin, + "-v", + "error", + "-show_entries", + "format=duration", + "-of", + "default=noprint_wrappers=1:nokey=1", + str(audio_path), + ] + try: + result = subprocess.run( + cmd, + check=True, + capture_output=True, + text=True, + timeout=self.cfg.openai_transcribe_ffmpeg_timeout_seconds, + ) + except subprocess.TimeoutExpired as e: + raise VoiceTranscriptionError( + f"`ffprobe` не успел определить длительность аудио за {self.cfg.openai_transcribe_ffmpeg_timeout_seconds} секунд.", + stage="audio_probe_timeout", + retryable=False, + ) from e + except subprocess.CalledProcessError as e: + detail = (e.stderr or e.stdout or "").strip() + raise VoiceTranscriptionError( + "не удалось определить длительность аудио через `ffprobe`.", + stage="audio_probe_failed", + retryable=False, + detail=detail[:1500], + ) from e + raw = (result.stdout or "").strip() + try: + return max(0.0, float(raw)) + except ValueError as e: + raise VoiceTranscriptionError( + "`ffprobe` вернул некорректную длительность аудио.", + stage="audio_probe_invalid", + retryable=False, + detail=raw[:300], + ) from e + + def _run_subprocess_checked(self, cmd: list[str], stage: str) -> None: + try: + subprocess.run( + cmd, + check=True, + capture_output=True, + text=True, + timeout=self.cfg.openai_transcribe_ffmpeg_timeout_seconds, + ) + except subprocess.TimeoutExpired as e: + raise VoiceTranscriptionError( + f"локальная обработка аудио не успела завершиться за {self.cfg.openai_transcribe_ffmpeg_timeout_seconds} секунд.", + stage=f"{stage}_timeout", + retryable=False, + ) from e + except subprocess.CalledProcessError as e: + detail = (e.stderr or e.stdout or "").strip() + raise VoiceTranscriptionError( + "локальная обработка аудио через `ffmpeg` завершилась с ошибкой.", + stage=f"{stage}_failed", + retryable=False, + detail=detail[:1500], + ) from e + + def _choose_transcription_chunk_seconds(self, duration_seconds: float, total_bytes: int) -> int: + max_chunk = self.cfg.openai_transcribe_max_chunk_seconds + safe_seconds = max(60, max_chunk - self.cfg.openai_transcribe_overlap_seconds) + if duration_seconds <= 0 or total_bytes <= 0: + return safe_seconds + bytes_per_second = total_bytes / max(duration_seconds, 1.0) + if bytes_per_second <= 0: + return safe_seconds + size_limited = int((self.cfg.openai_transcribe_max_upload_bytes * 0.9) / bytes_per_second) + return max(60, min(safe_seconds, size_limited if size_limited > 0 else safe_seconds)) + + @staticmethod + def _transcription_prompt_tail(text: str, limit: int = 1000) -> str: + source = compact_spaces(text) + if len(source) <= limit: + return source + return source[-limit:] + + def _telegram_cloud_download_is_likely_too_big(self, file_size: int) -> bool: + if file_size <= 0: + return False + using_cloud_api = self.cfg.telegram_api_base_url.rstrip("/") == "https://api.telegram.org" + return using_cloud_api and file_size > 20 * 1024 * 1024 + + @staticmethod + def _bytes_to_mb(value: int) -> str: + return f"{value / (1024 * 1024):.1f}" + + def _openai_transcribe(self, file_bytes: bytes, filename: str, prompt: str = "") -> str: + boundary = "----shine-boundary-" + "".join(random.choices("abcdef0123456789", k=16)) + mime = mimetypes.guess_type(filename)[0] or "application/octet-stream" + + def text_part(name: str, value: str) -> bytes: + return ( + f"--{boundary}\r\n" + f'Content-Disposition: form-data; name="{name}"\r\n\r\n' + f"{value}\r\n" + ).encode("utf-8") + + body = bytearray() + body.extend(text_part("model", self.cfg.openai_transcribe_model)) + body.extend(text_part("response_format", "text")) + prompt = compact_spaces(prompt) + if prompt: + body.extend(text_part("prompt", prompt[:1000])) + body.extend( + ( + f"--{boundary}\r\n" + f'Content-Disposition: form-data; name="file"; filename="{filename}"\r\n' + f"Content-Type: {mime}\r\n\r\n" + ).encode("utf-8") + ) + body.extend(file_bytes) + body.extend(b"\r\n") + body.extend(f"--{boundary}--\r\n".encode("utf-8")) + + req = request.Request("https://api.openai.com/v1/audio/transcriptions", method="POST", data=bytes(body)) + req.add_header("Authorization", f"Bearer {self.cfg.openai_api_key}") + req.add_header("Content-Type", f"multipart/form-data; boundary={boundary}") + try: + with request.urlopen(req, timeout=self.cfg.openai_transcribe_timeout_seconds) as resp: + return resp.read().decode("utf-8", errors="replace") + except TimeoutError as e: + raise VoiceTranscriptionError( + f"OpenAI не успел распознать аудио за {self.cfg.openai_transcribe_timeout_seconds} секунд.", + stage="openai_transcribe_timeout", + detail=str(e), + ) from e + except error.HTTPError as e: + detail = e.read().decode("utf-8", errors="replace") + if e.code == 400: + user_message = "OpenAI не принял аудиофайл для распознавания." + elif e.code == 401: + user_message = "OpenAI отклонил ключ API для распознавания." + elif e.code == 413: + user_message = "аудиофайл слишком большой для распознавания OpenAI." + elif e.code == 429: + user_message = "OpenAI временно ограничил распознавание из-за лимита запросов." + elif e.code >= 500: + user_message = "OpenAI временно не смог обработать распознавание." + else: + user_message = f"OpenAI вернул ошибку HTTP {e.code} при распознавании." + raise VoiceTranscriptionError( + user_message, + stage="openai_transcribe_http", + retryable=e.code == 429 or e.code >= 500, + detail=detail[:1500], + ) from e + except error.URLError as e: + raise VoiceTranscriptionError( + "не удалось отправить аудио в OpenAI из-за сетевой ошибки.", + stage="openai_transcribe_network", + detail=str(e.reason), + ) from e + + @staticmethod + def _extract_codex_user_note(line: str) -> str | None: + s = (line or "").strip() + if not s.startswith("{"): + return None + try: + obj = json.loads(s) + except Exception: + return None + if obj.get("type") != "item.completed": + return None + item = obj.get("item") or {} + if item.get("type") != "agent_message": + return None + text = (item.get("text") or "").strip() + if not text: + return None + if len(text) > 220: + return text[:220] + "..." + return text + + @staticmethod + def _extract_fallback_message(lines: list[str]) -> str: + for line in reversed(lines): + line = line.strip() + if not line: + continue + if line.startswith("{") and '"type":' in line: + continue + if line.startswith("mcp:") or line.startswith("OpenAI Codex"): + continue + return line + return "" + + @staticmethod + def _extract_codex_thread_id(line: str) -> str: + s = (line or "").strip() + if not s.startswith("{"): + return "" + try: + obj = json.loads(s) + except Exception: + return "" + if obj.get("type") != "thread.started": + return "" + thread_id = (obj.get("thread_id") or "").strip() + return thread_id + + @staticmethod + def _is_missing_codex_session_error(text: str) -> bool: + lowered = (text or "").lower() + markers = [ + "session not found", + "conversation not found", + "thread not found", + "no session found", + "invalid session", + "unknown session", + "no conversation found", + "unknown thread", + ] + return any(marker in lowered for marker in markers) + + @staticmethod + def _format_duration(seconds: int) -> str: + seconds = max(0, seconds) + minutes, sec = divmod(seconds, 60) + hours, minutes = divmod(minutes, 60) + if hours: + return f"{hours}ч {minutes}м {sec}с" + if minutes: + return f"{minutes}м {sec}с" + return f"{sec}с" + + +def run_selftest(config: BotConfig, prompt: str) -> int: + cmd = [ + str(config.codex_bin), + "exec", + "--dangerously-bypass-approvals-and-sandbox", + "--json", + "-C", str(config.codex_workdir), + prompt, + ] + proc = subprocess.run( + cmd, + stdin=subprocess.DEVNULL, + capture_output=True, + text=True, + encoding="utf-8", + errors="replace", + ) + print(proc.stdout) + if proc.stderr: + print(proc.stderr) + return proc.returncode + + +def main() -> int: + parser = argparse.ArgumentParser(description="SHiNE Python Telegram bot wrapper for Codex CLI") + parser.add_argument("--selftest-codex", default="", help="Выполнить только codex exec с этим prompt и выйти") + args = parser.parse_args() + + root = Path(__file__).resolve().parent + cfg = BotConfig(root) + if args.selftest_codex: + return run_selftest(cfg, args.selftest_codex) + + service = ShinePyBotService(cfg) + try: + service.run() + except KeyboardInterrupt: + service.shutdown() + return 0 + except Exception as e: + print(f"[py-bot] FATAL: {e}", flush=True) + return 1 + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/codex-agent-VPS/scripts/systemd/shine-agent-bot-coder.service b/codex-agent-VPS/scripts/systemd/shine-agent-bot-coder.service new file mode 100644 index 0000000..f166ef8 --- /dev/null +++ b/codex-agent-VPS/scripts/systemd/shine-agent-bot-coder.service @@ -0,0 +1,23 @@ +[Unit] +Description=SHiNE Agent Bot Coder (Telegram + Codex queue worker) +After=network-online.target +Wants=network-online.target + +[Service] +Type=simple +User=your_user +Group=your_user +WorkingDirectory=/home/your_user/codex-agent +Environment=HOME=/home/your_user +Environment=PATH=/home/your_user/.local/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin +EnvironmentFile=/home/your_user/codex-agent/.env +ExecStart=/usr/bin/python3 /home/your_user/codex-agent/py_bot_service.py +Restart=always +RestartSec=5 +TimeoutStopSec=20 +SuccessExitStatus=143 0 +StandardOutput=append:/home/your_user/codex-agent/logs/service.log +StandardError=append:/home/your_user/codex-agent/logs/service.log + +[Install] +WantedBy=multi-user.target