SHiNE-server/SHiNE-agent-bot-coder/py_bot_service.py

1153 lines
47 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
from __future__ import annotations
import argparse
import datetime as dt
import fcntl
import json
import mimetypes
import os
import random
import re
import string
import subprocess
import tempfile
import threading
import time
import uuid
from pathlib import Path
from typing import Any
from urllib import error, request
def now_iso() -> str:
    """Return the current UTC time as a timezone-aware ISO 8601 string."""
    utc_now = dt.datetime.now(tz=dt.timezone.utc)
    return utc_now.isoformat()
def normalize_username(value: str | None) -> str:
    """Normalize a Telegram username for comparison.

    Surrounding whitespace is stripped, a single leading "@" is dropped and
    the remainder is lowercased. ``None`` and "" both yield "".
    """
    if not value:
        return ""
    cleaned = value.strip()
    without_at = cleaned[1:] if cleaned.startswith("@") else cleaned
    return without_at.lower()
def split_long_text(text: str, chunk_size: int = 3500) -> list[str]:
    """Split *text* into consecutive pieces of at most *chunk_size* characters.

    The text is stripped first; a falsy or whitespace-only input yields a
    single placeholder chunk so the caller always has something to send.
    """
    body = (text or "").strip()
    if body:
        chunks = []
        for start in range(0, len(body), chunk_size):
            chunks.append(body[start:start + chunk_size])
        return chunks
    return ["(пустой ответ)"]
def read_env_file(path: Path) -> dict[str, str]:
    """Parse a simple ``KEY=VALUE`` file into a dict.

    Blank lines, lines starting with "#" and lines without "=" are ignored.
    Keys and values are stripped of whitespace; values additionally lose
    surrounding double quotes and then surrounding single quotes. A missing
    file yields an empty dict.
    """
    if not path.exists():
        return {}
    parsed: dict[str, str] = {}
    for raw_line in path.read_text(encoding="utf-8").splitlines():
        entry = raw_line.strip()
        if entry == "" or entry.startswith("#"):
            continue
        # Only the first "=" separates key from value.
        name, separator, raw_value = entry.partition("=")
        if not separator:
            continue
        parsed[name.strip()] = raw_value.strip().strip('"').strip("'")
    return parsed
class JsonLineStore:
    """Stateless helpers for reading and writing JSON Lines files."""

    @staticmethod
    def load(path: Path) -> list[dict[str, Any]]:
        """Return every non-blank line of *path* decoded as JSON.

        A missing file yields an empty list. A line that is not valid JSON
        makes ``json.loads`` raise.
        """
        if not path.exists():
            return []
        stripped_rows = (raw.strip() for raw in path.read_text(encoding="utf-8").splitlines())
        return [json.loads(row) for row in stripped_rows if row]

    @staticmethod
    def save(path: Path, items: list[dict[str, Any]]) -> None:
        """Rewrite *path* with *items*, one JSON document per line.

        The data is written to a sibling ``.tmp`` file which then replaces
        the target, so the target is never left half-written.
        """
        path.parent.mkdir(parents=True, exist_ok=True)
        tmp_path = path.with_suffix(path.suffix + ".tmp")
        with tmp_path.open("w", encoding="utf-8") as handle:
            handle.writelines(json.dumps(item, ensure_ascii=False) + "\n" for item in items)
        tmp_path.replace(path)

    @staticmethod
    def append(path: Path, item: dict[str, Any]) -> None:
        """Append *item* to *path* as one JSON line, creating parent directories."""
        path.parent.mkdir(parents=True, exist_ok=True)
        with path.open("a", encoding="utf-8") as handle:
            handle.write(f"{json.dumps(item, ensure_ascii=False)}\n")
class TelegramApi:
    """Minimal JSON-over-HTTPS client for the Telegram Bot API."""

    def __init__(self, token: str):
        # Every method URL is this prefix followed by the method name.
        self.base = f"https://api.telegram.org/bot{token}/"

    def call(self, method: str, payload: dict[str, Any] | None = None, timeout: int = 60) -> dict[str, Any]:
        """POST *payload* (as JSON, if given) to *method* and return the decoded reply.

        Raises:
            RuntimeError: on an HTTP error status (the response body is
                included in the message), on any other transport failure,
                or when the decoded reply does not have a truthy ``ok``.
        """
        if payload is None:
            body = None
            headers = {}
        else:
            body = json.dumps(payload).encode("utf-8")
            headers = {"Content-Type": "application/json"}
        http_request = request.Request(self.base + method, data=body, headers=headers, method="POST")
        try:
            with request.urlopen(http_request, timeout=timeout) as resp:
                raw = resp.read().decode("utf-8")
        except error.HTTPError as e:
            # Keep the response body: callers inspect it for migration hints.
            details = e.read().decode("utf-8", errors="replace")
            raise RuntimeError(f"Telegram HTTP {e.code}: {details}") from e
        except Exception as e:
            raise RuntimeError(f"Telegram request failed: {e}") from e
        parsed = json.loads(raw)
        if parsed.get("ok"):
            return parsed
        raise RuntimeError(f"Telegram API error: {parsed}")

    def get_updates(self, offset: int | None, timeout_sec: int) -> list[dict[str, Any]]:
        """Long-poll ``getUpdates`` for message and channel_post updates."""
        query: dict[str, Any] = {"timeout": timeout_sec, "allowed_updates": ["message", "channel_post"]}
        if offset is not None:
            query["offset"] = offset
        # The HTTP timeout is longer than the long-poll timeout.
        reply = self.call("getUpdates", payload=query, timeout=timeout_sec + 15)
        return reply.get("result", [])

    def send_message(self, chat_id: int, text: str, reply_to_message_id: int | None = None) -> None:
        """Send *text* to *chat_id*, optionally as a reply to a message."""
        message: dict[str, Any] = {"chat_id": chat_id, "text": text}
        if reply_to_message_id is not None:
            message["reply_to_message_id"] = reply_to_message_id
        self.call("sendMessage", payload=message, timeout=30)

    def delete_webhook(self) -> None:
        """Call ``deleteWebhook`` while keeping pending updates."""
        self.call("deleteWebhook", payload={"drop_pending_updates": False}, timeout=30)
