SHiNE-server/SHiNE-agent-bot-coder/py_bot_service.py

2977 lines
134 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
from __future__ import annotations
import argparse
import datetime as dt
import fcntl
import json
import mimetypes
import os
import random
import re
import shutil
import string
import subprocess
import tempfile
import threading
import time
import traceback
import uuid
from pathlib import Path
from typing import Any, Callable
from urllib import error, request
# Default roster of non-owner users, as comma-separated
# "telegram_username:Display name" pairs; overridable through the
# ALLOWED_TELEGRAM_PLAYERS setting and parsed by parse_allowed_players().
DEFAULT_ALLOWED_PLAYERS = (
    "malvviiina:Милана,"
    "zodiaktechnika32:Сергей,"
    "oidasyda:Иван,"
    "blackbyrd1:Ворон,"
    "dimasol1:Дима"
)
# Russian user-facing labels for task-center item statuses.
TASK_STATUS_LABELS = dict(
    new="новая",
    approved="одобрена",
    rejected="отклонена",
    needs_work="на доработку",
    done="сделана",
)
def now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string (with ``+00:00`` offset)."""
    utc_now = dt.datetime.now(tz=dt.timezone.utc)
    return utc_now.isoformat()
def normalize_username(value: str | None) -> str:
    """Canonicalise a Telegram username for comparisons.

    Trims surrounding whitespace, drops one leading ``@`` and lower-cases the
    rest. ``None`` or an empty string yields ``""``.
    """
    if not value:
        return ""
    cleaned = value.strip()
    if cleaned[:1] == "@":
        cleaned = cleaned[1:]
    return cleaned.lower()
def parse_allowed_players(raw: str) -> dict[str, str]:
    """Parse ``"user:Name,user2:Name2"`` into ``{normalized_username: display_name}``.

    Entries without ``:`` use the raw (trimmed) username text as the display
    name; an empty display name falls back to the normalized username.
    Entries whose username normalizes to ``""`` are skipped. Later duplicates
    override earlier ones.
    """
    players: dict[str, str] = {}
    entries = (chunk.strip() for chunk in (raw or "").split(","))
    for entry in filter(None, entries):
        login, has_name, display = entry.partition(":")
        key = normalize_username(login)
        if key:
            players[key] = (display if has_name else login).strip() or key
    return players
def compact_spaces(text: str) -> str:
    """Trim *text* and collapse every run of whitespace into a single space (``None``-safe)."""
    return " ".join((text or "").split())
def split_long_text(text: str, chunk_size: int = 3500) -> list[str]:
    """Cut the stripped *text* into consecutive slices of at most *chunk_size* characters.

    When there is nothing to send, a one-element placeholder list
    (``"(пустой ответ)"``) is returned so callers always have a message.
    """
    body = (text or "").strip()
    if not body:
        return ["(пустой ответ)"]
    offsets = range(0, len(body), chunk_size)
    return [body[offset:offset + chunk_size] for offset in offsets]
def split_final_private_text(text: str, first_chunk_size: int = 3900, second_chunk_size: int = 3900) -> list[str]:
    """Fit a final answer into at most two messages.

    Returns ``[text]`` when it fits in *first_chunk_size*. Otherwise the text is
    split at that boundary. If the remainder is longer than *second_chunk_size*,
    it is cut to that size, shortened by a further 40 characters (when longer
    than 40), and suffixed with a truncation marker. Empty input gives the
    ``"(пустой ответ)"`` placeholder.
    """
    body = (text or "").strip()
    if not body:
        return ["(пустой ответ)"]
    if len(body) <= first_chunk_size:
        return [body]
    head = body[:first_chunk_size].rstrip()
    tail = body[first_chunk_size:].lstrip()
    if len(tail) > second_chunk_size:
        tail = tail[:second_chunk_size].rstrip()
        # Leave room for the marker appended below.
        tail = (tail[:-40] if len(tail) > 40 else tail).rstrip()
        tail += "\n...[ответ обрезан]"
    return [head, tail]
def split_text_for_tts(text: str, chunk_size: int) -> list[str]:
    """Group the paragraphs of *text* into chunks of at most *chunk_size* characters.

    Paragraphs are separated by blank lines. Consecutive paragraphs are packed
    together (joined by a blank line) while they fit. A paragraph longer than
    *chunk_size* closes the current chunk and is emitted as hard-sliced,
    stripped pieces. Empty input yields ``[]``.
    """
    source = (text or "").strip()
    if not source:
        return []
    result: list[str] = []
    pending = ""

    def flush() -> None:
        nonlocal pending
        if pending:
            result.append(pending)
        pending = ""

    for block in re.split(r"\n\s*\n", source):
        block = block.strip()
        if not block:
            continue
        if len(block) > chunk_size:
            flush()
            pieces = (block[pos:pos + chunk_size].strip() for pos in range(0, len(block), chunk_size))
            result.extend(piece for piece in pieces if piece)
        elif not pending:
            pending = block
        elif len(pending) + 2 + len(block) <= chunk_size:
            pending = f"{pending}\n\n{block}"
        else:
            flush()
            pending = block
    flush()
    return result
def read_env_file(path: Path) -> dict[str, str]:
    """Parse a simple ``KEY=VALUE`` dotenv file into a dict.

    Blank lines, ``#`` comment lines and lines without ``=`` are ignored.
    Keys and values are whitespace-trimmed, and lines with an empty key are
    skipped. One pair of matching surrounding quotes (``"..."`` or ``'...'``)
    is removed from the value. A missing file yields an empty dict.
    """
    result: dict[str, str] = {}
    if not path.exists():
        return result
    # utf-8-sig drops a leading BOM (added by some editors) that would
    # otherwise become part of the first key.
    for raw_line in path.read_text(encoding="utf-8-sig").splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        key = key.strip()
        if not key:
            continue
        value = value.strip()
        # Strip quotes only when they actually enclose the value; stripping
        # each quote character independently mangled values like `abc"`.
        if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
            value = value[1:-1]
        result[key] = value
    return result
class VoiceTranscriptionError(RuntimeError):
    """Error from the voice-to-text path, carrying data for users and for logs.

    Attributes:
        user_message: Text passed as the exception message (also ``str(exc)``).
        stage: Free-form label, set by the raiser, naming the failing step.
        retryable: Flag set by the raiser; presumably whether a retry may succeed.
        detail: Optional technical detail; shown in ``log_text()`` when it adds information.
    """

    def __init__(self, user_message: str, *, stage: str, retryable: bool = True, detail: str = ""):
        super().__init__(user_message)
        self.user_message = user_message
        self.stage = stage
        self.retryable = retryable
        self.detail = detail

    def log_text(self) -> str:
        """Return a one-line log description, appending ``detail`` only if non-empty and different from the message."""
        summary = f"{self.user_message} stage={self.stage} retryable={self.retryable}"
        show_detail = bool(self.detail) and self.detail != self.user_message
        return f"{summary} detail={self.detail}" if show_detail else summary
class VoiceReplyError(RuntimeError):
    """Plain ``RuntimeError`` subclass that tags voice-reply failures (raisers are elsewhere in the file)."""
class JsonLineStore:
    """Static helpers for JSON Lines files (one JSON value per line, UTF-8)."""

    @staticmethod
    def load(path: Path) -> list[dict[str, Any]]:
        """Decode every non-blank line of *path*; return ``[]`` if the file does not exist."""
        if not path.exists():
            return []
        content = path.read_text(encoding="utf-8")
        return [json.loads(row) for row in map(str.strip, content.splitlines()) if row]

    @staticmethod
    def save(path: Path, items: list[dict[str, Any]]) -> None:
        """Rewrite *path* with *items*: write a sibling ``<name>.tmp`` file, then rename it over *path*."""
        path.parent.mkdir(parents=True, exist_ok=True)
        staging = path.with_suffix(f"{path.suffix}.tmp")
        with staging.open("w", encoding="utf-8") as handle:
            handle.writelines(json.dumps(entry, ensure_ascii=False) + "\n" for entry in items)
        staging.replace(path)

    @staticmethod
    def append(path: Path, item: dict[str, Any]) -> None:
        """Append *item* as one JSON line, creating parent directories if needed."""
        path.parent.mkdir(parents=True, exist_ok=True)
        with path.open("a", encoding="utf-8") as handle:
            handle.write(f"{json.dumps(item, ensure_ascii=False)}\n")
class TelegramApi:
    """Small blocking client for the Telegram Bot HTTP API (JSON and multipart POST).

    Every failure surfaces as ``RuntimeError``:
    - an HTTP error status;
    - a transport error;
    - a response body that is not a JSON object;
    - a response envelope whose ``ok`` is false.
    """

    def __init__(self, token: str, base_url: str = "https://api.telegram.org"):
        self.token = token
        self.api_root = (base_url or "https://api.telegram.org").rstrip("/")
        # Bot methods live under <root>/bot<token>/<method>; file_base is the
        # <root>/file/bot<token>/ prefix Telegram uses for file downloads.
        self.base = f"{self.api_root}/bot{token}/"
        self.file_base = f"{self.api_root}/file/bot{token}/"

    def _execute(self, req: request.Request, timeout: int, failure_label: str) -> dict[str, Any]:
        """Send *req* and return the decoded Telegram response envelope.

        Raises:
            RuntimeError: if any of the following occurs:
                - an HTTP error status (the body is included in the message);
                - a transport failure (prefixed with *failure_label*);
                - a body that is not a JSON object;
                - an envelope whose ``ok`` field is falsy.
        """
        try:
            with request.urlopen(req, timeout=timeout) as resp:
                raw = resp.read().decode("utf-8")
        except error.HTTPError as e:
            body_text = e.read().decode("utf-8", errors="replace")
            raise RuntimeError(f"Telegram HTTP {e.code}: {body_text}") from e
        except Exception as e:
            raise RuntimeError(f"{failure_label}: {e}") from e
        try:
            result = json.loads(raw)
        except ValueError as e:
            # E.g. an HTML error page from a proxy or a custom
            # TELEGRAM_API_BASE_URL; keep the RuntimeError contract instead
            # of leaking JSONDecodeError to callers.
            raise RuntimeError(f"Telegram returned a non-JSON response: {raw[:500]}") from e
        if not isinstance(result, dict) or not result.get("ok"):
            raise RuntimeError(f"Telegram API error: {result}")
        return result

    def call(self, method: str, payload: dict[str, Any] | None = None, timeout: int = 60) -> dict[str, Any]:
        """POST *payload* (if any) as JSON to bot *method* and return the response envelope."""
        data = None
        headers: dict[str, str] = {}
        if payload is not None:
            data = json.dumps(payload).encode("utf-8")
            headers["Content-Type"] = "application/json"
        req = request.Request(self.base + method, data=data, headers=headers, method="POST")
        return self._execute(req, timeout, "Telegram request failed")

    def call_multipart(
        self,
        method: str,
        fields: dict[str, Any],
        files: dict[str, tuple[str, bytes, str]],
        timeout: int = 120,
    ) -> dict[str, Any]:
        """POST a multipart/form-data request to bot *method*.

        Args:
            fields: Text parts; entries whose value is ``None`` are skipped.
            files: File parts as ``name -> (filename, content bytes, MIME type)``.
        """
        boundary = "----shine-tg-boundary-" + "".join(random.choices("abcdef0123456789", k=16))
        body = bytearray()
        for name, value in fields.items():
            if value is None:
                continue
            body.extend(
                (
                    f"--{boundary}\r\n"
                    f'Content-Disposition: form-data; name="{name}"\r\n\r\n'
                    f"{value}\r\n"
                ).encode("utf-8")
            )
        for name, (filename, data, mime) in files.items():
            body.extend(
                (
                    f"--{boundary}\r\n"
                    f'Content-Disposition: form-data; name="{name}"; filename="{filename}"\r\n'
                    f"Content-Type: {mime}\r\n\r\n"
                ).encode("utf-8")
            )
            body.extend(data)
            body.extend(b"\r\n")
        body.extend(f"--{boundary}--\r\n".encode("utf-8"))
        req = request.Request(self.base + method, data=bytes(body), method="POST")
        req.add_header("Content-Type", f"multipart/form-data; boundary={boundary}")
        return self._execute(req, timeout, "Telegram multipart request failed")

    def get_updates(self, offset: int | None, timeout_sec: int) -> list[dict[str, Any]]:
        """Long-poll ``getUpdates`` for messages and channel posts; return the ``result`` list."""
        payload: dict[str, Any] = {"timeout": timeout_sec, "allowed_updates": ["message", "channel_post"]}
        if offset is not None:
            payload["offset"] = offset
        # The HTTP timeout must outlast Telegram's long-poll timeout.
        result = self.call("getUpdates", payload=payload, timeout=timeout_sec + 15)
        return result.get("result", [])

    def send_message(self, chat_id: int | str, text: str, reply_to_message_id: int | None = None) -> dict[str, Any]:
        """Send a plain text message, optionally as a reply."""
        payload: dict[str, Any] = {"chat_id": chat_id, "text": text}
        if reply_to_message_id is not None:
            payload["reply_to_message_id"] = reply_to_message_id
        return self.call("sendMessage", payload=payload, timeout=30)

    def edit_message_text(self, chat_id: int | str, message_id: int, text: str) -> dict[str, Any]:
        """Replace the text of an existing message."""
        payload: dict[str, Any] = {"chat_id": chat_id, "message_id": message_id, "text": text}
        return self.call("editMessageText", payload=payload, timeout=30)

    def send_voice(
        self,
        chat_id: int | str,
        voice: str,
        caption: str = "",
        reply_to_message_id: int | None = None,
    ) -> dict[str, Any]:
        """Send a voice message referenced by string (a Telegram file_id or URL per the Bot API)."""
        payload: dict[str, Any] = {"chat_id": chat_id, "voice": voice}
        if caption:
            payload["caption"] = caption
        if reply_to_message_id is not None:
            payload["reply_to_message_id"] = reply_to_message_id
        return self.call("sendVoice", payload=payload, timeout=60)

    def send_audio(
        self,
        chat_id: int | str,
        audio: str,
        caption: str = "",
        reply_to_message_id: int | None = None,
    ) -> dict[str, Any]:
        """Send an audio file referenced by string (a Telegram file_id or URL per the Bot API)."""
        payload: dict[str, Any] = {"chat_id": chat_id, "audio": audio}
        if caption:
            payload["caption"] = caption
        if reply_to_message_id is not None:
            payload["reply_to_message_id"] = reply_to_message_id
        return self.call("sendAudio", payload=payload, timeout=60)

    def send_voice_upload(
        self,
        chat_id: int | str,
        voice_bytes: bytes,
        filename: str,
        caption: str = "",
        reply_to_message_id: int | None = None,
    ) -> dict[str, Any]:
        """Upload *voice_bytes* (sent as ``audio/ogg``) as a voice message via multipart."""
        fields: dict[str, Any] = {"chat_id": chat_id}
        if caption:
            fields["caption"] = caption
        if reply_to_message_id is not None:
            fields["reply_to_message_id"] = reply_to_message_id
        return self.call_multipart(
            "sendVoice",
            fields=fields,
            files={"voice": (filename, voice_bytes, "audio/ogg")},
            timeout=180,
        )

    def delete_webhook(self) -> None:
        """Remove any webhook (required for getUpdates polling) while keeping queued updates."""
        self.call("deleteWebhook", payload={"drop_pending_updates": False}, timeout=30)
class BotConfig:
    """Runtime settings read from the process environment plus ``<root_dir>/.env``.

    Values in ``.env`` override variables already present in the process
    environment. Integer settings fall back to their default when unset or
    blank, and some are clamped to a minimum.
    """

    def __init__(self, root_dir: Path):
        env = dict(os.environ)
        env.update(read_env_file(root_dir / ".env"))
        self.root_dir = root_dir
        # Telegram access.
        self.telegram_bot_token = self._required(env, "TELEGRAM_BOT_TOKEN")
        self.allowed_username = normalize_username(env.get("ALLOWED_TELEGRAM_USERNAME", "AidarKC"))
        self.allowed_players = parse_allowed_players(env.get("ALLOWED_TELEGRAM_PLAYERS", DEFAULT_ALLOWED_PLAYERS))
        self.allowed_channel_username = normalize_username(env.get("ALLOWED_TELEGRAM_CHANNEL_USERNAME", "shine_writing"))
        self.bot_username = env.get("BOT_USERNAME", "aidar_su_bot")
        self.telegram_api_base_url = env.get("TELEGRAM_API_BASE_URL", "https://api.telegram.org").strip() or "https://api.telegram.org"
        # Voice download and transcription.
        self.openai_api_key = env.get("OPENAI_API_KEY", "").strip()
        self.openai_transcribe_model = env.get("OPENAI_TRANSCRIBE_MODEL", "gpt-4o-mini-transcribe")
        self.telegram_file_download_timeout_seconds = self._int(env, "TELEGRAM_FILE_DOWNLOAD_TIMEOUT_SECONDS", 300)
        self.openai_transcribe_timeout_seconds = self._int(env, "OPENAI_TRANSCRIBE_TIMEOUT_SECONDS", 900)
        self.openai_transcribe_max_upload_bytes = self._int(env, "OPENAI_TRANSCRIBE_MAX_UPLOAD_BYTES", 24 * 1024 * 1024, minimum=1_000_000)
        self.openai_transcribe_max_chunk_seconds = self._int(env, "OPENAI_TRANSCRIBE_MAX_CHUNK_SECONDS", 900, minimum=60)
        self.openai_transcribe_overlap_seconds = self._int(env, "OPENAI_TRANSCRIBE_OVERLAP_SECONDS", 2, minimum=0)
        self.openai_transcribe_reencode_bitrate_kbps = self._int(env, "OPENAI_TRANSCRIBE_REENCODE_BITRATE_KBPS", 24, minimum=12)
        self.openai_transcribe_ffmpeg_timeout_seconds = self._int(env, "OPENAI_TRANSCRIBE_FFMPEG_TIMEOUT_SECONDS", 1800, minimum=30)
        self.ffmpeg_bin = env.get("FFMPEG_BIN", "ffmpeg").strip() or "ffmpeg"
        self.ffprobe_bin = env.get("FFPROBE_BIN", "ffprobe").strip() or "ffprobe"
        # Text-to-speech and voice rewrite.
        self.openai_tts_model = env.get("OPENAI_TTS_MODEL", "gpt-4o-mini-tts")
        self.openai_tts_voice = env.get("OPENAI_TTS_VOICE", "alloy")
        self.openai_tts_response_format = env.get("OPENAI_TTS_RESPONSE_FORMAT", "opus")
        self.openai_tts_timeout_seconds = self._int(env, "OPENAI_TTS_TIMEOUT_SECONDS", 180)
        self.openai_tts_chunk_chars = self._int(env, "OPENAI_TTS_CHUNK_CHARS", 3500, minimum=500)
        self.openai_voice_rewrite_model = env.get("OPENAI_VOICE_REWRITE_MODEL", "gpt-4.1-nano")
        self.openai_voice_rewrite_timeout_seconds = self._int(env, "OPENAI_VOICE_REWRITE_TIMEOUT_SECONDS", 90)
        self.openai_voice_rewrite_max_input_chars = self._int(env, "OPENAI_VOICE_REWRITE_MAX_INPUT_CHARS", 12000, minimum=1000)
        self.openai_voice_rewrite_max_output_tokens = self._int(env, "OPENAI_VOICE_REWRITE_MAX_OUTPUT_TOKENS", 900, minimum=200)
        # Codex CLI and job handling.
        self.codex_bin = Path(env.get(
            "CODEX_BIN",
            "/home/ai/.cache/JetBrains/IntelliJIdea2026.1/aia/codex/bin/codex-x86_64-unknown-linux-musl"
        ))
        self.codex_workdir = Path(env.get("CODEX_WORKDIR", "/home/ai/work/SHiNE/SHiNE-server-sha256"))
        self.codex_timeout_seconds = self._int(env, "CODEX_TIMEOUT_SECONDS", 900)
        self.max_retries = self._int(env, "MAX_RETRIES", 3, minimum=1)
        # Paths (DATA_DIR may be relative to root_dir or absolute).
        self.data_dir = (root_dir / env.get("DATA_DIR", "./data")).resolve()
        self.agent_instructions_file = (root_dir / "AGENT.md").resolve()

    @staticmethod
    def _required(env: dict[str, str], key: str) -> str:
        """Return the trimmed value of *key*; raise RuntimeError if it is missing or blank."""
        value = env.get(key, "").strip()
        if not value:
            raise RuntimeError(f"Не задан обязательный параметр: {key}")
        return value

    @staticmethod
    def _int(env: dict[str, str], key: str, default: int, *, minimum: int | None = None) -> int:
        """Read integer setting *key*.

        An unset or blank value uses *default*. The result (default included)
        is raised to *minimum* when one is given.

        Raises:
            ValueError: if the value is present but not an integer; the
                message names the offending key.
        """
        raw = str(env.get(key, "")).strip()
        if not raw:
            value = default
        else:
            try:
                value = int(raw)
            except ValueError as e:
                raise ValueError(f"Параметр {key} должен быть целым числом, получено: {raw!r}") from e
        return value if minimum is None else max(minimum, value)
class ShinePyBotService:
def __init__(self, config: BotConfig):
    """Set up the service around *config*.

    Creates the Telegram client, data-file paths, locks, the (not yet started)
    worker thread and empty runtime state. Nothing is read from disk here;
    ``run()`` loads persisted data.
    """
    self.cfg = config
    self.telegram = TelegramApi(config.telegram_bot_token, config.telegram_api_base_url)
    data_dir = config.data_dir
    # Files and folders under DATA_DIR.
    self.queue_file = data_dir / "py_queue.jsonl"
    self.state_file = data_dir / "py_state.json"
    self.processed_updates_file = data_dir / "py_processed_updates.log"
    self.lock_file = data_dir / "py_app.lock"
    self.history_dir = data_dir / "history"
    self.history_archive_dir = self.history_dir / "archive"
    self.task_center_dir = data_dir / "task_center"
    self.task_center_file = self.task_center_dir / "items.json"
    # Upper bound on remembered processed-update keys.
    self.max_processed_updates = 5000
    # Synchronisation primitives and the background worker.
    self.queue_lock = threading.RLock()
    self.task_center_lock = threading.RLock()
    self.active_process_lock = threading.Lock()
    self.stop_event = threading.Event()
    self.worker = threading.Thread(target=self._worker_loop, name="shine-py-bot-worker", daemon=True)
    # Runtime state.
    self.queue: list[dict[str, Any]] = []
    self.state: dict[str, Any] = {}
    self.processed_updates: list[str] = []
    self.active_job_id: str | None = None
    self.active_job_started_at: float | None = None
    self.active_process: subprocess.Popen[str] | None = None
    self.stop_current_job = False
    self.lock_fd = None
    self.last_heartbeat_at: float = 0.0
    self.restart_requested = False
def _is_owner(self, username: str) -> bool:
    """True if *username* (any case, optional ``@``) is the configured owner account."""
    owner = self.cfg.allowed_username
    return owner == normalize_username(username)
def _is_allowed_player(self, username: str) -> bool:
    """True if *username* is one of the configured players (the owner is not implied)."""
    players = self.cfg.allowed_players
    return normalize_username(username) in players
def _is_allowed_user(self, username: str) -> bool:
    """True if *username* is the owner or a configured player."""
    return self._is_owner(username) or self._is_allowed_player(username)
def _player_name(self, username: str) -> str:
    """Configured display name of a player, or the normalized username when not configured."""
    key = normalize_username(username)
    players = self.cfg.allowed_players
    return players[key] if key in players else key
def _display_name(self, username: str) -> str:
    """Human-readable name: "Айдар" for the owner, otherwise the player name (or username)."""
    uname = normalize_username(username)
    return "Айдар" if uname == self.cfg.allowed_username else self._player_name(uname)
def _known_usernames(self) -> dict[str, str]:
users = {self.cfg.allowed_username: "Айдар"}
users.update(self.cfg.allowed_players)
return users
def _find_user_by_text(self, text: str) -> str:
    """Resolve free text to a known username, or return "" if nothing matches.

    The text may be a username, an ``@username`` or a Russian name form.
    Lookup order:
    1. The exact normalized username.
    2. Whole-word Russian name aliases (several grammatical cases).
    3. A substring match against known usernames and display names.
    """
    known = self._known_usernames()
    source = normalize_username(text)
    if source in known:
        return source
    source_lower = (text or "").strip().lower()
    # Hard-coded aliases for the default roster. Targets are checked against
    # the configured users so that a player removed from
    # ALLOWED_TELEGRAM_PLAYERS can no longer be addressed through an alias.
    aliases = {
        "айдар": self.cfg.allowed_username,
        "айдару": self.cfg.allowed_username,
        "айдара": self.cfg.allowed_username,
        "милана": "malvviiina",
        "милане": "malvviiina",
        "милану": "malvviiina",
        "миланы": "malvviiina",
        "сергей": "zodiaktechnika32",
        "сергею": "zodiaktechnika32",
        "сергея": "zodiaktechnika32",
        "иван": "oidasyda",
        "ивану": "oidasyda",
        "ивана": "oidasyda",
        "ворон": "blackbyrd1",
        "ворону": "blackbyrd1",
        "ворона": "blackbyrd1",
        "дима": "dimasol1",
        "диме": "dimasol1",
        "диму": "dimasol1",
        "димы": "dimasol1",
    }
    for alias, username in aliases.items():
        if username not in known:
            continue
        if re.search(rf"(^|\W){re.escape(alias)}($|\W)", source_lower, flags=re.IGNORECASE):
            return username
    for username, name in known.items():
        if username and username in source_lower:
            return username
        if name and name.lower() in source_lower:
            return username
    return ""
def run(self) -> None:
    """Start the bot and long-poll Telegram until ``stop_event`` is set.

    Startup steps, in order:
    1. Create the data directories and take the single-instance lock.
    2. Load state, the job queue and the processed-update log.
    3. Re-queue jobs interrupted by a restart.
    4. Delete any webhook (getUpdates polling does not work while one is set).
    5. Initialise the polling offset and start the worker thread.

    Once polling begins, ``shutdown()`` runs however the loop ends.
    """
    self._ensure_dirs()
    self._acquire_single_instance_lock()
    self._load_state()
    self._load_queue()
    self._load_processed_updates()
    self._recover_active_jobs_after_restart()
    self.telegram.delete_webhook()
    self._init_offset_if_missing()
    self.worker.start()
    self._append_history_event("service_started", {"allowedUsername": self.cfg.allowed_username})
    print(f"[py-bot] Запущен. allowed user: @{self.cfg.allowed_username}", flush=True)
    try:
        while not self.stop_event.is_set():
            try:
                offset = self.state.get("offset")
                updates = self.telegram.get_updates(offset=offset, timeout_sec=25)
            except Exception as e:
                print(f"[py-bot] Ошибка getUpdates: {e}", flush=True)
                time.sleep(2)
                continue
            for update in updates:
                update_id = update.get("update_id")
                if isinstance(update_id, int):
                    # The offset is committed before handling, so Telegram
                    # will not deliver this update again.
                    self.state["offset"] = update_id + 1
                    self._persist_state()
                try:
                    self._handle_update(update)
                except Exception as e:
                    # Because the offset is already committed, re-raising
                    # would only stop the service without retrying the
                    # update; log it and keep polling instead.
                    print(
                        f"[py-bot] Ошибка обработки update {update_id}: {e}\n{traceback.format_exc()}",
                        flush=True,
                    )
    finally:
        self.shutdown()
def shutdown(self) -> None:
if self.stop_event.is_set():
pass
self.stop_event.set()
self._stop_active_codex_process()
if self.worker.is_alive():
self.worker.join(timeout=10)
if self.lock_fd is not None:
try:
fcntl.flock(self.lock_fd, fcntl.LOCK_UN)
finally:
self.lock_fd.close()
self.lock_fd = None
self._append_history_event("service_stopped", {})
def _ensure_dirs(self) -> None:
self.cfg.data_dir.mkdir(parents=True, exist_ok=True)
self.history_dir.mkdir(parents=True, exist_ok=True)
self.history_archive_dir.mkdir(parents=True, exist_ok=True)
self.task_center_dir.mkdir(parents=True, exist_ok=True)
def _acquire_single_instance_lock(self) -> None:
self.lock_file.parent.mkdir(parents=True, exist_ok=True)
self.lock_fd = self.lock_file.open("a+")
try:
fcntl.flock(self.lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
except BlockingIOError:
raise RuntimeError(f"Уже запущен другой инстанс (lock: {self.lock_file})")
def _load_state(self) -> None:
    """Load ``py_state.json`` (or start empty), fill in missing sections and persist it.

    Guarantees ``user_sessions`` and ``user_settings`` dicts, an owner session
    with a history file, and an integer ``next_job_number`` (default 1).
    """
    self.state = json.loads(self.state_file.read_text(encoding="utf-8")) if self.state_file.exists() else {}
    sessions = self.state.get("user_sessions")
    if not isinstance(sessions, dict):
        sessions = self.state["user_sessions"] = {}
    if not isinstance(self.state.get("user_settings"), dict):
        self.state["user_settings"] = {}
    owner = self.cfg.allowed_username
    owner_history = self.state.get("current_history_file")
    if not owner_history:
        fresh = str(self._create_new_history_file("initial", owner))
        self.state["current_history_file"] = fresh
        sessions[owner] = {"current_history_file": fresh}
    elif owner not in sessions:
        # Older state files only kept the top-level owner history path.
        sessions[owner] = {"current_history_file": str(owner_history)}
    if not isinstance(self.state.get("next_job_number"), int):
        self.state["next_job_number"] = 1
    self.state["updated_at"] = now_iso()
    self._persist_state()
def _persist_state(self) -> None:
    """Stamp ``updated_at`` and write ``self.state`` as indented JSON via a ``.tmp`` file plus rename."""
    self.state["updated_at"] = now_iso()
    staging = self.state_file.with_suffix(".tmp")
    serialized = json.dumps(self.state, ensure_ascii=False, indent=2)
    staging.write_text(serialized, encoding="utf-8")
    staging.replace(self.state_file)
def _load_queue(self) -> None:
    """Replace the in-memory job queue with the contents of ``py_queue.jsonl``."""
    self.queue = JsonLineStore.load(path=self.queue_file)
def _persist_queue(self) -> None:
    """Write the in-memory job queue back to ``py_queue.jsonl``."""
    JsonLineStore.save(path=self.queue_file, items=self.queue)
def _load_processed_updates(self) -> None:
if not self.processed_updates_file.exists():
self.processed_updates = []
return
lines = [x.strip() for x in self.processed_updates_file.read_text(encoding="utf-8").splitlines() if x.strip()]
if len(lines) > self.max_processed_updates:
lines = lines[-self.max_processed_updates:]
self.processed_updates_file.write_text("\n".join(lines) + "\n", encoding="utf-8")
self.processed_updates = lines
def _mark_processed_update(self, update_key: str) -> bool:
if update_key in self.processed_updates:
return True
self.processed_updates.append(update_key)
if len(self.processed_updates) > self.max_processed_updates:
self.processed_updates = self.processed_updates[-self.max_processed_updates:]
self.processed_updates_file.write_text("\n".join(self.processed_updates) + "\n", encoding="utf-8")
else:
with self.processed_updates_file.open("a", encoding="utf-8") as f:
f.write(update_key + "\n")
return False
def _recover_active_jobs_after_restart(self) -> None:
    """Turn jobs left ``active`` by a previous run back into ``pending`` and log their ids."""
    stale_jobs = [job for job in self.queue if job.get("status") == "active"]
    for job in stale_jobs:
        job.update(status="pending", retry_reason="service_restart_recovery", updated_at=now_iso())
    if not stale_jobs:
        return
    self._persist_queue()
    self._append_history_event("active_jobs_recovered", {"jobIds": [job.get("id", "") for job in stale_jobs]})
def _init_offset_if_missing(self) -> None:
if self.state.get("offset") is not None:
return
try:
updates = self.telegram.get_updates(offset=None, timeout_sec=0)
if updates:
self.state["offset"] = int(updates[-1]["update_id"]) + 1
else:
self.state["offset"] = 0
self._persist_state()
except Exception as e:
print(f"[py-bot] Не удалось инициализировать offset: {e}", flush=True)
self.state["offset"] = 0
self._persist_state()
def _current_history_file(self) -> Path:
    """Path of the owner's active history file."""
    owner = self.cfg.allowed_username
    return self._current_history_file_for_user(owner)
