#!/usr/bin/env python3
"""Telegram bot that forwards requests from one allowed user to the Codex CLI.

Incoming text, voice and audio messages are persisted to a JSONL job queue
under ``DATA_DIR``. A single worker thread transcribes voice/audio through the
OpenAI transcription endpoint, runs ``codex exec`` for each job and relays the
answer back to the originating chat.
"""
from __future__ import annotations

import argparse
import datetime as dt
import fcntl
import json
import mimetypes
import os
import random
import re
import string
import subprocess
import tempfile
import threading
import time
import uuid
from pathlib import Path
from typing import Any
from urllib import error, request


def now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string."""
    return dt.datetime.now(dt.timezone.utc).isoformat()


def normalize_username(value: str | None) -> str:
    """Normalize a Telegram username for comparison.

    Strips surrounding whitespace and one leading ``@`` and lowercases the
    result. ``None`` or an empty string yields ``""``.
    """
    if not value:
        return ""
    value = value.strip()
    if value.startswith("@"):
        value = value[1:]
    return value.lower()


def split_long_text(text: str, chunk_size: int = 3500) -> list[str]:
    """Split ``text`` into consecutive chunks of at most ``chunk_size`` characters.

    Blank input yields a single placeholder chunk so the caller always has
    something to send.
    """
    text = (text or "").strip()
    if not text:
        return ["(пустой ответ)"]
    return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]


def read_env_file(path: Path) -> dict[str, str]:
    """Parse a simple ``KEY=VALUE`` .env file.

    Blank lines, ``#`` comment lines and lines without ``=`` are skipped. A
    value wrapped in a matching pair of single or double quotes is unquoted;
    unmatched quote characters are kept as part of the value. A missing file
    yields an empty dict.
    """
    result: dict[str, str] = {}
    if not path.exists():
        return result
    for raw_line in path.read_text(encoding="utf-8").splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        value = value.strip()
        # Only strip a matching quote pair: stripping every quote character
        # would corrupt values that legitimately start or end with one.
        if len(value) >= 2 and value[0] == value[-1] and value[0] in ("'", '"'):
            value = value[1:-1]
        result[key.strip()] = value
    return result


class JsonLineStore:
    """Helpers for reading and writing JSON-lines files (one object per line)."""

    @staticmethod
    def load(path: Path) -> list[dict[str, Any]]:
        """Return all non-blank lines of ``path`` parsed as JSON; ``[]`` if the file is missing."""
        if not path.exists():
            return []
        items: list[dict[str, Any]] = []
        for line in path.read_text(encoding="utf-8").splitlines():
            line = line.strip()
            if not line:
                continue
            items.append(json.loads(line))
        return items

    @staticmethod
    def save(path: Path, items: list[dict[str, Any]]) -> None:
        """Rewrite ``path`` with ``items`` via a sibling ``.tmp`` file and an atomic replace."""
        path.parent.mkdir(parents=True, exist_ok=True)
        tmp = path.with_suffix(path.suffix + ".tmp")
        with tmp.open("w", encoding="utf-8") as f:
            for item in items:
                f.write(json.dumps(item, ensure_ascii=False) + "\n")
        tmp.replace(path)

    @staticmethod
    def append(path: Path, item: dict[str, Any]) -> None:
        """Append one JSON line to ``path``, creating the file and parents if needed."""
        path.parent.mkdir(parents=True, exist_ok=True)
        with path.open("a", encoding="utf-8") as f:
            f.write(json.dumps(item, ensure_ascii=False) + "\n")


class TelegramApi:
    """Minimal Telegram Bot API client built on ``urllib``."""

    def __init__(self, token: str):
        self.base = f"https://api.telegram.org/bot{token}/"

    def call(self, method: str, payload: dict[str, Any] | None = None, timeout: int = 60) -> dict[str, Any]:
        """POST ``payload`` as JSON to Bot API ``method`` and return the decoded response.

        Raises:
            RuntimeError: on HTTP errors (including the response body), transport
                failures, or a response whose ``ok`` field is falsy.
        """
        data = None
        headers: dict[str, str] = {}
        if payload is not None:
            data = json.dumps(payload).encode("utf-8")
            headers["Content-Type"] = "application/json"
        req = request.Request(self.base + method, data=data, headers=headers, method="POST")
        try:
            with request.urlopen(req, timeout=timeout) as resp:
                raw = resp.read().decode("utf-8")
        except error.HTTPError as e:
            body = e.read().decode("utf-8", errors="replace")
            raise RuntimeError(f"Telegram HTTP {e.code}: {body}") from e
        except Exception as e:
            raise RuntimeError(f"Telegram request failed: {e}") from e
        result = json.loads(raw)
        if not result.get("ok"):
            raise RuntimeError(f"Telegram API error: {result}")
        return result

    def get_updates(self, offset: int | None, timeout_sec: int) -> list[dict[str, Any]]:
        """Long-poll ``getUpdates`` for message and channel_post updates."""
        payload: dict[str, Any] = {"timeout": timeout_sec, "allowed_updates": ["message", "channel_post"]}
        if offset is not None:
            payload["offset"] = offset
        # The HTTP timeout must outlast the server-side long-poll timeout.
        result = self.call("getUpdates", payload=payload, timeout=timeout_sec + 15)
        return result.get("result", [])

    def send_message(self, chat_id: int, text: str, reply_to_message_id: int | None = None) -> None:
        """Send a plain-text message, optionally as a reply."""
        payload: dict[str, Any] = {"chat_id": chat_id, "text": text}
        if reply_to_message_id is not None:
            payload["reply_to_message_id"] = reply_to_message_id
        self.call("sendMessage", payload=payload, timeout=30)

    def delete_webhook(self) -> None:
        """Remove any webhook (keeping pending updates) so ``getUpdates`` polling works."""
        self.call("deleteWebhook", payload={"drop_pending_updates": False}, timeout=30)


class BotConfig:
    """Runtime settings read from the process environment and ``<root_dir>/.env``.

    Values from the .env file override process environment variables.
    """

    def __init__(self, root_dir: Path):
        env = dict(os.environ)
        env.update(read_env_file(root_dir / ".env"))
        self.root_dir = root_dir
        self.telegram_bot_token = self._required(env, "TELEGRAM_BOT_TOKEN")
        self.allowed_username = normalize_username(env.get("ALLOWED_TELEGRAM_USERNAME", "AidarKC"))
        self.allowed_channel_username = normalize_username(env.get("ALLOWED_TELEGRAM_CHANNEL_USERNAME", "shine_writing"))
        self.bot_username = env.get("BOT_USERNAME", "aidar_su_bot")
        self.openai_api_key = env.get("OPENAI_API_KEY", "").strip()
        self.openai_transcribe_model = env.get("OPENAI_TRANSCRIBE_MODEL", "gpt-4o-mini-transcribe")
        self.codex_bin = Path(env.get(
            "CODEX_BIN",
            "/home/ai/.cache/JetBrains/IntelliJIdea2026.1/aia/codex/bin/codex-x86_64-unknown-linux-musl"
        ))
        self.codex_workdir = Path(env.get("CODEX_WORKDIR", "/home/ai/work/SHiNE/SHiNE-server-sha256"))
        self.codex_timeout_seconds = int(env.get("CODEX_TIMEOUT_SECONDS", "900"))
        self.max_retries = max(1, int(env.get("MAX_RETRIES", "3")))
        self.data_dir = (root_dir / env.get("DATA_DIR", "./data")).resolve()
        self.agent_instructions_file = (root_dir / "AGENT.md").resolve()

    @staticmethod
    def _required(env: dict[str, str], key: str) -> str:
        """Return the stripped value of ``key``; raise RuntimeError if missing or blank."""
        value = env.get(key, "").strip()
        if not value:
            raise RuntimeError(f"Не задан обязательный параметр: {key}")
        return value


class ShinePyBotService:
    """Telegram polling loop plus a single worker thread that runs Codex jobs.

    Queue, state, processed-update keys and dialog history are all persisted
    under ``cfg.data_dir`` so jobs survive restarts.
    """

    def __init__(self, config: BotConfig):
        self.cfg = config
        self.telegram = TelegramApi(config.telegram_bot_token)
        self.queue_file = config.data_dir / "py_queue.jsonl"
        self.state_file = config.data_dir / "py_state.json"
        self.processed_updates_file = config.data_dir / "py_processed_updates.log"
        self.lock_file = config.data_dir / "py_app.lock"
        self.history_dir = config.data_dir / "history"
        self.history_archive_dir = self.history_dir / "archive"
        self.max_processed_updates = 5000
        # Guards self.queue and, in _persist_state, the state file.
        self.queue_lock = threading.RLock()
        self.stop_event = threading.Event()
        self.worker = threading.Thread(target=self._worker_loop, name="shine-py-bot-worker", daemon=True)
        self.queue: list[dict[str, Any]] = []
        self.state: dict[str, Any] = {}
        self.processed_updates: list[str] = []
        self.active_job_id: str | None = None
        self.active_job_started_at: float | None = None
        self.active_process: subprocess.Popen[str] | None = None
        self.active_process_lock = threading.Lock()
        # Set when the user stops/cancels the active job; reset (under
        # queue_lock) whenever the worker claims a new job.
        self.stop_current_job = False
        self.lock_fd = None
        self.last_heartbeat_at: float = 0.0
        self.restart_requested = False

    # ------------------------------------------------------------------ lifecycle

    def run(self) -> None:
        """Start the service and long-poll Telegram until shutdown.

        Initializes storage, takes the single-instance lock, requeues jobs left
        ``active`` by a previous run, starts the worker thread and then loops on
        ``getUpdates``. Once polling has started, ``shutdown()`` runs on any exit
        from the loop.
        """
        self._ensure_dirs()
        self._acquire_single_instance_lock()
        self._load_state()
        self._load_queue()
        self._load_processed_updates()
        self._recover_active_jobs_after_restart()
        self.telegram.delete_webhook()
        self._init_offset_if_missing()
        self.worker.start()
        self._append_history_event("service_started", {"allowedUsername": self.cfg.allowed_username})
        print(f"[py-bot] Запущен. allowed user: @{self.cfg.allowed_username}", flush=True)
        try:
            while not self.stop_event.is_set():
                try:
                    offset = self.state.get("offset")
                    updates = self.telegram.get_updates(offset=offset, timeout_sec=25)
                except Exception as e:
                    print(f"[py-bot] Ошибка getUpdates: {e}", flush=True)
                    time.sleep(2)
                    continue
                for update in updates:
                    update_id = update.get("update_id")
                    if isinstance(update_id, int):
                        # The offset is persisted before handling, so an update
                        # is never fetched again even if handling it fails.
                        self.state["offset"] = update_id + 1
                        self._persist_state()
                    try:
                        self._handle_update(update)
                    except Exception as e:
                        # One malformed update must not stop the polling loop.
                        print(f"[py-bot] Ошибка обработки update {update_id}: {e}", flush=True)
        finally:
            self.shutdown()

    def shutdown(self) -> None:
        """Stop the worker and Codex process and release the instance lock.

        Safe to call more than once: calls after the first return immediately.
        """
        if self.stop_event.is_set():
            return
        self.stop_event.set()
        self._stop_active_codex_process()
        if self.worker.is_alive():
            self.worker.join(timeout=10)
        # Log while we still hold the instance lock, and only if state (and so
        # the current history file) was loaded.
        if self.state.get("current_history_file"):
            self._append_history_event("service_stopped", {})
        if self.lock_fd is not None:
            try:
                fcntl.flock(self.lock_fd, fcntl.LOCK_UN)
            finally:
                self.lock_fd.close()
                self.lock_fd = None

    def _ensure_dirs(self) -> None:
        """Create the data, history and history-archive directories."""
        self.cfg.data_dir.mkdir(parents=True, exist_ok=True)
        self.history_dir.mkdir(parents=True, exist_ok=True)
        self.history_archive_dir.mkdir(parents=True, exist_ok=True)

    def _acquire_single_instance_lock(self) -> None:
        """Take a non-blocking exclusive ``flock`` on the lock file.

        Raises:
            RuntimeError: another process already holds the lock.
        """
        self.lock_file.parent.mkdir(parents=True, exist_ok=True)
        self.lock_fd = self.lock_file.open("a+")
        try:
            fcntl.flock(self.lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except BlockingIOError:
            # Do not leak the descriptor of a lock we failed to obtain.
            self.lock_fd.close()
            self.lock_fd = None
            raise RuntimeError(f"Уже запущен другой инстанс (lock: {self.lock_file})") from None

    # --------------------------------------------------------------- persistence

    def _load_state(self) -> None:
        """Load state JSON (or start empty) and fill in missing defaults.

        Ensures a current history file and a ``next_job_number`` exist.
        """
        if self.state_file.exists():
            self.state = json.loads(self.state_file.read_text(encoding="utf-8"))
        else:
            self.state = {}
        if not self.state.get("current_history_file"):
            history_file = self._create_new_history_file("initial")
            self.state["current_history_file"] = str(history_file)
        if not isinstance(self.state.get("next_job_number"), int):
            self.state["next_job_number"] = 1
        self.state["updated_at"] = now_iso()
        self._persist_state()

    def _persist_state(self) -> None:
        """Atomically write ``self.state`` to the state file.

        Serialized with ``queue_lock`` because the write goes through one shared
        ``.tmp`` path. The polling thread persists state for every update, and
        the worker thread can persist it too via ``_safe_send`` ->
        ``_remember_chat_migration``.
        """
        with self.queue_lock:
            self.state["updated_at"] = now_iso()
            tmp = self.state_file.with_suffix(".tmp")
            tmp.write_text(json.dumps(self.state, ensure_ascii=False, indent=2), encoding="utf-8")
            tmp.replace(self.state_file)

    def _load_queue(self) -> None:
        """Load the job queue from its JSONL file."""
        self.queue = JsonLineStore.load(self.queue_file)

    def _persist_queue(self) -> None:
        """Rewrite the queue file (callers hold ``queue_lock`` once the worker runs)."""
        JsonLineStore.save(self.queue_file, self.queue)

    def _load_processed_updates(self) -> None:
        """Load recent ``chat_id:message_id`` keys, trimming the file to the cap."""
        if not self.processed_updates_file.exists():
            self.processed_updates = []
            return
        lines = [x.strip() for x in self.processed_updates_file.read_text(encoding="utf-8").splitlines() if x.strip()]
        if len(lines) > self.max_processed_updates:
            lines = lines[-self.max_processed_updates:]
            self.processed_updates_file.write_text("\n".join(lines) + "\n", encoding="utf-8")
        self.processed_updates = lines

    def _mark_processed_update(self, update_key: str) -> bool:
        """Record ``update_key``; return True if it had already been seen."""
        if update_key in self.processed_updates:
            return True
        self.processed_updates.append(update_key)
        if len(self.processed_updates) > self.max_processed_updates:
            self.processed_updates = self.processed_updates[-self.max_processed_updates:]
            self.processed_updates_file.write_text("\n".join(self.processed_updates) + "\n", encoding="utf-8")
        else:
            with self.processed_updates_file.open("a", encoding="utf-8") as f:
                f.write(update_key + "\n")
        return False

    def _recover_active_jobs_after_restart(self) -> None:
        """Turn jobs left ``active`` by a previous process back into ``pending``."""
        recovered_ids: list[str] = []
        for job in self.queue:
            if job.get("status") == "active":
                job["status"] = "pending"
                job["retry_reason"] = "service_restart_recovery"
                job["updated_at"] = now_iso()
                recovered_ids.append(job.get("id", ""))
        if recovered_ids:
            self._persist_queue()
            self._append_history_event("active_jobs_recovered", {"jobIds": recovered_ids})

    def _init_offset_if_missing(self) -> None:
        """On first start, set the update offset past updates already pending.

        Falls back to offset 0 if the initial ``getUpdates`` call fails.
        """
        if self.state.get("offset") is not None:
            return
        try:
            updates = self.telegram.get_updates(offset=None, timeout_sec=0)
            if updates:
                self.state["offset"] = int(updates[-1]["update_id"]) + 1
            else:
                self.state["offset"] = 0
            self._persist_state()
        except Exception as e:
            print(f"[py-bot] Не удалось инициализировать offset: {e}", flush=True)
            self.state["offset"] = 0
            self._persist_state()

    # ------------------------------------------------------------------- history

    def _current_history_file(self) -> Path:
        """Return the path of the current dialog-history JSONL file."""
        return Path(self.state["current_history_file"])

    def _create_new_history_file(self, reason: str) -> Path:
        """Create a timestamped history file seeded with a ``history_created`` row."""
        ts = dt.datetime.now().strftime("%Y-%m-%d_%H%M%S")
        rnd = "".join(random.choices(string.hexdigits.lower(), k=8))
        path = self.history_dir / f"{ts}_{rnd}.jsonl"
        JsonLineStore.append(path, {"ts": now_iso(), "type": "history_created", "reason": reason})
        return path

    def _rotate_history(self, reason: str, username: str) -> Path:
        """Move the current history file to the archive and start a new one.

        Returns the archived path (a ``(empty)`` placeholder path if the current
        file did not exist).

        NOTE(review): queued jobs keep their original ``history_file`` path, and
        ``_process_job`` appends to that path, which recreates a file at the
        pre-rotation location. Confirm whether that is intended.
        """
        current = self._current_history_file()
        if current.exists():
            archived = self.history_archive_dir / current.name
            current.replace(archived)
        else:
            archived = self.history_archive_dir / "(empty)"
        new_file = self._create_new_history_file(reason)
        self.state["current_history_file"] = str(new_file)
        self._persist_state()
        self._append_history_event("history_rotated", {"reason": reason, "username": username, "archived": str(archived)})
        return archived

    def _append_history(self, history_path: Path, event_type: str, payload: dict[str, Any]) -> None:
        """Append ``{"ts", "type", **payload}`` to ``history_path``."""
        row = {"ts": now_iso(), "type": event_type}
        row.update(payload)
        JsonLineStore.append(history_path, row)

    def _append_history_event(self, event_type: str, payload: dict[str, Any]) -> None:
        """Append a ``system_event`` row to the current history file."""
        history_path = self._current_history_file()
        self._append_history(history_path, "system_event", {"event": event_type, **payload})

    # ------------------------------------------------------------ chat migration

    def _resolve_chat_id(self, chat_id: int) -> int:
        """Follow recorded group->supergroup migrations from ``chat_id`` (cycle-safe)."""
        migrations = self.state.get("chat_id_migrations")
        if not isinstance(migrations, dict):
            return chat_id
        current = chat_id
        visited: set[int] = set()
        while current not in visited:
            visited.add(current)
            next_chat_id = migrations.get(str(current))
            if not isinstance(next_chat_id, int):
                break
            current = next_chat_id
        return current

    def _remember_chat_migration(self, old_chat_id: int, new_chat_id: int, source: str) -> None:
        """Record ``old_chat_id -> new_chat_id`` and retarget queued jobs.

        No-op if the ids are equal or the mapping is already stored.
        """
        if old_chat_id == new_chat_id:
            return
        migrations = self.state.get("chat_id_migrations")
        if not isinstance(migrations, dict):
            migrations = {}
            self.state["chat_id_migrations"] = migrations
        if migrations.get(str(old_chat_id)) == new_chat_id:
            return
        migrations[str(old_chat_id)] = new_chat_id
        self._persist_state()
        with self.queue_lock:
            changed = False
            for job in self.queue:
                if job.get("chat_id") == old_chat_id:
                    job["chat_id"] = new_chat_id
                    job["updated_at"] = now_iso()
                    changed = True
            if changed:
                self._persist_queue()
        self._append_history_event("chat_migrated_to_supergroup", {
            "oldChatId": old_chat_id,
            "newChatId": new_chat_id,
            "source": source,
        })

    @staticmethod
    def _extract_migrate_to_chat_id(error_text: str) -> int | None:
        """Pull ``migrate_to_chat_id`` out of an error string (JSON or repr quoting)."""
        match = re.search(r'"migrate_to_chat_id"\s*:\s*(-?\d+)', error_text)
        if match:
            return int(match.group(1))
        match = re.search(r"'migrate_to_chat_id'\s*:\s*(-?\d+)", error_text)
        if match:
            return int(match.group(1))
        return None

    # ------------------------------------------------------------ incoming updates

    @staticmethod
    def _split_command(text: str, bot_username: str) -> tuple[str, str] | None:
        """Split a slash command into ``(command, argument)``.

        The command token is lowercased and an ``@botname`` suffix (e.g.
        ``/status@aidar_su_bot``) is removed. Returns ``None`` when the suffix
        names a bot other than ``bot_username``. An empty ``bot_username``
        accepts any suffix.
        """
        parts = text.strip().split(maxsplit=1)
        if not parts:
            return None
        command, _, addressee = parts[0].lower().partition("@")
        arg = parts[1].strip() if len(parts) > 1 else ""
        own_name = normalize_username(bot_username)
        if addressee and own_name and addressee != own_name:
            return None
        return command, arg

    def _handle_update(self, update: dict[str, Any]) -> None:
        """Route one Telegram ``message`` or ``channel_post`` update.

        The update is handled as follows:

        - Chat-migration service messages are recorded.
        - Duplicates (by ``chat_id:message_id``) and posts from channels other
          than the allowed one are dropped.
        - Messages from anyone but the allowed user are logged to history as
          context (channel and group chats only) and, in groups, acknowledged.
        - Messages from the allowed user become commands, text jobs, or
          voice/audio jobs.
        """
        message = update.get("message")
        update_type = "message"
        if not isinstance(message, dict):
            message = update.get("channel_post")
            update_type = "channel_post"
        if not isinstance(message, dict):
            return

        chat = message.get("chat") or {}
        chat_id = chat.get("id")
        message_id = message.get("message_id")
        chat_type = str(chat.get("type") or "")
        chat_username = normalize_username(chat.get("username"))
        chat_title = str(chat.get("title") or "")
        sender = message.get("from") or {}
        username = normalize_username(sender.get("username"))
        author_signature = str(message.get("author_signature") or "").strip()
        # Posts without a "from" user (e.g. channel posts) fall back to the signature.
        author_username = username or normalize_username(author_signature)
        if not isinstance(chat_id, int) or not isinstance(message_id, int):
            return

        migrate_to_chat_id = message.get("migrate_to_chat_id")
        if isinstance(migrate_to_chat_id, int):
            self._remember_chat_migration(chat_id, migrate_to_chat_id, "telegram_message")
            return

        update_key = f"{chat_id}:{message_id}"
        if self._mark_processed_update(update_key):
            return

        is_channel_post = update_type == "channel_post" or chat_type == "channel"
        is_group_message = update_type == "message" and chat_type in ("group", "supergroup")
        is_allowed_channel = (
            not is_channel_post
            or not self.cfg.allowed_channel_username
            or chat_username == self.cfg.allowed_channel_username
        )
        if is_channel_post and not is_allowed_channel:
            return

        text = (message.get("text") or message.get("caption") or "").strip()
        history_path = self._current_history_file()

        if author_username != self.cfg.allowed_username:
            if is_channel_post or is_group_message:
                self._append_history(history_path, "chat_context_message", {
                    "chatId": chat_id,
                    "messageId": message_id,
                    "updateType": update_type,
                    "chatType": chat_type,
                    "chatUsername": chat_username,
                    "chatTitle": chat_title,
                    "username": username,
                    "authorSignature": author_signature,
                    "text": text,
                    "hasVoice": bool(message.get("voice")),
                    "hasAudio": bool(message.get("audio")),
                })
                if is_group_message:
                    self._safe_send(chat_id, "Получил сообщение.", reply_to=message_id)
            return

        if not text:
            media = message.get("voice") or message.get("audio")
            if media:
                self._enqueue_voice_job(
                    chat_id,
                    message_id,
                    author_username,
                    media.get("file_id"),
                    update_type=update_type,
                    chat_username=chat_username,
                    chat_title=chat_title,
                    author_signature=author_signature,
                    chat_type=chat_type,
                )
                return
            self._safe_send(chat_id, "Поддерживаются текст, voice и audio.", reply_to=message_id)
            return

        if text.startswith("/"):
            self._handle_command(chat_id, message_id, author_username, text)
            return

        self._append_history(history_path, "incoming_text", {
            "chatId": chat_id,
            "messageId": message_id,
            "updateType": update_type,
            "chatType": chat_type,
            "chatUsername": chat_username,
            "chatTitle": chat_title,
            "username": author_username,
            "authorSignature": author_signature,
            "text": text,
        })
        job = self._build_job_base(chat_id, message_id, author_username, str(history_path))
        job["type"] = "text"
        job["text"] = text
        job["update_type"] = update_type
        job["chat_type"] = chat_type
        job["chat_username"] = chat_username
        job["chat_title"] = chat_title
        job["author_signature"] = author_signature
        with self.queue_lock:
            self.queue.append(job)
            self._persist_queue()
        self._safe_send(chat_id, f"Принял задачу #{job['num']}", reply_to=message_id)

    def _enqueue_voice_job(
        self,
        chat_id: int,
        message_id: int,
        username: str,
        file_id: str | None,
        *,
        update_type: str = "message",
        chat_username: str = "",
        chat_title: str = "",
        author_signature: str = "",
        chat_type: str = "",
    ) -> None:
        """Log an incoming voice/audio message and queue a ``voice`` job for it."""
        if not file_id:
            self._safe_send(chat_id, "Не удалось прочитать file_id голосового.", reply_to=message_id)
            return
        history_path = self._current_history_file()
        self._append_history(history_path, "incoming_voice", {
            "chatId": chat_id,
            "messageId": message_id,
            "updateType": update_type,
            "chatType": chat_type,
            "chatUsername": chat_username,
            "chatTitle": chat_title,
            "username": username,
            "authorSignature": author_signature,
            "fileId": file_id,
        })
        job = self._build_job_base(chat_id, message_id, username, str(history_path))
        job["type"] = "voice"
        job["telegram_file_id"] = file_id
        job["update_type"] = update_type
        job["chat_type"] = chat_type
        job["chat_username"] = chat_username
        job["chat_title"] = chat_title
        job["author_signature"] = author_signature
        with self.queue_lock:
            self.queue.append(job)
            self._persist_queue()
        self._safe_send(chat_id, f"Принял voice в задачу #{job['num']}", reply_to=message_id)

    def _build_job_base(self, chat_id: int, message_id: int, username: str, history_file: str) -> dict[str, Any]:
        """Allocate the next job number and return a fresh ``pending`` job record."""
        with self.queue_lock:
            num = int(self.state.get("next_job_number", 1))
            self.state["next_job_number"] = num + 1
            self._persist_state()
        return {
            "id": str(uuid.uuid4()),
            "num": num,
            "status": "pending",
            "type": "text",
            "chat_id": chat_id,
            "message_id": message_id,
            "username": username,
            "update_type": "message",
            "chat_type": "",
            "chat_username": "",
            "chat_title": "",
            "author_signature": "",
            "text": "",
            "telegram_file_id": "",
            "history_file": history_file,
            "attempts": 0,
            "retry_reason": "",
            "last_error": "",
            "created_at": now_iso(),
            "updated_at": now_iso(),
            "active_since": None,
        }

    # ------------------------------------------------------------------ commands

    def _handle_command(self, chat_id: int, message_id: int, username: str, text: str) -> None:
        """Execute a slash command from the allowed user.

        The command may carry an ``@botname`` suffix (see ``_split_command``).
        Commands addressed to another bot, and unknown commands, are ignored.
        """
        parsed = self._split_command(text, self.cfg.bot_username)
        if parsed is None:
            return
        command, arg = parsed

        if command in ("/start", "/help"):
            self._safe_send(chat_id, self._help_text(), reply_to=message_id)
            return
        if command == "/status":
            self._safe_send(chat_id, self._status_text(), reply_to=message_id)
            return
        if command == "/queue":
            self._safe_send(chat_id, self._queue_text(), reply_to=message_id)
            return
        if command == "/new":
            archived = self._rotate_history("command_new", username)
            self._safe_send(chat_id, f"История очищена. Новый диалог начат.\nАрхив: {archived.name}", reply_to=message_id)
            return
        if command in ("/restart_service", "/restart"):
            self._append_history_event("restart_service_requested", {
                "chatId": chat_id,
                "messageId": message_id,
                "username": username,
            })
            self._safe_send(
                chat_id,
                "Перезапускаю сервис. Если задача была активна, после старта она вернётся в очередь и продолжится.",
                reply_to=message_id,
            )
            self._schedule_self_restart()
            return
        if command == "/stop":
            stopped = self._cancel_active_job("stopped_by_user")
            if stopped:
                self._safe_send(chat_id, "Текущая задача остановлена и удалена из очереди.", reply_to=message_id)
            else:
                self._safe_send(chat_id, "Сейчас нет активной задачи.", reply_to=message_id)
            return
        if command == "/cancel":
            if not arg:
                self._safe_send(chat_id, "Использование: /cancel <номер|id|all>", reply_to=message_id)
                return
            if arg.lower() == "all":
                with self.queue_lock:
                    self.stop_current_job = True
                    self._stop_active_codex_process()
                    count = len(self.queue)
                    self.queue = []
                    self._persist_queue()
                self._safe_send(chat_id, f"Удалено задач из очереди: {count}", reply_to=message_id)
                return
            cancelled = self._cancel_by_id_prefix(arg)
            self._safe_send(chat_id, f"Задача удалена: {arg}" if cancelled else f"Задача не найдена: {arg}", reply_to=message_id)
            return

    def _help_text(self) -> str:
        """Return the /help message."""
        return (
            "Доступные команды:\n"
            "/status — активная задача и размер очереди\n"
            "/queue — список задач в очереди\n"
            "/stop — остановить текущую задачу\n"
            "/cancel <номер|id|all> — удалить задачу по номеру, id (префикс) или все\n"
            "/new — архивировать историю и начать новую\n"
            "/restart_service — перезапустить сервис через systemd\n"
            "/help — эта справка"
        )

    def _status_text(self) -> str:
        """Describe the active job (if any) and the number of pending jobs."""
        with self.queue_lock:
            active = next((j for j in self.queue if j.get("status") == "active"), None)
            pending = sum(1 for j in self.queue if j.get("status") == "pending")
        if not active:
            return f"Статус: активной задачи нет.\nВ очереди pending: {pending}"
        elapsed = int(time.time() - (self.active_job_started_at or time.time()))
        return (
            f"Статус: активная задача #{active.get('num', '?')}\n"
            f"Тип: {active.get('type', 'text')}\n"
            f"Попытка: {int(active.get('attempts', 0)) + 1}/{self.cfg.max_retries}\n"
            f"Выполняется: {elapsed}с\n"
            f"Pending: {pending}"
        )

    def _queue_text(self) -> str:
        """List up to 10 queued jobs with a count of the rest."""
        with self.queue_lock:
            items = list(self.queue)
        if not items:
            return "Очередь пуста."
        lines = [f"Очередь: {len(items)}"]
        for i, job in enumerate(items[:10], start=1):
            lines.append(
                f"{i}) #{job.get('num', '?')} [{job.get('status')}] {job.get('type')} attempts={job.get('attempts', 0)}"
            )
        if len(items) > 10:
            lines.append(f"...и ещё {len(items) - 10} задач")
        return "\n".join(lines)

    def _cancel_active_job(self, reason: str) -> bool:
        """Stop the active job's Codex process and drop the job from the queue.

        Returns:
            True if there was an active job, False otherwise.
        """
        with self.queue_lock:
            active = next((j for j in self.queue if j.get("status") == "active"), None)
            if not active:
                return False
            self.stop_current_job = True
            self._stop_active_codex_process()
            self.queue = [j for j in self.queue if j.get("id") != active.get("id")]
            self._persist_queue()
        self._append_history_event("job_stopped_by_user", {"jobId": active.get("id"), "reason": reason})
        return True

    def _cancel_by_id_prefix(self, prefix: str) -> bool:
        """Remove one queued job selected by job number or id prefix.

        ``prefix`` is first matched exactly against job numbers (a leading ``#``
        is allowed), then as a case-insensitive prefix of job ids. So
        ``/cancel 12`` means job #12 even if some uuid happens to start with
        ``12``. If the selected job is active, its Codex process is stopped too.

        Returns:
            True if a job was removed, False if nothing matched.
        """
        prefix = prefix.strip().lower()
        normalized_num = prefix.lstrip("#")
        if not prefix:
            return False
        with self.queue_lock:
            target = None
            if normalized_num:
                target = next(
                    (j for j in self.queue if str(j.get("num", "")).lower() == normalized_num),
                    None,
                )
            if target is None:
                target = next(
                    (j for j in self.queue if str(j.get("id", "")).lower().startswith(prefix)),
                    None,
                )
            if not target:
                return False
            if target.get("status") == "active":
                self.stop_current_job = True
                self._stop_active_codex_process()
            self.queue = [j for j in self.queue if j.get("id") != target.get("id")]
            self._persist_queue()
        return True

    # -------------------------------------------------------------------- worker

    def _claim_next_job(self) -> dict[str, Any] | None:
        """Mark the first ``pending`` job ``active`` and return a copy of it.

        The per-job stop flag is reset inside the same lock acquisition, so a
        cancellation that arrives right after the claim is not lost.

        Returns:
            A copy of the claimed job, or ``None`` if nothing is pending.
        """
        with self.queue_lock:
            item = next((j for j in self.queue if j.get("status") == "pending"), None)
            if item is None:
                return None
            ts = now_iso()
            item["status"] = "active"
            item["active_since"] = ts
            item["updated_at"] = ts
            self.active_job_id = item.get("id")
            self.active_job_started_at = time.time()
            self.stop_current_job = False
            self._persist_queue()
            return dict(item)

    def _worker_loop(self) -> None:
        """Worker thread body: process pending jobs one at a time until ``stop_event``."""
        while not self.stop_event.is_set():
            job = self._claim_next_job()
            if job is None:
                time.sleep(0.5)
                continue
            try:
                self._process_job(job)
            except Exception as e:
                # _process_job handles job errors itself; this only guards
                # against failures inside that handling (e.g. queue I/O), which
                # would otherwise end this thread and all further processing.
                print(f"[py-bot] Необработанная ошибка воркера job={str(job.get('id', ''))[:8]}: {e}", flush=True)
            finally:
                self.active_job_id = None
                self.active_job_started_at = None

    def _process_job(self, job: dict[str, Any]) -> None:
        """Run one claimed job end to end and report progress to its chat.

        Voice jobs are transcribed first. The text is then sent to Codex and the
        answer is relayed in chunks. On failure, the job is:

        - removed, if the user stopped it;
        - left ``active`` for restart recovery, if the service is shutting down;
        - otherwise handed to ``_handle_job_failure`` for retry accounting.
        """
        job_id = job["id"]
        job_num = job.get("num", "?")
        chat_id = int(job["chat_id"])
        message_id = int(job["message_id"])
        history_path = Path(job["history_file"])
        self._safe_send(chat_id, f"Задача #{job_num} в работе.", reply_to=message_id)
        try:
            if job.get("type") == "voice":
                self._safe_send(chat_id, f"#{job_num}: распознаю голосовое...", reply_to=message_id)
                recognized = self._transcribe_voice_job(job)
                job["text"] = recognized
                self._append_history(history_path, "voice_transcription", {"jobId": job_id, "jobNum": job_num, "text": recognized})
                preview = recognized.strip()
                if len(preview) > 1200:
                    preview = preview[:1200] + " ...[обрезано]"
                self._safe_send(chat_id, f"#{job_num}: распознано:\n{preview}", reply_to=message_id)
                self._safe_send(chat_id, f"#{job_num}: распознано, отправляю в Codex.", reply_to=message_id)
            prompt = self._build_prompt(job)
            self._append_history(history_path, "codex_request", {"jobId": job_id, "prompt": prompt})
            answer = self._run_codex(prompt, chat_id, message_id, job_id, job_num)
            for chunk in split_long_text(answer):
                self._safe_send(chat_id, chunk, reply_to=message_id)
            self._safe_send(chat_id, f"Готово #{job_num}.", reply_to=message_id)
            self._append_history(history_path, "codex_response", {"jobId": job_id, "text": answer})
            self._mark_job_done(job_id)
        except Exception as e:
            if self.stop_current_job:
                self._append_history(history_path, "job_stopped", {"jobId": job_id, "reason": str(e)})
                self._safe_send(chat_id, f"Задача #{job_num} остановлена.", reply_to=message_id)
                self._mark_job_removed(job_id)
                self.stop_current_job = False
                return
            if self.stop_event.is_set():
                # Shutdown is not the job's fault: keep it "active" on disk so
                # _recover_active_jobs_after_restart requeues it without
                # consuming a retry.
                print(f"[py-bot] job={job_id[:8]} прервана остановкой сервиса, останется в очереди", flush=True)
                return
            self._handle_job_failure(job, e)

    def _build_prompt(self, job: dict[str, Any]) -> str:
        """Compose the Codex prompt from the job's text and Telegram metadata."""
        retry_block = ""
        retry_reason = (job.get("retry_reason") or "").strip()
        if retry_reason:
            retry_block = f"\n\nПометка retry: {retry_reason}"
        return (
            "Пришло сообщение в Telegram.\n"
            f"Тип: {job.get('type')}\n"
            f"Источник Telegram: {job.get('update_type', 'message')}\n"
            f"Тип чата: {job.get('chat_type') or ''}\n"
            f"Канал/чат: @{job.get('chat_username') or ''} {job.get('chat_title') or ''}\n"
            f"Username отправителя: @{job.get('username')}\n"
            f"Подпись автора в Telegram: {job.get('author_signature') or ''}\n"
            "Текст для обработки:\n"
            f"{job.get('text')}\n\n"
            f"История диалога (JSONL): {job.get('history_file')}\n"
            f"Инструкции агента: {self.cfg.agent_instructions_file}\n"
            f"Работай в рабочем проекте аккуратно и верни только текст ответа пользователю.{retry_block}"
        )

    # --------------------------------------------------------------------- codex

    def _raise_if_stopped(self) -> None:
        """Raise RuntimeError if the current job was stopped or the service is stopping."""
        if self.stop_current_job:
            raise RuntimeError("Job stopped by user")
        if self.stop_event.is_set():
            raise RuntimeError("Service is shutting down")

    def _run_codex(self, prompt: str, chat_id: int, message_id: int, job_id: str, job_num: Any) -> str:
        """Run Codex for ``prompt`` and return its final answer text.

        The answer is read from the ``-o`` last-message file. If that is empty,
        the last non-JSON, non-banner output line is used instead. The temporary
        output file is removed on every path, including errors.

        Raises:
            RuntimeError: if the job was stopped, Codex failed or timed out, or
                no answer text could be found.
        """
        # A cancellation may have arrived while a voice job was being
        # transcribed; do not start a Codex process that was already stopped.
        self._raise_if_stopped()
        with tempfile.NamedTemporaryFile(prefix="shine-codex-last-message-", suffix=".txt", delete=False) as tmp:
            output_file = Path(tmp.name)
        try:
            output_lines = self._execute_codex(prompt, output_file, chat_id, message_id, job_id, job_num)
            answer = output_file.read_text(encoding="utf-8").strip() if output_file.exists() else ""
        finally:
            try:
                output_file.unlink(missing_ok=True)
            except OSError:
                pass
        if answer:
            return answer
        fallback = self._extract_fallback_message(output_lines)
        if not fallback:
            raise RuntimeError("Codex returned empty response")
        return fallback

    def _execute_codex(
        self,
        prompt: str,
        output_file: Path,
        chat_id: int,
        message_id: int,
        job_id: str,
        job_num: Any,
    ) -> list[str]:
        """Run ``codex exec`` to completion and return its combined output lines.

        While Codex runs, this method:

        - reads stdout/stderr on a reader thread;
        - forwards new agent messages to the chat, at most one per 8 s;
        - posts a "still running" notice after 120 s without any chat message.

        Raises:
            RuntimeError: on timeout, on a stop/shutdown request, or on a
                non-zero exit code (including the last 40 output lines).
        """
        output_lines: list[str] = []
        cmd = [
            str(self.cfg.codex_bin),
            "exec",
            "--dangerously-bypass-approvals-and-sandbox",
            "--json",
            "-C",
            str(self.cfg.codex_workdir),
            "-o",
            str(output_file),
            prompt,
        ]
        print(f"[py-bot] codex exec start job={job_id[:8]}", flush=True)
        process = subprocess.Popen(
            cmd,
            stdin=subprocess.DEVNULL,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            encoding="utf-8",
            errors="replace",
            bufsize=1,
        )
        with self.active_process_lock:
            self.active_process = process
        self.last_heartbeat_at = 0.0
        last_user_note = ""
        last_user_note_at = 0.0
        codex_started_at = time.time()
        last_job_message_at = codex_started_at

        def on_line(line: str) -> None:
            nonlocal last_user_note, last_user_note_at, last_job_message_at
            output_lines.append(line)
            note = self._extract_codex_user_note(line)
            now = time.time()
            if note and note != last_user_note and now - last_user_note_at > 8:
                self._safe_send(chat_id, f"#{job_num}: {note}", reply_to=message_id)
                last_user_note = note
                last_user_note_at = now
                last_job_message_at = now

        reader_done = threading.Event()

        def reader() -> None:
            try:
                if process.stdout:
                    for line in process.stdout:
                        on_line(line.rstrip("\n"))
            finally:
                reader_done.set()

        reader_thread = threading.Thread(target=reader, name=f"codex-reader-{job_id[:8]}", daemon=True)
        reader_thread.start()

        return_code = None
        try:
            deadline = time.time() + self.cfg.codex_timeout_seconds
            while True:
                return_code = process.poll()
                now = time.time()
                if return_code is not None:
                    break
                # Covers a /stop, /cancel or shutdown that raced with Popen and
                # so could not see this process in self.active_process.
                if self.stop_current_job or self.stop_event.is_set():
                    self._kill_and_reap(process)
                    raise RuntimeError("Codex stopped before completion")
                if now >= deadline:
                    self._kill_and_reap(process)
                    reader_thread.join(timeout=2)
                    raise RuntimeError(f"Codex timeout after {self.cfg.codex_timeout_seconds}s")
                if now - codex_started_at >= 120 and now - last_job_message_at >= 120:
                    elapsed = self._format_duration(int(now - codex_started_at))
                    self._safe_send(
                        chat_id,
                        f"#{job_num}: задача ещё выполняется, работает уже {elapsed}. От Codex давно нет сообщений.",
                        reply_to=message_id,
                    )
                    last_job_message_at = now
                self.last_heartbeat_at = now
                time.sleep(1)
        finally:
            with self.active_process_lock:
                self.active_process = None
            reader_done.wait(timeout=2)

        if return_code != 0:
            tail = "\n".join(output_lines[-40:])
            raise RuntimeError(f"Codex exited with code {return_code}. Output tail:\n{tail}")
        return output_lines

    @staticmethod
    def _kill_and_reap(process: subprocess.Popen[str]) -> None:
        """Kill ``process`` and wait briefly for it so it is reaped."""
        process.kill()
        try:
            process.wait(timeout=5)
        except subprocess.TimeoutExpired:
            pass

    def _stop_active_codex_process(self) -> bool:
        """Terminate the running Codex process (kill if it ignores SIGTERM for 2 s).

        Returns:
            True if a live process was signalled, False otherwise.
        """
        with self.active_process_lock:
            process = self.active_process
            if process is None:
                return False
            if process.poll() is not None:
                return False
            process.terminate()
            try:
                process.wait(timeout=2)
            except subprocess.TimeoutExpired:
                process.kill()
            return True

    # ----------------------------------------------------------- job bookkeeping

    def _handle_job_failure(self, job: dict[str, Any], err: Exception) -> None:
        """Count a failed attempt; requeue the job or drop it once retries run out.

        Does nothing if the job is no longer in the queue (e.g. it was cancelled).
        """
        job_id = job["id"]
        job_num = job.get("num", "?")
        chat_id = int(job["chat_id"])
        message_id = int(job["message_id"])
        error_text = str(err).strip() or err.__class__.__name__
        print(f"[py-bot] Ошибка job={job_id[:8]}: {error_text}", flush=True)

        with self.queue_lock:
            target = next((j for j in self.queue if j.get("id") == job_id), None)
            if not target:
                return
            attempts = int(target.get("attempts", 0)) + 1
            target["attempts"] = attempts
            target["last_error"] = error_text[:1000]
            target["updated_at"] = now_iso()
            if attempts < self.cfg.max_retries:
                target["status"] = "pending"
                target["retry_reason"] = error_text[:200]
                self._persist_queue()
                will_retry = True
            else:
                self.queue = [j for j in self.queue if j.get("id") != job_id]
                self._persist_queue()
                will_retry = False

        if will_retry:
            self._safe_send(chat_id, f"Ошибка задачи #{job_num}, повтор: {attempts}/{self.cfg.max_retries}", reply_to=message_id)
        else:
            self._safe_send(chat_id, f"Ошибка задачи #{job_num}. Лимит попыток исчерпан.", reply_to=message_id)

    def _remove_job(self, job_id: str) -> None:
        """Drop the job with ``job_id`` from the queue and persist the queue."""
        with self.queue_lock:
            self.queue = [j for j in self.queue if j.get("id") != job_id]
            self._persist_queue()

    def _mark_job_done(self, job_id: str) -> None:
        """Remove a successfully completed job from the queue."""
        self._remove_job(job_id)

    def _mark_job_removed(self, job_id: str) -> None:
        """Remove a job that was stopped by the user from the queue."""
        self._remove_job(job_id)

    # ------------------------------------------------------------------ messaging

    def _safe_send(self, chat_id: int, text: str, reply_to: int | None = None) -> None:
        """Send ``text`` to ``chat_id``, following recorded chat migrations.

        Text is stripped and truncated to 3900 characters. Telegram send errors
        are logged rather than raised. If the error reports a
        ``migrate_to_chat_id``, the migration is recorded and the message is
        re-sent once to the new chat.
        """
        text = (text or "").strip()
        if not text:
            return
        if len(text) > 3900:
            text = text[:3900] + "\n...[обрезано]"
        resolved_chat_id = self._resolve_chat_id(chat_id)
        # Presumably message ids from the pre-migration chat are not valid in
        # the new chat, so replies are dropped across a migration.
        resolved_reply_to = reply_to if resolved_chat_id == chat_id else None
        try:
            self.telegram.send_message(resolved_chat_id, text, reply_to_message_id=resolved_reply_to)
        except Exception as e:
            migrate_to_chat_id = self._extract_migrate_to_chat_id(str(e))
            if migrate_to_chat_id is not None:
                self._remember_chat_migration(resolved_chat_id, migrate_to_chat_id, "send_message_error")
                try:
                    self.telegram.send_message(migrate_to_chat_id, text, reply_to_message_id=None)
                except Exception as retry_error:
                    print(f"[py-bot] sendMessage retry after migration error: {retry_error}", flush=True)
                return
            print(f"[py-bot] sendMessage error: {e}", flush=True)

    def _schedule_self_restart(self) -> None:
        """Exit the process with code 0 after a 1.5 s delay (at most once).

        ``os._exit`` bypasses ``shutdown()``. Jobs left ``active`` are requeued
        by ``_recover_active_jobs_after_restart`` on the next start.

        NOTE(review): this relies on the service manager restarting the process
        after a clean exit (e.g. systemd ``Restart=always`` rather than
        ``on-failure``) — confirm.
        """
        if self.restart_requested:
            return
        self.restart_requested = True

        def restart() -> None:
            time.sleep(1.5)
            print("[py-bot] restart requested by Telegram command", flush=True)
            os._exit(0)

        threading.Thread(target=restart, name="shine-py-bot-self-restart", daemon=True).start()

    # -------------------------------------------------------------- transcription

    def _transcribe_voice_job(self, job: dict[str, Any]) -> str:
        """Download the job's Telegram audio and return its non-empty transcript.

        Raises:
            RuntimeError: if no API key or file id is configured, or the
                transcript is empty.
        """
        if not self.cfg.openai_api_key:
            raise RuntimeError("Не задан OPENAI_API_KEY для распознавания voice")
        file_id = (job.get("telegram_file_id") or "").strip()
        if not file_id:
            raise RuntimeError("Пустой telegram_file_id")
        file_bytes, filename = self._download_telegram_file(file_id)
        text = self._openai_transcribe(file_bytes, filename).strip()
        if not text:
            raise RuntimeError("Распознавание вернуло пустой текст")
        return text

    def _download_telegram_file(self, file_id: str) -> tuple[bytes, str]:
        """Fetch a Telegram file via ``getFile``; return ``(bytes, upload filename)``."""
        result = self.telegram.call("getFile", {"file_id": file_id}, timeout=60)
        info = result.get("result") or {}
        file_path = info.get("file_path")
        if not file_path:
            raise RuntimeError("Telegram getFile не вернул file_path")
        file_url = f"https://api.telegram.org/file/bot{self.cfg.telegram_bot_token}/{file_path}"
        req = request.Request(file_url, method="GET")
        with request.urlopen(req, timeout=120) as resp:
            data = resp.read()

        original_name = Path(file_path).name or "audio.ogg"
        lower = original_name.lower()
        # OpenAI transcription may not accept the .oga extension; normalize it to .ogg.
        if lower.endswith(".oga"):
            base = original_name[:-4] if len(original_name) > 4 else "audio"
            normalized = f"{base}.ogg"
        else:
            normalized = original_name
        return data, normalized

    def _openai_transcribe(self, file_bytes: bytes, filename: str) -> str:
        """POST audio to OpenAI ``/v1/audio/transcriptions`` and return the plain-text result.

        The request is hand-built multipart/form-data with ``model``,
        ``response_format=text`` and ``file`` parts.

        Raises:
            RuntimeError: on an HTTP error response (including its body).
        """
        boundary = "----shine-boundary-" + "".join(random.choices("abcdef0123456789", k=16))
        mime = mimetypes.guess_type(filename)[0] or "application/octet-stream"

        def text_part(name: str, value: str) -> bytes:
            return (
                f"--{boundary}\r\n"
                f'Content-Disposition: form-data; name="{name}"\r\n\r\n'
                f"{value}\r\n"
            ).encode("utf-8")

        body = bytearray()
        body.extend(text_part("model", self.cfg.openai_transcribe_model))
        body.extend(text_part("response_format", "text"))
        body.extend(
            (
                f"--{boundary}\r\n"
                f'Content-Disposition: form-data; name="file"; filename="{filename}"\r\n'
                f"Content-Type: {mime}\r\n\r\n"
            ).encode("utf-8")
        )
        body.extend(file_bytes)
        body.extend(b"\r\n")
        body.extend(f"--{boundary}--\r\n".encode("utf-8"))

        req = request.Request("https://api.openai.com/v1/audio/transcriptions", method="POST", data=bytes(body))
        req.add_header("Authorization", f"Bearer {self.cfg.openai_api_key}")
        req.add_header("Content-Type", f"multipart/form-data; boundary={boundary}")
        try:
            with request.urlopen(req, timeout=240) as resp:
                return resp.read().decode("utf-8", errors="replace")
        except error.HTTPError as e:
            detail = e.read().decode("utf-8", errors="replace")
            raise RuntimeError(f"OpenAI transcribe HTTP {e.code}: {detail}") from e

    # ------------------------------------------------------------ output parsing

    @staticmethod
    def _extract_codex_user_note(line: str) -> str | None:
        """Return the text of a completed ``agent_message`` JSON event line.

        The text is truncated to 220 characters. Returns ``None`` for any
        other line.
        """
        s = (line or "").strip()
        if not s.startswith("{"):
            return None
        try:
            obj = json.loads(s)
        except Exception:
            return None
        if obj.get("type") != "item.completed":
            return None
        item = obj.get("item") or {}
        if item.get("type") != "agent_message":
            return None
        text = (item.get("text") or "").strip()
        if not text:
            return None
        if len(text) > 220:
            return text[:220] + "..."
        return text

    @staticmethod
    def _extract_fallback_message(lines: list[str]) -> str:
        """Return the last output line that is not JSON, ``mcp:`` or the Codex banner."""
        for line in reversed(lines):
            line = line.strip()
            if not line:
                continue
            if line.startswith("{") and '"type":' in line:
                continue
            if line.startswith("mcp:") or line.startswith("OpenAI Codex"):
                continue
            return line
        return ""

    @staticmethod
    def _format_duration(seconds: int) -> str:
        """Format seconds as ``Hч Mм Sс``, omitting leading zero units (negatives clamp to 0)."""
        seconds = max(0, seconds)
        minutes, sec = divmod(seconds, 60)
        hours, minutes = divmod(minutes, 60)
        if hours:
            return f"{hours}ч {minutes}м {sec}с"
        if minutes:
            return f"{minutes}м {sec}с"
        return f"{sec}с"


