SHiNE-server/SHiNE-agent-bot-coder/py_bot_service.py

1878 lines
81 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
from __future__ import annotations
import argparse
import datetime as dt
import fcntl
import json
import mimetypes
import os
import random
import re
import string
import subprocess
import tempfile
import threading
import time
import traceback
import uuid
from pathlib import Path
from typing import Any
from urllib import error, request
# Built-in "username:Display name" roster, used by BotConfig when the
# ALLOWED_TELEGRAM_PLAYERS variable is not set (parsed by parse_allowed_players).
DEFAULT_ALLOWED_PLAYERS = ",".join(
    f"{login}:{display}"
    for login, display in (
        ("malvviiina", "Милана"),
        ("zodiaktechnika32", "Сергей"),
        ("oidasyda", "Иван"),
        ("blackbyrd1", "Ворон"),
        ("dimasol1", "Дима"),
    )
)
def now_iso() -> str:
    """Return the current time in UTC as an ISO-8601 string (with a ``+00:00`` offset)."""
    utc_now = dt.datetime.now(tz=dt.timezone.utc)
    return utc_now.isoformat()
def normalize_username(value: str | None) -> str:
    """Canonicalize a Telegram username for comparisons.

    Surrounding whitespace and a single leading ``@`` are removed and the
    result is lower-cased. ``None`` and empty input give ``""``.
    """
    if not value:
        return ""
    cleaned = value.strip()
    if cleaned[:1] == "@":
        cleaned = cleaned[1:]
    return cleaned.lower()
def parse_allowed_players(raw: str) -> dict[str, str]:
    """Parse ``"user[:Name],user2[:Name2],..."`` into ``{normalized_username: display_name}``.

    Blank entries and entries whose username normalizes to ``""`` are skipped.
    Without a ``:Name`` part, the raw username text (stripped) becomes the
    display name. An empty display name falls back to the normalized username.
    Later duplicates overwrite earlier ones.
    """
    players: dict[str, str] = {}
    entries = (chunk.strip() for chunk in (raw or "").split(","))
    for entry in filter(None, entries):
        login, has_name, display = entry.partition(":")
        key = normalize_username(login)
        if not key:
            continue
        label = display if has_name else login
        players[key] = label.strip() or key
    return players
def split_long_text(text: str, chunk_size: int = 3500) -> list[str]:
    """Cut stripped *text* into consecutive slices of at most *chunk_size* characters.

    Empty or ``None`` input yields the single placeholder ``"(пустой ответ)"``
    so callers always have something to send.
    """
    body = (text or "").strip()
    if not body:
        return ["(пустой ответ)"]
    pieces: list[str] = []
    for offset in range(0, len(body), chunk_size):
        pieces.append(body[offset:offset + chunk_size])
    return pieces
def split_text_for_tts(text: str, chunk_size: int) -> list[str]:
    """Group paragraphs of *text* into chunks of at most *chunk_size* characters.

    Paragraphs are separated by blank lines. Neighbouring paragraphs are packed
    into one chunk, joined by a blank line, while they fit. A single paragraph
    longer than *chunk_size* is flushed on its own and hard-sliced every
    *chunk_size* characters; each slice is stripped, and empty slices are dropped.
    Empty input yields an empty list.
    """
    source = (text or "").strip()
    if not source:
        return []
    result: list[str] = []
    buffer = ""
    for block in (piece.strip() for piece in re.split(r"\n\s*\n", source)):
        if not block:
            continue
        if len(block) > chunk_size:
            # Oversized paragraph: emit what is buffered, then slice the paragraph.
            if buffer:
                result.append(buffer)
                buffer = ""
            slices = (block[pos:pos + chunk_size].strip() for pos in range(0, len(block), chunk_size))
            result.extend(part for part in slices if part)
            continue
        merged = f"{buffer}\n\n{block}" if buffer else block
        if len(merged) <= chunk_size:
            buffer = merged
            continue
        if buffer:
            result.append(buffer)
        buffer = block
    if buffer:
        result.append(buffer)
    return result
def read_env_file(path: Path) -> dict[str, str]:
    """Parse a simple ``KEY=VALUE`` ``.env`` file into a dict.

    - A missing file yields ``{}``.
    - Blank lines, ``#`` comment lines and lines without ``=`` are ignored.
    - An optional leading ``export `` before the key is accepted.
    - Key and value are whitespace-stripped.
    - If the value is wrapped in one matching pair of quotes (``"..."`` or
      ``'...'``), that pair is removed.
    - Any other quote characters are kept verbatim, so ``X=say "hi"`` stays
      ``say "hi"``.
    """
    result: dict[str, str] = {}
    if not path.exists():
        return result
    for raw_line in path.read_text(encoding="utf-8").splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        key = key.strip()
        if key.startswith("export "):
            key = key[len("export "):].strip()
        value = value.strip()
        # Strip exactly one surrounding pair of *matching* quotes; unmatched
        # or inner quotes are part of the value.
        if len(value) >= 2 and value[0] == value[-1] and value[0] in ("'", '"'):
            value = value[1:-1]
        result[key] = value
    return result
class VoiceTranscriptionError(RuntimeError):
    """Transcription failure that carries a user-facing message plus diagnostics.

    Attributes:
        user_message: text suitable for showing to the user (also ``str(exc)``).
        stage: label of the pipeline step that failed.
        retryable: whether the caller may retry (defaults to True).
        detail: optional technical detail for logs.
    """

    def __init__(
        self,
        user_message: str,
        *,
        stage: str,
        retryable: bool = True,
        detail: str = "",
    ):
        super().__init__(user_message)
        self.user_message, self.stage = user_message, stage
        self.retryable, self.detail = retryable, detail

    def log_text(self) -> str:
        """One-line log description; ``detail`` is appended only when it adds information."""
        text = f"{self.user_message} stage={self.stage} retryable={self.retryable}"
        if self.detail and self.detail != self.user_message:
            text += f" detail={self.detail}"
        return text
class VoiceReplyError(RuntimeError):
    """Marker ``RuntimeError`` subclass; judging by its name, it is meant for voice-reply failures (usage is outside this view)."""
class JsonLineStore:
    """Static helpers for JSON Lines files (one JSON value per line, UTF-8)."""

    @staticmethod
    def load(path: Path) -> list[dict[str, Any]]:
        """Return all records stored in *path*; ``[]`` if the file does not exist.

        Blank lines are ignored. A line that is not valid JSON, typically a
        record cut short by a crash during ``append``, is reported on stdout
        and skipped. Without this, one bad line would make the whole file
        (e.g. the job queue) unloadable and stop the service from starting.
        """
        if not path.exists():
            return []
        items: list[dict[str, Any]] = []
        lines = path.read_text(encoding="utf-8").splitlines()
        for lineno, line in enumerate(lines, start=1):
            line = line.strip()
            if not line:
                continue
            try:
                items.append(json.loads(line))
            except json.JSONDecodeError as e:
                print(f"[py-bot] Пропущена повреждённая строка {path}:{lineno}: {e}", flush=True)
        return items

    @staticmethod
    def save(path: Path, items: list[dict[str, Any]]) -> None:
        """Replace *path* with *items*, one JSON object per line.

        Data is written to ``<path>.tmp``, flushed and fsynced, then renamed
        over *path*. Readers therefore see either the old or the new content,
        and the new content has reached the disk before the rename.
        """
        path.parent.mkdir(parents=True, exist_ok=True)
        tmp = path.with_suffix(path.suffix + ".tmp")
        with tmp.open("w", encoding="utf-8") as f:
            f.writelines(json.dumps(item, ensure_ascii=False) + "\n" for item in items)
            f.flush()
            os.fsync(f.fileno())
        tmp.replace(path)

    @staticmethod
    def append(path: Path, item: dict[str, Any]) -> None:
        """Append one record to *path*, creating parent directories as needed."""
        path.parent.mkdir(parents=True, exist_ok=True)
        with path.open("a", encoding="utf-8") as f:
            f.write(json.dumps(item, ensure_ascii=False) + "\n")
