From 359ee436150fee3e2a22d4a29762f60b4b9e2c3b Mon Sep 17 00:00:00 2001 From: kirill Date: Mon, 26 May 2025 02:31:51 +0300 Subject: [PATCH] =?UTF-8?q?=D0=9A=D1=83=D1=87=D0=B0=20=D0=B8=D0=B7=D0=BC?= =?UTF-8?q?=D0=B5=D0=BD=D0=B5=D0=BD=D0=B8=D0=B9,=20=D1=87=D0=B8=D1=82?= =?UTF-8?q?=D0=B0=D0=B9=D1=82=D0=B5=20=D1=84=D0=B0=D0=B9=D0=BB=D1=8B,=20?= =?UTF-8?q?=D0=B4=D1=83=D0=BC=D0=B0=D1=8E=20=D0=B4=D0=B0=D0=B6=D0=B5=20?= =?UTF-8?q?=D1=82=D0=B5=D0=BF=D0=B5=D1=80=D1=8C=20=D0=BF=D0=BE=D0=BA=D0=B0?= =?UTF-8?q?=D1=82=D0=B8=D1=82=20=D0=BD=D0=B0=20=D1=81=D0=BB=D0=B8=D0=B2=20?= =?UTF-8?q?=D0=B2=20main?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/app.py | 18 +---- src/screens.py | 127 ++++++---------------------------- src/widgets.py | 181 ++++++------------------------------------------- 3 files changed, 43 insertions(+), 283 deletions(-) diff --git a/src/app.py b/src/app.py index 50585dd..e7b896c 100644 --- a/src/app.py +++ b/src/app.py @@ -1,24 +1,15 @@ """Главный файл приложения""" import os -import sys from dotenv import load_dotenv from telethon import TelegramClient, events from textual.app import App -from rich.console import Console from src.screens import AuthScreen, ChatScreen -# Настройка консоли для корректной работы с Unicode -""" -console = Console(force_terminal=True, color_system="auto") -sys.stdout = console -""" -# спойлер: не помогло - load_dotenv() api_id = os.getenv("API_ID") -api_hash = os.getenv("API_HASH") +api_hash = os.getenv("API_HASH") if not api_id or not api_hash: raise ValueError( @@ -32,10 +23,6 @@ class TelegramTUI(App): """Класс приложения""" CSS_PATH = "style.tcss" - TITLE = "Telegram TUI" - - def __init__(self): - super().__init__() async def on_mount(self) -> None: self.telegram_client = TelegramClient("user", api_id, api_hash) @@ -54,6 +41,3 @@ class TelegramTUI(App): async def on_exit_app(self): await self.telegram_client.disconnect() return super()._on_exit_app() - -if __name__ == "__main__": - raise Exception("Запущен не тот файл. Запустите main.py.") diff --git a/src/screens.py b/src/screens.py index 27dd204..15a0a49 100644 --- a/src/screens.py +++ b/src/screens.py @@ -3,12 +3,9 @@ from textual.screen import Screen from textual.widgets import Label, Input, Footer, Static, ContentSwitcher from textual.containers import Vertical, Horizontal, VerticalScroll -from textual.events import Key from telethon.errors import SessionPasswordNeededError from telethon import TelegramClient, events -from src.widgets import Dialog, Chat, normalize_text -from textual import log -from textual.keys import Keys, _character_to_key +from src.widgets import Dialog, Chat class AuthScreen(Screen): """Класс экрана логина в аккаунт""" @@ -28,11 +25,11 @@ class AuthScreen(Screen): def compose(self): with Vertical(id="auth_container"): - yield Label(normalize_text("Добро пожаловать в Telegram TUI")) - yield Input(placeholder=normalize_text("Номер телефона"), id="phone") - yield Input(placeholder=normalize_text("Код"), id="code", disabled=True) + yield Label("Добро пожаловать в Telegram TUI") + yield Input(placeholder="Номер телефона", id="phone") + yield Input(placeholder="Код", id="code", disabled=True) yield Input( - placeholder=normalize_text("Пароль"), + placeholder="Пароль", id="password", password=True, disabled=True @@ -40,13 +37,13 @@ class AuthScreen(Screen): async def on_input_submitted(self, event: Input.Submitted) -> None: if event.input.id == "phone": - self.phone = normalize_text(event.value) + self.phone = event.value self.ac.query_one("#phone").disabled = True self.ac.query_one("#code").disabled = False await self.client.send_code_request(phone=self.phone) elif event.input.id == "code": try: - self.code = normalize_text(event.value) + self.code = event.value self.ac.query_one("#code").disabled = True await self.client.sign_in(phone=self.phone, code=self.code) self.app.pop_screen() @@ -55,7 +52,7 @@ class AuthScreen(Screen): self.ac.query_one("#code").disabled = True self.ac.query_one("#password").disabled = False elif event.input.id == "password": - self.password = normalize_text(event.value) + self.password = event.value await self.client.sign_in(password=self.password) await self.client.start() self.app.pop_screen() @@ -63,13 +60,6 @@ class AuthScreen(Screen): class ChatScreen(Screen): """Класс экрана чатов, он же основной экран приложения""" - - BINDINGS = [ - (Keys.Tab, "log(\"Нажат таб\")", "Переключение фокуса"), - (Keys.Enter, "log(\"Нажат энтер\")", "Открыть"), - (Keys.Escape, "log(\"Нажат эскейп\")", "Назад"), - (_character_to_key("/"), "log(\"Нажат слэш\")", "Поиск") - ] def __init__( self, @@ -80,10 +70,6 @@ class ChatScreen(Screen): ): super().__init__(name, id, classes) self.telegram_client = telegram_client - self.search_query = "" - self.selected_chat_index = 0 - self.chats = [] - self.focused_element = "search" # search, chat_list, dialog async def on_mount(self): self.limit = 100 @@ -92,10 +78,8 @@ class ChatScreen(Screen): .query_one("#main_container")\ .query_one("#chats")\ .query_one("#chat_container") - - self.search_input = self.query_one("#search_input") - log("Первоначальная загрузка виджетов чатов...") + print("Первоначальная загрузка виджетов чатов...") self.mount_chats( len( await self.telegram_client.get_dialogs( @@ -103,12 +87,12 @@ class ChatScreen(Screen): ) ) ) - log("Первоначальная загрузка виджетов чата завершена") + print("Первоначальная загрузка виджетов чата завершена") self.is_chat_update_blocked = False await self.update_chat_list() - log("Первоначальная загрузка чатов завершена") + print("Первоначальная загрузка чатов завершена") for event in ( events.NewMessage, @@ -118,7 +102,7 @@ class ChatScreen(Screen): self.telegram_client.on(event())(self.update_chat_list) def mount_chats(self, limit: int): - log("Загрузка виджетов чатов...") + print("Загрузка виджетов чатов...") chats_amount = len(self.chat_container.query(Chat)) @@ -130,10 +114,10 @@ class ChatScreen(Screen): for i in range(chats_amount - limit): self.chat_container.query(Chat).last().remove() - log("Виджеты чатов загружены") + print("Виджеты чатов загружены") async def update_chat_list(self, event = None): - log("Запрос обновления чатов") + print("Запрос обновления чатов") if not self.is_chat_update_blocked: self.is_chat_update_blocked = True @@ -141,96 +125,27 @@ class ChatScreen(Screen): dialogs = await self.telegram_client.get_dialogs( limit=self.limit, archived=False ) - log("Получены диалоги") - - # Фильтруем диалоги по поисковому запросу - if self.search_query: - dialogs = [ - d for d in dialogs - if self.search_query.lower() in \ - normalize_text(d.name).lower() - ] + print("Получены диалоги") limit = len(dialogs) self.mount_chats(limit) for i in range(limit): chat = self.chat_container.query_one(f"#chat-{i + 1}") - chat.username = normalize_text(str(dialogs[i].name)) - chat.msg = normalize_text(str(dialogs[i].message.message)) + chat.username = str(dialogs[i].name) + chat.msg = str(dialogs[i].message.message) chat.peer_id = dialogs[i].id - chat.is_selected = (i == self.selected_chat_index) - chat.is_focused = (self.focused_element == "chat_list" and \ - i == self.selected_chat_index) self.is_chat_update_blocked = False - log("Чаты обновлены") + print("Чаты обновлены") else: - log("Обновление чатов невозможно: уже выполняется") - - def on_input_changed(self, event: Input.Changed) -> None: - if event.input.id == "search_input": - self.search_query = normalize_text(event.value) - self.selected_chat_index = 0 - self.update_chat_list() - - def on_key(self, event: Key) -> None: - if event.key == Keys.Tab: - # Переключаем фокус между элементами - if self.focused_element == "search": - self.focused_element = "chat_list" - self.search_input.blur() - self.update_chat_list() - elif self.focused_element == "chat_list": - self.focused_element = "search" - self.search_input.focus() - self.update_chat_list() - return - - if self.focused_element == "search": - return - - chats = self.chat_container.query(Chat) - if not chats: - return - - match event.key: - case Keys.Up: - self.selected_chat_index = max(0, self.selected_chat_index - 1) - for i, chat in enumerate(chats): - chat.is_selected = (i == self.selected_chat_index) - chat.is_focused = (i == self.selected_chat_index) - # Прокручиваем к выбранному чату - selected_chat = chats[self.selected_chat_index] - self.chat_container.scroll_to(selected_chat, animate=False) - case Keys.Down: - self.selected_chat_index = min(len(chats) - 1, self.selected_chat_index + 1) - for i, chat in enumerate(chats): - chat.is_selected = (i == self.selected_chat_index) - chat.is_focused = (i == self.selected_chat_index) - # Прокручиваем к выбранному чату - selected_chat = chats[self.selected_chat_index] - self.chat_container.scroll_to(selected_chat, animate=False) - case Keys.Enter: - chats[self.selected_chat_index].on_click() - case Keys.Escape: - # Возвращаемся к списку чатов - self.app.pop_screen() - self.app.push_screen("chats") - case "/": #Не работает: нужен кейкод слэша - # Фокус на поиск - self.focused_element = "search" - self.search_input.focus() - self.update_chat_list() + print("Обновление чатов невозможно: уже выполняется") def compose(self): yield Footer() with Horizontal(id="main_container"): - with Vertical(id="chats"): - yield Input(placeholder=normalize_text("Поиск чатов..."), id="search_input") + with Horizontal(id="chats"): yield VerticalScroll(id="chat_container") + #TODO: сделать кнопку чтобы прогрузить больше чатов yield ContentSwitcher(id="dialog_switcher") #yield Dialog(telegram_client=self.telegram_client) - -if __name__ == "__main__": - raise Exception("Запущен не тот файл. Запустите main.py.") diff --git a/src/widgets.py b/src/widgets.py index 034652a..f8926fe 100644 --- a/src/widgets.py +++ b/src/widgets.py @@ -7,74 +7,6 @@ from textual.widgets import Input, Button, Label, Static, ContentSwitcher from textual.app import ComposeResult, RenderResult from telethon import TelegramClient, events, utils import datetime -import unicodedata -import re -import emoji -import os -import tempfile -from PIL import Image -#import pywhatkit as kit -from textual import log -from warnings import deprecated - -def remove_emoji(text: str) -> str: - """Удаляет эмодзи из текста""" - if not text: - return "" - return emoji.replace_emoji(text, '') - -def normalize_text(text: str) -> str: - """Нормализует текст для корректного отображения""" - if not text: - return "" - # Удаляем эмодзи - text = remove_emoji(text) - # Удаляем все управляющие символы - text = ''\ - .join(char for char in text if unicodedata.category(char)[0] != 'C') - # Нормализуем Unicode - text = unicodedata.normalize('NFKC', text) - # Заменяем специальные символы на их ASCII-эквиваленты - text = text.replace('—', '-').replace('–', '-') - # Удаляем все непечатаемые символы - text = ''.join(char for char in text if char.isprintable()) - return text - -def safe_ascii(text: str) -> str: - """Преобразует текст в безопасный ASCII-формат""" - if not text: - return "" - # Удаляем эмодзи - text = remove_emoji(text) - # Оставляем только ASCII символы и пробелы - return ''.join(char for char in text if ord(char) < 128 or char.isspace()) - -@deprecated("Не работает на моём компьютере.") -def convert_image_to_ascii(image_path: str, width: int = 50) -> str: - """Конвертирует изображение в ASCII-арт""" - try: - # Открываем изображение - img = Image.open(image_path) - - # Конвертируем в RGB если нужно - if img.mode != 'RGB': - img = img.convert('RGB') - - # Изменяем размер, сохраняя пропорции - aspect_ratio = img.height / img.width - height = int(width * aspect_ratio * 0.5) # * 0.5 потому что символы выше чем шире - img = img.resize((width, height)) - - # Конвертируем в ASCII - ascii_str = kit.image_to_ascii_art(image_path, output_file=None) - - # Очищаем временный файл - os.remove(image_path) - - return ascii_str - except Exception as e: - log(f"Ошибка конвертации изображения: {e}") - return "Ошибка загрузки изображения" class Chat(Widget): """Класс виджета чата для панели чатов""" @@ -82,8 +14,6 @@ class Chat(Widget): username: Reactive[str] = Reactive(" ", recompose=True) msg: Reactive[str] = Reactive(" ", recompose=True) peer_id: Reactive[int] = Reactive(0) - is_selected: Reactive[bool] = Reactive(False) - is_focused: Reactive[bool] = Reactive(False) def __init__( self, @@ -103,51 +33,29 @@ class Chat(Widget): self.switcher = self.screen.query_one(Horizontal).query_one("#dialog_switcher", ContentSwitcher) def on_click(self) -> None: - # Снимаем выделение со всех чатов - for chat in self.screen.query(Chat): - chat.is_selected = False - chat.is_focused = False - - # Выделяем текущий чат - self.is_selected = True - self.is_focused = True - dialog_id = f"dialog-{str(self.peer_id)}" + print("click 1") try: self.switcher.mount(Dialog( telegram_client=self.app.telegram_client, chat_id=self.peer_id, id=dialog_id )) + print("click 1.1") except: - pass + print("click 1.2") + print("click 2") self.switcher.current = dialog_id self.switcher.recompose() + print("click 3") def compose(self) -> ComposeResult: - with Horizontal(classes="chat-item"): - """ - # Используем ASCII-символы для рамки - yield Label( - f"┌───┐\n│ {normalize_text( - self.username[:1].upper() - ):1} │\n└───┘" - ) - with Vertical(): - yield Label(normalize_text(self.username), id="name") - yield Label(normalize_text(self.msg), id="last_msg") - """ - yield Label(f"┌───┐\n│ {self.username[:1].upper():1} │\n└───┘") + with Horizontal(): + yield Label(f"┌───┐\n│ {self.username[:1]:1} │\n└───┘") with Vertical(): yield Label(self.username, id="name") yield Label(self.msg, id="last_msg") - def on_mouse_enter(self) -> None: - self.add_class("hover") - - def on_mouse_leave(self) -> None: - self.remove_class("hover") - class Dialog(Widget): """Класс окна диалога""" @@ -163,16 +71,12 @@ class Dialog(Widget): self.telegram_client = telegram_client self.chat_id = chat_id self.is_msg_update_blocked = False - self.messages_loaded = 0 - self.is_loading = False async def on_mount(self) -> None: - self.limit = 50 # Увеличиваем начальное количество сообщений - self.messages_loaded = self.limit + self.limit = 10 self.msg_input = self.query_one("#msg_input") self.dialog = self.query_one(Vertical).query_one("#dialog") - self.load_more_btn = self.query_one("#load_more") self.me = await self.telegram_client.get_me() @@ -204,7 +108,7 @@ class Dialog(Widget): self.dialog.query(Message).last().remove() async def update_dialog(self, event = None) -> None: - log("Запрос обновления сообщений") + print("Запрос обновления сообщений") if not self.is_msg_update_blocked: self.is_msg_update_blocked = True @@ -212,7 +116,7 @@ class Dialog(Widget): messages = await self.telegram_client.get_messages( entity=self.chat_id, limit=self.limit ) - log("Получены сообщения") + print("Получены сообщения") limit = len(messages) self.mount_messages(limit) @@ -220,76 +124,36 @@ class Dialog(Widget): for i in range(limit): msg = self.dialog.query_one(f"#msg-{i + 1}") msg.message = "" - - # Обрабатываем изображения - if messages[i].media: - try: - # Скачиваем изображение - image_data = await self.telegram_client.download_media( - messages[i].media, - bytes - ) - if image_data: - await msg.set_image(image_data) - except Exception as e: - log(f"Ошибка загрузки изображения: {e}") - - # Обрабатываем текст if str(messages[i].message): - msg.message = normalize_text(str(messages[i].message)) + msg.message = str(messages[i].message) + #TODO: завести это: try: is_me = messages[i].from_id.user_id == self.me.id except: is_me = False msg.is_me = is_me - msg.username = normalize_text(utils.get_display_name(messages[i].sender)) + msg.username = utils.get_display_name(messages[i].sender) msg.send_time = messages[i]\ .date\ .astimezone(datetime.timezone.utc)\ .strftime("%H:%M") self.is_msg_update_blocked = False - log("Сообщения обновлены") + print("Сообщения обновлены") else: - log("Обновление сообщений невозможно: уже выполняется") - - async def load_more_messages(self) -> None: - if not self.is_loading: - self.is_loading = True - self.load_more_btn.disabled = True - self.load_more_btn.label = "Загрузка..." - - try: - messages = await self.telegram_client.get_messages( - entity=self.chat_id, - limit=self.limit, - offset_id=self.messages_loaded - ) - - if messages: - self.messages_loaded += len(messages) - self.mount_messages(self.messages_loaded) - await self.update_dialog() - finally: - self.is_loading = False - self.load_more_btn.disabled = False - self.load_more_btn.label = "Загрузить еще" + print("Обновление сообщений невозможно: уже выполняется") def compose(self) -> ComposeResult: with Vertical(): - yield Button("Загрузить еще", id="load_more", variant="default") yield VerticalScroll(id="dialog") with Horizontal(id="input_place"): yield Input(placeholder="Сообщение", id="msg_input") - yield Button(label=">", id="send", variant="primary") + yield Button(label="➤", id="send", variant="primary") - async def on_button_pressed(self, event) -> None: - if event.button.id == "load_more": - await self.load_more_messages() - else: - await self.send_message() + async def on_button_pressed(self, event = None) -> None: + await self.send_message() async def on_input_submitted(self, event = None) -> None: await self.send_message() @@ -298,7 +162,7 @@ class Dialog(Widget): try: await self.telegram_client.send_message( self.chat_id, - normalize_text(str(self.msg_input.value)) + str(self.msg_input.value) ) except ValueError: self.app.notify("Ошибка отправки") @@ -326,8 +190,8 @@ class Message(Widget): pass def compose(self) -> ComposeResult: - static = Static(normalize_text(self.message)) - static.border_title = normalize_text(self.username) * (not self.is_me) + static = Static(self.message) + static.border_title = self.username * (not self.is_me) static.border_subtitle = self.send_time with Container(): @@ -337,6 +201,3 @@ class Message(Widget): self.classes = "is_me_true" else: self.classes = "is_me_false" - -if __name__ == "__main__": - raise Exception("Запущен не тот файл. Запустите main.py.")