def _history_dirs_for_user(self, username: str) -> tuple[Path, Path]:
    """Create (if needed) and return ``history/<user>`` and ``history/<user>/archive``.

    An empty username maps to the ``unknown`` directory.
    """
    user_dir = self.history_dir / (normalize_username(username) or "unknown")
    user_archive = user_dir / "archive"
    for folder in (user_dir, user_archive):
        folder.mkdir(parents=True, exist_ok=True)
    return user_dir, user_archive
def _ensure_user_session(self, username: str) -> None:
    """Make sure *username* (default: the owner) has settings and a session with a history file.

    A missing session gets a fresh history file (also mirrored to the
    top-level ``current_history_file`` for the owner) and state is persisted.
    """
    uname = normalize_username(username) or self.cfg.allowed_username
    sessions = self.state.setdefault("user_sessions", {})
    if not isinstance(sessions, dict):
        sessions = self.state["user_sessions"] = {}
    self._user_settings(uname)
    existing = sessions.get(uname)
    if isinstance(existing, dict) and existing.get("current_history_file"):
        return
    new_path = str(self._create_new_history_file("initial", uname))
    sessions[uname] = {"current_history_file": new_path}
    if uname == self.cfg.allowed_username:
        self.state["current_history_file"] = new_path
    self._persist_state()
def _user_session_state(self, username: str) -> dict[str, Any]:
    """Return the mutable session dict of *username* (default: the owner), creating it if needed."""
    uname = normalize_username(username) or self.cfg.allowed_username
    self._ensure_user_session(uname)
    all_sessions = self.state.get("user_sessions") or {}
    session = all_sessions.get(uname)
    if isinstance(session, dict):
        return session
    session = {}
    all_sessions[uname] = session
    return session
def _current_history_file_for_user(self, username: str) -> Path:
    """Path of *username*'s active history file (the session is created on demand)."""
    stored = self._user_session_state(username)["current_history_file"]
    return Path(stored)
def _codex_thread_id_for_user(self, username: str) -> str:
    """Stored Codex thread id for *username*, or "" if none is recorded."""
    session = self._user_session_state(username)
    return (session.get("codex_thread_id") or "").strip()