class TelegramApi:
    """Minimal blocking client for the Telegram Bot HTTP API, built on ``urllib``.

    Every failure is raised as ``RuntimeError``. For HTTP error responses the
    raw body is kept verbatim in the message (``"Telegram HTTP <code>: <body>"``)
    so callers can inspect Telegram's error parameters. Presumably the service
    parses ``migrate_to_chat_id`` out of it; keep that format stable.
    """

    def __init__(self, token: str):
        self.base = f"https://api.telegram.org/bot{token}/"

    @staticmethod
    def _parse_response(raw: str) -> dict[str, Any]:
        """Decode a Bot API response body.

        Raises RuntimeError when the body is not JSON, is not a JSON object,
        or has a falsy ``ok`` field.
        """
        try:
            result = json.loads(raw)
        except ValueError as e:
            raise RuntimeError(f"Telegram returned a non-JSON response: {raw[:200]!r}") from e
        if not isinstance(result, dict) or not result.get("ok"):
            raise RuntimeError(f"Telegram API error: {result}")
        return result

    def _perform(self, req: request.Request, timeout: int, failure_prefix: str) -> dict[str, Any]:
        """Send *req* and return the parsed response, raising RuntimeError on any failure.

        *failure_prefix* labels transport-level errors (timeouts, DNS, resets).
        """
        try:
            with request.urlopen(req, timeout=timeout) as resp:
                raw = resp.read().decode("utf-8")
        except error.HTTPError as e:
            body = e.read().decode("utf-8", errors="replace")
            raise RuntimeError(f"Telegram HTTP {e.code}: {body}") from e
        except Exception as e:
            raise RuntimeError(f"{failure_prefix}: {e}") from e
        return self._parse_response(raw)

    @staticmethod
    def _quote_header_value(value: Any) -> str:
        """Escape a form-data name/filename: CR, LF and ``"`` are percent-encoded (WHATWG multipart rules)."""
        return str(value).replace("\r", "%0D").replace("\n", "%0A").replace('"', "%22")

    @classmethod
    def _build_multipart(
        cls,
        boundary: str,
        fields: dict[str, Any],
        files: dict[str, tuple[str, bytes, str]],
    ) -> bytes:
        """Encode *fields* and *files* as a ``multipart/form-data`` body.

        Fields whose value is ``None`` are omitted. *files* maps the form field
        name to ``(filename, data, mime_type)``.
        """
        body = bytearray()
        for name, value in fields.items():
            if value is None:
                continue
            body += (
                f"--{boundary}\r\n"
                f'Content-Disposition: form-data; name="{cls._quote_header_value(name)}"\r\n\r\n'
                f"{value}\r\n"
            ).encode("utf-8")
        for name, (filename, data, mime) in files.items():
            body += (
                f"--{boundary}\r\n"
                f'Content-Disposition: form-data; name="{cls._quote_header_value(name)}"; '
                f'filename="{cls._quote_header_value(filename)}"\r\n'
                f"Content-Type: {mime}\r\n\r\n"
            ).encode("utf-8")
            body += data
            body += b"\r\n"
        body += f"--{boundary}--\r\n".encode("utf-8")
        return bytes(body)

    def call(self, method: str, payload: dict[str, Any] | None = None, timeout: int = 60) -> dict[str, Any]:
        """POST *payload* as JSON to Bot API *method*; returns the decoded response object."""
        data = None
        headers: dict[str, str] = {}
        if payload is not None:
            data = json.dumps(payload).encode("utf-8")
            headers["Content-Type"] = "application/json"
        req = request.Request(self.base + method, data=data, headers=headers, method="POST")
        return self._perform(req, timeout, "Telegram request failed")

    def call_multipart(
        self,
        method: str,
        fields: dict[str, Any],
        files: dict[str, tuple[str, bytes, str]],
        timeout: int = 120,
    ) -> dict[str, Any]:
        """POST *fields* and *files* as ``multipart/form-data`` to Bot API *method*."""
        boundary = "----shine-tg-boundary-" + "".join(random.choices("abcdef0123456789", k=16))
        req = request.Request(
            self.base + method,
            data=self._build_multipart(boundary, fields, files),
            method="POST",
        )
        req.add_header("Content-Type", f"multipart/form-data; boundary={boundary}")
        return self._perform(req, timeout, "Telegram multipart request failed")

    def get_updates(self, offset: int | None, timeout_sec: int) -> list[dict[str, Any]]:
        """Long-poll ``getUpdates`` for messages and channel posts.

        The HTTP timeout is 15 s longer than the server-side long-poll timeout.
        """
        payload: dict[str, Any] = {"timeout": timeout_sec, "allowed_updates": ["message", "channel_post"]}
        if offset is not None:
            payload["offset"] = offset
        result = self.call("getUpdates", payload=payload, timeout=timeout_sec + 15)
        return result.get("result", [])

    def send_message(self, chat_id: int | str, text: str, reply_to_message_id: int | None = None) -> dict[str, Any]:
        """Send a plain-text message, optionally as a reply."""
        payload: dict[str, Any] = {"chat_id": chat_id, "text": text}
        if reply_to_message_id is not None:
            payload["reply_to_message_id"] = reply_to_message_id
        return self.call("sendMessage", payload=payload, timeout=30)

    def _send_media(
        self,
        method: str,
        field: str,
        chat_id: int | str,
        file_ref: str,
        caption: str,
        reply_to_message_id: int | None,
    ) -> dict[str, Any]:
        """Shared JSON request for media sent by file_id/URL; empty captions are omitted."""
        payload: dict[str, Any] = {"chat_id": chat_id, field: file_ref}
        if caption:
            payload["caption"] = caption
        if reply_to_message_id is not None:
            payload["reply_to_message_id"] = reply_to_message_id
        return self.call(method, payload=payload, timeout=60)

    def send_voice(
        self,
        chat_id: int | str,
        voice: str,
        caption: str = "",
        reply_to_message_id: int | None = None,
    ) -> dict[str, Any]:
        """Send a voice message referenced by *voice* (file_id or URL)."""
        return self._send_media("sendVoice", "voice", chat_id, voice, caption, reply_to_message_id)

    def send_audio(
        self,
        chat_id: int | str,
        audio: str,
        caption: str = "",
        reply_to_message_id: int | None = None,
    ) -> dict[str, Any]:
        """Send an audio file referenced by *audio* (file_id or URL)."""
        return self._send_media("sendAudio", "audio", chat_id, audio, caption, reply_to_message_id)

    def send_voice_upload(
        self,
        chat_id: int | str,
        voice_bytes: bytes,
        filename: str,
        caption: str = "",
        reply_to_message_id: int | None = None,
    ) -> dict[str, Any]:
        """Upload *voice_bytes* as an ``audio/ogg`` voice message."""
        fields: dict[str, Any] = {"chat_id": chat_id}
        if caption:
            fields["caption"] = caption
        if reply_to_message_id is not None:
            fields["reply_to_message_id"] = reply_to_message_id
        return self.call_multipart(
            "sendVoice",
            fields=fields,
            files={"voice": (filename, voice_bytes, "audio/ogg")},
            timeout=180,
        )

    def delete_webhook(self) -> None:
        """Remove any webhook so ``getUpdates`` polling works; pending updates are kept."""
        self.call("deleteWebhook", payload={"drop_pending_updates": False}, timeout=30)
class BotConfig:
    """Runtime configuration for the bot service.

    Values come from the process environment overlaid with ``<root_dir>/.env``;
    a key present in ``.env`` wins over the same environment variable. Integer
    settings that are missing or blank fall back to their defaults. A
    non-integer value raises ``ValueError`` naming the offending key.
    """

    def __init__(self, root_dir: Path):
        env = dict(os.environ)
        env.update(read_env_file(root_dir / ".env"))
        self.root_dir = root_dir
        # Telegram identity and access control.
        self.telegram_bot_token = self._required(env, "TELEGRAM_BOT_TOKEN")
        self.allowed_username = normalize_username(env.get("ALLOWED_TELEGRAM_USERNAME", "AidarKC"))
        self.allowed_players = parse_allowed_players(env.get("ALLOWED_TELEGRAM_PLAYERS", DEFAULT_ALLOWED_PLAYERS))
        self.allowed_channel_username = normalize_username(env.get("ALLOWED_TELEGRAM_CHANNEL_USERNAME", "shine_writing"))
        self.bot_username = env.get("BOT_USERNAME", "aidar_su_bot")
        # OpenAI speech-to-text and text-to-speech.
        self.openai_api_key = env.get("OPENAI_API_KEY", "").strip()
        self.openai_transcribe_model = env.get("OPENAI_TRANSCRIBE_MODEL", "gpt-4o-mini-transcribe")
        self.telegram_file_download_timeout_seconds = self._int_setting(env, "TELEGRAM_FILE_DOWNLOAD_TIMEOUT_SECONDS", 300)
        self.openai_transcribe_timeout_seconds = self._int_setting(env, "OPENAI_TRANSCRIBE_TIMEOUT_SECONDS", 900)
        self.openai_tts_model = env.get("OPENAI_TTS_MODEL", "gpt-4o-mini-tts")
        self.openai_tts_voice = env.get("OPENAI_TTS_VOICE", "alloy")
        self.openai_tts_response_format = env.get("OPENAI_TTS_RESPONSE_FORMAT", "opus")
        self.openai_tts_timeout_seconds = self._int_setting(env, "OPENAI_TTS_TIMEOUT_SECONDS", 180)
        # TTS chunks are never configured below 500 characters.
        self.openai_tts_chunk_chars = max(500, self._int_setting(env, "OPENAI_TTS_CHUNK_CHARS", 3500))
        # Codex CLI invocation.
        self.codex_bin = Path(env.get(
            "CODEX_BIN",
            "/home/ai/.cache/JetBrains/IntelliJIdea2026.1/aia/codex/bin/codex-x86_64-unknown-linux-musl"
        ))
        self.codex_workdir = Path(env.get("CODEX_WORKDIR", "/home/ai/work/SHiNE/SHiNE-server-sha256"))
        self.codex_timeout_seconds = self._int_setting(env, "CODEX_TIMEOUT_SECONDS", 900)
        self.max_retries = max(1, self._int_setting(env, "MAX_RETRIES", 3))
        # A relative DATA_DIR is resolved against root_dir; an absolute one is used as-is.
        self.data_dir = (root_dir / env.get("DATA_DIR", "./data")).resolve()
        self.agent_instructions_file = (root_dir / "AGENT.md").resolve()

    @staticmethod
    def _required(env: dict[str, str], key: str) -> str:
        """Return the stripped value of *key*; raise RuntimeError if it is missing or blank."""
        value = env.get(key, "").strip()
        if not value:
            raise RuntimeError(f"Не задан обязательный параметр: {key}")
        return value

    @staticmethod
    def _int_setting(env: dict[str, str], key: str, default: int) -> int:
        """Return *key* parsed as an int, or *default* when it is missing or blank.

        Raises ValueError naming *key* when the value is not an integer.
        """
        raw = env.get(key, "").strip()
        if not raw:
            return default
        try:
            return int(raw)
        except ValueError:
            raise ValueError(f"Параметр {key} должен быть целым числом, получено: {raw!r}") from None
class ShinePyBotService:
def __init__(self, config: BotConfig):
    """Set up paths, in-memory state and the (not yet started) worker thread.

    Nothing is read from disk here; ``run()`` loads state, queue and the dedup log.
    """
    self.cfg = config
    self.telegram = TelegramApi(config.telegram_bot_token)

    # Files under the configured data directory.
    data_dir = config.data_dir
    self.queue_file = data_dir / "py_queue.jsonl"
    self.state_file = data_dir / "py_state.json"
    self.processed_updates_file = data_dir / "py_processed_updates.log"
    self.lock_file = data_dir / "py_app.lock"
    self.history_dir = data_dir / "history"
    self.history_archive_dir = self.history_dir / "archive"
    self.max_processed_updates = 5000

    # Synchronization primitives and the background worker.
    self.queue_lock = threading.RLock()
    self.stop_event = threading.Event()
    self.active_process_lock = threading.Lock()
    self.worker = threading.Thread(target=self._worker_loop, name="shine-py-bot-worker", daemon=True)

    # Runtime collections, filled by run().
    self.queue: list[dict[str, Any]] = []
    self.state: dict[str, Any] = {}
    self.processed_updates: list[str] = []

    # Bookkeeping for the job currently executing.
    self.active_job_id: str | None = None
    self.active_job_started_at: float | None = None
    self.active_process: subprocess.Popen[str] | None = None
    self.stop_current_job = False

    # Misc service state.
    self.lock_fd = None
    self.last_heartbeat_at: float = 0.0
    self.restart_requested = False
def _is_owner(self, username: str) -> bool:
    """True when *username* (any case, optional leading ``@``) is the configured owner."""
    owner = self.cfg.allowed_username
    return owner == normalize_username(username)
def _is_allowed_player(self, username: str) -> bool:
    """True when *username* appears in the configured player roster (the owner is not implied)."""
    key = normalize_username(username)
    return key in self.cfg.allowed_players
def _is_allowed_user(self, username: str) -> bool:
    """True when *username* may use the bot at all: the owner or any configured player."""
    return self._is_owner(username) or self._is_allowed_player(username)
def _player_name(self, username: str) -> str:
    """Display name configured for *username*, or the normalized username if none is configured."""
    key = normalize_username(username)
    try:
        return self.cfg.allowed_players[key]
    except KeyError:
        return key
