2444 lines
109 KiB
Python
2444 lines
109 KiB
Python
#!/usr/bin/env python3
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import datetime as dt
|
||
import fcntl
|
||
import json
|
||
import mimetypes
|
||
import os
|
||
import random
|
||
import re
|
||
import string
|
||
import subprocess
|
||
import tempfile
|
||
import threading
|
||
import time
|
||
import traceback
|
||
import uuid
|
||
from pathlib import Path
|
||
from typing import Any
|
||
from urllib import error, request
|
||
|
||
# Built-in "username:display name" pairs, serialized in the comma-separated
# format that parse_allowed_players() understands.
DEFAULT_ALLOWED_PLAYERS = ",".join(
    (
        "malvviiina:Милана",
        "zodiaktechnika32:Сергей",
        "oidasyda:Иван",
        "blackbyrd1:Ворон",
        "dimasol1:Дима",
    )
)

# Task-center status code -> human-readable (Russian) label.
TASK_STATUS_LABELS = dict(
    new="новая",
    approved="одобрена",
    rejected="отклонена",
    needs_work="на доработку",
    done="сделана",
)
|
||
|
||
|
||
def now_iso() -> str:
    """Return the current UTC time as a timezone-aware ISO 8601 string."""
    utc_now = dt.datetime.now(tz=dt.timezone.utc)
    return utc_now.isoformat()
|
||
|
||
|
||
def normalize_username(value: str | None) -> str:
    """Return a canonical Telegram username.

    Surrounding whitespace is trimmed, a single leading ``@`` is dropped and
    the result is lower-cased. Falsy input yields an empty string.
    """
    if not value:
        return ""
    trimmed = value.strip()
    bare = trimmed[1:] if trimmed.startswith("@") else trimmed
    return bare.lower()
|
||
|
||
|
||
def parse_allowed_players(raw: str) -> dict[str, str]:
    """Parse a ``"user:Name,user2:Name2"`` string into ``{username: name}``.

    Entries are comma separated. The part before the first ``:`` is the
    username (normalized with ``normalize_username``); the rest is the display
    name. An entry without ``:`` or with a blank name falls back to the
    username text. Empty entries and entries with an empty username are
    skipped; a later duplicate overrides an earlier one.
    """
    result: dict[str, str] = {}
    for chunk in (raw or "").split(","):
        entry = chunk.strip()
        if not entry:
            continue
        handle, colon, label = entry.partition(":")
        key = normalize_username(handle)
        if not key:
            continue
        shown = label if colon else handle
        result[key] = shown.strip() or key
    return result
|
||
|
||
|
||
def compact_spaces(text: str) -> str:
    """Trim *text* and collapse every whitespace run into a single space."""
    trimmed = (text or "").strip()
    return re.sub(r"\s+", " ", trimmed)
|
||
|
||
|
||
def split_long_text(text: str, chunk_size: int = 3500) -> list[str]:
    """Cut *text* into consecutive pieces of at most *chunk_size* characters.

    The text is trimmed first. Empty input produces a single placeholder
    chunk so the caller always has something to send.
    """
    body = (text or "").strip()
    if not body:
        return ["(пустой ответ)"]
    pieces: list[str] = []
    for start in range(0, len(body), chunk_size):
        pieces.append(body[start:start + chunk_size])
    return pieces
|
||
|
||
|
||
def split_text_for_tts(text: str, chunk_size: int) -> list[str]:
    """Split *text* into chunks of at most *chunk_size* characters.

    Paragraphs (separated by blank lines) are packed greedily into a chunk,
    joined with a blank line. A paragraph longer than *chunk_size* is cut
    into fixed-size slices, each emitted as its own chunk. Empty input
    returns an empty list.
    """
    cleaned = (text or "").strip()
    if not cleaned:
        return []

    pieces: list[str] = []
    buffer = ""
    for raw_block in re.split(r"\n\s*\n", cleaned):
        block = raw_block.strip()
        if not block:
            continue

        if len(block) <= chunk_size:
            # Try to append the paragraph to the chunk being assembled.
            merged = f"{buffer}\n\n{block}" if buffer else block
            if len(merged) > chunk_size:
                if buffer:
                    pieces.append(buffer)
                buffer = block
            else:
                buffer = merged
            continue

        # Oversized paragraph: flush what we have, then hard-slice it.
        if buffer:
            pieces.append(buffer)
            buffer = ""
        for start in range(0, len(block), chunk_size):
            piece = block[start:start + chunk_size].strip()
            if piece:
                pieces.append(piece)

    if buffer:
        pieces.append(buffer)
    return pieces
|
||
|
||
|
||
def read_env_file(path: Path) -> dict[str, str]:
    """Read ``KEY=VALUE`` pairs from a dotenv-style file.

    Blank lines, ``#`` comments and lines without ``=`` are ignored. Keys and
    values are trimmed, and surrounding double then single quotes are
    stripped from values. A missing file yields an empty dict.
    """
    parsed: dict[str, str] = {}
    if not path.exists():
        return parsed
    for raw_line in path.read_text(encoding="utf-8").splitlines():
        entry = raw_line.strip()
        is_ignorable = (not entry) or entry.startswith("#") or ("=" not in entry)
        if is_ignorable:
            continue
        name, _, raw_value = entry.partition("=")
        parsed[name.strip()] = raw_value.strip().strip('"').strip("'")
    return parsed
|
||
|
||
|
||
class VoiceTranscriptionError(RuntimeError):
    """Voice transcription failure carrying a user-facing message.

    Attributes:
        user_message: text passed to ``RuntimeError`` (so ``str(exc)`` returns it).
        stage: label of the step that failed.
        retryable: retry flag stored for the caller.
        detail: optional extra text included only in ``log_text()``.
    """

    def __init__(
        self,
        user_message: str,
        *,
        stage: str,
        retryable: bool = True,
        detail: str = "",
    ):
        super().__init__(user_message)
        self.user_message = user_message
        self.stage = stage
        self.retryable = retryable
        self.detail = detail

    def log_text(self) -> str:
        """Return a one-line description for logs.

        ``detail`` is appended only when it is non-empty and differs from the
        user message.
        """
        fields = [
            self.user_message,
            f"stage={self.stage}",
            f"retryable={self.retryable}",
        ]
        if self.detail and self.detail != self.user_message:
            fields.append(f"detail={self.detail}")
        return " ".join(fields)
|
||
|
||
|
||
class VoiceReplyError(RuntimeError):
    """Error type for voice-reply failures; adds nothing to ``RuntimeError``."""
|
||
|
||
|
||
class JsonLineStore:
    """Static helpers for JSON Lines files (one JSON document per line)."""

    @staticmethod
    def load(path: Path) -> list[dict[str, Any]]:
        """Return every non-blank line of *path* parsed as JSON.

        A missing file yields an empty list. A malformed line propagates the
        ``json.loads`` error.
        """
        if not path.exists():
            return []
        rows = (row.strip() for row in path.read_text(encoding="utf-8").splitlines())
        return [json.loads(row) for row in rows if row]

    @staticmethod
    def save(path: Path, items: list[dict[str, Any]]) -> None:
        """Rewrite *path* with *items*.

        The data is written to a sibling ``<name>.tmp`` file that then
        replaces *path*; parent directories are created when missing.
        """
        path.parent.mkdir(parents=True, exist_ok=True)
        scratch = path.with_suffix(path.suffix + ".tmp")
        with scratch.open("w", encoding="utf-8") as out:
            out.writelines(json.dumps(entry, ensure_ascii=False) + "\n" for entry in items)
        scratch.replace(path)

    @staticmethod
    def append(path: Path, item: dict[str, Any]) -> None:
        """Append *item* as one JSON line, creating parent directories first."""
        path.parent.mkdir(parents=True, exist_ok=True)
        line = json.dumps(item, ensure_ascii=False) + "\n"
        with path.open("a", encoding="utf-8") as out:
            out.write(line)
|
||
|
||
|
||
class TelegramApi:
    """Small Telegram Bot API client built on ``urllib.request``.

    Every call is a POST to ``https://api.telegram.org/bot<token>/<method>``.
    HTTP errors, other request failures and responses whose ``ok`` field is
    falsy are raised as ``RuntimeError``.
    """

    def __init__(self, token: str):
        self.base = f"https://api.telegram.org/bot{token}/"

    def _execute(self, req: request.Request, timeout: int, failure_prefix: str) -> dict[str, Any]:
        """Send *req*, decode the JSON reply and enforce ``ok``.

        *failure_prefix* is the leading text of the ``RuntimeError`` raised
        for non-HTTP failures.
        """
        try:
            with request.urlopen(req, timeout=timeout) as resp:
                raw = resp.read().decode("utf-8")
        except error.HTTPError as http_err:
            details = http_err.read().decode("utf-8", errors="replace")
            raise RuntimeError(f"Telegram HTTP {http_err.code}: {details}") from http_err
        except Exception as exc:
            raise RuntimeError(f"{failure_prefix}: {exc}") from exc

        parsed = json.loads(raw)
        if not parsed.get("ok"):
            raise RuntimeError(f"Telegram API error: {parsed}")
        return parsed

    @staticmethod
    def _add_optional(target: dict[str, Any], caption: str, reply_to_message_id: int | None) -> dict[str, Any]:
        """Add ``caption`` / ``reply_to_message_id`` to *target* when provided."""
        if caption:
            target["caption"] = caption
        if reply_to_message_id is not None:
            target["reply_to_message_id"] = reply_to_message_id
        return target

    def call(self, method: str, payload: dict[str, Any] | None = None, timeout: int = 60) -> dict[str, Any]:
        """Invoke *method* with an optional JSON *payload* and return the full reply."""
        if payload is None:
            encoded = None
            headers: dict[str, str] = {}
        else:
            encoded = json.dumps(payload).encode("utf-8")
            headers = {"Content-Type": "application/json"}
        req = request.Request(self.base + method, data=encoded, headers=headers, method="POST")
        return self._execute(req, timeout, "Telegram request failed")

    def call_multipart(
        self,
        method: str,
        fields: dict[str, Any],
        files: dict[str, tuple[str, bytes, str]],
        timeout: int = 120,
    ) -> dict[str, Any]:
        """Invoke *method* with a ``multipart/form-data`` body.

        *fields* are plain form values (``None`` values are skipped);
        *files* maps a form name to ``(filename, content, mime type)``.
        """
        boundary = "----shine-tg-boundary-" + "".join(random.choices("abcdef0123456789", k=16))
        body = bytearray()

        for name, value in fields.items():
            if value is None:
                continue
            text_part = (
                f"--{boundary}\r\n"
                f'Content-Disposition: form-data; name="{name}"\r\n\r\n'
                f"{value}\r\n"
            )
            body.extend(text_part.encode("utf-8"))

        for name, (filename, data, mime) in files.items():
            file_header = (
                f"--{boundary}\r\n"
                f'Content-Disposition: form-data; name="{name}"; filename="{filename}"\r\n'
                f"Content-Type: {mime}\r\n\r\n"
            )
            body.extend(file_header.encode("utf-8"))
            body.extend(data)
            body.extend(b"\r\n")

        body.extend(f"--{boundary}--\r\n".encode("utf-8"))

        req = request.Request(self.base + method, data=bytes(body), method="POST")
        req.add_header("Content-Type", f"multipart/form-data; boundary={boundary}")
        return self._execute(req, timeout, "Telegram multipart request failed")

    def get_updates(self, offset: int | None, timeout_sec: int) -> list[dict[str, Any]]:
        """Long-poll ``getUpdates`` for ``message`` and ``channel_post`` updates.

        The HTTP timeout is *timeout_sec* plus 15 seconds of slack.
        """
        payload: dict[str, Any] = {"timeout": timeout_sec, "allowed_updates": ["message", "channel_post"]}
        if offset is not None:
            payload["offset"] = offset
        reply = self.call("getUpdates", payload=payload, timeout=timeout_sec + 15)
        return reply.get("result", [])

    def send_message(self, chat_id: int | str, text: str, reply_to_message_id: int | None = None) -> dict[str, Any]:
        """Send a text message, optionally as a reply."""
        payload: dict[str, Any] = {"chat_id": chat_id, "text": text}
        if reply_to_message_id is not None:
            payload["reply_to_message_id"] = reply_to_message_id
        return self.call("sendMessage", payload=payload, timeout=30)

    def send_voice(
        self,
        chat_id: int | str,
        voice: str,
        caption: str = "",
        reply_to_message_id: int | None = None,
    ) -> dict[str, Any]:
        """Call ``sendVoice`` passing *voice* as a string value in the JSON payload."""
        payload = self._add_optional({"chat_id": chat_id, "voice": voice}, caption, reply_to_message_id)
        return self.call("sendVoice", payload=payload, timeout=60)

    def send_audio(
        self,
        chat_id: int | str,
        audio: str,
        caption: str = "",
        reply_to_message_id: int | None = None,
    ) -> dict[str, Any]:
        """Call ``sendAudio`` passing *audio* as a string value in the JSON payload."""
        payload = self._add_optional({"chat_id": chat_id, "audio": audio}, caption, reply_to_message_id)
        return self.call("sendAudio", payload=payload, timeout=60)

    def send_voice_upload(
        self,
        chat_id: int | str,
        voice_bytes: bytes,
        filename: str,
        caption: str = "",
        reply_to_message_id: int | None = None,
    ) -> dict[str, Any]:
        """Upload *voice_bytes* through ``sendVoice`` as an ``audio/ogg`` file."""
        fields = self._add_optional({"chat_id": chat_id}, caption, reply_to_message_id)
        return self.call_multipart(
            "sendVoice",
            fields=fields,
            files={"voice": (filename, voice_bytes, "audio/ogg")},
            timeout=180,
        )

    def delete_webhook(self) -> None:
        """Call ``deleteWebhook`` while keeping pending updates."""
        self.call("deleteWebhook", payload={"drop_pending_updates": False}, timeout=30)