def _set_codex_thread_id_for_user(self, username: str, thread_id: str) -> None:
    """Store *thread_id* for *username* (a blank value removes it) and persist state."""
    session = self._user_session_state(username)
    cleaned = (thread_id or "").strip()
    if not cleaned:
        session.pop("codex_thread_id", None)
    else:
        session["codex_thread_id"] = cleaned
    self._persist_state()
def _create_new_history_file(self, reason: str, username: str) -> Path:
    """Create a new history file and return its path.

    The file is ``history/<user>/<local timestamp>_<8 hex chars>.jsonl`` and
    is seeded with a ``history_created`` record.
    """
    stamp = dt.datetime.now().strftime("%Y-%m-%d_%H%M%S")
    suffix = "".join(random.choices(string.hexdigits.lower(), k=8))
    user_dir = self._history_dirs_for_user(username)[0]
    new_file = user_dir / f"{stamp}_{suffix}.jsonl"
    header = {
        "ts": now_iso(),
        "type": "history_created",
        "reason": reason,
        "username": normalize_username(username),
    }
    JsonLineStore.append(new_file, header)
    return new_file
def _rotate_history(self, reason: str, username: str) -> Path:
    """Archive *username*'s current history file and start a new one.

    The Codex thread id is carried over unless *reason* is ``"command_new"``.
    The rotation is logged into the new history.

    Returns:
        The archived file's path, or the placeholder ``archive/(empty)``
        when there was no file to move.
    """
    uname = normalize_username(username) or self.cfg.allowed_username
    active_path = self._current_history_file_for_user(uname)
    archive_dir = self._history_dirs_for_user(uname)[1]
    if active_path.exists():
        archived = archive_dir / active_path.name
        active_path.replace(archived)
    else:
        # Nothing to move; this placeholder path is only returned and logged.
        archived = archive_dir / "(empty)"
    new_file = self._create_new_history_file(reason, uname)
    sessions = self.state.setdefault("user_sessions", {})
    if not isinstance(sessions, dict):
        sessions = self.state["user_sessions"] = {}
    old_session = sessions.get(uname)
    fresh_session = {"current_history_file": str(new_file)}
    # "command_new" (presumably the /new command) starts a clean Codex
    # thread; any other rotation keeps the existing one.
    if reason != "command_new" and isinstance(old_session, dict):
        kept_thread = (old_session.get("codex_thread_id") or "").strip()
        if kept_thread:
            fresh_session["codex_thread_id"] = kept_thread
    sessions[uname] = fresh_session
    if uname == self.cfg.allowed_username:
        self.state["current_history_file"] = str(new_file)
    self._persist_state()
    self._append_history_event("history_rotated", {"reason": reason, "username": uname, "archived": str(archived)}, username=uname)
    return archived
def _user_settings(self, username: str) -> dict[str, Any]:
    """Return the mutable settings dict of *username* (default: the owner).

    Missing or non-boolean flags are reset to True. State is not persisted here.
    """
    uname = normalize_username(username) or self.cfg.allowed_username
    all_settings = self.state.get("user_settings")
    if not isinstance(all_settings, dict):
        all_settings = self.state["user_settings"] = {}
    prefs = all_settings.get(uname)
    if not isinstance(prefs, dict):
        prefs = all_settings[uname] = {}
    for flag in ("voice_replies_enabled", "voice_rewrite_enabled", "single_status_message_enabled"):
        if not isinstance(prefs.get(flag), bool):
            prefs[flag] = True
    return prefs
def _voice_replies_enabled(self, username: str) -> bool:
    """Current ``voice_replies_enabled`` flag of *username* (defaults to True)."""
    settings = self._user_settings(username)
    return bool(settings.get("voice_replies_enabled"))
def _set_voice_replies_enabled(self, username: str, enabled: bool) -> None:
    """Set the ``voice_replies_enabled`` flag of *username* and persist state."""
    settings = self._user_settings(username)
    settings["voice_replies_enabled"] = enabled
    self._persist_state()
def _voice_rewrite_enabled(self, username: str) -> bool:
    """Current ``voice_rewrite_enabled`` flag of *username* (defaults to True)."""
    settings = self._user_settings(username)
    return bool(settings.get("voice_rewrite_enabled"))
def _set_voice_rewrite_enabled(self, username: str, enabled: bool) -> None:
    """Set the ``voice_rewrite_enabled`` flag of *username* and persist state."""
    settings = self._user_settings(username)
    settings["voice_rewrite_enabled"] = enabled
    self._persist_state()
def _single_status_message_enabled(self, username: str) -> bool:
    """Current ``single_status_message_enabled`` flag of *username* (defaults to True)."""
    settings = self._user_settings(username)
    return bool(settings.get("single_status_message_enabled"))
def _set_single_status_message_enabled(self, username: str, enabled: bool) -> None:
    """Set the ``single_status_message_enabled`` flag of *username* and persist state."""
    settings = self._user_settings(username)
    settings["single_status_message_enabled"] = enabled
    self._persist_state()
def _remember_private_chat(self, username: str, chat_id: int) -> None:
    """Record *chat_id* as *username*'s private chat; state is persisted only when it changed."""
    uname = normalize_username(username)
    if not uname:
        return
    chats = self.state.get("private_chat_ids")
    if not isinstance(chats, dict):
        chats = self.state["private_chat_ids"] = {}
    if chats.get(uname) != chat_id:
        chats[uname] = chat_id
        self._persist_state()
def _private_chat_id_for_user(self, username: str) -> int | None:
    """Known private chat id of *username*, following recorded chat migrations; None if unknown."""
    chats = self.state.get("private_chat_ids")
    chat_id = chats.get(normalize_username(username)) if isinstance(chats, dict) else None
    if not isinstance(chat_id, int):
        return None
    return self._resolve_chat_id(chat_id)
def _append_history(self, history_path: Path, event_type: str, payload: dict[str, Any]) -> None:
    """Append ``{"ts", "type", **payload}`` to a history file (payload keys may override ts/type)."""
    JsonLineStore.append(history_path, {"ts": now_iso(), "type": event_type, **payload})
def _append_history_event(self, event_type: str, payload: dict[str, Any], username: str | None = None) -> None:
    """Log a ``system_event`` record (``event`` plus payload) into a user's current history (default: owner)."""
    history_path = self._current_history_file_for_user(username or self.cfg.allowed_username)
    record = {"event": event_type}
    record.update(payload)
    self._append_history(history_path, "system_event", record)
def _load_task_items(self) -> list[dict[str, Any]]:
with self.task_center_lock:
if not self.task_center_file.exists():
return []
try:
data = json.loads(self.task_center_file.read_text(encoding="utf-8"))
except Exception:
return []
return data if isinstance(data, list) else []
def _save_task_items(self, items: list[dict[str, Any]]) -> None:
with self.task_center_lock:
self.task_center_file.parent.mkdir(parents=True, exist_ok=True)
tmp = self.task_center_file.with_suffix(".tmp")
tmp.write_text(json.dumps(items, ensure_ascii=False, indent=2), encoding="utf-8")
tmp.replace(self.task_center_file)
def _next_task_item_id(self, items: list[dict[str, Any]]) -> str:
max_num = 0
for item in items:
raw = str(item.get("id") or "")
match = re.match(r"TC-(\d+)$", raw)
if match:
max_num = max(max_num, int(match.group(1)))
return f"TC-{max_num + 1:04d}"
def _create_task_item(
    self,
    *,
    kind: str,
    title: str,
    text: str,
    source_username: str,
    target_username: str,
    source_message_id: int | None = None,
    source_chat_id: int | None = None,
    opinion: str = "",
) -> dict[str, Any]:
    """Create, store and return a new task-center item with status ``new``.

    The title is whitespace-compacted and cut to 160 characters. An empty
    title becomes "Предложение" for ``kind == "proposal"`` and "Задача"
    otherwise. Creation is logged in the source user's history.
    """
    with self.task_center_lock:
        # Load, number, append and save under one lock hold. Otherwise two
        # concurrent creators could receive the same TC id, or one save
        # could overwrite the other's item.
        items = self._load_task_items()
        created_at = now_iso()
        item = {
            "id": self._next_task_item_id(items),
            "kind": kind,
            "status": "new",
            "title": compact_spaces(title)[:160] or ("Предложение" if kind == "proposal" else "Задача"),
            "text": (text or "").strip(),
            "opinion": (opinion or "").strip(),
            "source_username": normalize_username(source_username),
            "source_name": self._display_name(source_username),
            "target_username": normalize_username(target_username),
            "target_name": self._display_name(target_username),
            "source_chat_id": source_chat_id,
            "source_message_id": source_message_id,
            "created_at": created_at,
            "updated_at": created_at,
        }
        items.append(item)
        self._save_task_items(items)
    self._append_history_event("task_center_item_created", {
        "itemId": item["id"],
        "kind": kind,
        "sourceUsername": item["source_username"],
        "targetUsername": item["target_username"],
        "title": item["title"],
    }, username=source_username)
    return item
def _task_items_for_user(self, username: str, *, include_done: bool = False) -> list[dict[str, Any]]:
    """Items addressed to *username*, sorted by ``created_at`` (oldest first).

    Items with status ``done`` are included only when *include_done* is True.
    """
    target = normalize_username(username)

    def wanted(item: dict[str, Any]) -> bool:
        if normalize_username(item.get("target_username")) != target:
            return False
        return include_done or item.get("status") != "done"

    selected = list(filter(wanted, self._load_task_items()))
    selected.sort(key=lambda item: str(item.get("created_at") or ""))
    return selected
def _task_center_counts_text(self, username: str) -> str:
    """One-line Russian reminder counting *username*'s not-done items by status.

    Returns "" when there are none. Statuses other than new, approved,
    needs_work and rejected count toward the total but are not listed.
    """
    per_status: dict[str, int] = {}
    for item in self._task_items_for_user(username):
        key = str(item.get("status") or "new")
        per_status[key] = per_status.get(key, 0) + 1
    total = sum(per_status.values())
    if total <= 0:
        return ""
    breakdown = ", ".join(
        f"{TASK_STATUS_LABELS.get(status, status)}: {per_status[status]}"
        for status in ("new", "approved", "needs_work", "rejected")
        if per_status.get(status, 0)
    )
    return f"Напоминание по задачам: всего активных {total}; {breakdown}."
def _format_task_items(self, username: str, *, include_done: bool = False) -> str:
    """Russian multi-line list of up to 15 items for *username*.

    Each line shows id, status, kind, source and title; a trailing line
    counts any items not shown.
    """
    items = self._task_items_for_user(username, include_done=include_done)
    who = self._display_name(username)
    if not items:
        return f"Для {who} активных задач и предложений нет."
    lines = [f"Задачи и предложения для {who}:"]
    for item in items[:15]:
        raw_status = str(item.get("status") or "new")
        kind_label = "предложение" if item.get("kind") == "proposal" else "задача"
        author = item.get("source_name") or item.get("source_username") or "неизвестно"
        headline = item.get("title") or "(без названия)"
        lines.append(f"{item.get('id')} [{TASK_STATUS_LABELS.get(raw_status, raw_status)}] {kind_label} от {author}: {headline}")
    hidden = len(items) - 15
    if hidden > 0:
        lines.append(f"...и ещё {hidden}")
    return "\n".join(lines)
def _update_task_item_status(self, item_id: str, status: str) -> dict[str, Any] | None:
    """Set *status* on the item whose id matches *item_id* (case-insensitive) and save.

    Returns:
        The updated item, or None when *item_id* is blank or matches nothing.
    """
    wanted = (item_id or "").strip().upper()
    if not wanted:
        # A blank id must not match items that lack an "id" field.
        return None
    # Hold the lock across load → modify → save so a concurrent writer
    # cannot interleave and have its change overwritten.
    with self.task_center_lock:
        items = self._load_task_items()
        for item in items:
            if str(item.get("id") or "").upper() == wanted:
                item["status"] = status
                item["updated_at"] = now_iso()
                self._save_task_items(items)
                return item
    return None
def _find_first_task_item(self, *, source_username: str = "", target_username: str = "", kind: str = "") -> dict[str, Any] | None:
    """First not-done item, in file order, matching every non-empty filter; None if there is none."""
    source = normalize_username(source_username)
    target = normalize_username(target_username)

    def matches(item: dict[str, Any]) -> bool:
        return (
            item.get("status") != "done"
            and (not source or normalize_username(item.get("source_username")) == source)
            and (not target or normalize_username(item.get("target_username")) == target)
            and (not kind or item.get("kind") == kind)
        )

    return next((item for item in self._load_task_items() if matches(item)), None)
def _notify_user_about_task_item(self, username: str, item: dict[str, Any]) -> None:
    """Privately tell ``username`` about a new or updated task-center item.

    Does nothing when the user has no known private chat. Items whose
    ``status`` is exactly ``"new"`` produce a "you have a new ..." notice;
    any other status produces an "update on ..." notice with the status label.

    The Russian noun phrases are chosen per item kind so that gender and case
    agree: "новая задача" / "новое предложение" and "по задаче" /
    "по предложению".
    """
    chat_id = self._private_chat_id_for_user(username)
    if chat_id is None:
        return
    is_proposal = item.get("kind") == "proposal"
    # Nominative with an agreeing adjective (for "new" notices) ...
    new_phrase = "новое предложение" if is_proposal else "новая задача"
    # ... and the dative form required after "по" (for update notices).
    dative_kind = "предложению" if is_proposal else "задаче"
    source = item.get("source_name") or item.get("source_username") or "кто-то"
    title = item.get("title") or "(без названия)"
    raw_status = str(item.get("status") or "new")
    status = TASK_STATUS_LABELS.get(raw_status, raw_status)
    if item.get("status") == "new":
        text = f"У тебя {new_phrase} от {source}: {title}\nID: {item.get('id')}"
    else:
        text = f"Обновление по {dative_kind} {item.get('id')}: статус «{status}».\n{title}"
    self._safe_send(chat_id, text)
def _send_player_welcome_once(self, chat_id: int, message_id: int, username: str) -> None:
    """Greet a player on first contact and remember that the greeting was sent.

    ``self.state["player_welcome_sent"]`` maps normalized usernames to the
    ISO time of the greeting; it is (re)created when missing or not a dict.
    Users already present there are skipped. A task-center reminder is
    appended to the greeting when ``_task_center_counts_text`` returns one.
    """
    uname = normalize_username(username)
    sent = self.state.get("player_welcome_sent")
    if not isinstance(sent, dict):
        sent = self.state["player_welcome_sent"] = {}
    if sent.get(uname):
        return
    greeting = "\n".join([
        f"Привет, {self._player_name(uname)}.",
        "Можно задавать вопросы по проекту, просить анализ, идеи и подготовку готового ТЗ.",
        "Команда /new начинает новую Codex-сессию и архивирует текущую историю.",
    ])
    reminder = self._task_center_counts_text(uname)
    if reminder:
        greeting += f"\n\n{reminder}\nКоманда /tasks покажет список."
    self._safe_send(chat_id, greeting, reply_to=message_id)
    sent[uname] = now_iso()
    self._persist_state()
def _resolve_chat_id(self, chat_id: int) -> int:
migrations = self.state.get("chat_id_migrations")
if not isinstance(migrations, dict):
return chat_id
current = chat_id
visited: set[int] = set()
while current not in visited:
visited.add(current)
next_chat_id = migrations.get(str(current))
if not isinstance(next_chat_id, int):
break
current = next_chat_id
return current
def _remember_chat_migration(self, old_chat_id: int, new_chat_id: int, source: str) -> None:
    """Record that ``old_chat_id`` became ``new_chat_id`` and retarget queued jobs.

    No-op when both ids are equal or the mapping is already stored.
    Otherwise the following happens, in order:
    1. The mapping is saved in ``self.state["chat_id_migrations"]`` (keyed by
       ``str(old_chat_id)``) and persisted.
    2. Queued jobs addressed to the old chat are moved to the new one (the
       queue is persisted only if any job changed).
    3. A ``chat_migrated_to_supergroup`` history event carrying ``source`` is
       appended.
    """
    if old_chat_id == new_chat_id:
        return
    key = str(old_chat_id)
    table = self.state.get("chat_id_migrations")
    if not isinstance(table, dict):
        table = {}
        self.state["chat_id_migrations"] = table
    if table.get(key) == new_chat_id:
        return
    table[key] = new_chat_id
    self._persist_state()
    with self.queue_lock:
        moved = [job for job in self.queue if job.get("chat_id") == old_chat_id]
        for job in moved:
            job["chat_id"] = new_chat_id
            job["updated_at"] = now_iso()
        if moved:
            self._persist_queue()
    self._append_history_event("chat_migrated_to_supergroup", {
        "oldChatId": old_chat_id,
        "newChatId": new_chat_id,
        "source": source,
    })
@staticmethod
def _extract_migrate_to_chat_id(error_text: str) -> int | None:
    """Pull the ``migrate_to_chat_id`` value out of a Telegram error rendered as text.

    JSON-style (double-quoted) keys are tried first, then Python-repr-style
    (single-quoted) keys.

    Returns:
        The possibly negative chat id, or ``None`` when no such key is found.
    """
    for pattern in (
        r'"migrate_to_chat_id"\s*:\s*(-?\d+)',
        r"'migrate_to_chat_id'\s*:\s*(-?\d+)",
    ):
        found = re.search(pattern, error_text)
        if found:
            return int(found.group(1))
    return None