def run(self) -> None:
    """Start the service and long-poll Telegram until ``stop_event`` is set.

    Startup order:
    1. Create directories.
    2. Take the single-instance lock.
    3. Load state, queue and the dedup log.
    4. Requeue jobs left ``active``.
    5. Drop the webhook and initialise the offset.
    6. Start the worker thread.

    The offset is persisted before each update is handled, so an update whose
    handler fails is not redelivered. Such a failure is logged with a traceback
    and polling continues. It no longer brings the whole service down.
    ``shutdown()`` runs whenever the polling loop exits.
    """
    self._ensure_dirs()
    self._acquire_single_instance_lock()
    self._load_state()
    self._load_queue()
    self._load_processed_updates()
    self._recover_active_jobs_after_restart()
    self.telegram.delete_webhook()
    self._init_offset_if_missing()
    self.worker.start()
    self._append_history_event("service_started", {"allowedUsername": self.cfg.allowed_username})
    print(f"[py-bot] Запущен. allowed user: @{self.cfg.allowed_username}", flush=True)
    try:
        while not self.stop_event.is_set():
            try:
                offset = self.state.get("offset")
                updates = self.telegram.get_updates(offset=offset, timeout_sec=25)
            except Exception as e:
                print(f"[py-bot] Ошибка getUpdates: {e}", flush=True)
                # Back off, but wake up immediately if a stop is requested.
                self.stop_event.wait(2)
                continue
            for update in updates:
                update_id = update.get("update_id")
                if isinstance(update_id, int):
                    self.state["offset"] = update_id + 1
                    self._persist_state()
                try:
                    self._handle_update(update)
                except Exception as e:
                    # One malformed/unexpected update must not stop the bot.
                    print(f"[py-bot] Ошибка обработки update {update_id}: {e}", flush=True)
                    traceback.print_exc()
    finally:
        self.shutdown()
def shutdown(self) -> None:
    """Stop the service: signal threads, stop the running Codex process, release the lock.

    Only the first call does the work; later calls return immediately, so
    ``service_stopped`` is logged once. The guard uses its own flag rather than
    ``stop_event``, which may already be set by whoever requested the stop;
    cleanup must still happen in that case. The worker is joined for up to
    10 s, unless shutdown is being called from the worker thread itself.
    """
    if getattr(self, "_shutdown_completed", False):
        return
    self._shutdown_completed = True
    self.stop_event.set()
    self._stop_active_codex_process()
    if self.worker.is_alive() and threading.current_thread() is not self.worker:
        self.worker.join(timeout=10)
    if self.lock_fd is not None:
        try:
            fcntl.flock(self.lock_fd, fcntl.LOCK_UN)
        finally:
            self.lock_fd.close()
            self.lock_fd = None
    self._append_history_event("service_stopped", {})
