#!/usr/bin/env python3 from __future__ import annotations import argparse import datetime as dt import fcntl import json import mimetypes import os import random import re import shutil import string import subprocess import tempfile import threading import time import traceback import uuid from pathlib import Path from typing import Any, Callable from urllib import error, request DEFAULT_ALLOWED_PLAYERS = ",".join([ "malvviiina:Милана", "zodiaktechnika32:Сергей", "oidasyda:Иван", "blackbyrd1:Ворон", "dimasol1:Дима", ]) TASK_STATUS_LABELS = { "new": "новая", "approved": "одобрена", "rejected": "отклонена", "needs_work": "на доработку", "done": "сделана", } def now_iso() -> str: return dt.datetime.now(dt.timezone.utc).isoformat() def normalize_username(value: str | None) -> str: if not value: return "" value = value.strip() if value.startswith("@"): value = value[1:] return value.lower() def parse_allowed_players(raw: str) -> dict[str, str]: players: dict[str, str] = {} for item in (raw or "").split(","): part = item.strip() if not part: continue username_part, sep, name_part = part.partition(":") username = normalize_username(username_part) if not username: continue player_name = (name_part if sep else username_part).strip() or username players[username] = player_name return players def compact_spaces(text: str) -> str: return re.sub(r"\s+", " ", (text or "").strip()) def split_long_text(text: str, chunk_size: int = 3500) -> list[str]: text = (text or "").strip() if not text: return ["(пустой ответ)"] return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)] def split_final_private_text(text: str, first_chunk_size: int = 3900, second_chunk_size: int = 3900) -> list[str]: text = (text or "").strip() if not text: return ["(пустой ответ)"] if len(text) <= first_chunk_size: return [text] first = text[:first_chunk_size].rstrip() rest = text[first_chunk_size:].lstrip() if len(rest) <= second_chunk_size: return [first, rest] second = rest[:second_chunk_size].rstrip() if len(second) > 40: second = second[:-40].rstrip() second = second.rstrip() + "\n...[ответ обрезан]" return [first, second] def split_text_for_tts(text: str, chunk_size: int) -> list[str]: text = (text or "").strip() if not text: return [] chunks: list[str] = [] current = "" paragraphs = re.split(r"\n\s*\n", text) for paragraph in paragraphs: paragraph = paragraph.strip() if not paragraph: continue if len(paragraph) > chunk_size: if current: chunks.append(current) current = "" for i in range(0, len(paragraph), chunk_size): part = paragraph[i:i + chunk_size].strip() if part: chunks.append(part) continue candidate = paragraph if not current else f"{current}\n\n{paragraph}" if len(candidate) <= chunk_size: current = candidate else: if current: chunks.append(current) current = paragraph if current: chunks.append(current) return chunks def read_env_file(path: Path) -> dict[str, str]: result: dict[str, str] = {} if not path.exists(): return result for raw_line in path.read_text(encoding="utf-8").splitlines(): line = raw_line.strip() if not line or line.startswith("#") or "=" not in line: continue key, value = line.split("=", 1) key = key.strip() value = value.strip().strip('"').strip("'") result[key] = value return result class VoiceTranscriptionError(RuntimeError): def __init__( self, user_message: str, *, stage: str, retryable: bool = True, detail: str = "", ): super().__init__(user_message) self.user_message = user_message self.stage = stage self.retryable = retryable self.detail = detail def log_text(self) -> str: if self.detail and self.detail != self.user_message: return f"{self.user_message} stage={self.stage} retryable={self.retryable} detail={self.detail}" return f"{self.user_message} stage={self.stage} retryable={self.retryable}" class VoiceReplyError(RuntimeError): pass class JsonLineStore: @staticmethod def load(path: Path) -> list[dict[str, Any]]: if not path.exists(): return [] items: list[dict[str, Any]] = [] for line in path.read_text(encoding="utf-8").splitlines(): line = line.strip() if not line: continue items.append(json.loads(line)) return items @staticmethod def save(path: Path, items: list[dict[str, Any]]) -> None: path.parent.mkdir(parents=True, exist_ok=True) tmp = path.with_suffix(path.suffix + ".tmp") with tmp.open("w", encoding="utf-8") as f: for item in items: f.write(json.dumps(item, ensure_ascii=False) + "\n") tmp.replace(path) @staticmethod def append(path: Path, item: dict[str, Any]) -> None: path.parent.mkdir(parents=True, exist_ok=True) with path.open("a", encoding="utf-8") as f: f.write(json.dumps(item, ensure_ascii=False) + "\n") class TelegramApi: def __init__(self, token: str, base_url: str = "https://api.telegram.org"): self.token = token self.api_root = (base_url or "https://api.telegram.org").rstrip("/") self.base = f"{self.api_root}/bot{token}/" self.file_base = f"{self.api_root}/file/bot{token}/" def call(self, method: str, payload: dict[str, Any] | None = None, timeout: int = 60) -> dict[str, Any]: data = None headers = {} if payload is not None: data = json.dumps(payload).encode("utf-8") headers["Content-Type"] = "application/json" req = request.Request(self.base + method, data=data, headers=headers, method="POST") try: with request.urlopen(req, timeout=timeout) as resp: raw = resp.read().decode("utf-8") except error.HTTPError as e: body = e.read().decode("utf-8", errors="replace") raise RuntimeError(f"Telegram HTTP {e.code}: {body}") from e except Exception as e: raise RuntimeError(f"Telegram request failed: {e}") from e result = json.loads(raw) if not result.get("ok"): raise RuntimeError(f"Telegram API error: {result}") return result def call_multipart( self, method: str, fields: dict[str, Any], files: dict[str, tuple[str, bytes, str]], timeout: int = 120, ) -> dict[str, Any]: boundary = "----shine-tg-boundary-" + "".join(random.choices("abcdef0123456789", k=16)) body = bytearray() for name, value in fields.items(): if value is None: continue body.extend( ( f"--{boundary}\r\n" f'Content-Disposition: form-data; name="{name}"\r\n\r\n' f"{value}\r\n" ).encode("utf-8") ) for name, (filename, data, mime) in files.items(): body.extend( ( f"--{boundary}\r\n" f'Content-Disposition: form-data; name="{name}"; filename="{filename}"\r\n' f"Content-Type: {mime}\r\n\r\n" ).encode("utf-8") ) body.extend(data) body.extend(b"\r\n") body.extend(f"--{boundary}--\r\n".encode("utf-8")) req = request.Request(self.base + method, data=bytes(body), method="POST") req.add_header("Content-Type", f"multipart/form-data; boundary={boundary}") try: with request.urlopen(req, timeout=timeout) as resp: raw = resp.read().decode("utf-8") except error.HTTPError as e: body_text = e.read().decode("utf-8", errors="replace") raise RuntimeError(f"Telegram HTTP {e.code}: {body_text}") from e except Exception as e: raise RuntimeError(f"Telegram multipart request failed: {e}") from e result = json.loads(raw) if not result.get("ok"): raise RuntimeError(f"Telegram API error: {result}") return result def get_updates(self, offset: int | None, timeout_sec: int) -> list[dict[str, Any]]: payload: dict[str, Any] = {"timeout": timeout_sec, "allowed_updates": ["message", "channel_post"]} if offset is not None: payload["offset"] = offset result = self.call("getUpdates", payload=payload, timeout=timeout_sec + 15) return result.get("result", []) def send_message(self, chat_id: int | str, text: str, reply_to_message_id: int | None = None) -> dict[str, Any]: payload: dict[str, Any] = {"chat_id": chat_id, "text": text} if reply_to_message_id is not None: payload["reply_to_message_id"] = reply_to_message_id return self.call("sendMessage", payload=payload, timeout=30) def edit_message_text(self, chat_id: int | str, message_id: int, text: str) -> dict[str, Any]: payload: dict[str, Any] = {"chat_id": chat_id, "message_id": message_id, "text": text} return self.call("editMessageText", payload=payload, timeout=30) def send_voice( self, chat_id: int | str, voice: str, caption: str = "", reply_to_message_id: int | None = None, ) -> dict[str, Any]: payload: dict[str, Any] = {"chat_id": chat_id, "voice": voice} if caption: payload["caption"] = caption if reply_to_message_id is not None: payload["reply_to_message_id"] = reply_to_message_id return self.call("sendVoice", payload=payload, timeout=60) def send_audio( self, chat_id: int | str, audio: str, caption: str = "", reply_to_message_id: int | None = None, ) -> dict[str, Any]: payload: dict[str, Any] = {"chat_id": chat_id, "audio": audio} if caption: payload["caption"] = caption if reply_to_message_id is not None: payload["reply_to_message_id"] = reply_to_message_id return self.call("sendAudio", payload=payload, timeout=60) def send_voice_upload( self, chat_id: int | str, voice_bytes: bytes, filename: str, caption: str = "", reply_to_message_id: int | None = None, ) -> dict[str, Any]: fields: dict[str, Any] = {"chat_id": chat_id} if caption: fields["caption"] = caption if reply_to_message_id is not None: fields["reply_to_message_id"] = reply_to_message_id return self.call_multipart( "sendVoice", fields=fields, files={"voice": (filename, voice_bytes, "audio/ogg")}, timeout=180, ) def delete_webhook(self) -> None: self.call("deleteWebhook", payload={"drop_pending_updates": False}, timeout=30) class BotConfig: def __init__(self, root_dir: Path): env = dict(os.environ) env.update(read_env_file(root_dir / ".env")) self.root_dir = root_dir self.telegram_bot_token = self._required(env, "TELEGRAM_BOT_TOKEN") self.allowed_username = normalize_username(env.get("ALLOWED_TELEGRAM_USERNAME", "AidarKC")) self.allowed_players = parse_allowed_players(env.get("ALLOWED_TELEGRAM_PLAYERS", DEFAULT_ALLOWED_PLAYERS)) self.allowed_channel_username = normalize_username(env.get("ALLOWED_TELEGRAM_CHANNEL_USERNAME", "shine_writing")) self.bot_username = env.get("BOT_USERNAME", "aidar_su_bot") self.telegram_api_base_url = env.get("TELEGRAM_API_BASE_URL", "https://api.telegram.org").strip() or "https://api.telegram.org" self.openai_api_key = env.get("OPENAI_API_KEY", "").strip() self.openai_transcribe_model = env.get("OPENAI_TRANSCRIBE_MODEL", "gpt-4o-mini-transcribe") self.telegram_file_download_timeout_seconds = int(env.get("TELEGRAM_FILE_DOWNLOAD_TIMEOUT_SECONDS", "300")) self.openai_transcribe_timeout_seconds = int(env.get("OPENAI_TRANSCRIBE_TIMEOUT_SECONDS", "900")) self.openai_transcribe_max_upload_bytes = max(1_000_000, int(env.get("OPENAI_TRANSCRIBE_MAX_UPLOAD_BYTES", str(24 * 1024 * 1024)))) self.openai_transcribe_max_chunk_seconds = max(60, int(env.get("OPENAI_TRANSCRIBE_MAX_CHUNK_SECONDS", "900"))) self.openai_transcribe_overlap_seconds = max(0, int(env.get("OPENAI_TRANSCRIBE_OVERLAP_SECONDS", "2"))) self.openai_transcribe_reencode_bitrate_kbps = max(12, int(env.get("OPENAI_TRANSCRIBE_REENCODE_BITRATE_KBPS", "24"))) self.openai_transcribe_ffmpeg_timeout_seconds = max(30, int(env.get("OPENAI_TRANSCRIBE_FFMPEG_TIMEOUT_SECONDS", "1800"))) self.ffmpeg_bin = env.get("FFMPEG_BIN", "ffmpeg").strip() or "ffmpeg" self.ffprobe_bin = env.get("FFPROBE_BIN", "ffprobe").strip() or "ffprobe" self.openai_tts_model = env.get("OPENAI_TTS_MODEL", "gpt-4o-mini-tts") self.openai_tts_voice = env.get("OPENAI_TTS_VOICE", "alloy") self.openai_tts_response_format = env.get("OPENAI_TTS_RESPONSE_FORMAT", "opus") self.openai_tts_timeout_seconds = int(env.get("OPENAI_TTS_TIMEOUT_SECONDS", "180")) self.openai_tts_chunk_chars = max(500, int(env.get("OPENAI_TTS_CHUNK_CHARS", "3500"))) self.openai_voice_rewrite_model = env.get("OPENAI_VOICE_REWRITE_MODEL", "gpt-4.1-nano") self.openai_voice_rewrite_timeout_seconds = int(env.get("OPENAI_VOICE_REWRITE_TIMEOUT_SECONDS", "90")) self.openai_voice_rewrite_max_input_chars = max(1000, int(env.get("OPENAI_VOICE_REWRITE_MAX_INPUT_CHARS", "12000"))) self.openai_voice_rewrite_max_output_tokens = max(200, int(env.get("OPENAI_VOICE_REWRITE_MAX_OUTPUT_TOKENS", "900"))) self.codex_bin = Path(env.get( "CODEX_BIN", "/home/ai/.cache/JetBrains/IntelliJIdea2026.1/aia/codex/bin/codex-x86_64-unknown-linux-musl" )) self.codex_workdir = Path(env.get("CODEX_WORKDIR", "/home/ai/work/SHiNE/SHiNE-server-sha256")) self.codex_timeout_seconds = int(env.get("CODEX_TIMEOUT_SECONDS", "900")) self.max_retries = max(1, int(env.get("MAX_RETRIES", "3"))) self.data_dir = (root_dir / env.get("DATA_DIR", "./data")).resolve() self.agent_instructions_file = (root_dir / "AGENT.md").resolve() @staticmethod def _required(env: dict[str, str], key: str) -> str: value = env.get(key, "").strip() if not value: raise RuntimeError(f"Не задан обязательный параметр: {key}") return value class ShinePyBotService: def __init__(self, config: BotConfig): self.cfg = config self.telegram = TelegramApi(config.telegram_bot_token, config.telegram_api_base_url) self.queue_file = config.data_dir / "py_queue.jsonl" self.state_file = config.data_dir / "py_state.json" self.processed_updates_file = config.data_dir / "py_processed_updates.log" self.lock_file = config.data_dir / "py_app.lock" self.history_dir = config.data_dir / "history" self.history_archive_dir = self.history_dir / "archive" self.task_center_dir = config.data_dir / "task_center" self.task_center_file = self.task_center_dir / "items.json" self.max_processed_updates = 5000 self.queue_lock = threading.RLock() self.task_center_lock = threading.RLock() self.stop_event = threading.Event() self.worker = threading.Thread(target=self._worker_loop, name="shine-py-bot-worker", daemon=True) self.queue: list[dict[str, Any]] = [] self.state: dict[str, Any] = {} self.processed_updates: list[str] = [] self.active_job_id: str | None = None self.active_job_started_at: float | None = None self.active_process: subprocess.Popen[str] | None = None self.active_process_lock = threading.Lock() self.stop_current_job = False self.lock_fd = None self.last_heartbeat_at: float = 0.0 self.restart_requested = False def _is_owner(self, username: str) -> bool: return normalize_username(username) == self.cfg.allowed_username def _is_allowed_player(self, username: str) -> bool: return normalize_username(username) in self.cfg.allowed_players def _is_allowed_user(self, username: str) -> bool: uname = normalize_username(username) return uname == self.cfg.allowed_username or uname in self.cfg.allowed_players def _player_name(self, username: str) -> str: uname = normalize_username(username) return self.cfg.allowed_players.get(uname, uname) def _display_name(self, username: str) -> str: uname = normalize_username(username) if uname == self.cfg.allowed_username: return "Айдар" return self._player_name(uname) def _known_usernames(self) -> dict[str, str]: users = {self.cfg.allowed_username: "Айдар"} users.update(self.cfg.allowed_players) return users def _find_user_by_text(self, text: str) -> str: source = normalize_username(text) if source in self._known_usernames(): return source source_lower = (text or "").strip().lower() aliases = { "айдар": self.cfg.allowed_username, "айдару": self.cfg.allowed_username, "айдара": self.cfg.allowed_username, "милана": "malvviiina", "милане": "malvviiina", "милану": "malvviiina", "миланы": "malvviiina", "сергей": "zodiaktechnika32", "сергею": "zodiaktechnika32", "сергея": "zodiaktechnika32", "иван": "oidasyda", "ивану": "oidasyda", "ивана": "oidasyda", "ворон": "blackbyrd1", "ворону": "blackbyrd1", "ворона": "blackbyrd1", "дима": "dimasol1", "диме": "dimasol1", "диму": "dimasol1", "димы": "dimasol1", } for alias, username in aliases.items(): if re.search(rf"(^|\W){re.escape(alias)}($|\W)", source_lower, flags=re.IGNORECASE): return username for username, name in self._known_usernames().items(): if username and username in source_lower: return username if name and name.lower() in source_lower: return username return "" def run(self) -> None: self._ensure_dirs() self._acquire_single_instance_lock() self._load_state() self._load_queue() self._load_processed_updates() self._recover_active_jobs_after_restart() self.telegram.delete_webhook() self._init_offset_if_missing() self.worker.start() self._append_history_event("service_started", {"allowedUsername": self.cfg.allowed_username}) print(f"[py-bot] Запущен. allowed user: @{self.cfg.allowed_username}", flush=True) try: while not self.stop_event.is_set(): try: offset = self.state.get("offset") updates = self.telegram.get_updates(offset=offset, timeout_sec=25) except Exception as e: print(f"[py-bot] Ошибка getUpdates: {e}", flush=True) time.sleep(2) continue for update in updates: update_id = update.get("update_id") if isinstance(update_id, int): self.state["offset"] = update_id + 1 self._persist_state() self._handle_update(update) finally: self.shutdown() def shutdown(self) -> None: if self.stop_event.is_set(): pass self.stop_event.set() self._stop_active_codex_process() if self.worker.is_alive(): self.worker.join(timeout=10) if self.lock_fd is not None: try: fcntl.flock(self.lock_fd, fcntl.LOCK_UN) finally: self.lock_fd.close() self.lock_fd = None self._append_history_event("service_stopped", {}) def _ensure_dirs(self) -> None: self.cfg.data_dir.mkdir(parents=True, exist_ok=True) self.history_dir.mkdir(parents=True, exist_ok=True) self.history_archive_dir.mkdir(parents=True, exist_ok=True) self.task_center_dir.mkdir(parents=True, exist_ok=True) def _acquire_single_instance_lock(self) -> None: self.lock_file.parent.mkdir(parents=True, exist_ok=True) self.lock_fd = self.lock_file.open("a+") try: fcntl.flock(self.lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB) except BlockingIOError: raise RuntimeError(f"Уже запущен другой инстанс (lock: {self.lock_file})") def _load_state(self) -> None: if self.state_file.exists(): self.state = json.loads(self.state_file.read_text(encoding="utf-8")) else: self.state = {} sessions = self.state.get("user_sessions") if not isinstance(sessions, dict): sessions = {} self.state["user_sessions"] = sessions user_settings = self.state.get("user_settings") if not isinstance(user_settings, dict): user_settings = {} self.state["user_settings"] = user_settings if not self.state.get("current_history_file"): history_file = self._create_new_history_file("initial", self.cfg.allowed_username) self.state["current_history_file"] = str(history_file) sessions[self.cfg.allowed_username] = {"current_history_file": str(history_file)} elif self.cfg.allowed_username not in sessions: sessions[self.cfg.allowed_username] = {"current_history_file": str(self.state["current_history_file"])} if not isinstance(self.state.get("next_job_number"), int): self.state["next_job_number"] = 1 self.state["updated_at"] = now_iso() self._persist_state() def _persist_state(self) -> None: self.state["updated_at"] = now_iso() tmp = self.state_file.with_suffix(".tmp") tmp.write_text(json.dumps(self.state, ensure_ascii=False, indent=2), encoding="utf-8") tmp.replace(self.state_file) def _load_queue(self) -> None: self.queue = JsonLineStore.load(self.queue_file) def _persist_queue(self) -> None: JsonLineStore.save(self.queue_file, self.queue) def _load_processed_updates(self) -> None: if not self.processed_updates_file.exists(): self.processed_updates = [] return lines = [x.strip() for x in self.processed_updates_file.read_text(encoding="utf-8").splitlines() if x.strip()] if len(lines) > self.max_processed_updates: lines = lines[-self.max_processed_updates:] self.processed_updates_file.write_text("\n".join(lines) + "\n", encoding="utf-8") self.processed_updates = lines def _mark_processed_update(self, update_key: str) -> bool: if update_key in self.processed_updates: return True self.processed_updates.append(update_key) if len(self.processed_updates) > self.max_processed_updates: self.processed_updates = self.processed_updates[-self.max_processed_updates:] self.processed_updates_file.write_text("\n".join(self.processed_updates) + "\n", encoding="utf-8") else: with self.processed_updates_file.open("a", encoding="utf-8") as f: f.write(update_key + "\n") return False def _recover_active_jobs_after_restart(self) -> None: recovered_ids: list[str] = [] for job in self.queue: if job.get("status") == "active": job["status"] = "pending" job["retry_reason"] = "service_restart_recovery" job["updated_at"] = now_iso() recovered_ids.append(job.get("id", "")) if recovered_ids: self._persist_queue() self._append_history_event("active_jobs_recovered", {"jobIds": recovered_ids}) def _init_offset_if_missing(self) -> None: if self.state.get("offset") is not None: return try: updates = self.telegram.get_updates(offset=None, timeout_sec=0) if updates: self.state["offset"] = int(updates[-1]["update_id"]) + 1 else: self.state["offset"] = 0 self._persist_state() except Exception as e: print(f"[py-bot] Не удалось инициализировать offset: {e}", flush=True) self.state["offset"] = 0 self._persist_state() def _current_history_file(self) -> Path: return self._current_history_file_for_user(self.cfg.allowed_username) def _history_dirs_for_user(self, username: str) -> tuple[Path, Path]: uname = normalize_username(username) or "unknown" history_dir = self.history_dir / uname archive_dir = history_dir / "archive" history_dir.mkdir(parents=True, exist_ok=True) archive_dir.mkdir(parents=True, exist_ok=True) return history_dir, archive_dir def _ensure_user_session(self, username: str) -> None: uname = normalize_username(username) or self.cfg.allowed_username sessions = self.state.setdefault("user_sessions", {}) if not isinstance(sessions, dict): sessions = {} self.state["user_sessions"] = sessions self._user_settings(uname) session = sessions.get(uname) if isinstance(session, dict) and session.get("current_history_file"): return history_file = self._create_new_history_file("initial", uname) sessions[uname] = {"current_history_file": str(history_file)} if uname == self.cfg.allowed_username: self.state["current_history_file"] = str(history_file) self._persist_state() def _user_session_state(self, username: str) -> dict[str, Any]: uname = normalize_username(username) or self.cfg.allowed_username self._ensure_user_session(uname) sessions = self.state.get("user_sessions") or {} session = sessions.get(uname) if not isinstance(session, dict): session = {} sessions[uname] = session return session def _current_history_file_for_user(self, username: str) -> Path: session = self._user_session_state(username) return Path(session["current_history_file"]) def _codex_thread_id_for_user(self, username: str) -> str: thread_id = (self._user_session_state(username).get("codex_thread_id") or "").strip() return thread_id def _set_codex_thread_id_for_user(self, username: str, thread_id: str) -> None: session = self._user_session_state(username) normalized = (thread_id or "").strip() if normalized: session["codex_thread_id"] = normalized else: session.pop("codex_thread_id", None) self._persist_state() def _create_new_history_file(self, reason: str, username: str) -> Path: ts = dt.datetime.now().strftime("%Y-%m-%d_%H%M%S") rnd = "".join(random.choices(string.hexdigits.lower(), k=8)) history_dir, _ = self._history_dirs_for_user(username) path = history_dir / f"{ts}_{rnd}.jsonl" JsonLineStore.append(path, { "ts": now_iso(), "type": "history_created", "reason": reason, "username": normalize_username(username), }) return path def _rotate_history(self, reason: str, username: str) -> Path: uname = normalize_username(username) or self.cfg.allowed_username current = self._current_history_file_for_user(uname) _, archive_dir = self._history_dirs_for_user(uname) if current.exists(): archived = archive_dir / current.name current.replace(archived) else: archived = archive_dir / "(empty)" new_file = self._create_new_history_file(reason, uname) sessions = self.state.setdefault("user_sessions", {}) if not isinstance(sessions, dict): sessions = {} self.state["user_sessions"] = sessions previous = sessions.get(uname) if isinstance(sessions.get(uname), dict) else {} sessions[uname] = {"current_history_file": str(new_file)} if reason != "command_new" and isinstance(previous, dict): thread_id = (previous.get("codex_thread_id") or "").strip() if thread_id: sessions[uname]["codex_thread_id"] = thread_id if uname == self.cfg.allowed_username: self.state["current_history_file"] = str(new_file) self._persist_state() self._append_history_event("history_rotated", {"reason": reason, "username": uname, "archived": str(archived)}, username=uname) return archived def _user_settings(self, username: str) -> dict[str, Any]: uname = normalize_username(username) or self.cfg.allowed_username settings = self.state.get("user_settings") if not isinstance(settings, dict): settings = {} self.state["user_settings"] = settings user_settings = settings.get(uname) if not isinstance(user_settings, dict): user_settings = {} settings[uname] = user_settings if not isinstance(user_settings.get("voice_replies_enabled"), bool): user_settings["voice_replies_enabled"] = True if not isinstance(user_settings.get("voice_rewrite_enabled"), bool): user_settings["voice_rewrite_enabled"] = True if not isinstance(user_settings.get("single_status_message_enabled"), bool): user_settings["single_status_message_enabled"] = True return user_settings def _voice_replies_enabled(self, username: str) -> bool: return bool(self._user_settings(username).get("voice_replies_enabled")) def _set_voice_replies_enabled(self, username: str, enabled: bool) -> None: self._user_settings(username)["voice_replies_enabled"] = enabled self._persist_state() def _voice_rewrite_enabled(self, username: str) -> bool: return bool(self._user_settings(username).get("voice_rewrite_enabled")) def _set_voice_rewrite_enabled(self, username: str, enabled: bool) -> None: self._user_settings(username)["voice_rewrite_enabled"] = enabled self._persist_state() def _single_status_message_enabled(self, username: str) -> bool: return bool(self._user_settings(username).get("single_status_message_enabled")) def _set_single_status_message_enabled(self, username: str, enabled: bool) -> None: self._user_settings(username)["single_status_message_enabled"] = enabled self._persist_state() def _remember_private_chat(self, username: str, chat_id: int) -> None: uname = normalize_username(username) if not uname: return private_chats = self.state.get("private_chat_ids") if not isinstance(private_chats, dict): private_chats = {} self.state["private_chat_ids"] = private_chats if private_chats.get(uname) == chat_id: return private_chats[uname] = chat_id self._persist_state() def _private_chat_id_for_user(self, username: str) -> int | None: private_chats = self.state.get("private_chat_ids") if not isinstance(private_chats, dict): return None chat_id = private_chats.get(normalize_username(username)) return self._resolve_chat_id(chat_id) if isinstance(chat_id, int) else None def _append_history(self, history_path: Path, event_type: str, payload: dict[str, Any]) -> None: row = {"ts": now_iso(), "type": event_type} row.update(payload) JsonLineStore.append(history_path, row) def _append_history_event(self, event_type: str, payload: dict[str, Any], username: str | None = None) -> None: history_path = self._current_history_file_for_user(username or self.cfg.allowed_username) self._append_history(history_path, "system_event", {"event": event_type, **payload}) def _load_task_items(self) -> list[dict[str, Any]]: with self.task_center_lock: if not self.task_center_file.exists(): return [] try: data = json.loads(self.task_center_file.read_text(encoding="utf-8")) except Exception: return [] return data if isinstance(data, list) else [] def _save_task_items(self, items: list[dict[str, Any]]) -> None: with self.task_center_lock: self.task_center_file.parent.mkdir(parents=True, exist_ok=True) tmp = self.task_center_file.with_suffix(".tmp") tmp.write_text(json.dumps(items, ensure_ascii=False, indent=2), encoding="utf-8") tmp.replace(self.task_center_file) def _next_task_item_id(self, items: list[dict[str, Any]]) -> str: max_num = 0 for item in items: raw = str(item.get("id") or "") match = re.match(r"TC-(\d+)$", raw) if match: max_num = max(max_num, int(match.group(1))) return f"TC-{max_num + 1:04d}" def _create_task_item( self, *, kind: str, title: str, text: str, source_username: str, target_username: str, source_message_id: int | None = None, source_chat_id: int | None = None, opinion: str = "", ) -> dict[str, Any]: items = self._load_task_items() item = { "id": self._next_task_item_id(items), "kind": kind, "status": "new", "title": compact_spaces(title)[:160] or ("Предложение" if kind == "proposal" else "Задача"), "text": (text or "").strip(), "opinion": (opinion or "").strip(), "source_username": normalize_username(source_username), "source_name": self._display_name(source_username), "target_username": normalize_username(target_username), "target_name": self._display_name(target_username), "source_chat_id": source_chat_id, "source_message_id": source_message_id, "created_at": now_iso(), "updated_at": now_iso(), } items.append(item) self._save_task_items(items) self._append_history_event("task_center_item_created", { "itemId": item["id"], "kind": kind, "sourceUsername": item["source_username"], "targetUsername": item["target_username"], "title": item["title"], }, username=source_username) return item def _task_items_for_user(self, username: str, *, include_done: bool = False) -> list[dict[str, Any]]: uname = normalize_username(username) items = self._load_task_items() result = [ item for item in items if normalize_username(item.get("target_username")) == uname and (include_done or item.get("status") != "done") ] return sorted(result, key=lambda x: str(x.get("created_at") or "")) def _task_center_counts_text(self, username: str) -> str: counts: dict[str, int] = {} for item in self._task_items_for_user(username): status = str(item.get("status") or "new") counts[status] = counts.get(status, 0) + 1 active_total = sum(counts.values()) if active_total <= 0: return "" parts = [] for status in ("new", "approved", "needs_work", "rejected"): count = counts.get(status, 0) if count: parts.append(f"{TASK_STATUS_LABELS.get(status, status)}: {count}") return f"Напоминание по задачам: всего активных {active_total}; " + ", ".join(parts) + "." def _format_task_items(self, username: str, *, include_done: bool = False) -> str: items = self._task_items_for_user(username, include_done=include_done) if not items: return f"Для {self._display_name(username)} активных задач и предложений нет." lines = [f"Задачи и предложения для {self._display_name(username)}:"] for item in items[:15]: kind = "предложение" if item.get("kind") == "proposal" else "задача" status = TASK_STATUS_LABELS.get(str(item.get("status") or "new"), str(item.get("status") or "new")) source = item.get("source_name") or item.get("source_username") or "неизвестно" title = item.get("title") or "(без названия)" lines.append(f"{item.get('id')} [{status}] {kind} от {source}: {title}") if len(items) > 15: lines.append(f"...и ещё {len(items) - 15}") return "\n".join(lines) def _update_task_item_status(self, item_id: str, status: str) -> dict[str, Any] | None: item_id = (item_id or "").strip().upper() items = self._load_task_items() updated = None for item in items: if str(item.get("id") or "").upper() == item_id: item["status"] = status item["updated_at"] = now_iso() updated = item break if updated is not None: self._save_task_items(items) return updated def _find_first_task_item(self, *, source_username: str = "", target_username: str = "", kind: str = "") -> dict[str, Any] | None: source = normalize_username(source_username) target = normalize_username(target_username) for item in self._load_task_items(): if item.get("status") == "done": continue if source and normalize_username(item.get("source_username")) != source: continue if target and normalize_username(item.get("target_username")) != target: continue if kind and item.get("kind") != kind: continue return item return None def _notify_user_about_task_item(self, username: str, item: dict[str, Any]) -> None: chat_id = self._private_chat_id_for_user(username) if chat_id is None: return kind = "предложение" if item.get("kind") == "proposal" else "задача" source = item.get("source_name") or item.get("source_username") or "кто-то" title = item.get("title") or "(без названия)" status = TASK_STATUS_LABELS.get(str(item.get("status") or "new"), str(item.get("status") or "new")) if item.get("status") == "new": text = f"У тебя новое {kind} от {source}: {title}\nID: {item.get('id')}" else: text = f"Обновление по {kind} {item.get('id')}: статус «{status}».\n{title}" self._safe_send(chat_id, text) def _send_player_welcome_once(self, chat_id: int, message_id: int, username: str) -> None: uname = normalize_username(username) sent = self.state.get("player_welcome_sent") if not isinstance(sent, dict): sent = {} self.state["player_welcome_sent"] = sent if sent.get(uname): return player_name = self._player_name(uname) text = ( f"Привет, {player_name}.\n" "Можно задавать вопросы по проекту, просить анализ, идеи и подготовку готового ТЗ.\n" "Команда /new начинает новую Codex-сессию и архивирует текущую историю." ) reminder = self._task_center_counts_text(uname) if reminder: text = f"{text}\n\n{reminder}\nКоманда /tasks покажет список." self._safe_send(chat_id, text, reply_to=message_id) sent[uname] = now_iso() self._persist_state() def _resolve_chat_id(self, chat_id: int) -> int: migrations = self.state.get("chat_id_migrations") if not isinstance(migrations, dict): return chat_id current = chat_id visited: set[int] = set() while current not in visited: visited.add(current) next_chat_id = migrations.get(str(current)) if not isinstance(next_chat_id, int): break current = next_chat_id return current def _remember_chat_migration(self, old_chat_id: int, new_chat_id: int, source: str) -> None: if old_chat_id == new_chat_id: return migrations = self.state.get("chat_id_migrations") if not isinstance(migrations, dict): migrations = {} self.state["chat_id_migrations"] = migrations if migrations.get(str(old_chat_id)) == new_chat_id: return migrations[str(old_chat_id)] = new_chat_id self._persist_state() with self.queue_lock: changed = False for job in self.queue: if job.get("chat_id") == old_chat_id: job["chat_id"] = new_chat_id job["updated_at"] = now_iso() changed = True if changed: self._persist_queue() self._append_history_event("chat_migrated_to_supergroup", { "oldChatId": old_chat_id, "newChatId": new_chat_id, "source": source, }) @staticmethod def _extract_migrate_to_chat_id(error_text: str) -> int | None: match = re.search(r'"migrate_to_chat_id"\s*:\s*(-?\d+)', error_text) if match: return int(match.group(1)) match = re.search(r"'migrate_to_chat_id'\s*:\s*(-?\d+)", error_text) if match: return int(match.group(1)) return None def _handle_update(self, update: dict[str, Any]) -> None: message = update.get("message") update_type = "message" if not isinstance(message, dict): message = update.get("channel_post") update_type = "channel_post" if not isinstance(message, dict): return chat = message.get("chat") or {} chat_id = chat.get("id") message_id = message.get("message_id") chat_type = str(chat.get("type") or "") chat_username = normalize_username(chat.get("username")) chat_title = str(chat.get("title") or "") sender = message.get("from") or {} username = normalize_username(sender.get("username")) author_signature = str(message.get("author_signature") or "").strip() author_username = username or normalize_username(author_signature) if not isinstance(chat_id, int) or not isinstance(message_id, int): return migrate_to_chat_id = message.get("migrate_to_chat_id") if isinstance(migrate_to_chat_id, int): self._remember_chat_migration(chat_id, migrate_to_chat_id, "telegram_message") return update_key = f"{chat_id}:{message_id}" if self._mark_processed_update(update_key): return is_channel_post = update_type == "channel_post" or chat_type == "channel" is_group_message = update_type == "message" and chat_type in ("group", "supergroup") is_allowed_channel = ( not is_channel_post or not self.cfg.allowed_channel_username or chat_username == self.cfg.allowed_channel_username ) if is_channel_post and not is_allowed_channel: return if chat_username and chat_username == self.cfg.allowed_channel_username: self._remember_public_report_chat(chat_id) # Игнорируем системные сообщения о входе/выходе и смене заголовка/фото. if message.get("new_chat_members") or message.get("left_chat_member"): return if message.get("group_chat_created") or message.get("supergroup_chat_created") or message.get("channel_chat_created"): return text = (message.get("text") or message.get("caption") or "").strip() actor_username = normalize_username(author_username) is_allowed = self._is_allowed_user(actor_username) is_private = chat_type == "private" if not is_allowed: if is_private: self._safe_send(chat_id, "Извините, доступ к этому агенту пока не выдан. Обратитесь к Айдару.", reply_to=message_id) return if self._is_allowed_player(actor_username) and not is_private: return self._ensure_user_session(actor_username) if is_private: self._remember_private_chat(actor_username, chat_id) history_path = self._current_history_file_for_user(actor_username) if self._is_allowed_player(actor_username): self._send_player_welcome_once(chat_id, message_id, actor_username) if not text: if message.get("voice"): self._enqueue_voice_job( chat_id, message_id, actor_username, message["voice"].get("file_id"), duration_seconds=message["voice"].get("duration"), telegram_file_size=message["voice"].get("file_size"), media_type="voice", update_type=update_type, chat_username=chat_username, chat_title=chat_title, author_signature=author_signature, chat_type=chat_type, ) return if message.get("audio"): self._enqueue_voice_job( chat_id, message_id, actor_username, message["audio"].get("file_id"), duration_seconds=message["audio"].get("duration"), telegram_file_size=message["audio"].get("file_size"), media_type="audio", update_type=update_type, chat_username=chat_username, chat_title=chat_title, author_signature=author_signature, chat_type=chat_type, ) return self._safe_send(chat_id, "Поддерживаются текст, voice и audio.", reply_to=message_id) return if text.startswith("/"): self._handle_command(chat_id, message_id, actor_username, text) return if self._handle_task_center_text(chat_id, message_id, actor_username, text): return self._append_history(history_path, "incoming_text", { "chatId": chat_id, "messageId": message_id, "updateType": update_type, "chatType": chat_type, "chatUsername": chat_username, "chatTitle": chat_title, "username": actor_username, "authorSignature": author_signature, "text": text, }) job = self._build_job_base(chat_id, message_id, actor_username, str(history_path)) job["type"] = "text" job["text"] = text job["update_type"] = update_type job["chat_type"] = chat_type job["chat_username"] = chat_username job["chat_title"] = chat_title job["author_signature"] = author_signature job["role"] = "owner" if self._is_owner(actor_username) else "player" job["player_name"] = self._player_name(actor_username) if job["role"] == "player" else "" with self.queue_lock: self.queue.append(job) self._persist_queue() if chat_type == "private": self._ensure_job_status_message( job["id"], chat_id, message_id, f"Задача #{job['num']} получена.\nСтатус: в очереди.", ) else: self._safe_send(chat_id, f"Принял задачу #{job['num']}", reply_to=message_id) def _enqueue_voice_job( self, chat_id: int, message_id: int, username: str, file_id: str | None, *, duration_seconds: int | None = None, telegram_file_size: int | None = None, media_type: str = "voice", update_type: str = "message", chat_username: str = "", chat_title: str = "", author_signature: str = "", chat_type: str = "", ) -> None: if not file_id: self._safe_send(chat_id, "Не удалось прочитать file_id голосового.", reply_to=message_id) return history_path = self._current_history_file_for_user(username) self._append_history(history_path, "incoming_voice", { "chatId": chat_id, "messageId": message_id, "updateType": update_type, "chatType": chat_type, "chatUsername": chat_username, "chatTitle": chat_title, "username": username, "authorSignature": author_signature, "fileId": file_id, "mediaType": media_type, "durationSeconds": duration_seconds, "fileSize": telegram_file_size, }) job = self._build_job_base(chat_id, message_id, username, str(history_path)) job["type"] = "voice" job["telegram_file_id"] = file_id job["telegram_media_type"] = media_type job["telegram_duration_seconds"] = duration_seconds or 0 job["telegram_file_size"] = telegram_file_size or 0 job["update_type"] = update_type job["chat_type"] = chat_type job["chat_username"] = chat_username job["chat_title"] = chat_title job["author_signature"] = author_signature job["role"] = "owner" if self._is_owner(username) else "player" job["player_name"] = self._player_name(username) if job["role"] == "player" else "" with self.queue_lock: self.queue.append(job) self._persist_queue() if chat_type == "private": self._ensure_job_status_message( job["id"], chat_id, message_id, f"Voice для задачи #{job['num']} получен.\nСтатус: в очереди.", ) else: self._safe_send(chat_id, f"Принял voice в задачу #{job['num']}", reply_to=message_id) def _build_job_base(self, chat_id: int, message_id: int, username: str, history_file: str) -> dict[str, Any]: with self.queue_lock: num = int(self.state.get("next_job_number", 1)) self.state["next_job_number"] = num + 1 self._persist_state() return { "id": str(uuid.uuid4()), "num": num, "status": "pending", "type": "text", "chat_id": chat_id, "message_id": message_id, "username": username, "update_type": "message", "chat_type": "", "chat_username": "", "chat_title": "", "author_signature": "", "role": "owner", "player_name": "", "text": "", "telegram_file_id": "", "telegram_media_type": "", "history_file": history_file, "attempts": 0, "retry_reason": "", "last_error": "", "created_at": now_iso(), "updated_at": now_iso(), "active_since": None, "status_message_id": None, "status_message_text": "", } def _handle_task_center_text(self, chat_id: int, message_id: int, username: str, text: str) -> bool: source_text = (text or "").strip() lower = source_text.lower() is_owner = self._is_owner(username) if re.search(r"\b(покажи|список|какие)\b.*\b(задач|задачи|предложени)", lower): target = username explicit_target = self._find_user_by_text(source_text) if is_owner and explicit_target: target = explicit_target self._safe_send(chat_id, self._format_task_items(target), reply_to=message_id) return True if is_owner: assign_match = re.search( r"(?:поставь|добавь|создай|запиши)\s+(?:задачу|задание)\s+(.+?)(?::|\s+-\s+|\s+—\s+)(.+)", source_text, flags=re.IGNORECASE | re.DOTALL, ) if assign_match: target = self._find_user_by_text(assign_match.group(1)) body = assign_match.group(2).strip() if target and body: item = self._create_task_item( kind="task", title=body, text=body, source_username=username, target_username=target, source_message_id=message_id, source_chat_id=chat_id, ) self._notify_user_about_task_item(target, item) self._safe_send( chat_id, f"Задача добавлена для {self._display_name(target)}: {item['id']} — {item['title']}", reply_to=message_id, ) return True status_match = re.search(r"\b(одобрить|отклонить|доработать|закрыть|сделано|закрыта)\b(?:\s+(.+))?", lower, flags=re.IGNORECASE) if status_match and ("задач" in lower or "предложени" in lower or re.search(r"tc-\d+", lower, flags=re.IGNORECASE)): action = status_match.group(1) tail = status_match.group(2) or "" status = { "одобрить": "approved", "отклонить": "rejected", "доработать": "needs_work", "закрыть": "done", "сделано": "done", "закрыта": "done", }.get(action, "new") id_match = re.search(r"tc-\d+", source_text, flags=re.IGNORECASE) item = None if id_match: item = self._update_task_item_status(id_match.group(0), status) else: source_user = self._find_user_by_text(tail) item = self._find_first_task_item( source_username=source_user, target_username=username, kind="proposal" if "предложени" in lower else "", ) if item: item = self._update_task_item_status(str(item.get("id")), status) if item: label = TASK_STATUS_LABELS.get(status, status) self._safe_send(chat_id, f"{item.get('id')} обновлена: {label}.", reply_to=message_id) source_user = normalize_username(item.get("source_username")) if source_user and source_user != username: self._notify_user_about_task_item(source_user, item) return True if not is_owner: proposal_match = re.match(r"\s*(?:предложение|идея|заявка)\s*[::-]\s*(.+)", source_text, flags=re.IGNORECASE | re.DOTALL) if proposal_match: body = proposal_match.group(1).strip() if body: item = self._create_task_item( kind="proposal", title=body, text=body, opinion="Нужно решение Айдара: одобрить, отклонить или отправить на доработку.", source_username=username, target_username=self.cfg.allowed_username, source_message_id=message_id, source_chat_id=chat_id, ) self._notify_user_about_task_item(self.cfg.allowed_username, item) self._safe_send( chat_id, f"Предложение отправлено Айдару как {item['id']}. Статус: новая.", reply_to=message_id, ) return True return False def _handle_command(self, chat_id: int, message_id: int, username: str, text: str) -> None: lower = text.lower() command = lower.split(maxsplit=1)[0].split("@", 1)[0] is_owner = self._is_owner(username) if command in ("/start", "/help"): self._safe_send(chat_id, self._help_text(is_owner=is_owner), reply_to=message_id) return if command == "/settings": self._safe_send(chat_id, self._settings_text(username), reply_to=message_id) return if command == "/status": self._safe_send(chat_id, self._status_text(username), reply_to=message_id) return if command == "/queue": self._safe_send(chat_id, self._queue_text(), reply_to=message_id) return if command in ("/tasks", "/my_tasks"): parts = text.split(maxsplit=1) target = username if is_owner and len(parts) > 1: parsed = self._find_user_by_text(parts[1]) if parsed: target = parsed self._safe_send(chat_id, self._format_task_items(target), reply_to=message_id) return if command == "/voice_on": self._set_voice_replies_enabled(username, True) self._append_history_event("voice_replies_enabled", {"username": normalize_username(username)}, username=username) self._safe_send(chat_id, "Озвучивание финальных ответов включено для вашего пользователя.", reply_to=message_id) return if command == "/voice_off": self._set_voice_replies_enabled(username, False) self._append_history_event("voice_replies_disabled", {"username": normalize_username(username)}, username=username) self._safe_send(chat_id, "Озвучивание финальных ответов выключено для вашего пользователя.", reply_to=message_id) return if command in ("/voice_status", "/voice_rewrite_status"): self._safe_send(chat_id, self._status_text(username), reply_to=message_id) return if command == "/voice_rewrite_on": self._set_voice_rewrite_enabled(username, True) self._append_history_event("voice_rewrite_enabled", {"username": normalize_username(username)}, username=username) self._safe_send(chat_id, "Адаптация текста перед озвучкой включена для вашего пользователя.", reply_to=message_id) return if command == "/voice_rewrite_off": self._set_voice_rewrite_enabled(username, False) self._append_history_event("voice_rewrite_disabled", {"username": normalize_username(username)}, username=username) self._safe_send(chat_id, "Адаптация текста перед озвучкой выключена для вашего пользователя.", reply_to=message_id) return if command == "/single_message_on": self._set_single_status_message_enabled(username, True) self._append_history_event("single_status_message_enabled", {"username": normalize_username(username)}, username=username) self._safe_send(chat_id, "Режим одного редактируемого сообщения в личке включён для вашего пользователя.", reply_to=message_id) return if command == "/single_message_off": self._set_single_status_message_enabled(username, False) self._append_history_event("single_status_message_disabled", {"username": normalize_username(username)}, username=username) self._safe_send(chat_id, "Режим одного редактируемого сообщения в личке выключен. Бот будет отправлять отдельные сообщения по этапам.", reply_to=message_id) return if command == "/new": archived = self._rotate_history("command_new", username) self._safe_send(chat_id, f"История очищена. Новый диалог начат.\nАрхив: {archived.name}", reply_to=message_id) return if command in ("/restart_service", "/restart"): if not is_owner: self._safe_send(chat_id, "Команда недоступна.", reply_to=message_id) return self._append_history_event("restart_service_deferred_requested", { "chatId": chat_id, "messageId": message_id, "username": username, }, username=username) self._safe_send( chat_id, "Отложенный рестарт принят. Если задача сейчас выполняется, сервис перезапустится после её завершения и до следующей задачи.", reply_to=message_id, ) self._request_deferred_restart() return if command in ("/restart_hard", "/restart_now", "/restart_force"): if not is_owner: self._safe_send(chat_id, "Команда недоступна.", reply_to=message_id) return self._append_history_event("restart_service_hard_requested", { "chatId": chat_id, "messageId": message_id, "username": username, }, username=username) self._safe_send( chat_id, "Выполняю жёсткий рестарт сервиса прямо сейчас. Активная задача, если есть, будет прервана и после старта вернётся в очередь.", reply_to=message_id, ) self._schedule_self_restart("hard_restart_requested", force=True) return if command == "/stop": stopped = self._cancel_active_job("stopped_by_user") if stopped: self._safe_send(chat_id, "Текущая задача остановлена и удалена из очереди.", reply_to=message_id) else: self._safe_send(chat_id, "Сейчас нет активной задачи.", reply_to=message_id) return if command == "/cancel": parts = text.split(maxsplit=1) if len(parts) < 2: self._safe_send(chat_id, "Использование: /cancel ", reply_to=message_id) return arg = parts[1].strip() if arg.lower() == "all": with self.queue_lock: self.stop_current_job = True self._stop_active_codex_process() count = len(self.queue) self.queue = [] self._persist_queue() self._safe_send(chat_id, f"Удалено задач из очереди: {count}", reply_to=message_id) return cancelled = self._cancel_by_id_prefix(arg) self._safe_send(chat_id, f"Задача удалена: {arg}" if cancelled else f"Задача не найдена: {arg}", reply_to=message_id) return def _help_text(self, *, is_owner: bool) -> str: lines = [ "Доступные команды:", "/status — активная задача и размер очереди", "/settings — текущие настройки и команды для их изменения", "/queue — список задач в очереди", "/tasks — список ваших задач и предложений", "/stop — остановить текущую задачу", "/cancel — удалить задачу по id (префикс) или все", "/new — архивировать историю и начать новую Codex-сессию", "/help — эта справка", ] if is_owner: lines.insert(-1, "/tasks <пользователь> — список задач игрока") lines.insert(-1, "/restart — отложенный рестарт после текущей задачи") lines.insert(-1, "/restart_hard — жёсткий рестарт прямо сейчас") return "\n".join(lines) def _settings_text(self, username: str) -> str: voice_status = "включено" if self._voice_replies_enabled(username) else "выключено" rewrite_status = "включена" if self._voice_rewrite_enabled(username) else "выключена" single_message_status = "включён" if self._single_status_message_enabled(username) else "выключен" lines = [ "Текущие настройки:", f"Озвучивание финальных ответов: {voice_status}", f"Адаптация текста перед озвучкой: {rewrite_status}", f"Режим одного редактируемого сообщения в личке: {single_message_status}", "", "Команды настроек:", "/voice_on — включить озвучивание", "/voice_off — выключить озвучивание", "/voice_rewrite_on — адаптировать текст перед озвучкой", "/voice_rewrite_off — озвучивать обычный текст без адаптации", "/single_message_on — один редактируемый ответ в личке", "/single_message_off — отдельные сообщения по этапам и финалу", ] return "\n".join(lines) def _status_text(self, username: str) -> str: with self.queue_lock: active = next((j for j in self.queue if j.get("status") == "active"), None) pending = sum(1 for j in self.queue if j.get("status") == "pending") voice_status = "включено" if self._voice_replies_enabled(username) else "выключено" rewrite_status = "включена" if self._voice_rewrite_enabled(username) else "выключена" single_message_status = "включён" if self._single_status_message_enabled(username) else "выключен" settings_text = ( f"Голосовые ответы: {voice_status}\n" f"Адаптация текста перед озвучкой: {rewrite_status}\n" f"Режим одного сообщения в личке: {single_message_status}" ) restart_text = "\nОтложенный рестарт: ожидает завершения текущей задачи" if self.restart_requested else "" if not active: return f"Статус: активной задачи нет.\nВ очереди pending: {pending}\n{settings_text}{restart_text}" elapsed = int(time.time() - (self.active_job_started_at or time.time())) return ( f"Статус: активная задача #{active.get('num', '?')}\n" f"Тип: {active.get('type', 'text')}\n" f"Попытка: {int(active.get('attempts', 0)) + 1}/{self.cfg.max_retries}\n" f"Выполняется: {elapsed}с\n" f"Pending: {pending}\n" f"{settings_text}{restart_text}" ) def _queue_text(self) -> str: with self.queue_lock: items = list(self.queue) if not items: return "Очередь пуста." lines = [f"Очередь: {len(items)}"] for i, job in enumerate(items[:10], start=1): lines.append( f"{i}) #{job.get('num', '?')} [{job.get('status')}] {job.get('type')} attempts={job.get('attempts', 0)}" ) if len(items) > 10: lines.append(f"...и ещё {len(items) - 10} задач") return "\n".join(lines) def _cancel_active_job(self, reason: str) -> bool: with self.queue_lock: active = next((j for j in self.queue if j.get("status") == "active"), None) if not active: return False self.stop_current_job = True self._stop_active_codex_process() self.queue = [j for j in self.queue if j.get("id") != active.get("id")] self._persist_queue() self._append_history_event("job_stopped_by_user", {"jobId": active.get("id"), "reason": reason}) return True def _cancel_by_id_prefix(self, prefix: str) -> bool: prefix = prefix.strip().lower() normalized_num = prefix.lstrip("#") with self.queue_lock: target = next( ( j for j in self.queue if str(j.get("id", "")).lower().startswith(prefix) or str(j.get("num", "")).lower() == normalized_num ), None ) if not target: return False if target.get("status") == "active": self.stop_current_job = True self._stop_active_codex_process() self.queue = [j for j in self.queue if j.get("id") != target.get("id")] self._persist_queue() return True def _worker_loop(self) -> None: while not self.stop_event.is_set(): if self.restart_requested: self._exit_for_restart("deferred_restart_before_next_job") return job = None with self.queue_lock: for item in self.queue: if item.get("status") == "pending": item["status"] = "active" item["active_since"] = now_iso() item["updated_at"] = now_iso() self.active_job_id = item.get("id") self.active_job_started_at = time.time() job = dict(item) self._persist_queue() break if not job: time.sleep(0.5) continue self.stop_current_job = False self._process_job(job) self.active_job_id = None self.active_job_started_at = None if self.restart_requested: self._exit_for_restart("deferred_restart_after_job") return def _process_job(self, job: dict[str, Any]) -> None: job_id = job["id"] job_num = job.get("num", "?") chat_id = int(job["chat_id"]) message_id = int(job["message_id"]) history_path = Path(job["history_file"]) private_single_message = ( (job.get("chat_type") or "") == "private" and self._single_status_message_enabled(job.get("username") or "") ) self._set_job_status_text(job, f"Задача #{job_num} в работе.\nСтатус: выполняется.") try: if job.get("type") == "voice": self._set_job_status_text(job, f"Задача #{job_num} в работе.\nСтатус: распознаю voice.") recognized = self._transcribe_voice_job( job, status_cb=lambda note: self._set_job_status_text( job, f"Задача #{job_num} в работе.\nСтатус: {note}", ), ) job["text"] = recognized self._append_history(history_path, "voice_transcription", {"jobId": job_id, "jobNum": job_num, "text": recognized}) preview = recognized.strip() if len(preview) > 800: preview = preview[:800].rstrip() + " ...[обрезано]" self._set_job_status_text( job, f"Задача #{job_num} в работе.\nСтатус: voice распознан, отправляю в Codex.\n\nТекст:\n{preview}", ) prompt = self._build_prompt(job) self._append_history(history_path, "codex_request", {"jobId": job_id, "prompt": prompt}) self._set_job_status_text(job, f"Задача #{job_num} в работе.\nСтатус: выполняю через Codex.") answer = self._run_codex(prompt, job) if private_single_message: parts = split_final_private_text(answer) self._set_job_status_text(job, parts[0]) if len(parts) > 1: self._safe_send(chat_id, parts[1], reply_to=message_id) else: for chunk in split_long_text(answer): self._safe_send(chat_id, chunk, reply_to=message_id) self._append_history(history_path, "codex_response", {"jobId": job_id, "text": answer}) self._send_private_job_public_report(job, answer) self._send_task_center_reminder(job) if self._voice_replies_enabled(job.get("username") or ""): self._send_voice_reply_for_answer(job, answer, history_path, job_id) self._mark_job_done(job_id) except Exception as e: if self.stop_current_job: self._append_history(history_path, "job_stopped", {"jobId": job_id, "reason": str(e)}) self._set_job_status_text(job, f"Задача #{job_num} остановлена.") self._mark_job_removed(job_id) self.stop_current_job = False return if isinstance(e, VoiceTranscriptionError): self._append_history(history_path, "voice_transcription_failed", { "jobId": job_id, "jobNum": job_num, "stage": e.stage, "retryable": e.retryable, "error": e.user_message, "detail": e.detail, }) self._handle_job_failure(job, e) def _build_prompt(self, job: dict[str, Any]) -> str: retry_block = "" retry_reason = (job.get("retry_reason") or "").strip() if retry_reason: retry_block = f"\n\nПометка retry: {retry_reason}" role = (job.get("role") or "owner").strip().lower() player_block = "" if role == "player": player_name = (job.get("player_name") or "").strip() player_dir = self.cfg.codex_workdir / "Players" / (job.get("username") or "") player_block = ( "\n\nРежим игрока (обязательно):\n" f"- Пользователь: {player_name} (@{job.get('username')}).\n" f"- Рабочая папка игрока: {player_dir}\n" "- Код проекта не изменять.\n" "- Можно отвечать на вопросы по проекту, предлагать идеи и готовить ТЗ.\n" "- Если нужны правки кода, описывать предложение текстом и сохранять материалы только в папке игрока." ) return ( "Пришло сообщение в Telegram.\n" f"Тип: {job.get('type')}\n" f"Источник Telegram: {job.get('update_type', 'message')}\n" f"Тип чата: {job.get('chat_type') or ''}\n" f"Канал/чат: @{job.get('chat_username') or ''} {job.get('chat_title') or ''}\n" f"Username отправителя: @{job.get('username')}\n" f"Подпись автора в Telegram: {job.get('author_signature') or ''}\n" "Текст для обработки:\n" f"{job.get('text')}\n\n" f"История диалога (JSONL): {job.get('history_file')}\n" f"Инструкции агента: {self.cfg.agent_instructions_file}\n" f"Работай в рабочем проекте аккуратно и верни только текст ответа пользователю.{player_block}{retry_block}" ) def _run_codex(self, prompt: str, job: dict[str, Any]) -> str: username = job.get("username") or self.cfg.allowed_username thread_id = self._codex_thread_id_for_user(username) try: return self._run_codex_once(prompt, job, thread_id=thread_id) except RuntimeError as e: if not thread_id or not self._is_missing_codex_session_error(str(e)): raise self._set_codex_thread_id_for_user(username, "") self._append_history( Path(job["history_file"]), "system_event", { "event": "codex_thread_reset", "reason": "missing_session", "username": normalize_username(username), "oldThreadId": thread_id, }, ) return self._run_codex_once(prompt, job, thread_id="") def _run_codex_once(self, prompt: str, job: dict[str, Any], *, thread_id: str) -> str: output_lines: list[str] = [] job_id = str(job["id"]) job_num = job.get("num", "?") username = job.get("username") or self.cfg.allowed_username with tempfile.NamedTemporaryFile(prefix="shine-codex-last-message-", suffix=".txt", delete=False) as tmp: output_file = Path(tmp.name) cmd = [ str(self.cfg.codex_bin), "exec", "--dangerously-bypass-approvals-and-sandbox", "--json", "-C", str(self.cfg.codex_workdir), "-o", str(output_file), ] if thread_id: cmd.extend(["resume", thread_id]) cmd.append(prompt) mode = f"resume {thread_id}" if thread_id else "new" print(f"[py-bot] codex exec start job={job_id[:8]} mode={mode}", flush=True) process = subprocess.Popen( cmd, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, encoding="utf-8", errors="replace", bufsize=1, ) with self.active_process_lock: self.active_process = process self.last_heartbeat_at = 0.0 last_user_note = "" last_user_note_at = 0.0 codex_started_at = time.time() last_job_message_at = codex_started_at seen_thread_id = "" def on_line(line: str) -> None: nonlocal last_user_note, last_user_note_at, last_job_message_at, seen_thread_id output_lines.append(line) current_thread_id = self._extract_codex_thread_id(line) if current_thread_id: seen_thread_id = current_thread_id note = self._extract_codex_user_note(line) now = time.time() if note and note != last_user_note and now - last_user_note_at > 8: self._set_job_status_text(job, f"Задача #{job_num} в работе.\nСтатус: {note}") last_user_note = note last_user_note_at = now last_job_message_at = now reader_done = threading.Event() def reader() -> None: if not process.stdout: reader_done.set() return for line in process.stdout: on_line(line.rstrip("\n")) reader_done.set() t = threading.Thread(target=reader, name=f"codex-reader-{job_id[:8]}", daemon=True) t.start() try: deadline = time.time() + self.cfg.codex_timeout_seconds return_code = None while return_code is None: return_code = process.poll() now = time.time() if return_code is not None: break if now >= deadline: process.kill() t.join(timeout=2) raise RuntimeError(f"Codex timeout after {self.cfg.codex_timeout_seconds}s") if now - codex_started_at >= 120 and now - last_job_message_at >= 120: elapsed = self._format_duration(int(now - codex_started_at)) self._set_job_status_text( job, f"Задача #{job_num} в работе.\nСтатус: выполняется уже {elapsed}, от Codex давно нет новых сообщений.", ) last_job_message_at = now self.last_heartbeat_at = now time.sleep(1) finally: with self.active_process_lock: self.active_process = None reader_done.wait(timeout=2) if return_code != 0: tail = "\n".join(output_lines[-40:]) raise RuntimeError(f"Codex exited with code {return_code}. Output tail:\n{tail}") if seen_thread_id and seen_thread_id != thread_id: self._set_codex_thread_id_for_user(username, seen_thread_id) if output_file.exists(): answer = output_file.read_text(encoding="utf-8").strip() try: output_file.unlink(missing_ok=True) except Exception: pass if answer: return answer fallback = self._extract_fallback_message(output_lines) if not fallback: raise RuntimeError("Codex returned empty response") return fallback def _stop_active_codex_process(self) -> bool: with self.active_process_lock: process = self.active_process if process is None: return False if process.poll() is not None: return False process.terminate() try: process.wait(timeout=2) except subprocess.TimeoutExpired: process.kill() return True def _handle_job_failure(self, job: dict[str, Any], err: Exception) -> None: job_id = job["id"] job_num = job.get("num", "?") error_text = str(err).strip() or err.__class__.__name__ user_error_text = self._user_error_text(err) retryable = not isinstance(err, VoiceTranscriptionError) or err.retryable log_error_text = err.log_text() if isinstance(err, VoiceTranscriptionError) else error_text print(f"[py-bot] Ошибка job={job_id[:8]}: {log_error_text}", flush=True) print(traceback.format_exc(), flush=True) with self.queue_lock: target = next((j for j in self.queue if j.get("id") == job_id), None) if not target: return attempts = int(target.get("attempts", 0)) + 1 target["attempts"] = attempts target["last_error"] = error_text[:1000] target["updated_at"] = now_iso() if retryable and attempts < self.cfg.max_retries: target["status"] = "pending" target["retry_reason"] = error_text[:200] self._persist_queue() will_retry = True else: self.queue = [j for j in self.queue if j.get("id") != job_id] self._persist_queue() will_retry = False if will_retry: self._set_job_status_text( job, f"{user_error_text}\nПовторю задачу #{job_num}: попытка {attempts + 1}/{self.cfg.max_retries}.", ) else: self._set_job_status_text(job, f"{user_error_text}\nЗадача #{job_num} остановлена.") def _user_error_text(self, err: Exception) -> str: if isinstance(err, VoiceTranscriptionError): return f"Не удалось распознать голосовое: {err.user_message}" error_text = str(err).strip() or err.__class__.__name__ return f"Ошибка выполнения задачи: {error_text}" def _mark_job_done(self, job_id: str) -> None: with self.queue_lock: self.queue = [j for j in self.queue if j.get("id") != job_id] self._persist_queue() def _mark_job_removed(self, job_id: str) -> None: with self.queue_lock: self.queue = [j for j in self.queue if j.get("id") != job_id] self._persist_queue() def _send_task_center_reminder(self, job: dict[str, Any]) -> None: if job.get("chat_type") != "private": return username = job.get("username") or "" reminder = self._task_center_counts_text(username) if not reminder: return self._safe_send(int(job["chat_id"]), reminder, reply_to=int(job["message_id"])) def _remember_public_report_chat(self, chat_id: int) -> None: if self.state.get("public_report_chat_id") == chat_id: return self.state["public_report_chat_id"] = chat_id self.state["updated_at"] = now_iso() self._persist_state() def _public_report_chat_id(self) -> int | str | None: chat_id = self.state.get("public_report_chat_id") if isinstance(chat_id, int): return self._resolve_chat_id(chat_id) if self.cfg.allowed_channel_username: return f"@{self.cfg.allowed_channel_username}" return None def _send_private_job_public_report(self, job: dict[str, Any], answer: str) -> None: if job.get("chat_type") != "private": return report_chat_id = self._public_report_chat_id() if report_chat_id is None: return job_num = job.get("num", "?") source_text = (job.get("text") or "").strip() if not source_text: source_text = "(пустой текст запроса)" role = (job.get("role") or "owner").strip().lower() author_label = "Айдар" if role == "player": player_name = (job.get("player_name") or "").strip() or job.get("username") or "Игрок" author_label = f"{player_name} (@{job.get('username')})" if job.get("type") == "voice": voice_file_id = (job.get("telegram_file_id") or "").strip() media_type = (job.get("telegram_media_type") or "voice").strip() request_caption = self._trim_telegram_caption( f"{author_label} сделал {media_type}-запрос, задача #{job_num}.\n\n" f"Распознанный текст:\n{source_text}" ) request_message_id = None if voice_file_id: request_message_id = self._safe_send_telegram_file( report_chat_id, voice_file_id, media_type=media_type, caption=request_caption, ) if request_message_id is None: request_message_id = self._safe_send(report_chat_id, request_caption) else: request_report = ( f"{author_label} сделал запрос, задача #{job_num}.\n\n" f"{source_text}" ) request_message_id = self._safe_send(report_chat_id, request_report) if request_message_id is None: return answer_text = (answer or "").strip() or "(пустой ответ)" answer_chunks = split_long_text(f"Ответ на задачу #{job_num}:\n\n{answer_text}") for chunk in answer_chunks: self._safe_send(report_chat_id, chunk, reply_to=request_message_id) @staticmethod def _trim_telegram_caption(text: str, limit: int = 1000) -> str: text = (text or "").strip() if len(text) <= limit: return text return text[:limit].rstrip() + "\n...[обрезано]" def _safe_send_telegram_file( self, chat_id: int | str, file_id: str, *, media_type: str = "voice", caption: str = "", reply_to: int | None = None, ) -> int | None: file_id = (file_id or "").strip() if not file_id: return None caption = self._trim_telegram_caption(caption) resolved_chat_id: int | str = self._resolve_chat_id(chat_id) if isinstance(chat_id, int) else chat_id resolved_reply_to = reply_to if resolved_chat_id == chat_id or isinstance(chat_id, str) else None def send(target_chat_id: int | str, target_reply_to: int | None) -> dict[str, Any]: if media_type == "audio": return self.telegram.send_audio(target_chat_id, file_id, caption=caption, reply_to_message_id=target_reply_to) return self.telegram.send_voice(target_chat_id, file_id, caption=caption, reply_to_message_id=target_reply_to) try: sent = send(resolved_chat_id, resolved_reply_to) result = sent.get("result") or {} message_id = result.get("message_id") return message_id if isinstance(message_id, int) else None except Exception as e: migrate_to_chat_id = self._extract_migrate_to_chat_id(str(e)) if migrate_to_chat_id is not None: if isinstance(resolved_chat_id, int): self._remember_chat_migration(resolved_chat_id, migrate_to_chat_id, "send_file_error") try: sent = send(migrate_to_chat_id, None) result = sent.get("result") or {} message_id = result.get("message_id") return message_id if isinstance(message_id, int) else None except Exception as retry_error: print(f"[py-bot] sendFile retry after migration error: {retry_error}", flush=True) return None print(f"[py-bot] sendFile error: {e}", flush=True) return None def _safe_send_voice_upload( self, chat_id: int | str, voice_bytes: bytes, filename: str, *, caption: str = "", reply_to: int | None = None, ) -> int | None: if not voice_bytes: return None caption = self._trim_telegram_caption(caption) resolved_chat_id: int | str = self._resolve_chat_id(chat_id) if isinstance(chat_id, int) else chat_id resolved_reply_to = reply_to if resolved_chat_id == chat_id or isinstance(chat_id, str) else None def send(target_chat_id: int | str, target_reply_to: int | None) -> dict[str, Any]: return self.telegram.send_voice_upload( target_chat_id, voice_bytes, filename, caption=caption, reply_to_message_id=target_reply_to, ) try: sent = send(resolved_chat_id, resolved_reply_to) result = sent.get("result") or {} message_id = result.get("message_id") return message_id if isinstance(message_id, int) else None except Exception as e: migrate_to_chat_id = self._extract_migrate_to_chat_id(str(e)) if migrate_to_chat_id is not None: if isinstance(resolved_chat_id, int): self._remember_chat_migration(resolved_chat_id, migrate_to_chat_id, "send_voice_upload_error") try: sent = send(migrate_to_chat_id, None) result = sent.get("result") or {} message_id = result.get("message_id") return message_id if isinstance(message_id, int) else None except Exception as retry_error: print(f"[py-bot] sendVoiceUpload retry after migration error: {retry_error}", flush=True) return None print(f"[py-bot] sendVoiceUpload error: {e}", flush=True) return None def _safe_send(self, chat_id: int | str, text: str, reply_to: int | None = None) -> int | None: text = (text or "").strip() if not text: return None if len(text) > 3900: text = text[:3900] + "\n...[обрезано]" resolved_chat_id: int | str = self._resolve_chat_id(chat_id) if isinstance(chat_id, int) else chat_id resolved_reply_to = reply_to if resolved_chat_id == chat_id or isinstance(chat_id, str) else None try: sent = self.telegram.send_message(resolved_chat_id, text, reply_to_message_id=resolved_reply_to) result = sent.get("result") or {} message_id = result.get("message_id") return message_id if isinstance(message_id, int) else None except Exception as e: migrate_to_chat_id = self._extract_migrate_to_chat_id(str(e)) if migrate_to_chat_id is not None: if isinstance(resolved_chat_id, int): self._remember_chat_migration(resolved_chat_id, migrate_to_chat_id, "send_message_error") try: sent = self.telegram.send_message(migrate_to_chat_id, text, reply_to_message_id=None) result = sent.get("result") or {} message_id = result.get("message_id") return message_id if isinstance(message_id, int) else None except Exception as retry_error: print(f"[py-bot] sendMessage retry after migration error: {retry_error}", flush=True) return None print(f"[py-bot] sendMessage error: {e}", flush=True) return None def _safe_edit(self, chat_id: int | str, message_id: int | None, text: str) -> bool: text = (text or "").strip() if not text or not message_id: return False if len(text) > 3900: text = text[:3900] + "\n...[обрезано]" resolved_chat_id: int | str = self._resolve_chat_id(chat_id) if isinstance(chat_id, int) else chat_id try: self.telegram.edit_message_text(resolved_chat_id, message_id, text) return True except Exception as e: error_text = str(e) if "message is not modified" in error_text: return True migrate_to_chat_id = self._extract_migrate_to_chat_id(error_text) if migrate_to_chat_id is not None: if isinstance(resolved_chat_id, int): self._remember_chat_migration(resolved_chat_id, migrate_to_chat_id, "edit_message_error") try: self.telegram.edit_message_text(migrate_to_chat_id, message_id, text) return True except Exception as retry_error: print(f"[py-bot] editMessageText retry after migration error: {retry_error}", flush=True) return False print(f"[py-bot] editMessageText error: {e}", flush=True) return False def _ensure_job_status_message(self, job_id: str, chat_id: int, reply_to_message_id: int, text: str) -> int | None: text = (text or "").strip() if not text: return None with self.queue_lock: target = next((j for j in self.queue if j.get("id") == job_id), None) if target: existing_id = target.get("status_message_id") existing_text = (target.get("status_message_text") or "").strip() else: existing_id = None existing_text = "" if existing_id and existing_text == text and self._safe_edit(chat_id, int(existing_id), text): return int(existing_id) if existing_id and self._safe_edit(chat_id, int(existing_id), text): with self.queue_lock: target = next((j for j in self.queue if j.get("id") == job_id), None) if target: target["status_message_text"] = text target["updated_at"] = now_iso() self._persist_queue() return int(existing_id) message_id = self._safe_send(chat_id, text, reply_to=reply_to_message_id) if message_id is not None: with self.queue_lock: target = next((j for j in self.queue if j.get("id") == job_id), None) if target: target["status_message_id"] = message_id target["status_message_text"] = text target["updated_at"] = now_iso() self._persist_queue() return message_id def _set_job_status_text(self, job: dict[str, Any], text: str) -> None: if (job.get("chat_type") or "") != "private": self._safe_send(int(job["chat_id"]), text, reply_to=int(job["message_id"])) return if not self._single_status_message_enabled(job.get("username") or ""): self._safe_send(int(job["chat_id"]), text, reply_to=int(job["message_id"])) return message_id = self._ensure_job_status_message( job["id"], int(job["chat_id"]), int(job["message_id"]), text, ) if message_id is not None: job["status_message_id"] = message_id job["status_message_text"] = text def _request_deferred_restart(self) -> None: if self.restart_requested: return self.restart_requested = True self._append_history_event("restart_service_deferred_scheduled", {}) with self.queue_lock: has_active = any(j.get("status") == "active" for j in self.queue) if not has_active: threading.Thread( target=lambda: self._exit_for_restart("deferred_restart_no_active_job"), name="shine-py-bot-deferred-restart", daemon=True, ).start() def _exit_for_restart(self, reason: str) -> None: print(f"[py-bot] restart now: {reason}", flush=True) self._append_history_event("restart_service_executing", {"reason": reason}) time.sleep(0.5) os._exit(0) def _schedule_self_restart(self, reason: str = "restart_requested", *, force: bool = False) -> None: if self.restart_requested and not force: return self.restart_requested = True def restart() -> None: time.sleep(1.5) print(f"[py-bot] restart requested by Telegram command: {reason}", flush=True) if force: self._stop_active_codex_process() os._exit(0) threading.Thread(target=restart, name="shine-py-bot-self-restart", daemon=True).start() def _voice_reply_targets(self, job: dict[str, Any]) -> list[tuple[int | str, int | None, str]]: chat_id = int(job["chat_id"]) message_id = int(job["message_id"]) targets: list[tuple[int | str, int | None, str]] = [(chat_id, message_id, "source")] username = job.get("username") or "" private_chat_id = self._private_chat_id_for_user(username) if private_chat_id is not None and private_chat_id != self._resolve_chat_id(chat_id): targets.append((private_chat_id, None, "private")) report_chat_id = self._public_report_chat_id() if report_chat_id is not None: resolved_report_chat_id = self._resolve_chat_id(report_chat_id) if isinstance(report_chat_id, int) else report_chat_id resolved_current_chat_id: int | str = self._resolve_chat_id(chat_id) if resolved_report_chat_id != resolved_current_chat_id: targets.append((report_chat_id, None, "public")) deduped: list[tuple[int | str, int | None, str]] = [] seen: set[str] = set() for target_chat_id, target_reply_to, label in targets: resolved: int | str = self._resolve_chat_id(target_chat_id) if isinstance(target_chat_id, int) else target_chat_id key = str(resolved) if key in seen: continue seen.add(key) deduped.append((target_chat_id, target_reply_to, label)) return deduped def _send_voice_reply_for_answer( self, job: dict[str, Any], answer: str, history_path: Path, job_id: str, ) -> None: chat_id = int(job["chat_id"]) message_id = int(job["message_id"]) job_num = job.get("num", "?") username = job.get("username") or "" if not self.cfg.openai_api_key: note = "не настроен ключ OpenAI для озвучивания." self._append_history(history_path, "voice_reply_failed", {"jobId": job_id, "jobNum": job_num, "error": note}) self._safe_send(chat_id, f"Озвучивание включено, но {note}", reply_to=message_id) return voice_text = answer rewrite_enabled = self._voice_rewrite_enabled(username) if rewrite_enabled: try: voice_text = self._openai_rewrite_text_for_voice(answer) self._append_history(history_path, "voice_rewrite_done", { "jobId": job_id, "jobNum": job_num, "model": self.cfg.openai_voice_rewrite_model, "sourceChars": len(answer or ""), "resultChars": len(voice_text or ""), }) except VoiceReplyError as e: self._append_history(history_path, "voice_rewrite_failed", { "jobId": job_id, "jobNum": job_num, "model": self.cfg.openai_voice_rewrite_model, "error": str(e), }) self._safe_send( chat_id, f"Не удалось адаптировать ответ #{job_num} для озвучки, озвучиваю обычный текст: {e}", reply_to=message_id, ) voice_text = answer chunks = split_text_for_tts(voice_text, self.cfg.openai_tts_chunk_chars) if not chunks: return sent_count = 0 total = len(chunks) targets = self._voice_reply_targets(job) print(f"[py-bot] tts start job={str(job_id)[:8]} chunks={total} targets={len(targets)} rewrite={rewrite_enabled}", flush=True) for index, chunk in enumerate(chunks, start=1): try: audio = self._openai_tts(chunk) except VoiceReplyError as e: self._append_history(history_path, "voice_reply_failed", { "jobId": job_id, "jobNum": job_num, "part": index, "parts": total, "error": str(e), }) self._safe_send(chat_id, f"Не удалось озвучить ответ #{job_num}: {e}", reply_to=message_id) return caption = f"Озвучка ответа #{job_num}" if total > 1: caption += f", часть {index}/{total}" for target_chat_id, target_reply_to, target_label in targets: message_sent = self._safe_send_voice_upload( target_chat_id, audio, f"shine-answer-{job_num}-{index}.ogg", caption=caption, reply_to=target_reply_to, ) if message_sent is None: self._append_history(history_path, "voice_reply_failed", { "jobId": job_id, "jobNum": job_num, "part": index, "parts": total, "target": target_label, "error": "Telegram не принял voice-файл озвучки.", }) if target_label == "source": self._safe_send(chat_id, f"Озвучка ответа #{job_num} создана, но Telegram не принял voice-файл.", reply_to=message_id) continue sent_count += 1 self._append_history(history_path, "voice_reply_sent", { "jobId": job_id, "jobNum": job_num, "parts": total, "messages": sent_count, "targets": len(targets), "rewriteEnabled": rewrite_enabled, }) print(f"[py-bot] tts done job={str(job_id)[:8]} sent={sent_count}", flush=True) def _openai_rewrite_text_for_voice(self, text: str) -> str: source = (text or "").strip() if not source: return "" if len(source) > self.cfg.openai_voice_rewrite_max_input_chars: source = source[:self.cfg.openai_voice_rewrite_max_input_chars].rstrip() + "\n\n...[текстовый ответ был длиннее и обрезан для голосовой версии]" payload = { "model": self.cfg.openai_voice_rewrite_model, "messages": [ { "role": "system", "content": ( "Ты готовишь русскую версию финального ответа технического агента для озвучивания. " "Не пересказывай заново и не меняй смысл: сохрани порядок мыслей, итог, предупреждения, статусы и важные действия. " "Мягко убери только то, что плохо воспринимается на слух: длинные пути, хэши, ID, команды, JSON, " "длинные списки файлов, точные размеры и счётчики символов. Если деталь важна, замени её коротким описанием. " "Не добавляй новых фактов. Пиши естественно, без markdown, близко к исходному тексту." ), }, { "role": "user", "content": f"Переделай этот финальный текст в вариант для озвучки:\n\n{source}", }, ], "temperature": 0.2, "max_tokens": self.cfg.openai_voice_rewrite_max_output_tokens, } data = json.dumps(payload, ensure_ascii=False).encode("utf-8") req = request.Request("https://api.openai.com/v1/chat/completions", method="POST", data=data) req.add_header("Authorization", f"Bearer {self.cfg.openai_api_key}") req.add_header("Content-Type", "application/json") try: with request.urlopen(req, timeout=self.cfg.openai_voice_rewrite_timeout_seconds) as resp: raw = resp.read().decode("utf-8", errors="replace") except TimeoutError as e: raise VoiceReplyError( f"OpenAI не успел адаптировать текст за {self.cfg.openai_voice_rewrite_timeout_seconds} секунд." ) from e except error.HTTPError as e: detail = e.read().decode("utf-8", errors="replace") if e.code == 401: message = "OpenAI отклонил ключ API для адаптации текста." elif e.code == 429: message = "OpenAI временно ограничил адаптацию текста из-за лимита запросов." elif e.code >= 500: message = "OpenAI временно не смог адаптировать текст." else: message = f"OpenAI вернул ошибку HTTP {e.code} при адаптации текста." if detail: message = f"{message} Детали: {detail[:500]}" raise VoiceReplyError(message) from e except error.URLError as e: raise VoiceReplyError(f"не удалось отправить текст в OpenAI для адаптации из-за сетевой ошибки: {e.reason}") from e try: body = json.loads(raw) content = (((body.get("choices") or [{}])[0].get("message") or {}).get("content") or "").strip() except Exception as e: raise VoiceReplyError("OpenAI вернул неразборчивый ответ при адаптации текста.") from e if not content: raise VoiceReplyError("OpenAI вернул пустой текст адаптации.") return content def _openai_tts(self, text: str) -> bytes: payload = { "model": self.cfg.openai_tts_model, "voice": self.cfg.openai_tts_voice, "input": text, "response_format": self.cfg.openai_tts_response_format, } data = json.dumps(payload, ensure_ascii=False).encode("utf-8") req = request.Request("https://api.openai.com/v1/audio/speech", method="POST", data=data) req.add_header("Authorization", f"Bearer {self.cfg.openai_api_key}") req.add_header("Content-Type", "application/json") try: with request.urlopen(req, timeout=self.cfg.openai_tts_timeout_seconds) as resp: audio = resp.read() except TimeoutError as e: raise VoiceReplyError(f"OpenAI не успел сгенерировать речь за {self.cfg.openai_tts_timeout_seconds} секунд.") from e except error.HTTPError as e: detail = e.read().decode("utf-8", errors="replace") if e.code == 401: message = "OpenAI отклонил ключ API для озвучивания." elif e.code == 429: message = "OpenAI временно ограничил озвучивание из-за лимита запросов." elif e.code >= 500: message = "OpenAI временно не смог сгенерировать речь." else: message = f"OpenAI вернул ошибку HTTP {e.code} при озвучивании." if detail: message = f"{message} Детали: {detail[:500]}" raise VoiceReplyError(message) from e except error.URLError as e: raise VoiceReplyError(f"не удалось отправить текст в OpenAI TTS из-за сетевой ошибки: {e.reason}") from e if not audio: raise VoiceReplyError("OpenAI вернул пустой аудиофайл.") return audio def _transcribe_voice_job( self, job: dict[str, Any], *, status_cb: Callable[[str], Any] | None = None, ) -> str: if not self.cfg.openai_api_key: raise VoiceTranscriptionError( "не настроен ключ OpenAI для распознавания.", stage="config", retryable=False, ) file_id = (job.get("telegram_file_id") or "").strip() if not file_id: raise VoiceTranscriptionError( "Telegram не передал идентификатор файла.", stage="telegram_file_id", retryable=False, ) job_id = str(job.get("id") or "")[:8] job_num = job.get("num", "?") media_type = (job.get("telegram_media_type") or "voice").strip() duration_seconds = int(job.get("telegram_duration_seconds") or 0) telegram_file_size = int(job.get("telegram_file_size") or 0) file_looks_big_for_cloud = self._telegram_cloud_download_is_likely_too_big(telegram_file_size) if file_looks_big_for_cloud and status_cb is not None: status_cb( "файл большой, всё равно пробую скачать его из Telegram. " f"Предварительный размер около {self._bytes_to_mb(telegram_file_size)} MB." ) started_at = time.time() print(f"[py-bot] transcribe start job={job_id} num={job_num} media={media_type}", flush=True) file_bytes, filename = self._download_telegram_file(file_id) print( f"[py-bot] transcribe downloaded job={job_id} filename={filename} size={len(file_bytes)} bytes", flush=True, ) if file_looks_big_for_cloud and status_cb is not None: status_cb( "скачивание из Telegram прошло успешно. " f"Фактический размер около {self._bytes_to_mb(len(file_bytes))} MB, дальше готовлю аудио и отправляю в OpenAI." ) prepared_parts = self._prepare_audio_parts_for_transcription( file_bytes, filename, duration_seconds=duration_seconds, job_id=job_id, job_num=job_num, ) print( f"[py-bot] transcribe prepared job={job_id} parts={len(prepared_parts)} duration={duration_seconds}s", flush=True, ) parts_text: list[str] = [] prompt_tail = "" for index, (part_bytes, part_name) in enumerate(prepared_parts, start=1): print( f"[py-bot] transcribe part job={job_id} index={index}/{len(prepared_parts)} filename={part_name} size={len(part_bytes)} bytes", flush=True, ) part_text = self._openai_transcribe(part_bytes, part_name, prompt=prompt_tail).strip() if part_text: parts_text.append(part_text) prompt_tail = self._transcription_prompt_tail("\n".join(parts_text)) text = "\n".join(parts_text).strip() if not text: raise VoiceTranscriptionError( "сервис распознавания вернул пустой текст. Возможно, в записи нет слышимой речи или качество звука слишком низкое.", stage="empty_text", retryable=False, ) elapsed = self._format_duration(int(time.time() - started_at)) print(f"[py-bot] transcribe done job={job_id} chars={len(text)} elapsed={elapsed}", flush=True) return text def _download_telegram_file(self, file_id: str) -> tuple[bytes, str]: try: result = self.telegram.call("getFile", {"file_id": file_id}, timeout=120) except TimeoutError as e: raise VoiceTranscriptionError( "Telegram долго не отдавал информацию о файле.", stage="telegram_get_file_timeout", detail=str(e), ) from e except Exception as e: detail = str(e) if "file is too big" in detail.lower(): raise VoiceTranscriptionError( "Файл большой: я попробовал скачать его через текущий Telegram Bot API, " "но Telegram не дал это сделать. Для такого аудио нужен локальный `telegram-bot-api` " "сервер или другой способ передать файл боту.", stage="telegram_get_file_too_big", retryable=False, detail=detail, ) from e raise VoiceTranscriptionError( "не удалось получить информацию о файле из Telegram.", stage="telegram_get_file", detail=detail, ) from e info = result.get("result") or {} file_path = info.get("file_path") if not file_path: raise VoiceTranscriptionError( "Telegram не вернул путь к файлу.", stage="telegram_file_path", retryable=True, detail=json.dumps(info, ensure_ascii=False)[:1000], ) file_url = self.telegram.file_base + file_path.lstrip("/") req = request.Request(file_url, method="GET") try: with request.urlopen(req, timeout=self.cfg.telegram_file_download_timeout_seconds) as resp: data = resp.read() except TimeoutError as e: raise VoiceTranscriptionError( f"Telegram не успел отдать аудиофайл за {self.cfg.telegram_file_download_timeout_seconds} секунд.", stage="telegram_download_timeout", detail=str(e), ) from e except error.HTTPError as e: detail = e.read().decode("utf-8", errors="replace") raise VoiceTranscriptionError( f"Telegram вернул ошибку HTTP {e.code} при скачивании аудио.", stage="telegram_download_http", retryable=e.code >= 500 or e.code == 429, detail=detail[:1000], ) from e except error.URLError as e: raise VoiceTranscriptionError( "не удалось скачать аудиофайл из Telegram из-за сетевой ошибки.", stage="telegram_download_network", detail=str(e.reason), ) from e if not data: raise VoiceTranscriptionError( "Telegram отдал пустой аудиофайл.", stage="telegram_download_empty", retryable=True, ) original_name = Path(file_path).name or "audio.ogg" lower = original_name.lower() # OpenAI transcription может не принимать расширение .oga, нормализуем в .ogg. if lower.endswith(".oga"): base = original_name[:-4] if len(original_name) > 4 else "audio" normalized = f"{base}.ogg" else: normalized = original_name return data, normalized def _prepare_audio_parts_for_transcription( self, file_bytes: bytes, filename: str, *, duration_seconds: int, job_id: str, job_num: Any, ) -> list[tuple[bytes, str]]: needs_duration_chunking = duration_seconds > self.cfg.openai_transcribe_max_chunk_seconds if len(file_bytes) <= self.cfg.openai_transcribe_max_upload_bytes and not needs_duration_chunking: return [(file_bytes, filename)] ffmpeg_path = shutil.which(self.cfg.ffmpeg_bin) ffprobe_path = shutil.which(self.cfg.ffprobe_bin) if not ffmpeg_path or not ffprobe_path: raise VoiceTranscriptionError( "для длинного аудио нужен локальный `ffmpeg`/`ffprobe`, но они не найдены в системе.", stage="audio_prepare_tools_missing", retryable=False, ) with tempfile.TemporaryDirectory(prefix="shine-audio-") as tmpdir: tmp = Path(tmpdir) input_suffix = Path(filename).suffix or ".ogg" input_path = tmp / f"source{input_suffix}" input_path.write_bytes(file_bytes) prepared_path = tmp / "prepared.ogg" self._ffmpeg_reencode_audio(input_path, prepared_path) prepared_bytes = prepared_path.read_bytes() prepared_duration = self._ffprobe_duration_seconds(prepared_path) if ( len(prepared_bytes) <= self.cfg.openai_transcribe_max_upload_bytes and prepared_duration <= self.cfg.openai_transcribe_max_chunk_seconds ): return [(prepared_bytes, prepared_path.name)] chunk_length = self._choose_transcription_chunk_seconds(prepared_duration, len(prepared_bytes)) print( f"[py-bot] audio chunking job={job_id} num={job_num} duration={prepared_duration:.1f}s total_bytes={len(prepared_bytes)} chunk_seconds={chunk_length}", flush=True, ) chunks: list[tuple[bytes, str]] = [] offset = 0 index = 1 total_duration = max(1, int(prepared_duration + 0.999)) while offset < total_duration: chunk_path = tmp / f"chunk_{index:03d}.ogg" self._ffmpeg_extract_audio_chunk(prepared_path, chunk_path, offset, chunk_length) chunk_bytes = chunk_path.read_bytes() if not chunk_bytes: break if len(chunk_bytes) > self.cfg.openai_transcribe_max_upload_bytes: raise VoiceTranscriptionError( "локальная нарезка аудио дала слишком большой кусок для OpenAI; нужно уменьшить размер чанка.", stage="audio_chunk_too_large", retryable=False, ) chunks.append((chunk_bytes, chunk_path.name)) step = max(1, chunk_length - self.cfg.openai_transcribe_overlap_seconds) offset += step index += 1 if not chunks: raise VoiceTranscriptionError( "не удалось подготовить куски аудио для распознавания.", stage="audio_chunk_empty", retryable=False, ) return chunks def _ffmpeg_reencode_audio(self, input_path: Path, output_path: Path) -> None: cmd = [ self.cfg.ffmpeg_bin, "-y", "-i", str(input_path), "-vn", "-ac", "1", "-ar", "16000", "-c:a", "libopus", "-b:a", f"{self.cfg.openai_transcribe_reencode_bitrate_kbps}k", str(output_path), ] self._run_subprocess_checked(cmd, "audio_reencode_ffmpeg") def _ffmpeg_extract_audio_chunk(self, input_path: Path, output_path: Path, offset_seconds: int, chunk_seconds: int) -> None: cmd = [ self.cfg.ffmpeg_bin, "-y", "-ss", str(offset_seconds), "-t", str(chunk_seconds), "-i", str(input_path), "-vn", "-acodec", "copy", str(output_path), ] self._run_subprocess_checked(cmd, "audio_chunk_ffmpeg") def _ffprobe_duration_seconds(self, audio_path: Path) -> float: cmd = [ self.cfg.ffprobe_bin, "-v", "error", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", str(audio_path), ] try: result = subprocess.run( cmd, check=True, capture_output=True, text=True, timeout=self.cfg.openai_transcribe_ffmpeg_timeout_seconds, ) except subprocess.TimeoutExpired as e: raise VoiceTranscriptionError( f"`ffprobe` не успел определить длительность аудио за {self.cfg.openai_transcribe_ffmpeg_timeout_seconds} секунд.", stage="audio_probe_timeout", retryable=False, ) from e except subprocess.CalledProcessError as e: detail = (e.stderr or e.stdout or "").strip() raise VoiceTranscriptionError( "не удалось определить длительность аудио через `ffprobe`.", stage="audio_probe_failed", retryable=False, detail=detail[:1500], ) from e raw = (result.stdout or "").strip() try: return max(0.0, float(raw)) except ValueError as e: raise VoiceTranscriptionError( "`ffprobe` вернул некорректную длительность аудио.", stage="audio_probe_invalid", retryable=False, detail=raw[:300], ) from e def _run_subprocess_checked(self, cmd: list[str], stage: str) -> None: try: subprocess.run( cmd, check=True, capture_output=True, text=True, timeout=self.cfg.openai_transcribe_ffmpeg_timeout_seconds, ) except subprocess.TimeoutExpired as e: raise VoiceTranscriptionError( f"локальная обработка аудио не успела завершиться за {self.cfg.openai_transcribe_ffmpeg_timeout_seconds} секунд.", stage=f"{stage}_timeout", retryable=False, ) from e except subprocess.CalledProcessError as e: detail = (e.stderr or e.stdout or "").strip() raise VoiceTranscriptionError( "локальная обработка аудио через `ffmpeg` завершилась с ошибкой.", stage=f"{stage}_failed", retryable=False, detail=detail[:1500], ) from e def _choose_transcription_chunk_seconds(self, duration_seconds: float, total_bytes: int) -> int: max_chunk = self.cfg.openai_transcribe_max_chunk_seconds safe_seconds = max(60, max_chunk - self.cfg.openai_transcribe_overlap_seconds) if duration_seconds <= 0 or total_bytes <= 0: return safe_seconds bytes_per_second = total_bytes / max(duration_seconds, 1.0) if bytes_per_second <= 0: return safe_seconds size_limited = int((self.cfg.openai_transcribe_max_upload_bytes * 0.9) / bytes_per_second) return max(60, min(safe_seconds, size_limited if size_limited > 0 else safe_seconds)) @staticmethod def _transcription_prompt_tail(text: str, limit: int = 1000) -> str: source = compact_spaces(text) if len(source) <= limit: return source return source[-limit:] def _telegram_cloud_download_is_likely_too_big(self, file_size: int) -> bool: if file_size <= 0: return False using_cloud_api = self.cfg.telegram_api_base_url.rstrip("/") == "https://api.telegram.org" return using_cloud_api and file_size > 20 * 1024 * 1024 @staticmethod def _bytes_to_mb(value: int) -> str: return f"{value / (1024 * 1024):.1f}" def _openai_transcribe(self, file_bytes: bytes, filename: str, prompt: str = "") -> str: boundary = "----shine-boundary-" + "".join(random.choices("abcdef0123456789", k=16)) mime = mimetypes.guess_type(filename)[0] or "application/octet-stream" def text_part(name: str, value: str) -> bytes: return ( f"--{boundary}\r\n" f'Content-Disposition: form-data; name="{name}"\r\n\r\n' f"{value}\r\n" ).encode("utf-8") body = bytearray() body.extend(text_part("model", self.cfg.openai_transcribe_model)) body.extend(text_part("response_format", "text")) prompt = compact_spaces(prompt) if prompt: body.extend(text_part("prompt", prompt[:1000])) body.extend( ( f"--{boundary}\r\n" f'Content-Disposition: form-data; name="file"; filename="{filename}"\r\n' f"Content-Type: {mime}\r\n\r\n" ).encode("utf-8") ) body.extend(file_bytes) body.extend(b"\r\n") body.extend(f"--{boundary}--\r\n".encode("utf-8")) req = request.Request("https://api.openai.com/v1/audio/transcriptions", method="POST", data=bytes(body)) req.add_header("Authorization", f"Bearer {self.cfg.openai_api_key}") req.add_header("Content-Type", f"multipart/form-data; boundary={boundary}") try: with request.urlopen(req, timeout=self.cfg.openai_transcribe_timeout_seconds) as resp: return resp.read().decode("utf-8", errors="replace") except TimeoutError as e: raise VoiceTranscriptionError( f"OpenAI не успел распознать аудио за {self.cfg.openai_transcribe_timeout_seconds} секунд.", stage="openai_transcribe_timeout", detail=str(e), ) from e except error.HTTPError as e: detail = e.read().decode("utf-8", errors="replace") if e.code == 400: user_message = "OpenAI не принял аудиофайл для распознавания." elif e.code == 401: user_message = "OpenAI отклонил ключ API для распознавания." elif e.code == 413: user_message = "аудиофайл слишком большой для распознавания OpenAI." elif e.code == 429: user_message = "OpenAI временно ограничил распознавание из-за лимита запросов." elif e.code >= 500: user_message = "OpenAI временно не смог обработать распознавание." else: user_message = f"OpenAI вернул ошибку HTTP {e.code} при распознавании." raise VoiceTranscriptionError( user_message, stage="openai_transcribe_http", retryable=e.code == 429 or e.code >= 500, detail=detail[:1500], ) from e except error.URLError as e: raise VoiceTranscriptionError( "не удалось отправить аудио в OpenAI из-за сетевой ошибки.", stage="openai_transcribe_network", detail=str(e.reason), ) from e @staticmethod def _extract_codex_user_note(line: str) -> str | None: s = (line or "").strip() if not s.startswith("{"): return None try: obj = json.loads(s) except Exception: return None if obj.get("type") != "item.completed": return None item = obj.get("item") or {} if item.get("type") != "agent_message": return None text = (item.get("text") or "").strip() if not text: return None if len(text) > 220: return text[:220] + "..." return text @staticmethod def _extract_fallback_message(lines: list[str]) -> str: for line in reversed(lines): line = line.strip() if not line: continue if line.startswith("{") and '"type":' in line: continue if line.startswith("mcp:") or line.startswith("OpenAI Codex"): continue return line return "" @staticmethod def _extract_codex_thread_id(line: str) -> str: s = (line or "").strip() if not s.startswith("{"): return "" try: obj = json.loads(s) except Exception: return "" if obj.get("type") != "thread.started": return "" thread_id = (obj.get("thread_id") or "").strip() return thread_id @staticmethod def _is_missing_codex_session_error(text: str) -> bool: lowered = (text or "").lower() markers = [ "session not found", "conversation not found", "thread not found", "no session found", "invalid session", "unknown session", "no conversation found", "unknown thread", ] return any(marker in lowered for marker in markers) @staticmethod def _format_duration(seconds: int) -> str: seconds = max(0, seconds) minutes, sec = divmod(seconds, 60) hours, minutes = divmod(minutes, 60) if hours: return f"{hours}ч {minutes}м {sec}с" if minutes: return f"{minutes}м {sec}с" return f"{sec}с" def run_selftest(config: BotConfig, prompt: str) -> int: cmd = [ str(config.codex_bin), "exec", "--dangerously-bypass-approvals-and-sandbox", "--json", "-C", str(config.codex_workdir), prompt, ] proc = subprocess.run( cmd, stdin=subprocess.DEVNULL, capture_output=True, text=True, encoding="utf-8", errors="replace", ) print(proc.stdout) if proc.stderr: print(proc.stderr) return proc.returncode def main() -> int: parser = argparse.ArgumentParser(description="SHiNE Python Telegram bot wrapper for Codex CLI") parser.add_argument("--selftest-codex", default="", help="Выполнить только codex exec с этим prompt и выйти") args = parser.parse_args() root = Path(__file__).resolve().parent cfg = BotConfig(root) if args.selftest_codex: return run_selftest(cfg, args.selftest_codex) service = ShinePyBotService(cfg) try: service.run() except KeyboardInterrupt: service.shutdown() return 0 except Exception as e: print(f"[py-bot] FATAL: {e}", flush=True) return 1 return 0 if __name__ == "__main__": raise SystemExit(main())