def _handle_update(self, update: dict[str, Any]) -> None:
    """Route a single Telegram update to the matching handler.

    Only ``message`` and ``channel_post`` payloads are considered. Steps, in order:

    1. Return unless the payload has an int ``chat.id`` and ``message_id``.
    2. A message carrying ``migrate_to_chat_id`` only records the migration.
    3. Duplicates are dropped via ``_mark_processed_update("<chat_id>:<message_id>")``.
    4. Channel posts are ignored unless ``cfg.allowed_channel_username`` is
       empty or equals the chat's username.
    5. Member join/leave and chat-creation service messages are ignored.
    6. Users failing ``_is_allowed_user`` are refused (with a reply only in
       private chats); allowed players are served in private chats only.
    7. Messages without text/caption become voice/audio jobs. ``/commands`` go
       to ``_handle_command`` and task-center phrases to
       ``_handle_task_center_text``. Any other text is logged to the user's
       history and queued as a ``text`` job.
    """
    message = update.get("message")
    update_type = "message"
    if not isinstance(message, dict):
        message = update.get("channel_post")
        update_type = "channel_post"
    if not isinstance(message, dict):
        return
    chat = message.get("chat") or {}
    chat_id = chat.get("id")
    message_id = message.get("message_id")
    chat_type = str(chat.get("type") or "")
    chat_username = normalize_username(chat.get("username"))
    chat_title = str(chat.get("title") or "")
    sender = message.get("from") or {}
    username = normalize_username(sender.get("username"))
    author_signature = str(message.get("author_signature") or "").strip()
    # Without a sender username (presumably channel posts), the normalized
    # author signature is used as the acting identity.
    author_username = username or normalize_username(author_signature)
    if not isinstance(chat_id, int) or not isinstance(message_id, int):
        return
    migrate_to_chat_id = message.get("migrate_to_chat_id")
    if isinstance(migrate_to_chat_id, int):
        self._remember_chat_migration(chat_id, migrate_to_chat_id, "telegram_message")
        return
    # Dedupe before any filtering, so every update key is marked exactly once.
    update_key = f"{chat_id}:{message_id}"
    if self._mark_processed_update(update_key):
        return
    is_channel_post = update_type == "channel_post" or chat_type == "channel"
    # NOTE: is_group_message is not used anywhere below in this method.
    is_group_message = update_type == "message" and chat_type in ("group", "supergroup")
    is_allowed_channel = (
        not is_channel_post
        or not self.cfg.allowed_channel_username
        or chat_username == self.cfg.allowed_channel_username
    )
    if is_channel_post and not is_allowed_channel:
        return
    # Any chat whose username equals the configured channel is remembered as
    # the target for public reports.
    if chat_username and chat_username == self.cfg.allowed_channel_username:
        self._remember_public_report_chat(chat_id)
    # Ignore service messages: members joining/leaving and chat-creation notices.
    if message.get("new_chat_members") or message.get("left_chat_member"):
        return
    if message.get("group_chat_created") or message.get("supergroup_chat_created") or message.get("channel_chat_created"):
        return
    # Captions count as text, so a voice/audio message that has a caption is
    # handled as a text message (the caption), not transcribed.
    text = (message.get("text") or message.get("caption") or "").strip()
    actor_username = normalize_username(author_username)
    is_allowed = self._is_allowed_user(actor_username)
    is_private = chat_type == "private"
    if not is_allowed:
        if is_private:
            self._safe_send(chat_id, "Извините, доступ к этому агенту пока не выдан. Обратитесь к Айдару.", reply_to=message_id)
        return
    # Players are only served in private chats; their group messages are ignored.
    if self._is_allowed_player(actor_username) and not is_private:
        return
    self._ensure_user_session(actor_username)
    if is_private:
        self._remember_private_chat(actor_username, chat_id)
    history_path = self._current_history_file_for_user(actor_username)
    if self._is_allowed_player(actor_username):
        self._send_player_welcome_once(chat_id, message_id, actor_username)
    if not text:
        # Media without text: voice takes precedence over audio; anything
        # else is answered with a "supported types" note.
        if message.get("voice"):
            self._enqueue_voice_job(
                chat_id,
                message_id,
                actor_username,
                message["voice"].get("file_id"),
                duration_seconds=message["voice"].get("duration"),
                telegram_file_size=message["voice"].get("file_size"),
                media_type="voice",
                update_type=update_type,
                chat_username=chat_username,
                chat_title=chat_title,
                author_signature=author_signature,
                chat_type=chat_type,
            )
            return
        if message.get("audio"):
            self._enqueue_voice_job(
                chat_id,
                message_id,
                actor_username,
                message["audio"].get("file_id"),
                duration_seconds=message["audio"].get("duration"),
                telegram_file_size=message["audio"].get("file_size"),
                media_type="audio",
                update_type=update_type,
                chat_username=chat_username,
                chat_title=chat_title,
                author_signature=author_signature,
                chat_type=chat_type,
            )
            return
        self._safe_send(chat_id, "Поддерживаются текст, voice и audio.", reply_to=message_id)
        return
    if text.startswith("/"):
        self._handle_command(chat_id, message_id, actor_username, text)
        return
    # Task-center phrases (lists, assignments, status changes, proposals) are
    # answered directly and never reach Codex.
    if self._handle_task_center_text(chat_id, message_id, actor_username, text):
        return
    self._append_history(history_path, "incoming_text", {
        "chatId": chat_id,
        "messageId": message_id,
        "updateType": update_type,
        "chatType": chat_type,
        "chatUsername": chat_username,
        "chatTitle": chat_title,
        "username": actor_username,
        "authorSignature": author_signature,
        "text": text,
    })
    job = self._build_job_base(chat_id, message_id, actor_username, str(history_path))
    job["type"] = "text"
    job["text"] = text
    job["update_type"] = update_type
    job["chat_type"] = chat_type
    job["chat_username"] = chat_username
    job["chat_title"] = chat_title
    job["author_signature"] = author_signature
    job["role"] = "owner" if self._is_owner(actor_username) else "player"
    job["player_name"] = self._player_name(actor_username) if job["role"] == "player" else ""
    with self.queue_lock:
        self.queue.append(job)
        self._persist_queue()
    # Private chats get an editable status message; other chats get a short ack.
    if chat_type == "private":
        self._ensure_job_status_message(
            job["id"],
            chat_id,
            message_id,
            f"Задача #{job['num']} получена.\nСтатус: в очереди.",
        )
    else:
        self._safe_send(chat_id, f"Принял задачу #{job['num']}", reply_to=message_id)
def _enqueue_voice_job(
    self,
    chat_id: int,
    message_id: int,
    username: str,
    file_id: str | None,
    *,
    duration_seconds: int | None = None,
    telegram_file_size: int | None = None,
    media_type: str = "voice",
    update_type: str = "message",
    chat_username: str = "",
    chat_title: str = "",
    author_signature: str = "",
    chat_type: str = "",
) -> None:
    """Log an incoming voice/audio message and queue a ``voice`` job for it.

    Replies with an error and queues nothing when ``file_id`` is empty.
    Otherwise the message is written to the user's history as
    ``incoming_voice`` and a job with the Telegram file metadata is appended
    to the queue. Missing duration/size values are stored as 0. The sender
    is then acknowledged: an editable status message in private chats, a
    short reply elsewhere.
    """
    if not file_id:
        self._safe_send(chat_id, "Не удалось прочитать file_id голосового.", reply_to=message_id)
        return
    history_path = self._current_history_file_for_user(username)
    self._append_history(history_path, "incoming_voice", {
        "chatId": chat_id,
        "messageId": message_id,
        "updateType": update_type,
        "chatType": chat_type,
        "chatUsername": chat_username,
        "chatTitle": chat_title,
        "username": username,
        "authorSignature": author_signature,
        "fileId": file_id,
        "mediaType": media_type,
        "durationSeconds": duration_seconds,
        "fileSize": telegram_file_size,
    })
    job = self._build_job_base(chat_id, message_id, username, str(history_path))
    role = "owner" if self._is_owner(username) else "player"
    job.update({
        "type": "voice",
        "telegram_file_id": file_id,
        "telegram_media_type": media_type,
        "telegram_duration_seconds": duration_seconds or 0,
        "telegram_file_size": telegram_file_size or 0,
        "update_type": update_type,
        "chat_type": chat_type,
        "chat_username": chat_username,
        "chat_title": chat_title,
        "author_signature": author_signature,
        "role": role,
        "player_name": self._player_name(username) if role == "player" else "",
    })
    with self.queue_lock:
        self.queue.append(job)
        self._persist_queue()
    num = job["num"]
    if chat_type != "private":
        self._safe_send(chat_id, f"Принял voice в задачу #{num}", reply_to=message_id)
        return
    self._ensure_job_status_message(
        job["id"],
        chat_id,
        message_id,
        f"Voice для задачи #{num} получен.\nСтатус: в очереди.",
    )
def _build_job_base(self, chat_id: int, message_id: int, username: str, history_file: str) -> dict[str, Any]:
    """Allocate the next job number and return a fresh ``pending`` job record.

    The counter is ``self.state["next_job_number"]`` (defaulting to 1). It is
    read, bumped and persisted while ``queue_lock`` is held. The returned
    record has type ``text`` and owner defaults; callers overwrite the
    type-specific fields.
    """
    with self.queue_lock:
        num = int(self.state.get("next_job_number", 1))
        self.state["next_job_number"] = num + 1
        self._persist_state()
    return dict(
        id=str(uuid.uuid4()),
        num=num,
        status="pending",
        type="text",
        chat_id=chat_id,
        message_id=message_id,
        username=username,
        update_type="message",
        chat_type="",
        chat_username="",
        chat_title="",
        author_signature="",
        role="owner",
        player_name="",
        text="",
        telegram_file_id="",
        telegram_media_type="",
        history_file=history_file,
        attempts=0,
        retry_reason="",
        last_error="",
        created_at=now_iso(),
        updated_at=now_iso(),
        active_since=None,
        status_message_id=None,
        status_message_text="",
    )
def _handle_task_center_text(self, chat_id: int, message_id: int, username: str, text: str) -> bool:
    """Handle free-text task-center phrases; return True when the message was consumed.

    Recognized phrases (matched against the lower-cased text):

    * "покажи/список/какие ... задач/предложени..." lists items. The owner may
      name another user.
    * Owner only: "поставь/добавь/создай/запиши задачу <кто>: <текст>" creates
      a task for that user.
    * "одобрить/отклонить/доработать/закрыть/сделано/закрыта" combined with
      "задач", "предложени" or an explicit ``TC-<n>`` id changes an item's
      status. Non-owners may only change items addressed to them. This holds
      both for the id form and for the implicit "first matching item" form,
      so e.g. a player cannot approve their own proposal by quoting its id.
      After a change, the item's author and assignee are notified, except
      the user who made the change.
    * Non-owner only: "предложение/идея/заявка: <текст>" files a proposal for
      the owner.

    Anything else returns False so the caller queues the text for Codex.
    """
    source_text = (text or "").strip()
    lower = source_text.lower()
    is_owner = self._is_owner(username)
    actor = normalize_username(username)
    if re.search(r"\b(покажи|список|какие)\b.*\b(задач|задачи|предложени)", lower):
        target = username
        explicit_target = self._find_user_by_text(source_text)
        if is_owner and explicit_target:
            target = explicit_target
        self._safe_send(chat_id, self._format_task_items(target), reply_to=message_id)
        return True
    if is_owner:
        assign_match = re.search(
            r"(?:поставь|добавь|создай|запиши)\s+(?:задачу|задание)\s+(.+?)(?::|\s+-\s+|\s+—\s+)(.+)",
            source_text,
            flags=re.IGNORECASE | re.DOTALL,
        )
        if assign_match:
            target = self._find_user_by_text(assign_match.group(1))
            body = assign_match.group(2).strip()
            if target and body:
                item = self._create_task_item(
                    kind="task",
                    title=body,
                    text=body,
                    source_username=username,
                    target_username=target,
                    source_message_id=message_id,
                    source_chat_id=chat_id,
                )
                self._notify_user_about_task_item(target, item)
                # Separate id and title; previously they were printed glued together.
                self._safe_send(
                    chat_id,
                    f"Задача добавлена для {self._display_name(target)}: {item['id']} — {item['title']}",
                    reply_to=message_id,
                )
                return True
    status_match = re.search(r"\b(одобрить|отклонить|доработать|закрыть|сделано|закрыта)\b(?:\s+(.+))?", lower, flags=re.IGNORECASE)
    if status_match and ("задач" in lower or "предложени" in lower or re.search(r"tc-\d+", lower, flags=re.IGNORECASE)):
        action = status_match.group(1)
        tail = status_match.group(2) or ""
        status = {
            "одобрить": "approved",
            "отклонить": "rejected",
            "доработать": "needs_work",
            "закрыть": "done",
            "сделано": "done",
            "закрыта": "done",
        }.get(action, "new")
        id_match = re.search(r"tc-\d+", source_text, flags=re.IGNORECASE)
        item = None
        if id_match:
            wanted_id = id_match.group(0).upper()
            candidate = next(
                (entry for entry in self._load_task_items()
                 if str(entry.get("id") or "").upper() == wanted_id),
                None,
            )
            # Same scope as the implicit branch below: non-owners may only
            # touch items addressed to them. Others are treated as not found.
            if candidate is not None and (
                is_owner or normalize_username(candidate.get("target_username")) == actor
            ):
                item = self._update_task_item_status(wanted_id, status)
        else:
            source_user = self._find_user_by_text(tail)
            item = self._find_first_task_item(
                source_username=source_user,
                target_username=username,
                kind="proposal" if "предложени" in lower else "",
            )
            if item:
                item = self._update_task_item_status(str(item.get("id")), status)
        if item:
            label = TASK_STATUS_LABELS.get(status, status)
            self._safe_send(chat_id, f"{item.get('id')} обновлена: {label}.", reply_to=message_id)
            # Inform the other participants (author first, then assignee), never the actor.
            notified: set[str] = set()
            for field in ("source_username", "target_username"):
                other = normalize_username(item.get(field))
                if other and other != actor and other not in notified:
                    notified.add(other)
                    self._notify_user_about_task_item(other, item)
            return True
    if not is_owner:
        proposal_match = re.match(r"\s*(?:предложение|идея|заявка)\s*[:-]\s*(.+)", source_text, flags=re.IGNORECASE | re.DOTALL)
        if proposal_match:
            body = proposal_match.group(1).strip()
            if body:
                item = self._create_task_item(
                    kind="proposal",
                    title=body,
                    text=body,
                    opinion="Нужно решение Айдара: одобрить, отклонить или отправить на доработку.",
                    source_username=username,
                    target_username=self.cfg.allowed_username,
                    source_message_id=message_id,
                    source_chat_id=chat_id,
                )
                self._notify_user_about_task_item(self.cfg.allowed_username, item)
                self._safe_send(
                    chat_id,
                    f"Предложение отправлено Айдару как {item['id']}. Статус: новая.",
                    reply_to=message_id,
                )
                return True
    return False
