#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Telegram TUI Client Консольный клиент Telegram на базе urwid """ import urwid import asyncio import os import nest_asyncio import unicodedata import emoji from telethon import TelegramClient, events, utils from telethon.errors import SessionPasswordNeededError from dotenv import load_dotenv import datetime from PIL import Image import io import hashlib import json # Разрешаем вложенные event loops nest_asyncio.apply() def normalize_text(text: str) -> str: """Нормализует текст для корректного отображения""" if not text: return "" try: # Преобразуем в строку, если это не строка text = str(text) # Удаляем эмодзи text = emoji.replace_emoji(text, '') # Нормализуем Unicode text = unicodedata.normalize('NFKC', text) # Заменяем специальные символы на их ASCII-эквиваленты text = text.replace('—', '-').replace('–', '-').replace('…', '...') # Удаляем все управляющие символы, кроме новой строки и табуляции text = ''.join(char for char in text if unicodedata.category(char)[0] != 'C' or char in ('\n', '\t')) # Удаляем множественные пробелы text = ' '.join(text.split()) return text except Exception as e: print(f"Ошибка нормализации текста: {e}") return "Ошибка отображения" def image_to_ascii(image_data, max_width=80, max_height=24): """Конвертирует изображение в ASCII-арт""" try: # Открываем изображение из байтов image = Image.open(io.BytesIO(image_data)) # Конвертируем в оттенки серого image = image.convert('L') # Определяем новые размеры с сохранением пропорций width, height = image.size aspect_ratio = height/width new_width = min(max_width, width) new_height = int(new_width * aspect_ratio * 0.5) # * 0.5 потому что символы в терминале выше, чем шире if new_height > max_height: new_height = max_height new_width = int(new_height / aspect_ratio * 2) # Изменяем размер image = image.resize((new_width, new_height)) # Символы от темного к светлому ascii_chars = '@%#*+=-:. ' # Конвертируем пиксели в ASCII pixels = image.getdata() ascii_str = '' for i, pixel in enumerate(pixels): ascii_str += ascii_chars[pixel//32] # 256//32 = 8 уровней if (i + 1) % new_width == 0: ascii_str += '\n' # Добавляем рамку вокруг ASCII-арта для стабильности lines = ascii_str.split('\n') if lines and lines[-1] == '': lines = lines[:-1] # Удаляем последнюю пустую строку max_line_length = max(len(line) for line in lines) border_top = '┌' + '─' * max_line_length + '┐\n' border_bottom = '└' + '─' * max_line_length + '┘' framed_ascii = border_top for line in lines: padding = ' ' * (max_line_length - len(line)) framed_ascii += '│' + line + padding + '│\n' framed_ascii += border_bottom return framed_ascii except Exception as e: print(f"Ошибка конвертации изображения: {e}") return "[Ошибка конвертации изображения]" class AsciiArtCache: """Класс для кэширования ASCII-артов""" def __init__(self, cache_dir='pics'): self.cache_dir = cache_dir os.makedirs(cache_dir, exist_ok=True) self.index_file = os.path.join(cache_dir, 'index.json') self.load_index() def load_index(self): """Загружает индекс кэшированных изображений""" try: with open(self.index_file, 'r') as f: self.index = json.load(f) except (FileNotFoundError, json.JSONDecodeError): self.index = {} self.save_index() def save_index(self): """Сохраняет индекс кэшированных изображений""" with open(self.index_file, 'w') as f: json.dump(self.index, f) def get_cache_key(self, image_data): """Генерирует ключ кэша для изображения""" return hashlib.md5(image_data).hexdigest() def get_cached_art(self, image_data): """Получает ASCII-арт из кэша или создает новый""" cache_key = self.get_cache_key(image_data) # Проверяем наличие в индексе if cache_key in self.index: try: with open(os.path.join(self.cache_dir, cache_key + '.txt'), 'r') as f: return f.read() except FileNotFoundError: pass # Если нет в кэше, создаем новый ascii_art = image_to_ascii(image_data) # Сохраняем в кэш with open(os.path.join(self.cache_dir, cache_key + '.txt'), 'w') as f: f.write(ascii_art) # Обновляем индекс self.index[cache_key] = { 'created_at': datetime.datetime.now().isoformat(), 'size': len(image_data) } self.save_index() return ascii_art class ChatWidget(urwid.WidgetWrap): """Виджет чата""" def __init__(self, chat_id, name, message="", is_selected=False, folder=0): self.chat_id = chat_id self.name = normalize_text(name) self.message = normalize_text(message) self.is_selected = is_selected self.folder = folder self.has_focus = False # Создаем содержимое виджета self.update_widget() super().__init__(self.widget) def update_widget(self): """Обновляет внешний вид виджета""" # Подготавливаем данные name = self.name if self.name else "Без названия" msg = self.message if self.message else "Нет сообщений" if len(msg) > 50: msg = msg[:47] + "..." # Добавляем метку папки если нужно if self.folder == 1: name += " [Архив]" # Получаем первую букву для аватара first_letter = next((c for c in name if c.isprintable()), "?") # Определяем стиль if self.has_focus: style = 'selected' elif self.is_selected: style = 'chat_selected' else: style = 'chat' # Создаем виджеты avatar = urwid.AttrMap( urwid.Text(f" {first_letter} ", align='center'), style ) content = urwid.Pile([ urwid.AttrMap( urwid.Text(name), style ), urwid.AttrMap( urwid.Text(msg), style ) ]) self.widget = urwid.AttrMap( urwid.Columns([ ('fixed', 3, avatar), content ]), None ) def selectable(self): return True def render(self, size, focus=False): if self.has_focus != focus: self.has_focus = focus self.update_widget() return super().render(size, focus) def keypress(self, size, key): if key == 'enter': return key elif key == 'tab': return key elif key in ('up', 'down'): return key return key class MessageWidget(urwid.WidgetWrap): """Виджет сообщения""" def __init__(self, message_id, text="", username="", is_me=False, send_time="", status="", is_selected=False, media_data=None): self.message_id = message_id self.text = normalize_text(text) self.username = normalize_text(username) self.is_me = is_me self.send_time = send_time self.status = status self.is_selected = is_selected self._media_data = None self._cached_content = None self.set_media_data(media_data) # Создаем содержимое виджета self.update_widget() super().__init__(self.widget) def set_media_data(self, media_data): """Устанавливает медиа-данные и обновляет кэш""" if self._media_data != media_data: self._media_data = media_data self._cached_content = None def get_content(self): """Получает содержимое сообщения с кэшированием""" if self._cached_content is None: text = self.text if self.text else "Пустое сообщение" if self._media_data: text = self._media_data + "\n" + text self._cached_content = text return self._cached_content def update_widget(self): """Обновляет внешний вид виджета""" # Подготавливаем текст username = self.username if self.username else "Неизвестный" # Добавляем статус к времени для исходящих сообщений time_text = self.send_time if self.is_me and self.status: time_text = f"{self.send_time} {self.status}" # Создаем заголовок header = urwid.Columns([ urwid.Text(username), ('fixed', 10 if self.is_me else 5, urwid.Text(time_text, align='right')) ]) # Определяем стиль style = 'message_selected' if self.is_selected else ('message_me' if self.is_me else 'message_other') # Создаем виджет с фиксированной шириной для ASCII-арта content = urwid.Text(self.get_content()) if self._media_data: content = urwid.BoxAdapter(urwid.Filler(content), len(self._media_data.split('\n'))) # Создаем виджет self.widget = urwid.AttrMap( urwid.Pile([ urwid.AttrMap(header, 'chat_name'), content ]), style ) def selectable(self): return True def keypress(self, size, key): if key == 'ctrl r': return 'reply' return key class SearchEdit(urwid.Edit): def __init__(self, *args, **kwargs): self.search_callback = kwargs.pop('search_callback', None) super().__init__(*args, **kwargs) def keypress(self, size, key): if key in ('up', 'down', 'esc', 'enter'): return key result = super().keypress(size, key) # Вызываем поиск при каждом изменении текста if self.search_callback and result is None: asyncio.create_task(self.search_callback()) return result class InputEdit(urwid.Edit): def keypress(self, size, key): if key in ('esc', 'up', 'down'): return key return super().keypress(size, key) class TelegramTUI: """Основной класс приложения""" palette = [ ('header', 'white', 'dark blue', 'bold'), ('footer', 'white', 'dark blue', 'bold'), ('bg', 'white', 'black'), ('selected', 'black', 'light gray'), ('chat', 'white', 'black'), ('chat_selected', 'black', 'light gray'), ('chat_name', 'light cyan', 'black', 'bold'), ('chat_message', 'light gray', 'black'), ('message_me', 'light green', 'black'), ('message_other', 'white', 'black'), ('help', 'yellow', 'black'), ('error', 'light red', 'black'), ('message_selected', 'black', 'light gray'), ('input_disabled', 'dark gray', 'black'), ] def __init__(self, telegram_client: TelegramClient): self.telegram_client = telegram_client self.current_screen = 'auth' # auth или chats self.phone = None self.code = None self.password = None self.auth_step = 'phone' # phone, code или password # Создаем виджеты авторизации self.phone_edit = urwid.Edit(('header', "Номер телефона: ")) self.code_edit = urwid.Edit(('header', "Код: ")) self.password_edit = urwid.Edit(('header', "Пароль: "), mask='*') self.error_text = urwid.Text(('error', "")) # Создаем виджеты чатов self.search_edit = SearchEdit( ('header', "Поиск: "), search_callback=self.update_chat_list ) self.chat_walker = urwid.SimpleFocusListWalker([]) self.chat_list = urwid.ListBox(self.chat_walker) self.message_walker = urwid.SimpleFocusListWalker([]) self.message_list = urwid.ListBox(self.message_walker) self.input_edit = InputEdit(('header', "Сообщение: ")) # Создаем экраны self.auth_widget = urwid.Filler( urwid.Pile([ urwid.Text(('header', "\nДобро пожаловать в Telegram TUI\n"), align='center'), urwid.Divider(), self.phone_edit, self.code_edit, self.password_edit, urwid.Divider(), self.error_text, urwid.Text(('help', "Нажмите Enter для подтверждения"), align='center') ]) ) # Создаем левую панель (чаты) self.left_panel = urwid.LineBox( urwid.Pile([ ('pack', urwid.Text(('help', "Tab - переключение фокуса, ↑↓ - выбор чата, Enter - открыть чат, Esc - назад, / - поиск, [] - папки"), align='center')), ('pack', self.search_edit), urwid.BoxAdapter(self.chat_list, 30) # Фиксированная высота для списка чатов ]) ) # Создаем правую панель (сообщения) self.right_panel = urwid.LineBox( urwid.Pile([ self.message_list, ('pack', self.input_edit) ]) ) # Создаем основной виджет чатов self.chat_widget = urwid.Columns([ ('weight', 30, self.left_panel), ('weight', 70, self.right_panel) ]) # Создаем основной виджет self.main_widget = urwid.Frame( self.auth_widget, header=urwid.AttrMap( urwid.Text(' Telegram TUI', align='center'), 'header' ), footer=urwid.AttrMap( urwid.Text(' Q: Выход | Tab: Переключение фокуса | Enter: Выбор/Отправка | Esc: Назад', align='center'), 'footer' ) ) # Состояние чатов self.current_folder = None self.folders = [] self.chats = [] self.selected_chat_index = 0 self.focused_element = "chat_list" # chat_list, search, messages, input self.current_chat_id = None # Добавляем таймеры обновления self.chat_update_task = None self.message_update_task = None self.last_update_time = 0 self.update_interval = 3 # секунды для чатов self.message_update_interval = 1 # секунда для сообщений self.last_message_update_time = 0 # Добавляем отслеживание отправляемых сообщений self.pending_messages = {} # message_id -> widget # Добавляем обработчик обновления сообщений @telegram_client.on(events.MessageEdited()) async def handle_edit(event): try: if event.message.out: await self.update_message_status(event.message) except Exception as e: print(f"Ошибка обработки редактирования: {e}") @telegram_client.on(events.NewMessage()) async def handle_new(event): try: if event.message.out: await self.update_message_status(event.message) elif event.message.chat_id == self.current_chat_id: # Обновляем сообщения если это текущий чат self.last_message_update_time = 0 except Exception as e: print(f"Ошибка обработки нового сообщения: {e}") # Добавляем состояния для ответов self.selected_message = None self.replying_to = None self.can_send_messages = False self.ascii_cache = AsciiArtCache() def switch_screen(self, screen_name: str): """Переключение между экранами""" self.current_screen = screen_name if screen_name == 'auth': self.main_widget.body = self.auth_widget elif screen_name == 'chats': self.main_widget.body = self.chat_widget async def handle_auth(self, key): """Обработка авторизации""" if key != 'enter': return try: if self.auth_step == 'phone': phone = normalize_text(self.phone_edit.get_edit_text()) if phone: self.phone = phone await self.telegram_client.send_code_request(phone=phone) self.auth_step = 'code' self.error_text.set_text(('help', "Код отправлен")) elif self.auth_step == 'code': code = normalize_text(self.code_edit.get_edit_text()) if code: try: await self.telegram_client.sign_in(phone=self.phone, code=code) self.switch_screen('chats') await self.update_chat_list() except SessionPasswordNeededError: self.auth_step = 'password' self.error_text.set_text(('help', "Требуется пароль")) elif self.auth_step == 'password': password = self.password_edit.get_edit_text() if password: await self.telegram_client.sign_in(password=password) self.switch_screen('chats') await self.update_chat_list() except Exception as e: self.error_text.set_text(('error', str(e))) async def update_chat_list(self): """Обновляет список чатов""" try: # Сохраняем текущий фокус и ID выбранного чата current_focus = self.chat_list.focus_position if self.chat_walker else 0 current_chat_id = self.current_chat_id # Получаем папки if not self.folders: try: folders = await self.telegram_client.get_dialogs(folder=1) if folders: self.folders = [0, 1] else: self.folders = [0] print(f"Доступные папки: {self.folders}") except Exception as e: print(f"Ошибка получения папок: {e}") self.folders = [0] # Получаем диалоги try: dialogs = await self.telegram_client.get_dialogs( limit=50, # Уменьшаем лимит для стабильности folder=self.current_folder ) except Exception as e: print(f"Ошибка получения диалогов: {e}") dialogs = [] # Фильтруем по поисковому запросу search_query = normalize_text(self.search_edit.get_edit_text().lower()) if search_query: filtered_dialogs = [] for dialog in dialogs: try: name = "" if hasattr(dialog.entity, 'title') and dialog.entity.title: name = dialog.entity.title elif hasattr(dialog.entity, 'first_name'): name = dialog.entity.first_name if hasattr(dialog.entity, 'last_name') and dialog.entity.last_name: name += f" {dialog.entity.last_name}" last_message = "" if dialog.message and hasattr(dialog.message, 'message'): last_message = dialog.message.message if (search_query in normalize_text(name).lower() or search_query in normalize_text(last_message).lower()): filtered_dialogs.append(dialog) except Exception as e: print(f"Ошибка фильтрации диалога: {e}") dialogs = filtered_dialogs # Сохраняем старые чаты для сравнения old_chats = {chat.chat_id: chat for chat in self.chat_walker} # Очищаем список self.chat_walker[:] = [] # Добавляем чаты restored_focus = False for i, dialog in enumerate(dialogs): try: entity = dialog.entity if hasattr(entity, 'title') and entity.title: name = entity.title elif hasattr(entity, 'first_name'): name = entity.first_name if hasattr(entity, 'last_name') and entity.last_name: name += f" {entity.last_name}" else: name = "Без названия" if dialog.message: message = dialog.message.message if hasattr(dialog.message, 'message') else "" else: message = "" # Проверяем, был ли этот чат раньше old_chat = old_chats.get(dialog.id) is_selected = (dialog.id == current_chat_id) chat = ChatWidget( chat_id=dialog.id, name=name, message=message, is_selected=is_selected, folder=1 if self.current_folder else 0 ) self.chat_walker.append(chat) # Восстанавливаем фокус если это текущий чат if dialog.id == current_chat_id and not restored_focus: current_focus = i restored_focus = True except Exception as e: print(f"Ошибка создания виджета чата: {e}") # Восстанавливаем фокус if self.chat_walker: if current_focus >= len(self.chat_walker): current_focus = len(self.chat_walker) - 1 self.chat_list.set_focus(max(0, current_focus)) self.selected_chat_index = current_focus self.update_selected_chat() except Exception as e: print(f"Ошибка обновления чатов: {e}") def update_selected_chat(self): """Обновляет выделение выбранного чата""" try: for i, chat in enumerate(self.chat_walker): was_selected = chat.is_selected chat.is_selected = (i == self.selected_chat_index) if was_selected != chat.is_selected: chat.update_widget() except Exception as e: print(f"Ошибка обновления выделения: {e}") async def update_message_list(self, chat_id): """Обновляет список сообщений""" try: if not chat_id: self.message_walker[:] = [] return # Сохраняем текущий фокус current_focus = self.message_list.focus_position if self.message_walker else None # Получаем сообщения messages = await self.telegram_client.get_messages( entity=chat_id, limit=30 # Уменьшаем лимит для стабильности ) # Получаем информацию о себе me = await self.telegram_client.get_me() # Сохраняем отслеживаемые сообщения tracked_messages = { msg_id: widget for msg_id, widget in self.pending_messages.items() } # Сохраняем старые сообщения для сравнения old_messages = {msg.message_id: msg for msg in self.message_walker} # Очищаем список self.message_walker[:] = [] # Добавляем сообщения for msg in reversed(messages): try: # Проверяем, было ли это сообщение раньше old_message = old_messages.get(msg.id) if old_message and not msg.photo: # Переиспользуем существующий виджет для не-фото сообщений self.message_walker.append(old_message) continue is_me = False if hasattr(msg, 'from_id') and msg.from_id: if hasattr(msg.from_id, 'user_id'): is_me = msg.from_id.user_id == me.id text = msg.message if hasattr(msg, 'message') else "" media_data = None if hasattr(msg, 'photo') and msg.photo: try: photo_data = await self.telegram_client.download_media(msg.photo, bytes) if photo_data: media_data = self.ascii_cache.get_cached_art(photo_data) if not text: text = "[Фото]" except Exception as e: print(f"Ошибка загрузки фото: {e}") text = "[Ошибка загрузки фото]" username = "" if hasattr(msg, 'sender') and msg.sender: if hasattr(msg.sender, 'first_name'): username = msg.sender.first_name if hasattr(msg.sender, 'last_name') and msg.sender.last_name: username += f" {msg.sender.last_name}" elif hasattr(msg.sender, 'title'): username = msg.sender.title if not username: username = "Я" if is_me else "Неизвестный" status = "" if is_me: if msg.id in tracked_messages: status = tracked_messages[msg.id].status else: status = "✓✓" message = MessageWidget( message_id=msg.id, text=text, username=username, is_me=is_me, send_time=msg.date.strftime("%H:%M"), status=status, is_selected=(msg.id == self.selected_message), media_data=media_data ) if msg.id in tracked_messages: self.pending_messages[msg.id] = message self.message_walker.append(message) except Exception as e: print(f"Ошибка создания виджета сообщения: {e}") # Восстанавливаем фокус if current_focus is not None and current_focus < len(self.message_walker): self.message_list.set_focus(current_focus) elif self.message_walker: self.message_list.set_focus(len(self.message_walker) - 1) except Exception as e: print(f"Ошибка обновления сообщений: {e}") self.message_walker[:] = [] async def update_message_status(self, message): """Обновляет статус сообщения""" try: if message.id in self.pending_messages: widget = self.pending_messages[message.id] # Определяем статус if getattr(message, 'from_id', None): widget.status = "✓✓" # Доставлено else: widget.status = "✓" # Отправлено widget.update_widget() # Если сообщение доставлено, удаляем из отслеживания if widget.status == "✓✓": del self.pending_messages[message.id] except Exception as e: print(f"Ошибка обновления статуса: {e}") async def check_chat_permissions(self, chat_id): """Проверяет права на отправку сообщений в чате""" try: # Получаем информацию о чате chat = await self.telegram_client.get_entity(chat_id) # Проверяем, является ли чат каналом if hasattr(chat, 'broadcast') and chat.broadcast: # Для каналов проверяем права администратора participant = await self.telegram_client.get_permissions(chat) self.can_send_messages = participant.is_admin else: # Для всех остальных чатов разрешаем отправку self.can_send_messages = True # Обновляем видимость поля ввода self.update_input_visibility() except Exception as e: print(f"Ошибка проверки прав: {e}") # В случае ошибки разрешаем отправку для не-каналов self.can_send_messages = not (hasattr(chat, 'broadcast') and chat.broadcast) self.update_input_visibility() def update_input_visibility(self): """Обновляет видимость поля ввода""" if self.can_send_messages: if self.replying_to: self.input_edit = InputEdit(('header', f"Ответ на '{self.replying_to[:30]}...': ")) else: self.input_edit = InputEdit(('header', "Сообщение: ")) else: self.input_edit = InputEdit(('input_disabled', "Отправка сообщений недоступна")) self.input_edit.set_edit_text("") # Обновляем правую панель if len(self.right_panel.original_widget.widget_list) > 1: self.right_panel.original_widget.widget_list[-1] = self.input_edit async def handle_chat_input(self, key): """Обработка ввода в экране чатов""" try: if key == 'reply' and self.focused_element == "messages": # Получаем выбранное сообщение if self.message_walker and self.message_list.focus is not None: msg_widget = self.message_walker[self.message_list.focus_position] self.selected_message = msg_widget.message_id self.replying_to = msg_widget.text self.update_input_visibility() # Переключаемся на ввод self.focused_element = "input" self.right_panel.original_widget.focus_position = 1 return elif key == 'esc': if self.replying_to: # Отменяем ответ self.selected_message = None self.replying_to = None self.update_input_visibility() return if self.focused_element in ("input", "messages"): # Закрываем текущий чат self.current_chat_id = None self.message_walker[:] = [] self.input_edit.set_edit_text("") self.focused_element = "chat_list" self.chat_widget.focus_position = 0 self.left_panel.original_widget.focus_position = 2 elif self.focused_element == "search": self.search_edit.set_edit_text("") await self.update_chat_list() self.focused_element = "chat_list" self.left_panel.original_widget.focus_position = 2 elif key == 'enter': if self.focused_element == "chat_list" and self.chat_walker: try: focused = self.chat_walker[self.chat_list.focus_position] if focused.chat_id != self.current_chat_id: self.current_chat_id = focused.chat_id self.selected_chat_index = self.chat_list.focus_position # Проверяем права при открытии чата await self.check_chat_permissions(focused.chat_id) await self.update_message_list(focused.chat_id) self.focused_element = "input" self.chat_widget.focus_position = 1 self.right_panel.original_widget.focus_position = 1 self.last_message_update_time = 0 except Exception as e: print(f"Ошибка при открытии чата: {e}") elif self.focused_element == "input" and self.current_chat_id and self.can_send_messages: message = self.input_edit.get_edit_text() if message.strip(): try: # Приостанавливаем автообновление на время отправки self.last_update_time = datetime.datetime.now().timestamp() self.last_message_update_time = self.last_update_time # Создаем виджет сообщения до отправки now = datetime.datetime.now() msg_widget = MessageWidget( message_id=0, # Временный ID text=message, username="Я", is_me=True, send_time=now.strftime("%H:%M"), status="⋯" # Отправляется ) # Отправляем сообщение sent_message = await self.telegram_client.send_message( self.current_chat_id, message, reply_to=self.selected_message ) # Обновляем ID сообщения и добавляем в отслеживание msg_widget.message_id = sent_message.id self.pending_messages[sent_message.id] = msg_widget # Добавляем сообщение в список self.message_walker.append(msg_widget) self.message_list.set_focus(len(self.message_walker) - 1) # Очищаем поле ввода и сбрасываем ответ self.input_edit.set_edit_text("") self.selected_message = None self.replying_to = None self.update_input_visibility() except Exception as e: print(f"Ошибка отправки сообщения: {e}") elif key == 'tab': if self.focused_element == "search": self.focused_element = "chat_list" self.left_panel.original_widget.focus_position = 2 elif self.focused_element == "chat_list": if self.current_chat_id: self.focused_element = "messages" self.chat_widget.focus_position = 1 self.right_panel.original_widget.focus_position = 0 else: self.focused_element = "search" self.left_panel.original_widget.focus_position = 1 elif self.focused_element == "messages": self.focused_element = "input" self.right_panel.original_widget.focus_position = 1 elif self.focused_element == "input": self.focused_element = "search" self.chat_widget.focus_position = 0 self.left_panel.original_widget.focus_position = 1 elif key in ('up', 'down'): if self.focused_element == "chat_list" and self.chat_walker: if key == 'up': if self.chat_list.focus_position > 0: self.chat_list.focus_position -= 1 else: if self.chat_list.focus_position < len(self.chat_walker) - 1: self.chat_list.focus_position += 1 # Обновляем выделение self.selected_chat_index = self.chat_list.focus_position self.update_selected_chat() # Если чат открыт, обновляем его содержимое if self.current_chat_id: focused = self.chat_walker[self.selected_chat_index] if self.current_chat_id != focused.chat_id: self.current_chat_id = focused.chat_id await self.update_message_list(focused.chat_id) elif self.focused_element == "messages" and self.message_walker: if key == 'up': if self.message_list.focus_position > 0: self.message_list.focus_position -= 1 else: if self.message_list.focus_position < len(self.message_walker) - 1: self.message_list.focus_position += 1 except Exception as e: print(f"Ошибка обработки ввода: {e}") # Восстанавливаем состояние в случае ошибки self.focused_element = "chat_list" self.chat_widget.focus_position = 0 def unhandled_input(self, key): """Обработка необработанных нажатий клавиш""" if key in ('q', 'Q'): raise urwid.ExitMainLoop() # Создаем задачу для асинхронной обработки if self.current_screen == 'auth': asyncio.create_task(self.handle_auth(key)) else: asyncio.create_task(self.handle_chat_input(key)) async def start_auto_updates(self): """Запускает автоматическое обновление чатов и сообщений""" if self.chat_update_task: self.chat_update_task.cancel() if self.message_update_task: self.message_update_task.cancel() async def chat_update_loop(): while True: try: # Обновляем только если не в процессе отправки сообщения if not self.pending_messages: current_time = datetime.datetime.now().timestamp() if current_time - self.last_update_time >= self.update_interval: await self.update_chat_list() self.last_update_time = current_time await asyncio.sleep(1) except Exception as e: print(f"Ошибка в цикле обновления чатов: {e}") await asyncio.sleep(1) async def message_update_loop(): while True: try: if self.current_chat_id and not self.pending_messages: current_time = datetime.datetime.now().timestamp() if current_time - self.last_message_update_time >= self.message_update_interval: await self.update_message_list(self.current_chat_id) self.last_message_update_time = current_time await asyncio.sleep(0.5) except Exception as e: print(f"Ошибка в цикле обновления сообщений: {e}") await asyncio.sleep(0.5) self.chat_update_task = asyncio.create_task(chat_update_loop()) self.message_update_task = asyncio.create_task(message_update_loop()) async def stop_auto_updates(self): """Останавливает автоматическое обновление""" if self.chat_update_task: self.chat_update_task.cancel() self.chat_update_task = None if self.message_update_task: self.message_update_task.cancel() self.message_update_task = None async def run(self): """Запуск приложения""" try: # Подключаемся к Telegram await self.telegram_client.connect() print("Подключено к Telegram") # Проверяем авторизацию if await self.telegram_client.is_user_authorized(): self.switch_screen('chats') await self.update_chat_list() # Запускаем автообновление await self.start_auto_updates() else: self.switch_screen('auth') # Создаем event loop для urwid event_loop = urwid.AsyncioEventLoop(loop=asyncio.get_event_loop()) # Запускаем интерфейс urwid.MainLoop( self.main_widget, self.palette, event_loop=event_loop, unhandled_input=self.unhandled_input ).run() except Exception as e: print(f"Ошибка при запуске приложения: {e}") finally: # Останавливаем автообновление await self.stop_auto_updates() if self.telegram_client and self.telegram_client.is_connected(): await self.telegram_client.disconnect() print("Отключено от Telegram") async def main(): # Загружаем переменные окружения load_dotenv() # Проверяем наличие API ключей api_id = os.getenv("API_ID") api_hash = os.getenv("API_HASH") if not api_id or not api_hash: print("API_ID и API_HASH не найдены в .env файле.") print("Пожалуйста, скопируйте .env.example в .env и заполните свои ключи.") return # Преобразуем API_ID в число api_id = int(api_id) # Инициализируем клиент Telegram session_file = "talc.session" # Создаем клиент client = TelegramClient( session_file, api_id=api_id, api_hash=api_hash, system_version="macOS 14.3.1", device_model="MacBook", app_version="1.0" ) # Создаем и запускаем приложение app = TelegramTUI(client) await app.run() if __name__ == "__main__": try: asyncio.run(main()) except Exception as e: print(f"Ошибка при запуске приложения: {e}")