class BotConfig:
    """Service settings read from the process environment and ``<root_dir>/.env``.

    Values found in the ``.env`` file take precedence over the process
    environment because they are merged on top of it.
    """

    def __init__(self, root_dir: Path):
        """Load and validate settings.

        Raises:
            RuntimeError: if ``TELEGRAM_BOT_TOKEN`` is missing or blank.
            ValueError: if a numeric setting cannot be parsed by ``int``.
        """
        settings = {**os.environ, **read_env_file(root_dir / ".env")}
        self.root_dir = root_dir

        # Telegram access.
        self.telegram_bot_token = self._required(settings, "TELEGRAM_BOT_TOKEN")
        self.allowed_username = normalize_username(settings.get("ALLOWED_TELEGRAM_USERNAME", "AidarKC"))
        self.allowed_channel_username = normalize_username(
            settings.get("ALLOWED_TELEGRAM_CHANNEL_USERNAME", "shine_writing")
        )
        self.bot_username = settings.get("BOT_USERNAME", "aidar_su_bot")

        # OpenAI transcription.
        self.openai_api_key = settings.get("OPENAI_API_KEY", "").strip()
        self.openai_transcribe_model = settings.get("OPENAI_TRANSCRIBE_MODEL", "gpt-4o-mini-transcribe")

        # Codex executable and limits.
        default_codex_bin = "/home/ai/.cache/JetBrains/IntelliJIdea2026.1/aia/codex/bin/codex-x86_64-unknown-linux-musl"
        self.codex_bin = Path(settings.get("CODEX_BIN", default_codex_bin))
        self.codex_workdir = Path(settings.get("CODEX_WORKDIR", "/home/ai/work/SHiNE/SHiNE-server-sha256"))
        self.codex_timeout_seconds = int(settings.get("CODEX_TIMEOUT_SECONDS", "900"))
        # At least one attempt is always allowed.
        self.max_retries = max(1, int(settings.get("MAX_RETRIES", "3")))

        # Locations resolved relative to the service root.
        self.data_dir = (root_dir / settings.get("DATA_DIR", "./data")).resolve()
        self.agent_instructions_file = (root_dir / "AGENT.md").resolve()

    @staticmethod
    def _required(env: dict[str, str], key: str) -> str:
        """Return the stripped value of *key*, raising RuntimeError when it is absent or blank."""
        value = env.get(key, "").strip()
        if value:
            return value
        raise RuntimeError(f"Не задан обязательный параметр: {key}")
class ShinePyBotService:
def __init__(self, config: BotConfig):
    """Set up file locations, the Telegram client and in-memory runtime state.

    Only attributes are assigned here; state, queue and processed-update
    files are loaded later by ``run``.
    """
    data_dir = config.data_dir
    self.cfg = config
    self.telegram = TelegramApi(config.telegram_bot_token)

    # Files kept under the configured data directory.
    self.queue_file = data_dir / "py_queue.jsonl"
    self.state_file = data_dir / "py_state.json"
    self.processed_updates_file = data_dir / "py_processed_updates.log"
    self.lock_file = data_dir / "py_app.lock"
    self.history_dir = data_dir / "history"
    self.history_archive_dir = self.history_dir / "archive"
    self.max_processed_updates = 5000

    # Threading primitives and the background worker (started in run()).
    self.queue_lock = threading.RLock()
    self.stop_event = threading.Event()
    self.active_process_lock = threading.Lock()
    self.worker = threading.Thread(target=self._worker_loop, name="shine-py-bot-worker", daemon=True)

    # In-memory copies of the persisted data.
    self.queue: list[dict[str, Any]] = []
    self.state: dict[str, Any] = {}
    self.processed_updates: list[str] = []

    # Bookkeeping for the job currently being processed.
    self.active_job_id: str | None = None
    self.active_job_started_at: float | None = None
    self.active_process: subprocess.Popen[str] | None = None
    self.stop_current_job = False
    self.last_heartbeat_at: float = 0.0

    self.lock_fd = None
    self.restart_requested = False
def run(self) -> None:
    """Start the service and poll Telegram until the stop event is set.

    Startup takes the single-instance lock, loads persisted data, returns
    jobs left "active" to the queue, removes any webhook, initializes the
    update offset and starts the worker thread. ``shutdown`` always runs
    when the polling loop ends, including on an exception.
    """
    self._ensure_dirs()
    self._acquire_single_instance_lock()
    for load_step in (self._load_state, self._load_queue, self._load_processed_updates):
        load_step()
    self._recover_active_jobs_after_restart()
    self.telegram.delete_webhook()
    self._init_offset_if_missing()
    self.worker.start()
    self._append_history_event("service_started", {"allowedUsername": self.cfg.allowed_username})
    print(f"[py-bot] Запущен. allowed user: @{self.cfg.allowed_username}", flush=True)
    try:
        while not self.stop_event.is_set():
            try:
                updates = self.telegram.get_updates(offset=self.state.get("offset"), timeout_sec=25)
            except Exception as poll_error:
                # Polling failures are logged and retried after a short pause.
                print(f"[py-bot] Ошибка getUpdates: {poll_error}", flush=True)
                time.sleep(2)
            else:
                for update in updates:
                    update_id = update.get("update_id")
                    if isinstance(update_id, int):
                        # The offset is advanced and persisted before the
                        # update is handled.
                        self.state["offset"] = update_id + 1
                        self._persist_state()
                    self._handle_update(update)
    finally:
        self.shutdown()
def shutdown(self) -> None:
    """Stop the worker, release the instance lock and record the stop event.

    The whole sequence runs even when the stop event was already set
    before the call.
    """
    self.stop_event.set()
    self._stop_active_codex_process()
    if self.worker.is_alive():
        # Give the worker a bounded amount of time to finish.
        self.worker.join(timeout=10)
    lock_handle = self.lock_fd
    if lock_handle is not None:
        try:
            fcntl.flock(lock_handle, fcntl.LOCK_UN)
        finally:
            # Close the handle even if unlocking raised.
            lock_handle.close()
            self.lock_fd = None
    self._append_history_event("service_stopped", {})