def _handle_command(self, chat_id: int, message_id: int, username: str, text: str) -> None:
    """Execute a slash command sent by an allowed user.

    The command word is lower-cased and an ``@botname`` suffix is dropped.

    Access rules:
    * Restart commands and ``/cancel all`` are owner-only.
    * Non-owners may ``/stop`` or ``/cancel <id>`` only when every job the
      command could hit was queued by themselves. Previously any allowed
      player could stop the owner's running job or wipe the whole queue.

    Unrecognized commands get a pointer to /help instead of being silently
    ignored.
    """
    lower = text.lower()
    command = lower.split(maxsplit=1)[0].split("@", 1)[0]
    is_owner = self._is_owner(username)
    actor = normalize_username(username)
    if command in ("/start", "/help"):
        self._safe_send(chat_id, self._help_text(is_owner=is_owner), reply_to=message_id)
        return
    if command == "/settings":
        self._safe_send(chat_id, self._settings_text(username), reply_to=message_id)
        return
    if command == "/status":
        self._safe_send(chat_id, self._status_text(username), reply_to=message_id)
        return
    if command == "/queue":
        self._safe_send(chat_id, self._queue_text(), reply_to=message_id)
        return
    if command in ("/tasks", "/my_tasks"):
        parts = text.split(maxsplit=1)
        target = username
        if is_owner and len(parts) > 1:
            parsed = self._find_user_by_text(parts[1])
            if parsed:
                target = parsed
        self._safe_send(chat_id, self._format_task_items(target), reply_to=message_id)
        return
    if command == "/voice_on":
        self._set_voice_replies_enabled(username, True)
        self._append_history_event("voice_replies_enabled", {"username": normalize_username(username)}, username=username)
        self._safe_send(chat_id, "Озвучивание финальных ответов включено для вашего пользователя.", reply_to=message_id)
        return
    if command == "/voice_off":
        self._set_voice_replies_enabled(username, False)
        self._append_history_event("voice_replies_disabled", {"username": normalize_username(username)}, username=username)
        self._safe_send(chat_id, "Озвучивание финальных ответов выключено для вашего пользователя.", reply_to=message_id)
        return
    if command in ("/voice_status", "/voice_rewrite_status"):
        self._safe_send(chat_id, self._status_text(username), reply_to=message_id)
        return
    if command == "/voice_rewrite_on":
        self._set_voice_rewrite_enabled(username, True)
        self._append_history_event("voice_rewrite_enabled", {"username": normalize_username(username)}, username=username)
        self._safe_send(chat_id, "Адаптация текста перед озвучкой включена для вашего пользователя.", reply_to=message_id)
        return
    if command == "/voice_rewrite_off":
        self._set_voice_rewrite_enabled(username, False)
        self._append_history_event("voice_rewrite_disabled", {"username": normalize_username(username)}, username=username)
        self._safe_send(chat_id, "Адаптация текста перед озвучкой выключена для вашего пользователя.", reply_to=message_id)
        return
    if command == "/single_message_on":
        self._set_single_status_message_enabled(username, True)
        self._append_history_event("single_status_message_enabled", {"username": normalize_username(username)}, username=username)
        self._safe_send(chat_id, "Режим одного редактируемого сообщения в личке включён для вашего пользователя.", reply_to=message_id)
        return
    if command == "/single_message_off":
        self._set_single_status_message_enabled(username, False)
        self._append_history_event("single_status_message_disabled", {"username": normalize_username(username)}, username=username)
        self._safe_send(chat_id, "Режим одного редактируемого сообщения в личке выключен. Бот будет отправлять отдельные сообщения по этапам.", reply_to=message_id)
        return
    if command == "/new":
        archived = self._rotate_history("command_new", username)
        self._safe_send(chat_id, f"История очищена. Новый диалог начат.\nАрхив: {archived.name}", reply_to=message_id)
        return
    if command in ("/restart_service", "/restart"):
        if not is_owner:
            self._safe_send(chat_id, "Команда недоступна.", reply_to=message_id)
            return
        self._append_history_event("restart_service_deferred_requested", {
            "chatId": chat_id,
            "messageId": message_id,
            "username": username,
        }, username=username)
        self._safe_send(
            chat_id,
            "Отложенный рестарт принят. Если задача сейчас выполняется, сервис перезапустится после её завершения и до следующей задачи.",
            reply_to=message_id,
        )
        self._request_deferred_restart()
        return
    if command in ("/restart_hard", "/restart_now", "/restart_force"):
        if not is_owner:
            self._safe_send(chat_id, "Команда недоступна.", reply_to=message_id)
            return
        self._append_history_event("restart_service_hard_requested", {
            "chatId": chat_id,
            "messageId": message_id,
            "username": username,
        }, username=username)
        self._safe_send(
            chat_id,
            "Выполняю жёсткий рестарт сервиса прямо сейчас. Активная задача, если есть, будет прервана и после старта вернётся в очередь.",
            reply_to=message_id,
        )
        self._schedule_self_restart("hard_restart_requested", force=True)
        return
    if command == "/stop":
        # Non-owners may only stop their own running job.
        if not is_owner and self._cancel_selector_usernames(None) - {actor}:
            self._safe_send(chat_id, "Команда недоступна.", reply_to=message_id)
            return
        stopped = self._cancel_active_job("stopped_by_user")
        if stopped:
            self._safe_send(chat_id, "Текущая задача остановлена и удалена из очереди.", reply_to=message_id)
        else:
            self._safe_send(chat_id, "Сейчас нет активной задачи.", reply_to=message_id)
        return
    if command == "/cancel":
        parts = text.split(maxsplit=1)
        if len(parts) < 2:
            self._safe_send(chat_id, "Использование: /cancel <id|all>", reply_to=message_id)
            return
        arg = parts[1].strip()
        if arg.lower() == "all":
            # Wiping everyone's queue is reserved for the owner.
            if not is_owner:
                self._safe_send(chat_id, "Команда недоступна.", reply_to=message_id)
                return
            with self.queue_lock:
                self.stop_current_job = True
                self._stop_active_codex_process()
                count = len(self.queue)
                self.queue = []
                self._persist_queue()
            self._safe_send(chat_id, f"Удалено задач из очереди: {count}", reply_to=message_id)
            return
        if not is_owner and self._cancel_selector_usernames(arg) - {actor}:
            self._safe_send(chat_id, "Команда недоступна.", reply_to=message_id)
            return
        cancelled = self._cancel_by_id_prefix(arg)
        self._safe_send(chat_id, f"Задача удалена: {arg}" if cancelled else f"Задача не найдена: {arg}", reply_to=message_id)
        return
    self._safe_send(chat_id, "Неизвестная команда. Список команд: /help", reply_to=message_id)

def _cancel_selector_usernames(self, selector: str | None) -> set[str]:
    """Return the normalized owners of every queued job ``/stop`` or ``/cancel <selector>`` could hit.

    ``selector=None`` selects the active job(s), as used for ``/stop``.
    Otherwise a job is a candidate when its id starts with the lower-cased
    selector, or its number equals the selector stripped of leading ``#``.
    All candidates are collected, not just the first, so the permission
    check holds whichever of them the cancel routine ends up picking.
    """
    with self.queue_lock:
        if selector is None:
            hits = [job for job in self.queue if job.get("status") == "active"]
        else:
            key = selector.strip().lower()
            num_key = key.lstrip("#")
            hits = [
                job for job in self.queue
                if str(job.get("id", "")).lower().startswith(key)
                or str(job.get("num", "")).lower() == num_key
            ]
    return {normalize_username(job.get("username")) for job in hits}
def _help_text(self, *, is_owner: bool) -> str:
lines = [
"Доступные команды:",
"/status — активная задача и размер очереди",
"/settings — текущие настройки и команды для их изменения",
"/queue — список задач в очереди",
"/tasks — список ваших задач и предложений",
"/stop — остановить текущую задачу",
"/cancel <id|all> — удалить задачу по id (префикс) или все",
"/new — архивировать историю и начать новую Codex-сессию",
"/help — эта справка",
]
if is_owner:
lines.insert(-1, "/tasks <пользователь> — список задач игрока")
lines.insert(-1, "/restart — отложенный рестарт после текущей задачи")
lines.insert(-1, "/restart_hard — жёсткий рестарт прямо сейчас")
return "\n".join(lines)
def _settings_text(self, username: str) -> str:
voice_status = "включено" if self._voice_replies_enabled(username) else "выключено"
rewrite_status = "включена" if self._voice_rewrite_enabled(username) else "выключена"
single_message_status = "включён" if self._single_status_message_enabled(username) else "выключен"
lines = [
"Текущие настройки:",
f"Озвучивание финальных ответов: {voice_status}",
f"Адаптация текста перед озвучкой: {rewrite_status}",
f"Режим одного редактируемого сообщения в личке: {single_message_status}",
"",
"Команды настроек:",
"/voice_on — включить озвучивание",
"/voice_off — выключить озвучивание",
"/voice_rewrite_on — адаптировать текст перед озвучкой",
"/voice_rewrite_off — озвучивать обычный текст без адаптации",
"/single_message_on — один редактируемый ответ в личке",
"/single_message_off — отдельные сообщения по этапам и финалу",
]
return "\n".join(lines)
def _status_text(self, username: str) -> str:
with self.queue_lock:
active = next((j for j in self.queue if j.get("status") == "active"), None)
pending = sum(1 for j in self.queue if j.get("status") == "pending")
voice_status = "включено" if self._voice_replies_enabled(username) else "выключено"
rewrite_status = "включена" if self._voice_rewrite_enabled(username) else "выключена"
single_message_status = "включён" if self._single_status_message_enabled(username) else "выключен"
settings_text = (
f"Голосовые ответы: {voice_status}\n"
f"Адаптация текста перед озвучкой: {rewrite_status}\n"
f"Режим одного сообщения в личке: {single_message_status}"
)
restart_text = "\nОтложенный рестарт: ожидает завершения текущей задачи" if self.restart_requested else ""
if not active:
return f"Статус: активной задачи нет.\nВ очереди pending: {pending}\n{settings_text}{restart_text}"
elapsed = int(time.time() - (self.active_job_started_at or time.time()))
return (
f"Статус: активная задача #{active.get('num', '?')}\n"
f"Тип: {active.get('type', 'text')}\n"
f"Попытка: {int(active.get('attempts', 0)) + 1}/{self.cfg.max_retries}\n"
f"Выполняется: {elapsed}с\n"
f"Pending: {pending}\n"
f"{settings_text}{restart_text}"
)
def _queue_text(self) -> str:
with self.queue_lock:
items = list(self.queue)
if not items:
return "Очередь пуста."
lines = [f"Очередь: {len(items)}"]
for i, job in enumerate(items[:10], start=1):
lines.append(
f"{i}) #{job.get('num', '?')} [{job.get('status')}] {job.get('type')} attempts={job.get('attempts', 0)}"
)
if len(items) > 10:
lines.append(f"...и ещё {len(items) - 10} задач")
return "\n".join(lines)
def _cancel_active_job(self, reason: str) -> bool:
with self.queue_lock:
active = next((j for j in self.queue if j.get("status") == "active"), None)
if not active:
return False
self.stop_current_job = True
self._stop_active_codex_process()
self.queue = [j for j in self.queue if j.get("id") != active.get("id")]
self._persist_queue()
self._append_history_event("job_stopped_by_user", {"jobId": active.get("id"), "reason": reason})
return True
def _cancel_by_id_prefix(self, prefix: str) -> bool:
prefix = prefix.strip().lower()
normalized_num = prefix.lstrip("#")
with self.queue_lock:
target = next(
(
j for j in self.queue
if str(j.get("id", "")).lower().startswith(prefix)
or str(j.get("num", "")).lower() == normalized_num
),
None
)
if not target:
return False
if target.get("status") == "active":
self.stop_current_job = True
self._stop_active_codex_process()
self.queue = [j for j in self.queue if j.get("id") != target.get("id")]
self._persist_queue()
return True
def _worker_loop(self) -> None:
    """Process queued jobs one at a time until ``stop_event`` is set.

    Each iteration:
    1. Honours a deferred restart, if one was requested.
    2. Claims the first ``pending`` job under ``queue_lock``: marks it
       ``active``, records it as the active job and persists the queue.
    3. Runs a copy of that job through ``_process_job``. With nothing pending
       it sleeps 0.5 s instead.

    A restart requested while a job ran is honoured right after that job.

    ``_process_job`` handles job errors itself. Should an exception still
    escape it (e.g. raised inside its own error handling), the exception is
    logged and the loop continues instead of silently killing the worker
    thread. ``active_job_id`` / ``active_job_started_at`` are cleared in
    every case.
    """
    while not self.stop_event.is_set():
        if self.restart_requested:
            self._exit_for_restart("deferred_restart_before_next_job")
            return
        job = None
        with self.queue_lock:
            for item in self.queue:
                if item.get("status") == "pending":
                    item["status"] = "active"
                    item["active_since"] = now_iso()
                    item["updated_at"] = now_iso()
                    self.active_job_id = item.get("id")
                    self.active_job_started_at = time.time()
                    job = dict(item)
                    self._persist_queue()
                    break
        if not job:
            time.sleep(0.5)
            continue
        self.stop_current_job = False
        try:
            self._process_job(job)
        except Exception:
            # Last-resort guard: without it a single bug in the failure path
            # would stop all further job processing.
            print(f"[py-bot] Необработанная ошибка воркера job={str(job.get('id'))[:8]}", flush=True)
            print(traceback.format_exc(), flush=True)
        finally:
            self.active_job_id = None
            self.active_job_started_at = None
        if self.restart_requested:
            self._exit_for_restart("deferred_restart_after_job")
            return
def _process_job(self, job: dict[str, Any]) -> None:
    """Execute one claimed job end to end and settle it in the queue.

    Voice jobs are transcribed first; the transcript replaces ``job["text"]``.
    The prompt is then sent to Codex. The answer is delivered in one of two ways:
    * private chat with single-message mode on: the job's status message is
      edited (plus one extra message for a second part, if any);
    * otherwise: as reply chunks.

    After delivery, the public report, task-center reminder and optional voice
    reply are sent, and the job is removed from the queue.

    On any exception:
    * if a stop was requested, the job is dropped and its status set to
      "stopped";
    * otherwise voice-transcription failures are logged with their details,
      and the error is handed to ``_handle_job_failure``.
    """
    job_id = job["id"]
    job_num = job.get("num", "?")
    chat_id = int(job["chat_id"])
    message_id = int(job["message_id"])
    history_path = Path(job["history_file"])
    private_single_message = (
        (job.get("chat_type") or "") == "private"
        and self._single_status_message_enabled(job.get("username") or "")
    )
    self._set_job_status_text(job, f"Задача #{job_num} в работе.\nСтатус: выполняется.")
    try:
        if job.get("type") == "voice":
            self._set_job_status_text(job, f"Задача #{job_num} в работе.\nСтатус: распознаю voice.")
            recognized = self._transcribe_voice_job(
                job,
                status_cb=lambda note: self._set_job_status_text(
                    job,
                    f"Задача #{job_num} в работе.\nСтатус: {note}",
                ),
            )
            job["text"] = recognized
            self._append_history(history_path, "voice_transcription", {"jobId": job_id, "jobNum": job_num, "text": recognized})
            # Show at most 800 characters of the transcript in the status message.
            preview = recognized.strip()
            if len(preview) > 800:
                preview = preview[:800].rstrip() + " ...[обрезано]"
            self._set_job_status_text(
                job,
                f"Задача #{job_num} в работе.\nСтатус: voice распознан, отправляю в Codex.\n\nТекст:\n{preview}",
            )
        prompt = self._build_prompt(job)
        self._append_history(history_path, "codex_request", {"jobId": job_id, "prompt": prompt})
        self._set_job_status_text(job, f"Задача #{job_num} в работе.\nСтатус: выполняю через Codex.")
        answer = self._run_codex(prompt, job)
        if private_single_message:
            # Only parts[0] and parts[1] are used here. Presumably
            # split_final_private_text never returns more than two parts;
            # TODO confirm, otherwise any further text is dropped.
            parts = split_final_private_text(answer)
            self._set_job_status_text(job, parts[0])
            if len(parts) > 1:
                self._safe_send(chat_id, parts[1], reply_to=message_id)
        else:
            for chunk in split_long_text(answer):
                self._safe_send(chat_id, chunk, reply_to=message_id)
        # NOTE(review): the steps below run after the answer was already sent,
        # but still inside this try. If one of them raises, the except clause
        # hands the job to _handle_job_failure, which may re-queue it, and the
        # answer would then be produced and sent again. Confirm these helpers
        # never raise.
        self._append_history(history_path, "codex_response", {"jobId": job_id, "text": answer})
        self._send_private_job_public_report(job, answer)
        self._send_task_center_reminder(job)
        if self._voice_replies_enabled(job.get("username") or ""):
            self._send_voice_reply_for_answer(job, answer, history_path, job_id)
        self._mark_job_done(job_id)
    except Exception as e:
        # A user-requested stop (/stop, /cancel) surfaces here as an error from
        # the killed Codex process; treat it as a removal, not a failure.
        if self.stop_current_job:
            self._append_history(history_path, "job_stopped", {"jobId": job_id, "reason": str(e)})
            self._set_job_status_text(job, f"Задача #{job_num} остановлена.")
            self._mark_job_removed(job_id)
            self.stop_current_job = False
            return
        if isinstance(e, VoiceTranscriptionError):
            self._append_history(history_path, "voice_transcription_failed", {
                "jobId": job_id,
                "jobNum": job_num,
                "stage": e.stage,
                "retryable": e.retryable,
                "error": e.user_message,
                "detail": e.detail,
            })
        self._handle_job_failure(job, e)
def _build_prompt(self, job: dict[str, Any]) -> str:
retry_block = ""
retry_reason = (job.get("retry_reason") or "").strip()
if retry_reason:
retry_block = f"\n\nПометка retry: {retry_reason}"
role = (job.get("role") or "owner").strip().lower()
player_block = ""
if role == "player":
player_name = (job.get("player_name") or "").strip()
player_dir = self.cfg.codex_workdir / "Players" / (job.get("username") or "")
player_block = (
"\n\nРежим игрока (обязательно):\n"
f"- Пользователь: {player_name} (@{job.get('username')}).\n"
f"- Рабочая папка игрока: {player_dir}\n"
"- Код проекта не изменять.\n"
"- Можно отвечать на вопросы по проекту, предлагать идеи и готовить ТЗ.\n"
"- Если нужны правки кода, описывать предложение текстом и сохранять материалы только в папке игрока."
)
return (
"Пришло сообщение в Telegram.\n"
f"Тип: {job.get('type')}\n"
f"Источник Telegram: {job.get('update_type', 'message')}\n"
f"Тип чата: {job.get('chat_type') or ''}\n"
f"Канал/чат: @{job.get('chat_username') or ''} {job.get('chat_title') or ''}\n"
f"Username отправителя: @{job.get('username')}\n"
f"Подпись автора в Telegram: {job.get('author_signature') or ''}\n"
"Текст для обработки:\n"
f"{job.get('text')}\n\n"
f"История диалога (JSONL): {job.get('history_file')}\n"
f"Инструкции агента: {self.cfg.agent_instructions_file}\n"
f"Работай в рабочем проекте аккуратно и верни только текст ответа пользователю.{player_block}{retry_block}"
)
def _run_codex(self, prompt: str, job: dict[str, Any]) -> str:
    """Run Codex for ``job``, resuming the user's saved thread when one exists.

    When a resumed run fails with a "missing session" error:
    * the saved thread id is cleared;
    * a ``codex_thread_reset`` system event is appended to the job's history
      file;
    * Codex runs once more in a fresh thread.

    Every other ``RuntimeError`` propagates unchanged.
    """
    username = job.get("username") or self.cfg.allowed_username
    thread_id = self._codex_thread_id_for_user(username)
    try:
        return self._run_codex_once(prompt, job, thread_id=thread_id)
    except RuntimeError as exc:
        recoverable = bool(thread_id) and self._is_missing_codex_session_error(str(exc))
        if not recoverable:
            raise
        self._set_codex_thread_id_for_user(username, "")
        reset_event = {
            "event": "codex_thread_reset",
            "reason": "missing_session",
            "username": normalize_username(username),
            "oldThreadId": thread_id,
        }
        self._append_history(Path(job["history_file"]), "system_event", reset_event)
        return self._run_codex_once(prompt, job, thread_id="")
