Compare commits

...

2 Commits

8 changed files with 226 additions and 339 deletions

View File

@ -2,6 +2,14 @@
# Их использование крайне нежелательно! # Их использование крайне нежелательно!
# Получите свои API-ключи приложения на https://my.telegram.org/apps и вставьте их ниже. # Получите свои API-ключи приложения на https://my.telegram.org/apps и вставьте их ниже.
# Спасибо за понимание! # Спасибо за понимание!
#
# In EN: WARNING: it's official TG API-keys, please, don't use them, get yours via link above. Thanks!
API_ID=2040 API_ID=2040
API_HASH=b18441a1ff607e10a989891a5462e627 API_HASH=b18441a1ff607e10a989891a5462e627
# Конфиг
CURRENT_USER = "user"
DO_NOTIFY = False
UTC_OFFSET = 0
LANGUAGE = "en"

View File

@ -1,8 +1,8 @@
#!/usr/bin/python #!/usr/bin/python
"""Файл инициализации приложения""" """Файл инициализации приложения"""
from src.app import TelegramTUI from src.app import Talc
if __name__ == "__main__": if __name__ == "__main__":
tg = TelegramTUI() talc = Talc()
tg.run() talc.run()

View File

@ -1,6 +1,3 @@
textual textual
telethon telethon
python-dotenv python-dotenv
emoji
Pillow
pywhatkit

View File

@ -1,44 +1,33 @@
"""Главный файл приложения""" """Главный файл приложения"""
import os from os import getenv
import sys
from dotenv import load_dotenv from dotenv import load_dotenv
from telethon import TelegramClient, events from telethon import TelegramClient
from textual.app import App from textual.app import App
from rich.console import Console
from src.screens import AuthScreen, ChatScreen from src.screens import AuthScreen, ChatScreen
import src.locales
# Настройка консоли для корректной работы с Unicode
"""
console = Console(force_terminal=True, color_system="auto")
sys.stdout = console
"""
# спойлер: не помогло
load_dotenv() load_dotenv()
API_ID = getenv("API_ID")
API_HASH = getenv("API_HASH")
api_id = os.getenv("API_ID") if not API_ID or not API_HASH:
api_hash = os.getenv("API_HASH")
if not api_id or not api_hash:
raise ValueError( raise ValueError(
"API_ID и API_HASH не найдены в .env файле. " "API_ID и API_HASH не найдены в .env файле. "
"Пожалуйста, скопируйте .env.example в .env и заполните свои ключи." "Пожалуйста, скопируйте .env.example в .env и заполните свои ключи."
) )
api_id = int(api_id) API_ID = int(API_ID)
class TelegramTUI(App): #locale = locales.
class Talc(App):
"""Класс приложения""" """Класс приложения"""
CSS_PATH = "style.tcss" CSS_PATH = "style.tcss"
TITLE = "Telegram TUI"
def __init__(self):
super().__init__()
async def on_mount(self) -> None: async def on_mount(self) -> None:
self.telegram_client = TelegramClient("user", api_id, api_hash) self.telegram_client = TelegramClient(getenv("CURRENT_USER"), API_ID, API_HASH)
await self.telegram_client.connect() await self.telegram_client.connect()
chat_screen = ChatScreen(telegram_client=self.telegram_client) chat_screen = ChatScreen(telegram_client=self.telegram_client)
@ -51,9 +40,8 @@ class TelegramTUI(App):
else: else:
self.push_screen("chats") self.push_screen("chats")
self.scroll_sensitivity_y = 1.0
async def on_exit_app(self): async def on_exit_app(self):
await self.telegram_client.disconnect() await self.telegram_client.disconnect()
return super()._on_exit_app() return super()._on_exit_app()
if __name__ == "__main__":
raise Exception("Запущен не тот файл. Запустите main.py.")

2
src/locales.py Normal file
View File

@ -0,0 +1,2 @@
ru = {"greeting_auth": "Добро пожаловать в Тальк", "phone_number": "Номер телефона", "code": "Код", "password": "Пароль", "you": "Вы", "mention": "Вас упомянули"}
en = {"greeting_auth": "Welcome to Talc", "phone_number": "Phone number", "code": "Code", "password": "Password", "you": "You", "mention": "You got mentioned"}