|
||
|
||
|
||
class BotConfig:
    """Bot settings resolved from the environment and ``<root_dir>/.env``.

    Values from the ``.env`` file are layered on top of ``os.environ``, so
    the file wins when both define the same key. Only ``TELEGRAM_BOT_TOKEN``
    is mandatory; every other setting has a built-in default.
    """

    def __init__(self, root_dir: Path):
        env = {**os.environ, **read_env_file(root_dir / ".env")}

        def text(key: str, default: str) -> str:
            return env.get(key, default)

        def number(key: str, default: str) -> int:
            return int(env.get(key, default))

        self.root_dir = root_dir

        # Telegram identity and access lists.
        self.telegram_bot_token = self._required(env, "TELEGRAM_BOT_TOKEN")
        self.allowed_username = normalize_username(text("ALLOWED_TELEGRAM_USERNAME", "AidarKC"))
        self.allowed_players = parse_allowed_players(text("ALLOWED_TELEGRAM_PLAYERS", DEFAULT_ALLOWED_PLAYERS))
        self.allowed_channel_username = normalize_username(text("ALLOWED_TELEGRAM_CHANNEL_USERNAME", "shine_writing"))
        self.bot_username = text("BOT_USERNAME", "aidar_su_bot")

        # OpenAI transcription.
        self.openai_api_key = text("OPENAI_API_KEY", "").strip()
        self.openai_transcribe_model = text("OPENAI_TRANSCRIBE_MODEL", "gpt-4o-mini-transcribe")
        self.telegram_file_download_timeout_seconds = number("TELEGRAM_FILE_DOWNLOAD_TIMEOUT_SECONDS", "300")
        self.openai_transcribe_timeout_seconds = number("OPENAI_TRANSCRIBE_TIMEOUT_SECONDS", "900")

        # OpenAI text-to-speech; the chunk size is clamped to at least 500.
        self.openai_tts_model = text("OPENAI_TTS_MODEL", "gpt-4o-mini-tts")
        self.openai_tts_voice = text("OPENAI_TTS_VOICE", "alloy")
        self.openai_tts_response_format = text("OPENAI_TTS_RESPONSE_FORMAT", "opus")
        self.openai_tts_timeout_seconds = number("OPENAI_TTS_TIMEOUT_SECONDS", "180")
        self.openai_tts_chunk_chars = max(500, number("OPENAI_TTS_CHUNK_CHARS", "3500"))

        # Voice rewrite model; both limits have enforced lower bounds.
        self.openai_voice_rewrite_model = text("OPENAI_VOICE_REWRITE_MODEL", "gpt-4.1-nano")
        self.openai_voice_rewrite_timeout_seconds = number("OPENAI_VOICE_REWRITE_TIMEOUT_SECONDS", "90")
        self.openai_voice_rewrite_max_input_chars = max(1000, number("OPENAI_VOICE_REWRITE_MAX_INPUT_CHARS", "12000"))
        self.openai_voice_rewrite_max_output_tokens = max(200, number("OPENAI_VOICE_REWRITE_MAX_OUTPUT_TOKENS", "900"))

        # Codex executable, its working directory and limits.
        self.codex_bin = Path(
            text(
                "CODEX_BIN",
                "/home/ai/.cache/JetBrains/IntelliJIdea2026.1/aia/codex/bin/codex-x86_64-unknown-linux-musl",
            )
        )
        self.codex_workdir = Path(text("CODEX_WORKDIR", "/home/ai/work/SHiNE/SHiNE-server-sha256"))
        self.codex_timeout_seconds = number("CODEX_TIMEOUT_SECONDS", "900")
        self.max_retries = max(1, number("MAX_RETRIES", "3"))

        # Paths derived from the project root.
        self.data_dir = (root_dir / text("DATA_DIR", "./data")).resolve()
        self.agent_instructions_file = (root_dir / "AGENT.md").resolve()

    @staticmethod
    def _required(env: dict[str, str], key: str) -> str:
        """Return the trimmed value of *key*.

        Raises:
            RuntimeError: if the key is missing or blank.
        """
        found = env.get(key, "").strip()
        if found:
            return found
        raise RuntimeError(f"Не задан обязательный параметр: {key}")
|
||
|
||
|
||
class ShinePyBotService:
|
||
def __init__(self, config: BotConfig):
    """Prepare paths, locks and in-memory state; nothing is loaded or started here."""
    self.cfg = config
    self.telegram = TelegramApi(config.telegram_bot_token)

    # Persistent files, all located under the configured data directory.
    data_dir = config.data_dir
    self.queue_file = data_dir / "py_queue.jsonl"
    self.state_file = data_dir / "py_state.json"
    self.processed_updates_file = data_dir / "py_processed_updates.log"
    self.lock_file = data_dir / "py_app.lock"
    self.history_dir = data_dir / "history"
    self.history_archive_dir = self.history_dir / "archive"
    self.task_center_dir = data_dir / "task_center"
    self.task_center_file = self.task_center_dir / "items.json"
    self.max_processed_updates = 5000

    # Synchronization primitives and the (not yet started) worker thread.
    self.queue_lock = threading.RLock()
    self.task_center_lock = threading.RLock()
    self.stop_event = threading.Event()
    self.worker = threading.Thread(
        target=self._worker_loop,
        name="shine-py-bot-worker",
        daemon=True,
    )

    # In-memory copies of the persisted data.
    self.queue: list[dict[str, Any]] = []
    self.state: dict[str, Any] = {}
    self.processed_updates: list[str] = []

    # Bookkeeping for the job currently being executed.
    self.active_job_id: str | None = None
    self.active_job_started_at: float | None = None
    self.active_process: subprocess.Popen[str] | None = None
    self.active_process_lock = threading.Lock()
    self.stop_current_job = False

    # Service-level runtime flags.
    self.lock_fd = None
    self.last_heartbeat_at: float = 0.0
    self.restart_requested = False
|
||
|
||
def _is_owner(self, username: str) -> bool:
    """Return True when *username* normalizes to the configured owner username."""
    candidate = normalize_username(username)
    return candidate == self.cfg.allowed_username
|
||
|
||
def _is_allowed_player(self, username: str) -> bool:
    """Return True when *username* normalizes to a key of the allowed-players map."""
    candidate = normalize_username(username)
    return candidate in self.cfg.allowed_players
|
||
|
||
def _is_allowed_user(self, username: str) -> bool:
    """Return True when *username* is the owner or one of the allowed players."""
    candidate = normalize_username(username)
    if candidate == self.cfg.allowed_username:
        return True
    return candidate in self.cfg.allowed_players
|
||
|
||
def _player_name(self, username: str) -> str:
    """Return the configured player name, or the normalized username if unknown."""
    key = normalize_username(username)
    players = self.cfg.allowed_players
    return players.get(key, key)
|
||
|
||
def _display_name(self, username: str) -> str:
    """Return the name shown to users: a fixed name for the owner, else the player name."""
    key = normalize_username(username)
    return "Айдар" if key == self.cfg.allowed_username else self._player_name(key)
|
||
|
||
def _known_usernames(self) -> dict[str, str]:
|
||
users = {self.cfg.allowed_username: "Айдар"}
|
||
users.update(self.cfg.allowed_players)
|
||
return users
|
||
|
||
def _find_user_by_text(self, text: str) -> str:
    """Resolve free-form *text* to a known username, or ``""`` when nothing matches.

    Lookup order:
      1. the whole text, normalized, is itself a known username;
      2. the text contains one of the hard-coded Russian name forms as a
         separate word (forms are tried in declaration order);
      3. the text contains a known username or display name as a substring.
    """
    known = self._known_usernames()

    exact = normalize_username(text)
    if exact in known:
        return exact

    lowered = (text or "").strip().lower()
    owner = self.cfg.allowed_username
    # (username, name forms); kept as pairs so that order is preserved even
    # if the owner username coincides with one of the player usernames.
    name_forms = (
        (owner, ("айдар", "айдару", "айдара")),
        ("malvviiina", ("милана", "милане", "милану", "миланы")),
        ("zodiaktechnika32", ("сергей", "сергею", "сергея")),
        ("oidasyda", ("иван", "ивану", "ивана")),
        ("blackbyrd1", ("ворон", "ворону", "ворона")),
        ("dimasol1", ("дима", "диме", "диму", "димы")),
    )
    for target, forms in name_forms:
        for form in forms:
            pattern = rf"(^|\W){re.escape(form)}($|\W)"
            if re.search(pattern, lowered, flags=re.IGNORECASE):
                return target

    for candidate, shown_name in known.items():
        by_username = bool(candidate) and candidate in lowered
        by_name = bool(shown_name) and shown_name.lower() in lowered
        if by_username or by_name:
            return candidate
    return ""
|
||
|
||
def run(self) -> None:
    """Start the service and poll Telegram until ``stop_event`` is set.

    Startup steps run in a fixed order (directories, instance lock, state,
    queue, processed updates, job recovery, webhook removal, offset
    initialization, worker start). In the polling loop the new offset is
    persisted before the update is handled. ``shutdown()`` always runs when
    the loop exits, including on an exception.
    """
    startup_steps = (
        self._ensure_dirs,
        self._acquire_single_instance_lock,
        self._load_state,
        self._load_queue,
        self._load_processed_updates,
        self._recover_active_jobs_after_restart,
        self.telegram.delete_webhook,
        self._init_offset_if_missing,
        self.worker.start,
    )
    for step in startup_steps:
        step()

    self._append_history_event("service_started", {"allowedUsername": self.cfg.allowed_username})
    print(f"[py-bot] Запущен. allowed user: @{self.cfg.allowed_username}", flush=True)

    try:
        while not self.stop_event.is_set():
            try:
                batch = self.telegram.get_updates(offset=self.state.get("offset"), timeout_sec=25)
            except Exception as e:
                # Polling errors are logged and retried after a short pause.
                print(f"[py-bot] Ошибка getUpdates: {e}", flush=True)
                time.sleep(2)
                continue

            for update in batch:
                update_id = update.get("update_id")
                if isinstance(update_id, int):
                    self.state["offset"] = update_id + 1
                    self._persist_state()
                self._handle_update(update)
    finally:
        self.shutdown()
|
||
|
||
def shutdown(self) -> None:
    """Stop the worker, release the single-instance lock and log the stop.

    The cleanup runs even when ``stop_event`` is already set (the original
    ``if ...: pass`` guard was a no-op and has been removed). The lock
    handle is detached from ``self.lock_fd`` before it is unlocked, so a
    repeated call never touches an already closed handle.
    """
    self.stop_event.set()
    self._stop_active_codex_process()

    if self.worker.is_alive():
        # Bounded wait: do not block shutdown forever on a stuck worker.
        self.worker.join(timeout=10)

    lock_handle, self.lock_fd = self.lock_fd, None
    if lock_handle is not None:
        try:
            fcntl.flock(lock_handle, fcntl.LOCK_UN)
        finally:
            lock_handle.close()

    self._append_history_event("service_stopped", {})
