Куча изменений, читайте файлы, думаю даже теперь покатит на слив в main

This commit is contained in:
kirill 2025-05-26 02:31:51 +03:00
parent e523fce7bd
commit 359ee43615
3 changed files with 43 additions and 283 deletions

View File

@ -1,24 +1,15 @@
"""Главный файл приложения""" """Главный файл приложения"""
import os import os
import sys
from dotenv import load_dotenv from dotenv import load_dotenv
from telethon import TelegramClient, events from telethon import TelegramClient, events
from textual.app import App from textual.app import App
from rich.console import Console
from src.screens import AuthScreen, ChatScreen from src.screens import AuthScreen, ChatScreen
# Настройка консоли для корректной работы с Unicode
"""
console = Console(force_terminal=True, color_system="auto")
sys.stdout = console
"""
# спойлер: не помогло
load_dotenv() load_dotenv()
api_id = os.getenv("API_ID") api_id = os.getenv("API_ID")
api_hash = os.getenv("API_HASH") api_hash = os.getenv("API_HASH")
if not api_id or not api_hash: if not api_id or not api_hash:
raise ValueError( raise ValueError(
@ -32,10 +23,6 @@ class TelegramTUI(App):
"""Класс приложения""" """Класс приложения"""
CSS_PATH = "style.tcss" CSS_PATH = "style.tcss"
TITLE = "Telegram TUI"
def __init__(self):
super().__init__()
async def on_mount(self) -> None: async def on_mount(self) -> None:
self.telegram_client = TelegramClient("user", api_id, api_hash) self.telegram_client = TelegramClient("user", api_id, api_hash)
@ -54,6 +41,3 @@ class TelegramTUI(App):
async def on_exit_app(self): async def on_exit_app(self):
await self.telegram_client.disconnect() await self.telegram_client.disconnect()
return super()._on_exit_app() return super()._on_exit_app()
if __name__ == "__main__":
raise Exception("Запущен не тот файл. Запустите main.py.")

View File

@ -3,12 +3,9 @@
from textual.screen import Screen from textual.screen import Screen
from textual.widgets import Label, Input, Footer, Static, ContentSwitcher from textual.widgets import Label, Input, Footer, Static, ContentSwitcher
from textual.containers import Vertical, Horizontal, VerticalScroll from textual.containers import Vertical, Horizontal, VerticalScroll
from textual.events import Key
from telethon.errors import SessionPasswordNeededError from telethon.errors import SessionPasswordNeededError
from telethon import TelegramClient, events from telethon import TelegramClient, events
from src.widgets import Dialog, Chat, normalize_text from src.widgets import Dialog, Chat
from textual import log
from textual.keys import Keys, _character_to_key
class AuthScreen(Screen): class AuthScreen(Screen):
"""Класс экрана логина в аккаунт""" """Класс экрана логина в аккаунт"""
@ -28,11 +25,11 @@ class AuthScreen(Screen):
def compose(self): def compose(self):
with Vertical(id="auth_container"): with Vertical(id="auth_container"):
yield Label(normalize_text("Добро пожаловать в Telegram TUI")) yield Label("Добро пожаловать в Telegram TUI")
yield Input(placeholder=normalize_text("Номер телефона"), id="phone") yield Input(placeholder="Номер телефона", id="phone")
yield Input(placeholder=normalize_text("Код"), id="code", disabled=True) yield Input(placeholder="Код", id="code", disabled=True)
yield Input( yield Input(
placeholder=normalize_text("Пароль"), placeholder="Пароль",
id="password", id="password",
password=True, password=True,
disabled=True disabled=True
@ -40,13 +37,13 @@ class AuthScreen(Screen):
async def on_input_submitted(self, event: Input.Submitted) -> None: async def on_input_submitted(self, event: Input.Submitted) -> None:
if event.input.id == "phone": if event.input.id == "phone":
self.phone = normalize_text(event.value) self.phone = event.value
self.ac.query_one("#phone").disabled = True self.ac.query_one("#phone").disabled = True
self.ac.query_one("#code").disabled = False self.ac.query_one("#code").disabled = False
await self.client.send_code_request(phone=self.phone) await self.client.send_code_request(phone=self.phone)
elif event.input.id == "code": elif event.input.id == "code":
try: try:
self.code = normalize_text(event.value) self.code = event.value
self.ac.query_one("#code").disabled = True self.ac.query_one("#code").disabled = True
await self.client.sign_in(phone=self.phone, code=self.code) await self.client.sign_in(phone=self.phone, code=self.code)
self.app.pop_screen() self.app.pop_screen()
@ -55,7 +52,7 @@ class AuthScreen(Screen):
self.ac.query_one("#code").disabled = True self.ac.query_one("#code").disabled = True
self.ac.query_one("#password").disabled = False self.ac.query_one("#password").disabled = False
elif event.input.id == "password": elif event.input.id == "password":
self.password = normalize_text(event.value) self.password = event.value
await self.client.sign_in(password=self.password) await self.client.sign_in(password=self.password)
await self.client.start() await self.client.start()
self.app.pop_screen() self.app.pop_screen()
@ -63,13 +60,6 @@ class AuthScreen(Screen):
class ChatScreen(Screen): class ChatScreen(Screen):
"""Класс экрана чатов, он же основной экран приложения""" """Класс экрана чатов, он же основной экран приложения"""
BINDINGS = [
(Keys.Tab, "log(\"Нажат таб\")", "Переключение фокуса"),
(Keys.Enter, "log(\"Нажат энтер\")", "Открыть"),
(Keys.Escape, "log(\"Нажат эскейп\")", "Назад"),
(_character_to_key("/"), "log(\"Нажат слэш\")", "Поиск")
]
def __init__( def __init__(
self, self,
@ -80,10 +70,6 @@ class ChatScreen(Screen):
): ):
super().__init__(name, id, classes) super().__init__(name, id, classes)
self.telegram_client = telegram_client self.telegram_client = telegram_client
self.search_query = ""
self.selected_chat_index = 0
self.chats = []
self.focused_element = "search" # search, chat_list, dialog
async def on_mount(self): async def on_mount(self):
self.limit = 100 self.limit = 100
@ -92,10 +78,8 @@ class ChatScreen(Screen):
.query_one("#main_container")\ .query_one("#main_container")\
.query_one("#chats")\ .query_one("#chats")\
.query_one("#chat_container") .query_one("#chat_container")
self.search_input = self.query_one("#search_input")
log("Первоначальная загрузка виджетов чатов...") print("Первоначальная загрузка виджетов чатов...")
self.mount_chats( self.mount_chats(
len( len(
await self.telegram_client.get_dialogs( await self.telegram_client.get_dialogs(
@ -103,12 +87,12 @@ class ChatScreen(Screen):
) )
) )
) )
log("Первоначальная загрузка виджетов чата завершена") print("Первоначальная загрузка виджетов чата завершена")
self.is_chat_update_blocked = False self.is_chat_update_blocked = False
await self.update_chat_list() await self.update_chat_list()
log("Первоначальная загрузка чатов завершена") print("Первоначальная загрузка чатов завершена")
for event in ( for event in (
events.NewMessage, events.NewMessage,
@ -118,7 +102,7 @@ class ChatScreen(Screen):
self.telegram_client.on(event())(self.update_chat_list) self.telegram_client.on(event())(self.update_chat_list)
def mount_chats(self, limit: int): def mount_chats(self, limit: int):
log("Загрузка виджетов чатов...") print("Загрузка виджетов чатов...")
chats_amount = len(self.chat_container.query(Chat)) chats_amount = len(self.chat_container.query(Chat))
@ -130,10 +114,10 @@ class ChatScreen(Screen):
for i in range(chats_amount - limit): for i in range(chats_amount - limit):
self.chat_container.query(Chat).last().remove() self.chat_container.query(Chat).last().remove()
log("Виджеты чатов загружены") print("Виджеты чатов загружены")
async def update_chat_list(self, event = None): async def update_chat_list(self, event = None):
log("Запрос обновления чатов") print("Запрос обновления чатов")
if not self.is_chat_update_blocked: if not self.is_chat_update_blocked:
self.is_chat_update_blocked = True self.is_chat_update_blocked = True
@ -141,96 +125,27 @@ class ChatScreen(Screen):
dialogs = await self.telegram_client.get_dialogs( dialogs = await self.telegram_client.get_dialogs(
limit=self.limit, archived=False limit=self.limit, archived=False
) )
log("Получены диалоги") print("Получены диалоги")
# Фильтруем диалоги по поисковому запросу
if self.search_query:
dialogs = [
d for d in dialogs
if self.search_query.lower() in \
normalize_text(d.name).lower()
]
limit = len(dialogs) limit = len(dialogs)
self.mount_chats(limit) self.mount_chats(limit)
for i in range(limit): for i in range(limit):
chat = self.chat_container.query_one(f"#chat-{i + 1}") chat = self.chat_container.query_one(f"#chat-{i + 1}")
chat.username = normalize_text(str(dialogs[i].name)) chat.username = str(dialogs[i].name)
chat.msg = normalize_text(str(dialogs[i].message.message)) chat.msg = str(dialogs[i].message.message)
chat.peer_id = dialogs[i].id chat.peer_id = dialogs[i].id
chat.is_selected = (i == self.selected_chat_index)
chat.is_focused = (self.focused_element == "chat_list" and \
i == self.selected_chat_index)
self.is_chat_update_blocked = False self.is_chat_update_blocked = False
log("Чаты обновлены") print("Чаты обновлены")
else: else:
log("Обновление чатов невозможно: уже выполняется") print("Обновление чатов невозможно: уже выполняется")
def on_input_changed(self, event: Input.Changed) -> None:
if event.input.id == "search_input":
self.search_query = normalize_text(event.value)
self.selected_chat_index = 0
self.update_chat_list()
def on_key(self, event: Key) -> None:
if event.key == Keys.Tab:
# Переключаем фокус между элементами
if self.focused_element == "search":
self.focused_element = "chat_list"
self.search_input.blur()
self.update_chat_list()
elif self.focused_element == "chat_list":
self.focused_element = "search"
self.search_input.focus()
self.update_chat_list()
return
if self.focused_element == "search":
return
chats = self.chat_container.query(Chat)
if not chats:
return
match event.key:
case Keys.Up:
self.selected_chat_index = max(0, self.selected_chat_index - 1)
for i, chat in enumerate(chats):
chat.is_selected = (i == self.selected_chat_index)
chat.is_focused = (i == self.selected_chat_index)
# Прокручиваем к выбранному чату
selected_chat = chats[self.selected_chat_index]
self.chat_container.scroll_to(selected_chat, animate=False)
case Keys.Down:
self.selected_chat_index = min(len(chats) - 1, self.selected_chat_index + 1)
for i, chat in enumerate(chats):
chat.is_selected = (i == self.selected_chat_index)
chat.is_focused = (i == self.selected_chat_index)
# Прокручиваем к выбранному чату
selected_chat = chats[self.selected_chat_index]
self.chat_container.scroll_to(selected_chat, animate=False)
case Keys.Enter:
chats[self.selected_chat_index].on_click()
case Keys.Escape:
# Возвращаемся к списку чатов
self.app.pop_screen()
self.app.push_screen("chats")
case "/": #Не работает: нужен кейкод слэша
# Фокус на поиск
self.focused_element = "search"
self.search_input.focus()
self.update_chat_list()
def compose(self): def compose(self):
yield Footer() yield Footer()
with Horizontal(id="main_container"): with Horizontal(id="main_container"):
with Vertical(id="chats"): with Horizontal(id="chats"):
yield Input(placeholder=normalize_text("Поиск чатов..."), id="search_input")
yield VerticalScroll(id="chat_container") yield VerticalScroll(id="chat_container")
#TODO: сделать кнопку чтобы прогрузить больше чатов
yield ContentSwitcher(id="dialog_switcher") yield ContentSwitcher(id="dialog_switcher")
#yield Dialog(telegram_client=self.telegram_client) #yield Dialog(telegram_client=self.telegram_client)
if __name__ == "__main__":
raise Exception("Запущен не тот файл. Запустите main.py.")

View File

@ -7,74 +7,6 @@ from textual.widgets import Input, Button, Label, Static, ContentSwitcher
from textual.app import ComposeResult, RenderResult from textual.app import ComposeResult, RenderResult
from telethon import TelegramClient, events, utils from telethon import TelegramClient, events, utils
import datetime import datetime
import unicodedata
import re
import emoji
import os
import tempfile
from PIL import Image
#import pywhatkit as kit
from textual import log
from warnings import deprecated
def remove_emoji(text: str) -> str:
"""Удаляет эмодзи из текста"""
if not text:
return ""
return emoji.replace_emoji(text, '')
def normalize_text(text: str) -> str:
"""Нормализует текст для корректного отображения"""
if not text:
return ""
# Удаляем эмодзи
text = remove_emoji(text)
# Удаляем все управляющие символы
text = ''\
.join(char for char in text if unicodedata.category(char)[0] != 'C')
# Нормализуем Unicode
text = unicodedata.normalize('NFKC', text)
# Заменяем специальные символы на их ASCII-эквиваленты
text = text.replace('', '-').replace('', '-')
# Удаляем все непечатаемые символы
text = ''.join(char for char in text if char.isprintable())
return text
def safe_ascii(text: str) -> str:
"""Преобразует текст в безопасный ASCII-формат"""
if not text:
return ""
# Удаляем эмодзи
text = remove_emoji(text)
# Оставляем только ASCII символы и пробелы
return ''.join(char for char in text if ord(char) < 128 or char.isspace())
@deprecated("Не работает на моём компьютере.")
def convert_image_to_ascii(image_path: str, width: int = 50) -> str:
"""Конвертирует изображение в ASCII-арт"""
try:
# Открываем изображение
img = Image.open(image_path)
# Конвертируем в RGB если нужно
if img.mode != 'RGB':
img = img.convert('RGB')
# Изменяем размер, сохраняя пропорции
aspect_ratio = img.height / img.width
height = int(width * aspect_ratio * 0.5) # * 0.5 потому что символы выше чем шире
img = img.resize((width, height))
# Конвертируем в ASCII
ascii_str = kit.image_to_ascii_art(image_path, output_file=None)
# Очищаем временный файл
os.remove(image_path)
return ascii_str
except Exception as e:
log(f"Ошибка конвертации изображения: {e}")
return "Ошибка загрузки изображения"
class Chat(Widget): class Chat(Widget):
"""Класс виджета чата для панели чатов""" """Класс виджета чата для панели чатов"""
@ -82,8 +14,6 @@ class Chat(Widget):
username: Reactive[str] = Reactive(" ", recompose=True) username: Reactive[str] = Reactive(" ", recompose=True)
msg: Reactive[str] = Reactive(" ", recompose=True) msg: Reactive[str] = Reactive(" ", recompose=True)
peer_id: Reactive[int] = Reactive(0) peer_id: Reactive[int] = Reactive(0)
is_selected: Reactive[bool] = Reactive(False)
is_focused: Reactive[bool] = Reactive(False)
def __init__( def __init__(
self, self,
@ -103,51 +33,29 @@ class Chat(Widget):
self.switcher = self.screen.query_one(Horizontal).query_one("#dialog_switcher", ContentSwitcher) self.switcher = self.screen.query_one(Horizontal).query_one("#dialog_switcher", ContentSwitcher)
def on_click(self) -> None: def on_click(self) -> None:
# Снимаем выделение со всех чатов
for chat in self.screen.query(Chat):
chat.is_selected = False
chat.is_focused = False
# Выделяем текущий чат
self.is_selected = True
self.is_focused = True
dialog_id = f"dialog-{str(self.peer_id)}" dialog_id = f"dialog-{str(self.peer_id)}"
print("click 1")
try: try:
self.switcher.mount(Dialog( self.switcher.mount(Dialog(
telegram_client=self.app.telegram_client, telegram_client=self.app.telegram_client,
chat_id=self.peer_id, chat_id=self.peer_id,
id=dialog_id id=dialog_id
)) ))
print("click 1.1")
except: except:
pass print("click 1.2")
print("click 2")
self.switcher.current = dialog_id self.switcher.current = dialog_id
self.switcher.recompose() self.switcher.recompose()
print("click 3")
def compose(self) -> ComposeResult: def compose(self) -> ComposeResult:
with Horizontal(classes="chat-item"): with Horizontal():
""" yield Label(f"┌───┐\n{self.username[:1]:1}\n└───┘")
# Используем ASCII-символы для рамки
yield Label(
f"┌───┐\n{normalize_text(
self.username[:1].upper()
):1} \n"
)
with Vertical():
yield Label(normalize_text(self.username), id="name")
yield Label(normalize_text(self.msg), id="last_msg")
"""
yield Label(f"┌───┐\n{self.username[:1].upper():1}\n└───┘")
with Vertical(): with Vertical():
yield Label(self.username, id="name") yield Label(self.username, id="name")
yield Label(self.msg, id="last_msg") yield Label(self.msg, id="last_msg")
def on_mouse_enter(self) -> None:
self.add_class("hover")
def on_mouse_leave(self) -> None:
self.remove_class("hover")
class Dialog(Widget): class Dialog(Widget):
"""Класс окна диалога""" """Класс окна диалога"""
@ -163,16 +71,12 @@ class Dialog(Widget):
self.telegram_client = telegram_client self.telegram_client = telegram_client
self.chat_id = chat_id self.chat_id = chat_id
self.is_msg_update_blocked = False self.is_msg_update_blocked = False
self.messages_loaded = 0
self.is_loading = False
async def on_mount(self) -> None: async def on_mount(self) -> None:
self.limit = 50 # Увеличиваем начальное количество сообщений self.limit = 10
self.messages_loaded = self.limit
self.msg_input = self.query_one("#msg_input") self.msg_input = self.query_one("#msg_input")
self.dialog = self.query_one(Vertical).query_one("#dialog") self.dialog = self.query_one(Vertical).query_one("#dialog")
self.load_more_btn = self.query_one("#load_more")
self.me = await self.telegram_client.get_me() self.me = await self.telegram_client.get_me()
@ -204,7 +108,7 @@ class Dialog(Widget):
self.dialog.query(Message).last().remove() self.dialog.query(Message).last().remove()
async def update_dialog(self, event = None) -> None: async def update_dialog(self, event = None) -> None:
log("Запрос обновления сообщений") print("Запрос обновления сообщений")
if not self.is_msg_update_blocked: if not self.is_msg_update_blocked:
self.is_msg_update_blocked = True self.is_msg_update_blocked = True
@ -212,7 +116,7 @@ class Dialog(Widget):
messages = await self.telegram_client.get_messages( messages = await self.telegram_client.get_messages(
entity=self.chat_id, limit=self.limit entity=self.chat_id, limit=self.limit
) )
log("Получены сообщения") print("Получены сообщения")
limit = len(messages) limit = len(messages)
self.mount_messages(limit) self.mount_messages(limit)
@ -220,76 +124,36 @@ class Dialog(Widget):
for i in range(limit): for i in range(limit):
msg = self.dialog.query_one(f"#msg-{i + 1}") msg = self.dialog.query_one(f"#msg-{i + 1}")
msg.message = "" msg.message = ""
# Обрабатываем изображения
if messages[i].media:
try:
# Скачиваем изображение
image_data = await self.telegram_client.download_media(
messages[i].media,
bytes
)
if image_data:
await msg.set_image(image_data)
except Exception as e:
log(f"Ошибка загрузки изображения: {e}")
# Обрабатываем текст
if str(messages[i].message): if str(messages[i].message):
msg.message = normalize_text(str(messages[i].message)) msg.message = str(messages[i].message)
#TODO: завести это:
try: try:
is_me = messages[i].from_id.user_id == self.me.id is_me = messages[i].from_id.user_id == self.me.id
except: except:
is_me = False is_me = False
msg.is_me = is_me msg.is_me = is_me
msg.username = normalize_text(utils.get_display_name(messages[i].sender)) msg.username = utils.get_display_name(messages[i].sender)
msg.send_time = messages[i]\ msg.send_time = messages[i]\
.date\ .date\
.astimezone(datetime.timezone.utc)\ .astimezone(datetime.timezone.utc)\
.strftime("%H:%M") .strftime("%H:%M")
self.is_msg_update_blocked = False self.is_msg_update_blocked = False
log("Сообщения обновлены") print("Сообщения обновлены")
else: else:
log("Обновление сообщений невозможно: уже выполняется") print("Обновление сообщений невозможно: уже выполняется")
async def load_more_messages(self) -> None:
if not self.is_loading:
self.is_loading = True
self.load_more_btn.disabled = True
self.load_more_btn.label = "Загрузка..."
try:
messages = await self.telegram_client.get_messages(
entity=self.chat_id,
limit=self.limit,
offset_id=self.messages_loaded
)
if messages:
self.messages_loaded += len(messages)
self.mount_messages(self.messages_loaded)
await self.update_dialog()
finally:
self.is_loading = False
self.load_more_btn.disabled = False
self.load_more_btn.label = "Загрузить еще"
def compose(self) -> ComposeResult: def compose(self) -> ComposeResult:
with Vertical(): with Vertical():
yield Button("Загрузить еще", id="load_more", variant="default")
yield VerticalScroll(id="dialog") yield VerticalScroll(id="dialog")
with Horizontal(id="input_place"): with Horizontal(id="input_place"):
yield Input(placeholder="Сообщение", id="msg_input") yield Input(placeholder="Сообщение", id="msg_input")
yield Button(label=">", id="send", variant="primary") yield Button(label="", id="send", variant="primary")
async def on_button_pressed(self, event) -> None: async def on_button_pressed(self, event = None) -> None:
if event.button.id == "load_more": await self.send_message()
await self.load_more_messages()
else:
await self.send_message()
async def on_input_submitted(self, event = None) -> None: async def on_input_submitted(self, event = None) -> None:
await self.send_message() await self.send_message()
@ -298,7 +162,7 @@ class Dialog(Widget):
try: try:
await self.telegram_client.send_message( await self.telegram_client.send_message(
self.chat_id, self.chat_id,
normalize_text(str(self.msg_input.value)) str(self.msg_input.value)
) )
except ValueError: except ValueError:
self.app.notify("Ошибка отправки") self.app.notify("Ошибка отправки")
@ -326,8 +190,8 @@ class Message(Widget):
pass pass
def compose(self) -> ComposeResult: def compose(self) -> ComposeResult:
static = Static(normalize_text(self.message)) static = Static(self.message)
static.border_title = normalize_text(self.username) * (not self.is_me) static.border_title = self.username * (not self.is_me)
static.border_subtitle = self.send_time static.border_subtitle = self.send_time
with Container(): with Container():
@ -337,6 +201,3 @@ class Message(Widget):
self.classes = "is_me_true" self.classes = "is_me_true"
else: else:
self.classes = "is_me_false" self.classes = "is_me_false"
if __name__ == "__main__":
raise Exception("Запущен не тот файл. Запустите main.py.")