Куча изменений, вот теперь точно

This commit is contained in:
kirill 2025-05-26 02:32:44 +03:00
parent 359ee43615
commit 83f4cac86b
8 changed files with 204 additions and 77 deletions

View File

@ -2,6 +2,14 @@
# Их использование крайне нежелательно!
# Получите свои API-ключи приложения на https://my.telegram.org/apps и вставьте их ниже.
# Спасибо за понимание!
#
# In EN: WARNING: it's official TG API-keys, please, don't use them, get yours via link above. Thanks!
API_ID=2040
API_HASH=b18441a1ff607e10a989891a5462e627
# Конфиг
CURRENT_USER = "user"
DO_NOTIFY = False
UTC_OFFSET = 0
LANGUAGE = "en"

View File

@ -1,8 +1,8 @@
#!/usr/bin/python
"""Файл инициализации приложения"""
from src.app import TelegramTUI
from src.app import Talc
if __name__ == "__main__":
tg = TelegramTUI()
tg.run()
talc = Talc()
talc.run()

View File

@ -1,6 +1,3 @@
textual
telethon
python-dotenv
emoji
Pillow
pywhatkit

View File

@ -1,31 +1,33 @@
"""Главный файл приложения"""
import os
from os import getenv
from dotenv import load_dotenv
from telethon import TelegramClient, events
from telethon import TelegramClient
from textual.app import App
from src.screens import AuthScreen, ChatScreen
import src.locales
load_dotenv()
API_ID = getenv("API_ID")
API_HASH = getenv("API_HASH")
api_id = os.getenv("API_ID")
api_hash = os.getenv("API_HASH")
if not api_id or not api_hash:
if not API_ID or not API_HASH:
raise ValueError(
"API_ID и API_HASH не найдены в .env файле. "
"Пожалуйста, скопируйте .env.example в .env и заполните свои ключи."
)
api_id = int(api_id)
API_ID = int(API_ID)
class TelegramTUI(App):
#locale = locales.
class Talc(App):
"""Класс приложения"""
CSS_PATH = "style.tcss"
async def on_mount(self) -> None:
self.telegram_client = TelegramClient("user", api_id, api_hash)
self.telegram_client = TelegramClient(getenv("CURRENT_USER"), API_ID, API_HASH)
await self.telegram_client.connect()
chat_screen = ChatScreen(telegram_client=self.telegram_client)
@ -38,6 +40,8 @@ class TelegramTUI(App):
else:
self.push_screen("chats")
self.scroll_sensitivity_y = 1.0
async def on_exit_app(self):
await self.telegram_client.disconnect()
return super()._on_exit_app()

2
src/locales.py Normal file
View File

@ -0,0 +1,2 @@
ru = {"greeting_auth": "Добро пожаловать в Тальк", "phone_number": "Номер телефона", "code": "Код", "password": "Пароль", "you": "Вы", "mention": "Вас упомянули"}
en = {"greeting_auth": "Welcome to Talc", "phone_number": "Phone number", "code": "Code", "password": "Password", "you": "You", "mention": "You got mentioned"}

View File