View File

@ -3,12 +3,12 @@
from textual.screen import Screen from textual.screen import Screen
from textual.widgets import Label, Input, Footer, Static, ContentSwitcher from textual.widgets import Label, Input, Footer, Static, ContentSwitcher
from textual.containers import Vertical, Horizontal, VerticalScroll from textual.containers import Vertical, Horizontal, VerticalScroll
from textual.events import Key from textual.app import ComposeResult
from telethon.errors import SessionPasswordNeededError from telethon.errors import SessionPasswordNeededError
from telethon import TelegramClient, events from telethon import TelegramClient, events
from src.widgets import Dialog, Chat, normalize_text from src.widgets import Chat
from textual import log from os import system, getenv
from textual.keys import Keys, _character_to_key from telethon.utils import get_display_name
class AuthScreen(Screen): class AuthScreen(Screen):
"""Класс экрана логина в аккаунт""" """Класс экрана логина в аккаунт"""
@ -19,57 +19,51 @@ class AuthScreen(Screen):
id = None, id = None,
classes = None, classes = None,
telegram_client: TelegramClient | None = None telegram_client: TelegramClient | None = None
): ) -> None:
super().__init__(name, id, classes) super().__init__(name, id, classes)
self.client = telegram_client self.client = telegram_client
def on_mount(self): def on_mount(self) -> None:
self.ac = self.query_one("#auth_container") self.ac = self.query_one("#auth_container")
def compose(self): def compose(self) -> ComposeResult:
with Vertical(id="auth_container"): with Vertical(id="auth_container"):
yield Label(normalize_text("Добро пожаловать в Telegram TUI")) yield Label("Добро пожаловать в Talc")
yield Input(placeholder=normalize_text("Номер телефона"), id="phone") yield Input(placeholder="Номер телефона", id="phone")
yield Input(placeholder=normalize_text("Код"), id="code", disabled=True) yield Input(placeholder="Код", id="code", disabled=True)
yield Input( yield Input(
placeholder=normalize_text("Пароль"), placeholder="Пароль",
id="password", id="password",
password=True, password=True,
disabled=True disabled=True
) )
async def on_input_submitted(self, event: Input.Submitted) -> None: async def on_input_submitted(self, event: Input.Submitted) -> None:
if event.input.id == "phone": match event.input.id:
self.phone = normalize_text(event.value) case "phone":
self.ac.query_one("#phone").disabled = True self.phone = event.value
self.ac.query_one("#code").disabled = False self.ac.query_one("#phone").disabled = True
await self.client.send_code_request(phone=self.phone) self.ac.query_one("#code").disabled = False
elif event.input.id == "code": await self.client.send_code_request(phone=self.phone)
try: case "code":
self.code = normalize_text(event.value) try:
self.ac.query_one("#code").disabled = True self.code = event.value
await self.client.sign_in(phone=self.phone, code=self.code) self.ac.query_one("#code").disabled = True
await self.client.sign_in(phone=self.phone, code=self.code)
self.app.pop_screen()
self.app.push_screen("chats")
except SessionPasswordNeededError:
self.ac.query_one("#code").disabled = True
self.ac.query_one("#password").disabled = False
case "password":
self.password = event.value
await self.client.sign_in(password=self.password)
await self.client.start()
self.app.pop_screen() self.app.pop_screen()
self.app.push_screen("chats") self.app.push_screen("chats")
except SessionPasswordNeededError:
self.ac.query_one("#code").disabled = True
self.ac.query_one("#password").disabled = False
elif event.input.id == "password":
self.password = normalize_text(event.value)
await self.client.sign_in(password=self.password)
await self.client.start()
self.app.pop_screen()
self.app.push_screen("chats")
class ChatScreen(Screen): class ChatScreen(Screen):
"""Класс экрана чатов, он же основной экран приложения""" """Класс экрана чатов, он же основной экран приложения"""
BINDINGS = [
(Keys.Tab, "log(\"Нажат таб\")", "Переключение фокуса"),
(Keys.Enter, "log(\"Нажат энтер\")", "Открыть"),
(Keys.Escape, "log(\"Нажат эскейп\")", "Назад"),
(_character_to_key("/"), "log(\"Нажат слэш\")", "Поиск")
]
def __init__( def __init__(
self, self,
@ -77,25 +71,23 @@ class ChatScreen(Screen):
id = None, id = None,
classes = None, classes = None,
telegram_client: TelegramClient | None = None telegram_client: TelegramClient | None = None
): ) -> None:
super().__init__(name, id, classes) super().__init__(name, id, classes)
self.telegram_client = telegram_client self.telegram_client = telegram_client
self.search_query = "" self.DO_NOTIFY = getenv("DO_NOTIFY")
self.selected_chat_index = 0
self.chats = []
self.focused_element = "search" # search, chat_list, dialog
async def on_mount(self): async def on_mount(self) -> None:
self.limit = 100 self.limit = 100
#Получение ID пользователя (себя)
self.me_id = await self.telegram_client.get_peer_id("me")
# Получение объекта контейнера чатов
self.chat_container = self\ self.chat_container = self\
.query_one("#main_container")\ .query_one("#main_container")\
.query_one("#chats")\ .query_one("#chats")\
.query_one("#chat_container") .query_one("#chat_container")
self.search_input = self.query_one("#search_input")
log("Первоначальная загрузка виджетов чатов...") print("Первоначальная загрузка виджетов чатов...")
self.mount_chats( self.mount_chats(
len( len(
await self.telegram_client.get_dialogs( await self.telegram_client.get_dialogs(
@ -103,37 +95,44 @@ class ChatScreen(Screen):
) )
) )
) )
log("Первоначальная загрузка виджетов чата завершена") print("Первоначальная загрузка виджетов чата завершена")
self.is_chat_update_blocked = False self.is_chat_update_blocked = False
await self.update_chat_list() await self.update_chat_list()
print("Первоначальная загрузка чатов завершена")
log("Первоначальная загрузка чатов завершена") # Автообновление чатов при следующих событиях
for event in ( for event in (
events.NewMessage, events.NewMessage,
events.MessageDeleted, events.MessageDeleted,
events.MessageEdited events.MessageEdited
): ):
self.telegram_client.on(event())(self.update_chat_list) self.telegram_client.on(event())(self.update_chat_list)
self.telegram_client.on(events.NewMessage)(self.notify_send)
def mount_chats(self, limit: int): def mount_chats(self, limit: int) -> None:
log("Загрузка виджетов чатов...") """Функция маунта чатов"""
print("Загрузка виджетов чатов...")
# Счёт текущего количества примонтированных чатов
chats_amount = len(self.chat_container.query(Chat)) chats_amount = len(self.chat_container.query(Chat))
if limit > chats_amount: if limit > chats_amount:
# Маунт недостоющих, если чатов меньше, чем нужно
for i in range(limit - chats_amount): for i in range(limit - chats_amount):
chat = Chat(id=f"chat-{i + chats_amount + 1}") chat = Chat(id=f"chat-{i + chats_amount + 1}")
self.chat_container.mount(chat) self.chat_container.mount(chat)
elif limit < chats_amount: elif limit < chats_amount:
# Удаление лишних, если чатов больше, чем нужно
for i in range(chats_amount - limit): for i in range(chats_amount - limit):
self.chat_container.query(Chat).last().remove() self.chat_container.query(Chat).last().remove()
# Ничего, если их ровно столько же
log("Виджеты чатов загружены") print("Виджеты чатов загружены")
async def update_chat_list(self, event = None): async def update_chat_list(self, event = None) -> None:
log("Запрос обновления чатов") """Функция обновления чатов (и уведомления)"""
print("Запрос обновления чатов")
if not self.is_chat_update_blocked: if not self.is_chat_update_blocked:
self.is_chat_update_blocked = True self.is_chat_update_blocked = True
@ -141,96 +140,62 @@ class ChatScreen(Screen):
dialogs = await self.telegram_client.get_dialogs( dialogs = await self.telegram_client.get_dialogs(
limit=self.limit, archived=False limit=self.limit, archived=False
) )
log("Получены диалоги") print("Получены диалоги")
# Фильтруем диалоги по поисковому запросу
if self.search_query:
dialogs = [
d for d in dialogs
if self.search_query.lower() in \
normalize_text(d.name).lower()
]
# Маунт виджетов чатов в панели чатов по лимиту
limit = len(dialogs) limit = len(dialogs)
self.mount_chats(limit) self.mount_chats(limit)
# Изменение надписей в виджетах чатов
for i in range(limit): for i in range(limit):
chat = self.chat_container.query_one(f"#chat-{i + 1}") chat = self.chat_container.query_one(f"#chat-{i + 1}")
chat.username = normalize_text(str(dialogs[i].name))
chat.msg = normalize_text(str(dialogs[i].message.message)) chat.peername = str(dialogs[i].name)
chat.is_group = dialogs[i].is_group
chat.peer_id = dialogs[i].id chat.peer_id = dialogs[i].id
chat.is_selected = (i == self.selected_chat_index)
chat.is_focused = (self.focused_element == "chat_list" and \ try:
i == self.selected_chat_index) is_my_msg = \
dialogs[i].message.from_id.user_id == self.me_id
except:
is_my_msg = dialogs[i].id == self.me_id
if dialogs[i].is_group and is_my_msg:
chat.username = "Вы"
chat.msg = str(dialogs[i].message.message)
elif dialogs[i].is_group:
chat.username = str(
get_display_name(dialogs[i].message.sender)
)
chat.msg = str(dialogs[i].message.message)
elif is_my_msg:
chat.msg = "Вы: " * is_my_msg + str(
dialogs[i].message.message
)
else:
chat.msg = str(dialogs[i].message.message)
self.is_chat_update_blocked = False self.is_chat_update_blocked = False
log("Чаты обновлены") print("Чаты обновлены")
else: else:
log("Обновление чатов невозможно: уже выполняется") print("Обновление чатов невозможно: уже выполняется")
def on_input_changed(self, event: Input.Changed) -> None: def notify_send(self, event) -> None:
if event.input.id == "search_input": if not event:
self.search_query = normalize_text(event.value) return None
self.selected_chat_index = 0 if bool(self.DO_NOTIFY) and event.mentioned and not self.app.focused:
self.update_chat_list() system(f"notify-send \"Вас упомянули\" Talc")
def on_key(self, event: Key) -> None: def compose(self) -> ComposeResult:
if event.key == Keys.Tab: yield Footer() # Нижняя панель с подсказками
# Переключаем фокус между элементами with Horizontal(id="main_container"): # Основной контейнер
if self.focused_element == "search": with Horizontal(id="chats"):
self.focused_element = "chat_list"
self.search_input.blur()
self.update_chat_list()
elif self.focused_element == "chat_list":
self.focused_element = "search"
self.search_input.focus()
self.update_chat_list()
return
if self.focused_element == "search":
return
chats = self.chat_container.query(Chat)
if not chats:
return
match event.key:
case Keys.Up:
self.selected_chat_index = max(0, self.selected_chat_index - 1)
for i, chat in enumerate(chats):
chat.is_selected = (i == self.selected_chat_index)
chat.is_focused = (i == self.selected_chat_index)
# Прокручиваем к выбранному чату
selected_chat = chats[self.selected_chat_index]
self.chat_container.scroll_to(selected_chat, animate=False)
case Keys.Down:
self.selected_chat_index = min(len(chats) - 1, self.selected_chat_index + 1)
for i, chat in enumerate(chats):
chat.is_selected = (i == self.selected_chat_index)
chat.is_focused = (i == self.selected_chat_index)
# Прокручиваем к выбранному чату
selected_chat = chats[self.selected_chat_index]
self.chat_container.scroll_to(selected_chat, animate=False)
case Keys.Enter:
chats[self.selected_chat_index].on_click()
case Keys.Escape:
# Возвращаемся к списку чатов
self.app.pop_screen()
self.app.push_screen("chats")
case "/": #Не работает: нужен кейкод слэша
# Фокус на поиск
self.focused_element = "search"
self.search_input.focus()
self.update_chat_list()
def compose(self):
yield Footer()
with Horizontal(id="main_container"):
with Vertical(id="chats"):
yield Input(placeholder=normalize_text("Поиск чатов..."), id="search_input")
yield VerticalScroll(id="chat_container") yield VerticalScroll(id="chat_container")
yield ContentSwitcher(id="dialog_switcher") #TODO: сделать кнопку, чтобы прогрузить больше чатов,
#yield Dialog(telegram_client=self.telegram_client) # или ленивую прокрутку
with ContentSwitcher(id="dialog_switcher"):
if __name__ == "__main__": # ↑ Внутри него как раз крутятся диалоги
raise Exception("Запущен не тот файл. Запустите main.py.") yield Label(
"Нажмите на чат в панели слева, чтобы начать общаться",
id="begin_talk_label"
) #TODO: не показывается надпись, надо будет исправить