def _ensure_dirs(self) -> None:
self.cfg.data_dir.mkdir(parents=True, exist_ok=True)
self.history_dir.mkdir(parents=True, exist_ok=True)
self.history_archive_dir.mkdir(parents=True, exist_ok=True)
def _acquire_single_instance_lock(self) -> None:
self.lock_file.parent.mkdir(parents=True, exist_ok=True)
self.lock_fd = self.lock_file.open("a+")
try:
fcntl.flock(self.lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
except BlockingIOError:
raise RuntimeError(f"Уже запущен другой инстанс (lock: {self.lock_file})")
def _load_state(self) -> None:
    """Load the state file (or start empty), fill in defaults and persist.

    Ensures ``current_history_file`` points at a history file (creating a
    new one when unset) and that ``next_job_number`` is an int.
    """
    self.state = (
        json.loads(self.state_file.read_text(encoding="utf-8"))
        if self.state_file.exists()
        else {}
    )
    if not self.state.get("current_history_file"):
        self.state["current_history_file"] = str(self._create_new_history_file("initial"))
    if not isinstance(self.state.get("next_job_number"), int):
        self.state["next_job_number"] = 1
    self.state["updated_at"] = now_iso()
    self._persist_state()
def _persist_state(self) -> None:
    """Stamp ``updated_at`` and write the state dict to disk.

    The JSON goes to a ``.tmp`` sibling first and then replaces the state
    file.
    """
    self.state["updated_at"] = now_iso()
    tmp_path = self.state_file.with_suffix(".tmp")
    serialized = json.dumps(self.state, ensure_ascii=False, indent=2)
    tmp_path.write_text(serialized, encoding="utf-8")
    tmp_path.replace(self.state_file)
def _load_queue(self) -> None:
    """Replace the in-memory queue with the contents of the queue file."""
    stored_jobs = JsonLineStore.load(self.queue_file)
    self.queue = stored_jobs
def _persist_queue(self) -> None:
    """Write the in-memory queue to the queue file, replacing its contents."""
    JsonLineStore.save(path=self.queue_file, items=self.queue)
def _load_processed_updates(self) -> None:
if not self.processed_updates_file.exists():
self.processed_updates = []
return
lines = [x.strip() for x in self.processed_updates_file.read_text(encoding="utf-8").splitlines() if x.strip()]
if len(lines) > self.max_processed_updates:
lines = lines[-self.max_processed_updates:]
self.processed_updates_file.write_text("\n".join(lines) + "\n", encoding="utf-8")
self.processed_updates = lines
def _mark_processed_update(self, update_key: str) -> bool:
if update_key in self.processed_updates:
return True
self.processed_updates.append(update_key)
if len(self.processed_updates) > self.max_processed_updates:
self.processed_updates = self.processed_updates[-self.max_processed_updates:]
self.processed_updates_file.write_text("\n".join(self.processed_updates) + "\n", encoding="utf-8")
else:
with self.processed_updates_file.open("a", encoding="utf-8") as f:
f.write(update_key + "\n")
return False
def _recover_active_jobs_after_restart(self) -> None:
recovered_ids: list[str] = []
for job in self.queue:
if job.get("status") == "active":
job["status"] = "pending"
job["retry_reason"] = "service_restart_recovery"
job["updated_at"] = now_iso()
recovered_ids.append(job.get("id", ""))
if recovered_ids:
self._persist_queue()
self._append_history_event("active_jobs_recovered", {"jobIds": recovered_ids})
def _init_offset_if_missing(self) -> None:
if self.state.get("offset") is not None:
return
try:
updates = self.telegram.get_updates(offset=None, timeout_sec=0)
if updates:
self.state["offset"] = int(updates[-1]["update_id"]) + 1
else:
self.state["offset"] = 0
self._persist_state()
except Exception as e:
print(f"[py-bot] Не удалось инициализировать offset: {e}", flush=True)
self.state["offset"] = 0
self._persist_state()
def _current_history_file(self) -> Path:
return Path(self.state["current_history_file"])
def _create_new_history_file(self, reason: str) -> Path:
    """Create a new history file and return its path.

    The file name combines a local-time timestamp with eight random hex
    characters; the file starts with a "history_created" row carrying
    *reason*.
    """
    stamp = dt.datetime.now().strftime("%Y-%m-%d_%H%M%S")
    alphabet = string.hexdigits.lower()
    suffix = "".join(random.choices(alphabet, k=8))
    history_file = self.history_dir / f"{stamp}_{suffix}.jsonl"
    first_row = {"ts": now_iso(), "type": "history_created", "reason": reason}
    JsonLineStore.append(history_file, first_row)
    return history_file
def _rotate_history(self, reason: str, username: str) -> Path:
    """Archive the current history file and switch to a fresh one.

    Returns:
        The archive path the old file was moved to, or the placeholder
        path ``<archive dir>/(empty)`` when there was no file to move.
    """
    current = self._current_history_file()
    has_current = current.exists()
    archived = self.history_archive_dir / (current.name if has_current else "(empty)")
    if has_current:
        current.replace(archived)
    self.state["current_history_file"] = str(self._create_new_history_file(reason))
    self._persist_state()
    # This event goes into the new history file.
    self._append_history_event(
        "history_rotated",
        {"reason": reason, "username": username, "archived": str(archived)},
    )
    return archived
def _append_history(self, history_path: Path, event_type: str, payload: dict[str, Any]) -> None:
    """Append one timestamped row of *event_type* to *history_path*.

    Keys in *payload* are merged into the row and override ``ts``/``type``
    if they collide.
    """
    record = {"ts": now_iso(), "type": event_type, **payload}
    JsonLineStore.append(history_path, record)
def _append_history_event(self, event_type: str, payload: dict[str, Any]) -> None:
    """Append a "system_event" row named *event_type* to the current history file."""
    details = {"event": event_type}
    details.update(payload)
    self._append_history(self._current_history_file(), "system_event", details)
def _resolve_chat_id(self, chat_id: int) -> int:
migrations = self.state.get("chat_id_migrations")
if not isinstance(migrations, dict):
return chat_id
current = chat_id
visited: set[int] = set()
while current not in visited:
visited.add(current)
next_chat_id = migrations.get(str(current))
if not isinstance(next_chat_id, int):
break
current = next_chat_id
return current
def _remember_chat_migration(self, old_chat_id: int, new_chat_id: int, source: str) -> None:
if old_chat_id == new_chat_id:
return
migrations = self.state.get("chat_id_migrations")
if not isinstance(migrations, dict):
migrations = {}
self.state["chat_id_migrations"] = migrations
if migrations.get(str(old_chat_id)) == new_chat_id:
return
migrations[str(old_chat_id)] = new_chat_id
self._persist_state()
with self.queue_lock:
changed = False
for job in self.queue:
if job.get("chat_id") == old_chat_id:
job["chat_id"] = new_chat_id
job["updated_at"] = now_iso()
changed = True
if changed:
self._persist_queue()
self._append_history_event("chat_migrated_to_supergroup", {
"oldChatId": old_chat_id,
"newChatId": new_chat_id,
"source": source,
})
@staticmethod
def _extract_migrate_to_chat_id(error_text: str) -> int | None:
match = re.search(r'"migrate_to_chat_id"\s*:\s*(-?\d+)', error_text)
if match:
return int(match.group(1))
match = re.search(r"'migrate_to_chat_id'\s*:\s*(-?\d+)", error_text)
if match:
return int(match.group(1))
return None
def _handle_update(self, update: dict[str, Any]) -> None:
message = update.get("message")
update_type = "message"
if not isinstance(message, dict):
message = update.get("channel_post")
update_type = "channel_post"
if not isinstance(message, dict):
return
chat = message.get("chat") or {}
chat_id = chat.get("id")
message_id = message.get("message_id")
chat_type = str(chat.get("type") or "")
chat_username = normalize_username(chat.get("username"))
chat_title = str(chat.get("title") or "")
sender = message.get("from") or {}
username = normalize_username(sender.get("username"))
author_signature = str(message.get("author_signature") or "").strip()
author_username = username or normalize_username(author_signature)
if not isinstance(chat_id, int) or not isinstance(message_id, int):
return
migrate_to_chat_id = message.get("migrate_to_chat_id")
if isinstance(migrate_to_chat_id, int):
self._remember_chat_migration(chat_id, migrate_to_chat_id, "telegram_message")
return
update_key = f"{chat_id}:{message_id}"
if self._mark_processed_update(update_key):
return
is_channel_post = update_type == "channel_post" or chat_type == "channel"
is_group_message = update_type == "message" and chat_type in ("group", "supergroup")
is_allowed_channel = (
not is_channel_post
or not self.cfg.allowed_channel_username
or chat_username == self.cfg.allowed_channel_username
)
if is_channel_post and not is_allowed_channel:
return
text = (message.get("text") or message.get("caption") or "").strip()
history_path = self._current_history_file()
if author_username != self.cfg.allowed_username:
if is_channel_post or is_group_message:
self._append_history(history_path, "chat_context_message", {
"chatId": chat_id,
"messageId": message_id,
"updateType": update_type,
"chatType": chat_type,
"chatUsername": chat_username,
"chatTitle": chat_title,
"username": username,
"authorSignature": author_signature,
"text": text,
"hasVoice": bool(message.get("voice")),
"hasAudio": bool(message.get("audio")),
})
if is_group_message:
self._safe_send(chat_id, "Получил сообщение.", reply_to=message_id)
return
if not text:
if message.get("voice"):
self._enqueue_voice_job(
chat_id,
message_id,
author_username,
message["voice"].get("file_id"),
update_type=update_type,
chat_username=chat_username,
chat_title=chat_title,
author_signature=author_signature,
chat_type=chat_type,
)
return
if message.get("audio"):
self._enqueue_voice_job(
chat_id,
message_id,
author_username,
message["audio"].get("file_id"),
update_type=update_type,
chat_username=chat_username,
chat_title=chat_title,
author_signature=author_signature,
chat_type=chat_type,
)
return
self._safe_send(chat_id, "Поддерживаются текст, voice и audio.", reply_to=message_id)
return
if text.startswith("/"):
self._handle_command(chat_id, message_id, author_username, text)
return
self._append_history(history_path, "incoming_text", {
"chatId": chat_id,
"messageId": message_id,
"updateType": update_type,
"chatType": chat_type,
"chatUsername": chat_username,
"chatTitle": chat_title,
"username": author_username,
"authorSignature": author_signature,
"text": text,
})
job = self._build_job_base(chat_id, message_id, author_username, str(history_path))
job["type"] = "text"
job["text"] = text
job["update_type"] = update_type
job["chat_type"] = chat_type
job["chat_username"] = chat_username
job["chat_title"] = chat_title
job["author_signature"] = author_signature
with self.queue_lock:
self.queue.append(job)
self._persist_queue()
self._safe_send(chat_id, f"Принял задачу #{job['num']}", reply_to=message_id)
def _enqueue_voice_job(
self,
chat_id: int,
message_id: int,
username: str,
file_id: str | None,
*,
update_type: str = "message",
chat_username: str = "",
chat_title: str = "",
author_signature: str = "",
chat_type: str = "",
) -> None:
if not file_id:
self._safe_send(chat_id, "Не удалось прочитать file_id голосового.", reply_to=message_id)
return
history_path = self._current_history_file()
self._append_history(history_path, "incoming_voice", {
"chatId": chat_id,
"messageId": message_id,
"updateType": update_type,
"chatType": chat_type,
"chatUsername": chat_username,
"chatTitle": chat_title,
"username": username,
"authorSignature": author_signature,
"fileId": file_id,
})
job = self._build_job_base(chat_id, message_id, username, str(history_path))
job["type"] = "voice"
job["telegram_file_id"] = file_id
job["update_type"] = update_type
job["chat_type"] = chat_type
job["chat_username"] = chat_username
job["chat_title"] = chat_title
job["author_signature"] = author_signature
with self.queue_lock:
self.queue.append(job)
self._persist_queue()
self._safe_send(chat_id, f"Принял voice в задачу #{job['num']}", reply_to=message_id)
def _build_job_base(self, chat_id: int, message_id: int, username: str, history_file: str) -> dict[str, Any]:
    """Allocate the next job number and return a pending job with default fields.

    The counter in the state is incremented and persisted under the queue
    lock. Callers fill in the type-specific fields afterwards.
    """
    with self.queue_lock:
        num = int(self.state.get("next_job_number", 1))
        self.state["next_job_number"] = num + 1
        self._persist_state()
    job: dict[str, Any] = {"id": str(uuid.uuid4()), "num": num, "status": "pending", "type": "text"}
    job.update(chat_id=chat_id, message_id=message_id, username=username)
    job.update(
        update_type="message",
        chat_type="",
        chat_username="",
        chat_title="",
        author_signature="",
        text="",
        telegram_file_id="",
    )
    job.update(history_file=history_file, attempts=0, retry_reason="", last_error="")
    job["created_at"] = now_iso()
    job["updated_at"] = now_iso()
    job["active_since"] = None
    return job
def _handle_command(self, chat_id: int, message_id: int, username: str, text: str) -> None:
lower = text.lower()
if lower in ("/start", "/help"):
self._safe_send(chat_id, self._help_text(), reply_to=message_id)
return
if lower == "/status":
self._safe_send(chat_id, self._status_text(), reply_to=message_id)
return
if lower == "/queue":
self._safe_send(chat_id, self._queue_text(), reply_to=message_id)
return
if lower == "/new":
archived = self._rotate_history("command_new", username)
self._safe_send(chat_id, f"История очищена. Новый диалог начат.\nАрхив: {archived.name}", reply_to=message_id)
return
if lower in ("/restart_service", "/restart"):
self._append_history_event("restart_service_requested", {
"chatId": chat_id,
"messageId": message_id,
"username": username,
})
self._safe_send(
chat_id,
"Перезапускаю сервис. Если задача была активна, после старта она вернётся в очередь и продолжится.",
reply_to=message_id,
)
self._schedule_self_restart()
return
if lower == "/stop":
stopped = self._cancel_active_job("stopped_by_user")
if stopped:
self._safe_send(chat_id, "Текущая задача остановлена и удалена из очереди.", reply_to=message_id)
else:
self._safe_send(chat_id, "Сейчас нет активной задачи.", reply_to=message_id)
return
if lower.startswith("/cancel"):
parts = text.split(maxsplit=1)
if len(parts) < 2:
self._safe_send(chat_id, "Использование: /cancel <id|all>", reply_to=message_id)
return
arg = parts[1].strip()
if arg.lower() == "all":
with self.queue_lock:
self.stop_current_job = True
self._stop_active_codex_process()
count = len(self.queue)
self.queue = []
self._persist_queue()
self._safe_send(chat_id, f"Удалено задач из очереди: {count}", reply_to=message_id)
return
cancelled = self._cancel_by_id_prefix(arg)
self._safe_send(chat_id, f"Задача удалена: {arg}" if cancelled else f"Задача не найдена: {arg}", reply_to=message_id)
return
def _help_text(self) -> str:
return (
"Доступные команды:\n"
"/status — активная задача и размер очереди\n"
"/queue — список задач в очереди\n"
"/stop — остановить текущую задачу\n"
"/cancel <id|all> — удалить задачу по id (префикс) или все\n"
"/new — архивировать историю и начать новую\n"
"/restart_service — перезапустить сервис через systemd\n"
"/help — эта справка"
)
def _status_text(self) -> str:
with self.queue_lock:
active = next((j for j in self.queue if j.get("status") == "active"), None)
pending = sum(1 for j in self.queue if j.get("status") == "pending")
if not active:
return f"Статус: активной задачи нет.\nВ очереди pending: {pending}"
elapsed = int(time.time() - (self.active_job_started_at or time.time()))
return (
f"Статус: активная задача #{active.get('num', '?')}\n"
f"Тип: {active.get('type', 'text')}\n"
f"Попытка: {int(active.get('attempts', 0)) + 1}/{self.cfg.max_retries}\n"
f"Выполняется: {elapsed}с\n"
f"Pending: {pending}"
)
def _queue_text(self) -> str:
with self.queue_lock:
items = list(self.queue)
if not items:
return "Очередь пуста."
lines = [f"Очередь: {len(items)}"]
for i, job in enumerate(items[:10], start=1):
lines.append(
f"{i}) #{job.get('num', '?')} [{job.get('status')}] {job.get('type')} attempts={job.get('attempts', 0)}"
)
if len(items) > 10:
lines.append(f"...и ещё {len(items) - 10} задач")
return "\n".join(lines)
def _cancel_active_job(self, reason: str) -> bool:
with self.queue_lock:
active = next((j for j in self.queue if j.get("status") == "active"), None)
if not active:
return False
self.stop_current_job = True
self._stop_active_codex_process()
self.queue = [j for j in self.queue if j.get("id") != active.get("id")]
self._persist_queue()
self._append_history_event("job_stopped_by_user", {"jobId": active.get("id"), "reason": reason})
return True
def _cancel_by_id_prefix(self, prefix: str) -> bool:
prefix = prefix.strip().lower()
normalized_num = prefix.lstrip("#")
with self.queue_lock:
target = next(
(
j for j in self.queue
if str(j.get("id", "")).lower().startswith(prefix)
or str(j.get("num", "")).lower() == normalized_num
),
None
)
if not target:
return False
if target.get("status") == "active":
self.stop_current_job = True
self._stop_active_codex_process()
self.queue = [j for j in self.queue if j.get("id") != target.get("id")]
self._persist_queue()
return True
def _worker_loop(self) -> None:
while not self.stop_event.is_set():
job = None
with self.queue_lock:
for item in self.queue:
if item.get("status") == "pending":
item["status"] = "active"
item["active_since"] = now_iso()
item["updated_at"] = now_iso()
self.active_job_id = item.get("id")
self.active_job_started_at = time.time()
job = dict(item)
self._persist_queue()
break
if not job:
time.sleep(0.5)
continue
self.stop_current_job = False
self._process_job(job)
self.active_job_id = None
self.active_job_started_at = None
def _process_job(self, job: dict[str, Any]) -> None:
    """Run one job end to end and report progress in the originating chat.

    Voice jobs are transcribed first. The prompt is then sent to Codex, the
    answer is delivered in chunks and the job is removed from the queue.
    On an exception the job is either dropped (when a user stop was
    requested) or handed to the retry logic.
    """
    job_id = job["id"]
    job_num = job.get("num", "?")
    chat_id = int(job["chat_id"])
    message_id = int(job["message_id"])
    history_path = Path(job["history_file"])

    def notify(text: str) -> None:
        # Every progress message is a reply to the original request.
        self._safe_send(chat_id, text, reply_to=message_id)

    notify(f"Задача #{job_num} в работе.")
    try:
        if job.get("type") == "voice":
            notify(f"#{job_num}: распознаю голосовое...")
            recognized = self._transcribe_voice_job(job)
            job["text"] = recognized
            self._append_history(
                history_path,
                "voice_transcription",
                {"jobId": job_id, "jobNum": job_num, "text": recognized},
            )
            preview = recognized.strip()
            if len(preview) > 1200:
                preview = preview[:1200] + " ...[обрезано]"
            notify(f"#{job_num}: распознано:\n{preview}")
            notify(f"#{job_num}: распознано, отправляю в Codex.")
        prompt = self._build_prompt(job)
        self._append_history(history_path, "codex_request", {"jobId": job_id, "prompt": prompt})
        answer = self._run_codex(prompt, chat_id, message_id, job_id, job_num)
        for chunk in split_long_text(answer):
            notify(chunk)
        notify(f"Готово #{job_num}.")
        self._append_history(history_path, "codex_response", {"jobId": job_id, "text": answer})
        self._mark_job_done(job_id)
    except Exception as e:
        if not self.stop_current_job:
            self._handle_job_failure(job, e)
            return
        # The failure was caused by a user-requested stop: drop the job.
        self._append_history(history_path, "job_stopped", {"jobId": job_id, "reason": str(e)})
        notify(f"Задача #{job_num} остановлена.")
        self._mark_job_removed(job_id)
        self.stop_current_job = False
def _build_prompt(self, job: dict[str, Any]) -> str:
retry_block = ""
retry_reason = (job.get("retry_reason") or "").strip()
if retry_reason:
retry_block = f"\n\nПометка retry: {retry_reason}"
return (
"Пришло сообщение в Telegram.\n"
f"Тип: {job.get('type')}\n"
f"Источник Telegram: {job.get('update_type', 'message')}\n"
f"Тип чата: {job.get('chat_type') or ''}\n"
f"Канал/чат: @{job.get('chat_username') or ''} {job.get('chat_title') or ''}\n"
f"Username отправителя: @{job.get('username')}\n"
f"Подпись автора в Telegram: {job.get('author_signature') or ''}\n"
"Текст для обработки:\n"
f"{job.get('text')}\n\n"
f"История диалога (JSONL): {job.get('history_file')}\n"
f"Инструкции агента: {self.cfg.agent_instructions_file}\n"
f"Работай в рабочем проекте аккуратно и верни только текст ответа пользователю.{retry_block}"
)
def _run_codex(self, prompt: str, chat_id: int, message_id: int, job_id: str, job_num: Any) -> str:
    """Run ``codex exec`` for *prompt* and return its final answer.

    Codex writes its last message to a temporary file (``-o``); its combined
    stdout/stderr is read line by line on a helper thread. User-facing
    notes extracted from the output are forwarded to the chat at most once
    every 8 seconds, and a "still running" notice is sent after 120 seconds
    without any message. The temporary file is removed on every exit path
    and the child process is reaped after a timeout kill.

    Args:
        prompt: Prompt text passed as the last command-line argument.
        chat_id: Chat that receives progress messages.
        message_id: Message the progress messages reply to.
        job_id: Job id, used for log and thread names.
        job_num: Job number shown to the user.

    Returns:
        The content of the output file, or a fallback extracted from the
        captured output when that file is empty.

    Raises:
        RuntimeError: on timeout, non-zero exit code, or empty response.
    """
    output_lines: list[str] = []
    with tempfile.NamedTemporaryFile(prefix="shine-codex-last-message-", suffix=".txt", delete=False) as tmp:
        output_file = Path(tmp.name)
    try:
        cmd = [
            str(self.cfg.codex_bin),
            "exec",
            "--dangerously-bypass-approvals-and-sandbox",
            "--json",
            "-C", str(self.cfg.codex_workdir),
            "-o", str(output_file),
            prompt,
        ]
        print(f"[py-bot] codex exec start job={job_id[:8]}", flush=True)
        process = subprocess.Popen(
            cmd,
            stdin=subprocess.DEVNULL,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            encoding="utf-8",
            errors="replace",
            bufsize=1,
        )
        with self.active_process_lock:
            self.active_process = process
        self.last_heartbeat_at = 0.0
        last_user_note = ""
        last_user_note_at = 0.0
        codex_started_at = time.time()
        last_job_message_at = codex_started_at

        def on_line(line: str) -> None:
            nonlocal last_user_note, last_user_note_at, last_job_message_at
            output_lines.append(line)
            note = self._extract_codex_user_note(line)
            now = time.time()
            # Forward a note only if it is new and the last one is older than 8s.
            if note and note != last_user_note and now - last_user_note_at > 8:
                self._safe_send(chat_id, f"#{job_num}: {note}", reply_to=message_id)
                last_user_note = note
                last_user_note_at = now
                last_job_message_at = now

        reader_done = threading.Event()

        def reader() -> None:
            if not process.stdout:
                reader_done.set()
                return
            for line in process.stdout:
                on_line(line.rstrip("\n"))
            reader_done.set()

        t = threading.Thread(target=reader, name=f"codex-reader-{job_id[:8]}", daemon=True)
        t.start()
        try:
            deadline = time.time() + self.cfg.codex_timeout_seconds
            return_code = None
            while return_code is None:
                return_code = process.poll()
                now = time.time()
                if return_code is not None:
                    break
                if now >= deadline:
                    process.kill()
                    try:
                        # Reap the killed child so it does not stay a zombie.
                        process.wait(timeout=5)
                    except subprocess.TimeoutExpired:
                        pass
                    t.join(timeout=2)
                    raise RuntimeError(f"Codex timeout after {self.cfg.codex_timeout_seconds}s")
                if now - codex_started_at >= 120 and now - last_job_message_at >= 120:
                    elapsed = self._format_duration(int(now - codex_started_at))
                    self._safe_send(
                        chat_id,
                        f"#{job_num}: задача ещё выполняется, работает уже {elapsed}. От Codex давно нет сообщений.",
                        reply_to=message_id,
                    )
                    last_job_message_at = now
                self.last_heartbeat_at = now
                time.sleep(1)
        finally:
            with self.active_process_lock:
                self.active_process = None
            # An unexpected error inside the loop must not leave Codex running.
            if process.poll() is None:
                process.kill()
        # Give the reader a moment to drain the remaining output.
        reader_done.wait(timeout=2)
        if return_code != 0:
            tail = "\n".join(output_lines[-40:])
            raise RuntimeError(f"Codex exited with code {return_code}. Output tail:\n{tail}")
        answer = ""
        if output_file.exists():
            answer = output_file.read_text(encoding="utf-8").strip()
        if answer:
            return answer
        fallback = self._extract_fallback_message(output_lines)
        if not fallback:
            raise RuntimeError("Codex returned empty response")
        return fallback
    finally:
        # The file was created with delete=False, so remove it on every
        # path, including timeouts, failures and spawn errors.
        try:
            output_file.unlink(missing_ok=True)
        except OSError:
            pass
def _stop_active_codex_process(self) -> bool:
with self.active_process_lock:
process = self.active_process
if process is None:
return False
if process.poll() is not None:
return False
process.terminate()
try:
process.wait(timeout=2)
except subprocess.TimeoutExpired:
process.kill()
return True
def _handle_job_failure(self, job: dict[str, Any], err: Exception) -> None:
job_id = job["id"]
job_num = job.get("num", "?")
chat_id = int(job["chat_id"])
message_id = int(job["message_id"])
error_text = str(err).strip() or err.__class__.__name__
print(f"[py-bot] Ошибка job={job_id[:8]}: {error_text}", flush=True)
with self.queue_lock:
target = next((j for j in self.queue if j.get("id") == job_id), None)
if not target:
return
attempts = int(target.get("attempts", 0)) + 1
target["attempts"] = attempts
target["last_error"] = error_text[:1000]
target["updated_at"] = now_iso()
if attempts < self.cfg.max_retries:
target["status"] = "pending"
target["retry_reason"] = error_text[:200]
self._persist_queue()
will_retry = True
else:
self.queue = [j for j in self.queue if j.get("id") != job_id]
self._persist_queue()
will_retry = False
if will_retry:
self._safe_send(chat_id, f"Ошибка задачи #{job_num}, повтор: {attempts}/{self.cfg.max_retries}", reply_to=message_id)
else:
self._safe_send(chat_id, f"Ошибка задачи #{job_num}. Лимит попыток исчерпан.", reply_to=message_id)
def _mark_job_done(self, job_id: str) -> None:
    """Remove the finished job *job_id* from the queue and persist the queue."""
    with self.queue_lock:
        remaining = []
        for queued in self.queue:
            if queued.get("id") != job_id:
                remaining.append(queued)
        self.queue = remaining
        self._persist_queue()
def _mark_job_removed(self, job_id: str) -> None:
    """Drop the job *job_id* from the queue and persist the queue."""
    with self.queue_lock:
        kept = []
        for queued in self.queue:
            if queued.get("id") != job_id:
                kept.append(queued)
        self.queue = kept
        self._persist_queue()
def _safe_send(self, chat_id: int, text: str, reply_to: int | None = None) -> None:
text = (text or "").strip()
if not text:
return
if len(text) > 3900:
text = text[:3900] + "\n...[обрезано]"
resolved_chat_id = self._resolve_chat_id(chat_id)
resolved_reply_to = reply_to if resolved_chat_id == chat_id else None
try:
self.telegram.send_message(resolved_chat_id, text, reply_to_message_id=resolved_reply_to)
except Exception as e:
migrate_to_chat_id = self._extract_migrate_to_chat_id(str(e))
if migrate_to_chat_id is not None:
self._remember_chat_migration(resolved_chat_id, migrate_to_chat_id, "send_message_error")
try:
self.telegram.send_message(migrate_to_chat_id, text, reply_to_message_id=None)
return
except Exception as retry_error:
print(f"[py-bot] sendMessage retry after migration error: {retry_error}", flush=True)
return
print(f"[py-bot] sendMessage error: {e}", flush=True)
def _schedule_self_restart(self) -> None:
if self.restart_requested:
return
self.restart_requested = True
def restart() -> None:
time.sleep(1.5)
print("[py-bot] restart requested by Telegram command", flush=True)
os._exit(0)
threading.Thread(target=restart, name="shine-py-bot-self-restart", daemon=True).start()
def _transcribe_voice_job(self, job: dict[str, Any]) -> str:
if not self.cfg.openai_api_key:
raise RuntimeError("Не задан OPENAI_API_KEY для распознавания voice")
file_id = (job.get("telegram_file_id") or "").strip()
if not file_id:
raise RuntimeError("Пустой telegram_file_id")
file_bytes, filename = self._download_telegram_file(file_id)
text = self._openai_transcribe(file_bytes, filename).strip()
if not text:
raise RuntimeError("Распознавание вернуло пустой текст")
return text
def _download_telegram_file(self, file_id: str) -> tuple[bytes, str]:
result = self.telegram.call("getFile", {"file_id": file_id}, timeout=60)
info = result.get("result") or {}
file_path = info.get("file_path")
if not file_path:
raise RuntimeError("Telegram getFile не вернул file_path")
file_url = f"https://api.telegram.org/file/bot{self.cfg.telegram_bot_token}/{file_path}"
req = request.Request(file_url, method="GET")
with request.urlopen(req, timeout=120) as resp:
data = resp.read()
original_name = Path(file_path).name or "audio.ogg"
lower = original_name.lower()
# OpenAI transcription может не принимать расширение .oga, нормализуем в .ogg.
if lower.endswith(".oga"):
base = original_name[:-4] if len(original_name) > 4 else "audio"
normalized = f"{base}.ogg"
else:
normalized = original_name
return data, normalized
def _openai_transcribe(self, file_bytes: bytes, filename: str) -> str:
    """Upload audio to the OpenAI transcription endpoint and return the raw response text.

    The request is a hand-built ``multipart/form-data`` body carrying the
    configured model, ``response_format=text`` and the audio file.

    Raises:
        RuntimeError: when the endpoint answers with an HTTP error status;
            the response body is included in the message.
    """
    boundary = "----shine-boundary-" + "".join(random.choices("abcdef0123456789", k=16))
    mime = mimetypes.guess_type(filename)[0] or "application/octet-stream"
    delimiter = f"--{boundary}\r\n"

    form_fields = (
        ("model", self.cfg.openai_transcribe_model),
        ("response_format", "text"),
    )
    # Plain text fields come first, then the file part, then the closing delimiter.
    chunks: list[bytes] = [
        (
            f"{delimiter}"
            f'Content-Disposition: form-data; name="{field}"\r\n\r\n'
            f"{value}\r\n"
        ).encode("utf-8")
        for field, value in form_fields
    ]
    file_header = (
        f"{delimiter}"
        f'Content-Disposition: form-data; name="file"; filename="{filename}"\r\n'
        f"Content-Type: {mime}\r\n\r\n"
    )
    chunks.append(file_header.encode("utf-8"))
    chunks.append(bytes(file_bytes))
    chunks.append(b"\r\n")
    chunks.append(f"--{boundary}--\r\n".encode("utf-8"))

    req = request.Request(
        "https://api.openai.com/v1/audio/transcriptions",
        method="POST",
        data=b"".join(chunks),
    )
    req.add_header("Authorization", f"Bearer {self.cfg.openai_api_key}")
    req.add_header("Content-Type", f"multipart/form-data; boundary={boundary}")

    try:
        with request.urlopen(req, timeout=240) as resp:
            raw = resp.read()
    except error.HTTPError as e:
        detail = e.read().decode("utf-8", errors="replace")
        raise RuntimeError(f"OpenAI transcribe HTTP {e.code}: {detail}") from e
    return raw.decode("utf-8", errors="replace")
@staticmethod
def _extract_codex_user_note(line: str) -> str | None:
s = (line or "").strip()
if not s.startswith("{"):
return None
try:
obj = json.loads(s)
except Exception:
return None
if obj.get("type") != "item.completed":
return None
item = obj.get("item") or {}
if item.get("type") != "agent_message":
return None
text = (item.get("text") or "").strip()
if not text:
return None
if len(text) > 220:
return text[:220] + "..."
return text
@staticmethod
def _extract_fallback_message(lines: list[str]) -> str:
for line in reversed(lines):
line = line.strip()
if not line:
continue
if line.startswith("{") and '"type":' in line:
continue
if line.startswith("mcp:") or line.startswith("OpenAI Codex"):
continue
return line
return ""
@staticmethod
def _format_duration(seconds: int) -> str:
seconds = max(0, seconds)
minutes, sec = divmod(seconds, 60)
hours, minutes = divmod(minutes, 60)
if hours:
return f"{hours}ч {minutes}м {sec}с"
if minutes:
return f"{minutes}м {sec}с"
return f"{sec}с"
def run_selftest(config: BotConfig, prompt: str) -> int:
    """Run a single ``codex exec`` with *prompt* and echo its output.

    The command is built from ``config.codex_bin`` and
    ``config.codex_workdir``, runs with stdin closed, and has its stdout
    (always) and stderr (when non-empty) printed.

    Returns:
        The exit code of the codex process.
    """
    argv = [str(config.codex_bin), "exec"]
    argv += ["--dangerously-bypass-approvals-and-sandbox", "--json"]
    argv += ["-C", str(config.codex_workdir)]
    argv.append(prompt)

    completed = subprocess.run(
        argv,
        stdin=subprocess.DEVNULL,
        capture_output=True,
        text=True,
        encoding="utf-8",
        errors="replace",
    )
    print(completed.stdout)
    if completed.stderr:
        print(completed.stderr)
    return completed.returncode
def main() -> int:
    """Command-line entry point; returns the process exit code.

    With ``--selftest-codex PROMPT`` only the codex self-test is run and
    its exit code returned. Otherwise the bot service runs until it stops:
    Ctrl+C triggers a shutdown and returns 0, any other exception is
    logged as fatal and returns 1.
    """
    parser = argparse.ArgumentParser(description="SHiNE Python Telegram bot wrapper for Codex CLI")
    parser.add_argument("--selftest-codex", default="", help="Выполнить только codex exec с этим prompt и выйти")
    args = parser.parse_args()

    # Configuration is resolved relative to the directory containing this script.
    cfg = BotConfig(Path(__file__).resolve().parent)
    if args.selftest_codex:
        return run_selftest(cfg, args.selftest_codex)

    service = ShinePyBotService(cfg)
    exit_code = 0
    try:
        service.run()
    except KeyboardInterrupt:
        service.shutdown()
    except Exception as e:
        print(f"[py-bot] FATAL: {e}", flush=True)
        exit_code = 1
    return exit_code
# Script entry point: run main() and use its return value as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())