def _run_codex_once(self, prompt: str, job: dict[str, Any], *, thread_id: str) -> str:
    """Run a single ``codex exec`` for ``job`` and return its final message.

    Codex is started with ``--json`` and ``-o <tmpfile>`` for the final
    message. A non-empty ``thread_id`` resumes that thread. Stdout is read on
    a helper thread, which records every line, remembers the reported thread
    id and forwards progress notes to the job status (at most every 8 s).
    The main thread polls once per second. Once the run is over 120 s old and
    no status update was posted for 120 s, it shows a "still running" note.

    Cleanup guarantees:
    * the temporary ``-o`` file is removed on every exit path, including
      timeouts, non-zero exits, empty answers and a failing ``Popen``;
    * if the wait loop is left while Codex is still running (timeout, or a
      status callback raising), the child is killed, and it is always reaped;
    * a failing status update in the reader thread is logged and reading
      continues, so the pipe keeps being drained and ``reader_done`` is
      always set.

    Returns:
        The ``-o`` file contents, or a fallback extracted from the JSON output
        when that file is missing or empty.

    Raises:
        RuntimeError: on timeout, a non-zero exit code, or an empty response.
    """
    output_lines: list[str] = []
    job_id = str(job["id"])
    job_num = job.get("num", "?")
    username = job.get("username") or self.cfg.allowed_username
    with tempfile.NamedTemporaryFile(prefix="shine-codex-last-message-", suffix=".txt", delete=False) as tmp:
        output_file = Path(tmp.name)
    try:
        cmd = [
            str(self.cfg.codex_bin),
            "exec",
            "--dangerously-bypass-approvals-and-sandbox",
            "--json",
            "-C", str(self.cfg.codex_workdir),
            "-o", str(output_file),
        ]
        if thread_id:
            cmd.extend(["resume", thread_id])
        cmd.append(prompt)
        mode = f"resume {thread_id}" if thread_id else "new"
        print(f"[py-bot] codex exec start job={job_id[:8]} mode={mode}", flush=True)
        process = subprocess.Popen(
            cmd,
            stdin=subprocess.DEVNULL,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            encoding="utf-8",
            errors="replace",
            bufsize=1,
        )
        with self.active_process_lock:
            self.active_process = process
        self.last_heartbeat_at = 0.0
        last_user_note = ""
        last_user_note_at = 0.0
        codex_started_at = time.time()
        last_job_message_at = codex_started_at
        seen_thread_id = ""

        def on_line(line: str) -> None:
            nonlocal last_user_note, last_user_note_at, last_job_message_at, seen_thread_id
            output_lines.append(line)
            current_thread_id = self._extract_codex_thread_id(line)
            if current_thread_id:
                seen_thread_id = current_thread_id
            note = self._extract_codex_user_note(line)
            now = time.time()
            if note and note != last_user_note and now - last_user_note_at > 8:
                self._set_job_status_text(job, f"Задача #{job_num} в работе.\nСтатус: {note}")
                last_user_note = note
                last_user_note_at = now
                last_job_message_at = now

        reader_done = threading.Event()

        def reader() -> None:
            try:
                if not process.stdout:
                    return
                for line in process.stdout:
                    try:
                        on_line(line.rstrip("\n"))
                    except Exception:
                        # Keep draining stdout: if reading stopped, Codex could
                        # block on a full pipe until the timeout.
                        print(traceback.format_exc(), flush=True)
            finally:
                reader_done.set()

        t = threading.Thread(target=reader, name=f"codex-reader-{job_id[:8]}", daemon=True)
        t.start()
        return_code = None
        try:
            deadline = time.time() + self.cfg.codex_timeout_seconds
            while return_code is None:
                return_code = process.poll()
                now = time.time()
                if return_code is not None:
                    break
                if now >= deadline:
                    process.kill()
                    t.join(timeout=2)
                    raise RuntimeError(f"Codex timeout after {self.cfg.codex_timeout_seconds}s")
                if now - codex_started_at >= 120 and now - last_job_message_at >= 120:
                    elapsed = self._format_duration(int(now - codex_started_at))
                    self._set_job_status_text(
                        job,
                        f"Задача #{job_num} в работе.\nСтатус: выполняется уже {elapsed}, от Codex давно нет новых сообщений.",
                    )
                    last_job_message_at = now
                self.last_heartbeat_at = now
                time.sleep(1)
        finally:
            with self.active_process_lock:
                self.active_process = None
            if process.poll() is None:
                # Left the loop while Codex is still alive (e.g. a status
                # callback raised): do not leave it running in the background.
                process.kill()
            try:
                process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                pass
        reader_done.wait(timeout=2)
        if return_code != 0:
            tail = "\n".join(output_lines[-40:])
            raise RuntimeError(f"Codex exited with code {return_code}. Output tail:\n{tail}")
        if seen_thread_id and seen_thread_id != thread_id:
            self._set_codex_thread_id_for_user(username, seen_thread_id)
        answer = ""
        if output_file.exists():
            answer = output_file.read_text(encoding="utf-8").strip()
        if answer:
            return answer
        fallback = self._extract_fallback_message(output_lines)
        if not fallback:
            raise RuntimeError("Codex returned empty response")
        return fallback
    finally:
        try:
            output_file.unlink(missing_ok=True)
        except OSError:
            pass