def _ensure_dirs(self) -> None:
self.cfg.data_dir.mkdir(parents=True, exist_ok=True)
self.history_dir.mkdir(parents=True, exist_ok=True)
self.history_archive_dir.mkdir(parents=True, exist_ok=True)
def _acquire_single_instance_lock(self) -> None:
self.lock_file.parent.mkdir(parents=True, exist_ok=True)
self.lock_fd = self.lock_file.open("a+")
try:
fcntl.flock(self.lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
except BlockingIOError:
raise RuntimeError(f"Уже запущен другой инстанс (lock: {self.lock_file})")
def _load_state(self) -> None:
if self.state_file.exists():
self.state = json.loads(self.state_file.read_text(encoding="utf-8"))
else:
self.state = {}
sessions = self.state.get("user_sessions")
if not isinstance(sessions, dict):
sessions = {}
self.state["user_sessions"] = sessions
user_settings = self.state.get("user_settings")
if not isinstance(user_settings, dict):
user_settings = {}
self.state["user_settings"] = user_settings
if not self.state.get("current_history_file"):
history_file = self._create_new_history_file("initial", self.cfg.allowed_username)
self.state["current_history_file"] = str(history_file)
sessions[self.cfg.allowed_username] = {"current_history_file": str(history_file)}
elif self.cfg.allowed_username not in sessions:
sessions[self.cfg.allowed_username] = {"current_history_file": str(self.state["current_history_file"])}
if not isinstance(self.state.get("next_job_number"), int):
self.state["next_job_number"] = 1
self.state["updated_at"] = now_iso()
self._persist_state()
def _persist_state(self) -> None:
    """Write ``self.state`` to ``state_file`` atomically, refreshing ``updated_at`` first.

    The JSON is written to a temp file next to the target, named with the
    PID and thread id, then flushed and fsynced before being renamed over
    ``state_file``. A crash therefore leaves either the old or the new
    content, never a partially written file. Writers on different threads
    never share (and clobber) one temp path. The temp file is removed if
    anything fails.
    """
    self.state["updated_at"] = now_iso()
    # Serialize before touching the filesystem so a serialization error leaves no temp file.
    payload = json.dumps(self.state, ensure_ascii=False, indent=2)
    target = self.state_file
    tmp = target.with_name(f"{target.stem}.{os.getpid()}.{threading.get_ident()}.tmp")
    try:
        with tmp.open("w", encoding="utf-8") as f:
            f.write(payload)
            f.flush()
            os.fsync(f.fileno())
        tmp.replace(target)
    except BaseException:
        try:
            tmp.unlink()
        except OSError:
            pass  # best-effort cleanup; the original error is re-raised below
        raise
def _load_queue(self) -> None:
    """Replace the in-memory job queue with the records stored in ``queue_file``."""
    loaded = JsonLineStore.load(self.queue_file)
    self.queue = loaded
def _persist_queue(self) -> None:
    """Rewrite ``queue_file`` from the in-memory queue (whole-file replace)."""
    JsonLineStore.save(path=self.queue_file, items=self.queue)
def _load_processed_updates(self) -> None:
if not self.processed_updates_file.exists():
self.processed_updates = []
return
lines = [x.strip() for x in self.processed_updates_file.read_text(encoding="utf-8").splitlines() if x.strip()]
if len(lines) > self.max_processed_updates:
lines = lines[-self.max_processed_updates:]
self.processed_updates_file.write_text("\n".join(lines) + "\n", encoding="utf-8")
self.processed_updates = lines
def _mark_processed_update(self, update_key: str) -> bool:
if update_key in self.processed_updates:
return True
self.processed_updates.append(update_key)
if len(self.processed_updates) > self.max_processed_updates:
self.processed_updates = self.processed_updates[-self.max_processed_updates:]
self.processed_updates_file.write_text("\n".join(self.processed_updates) + "\n", encoding="utf-8")
else:
with self.processed_updates_file.open("a", encoding="utf-8") as f:
f.write(update_key + "\n")
return False
def _recover_active_jobs_after_restart(self) -> None:
    """Return jobs left ``active`` by a previous run to ``pending`` and log their ids.

    The queue is persisted, and an ``active_jobs_recovered`` event written,
    only when at least one job was recovered.
    """
    stale = [job for job in self.queue if job.get("status") == "active"]
    for job in stale:
        job["status"] = "pending"
        job["retry_reason"] = "service_restart_recovery"
        job["updated_at"] = now_iso()
    if not stale:
        return
    self._persist_queue()
    self._append_history_event("active_jobs_recovered", {"jobIds": [job.get("id", "") for job in stale]})
def _init_offset_if_missing(self) -> None:
if self.state.get("offset") is not None:
return
try:
updates = self.telegram.get_updates(offset=None, timeout_sec=0)
if updates:
self.state["offset"] = int(updates[-1]["update_id"]) + 1
else:
self.state["offset"] = 0
self._persist_state()
except Exception as e:
print(f"[py-bot] Не удалось инициализировать offset: {e}", flush=True)
self.state["offset"] = 0
self._persist_state()
def _current_history_file(self) -> Path:
return self._current_history_file_for_user(self.cfg.allowed_username)
def _history_dirs_for_user(self, username: str) -> tuple[Path, Path]:
    """Return ``(history/<user>, history/<user>/archive)``, creating both directories.

    A username that normalizes to ``""`` maps to the ``unknown`` folder.
    """
    user_dir = self.history_dir / (normalize_username(username) or "unknown")
    archive = user_dir / "archive"
    for folder in (user_dir, archive):
        folder.mkdir(parents=True, exist_ok=True)
    return user_dir, archive
def _ensure_user_session(self, username: str) -> None:
    """Make sure *username* (default: the owner) has settings and a session with a history file.

    When the session is missing, a new history file is created and the state
    is persisted. For the owner, the top-level ``current_history_file`` is
    updated as well.
    """
    uname = normalize_username(username) or self.cfg.allowed_username
    sessions = self.state.setdefault("user_sessions", {})
    if not isinstance(sessions, dict):
        sessions = self.state["user_sessions"] = {}
    self._user_settings(uname)
    existing = sessions.get(uname)
    if isinstance(existing, dict) and existing.get("current_history_file"):
        return
    path_str = str(self._create_new_history_file("initial", uname))
    sessions[uname] = {"current_history_file": path_str}
    if uname == self.cfg.allowed_username:
        self.state["current_history_file"] = path_str
    self._persist_state()
def _current_history_file_for_user(self, username: str) -> Path:
    """Path of the active history file for *username* (default: the owner), creating the session if needed."""
    uname = normalize_username(username) or self.cfg.allowed_username
    self._ensure_user_session(uname)
    session = (self.state.get("user_sessions") or {}).get(uname) or {}
    return Path(session["current_history_file"])
def _create_new_history_file(self, reason: str, username: str) -> Path:
    """Create ``history/<user>/<local timestamp>_<8 hex chars>.jsonl`` and return its path.

    The file is seeded with a ``history_created`` record holding *reason*
    and the normalized username.
    """
    stamp = dt.datetime.now().strftime("%Y-%m-%d_%H%M%S")
    suffix = "".join(random.choices(string.hexdigits.lower(), k=8))
    user_dir = self._history_dirs_for_user(username)[0]
    path = user_dir / f"{stamp}_{suffix}.jsonl"
    header = {
        "ts": now_iso(),
        "type": "history_created",
        "reason": reason,
        "username": normalize_username(username),
    }
    JsonLineStore.append(path, header)
    return path
def _rotate_history(self, reason: str, username: str) -> Path:
    """Archive the user's current history file, start a new one and return the archive path.

    If the current file does not exist, nothing is moved and the returned
    path is the placeholder ``archive/(empty)``. The session (and, for the
    owner, the top-level ``current_history_file``) is pointed at the new file
    and state is persisted. A ``history_rotated`` event is then logged for
    the user.
    """
    uname = normalize_username(username) or self.cfg.allowed_username
    current = self._current_history_file_for_user(uname)
    archive_dir = self._history_dirs_for_user(uname)[1]
    if not current.exists():
        archived = archive_dir / "(empty)"
    else:
        archived = archive_dir / current.name
        current.replace(archived)
    fresh = str(self._create_new_history_file(reason, uname))
    sessions = self.state.setdefault("user_sessions", {})
    if not isinstance(sessions, dict):
        sessions = self.state["user_sessions"] = {}
    sessions[uname] = {"current_history_file": fresh}
    if uname == self.cfg.allowed_username:
        self.state["current_history_file"] = fresh
    self._persist_state()
    self._append_history_event(
        "history_rotated",
        {"reason": reason, "username": uname, "archived": str(archived)},
        username=uname,
    )
    return archived
def _user_settings(self, username: str) -> dict[str, Any]:
    """Return the mutable settings dict for *username* (default: the owner), creating it if needed.

    ``voice_replies_enabled`` is forced to a bool (False unless already a
    bool). This method does not persist state.
    """
    uname = normalize_username(username) or self.cfg.allowed_username
    all_settings = self.state.get("user_settings")
    if not isinstance(all_settings, dict):
        all_settings = self.state["user_settings"] = {}
    mine = all_settings.get(uname)
    if not isinstance(mine, dict):
        mine = all_settings[uname] = {}
    if not isinstance(mine.get("voice_replies_enabled"), bool):
        mine["voice_replies_enabled"] = False
    return mine
def _voice_replies_enabled(self, username: str) -> bool:
return bool(self._user_settings(username).get("voice_replies_enabled"))
def _set_voice_replies_enabled(self, username: str, enabled: bool) -> None:
self._user_settings(username)["voice_replies_enabled"] = enabled
self._persist_state()
def _append_history(self, history_path: Path, event_type: str, payload: dict[str, Any]) -> None:
    """Append one record to *history_path*.

    The record starts with ``ts`` and ``type``, followed by *payload*. Payload
    keys override those two on collision.
    """
    record = {"ts": now_iso(), "type": event_type, **payload}
    JsonLineStore.append(history_path, record)
def _append_history_event(self, event_type: str, payload: dict[str, Any], username: str | None = None) -> None:
history_path = self._current_history_file_for_user(username or self.cfg.allowed_username)
self._append_history(history_path, "system_event", {"event": event_type, **payload})
def _send_player_welcome_once(self, chat_id: int, message_id: int, username: str) -> None:
    """Greet a player the first time they write.

    The send time is recorded in ``state["player_welcome_sent"]`` (keyed by
    normalized username) and persisted, so each player is greeted only once.
    """
    uname = normalize_username(username)
    sent = self.state.get("player_welcome_sent")
    if not isinstance(sent, dict):
        sent = self.state["player_welcome_sent"] = {}
    if sent.get(uname):
        return
    greeting = (
        f"Привет, {self._player_name(uname)}.\n"
        "Можно задавать вопросы по проекту, просить анализ, идеи и подготовку готового ТЗ.\n"
        "Команда /new начинает новую сессию и архивирует текущую историю."
    )
    self._safe_send(chat_id, greeting, reply_to=message_id)
    sent[uname] = now_iso()
    self._persist_state()
def _resolve_chat_id(self, chat_id: int) -> int:
migrations = self.state.get("chat_id_migrations")
if not isinstance(migrations, dict):
return chat_id
current = chat_id
visited: set[int] = set()
while current not in visited:
visited.add(current)
next_chat_id = migrations.get(str(current))
if not isinstance(next_chat_id, int):
break
current = next_chat_id
return current
def _remember_chat_migration(self, old_chat_id: int, new_chat_id: int, source: str) -> None:
    """Record that *old_chat_id* became *new_chat_id*, retarget queued jobs and log the change.

    Nothing happens when the ids are equal or the mapping is already stored.
    The state is persisted immediately. The queue is persisted only if some
    job pointed at the old chat.
    """
    if old_chat_id == new_chat_id:
        return
    migrations = self.state.get("chat_id_migrations")
    if not isinstance(migrations, dict):
        migrations = self.state["chat_id_migrations"] = {}
    key = str(old_chat_id)
    if migrations.get(key) == new_chat_id:
        return
    migrations[key] = new_chat_id
    self._persist_state()
    with self.queue_lock:
        affected = [job for job in self.queue if job.get("chat_id") == old_chat_id]
        for job in affected:
            job["chat_id"] = new_chat_id
            job["updated_at"] = now_iso()
        if affected:
            self._persist_queue()
    self._append_history_event("chat_migrated_to_supergroup", {
        "oldChatId": old_chat_id,
        "newChatId": new_chat_id,
        "source": source,
    })
@staticmethod
def _extract_migrate_to_chat_id(error_text: str) -> int | None:
match = re.search(r'"migrate_to_chat_id"\s*:\s*(-?\d+)', error_text)
if match:
return int(match.group(1))
match = re.search(r"'migrate_to_chat_id'\s*:\s*(-?\d+)", error_text)
if match:
return int(match.group(1))
return None
def _handle_update(self, update: dict[str, Any]) -> None:
    """Route one Telegram update: filter, deduplicate, authorize, then enqueue work or run a command.

    The checks run in a deliberate order:

    1. Only ``message`` and ``channel_post`` updates are considered.
    2. A chat-migration notice (``migrate_to_chat_id``) is recorded and nothing else is done.
    3. ``"<chat_id>:<message_id>"`` is deduplicated *before* the access checks,
       so rejected messages are marked as processed too.
    4. Channel posts are dropped unless they come from ``allowed_channel_username``
       (when that setting is non-empty).
    5. Service messages are ignored: members joined/left, chat created,
       title/photo changed, message pinned.
    6. Users who are neither the owner nor a player get a refusal in private chats
       and are ignored elsewhere; players are only served in private chats.
    7. Messages without text/caption become voice jobs (voice/audio), ``/...`` text
       goes to ``_handle_command``, and any other text is logged and queued as a
       ``text`` job, acknowledged with its job number.
    """
    message = update.get("message")
    update_type = "message"
    if not isinstance(message, dict):
        message = update.get("channel_post")
        update_type = "channel_post"
    if not isinstance(message, dict):
        return
    chat = message.get("chat") or {}
    chat_id = chat.get("id")
    message_id = message.get("message_id")
    chat_type = str(chat.get("type") or "")
    chat_username = normalize_username(chat.get("username"))
    chat_title = str(chat.get("title") or "")
    sender = message.get("from") or {}
    username = normalize_username(sender.get("username"))
    author_signature = str(message.get("author_signature") or "").strip()
    # Without a sender username (e.g. channel posts), the post signature serves as identity.
    author_username = username or normalize_username(author_signature)
    if not isinstance(chat_id, int) or not isinstance(message_id, int):
        return
    migrate_to_chat_id = message.get("migrate_to_chat_id")
    if isinstance(migrate_to_chat_id, int):
        self._remember_chat_migration(chat_id, migrate_to_chat_id, "telegram_message")
        return
    update_key = f"{chat_id}:{message_id}"
    if self._mark_processed_update(update_key):
        return
    is_channel_post = update_type == "channel_post" or chat_type == "channel"
    is_allowed_channel = (
        not is_channel_post
        or not self.cfg.allowed_channel_username
        or chat_username == self.cfg.allowed_channel_username
    )
    if is_channel_post and not is_allowed_channel:
        return
    if chat_username and chat_username == self.cfg.allowed_channel_username:
        self._remember_public_report_chat(chat_id)
    # Ignore service messages: members joining/leaving, chat creation,
    # title/photo changes and pin notices. These carry no text, so they
    # would otherwise fall into the "unsupported content" reply below.
    if message.get("new_chat_members") or message.get("left_chat_member"):
        return
    if message.get("group_chat_created") or message.get("supergroup_chat_created") or message.get("channel_chat_created"):
        return
    if (
        message.get("new_chat_title")
        or message.get("new_chat_photo")
        or message.get("delete_chat_photo")
        or message.get("pinned_message")
    ):
        return
    # A caption on media counts as text, so captioned voice/audio is treated as a text job.
    text = (message.get("text") or message.get("caption") or "").strip()
    actor_username = normalize_username(author_username)
    is_allowed = self._is_allowed_user(actor_username)
    is_private = chat_type == "private"
    if not is_allowed:
        if is_private:
            self._safe_send(chat_id, "Извините, доступ к этому агенту пока не выдан. Обратитесь к Айдару.", reply_to=message_id)
        return
    if self._is_allowed_player(actor_username) and not is_private:
        return
    self._ensure_user_session(actor_username)
    history_path = self._current_history_file_for_user(actor_username)
    if self._is_allowed_player(actor_username):
        self._send_player_welcome_once(chat_id, message_id, actor_username)
    if not text:
        if message.get("voice"):
            self._enqueue_voice_job(
                chat_id,
                message_id,
                actor_username,
                message["voice"].get("file_id"),
                media_type="voice",
                update_type=update_type,
                chat_username=chat_username,
                chat_title=chat_title,
                author_signature=author_signature,
                chat_type=chat_type,
            )
            return
        if message.get("audio"):
            self._enqueue_voice_job(
                chat_id,
                message_id,
                actor_username,
                message["audio"].get("file_id"),
                media_type="audio",
                update_type=update_type,
                chat_username=chat_username,
                chat_title=chat_title,
                author_signature=author_signature,
                chat_type=chat_type,
            )
            return
        self._safe_send(chat_id, "Поддерживаются текст, voice и audio.", reply_to=message_id)
        return
    if text.startswith("/"):
        self._handle_command(chat_id, message_id, actor_username, text)
        return
    self._append_history(history_path, "incoming_text", {
        "chatId": chat_id,
        "messageId": message_id,
        "updateType": update_type,
        "chatType": chat_type,
        "chatUsername": chat_username,
        "chatTitle": chat_title,
        "username": actor_username,
        "authorSignature": author_signature,
        "text": text,
    })
    job = self._build_job_base(chat_id, message_id, actor_username, str(history_path))
    job["type"] = "text"
    job["text"] = text
    job["update_type"] = update_type
    job["chat_type"] = chat_type
    job["chat_username"] = chat_username
    job["chat_title"] = chat_title
    job["author_signature"] = author_signature
    job["role"] = "owner" if self._is_owner(actor_username) else "player"
    job["player_name"] = self._player_name(actor_username) if job["role"] == "player" else ""
    with self.queue_lock:
        self.queue.append(job)
        self._persist_queue()
    self._safe_send(chat_id, f"Принял задачу #{job['num']}", reply_to=message_id)
def _enqueue_voice_job(
self,
chat_id: int,
message_id: int,
username: str,
file_id: str | None,
*,
media_type: str = "voice",
update_type: str = "message",
chat_username: str = "",
chat_title: str = "",
author_signature: str = "",
chat_type: str = "",
) -> None:
if not file_id:
self._safe_send(chat_id, "Не удалось прочитать file_id голосового.", reply_to=message_id)
return
history_path = self._current_history_file_for_user(username)
self._append_history(history_path, "incoming_voice", {
"chatId": chat_id,
"messageId": message_id,
"updateType": update_type,
"chatType": chat_type,
"chatUsername": chat_username,
"chatTitle": chat_title,
"username": username,
"authorSignature": author_signature,
"fileId": file_id,
"mediaType": media_type,
})
job = self._build_job_base(chat_id, message_id, username, str(history_path))
job["type"] = "voice"
job["telegram_file_id"] = file_id
job["telegram_media_type"] = media_type
job["update_type"] = update_type
job["chat_type"] = chat_type
job["chat_username"] = chat_username
job["chat_title"] = chat_title
job["author_signature"] = author_signature
job["role"] = "owner" if self._is_owner(username) else "player"
job["player_name"] = self._player_name(username) if job["role"] == "player" else ""
with self.queue_lock:
self.queue.append(job)
self._persist_queue()
self._safe_send(chat_id, f"Принял voice в задачу #{job['num']}", reply_to=message_id)
def _build_job_base(self, chat_id: int, message_id: int, username: str, history_file: str) -> dict[str, Any]:
    """Reserve the next job number and return a new ``pending`` job record.

    The job counter in state is incremented and persisted under
    ``queue_lock``. Every job field is present with a neutral default, which
    callers then override.
    """
    with self.queue_lock:
        number = int(self.state.get("next_job_number", 1))
        self.state["next_job_number"] = number + 1
        self._persist_state()
    job: dict[str, Any] = dict(
        id=str(uuid.uuid4()),
        num=number,
        status="pending",
        type="text",
        chat_id=chat_id,
        message_id=message_id,
        username=username,
        update_type="message",
        chat_type="",
        chat_username="",
        chat_title="",
        author_signature="",
        role="owner",
        player_name="",
        text="",
        telegram_file_id="",
        telegram_media_type="",
        history_file=history_file,
        attempts=0,
        retry_reason="",
        last_error="",
        created_at=now_iso(),
        updated_at=now_iso(),
        active_since=None,
    )
    return job
def _handle_command(self, chat_id: int, message_id: int, username: str, text: str) -> None:
lower = text.lower()
command = lower.split(maxsplit=1)[0].split("@", 1)[0]
is_owner = self._is_owner(username)
if command in ("/start", "/help"):
self._safe_send(chat_id, self._help_text(is_owner=is_owner), reply_to=message_id)
return
if command == "/status":
self._safe_send(chat_id, self._status_text(), reply_to=message_id)
return
if command == "/queue":
self._safe_send(chat_id, self._queue_text(), reply_to=message_id)
return
if command == "/voice_on":
self._set_voice_replies_enabled(username, True)
self._append_history_event("voice_replies_enabled", {"username": normalize_username(username)}, username=username)
self._safe_send(chat_id, "Озвучивание финальных ответов включено для вашего пользователя.", reply_to=message_id)
return
if command == "/voice_off":
self._set_voice_replies_enabled(username, False)
self._append_history_event("voice_replies_disabled", {"username": normalize_username(username)}, username=username)
self._safe_send(chat_id, "Озвучивание финальных ответов выключено для вашего пользователя.", reply_to=message_id)
return
if command == "/voice_status":
status = "включено" if self._voice_replies_enabled(username) else "выключено"
self._safe_send(chat_id, f"Озвучивание финальных ответов: {status}.", reply_to=message_id)
return
if command == "/new":
archived = self._rotate_history("command_new", username)
self._safe_send(chat_id, f"История очищена. Новый диалог начат.\nАрхив: {archived.name}", reply_to=message_id)
return
if command in ("/restart_service", "/restart"):
if not is_owner:
self._safe_send(chat_id, "Команда недоступна.", reply_to=message_id)
return
self._append_history_event("restart_service_requested", {
"chatId": chat_id,
"messageId": message_id,
"username": username,
}, username=username)
self._safe_send(
chat_id,
"Перезапускаю сервис. Если задача была активна, после старта она вернётся в очередь и продолжится.",
reply_to=message_id,
)
self._schedule_self_restart()
return
if command == "/stop":
stopped = self._cancel_active_job("stopped_by_user")
if stopped:
self._safe_send(chat_id, "Текущая задача остановлена и удалена из очереди.", reply_to=message_id)
else:
self._safe_send(chat_id, "Сейчас нет активной задачи.", reply_to=message_id)
return
if command == "/cancel":
parts = text.split(maxsplit=1)
if len(parts) < 2:
self._safe_send(chat_id, "Использование: /cancel <id|all>", reply_to=message_id)
return
arg = parts[1].strip()
if arg.lower() == "all":
with self.queue_lock:
self.stop_current_job = True
self._stop_active_codex_process()
count = len(self.queue)
self.queue = []
self._persist_queue()
self._safe_send(chat_id, f"Удалено задач из очереди: {count}", reply_to=message_id)
return
cancelled = self._cancel_by_id_prefix(arg)
self._safe_send(chat_id, f"Задача удалена: {arg}" if cancelled else f"Задача не найдена: {arg}", reply_to=message_id)
return
def _help_text(self, *, is_owner: bool) -> str:
lines = [
"Доступные команды:",
"/status — активная задача и размер очереди",
"/queue — список задач в очереди",
"/stop — остановить текущую задачу",
"/cancel <id|all> — удалить задачу по id (префикс) или все",
"/new — архивировать историю и начать новую",
"/voice_on — включить озвучивание финальных ответов",
"/voice_off — выключить озвучивание финальных ответов",
"/voice_status — показать состояние озвучивания",
"/help — эта справка",
]
if is_owner:
lines.insert(-1, "/restart_service — перезапустить сервис через systemd")
return "\n".join(lines)
def _status_text(self) -> str:
with self.queue_lock:
active = next((j for j in self.queue if j.get("status") == "active"), None)
pending = sum(1 for j in self.queue if j.get("status") == "pending")
if not active:
return f"Статус: активной задачи нет.\nВ очереди pending: {pending}"
elapsed = int(time.time() - (self.active_job_started_at or time.time()))
return (
f"Статус: активная задача #{active.get('num', '?')}\n"
f"Тип: {active.get('type', 'text')}\n"
f"Попытка: {int(active.get('attempts', 0)) + 1}/{self.cfg.max_retries}\n"
f"Выполняется: {elapsed}с\n"
f"Pending: {pending}"
)
def _queue_text(self) -> str:
with self.queue_lock:
items = list(self.queue)
if not items:
return "Очередь пуста."
lines = [f"Очередь: {len(items)}"]
for i, job in enumerate(items[:10], start=1):
lines.append(
f"{i}) #{job.get('num', '?')} [{job.get('status')}] {job.get('type')} attempts={job.get('attempts', 0)}"
)
if len(items) > 10:
lines.append(f"...и ещё {len(items) - 10} задач")
return "\n".join(lines)
def _cancel_active_job(self, reason: str) -> bool:
with self.queue_lock:
active = next((j for j in self.queue if j.get("status") == "active"), None)
if not active:
return False
self.stop_current_job = True
self._stop_active_codex_process()
self.queue = [j for j in self.queue if j.get("id") != active.get("id")]
self._persist_queue()
self._append_history_event("job_stopped_by_user", {"jobId": active.get("id"), "reason": reason})
return True
def _cancel_by_id_prefix(self, prefix: str) -> bool:
prefix = prefix.strip().lower()
normalized_num = prefix.lstrip("#")
with self.queue_lock:
target = next(
(
j for j in self.queue
if str(j.get("id", "")).lower().startswith(prefix)
or str(j.get("num", "")).lower() == normalized_num
),
None
)
if not target:
return False
if target.get("status") == "active":
self.stop_current_job = True
self._stop_active_codex_process()
self.queue = [j for j in self.queue if j.get("id") != target.get("id")]
self._persist_queue()
return True
def _worker_loop(self) -> None:
    """Worker thread body: claim the first pending job and process it, until stopped.

    The per-job stop flag is reset under ``queue_lock`` in the same critical
    section that marks the job active. The cancel paths set the flag while
    holding ``queue_lock``, so a /stop or /cancel arriving right after the
    claim is no longer overwritten by a late reset. An unexpected exception
    escaping ``_process_job`` is logged, and the job is dropped so it does not
    stay "active" forever. The worker thread itself keeps running.
    """
    while not self.stop_event.is_set():
        job = None
        with self.queue_lock:
            for item in self.queue:
                if item.get("status") == "pending":
                    item["status"] = "active"
                    item["active_since"] = now_iso()
                    item["updated_at"] = now_iso()
                    self.active_job_id = item.get("id")
                    self.active_job_started_at = time.time()
                    self.stop_current_job = False
                    job = dict(item)
                    self._persist_queue()
                    break
        if not job:
            time.sleep(0.5)
            continue
        try:
            self._process_job(job)
        except Exception as e:
            # _process_job handles job errors itself; reaching here means
            # something unexpected (e.g. malformed job fields before its try).
            print(f"[py-bot] worker error job={str(job.get('id'))[:8]}: {e}", flush=True)
            print(traceback.format_exc(), flush=True)
            self._mark_job_removed(job.get("id"))
        finally:
            self.active_job_id = None
            self.active_job_started_at = None
def _process_job(self, job: dict[str, Any]) -> None:
    """Execute one claimed job and report progress to its Telegram chat.

    Steps: announce the start, transcribe the audio (voice jobs only), run
    Codex, and send the answer. An exception in these steps is handled here:
    if the user stopped the job, it is dropped with a "stopped" notice;
    otherwise it goes to ``_handle_job_failure`` (retry or give up).

    Once the answer has been sent, the follow-up steps (voice reply, history
    record, public report) are best-effort. A failure there is logged and does
    not reach the retry path. Routing it there would re-run Codex and send the
    user the same answer twice.
    """
    job_id = job["id"]
    job_num = job.get("num", "?")
    chat_id = int(job["chat_id"])
    message_id = int(job["message_id"])
    history_path = Path(job["history_file"])
    self._safe_send(chat_id, f"Задача #{job_num} в работе.", reply_to=message_id)
    try:
        if job.get("type") == "voice":
            self._safe_send(chat_id, f"#{job_num}: распознаю голосовое...", reply_to=message_id)
            recognized = self._transcribe_voice_job(job)
            job["text"] = recognized
            self._append_history(history_path, "voice_transcription", {"jobId": job_id, "jobNum": job_num, "text": recognized})
            preview = recognized.strip()
            if len(preview) > 1200:
                preview = preview[:1200] + " ...[обрезано]"
            self._safe_send(chat_id, f"#{job_num}: распознано:\n{preview}", reply_to=message_id)
            self._safe_send(chat_id, f"#{job_num}: распознано, отправляю в Codex.", reply_to=message_id)
        # A /stop during transcription has no Codex process to kill; honour the
        # flag here instead of starting Codex for a job the user already stopped.
        if self.stop_current_job:
            raise RuntimeError("job stopped before Codex start")
        prompt = self._build_prompt(job)
        self._append_history(history_path, "codex_request", {"jobId": job_id, "prompt": prompt})
        answer = self._run_codex(prompt, chat_id, message_id, job_id, job_num)
        for chunk in split_long_text(answer):
            self._safe_send(chat_id, chunk, reply_to=message_id)
    except Exception as e:
        if self.stop_current_job:
            self._append_history(history_path, "job_stopped", {"jobId": job_id, "reason": str(e)})
            self._safe_send(chat_id, f"Задача #{job_num} остановлена.", reply_to=message_id)
            self._mark_job_removed(job_id)
            self.stop_current_job = False
            return
        if isinstance(e, VoiceTranscriptionError):
            self._append_history(history_path, "voice_transcription_failed", {
                "jobId": job_id,
                "jobNum": job_num,
                "stage": e.stage,
                "retryable": e.retryable,
                "error": e.user_message,
                "detail": e.detail,
            })
        self._handle_job_failure(job, e)
        return

    def best_effort(step: str, action: Any) -> None:
        # Post-delivery step: log failures instead of re-queueing the job.
        try:
            action()
        except Exception as exc:
            print(f"[py-bot] {step} failed job={str(job_id)[:8]}: {exc}", flush=True)
            print(traceback.format_exc(), flush=True)

    def voice_reply() -> None:
        if self._voice_replies_enabled(job.get("username") or ""):
            self._send_voice_reply_for_answer(chat_id, message_id, job_num, answer, history_path, job_id)

    best_effort("voice reply", voice_reply)
    self._safe_send(chat_id, f"Готово #{job_num}.", reply_to=message_id)
    best_effort(
        "history write",
        lambda: self._append_history(history_path, "codex_response", {"jobId": job_id, "text": answer}),
    )
    self._mark_job_done(job_id)
    best_effort("public report", lambda: self._send_private_job_public_report(job, answer))
def _build_prompt(self, job: dict[str, Any]) -> str:
retry_block = ""
retry_reason = (job.get("retry_reason") or "").strip()
if retry_reason:
retry_block = f"\n\nПометка retry: {retry_reason}"
role = (job.get("role") or "owner").strip().lower()
player_block = ""
if role == "player":
player_name = (job.get("player_name") or "").strip()
player_dir = self.cfg.codex_workdir / "Players" / (job.get("username") or "")
player_block = (
"\n\nРежим игрока (обязательно):\n"
f"- Пользователь: {player_name} (@{job.get('username')}).\n"
f"- Рабочая папка игрока: {player_dir}\n"
"- Код проекта не изменять.\n"
"- Можно отвечать на вопросы по проекту, предлагать идеи и готовить ТЗ.\n"
"- Если нужны правки кода, описывать предложение текстом и сохранять материалы только в папке игрока."
)
return (
"Пришло сообщение в Telegram.\n"
f"Тип: {job.get('type')}\n"
f"Источник Telegram: {job.get('update_type', 'message')}\n"
f"Тип чата: {job.get('chat_type') or ''}\n"
f"Канал/чат: @{job.get('chat_username') or ''} {job.get('chat_title') or ''}\n"
f"Username отправителя: @{job.get('username')}\n"
f"Подпись автора в Telegram: {job.get('author_signature') or ''}\n"
"Текст для обработки:\n"
f"{job.get('text')}\n\n"
f"История диалога (JSONL): {job.get('history_file')}\n"
f"Инструкции агента: {self.cfg.agent_instructions_file}\n"
f"Работай в рабочем проекте аккуратно и верни только текст ответа пользователю.{player_block}{retry_block}"
)
def _run_codex(self, prompt: str, chat_id: int, message_id: int, job_id: str, job_num: Any) -> str:
    """Run ``codex exec`` for one job and return the final answer text.

    The child process is published in ``self.active_process`` so /stop can
    terminate it. Its stdout is read on a background thread. New agent
    messages are forwarded to the chat, at most one every 8 s. If nothing has
    been sent to the chat for 120 s, a "still running" note goes out. The
    answer comes from the ``-o`` output file, with the last plain-text output
    line as a fallback.

    The temporary ``-o`` file is removed on every exit path, including timeout
    and non-zero exit. A child killed on timeout is reaped before the error is
    raised.

    Raises:
        RuntimeError: on timeout, a non-zero exit code, or an empty answer.
    """
    output_lines: list[str] = []
    with tempfile.NamedTemporaryFile(prefix="shine-codex-last-message-", suffix=".txt", delete=False) as tmp:
        output_file = Path(tmp.name)
    try:
        cmd = [
            str(self.cfg.codex_bin),
            "exec",
            "--dangerously-bypass-approvals-and-sandbox",
            "--json",
            "-C", str(self.cfg.codex_workdir),
            "-o", str(output_file),
            prompt,
        ]
        print(f"[py-bot] codex exec start job={job_id[:8]}", flush=True)
        process = subprocess.Popen(
            cmd,
            stdin=subprocess.DEVNULL,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            encoding="utf-8",
            errors="replace",
            bufsize=1,
        )
        with self.active_process_lock:
            self.active_process = process
        self.last_heartbeat_at = 0.0
        last_user_note = ""
        last_user_note_at = 0.0
        codex_started_at = time.time()
        last_job_message_at = codex_started_at

        def on_line(line: str) -> None:
            # Runs on the reader thread: keep every line (for the error tail and
            # fallback) and forward new agent notes, throttled to one per 8 s.
            nonlocal last_user_note, last_user_note_at, last_job_message_at
            output_lines.append(line)
            note = self._extract_codex_user_note(line)
            now = time.time()
            if note and note != last_user_note and now - last_user_note_at > 8:
                self._safe_send(chat_id, f"#{job_num}: {note}", reply_to=message_id)
                last_user_note = note
                last_user_note_at = now
                last_job_message_at = now

        reader_done = threading.Event()

        def reader() -> None:
            # Drain stdout continuously so the child never blocks on a full pipe.
            if not process.stdout:
                reader_done.set()
                return
            for line in process.stdout:
                on_line(line.rstrip("\n"))
            reader_done.set()

        t = threading.Thread(target=reader, name=f"codex-reader-{job_id[:8]}", daemon=True)
        t.start()
        return_code = None
        try:
            deadline = time.time() + self.cfg.codex_timeout_seconds
            while return_code is None:
                return_code = process.poll()
                now = time.time()
                if return_code is not None:
                    break
                if now >= deadline:
                    process.kill()
                    # Reap the killed child so it does not linger as a zombie.
                    try:
                        process.wait(timeout=5)
                    except subprocess.TimeoutExpired:
                        pass
                    t.join(timeout=2)
                    raise RuntimeError(f"Codex timeout after {self.cfg.codex_timeout_seconds}s")
                if now - codex_started_at >= 120 and now - last_job_message_at >= 120:
                    elapsed = self._format_duration(int(now - codex_started_at))
                    self._safe_send(
                        chat_id,
                        f"#{job_num}: задача ещё выполняется, работает уже {elapsed}. От Codex давно нет сообщений.",
                        reply_to=message_id,
                    )
                    last_job_message_at = now
                self.last_heartbeat_at = now
                time.sleep(1)
        finally:
            with self.active_process_lock:
                self.active_process = None
            reader_done.wait(timeout=2)
        if return_code != 0:
            tail = "\n".join(output_lines[-40:])
            raise RuntimeError(f"Codex exited with code {return_code}. Output tail:\n{tail}")
        answer = output_file.read_text(encoding="utf-8").strip() if output_file.exists() else ""
        if answer:
            return answer
        fallback = self._extract_fallback_message(output_lines)
        if not fallback:
            raise RuntimeError("Codex returned empty response")
        return fallback
    finally:
        # Always clean up the delete=False temp file.
        try:
            output_file.unlink(missing_ok=True)
        except OSError:
            pass
def _stop_active_codex_process(self) -> bool:
with self.active_process_lock:
process = self.active_process
if process is None:
return False
if process.poll() is not None:
return False
process.terminate()
try:
process.wait(timeout=2)
except subprocess.TimeoutExpired:
process.kill()
return True
def _handle_job_failure(self, job: dict[str, Any], err: Exception) -> None:
    """Record a failed attempt, then re-queue the job or drop it, and notify the chat.

    Voice-transcription errors carry their own ``retryable`` flag; every other
    error is treated as retryable. A retryable job goes back to ``pending``
    until ``cfg.max_retries`` attempts are used up.

    Must be called from inside an ``except`` block: it prints
    ``traceback.format_exc()``. If the job is no longer queued (e.g. it was
    cancelled), nothing is updated or sent.
    """
    job_id = job["id"]
    job_num = job.get("num", "?")
    chat_id = int(job["chat_id"])
    message_id = int(job["message_id"])
    voice_error = isinstance(err, VoiceTranscriptionError)
    error_text = str(err).strip() or err.__class__.__name__
    user_error_text = self._user_error_text(err)
    retryable = err.retryable if voice_error else True
    log_error_text = err.log_text() if voice_error else error_text
    print(f"[py-bot] Ошибка job={job_id[:8]}: {log_error_text}", flush=True)
    print(traceback.format_exc(), flush=True)
    with self.queue_lock:
        target = next((entry for entry in self.queue if entry.get("id") == job_id), None)
        if target is None:
            return
        attempts = int(target.get("attempts", 0)) + 1
        target["attempts"] = attempts
        target["last_error"] = error_text[:1000]
        target["updated_at"] = now_iso()
        will_retry = bool(retryable) and attempts < self.cfg.max_retries
        if will_retry:
            target["status"] = "pending"
            target["retry_reason"] = error_text[:200]
        else:
            self.queue = [entry for entry in self.queue if entry.get("id") != job_id]
        self._persist_queue()
    if will_retry:
        notice = f"{user_error_text}\nПовторю задачу #{job_num}: попытка {attempts + 1}/{self.cfg.max_retries}."
    else:
        notice = f"{user_error_text}\nЗадача #{job_num} остановлена."
    self._safe_send(chat_id, notice, reply_to=message_id)
def _user_error_text(self, err: Exception) -> str:
    """Turn an exception into the user-facing error line sent to Telegram."""
    if not isinstance(err, VoiceTranscriptionError):
        description = str(err).strip() or type(err).__name__
        return f"Ошибка выполнения задачи: {description}"
    return f"Не удалось распознать голосовое: {err.user_message}"
def _mark_job_done(self, job_id: str) -> None:
with self.queue_lock:
self.queue = [j for j in self.queue if j.get("id") != job_id]
self._persist_queue()
def _mark_job_removed(self, job_id: str) -> None:
with self.queue_lock:
self.queue = [j for j in self.queue if j.get("id") != job_id]
self._persist_queue()
def _remember_public_report_chat(self, chat_id: int) -> None:
if self.state.get("public_report_chat_id") == chat_id:
return
self.state["public_report_chat_id"] = chat_id
self.state["updated_at"] = now_iso()
self._persist_state()
def _public_report_chat_id(self) -> int | str | None:
chat_id = self.state.get("public_report_chat_id")
if isinstance(chat_id, int):
return self._resolve_chat_id(chat_id)
if self.cfg.allowed_channel_username:
return f"@{self.cfg.allowed_channel_username}"
return None
def _send_private_job_public_report(self, job: dict[str, Any], answer: str) -> None:
if job.get("chat_type") != "private":
return
report_chat_id = self._public_report_chat_id()
if report_chat_id is None:
return
job_num = job.get("num", "?")
source_text = (job.get("text") or "").strip()
if not source_text:
source_text = "(пустой текст запроса)"
role = (job.get("role") or "owner").strip().lower()
author_label = "Айдар"
if role == "player":
player_name = (job.get("player_name") or "").strip() or job.get("username") or "Игрок"
author_label = f"{player_name} (@{job.get('username')})"
if job.get("type") == "voice":
voice_file_id = (job.get("telegram_file_id") or "").strip()
media_type = (job.get("telegram_media_type") or "voice").strip()
request_caption = self._trim_telegram_caption(
f"{author_label} сделал {media_type}-запрос, задача #{job_num}.\n\n"
f"Распознанный текст:\n{source_text}"
)
request_message_id = None
if voice_file_id:
request_message_id = self._safe_send_telegram_file(
report_chat_id,
voice_file_id,
media_type=media_type,
caption=request_caption,
)
if request_message_id is None:
request_message_id = self._safe_send(report_chat_id, request_caption)
else:
request_report = (
f"{author_label} сделал запрос, задача #{job_num}.\n\n"
f"{source_text}"
)
request_message_id = self._safe_send(report_chat_id, request_report)
if request_message_id is None:
return
answer_text = (answer or "").strip() or "(пустой ответ)"
answer_chunks = split_long_text(f"Ответ на задачу #{job_num}:\n\n{answer_text}")
for chunk in answer_chunks:
self._safe_send(report_chat_id, chunk, reply_to=request_message_id)
@staticmethod
def _trim_telegram_caption(text: str, limit: int = 1000) -> str:
text = (text or "").strip()
if len(text) <= limit:
return text
return text[:limit].rstrip() + "\n...[обрезано]"
def _safe_send_telegram_file(
self,
chat_id: int | str,
file_id: str,
*,
media_type: str = "voice",
caption: str = "",
reply_to: int | None = None,
) -> int | None:
file_id = (file_id or "").strip()
if not file_id:
return None
caption = self._trim_telegram_caption(caption)
resolved_chat_id: int | str = self._resolve_chat_id(chat_id) if isinstance(chat_id, int) else chat_id
resolved_reply_to = reply_to if resolved_chat_id == chat_id or isinstance(chat_id, str) else None
def send(target_chat_id: int | str, target_reply_to: int | None) -> dict[str, Any]:
if media_type == "audio":
return self.telegram.send_audio(target_chat_id, file_id, caption=caption, reply_to_message_id=target_reply_to)
return self.telegram.send_voice(target_chat_id, file_id, caption=caption, reply_to_message_id=target_reply_to)
try:
sent = send(resolved_chat_id, resolved_reply_to)
result = sent.get("result") or {}
message_id = result.get("message_id")
return message_id if isinstance(message_id, int) else None
except Exception as e:
migrate_to_chat_id = self._extract_migrate_to_chat_id(str(e))
if migrate_to_chat_id is not None:
if isinstance(resolved_chat_id, int):
self._remember_chat_migration(resolved_chat_id, migrate_to_chat_id, "send_file_error")
try:
sent = send(migrate_to_chat_id, None)
result = sent.get("result") or {}
message_id = result.get("message_id")
return message_id if isinstance(message_id, int) else None
except Exception as retry_error:
print(f"[py-bot] sendFile retry after migration error: {retry_error}", flush=True)
return None
print(f"[py-bot] sendFile error: {e}", flush=True)
return None
def _safe_send_voice_upload(
self,
chat_id: int | str,
voice_bytes: bytes,
filename: str,
*,
caption: str = "",
reply_to: int | None = None,
) -> int | None:
if not voice_bytes:
return None
caption = self._trim_telegram_caption(caption)
resolved_chat_id: int | str = self._resolve_chat_id(chat_id) if isinstance(chat_id, int) else chat_id
resolved_reply_to = reply_to if resolved_chat_id == chat_id or isinstance(chat_id, str) else None
def send(target_chat_id: int | str, target_reply_to: int | None) -> dict[str, Any]:
return self.telegram.send_voice_upload(
target_chat_id,
voice_bytes,
filename,
caption=caption,
reply_to_message_id=target_reply_to,
)
try:
sent = send(resolved_chat_id, resolved_reply_to)
result = sent.get("result") or {}
message_id = result.get("message_id")
return message_id if isinstance(message_id, int) else None
except Exception as e:
migrate_to_chat_id = self._extract_migrate_to_chat_id(str(e))
if migrate_to_chat_id is not None:
if isinstance(resolved_chat_id, int):
self._remember_chat_migration(resolved_chat_id, migrate_to_chat_id, "send_voice_upload_error")
try:
sent = send(migrate_to_chat_id, None)
result = sent.get("result") or {}
message_id = result.get("message_id")
return message_id if isinstance(message_id, int) else None
except Exception as retry_error:
print(f"[py-bot] sendVoiceUpload retry after migration error: {retry_error}", flush=True)
return None
print(f"[py-bot] sendVoiceUpload error: {e}", flush=True)
return None
def _safe_send(self, chat_id: int | str, text: str, reply_to: int | None = None) -> int | None:
text = (text or "").strip()
if not text:
return None
if len(text) > 3900:
text = text[:3900] + "\n...[обрезано]"
resolved_chat_id: int | str = self._resolve_chat_id(chat_id) if isinstance(chat_id, int) else chat_id
resolved_reply_to = reply_to if resolved_chat_id == chat_id or isinstance(chat_id, str) else None
try:
sent = self.telegram.send_message(resolved_chat_id, text, reply_to_message_id=resolved_reply_to)
result = sent.get("result") or {}
message_id = result.get("message_id")
return message_id if isinstance(message_id, int) else None
except Exception as e:
migrate_to_chat_id = self._extract_migrate_to_chat_id(str(e))
if migrate_to_chat_id is not None:
if isinstance(resolved_chat_id, int):
self._remember_chat_migration(resolved_chat_id, migrate_to_chat_id, "send_message_error")
try:
sent = self.telegram.send_message(migrate_to_chat_id, text, reply_to_message_id=None)
result = sent.get("result") or {}
message_id = result.get("message_id")
return message_id if isinstance(message_id, int) else None
except Exception as retry_error:
print(f"[py-bot] sendMessage retry after migration error: {retry_error}", flush=True)
return None
print(f"[py-bot] sendMessage error: {e}", flush=True)
return None
def _schedule_self_restart(self) -> None:
if self.restart_requested:
return
self.restart_requested = True
def restart() -> None:
time.sleep(1.5)
print("[py-bot] restart requested by Telegram command", flush=True)
os._exit(0)
threading.Thread(target=restart, name="shine-py-bot-self-restart", daemon=True).start()
def _send_voice_reply_for_answer(
self,
chat_id: int,
message_id: int,
job_num: Any,
answer: str,
history_path: Path,
job_id: str,
) -> None:
if not self.cfg.openai_api_key:
note = "не настроен ключ OpenAI для озвучивания."
self._append_history(history_path, "voice_reply_failed", {"jobId": job_id, "jobNum": job_num, "error": note})
self._safe_send(chat_id, f"Озвучивание включено, но {note}", reply_to=message_id)
return
chunks = split_text_for_tts(answer, self.cfg.openai_tts_chunk_chars)
if not chunks:
return
sent_count = 0
total = len(chunks)
print(f"[py-bot] tts start job={str(job_id)[:8]} chunks={total}", flush=True)
for index, chunk in enumerate(chunks, start=1):
try:
audio = self._openai_tts(chunk)
except VoiceReplyError as e:
self._append_history(history_path, "voice_reply_failed", {
"jobId": job_id,
"jobNum": job_num,
"part": index,
"parts": total,
"error": str(e),
})
self._safe_send(chat_id, f"Не удалось озвучить ответ #{job_num}: {e}", reply_to=message_id)
return
caption = f"Озвучка ответа #{job_num}"
if total > 1:
caption += f", часть {index}/{total}"
message_sent = self._safe_send_voice_upload(
chat_id,
audio,
f"shine-answer-{job_num}-{index}.ogg",
caption=caption,
reply_to=message_id,
)
if message_sent is None:
self._append_history(history_path, "voice_reply_failed", {
"jobId": job_id,
"jobNum": job_num,
"part": index,
"parts": total,
"error": "Telegram не принял voice-файл озвучки.",
})
self._safe_send(chat_id, f"Озвучка ответа #{job_num} создана, но Telegram не принял voice-файл.", reply_to=message_id)
return
sent_count += 1
self._append_history(history_path, "voice_reply_sent", {"jobId": job_id, "jobNum": job_num, "parts": sent_count})
print(f"[py-bot] tts done job={str(job_id)[:8]} sent={sent_count}", flush=True)
def _openai_tts(self, text: str) -> bytes:
    """Synthesize ``text`` with the OpenAI speech endpoint and return the audio bytes.

    Model, voice and response format come from ``self.cfg``.

    Raises:
        VoiceReplyError: for every failure of the request, with a
            user-facing (Russian) message. This includes transport errors that
            are not ``URLError``, such as a connection reset or an incomplete
            read while the body is downloaded. Without this, they would escape
            the caller's ``except VoiceReplyError`` as unexpected exceptions.
    """
    import http.client  # only needed to name transport errors below

    payload = {
        "model": self.cfg.openai_tts_model,
        "voice": self.cfg.openai_tts_voice,
        "input": text,
        "response_format": self.cfg.openai_tts_response_format,
    }
    data = json.dumps(payload, ensure_ascii=False).encode("utf-8")
    req = request.Request("https://api.openai.com/v1/audio/speech", method="POST", data=data)
    req.add_header("Authorization", f"Bearer {self.cfg.openai_api_key}")
    req.add_header("Content-Type", "application/json")
    try:
        with request.urlopen(req, timeout=self.cfg.openai_tts_timeout_seconds) as resp:
            audio = resp.read()
    except TimeoutError as e:
        raise VoiceReplyError(f"OpenAI не успел сгенерировать речь за {self.cfg.openai_tts_timeout_seconds} секунд.") from e
    except error.HTTPError as e:
        try:
            detail = e.read().decode("utf-8", errors="replace")
        except (OSError, http.client.HTTPException):
            # The error body is only extra detail; do not let reading it mask the HTTP error.
            detail = ""
        if e.code == 401:
            message = "OpenAI отклонил ключ API для озвучивания."
        elif e.code == 429:
            message = "OpenAI временно ограничил озвучивание из-за лимита запросов."
        elif e.code >= 500:
            message = "OpenAI временно не смог сгенерировать речь."
        else:
            message = f"OpenAI вернул ошибку HTTP {e.code} при озвучивании."
        if detail:
            message = f"{message} Детали: {detail[:500]}"
        raise VoiceReplyError(message) from e
    except error.URLError as e:
        raise VoiceReplyError(f"не удалось отправить текст в OpenAI TTS из-за сетевой ошибки: {e.reason}") from e
    except (http.client.HTTPException, OSError) as e:
        # Failures while reading the response body (connection reset,
        # IncompleteRead, ...) are not URLError.
        raise VoiceReplyError(f"соединение с OpenAI TTS прервалось: {e}") from e
    if not audio:
        raise VoiceReplyError("OpenAI вернул пустой аудиофайл.")
    return audio
def _transcribe_voice_job(self, job: dict[str, Any]) -> str:
if not self.cfg.openai_api_key:
raise VoiceTranscriptionError(
"не настроен ключ OpenAI для распознавания.",
stage="config",
retryable=False,
)
file_id = (job.get("telegram_file_id") or "").strip()
if not file_id:
raise VoiceTranscriptionError(
"Telegram не передал идентификатор файла.",
stage="telegram_file_id",
retryable=False,
)
job_id = str(job.get("id") or "")[:8]
job_num = job.get("num", "?")
media_type = (job.get("telegram_media_type") or "voice").strip()
started_at = time.time()
print(f"[py-bot] transcribe start job={job_id} num={job_num} media={media_type}", flush=True)
file_bytes, filename = self._download_telegram_file(file_id)
print(
f"[py-bot] transcribe downloaded job={job_id} filename={filename} size={len(file_bytes)} bytes",
flush=True,
)
text = self._openai_transcribe(file_bytes, filename).strip()
if not text:
raise VoiceTranscriptionError(
"сервис распознавания вернул пустой текст. Возможно, в записи нет слышимой речи или качество звука слишком низкое.",
stage="empty_text",
retryable=False,
)
elapsed = self._format_duration(int(time.time() - started_at))
print(f"[py-bot] transcribe done job={job_id} chars={len(text)} elapsed={elapsed}", flush=True)
return text
def _download_telegram_file(self, file_id: str) -> tuple[bytes, str]:
    """Fetch a Telegram file by ``file_id`` and return ``(content, filename)``.

    Uses ``getFile`` to learn the server-side path, then downloads the bytes
    from the bot file endpoint. A ``.oga`` name is returned as ``.ogg``
    (per the original note, OpenAI transcription may reject ``.oga``).

    Raises:
        VoiceTranscriptionError: for every lookup/download failure; retryable
            for a missing path, an empty file, and HTTP 429/5xx.
    """

    def upload_name(server_path: str) -> str:
        # Normalize .oga -> .ogg, falling back to "audio" for a bare ".oga".
        name = Path(server_path).name or "audio.ogg"
        if not name.lower().endswith(".oga"):
            return name
        stem = name[:-4]
        return f"{stem or 'audio'}.ogg"

    try:
        meta = self.telegram.call("getFile", {"file_id": file_id}, timeout=120)
    except TimeoutError as e:
        raise VoiceTranscriptionError(
            "Telegram долго не отдавал информацию о файле.",
            stage="telegram_get_file_timeout",
            detail=str(e),
        ) from e
    except Exception as e:
        raise VoiceTranscriptionError(
            "не удалось получить информацию о файле из Telegram.",
            stage="telegram_get_file",
            detail=str(e),
        ) from e
    info = meta.get("result") or {}
    file_path = info.get("file_path")
    if not file_path:
        raise VoiceTranscriptionError(
            "Telegram не вернул путь к файлу.",
            stage="telegram_file_path",
            retryable=True,
            detail=json.dumps(info, ensure_ascii=False)[:1000],
        )
    timeout_s = self.cfg.telegram_file_download_timeout_seconds
    download = request.Request(
        f"https://api.telegram.org/file/bot{self.cfg.telegram_bot_token}/{file_path}",
        method="GET",
    )
    try:
        with request.urlopen(download, timeout=timeout_s) as resp:
            content = resp.read()
    except TimeoutError as e:
        raise VoiceTranscriptionError(
            f"Telegram не успел отдать аудиофайл за {timeout_s} секунд.",
            stage="telegram_download_timeout",
            detail=str(e),
        ) from e
    except error.HTTPError as e:
        body = e.read().decode("utf-8", errors="replace")
        raise VoiceTranscriptionError(
            f"Telegram вернул ошибку HTTP {e.code} при скачивании аудио.",
            stage="telegram_download_http",
            retryable=e.code >= 500 or e.code == 429,
            detail=body[:1000],
        ) from e
    except error.URLError as e:
        raise VoiceTranscriptionError(
            "не удалось скачать аудиофайл из Telegram из-за сетевой ошибки.",
            stage="telegram_download_network",
            detail=str(e.reason),
        ) from e
    if not content:
        raise VoiceTranscriptionError(
            "Telegram отдал пустой аудиофайл.",
            stage="telegram_download_empty",
            retryable=True,
        )
    return content, upload_name(file_path)
def _openai_transcribe(self, file_bytes: bytes, filename: str) -> str:
    """Send audio to the OpenAI transcription endpoint and return the response text.

    Builds a multipart/form-data body by hand for ``urllib.request`` with
    three parts: ``model`` (``cfg.openai_transcribe_model``),
    ``response_format=text`` and the ``file`` part (MIME type guessed from
    ``filename``).

    Returns:
        The response body decoded as UTF-8 with replacement. With
        ``response_format=text`` this is the transcript; the caller strips it.

    Raises:
        VoiceTranscriptionError: on timeout, HTTP error or network error.
            ``retryable`` is set explicitly only for HTTP errors (True for 429
            and 5xx). For timeouts and network errors it is left to the
            exception's default, which is defined elsewhere.
    """
    # Random boundary; it is not checked against the file bytes, so a
    # collision is merely unlikely rather than impossible.
    boundary = "----shine-boundary-" + "".join(random.choices("abcdef0123456789", k=16))
    mime = mimetypes.guess_type(filename)[0] or "application/octet-stream"
    def text_part(name: str, value: str) -> bytes:
        # One simple form field: boundary line, header, blank line, value.
        return (
            f"--{boundary}\r\n"
            f'Content-Disposition: form-data; name="{name}"\r\n\r\n'
            f"{value}\r\n"
        ).encode("utf-8")
    body = bytearray()
    body.extend(text_part("model", self.cfg.openai_transcribe_model))
    body.extend(text_part("response_format", "text"))
    # File part header; the filename is inserted unescaped.
    body.extend(
        (
            f"--{boundary}\r\n"
            f'Content-Disposition: form-data; name="file"; filename="{filename}"\r\n'
            f"Content-Type: {mime}\r\n\r\n"
        ).encode("utf-8")
    )
    body.extend(file_bytes)
    body.extend(b"\r\n")
    # Closing boundary terminates the multipart body.
    body.extend(f"--{boundary}--\r\n".encode("utf-8"))
    req = request.Request("https://api.openai.com/v1/audio/transcriptions", method="POST", data=bytes(body))
    req.add_header("Authorization", f"Bearer {self.cfg.openai_api_key}")
    req.add_header("Content-Type", f"multipart/form-data; boundary={boundary}")
    try:
        with request.urlopen(req, timeout=self.cfg.openai_transcribe_timeout_seconds) as resp:
            return resp.read().decode("utf-8", errors="replace")
    except TimeoutError as e:
        raise VoiceTranscriptionError(
            f"OpenAI не успел распознать аудио за {self.cfg.openai_transcribe_timeout_seconds} секунд.",
            stage="openai_transcribe_timeout",
            detail=str(e),
        ) from e
    except error.HTTPError as e:
        detail = e.read().decode("utf-8", errors="replace")
        # Map common status codes to user-facing (Russian) messages.
        if e.code == 400:
            user_message = "OpenAI не принял аудиофайл для распознавания."
        elif e.code == 401:
            user_message = "OpenAI отклонил ключ API для распознавания."
        elif e.code == 413:
            user_message = "аудиофайл слишком большой для распознавания OpenAI."
        elif e.code == 429:
            user_message = "OpenAI временно ограничил распознавание из-за лимита запросов."
        elif e.code >= 500:
            user_message = "OpenAI временно не смог обработать распознавание."
        else:
            user_message = f"OpenAI вернул ошибку HTTP {e.code} при распознавании."
        raise VoiceTranscriptionError(
            user_message,
            stage="openai_transcribe_http",
            retryable=e.code == 429 or e.code >= 500,
            detail=detail[:1500],
        ) from e
    except error.URLError as e:
        raise VoiceTranscriptionError(
            "не удалось отправить аудио в OpenAI из-за сетевой ошибки.",
            stage="openai_transcribe_network",
            detail=str(e.reason),
        ) from e
@staticmethod
def _extract_codex_user_note(line: str) -> str | None:
s = (line or "").strip()
if not s.startswith("{"):
return None
try:
obj = json.loads(s)
except Exception:
return None
if obj.get("type") != "item.completed":
return None
item = obj.get("item") or {}
if item.get("type") != "agent_message":
return None
text = (item.get("text") or "").strip()
if not text:
return None
if len(text) > 220:
return text[:220] + "..."
return text
@staticmethod
def _extract_fallback_message(lines: list[str]) -> str:
for line in reversed(lines):
line = line.strip()
if not line:
continue
if line.startswith("{") and '"type":' in line:
continue
if line.startswith("mcp:") or line.startswith("OpenAI Codex"):
continue
return line
return ""
@staticmethod
def _format_duration(seconds: int) -> str:
seconds = max(0, seconds)
minutes, sec = divmod(seconds, 60)
hours, minutes = divmod(minutes, 60)
if hours:
return f"{hours}ч {minutes}м {sec}с"
if minutes:
return f"{minutes}м {sec}с"
return f"{sec}с"
def run_selftest(config: BotConfig, prompt: str) -> int:
    """Run one ``codex exec`` with ``prompt``, echo its stdout/stderr, return its exit code.

    Used by ``--selftest-codex`` to check the Codex CLI setup without starting
    the bot.
    """
    command = [str(config.codex_bin), "exec", "--dangerously-bypass-approvals-and-sandbox", "--json"]
    command += ["-C", str(config.codex_workdir), prompt]
    completed = subprocess.run(
        command,
        stdin=subprocess.DEVNULL,
        capture_output=True,
        text=True,
        encoding="utf-8",
        errors="replace",
    )
    print(completed.stdout)
    if completed.stderr:
        print(completed.stderr)
    return completed.returncode
def main() -> int:
    """CLI entry point: run the Codex self-test, or the long-running bot service.

    Returns:
        The self-test exit code with ``--selftest-codex``; otherwise 0 after a
        normal stop or Ctrl+C, and 1 after an unhandled error in the service.
    """
    parser = argparse.ArgumentParser(description="SHiNE Python Telegram bot wrapper for Codex CLI")
    parser.add_argument("--selftest-codex", default="", help="Выполнить только codex exec с этим prompt и выйти")
    args = parser.parse_args()
    root = Path(__file__).resolve().parent
    cfg = BotConfig(root)
    if args.selftest_codex:
        return run_selftest(cfg, args.selftest_codex)
    service = ShinePyBotService(cfg)
    try:
        service.run()
    except KeyboardInterrupt:
        service.shutdown()
        return 0
    except Exception as e:
        print(f"[py-bot] FATAL: {e}", flush=True)
        # The one-line message alone does not show where the crash happened.
        print(traceback.format_exc(), flush=True)
        return 1
    return 0
# Script entry point: the process exit status is main()'s return value.
if __name__ == "__main__":
    raise SystemExit(main())