@ -3,9 +3,12 @@
from textual.screen import Screen
from textual.widgets import Label, Input, Footer, Static, ContentSwitcher
from textual.containers import Vertical, Horizontal, VerticalScroll
from textual.app import ComposeResult
from telethon.errors import SessionPasswordNeededError
from telethon import TelegramClient, events
from src.widgets import Dialog, Chat
from src.widgets import Chat
from os import system, getenv
from telethon.utils import get_display_name
class AuthScreen(Screen):
"""Класс экрана логина в аккаунт"""
@ -16,16 +19,16 @@ class AuthScreen(Screen):
id = None,
classes = None,
telegram_client: TelegramClient | None = None
):
) -> None:
super().__init__(name, id, classes)
self.client = telegram_client
def on_mount(self):
def on_mount(self) -> None:
self.ac = self.query_one("#auth_container")
def compose(self):
def compose(self) -> ComposeResult:
with Vertical(id="auth_container"):
yield Label("Добро пожаловать в Telegram TUI")
yield Label("Добро пожаловать в Talc")
yield Input(placeholder="Номер телефона", id="phone")
yield Input(placeholder="Код", id="code", disabled=True)
yield Input(
@ -36,12 +39,13 @@ class AuthScreen(Screen):
)
async def on_input_submitted(self, event: Input.Submitted) -> None:
if event.input.id == "phone":
match event.input.id:
case "phone":
self.phone = event.value
self.ac.query_one("#phone").disabled = True
self.ac.query_one("#code").disabled = False
await self.client.send_code_request(phone=self.phone)
elif event.input.id == "code":
case "code":
try:
self.code = event.value
self.ac.query_one("#code").disabled = True
@ -51,7 +55,7 @@ class AuthScreen(Screen):
except SessionPasswordNeededError:
self.ac.query_one("#code").disabled = True
self.ac.query_one("#password").disabled = False
elif event.input.id == "password":
case "password":
self.password = event.value
await self.client.sign_in(password=self.password)
await self.client.start()
@ -67,13 +71,17 @@ class ChatScreen(Screen):
id = None,
classes = None,
telegram_client: TelegramClient | None = None
):
) -> None:
super().__init__(name, id, classes)
self.telegram_client = telegram_client
self.DO_NOTIFY = getenv("DO_NOTIFY")
async def on_mount(self):
async def on_mount(self) -> None:
self.limit = 100
#Получение ID пользователя (себя)
self.me_id = await self.telegram_client.get_peer_id("me")
# Получение объекта контейнера чатов
self.chat_container = self\
.query_one("#main_container")\
.query_one("#chats")\
@ -91,32 +99,39 @@ class ChatScreen(Screen):
self.is_chat_update_blocked = False
await self.update_chat_list()
print("Первоначальная загрузка чатов завершена")
# Автообновление чатов при следующих событиях
for event in (
events.NewMessage,
events.MessageDeleted,
events.MessageEdited
):
self.telegram_client.on(event())(self.update_chat_list)
self.telegram_client.on(events.NewMessage)(self.notify_send)
def mount_chats(self, limit: int):
def mount_chats(self, limit: int) -> None:
"""Функция маунта чатов"""
print("Загрузка виджетов чатов...")
# Счёт текущего количества примонтированных чатов
chats_amount = len(self.chat_container.query(Chat))
if limit > chats_amount:
# Маунт недостоющих, если чатов меньше, чем нужно
for i in range(limit - chats_amount):
chat = Chat(id=f"chat-{i + chats_amount + 1}")
self.chat_container.mount(chat)
elif limit < chats_amount:
# Удаление лишних, если чатов больше, чем нужно
for i in range(chats_amount - limit):
self.chat_container.query(Chat).last().remove()
# Ничего, если их ровно столько же
print("Виджеты чатов загружены")
async def update_chat_list(self, event = None):
async def update_chat_list(self, event = None) -> None:
"""Функция обновления чатов (и уведомления)"""
print("Запрос обновления чатов")
if not self.is_chat_update_blocked:
@ -127,25 +142,60 @@ class ChatScreen(Screen):
)
print("Получены диалоги")
# Маунт виджетов чатов в панели чатов по лимиту
limit = len(dialogs)
self.mount_chats(limit)
# Изменение надписей в виджетах чатов
for i in range(limit):
chat = self.chat_container.query_one(f"#chat-{i + 1}")
chat.username = str(dialogs[i].name)
chat.msg = str(dialogs[i].message.message)
chat.peername = str(dialogs[i].name)
chat.is_group = dialogs[i].is_group
chat.peer_id = dialogs[i].id
try:
is_my_msg = \
dialogs[i].message.from_id.user_id == self.me_id
except:
is_my_msg = dialogs[i].id == self.me_id
if dialogs[i].is_group and is_my_msg:
chat.username = "Вы"
chat.msg = str(dialogs[i].message.message)
elif dialogs[i].is_group:
chat.username = str(
get_display_name(dialogs[i].message.sender)
)
chat.msg = str(dialogs[i].message.message)
elif is_my_msg:
chat.msg = "Вы: " * is_my_msg + str(
dialogs[i].message.message
)
else:
chat.msg = str(dialogs[i].message.message)
self.is_chat_update_blocked = False
print("Чаты обновлены")
else:
print("Обновление чатов невозможно: уже выполняется")
def compose(self):
yield Footer()
with Horizontal(id="main_container"):
def notify_send(self, event) -> None:
if not event:
return None
if bool(self.DO_NOTIFY) and event.mentioned and not self.app.focused:
system(f"notify-send \"Вас упомянули\" Talc")
def compose(self) -> ComposeResult:
yield Footer() # Нижняя панель с подсказками
with Horizontal(id="main_container"): # Основной контейнер
with Horizontal(id="chats"):
yield VerticalScroll(id="chat_container")
#TODO: сделать кнопку чтобы прогрузить больше чатов
yield ContentSwitcher(id="dialog_switcher")
#yield Dialog(telegram_client=self.telegram_client)
#TODO: сделать кнопку, чтобы прогрузить больше чатов,
# или ленивую прокрутку
with ContentSwitcher(id="dialog_switcher"):
# ↑ Внутри него как раз крутятся диалоги
yield Label(
"Нажмите на чат в панели слева, чтобы начать общаться",
id="begin_talk_label"
) #TODO: не показывается надпись, надо будет исправить