def _stop_active_codex_process(self) -> bool:
with self.active_process_lock:
process = self.active_process
if process is None:
return False
if process.poll() is not None:
return False
process.terminate()
try:
process.wait(timeout=2)
except subprocess.TimeoutExpired:
process.kill()
return True
def _handle_job_failure(self, job: dict[str, Any], err: Exception) -> None:
job_id = job["id"]
job_num = job.get("num", "?")
error_text = str(err).strip() or err.__class__.__name__
user_error_text = self._user_error_text(err)
retryable = not isinstance(err, VoiceTranscriptionError) or err.retryable
log_error_text = err.log_text() if isinstance(err, VoiceTranscriptionError) else error_text
print(f"[py-bot] Ошибка job={job_id[:8]}: {log_error_text}", flush=True)
print(traceback.format_exc(), flush=True)
with self.queue_lock:
target = next((j for j in self.queue if j.get("id") == job_id), None)
if not target:
return
attempts = int(target.get("attempts", 0)) + 1
target["attempts"] = attempts
target["last_error"] = error_text[:1000]
target["updated_at"] = now_iso()
if retryable and attempts < self.cfg.max_retries:
target["status"] = "pending"
target["retry_reason"] = error_text[:200]
self._persist_queue()
will_retry = True
else:
self.queue = [j for j in self.queue if j.get("id") != job_id]
self._persist_queue()
will_retry = False
if will_retry:
self._set_job_status_text(
job,
f"{user_error_text}\nПовторю задачу #{job_num}: попытка {attempts + 1}/{self.cfg.max_retries}.",
)
else:
self._set_job_status_text(job, f"{user_error_text}\nЗадача #{job_num} остановлена.")
def _user_error_text(self, err: Exception) -> str:
if isinstance(err, VoiceTranscriptionError):
return f"Не удалось распознать голосовое: {err.user_message}"
error_text = str(err).strip() or err.__class__.__name__
return f"Ошибка выполнения задачи: {error_text}"
def _mark_job_done(self, job_id: str) -> None:
with self.queue_lock:
self.queue = [j for j in self.queue if j.get("id") != job_id]
self._persist_queue()
def _mark_job_removed(self, job_id: str) -> None:
with self.queue_lock:
self.queue = [j for j in self.queue if j.get("id") != job_id]
self._persist_queue()
def _send_task_center_reminder(self, job: dict[str, Any]) -> None:
if job.get("chat_type") != "private":
return
username = job.get("username") or ""
reminder = self._task_center_counts_text(username)
if not reminder:
return
self._safe_send(int(job["chat_id"]), reminder, reply_to=int(job["message_id"]))
def _remember_public_report_chat(self, chat_id: int) -> None:
if self.state.get("public_report_chat_id") == chat_id:
return
self.state["public_report_chat_id"] = chat_id
self.state["updated_at"] = now_iso()
self._persist_state()
def _public_report_chat_id(self) -> int | str | None:
chat_id = self.state.get("public_report_chat_id")
if isinstance(chat_id, int):
return self._resolve_chat_id(chat_id)
if self.cfg.allowed_channel_username:
return f"@{self.cfg.allowed_channel_username}"
return None
def _send_private_job_public_report(self, job: dict[str, Any], answer: str) -> None:
if job.get("chat_type") != "private":
return
report_chat_id = self._public_report_chat_id()
if report_chat_id is None:
return
job_num = job.get("num", "?")
source_text = (job.get("text") or "").strip()
if not source_text:
source_text = "(пустой текст запроса)"
role = (job.get("role") or "owner").strip().lower()
author_label = "Айдар"
if role == "player":
player_name = (job.get("player_name") or "").strip() or job.get("username") or "Игрок"
author_label = f"{player_name} (@{job.get('username')})"
if job.get("type") == "voice":
voice_file_id = (job.get("telegram_file_id") or "").strip()
media_type = (job.get("telegram_media_type") or "voice").strip()
request_caption = self._trim_telegram_caption(
f"{author_label} сделал {media_type}-запрос, задача #{job_num}.\n\n"
f"Распознанный текст:\n{source_text}"
)
request_message_id = None
if voice_file_id:
request_message_id = self._safe_send_telegram_file(
report_chat_id,
voice_file_id,
media_type=media_type,
caption=request_caption,
)
if request_message_id is None:
request_message_id = self._safe_send(report_chat_id, request_caption)
else:
request_report = (
f"{author_label} сделал запрос, задача #{job_num}.\n\n"
f"{source_text}"
)
request_message_id = self._safe_send(report_chat_id, request_report)
if request_message_id is None:
return
answer_text = (answer or "").strip() or "(пустой ответ)"
answer_chunks = split_long_text(f"Ответ на задачу #{job_num}:\n\n{answer_text}")
for chunk in answer_chunks:
self._safe_send(report_chat_id, chunk, reply_to=request_message_id)
@staticmethod
def _trim_telegram_caption(text: str, limit: int = 1000) -> str:
text = (text or "").strip()
if len(text) <= limit:
return text
return text[:limit].rstrip() + "\n...[обрезано]"
def _safe_send_telegram_file(
self,
chat_id: int | str,
file_id: str,
*,
media_type: str = "voice",
caption: str = "",
reply_to: int | None = None,
) -> int | None:
file_id = (file_id or "").strip()
if not file_id:
return None
caption = self._trim_telegram_caption(caption)
resolved_chat_id: int | str = self._resolve_chat_id(chat_id) if isinstance(chat_id, int) else chat_id
resolved_reply_to = reply_to if resolved_chat_id == chat_id or isinstance(chat_id, str) else None
def send(target_chat_id: int | str, target_reply_to: int | None) -> dict[str, Any]:
if media_type == "audio":
return self.telegram.send_audio(target_chat_id, file_id, caption=caption, reply_to_message_id=target_reply_to)
return self.telegram.send_voice(target_chat_id, file_id, caption=caption, reply_to_message_id=target_reply_to)
try:
sent = send(resolved_chat_id, resolved_reply_to)
result = sent.get("result") or {}
message_id = result.get("message_id")
return message_id if isinstance(message_id, int) else None
except Exception as e:
migrate_to_chat_id = self._extract_migrate_to_chat_id(str(e))
if migrate_to_chat_id is not None:
if isinstance(resolved_chat_id, int):
self._remember_chat_migration(resolved_chat_id, migrate_to_chat_id, "send_file_error")
try:
sent = send(migrate_to_chat_id, None)
result = sent.get("result") or {}
message_id = result.get("message_id")
return message_id if isinstance(message_id, int) else None
except Exception as retry_error:
print(f"[py-bot] sendFile retry after migration error: {retry_error}", flush=True)
return None
print(f"[py-bot] sendFile error: {e}", flush=True)
return None
def _safe_send_voice_upload(
self,
chat_id: int | str,
voice_bytes: bytes,
filename: str,
*,
caption: str = "",
reply_to: int | None = None,
) -> int | None:
if not voice_bytes:
return None
caption = self._trim_telegram_caption(caption)
resolved_chat_id: int | str = self._resolve_chat_id(chat_id) if isinstance(chat_id, int) else chat_id
resolved_reply_to = reply_to if resolved_chat_id == chat_id or isinstance(chat_id, str) else None
def send(target_chat_id: int | str, target_reply_to: int | None) -> dict[str, Any]:
return self.telegram.send_voice_upload(
target_chat_id,
voice_bytes,
filename,
caption=caption,
reply_to_message_id=target_reply_to,
)
try:
sent = send(resolved_chat_id, resolved_reply_to)
result = sent.get("result") or {}
message_id = result.get("message_id")
return message_id if isinstance(message_id, int) else None
except Exception as e:
migrate_to_chat_id = self._extract_migrate_to_chat_id(str(e))
if migrate_to_chat_id is not None:
if isinstance(resolved_chat_id, int):
self._remember_chat_migration(resolved_chat_id, migrate_to_chat_id, "send_voice_upload_error")
try:
sent = send(migrate_to_chat_id, None)
result = sent.get("result") or {}
message_id = result.get("message_id")
return message_id if isinstance(message_id, int) else None
except Exception as retry_error:
print(f"[py-bot] sendVoiceUpload retry after migration error: {retry_error}", flush=True)
return None
print(f"[py-bot] sendVoiceUpload error: {e}", flush=True)
return None
def _safe_send(self, chat_id: int | str, text: str, reply_to: int | None = None) -> int | None:
text = (text or "").strip()
if not text:
return None
if len(text) > 3900:
text = text[:3900] + "\n...[обрезано]"
resolved_chat_id: int | str = self._resolve_chat_id(chat_id) if isinstance(chat_id, int) else chat_id
resolved_reply_to = reply_to if resolved_chat_id == chat_id or isinstance(chat_id, str) else None
try:
sent = self.telegram.send_message(resolved_chat_id, text, reply_to_message_id=resolved_reply_to)
result = sent.get("result") or {}
message_id = result.get("message_id")
return message_id if isinstance(message_id, int) else None
except Exception as e:
migrate_to_chat_id = self._extract_migrate_to_chat_id(str(e))
if migrate_to_chat_id is not None:
if isinstance(resolved_chat_id, int):
self._remember_chat_migration(resolved_chat_id, migrate_to_chat_id, "send_message_error")
try:
sent = self.telegram.send_message(migrate_to_chat_id, text, reply_to_message_id=None)
result = sent.get("result") or {}
message_id = result.get("message_id")
return message_id if isinstance(message_id, int) else None
except Exception as retry_error:
print(f"[py-bot] sendMessage retry after migration error: {retry_error}", flush=True)
return None
print(f"[py-bot] sendMessage error: {e}", flush=True)
return None
def _safe_edit(self, chat_id: int | str, message_id: int | None, text: str) -> bool:
text = (text or "").strip()
if not text or not message_id:
return False
if len(text) > 3900:
text = text[:3900] + "\n...[обрезано]"
resolved_chat_id: int | str = self._resolve_chat_id(chat_id) if isinstance(chat_id, int) else chat_id
try:
self.telegram.edit_message_text(resolved_chat_id, message_id, text)
return True
except Exception as e:
error_text = str(e)
if "message is not modified" in error_text:
return True
migrate_to_chat_id = self._extract_migrate_to_chat_id(error_text)
if migrate_to_chat_id is not None:
if isinstance(resolved_chat_id, int):
self._remember_chat_migration(resolved_chat_id, migrate_to_chat_id, "edit_message_error")
try:
self.telegram.edit_message_text(migrate_to_chat_id, message_id, text)
return True
except Exception as retry_error:
print(f"[py-bot] editMessageText retry after migration error: {retry_error}", flush=True)
return False
print(f"[py-bot] editMessageText error: {e}", flush=True)
return False
def _ensure_job_status_message(self, job_id: str, chat_id: int, reply_to_message_id: int, text: str) -> int | None:
text = (text or "").strip()
if not text:
return None
with self.queue_lock:
target = next((j for j in self.queue if j.get("id") == job_id), None)
if target:
existing_id = target.get("status_message_id")
existing_text = (target.get("status_message_text") or "").strip()
else:
existing_id = None
existing_text = ""
if existing_id and existing_text == text and self._safe_edit(chat_id, int(existing_id), text):
return int(existing_id)
if existing_id and self._safe_edit(chat_id, int(existing_id), text):
with self.queue_lock:
target = next((j for j in self.queue if j.get("id") == job_id), None)
if target:
target["status_message_text"] = text
target["updated_at"] = now_iso()
self._persist_queue()
return int(existing_id)
message_id = self._safe_send(chat_id, text, reply_to=reply_to_message_id)
if message_id is not None:
with self.queue_lock:
target = next((j for j in self.queue if j.get("id") == job_id), None)
if target:
target["status_message_id"] = message_id
target["status_message_text"] = text
target["updated_at"] = now_iso()
self._persist_queue()
return message_id
def _set_job_status_text(self, job: dict[str, Any], text: str) -> None:
if (job.get("chat_type") or "") != "private":
self._safe_send(int(job["chat_id"]), text, reply_to=int(job["message_id"]))
return
if not self._single_status_message_enabled(job.get("username") or ""):
self._safe_send(int(job["chat_id"]), text, reply_to=int(job["message_id"]))
return
message_id = self._ensure_job_status_message(
job["id"],
int(job["chat_id"]),
int(job["message_id"]),
text,
)
if message_id is not None:
job["status_message_id"] = message_id
job["status_message_text"] = text
def _request_deferred_restart(self) -> None:
if self.restart_requested:
return
self.restart_requested = True
self._append_history_event("restart_service_deferred_scheduled", {})
with self.queue_lock:
has_active = any(j.get("status") == "active" for j in self.queue)
if not has_active:
threading.Thread(
target=lambda: self._exit_for_restart("deferred_restart_no_active_job"),
name="shine-py-bot-deferred-restart",
daemon=True,
).start()
def _exit_for_restart(self, reason: str) -> None:
print(f"[py-bot] restart now: {reason}", flush=True)
self._append_history_event("restart_service_executing", {"reason": reason})
time.sleep(0.5)
os._exit(0)
def _schedule_self_restart(self, reason: str = "restart_requested", *, force: bool = False) -> None:
if self.restart_requested and not force:
return
self.restart_requested = True
def restart() -> None:
time.sleep(1.5)
print(f"[py-bot] restart requested by Telegram command: {reason}", flush=True)
if force:
self._stop_active_codex_process()
os._exit(0)
threading.Thread(target=restart, name="shine-py-bot-self-restart", daemon=True).start()
def _voice_reply_targets(self, job: dict[str, Any]) -> list[tuple[int | str, int | None, str]]:
chat_id = int(job["chat_id"])
message_id = int(job["message_id"])
targets: list[tuple[int | str, int | None, str]] = [(chat_id, message_id, "source")]
username = job.get("username") or ""
private_chat_id = self._private_chat_id_for_user(username)
if private_chat_id is not None and private_chat_id != self._resolve_chat_id(chat_id):
targets.append((private_chat_id, None, "private"))
report_chat_id = self._public_report_chat_id()
if report_chat_id is not None:
resolved_report_chat_id = self._resolve_chat_id(report_chat_id) if isinstance(report_chat_id, int) else report_chat_id
resolved_current_chat_id: int | str = self._resolve_chat_id(chat_id)
if resolved_report_chat_id != resolved_current_chat_id:
targets.append((report_chat_id, None, "public"))
deduped: list[tuple[int | str, int | None, str]] = []
seen: set[str] = set()
for target_chat_id, target_reply_to, label in targets:
resolved: int | str = self._resolve_chat_id(target_chat_id) if isinstance(target_chat_id, int) else target_chat_id
key = str(resolved)
if key in seen:
continue
seen.add(key)
deduped.append((target_chat_id, target_reply_to, label))
return deduped
def _send_voice_reply_for_answer(
self,
job: dict[str, Any],
answer: str,
history_path: Path,
job_id: str,
) -> None:
chat_id = int(job["chat_id"])
message_id = int(job["message_id"])
job_num = job.get("num", "?")
username = job.get("username") or ""
if not self.cfg.openai_api_key:
note = "не настроен ключ OpenAI для озвучивания."
self._append_history(history_path, "voice_reply_failed", {"jobId": job_id, "jobNum": job_num, "error": note})
self._safe_send(chat_id, f"Озвучивание включено, но {note}", reply_to=message_id)
return
voice_text = answer
rewrite_enabled = self._voice_rewrite_enabled(username)
if rewrite_enabled:
try:
voice_text = self._openai_rewrite_text_for_voice(answer)
self._append_history(history_path, "voice_rewrite_done", {
"jobId": job_id,
"jobNum": job_num,
"model": self.cfg.openai_voice_rewrite_model,
"sourceChars": len(answer or ""),
"resultChars": len(voice_text or ""),
})
except VoiceReplyError as e:
self._append_history(history_path, "voice_rewrite_failed", {
"jobId": job_id,
"jobNum": job_num,
"model": self.cfg.openai_voice_rewrite_model,
"error": str(e),
})
self._safe_send(
chat_id,
f"Не удалось адаптировать ответ #{job_num} для озвучки, озвучиваю обычный текст: {e}",
reply_to=message_id,
)
voice_text = answer
chunks = split_text_for_tts(voice_text, self.cfg.openai_tts_chunk_chars)
if not chunks:
return
sent_count = 0
total = len(chunks)
targets = self._voice_reply_targets(job)
print(f"[py-bot] tts start job={str(job_id)[:8]} chunks={total} targets={len(targets)} rewrite={rewrite_enabled}", flush=True)
for index, chunk in enumerate(chunks, start=1):
try:
audio = self._openai_tts(chunk)
except VoiceReplyError as e:
self._append_history(history_path, "voice_reply_failed", {
"jobId": job_id,
"jobNum": job_num,
"part": index,
"parts": total,
"error": str(e),
})
self._safe_send(chat_id, f"Не удалось озвучить ответ #{job_num}: {e}", reply_to=message_id)
return
caption = f"Озвучка ответа #{job_num}"
if total > 1:
caption += f", часть {index}/{total}"
for target_chat_id, target_reply_to, target_label in targets:
message_sent = self._safe_send_voice_upload(
target_chat_id,
audio,
f"shine-answer-{job_num}-{index}.ogg",
caption=caption,
reply_to=target_reply_to,
)
if message_sent is None:
self._append_history(history_path, "voice_reply_failed", {
"jobId": job_id,
"jobNum": job_num,
"part": index,
"parts": total,
"target": target_label,
"error": "Telegram не принял voice-файл озвучки.",
})
if target_label == "source":
self._safe_send(chat_id, f"Озвучка ответа #{job_num} создана, но Telegram не принял voice-файл.", reply_to=message_id)
continue
sent_count += 1
self._append_history(history_path, "voice_reply_sent", {
"jobId": job_id,
"jobNum": job_num,
"parts": total,
"messages": sent_count,
"targets": len(targets),
"rewriteEnabled": rewrite_enabled,
})
print(f"[py-bot] tts done job={str(job_id)[:8]} sent={sent_count}", flush=True)
def _openai_rewrite_text_for_voice(self, text: str) -> str:
source = (text or "").strip()
if not source:
return ""
if len(source) > self.cfg.openai_voice_rewrite_max_input_chars:
source = source[:self.cfg.openai_voice_rewrite_max_input_chars].rstrip() + "\n\n...[текстовый ответ был длиннее и обрезан для голосовой версии]"
payload = {
"model": self.cfg.openai_voice_rewrite_model,
"messages": [
{
"role": "system",
"content": (
"Ты готовишь русскую версию финального ответа технического агента для озвучивания. "
"Не пересказывай заново и не меняй смысл: сохрани порядок мыслей, итог, предупреждения, статусы и важные действия. "
"Мягко убери только то, что плохо воспринимается на слух: длинные пути, хэши, ID, команды, JSON, "
"длинные списки файлов, точные размеры и счётчики символов. Если деталь важна, замени её коротким описанием. "
"Не добавляй новых фактов. Пиши естественно, без markdown, близко к исходному тексту."
),
},
{
"role": "user",
"content": f"Переделай этот финальный текст в вариант для озвучки:\n\n{source}",
},
],
"temperature": 0.2,
"max_tokens": self.cfg.openai_voice_rewrite_max_output_tokens,
}
data = json.dumps(payload, ensure_ascii=False).encode("utf-8")
req = request.Request("https://api.openai.com/v1/chat/completions", method="POST", data=data)
req.add_header("Authorization", f"Bearer {self.cfg.openai_api_key}")
req.add_header("Content-Type", "application/json")
try:
with request.urlopen(req, timeout=self.cfg.openai_voice_rewrite_timeout_seconds) as resp:
raw = resp.read().decode("utf-8", errors="replace")
except TimeoutError as e:
raise VoiceReplyError(
f"OpenAI не успел адаптировать текст за {self.cfg.openai_voice_rewrite_timeout_seconds} секунд."
) from e
except error.HTTPError as e:
detail = e.read().decode("utf-8", errors="replace")
if e.code == 401:
message = "OpenAI отклонил ключ API для адаптации текста."
elif e.code == 429:
message = "OpenAI временно ограничил адаптацию текста из-за лимита запросов."
elif e.code >= 500:
message = "OpenAI временно не смог адаптировать текст."
else:
message = f"OpenAI вернул ошибку HTTP {e.code} при адаптации текста."
if detail:
message = f"{message} Детали: {detail[:500]}"
raise VoiceReplyError(message) from e
except error.URLError as e:
raise VoiceReplyError(f"не удалось отправить текст в OpenAI для адаптации из-за сетевой ошибки: {e.reason}") from e
try:
body = json.loads(raw)
content = (((body.get("choices") or [{}])[0].get("message") or {}).get("content") or "").strip()
except Exception as e:
raise VoiceReplyError("OpenAI вернул неразборчивый ответ при адаптации текста.") from e
if not content:
raise VoiceReplyError("OpenAI вернул пустой текст адаптации.")
return content
def _openai_tts(self, text: str) -> bytes:
payload = {
"model": self.cfg.openai_tts_model,
"voice": self.cfg.openai_tts_voice,
"input": text,
"response_format": self.cfg.openai_tts_response_format,
}
data = json.dumps(payload, ensure_ascii=False).encode("utf-8")
req = request.Request("https://api.openai.com/v1/audio/speech", method="POST", data=data)
req.add_header("Authorization", f"Bearer {self.cfg.openai_api_key}")
req.add_header("Content-Type", "application/json")
try:
with request.urlopen(req, timeout=self.cfg.openai_tts_timeout_seconds) as resp:
audio = resp.read()
except TimeoutError as e:
raise VoiceReplyError(f"OpenAI не успел сгенерировать речь за {self.cfg.openai_tts_timeout_seconds} секунд.") from e
except error.HTTPError as e:
detail = e.read().decode("utf-8", errors="replace")
if e.code == 401:
message = "OpenAI отклонил ключ API для озвучивания."
elif e.code == 429:
message = "OpenAI временно ограничил озвучивание из-за лимита запросов."
elif e.code >= 500:
message = "OpenAI временно не смог сгенерировать речь."
else:
message = f"OpenAI вернул ошибку HTTP {e.code} при озвучивании."
if detail:
message = f"{message} Детали: {detail[:500]}"
raise VoiceReplyError(message) from e
except error.URLError as e:
raise VoiceReplyError(f"не удалось отправить текст в OpenAI TTS из-за сетевой ошибки: {e.reason}") from e
if not audio:
raise VoiceReplyError("OpenAI вернул пустой аудиофайл.")
return audio
def _transcribe_voice_job(
self,
job: dict[str, Any],
*,
status_cb: Callable[[str], Any] | None = None,
) -> str:
if not self.cfg.openai_api_key:
raise VoiceTranscriptionError(
"не настроен ключ OpenAI для распознавания.",
stage="config",
retryable=False,
)
file_id = (job.get("telegram_file_id") or "").strip()
if not file_id:
raise VoiceTranscriptionError(
"Telegram не передал идентификатор файла.",
stage="telegram_file_id",
retryable=False,
)
job_id = str(job.get("id") or "")[:8]
job_num = job.get("num", "?")
media_type = (job.get("telegram_media_type") or "voice").strip()
duration_seconds = int(job.get("telegram_duration_seconds") or 0)
telegram_file_size = int(job.get("telegram_file_size") or 0)
file_looks_big_for_cloud = self._telegram_cloud_download_is_likely_too_big(telegram_file_size)
if file_looks_big_for_cloud and status_cb is not None:
status_cb(
"файл большой, всё равно пробую скачать его из Telegram. "
f"Предварительный размер около {self._bytes_to_mb(telegram_file_size)} MB."
)
started_at = time.time()
print(f"[py-bot] transcribe start job={job_id} num={job_num} media={media_type}", flush=True)
file_bytes, filename = self._download_telegram_file(file_id)
print(
f"[py-bot] transcribe downloaded job={job_id} filename={filename} size={len(file_bytes)} bytes",
flush=True,
)
if file_looks_big_for_cloud and status_cb is not None:
status_cb(
"скачивание из Telegram прошло успешно. "
f"Фактический размер около {self._bytes_to_mb(len(file_bytes))} MB, дальше готовлю аудио и отправляю в OpenAI."
)
prepared_parts = self._prepare_audio_parts_for_transcription(
file_bytes,
filename,
duration_seconds=duration_seconds,
job_id=job_id,
job_num=job_num,
)
print(
f"[py-bot] transcribe prepared job={job_id} parts={len(prepared_parts)} duration={duration_seconds}s",
flush=True,
)
parts_text: list[str] = []
prompt_tail = ""
for index, (part_bytes, part_name) in enumerate(prepared_parts, start=1):
print(
f"[py-bot] transcribe part job={job_id} index={index}/{len(prepared_parts)} filename={part_name} size={len(part_bytes)} bytes",
flush=True,
)
part_text = self._openai_transcribe(part_bytes, part_name, prompt=prompt_tail).strip()
if part_text:
parts_text.append(part_text)
prompt_tail = self._transcription_prompt_tail("\n".join(parts_text))
text = "\n".join(parts_text).strip()
if not text:
raise VoiceTranscriptionError(
"сервис распознавания вернул пустой текст. Возможно, в записи нет слышимой речи или качество звука слишком низкое.",
stage="empty_text",
retryable=False,
)
elapsed = self._format_duration(int(time.time() - started_at))
print(f"[py-bot] transcribe done job={job_id} chars={len(text)} elapsed={elapsed}", flush=True)
return text
def _download_telegram_file(self, file_id: str) -> tuple[bytes, str]:
try:
result = self.telegram.call("getFile", {"file_id": file_id}, timeout=120)
except TimeoutError as e:
raise VoiceTranscriptionError(
"Telegram долго не отдавал информацию о файле.",
stage="telegram_get_file_timeout",
detail=str(e),
) from e
except Exception as e:
detail = str(e)
if "file is too big" in detail.lower():
raise VoiceTranscriptionError(
"Файл большой: я попробовал скачать его через текущий Telegram Bot API, "
"но Telegram не дал это сделать. Для такого аудио нужен локальный `telegram-bot-api` "
"сервер или другой способ передать файл боту.",
stage="telegram_get_file_too_big",
retryable=False,
detail=detail,
) from e
raise VoiceTranscriptionError(
"не удалось получить информацию о файле из Telegram.",
stage="telegram_get_file",
detail=detail,
) from e
info = result.get("result") or {}
file_path = info.get("file_path")
if not file_path:
raise VoiceTranscriptionError(
"Telegram не вернул путь к файлу.",
stage="telegram_file_path",
retryable=True,
detail=json.dumps(info, ensure_ascii=False)[:1000],
)
file_url = self.telegram.file_base + file_path.lstrip("/")
req = request.Request(file_url, method="GET")
try:
with request.urlopen(req, timeout=self.cfg.telegram_file_download_timeout_seconds) as resp:
data = resp.read()
except TimeoutError as e:
raise VoiceTranscriptionError(
f"Telegram не успел отдать аудиофайл за {self.cfg.telegram_file_download_timeout_seconds} секунд.",
stage="telegram_download_timeout",
detail=str(e),
) from e
except error.HTTPError as e:
detail = e.read().decode("utf-8", errors="replace")
raise VoiceTranscriptionError(
f"Telegram вернул ошибку HTTP {e.code} при скачивании аудио.",
stage="telegram_download_http",
retryable=e.code >= 500 or e.code == 429,
detail=detail[:1000],
) from e
except error.URLError as e:
raise VoiceTranscriptionError(
"не удалось скачать аудиофайл из Telegram из-за сетевой ошибки.",
stage="telegram_download_network",
detail=str(e.reason),
) from e
if not data:
raise VoiceTranscriptionError(
"Telegram отдал пустой аудиофайл.",
stage="telegram_download_empty",
retryable=True,
)
original_name = Path(file_path).name or "audio.ogg"
lower = original_name.lower()
# OpenAI transcription может не принимать расширение .oga, нормализуем в .ogg.
if lower.endswith(".oga"):
base = original_name[:-4] if len(original_name) > 4 else "audio"
normalized = f"{base}.ogg"
else:
normalized = original_name
return data, normalized
def _prepare_audio_parts_for_transcription(
self,
file_bytes: bytes,
filename: str,
*,
duration_seconds: int,
job_id: str,
job_num: Any,
) -> list[tuple[bytes, str]]:
needs_duration_chunking = duration_seconds > self.cfg.openai_transcribe_max_chunk_seconds
if len(file_bytes) <= self.cfg.openai_transcribe_max_upload_bytes and not needs_duration_chunking:
return [(file_bytes, filename)]
ffmpeg_path = shutil.which(self.cfg.ffmpeg_bin)
ffprobe_path = shutil.which(self.cfg.ffprobe_bin)
if not ffmpeg_path or not ffprobe_path:
raise VoiceTranscriptionError(
"для длинного аудио нужен локальный `ffmpeg`/`ffprobe`, но они не найдены в системе.",
stage="audio_prepare_tools_missing",
retryable=False,
)
with tempfile.TemporaryDirectory(prefix="shine-audio-") as tmpdir:
tmp = Path(tmpdir)
input_suffix = Path(filename).suffix or ".ogg"
input_path = tmp / f"source{input_suffix}"
input_path.write_bytes(file_bytes)
prepared_path = tmp / "prepared.ogg"
self._ffmpeg_reencode_audio(input_path, prepared_path)
prepared_bytes = prepared_path.read_bytes()
prepared_duration = self._ffprobe_duration_seconds(prepared_path)
if (
len(prepared_bytes) <= self.cfg.openai_transcribe_max_upload_bytes
and prepared_duration <= self.cfg.openai_transcribe_max_chunk_seconds
):
return [(prepared_bytes, prepared_path.name)]
chunk_length = self._choose_transcription_chunk_seconds(prepared_duration, len(prepared_bytes))
print(
f"[py-bot] audio chunking job={job_id} num={job_num} duration={prepared_duration:.1f}s total_bytes={len(prepared_bytes)} chunk_seconds={chunk_length}",
flush=True,
)
chunks: list[tuple[bytes, str]] = []
offset = 0
index = 1
total_duration = max(1, int(prepared_duration + 0.999))
while offset < total_duration:
chunk_path = tmp / f"chunk_{index:03d}.ogg"
self._ffmpeg_extract_audio_chunk(prepared_path, chunk_path, offset, chunk_length)
chunk_bytes = chunk_path.read_bytes()
if not chunk_bytes:
break
if len(chunk_bytes) > self.cfg.openai_transcribe_max_upload_bytes:
raise VoiceTranscriptionError(
"локальная нарезка аудио дала слишком большой кусок для OpenAI; нужно уменьшить размер чанка.",
stage="audio_chunk_too_large",
retryable=False,
)
chunks.append((chunk_bytes, chunk_path.name))
step = max(1, chunk_length - self.cfg.openai_transcribe_overlap_seconds)
offset += step
index += 1
if not chunks:
raise VoiceTranscriptionError(
"не удалось подготовить куски аудио для распознавания.",
stage="audio_chunk_empty",
retryable=False,
)
return chunks
def _ffmpeg_reencode_audio(self, input_path: Path, output_path: Path) -> None:
cmd = [
self.cfg.ffmpeg_bin,
"-y",
"-i",
str(input_path),
"-vn",
"-ac",
"1",
"-ar",
"16000",
"-c:a",
"libopus",
"-b:a",
f"{self.cfg.openai_transcribe_reencode_bitrate_kbps}k",
str(output_path),
]
self._run_subprocess_checked(cmd, "audio_reencode_ffmpeg")
def _ffmpeg_extract_audio_chunk(self, input_path: Path, output_path: Path, offset_seconds: int, chunk_seconds: int) -> None:
cmd = [
self.cfg.ffmpeg_bin,
"-y",
"-ss",
str(offset_seconds),
"-t",
str(chunk_seconds),
"-i",
str(input_path),
"-vn",
"-acodec",
"copy",
str(output_path),
]
self._run_subprocess_checked(cmd, "audio_chunk_ffmpeg")
def _ffprobe_duration_seconds(self, audio_path: Path) -> float:
cmd = [
self.cfg.ffprobe_bin,
"-v",
"error",
"-show_entries",
"format=duration",
"-of",
"default=noprint_wrappers=1:nokey=1",
str(audio_path),
]
try:
result = subprocess.run(
cmd,
check=True,
capture_output=True,
text=True,
timeout=self.cfg.openai_transcribe_ffmpeg_timeout_seconds,
)
except subprocess.TimeoutExpired as e:
raise VoiceTranscriptionError(
f"`ffprobe` не успел определить длительность аудио за {self.cfg.openai_transcribe_ffmpeg_timeout_seconds} секунд.",
stage="audio_probe_timeout",
retryable=False,
) from e
except subprocess.CalledProcessError as e:
detail = (e.stderr or e.stdout or "").strip()
raise VoiceTranscriptionError(
"не удалось определить длительность аудио через `ffprobe`.",
stage="audio_probe_failed",
retryable=False,
detail=detail[:1500],
) from e
raw = (result.stdout or "").strip()
try:
return max(0.0, float(raw))
except ValueError as e:
raise VoiceTranscriptionError(
"`ffprobe` вернул некорректную длительность аудио.",
stage="audio_probe_invalid",
retryable=False,
detail=raw[:300],
) from e
def _run_subprocess_checked(self, cmd: list[str], stage: str) -> None:
try:
subprocess.run(
cmd,
check=True,
capture_output=True,
text=True,
timeout=self.cfg.openai_transcribe_ffmpeg_timeout_seconds,
)
except subprocess.TimeoutExpired as e:
raise VoiceTranscriptionError(
f"локальная обработка аудио не успела завершиться за {self.cfg.openai_transcribe_ffmpeg_timeout_seconds} секунд.",
stage=f"{stage}_timeout",
retryable=False,
) from e
except subprocess.CalledProcessError as e:
detail = (e.stderr or e.stdout or "").strip()
raise VoiceTranscriptionError(
"локальная обработка аудио через `ffmpeg` завершилась с ошибкой.",
stage=f"{stage}_failed",
retryable=False,
detail=detail[:1500],
) from e
def _choose_transcription_chunk_seconds(self, duration_seconds: float, total_bytes: int) -> int:
max_chunk = self.cfg.openai_transcribe_max_chunk_seconds
safe_seconds = max(60, max_chunk - self.cfg.openai_transcribe_overlap_seconds)
if duration_seconds <= 0 or total_bytes <= 0:
return safe_seconds
bytes_per_second = total_bytes / max(duration_seconds, 1.0)
if bytes_per_second <= 0:
return safe_seconds
size_limited = int((self.cfg.openai_transcribe_max_upload_bytes * 0.9) / bytes_per_second)
return max(60, min(safe_seconds, size_limited if size_limited > 0 else safe_seconds))
@staticmethod
def _transcription_prompt_tail(text: str, limit: int = 1000) -> str:
source = compact_spaces(text)
if len(source) <= limit:
return source
return source[-limit:]
def _telegram_cloud_download_is_likely_too_big(self, file_size: int) -> bool:
if file_size <= 0:
return False
using_cloud_api = self.cfg.telegram_api_base_url.rstrip("/") == "https://api.telegram.org"
return using_cloud_api and file_size > 20 * 1024 * 1024
@staticmethod
def _bytes_to_mb(value: int) -> str:
    """Format a byte count as mebibytes with one decimal place (1572864 -> "1.5")."""
    mebibytes = value / 1048576  # 1024 * 1024
    return format(mebibytes, ".1f")