View File

@ -10,6 +10,10 @@ Chat {
height: 3; height: 3;
} }
Chat Horizontal Vertical Label{
height: 1;
}
Rule { Rule {
color: #FFFFFF; color: #FFFFFF;
} }
@ -42,12 +46,13 @@ Message Container {
align: right middle; align: right middle;
} }
.is_me_true Static { .is_me_true Container Label {
border: solid $primary; border: solid $primary;
width: auto; width: auto;
height: auto; height: auto;
text-align: right; text-align: left;
min-width: 11; min-width: 11;
max-width: 100%;
} }
.is_me_false Container { .is_me_false Container {
@ -55,10 +60,24 @@ Message Container {
align: left middle; align: left middle;
} }
.is_me_false Static { .is_me_false Container Label {
border: solid $foreground; border: solid $foreground;
width: auto; width: auto;
height: auto; height: auto;
text-align: left; text-align: left;
min-width: 11; min-width: 11;
max-width: 100%;
}
ContentSwitcher {
width: 100%;
height: 100%;
}
.begin_talk_label {
width: 100%;
height: 100%;
text-align: center;
content-align: center middle;
color: $panel;
} }

View File

@ -5,85 +5,19 @@ from textual.widget import Widget
from textual.reactive import Reactive from textual.reactive import Reactive
from textual.widgets import Input, Button, Label, Static, ContentSwitcher from textual.widgets import Input, Button, Label, Static, ContentSwitcher
from textual.app import ComposeResult, RenderResult from textual.app import ComposeResult, RenderResult
from telethon import TelegramClient, events, utils from textual.content import Content
from telethon import TelegramClient, events, utils, types
import datetime import datetime
import unicodedata from os import getenv
import re
import emoji
import os
import tempfile
from PIL import Image
#import pywhatkit as kit
from textual import log
from warnings import deprecated
def remove_emoji(text: str) -> str:
"""Удаляет эмодзи из текста"""
if not text:
return ""
return emoji.replace_emoji(text, '')
def normalize_text(text: str) -> str:
"""Нормализует текст для корректного отображения"""
if not text:
return ""
# Удаляем эмодзи
text = remove_emoji(text)
# Удаляем все управляющие символы
text = ''\
.join(char for char in text if unicodedata.category(char)[0] != 'C')
# Нормализуем Unicode
text = unicodedata.normalize('NFKC', text)
# Заменяем специальные символы на их ASCII-эквиваленты
text = text.replace('', '-').replace('', '-')
# Удаляем все непечатаемые символы
text = ''.join(char for char in text if char.isprintable())
return text
def safe_ascii(text: str) -> str:
"""Преобразует текст в безопасный ASCII-формат"""
if not text:
return ""
# Удаляем эмодзи
text = remove_emoji(text)
# Оставляем только ASCII символы и пробелы
return ''.join(char for char in text if ord(char) < 128 or char.isspace())
@deprecated("Не работает на моём компьютере.")
def convert_image_to_ascii(image_path: str, width: int = 50) -> str:
"""Конвертирует изображение в ASCII-арт"""
try:
# Открываем изображение
img = Image.open(image_path)
# Конвертируем в RGB если нужно
if img.mode != 'RGB':
img = img.convert('RGB')
# Изменяем размер, сохраняя пропорции
aspect_ratio = img.height / img.width
height = int(width * aspect_ratio * 0.5) # * 0.5 потому что символы выше чем шире
img = img.resize((width, height))
# Конвертируем в ASCII
ascii_str = kit.image_to_ascii_art(image_path, output_file=None)
# Очищаем временный файл
os.remove(image_path)
return ascii_str
except Exception as e:
log(f"Ошибка конвертации изображения: {e}")
return "Ошибка загрузки изображения"
class Chat(Widget): class Chat(Widget):
"""Класс виджета чата для панели чатов""" """Класс виджета чата для панели чатов"""
username: Reactive[str] = Reactive(" ", recompose=True) username: Reactive[str] = Reactive(" ", recompose=True)
peername: Reactive[str] = Reactive(" ", recompose=True)
msg: Reactive[str] = Reactive(" ", recompose=True) msg: Reactive[str] = Reactive(" ", recompose=True)
is_group: Reactive[bool] = Reactive(False, recompose=True)
peer_id: Reactive[int] = Reactive(0) peer_id: Reactive[int] = Reactive(0)
is_selected: Reactive[bool] = Reactive(False)
is_focused: Reactive[bool] = Reactive(False)
def __init__( def __init__(
self, self,
@ -100,19 +34,14 @@ class Chat(Widget):
) )
def on_mount(self) -> None: def on_mount(self) -> None:
self.switcher = self.screen.query_one(Horizontal).query_one("#dialog_switcher", ContentSwitcher) self.switcher = self.screen.query_one(Horizontal)\
.query_one("#dialog_switcher", ContentSwitcher)
def on_click(self) -> None: def on_click(self) -> None:
# Снимаем выделение со всех чатов # Получение ID диалога и создание DOM-ID на его основе
for chat in self.screen.query(Chat):
chat.is_selected = False
chat.is_focused = False
# Выделяем текущий чат
self.is_selected = True
self.is_focused = True
dialog_id = f"dialog-{str(self.peer_id)}" dialog_id = f"dialog-{str(self.peer_id)}"
# Маунт диалога
try: try:
self.switcher.mount(Dialog( self.switcher.mount(Dialog(
telegram_client=self.app.telegram_client, telegram_client=self.app.telegram_client,
@ -120,34 +49,21 @@ class Chat(Widget):
id=dialog_id id=dialog_id
)) ))
except: except:
# Диалог уже есть: ничего не делаем
pass pass
self.switcher.current = dialog_id self.switcher.current = dialog_id
self.switcher.recompose() self.switcher.recompose()
def compose(self) -> ComposeResult: def compose(self) -> ComposeResult:
with Horizontal(classes="chat-item"): with Horizontal():
""" yield Label(f"┌───┐\n{self.peername[:1]:1}\n└───┘")
# Используем ASCII-символы для рамки
yield Label(
f"┌───┐\n{normalize_text(
self.username[:1].upper()
):1} \n"
)
with Vertical(): with Vertical():
yield Label(normalize_text(self.username), id="name") yield Label(self.peername, id="peername")
yield Label(normalize_text(self.msg), id="last_msg") if self.is_group:
""" yield Label(self.username, id="name")
yield Label(f"┌───┐\n{self.username[:1].upper():1}\n└───┘")
with Vertical():
yield Label(self.username, id="name")
yield Label(self.msg, id="last_msg") yield Label(self.msg, id="last_msg")
def on_mouse_enter(self) -> None:
self.add_class("hover")
def on_mouse_leave(self) -> None:
self.remove_class("hover")
class Dialog(Widget): class Dialog(Widget):
"""Класс окна диалога""" """Класс окна диалога"""
@ -158,25 +74,23 @@ class Dialog(Widget):
disabled=None, disabled=None,
telegram_client: TelegramClient | None = None, telegram_client: TelegramClient | None = None,
chat_id = None chat_id = None
) -> None: ) -> None:
super().__init__(id=id, classes=classes, disabled=disabled) super().__init__(id=id, classes=classes, disabled=disabled)
self.telegram_client = telegram_client self.telegram_client = telegram_client
self.chat_id = chat_id self.chat_id = chat_id
self.is_msg_update_blocked = False self.is_msg_update_blocked = False
self.messages_loaded = 0 self.timezone = datetime.timezone(
self.is_loading = False datetime.timedelta(hours=int(getenv("UTC_OFFSET")))
)
async def on_mount(self) -> None: async def on_mount(self) -> None:
self.limit = 50 # Увеличиваем начальное количество сообщений self.limit = 10
self.messages_loaded = self.limit
self.msg_input = self.query_one("#msg_input") self.msg_input = self.query_one("#msg_input")
self.dialog = self.query_one(Vertical).query_one("#dialog") self.dialog = self.query_one(Vertical).query_one("#dialog")
self.load_more_btn = self.query_one("#load_more")
self.me = await self.telegram_client.get_me() self.me = await self.telegram_client.get_me()
self.dialog.scroll_end(animate=False)
await self.update_dialog() await self.update_dialog()
for event in ( for event in (
@ -188,6 +102,8 @@ class Dialog(Widget):
event(chats=(self.chat_id)) event(chats=(self.chat_id))
)(self.update_dialog) )(self.update_dialog)
self.dialog.scroll_down(animate=False, immediate=True)
def mount_messages(self, limit: int) -> None: def mount_messages(self, limit: int) -> None:
print("Загрузка виджетов сообщений...") print("Загрузка виджетов сообщений...")
@ -204,7 +120,7 @@ class Dialog(Widget):
self.dialog.query(Message).last().remove() self.dialog.query(Message).last().remove()
async def update_dialog(self, event = None) -> None: async def update_dialog(self, event = None) -> None:
log("Запрос обновления сообщений") print("Запрос обновления сообщений")
if not self.is_msg_update_blocked: if not self.is_msg_update_blocked:
self.is_msg_update_blocked = True self.is_msg_update_blocked = True
@ -212,31 +128,52 @@ class Dialog(Widget):
messages = await self.telegram_client.get_messages( messages = await self.telegram_client.get_messages(
entity=self.chat_id, limit=self.limit entity=self.chat_id, limit=self.limit
) )
log("Получены сообщения") print("Получены сообщения")
limit = len(messages) limit = len(messages)
self.mount_messages(limit) self.mount_messages(limit)
for i in range(limit): for i in range(limit):
msg = self.dialog.query_one(f"#msg-{i + 1}") msg = self.dialog.query_one(f"#msg-{i + 1}")
msg.message = ""
# Обрабатываем изображения
if messages[i].media:
try:
# Скачиваем изображение
image_data = await self.telegram_client.download_media(
messages[i].media,
bytes
)
if image_data:
await msg.set_image(image_data)
except Exception as e:
log(f"Ошибка загрузки изображения: {e}")
# Обрабатываем текст
if str(messages[i].message): if str(messages[i].message):
msg.message = normalize_text(str(messages[i].message)) message = Content(
"[Медиа] " * \
bool(messages[i].media) + \
str(messages[i].message)
)
else:
message = Content("[Медиа]" * bool(messages[i].media) + "")
entities = messages[i].entities
if entities:
for entity in entities:
match type(entity):
case types.MessageEntityBold:
message = message.stylize(
"bold",
entity.offset,
entity.offset + entity.length
)
case types.MessageEntityUnderline:
message = message.stylize(
"underline",
entity.offset,
entity.offset + entity.length
)
case types.MessageEntityItalic:
message = message.stylize(
"italic",
entity.offset,
entity.offset + entity.length
)
case types.MessageEntityStrike:
message = message.stylize(
"strike",
entity.offset,
entity.offset + entity.length
)
msg.message = message
try: try:
is_me = messages[i].from_id.user_id == self.me.id is_me = messages[i].from_id.user_id == self.me.id
@ -244,52 +181,26 @@ class Dialog(Widget):
is_me = False is_me = False
msg.is_me = is_me msg.is_me = is_me
msg.username = normalize_text(utils.get_display_name(messages[i].sender)) msg.username = utils.get_display_name(messages[i].sender)
msg.send_time = messages[i]\ msg.send_time = messages[i]\
.date\ .date\
.astimezone(datetime.timezone.utc)\ .astimezone(self.timezone)\
.strftime("%H:%M") .strftime("%H:%M")
self.is_msg_update_blocked = False self.is_msg_update_blocked = False
log("Сообщения обновлены") print("Сообщения обновлены")
else: else:
log("Обновление сообщений невозможно: уже выполняется") print("Обновление сообщений невозможно: уже выполняется")
async def load_more_messages(self) -> None:
if not self.is_loading:
self.is_loading = True
self.load_more_btn.disabled = True
self.load_more_btn.label = "Загрузка..."
try:
messages = await self.telegram_client.get_messages(
entity=self.chat_id,
limit=self.limit,
offset_id=self.messages_loaded
)
if messages:
self.messages_loaded += len(messages)
self.mount_messages(self.messages_loaded)
await self.update_dialog()
finally:
self.is_loading = False
self.load_more_btn.disabled = False
self.load_more_btn.label = "Загрузить еще"
def compose(self) -> ComposeResult: def compose(self) -> ComposeResult:
with Vertical(): with Vertical():
yield Button("Загрузить еще", id="load_more", variant="default")
yield VerticalScroll(id="dialog") yield VerticalScroll(id="dialog")
with Horizontal(id="input_place"): with Horizontal(id="input_place"):
yield Input(placeholder="Сообщение", id="msg_input") yield Input(placeholder="Сообщение", id="msg_input")
yield Button(label=">", id="send", variant="primary") yield Button(label="", id="send", variant="primary")
async def on_button_pressed(self, event) -> None: async def on_button_pressed(self, event = None) -> None:
if event.button.id == "load_more": await self.send_message()
await self.load_more_messages()
else:
await self.send_message()
async def on_input_submitted(self, event = None) -> None: async def on_input_submitted(self, event = None) -> None:
await self.send_message() await self.send_message()
@ -298,7 +209,7 @@ class Dialog(Widget):
try: try:
await self.telegram_client.send_message( await self.telegram_client.send_message(
self.chat_id, self.chat_id,
normalize_text(str(self.msg_input.value)) str(self.msg_input.value)
) )
except ValueError: except ValueError:
self.app.notify("Ошибка отправки") self.app.notify("Ошибка отправки")
@ -308,7 +219,7 @@ class Dialog(Widget):
class Message(Widget): class Message(Widget):
"""Класс виджета сообщений для окна диалога""" """Класс виджета сообщений для окна диалога"""
message: Reactive[str] = Reactive("", recompose=True) message: Reactive[Content] = Reactive("", recompose=True)
is_me: Reactive[bool] = Reactive(False, recompose=True) is_me: Reactive[bool] = Reactive(False, recompose=True)
username: Reactive[str] = Reactive("", recompose=True) username: Reactive[str] = Reactive("", recompose=True)
send_time: Reactive[str] = Reactive("", recompose=True) send_time: Reactive[str] = Reactive("", recompose=True)
@ -326,17 +237,14 @@ class Message(Widget):
pass pass
def compose(self) -> ComposeResult: def compose(self) -> ComposeResult:
static = Static(normalize_text(self.message)) label = Label(self.message, markup=False)
static.border_title = normalize_text(self.username) * (not self.is_me) label.border_title = self.username * (not self.is_me)
static.border_subtitle = self.send_time label.border_subtitle = self.send_time
with Container(): with Container():
yield static yield label
if self.is_me: if self.is_me:
self.classes = "is_me_true" self.classes = "is_me_true"
else: else:
self.classes = "is_me_false" self.classes = "is_me_false"
if __name__ == "__main__":
raise Exception("Запущен не тот файл. Запустите main.py.")