#!/usr/bin/env python3 from __future__ import annotations import argparse import datetime as dt import fcntl import json import mimetypes import os import random import re import string import subprocess import tempfile import threading import time import traceback import uuid from pathlib import Path from typing import Any from urllib import error, request DEFAULT_ALLOWED_PLAYERS = ",".join([ "malvviiina:Милана", "zodiaktechnika32:Сергей", "oidasyda:Иван", "blackbyrd1:Ворон", "dimasol1:Дима", ]) def now_iso() -> str: return dt.datetime.now(dt.timezone.utc).isoformat() def normalize_username(value: str | None) -> str: if not value: return "" value = value.strip() if value.startswith("@"): value = value[1:] return value.lower() def parse_allowed_players(raw: str) -> dict[str, str]: players: dict[str, str] = {} for item in (raw or "").split(","): part = item.strip() if not part: continue username_part, sep, name_part = part.partition(":") username = normalize_username(username_part) if not username: continue player_name = (name_part if sep else username_part).strip() or username players[username] = player_name return players def split_long_text(text: str, chunk_size: int = 3500) -> list[str]: text = (text or "").strip() if not text: return ["(пустой ответ)"] return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)] def split_text_for_tts(text: str, chunk_size: int) -> list[str]: text = (text or "").strip() if not text: return [] chunks: list[str] = [] current = "" paragraphs = re.split(r"\n\s*\n", text) for paragraph in paragraphs: paragraph = paragraph.strip() if not paragraph: continue if len(paragraph) > chunk_size: if current: chunks.append(current) current = "" for i in range(0, len(paragraph), chunk_size): part = paragraph[i:i + chunk_size].strip() if part: chunks.append(part) continue candidate = paragraph if not current else f"{current}\n\n{paragraph}" if len(candidate) <= chunk_size: current = candidate else: if current: chunks.append(current) current = paragraph if current: chunks.append(current) return chunks def read_env_file(path: Path) -> dict[str, str]: result: dict[str, str] = {} if not path.exists(): return result for raw_line in path.read_text(encoding="utf-8").splitlines(): line = raw_line.strip() if not line or line.startswith("#") or "=" not in line: continue key, value = line.split("=", 1) key = key.strip() value = value.strip().strip('"').strip("'") result[key] = value return result class VoiceTranscriptionError(RuntimeError): def __init__( self, user_message: str, *, stage: str, retryable: bool = True, detail: str = "", ): super().__init__(user_message) self.user_message = user_message self.stage = stage self.retryable = retryable self.detail = detail def log_text(self) -> str: if self.detail and self.detail != self.user_message: return f"{self.user_message} stage={self.stage} retryable={self.retryable} detail={self.detail}" return f"{self.user_message} stage={self.stage} retryable={self.retryable}" class VoiceReplyError(RuntimeError): pass class JsonLineStore: @staticmethod def load(path: Path) -> list[dict[str, Any]]: if not path.exists(): return [] items: list[dict[str, Any]] = [] for line in path.read_text(encoding="utf-8").splitlines(): line = line.strip() if not line: continue items.append(json.loads(line)) return items @staticmethod def save(path: Path, items: list[dict[str, Any]]) -> None: path.parent.mkdir(parents=True, exist_ok=True) tmp = path.with_suffix(path.suffix + ".tmp") with tmp.open("w", encoding="utf-8") as f: for item in items: f.write(json.dumps(item, ensure_ascii=False) + "\n") tmp.replace(path) @staticmethod def append(path: Path, item: dict[str, Any]) -> None: path.parent.mkdir(parents=True, exist_ok=True) with path.open("a", encoding="utf-8") as f: f.write(json.dumps(item, ensure_ascii=False) + "\n") class TelegramApi: def __init__(self, token: str): self.base = f"https://api.telegram.org/bot{token}/" def call(self, method: str, payload: dict[str, Any] | None = None, timeout: int = 60) -> dict[str, Any]: data = None headers = {} if payload is not None: data = json.dumps(payload).encode("utf-8") headers["Content-Type"] = "application/json" req = request.Request(self.base + method, data=data, headers=headers, method="POST") try: with request.urlopen(req, timeout=timeout) as resp: raw = resp.read().decode("utf-8") except error.HTTPError as e: body = e.read().decode("utf-8", errors="replace") raise RuntimeError(f"Telegram HTTP {e.code}: {body}") from e except Exception as e: raise RuntimeError(f"Telegram request failed: {e}") from e result = json.loads(raw) if not result.get("ok"): raise RuntimeError(f"Telegram API error: {result}") return result def call_multipart( self, method: str, fields: dict[str, Any], files: dict[str, tuple[str, bytes, str]], timeout: int = 120, ) -> dict[str, Any]: boundary = "----shine-tg-boundary-" + "".join(random.choices("abcdef0123456789", k=16)) body = bytearray() for name, value in fields.items(): if value is None: continue body.extend( ( f"--{boundary}\r\n" f'Content-Disposition: form-data; name="{name}"\r\n\r\n' f"{value}\r\n" ).encode("utf-8") ) for name, (filename, data, mime) in files.items(): body.extend( ( f"--{boundary}\r\n" f'Content-Disposition: form-data; name="{name}"; filename="{filename}"\r\n' f"Content-Type: {mime}\r\n\r\n" ).encode("utf-8") ) body.extend(data) body.extend(b"\r\n") body.extend(f"--{boundary}--\r\n".encode("utf-8")) req = request.Request(self.base + method, data=bytes(body), method="POST") req.add_header("Content-Type", f"multipart/form-data; boundary={boundary}") try: with request.urlopen(req, timeout=timeout) as resp: raw = resp.read().decode("utf-8") except error.HTTPError as e: body_text = e.read().decode("utf-8", errors="replace") raise RuntimeError(f"Telegram HTTP {e.code}: {body_text}") from e except Exception as e: raise RuntimeError(f"Telegram multipart request failed: {e}") from e result = json.loads(raw) if not result.get("ok"): raise RuntimeError(f"Telegram API error: {result}") return result def get_updates(self, offset: int | None, timeout_sec: int) -> list[dict[str, Any]]: payload: dict[str, Any] = {"timeout": timeout_sec, "allowed_updates": ["message", "channel_post"]} if offset is not None: payload["offset"] = offset result = self.call("getUpdates", payload=payload, timeout=timeout_sec + 15) return result.get("result", []) def send_message(self, chat_id: int | str, text: str, reply_to_message_id: int | None = None) -> dict[str, Any]: payload: dict[str, Any] = {"chat_id": chat_id, "text": text} if reply_to_message_id is not None: payload["reply_to_message_id"] = reply_to_message_id return self.call("sendMessage", payload=payload, timeout=30) def send_voice( self, chat_id: int | str, voice: str, caption: str = "", reply_to_message_id: int | None = None, ) -> dict[str, Any]: payload: dict[str, Any] = {"chat_id": chat_id, "voice": voice} if caption: payload["caption"] = caption if reply_to_message_id is not None: payload["reply_to_message_id"] = reply_to_message_id return self.call("sendVoice", payload=payload, timeout=60) def send_audio( self, chat_id: int | str, audio: str, caption: str = "", reply_to_message_id: int | None = None, ) -> dict[str, Any]: payload: dict[str, Any] = {"chat_id": chat_id, "audio": audio} if caption: payload["caption"] = caption if reply_to_message_id is not None: payload["reply_to_message_id"] = reply_to_message_id return self.call("sendAudio", payload=payload, timeout=60) def send_voice_upload( self, chat_id: int | str, voice_bytes: bytes, filename: str, caption: str = "", reply_to_message_id: int | None = None, ) -> dict[str, Any]: fields: dict[str, Any] = {"chat_id": chat_id} if caption: fields["caption"] = caption if reply_to_message_id is not None: fields["reply_to_message_id"] = reply_to_message_id return self.call_multipart( "sendVoice", fields=fields, files={"voice": (filename, voice_bytes, "audio/ogg")}, timeout=180, ) def delete_webhook(self) -> None: self.call("deleteWebhook", payload={"drop_pending_updates": False}, timeout=30) class BotConfig: def __init__(self, root_dir: Path): env = dict(os.environ) env.update(read_env_file(root_dir / ".env")) self.root_dir = root_dir self.telegram_bot_token = self._required(env, "TELEGRAM_BOT_TOKEN") self.allowed_username = normalize_username(env.get("ALLOWED_TELEGRAM_USERNAME", "AidarKC")) self.allowed_players = parse_allowed_players(env.get("ALLOWED_TELEGRAM_PLAYERS", DEFAULT_ALLOWED_PLAYERS)) self.allowed_channel_username = normalize_username(env.get("ALLOWED_TELEGRAM_CHANNEL_USERNAME", "shine_writing")) self.bot_username = env.get("BOT_USERNAME", "aidar_su_bot") self.openai_api_key = env.get("OPENAI_API_KEY", "").strip() self.openai_transcribe_model = env.get("OPENAI_TRANSCRIBE_MODEL", "gpt-4o-mini-transcribe") self.telegram_file_download_timeout_seconds = int(env.get("TELEGRAM_FILE_DOWNLOAD_TIMEOUT_SECONDS", "300")) self.openai_transcribe_timeout_seconds = int(env.get("OPENAI_TRANSCRIBE_TIMEOUT_SECONDS", "900")) self.openai_tts_model = env.get("OPENAI_TTS_MODEL", "gpt-4o-mini-tts") self.openai_tts_voice = env.get("OPENAI_TTS_VOICE", "alloy") self.openai_tts_response_format = env.get("OPENAI_TTS_RESPONSE_FORMAT", "opus") self.openai_tts_timeout_seconds = int(env.get("OPENAI_TTS_TIMEOUT_SECONDS", "180")) self.openai_tts_chunk_chars = max(500, int(env.get("OPENAI_TTS_CHUNK_CHARS", "3500"))) self.codex_bin = Path(env.get( "CODEX_BIN", "/home/ai/.cache/JetBrains/IntelliJIdea2026.1/aia/codex/bin/codex-x86_64-unknown-linux-musl" )) self.codex_workdir = Path(env.get("CODEX_WORKDIR", "/home/ai/work/SHiNE/SHiNE-server-sha256")) self.codex_timeout_seconds = int(env.get("CODEX_TIMEOUT_SECONDS", "900")) self.max_retries = max(1, int(env.get("MAX_RETRIES", "3"))) self.data_dir = (root_dir / env.get("DATA_DIR", "./data")).resolve() self.agent_instructions_file = (root_dir / "AGENT.md").resolve() @staticmethod def _required(env: dict[str, str], key: str) -> str: value = env.get(key, "").strip() if not value: raise RuntimeError(f"Не задан обязательный параметр: {key}") return value class ShinePyBotService: def __init__(self, config: BotConfig): self.cfg = config self.telegram = TelegramApi(config.telegram_bot_token) self.queue_file = config.data_dir / "py_queue.jsonl" self.state_file = config.data_dir / "py_state.json" self.processed_updates_file = config.data_dir / "py_processed_updates.log" self.lock_file = config.data_dir / "py_app.lock" self.history_dir = config.data_dir / "history" self.history_archive_dir = self.history_dir / "archive" self.max_processed_updates = 5000 self.queue_lock = threading.RLock() self.stop_event = threading.Event() self.worker = threading.Thread(target=self._worker_loop, name="shine-py-bot-worker", daemon=True) self.queue: list[dict[str, Any]] = [] self.state: dict[str, Any] = {} self.processed_updates: list[str] = [] self.active_job_id: str | None = None self.active_job_started_at: float | None = None self.active_process: subprocess.Popen[str] | None = None self.active_process_lock = threading.Lock() self.stop_current_job = False self.lock_fd = None self.last_heartbeat_at: float = 0.0 self.restart_requested = False def _is_owner(self, username: str) -> bool: return normalize_username(username) == self.cfg.allowed_username def _is_allowed_player(self, username: str) -> bool: return normalize_username(username) in self.cfg.allowed_players def _is_allowed_user(self, username: str) -> bool: uname = normalize_username(username) return uname == self.cfg.allowed_username or uname in self.cfg.allowed_players def _player_name(self, username: str) -> str: uname = normalize_username(username) return self.cfg.allowed_players.get(uname, uname) def run(self) -> None: self._ensure_dirs() self._acquire_single_instance_lock() self._load_state() self._load_queue() self._load_processed_updates() self._recover_active_jobs_after_restart() self.telegram.delete_webhook() self._init_offset_if_missing() self.worker.start() self._append_history_event("service_started", {"allowedUsername": self.cfg.allowed_username}) print(f"[py-bot] Запущен. allowed user: @{self.cfg.allowed_username}", flush=True) try: while not self.stop_event.is_set(): try: offset = self.state.get("offset") updates = self.telegram.get_updates(offset=offset, timeout_sec=25) except Exception as e: print(f"[py-bot] Ошибка getUpdates: {e}", flush=True) time.sleep(2) continue for update in updates: update_id = update.get("update_id") if isinstance(update_id, int): self.state["offset"] = update_id + 1 self._persist_state() self._handle_update(update) finally: self.shutdown() def shutdown(self) -> None: if self.stop_event.is_set(): pass self.stop_event.set() self._stop_active_codex_process() if self.worker.is_alive(): self.worker.join(timeout=10) if self.lock_fd is not None: try: fcntl.flock(self.lock_fd, fcntl.LOCK_UN) finally: self.lock_fd.close() self.lock_fd = None self._append_history_event("service_stopped", {}) def _ensure_dirs(self) -> None: self.cfg.data_dir.mkdir(parents=True, exist_ok=True) self.history_dir.mkdir(parents=True, exist_ok=True) self.history_archive_dir.mkdir(parents=True, exist_ok=True) def _acquire_single_instance_lock(self) -> None: self.lock_file.parent.mkdir(parents=True, exist_ok=True) self.lock_fd = self.lock_file.open("a+") try: fcntl.flock(self.lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB) except BlockingIOError: raise RuntimeError(f"Уже запущен другой инстанс (lock: {self.lock_file})") def _load_state(self) -> None: if self.state_file.exists(): self.state = json.loads(self.state_file.read_text(encoding="utf-8")) else: self.state = {} sessions = self.state.get("user_sessions") if not isinstance(sessions, dict): sessions = {} self.state["user_sessions"] = sessions user_settings = self.state.get("user_settings") if not isinstance(user_settings, dict): user_settings = {} self.state["user_settings"] = user_settings if not self.state.get("current_history_file"): history_file = self._create_new_history_file("initial", self.cfg.allowed_username) self.state["current_history_file"] = str(history_file) sessions[self.cfg.allowed_username] = {"current_history_file": str(history_file)} elif self.cfg.allowed_username not in sessions: sessions[self.cfg.allowed_username] = {"current_history_file": str(self.state["current_history_file"])} if not isinstance(self.state.get("next_job_number"), int): self.state["next_job_number"] = 1 self.state["updated_at"] = now_iso() self._persist_state() def _persist_state(self) -> None: self.state["updated_at"] = now_iso() tmp = self.state_file.with_suffix(".tmp") tmp.write_text(json.dumps(self.state, ensure_ascii=False, indent=2), encoding="utf-8") tmp.replace(self.state_file) def _load_queue(self) -> None: self.queue = JsonLineStore.load(self.queue_file) def _persist_queue(self) -> None: JsonLineStore.save(self.queue_file, self.queue) def _load_processed_updates(self) -> None: if not self.processed_updates_file.exists(): self.processed_updates = [] return lines = [x.strip() for x in self.processed_updates_file.read_text(encoding="utf-8").splitlines() if x.strip()] if len(lines) > self.max_processed_updates: lines = lines[-self.max_processed_updates:] self.processed_updates_file.write_text("\n".join(lines) + "\n", encoding="utf-8") self.processed_updates = lines def _mark_processed_update(self, update_key: str) -> bool: if update_key in self.processed_updates: return True self.processed_updates.append(update_key) if len(self.processed_updates) > self.max_processed_updates: self.processed_updates = self.processed_updates[-self.max_processed_updates:] self.processed_updates_file.write_text("\n".join(self.processed_updates) + "\n", encoding="utf-8") else: with self.processed_updates_file.open("a", encoding="utf-8") as f: f.write(update_key + "\n") return False def _recover_active_jobs_after_restart(self) -> None: recovered_ids: list[str] = [] for job in self.queue: if job.get("status") == "active": job["status"] = "pending" job["retry_reason"] = "service_restart_recovery" job["updated_at"] = now_iso() recovered_ids.append(job.get("id", "")) if recovered_ids: self._persist_queue() self._append_history_event("active_jobs_recovered", {"jobIds": recovered_ids}) def _init_offset_if_missing(self) -> None: if self.state.get("offset") is not None: return try: updates = self.telegram.get_updates(offset=None, timeout_sec=0) if updates: self.state["offset"] = int(updates[-1]["update_id"]) + 1 else: self.state["offset"] = 0 self._persist_state() except Exception as e: print(f"[py-bot] Не удалось инициализировать offset: {e}", flush=True) self.state["offset"] = 0 self._persist_state() def _current_history_file(self) -> Path: return self._current_history_file_for_user(self.cfg.allowed_username) def _history_dirs_for_user(self, username: str) -> tuple[Path, Path]: uname = normalize_username(username) or "unknown" history_dir = self.history_dir / uname archive_dir = history_dir / "archive" history_dir.mkdir(parents=True, exist_ok=True) archive_dir.mkdir(parents=True, exist_ok=True) return history_dir, archive_dir def _ensure_user_session(self, username: str) -> None: uname = normalize_username(username) or self.cfg.allowed_username sessions = self.state.setdefault("user_sessions", {}) if not isinstance(sessions, dict): sessions = {} self.state["user_sessions"] = sessions self._user_settings(uname) session = sessions.get(uname) if isinstance(session, dict) and session.get("current_history_file"): return history_file = self._create_new_history_file("initial", uname) sessions[uname] = {"current_history_file": str(history_file)} if uname == self.cfg.allowed_username: self.state["current_history_file"] = str(history_file) self._persist_state() def _current_history_file_for_user(self, username: str) -> Path: uname = normalize_username(username) or self.cfg.allowed_username self._ensure_user_session(uname) sessions = self.state.get("user_sessions") or {} session = sessions.get(uname) or {} return Path(session["current_history_file"]) def _create_new_history_file(self, reason: str, username: str) -> Path: ts = dt.datetime.now().strftime("%Y-%m-%d_%H%M%S") rnd = "".join(random.choices(string.hexdigits.lower(), k=8)) history_dir, _ = self._history_dirs_for_user(username) path = history_dir / f"{ts}_{rnd}.jsonl" JsonLineStore.append(path, { "ts": now_iso(), "type": "history_created", "reason": reason, "username": normalize_username(username), }) return path def _rotate_history(self, reason: str, username: str) -> Path: uname = normalize_username(username) or self.cfg.allowed_username current = self._current_history_file_for_user(uname) _, archive_dir = self._history_dirs_for_user(uname) if current.exists(): archived = archive_dir / current.name current.replace(archived) else: archived = archive_dir / "(empty)" new_file = self._create_new_history_file(reason, uname) sessions = self.state.setdefault("user_sessions", {}) if not isinstance(sessions, dict): sessions = {} self.state["user_sessions"] = sessions sessions[uname] = {"current_history_file": str(new_file)} if uname == self.cfg.allowed_username: self.state["current_history_file"] = str(new_file) self._persist_state() self._append_history_event("history_rotated", {"reason": reason, "username": uname, "archived": str(archived)}, username=uname) return archived def _user_settings(self, username: str) -> dict[str, Any]: uname = normalize_username(username) or self.cfg.allowed_username settings = self.state.get("user_settings") if not isinstance(settings, dict): settings = {} self.state["user_settings"] = settings user_settings = settings.get(uname) if not isinstance(user_settings, dict): user_settings = {} settings[uname] = user_settings if not isinstance(user_settings.get("voice_replies_enabled"), bool): user_settings["voice_replies_enabled"] = False return user_settings def _voice_replies_enabled(self, username: str) -> bool: return bool(self._user_settings(username).get("voice_replies_enabled")) def _set_voice_replies_enabled(self, username: str, enabled: bool) -> None: self._user_settings(username)["voice_replies_enabled"] = enabled self._persist_state() def _append_history(self, history_path: Path, event_type: str, payload: dict[str, Any]) -> None: row = {"ts": now_iso(), "type": event_type} row.update(payload) JsonLineStore.append(history_path, row) def _append_history_event(self, event_type: str, payload: dict[str, Any], username: str | None = None) -> None: history_path = self._current_history_file_for_user(username or self.cfg.allowed_username) self._append_history(history_path, "system_event", {"event": event_type, **payload}) def _send_player_welcome_once(self, chat_id: int, message_id: int, username: str) -> None: uname = normalize_username(username) sent = self.state.get("player_welcome_sent") if not isinstance(sent, dict): sent = {} self.state["player_welcome_sent"] = sent if sent.get(uname): return player_name = self._player_name(uname) text = ( f"Привет, {player_name}.\n" "Можно задавать вопросы по проекту, просить анализ, идеи и подготовку готового ТЗ.\n" "Команда /new начинает новую сессию и архивирует текущую историю." ) self._safe_send(chat_id, text, reply_to=message_id) sent[uname] = now_iso() self._persist_state() def _resolve_chat_id(self, chat_id: int) -> int: migrations = self.state.get("chat_id_migrations") if not isinstance(migrations, dict): return chat_id current = chat_id visited: set[int] = set() while current not in visited: visited.add(current) next_chat_id = migrations.get(str(current)) if not isinstance(next_chat_id, int): break current = next_chat_id return current def _remember_chat_migration(self, old_chat_id: int, new_chat_id: int, source: str) -> None: if old_chat_id == new_chat_id: return migrations = self.state.get("chat_id_migrations") if not isinstance(migrations, dict): migrations = {} self.state["chat_id_migrations"] = migrations if migrations.get(str(old_chat_id)) == new_chat_id: return migrations[str(old_chat_id)] = new_chat_id self._persist_state() with self.queue_lock: changed = False for job in self.queue: if job.get("chat_id") == old_chat_id: job["chat_id"] = new_chat_id job["updated_at"] = now_iso() changed = True if changed: self._persist_queue() self._append_history_event("chat_migrated_to_supergroup", { "oldChatId": old_chat_id, "newChatId": new_chat_id, "source": source, }) @staticmethod def _extract_migrate_to_chat_id(error_text: str) -> int | None: match = re.search(r'"migrate_to_chat_id"\s*:\s*(-?\d+)', error_text) if match: return int(match.group(1)) match = re.search(r"'migrate_to_chat_id'\s*:\s*(-?\d+)", error_text) if match: return int(match.group(1)) return None def _handle_update(self, update: dict[str, Any]) -> None: message = update.get("message") update_type = "message" if not isinstance(message, dict): message = update.get("channel_post") update_type = "channel_post" if not isinstance(message, dict): return chat = message.get("chat") or {} chat_id = chat.get("id") message_id = message.get("message_id") chat_type = str(chat.get("type") or "") chat_username = normalize_username(chat.get("username")) chat_title = str(chat.get("title") or "") sender = message.get("from") or {} username = normalize_username(sender.get("username")) author_signature = str(message.get("author_signature") or "").strip() author_username = username or normalize_username(author_signature) if not isinstance(chat_id, int) or not isinstance(message_id, int): return migrate_to_chat_id = message.get("migrate_to_chat_id") if isinstance(migrate_to_chat_id, int): self._remember_chat_migration(chat_id, migrate_to_chat_id, "telegram_message") return update_key = f"{chat_id}:{message_id}" if self._mark_processed_update(update_key): return is_channel_post = update_type == "channel_post" or chat_type == "channel" is_group_message = update_type == "message" and chat_type in ("group", "supergroup") is_allowed_channel = ( not is_channel_post or not self.cfg.allowed_channel_username or chat_username == self.cfg.allowed_channel_username ) if is_channel_post and not is_allowed_channel: return if chat_username and chat_username == self.cfg.allowed_channel_username: self._remember_public_report_chat(chat_id) # Игнорируем системные сообщения о входе/выходе и смене заголовка/фото. if message.get("new_chat_members") or message.get("left_chat_member"): return if message.get("group_chat_created") or message.get("supergroup_chat_created") or message.get("channel_chat_created"): return text = (message.get("text") or message.get("caption") or "").strip() actor_username = normalize_username(author_username) is_allowed = self._is_allowed_user(actor_username) is_private = chat_type == "private" if not is_allowed: if is_private: self._safe_send(chat_id, "Извините, доступ к этому агенту пока не выдан. Обратитесь к Айдару.", reply_to=message_id) return if self._is_allowed_player(actor_username) and not is_private: return self._ensure_user_session(actor_username) history_path = self._current_history_file_for_user(actor_username) if self._is_allowed_player(actor_username): self._send_player_welcome_once(chat_id, message_id, actor_username) if not text: if message.get("voice"): self._enqueue_voice_job( chat_id, message_id, actor_username, message["voice"].get("file_id"), media_type="voice", update_type=update_type, chat_username=chat_username, chat_title=chat_title, author_signature=author_signature, chat_type=chat_type, ) return if message.get("audio"): self._enqueue_voice_job( chat_id, message_id, actor_username, message["audio"].get("file_id"), media_type="audio", update_type=update_type, chat_username=chat_username, chat_title=chat_title, author_signature=author_signature, chat_type=chat_type, ) return self._safe_send(chat_id, "Поддерживаются текст, voice и audio.", reply_to=message_id) return if text.startswith("/"): self._handle_command(chat_id, message_id, actor_username, text) return self._append_history(history_path, "incoming_text", { "chatId": chat_id, "messageId": message_id, "updateType": update_type, "chatType": chat_type, "chatUsername": chat_username, "chatTitle": chat_title, "username": actor_username, "authorSignature": author_signature, "text": text, }) job = self._build_job_base(chat_id, message_id, actor_username, str(history_path)) job["type"] = "text" job["text"] = text job["update_type"] = update_type job["chat_type"] = chat_type job["chat_username"] = chat_username job["chat_title"] = chat_title job["author_signature"] = author_signature job["role"] = "owner" if self._is_owner(actor_username) else "player" job["player_name"] = self._player_name(actor_username) if job["role"] == "player" else "" with self.queue_lock: self.queue.append(job) self._persist_queue() self._safe_send(chat_id, f"Принял задачу #{job['num']}", reply_to=message_id) def _enqueue_voice_job( self, chat_id: int, message_id: int, username: str, file_id: str | None, *, media_type: str = "voice", update_type: str = "message", chat_username: str = "", chat_title: str = "", author_signature: str = "", chat_type: str = "", ) -> None: if not file_id: self._safe_send(chat_id, "Не удалось прочитать file_id голосового.", reply_to=message_id) return history_path = self._current_history_file_for_user(username) self._append_history(history_path, "incoming_voice", { "chatId": chat_id, "messageId": message_id, "updateType": update_type, "chatType": chat_type, "chatUsername": chat_username, "chatTitle": chat_title, "username": username, "authorSignature": author_signature, "fileId": file_id, "mediaType": media_type, }) job = self._build_job_base(chat_id, message_id, username, str(history_path)) job["type"] = "voice" job["telegram_file_id"] = file_id job["telegram_media_type"] = media_type job["update_type"] = update_type job["chat_type"] = chat_type job["chat_username"] = chat_username job["chat_title"] = chat_title job["author_signature"] = author_signature job["role"] = "owner" if self._is_owner(username) else "player" job["player_name"] = self._player_name(username) if job["role"] == "player" else "" with self.queue_lock: self.queue.append(job) self._persist_queue() self._safe_send(chat_id, f"Принял voice в задачу #{job['num']}", reply_to=message_id) def _build_job_base(self, chat_id: int, message_id: int, username: str, history_file: str) -> dict[str, Any]: with self.queue_lock: num = int(self.state.get("next_job_number", 1)) self.state["next_job_number"] = num + 1 self._persist_state() return { "id": str(uuid.uuid4()), "num": num, "status": "pending", "type": "text", "chat_id": chat_id, "message_id": message_id, "username": username, "update_type": "message", "chat_type": "", "chat_username": "", "chat_title": "", "author_signature": "", "role": "owner", "player_name": "", "text": "", "telegram_file_id": "", "telegram_media_type": "", "history_file": history_file, "attempts": 0, "retry_reason": "", "last_error": "", "created_at": now_iso(), "updated_at": now_iso(), "active_since": None, } def _handle_command(self, chat_id: int, message_id: int, username: str, text: str) -> None: lower = text.lower() command = lower.split(maxsplit=1)[0].split("@", 1)[0] is_owner = self._is_owner(username) if command in ("/start", "/help"): self._safe_send(chat_id, self._help_text(is_owner=is_owner), reply_to=message_id) return if command == "/status": self._safe_send(chat_id, self._status_text(), reply_to=message_id) return if command == "/queue": self._safe_send(chat_id, self._queue_text(), reply_to=message_id) return if command == "/voice_on": self._set_voice_replies_enabled(username, True) self._append_history_event("voice_replies_enabled", {"username": normalize_username(username)}, username=username) self._safe_send(chat_id, "Озвучивание финальных ответов включено для вашего пользователя.", reply_to=message_id) return if command == "/voice_off": self._set_voice_replies_enabled(username, False) self._append_history_event("voice_replies_disabled", {"username": normalize_username(username)}, username=username) self._safe_send(chat_id, "Озвучивание финальных ответов выключено для вашего пользователя.", reply_to=message_id) return if command == "/voice_status": status = "включено" if self._voice_replies_enabled(username) else "выключено" self._safe_send(chat_id, f"Озвучивание финальных ответов: {status}.", reply_to=message_id) return if command == "/new": archived = self._rotate_history("command_new", username) self._safe_send(chat_id, f"История очищена. Новый диалог начат.\nАрхив: {archived.name}", reply_to=message_id) return if command in ("/restart_service", "/restart"): if not is_owner: self._safe_send(chat_id, "Команда недоступна.", reply_to=message_id) return self._append_history_event("restart_service_requested", { "chatId": chat_id, "messageId": message_id, "username": username, }, username=username) self._safe_send( chat_id, "Перезапускаю сервис. Если задача была активна, после старта она вернётся в очередь и продолжится.", reply_to=message_id, ) self._schedule_self_restart() return if command == "/stop": stopped = self._cancel_active_job("stopped_by_user") if stopped: self._safe_send(chat_id, "Текущая задача остановлена и удалена из очереди.", reply_to=message_id) else: self._safe_send(chat_id, "Сейчас нет активной задачи.", reply_to=message_id) return if command == "/cancel": parts = text.split(maxsplit=1) if len(parts) < 2: self._safe_send(chat_id, "Использование: /cancel ", reply_to=message_id) return arg = parts[1].strip() if arg.lower() == "all": with self.queue_lock: self.stop_current_job = True self._stop_active_codex_process() count = len(self.queue) self.queue = [] self._persist_queue() self._safe_send(chat_id, f"Удалено задач из очереди: {count}", reply_to=message_id) return cancelled = self._cancel_by_id_prefix(arg) self._safe_send(chat_id, f"Задача удалена: {arg}" if cancelled else f"Задача не найдена: {arg}", reply_to=message_id) return def _help_text(self, *, is_owner: bool) -> str: lines = [ "Доступные команды:", "/status — активная задача и размер очереди", "/queue — список задач в очереди", "/stop — остановить текущую задачу", "/cancel — удалить задачу по id (префикс) или все", "/new — архивировать историю и начать новую", "/voice_on — включить озвучивание финальных ответов", "/voice_off — выключить озвучивание финальных ответов", "/voice_status — показать состояние озвучивания", "/help — эта справка", ] if is_owner: lines.insert(-1, "/restart_service — перезапустить сервис через systemd") return "\n".join(lines) def _status_text(self) -> str: with self.queue_lock: active = next((j for j in self.queue if j.get("status") == "active"), None) pending = sum(1 for j in self.queue if j.get("status") == "pending") if not active: return f"Статус: активной задачи нет.\nВ очереди pending: {pending}" elapsed = int(time.time() - (self.active_job_started_at or time.time())) return ( f"Статус: активная задача #{active.get('num', '?')}\n" f"Тип: {active.get('type', 'text')}\n" f"Попытка: {int(active.get('attempts', 0)) + 1}/{self.cfg.max_retries}\n" f"Выполняется: {elapsed}с\n" f"Pending: {pending}" ) def _queue_text(self) -> str: with self.queue_lock: items = list(self.queue) if not items: return "Очередь пуста." lines = [f"Очередь: {len(items)}"] for i, job in enumerate(items[:10], start=1): lines.append( f"{i}) #{job.get('num', '?')} [{job.get('status')}] {job.get('type')} attempts={job.get('attempts', 0)}" ) if len(items) > 10: lines.append(f"...и ещё {len(items) - 10} задач") return "\n".join(lines) def _cancel_active_job(self, reason: str) -> bool: with self.queue_lock: active = next((j for j in self.queue if j.get("status") == "active"), None) if not active: return False self.stop_current_job = True self._stop_active_codex_process() self.queue = [j for j in self.queue if j.get("id") != active.get("id")] self._persist_queue() self._append_history_event("job_stopped_by_user", {"jobId": active.get("id"), "reason": reason}) return True def _cancel_by_id_prefix(self, prefix: str) -> bool: prefix = prefix.strip().lower() normalized_num = prefix.lstrip("#") with self.queue_lock: target = next( ( j for j in self.queue if str(j.get("id", "")).lower().startswith(prefix) or str(j.get("num", "")).lower() == normalized_num ), None ) if not target: return False if target.get("status") == "active": self.stop_current_job = True self._stop_active_codex_process() self.queue = [j for j in self.queue if j.get("id") != target.get("id")] self._persist_queue() return True def _worker_loop(self) -> None: while not self.stop_event.is_set(): job = None with self.queue_lock: for item in self.queue: if item.get("status") == "pending": item["status"] = "active" item["active_since"] = now_iso() item["updated_at"] = now_iso() self.active_job_id = item.get("id") self.active_job_started_at = time.time() job = dict(item) self._persist_queue() break if not job: time.sleep(0.5) continue self.stop_current_job = False self._process_job(job) self.active_job_id = None self.active_job_started_at = None def _process_job(self, job: dict[str, Any]) -> None: job_id = job["id"] job_num = job.get("num", "?") chat_id = int(job["chat_id"]) message_id = int(job["message_id"]) history_path = Path(job["history_file"]) self._safe_send(chat_id, f"Задача #{job_num} в работе.", reply_to=message_id) try: if job.get("type") == "voice": self._safe_send(chat_id, f"#{job_num}: распознаю голосовое...", reply_to=message_id) recognized = self._transcribe_voice_job(job) job["text"] = recognized self._append_history(history_path, "voice_transcription", {"jobId": job_id, "jobNum": job_num, "text": recognized}) preview = recognized.strip() if len(preview) > 1200: preview = preview[:1200] + " ...[обрезано]" self._safe_send(chat_id, f"#{job_num}: распознано:\n{preview}", reply_to=message_id) self._safe_send(chat_id, f"#{job_num}: распознано, отправляю в Codex.", reply_to=message_id) prompt = self._build_prompt(job) self._append_history(history_path, "codex_request", {"jobId": job_id, "prompt": prompt}) answer = self._run_codex(prompt, chat_id, message_id, job_id, job_num) for chunk in split_long_text(answer): self._safe_send(chat_id, chunk, reply_to=message_id) if self._voice_replies_enabled(job.get("username") or ""): self._send_voice_reply_for_answer(chat_id, message_id, job_num, answer, history_path, job_id) self._safe_send(chat_id, f"Готово #{job_num}.", reply_to=message_id) self._append_history(history_path, "codex_response", {"jobId": job_id, "text": answer}) self._mark_job_done(job_id) self._send_private_job_public_report(job, answer) except Exception as e: if self.stop_current_job: self._append_history(history_path, "job_stopped", {"jobId": job_id, "reason": str(e)}) self._safe_send(chat_id, f"Задача #{job_num} остановлена.", reply_to=message_id) self._mark_job_removed(job_id) self.stop_current_job = False return if isinstance(e, VoiceTranscriptionError): self._append_history(history_path, "voice_transcription_failed", { "jobId": job_id, "jobNum": job_num, "stage": e.stage, "retryable": e.retryable, "error": e.user_message, "detail": e.detail, }) self._handle_job_failure(job, e) def _build_prompt(self, job: dict[str, Any]) -> str: retry_block = "" retry_reason = (job.get("retry_reason") or "").strip() if retry_reason: retry_block = f"\n\nПометка retry: {retry_reason}" role = (job.get("role") or "owner").strip().lower() player_block = "" if role == "player": player_name = (job.get("player_name") or "").strip() player_dir = self.cfg.codex_workdir / "Players" / (job.get("username") or "") player_block = ( "\n\nРежим игрока (обязательно):\n" f"- Пользователь: {player_name} (@{job.get('username')}).\n" f"- Рабочая папка игрока: {player_dir}\n" "- Код проекта не изменять.\n" "- Можно отвечать на вопросы по проекту, предлагать идеи и готовить ТЗ.\n" "- Если нужны правки кода, описывать предложение текстом и сохранять материалы только в папке игрока." ) return ( "Пришло сообщение в Telegram.\n" f"Тип: {job.get('type')}\n" f"Источник Telegram: {job.get('update_type', 'message')}\n" f"Тип чата: {job.get('chat_type') or ''}\n" f"Канал/чат: @{job.get('chat_username') or ''} {job.get('chat_title') or ''}\n" f"Username отправителя: @{job.get('username')}\n" f"Подпись автора в Telegram: {job.get('author_signature') or ''}\n" "Текст для обработки:\n" f"{job.get('text')}\n\n" f"История диалога (JSONL): {job.get('history_file')}\n" f"Инструкции агента: {self.cfg.agent_instructions_file}\n" f"Работай в рабочем проекте аккуратно и верни только текст ответа пользователю.{player_block}{retry_block}" ) def _run_codex(self, prompt: str, chat_id: int, message_id: int, job_id: str, job_num: Any) -> str: output_lines: list[str] = [] with tempfile.NamedTemporaryFile(prefix="shine-codex-last-message-", suffix=".txt", delete=False) as tmp: output_file = Path(tmp.name) cmd = [ str(self.cfg.codex_bin), "exec", "--dangerously-bypass-approvals-and-sandbox", "--json", "-C", str(self.cfg.codex_workdir), "-o", str(output_file), prompt, ] print(f"[py-bot] codex exec start job={job_id[:8]}", flush=True) process = subprocess.Popen( cmd, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, encoding="utf-8", errors="replace", bufsize=1, ) with self.active_process_lock: self.active_process = process self.last_heartbeat_at = 0.0 last_user_note = "" last_user_note_at = 0.0 codex_started_at = time.time() last_job_message_at = codex_started_at def on_line(line: str) -> None: nonlocal last_user_note, last_user_note_at, last_job_message_at output_lines.append(line) note = self._extract_codex_user_note(line) now = time.time() if note and note != last_user_note and now - last_user_note_at > 8: self._safe_send(chat_id, f"#{job_num}: {note}", reply_to=message_id) last_user_note = note last_user_note_at = now last_job_message_at = now reader_done = threading.Event() def reader() -> None: if not process.stdout: reader_done.set() return for line in process.stdout: on_line(line.rstrip("\n")) reader_done.set() t = threading.Thread(target=reader, name=f"codex-reader-{job_id[:8]}", daemon=True) t.start() try: deadline = time.time() + self.cfg.codex_timeout_seconds return_code = None while return_code is None: return_code = process.poll() now = time.time() if return_code is not None: break if now >= deadline: process.kill() t.join(timeout=2) raise RuntimeError(f"Codex timeout after {self.cfg.codex_timeout_seconds}s") if now - codex_started_at >= 120 and now - last_job_message_at >= 120: elapsed = self._format_duration(int(now - codex_started_at)) self._safe_send( chat_id, f"#{job_num}: задача ещё выполняется, работает уже {elapsed}. От Codex давно нет сообщений.", reply_to=message_id, ) last_job_message_at = now self.last_heartbeat_at = now time.sleep(1) finally: with self.active_process_lock: self.active_process = None reader_done.wait(timeout=2) if return_code != 0: tail = "\n".join(output_lines[-40:]) raise RuntimeError(f"Codex exited with code {return_code}. Output tail:\n{tail}") if output_file.exists(): answer = output_file.read_text(encoding="utf-8").strip() try: output_file.unlink(missing_ok=True) except Exception: pass if answer: return answer fallback = self._extract_fallback_message(output_lines) if not fallback: raise RuntimeError("Codex returned empty response") return fallback def _stop_active_codex_process(self) -> bool: with self.active_process_lock: process = self.active_process if process is None: return False if process.poll() is not None: return False process.terminate() try: process.wait(timeout=2) except subprocess.TimeoutExpired: process.kill() return True def _handle_job_failure(self, job: dict[str, Any], err: Exception) -> None: job_id = job["id"] job_num = job.get("num", "?") chat_id = int(job["chat_id"]) message_id = int(job["message_id"]) error_text = str(err).strip() or err.__class__.__name__ user_error_text = self._user_error_text(err) retryable = not isinstance(err, VoiceTranscriptionError) or err.retryable log_error_text = err.log_text() if isinstance(err, VoiceTranscriptionError) else error_text print(f"[py-bot] Ошибка job={job_id[:8]}: {log_error_text}", flush=True) print(traceback.format_exc(), flush=True) with self.queue_lock: target = next((j for j in self.queue if j.get("id") == job_id), None) if not target: return attempts = int(target.get("attempts", 0)) + 1 target["attempts"] = attempts target["last_error"] = error_text[:1000] target["updated_at"] = now_iso() if retryable and attempts < self.cfg.max_retries: target["status"] = "pending" target["retry_reason"] = error_text[:200] self._persist_queue() will_retry = True else: self.queue = [j for j in self.queue if j.get("id") != job_id] self._persist_queue() will_retry = False if will_retry: self._safe_send( chat_id, f"{user_error_text}\nПовторю задачу #{job_num}: попытка {attempts + 1}/{self.cfg.max_retries}.", reply_to=message_id, ) else: self._safe_send(chat_id, f"{user_error_text}\nЗадача #{job_num} остановлена.", reply_to=message_id) def _user_error_text(self, err: Exception) -> str: if isinstance(err, VoiceTranscriptionError): return f"Не удалось распознать голосовое: {err.user_message}" error_text = str(err).strip() or err.__class__.__name__ return f"Ошибка выполнения задачи: {error_text}" def _mark_job_done(self, job_id: str) -> None: with self.queue_lock: self.queue = [j for j in self.queue if j.get("id") != job_id] self._persist_queue() def _mark_job_removed(self, job_id: str) -> None: with self.queue_lock: self.queue = [j for j in self.queue if j.get("id") != job_id] self._persist_queue() def _remember_public_report_chat(self, chat_id: int) -> None: if self.state.get("public_report_chat_id") == chat_id: return self.state["public_report_chat_id"] = chat_id self.state["updated_at"] = now_iso() self._persist_state() def _public_report_chat_id(self) -> int | str | None: chat_id = self.state.get("public_report_chat_id") if isinstance(chat_id, int): return self._resolve_chat_id(chat_id) if self.cfg.allowed_channel_username: return f"@{self.cfg.allowed_channel_username}" return None def _send_private_job_public_report(self, job: dict[str, Any], answer: str) -> None: if job.get("chat_type") != "private": return report_chat_id = self._public_report_chat_id() if report_chat_id is None: return job_num = job.get("num", "?") source_text = (job.get("text") or "").strip() if not source_text: source_text = "(пустой текст запроса)" role = (job.get("role") or "owner").strip().lower() author_label = "Айдар" if role == "player": player_name = (job.get("player_name") or "").strip() or job.get("username") or "Игрок" author_label = f"{player_name} (@{job.get('username')})" if job.get("type") == "voice": voice_file_id = (job.get("telegram_file_id") or "").strip() media_type = (job.get("telegram_media_type") or "voice").strip() request_caption = self._trim_telegram_caption( f"{author_label} сделал {media_type}-запрос, задача #{job_num}.\n\n" f"Распознанный текст:\n{source_text}" ) request_message_id = None if voice_file_id: request_message_id = self._safe_send_telegram_file( report_chat_id, voice_file_id, media_type=media_type, caption=request_caption, ) if request_message_id is None: request_message_id = self._safe_send(report_chat_id, request_caption) else: request_report = ( f"{author_label} сделал запрос, задача #{job_num}.\n\n" f"{source_text}" ) request_message_id = self._safe_send(report_chat_id, request_report) if request_message_id is None: return answer_text = (answer or "").strip() or "(пустой ответ)" answer_chunks = split_long_text(f"Ответ на задачу #{job_num}:\n\n{answer_text}") for chunk in answer_chunks: self._safe_send(report_chat_id, chunk, reply_to=request_message_id) @staticmethod def _trim_telegram_caption(text: str, limit: int = 1000) -> str: text = (text or "").strip() if len(text) <= limit: return text return text[:limit].rstrip() + "\n...[обрезано]" def _safe_send_telegram_file( self, chat_id: int | str, file_id: str, *, media_type: str = "voice", caption: str = "", reply_to: int | None = None, ) -> int | None: file_id = (file_id or "").strip() if not file_id: return None caption = self._trim_telegram_caption(caption) resolved_chat_id: int | str = self._resolve_chat_id(chat_id) if isinstance(chat_id, int) else chat_id resolved_reply_to = reply_to if resolved_chat_id == chat_id or isinstance(chat_id, str) else None def send(target_chat_id: int | str, target_reply_to: int | None) -> dict[str, Any]: if media_type == "audio": return self.telegram.send_audio(target_chat_id, file_id, caption=caption, reply_to_message_id=target_reply_to) return self.telegram.send_voice(target_chat_id, file_id, caption=caption, reply_to_message_id=target_reply_to) try: sent = send(resolved_chat_id, resolved_reply_to) result = sent.get("result") or {} message_id = result.get("message_id") return message_id if isinstance(message_id, int) else None except Exception as e: migrate_to_chat_id = self._extract_migrate_to_chat_id(str(e)) if migrate_to_chat_id is not None: if isinstance(resolved_chat_id, int): self._remember_chat_migration(resolved_chat_id, migrate_to_chat_id, "send_file_error") try: sent = send(migrate_to_chat_id, None) result = sent.get("result") or {} message_id = result.get("message_id") return message_id if isinstance(message_id, int) else None except Exception as retry_error: print(f"[py-bot] sendFile retry after migration error: {retry_error}", flush=True) return None print(f"[py-bot] sendFile error: {e}", flush=True) return None def _safe_send_voice_upload( self, chat_id: int | str, voice_bytes: bytes, filename: str, *, caption: str = "", reply_to: int | None = None, ) -> int | None: if not voice_bytes: return None caption = self._trim_telegram_caption(caption) resolved_chat_id: int | str = self._resolve_chat_id(chat_id) if isinstance(chat_id, int) else chat_id resolved_reply_to = reply_to if resolved_chat_id == chat_id or isinstance(chat_id, str) else None def send(target_chat_id: int | str, target_reply_to: int | None) -> dict[str, Any]: return self.telegram.send_voice_upload( target_chat_id, voice_bytes, filename, caption=caption, reply_to_message_id=target_reply_to, ) try: sent = send(resolved_chat_id, resolved_reply_to) result = sent.get("result") or {} message_id = result.get("message_id") return message_id if isinstance(message_id, int) else None except Exception as e: migrate_to_chat_id = self._extract_migrate_to_chat_id(str(e)) if migrate_to_chat_id is not None: if isinstance(resolved_chat_id, int): self._remember_chat_migration(resolved_chat_id, migrate_to_chat_id, "send_voice_upload_error") try: sent = send(migrate_to_chat_id, None) result = sent.get("result") or {} message_id = result.get("message_id") return message_id if isinstance(message_id, int) else None except Exception as retry_error: print(f"[py-bot] sendVoiceUpload retry after migration error: {retry_error}", flush=True) return None print(f"[py-bot] sendVoiceUpload error: {e}", flush=True) return None def _safe_send(self, chat_id: int | str, text: str, reply_to: int | None = None) -> int | None: text = (text or "").strip() if not text: return None if len(text) > 3900: text = text[:3900] + "\n...[обрезано]" resolved_chat_id: int | str = self._resolve_chat_id(chat_id) if isinstance(chat_id, int) else chat_id resolved_reply_to = reply_to if resolved_chat_id == chat_id or isinstance(chat_id, str) else None try: sent = self.telegram.send_message(resolved_chat_id, text, reply_to_message_id=resolved_reply_to) result = sent.get("result") or {} message_id = result.get("message_id") return message_id if isinstance(message_id, int) else None except Exception as e: migrate_to_chat_id = self._extract_migrate_to_chat_id(str(e)) if migrate_to_chat_id is not None: if isinstance(resolved_chat_id, int): self._remember_chat_migration(resolved_chat_id, migrate_to_chat_id, "send_message_error") try: sent = self.telegram.send_message(migrate_to_chat_id, text, reply_to_message_id=None) result = sent.get("result") or {} message_id = result.get("message_id") return message_id if isinstance(message_id, int) else None except Exception as retry_error: print(f"[py-bot] sendMessage retry after migration error: {retry_error}", flush=True) return None print(f"[py-bot] sendMessage error: {e}", flush=True) return None def _schedule_self_restart(self) -> None: if self.restart_requested: return self.restart_requested = True def restart() -> None: time.sleep(1.5) print("[py-bot] restart requested by Telegram command", flush=True) os._exit(0) threading.Thread(target=restart, name="shine-py-bot-self-restart", daemon=True).start() def _send_voice_reply_for_answer( self, chat_id: int, message_id: int, job_num: Any, answer: str, history_path: Path, job_id: str, ) -> None: if not self.cfg.openai_api_key: note = "не настроен ключ OpenAI для озвучивания." self._append_history(history_path, "voice_reply_failed", {"jobId": job_id, "jobNum": job_num, "error": note}) self._safe_send(chat_id, f"Озвучивание включено, но {note}", reply_to=message_id) return chunks = split_text_for_tts(answer, self.cfg.openai_tts_chunk_chars) if not chunks: return sent_count = 0 total = len(chunks) print(f"[py-bot] tts start job={str(job_id)[:8]} chunks={total}", flush=True) for index, chunk in enumerate(chunks, start=1): try: audio = self._openai_tts(chunk) except VoiceReplyError as e: self._append_history(history_path, "voice_reply_failed", { "jobId": job_id, "jobNum": job_num, "part": index, "parts": total, "error": str(e), }) self._safe_send(chat_id, f"Не удалось озвучить ответ #{job_num}: {e}", reply_to=message_id) return caption = f"Озвучка ответа #{job_num}" if total > 1: caption += f", часть {index}/{total}" message_sent = self._safe_send_voice_upload( chat_id, audio, f"shine-answer-{job_num}-{index}.ogg", caption=caption, reply_to=message_id, ) if message_sent is None: self._append_history(history_path, "voice_reply_failed", { "jobId": job_id, "jobNum": job_num, "part": index, "parts": total, "error": "Telegram не принял voice-файл озвучки.", }) self._safe_send(chat_id, f"Озвучка ответа #{job_num} создана, но Telegram не принял voice-файл.", reply_to=message_id) return sent_count += 1 self._append_history(history_path, "voice_reply_sent", {"jobId": job_id, "jobNum": job_num, "parts": sent_count}) print(f"[py-bot] tts done job={str(job_id)[:8]} sent={sent_count}", flush=True) def _openai_tts(self, text: str) -> bytes: payload = { "model": self.cfg.openai_tts_model, "voice": self.cfg.openai_tts_voice, "input": text, "response_format": self.cfg.openai_tts_response_format, } data = json.dumps(payload, ensure_ascii=False).encode("utf-8") req = request.Request("https://api.openai.com/v1/audio/speech", method="POST", data=data) req.add_header("Authorization", f"Bearer {self.cfg.openai_api_key}") req.add_header("Content-Type", "application/json") try: with request.urlopen(req, timeout=self.cfg.openai_tts_timeout_seconds) as resp: audio = resp.read() except TimeoutError as e: raise VoiceReplyError(f"OpenAI не успел сгенерировать речь за {self.cfg.openai_tts_timeout_seconds} секунд.") from e except error.HTTPError as e: detail = e.read().decode("utf-8", errors="replace") if e.code == 401: message = "OpenAI отклонил ключ API для озвучивания." elif e.code == 429: message = "OpenAI временно ограничил озвучивание из-за лимита запросов." elif e.code >= 500: message = "OpenAI временно не смог сгенерировать речь." else: message = f"OpenAI вернул ошибку HTTP {e.code} при озвучивании." if detail: message = f"{message} Детали: {detail[:500]}" raise VoiceReplyError(message) from e except error.URLError as e: raise VoiceReplyError(f"не удалось отправить текст в OpenAI TTS из-за сетевой ошибки: {e.reason}") from e if not audio: raise VoiceReplyError("OpenAI вернул пустой аудиофайл.") return audio def _transcribe_voice_job(self, job: dict[str, Any]) -> str: if not self.cfg.openai_api_key: raise VoiceTranscriptionError( "не настроен ключ OpenAI для распознавания.", stage="config", retryable=False, ) file_id = (job.get("telegram_file_id") or "").strip() if not file_id: raise VoiceTranscriptionError( "Telegram не передал идентификатор файла.", stage="telegram_file_id", retryable=False, ) job_id = str(job.get("id") or "")[:8] job_num = job.get("num", "?") media_type = (job.get("telegram_media_type") or "voice").strip() started_at = time.time() print(f"[py-bot] transcribe start job={job_id} num={job_num} media={media_type}", flush=True) file_bytes, filename = self._download_telegram_file(file_id) print( f"[py-bot] transcribe downloaded job={job_id} filename={filename} size={len(file_bytes)} bytes", flush=True, ) text = self._openai_transcribe(file_bytes, filename).strip() if not text: raise VoiceTranscriptionError( "сервис распознавания вернул пустой текст. Возможно, в записи нет слышимой речи или качество звука слишком низкое.", stage="empty_text", retryable=False, ) elapsed = self._format_duration(int(time.time() - started_at)) print(f"[py-bot] transcribe done job={job_id} chars={len(text)} elapsed={elapsed}", flush=True) return text def _download_telegram_file(self, file_id: str) -> tuple[bytes, str]: try: result = self.telegram.call("getFile", {"file_id": file_id}, timeout=120) except TimeoutError as e: raise VoiceTranscriptionError( "Telegram долго не отдавал информацию о файле.", stage="telegram_get_file_timeout", detail=str(e), ) from e except Exception as e: raise VoiceTranscriptionError( "не удалось получить информацию о файле из Telegram.", stage="telegram_get_file", detail=str(e), ) from e info = result.get("result") or {} file_path = info.get("file_path") if not file_path: raise VoiceTranscriptionError( "Telegram не вернул путь к файлу.", stage="telegram_file_path", retryable=True, detail=json.dumps(info, ensure_ascii=False)[:1000], ) file_url = f"https://api.telegram.org/file/bot{self.cfg.telegram_bot_token}/{file_path}" req = request.Request(file_url, method="GET") try: with request.urlopen(req, timeout=self.cfg.telegram_file_download_timeout_seconds) as resp: data = resp.read() except TimeoutError as e: raise VoiceTranscriptionError( f"Telegram не успел отдать аудиофайл за {self.cfg.telegram_file_download_timeout_seconds} секунд.", stage="telegram_download_timeout", detail=str(e), ) from e except error.HTTPError as e: detail = e.read().decode("utf-8", errors="replace") raise VoiceTranscriptionError( f"Telegram вернул ошибку HTTP {e.code} при скачивании аудио.", stage="telegram_download_http", retryable=e.code >= 500 or e.code == 429, detail=detail[:1000], ) from e except error.URLError as e: raise VoiceTranscriptionError( "не удалось скачать аудиофайл из Telegram из-за сетевой ошибки.", stage="telegram_download_network", detail=str(e.reason), ) from e if not data: raise VoiceTranscriptionError( "Telegram отдал пустой аудиофайл.", stage="telegram_download_empty", retryable=True, ) original_name = Path(file_path).name or "audio.ogg" lower = original_name.lower() # OpenAI transcription может не принимать расширение .oga, нормализуем в .ogg. if lower.endswith(".oga"): base = original_name[:-4] if len(original_name) > 4 else "audio" normalized = f"{base}.ogg" else: normalized = original_name return data, normalized def _openai_transcribe(self, file_bytes: bytes, filename: str) -> str: boundary = "----shine-boundary-" + "".join(random.choices("abcdef0123456789", k=16)) mime = mimetypes.guess_type(filename)[0] or "application/octet-stream" def text_part(name: str, value: str) -> bytes: return ( f"--{boundary}\r\n" f'Content-Disposition: form-data; name="{name}"\r\n\r\n' f"{value}\r\n" ).encode("utf-8") body = bytearray() body.extend(text_part("model", self.cfg.openai_transcribe_model)) body.extend(text_part("response_format", "text")) body.extend( ( f"--{boundary}\r\n" f'Content-Disposition: form-data; name="file"; filename="{filename}"\r\n' f"Content-Type: {mime}\r\n\r\n" ).encode("utf-8") ) body.extend(file_bytes) body.extend(b"\r\n") body.extend(f"--{boundary}--\r\n".encode("utf-8")) req = request.Request("https://api.openai.com/v1/audio/transcriptions", method="POST", data=bytes(body)) req.add_header("Authorization", f"Bearer {self.cfg.openai_api_key}") req.add_header("Content-Type", f"multipart/form-data; boundary={boundary}") try: with request.urlopen(req, timeout=self.cfg.openai_transcribe_timeout_seconds) as resp: return resp.read().decode("utf-8", errors="replace") except TimeoutError as e: raise VoiceTranscriptionError( f"OpenAI не успел распознать аудио за {self.cfg.openai_transcribe_timeout_seconds} секунд.", stage="openai_transcribe_timeout", detail=str(e), ) from e except error.HTTPError as e: detail = e.read().decode("utf-8", errors="replace") if e.code == 400: user_message = "OpenAI не принял аудиофайл для распознавания." elif e.code == 401: user_message = "OpenAI отклонил ключ API для распознавания." elif e.code == 413: user_message = "аудиофайл слишком большой для распознавания OpenAI." elif e.code == 429: user_message = "OpenAI временно ограничил распознавание из-за лимита запросов." elif e.code >= 500: user_message = "OpenAI временно не смог обработать распознавание." else: user_message = f"OpenAI вернул ошибку HTTP {e.code} при распознавании." raise VoiceTranscriptionError( user_message, stage="openai_transcribe_http", retryable=e.code == 429 or e.code >= 500, detail=detail[:1500], ) from e except error.URLError as e: raise VoiceTranscriptionError( "не удалось отправить аудио в OpenAI из-за сетевой ошибки.", stage="openai_transcribe_network", detail=str(e.reason), ) from e @staticmethod def _extract_codex_user_note(line: str) -> str | None: s = (line or "").strip() if not s.startswith("{"): return None try: obj = json.loads(s) except Exception: return None if obj.get("type") != "item.completed": return None item = obj.get("item") or {} if item.get("type") != "agent_message": return None text = (item.get("text") or "").strip() if not text: return None if len(text) > 220: return text[:220] + "..." return text @staticmethod def _extract_fallback_message(lines: list[str]) -> str: for line in reversed(lines): line = line.strip() if not line: continue if line.startswith("{") and '"type":' in line: continue if line.startswith("mcp:") or line.startswith("OpenAI Codex"): continue return line return "" @staticmethod def _format_duration(seconds: int) -> str: seconds = max(0, seconds) minutes, sec = divmod(seconds, 60) hours, minutes = divmod(minutes, 60) if hours: return f"{hours}ч {minutes}м {sec}с" if minutes: return f"{minutes}м {sec}с" return f"{sec}с" def run_selftest(config: BotConfig, prompt: str) -> int: cmd = [ str(config.codex_bin), "exec", "--dangerously-bypass-approvals-and-sandbox", "--json", "-C", str(config.codex_workdir), prompt, ] proc = subprocess.run( cmd, stdin=subprocess.DEVNULL, capture_output=True, text=True, encoding="utf-8", errors="replace", ) print(proc.stdout) if proc.stderr: print(proc.stderr) return proc.returncode def main() -> int: parser = argparse.ArgumentParser(description="SHiNE Python Telegram bot wrapper for Codex CLI") parser.add_argument("--selftest-codex", default="", help="Выполнить только codex exec с этим prompt и выйти") args = parser.parse_args() root = Path(__file__).resolve().parent cfg = BotConfig(root) if args.selftest_codex: return run_selftest(cfg, args.selftest_codex) service = ShinePyBotService(cfg) try: service.run() except KeyboardInterrupt: service.shutdown() return 0 except Exception as e: print(f"[py-bot] FATAL: {e}", flush=True) return 1 return 0 if __name__ == "__main__": raise SystemExit(main())