def _openai_transcribe(self, file_bytes: bytes, filename: str, prompt: str = "") -> str:
    """Upload one audio file to OpenAI's transcription endpoint and return the text.

    The request is a hand-built ``multipart/form-data`` POST to
    ``https://api.openai.com/v1/audio/transcriptions``. It carries the
    configured model and ``response_format=text``, so the response body is
    returned as-is (decoded as UTF-8 with replacement characters).

    Args:
        file_bytes: Raw audio content.
        filename: Name sent with the file part; also used to guess its MIME type.
        prompt: Optional context hint. Whitespace is collapsed and it is cut to
            1000 characters; it is omitted when empty.

    Raises:
        VoiceTranscriptionError: stage ``openai_transcribe_timeout`` on timeouts,
            ``openai_transcribe_http`` on HTTP errors (retryable for 429 and 5xx),
            and ``openai_transcribe_network`` on other network failures.
    """
    timeout_s = self.cfg.openai_transcribe_timeout_seconds
    boundary = "----shine-boundary-" + "".join(random.choices("abcdef0123456789", k=16))
    mime = mimetypes.guess_type(filename)[0] or "application/octet-stream"
    # A quote, backslash or line break in the name would terminate the quoted
    # header value early and corrupt the multipart envelope.
    safe_filename = re.sub(r'["\\\r\n]', "_", filename)

    def text_part(name: str, value: str) -> bytes:
        # One simple form field of the multipart body.
        return (
            f"--{boundary}\r\n"
            f'Content-Disposition: form-data; name="{name}"\r\n\r\n'
            f"{value}\r\n"
        ).encode("utf-8")

    body = bytearray()
    body.extend(text_part("model", self.cfg.openai_transcribe_model))
    body.extend(text_part("response_format", "text"))
    prompt = compact_spaces(prompt)
    if prompt:
        body.extend(text_part("prompt", prompt[:1000]))
    body.extend(
        (
            f"--{boundary}\r\n"
            f'Content-Disposition: form-data; name="file"; filename="{safe_filename}"\r\n'
            f"Content-Type: {mime}\r\n\r\n"
        ).encode("utf-8")
    )
    body.extend(file_bytes)
    body.extend(b"\r\n")
    body.extend(f"--{boundary}--\r\n".encode("utf-8"))
    req = request.Request("https://api.openai.com/v1/audio/transcriptions", method="POST", data=bytes(body))
    req.add_header("Authorization", f"Bearer {self.cfg.openai_api_key}")
    req.add_header("Content-Type", f"multipart/form-data; boundary={boundary}")
    try:
        with request.urlopen(req, timeout=timeout_s) as resp:
            return resp.read().decode("utf-8", errors="replace")
    except TimeoutError as e:
        raise VoiceTranscriptionError(
            f"OpenAI не успел распознать аудио за {timeout_s} секунд.",
            stage="openai_transcribe_timeout",
            detail=str(e),
        ) from e
    except error.HTTPError as e:
        # HTTPError subclasses URLError, so it must be handled first.
        detail = e.read().decode("utf-8", errors="replace")
        if e.code == 400:
            user_message = "OpenAI не принял аудиофайл для распознавания."
        elif e.code == 401:
            user_message = "OpenAI отклонил ключ API для распознавания."
        elif e.code == 413:
            user_message = "аудиофайл слишком большой для распознавания OpenAI."
        elif e.code == 429:
            user_message = "OpenAI временно ограничил распознавание из-за лимита запросов."
        elif e.code >= 500:
            user_message = "OpenAI временно не смог обработать распознавание."
        else:
            user_message = f"OpenAI вернул ошибку HTTP {e.code} при распознавании."
        raise VoiceTranscriptionError(
            user_message,
            stage="openai_transcribe_http",
            retryable=e.code == 429 or e.code >= 500,
            detail=detail[:1500],
        ) from e
    except error.URLError as e:
        # urlopen wraps socket errors raised while sending the request (including
        # connect timeouts) in URLError; report timeouts as timeouts.
        if isinstance(e.reason, TimeoutError):
            raise VoiceTranscriptionError(
                f"OpenAI не успел распознать аудио за {timeout_s} секунд.",
                stage="openai_transcribe_timeout",
                detail=str(e.reason),
            ) from e
        raise VoiceTranscriptionError(
            "не удалось отправить аудио в OpenAI из-за сетевой ошибки.",
            stage="openai_transcribe_network",
            detail=str(e.reason),
        ) from e
    except ConnectionError as e:
        # Connection drops while waiting for or reading the response (for example
        # http.client.RemoteDisconnected) are not wrapped in URLError.
        raise VoiceTranscriptionError(
            "не удалось отправить аудио в OpenAI из-за сетевой ошибки.",
            stage="openai_transcribe_network",
            detail=str(e),
        ) from e
@staticmethod
def _extract_codex_user_note(line: str) -> str | None:
    """Extract the agent message text from one Codex ``--json`` output line.

    Only objects with ``type == "item.completed"`` whose ``item`` is an
    ``agent_message`` with a non-empty string ``text`` qualify. The stripped
    text is returned, cut to 220 characters plus ``"..."`` when longer.

    Returns:
        The (possibly truncated) message, or ``None`` for any other line,
        including malformed JSON and unexpected field types.
    """
    s = (line or "").strip()
    if not s.startswith("{"):
        return None
    try:
        obj = json.loads(s)
    except (ValueError, RecursionError):
        # Malformed or pathologically nested JSON: not a note.
        return None
    if obj.get("type") != "item.completed":
        return None
    item = obj.get("item")
    # Guard against non-object payloads instead of crashing on .get().
    if not isinstance(item, dict) or item.get("type") != "agent_message":
        return None
    text = item.get("text")
    if not isinstance(text, str):
        return None
    text = text.strip()
    if not text:
        return None
    if len(text) > 220:
        return text[:220] + "..."
    return text
@staticmethod
def _extract_fallback_message(lines: list[str]) -> str:
    """Return the last meaningful line of Codex output, or ``""`` if there is none.

    Lines are scanned from the end and stripped. Blank lines, JSON event lines
    (starting with ``{`` and containing ``"type":``) and lines starting with
    ``mcp:`` or ``OpenAI Codex`` are skipped.
    """
    def is_noise(candidate: str) -> bool:
        if candidate.startswith("{") and '"type":' in candidate:
            return True
        return candidate.startswith(("mcp:", "OpenAI Codex"))

    stripped = (entry.strip() for entry in reversed(lines))
    return next((candidate for candidate in stripped if candidate and not is_noise(candidate)), "")
@staticmethod
def _extract_codex_thread_id(line: str) -> str:
    """Return the ``thread_id`` from a Codex ``thread.started`` JSON event line.

    Returns:
        The stripped thread id, or ``""`` when the line is not a JSON object,
        is malformed, is another event type, or has a missing or non-string
        ``thread_id``.
    """
    s = (line or "").strip()
    if not s.startswith("{"):
        return ""
    try:
        obj = json.loads(s)
    except (ValueError, RecursionError):
        # Malformed or pathologically nested JSON: no thread id.
        return ""
    if obj.get("type") != "thread.started":
        return ""
    thread_id = obj.get("thread_id")
    # A non-string id (e.g. a number) would previously crash on .strip().
    if not isinstance(thread_id, str):
        return ""
    return thread_id.strip()
@staticmethod
def _is_missing_codex_session_error(text: str) -> bool:
    """Check whether Codex error output says the session/thread to resume is missing.

    Matching is a case-insensitive substring search over a fixed list of phrases.
    ``None`` or empty input yields ``False``.
    """
    haystack = (text or "").lower()
    for needle in (
        "session not found",
        "conversation not found",
        "thread not found",
        "no session found",
        "invalid session",
        "unknown session",
        "no conversation found",
        "unknown thread",
    ):
        if needle in haystack:
            return True
    return False
@staticmethod
def _format_duration(seconds: int) -> str:
    """Format a number of seconds as a compact hours/minutes/seconds string.

    Negative input is clamped to zero. Leading zero units are omitted, so the
    result is e.g. ``"5с"``, ``"2м 5с"`` or ``"1ч 0м 5с"``.
    """
    total = max(0, seconds)
    hours, remainder = divmod(total, 3600)
    minutes, secs = divmod(remainder, 60)
    if hours:
        parts = [f"{hours}ч", f"{minutes}м", f"{secs}с"]
    elif minutes:
        parts = [f"{minutes}м", f"{secs}с"]
    else:
        parts = [f"{secs}с"]
    return " ".join(parts)
def run_selftest(config: BotConfig, prompt: str) -> int:
    """Run ``codex exec`` once with *prompt*, echo its output, and return its exit code.

    The command runs in ``config.codex_workdir`` with ``--json`` and with
    approvals and the sandbox bypassed. stdin is closed, and output is decoded
    as UTF-8 with replacement. stdout is always printed; stderr is printed only
    when non-empty.
    """
    base = [str(config.codex_bin), "exec"]
    flags = ["--dangerously-bypass-approvals-and-sandbox", "--json"]
    workdir = ["-C", str(config.codex_workdir)]
    completed = subprocess.run(
        base + flags + workdir + [prompt],
        stdin=subprocess.DEVNULL,
        capture_output=True,
        text=True,
        encoding="utf-8",
        errors="replace",
    )
    stdout_text, stderr_text = completed.stdout, completed.stderr
    print(stdout_text)
    if stderr_text:
        print(stderr_text)
    return completed.returncode
def main() -> int:
    """Command-line entry point: run the Codex self-test or the bot service.

    With ``--selftest-codex PROMPT`` it only runs ``run_selftest`` and returns
    that exit code. Otherwise it builds ``BotConfig`` from this script's
    directory and runs ``ShinePyBotService``.

    Returns:
        0 on normal exit or Ctrl+C (after ``service.shutdown()``), 1 when the
        service raises any other exception.
    """
    parser = argparse.ArgumentParser(description="SHiNE Python Telegram bot wrapper for Codex CLI")
    parser.add_argument("--selftest-codex", default="", help="Выполнить только codex exec с этим prompt и выйти")
    args = parser.parse_args()
    root = Path(__file__).resolve().parent
    cfg = BotConfig(root)
    if args.selftest_codex:
        return run_selftest(cfg, args.selftest_codex)
    service = ShinePyBotService(cfg)
    try:
        service.run()
    except KeyboardInterrupt:
        service.shutdown()
        return 0
    except Exception as e:
        print(f"[py-bot] FATAL: {e}", flush=True)
        # The one-line message alone is rarely enough to diagnose a crash;
        # keep the stack trace in the same output stream.
        print(traceback.format_exc(), flush=True)
        return 1
    return 0
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    exit_status = main()
    raise SystemExit(exit_status)