#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Telegram TUI Client Консольный клиент Telegram на базе urwid """ import urwid import asyncio import os import nest_asyncio import unicodedata import emoji from telethon import TelegramClient, events, utils from telethon.errors import SessionPasswordNeededError from dotenv import load_dotenv import datetime from PIL import Image import io import hashlib import json import shutil from pathlib import Path # Разрешаем вложенные event loops nest_asyncio.apply() def normalize_text(text: str) -> str: """Нормализует текст для корректного отображения""" if not text: return "" try: # Преобразуем в строку, если это не строка text = str(text) # Удаляем эмодзи text = emoji.replace_emoji(text, '') # Нормализуем Unicode text = unicodedata.normalize('NFKC', text) # Заменяем специальные символы на их ASCII-эквиваленты text = text.replace('—', '-').replace('–', '-').replace('…', '...') # Удаляем все управляющие символы, кроме новой строки и табуляции text = ''.join(char for char in text if unicodedata.category(char)[0] != 'C' or char in ('\n', '\t')) # Удаляем множественные пробелы text = ' '.join(text.split()) return text except Exception as e: print(f"Ошибка нормализации текста: {e}") return "Ошибка отображения" def image_to_ascii(image_data, max_width=80, max_height=24): """Конвертирует изображение в ASCII-арт""" try: # Открываем изображение из байтов image = Image.open(io.BytesIO(image_data)) # Конвертируем в оттенки серого image = image.convert('L') # Определяем новые размеры с сохранением пропорций width, height = image.size aspect_ratio = height/width new_width = min(max_width, width) new_height = int(new_width * aspect_ratio * 0.5) # * 0.5 потому что символы в терминале выше, чем шире if new_height > max_height: new_height = max_height new_width = int(new_height / aspect_ratio * 2) # Изменяем размер image = image.resize((new_width, new_height)) # Символы от темного к светлому ascii_chars = '@%#*+=-:. ' # Конвертируем пиксели в ASCII pixels = image.getdata() ascii_str = '' for i, pixel in enumerate(pixels): ascii_str += ascii_chars[pixel//32] # 256//32 = 8 уровней if (i + 1) % new_width == 0: ascii_str += '\n' # Добавляем рамку вокруг ASCII-арта для стабильности lines = ascii_str.split('\n') if lines and lines[-1] == '': lines = lines[:-1] # Удаляем последнюю пустую строку max_line_length = max(len(line) for line in lines) border_top = '┌' + '─' * max_line_length + '┐\n' border_bottom = '└' + '─' * max_line_length + '┘' framed_ascii = border_top for line in lines: padding = ' ' * (max_line_length - len(line)) framed_ascii += '│' + line + padding + '│\n' framed_ascii += border_bottom return framed_ascii except Exception as e: print(f"Ошибка конвертации изображения: {e}") return "[Ошибка конвертации изображения]" class AsciiArtCache: """Класс для кэширования ASCII-артов""" def __init__(self, cache_dir='pics'): self.cache_dir = cache_dir os.makedirs(cache_dir, exist_ok=True) self.index_file = os.path.join(cache_dir, 'index.json') self.load_index() def load_index(self): """Загружает индекс кэшированных изображений""" try: with open(self.index_file, 'r') as f: self.index = json.load(f) except (FileNotFoundError, json.JSONDecodeError): self.index = {} self.save_index() def save_index(self): """Сохраняет индекс кэшированных изображений""" with open(self.index_file, 'w') as f: json.dump(self.index, f) def get_cache_key(self, image_data): """Генерирует ключ кэша для изображения""" return hashlib.md5(image_data).hexdigest() def get_cached_art(self, image_data): """Получает ASCII-арт из кэша или создает новый""" cache_key = self.get_cache_key(image_data) cache_file = os.path.join(self.cache_dir, f"{cache_key}.txt") # Проверяем наличие в кэше if os.path.exists(cache_file): try: with open(cache_file, 'r') as f: return f.read() except Exception as e: print(f"Ошибка чтения кэша ASCII: {e}") # Если нет в кэше, создаем новый ascii_art = image_to_ascii(image_data) # Сохраняем в кэш try: with open(cache_file, 'w') as f: f.write(ascii_art) # Обновляем индекс self.index[cache_key] = { 'created_at': datetime.datetime.now().isoformat(), 'size': len(image_data) } self.save_index() except Exception as e: print(f"Ошибка сохранения ASCII в кэш: {e}") return ascii_art class MediaCache: """Класс для кэширования медиафайлов""" def __init__(self, cache_dir='cache', max_size_mb=1000): self.cache_dir = Path(cache_dir) self.files_dir = self.cache_dir / 'files' self.index_file = self.cache_dir / 'index.json' self.max_size = max_size_mb * 1024 * 1024 # Конвертируем в байты self.current_size = 0 # Создаем директории self.files_dir.mkdir(parents=True, exist_ok=True) # Загружаем индекс self.load_index() def load_index(self): """Загружает индекс кэшированных файлов""" try: with open(self.index_file, 'r') as f: self.index = json.load(f) # Подсчитываем текущий размер кэша self.current_size = sum(item['size'] for item in self.index.values()) except (FileNotFoundError, json.JSONDecodeError): self.index = {} self.current_size = 0 self.save_index() def save_index(self): """Сохраняет индекс кэшированных файлов""" with open(self.index_file, 'w') as f: json.dump(self.index, f) def get_cache_key(self, data): """Генерирует ключ кэша""" return hashlib.md5(data).hexdigest() def cleanup(self, needed_space=0): """Очищает старые файлы для освобождения места""" if self.current_size + needed_space <= self.max_size: return True # Сортируем файлы по времени последнего доступа files = [(k, v) for k, v in self.index.items()] files.sort(key=lambda x: x[1]['last_access']) # Удаляем старые файлы, пока не освободится достаточно места for key, info in files: file_path = self.files_dir / f"{key}{info['ext']}" try: file_path.unlink() self.current_size -= info['size'] del self.index[key] if self.current_size + needed_space <= self.max_size: self.save_index() return True except Exception as e: print(f"Ошибка при удалении файла {file_path}: {e}") return False def get_cached_file(self, file_data, file_type): """Получает файл из кэша или сохраняет новый""" cache_key = self.get_cache_key(file_data) # Проверяем наличие в кэше if cache_key in self.index: info = self.index[cache_key] file_path = self.files_dir / f"{cache_key}{info['ext']}" if file_path.exists(): # Обновляем время последнего доступа info['last_access'] = datetime.datetime.now().isoformat() self.save_index() return file_path # Определяем расширение файла ext = self.get_extension_for_type(file_type) # Проверяем и освобождаем место если нужно if not self.cleanup(len(file_data)): print("Недостаточно места в кэше") return None # Сохраняем файл file_path = self.files_dir / f"{cache_key}{ext}" try: with open(file_path, 'wb') as f: f.write(file_data) # Обновляем индекс self.index[cache_key] = { 'type': file_type, 'ext': ext, 'size': len(file_data), 'created_at': datetime.datetime.now().isoformat(), 'last_access': datetime.datetime.now().isoformat() } self.current_size += len(file_data) self.save_index() return file_path except Exception as e: print(f"Ошибка сохранения файла: {e}") return None def get_extension_for_type(self, file_type): """Возвращает расширение файла для типа медиа""" extensions = { 'photo': '.jpg', 'video': '.mp4', 'audio': '.ogg', 'voice': '.ogg', 'document': '', # Будет использовано оригинальное расширение 'sticker': '.webp', 'gif': '.gif' } return extensions.get(file_type, '') class ChatWidget(urwid.WidgetWrap): """Виджет чата""" def __init__(self, chat_id, name, message="", is_selected=False, folder=0): self.chat_id = chat_id self.name = normalize_text(name) self.message = normalize_text(message) self.is_selected = is_selected self.folder = folder self.has_focus = False # Создаем содержимое виджета self.update_widget() super().__init__(self.widget) def update_widget(self): """Обновляет внешний вид виджета""" # Подготавливаем данные name = self.name if self.name else "Без названия" msg = self.message if self.message else "Нет сообщений" if len(msg) > 50: msg = msg[:47] + "..." # Добавляем метку папки если нужно if self.folder == 1: name += " [Архив]" # Получаем первую букву для аватара first_letter = next((c for c in name if c.isprintable()), "?") # Определяем стиль if self.has_focus: style = 'selected' elif self.is_selected: style = 'chat_selected' else: style = 'chat' # Создаем виджеты avatar = urwid.AttrMap( urwid.Text(f" {first_letter} ", align='center'), style ) content = urwid.Pile([ urwid.AttrMap( urwid.Text(name), style ), urwid.AttrMap( urwid.Text(msg), style ) ]) self.widget = urwid.AttrMap( urwid.Columns([ ('fixed', 3, avatar), content ]), None ) def selectable(self): return True def render(self, size, focus=False): if self.has_focus != focus: self.has_focus = focus self.update_widget() return super().render(size, focus) def keypress(self, size, key): if key == 'enter': return key elif key == 'tab': return key elif key in ('up', 'down'): return key return key class MessageWidget(urwid.WidgetWrap): """Виджет сообщения""" def __init__(self, message_id, text="", username="", is_me=False, send_time="", status="", is_selected=False, media_data=None): self.message_id = message_id self.text = normalize_text(text) self.username = normalize_text(username) self.is_me = is_me self.send_time = send_time self.status = status self.is_selected = is_selected self._media_data = None self._cached_content = None self.set_media_data(media_data) # Создаем содержимое виджета self.update_widget() super().__init__(self.widget) def set_media_data(self, media_data): """Устанавливает медиа-данные и обновляет кэш""" if self._media_data != media_data: self._media_data = media_data self._cached_content = None def get_content(self): """Получает содержимое сообщения с кэшированием""" if self._cached_content is None: text = self.text if self.text else "Пустое сообщение" if self._media_data: text = self._media_data + "\n" + text self._cached_content = text return self._cached_content def update_widget(self): """Обновляет внешний вид виджета""" # Подготавливаем текст username = self.username if self.username else "Неизвестный" # Добавляем статус к времени для исходящих сообщений time_text = self.send_time if self.is_me and self.status: time_text = f"{self.send_time} {self.status}" # Создаем заголовок header = urwid.Columns([ urwid.Text(username), ('fixed', 10 if self.is_me else 5, urwid.Text(time_text, align='right')) ]) # Определяем стиль style = 'message_selected' if self.is_selected else ('message_me' if self.is_me else 'message_other') # Создаем виджет с фиксированной шириной для ASCII-арта content = urwid.Text(self.get_content()) if self._media_data: content = urwid.BoxAdapter(urwid.Filler(content), len(self._media_data.split('\n'))) # Создаем виджет self.widget = urwid.AttrMap( urwid.Pile([ urwid.AttrMap(header, 'chat_name'), content ]), style ) def selectable(self): return True def keypress(self, size, key): if key == 'ctrl r': return 'reply' return key class SearchEdit(urwid.Edit): """Виджет поиска с отложенным обновлением""" def __init__(self, *args, **kwargs): self.search_callback = kwargs.pop('search_callback', None) self.search_delay = 0.5 # Задержка поиска в секундах self.last_search = 0 self.pending_search = None super().__init__(*args, **kwargs) def keypress(self, size, key): if key in ('up', 'down', 'esc', 'enter', 'tab'): return key result = super().keypress(size, key) # Отменяем предыдущий отложенный поиск if self.pending_search: self.pending_search.cancel() # Создаем новый отложенный поиск if self.search_callback and result is None: async def delayed_search(): try: await asyncio.sleep(self.search_delay) await self.search_callback() except asyncio.CancelledError: pass except Exception as e: print(f"Ошибка отложенного поиска: {e}") self.pending_search = asyncio.create_task(delayed_search()) return result class InputEdit(urwid.Edit): def __init__(self, *args, **kwargs): self.telegram_client = kwargs.pop('telegram_client', None) self.chat_id = kwargs.pop('chat_id', None) self.users_cache = {} # username -> (full_name, user_id) self.completion_state = None # (start_pos, partial, matches, current_index) self.parent = None super().__init__(*args, **kwargs) async def update_users_cache(self): """Обновляет кэш пользователей для текущего чата""" try: if not self.chat_id or not self.telegram_client: return # Получаем участников чата participants = await self.telegram_client.get_participants(self.chat_id) # Обновляем кэш for user in participants: if user.username: full_name = f"{user.first_name}" if user.last_name: full_name += f" {user.last_name}" self.users_cache[user.username.lower()] = (full_name, user.id) except Exception as e: print(f"Ошибка обновления кэша пользователей: {e}") def find_username_matches(self, partial): """Находит совпадения для частичного имени пользователя""" partial = partial.lower() matches = [] for username in self.users_cache: if username.startswith(partial): full_name, _ = self.users_cache[username] matches.append((username, full_name)) return sorted(matches, key=lambda x: len(x[0])) # Сортируем по длине юзернейма def show_completion_menu(self, matches, start_pos, partial): """Показывает меню автодополнения""" if not matches: self.completion_state = None return # Сохраняем состояние автодополнения self.completion_state = (start_pos, partial, matches, 0) # Уведомляем родителя о необходимости показать меню if self.parent and hasattr(self.parent, 'show_completion_overlay'): self.parent.show_completion_overlay(matches) def hide_completion_menu(self): """Скрывает меню автодополнения""" self.completion_state = None if self.parent and hasattr(self.parent, 'hide_completion_overlay'): self.parent.hide_completion_overlay() def keypress(self, size, key): if key in ('esc', 'up', 'down'): if self.completion_state: if key == 'esc': self.hide_completion_menu() return None elif key in ('up', 'down'): # Передаем управление родителю для навигации по меню if self.parent and hasattr(self.parent, 'handle_completion_navigation'): return self.parent.handle_completion_navigation(key) return key if key == 'enter' and self.completion_state: # Получаем выбранный вариант из родителя if self.parent and hasattr(self.parent, 'get_selected_completion'): selected = self.parent.get_selected_completion() if selected is not None: start_pos, partial, matches, _ = self.completion_state username, _ = matches[selected] # Обновляем текст text = self.get_edit_text() new_text = text[:start_pos] + "@" + username + " " if len(text) > start_pos + len(partial) + 1: new_text += text[start_pos + len(partial) + 1:] self.set_edit_text(new_text) self.set_edit_pos(len(new_text)) # Скрываем меню self.hide_completion_menu() return None result = super().keypress(size, key) # Проверяем, нужно ли показать меню автодополнения if result is None and self.users_cache: text = self.get_edit_text() pos = self.edit_pos # Ищем @ перед курсором start_pos = text.rfind('@', 0, pos) if start_pos != -1 and start_pos < pos: partial = text[start_pos + 1:pos] if partial: matches = self.find_username_matches(partial) if matches: self.show_completion_menu(matches, start_pos, partial) else: self.hide_completion_menu() else: self.hide_completion_menu() else: self.hide_completion_menu() return result class ClockWidget(urwid.Text): """Виджет часов""" def __init__(self): super().__init__("") self.update_time() def update_time(self): """Обновляет отображаемое время""" now = datetime.datetime.now() date_str = now.strftime("%d.%m.%Y") time_str = now.strftime("%H:%M:%S") self.set_text(f"{date_str} {time_str}") return True class WallpaperWidget(urwid.WidgetWrap): """Виджет для отображения ASCII-обоев""" def __init__(self, content=""): self.content = content self.widget = self.create_widget() super().__init__(self.widget) def create_widget(self): """Создает виджет с обоями""" # Создаем виджет с обоями, используя Fill для заполнения всего пространства return urwid.AttrMap( urwid.Filler( urwid.Text(self.content), 'top' ), 'wallpaper' ) def set_content(self, content): """Обновляет содержимое обоев""" self.content = content self._w = self.create_widget() def selectable(self): return False class TelegramTUI: """Основной класс приложения""" palette = [ ('header', 'white', 'dark blue', 'bold'), ('footer', 'white', 'dark blue', 'bold'), ('bg', 'black', 'black'), ('selected', 'black', 'light gray'), ('chat', 'white', 'black'), ('chat_selected', 'black', 'light gray'), ('chat_name', 'light cyan', 'black', 'bold'), ('chat_message', 'light gray', 'black'), ('message_me', 'light green', 'black'), ('message_other', 'white', 'black'), ('help', 'yellow', 'black'), ('error', 'light red', 'black'), ('message_selected', 'black', 'light gray'), ('input_disabled', 'dark gray', 'black'), ('completion_normal', 'white', 'black'), ('completion_focus', 'black', 'light gray'), ('wallpaper', 'dark gray', 'black'), # Цвет для обоев ('main_content', 'white', 'black'), # Цвет для основного контента ] def __init__(self, telegram_client: TelegramClient): self.telegram_client = telegram_client self.current_screen = 'auth' # auth или chats self.phone = None self.code = None self.password = None self.auth_step = 'phone' # phone, code или password # Создаем виджеты авторизации self.phone_edit = urwid.Edit(('header', "Номер телефона: ")) self.code_edit = urwid.Edit(('header', "Код: ")) self.password_edit = urwid.Edit(('header', "Пароль: "), mask='*') self.error_text = urwid.Text(('error', "")) # Создаем виджеты чатов self.search_edit = SearchEdit( ('header', "Поиск: "), search_callback=self.update_chat_list ) self.chat_walker = urwid.SimpleFocusListWalker([]) self.chat_list = urwid.ListBox(self.chat_walker) self.message_walker = urwid.SimpleFocusListWalker([]) self.message_list = urwid.ListBox(self.message_walker) self.input_edit = InputEdit(('header', "Сообщение: "), telegram_client=telegram_client) # Создаем виджет часов self.clock = ClockWidget() # Создаем экраны self.auth_widget = urwid.Pile([ urwid.Text(('header', "\nДобро пожаловать в Telegram TUI\n"), align='center'), urwid.Divider(), self.phone_edit, self.code_edit, self.password_edit, urwid.Divider(), self.error_text, urwid.Text(('help', "Нажмите Enter для подтверждения"), align='center') ]) # Создаем левую панель (чаты) self.left_panel = urwid.LineBox( urwid.Pile([ ('pack', urwid.Text(('help', "Tab - переключение фокуса, ↑↓ - выбор чата, Enter - открыть чат, Esc - назад, / - поиск, [] - папки"), align='center')), ('pack', self.search_edit), urwid.BoxAdapter(self.chat_list, 30) # Фиксированная высота для списка чатов ]) ) # Создаем правую панель (сообщения) self.right_panel = urwid.LineBox( urwid.Pile([ self.message_list, ('pack', self.input_edit) ]) ) # Создаем основной виджет чатов self.chat_widget = urwid.Columns([ ('weight', 30, self.left_panel), ('weight', 70, self.right_panel) ]) # Создаем заголовок с часами header = urwid.Columns([ ('weight', 80, urwid.Text(' Telegram TUI', align='center')), ('pack', self.clock) ]) # Создаем основной контент self.main_content = urwid.Frame( urwid.Filler(self.auth_widget), header=urwid.AttrMap(header, 'header'), footer=urwid.AttrMap( urwid.Text(' Q: Выход | Tab: Переключение фокуса | Enter: Выбор/Отправка | Esc: Назад', align='center'), 'footer' ) ) # Добавляем виджет обоев self.wallpaper = WallpaperWidget() self.load_wallpaper() # Создаем основной виджет с обоями self.main_widget = urwid.Overlay( urwid.AttrMap(self.main_content, 'main_content'), self.wallpaper, 'center', ('relative', 100), 'middle', ('relative', 100) ) # Состояние чатов self.current_folder = None self.folders = [] self.chats = [] self.selected_chat_index = 0 self.focused_element = "chat_list" # chat_list, search, messages, input self.current_chat_id = None # Добавляем таймеры обновления self.chat_update_task = None self.message_update_task = None self.last_update_time = 0 self.update_interval = 3 # секунды для чатов self.message_update_interval = 1 # секунда для сообщений self.last_message_update_time = 0 # Добавляем отслеживание отправляемых сообщений self.pending_messages = {} # message_id -> widget # Добавляем обработчик обновления сообщений @telegram_client.on(events.MessageEdited()) async def handle_edit(event): try: if event.message.out: await self.update_message_status(event.message) except Exception as e: print(f"Ошибка обработки редактирования: {e}") @telegram_client.on(events.NewMessage()) async def handle_new(event): try: if event.message.out: await self.update_message_status(event.message) elif event.message.chat_id == self.current_chat_id: # Обновляем сообщения если это текущий чат self.last_message_update_time = 0 except Exception as e: print(f"Ошибка обработки нового сообщения: {e}") # Добавляем состояния для ответов self.selected_message = None self.replying_to = None self.can_send_messages = False self.ascii_cache = AsciiArtCache() self.media_cache = MediaCache(max_size_mb=1000) # 1GB по умолчанию # Добавляем состояние для меню автодополнения self.completion_listbox = None self.completion_overlay = None self.completion_focus = False # Новое состояние для отслеживания фокуса # Добавляем таймер обновления часов self.clock_update_task = None def load_wallpaper(self): """Загружает ASCII-обои""" try: # Получаем имя файла обоев из .env wallpaper_name = os.getenv('WALLPAPER', 'default') wallpaper_path = os.path.join('ascii', f"{wallpaper_name}.txt") # Проверяем существование файла if os.path.exists(wallpaper_path): with open(wallpaper_path, 'r') as f: content = f.read() # Обновляем виджет обоев self.wallpaper.set_content(content) else: print(f"Файл обоев не найден: {wallpaper_path}") # Используем простой паттерн self.wallpaper.set_content("░" * 80 + "\n" + ("░" + " " * 78 + "░\n") * 23 + "░" * 80) except Exception as e: print(f"Ошибка загрузки обоев: {e}") # В случае ошибки используем простой паттерн self.wallpaper.set_content("░" * 80 + "\n" + ("░" + " " * 78 + "░\n") * 23 + "░" * 80) def switch_screen(self, screen_name: str): """Переключение между экранами""" self.current_screen = screen_name if screen_name == 'auth': self.main_content.body = urwid.Filler(self.auth_widget) else: self.main_content.body = self.chat_widget async def handle_auth(self, key): """Обработка авторизации""" if key != 'enter': return try: if self.auth_step == 'phone': # Скрываем поля кода и пароля self.code_edit.set_edit_text("") self.password_edit.set_edit_text("") phone = normalize_text(self.phone_edit.get_edit_text()) if phone: self.phone = phone await self.telegram_client.send_code_request(phone=phone) self.auth_step = 'code' self.error_text.set_text(('help', "Код отправлен")) elif self.auth_step == 'code': code = normalize_text(self.code_edit.get_edit_text()) if code: try: await self.telegram_client.sign_in(phone=self.phone, code=code) self.switch_screen('chats') await self.update_chat_list() except SessionPasswordNeededError: self.auth_step = 'password' self.error_text.set_text(('help', "Требуется пароль")) elif self.auth_step == 'password': password = self.password_edit.get_edit_text() if password: await self.telegram_client.sign_in(password=password) self.switch_screen('chats') await self.update_chat_list() except Exception as e: self.error_text.set_text(('error', str(e))) async def update_chat_list(self): """Обновляет список чатов""" try: # Сохраняем текущий фокус и ID выбранного чата current_focus = self.chat_list.focus_position if self.chat_walker else 0 current_chat_id = self.current_chat_id # Получаем папки if not self.folders: try: folders = await self.telegram_client.get_dialogs(folder=1) if folders: self.folders = [0, 1] else: self.folders = [0] print(f"Доступные папки: {self.folders}") except Exception as e: print(f"Ошибка получения папок: {e}") self.folders = [0] # Получаем диалоги try: dialogs = await self.telegram_client.get_dialogs( limit=50, # Уменьшаем лимит для стабильности folder=self.current_folder ) except Exception as e: print(f"Ошибка получения диалогов: {e}") dialogs = [] # Фильтруем по поисковому запросу search_query = normalize_text(self.search_edit.get_edit_text().lower()) if search_query: filtered_dialogs = [] for dialog in dialogs: try: name = "" if hasattr(dialog.entity, 'title') and dialog.entity.title: name = dialog.entity.title elif hasattr(dialog.entity, 'first_name'): name = dialog.entity.first_name if hasattr(dialog.entity, 'last_name') and dialog.entity.last_name: name += f" {dialog.entity.last_name}" last_message = "" if dialog.message and hasattr(dialog.message, 'message'): last_message = dialog.message.message if (search_query in normalize_text(name).lower() or search_query in normalize_text(last_message).lower()): filtered_dialogs.append(dialog) except Exception as e: print(f"Ошибка фильтрации диалога: {e}") dialogs = filtered_dialogs # Сохраняем старые чаты для сравнения old_chats = {chat.chat_id: chat for chat in self.chat_walker} # Очищаем список self.chat_walker[:] = [] # Добавляем чаты restored_focus = False for i, dialog in enumerate(dialogs): try: entity = dialog.entity if hasattr(entity, 'title') and entity.title: name = entity.title elif hasattr(entity, 'first_name'): name = entity.first_name if hasattr(entity, 'last_name') and entity.last_name: name += f" {entity.last_name}" else: name = "Без названия" if dialog.message: message = dialog.message.message if hasattr(dialog.message, 'message') else "" else: message = "" # Проверяем, был ли этот чат раньше old_chat = old_chats.get(dialog.id) is_selected = (dialog.id == current_chat_id) chat = ChatWidget( chat_id=dialog.id, name=name, message=message, is_selected=is_selected, folder=1 if self.current_folder else 0 ) self.chat_walker.append(chat) # Восстанавливаем фокус если это текущий чат if dialog.id == current_chat_id and not restored_focus: current_focus = i restored_focus = True except Exception as e: print(f"Ошибка создания виджета чата: {e}") # Восстанавливаем фокус if self.chat_walker: if current_focus >= len(self.chat_walker): current_focus = len(self.chat_walker) - 1 self.chat_list.set_focus(max(0, current_focus)) self.selected_chat_index = current_focus self.update_selected_chat() except Exception as e: print(f"Ошибка обновления чатов: {e}") def update_selected_chat(self): """Обновляет выделение выбранного чата""" try: for i, chat in enumerate(self.chat_walker): was_selected = chat.is_selected chat.is_selected = (i == self.selected_chat_index) if was_selected != chat.is_selected: chat.update_widget() except Exception as e: print(f"Ошибка обновления выделения: {e}") async def process_media(self, message): """Обрабатывает медиа в сообщении""" try: media_type = None media_data = None if message.photo: media_type = 'photo' elif message.video: media_type = 'video' elif message.audio: media_type = 'audio' elif message.voice: media_type = 'voice' elif message.document: media_type = 'document' elif message.sticker: media_type = 'sticker' elif getattr(message, 'gif', None): media_type = 'gif' if media_type: # Для фото проверяем сначала кэш ASCII-арта if media_type == 'photo': # Получаем ID фото для кэша photo_id = message.photo.id cache_key = f"photo_{photo_id}" # Проверяем кэш ASCII-арта try: with open(os.path.join('pics', f"{cache_key}.txt"), 'r') as f: return f.read() except FileNotFoundError: # Если нет в кэше, загружаем и конвертируем media_data = await self.telegram_client.download_media(message.media, bytes) if media_data: ascii_art = self.ascii_cache.get_cached_art(media_data) return ascii_art # Для остальных типов медиа media_data = await self.telegram_client.download_media(message.media, bytes) if media_data: # Сохраняем в кэш cached_path = self.media_cache.get_cached_file(media_data, media_type) if cached_path: # Для других типов возвращаем описание size_mb = len(media_data) / (1024 * 1024) return f"[{media_type.upper()}: {size_mb:.1f}MB - {cached_path.name}]" return None except Exception as e: print(f"Ошибка обработки медиа: {e}") return f"[Ошибка обработки {media_type if media_type else 'медиа'}]" async def message_update_loop(self): """Цикл обновления сообщений""" while True: try: if self.current_chat_id: current_time = datetime.datetime.now().timestamp() if current_time - self.last_message_update_time >= self.message_update_interval: await self.update_message_list(self.current_chat_id) self.last_message_update_time = current_time await asyncio.sleep(1) except Exception as e: print(f"Ошибка в цикле обновления сообщений: {e}") await asyncio.sleep(1) async def update_message_list(self, chat_id): """Обновляет список сообщений""" try: if not chat_id: self.message_walker[:] = [] return # Сохраняем текущий фокус и ID выбранного сообщения current_focus = self.message_list.focus_position if self.message_walker else None selected_message_id = self.selected_message # Получаем сообщения messages = await self.telegram_client.get_messages( entity=chat_id, limit=30 ) # Получаем информацию о себе me = await self.telegram_client.get_me() # Сохраняем отслеживаемые сообщения и их состояния tracked_messages = { msg_id: widget for msg_id, widget in self.pending_messages.items() } # Сохраняем старые сообщения для переиспользования old_messages = { msg.message_id: msg for msg in self.message_walker } # Создаем новый список сообщений new_messages = [] new_focus = None for i, msg in enumerate(reversed(messages)): try: # Проверяем, есть ли сообщение в старом списке old_widget = old_messages.get(msg.id) if old_widget: # Переиспользуем существующий виджет new_messages.append(old_widget) if msg.id == selected_message_id: new_focus = len(new_messages) - 1 continue # Создаем новый виджет только для новых сообщений is_me = False if hasattr(msg, 'from_id') and msg.from_id: if hasattr(msg.from_id, 'user_id'): is_me = msg.from_id.user_id == me.id text = msg.message if hasattr(msg, 'message') else "" media_data = None # Проверяем наличие медиа только для новых сообщений if msg.media: media_data = await self.process_media(msg) if media_data: if not text: text = media_data else: text = media_data + "\n" + text username = "" if hasattr(msg, 'sender') and msg.sender: if hasattr(msg.sender, 'first_name'): username = msg.sender.first_name if hasattr(msg.sender, 'last_name') and msg.sender.last_name: username += f" {msg.sender.last_name}" elif hasattr(msg.sender, 'title'): username = msg.sender.title if not username: username = "Я" if is_me else "Неизвестный" status = "" if is_me: if msg.id in tracked_messages: status = tracked_messages[msg.id].status else: status = "✓✓" message = MessageWidget( message_id=msg.id, text=text, username=username, is_me=is_me, send_time=msg.date.strftime("%H:%M"), status=status, is_selected=(msg.id == selected_message_id), media_data=media_data ) if msg.id in tracked_messages: self.pending_messages[msg.id] = message new_messages.append(message) if msg.id == selected_message_id: new_focus = len(new_messages) - 1 except Exception as e: print(f"Ошибка создания виджета сообщения: {e}") # Проверяем, изменился ли список сообщений if new_messages: current_messages = list(self.message_walker) if len(current_messages) != len(new_messages) or any(a.message_id != b.message_id for a, b in zip(current_messages, new_messages)): # Обновляем список только если есть изменения self.message_walker[:] = new_messages # Восстанавливаем фокус if new_focus is not None: self.message_list.set_focus(new_focus) elif new_messages: self.message_list.set_focus(len(new_messages) - 1) except Exception as e: print(f"Ошибка обновления сообщений: {e}") # В случае ошибки НЕ очищаем список # self.message_walker[:] = [] async def check_chat_permissions(self, chat_id): """Проверяет права на отправку сообщений в чате""" try: # Получаем информацию о чате chat = await self.telegram_client.get_entity(chat_id) # Проверяем, является ли чат каналом if hasattr(chat, 'broadcast') and chat.broadcast: # Для каналов проверяем права администратора participant = await self.telegram_client.get_permissions(chat) self.can_send_messages = participant.is_admin else: # Для всех остальных чатов разрешаем отправку self.can_send_messages = True # Обновляем видимость поля ввода self.update_input_visibility() except Exception as e: print(f"Ошибка проверки прав: {e}") # В случае ошибки разрешаем отправку для не-каналов self.can_send_messages = not (hasattr(chat, 'broadcast') and chat.broadcast) self.update_input_visibility() def update_input_visibility(self): """Обновляет видимость поля ввода""" if self.can_send_messages: if self.replying_to: self.input_edit = InputEdit( ('header', f"Ответ на '{self.replying_to[:30]}...': "), telegram_client=self.telegram_client, chat_id=self.current_chat_id ) else: self.input_edit = InputEdit( ('header', "Сообщение: "), telegram_client=self.telegram_client, chat_id=self.current_chat_id ) # Устанавливаем ссылку на родительский виджет self.input_edit.parent = self # Обновляем кэш пользователей asyncio.create_task(self.input_edit.update_users_cache()) else: self.input_edit = InputEdit( ('input_disabled', "Отправка сообщений недоступна"), telegram_client=self.telegram_client, chat_id=self.current_chat_id ) self.input_edit.parent = self self.input_edit.set_edit_text("") # Обновляем правую панель if len(self.right_panel.original_widget.widget_list) > 1: self.right_panel.original_widget.widget_list[-1] = self.input_edit async def update_message_status(self, message): """Обновляет статус сообщения""" try: if message.id in self.pending_messages: widget = self.pending_messages[message.id] # Определяем статус if getattr(message, 'from_id', None): widget.status = "✓✓" # Доставлено else: widget.status = "✓" # Отправлено widget.update_widget() # Если сообщение доставлено, удаляем из отслеживания if widget.status == "✓✓": del self.pending_messages[message.id] except Exception as e: print(f"Ошибка обновления статуса: {e}") async def handle_chat_input(self, key): """Обработка ввода в экране чатов""" try: if key == 'reply' and self.focused_element == "messages": # Получаем выбранное сообщение if self.message_walker and self.message_list.focus is not None: msg_widget = self.message_walker[self.message_list.focus_position] self.selected_message = msg_widget.message_id self.replying_to = msg_widget.text self.update_input_visibility() # Переключаемся на ввод self.focused_element = "input" self.right_panel.original_widget.focus_position = 1 return elif key == 'esc': if self.replying_to: # Отменяем ответ self.selected_message = None self.replying_to = None self.update_input_visibility() return if self.focused_element in ("input", "messages"): # Закрываем текущий чат self.current_chat_id = None self.message_walker[:] = [] self.input_edit.set_edit_text("") self.focused_element = "chat_list" self.chat_widget.focus_position = 0 self.left_panel.original_widget.focus_position = 2 elif self.focused_element == "search": self.search_edit.set_edit_text("") await self.update_chat_list() self.focused_element = "chat_list" self.left_panel.original_widget.focus_position = 2 elif key == 'enter': if self.focused_element == "chat_list" and self.chat_walker: try: focused = self.chat_walker[self.chat_list.focus_position] if focused.chat_id != self.current_chat_id: self.current_chat_id = focused.chat_id self.selected_chat_index = self.chat_list.focus_position # Проверяем права при открытии чата await self.check_chat_permissions(focused.chat_id) # Сбрасываем таймеры обновления перед загрузкой сообщений self.last_message_update_time = 0 self.last_update_time = 0 await self.update_message_list(focused.chat_id) self.focused_element = "input" self.chat_widget.focus_position = 1 self.right_panel.original_widget.focus_position = 1 except Exception as e: print(f"Ошибка при открытии чата: {e}") elif self.focused_element == "input" and self.current_chat_id and self.can_send_messages: message = self.input_edit.get_edit_text() if message.strip(): try: # Создаем виджет сообщения до отправки now = datetime.datetime.now() msg_widget = MessageWidget( message_id=0, # Временный ID text=message, username="Я", is_me=True, send_time=now.strftime("%H:%M"), status="⋯" # Отправляется ) # Добавляем сообщение в список сразу self.message_walker.append(msg_widget) self.message_list.set_focus(len(self.message_walker) - 1) # Отправляем сообщение sent_message = await self.telegram_client.send_message( self.current_chat_id, message, reply_to=self.selected_message ) # Обновляем ID сообщения и добавляем в отслеживание msg_widget.message_id = sent_message.id self.pending_messages[sent_message.id] = msg_widget # Обновляем статус msg_widget.status = "✓" msg_widget.update_widget() # Очищаем поле ввода и сбрасываем ответ self.input_edit.set_edit_text("") self.selected_message = None self.replying_to = None self.update_input_visibility() # Форсируем обновление списка сообщений await self.update_message_list(self.current_chat_id) except Exception as e: print(f"Ошибка отправки сообщения: {e}") # Удаляем виджет сообщения в случае ошибки if msg_widget in self.message_walker: self.message_walker.remove(msg_widget) elif key == 'tab': if self.focused_element == "search": self.focused_element = "chat_list" self.left_panel.original_widget.focus_position = 2 elif self.focused_element == "chat_list": if self.current_chat_id: self.focused_element = "messages" self.chat_widget.focus_position = 1 self.right_panel.original_widget.focus_position = 0 else: self.focused_element = "search" self.left_panel.original_widget.focus_position = 1 elif self.focused_element == "messages": self.focused_element = "input" self.right_panel.original_widget.focus_position = 1 elif self.focused_element == "input": self.focused_element = "search" self.chat_widget.focus_position = 0 self.left_panel.original_widget.focus_position = 1 elif key in ('up', 'down'): if self.focused_element == "chat_list" and self.chat_walker: if key == 'up': if self.chat_list.focus_position > 0: self.chat_list.focus_position -= 1 else: if self.chat_list.focus_position < len(self.chat_walker) - 1: self.chat_list.focus_position += 1 # Обновляем выделение self.selected_chat_index = self.chat_list.focus_position self.update_selected_chat() # Если чат открыт, обновляем его содержимое if self.current_chat_id: focused = self.chat_walker[self.selected_chat_index] if self.current_chat_id != focused.chat_id: self.current_chat_id = focused.chat_id await self.update_message_list(focused.chat_id) elif self.focused_element == "messages" and self.message_walker: if key == 'up': if self.message_list.focus_position > 0: self.message_list.focus_position -= 1 else: if self.message_list.focus_position < len(self.message_walker) - 1: self.message_list.focus_position += 1 except Exception as e: print(f"Ошибка обработки ввода: {e}") # Восстанавливаем состояние в случае ошибки self.focused_element = "chat_list" self.chat_widget.focus_position = 0 def unhandled_input(self, key): """Обработка необработанных нажатий клавиш""" if key in ('q', 'Q'): raise urwid.ExitMainLoop() # Обрабатываем клавиши для меню автодополнения if self.completion_overlay and self.completion_focus: if key == 'tab': # Переключаем фокус между полем ввода и списком self.completion_focus = not self.completion_focus return None elif key in ('up', 'down'): # Обрабатываем навигацию только если фокус на списке if self.completion_focus: if key == 'up' and self.completion_listbox.focus_position > 0: self.completion_listbox.focus_position -= 1 elif key == 'down' and self.completion_listbox.focus_position < len(self.completion_listbox.body) - 1: self.completion_listbox.focus_position += 1 return None elif key == 'enter' and self.completion_focus: # Выбираем текущий вариант if self.input_edit and self.input_edit.completion_state: selected = self.get_selected_completion() if selected is not None: start_pos, partial, matches, _ = self.input_edit.completion_state username, _ = matches[selected] # Обновляем текст text = self.input_edit.get_edit_text() new_text = text[:start_pos] + "@" + username + " " if len(text) > start_pos + len(partial) + 1: new_text += text[start_pos + len(partial) + 1:] self.input_edit.set_edit_text(new_text) self.input_edit.set_edit_pos(len(new_text)) # Скрываем меню self.hide_completion_overlay() return None elif key == 'esc': self.hide_completion_overlay() return None # Создаем задачу для асинхронной обработки if self.current_screen == 'auth': asyncio.create_task(self.handle_auth(key)) else: asyncio.create_task(self.handle_chat_input(key)) async def start_auto_updates(self): """Запускает автоматическое обновление чатов и сообщений""" if self.chat_update_task: self.chat_update_task.cancel() if self.message_update_task: self.message_update_task.cancel() if self.clock_update_task: self.clock_update_task.cancel() async def chat_update_loop(): while True: try: current_time = datetime.datetime.now().timestamp() if current_time - self.last_update_time >= self.update_interval: await self.update_chat_list() self.last_update_time = current_time await asyncio.sleep(3) except Exception as e: print(f"Ошибка в цикле обновления чатов: {e}") await asyncio.sleep(3) self.chat_update_task = asyncio.create_task(chat_update_loop()) self.message_update_task = asyncio.create_task(self.message_update_loop()) self.clock_update_task = asyncio.create_task(self.update_clock()) async def stop_auto_updates(self): """Останавливает автоматическое обновление""" if self.chat_update_task: self.chat_update_task.cancel() self.chat_update_task = None if self.message_update_task: self.message_update_task.cancel() self.message_update_task = None if self.clock_update_task: self.clock_update_task.cancel() self.clock_update_task = None async def run(self): """Запуск приложения""" try: # Подключаемся к Telegram await self.telegram_client.connect() print("Подключено к Telegram") # Проверяем авторизацию if await self.telegram_client.is_user_authorized(): self.switch_screen('chats') await self.update_chat_list() # Запускаем автообновление await self.start_auto_updates() else: self.switch_screen('auth') # Создаем event loop для urwid event_loop = urwid.AsyncioEventLoop(loop=asyncio.get_event_loop()) # Запускаем интерфейс urwid.MainLoop( self.main_widget, self.palette, event_loop=event_loop, unhandled_input=self.unhandled_input ).run() except Exception as e: print(f"Ошибка при запуске приложения: {e}") finally: # Останавливаем автообновление await self.stop_auto_updates() if self.telegram_client and self.telegram_client.is_connected(): await self.telegram_client.disconnect() print("Отключено от Telegram") def show_completion_overlay(self, matches): """Показывает оверлей с меню автодополнения""" # Создаем список вариантов items = [] for username, full_name in matches: text = f"@{username} ({full_name})" items.append(urwid.AttrMap( urwid.Text(text), 'completion_normal', 'completion_focus' )) # Создаем меню self.completion_listbox = urwid.ListBox(urwid.SimpleFocusListWalker(items)) box = urwid.LineBox( urwid.BoxAdapter(self.completion_listbox, min(len(matches), 5)), title="Автодополнение" ) # Создаем оверлей if not hasattr(self, 'main_frame'): self.main_frame = self.main_widget self.completion_overlay = urwid.Overlay( box, self.main_frame, 'left', ('relative', 30), 'top', ('relative', 20), ('relative', 50), ('relative', 30) ) # Обновляем главный виджет self.main_widget = self.completion_overlay self.completion_focus = True def hide_completion_overlay(self): """Скрывает оверлей с меню автодополнения""" if self.completion_overlay: self.main_widget = self.main_frame self.completion_overlay = None self.completion_listbox = None self.completion_focus = False def handle_completion_navigation(self, key): """Обрабатывает навигацию по меню автодополнения""" if self.completion_listbox: if key == 'up' and self.completion_listbox.focus_position > 0: self.completion_listbox.focus_position -= 1 return None elif key == 'down' and self.completion_listbox.focus_position < len(self.completion_listbox.body) - 1: self.completion_listbox.focus_position += 1 return None return key def get_selected_completion(self): """Возвращает индекс выбранного варианта автодополнения""" if self.completion_listbox: return self.completion_listbox.focus_position return None async def update_clock(self): """Обновляет время в виджете часов""" while True: try: self.clock.update_time() await asyncio.sleep(1) except Exception as e: print(f"Ошибка обновления часов: {e}") await asyncio.sleep(1) async def main(): # Загружаем переменные окружения load_dotenv() # Проверяем наличие API ключей api_id = os.getenv("API_ID") api_hash = os.getenv("API_HASH") if not api_id or not api_hash: print("API_ID и API_HASH не найдены в .env файле.") print("Пожалуйста, скопируйте .env.example в .env и заполните свои ключи.") return # Преобразуем API_ID в число api_id = int(api_id) # Инициализируем клиент Telegram session_file = "talc.session" # Создаем клиент client = TelegramClient( session_file, api_id=api_id, api_hash=api_hash, system_version="macOS 14.3.1", device_model="MacBook", app_version="1.0" ) # Создаем и запускаем приложение app = TelegramTUI(client) await app.run() if __name__ == "__main__": try: asyncio.run(main()) except Exception as e: print(f"Ошибка при запуске приложения: {e}")