|
||
|
||
def _ensure_dirs(self) -> None:
|
||
self.cfg.data_dir.mkdir(parents=True, exist_ok=True)
|
||
self.history_dir.mkdir(parents=True, exist_ok=True)
|
||
self.history_archive_dir.mkdir(parents=True, exist_ok=True)
|
||
self.task_center_dir.mkdir(parents=True, exist_ok=True)
|
||
|
||
def _acquire_single_instance_lock(self) -> None:
|
||
self.lock_file.parent.mkdir(parents=True, exist_ok=True)
|
||
self.lock_fd = self.lock_file.open("a+")
|
||
try:
|
||
fcntl.flock(self.lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
|
||
except BlockingIOError:
|
||
raise RuntimeError(f"Уже запущен другой инстанс (lock: {self.lock_file})")
|
||
|
||
def _load_state(self) -> None:
    """Load the state file, fill in missing sections, and persist the result.

    Guarantees after the call: ``user_sessions`` and ``user_settings`` are
    dicts, the owner has a session with a current history file, and
    ``next_job_number`` is an int.
    """
    state_path = self.state_file
    self.state = json.loads(state_path.read_text(encoding="utf-8")) if state_path.exists() else {}

    for section in ("user_sessions", "user_settings"):
        if not isinstance(self.state.get(section), dict):
            self.state[section] = {}
    sessions = self.state["user_sessions"]

    owner = self.cfg.allowed_username
    if not self.state.get("current_history_file"):
        # First start (or missing pointer): open a fresh owner history.
        created = str(self._create_new_history_file("initial", owner))
        self.state["current_history_file"] = created
        sessions[owner] = {"current_history_file": created}
    elif owner not in sessions:
        # The top-level pointer exists but the owner has no session entry yet.
        sessions[owner] = {"current_history_file": str(self.state["current_history_file"])}

    if not isinstance(self.state.get("next_job_number"), int):
        self.state["next_job_number"] = 1

    self.state["updated_at"] = now_iso()
    self._persist_state()
|
||
|
||
def _persist_state(self) -> None:
    """Stamp ``updated_at`` and write the state via a ``.tmp`` file plus replace."""
    self.state["updated_at"] = now_iso()
    scratch = self.state_file.with_suffix(".tmp")
    serialized = json.dumps(self.state, ensure_ascii=False, indent=2)
    scratch.write_text(serialized, encoding="utf-8")
    scratch.replace(self.state_file)
|
||
|
||
def _load_queue(self) -> None:
    """Replace the in-memory queue with the contents of the queue file."""
    loaded = JsonLineStore.load(self.queue_file)
    self.queue = loaded
|
||
|
||
def _persist_queue(self) -> None:
    """Write the in-memory queue to the queue file."""
    JsonLineStore.save(path=self.queue_file, items=self.queue)
|
||
|
||
def _load_processed_updates(self) -> None:
|
||
if not self.processed_updates_file.exists():
|
||
self.processed_updates = []
|
||
return
|
||
lines = [x.strip() for x in self.processed_updates_file.read_text(encoding="utf-8").splitlines() if x.strip()]
|
||
if len(lines) > self.max_processed_updates:
|
||
lines = lines[-self.max_processed_updates:]
|
||
self.processed_updates_file.write_text("\n".join(lines) + "\n", encoding="utf-8")
|
||
self.processed_updates = lines
|
||
|
||
def _mark_processed_update(self, update_key: str) -> bool:
|
||
if update_key in self.processed_updates:
|
||
return True
|
||
self.processed_updates.append(update_key)
|
||
if len(self.processed_updates) > self.max_processed_updates:
|
||
self.processed_updates = self.processed_updates[-self.max_processed_updates:]
|
||
self.processed_updates_file.write_text("\n".join(self.processed_updates) + "\n", encoding="utf-8")
|
||
else:
|
||
with self.processed_updates_file.open("a", encoding="utf-8") as f:
|
||
f.write(update_key + "\n")
|
||
return False
|
||
|
||
def _recover_active_jobs_after_restart(self) -> None:
    """Return jobs left in ``active`` status to ``pending``.

    Each recovered job gets a ``retry_reason`` marker and a fresh
    ``updated_at``. The queue is persisted and a history event written only
    when at least one job was recovered.
    """
    recovered_ids: list[str] = []
    for job in self.queue:
        if job.get("status") != "active":
            continue
        job.update(
            status="pending",
            retry_reason="service_restart_recovery",
            updated_at=now_iso(),
        )
        recovered_ids.append(job.get("id", ""))

    if not recovered_ids:
        return
    self._persist_queue()
    self._append_history_event("active_jobs_recovered", {"jobIds": recovered_ids})
|
||
|
||
def _init_offset_if_missing(self) -> None:
|
||
if self.state.get("offset") is not None:
|
||
return
|
||
try:
|
||
updates = self.telegram.get_updates(offset=None, timeout_sec=0)
|
||
if updates:
|
||
self.state["offset"] = int(updates[-1]["update_id"]) + 1
|
||
else:
|
||
self.state["offset"] = 0
|
||
self._persist_state()
|
||
except Exception as e:
|
||
print(f"[py-bot] Не удалось инициализировать offset: {e}", flush=True)
|
||
self.state["offset"] = 0
|
||
self._persist_state()
|
||
|
||
def _current_history_file(self) -> Path:
|
||
return self._current_history_file_for_user(self.cfg.allowed_username)
|
||
|
||
def _history_dirs_for_user(self, username: str) -> tuple[Path, Path]:
    """Return ``(history_dir, archive_dir)`` for a user, creating both.

    An empty username maps to the ``unknown`` directory.
    """
    folder = normalize_username(username) or "unknown"
    user_dir = self.history_dir / folder
    user_archive = user_dir / "archive"
    for directory in (user_dir, user_archive):
        directory.mkdir(parents=True, exist_ok=True)
    return user_dir, user_archive
|
||
|
||
def _ensure_user_session(self, username: str) -> None:
    """Make sure the user has settings and a session with a history file.

    An empty username falls back to the owner. When the user already has a
    session with a ``current_history_file`` nothing is created or
    persisted; otherwise a new history file is created, recorded, and the
    state is persisted.
    """
    owner = self.cfg.allowed_username
    uname = normalize_username(username) or owner

    sessions = self.state.setdefault("user_sessions", {})
    if not isinstance(sessions, dict):
        sessions = self.state["user_sessions"] = {}

    # Side effect only: populates default settings for this user.
    self._user_settings(uname)

    existing = sessions.get(uname)
    has_history = isinstance(existing, dict) and existing.get("current_history_file")
    if has_history:
        return

    created = str(self._create_new_history_file("initial", uname))
    sessions[uname] = {"current_history_file": created}
    if uname == owner:
        # Keep the top-level owner pointer in sync with the session entry.
        self.state["current_history_file"] = created
    self._persist_state()
|
||
|
||
def _current_history_file_for_user(self, username: str) -> Path:
    """Return the user's current history file, creating a session if needed.

    An empty username falls back to the owner.
    """
    uname = normalize_username(username) or self.cfg.allowed_username
    self._ensure_user_session(uname)
    all_sessions = self.state.get("user_sessions") or {}
    user_session = all_sessions.get(uname) or {}
    return Path(user_session["current_history_file"])
|
||
|
||
def _create_new_history_file(self, reason: str, username: str) -> Path:
    """Create a new history file for *username* and return its path.

    The file is named ``<local timestamp>_<8 hex chars>.jsonl`` inside the
    user's history directory and starts with a ``history_created`` record
    holding *reason* and the normalized username.
    """
    ts = dt.datetime.now().strftime("%Y-%m-%d_%H%M%S")
    # Draw from an explicit alphabet: string.hexdigits.lower() contains
    # "abcdef" twice, which made letters twice as likely as digits.
    rnd = "".join(random.choices("0123456789abcdef", k=8))
    history_dir, _ = self._history_dirs_for_user(username)
    path = history_dir / f"{ts}_{rnd}.jsonl"
    JsonLineStore.append(
        path,
        {
            "ts": now_iso(),
            "type": "history_created",
            "reason": reason,
            "username": normalize_username(username),
        },
    )
    return path
|
||
|
||
def _rotate_history(self, reason: str, username: str) -> Path:
    """Archive the user's current history file and start a new one.

    Returns:
        The archive path the old file was moved to, or
        ``<archive_dir>/(empty)`` when the old file did not exist.
    """
    owner = self.cfg.allowed_username
    uname = normalize_username(username) or owner

    current = self._current_history_file_for_user(uname)
    _, archive_dir = self._history_dirs_for_user(uname)
    if not current.exists():
        archived = archive_dir / "(empty)"
    else:
        archived = archive_dir / current.name
        current.replace(archived)

    fresh = str(self._create_new_history_file(reason, uname))

    sessions = self.state.setdefault("user_sessions", {})
    if not isinstance(sessions, dict):
        sessions = self.state["user_sessions"] = {}
    sessions[uname] = {"current_history_file": fresh}
    if uname == owner:
        self.state["current_history_file"] = fresh
    self._persist_state()

    event_payload = {"reason": reason, "username": uname, "archived": str(archived)}
    self._append_history_event("history_rotated", event_payload, username=uname)
    return archived
|
||
|
||
def _user_settings(self, username: str) -> dict[str, Any]:
    """Return the mutable settings dict for a user, creating defaults.

    An empty username falls back to the owner. Both voice flags default to
    True when missing or not a bool. The state is modified in memory only;
    persisting is left to the caller.
    """
    uname = normalize_username(username) or self.cfg.allowed_username

    all_settings = self.state.get("user_settings")
    if not isinstance(all_settings, dict):
        all_settings = self.state["user_settings"] = {}

    per_user = all_settings.get(uname)
    if not isinstance(per_user, dict):
        per_user = all_settings[uname] = {}

    for flag in ("voice_replies_enabled", "voice_rewrite_enabled"):
        if not isinstance(per_user.get(flag), bool):
            per_user[flag] = True
    return per_user
|
||
|
||
def _voice_replies_enabled(self, username: str) -> bool:
|
||
return bool(self._user_settings(username).get("voice_replies_enabled"))
|
||
|
||
def _set_voice_replies_enabled(self, username: str, enabled: bool) -> None:
|
||
self._user_settings(username)["voice_replies_enabled"] = enabled
|
||
self._persist_state()
|
||
|
||
def _voice_rewrite_enabled(self, username: str) -> bool:
|
||
return bool(self._user_settings(username).get("voice_rewrite_enabled"))
|
||
|
||
def _set_voice_rewrite_enabled(self, username: str, enabled: bool) -> None:
|
||
self._user_settings(username)["voice_rewrite_enabled"] = enabled
|
||
self._persist_state()
|
||
|
||
def _remember_private_chat(self, username: str, chat_id: int) -> None:
    """Record the chat id of the user's private chat.

    Nothing happens for an empty username. The state is persisted only when
    the stored value actually changes.
    """
    uname = normalize_username(username)
    if not uname:
        return

    known_chats = self.state.get("private_chat_ids")
    if not isinstance(known_chats, dict):
        known_chats = self.state["private_chat_ids"] = {}

    if known_chats.get(uname) != chat_id:
        known_chats[uname] = chat_id
        self._persist_state()
|
||
|
||
def _private_chat_id_for_user(self, username: str) -> int | None:
    """Return the remembered private chat id for a user, if any.

    A stored int is passed through ``self._resolve_chat_id``; any other
    value, or a missing mapping, yields None.
    """
    known_chats = self.state.get("private_chat_ids")
    if isinstance(known_chats, dict):
        stored = known_chats.get(normalize_username(username))
        if isinstance(stored, int):
            return self._resolve_chat_id(stored)
    return None
|
||
|
||
def _append_history(self, history_path: Path, event_type: str, payload: dict[str, Any]) -> None:
    """Append one timestamped record to *history_path*.

    Keys in *payload* override the generated ``ts`` and ``type`` fields.
    """
    record = {"ts": now_iso(), "type": event_type, **payload}
    JsonLineStore.append(history_path, record)
|
||
|
||
def _append_history_event(self, event_type: str, payload: dict[str, Any], username: str | None = None) -> None:
|
||
history_path = self._current_history_file_for_user(username or self.cfg.allowed_username)
|
||
self._append_history(history_path, "system_event", {"event": event_type, **payload})
|
||
|
||
def _load_task_items(self) -> list[dict[str, Any]]:
|
||
with self.task_center_lock:
|
||
if not self.task_center_file.exists():
|
||
return []
|
||
try:
|
||
data = json.loads(self.task_center_file.read_text(encoding="utf-8"))
|
||
except Exception:
|
||
return []
|
||
return data if isinstance(data, list) else []
|
||
|
||
def _save_task_items(self, items: list[dict[str, Any]]) -> None:
|
||
with self.task_center_lock:
|
||
self.task_center_file.parent.mkdir(parents=True, exist_ok=True)
|
||
tmp = self.task_center_file.with_suffix(".tmp")
|
||
tmp.write_text(json.dumps(items, ensure_ascii=False, indent=2), encoding="utf-8")
|
||
tmp.replace(self.task_center_file)
|
||
|
||
def _next_task_item_id(self, items: list[dict[str, Any]]) -> str:
|
||
max_num = 0
|
||
for item in items:
|
||
raw = str(item.get("id") or "")
|
||
match = re.match(r"TC-(\d+)$", raw)
|
||
if match:
|
||
max_num = max(max_num, int(match.group(1)))
|
||
return f"TC-{max_num + 1:04d}"
|
||
|
||
def _create_task_item(
    self,
    *,
    kind: str,
    title: str,
    text: str,
    source_username: str,
    target_username: str,
    source_message_id: int | None = None,
    source_chat_id: int | None = None,
    opinion: str = "",
) -> dict[str, Any]:
    """Create a task-center item with status ``new``, store it and log it.

    The load/append/save sequence runs under ``task_center_lock`` (an
    ``RLock``, so the nested acquisitions in the load/save helpers are
    fine); this keeps concurrent creators from losing items or reusing an
    id. ``created_at`` and ``updated_at`` share one timestamp.

    The title is whitespace-compacted and cut to 160 characters; a blank
    title falls back to a default that depends on *kind*.

    Returns:
        The stored item dict.
    """
    timestamp = now_iso()
    with self.task_center_lock:
        items = self._load_task_items()
        item = {
            "id": self._next_task_item_id(items),
            "kind": kind,
            "status": "new",
            "title": compact_spaces(title)[:160] or ("Предложение" if kind == "proposal" else "Задача"),
            "text": (text or "").strip(),
            "opinion": (opinion or "").strip(),
            "source_username": normalize_username(source_username),
            "source_name": self._display_name(source_username),
            "target_username": normalize_username(target_username),
            "target_name": self._display_name(target_username),
            "source_chat_id": source_chat_id,
            "source_message_id": source_message_id,
            "created_at": timestamp,
            "updated_at": timestamp,
        }
        items.append(item)
        self._save_task_items(items)

    self._append_history_event(
        "task_center_item_created",
        {
            "itemId": item["id"],
            "kind": kind,
            "sourceUsername": item["source_username"],
            "targetUsername": item["target_username"],
            "title": item["title"],
        },
        username=source_username,
    )
    return item
|
||
|
||
def _task_items_for_user(self, username: str, *, include_done: bool = False) -> list[dict[str, Any]]:
    """Return items targeted at *username*, ordered by ``created_at``.

    Items with status ``done`` are left out unless *include_done* is True.
    """
    wanted = normalize_username(username)
    selected = []
    for entry in self._load_task_items():
        if normalize_username(entry.get("target_username")) != wanted:
            continue
        if not include_done and entry.get("status") == "done":
            continue
        selected.append(entry)
    selected.sort(key=lambda entry: str(entry.get("created_at") or ""))
    return selected
|
||
|
||
def _task_center_counts_text(self, username: str) -> str:
    """Return a one-line reminder with per-status counts of the user's items.

    Returns an empty string when the user has no active items. Known
    statuses are listed in a fixed order; any other status present in the
    data is listed after them under its raw name, so the per-status
    numbers always add up to the reported total.
    """
    counts: dict[str, int] = {}
    for item in self._task_items_for_user(username):
        status = str(item.get("status") or "new")
        counts[status] = counts.get(status, 0) + 1
    if not counts:
        return ""

    preferred = ("new", "approved", "needs_work", "rejected")
    ordered = [status for status in preferred if status in counts]
    ordered.extend(status for status in counts if status not in preferred)

    parts = [f"{TASK_STATUS_LABELS.get(status, status)}: {counts[status]}" for status in ordered]
    active_total = sum(counts.values())
    return f"Напоминание по задачам: всего активных {active_total}; " + ", ".join(parts) + "."
|
||
|
||
def _format_task_items(self, username: str, *, include_done: bool = False) -> str:
    """Render the user's items as text, one line per item.

    At most 15 items are listed; the remainder is summarized in a final
    "...and N more" line. A fixed message is returned when there are no
    items.
    """
    visible_limit = 15
    items = self._task_items_for_user(username, include_done=include_done)
    if not items:
        return f"Для {self._display_name(username)} активных задач и предложений нет."

    rows = [f"Задачи и предложения для {self._display_name(username)}:"]
    for entry in items[:visible_limit]:
        raw_status = str(entry.get("status") or "new")
        status = TASK_STATUS_LABELS.get(raw_status, raw_status)
        kind = "предложение" if entry.get("kind") == "proposal" else "задача"
        source = entry.get("source_name") or entry.get("source_username") or "неизвестно"
        title = entry.get("title") or "(без названия)"
        rows.append(f"{entry.get('id')} [{status}] {kind} от {source}: {title}")

    hidden = len(items) - visible_limit
    if hidden > 0:
        rows.append(f"...и ещё {hidden}")
    return "\n".join(rows)
|
||
|
||
def _update_task_item_status(self, item_id: str, status: str) -> dict[str, Any] | None:
|
||
item_id = (item_id or "").strip().upper()
|
||
items = self._load_task_items()
|
||
updated = None
|
||
for item in items:
|
||
if str(item.get("id") or "").upper() == item_id:
|
||
item["status"] = status
|
||
item["updated_at"] = now_iso()
|
||
updated = item
|
||
break
|
||
if updated is not None:
|
||
self._save_task_items(items)
|
||
return updated
|
||
|
||
def _find_first_task_item(self, *, source_username: str = "", target_username: str = "", kind: str = "") -> dict[str, Any] | None:
    """Return the first stored item that is not done and passes the filters.

    Each filter is optional: an empty ``source_username``,
    ``target_username`` or ``kind`` places no restriction on that field.
    Returns ``None`` when nothing qualifies.
    """
    wanted_source = normalize_username(source_username)
    wanted_target = normalize_username(target_username)

    def qualifies(entry: dict[str, Any]) -> bool:
        if entry.get("status") == "done":
            return False
        if wanted_source and normalize_username(entry.get("source_username")) != wanted_source:
            return False
        if wanted_target and normalize_username(entry.get("target_username")) != wanted_target:
            return False
        return not (kind and entry.get("kind") != kind)

    return next((entry for entry in self._load_task_items() if qualifies(entry)), None)
|
||
|
||
def _notify_user_about_task_item(self, username: str, item: dict[str, Any]) -> None:
|
||
chat_id = self._private_chat_id_for_user(username)
|
||
if chat_id is None:
|
||
return
|
||
kind = "предложение" if item.get("kind") == "proposal" else "задача"
|
||
source = item.get("source_name") or item.get("source_username") or "кто-то"
|
||
title = item.get("title") or "(без названия)"
|
||
status = TASK_STATUS_LABELS.get(str(item.get("status") or "new"), str(item.get("status") or "new"))
|
||
if item.get("status") == "new":
|
||
text = f"У тебя новое {kind} от {source}: {title}\nID: {item.get('id')}"
|
||
else:
|
||
text = f"Обновление по {kind} {item.get('id')}: статус «{status}».\n{title}"
|
||
self._safe_send(chat_id, text)
|
||
|
||
def _send_player_welcome_once(self, chat_id: int, message_id: int, username: str) -> None:
    """Greet a player the first time they are seen, then remember it.

    The per-user marker lives in ``self.state["player_welcome_sent"]``;
    if it is already set for this user nothing is sent. When the user has
    active task-center items the greeting also carries the counts reminder.
    """
    key = normalize_username(username)
    registry = self.state.get("player_welcome_sent")
    if not isinstance(registry, dict):
        registry = self.state["player_welcome_sent"] = {}
    if registry.get(key):
        return

    greeting = "".join((
        f"Привет, {self._player_name(key)}.\n",
        "Можно задавать вопросы по проекту, просить анализ, идеи и подготовку готового ТЗ.\n",
        "Команда /new начинает новую сессию и архивирует текущую историю.",
    ))
    reminder = self._task_center_counts_text(key)
    if reminder:
        greeting = f"{greeting}\n\n{reminder}\nКоманда /tasks покажет список."

    self._safe_send(chat_id, greeting, reply_to=message_id)
    registry[key] = now_iso()
    self._persist_state()
|
||
|
||
def _resolve_chat_id(self, chat_id: int) -> int:
|
||
migrations = self.state.get("chat_id_migrations")
|
||
if not isinstance(migrations, dict):
|
||
return chat_id
|
||
current = chat_id
|
||
visited: set[int] = set()
|
||
while current not in visited:
|
||
visited.add(current)
|
||
next_chat_id = migrations.get(str(current))
|
||
if not isinstance(next_chat_id, int):
|
||
break
|
||
current = next_chat_id
|
||
return current
|
||
|
||
def _remember_chat_migration(self, old_chat_id: int, new_chat_id: int, source: str) -> None:
|
||
if old_chat_id == new_chat_id:
|
||
return
|
||
migrations = self.state.get("chat_id_migrations")
|
||
if not isinstance(migrations, dict):
|
||
migrations = {}
|
||
self.state["chat_id_migrations"] = migrations
|
||
if migrations.get(str(old_chat_id)) == new_chat_id:
|
||
return
|
||
migrations[str(old_chat_id)] = new_chat_id
|
||
self._persist_state()
|
||
with self.queue_lock:
|
||
changed = False
|
||
for job in self.queue:
|
||
if job.get("chat_id") == old_chat_id:
|
||
job["chat_id"] = new_chat_id
|
||
job["updated_at"] = now_iso()
|
||
changed = True
|
||
if changed:
|
||
self._persist_queue()
|
||
self._append_history_event("chat_migrated_to_supergroup", {
|
||
"oldChatId": old_chat_id,
|
||
"newChatId": new_chat_id,
|
||
"source": source,
|
||
})
|
||
|
||
@staticmethod
|
||
def _extract_migrate_to_chat_id(error_text: str) -> int | None:
|
||
match = re.search(r'"migrate_to_chat_id"\s*:\s*(-?\d+)', error_text)
|
||
if match:
|
||
return int(match.group(1))
|
||
match = re.search(r"'migrate_to_chat_id'\s*:\s*(-?\d+)", error_text)
|
||
if match:
|
||
return int(match.group(1))
|
||
return None
|
||
|
||
def _handle_update(self, update: dict[str, Any]) -> None:
    """Process one Telegram update dict.

    Takes the ``message`` (or, failing that, the ``channel_post``) payload,
    records chat migrations, drops duplicates, foreign channels and service
    messages, and checks that the author is allowed. A surviving message is
    then handled as a voice/audio job, a slash command, a task-center
    phrase, or a queued text job.
    """
    update_type = "message"
    message = update.get("message")
    if not isinstance(message, dict):
        update_type = "channel_post"
        message = update.get("channel_post")
        if not isinstance(message, dict):
            return

    chat = message.get("chat") or {}
    sender = message.get("from") or {}
    chat_id = chat.get("id")
    message_id = message.get("message_id")
    chat_type = str(chat.get("type") or "")
    chat_username = normalize_username(chat.get("username"))
    chat_title = str(chat.get("title") or "")
    username = normalize_username(sender.get("username"))
    author_signature = str(message.get("author_signature") or "").strip()
    author_username = username or normalize_username(author_signature)
    if not (isinstance(chat_id, int) and isinstance(message_id, int)):
        return

    # A migration notice only updates the id mapping; it carries no content.
    migrated_to = message.get("migrate_to_chat_id")
    if isinstance(migrated_to, int):
        self._remember_chat_migration(chat_id, migrated_to, "telegram_message")
        return

    # _mark_processed_update returning a truthy value means: skip this update.
    if self._mark_processed_update(f"{chat_id}:{message_id}"):
        return

    is_channel_post = update_type == "channel_post" or chat_type == "channel"
    if (
        is_channel_post
        and self.cfg.allowed_channel_username
        and chat_username != self.cfg.allowed_channel_username
    ):
        return
    if chat_username and chat_username == self.cfg.allowed_channel_username:
        self._remember_public_report_chat(chat_id)

    # Ignore service messages about members joining/leaving and chat creation.
    if message.get("new_chat_members") or message.get("left_chat_member"):
        return
    if (
        message.get("group_chat_created")
        or message.get("supergroup_chat_created")
        or message.get("channel_chat_created")
    ):
        return

    text = (message.get("text") or message.get("caption") or "").strip()
    actor_username = normalize_username(author_username)
    is_private = chat_type == "private"

    if not self._is_allowed_user(actor_username):
        if is_private:
            self._safe_send(chat_id, "Извините, доступ к этому агенту пока не выдан. Обратитесь к Айдару.", reply_to=message_id)
        return
    # Players are served in private chats only.
    if self._is_allowed_player(actor_username) and not is_private:
        return

    self._ensure_user_session(actor_username)
    if is_private:
        self._remember_private_chat(actor_username, chat_id)
    history_path = self._current_history_file_for_user(actor_username)
    if self._is_allowed_player(actor_username):
        self._send_player_welcome_once(chat_id, message_id, actor_username)

    if not text:
        # Without text only voice and audio attachments are accepted.
        for media_type in ("voice", "audio"):
            if message.get(media_type):
                self._enqueue_voice_job(
                    chat_id,
                    message_id,
                    actor_username,
                    message[media_type].get("file_id"),
                    media_type=media_type,
                    update_type=update_type,
                    chat_username=chat_username,
                    chat_title=chat_title,
                    author_signature=author_signature,
                    chat_type=chat_type,
                )
                return
        self._safe_send(chat_id, "Поддерживаются текст, voice и audio.", reply_to=message_id)
        return

    if text.startswith("/"):
        self._handle_command(chat_id, message_id, actor_username, text)
        return

    if self._handle_task_center_text(chat_id, message_id, actor_username, text):
        return

    self._append_history(history_path, "incoming_text", {
        "chatId": chat_id,
        "messageId": message_id,
        "updateType": update_type,
        "chatType": chat_type,
        "chatUsername": chat_username,
        "chatTitle": chat_title,
        "username": actor_username,
        "authorSignature": author_signature,
        "text": text,
    })

    job = self._build_job_base(chat_id, message_id, actor_username, str(history_path))
    role = "owner" if self._is_owner(actor_username) else "player"
    job.update({
        "type": "text",
        "text": text,
        "update_type": update_type,
        "chat_type": chat_type,
        "chat_username": chat_username,
        "chat_title": chat_title,
        "author_signature": author_signature,
        "role": role,
        "player_name": self._player_name(actor_username) if role == "player" else "",
    })
    with self.queue_lock:
        self.queue.append(job)
        self._persist_queue()
    self._safe_send(chat_id, f"Принял задачу #{job['num']}", reply_to=message_id)
|
||
|
||
def _enqueue_voice_job(
|
||
self,
|
||
chat_id: int,
|
||
message_id: int,
|
||
username: str,
|
||
file_id: str | None,
|
||
*,
|
||
media_type: str = "voice",
|
||
update_type: str = "message",
|
||
chat_username: str = "",
|
||
chat_title: str = "",
|
||
author_signature: str = "",
|
||
chat_type: str = "",
|
||
) -> None:
|
||
if not file_id:
|
||
self._safe_send(chat_id, "Не удалось прочитать file_id голосового.", reply_to=message_id)
|
||
return
|
||
history_path = self._current_history_file_for_user(username)
|
||
self._append_history(history_path, "incoming_voice", {
|
||
"chatId": chat_id,
|
||
"messageId": message_id,
|
||
"updateType": update_type,
|
||
"chatType": chat_type,
|
||
"chatUsername": chat_username,
|
||
"chatTitle": chat_title,
|
||
"username": username,
|
||
"authorSignature": author_signature,
|
||
"fileId": file_id,
|
||
"mediaType": media_type,
|
||
})
|
||
job = self._build_job_base(chat_id, message_id, username, str(history_path))
|
||
job["type"] = "voice"
|
||
job["telegram_file_id"] = file_id
|
||
job["telegram_media_type"] = media_type
|
||
job["update_type"] = update_type
|
||
job["chat_type"] = chat_type
|
||
job["chat_username"] = chat_username
|
||
job["chat_title"] = chat_title
|
||
job["author_signature"] = author_signature
|
||
job["role"] = "owner" if self._is_owner(username) else "player"
|
||
job["player_name"] = self._player_name(username) if job["role"] == "player" else ""
|
||
with self.queue_lock:
|
||
self.queue.append(job)
|
||
self._persist_queue()
|
||
self._safe_send(chat_id, f"Принял voice в задачу #{job['num']}", reply_to=message_id)
|
||
|
||
def _build_job_base(self, chat_id: int, message_id: int, username: str, history_file: str) -> dict[str, Any]:
    """Allocate the next job number and return a job dict with default fields.

    The counter ``state["next_job_number"]`` is read, incremented and
    persisted under ``queue_lock``. The returned job is a pending ``"text"``
    job with a fresh UUID; callers overwrite the type-specific fields.
    """
    with self.queue_lock:
        num = int(self.state.get("next_job_number", 1))
        self.state["next_job_number"] = num + 1
        self._persist_state()

    return dict(
        id=str(uuid.uuid4()),
        num=num,
        status="pending",
        type="text",
        chat_id=chat_id,
        message_id=message_id,
        username=username,
        update_type="message",
        chat_type="",
        chat_username="",
        chat_title="",
        author_signature="",
        role="owner",
        player_name="",
        text="",
        telegram_file_id="",
        telegram_media_type="",
        history_file=history_file,
        attempts=0,
        retry_reason="",
        last_error="",
        created_at=now_iso(),
        updated_at=now_iso(),
        active_since=None,
    )
|
||
|
||
def _handle_task_center_text(self, chat_id: int, message_id: int, username: str, text: str) -> bool:
# Recognise free-text task-center phrases in `text` and act on them.
# The visible return statements yield True after a phrase was handled and
# False at the very end when nothing matched.
# Three phrase families appear below: listing items, owner-side task
# assignment / status changes, and proposals sent by non-owners.
# NOTE(review): indentation is not visible in this copy of the file, so the
# exact nesting of the status-change branch relative to `if is_owner:` and of
# the individual `return True` lines cannot be confirmed from here.
|
||
source_text = (text or "").strip()
|
||
lower = source_text.lower()
|
||
is_owner = self._is_owner(username)
|
||
|
||
# "show / list / which ... tasks / proposals" -> reply with the formatted list.
if re.search(r"\b(покажи|список|какие)\b.*\b(задач|задачи|предложени)", lower):
|
||
target = username
|
||
explicit_target = self._find_user_by_text(source_text)
|
||
# Only the owner may look at another user's list.
if is_owner and explicit_target:
|
||
target = explicit_target
|
||
self._safe_send(chat_id, self._format_task_items(target), reply_to=message_id)
|
||
return True
|
||
|
||
if is_owner:
|
||
# "<create verb> <task noun> <who><separator><body>", separator being ":", " - " or " — ".
assign_match = re.search(
|
||
r"(?:поставь|добавь|создай|запиши)\s+(?:задачу|задание)\s+(.+?)(?::|\s+-\s+|\s+—\s+)(.+)",
|
||
source_text,
|
||
flags=re.IGNORECASE | re.DOTALL,
|
||
)
|
||
if assign_match:
|
||
target = self._find_user_by_text(assign_match.group(1))
|
||
body = assign_match.group(2).strip()
|
||
if target and body:
|
||
item = self._create_task_item(
|
||
kind="task",
|
||
title=body,
|
||
text=body,
|
||
source_username=username,
|
||
target_username=target,
|
||
source_message_id=message_id,
|
||
source_chat_id=chat_id,
|
||
)
|
||
self._notify_user_about_task_item(target, item)
|
||
self._safe_send(
|
||
chat_id,
|
||
f"Задача добавлена для {self._display_name(target)}: {item['id']} — {item['title']}",
|
||
reply_to=message_id,
|
||
)
|
||
return True
|
||
|
||
# Status-change verbs; acted on only if the text also mentions tasks/proposals or a tc-<n> id.
status_match = re.search(r"\b(одобрить|отклонить|доработать|закрыть|сделано|закрыта)\b(?:\s+(.+))?", lower, flags=re.IGNORECASE)
|
||
if status_match and ("задач" in lower or "предложени" in lower or re.search(r"tc-\d+", lower, flags=re.IGNORECASE)):
|
||
action = status_match.group(1)
|
||
tail = status_match.group(2) or ""
|
||
status = {
|
||
"одобрить": "approved",
|
||
"отклонить": "rejected",
|
||
"доработать": "needs_work",
|
||
"закрыть": "done",
|
||
"сделано": "done",
|
||
"закрыта": "done",
|
||
}.get(action, "new")
|
||
id_match = re.search(r"tc-\d+", source_text, flags=re.IGNORECASE)
|
||
item = None
|
||
if id_match:
|
||
item = self._update_task_item_status(id_match.group(0), status)
|
||
else:
|
||
# No explicit id: take the first open item addressed to the current user,
# optionally narrowed by a user found in the text after the verb.
source_user = self._find_user_by_text(tail)
|
||
item = self._find_first_task_item(
|
||
source_username=source_user,
|
||
target_username=username,
|
||
kind="proposal" if "предложени" in lower else "",
|
||
)
|
||
if item:
|
||
item = self._update_task_item_status(str(item.get("id")), status)
|
||
if item:
|
||
label = TASK_STATUS_LABELS.get(status, status)
|
||
self._safe_send(chat_id, f"{item.get('id')} обновлена: {label}.", reply_to=message_id)
|
||
# Tell the item's author about the change unless they made it themselves.
source_user = normalize_username(item.get("source_username"))
|
||
if source_user and source_user != username:
|
||
self._notify_user_about_task_item(source_user, item)
|
||
return True
|
||
|
||
if not is_owner:
|
||
# "<proposal word><colon or dash> <body>" at the start of the message.
proposal_match = re.match(r"\s*(?:предложение|идея|заявка)\s*[::-]\s*(.+)", source_text, flags=re.IGNORECASE | re.DOTALL)
|
||
if proposal_match:
|
||
body = proposal_match.group(1).strip()
|
||
if body:
|
||
# Proposals are addressed to cfg.allowed_username.
item = self._create_task_item(
|
||
kind="proposal",
|
||
title=body,
|
||
text=body,
|
||
opinion="Нужно решение Айдара: одобрить, отклонить или отправить на доработку.",
|
||
source_username=username,
|
||
target_username=self.cfg.allowed_username,
|
||
source_message_id=message_id,
|
||
source_chat_id=chat_id,
|
||
)
|
||
self._notify_user_about_task_item(self.cfg.allowed_username, item)
|
||
self._safe_send(
|
||
chat_id,
|
||
f"Предложение отправлено Айдару как {item['id']}. Статус: новая.",
|
||
reply_to=message_id,
|
||
)
|
||
return True
|
||
|
||
return False
|
||
|
||
def _handle_command(self, chat_id: int, message_id: int, username: str, text: str) -> None:
|
||
lower = text.lower()
|
||
command = lower.split(maxsplit=1)[0].split("@", 1)[0]
|
||
is_owner = self._is_owner(username)
|
||
if command in ("/start", "/help"):
|
||
self._safe_send(chat_id, self._help_text(is_owner=is_owner), reply_to=message_id)
|
||
return
|
||
if command == "/status":
|
||
self._safe_send(chat_id, self._status_text(username), reply_to=message_id)
|
||
return
|
||
if command == "/queue":
|
||
self._safe_send(chat_id, self._queue_text(), reply_to=message_id)
|
||
return
|
||
if command in ("/tasks", "/my_tasks"):
|
||
parts = text.split(maxsplit=1)
|
||
target = username
|
||
if is_owner and len(parts) > 1:
|
||
parsed = self._find_user_by_text(parts[1])
|
||
if parsed:
|
||
target = parsed
|
||
self._safe_send(chat_id, self._format_task_items(target), reply_to=message_id)
|
||
return
|
||
if command == "/voice_on":
|
||
self._set_voice_replies_enabled(username, True)
|
||
self._append_history_event("voice_replies_enabled", {"username": normalize_username(username)}, username=username)
|
||
self._safe_send(chat_id, "Озвучивание финальных ответов включено для вашего пользователя.", reply_to=message_id)
|
||
return
|
||
if command == "/voice_off":
|
||
self._set_voice_replies_enabled(username, False)
|
||
self._append_history_event("voice_replies_disabled", {"username": normalize_username(username)}, username=username)
|
||
self._safe_send(chat_id, "Озвучивание финальных ответов выключено для вашего пользователя.", reply_to=message_id)
|
||
return
|
||
if command in ("/voice_status", "/voice_rewrite_status"):
|
||
self._safe_send(chat_id, self._status_text(username), reply_to=message_id)
|
||
return
|
||
if command == "/voice_rewrite_on":
|
||
self._set_voice_rewrite_enabled(username, True)
|
||
self._append_history_event("voice_rewrite_enabled", {"username": normalize_username(username)}, username=username)
|
||
self._safe_send(chat_id, "Адаптация текста перед озвучкой включена для вашего пользователя.", reply_to=message_id)
|
||
return
|
||
if command == "/voice_rewrite_off":
|
||
self._set_voice_rewrite_enabled(username, False)
|
||
self._append_history_event("voice_rewrite_disabled", {"username": normalize_username(username)}, username=username)
|
||
self._safe_send(chat_id, "Адаптация текста перед озвучкой выключена для вашего пользователя.", reply_to=message_id)
|
||
return
|
||
if command == "/new":
|
||
archived = self._rotate_history("command_new", username)
|
||
self._safe_send(chat_id, f"История очищена. Новый диалог начат.\nАрхив: {archived.name}", reply_to=message_id)
|
||
return
|
||
if command in ("/restart_service", "/restart"):
|
||
if not is_owner:
|
||
self._safe_send(chat_id, "Команда недоступна.", reply_to=message_id)
|
||
return
|
||
self._append_history_event("restart_service_deferred_requested", {
|
||
"chatId": chat_id,
|
||
"messageId": message_id,
|
||
"username": username,
|
||
}, username=username)
|
||
self._safe_send(
|
||
chat_id,
|
||
"Отложенный рестарт принят. Если задача сейчас выполняется, сервис перезапустится после её завершения и до следующей задачи.",
|
||
reply_to=message_id,
|
||
)
|
||
self._request_deferred_restart()
|
||
return
|
||
if command in ("/restart_hard", "/restart_now", "/restart_force"):
|
||
if not is_owner:
|
||
self._safe_send(chat_id, "Команда недоступна.", reply_to=message_id)
|
||
return
|
||
self._append_history_event("restart_service_hard_requested", {
|
||
"chatId": chat_id,
|
||
"messageId": message_id,
|
||
"username": username,
|
||
}, username=username)
|
||
self._safe_send(
|
||
chat_id,
|
||
"Выполняю жёсткий рестарт сервиса прямо сейчас. Активная задача, если есть, будет прервана и после старта вернётся в очередь.",
|
||
reply_to=message_id,
|
||
)
|
||
self._schedule_self_restart("hard_restart_requested", force=True)
|
||
return
|
||
if command == "/stop":
|
||
stopped = self._cancel_active_job("stopped_by_user")
|
||
if stopped:
|
||
self._safe_send(chat_id, "Текущая задача остановлена и удалена из очереди.", reply_to=message_id)
|
||
else:
|
||
self._safe_send(chat_id, "Сейчас нет активной задачи.", reply_to=message_id)
|
||
return
|
||
if command == "/cancel":
|
||
parts = text.split(maxsplit=1)
|
||
if len(parts) < 2:
|
||
self._safe_send(chat_id, "Использование: /cancel <id|all>", reply_to=message_id)
|
||
return
|
||
arg = parts[1].strip()
|
||
if arg.lower() == "all":
|
||
with self.queue_lock:
|
||
self.stop_current_job = True
|
||
self._stop_active_codex_process()
|
||
count = len(self.queue)
|
||
self.queue = []
|
||
self._persist_queue()
|
||
self._safe_send(chat_id, f"Удалено задач из очереди: {count}", reply_to=message_id)
|
||
return
|
||
cancelled = self._cancel_by_id_prefix(arg)
|
||
self._safe_send(chat_id, f"Задача удалена: {arg}" if cancelled else f"Задача не найдена: {arg}", reply_to=message_id)
|
||
return
|
||
|
||
def _help_text(self, *, is_owner: bool) -> str:
|
||
lines = [
|
||
"Доступные команды:",
|
||
"/status — активная задача и размер очереди",
|
||
"/queue — список задач в очереди",
|
||
"/tasks — список ваших задач и предложений",
|
||
"/stop — остановить текущую задачу",
|
||
"/cancel <id|all> — удалить задачу по id (префикс) или все",
|
||
"/new — архивировать историю и начать новую",
|
||
"/voice_on — включить озвучивание финальных ответов",
|
||
"/voice_off — выключить озвучивание финальных ответов",
|
||
"/voice_rewrite_on — включить адаптацию текста перед озвучкой",
|
||
"/voice_rewrite_off — выключить адаптацию текста перед озвучкой",
|
||
"/help — эта справка",
|
||
]
|
||
if is_owner:
|
||
lines.insert(-1, "/tasks <пользователь> — список задач игрока")
|
||
lines.insert(-1, "/restart — отложенный рестарт после текущей задачи")
|
||
lines.insert(-1, "/restart_hard — жёсткий рестарт прямо сейчас")
|
||
return "\n".join(lines)
|
||
|
||
def _status_text(self, username: str) -> str:
|
||
with self.queue_lock:
|
||
active = next((j for j in self.queue if j.get("status") == "active"), None)
|
||
pending = sum(1 for j in self.queue if j.get("status") == "pending")
|
||
voice_status = "включено" if self._voice_replies_enabled(username) else "выключено"
|
||
rewrite_status = "включена" if self._voice_rewrite_enabled(username) else "выключена"
|
||
settings_text = (
|
||
f"Голосовые ответы: {voice_status}\n"
|
||
f"Адаптация текста перед озвучкой: {rewrite_status}"
|
||
)
|
||
restart_text = "\nОтложенный рестарт: ожидает завершения текущей задачи" if self.restart_requested else ""
|
||
if not active:
|
||
return f"Статус: активной задачи нет.\nВ очереди pending: {pending}\n{settings_text}{restart_text}"
|
||
elapsed = int(time.time() - (self.active_job_started_at or time.time()))
|
||
return (
|
||
f"Статус: активная задача #{active.get('num', '?')}\n"
|
||
f"Тип: {active.get('type', 'text')}\n"
|
||
f"Попытка: {int(active.get('attempts', 0)) + 1}/{self.cfg.max_retries}\n"
|
||
f"Выполняется: {elapsed}с\n"
|
||
f"Pending: {pending}\n"
|
||
f"{settings_text}{restart_text}"
|
||
)
|
||
|
||
def _queue_text(self) -> str:
|
||
with self.queue_lock:
|
||
items = list(self.queue)
|
||
if not items:
|
||
return "Очередь пуста."
|
||
lines = [f"Очередь: {len(items)}"]
|
||
for i, job in enumerate(items[:10], start=1):
|
||
lines.append(
|
||
f"{i}) #{job.get('num', '?')} [{job.get('status')}] {job.get('type')} attempts={job.get('attempts', 0)}"
|
||
)
|
||
if len(items) > 10:
|
||
lines.append(f"...и ещё {len(items) - 10} задач")
|
||
return "\n".join(lines)
|
||
|
||
def _cancel_active_job(self, reason: str) -> bool:
|
||
with self.queue_lock:
|
||
active = next((j for j in self.queue if j.get("status") == "active"), None)
|
||
if not active:
|
||
return False
|
||
self.stop_current_job = True
|
||
self._stop_active_codex_process()
|
||
self.queue = [j for j in self.queue if j.get("id") != active.get("id")]
|
||
self._persist_queue()
|
||
self._append_history_event("job_stopped_by_user", {"jobId": active.get("id"), "reason": reason})
|
||
return True
|
||
|
||
def _cancel_by_id_prefix(self, prefix: str) -> bool:
|
||
prefix = prefix.strip().lower()
|
||
normalized_num = prefix.lstrip("#")
|
||
with self.queue_lock:
|
||
target = next(
|
||
(
|
||
j for j in self.queue
|
||
if str(j.get("id", "")).lower().startswith(prefix)
|
||
or str(j.get("num", "")).lower() == normalized_num
|
||
),
|
||
None
|
||
)
|
||
if not target:
|
||
return False
|
||
if target.get("status") == "active":
|
||
self.stop_current_job = True
|
||
self._stop_active_codex_process()
|
||
self.queue = [j for j in self.queue if j.get("id") != target.get("id")]
|
||
self._persist_queue()
|
||
return True
|
||
|
||
def _worker_loop(self) -> None:
|
||
while not self.stop_event.is_set():
|
||
if self.restart_requested:
|
||
self._exit_for_restart("deferred_restart_before_next_job")
|
||
return
|
||
job = None
|
||
with self.queue_lock:
|
||
for item in self.queue:
|
||
if item.get("status") == "pending":
|
||
item["status"] = "active"
|
||
item["active_since"] = now_iso()
|
||
item["updated_at"] = now_iso()
|
||
self.active_job_id = item.get("id")
|
||
self.active_job_started_at = time.time()
|
||
job = dict(item)
|
||
self._persist_queue()
|
||
break
|
||
if not job:
|
||
time.sleep(0.5)
|
||
continue
|
||
|
||
self.stop_current_job = False
|
||
self._process_job(job)
|
||
self.active_job_id = None
|
||
self.active_job_started_at = None
|
||
if self.restart_requested:
|
||
self._exit_for_restart("deferred_restart_after_job")
|
||
return
|
||
|
||
def _process_job(self, job: dict[str, Any]) -> None:
    """Run one job end to end and report progress in the originating chat.

    Voice jobs are transcribed first and the recognised text becomes the
    job text. The prompt is then sent to Codex, the answer is delivered in
    chunks, follow-up reports and the optional voice reply are sent, and
    the job is marked done. Any exception either finishes a user-requested
    stop or is passed to ``_handle_job_failure``.
    """
    job_id = job["id"]
    job_num = job.get("num", "?")
    chat_id = int(job["chat_id"])
    message_id = int(job["message_id"])
    history_path = Path(job["history_file"])

    def say(body: str) -> None:
        self._safe_send(chat_id, body, reply_to=message_id)

    say(f"Задача #{job_num} в работе.")
    try:
        if job.get("type") == "voice":
            say(f"#{job_num}: распознаю голосовое...")
            recognized = self._transcribe_voice_job(job)
            job["text"] = recognized
            self._append_history(history_path, "voice_transcription", {"jobId": job_id, "jobNum": job_num, "text": recognized})
            preview = recognized.strip()
            if len(preview) > 1200:
                preview = preview[:1200] + " ...[обрезано]"
            say(f"#{job_num}: распознано:\n{preview}")
            say(f"#{job_num}: распознано, отправляю в Codex.")

        prompt = self._build_prompt(job)
        self._append_history(history_path, "codex_request", {"jobId": job_id, "prompt": prompt})
        answer = self._run_codex(prompt, chat_id, message_id, job_id, job_num)
        for chunk in split_long_text(answer):
            say(chunk)
        self._append_history(history_path, "codex_response", {"jobId": job_id, "text": answer})
        self._send_private_job_public_report(job, answer)
        self._send_task_center_reminder(job)
        if self._voice_replies_enabled(job.get("username") or ""):
            self._send_voice_reply_for_answer(job, answer, history_path, job_id)
        say(f"Готово #{job_num}.")
        self._mark_job_done(job_id)
    except Exception as exc:
        if self.stop_current_job:
            # The failure was caused by a user-requested stop: drop the job.
            self._append_history(history_path, "job_stopped", {"jobId": job_id, "reason": str(exc)})
            say(f"Задача #{job_num} остановлена.")
            self._mark_job_removed(job_id)
            self.stop_current_job = False
            return
        if isinstance(exc, VoiceTranscriptionError):
            self._append_history(history_path, "voice_transcription_failed", {
                "jobId": job_id,
                "jobNum": job_num,
                "stage": exc.stage,
                "retryable": exc.retryable,
                "error": exc.user_message,
                "detail": exc.detail,
            })
        self._handle_job_failure(job, exc)
|
||
|
||
def _build_prompt(self, job: dict[str, Any]) -> str:
|
||
retry_block = ""
|
||
retry_reason = (job.get("retry_reason") or "").strip()
|
||
if retry_reason:
|
||
retry_block = f"\n\nПометка retry: {retry_reason}"
|
||
role = (job.get("role") or "owner").strip().lower()
|
||
player_block = ""
|
||
if role == "player":
|
||
player_name = (job.get("player_name") or "").strip()
|
||
player_dir = self.cfg.codex_workdir / "Players" / (job.get("username") or "")
|
||
player_block = (
|
||
"\n\nРежим игрока (обязательно):\n"
|
||
f"- Пользователь: {player_name} (@{job.get('username')}).\n"
|
||
f"- Рабочая папка игрока: {player_dir}\n"
|
||
"- Код проекта не изменять.\n"
|
||
"- Можно отвечать на вопросы по проекту, предлагать идеи и готовить ТЗ.\n"
|
||
"- Если нужны правки кода, описывать предложение текстом и сохранять материалы только в папке игрока."
|
||
)
|
||
return (
|
||
"Пришло сообщение в Telegram.\n"
|
||
f"Тип: {job.get('type')}\n"
|
||
f"Источник Telegram: {job.get('update_type', 'message')}\n"
|
||
f"Тип чата: {job.get('chat_type') or ''}\n"
|
||
f"Канал/чат: @{job.get('chat_username') or ''} {job.get('chat_title') or ''}\n"
|
||
f"Username отправителя: @{job.get('username')}\n"
|
||
f"Подпись автора в Telegram: {job.get('author_signature') or ''}\n"
|
||
"Текст для обработки:\n"
|
||
f"{job.get('text')}\n\n"
|
||
f"История диалога (JSONL): {job.get('history_file')}\n"
|
||
f"Инструкции агента: {self.cfg.agent_instructions_file}\n"
|
||
f"Работай в рабочем проекте аккуратно и верни только текст ответа пользователю.{player_block}{retry_block}"
|
||
)
|
||
|
||
def _run_codex(self, prompt: str, chat_id: int, message_id: int, job_id: str, job_num: Any) -> str:
# Run the Codex CLI (`exec --json`) on `prompt` inside cfg.codex_workdir and
# return the final answer text.
# The answer is read from the temp file passed via `-o`; when that yields
# nothing, _extract_fallback_message(output_lines) is used instead.
# Raises RuntimeError on timeout, on a non-zero exit code, and when no answer
# text could be obtained.
# NOTE(review): the temp file is created with delete=False and is only
# unlinked on the path that reads it below; on timeout or non-zero exit it
# appears to be left behind — confirm and clean up if so.
# NOTE(review): after process.kill() on timeout no process.wait() is visible
# here — confirm the child is reaped elsewhere.
# NOTE(review): indentation is not visible in this copy of the file, so the
# nesting of `self.last_heartbeat_at = now` and of `last_job_message_at = now`
# inside on_line cannot be confirmed from here.
|
||
output_lines: list[str] = []
|
||
# Only the file name is needed; Codex writes its last message into this file.
with tempfile.NamedTemporaryFile(prefix="shine-codex-last-message-", suffix=".txt", delete=False) as tmp:
|
||
output_file = Path(tmp.name)
|
||
|
||
cmd = [
|
||
str(self.cfg.codex_bin),
|
||
"exec",
|
||
"--dangerously-bypass-approvals-and-sandbox",
|
||
"--json",
|
||
"-C", str(self.cfg.codex_workdir),
|
||
"-o", str(output_file),
|
||
prompt,
|
||
]
|
||
print(f"[py-bot] codex exec start job={job_id[:8]}", flush=True)
|
||
# stderr is merged into stdout so a single reader thread sees all output.
process = subprocess.Popen(
|
||
cmd,
|
||
stdin=subprocess.DEVNULL,
|
||
stdout=subprocess.PIPE,
|
||
stderr=subprocess.STDOUT,
|
||
text=True,
|
||
encoding="utf-8",
|
||
errors="replace",
|
||
bufsize=1,
|
||
)
|
||
# Publish the process so _stop_active_codex_process can terminate it.
with self.active_process_lock:
|
||
self.active_process = process
|
||
|
||
self.last_heartbeat_at = 0.0
|
||
last_user_note = ""
|
||
last_user_note_at = 0.0
|
||
codex_started_at = time.time()
|
||
last_job_message_at = codex_started_at
|
||
|
||
# Called for every output line: stores it and forwards a user-facing note,
# skipping a repeat of the previous note and notes within 8 s of the last one.
def on_line(line: str) -> None:
|
||
nonlocal last_user_note, last_user_note_at, last_job_message_at
|
||
output_lines.append(line)
|
||
note = self._extract_codex_user_note(line)
|
||
now = time.time()
|
||
if note and note != last_user_note and now - last_user_note_at > 8:
|
||
self._safe_send(chat_id, f"#{job_num}: {note}", reply_to=message_id)
|
||
last_user_note = note
|
||
last_user_note_at = now
|
||
last_job_message_at = now
|
||
|
||
reader_done = threading.Event()
|
||
|
||
# Drains the child's stdout on a background thread until EOF.
def reader() -> None:
|
||
if not process.stdout:
|
||
reader_done.set()
|
||
return
|
||
for line in process.stdout:
|
||
on_line(line.rstrip("\n"))
|
||
reader_done.set()
|
||
|
||
t = threading.Thread(target=reader, name=f"codex-reader-{job_id[:8]}", daemon=True)
|
||
t.start()
|
||
|
||
try:
|
||
deadline = time.time() + self.cfg.codex_timeout_seconds
|
||
return_code = None
|
||
# Poll once per second until the process exits or the deadline passes.
while return_code is None:
|
||
return_code = process.poll()
|
||
now = time.time()
|
||
if return_code is not None:
|
||
break
|
||
if now >= deadline:
|
||
process.kill()
|
||
t.join(timeout=2)
|
||
raise RuntimeError(f"Codex timeout after {self.cfg.codex_timeout_seconds}s")
|
||
# After 120 s of runtime with no job message for 120 s, send a "still running" notice.
if now - codex_started_at >= 120 and now - last_job_message_at >= 120:
|
||
elapsed = self._format_duration(int(now - codex_started_at))
|
||
self._safe_send(
|
||
chat_id,
|
||
f"#{job_num}: задача ещё выполняется, работает уже {elapsed}. От Codex давно нет сообщений.",
|
||
reply_to=message_id,
|
||
)
|
||
last_job_message_at = now
|
||
self.last_heartbeat_at = now
|
||
time.sleep(1)
|
||
finally:
|
||
with self.active_process_lock:
|
||
self.active_process = None
|
||
|
||
# Give the reader thread up to 2 s to finish collecting output.
reader_done.wait(timeout=2)
|
||
|
||
if return_code != 0:
|
||
tail = "\n".join(output_lines[-40:])
|
||
raise RuntimeError(f"Codex exited with code {return_code}. Output tail:\n{tail}")
|
||
|
||
if output_file.exists():
|
||
answer = output_file.read_text(encoding="utf-8").strip()
|
||
# Removing the temp file is best-effort; a failure here is ignored.
try:
|
||
output_file.unlink(missing_ok=True)
|
||
except Exception:
|
||
pass
|
||
if answer:
|
||
return answer
|
||
|
||
fallback = self._extract_fallback_message(output_lines)
|
||
if not fallback:
|
||
raise RuntimeError("Codex returned empty response")
|
||
return fallback
|
||
|
||
def _stop_active_codex_process(self) -> bool:
    """Stop the subprocess stored in ``self.active_process`` if it is still running.

    The process is asked to terminate, given two seconds to exit, and killed
    if it is still alive after that.

    Returns:
        True when a running process was signalled; False when no process is
        recorded or the recorded one has already exited.
    """
    # NOTE(review): indentation is lost in this listing; the lock is assumed to
    # guard only the attribute read, not the terminate/wait sequence - confirm.
    with self.active_process_lock:
        proc = self.active_process
    if proc is None or proc.poll() is not None:
        return False
    proc.terminate()
    try:
        proc.wait(timeout=2)
    except subprocess.TimeoutExpired:
        proc.kill()
    return True
|
||
|
||
def _handle_job_failure(self, job: dict[str, Any], err: Exception) -> None:
    """Log a failed job, then requeue it for another attempt or drop it, and notify the chat.

    Args:
        job: The job record that failed; ``id``, ``chat_id`` and ``message_id``
            are required, ``num`` is optional.
        err: The exception that ended the job. A ``VoiceTranscriptionError``
            contributes its own ``retryable`` flag and ``log_text()``; any
            other exception is treated as retryable.

    The job is retried while the attempt counter stays below
    ``self.cfg.max_retries``; otherwise it is removed from the queue. If the
    job is no longer in the queue nothing is sent.
    """
    job_id = job["id"]
    job_num = job.get("num", "?")
    chat_id = int(job["chat_id"])
    message_id = int(job["message_id"])
    error_text = str(err).strip() or err.__class__.__name__
    user_error_text = self._user_error_text(err)
    retryable = not isinstance(err, VoiceTranscriptionError) or err.retryable
    log_error_text = err.log_text() if isinstance(err, VoiceTranscriptionError) else error_text
    print(f"[py-bot] Ошибка job={job_id[:8]}: {log_error_text}", flush=True)
    # Format the traceback attached to `err` itself. traceback.format_exc()
    # reports whatever exception is *currently being handled*, which is
    # "NoneType: None" when this method is called outside an except block.
    print("".join(traceback.format_exception(type(err), err, err.__traceback__)), flush=True)

    # NOTE(review): indentation is lost in this listing; the whole queue update
    # is assumed to run under queue_lock - confirm.
    with self.queue_lock:
        target = next((j for j in self.queue if j.get("id") == job_id), None)
        if not target:
            return
        # `or 0` tolerates an explicit null in the stored record.
        attempts = int(target.get("attempts") or 0) + 1
        target["attempts"] = attempts
        target["last_error"] = error_text[:1000]
        target["updated_at"] = now_iso()
        if retryable and attempts < self.cfg.max_retries:
            target["status"] = "pending"
            target["retry_reason"] = error_text[:200]
            self._persist_queue()
            will_retry = True
        else:
            self.queue = [j for j in self.queue if j.get("id") != job_id]
            self._persist_queue()
            will_retry = False

    if will_retry:
        self._safe_send(
            chat_id,
            f"{user_error_text}\nПовторю задачу #{job_num}: попытка {attempts + 1}/{self.cfg.max_retries}.",
            reply_to=message_id,
        )
    else:
        self._safe_send(chat_id, f"{user_error_text}\nЗадача #{job_num} остановлена.", reply_to=message_id)
|
||
|
||
def _user_error_text(self, err: Exception) -> str:
    """Return the chat-facing description of a job failure.

    A ``VoiceTranscriptionError`` is reported through its ``user_message``;
    any other exception through its text, or its class name when the text is
    empty.
    """
    if not isinstance(err, VoiceTranscriptionError):
        description = str(err).strip()
        if not description:
            description = err.__class__.__name__
        return f"Ошибка выполнения задачи: {description}"
    return f"Не удалось распознать голосовое: {err.user_message}"
|
||
|
||
def _mark_job_done(self, job_id: str) -> None:
    """Drop the job with the given id from the queue and persist the queue."""
    # NOTE(review): indentation is lost in this listing; persisting is assumed
    # to happen while queue_lock is still held - confirm.
    with self.queue_lock:
        remaining = []
        for entry in self.queue:
            if entry.get("id") != job_id:
                remaining.append(entry)
        self.queue = remaining
        self._persist_queue()
|
||
|
||
def _mark_job_removed(self, job_id: str) -> None:
    """Remove the job with the given id from the queue and persist the queue."""
    # NOTE(review): indentation is lost in this listing; persisting is assumed
    # to happen while queue_lock is still held - confirm.
    with self.queue_lock:
        kept = []
        for entry in self.queue:
            if entry.get("id") != job_id:
                kept.append(entry)
        self.queue = kept
        self._persist_queue()
|
||
|
||
def _send_task_center_reminder(self, job: dict[str, Any]) -> None:
    """Reply to a private-chat job with the task-center summary, when there is one.

    Jobs from any other chat type are ignored, and nothing is sent when
    ``_task_center_counts_text`` returns an empty value.
    """
    if job.get("chat_type") == "private":
        summary = self._task_center_counts_text(job.get("username") or "")
        if summary:
            self._safe_send(int(job["chat_id"]), summary, reply_to=int(job["message_id"]))
|
||
|
||
def _remember_public_report_chat(self, chat_id: int) -> None:
    """Store ``chat_id`` as the public report chat in bot state and persist it.

    Does nothing when the same id is already stored.
    """
    if self.state.get("public_report_chat_id") != chat_id:
        self.state["public_report_chat_id"] = chat_id
        self.state["updated_at"] = now_iso()
        self._persist_state()
|
||
|
||
def _public_report_chat_id(self) -> int | str | None:
    """Return the chat that public reports go to.

    Returns:
        The stored ``public_report_chat_id`` passed through
        ``_resolve_chat_id`` when it is an int; otherwise
        ``@<allowed_channel_username>`` when that setting is non-empty;
        otherwise None.
    """
    stored = self.state.get("public_report_chat_id")
    if isinstance(stored, int):
        return self._resolve_chat_id(stored)
    channel = self.cfg.allowed_channel_username
    return f"@{channel}" if channel else None
|
||
|
||
def _send_private_job_public_report(self, job: dict[str, Any], answer: str) -> None:
    """Mirror a private-chat job and its answer into the public report chat.

    The request is posted first: for voice jobs the original Telegram file is
    re-sent with a caption holding the recognised text, falling back to a
    plain text message; for other jobs the request text is posted. The answer
    then follows as one or more replies to that request message.

    Nothing is sent for non-private jobs, when no report chat is known, or
    when the request message could not be posted.
    """
    if job.get("chat_type") != "private":
        return
    report_chat_id = self._public_report_chat_id()
    if report_chat_id is None:
        return

    job_num = job.get("num", "?")
    source_text = (job.get("text") or "").strip() or "(пустой текст запроса)"

    role = (job.get("role") or "owner").strip().lower()
    if role == "player":
        player_name = (job.get("player_name") or "").strip() or job.get("username") or "Игрок"
        author_label = f"{player_name} (@{job.get('username')})"
    else:
        author_label = "Айдар"

    if job.get("type") != "voice":
        request_report = (
            f"{author_label} сделал запрос, задача #{job_num}.\n\n"
            f"{source_text}"
        )
        request_message_id = self._safe_send(report_chat_id, request_report)
    else:
        voice_file_id = (job.get("telegram_file_id") or "").strip()
        media_type = (job.get("telegram_media_type") or "voice").strip()
        request_caption = self._trim_telegram_caption(
            f"{author_label} сделал {media_type}-запрос, задача #{job_num}.\n\n"
            f"Распознанный текст:\n{source_text}"
        )
        request_message_id = (
            self._safe_send_telegram_file(
                report_chat_id,
                voice_file_id,
                media_type=media_type,
                caption=request_caption,
            )
            if voice_file_id
            else None
        )
        # Without a file id, or when re-sending the file failed, post the
        # caption as an ordinary message instead.
        if request_message_id is None:
            request_message_id = self._safe_send(report_chat_id, request_caption)

    if request_message_id is None:
        return

    answer_text = (answer or "").strip() or "(пустой ответ)"
    for piece in split_long_text(f"Ответ на задачу #{job_num}:\n\n{answer_text}"):
        self._safe_send(report_chat_id, piece, reply_to=request_message_id)
|
||
|
||
@staticmethod
def _trim_telegram_caption(text: str, limit: int = 1000) -> str:
    """Strip ``text`` and shorten it so the result never exceeds ``limit`` characters.

    Args:
        text: Caption text; ``None`` or empty is treated as "".
        limit: Maximum length of the returned string, truncation marker
            included.

    Returns:
        The stripped text when it already fits; otherwise a prefix followed by
        the truncation marker, with the total length at most ``limit``.

    The marker is counted inside ``limit`` so the function is idempotent:
    a caption that was already trimmed passes through unchanged instead of
    having its marker cut in half and a second marker appended.
    """
    suffix = "\n...[обрезано]"
    text = (text or "").strip()
    if len(text) <= limit:
        return text
    keep = limit - len(suffix)
    if keep <= 0:
        # No room for the marker at all: hard cut.
        return text[:max(limit, 0)]
    return text[:keep].rstrip() + suffix
|
||
|
||
def _safe_send_telegram_file(
    self,
    chat_id: int | str,
    file_id: str,
    *,
    media_type: str = "voice",
    caption: str = "",
    reply_to: int | None = None,
) -> int | None:
    """Re-send an existing Telegram file by id without raising on failure.

    Args:
        chat_id: Destination chat; int ids go through ``_resolve_chat_id``.
        file_id: Telegram file id; blank means nothing is sent.
        media_type: ``"audio"`` uses ``send_audio``; anything else ``send_voice``.
        caption: Caption text, passed through ``_trim_telegram_caption``.
        reply_to: Message to reply to; dropped when the int chat id resolved
            to a different id.

    Returns:
        The id of the sent message, or None when nothing was sent or the
        send failed. Failures are printed, not raised.

    If the error text carries a migration target (as extracted by
    ``_extract_migrate_to_chat_id``), the migration is recorded and the send
    is retried once against the new chat without a reply target.
    """
    file_id = (file_id or "").strip()
    if not file_id:
        return None
    caption = self._trim_telegram_caption(caption)

    if isinstance(chat_id, int):
        target_chat = self._resolve_chat_id(chat_id)
    else:
        target_chat = chat_id
    keep_reply = isinstance(chat_id, str) or target_chat == chat_id
    reply_id = reply_to if keep_reply else None

    def deliver(to_chat: int | str, reply: int | None) -> int | None:
        if media_type == "audio":
            response = self.telegram.send_audio(to_chat, file_id, caption=caption, reply_to_message_id=reply)
        else:
            response = self.telegram.send_voice(to_chat, file_id, caption=caption, reply_to_message_id=reply)
        sent_id = (response.get("result") or {}).get("message_id")
        return sent_id if isinstance(sent_id, int) else None

    try:
        return deliver(target_chat, reply_id)
    except Exception as first_error:
        migrated_chat = self._extract_migrate_to_chat_id(str(first_error))
        if migrated_chat is None:
            print(f"[py-bot] sendFile error: {first_error}", flush=True)
            return None
        if isinstance(target_chat, int):
            self._remember_chat_migration(target_chat, migrated_chat, "send_file_error")
        try:
            return deliver(migrated_chat, None)
        except Exception as retry_error:
            print(f"[py-bot] sendFile retry after migration error: {retry_error}", flush=True)
            return None
|
||
|
||
def _safe_send_voice_upload(
    self,
    chat_id: int | str,
    voice_bytes: bytes,
    filename: str,
    *,
    caption: str = "",
    reply_to: int | None = None,
) -> int | None:
    """Upload audio bytes as a voice message without raising on failure.

    Args:
        chat_id: Destination chat; int ids go through ``_resolve_chat_id``.
        voice_bytes: Audio payload; empty means nothing is sent.
        filename: File name passed to ``telegram.send_voice_upload``.
        caption: Caption text, passed through ``_trim_telegram_caption``.
        reply_to: Message to reply to; dropped when the int chat id resolved
            to a different id.

    Returns:
        The id of the sent message, or None when nothing was sent or the
        upload failed. Failures are printed, not raised.

    If the error text carries a migration target (as extracted by
    ``_extract_migrate_to_chat_id``), the migration is recorded and the
    upload is retried once against the new chat without a reply target.
    """
    if not voice_bytes:
        return None
    caption = self._trim_telegram_caption(caption)

    if isinstance(chat_id, int):
        target_chat = self._resolve_chat_id(chat_id)
    else:
        target_chat = chat_id
    keep_reply = isinstance(chat_id, str) or target_chat == chat_id
    reply_id = reply_to if keep_reply else None

    def deliver(to_chat: int | str, reply: int | None) -> int | None:
        response = self.telegram.send_voice_upload(
            to_chat,
            voice_bytes,
            filename,
            caption=caption,
            reply_to_message_id=reply,
        )
        sent_id = (response.get("result") or {}).get("message_id")
        return sent_id if isinstance(sent_id, int) else None

    try:
        return deliver(target_chat, reply_id)
    except Exception as first_error:
        migrated_chat = self._extract_migrate_to_chat_id(str(first_error))
        if migrated_chat is None:
            print(f"[py-bot] sendVoiceUpload error: {first_error}", flush=True)
            return None
        if isinstance(target_chat, int):
            self._remember_chat_migration(target_chat, migrated_chat, "send_voice_upload_error")
        try:
            return deliver(migrated_chat, None)
        except Exception as retry_error:
            print(f"[py-bot] sendVoiceUpload retry after migration error: {retry_error}", flush=True)
            return None
|
||
|
||
def _safe_send(self, chat_id: int | str, text: str, reply_to: int | None = None) -> int | None:
    """Send a text message without raising on failure.

    Args:
        chat_id: Destination chat; int ids go through ``_resolve_chat_id``.
        text: Message text. It is stripped; blank text sends nothing, and
            text longer than 3900 characters is cut and marked as truncated.
        reply_to: Message to reply to; dropped when the int chat id resolved
            to a different id.

    Returns:
        The id of the sent message, or None when nothing was sent or the
        send failed. Failures are printed, not raised.

    If the error text carries a migration target (as extracted by
    ``_extract_migrate_to_chat_id``), the migration is recorded and the send
    is retried once against the new chat without a reply target.
    """
    body = (text or "").strip()
    if not body:
        return None
    if len(body) > 3900:
        body = body[:3900] + "\n...[обрезано]"

    if isinstance(chat_id, int):
        target_chat = self._resolve_chat_id(chat_id)
    else:
        target_chat = chat_id
    keep_reply = isinstance(chat_id, str) or target_chat == chat_id
    reply_id = reply_to if keep_reply else None

    def deliver(to_chat: int | str, reply: int | None) -> int | None:
        response = self.telegram.send_message(to_chat, body, reply_to_message_id=reply)
        sent_id = (response.get("result") or {}).get("message_id")
        return sent_id if isinstance(sent_id, int) else None

    try:
        return deliver(target_chat, reply_id)
    except Exception as first_error:
        migrated_chat = self._extract_migrate_to_chat_id(str(first_error))
        if migrated_chat is None:
            print(f"[py-bot] sendMessage error: {first_error}", flush=True)
            return None
        if isinstance(target_chat, int):
            self._remember_chat_migration(target_chat, migrated_chat, "send_message_error")
        try:
            return deliver(migrated_chat, None)
        except Exception as retry_error:
            print(f"[py-bot] sendMessage retry after migration error: {retry_error}", flush=True)
            return None
|
||
|
||
def _request_deferred_restart(self) -> None:
    """Flag a restart and, if no job is active right now, exit from a background thread.

    The first call sets ``restart_requested`` and records a history event;
    later calls return immediately. When some queued job has status
    ``"active"`` this method only sets the flag and starts no thread.
    """
    if self.restart_requested:
        return
    self.restart_requested = True
    self._append_history_event("restart_service_deferred_scheduled", {})

    # NOTE(review): indentation is lost in this listing; the lock is assumed to
    # cover only the queue scan - confirm.
    with self.queue_lock:
        busy = False
        for entry in self.queue:
            if entry.get("status") == "active":
                busy = True
                break
    if busy:
        return

    worker = threading.Thread(
        target=lambda: self._exit_for_restart("deferred_restart_no_active_job"),
        name="shine-py-bot-deferred-restart",
        daemon=True,
    )
    worker.start()
|
||
|
||
def _exit_for_restart(self, reason: str) -> None:
    """Log the restart reason, record a history event, and exit the process.

    Exits with status 0 through ``os._exit`` after a half-second pause.
    NOTE(review): presumably an external supervisor restarts the service
    after the exit - confirm against deployment.
    """
    print(f"[py-bot] restart now: {reason}", flush=True)
    event_payload = {"reason": reason}
    self._append_history_event("restart_service_executing", event_payload)
    time.sleep(0.5)
    os._exit(0)
|
||
|
||
def _schedule_self_restart(self, reason: str = "restart_requested", *, force: bool = False) -> None:
    """Exit the process from a background thread after a 1.5 second delay.

    Args:
        reason: Text included in the log line printed before exiting.
        force: When True, schedule the exit even if a restart is already
            pending, and stop the active Codex process first.

    Returns immediately without scheduling when a restart is already pending
    and ``force`` is False.
    """
    if not force and self.restart_requested:
        return
    self.restart_requested = True

    def exit_after_delay() -> None:
        time.sleep(1.5)
        print(f"[py-bot] restart requested by Telegram command: {reason}", flush=True)
        if force:
            self._stop_active_codex_process()
        os._exit(0)

    worker = threading.Thread(target=exit_after_delay, name="shine-py-bot-self-restart", daemon=True)
    worker.start()
|
||
|
||
def _voice_reply_targets(self, job: dict[str, Any]) -> list[tuple[int | str, int | None, str]]:
    """List the chats that should receive the voiced answer for ``job``.

    Returns:
        ``(chat_id, reply_to, label)`` tuples in this order: the chat the job
        came from (``"source"``, replying to the original message), the
        user's private chat (``"private"``) and the public report chat
        (``"public"``). The last two are included only when they resolve to
        a different chat than the source, and entries resolving to the same
        chat are de-duplicated, keeping the first.
    """
    source_chat = int(job["chat_id"])
    source_message = int(job["message_id"])

    def canonical(value: int | str) -> int | str:
        # Only integer ids are remapped; "@channel" style ids are used as given.
        return self._resolve_chat_id(value) if isinstance(value, int) else value

    candidates: list[tuple[int | str, int | None, str]] = [(source_chat, source_message, "source")]

    private_chat = self._private_chat_id_for_user(job.get("username") or "")
    if private_chat is not None and private_chat != self._resolve_chat_id(source_chat):
        candidates.append((private_chat, None, "private"))

    public_chat = self._public_report_chat_id()
    if public_chat is not None and canonical(public_chat) != self._resolve_chat_id(source_chat):
        candidates.append((public_chat, None, "public"))

    unique: list[tuple[int | str, int | None, str]] = []
    seen_keys: set[str] = set()
    for candidate in candidates:
        key = str(canonical(candidate[0]))
        if key not in seen_keys:
            seen_keys.add(key)
            unique.append(candidate)
    return unique
|
||
|
||
def _send_voice_reply_for_answer(
    self,
    job: dict[str, Any],
    answer: str,
    history_path: Path,
    job_id: str,
) -> None:
    """Voice the answer with OpenAI TTS and upload it to every reply target.

    Steps: optionally rewrite the answer for listening (falling back to the
    original text if the rewrite fails), split it into TTS-sized chunks,
    synthesise each chunk, and upload each audio part to every chat returned
    by ``_voice_reply_targets``. Progress and failures go to the history
    file; user-visible failures are also reported in the source chat.

    Args:
        job: Job record; ``chat_id`` and ``message_id`` are required.
        answer: Final answer text to voice.
        history_path: History file passed to ``_append_history``.
        job_id: Job id used in history events and log lines.

    A TTS failure on any chunk aborts the remaining chunks. A rejected upload
    only skips that one target.
    """
    source_chat = int(job["chat_id"])
    source_message = int(job["message_id"])
    job_num = job.get("num", "?")
    username = job.get("username") or ""

    def log_event(kind: str, **fields: Any) -> None:
        # Every history event for this job starts with the same two keys.
        self._append_history(history_path, kind, {"jobId": job_id, "jobNum": job_num, **fields})

    if not self.cfg.openai_api_key:
        note = "не настроен ключ OpenAI для озвучивания."
        log_event("voice_reply_failed", error=note)
        self._safe_send(source_chat, f"Озвучивание включено, но {note}", reply_to=source_message)
        return

    voice_text = answer
    rewrite_enabled = self._voice_rewrite_enabled(username)
    if rewrite_enabled:
        try:
            voice_text = self._openai_rewrite_text_for_voice(answer)
            log_event(
                "voice_rewrite_done",
                model=self.cfg.openai_voice_rewrite_model,
                sourceChars=len(answer or ""),
                resultChars=len(voice_text or ""),
            )
        except VoiceReplyError as rewrite_error:
            log_event(
                "voice_rewrite_failed",
                model=self.cfg.openai_voice_rewrite_model,
                error=str(rewrite_error),
            )
            self._safe_send(
                source_chat,
                f"Не удалось адаптировать ответ #{job_num} для озвучки, озвучиваю обычный текст: {rewrite_error}",
                reply_to=source_message,
            )
            voice_text = answer

    chunks = split_text_for_tts(voice_text, self.cfg.openai_tts_chunk_chars)
    if not chunks:
        return

    total = len(chunks)
    targets = self._voice_reply_targets(job)
    delivered = 0
    print(f"[py-bot] tts start job={str(job_id)[:8]} chunks={total} targets={len(targets)} rewrite={rewrite_enabled}", flush=True)

    for part_no, chunk in enumerate(chunks, start=1):
        try:
            audio = self._openai_tts(chunk)
        except VoiceReplyError as tts_error:
            log_event("voice_reply_failed", part=part_no, parts=total, error=str(tts_error))
            self._safe_send(source_chat, f"Не удалось озвучить ответ #{job_num}: {tts_error}", reply_to=source_message)
            return

        caption = f"Озвучка ответа #{job_num}"
        if total > 1:
            caption += f", часть {part_no}/{total}"

        for target_chat, target_reply, label in targets:
            sent_id = self._safe_send_voice_upload(
                target_chat,
                audio,
                f"shine-answer-{job_num}-{part_no}.ogg",
                caption=caption,
                reply_to=target_reply,
            )
            if sent_id is not None:
                delivered += 1
                continue
            log_event(
                "voice_reply_failed",
                part=part_no,
                parts=total,
                target=label,
                error="Telegram не принял voice-файл озвучки.",
            )
            # Only a failure in the originating chat is reported to the user.
            if label == "source":
                self._safe_send(
                    source_chat,
                    f"Озвучка ответа #{job_num} создана, но Telegram не принял voice-файл.",
                    reply_to=source_message,
                )

    log_event(
        "voice_reply_sent",
        parts=total,
        messages=delivered,
        targets=len(targets),
        rewriteEnabled=rewrite_enabled,
    )
    print(f"[py-bot] tts done job={str(job_id)[:8]} sent={delivered}", flush=True)
|
||
|
||
def _openai_rewrite_text_for_voice(self, text: str) -> str:
    """Ask the OpenAI chat completions API to adapt ``text`` for listening.

    Args:
        text: Final answer text. Blank input returns "" without a request;
            input longer than ``cfg.openai_voice_rewrite_max_input_chars`` is
            cut and marked as truncated before being sent.

    Returns:
        The rewritten text, stripped.

    Raises:
        VoiceReplyError: On timeout, HTTP error, any other transport failure,
            an unparseable response, or an empty rewrite. Every transport
            failure is converted, because the caller only handles
            ``VoiceReplyError``.
    """
    # Local import: the enclosing file's import block is not touched by this edit.
    from http.client import HTTPException

    source = (text or "").strip()
    if not source:
        return ""
    if len(source) > self.cfg.openai_voice_rewrite_max_input_chars:
        source = source[:self.cfg.openai_voice_rewrite_max_input_chars].rstrip() + "\n\n...[текстовый ответ был длиннее и обрезан для голосовой версии]"
    payload = {
        "model": self.cfg.openai_voice_rewrite_model,
        "messages": [
            {
                "role": "system",
                "content": (
                    "Ты готовишь русскую версию финального ответа технического агента для озвучивания. "
                    "Не пересказывай заново и не меняй смысл: сохрани порядок мыслей, итог, предупреждения, статусы и важные действия. "
                    "Мягко убери только то, что плохо воспринимается на слух: длинные пути, хэши, ID, команды, JSON, "
                    "длинные списки файлов, точные размеры и счётчики символов. Если деталь важна, замени её коротким описанием. "
                    "Не добавляй новых фактов. Пиши естественно, без markdown, близко к исходному тексту."
                ),
            },
            {
                "role": "user",
                "content": f"Переделай этот финальный текст в вариант для озвучки:\n\n{source}",
            },
        ],
        "temperature": 0.2,
        "max_tokens": self.cfg.openai_voice_rewrite_max_output_tokens,
    }
    data = json.dumps(payload, ensure_ascii=False).encode("utf-8")
    req = request.Request("https://api.openai.com/v1/chat/completions", method="POST", data=data)
    req.add_header("Authorization", f"Bearer {self.cfg.openai_api_key}")
    req.add_header("Content-Type", "application/json")

    timeout_seconds = self.cfg.openai_voice_rewrite_timeout_seconds
    timeout_message = f"OpenAI не успел адаптировать текст за {timeout_seconds} секунд."
    network_prefix = "не удалось отправить текст в OpenAI для адаптации из-за сетевой ошибки"
    try:
        with request.urlopen(req, timeout=timeout_seconds) as resp:
            raw = resp.read().decode("utf-8", errors="replace")
    except TimeoutError as e:
        raise VoiceReplyError(timeout_message) from e
    except error.HTTPError as e:
        detail = e.read().decode("utf-8", errors="replace")
        if e.code == 401:
            message = "OpenAI отклонил ключ API для адаптации текста."
        elif e.code == 429:
            message = "OpenAI временно ограничил адаптацию текста из-за лимита запросов."
        elif e.code >= 500:
            message = "OpenAI временно не смог адаптировать текст."
        else:
            message = f"OpenAI вернул ошибку HTTP {e.code} при адаптации текста."
        if detail:
            message = f"{message} Детали: {detail[:500]}"
        raise VoiceReplyError(message) from e
    except error.URLError as e:
        # A timeout while sending the request is wrapped in URLError;
        # report it as a timeout rather than a generic network error.
        if isinstance(e.reason, TimeoutError):
            raise VoiceReplyError(timeout_message) from e
        raise VoiceReplyError(f"{network_prefix}: {e.reason}") from e
    except (OSError, HTTPException) as e:
        # Failures while waiting for or reading the response (connection
        # reset, incomplete read, socket.timeout before Python 3.10) are not
        # wrapped in URLError and would otherwise escape unconverted.
        raise VoiceReplyError(f"{network_prefix}: {e}") from e

    try:
        body = json.loads(raw)
        content = (((body.get("choices") or [{}])[0].get("message") or {}).get("content") or "").strip()
    except Exception as e:
        raise VoiceReplyError("OpenAI вернул неразборчивый ответ при адаптации текста.") from e
    if not content:
        raise VoiceReplyError("OpenAI вернул пустой текст адаптации.")
    return content
|
||
|
||
def _openai_tts(self, text: str) -> bytes:
    """Synthesise speech for ``text`` through the OpenAI audio/speech API.

    Model, voice, response format and timeout come from ``self.cfg``.

    Returns:
        The raw audio bytes returned by the API.

    Raises:
        VoiceReplyError: On timeout, HTTP error, any other transport failure,
            or an empty audio body. Every transport failure is converted,
            because the caller only handles ``VoiceReplyError``.
    """
    # Local import: the enclosing file's import block is not touched by this edit.
    from http.client import HTTPException

    payload = {
        "model": self.cfg.openai_tts_model,
        "voice": self.cfg.openai_tts_voice,
        "input": text,
        "response_format": self.cfg.openai_tts_response_format,
    }
    data = json.dumps(payload, ensure_ascii=False).encode("utf-8")
    req = request.Request("https://api.openai.com/v1/audio/speech", method="POST", data=data)
    req.add_header("Authorization", f"Bearer {self.cfg.openai_api_key}")
    req.add_header("Content-Type", "application/json")

    timeout_seconds = self.cfg.openai_tts_timeout_seconds
    timeout_message = f"OpenAI не успел сгенерировать речь за {timeout_seconds} секунд."
    network_prefix = "не удалось отправить текст в OpenAI TTS из-за сетевой ошибки"
    try:
        with request.urlopen(req, timeout=timeout_seconds) as resp:
            audio = resp.read()
    except TimeoutError as e:
        raise VoiceReplyError(timeout_message) from e
    except error.HTTPError as e:
        detail = e.read().decode("utf-8", errors="replace")
        if e.code == 401:
            message = "OpenAI отклонил ключ API для озвучивания."
        elif e.code == 429:
            message = "OpenAI временно ограничил озвучивание из-за лимита запросов."
        elif e.code >= 500:
            message = "OpenAI временно не смог сгенерировать речь."
        else:
            message = f"OpenAI вернул ошибку HTTP {e.code} при озвучивании."
        if detail:
            message = f"{message} Детали: {detail[:500]}"
        raise VoiceReplyError(message) from e
    except error.URLError as e:
        # A timeout while sending the request is wrapped in URLError;
        # report it as a timeout rather than a generic network error.
        if isinstance(e.reason, TimeoutError):
            raise VoiceReplyError(timeout_message) from e
        raise VoiceReplyError(f"{network_prefix}: {e.reason}") from e
    except (OSError, HTTPException) as e:
        # Failures while waiting for or reading the response (connection
        # reset, incomplete read, socket.timeout before Python 3.10) are not
        # wrapped in URLError and would otherwise escape unconverted.
        raise VoiceReplyError(f"{network_prefix}: {e}") from e
    if not audio:
        raise VoiceReplyError("OpenAI вернул пустой аудиофайл.")
    return audio
|
||
|
||
def _transcribe_voice_job(self, job: dict[str, Any]) -> str:
    """Download the job's Telegram audio and return its transcription.

    Args:
        job: Job record; ``telegram_file_id`` identifies the audio to fetch.

    Returns:
        The recognised text, stripped and non-empty.

    Raises:
        VoiceTranscriptionError: Non-retryable when the OpenAI key is missing,
            the job has no file id, or the recognised text is empty. Errors
            raised by the download and transcription helpers propagate as is.
    """
    if not self.cfg.openai_api_key:
        raise VoiceTranscriptionError(
            "не настроен ключ OpenAI для распознавания.",
            stage="config",
            retryable=False,
        )
    file_id = (job.get("telegram_file_id") or "").strip()
    if not file_id:
        raise VoiceTranscriptionError(
            "Telegram не передал идентификатор файла.",
            stage="telegram_file_id",
            retryable=False,
        )

    short_id = str(job.get("id") or "")[:8]
    job_num = job.get("num", "?")
    media_type = (job.get("telegram_media_type") or "voice").strip()
    started_at = time.time()
    print(f"[py-bot] transcribe start job={short_id} num={job_num} media={media_type}", flush=True)

    file_bytes, filename = self._download_telegram_file(file_id)
    print(
        f"[py-bot] transcribe downloaded job={short_id} filename={filename} size={len(file_bytes)} bytes",
        flush=True,
    )

    recognised = self._openai_transcribe(file_bytes, filename).strip()
    if recognised:
        elapsed = self._format_duration(int(time.time() - started_at))
        print(f"[py-bot] transcribe done job={short_id} chars={len(recognised)} elapsed={elapsed}", flush=True)
        return recognised
    raise VoiceTranscriptionError(
        "сервис распознавания вернул пустой текст. Возможно, в записи нет слышимой речи или качество звука слишком низкое.",
        stage="empty_text",
        retryable=False,
    )
|
||
|
||
def _download_telegram_file(self, file_id: str) -> tuple[bytes, str]:
    """Fetch a file from Telegram by id.

    Calls ``getFile`` to obtain the file path, then downloads the content
    from the Telegram file endpoint.

    Returns:
        ``(content, filename)``. The filename is the last component of the
        Telegram file path, with a trailing ``.oga`` extension rewritten to
        ``.ogg``.

    Raises:
        VoiceTranscriptionError: For every failure - ``getFile`` errors,
            a missing file path, download timeout / HTTP / network errors, or
            an empty body. HTTP download errors are marked retryable only for
            429 and 5xx.
    """
    try:
        meta = self.telegram.call("getFile", {"file_id": file_id}, timeout=120)
    except TimeoutError as e:
        raise VoiceTranscriptionError(
            "Telegram долго не отдавал информацию о файле.",
            stage="telegram_get_file_timeout",
            detail=str(e),
        ) from e
    except Exception as e:
        raise VoiceTranscriptionError(
            "не удалось получить информацию о файле из Telegram.",
            stage="telegram_get_file",
            detail=str(e),
        ) from e

    info = meta.get("result") or {}
    file_path = info.get("file_path")
    if not file_path:
        raise VoiceTranscriptionError(
            "Telegram не вернул путь к файлу.",
            stage="telegram_file_path",
            retryable=True,
            detail=json.dumps(info, ensure_ascii=False)[:1000],
        )

    download_timeout = self.cfg.telegram_file_download_timeout_seconds
    file_url = f"https://api.telegram.org/file/bot{self.cfg.telegram_bot_token}/{file_path}"
    try:
        with request.urlopen(request.Request(file_url, method="GET"), timeout=download_timeout) as resp:
            content = resp.read()
    except TimeoutError as e:
        raise VoiceTranscriptionError(
            f"Telegram не успел отдать аудиофайл за {download_timeout} секунд.",
            stage="telegram_download_timeout",
            detail=str(e),
        ) from e
    except error.HTTPError as e:
        http_detail = e.read().decode("utf-8", errors="replace")
        raise VoiceTranscriptionError(
            f"Telegram вернул ошибку HTTP {e.code} при скачивании аудио.",
            stage="telegram_download_http",
            retryable=e.code >= 500 or e.code == 429,
            detail=http_detail[:1000],
        ) from e
    except error.URLError as e:
        raise VoiceTranscriptionError(
            "не удалось скачать аудиофайл из Telegram из-за сетевой ошибки.",
            stage="telegram_download_network",
            detail=str(e.reason),
        ) from e

    if not content:
        raise VoiceTranscriptionError(
            "Telegram отдал пустой аудиофайл.",
            stage="telegram_download_empty",
            retryable=True,
        )

    name = Path(file_path).name or "audio.ogg"
    # OpenAI transcription may not accept the .oga extension, so normalise it to .ogg.
    if not name.lower().endswith(".oga"):
        return content, name
    stem = name[:-4] if len(name) > 4 else "audio"
    return content, f"{stem}.ogg"
|
||
|
||
def _openai_transcribe(self, file_bytes: bytes, filename: str) -> str:
    """Send audio to the OpenAI transcription API and return the response text.

    The request is a hand-built ``multipart/form-data`` body with three parts:
    ``model``, ``response_format`` (fixed to ``text``) and the ``file``.

    Args:
        file_bytes: Audio content.
        filename: Name reported for the file part; also used to guess its
            MIME type.

    Returns:
        The decoded response body, not stripped.

    Raises:
        VoiceTranscriptionError: On timeout, HTTP error (retryable only for
            429 and 5xx), or network error.
    """
    boundary = "----shine-boundary-" + "".join(random.choices("abcdef0123456789", k=16))
    mime = mimetypes.guess_type(filename)[0] or "application/octet-stream"

    def text_part(name: str, value: str) -> bytes:
        return (
            f"--{boundary}\r\n"
            f'Content-Disposition: form-data; name="{name}"\r\n\r\n'
            f"{value}\r\n"
        ).encode("utf-8")

    file_header = (
        f"--{boundary}\r\n"
        f'Content-Disposition: form-data; name="file"; filename="{filename}"\r\n'
        f"Content-Type: {mime}\r\n\r\n"
    ).encode("utf-8")
    body = b"".join([
        text_part("model", self.cfg.openai_transcribe_model),
        text_part("response_format", "text"),
        file_header,
        file_bytes,
        b"\r\n",
        f"--{boundary}--\r\n".encode("utf-8"),
    ])

    req = request.Request("https://api.openai.com/v1/audio/transcriptions", method="POST", data=body)
    req.add_header("Authorization", f"Bearer {self.cfg.openai_api_key}")
    req.add_header("Content-Type", f"multipart/form-data; boundary={boundary}")

    timeout_seconds = self.cfg.openai_transcribe_timeout_seconds
    try:
        with request.urlopen(req, timeout=timeout_seconds) as resp:
            return resp.read().decode("utf-8", errors="replace")
    except TimeoutError as e:
        raise VoiceTranscriptionError(
            f"OpenAI не успел распознать аудио за {timeout_seconds} секунд.",
            stage="openai_transcribe_timeout",
            detail=str(e),
        ) from e
    except error.HTTPError as e:
        http_detail = e.read().decode("utf-8", errors="replace")
        known_messages = {
            400: "OpenAI не принял аудиофайл для распознавания.",
            401: "OpenAI отклонил ключ API для распознавания.",
            413: "аудиофайл слишком большой для распознавания OpenAI.",
            429: "OpenAI временно ограничил распознавание из-за лимита запросов.",
        }
        if e.code in known_messages:
            user_message = known_messages[e.code]
        elif e.code >= 500:
            user_message = "OpenAI временно не смог обработать распознавание."
        else:
            user_message = f"OpenAI вернул ошибку HTTP {e.code} при распознавании."
        raise VoiceTranscriptionError(
            user_message,
            stage="openai_transcribe_http",
            retryable=e.code == 429 or e.code >= 500,
            detail=http_detail[:1500],
        ) from e
    except error.URLError as e:
        raise VoiceTranscriptionError(
            "не удалось отправить аудио в OpenAI из-за сетевой ошибки.",
            stage="openai_transcribe_network",
            detail=str(e.reason),
        ) from e
|
||
|
||
@staticmethod
def _extract_codex_user_note(line: str) -> str | None:
    """Extract a short agent message from one line of Codex JSON output.

    Args:
        line: One output line; only lines holding a JSON object are considered.

    Returns:
        The message text when the line is an ``item.completed`` event whose
        item is an ``agent_message`` with non-empty string text, cut to 220
        characters plus ``...`` when longer. None for everything else,
        including valid JSON with an unexpected shape.
    """
    s = (line or "").strip()
    if not s.startswith("{"):
        return None
    try:
        obj = json.loads(s)
    except (ValueError, RecursionError):
        return None
    # Type checks matter: valid JSON can still have the wrong shape, and an
    # AttributeError here would propagate into the caller's line handler.
    if not isinstance(obj, dict) or obj.get("type") != "item.completed":
        return None
    item = obj.get("item")
    if not isinstance(item, dict) or item.get("type") != "agent_message":
        return None
    text = item.get("text")
    if not isinstance(text, str):
        return None
    text = text.strip()
    if not text:
        return None
    if len(text) > 220:
        return text[:220] + "..."
    return text
|
||
|
||
@staticmethod
def _extract_fallback_message(lines: list[str]) -> str:
    """Return the last output line that looks like human-readable text.

    Scanning from the end, skips blank lines, JSON event lines (starting with
    ``{`` and containing ``"type":``) and lines starting with ``mcp:`` or
    ``OpenAI Codex``. Returns "" when no line qualifies.
    """
    def is_noise(candidate: str) -> bool:
        if candidate.startswith("{") and '"type":' in candidate:
            return True
        return candidate.startswith(("mcp:", "OpenAI Codex"))

    stripped_from_end = (raw.strip() for raw in reversed(lines))
    return next((c for c in stripped_from_end if c and not is_noise(c)), "")
|
||
|
||
@staticmethod
def _format_duration(seconds: int) -> str:
    """Format a duration in seconds as hours/minutes/seconds with Russian unit letters.

    Negative input is treated as zero. Leading zero units are omitted, so the
    result has one, two or three components.
    """
    total = max(0, seconds)
    whole_minutes, sec = divmod(total, 60)
    hours, minutes = divmod(whole_minutes, 60)
    label = f"{sec}с"
    if hours or minutes:
        label = f"{minutes}м {label}"
    if hours:
        label = f"{hours}ч {label}"
    return label
|
||
|
||
|
||
def run_selftest(config: BotConfig, prompt: str) -> int:
    """Run a single ``codex exec`` with ``prompt`` and print what it produced.

    Args:
        config: Supplies ``codex_bin`` and ``codex_workdir``.
        prompt: Prompt passed to Codex as the final argument.

    Returns:
        The exit code of the Codex process.
    """
    codex_command = [
        str(config.codex_bin),
        "exec",
        "--dangerously-bypass-approvals-and-sandbox",
        "--json",
        "-C", str(config.codex_workdir),
        prompt,
    ]
    completed = subprocess.run(
        codex_command,
        stdin=subprocess.DEVNULL,
        capture_output=True,
        text=True,
        encoding="utf-8",
        errors="replace",
    )
    print(completed.stdout)
    if completed.stderr:
        print(completed.stderr)
    return completed.returncode
|
||
|
||
|
||
def main() -> int:
    """Command-line entry point: run the Codex self-test or start the bot service.

    With ``--selftest-codex PROMPT`` only the self-test runs and its exit code
    is returned. Otherwise the service runs until it returns or is
    interrupted.

    Returns:
        0 on normal return or Ctrl-C (after ``service.shutdown()``), 1 when
        the service raised any other exception.
    """
    parser = argparse.ArgumentParser(description="SHiNE Python Telegram bot wrapper for Codex CLI")
    parser.add_argument("--selftest-codex", default="", help="Выполнить только codex exec с этим prompt и выйти")
    args = parser.parse_args()

    # Configuration is rooted at the directory containing this script.
    cfg = BotConfig(Path(__file__).resolve().parent)
    if args.selftest_codex:
        return run_selftest(cfg, args.selftest_codex)

    service = ShinePyBotService(cfg)
    exit_code = 0
    try:
        service.run()
    except KeyboardInterrupt:
        service.shutdown()
    except Exception as fatal:
        print(f"[py-bot] FATAL: {fatal}", flush=True)
        exit_code = 1
    return exit_code
|
||
|
||
|
||
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status.
    exit_status = main()
    raise SystemExit(exit_status)
|