mirror of
https://github.com/avitoras/telegram-tui.git
synced 2025-07-27 19:26:10 +00:00
Compare commits
No commits in common. "83f4cac86b8258b9d079b26211d8d729197d24e5" and "e523fce7bd4d4647c194003072b8996faae90f5b" have entirely different histories.
83f4cac86b
...
e523fce7bd
@ -2,14 +2,6 @@
|
|||||||
# Их использование крайне нежелательно!
|
# Их использование крайне нежелательно!
|
||||||
# Получите свои API-ключи приложения на https://my.telegram.org/apps и вставьте их ниже.
|
# Получите свои API-ключи приложения на https://my.telegram.org/apps и вставьте их ниже.
|
||||||
# Спасибо за понимание!
|
# Спасибо за понимание!
|
||||||
#
|
|
||||||
# In EN: WARNING: it's official TG API-keys, please, don't use them, get yours via link above. Thanks!
|
|
||||||
|
|
||||||
API_ID=2040
|
API_ID=2040
|
||||||
API_HASH=b18441a1ff607e10a989891a5462e627
|
API_HASH=b18441a1ff607e10a989891a5462e627
|
||||||
|
|
||||||
# Конфиг
|
|
||||||
CURRENT_USER = "user"
|
|
||||||
DO_NOTIFY = False
|
|
||||||
UTC_OFFSET = 0
|
|
||||||
LANGUAGE = "en"
|
|
6
main.py
6
main.py
@ -1,8 +1,8 @@
|
|||||||
#!/usr/bin/python
|
#!/usr/bin/python
|
||||||
"""Файл инициализации приложения"""
|
"""Файл инициализации приложения"""
|
||||||
|
|
||||||
from src.app import Talc
|
from src.app import TelegramTUI
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
talc = Talc()
|
tg = TelegramTUI()
|
||||||
talc.run()
|
tg.run()
|
||||||
|
@ -1,3 +1,6 @@
|
|||||||
textual
|
textual
|
||||||
telethon
|
telethon
|
||||||
python-dotenv
|
python-dotenv
|
||||||
|
emoji
|
||||||
|
Pillow
|
||||||
|
pywhatkit
|
38
src/app.py
38
src/app.py
@ -1,33 +1,44 @@
|
|||||||
"""Главный файл приложения"""
|
"""Главный файл приложения"""
|
||||||
|
|
||||||
from os import getenv
|
import os
|
||||||
|
import sys
|
||||||
from dotenv import load_dotenv
|
from dotenv import load_dotenv
|
||||||
from telethon import TelegramClient
|
from telethon import TelegramClient, events
|
||||||
from textual.app import App
|
from textual.app import App
|
||||||
|
from rich.console import Console
|
||||||
from src.screens import AuthScreen, ChatScreen
|
from src.screens import AuthScreen, ChatScreen
|
||||||
import src.locales
|
|
||||||
|
# Настройка консоли для корректной работы с Unicode
|
||||||
|
"""
|
||||||
|
console = Console(force_terminal=True, color_system="auto")
|
||||||
|
sys.stdout = console
|
||||||
|
"""
|
||||||
|
# спойлер: не помогло
|
||||||
|
|
||||||
load_dotenv()
|
load_dotenv()
|
||||||
API_ID = getenv("API_ID")
|
|
||||||
API_HASH = getenv("API_HASH")
|
|
||||||
|
|
||||||
if not API_ID or not API_HASH:
|
api_id = os.getenv("API_ID")
|
||||||
|
api_hash = os.getenv("API_HASH")
|
||||||
|
|
||||||
|
if not api_id or not api_hash:
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
"API_ID и API_HASH не найдены в .env файле. "
|
"API_ID и API_HASH не найдены в .env файле. "
|
||||||
"Пожалуйста, скопируйте .env.example в .env и заполните свои ключи."
|
"Пожалуйста, скопируйте .env.example в .env и заполните свои ключи."
|
||||||
)
|
)
|
||||||
|
|
||||||
API_ID = int(API_ID)
|
api_id = int(api_id)
|
||||||
|
|
||||||
#locale = locales.
|
class TelegramTUI(App):
|
||||||
|
|
||||||
class Talc(App):
|
|
||||||
"""Класс приложения"""
|
"""Класс приложения"""
|
||||||
|
|
||||||
CSS_PATH = "style.tcss"
|
CSS_PATH = "style.tcss"
|
||||||
|
TITLE = "Telegram TUI"
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__()
|
||||||
|
|
||||||
async def on_mount(self) -> None:
|
async def on_mount(self) -> None:
|
||||||
self.telegram_client = TelegramClient(getenv("CURRENT_USER"), API_ID, API_HASH)
|
self.telegram_client = TelegramClient("user", api_id, api_hash)
|
||||||
await self.telegram_client.connect()
|
await self.telegram_client.connect()
|
||||||
|
|
||||||
chat_screen = ChatScreen(telegram_client=self.telegram_client)
|
chat_screen = ChatScreen(telegram_client=self.telegram_client)
|
||||||
@ -40,8 +51,9 @@ class Talc(App):
|
|||||||
else:
|
else:
|
||||||
self.push_screen("chats")
|
self.push_screen("chats")
|
||||||
|
|
||||||
self.scroll_sensitivity_y = 1.0
|
|
||||||
|
|
||||||
async def on_exit_app(self):
|
async def on_exit_app(self):
|
||||||
await self.telegram_client.disconnect()
|
await self.telegram_client.disconnect()
|
||||||
return super()._on_exit_app()
|
return super()._on_exit_app()
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
raise Exception("Запущен не тот файл. Запустите main.py.")
|
||||||
|
@ -1,2 +0,0 @@
|
|||||||
ru = {"greeting_auth": "Добро пожаловать в Тальк", "phone_number": "Номер телефона", "code": "Код", "password": "Пароль", "you": "Вы", "mention": "Вас упомянули"}
|
|
||||||
en = {"greeting_auth": "Welcome to Talc", "phone_number": "Phone number", "code": "Code", "password": "Password", "you": "You", "mention": "You got mentioned"}
|
|
235
src/screens.py
235
src/screens.py
@ -3,12 +3,12 @@
|
|||||||
from textual.screen import Screen
|
from textual.screen import Screen
|
||||||
from textual.widgets import Label, Input, Footer, Static, ContentSwitcher
|
from textual.widgets import Label, Input, Footer, Static, ContentSwitcher
|
||||||
from textual.containers import Vertical, Horizontal, VerticalScroll
|
from textual.containers import Vertical, Horizontal, VerticalScroll
|
||||||
from textual.app import ComposeResult
|
from textual.events import Key
|
||||||
from telethon.errors import SessionPasswordNeededError
|
from telethon.errors import SessionPasswordNeededError
|
||||||
from telethon import TelegramClient, events
|
from telethon import TelegramClient, events
|
||||||
from src.widgets import Chat
|
from src.widgets import Dialog, Chat, normalize_text
|
||||||
from os import system, getenv
|
from textual import log
|
||||||
from telethon.utils import get_display_name
|
from textual.keys import Keys, _character_to_key
|
||||||
|
|
||||||
class AuthScreen(Screen):
|
class AuthScreen(Screen):
|
||||||
"""Класс экрана логина в аккаунт"""
|
"""Класс экрана логина в аккаунт"""
|
||||||
@ -19,51 +19,57 @@ class AuthScreen(Screen):
|
|||||||
id = None,
|
id = None,
|
||||||
classes = None,
|
classes = None,
|
||||||
telegram_client: TelegramClient | None = None
|
telegram_client: TelegramClient | None = None
|
||||||
) -> None:
|
):
|
||||||
super().__init__(name, id, classes)
|
super().__init__(name, id, classes)
|
||||||
self.client = telegram_client
|
self.client = telegram_client
|
||||||
|
|
||||||
def on_mount(self) -> None:
|
def on_mount(self):
|
||||||
self.ac = self.query_one("#auth_container")
|
self.ac = self.query_one("#auth_container")
|
||||||
|
|
||||||
def compose(self) -> ComposeResult:
|
def compose(self):
|
||||||
with Vertical(id="auth_container"):
|
with Vertical(id="auth_container"):
|
||||||
yield Label("Добро пожаловать в Talc")
|
yield Label(normalize_text("Добро пожаловать в Telegram TUI"))
|
||||||
yield Input(placeholder="Номер телефона", id="phone")
|
yield Input(placeholder=normalize_text("Номер телефона"), id="phone")
|
||||||
yield Input(placeholder="Код", id="code", disabled=True)
|
yield Input(placeholder=normalize_text("Код"), id="code", disabled=True)
|
||||||
yield Input(
|
yield Input(
|
||||||
placeholder="Пароль",
|
placeholder=normalize_text("Пароль"),
|
||||||
id="password",
|
id="password",
|
||||||
password=True,
|
password=True,
|
||||||
disabled=True
|
disabled=True
|
||||||
)
|
)
|
||||||
|
|
||||||
async def on_input_submitted(self, event: Input.Submitted) -> None:
|
async def on_input_submitted(self, event: Input.Submitted) -> None:
|
||||||
match event.input.id:
|
if event.input.id == "phone":
|
||||||
case "phone":
|
self.phone = normalize_text(event.value)
|
||||||
self.phone = event.value
|
self.ac.query_one("#phone").disabled = True
|
||||||
self.ac.query_one("#phone").disabled = True
|
self.ac.query_one("#code").disabled = False
|
||||||
self.ac.query_one("#code").disabled = False
|
await self.client.send_code_request(phone=self.phone)
|
||||||
await self.client.send_code_request(phone=self.phone)
|
elif event.input.id == "code":
|
||||||
case "code":
|
try:
|
||||||
try:
|
self.code = normalize_text(event.value)
|
||||||
self.code = event.value
|
self.ac.query_one("#code").disabled = True
|
||||||
self.ac.query_one("#code").disabled = True
|
await self.client.sign_in(phone=self.phone, code=self.code)
|
||||||
await self.client.sign_in(phone=self.phone, code=self.code)
|
|
||||||
self.app.pop_screen()
|
|
||||||
self.app.push_screen("chats")
|
|
||||||
except SessionPasswordNeededError:
|
|
||||||
self.ac.query_one("#code").disabled = True
|
|
||||||
self.ac.query_one("#password").disabled = False
|
|
||||||
case "password":
|
|
||||||
self.password = event.value
|
|
||||||
await self.client.sign_in(password=self.password)
|
|
||||||
await self.client.start()
|
|
||||||
self.app.pop_screen()
|
self.app.pop_screen()
|
||||||
self.app.push_screen("chats")
|
self.app.push_screen("chats")
|
||||||
|
except SessionPasswordNeededError:
|
||||||
|
self.ac.query_one("#code").disabled = True
|
||||||
|
self.ac.query_one("#password").disabled = False
|
||||||
|
elif event.input.id == "password":
|
||||||
|
self.password = normalize_text(event.value)
|
||||||
|
await self.client.sign_in(password=self.password)
|
||||||
|
await self.client.start()
|
||||||
|
self.app.pop_screen()
|
||||||
|
self.app.push_screen("chats")
|
||||||
|
|
||||||
class ChatScreen(Screen):
|
class ChatScreen(Screen):
|
||||||
"""Класс экрана чатов, он же основной экран приложения"""
|
"""Класс экрана чатов, он же основной экран приложения"""
|
||||||
|
|
||||||
|
BINDINGS = [
|
||||||
|
(Keys.Tab, "log(\"Нажат таб\")", "Переключение фокуса"),
|
||||||
|
(Keys.Enter, "log(\"Нажат энтер\")", "Открыть"),
|
||||||
|
(Keys.Escape, "log(\"Нажат эскейп\")", "Назад"),
|
||||||
|
(_character_to_key("/"), "log(\"Нажат слэш\")", "Поиск")
|
||||||
|
]
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
@ -71,23 +77,25 @@ class ChatScreen(Screen):
|
|||||||
id = None,
|
id = None,
|
||||||
classes = None,
|
classes = None,
|
||||||
telegram_client: TelegramClient | None = None
|
telegram_client: TelegramClient | None = None
|
||||||
) -> None:
|
):
|
||||||
super().__init__(name, id, classes)
|
super().__init__(name, id, classes)
|
||||||
self.telegram_client = telegram_client
|
self.telegram_client = telegram_client
|
||||||
self.DO_NOTIFY = getenv("DO_NOTIFY")
|
self.search_query = ""
|
||||||
|
self.selected_chat_index = 0
|
||||||
|
self.chats = []
|
||||||
|
self.focused_element = "search" # search, chat_list, dialog
|
||||||
|
|
||||||
async def on_mount(self) -> None:
|
async def on_mount(self):
|
||||||
self.limit = 100
|
self.limit = 100
|
||||||
|
|
||||||
#Получение ID пользователя (себя)
|
|
||||||
self.me_id = await self.telegram_client.get_peer_id("me")
|
|
||||||
# Получение объекта контейнера чатов
|
|
||||||
self.chat_container = self\
|
self.chat_container = self\
|
||||||
.query_one("#main_container")\
|
.query_one("#main_container")\
|
||||||
.query_one("#chats")\
|
.query_one("#chats")\
|
||||||
.query_one("#chat_container")
|
.query_one("#chat_container")
|
||||||
|
|
||||||
|
self.search_input = self.query_one("#search_input")
|
||||||
|
|
||||||
print("Первоначальная загрузка виджетов чатов...")
|
log("Первоначальная загрузка виджетов чатов...")
|
||||||
self.mount_chats(
|
self.mount_chats(
|
||||||
len(
|
len(
|
||||||
await self.telegram_client.get_dialogs(
|
await self.telegram_client.get_dialogs(
|
||||||
@ -95,44 +103,37 @@ class ChatScreen(Screen):
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
print("Первоначальная загрузка виджетов чата завершена")
|
log("Первоначальная загрузка виджетов чата завершена")
|
||||||
|
|
||||||
self.is_chat_update_blocked = False
|
self.is_chat_update_blocked = False
|
||||||
await self.update_chat_list()
|
await self.update_chat_list()
|
||||||
print("Первоначальная загрузка чатов завершена")
|
|
||||||
|
|
||||||
# Автообновление чатов при следующих событиях
|
log("Первоначальная загрузка чатов завершена")
|
||||||
|
|
||||||
for event in (
|
for event in (
|
||||||
events.NewMessage,
|
events.NewMessage,
|
||||||
events.MessageDeleted,
|
events.MessageDeleted,
|
||||||
events.MessageEdited
|
events.MessageEdited
|
||||||
):
|
):
|
||||||
self.telegram_client.on(event())(self.update_chat_list)
|
self.telegram_client.on(event())(self.update_chat_list)
|
||||||
self.telegram_client.on(events.NewMessage)(self.notify_send)
|
|
||||||
|
|
||||||
def mount_chats(self, limit: int) -> None:
|
def mount_chats(self, limit: int):
|
||||||
"""Функция маунта чатов"""
|
log("Загрузка виджетов чатов...")
|
||||||
print("Загрузка виджетов чатов...")
|
|
||||||
|
|
||||||
# Счёт текущего количества примонтированных чатов
|
|
||||||
chats_amount = len(self.chat_container.query(Chat))
|
chats_amount = len(self.chat_container.query(Chat))
|
||||||
|
|
||||||
if limit > chats_amount:
|
if limit > chats_amount:
|
||||||
# Маунт недостоющих, если чатов меньше, чем нужно
|
|
||||||
for i in range(limit - chats_amount):
|
for i in range(limit - chats_amount):
|
||||||
chat = Chat(id=f"chat-{i + chats_amount + 1}")
|
chat = Chat(id=f"chat-{i + chats_amount + 1}")
|
||||||
self.chat_container.mount(chat)
|
self.chat_container.mount(chat)
|
||||||
elif limit < chats_amount:
|
elif limit < chats_amount:
|
||||||
# Удаление лишних, если чатов больше, чем нужно
|
|
||||||
for i in range(chats_amount - limit):
|
for i in range(chats_amount - limit):
|
||||||
self.chat_container.query(Chat).last().remove()
|
self.chat_container.query(Chat).last().remove()
|
||||||
# Ничего, если их ровно столько же
|
|
||||||
|
|
||||||
print("Виджеты чатов загружены")
|
log("Виджеты чатов загружены")
|
||||||
|
|
||||||
async def update_chat_list(self, event = None) -> None:
|
async def update_chat_list(self, event = None):
|
||||||
"""Функция обновления чатов (и уведомления)"""
|
log("Запрос обновления чатов")
|
||||||
print("Запрос обновления чатов")
|
|
||||||
|
|
||||||
if not self.is_chat_update_blocked:
|
if not self.is_chat_update_blocked:
|
||||||
self.is_chat_update_blocked = True
|
self.is_chat_update_blocked = True
|
||||||
@ -140,62 +141,96 @@ class ChatScreen(Screen):
|
|||||||
dialogs = await self.telegram_client.get_dialogs(
|
dialogs = await self.telegram_client.get_dialogs(
|
||||||
limit=self.limit, archived=False
|
limit=self.limit, archived=False
|
||||||
)
|
)
|
||||||
print("Получены диалоги")
|
log("Получены диалоги")
|
||||||
|
|
||||||
|
# Фильтруем диалоги по поисковому запросу
|
||||||
|
if self.search_query:
|
||||||
|
dialogs = [
|
||||||
|
d for d in dialogs
|
||||||
|
if self.search_query.lower() in \
|
||||||
|
normalize_text(d.name).lower()
|
||||||
|
]
|
||||||
|
|
||||||
# Маунт виджетов чатов в панели чатов по лимиту
|
|
||||||
limit = len(dialogs)
|
limit = len(dialogs)
|
||||||
self.mount_chats(limit)
|
self.mount_chats(limit)
|
||||||
|
|
||||||
# Изменение надписей в виджетах чатов
|
|
||||||
for i in range(limit):
|
for i in range(limit):
|
||||||
chat = self.chat_container.query_one(f"#chat-{i + 1}")
|
chat = self.chat_container.query_one(f"#chat-{i + 1}")
|
||||||
|
chat.username = normalize_text(str(dialogs[i].name))
|
||||||
chat.peername = str(dialogs[i].name)
|
chat.msg = normalize_text(str(dialogs[i].message.message))
|
||||||
chat.is_group = dialogs[i].is_group
|
|
||||||
chat.peer_id = dialogs[i].id
|
chat.peer_id = dialogs[i].id
|
||||||
|
chat.is_selected = (i == self.selected_chat_index)
|
||||||
try:
|
chat.is_focused = (self.focused_element == "chat_list" and \
|
||||||
is_my_msg = \
|
i == self.selected_chat_index)
|
||||||
dialogs[i].message.from_id.user_id == self.me_id
|
|
||||||
except:
|
|
||||||
is_my_msg = dialogs[i].id == self.me_id
|
|
||||||
|
|
||||||
if dialogs[i].is_group and is_my_msg:
|
|
||||||
chat.username = "Вы"
|
|
||||||
chat.msg = str(dialogs[i].message.message)
|
|
||||||
elif dialogs[i].is_group:
|
|
||||||
chat.username = str(
|
|
||||||
get_display_name(dialogs[i].message.sender)
|
|
||||||
)
|
|
||||||
chat.msg = str(dialogs[i].message.message)
|
|
||||||
elif is_my_msg:
|
|
||||||
chat.msg = "Вы: " * is_my_msg + str(
|
|
||||||
dialogs[i].message.message
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
chat.msg = str(dialogs[i].message.message)
|
|
||||||
|
|
||||||
self.is_chat_update_blocked = False
|
self.is_chat_update_blocked = False
|
||||||
print("Чаты обновлены")
|
log("Чаты обновлены")
|
||||||
else:
|
else:
|
||||||
print("Обновление чатов невозможно: уже выполняется")
|
log("Обновление чатов невозможно: уже выполняется")
|
||||||
|
|
||||||
def notify_send(self, event) -> None:
|
def on_input_changed(self, event: Input.Changed) -> None:
|
||||||
if not event:
|
if event.input.id == "search_input":
|
||||||
return None
|
self.search_query = normalize_text(event.value)
|
||||||
if bool(self.DO_NOTIFY) and event.mentioned and not self.app.focused:
|
self.selected_chat_index = 0
|
||||||
system(f"notify-send \"Вас упомянули\" Talc")
|
self.update_chat_list()
|
||||||
|
|
||||||
def compose(self) -> ComposeResult:
|
def on_key(self, event: Key) -> None:
|
||||||
yield Footer() # Нижняя панель с подсказками
|
if event.key == Keys.Tab:
|
||||||
with Horizontal(id="main_container"): # Основной контейнер
|
# Переключаем фокус между элементами
|
||||||
with Horizontal(id="chats"):
|
if self.focused_element == "search":
|
||||||
|
self.focused_element = "chat_list"
|
||||||
|
self.search_input.blur()
|
||||||
|
self.update_chat_list()
|
||||||
|
elif self.focused_element == "chat_list":
|
||||||
|
self.focused_element = "search"
|
||||||
|
self.search_input.focus()
|
||||||
|
self.update_chat_list()
|
||||||
|
return
|
||||||
|
|
||||||
|
if self.focused_element == "search":
|
||||||
|
return
|
||||||
|
|
||||||
|
chats = self.chat_container.query(Chat)
|
||||||
|
if not chats:
|
||||||
|
return
|
||||||
|
|
||||||
|
match event.key:
|
||||||
|
case Keys.Up:
|
||||||
|
self.selected_chat_index = max(0, self.selected_chat_index - 1)
|
||||||
|
for i, chat in enumerate(chats):
|
||||||
|
chat.is_selected = (i == self.selected_chat_index)
|
||||||
|
chat.is_focused = (i == self.selected_chat_index)
|
||||||
|
# Прокручиваем к выбранному чату
|
||||||
|
selected_chat = chats[self.selected_chat_index]
|
||||||
|
self.chat_container.scroll_to(selected_chat, animate=False)
|
||||||
|
case Keys.Down:
|
||||||
|
self.selected_chat_index = min(len(chats) - 1, self.selected_chat_index + 1)
|
||||||
|
for i, chat in enumerate(chats):
|
||||||
|
chat.is_selected = (i == self.selected_chat_index)
|
||||||
|
chat.is_focused = (i == self.selected_chat_index)
|
||||||
|
# Прокручиваем к выбранному чату
|
||||||
|
selected_chat = chats[self.selected_chat_index]
|
||||||
|
self.chat_container.scroll_to(selected_chat, animate=False)
|
||||||
|
case Keys.Enter:
|
||||||
|
chats[self.selected_chat_index].on_click()
|
||||||
|
case Keys.Escape:
|
||||||
|
# Возвращаемся к списку чатов
|
||||||
|
self.app.pop_screen()
|
||||||
|
self.app.push_screen("chats")
|
||||||
|
case "/": #Не работает: нужен кейкод слэша
|
||||||
|
# Фокус на поиск
|
||||||
|
self.focused_element = "search"
|
||||||
|
self.search_input.focus()
|
||||||
|
self.update_chat_list()
|
||||||
|
|
||||||
|
def compose(self):
|
||||||
|
yield Footer()
|
||||||
|
with Horizontal(id="main_container"):
|
||||||
|
with Vertical(id="chats"):
|
||||||
|
yield Input(placeholder=normalize_text("Поиск чатов..."), id="search_input")
|
||||||
yield VerticalScroll(id="chat_container")
|
yield VerticalScroll(id="chat_container")
|
||||||
#TODO: сделать кнопку, чтобы прогрузить больше чатов,
|
yield ContentSwitcher(id="dialog_switcher")
|
||||||
# или ленивую прокрутку
|
#yield Dialog(telegram_client=self.telegram_client)
|
||||||
with ContentSwitcher(id="dialog_switcher"):
|
|
||||||
# ↑ Внутри него как раз крутятся диалоги
|
if __name__ == "__main__":
|
||||||
yield Label(
|
raise Exception("Запущен не тот файл. Запустите main.py.")
|
||||||
"Нажмите на чат в панели слева, чтобы начать общаться",
|
|
||||||
id="begin_talk_label"
|
|
||||||
) #TODO: не показывается надпись, надо будет исправить
|
|
||||||
|
@ -10,10 +10,6 @@ Chat {
|
|||||||
height: 3;
|
height: 3;
|
||||||
}
|
}
|
||||||
|
|
||||||
Chat Horizontal Vertical Label{
|
|
||||||
height: 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
Rule {
|
Rule {
|
||||||
color: #FFFFFF;
|
color: #FFFFFF;
|
||||||
}
|
}
|
||||||
@ -46,13 +42,12 @@ Message Container {
|
|||||||
align: right middle;
|
align: right middle;
|
||||||
}
|
}
|
||||||
|
|
||||||
.is_me_true Container Label {
|
.is_me_true Static {
|
||||||
border: solid $primary;
|
border: solid $primary;
|
||||||
width: auto;
|
width: auto;
|
||||||
height: auto;
|
height: auto;
|
||||||
text-align: left;
|
text-align: right;
|
||||||
min-width: 11;
|
min-width: 11;
|
||||||
max-width: 100%;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
.is_me_false Container {
|
.is_me_false Container {
|
||||||
@ -60,24 +55,10 @@ Message Container {
|
|||||||
align: left middle;
|
align: left middle;
|
||||||
}
|
}
|
||||||
|
|
||||||
.is_me_false Container Label {
|
.is_me_false Static {
|
||||||
border: solid $foreground;
|
border: solid $foreground;
|
||||||
width: auto;
|
width: auto;
|
||||||
height: auto;
|
height: auto;
|
||||||
text-align: left;
|
text-align: left;
|
||||||
min-width: 11;
|
min-width: 11;
|
||||||
max-width: 100%;
|
|
||||||
}
|
|
||||||
|
|
||||||
ContentSwitcher {
|
|
||||||
width: 100%;
|
|
||||||
height: 100%;
|
|
||||||
}
|
|
||||||
|
|
||||||
.begin_talk_label {
|
|
||||||
width: 100%;
|
|
||||||
height: 100%;
|
|
||||||
text-align: center;
|
|
||||||
content-align: center middle;
|
|
||||||
color: $panel;
|
|
||||||
}
|
}
|
||||||
|
246
src/widgets.py
246
src/widgets.py
@ -5,19 +5,85 @@ from textual.widget import Widget
|
|||||||
from textual.reactive import Reactive
|
from textual.reactive import Reactive
|
||||||
from textual.widgets import Input, Button, Label, Static, ContentSwitcher
|
from textual.widgets import Input, Button, Label, Static, ContentSwitcher
|
||||||
from textual.app import ComposeResult, RenderResult
|
from textual.app import ComposeResult, RenderResult
|
||||||
from textual.content import Content
|
from telethon import TelegramClient, events, utils
|
||||||
from telethon import TelegramClient, events, utils, types
|
|
||||||
import datetime
|
import datetime
|
||||||
from os import getenv
|
import unicodedata
|
||||||
|
import re
|
||||||
|
import emoji
|
||||||
|
import os
|
||||||
|
import tempfile
|
||||||
|
from PIL import Image
|
||||||
|
#import pywhatkit as kit
|
||||||
|
from textual import log
|
||||||
|
from warnings import deprecated
|
||||||
|
|
||||||
|
def remove_emoji(text: str) -> str:
|
||||||
|
"""Удаляет эмодзи из текста"""
|
||||||
|
if not text:
|
||||||
|
return ""
|
||||||
|
return emoji.replace_emoji(text, '')
|
||||||
|
|
||||||
|
def normalize_text(text: str) -> str:
|
||||||
|
"""Нормализует текст для корректного отображения"""
|
||||||
|
if not text:
|
||||||
|
return ""
|
||||||
|
# Удаляем эмодзи
|
||||||
|
text = remove_emoji(text)
|
||||||
|
# Удаляем все управляющие символы
|
||||||
|
text = ''\
|
||||||
|
.join(char for char in text if unicodedata.category(char)[0] != 'C')
|
||||||
|
# Нормализуем Unicode
|
||||||
|
text = unicodedata.normalize('NFKC', text)
|
||||||
|
# Заменяем специальные символы на их ASCII-эквиваленты
|
||||||
|
text = text.replace('—', '-').replace('–', '-')
|
||||||
|
# Удаляем все непечатаемые символы
|
||||||
|
text = ''.join(char for char in text if char.isprintable())
|
||||||
|
return text
|
||||||
|
|
||||||
|
def safe_ascii(text: str) -> str:
|
||||||
|
"""Преобразует текст в безопасный ASCII-формат"""
|
||||||
|
if not text:
|
||||||
|
return ""
|
||||||
|
# Удаляем эмодзи
|
||||||
|
text = remove_emoji(text)
|
||||||
|
# Оставляем только ASCII символы и пробелы
|
||||||
|
return ''.join(char for char in text if ord(char) < 128 or char.isspace())
|
||||||
|
|
||||||
|
@deprecated("Не работает на моём компьютере.")
|
||||||
|
def convert_image_to_ascii(image_path: str, width: int = 50) -> str:
|
||||||
|
"""Конвертирует изображение в ASCII-арт"""
|
||||||
|
try:
|
||||||
|
# Открываем изображение
|
||||||
|
img = Image.open(image_path)
|
||||||
|
|
||||||
|
# Конвертируем в RGB если нужно
|
||||||
|
if img.mode != 'RGB':
|
||||||
|
img = img.convert('RGB')
|
||||||
|
|
||||||
|
# Изменяем размер, сохраняя пропорции
|
||||||
|
aspect_ratio = img.height / img.width
|
||||||
|
height = int(width * aspect_ratio * 0.5) # * 0.5 потому что символы выше чем шире
|
||||||
|
img = img.resize((width, height))
|
||||||
|
|
||||||
|
# Конвертируем в ASCII
|
||||||
|
ascii_str = kit.image_to_ascii_art(image_path, output_file=None)
|
||||||
|
|
||||||
|
# Очищаем временный файл
|
||||||
|
os.remove(image_path)
|
||||||
|
|
||||||
|
return ascii_str
|
||||||
|
except Exception as e:
|
||||||
|
log(f"Ошибка конвертации изображения: {e}")
|
||||||
|
return "Ошибка загрузки изображения"
|
||||||
|
|
||||||
class Chat(Widget):
|
class Chat(Widget):
|
||||||
"""Класс виджета чата для панели чатов"""
|
"""Класс виджета чата для панели чатов"""
|
||||||
|
|
||||||
username: Reactive[str] = Reactive(" ", recompose=True)
|
username: Reactive[str] = Reactive(" ", recompose=True)
|
||||||
peername: Reactive[str] = Reactive(" ", recompose=True)
|
|
||||||
msg: Reactive[str] = Reactive(" ", recompose=True)
|
msg: Reactive[str] = Reactive(" ", recompose=True)
|
||||||
is_group: Reactive[bool] = Reactive(False, recompose=True)
|
|
||||||
peer_id: Reactive[int] = Reactive(0)
|
peer_id: Reactive[int] = Reactive(0)
|
||||||
|
is_selected: Reactive[bool] = Reactive(False)
|
||||||
|
is_focused: Reactive[bool] = Reactive(False)
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
@ -34,14 +100,19 @@ class Chat(Widget):
|
|||||||
)
|
)
|
||||||
|
|
||||||
def on_mount(self) -> None:
|
def on_mount(self) -> None:
|
||||||
self.switcher = self.screen.query_one(Horizontal)\
|
self.switcher = self.screen.query_one(Horizontal).query_one("#dialog_switcher", ContentSwitcher)
|
||||||
.query_one("#dialog_switcher", ContentSwitcher)
|
|
||||||
|
|
||||||
def on_click(self) -> None:
|
def on_click(self) -> None:
|
||||||
# Получение ID диалога и создание DOM-ID на его основе
|
# Снимаем выделение со всех чатов
|
||||||
|
for chat in self.screen.query(Chat):
|
||||||
|
chat.is_selected = False
|
||||||
|
chat.is_focused = False
|
||||||
|
|
||||||
|
# Выделяем текущий чат
|
||||||
|
self.is_selected = True
|
||||||
|
self.is_focused = True
|
||||||
|
|
||||||
dialog_id = f"dialog-{str(self.peer_id)}"
|
dialog_id = f"dialog-{str(self.peer_id)}"
|
||||||
|
|
||||||
# Маунт диалога
|
|
||||||
try:
|
try:
|
||||||
self.switcher.mount(Dialog(
|
self.switcher.mount(Dialog(
|
||||||
telegram_client=self.app.telegram_client,
|
telegram_client=self.app.telegram_client,
|
||||||
@ -49,21 +120,34 @@ class Chat(Widget):
|
|||||||
id=dialog_id
|
id=dialog_id
|
||||||
))
|
))
|
||||||
except:
|
except:
|
||||||
# Диалог уже есть: ничего не делаем
|
|
||||||
pass
|
pass
|
||||||
|
|
||||||
self.switcher.current = dialog_id
|
self.switcher.current = dialog_id
|
||||||
self.switcher.recompose()
|
self.switcher.recompose()
|
||||||
|
|
||||||
def compose(self) -> ComposeResult:
|
def compose(self) -> ComposeResult:
|
||||||
with Horizontal():
|
with Horizontal(classes="chat-item"):
|
||||||
yield Label(f"┌───┐\n│ {self.peername[:1]:1} │\n└───┘")
|
"""
|
||||||
|
# Используем ASCII-символы для рамки
|
||||||
|
yield Label(
|
||||||
|
f"┌───┐\n│ {normalize_text(
|
||||||
|
self.username[:1].upper()
|
||||||
|
):1} │\n└───┘"
|
||||||
|
)
|
||||||
with Vertical():
|
with Vertical():
|
||||||
yield Label(self.peername, id="peername")
|
yield Label(normalize_text(self.username), id="name")
|
||||||
if self.is_group:
|
yield Label(normalize_text(self.msg), id="last_msg")
|
||||||
yield Label(self.username, id="name")
|
"""
|
||||||
|
yield Label(f"┌───┐\n│ {self.username[:1].upper():1} │\n└───┘")
|
||||||
|
with Vertical():
|
||||||
|
yield Label(self.username, id="name")
|
||||||
yield Label(self.msg, id="last_msg")
|
yield Label(self.msg, id="last_msg")
|
||||||
|
|
||||||
|
def on_mouse_enter(self) -> None:
|
||||||
|
self.add_class("hover")
|
||||||
|
|
||||||
|
def on_mouse_leave(self) -> None:
|
||||||
|
self.remove_class("hover")
|
||||||
|
|
||||||
class Dialog(Widget):
|
class Dialog(Widget):
|
||||||
"""Класс окна диалога"""
|
"""Класс окна диалога"""
|
||||||
|
|
||||||
@ -74,23 +158,25 @@ class Dialog(Widget):
|
|||||||
disabled=None,
|
disabled=None,
|
||||||
telegram_client: TelegramClient | None = None,
|
telegram_client: TelegramClient | None = None,
|
||||||
chat_id = None
|
chat_id = None
|
||||||
) -> None:
|
) -> None:
|
||||||
super().__init__(id=id, classes=classes, disabled=disabled)
|
super().__init__(id=id, classes=classes, disabled=disabled)
|
||||||
self.telegram_client = telegram_client
|
self.telegram_client = telegram_client
|
||||||
self.chat_id = chat_id
|
self.chat_id = chat_id
|
||||||
self.is_msg_update_blocked = False
|
self.is_msg_update_blocked = False
|
||||||
self.timezone = datetime.timezone(
|
self.messages_loaded = 0
|
||||||
datetime.timedelta(hours=int(getenv("UTC_OFFSET")))
|
self.is_loading = False
|
||||||
)
|
|
||||||
|
|
||||||
async def on_mount(self) -> None:
|
async def on_mount(self) -> None:
|
||||||
self.limit = 10
|
self.limit = 50 # Увеличиваем начальное количество сообщений
|
||||||
|
self.messages_loaded = self.limit
|
||||||
|
|
||||||
self.msg_input = self.query_one("#msg_input")
|
self.msg_input = self.query_one("#msg_input")
|
||||||
self.dialog = self.query_one(Vertical).query_one("#dialog")
|
self.dialog = self.query_one(Vertical).query_one("#dialog")
|
||||||
|
self.load_more_btn = self.query_one("#load_more")
|
||||||
|
|
||||||
self.me = await self.telegram_client.get_me()
|
self.me = await self.telegram_client.get_me()
|
||||||
|
|
||||||
|
self.dialog.scroll_end(animate=False)
|
||||||
await self.update_dialog()
|
await self.update_dialog()
|
||||||
|
|
||||||
for event in (
|
for event in (
|
||||||
@ -102,8 +188,6 @@ class Dialog(Widget):
|
|||||||
event(chats=(self.chat_id))
|
event(chats=(self.chat_id))
|
||||||
)(self.update_dialog)
|
)(self.update_dialog)
|
||||||
|
|
||||||
self.dialog.scroll_down(animate=False, immediate=True)
|
|
||||||
|
|
||||||
def mount_messages(self, limit: int) -> None:
|
def mount_messages(self, limit: int) -> None:
|
||||||
print("Загрузка виджетов сообщений...")
|
print("Загрузка виджетов сообщений...")
|
||||||
|
|
||||||
@ -120,7 +204,7 @@ class Dialog(Widget):
|
|||||||
self.dialog.query(Message).last().remove()
|
self.dialog.query(Message).last().remove()
|
||||||
|
|
||||||
async def update_dialog(self, event = None) -> None:
|
async def update_dialog(self, event = None) -> None:
|
||||||
print("Запрос обновления сообщений")
|
log("Запрос обновления сообщений")
|
||||||
|
|
||||||
if not self.is_msg_update_blocked:
|
if not self.is_msg_update_blocked:
|
||||||
self.is_msg_update_blocked = True
|
self.is_msg_update_blocked = True
|
||||||
@ -128,52 +212,31 @@ class Dialog(Widget):
|
|||||||
messages = await self.telegram_client.get_messages(
|
messages = await self.telegram_client.get_messages(
|
||||||
entity=self.chat_id, limit=self.limit
|
entity=self.chat_id, limit=self.limit
|
||||||
)
|
)
|
||||||
print("Получены сообщения")
|
log("Получены сообщения")
|
||||||
|
|
||||||
limit = len(messages)
|
limit = len(messages)
|
||||||
self.mount_messages(limit)
|
self.mount_messages(limit)
|
||||||
|
|
||||||
for i in range(limit):
|
for i in range(limit):
|
||||||
msg = self.dialog.query_one(f"#msg-{i + 1}")
|
msg = self.dialog.query_one(f"#msg-{i + 1}")
|
||||||
if str(messages[i].message):
|
msg.message = ""
|
||||||
message = Content(
|
|
||||||
"[Медиа] " * \
|
|
||||||
bool(messages[i].media) + \
|
|
||||||
str(messages[i].message)
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
message = Content("[Медиа]" * bool(messages[i].media) + "")
|
|
||||||
|
|
||||||
entities = messages[i].entities
|
# Обрабатываем изображения
|
||||||
if entities:
|
if messages[i].media:
|
||||||
for entity in entities:
|
try:
|
||||||
match type(entity):
|
# Скачиваем изображение
|
||||||
case types.MessageEntityBold:
|
image_data = await self.telegram_client.download_media(
|
||||||
message = message.stylize(
|
messages[i].media,
|
||||||
"bold",
|
bytes
|
||||||
entity.offset,
|
)
|
||||||
entity.offset + entity.length
|
if image_data:
|
||||||
)
|
await msg.set_image(image_data)
|
||||||
case types.MessageEntityUnderline:
|
except Exception as e:
|
||||||
message = message.stylize(
|
log(f"Ошибка загрузки изображения: {e}")
|
||||||
"underline",
|
|
||||||
entity.offset,
|
# Обрабатываем текст
|
||||||
entity.offset + entity.length
|
if str(messages[i].message):
|
||||||
)
|
msg.message = normalize_text(str(messages[i].message))
|
||||||
case types.MessageEntityItalic:
|
|
||||||
message = message.stylize(
|
|
||||||
"italic",
|
|
||||||
entity.offset,
|
|
||||||
entity.offset + entity.length
|
|
||||||
)
|
|
||||||
case types.MessageEntityStrike:
|
|
||||||
message = message.stylize(
|
|
||||||
"strike",
|
|
||||||
entity.offset,
|
|
||||||
entity.offset + entity.length
|
|
||||||
)
|
|
||||||
|
|
||||||
msg.message = message
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
is_me = messages[i].from_id.user_id == self.me.id
|
is_me = messages[i].from_id.user_id == self.me.id
|
||||||
@ -181,26 +244,52 @@ class Dialog(Widget):
|
|||||||
is_me = False
|
is_me = False
|
||||||
|
|
||||||
msg.is_me = is_me
|
msg.is_me = is_me
|
||||||
msg.username = utils.get_display_name(messages[i].sender)
|
msg.username = normalize_text(utils.get_display_name(messages[i].sender))
|
||||||
msg.send_time = messages[i]\
|
msg.send_time = messages[i]\
|
||||||
.date\
|
.date\
|
||||||
.astimezone(self.timezone)\
|
.astimezone(datetime.timezone.utc)\
|
||||||
.strftime("%H:%M")
|
.strftime("%H:%M")
|
||||||
|
|
||||||
self.is_msg_update_blocked = False
|
self.is_msg_update_blocked = False
|
||||||
print("Сообщения обновлены")
|
log("Сообщения обновлены")
|
||||||
else:
|
else:
|
||||||
print("Обновление сообщений невозможно: уже выполняется")
|
log("Обновление сообщений невозможно: уже выполняется")
|
||||||
|
|
||||||
|
async def load_more_messages(self) -> None:
|
||||||
|
if not self.is_loading:
|
||||||
|
self.is_loading = True
|
||||||
|
self.load_more_btn.disabled = True
|
||||||
|
self.load_more_btn.label = "Загрузка..."
|
||||||
|
|
||||||
|
try:
|
||||||
|
messages = await self.telegram_client.get_messages(
|
||||||
|
entity=self.chat_id,
|
||||||
|
limit=self.limit,
|
||||||
|
offset_id=self.messages_loaded
|
||||||
|
)
|
||||||
|
|
||||||
|
if messages:
|
||||||
|
self.messages_loaded += len(messages)
|
||||||
|
self.mount_messages(self.messages_loaded)
|
||||||
|
await self.update_dialog()
|
||||||
|
finally:
|
||||||
|
self.is_loading = False
|
||||||
|
self.load_more_btn.disabled = False
|
||||||
|
self.load_more_btn.label = "Загрузить еще"
|
||||||
|
|
||||||
def compose(self) -> ComposeResult:
|
def compose(self) -> ComposeResult:
|
||||||
with Vertical():
|
with Vertical():
|
||||||
|
yield Button("Загрузить еще", id="load_more", variant="default")
|
||||||
yield VerticalScroll(id="dialog")
|
yield VerticalScroll(id="dialog")
|
||||||
with Horizontal(id="input_place"):
|
with Horizontal(id="input_place"):
|
||||||
yield Input(placeholder="Сообщение", id="msg_input")
|
yield Input(placeholder="Сообщение", id="msg_input")
|
||||||
yield Button(label="➤", id="send", variant="primary")
|
yield Button(label=">", id="send", variant="primary")
|
||||||
|
|
||||||
async def on_button_pressed(self, event = None) -> None:
|
async def on_button_pressed(self, event) -> None:
|
||||||
await self.send_message()
|
if event.button.id == "load_more":
|
||||||
|
await self.load_more_messages()
|
||||||
|
else:
|
||||||
|
await self.send_message()
|
||||||
|
|
||||||
async def on_input_submitted(self, event = None) -> None:
|
async def on_input_submitted(self, event = None) -> None:
|
||||||
await self.send_message()
|
await self.send_message()
|
||||||
@ -209,7 +298,7 @@ class Dialog(Widget):
|
|||||||
try:
|
try:
|
||||||
await self.telegram_client.send_message(
|
await self.telegram_client.send_message(
|
||||||
self.chat_id,
|
self.chat_id,
|
||||||
str(self.msg_input.value)
|
normalize_text(str(self.msg_input.value))
|
||||||
)
|
)
|
||||||
except ValueError:
|
except ValueError:
|
||||||
self.app.notify("Ошибка отправки")
|
self.app.notify("Ошибка отправки")
|
||||||
@ -219,7 +308,7 @@ class Dialog(Widget):
|
|||||||
class Message(Widget):
|
class Message(Widget):
|
||||||
"""Класс виджета сообщений для окна диалога"""
|
"""Класс виджета сообщений для окна диалога"""
|
||||||
|
|
||||||
message: Reactive[Content] = Reactive("", recompose=True)
|
message: Reactive[str] = Reactive("", recompose=True)
|
||||||
is_me: Reactive[bool] = Reactive(False, recompose=True)
|
is_me: Reactive[bool] = Reactive(False, recompose=True)
|
||||||
username: Reactive[str] = Reactive("", recompose=True)
|
username: Reactive[str] = Reactive("", recompose=True)
|
||||||
send_time: Reactive[str] = Reactive("", recompose=True)
|
send_time: Reactive[str] = Reactive("", recompose=True)
|
||||||
@ -237,14 +326,17 @@ class Message(Widget):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
def compose(self) -> ComposeResult:
|
def compose(self) -> ComposeResult:
|
||||||
label = Label(self.message, markup=False)
|
static = Static(normalize_text(self.message))
|
||||||
label.border_title = self.username * (not self.is_me)
|
static.border_title = normalize_text(self.username) * (not self.is_me)
|
||||||
label.border_subtitle = self.send_time
|
static.border_subtitle = self.send_time
|
||||||
|
|
||||||
with Container():
|
with Container():
|
||||||
yield label
|
yield static
|
||||||
|
|
||||||
if self.is_me:
|
if self.is_me:
|
||||||
self.classes = "is_me_true"
|
self.classes = "is_me_true"
|
||||||
else:
|
else:
|
||||||
self.classes = "is_me_false"
|
self.classes = "is_me_false"
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
raise Exception("Запущен не тот файл. Запустите main.py.")
|
||||||
|
Loading…
x
Reference in New Issue
Block a user