View File

@ -10,6 +10,10 @@ Chat {
height: 3;
}
Chat Horizontal Vertical Label{
height: 1;
}
Rule {
color: #FFFFFF;
}
@ -42,12 +46,13 @@ Message Container {
align: right middle;
}
.is_me_true Static {
.is_me_true Container Label {
border: solid $primary;
width: auto;
height: auto;
text-align: right;
text-align: left;
min-width: 11;
max-width: 100%;
}
.is_me_false Container {
@ -55,10 +60,24 @@ Message Container {
align: left middle;
}
.is_me_false Static {
.is_me_false Container Label {
border: solid $foreground;
width: auto;
height: auto;
text-align: left;
min-width: 11;
max-width: 100%;
}
ContentSwitcher {
width: 100%;
height: 100%;
}
.begin_talk_label {
width: 100%;
height: 100%;
text-align: center;
content-align: center middle;
color: $panel;
}

View File

@ -5,14 +5,18 @@ from textual.widget import Widget
from textual.reactive import Reactive
from textual.widgets import Input, Button, Label, Static, ContentSwitcher
from textual.app import ComposeResult, RenderResult
from telethon import TelegramClient, events, utils
from textual.content import Content
from telethon import TelegramClient, events, utils, types
import datetime
from os import getenv
class Chat(Widget):
"""Класс виджета чата для панели чатов"""
username: Reactive[str] = Reactive(" ", recompose=True)
peername: Reactive[str] = Reactive(" ", recompose=True)
msg: Reactive[str] = Reactive(" ", recompose=True)
is_group: Reactive[bool] = Reactive(False, recompose=True)
peer_id: Reactive[int] = Reactive(0)
def __init__(
@ -30,29 +34,33 @@ class Chat(Widget):
)
def on_mount(self) -> None:
self.switcher = self.screen.query_one(Horizontal).query_one("#dialog_switcher", ContentSwitcher)
self.switcher = self.screen.query_one(Horizontal)\
.query_one("#dialog_switcher", ContentSwitcher)
def on_click(self) -> None:
# Получение ID диалога и создание DOM-ID на его основе
dialog_id = f"dialog-{str(self.peer_id)}"
print("click 1")
# Маунт диалога
try:
self.switcher.mount(Dialog(
telegram_client=self.app.telegram_client,
chat_id=self.peer_id,
id=dialog_id
))
print("click 1.1")
except:
print("click 1.2")
print("click 2")
# Диалог уже есть: ничего не делаем
pass
self.switcher.current = dialog_id
self.switcher.recompose()
print("click 3")
def compose(self) -> ComposeResult:
with Horizontal():
yield Label(f"┌───┐\n{self.username[:1]:1}\n└───┘")
yield Label(f"┌───┐\n{self.peername[:1]:1}\n└───┘")
with Vertical():
yield Label(self.peername, id="peername")
if self.is_group:
yield Label(self.username, id="name")
yield Label(self.msg, id="last_msg")
@ -71,6 +79,9 @@ class Dialog(Widget):
self.telegram_client = telegram_client
self.chat_id = chat_id
self.is_msg_update_blocked = False
self.timezone = datetime.timezone(
datetime.timedelta(hours=int(getenv("UTC_OFFSET")))
)
async def on_mount(self) -> None:
self.limit = 10
@ -80,7 +91,6 @@ class Dialog(Widget):
self.me = await self.telegram_client.get_me()
self.dialog.scroll_end(animate=False)
await self.update_dialog()
for event in (
@ -92,6 +102,8 @@ class Dialog(Widget):
event(chats=(self.chat_id))
)(self.update_dialog)
self.dialog.scroll_down(animate=False, immediate=True)
def mount_messages(self, limit: int) -> None:
print("Загрузка виджетов сообщений...")
@ -123,11 +135,46 @@ class Dialog(Widget):
for i in range(limit):
msg = self.dialog.query_one(f"#msg-{i + 1}")
msg.message = ""
if str(messages[i].message):
msg.message = str(messages[i].message)
message = Content(
"[Медиа] " * \
bool(messages[i].media) + \
str(messages[i].message)
)
else:
message = Content("[Медиа]" * bool(messages[i].media) + "")
entities = messages[i].entities
if entities:
for entity in entities:
match type(entity):
case types.MessageEntityBold:
message = message.stylize(
"bold",
entity.offset,
entity.offset + entity.length
)
case types.MessageEntityUnderline:
message = message.stylize(
"underline",
entity.offset,
entity.offset + entity.length
)
case types.MessageEntityItalic:
message = message.stylize(
"italic",
entity.offset,
entity.offset + entity.length
)
case types.MessageEntityStrike:
message = message.stylize(
"strike",
entity.offset,
entity.offset + entity.length
)
msg.message = message
#TODO: завести это:
try:
is_me = messages[i].from_id.user_id == self.me.id
except:
@ -137,7 +184,7 @@ class Dialog(Widget):
msg.username = utils.get_display_name(messages[i].sender)
msg.send_time = messages[i]\
.date\
.astimezone(datetime.timezone.utc)\
.astimezone(self.timezone)\
.strftime("%H:%M")
self.is_msg_update_blocked = False
@ -172,7 +219,7 @@ class Dialog(Widget):
class Message(Widget):
"""Класс виджета сообщений для окна диалога"""
message: Reactive[str] = Reactive("", recompose=True)
message: Reactive[Content] = Reactive("", recompose=True)
is_me: Reactive[bool] = Reactive(False, recompose=True)
username: Reactive[str] = Reactive("", recompose=True)
send_time: Reactive[str] = Reactive("", recompose=True)
@ -190,12 +237,12 @@ class Message(Widget):
pass
def compose(self) -> ComposeResult:
static = Static(self.message)
static.border_title = self.username * (not self.is_me)
static.border_subtitle = self.send_time
label = Label(self.message, markup=False)
label.border_title = self.username * (not self.is_me)
label.border_subtitle = self.send_time
with Container():
yield static
yield label
if self.is_me:
self.classes = "is_me_true"