def run_selftest(config: BotConfig, prompt: str) -> int:
    """Run ``codex exec`` once with ``prompt`` (no Telegram), print its output, return its exit code."""
    cmd = [
        str(config.codex_bin),
        "exec",
        "--dangerously-bypass-approvals-and-sandbox",
        "--json",
        "-C",
        str(config.codex_workdir),
        prompt,
    ]
    proc = subprocess.run(
        cmd,
        stdin=subprocess.DEVNULL,
        capture_output=True,
        text=True,
        encoding="utf-8",
        errors="replace",
    )
    print(proc.stdout)
    if proc.stderr:
        print(proc.stderr)
    return proc.returncode


def main() -> int:
    """CLI entry point: run the bot, or a one-off Codex self-test with ``--selftest-codex``."""
    parser = argparse.ArgumentParser(description="SHiNE Python Telegram bot wrapper for Codex CLI")
    parser.add_argument("--selftest-codex", default="", help="Выполнить только codex exec с этим prompt и выйти")
    args = parser.parse_args()

    root = Path(__file__).resolve().parent
    cfg = BotConfig(root)

    if args.selftest_codex:
        return run_selftest(cfg, args.selftest_codex)

    service = ShinePyBotService(cfg)
    try:
        service.run()
    except KeyboardInterrupt:
        service.shutdown()
        return 0
    except Exception as e:
        print(f"[py-bot] FATAL: {e}", flush=True)
        return 1
    return 0


if __name__ == "__main__":
    raise SystemExit(main())