mirror of
https://github.com/avitoras/telegram-tui.git
synced 2025-07-27 11:20:31 +00:00
Изменил структуру проекта + сообщения снова обновляются + теперь всё стабильно
я очень крутой
This commit is contained in:
commit
bcd1ec6fbe
5
.gitignore
vendored
Normal file
5
.gitignore
vendored
Normal file
@ -0,0 +1,5 @@
|
||||
test.py
|
||||
*.session
|
||||
*.session-journal
|
||||
__pycache__
|
||||
*/__pycache__
|
Binary file not shown.
Binary file not shown.
2
main.py
2
main.py
@ -1,4 +1,4 @@
|
||||
from app.app import TelegramTUI
|
||||
from src.app import TelegramTUI
|
||||
|
||||
if __name__ == "__main__":
|
||||
tg = TelegramTUI()
|
||||
|
@ -1,56 +0,0 @@
|
||||
from textual.screen import Screen
|
||||
from textual.widgets import Label, Input
|
||||
from textual.containers import Vertical
|
||||
from telethon import TelegramClient
|
||||
from telethon.errors import SessionPasswordNeededError
|
||||
|
||||
class AuthScreen(Screen):
|
||||
"""Класс логина в аккаунт"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
name = None,
|
||||
id = None,
|
||||
classes = None,
|
||||
telegram_client: TelegramClient | None = None
|
||||
):
|
||||
super().__init__(name, id, classes)
|
||||
self.client = telegram_client
|
||||
|
||||
def on_mount(self):
|
||||
self.ac = self.query_one("#auth_container")
|
||||
|
||||
def compose(self):
|
||||
with Vertical(id="auth_container"):
|
||||
yield Label("Добро пожаловать в Telegram TUI")
|
||||
yield Input(placeholder="Номер телефона", id="phone")
|
||||
yield Input(placeholder="Код", id="code", disabled=True)
|
||||
yield Input(
|
||||
placeholder="Пароль",
|
||||
id="password",
|
||||
password=True,
|
||||
disabled=True
|
||||
)
|
||||
|
||||
async def on_input_submitted(self, event: Input.Submitted) -> None:
|
||||
if event.input.id == "phone":
|
||||
self.phone = event.value
|
||||
self.ac.query_one("#phone").disabled = True
|
||||
self.ac.query_one("#code").disabled = False
|
||||
await self.client.send_code_request(phone=self.phone)
|
||||
elif event.input.id == "code":
|
||||
try:
|
||||
self.code = event.value
|
||||
self.ac.query_one("#code").disabled = True
|
||||
await self.client.sign_in(phone=self.phone, code=self.code)
|
||||
self.app.pop_screen()
|
||||
self.app.push_screen("chats")
|
||||
except SessionPasswordNeededError:
|
||||
self.ac.query_one("#code").disabled = True
|
||||
self.ac.query_one("#password").disabled = False
|
||||
elif event.input.id == "password":
|
||||
self.password = event.value
|
||||
await self.client.sign_in(password=self.password)
|
||||
await self.client.start()
|
||||
self.app.pop_screen()
|
||||
self.app.push_screen("chats")
|
@ -1,64 +0,0 @@
|
||||
from textual.screen import Screen
|
||||
from textual.widgets import Footer, Static
|
||||
from textual.containers import Horizontal, VerticalScroll
|
||||
from telethon import TelegramClient, events
|
||||
from widgets.dialog import Dialog
|
||||
from widgets.chat import Chat
|
||||
|
||||
class ChatScreen(Screen):
|
||||
"""Класс экрана чатов, он же основной экран приложения"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
name = None,
|
||||
id = None,
|
||||
classes = None,
|
||||
telegram_client: TelegramClient | None = None
|
||||
):
|
||||
super().__init__(name, id, classes)
|
||||
self.telegram_client = telegram_client
|
||||
self.telegram_client.on(events.NewMessage())(self.update_chat_list)
|
||||
|
||||
async def on_mount(self):
|
||||
self.chat_container = self\
|
||||
.query_one("#main_container")\
|
||||
.query_one("#chats")\
|
||||
.query_one("#chat_container")
|
||||
|
||||
self.limit = 100
|
||||
for i in range(self.limit):
|
||||
chat = Chat(id=f"chat-{i + 1}", notify_func=self.notify)
|
||||
self.chat_container.mount(chat)
|
||||
#self.mount_chats(limit=25)
|
||||
|
||||
await self.update_chat_list()
|
||||
|
||||
def mount_chats(self, limit: int):
|
||||
self.limit = limit
|
||||
chats_amount = len(self.chat_container.query(Chat))
|
||||
if limit > chats_amount:
|
||||
for i in range(limit - chats_amount):
|
||||
chat = Chat(id=f"chat-{i + 1 + (limit - chats_amount)}")
|
||||
self.chat_container.mount(chat)
|
||||
elif not (limit == chats_amount):
|
||||
for i in range(chats_amount - limit):
|
||||
self.chat_container.query(Chat).last().remove()
|
||||
|
||||
async def update_chat_list(self):
|
||||
dialogs = await self.telegram_client.get_dialogs(limit=self.limit)
|
||||
|
||||
for i in range(len(dialogs)):
|
||||
chat = self.chat_container.query_one(f"#chat-{i + 1}")
|
||||
chat.username = str(dialogs[i].name)
|
||||
chat.msg = str(dialogs[i].message.message)
|
||||
chat.peer_id = dialogs[i].id
|
||||
#self.notify("Новое сообщение") #колхоз дебаг
|
||||
|
||||
def compose(self):
|
||||
yield Footer()
|
||||
with Horizontal(id="main_container"):
|
||||
with Horizontal(id="chats"):
|
||||
yield VerticalScroll(Static(id="chat_container"))
|
||||
#TODO: сделать кнопку чтобы прогрузить больше чатов
|
||||
|
||||
yield Dialog()
|
@ -1,21 +1,12 @@
|
||||
from telethon import TelegramClient, events
|
||||
from telethon.errors import SessionPasswordNeededError
|
||||
from textual.app import App, ComposeResult
|
||||
from textual.containers import Horizontal, VerticalScroll, Vertical
|
||||
from textual.widgets import Static, Footer, Label, Input, Button
|
||||
from textual.screen import Screen
|
||||
from textual.events import Event
|
||||
from widgets.chat import Chat
|
||||
from widgets.dialog import Dialog
|
||||
from textual.app import App
|
||||
from tokens import api_id, api_hash
|
||||
from screens.auth_screen import AuthScreen
|
||||
from screens.chat_screen import ChatScreen
|
||||
from src.screens import AuthScreen, ChatScreen
|
||||
|
||||
class TelegramTUI(App):
|
||||
"""Класс приложения"""
|
||||
|
||||
CSS_PATH = "../tcss/style.tcss"
|
||||
#SCREENS = {"chats": ChatScreen}
|
||||
CSS_PATH = "style.tcss"
|
||||
|
||||
async def on_mount(self) -> None:
|
||||
self.telegram_client = TelegramClient("user", api_id, api_hash)
|
149
src/screens.py
Normal file
149
src/screens.py
Normal file
@ -0,0 +1,149 @@
|
||||
from textual.screen import Screen
|
||||
from textual.widgets import Label, Input, Footer, Static
|
||||
from textual.containers import Vertical, Horizontal, VerticalScroll
|
||||
from telethon.errors import SessionPasswordNeededError
|
||||
from telethon import TelegramClient, events, utils
|
||||
from src.widgets import Dialog, Chat
|
||||
|
||||
class AuthScreen(Screen):
|
||||
"""Класс логина в аккаунт"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
name = None,
|
||||
id = None,
|
||||
classes = None,
|
||||
telegram_client: TelegramClient | None = None
|
||||
):
|
||||
super().__init__(name, id, classes)
|
||||
self.client = telegram_client
|
||||
|
||||
def on_mount(self):
|
||||
self.ac = self.query_one("#auth_container")
|
||||
|
||||
def compose(self):
|
||||
with Vertical(id="auth_container"):
|
||||
yield Label("Добро пожаловать в Telegram TUI")
|
||||
yield Input(placeholder="Номер телефона", id="phone")
|
||||
yield Input(placeholder="Код", id="code", disabled=True)
|
||||
yield Input(
|
||||
placeholder="Пароль",
|
||||
id="password",
|
||||
password=True,
|
||||
disabled=True
|
||||
)
|
||||
|
||||
async def on_input_submitted(self, event: Input.Submitted) -> None:
|
||||
if event.input.id == "phone":
|
||||
self.phone = event.value
|
||||
self.ac.query_one("#phone").disabled = True
|
||||
self.ac.query_one("#code").disabled = False
|
||||
await self.client.send_code_request(phone=self.phone)
|
||||
elif event.input.id == "code":
|
||||
try:
|
||||
self.code = event.value
|
||||
self.ac.query_one("#code").disabled = True
|
||||
await self.client.sign_in(phone=self.phone, code=self.code)
|
||||
self.app.pop_screen()
|
||||
self.app.push_screen("chats")
|
||||
except SessionPasswordNeededError:
|
||||
self.ac.query_one("#code").disabled = True
|
||||
self.ac.query_one("#password").disabled = False
|
||||
elif event.input.id == "password":
|
||||
self.password = event.value
|
||||
await self.client.sign_in(password=self.password)
|
||||
await self.client.start()
|
||||
self.app.pop_screen()
|
||||
self.app.push_screen("chats")
|
||||
self.app.notify("")
|
||||
|
||||
class ChatScreen(Screen):
|
||||
"""Класс экрана чатов, он же основной экран приложения"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
name = None,
|
||||
id = None,
|
||||
classes = None,
|
||||
telegram_client: TelegramClient | None = None
|
||||
):
|
||||
super().__init__(name, id, classes)
|
||||
self.telegram_client = telegram_client
|
||||
|
||||
async def on_mount(self):
|
||||
self.limit = 30
|
||||
|
||||
self.chat_container = self\
|
||||
.query_one("#main_container")\
|
||||
.query_one("#chats")\
|
||||
.query_one("#chat_container")
|
||||
|
||||
print("Первоначальная загрузка виджетов чатов...")
|
||||
self.mount_chats(
|
||||
len(
|
||||
await self.telegram_client.get_dialogs(
|
||||
limit=self.limit, archived=False
|
||||
)
|
||||
)
|
||||
)
|
||||
print("Первоначальная загрузка виджетов чата завершена")
|
||||
|
||||
self.is_chat_update_blocked = False
|
||||
await self.update_chat_list()
|
||||
|
||||
print("Первоначальная загрузка чатов завершена")
|
||||
|
||||
for event in (
|
||||
events.NewMessage(),
|
||||
events.MessageDeleted(),
|
||||
events.MessageEdited()
|
||||
):
|
||||
self.telegram_client.on(event)(self.update_chat_list)
|
||||
|
||||
def mount_chats(self, limit: int):
|
||||
print("Загрузка виджетов чатов...")
|
||||
|
||||
chats_amount = len(self.chat_container.query(Chat))
|
||||
|
||||
if limit > chats_amount:
|
||||
for i in range(limit - chats_amount):
|
||||
chat = Chat(id=f"chat-{i + chats_amount + 1}")
|
||||
self.chat_container.mount(chat)
|
||||
elif limit < chats_amount:
|
||||
for i in range(chats_amount - limit):
|
||||
self.chat_container.query(Chat).last().remove()
|
||||
|
||||
print("Виджеты чатов загружены")
|
||||
|
||||
async def update_chat_list(self, event = None):
|
||||
print("Запрос обновления чатов")
|
||||
if not self.is_chat_update_blocked:
|
||||
self.is_chat_update_blocked = True
|
||||
dialogs = await self.telegram_client.get_dialogs(
|
||||
limit=self.limit, archived=False
|
||||
)
|
||||
print("Получены диалоги")
|
||||
limit = len(dialogs)
|
||||
#limit = 30
|
||||
self.mount_chats(limit)
|
||||
|
||||
for i in range(limit):
|
||||
chat = self.chat_container.query_one(f"#chat-{i + 1}")
|
||||
chat.username = str(dialogs[i].name)
|
||||
chat.msg = str(dialogs[i].message.message)
|
||||
chat.peer_id = dialogs[i].id
|
||||
#self.notify("Новое сообщение") #колхоз дебаг
|
||||
|
||||
self.is_chat_update_blocked = False
|
||||
print("Чаты обновлены")
|
||||
else:
|
||||
print("Обновление чатов невозможно: уже выполняется")
|
||||
|
||||
def compose(self):
|
||||
yield Footer()
|
||||
with Horizontal(id="main_container"):
|
||||
with Horizontal(id="chats"):
|
||||
yield VerticalScroll(Static(id="chat_container"))
|
||||
#TODO: сделать кнопку чтобы прогрузить больше чатов
|
||||
|
||||
yield Dialog()
|
101
src/widgets.py
Normal file
101
src/widgets.py
Normal file
@ -0,0 +1,101 @@
|
||||
from textual.containers import Horizontal, Vertical, Container, VerticalScroll
|
||||
from textual.widget import Widget
|
||||
from textual.reactive import Reactive
|
||||
from textual.widgets import Input, Button, Label
|
||||
|
||||
class Chat(Widget):
|
||||
"""Класс виджета чата для панели чатов"""
|
||||
|
||||
username = Reactive(" ", recompose=True)
|
||||
msg = Reactive(" ", recompose=True)
|
||||
peer_id = Reactive(0)
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
name: str | None = None,
|
||||
notify_func = None,
|
||||
id: str | None = None,
|
||||
classes: str | None = None,
|
||||
disabled: bool = False
|
||||
):
|
||||
super().__init__(
|
||||
name=str(name),
|
||||
id=id,
|
||||
classes=classes,
|
||||
disabled=disabled
|
||||
)
|
||||
self.notify = notify_func
|
||||
|
||||
def _on_click(self):
|
||||
self.msg = str(self.peer_id)
|
||||
self.notify("нажат чат")
|
||||
|
||||
def compose(self):
|
||||
with Horizontal():
|
||||
yield Label(f"┌───┐\n│ {self.username[:1]} │\n└───┘")
|
||||
with Vertical():
|
||||
yield Label(self.username, id="name")
|
||||
yield Label(self.msg, id="last_msg")
|
||||
|
||||
class Dialog(Widget):
|
||||
"""Класс окна диалога"""
|
||||
|
||||
def __init__(self, id=None, classes=None, disabled=False):
|
||||
super().__init__(id=id, classes=classes, disabled=disabled)
|
||||
|
||||
def compose(self):
|
||||
with Vertical():
|
||||
with VerticalScroll(id="dialog"):
|
||||
yield Message(message="привет, я ыплыжлп", is_me=True)
|
||||
yield Message(message="о, дщытрапшщцрущ", is_me=False)
|
||||
yield Message(message="ДАТОУШЩАРШЩУРЩША!!!!", is_me=False)
|
||||
# должно быть примерно
|
||||
# is_me = message.from_id == client.get_peer_id("me")
|
||||
|
||||
# но я могу ошибаться, я это фиш если что
|
||||
|
||||
#TODO: сделать кнопку чтобы прогрузить больше сообщений,
|
||||
#но при этом чтобы при перезаходе в чат оставались
|
||||
#прогруженными только 10 сообщений,
|
||||
#а остальные декомпоузились
|
||||
|
||||
with Horizontal(id="input_place"):
|
||||
yield Input(placeholder="Сообщение", id="msg_input")
|
||||
yield Button(label="➤", id="send", variant="primary")
|
||||
|
||||
def on_button_pressed(self, event): # self добавил
|
||||
self.app.notify("Нажато отправить")
|
||||
|
||||
class Message(Widget):
|
||||
"""Класс виджета сообщений для окна диалога"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
name=None,
|
||||
message=None,
|
||||
is_me=None,
|
||||
id=None,
|
||||
classes=None,
|
||||
disabled=False
|
||||
):
|
||||
super().__init__(name=name, id=id, classes=classes, disabled=disabled)
|
||||
self.message = message
|
||||
self.is_me = is_me
|
||||
|
||||
def on_mount(self):
|
||||
container = self.query_one(Container)
|
||||
label = container.query_one(Label)
|
||||
if self.is_me:
|
||||
self.styles.padding = (0, 0, 0, 15)
|
||||
label.styles.text_align = "right"
|
||||
container.styles.align_horizontal = "right"
|
||||
label.styles.border = ("solid", "#4287f5")
|
||||
else:
|
||||
self.styles.padding = (0, 15, 0, 0)
|
||||
label.styles.text_align = "left"
|
||||
container.styles.align_horizontal = "left"
|
||||
label.styles.border = ("solid", "#ffffff")
|
||||
|
||||
def compose(self):
|
||||
with Container():
|
||||
yield Label(str(self.message))
|
Binary file not shown.
Binary file not shown.
@ -1,48 +0,0 @@
|
||||
"""
|
||||
ЭТОТ ФАЙЛ БОЛЬШЕ НЕ ИСПОЛЬЗУЕТСЯ
|
||||
СКОРО УДАЛИМ
|
||||
"""
|
||||
|
||||
from telethon import TelegramClient, events, utils
|
||||
|
||||
class TelegramClientWrapper:
|
||||
"""Обёртка для метода TelegramClient из Telethon"""
|
||||
|
||||
def __init__(self, api_id, api_hash, message_handler):
|
||||
self.message_handler = message_handler
|
||||
self.client = TelegramClient('user', api_id, api_hash)
|
||||
self.client.on(events.NewMessage())(self.local_message_handler)
|
||||
|
||||
async def local_message_handler(self, event):
|
||||
await self.message_handler()
|
||||
|
||||
async def connect(self):
|
||||
await self.client.connect()
|
||||
|
||||
async def start(self):
|
||||
await self.client.start()
|
||||
|
||||
async def disconnect(self):
|
||||
await self.client.disconnect()
|
||||
|
||||
async def get_dialogs(self, limit=None):
|
||||
await self.client.get_dialogs(limit=limit)
|
||||
dialogs_list = []
|
||||
async for dialog in self.client.iter_dialogs(limit=limit):
|
||||
dialogs_list.append(dialog)
|
||||
#return [self._map_dialog(d) for d in dialogs_list]
|
||||
return dialogs_list
|
||||
|
||||
#ого:
|
||||
"""def _map_dialog(self, dialog):
|
||||
return DialogInfo(
|
||||
id=dialog.id,
|
||||
name=utils.get_display_name(dialog.entity),
|
||||
message=dialog.message
|
||||
)"""
|
||||
|
||||
"""class DialogInfo:
|
||||
def __init__(self, id, name, message):
|
||||
self.id = id
|
||||
self.name = name
|
||||
self.message = message"""
|
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@ -1,38 +0,0 @@
|
||||
from textual.widgets import Label
|
||||
from textual.containers import Horizontal, Vertical
|
||||
from textual.widget import Widget
|
||||
from textual.reactive import Reactive
|
||||
|
||||
class Chat(Widget):
|
||||
"""Класс виджета чата для панели чатов"""
|
||||
|
||||
username = Reactive(" ", recompose=True)
|
||||
msg = Reactive(" ", recompose=True)
|
||||
peer_id = Reactive(0)
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
name: str | None = None,
|
||||
notify_func = None,
|
||||
id: str | None = None,
|
||||
classes: str | None = None,
|
||||
disabled: bool = False
|
||||
):
|
||||
super().__init__(
|
||||
name=str(name),
|
||||
id=id,
|
||||
classes=classes,
|
||||
disabled=disabled
|
||||
)
|
||||
self.notify = notify_func
|
||||
|
||||
def _on_click(self):
|
||||
self.msg = str(self.peer_id)
|
||||
self.notify("нажат чат")
|
||||
|
||||
def compose(self):
|
||||
with Horizontal():
|
||||
yield Label(f"┌───┐\n│ {self.username[:1]} │\n└───┘")
|
||||
with Vertical():
|
||||
yield Label(self.username, id="name")
|
||||
yield Label(self.msg, id="last_msg")
|
@ -1,33 +0,0 @@
|
||||
from textual.widgets import Input, Button, Label
|
||||
from textual.containers import Horizontal, VerticalScroll, Vertical
|
||||
from textual.widget import Widget
|
||||
from widgets.message import Message
|
||||
|
||||
class Dialog(Widget):
|
||||
"""Класс окна диалога"""
|
||||
|
||||
def __init__(self, id=None, classes=None, disabled=False):
|
||||
super().__init__(id=id, classes=classes, disabled=disabled)
|
||||
|
||||
def compose(self):
|
||||
with Vertical():
|
||||
with VerticalScroll(id="dialog"):
|
||||
yield Message(message="привет, я ыплыжлп", is_me=True)
|
||||
yield Message(message="о, дщытрапшщцрущ", is_me=False)
|
||||
yield Message(message="ДАТОУШЩАРШЩУРЩША!!!!", is_me=False)
|
||||
# должно быть примерно
|
||||
# is_me = message.from_id == client.get_peer_id("me")
|
||||
|
||||
# но я могу ошибаться, я это фиш если что
|
||||
|
||||
#TODO: сделать кнопку чтобы прогрузить больше сообщений,
|
||||
#но при этом чтобы при перезаходе в чат оставались
|
||||
#прогруженными только 10 сообщений,
|
||||
#а остальные декомпоузились
|
||||
|
||||
with Horizontal(id="input_place"):
|
||||
yield Input(placeholder="Сообщение", id="msg_input")
|
||||
yield Button(label="➤", id="send", variant="primary")
|
||||
|
||||
def on_button_pressed(self, event): # self добавил
|
||||
self.app.notify("Нажато отправить")
|
@ -1,37 +0,0 @@
|
||||
from textual.widgets import Label
|
||||
from textual.containers import Container
|
||||
from textual.widget import Widget
|
||||
|
||||
class Message(Widget):
|
||||
"""Класс виджета сообщений для окна диалога"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
name=None,
|
||||
message=None,
|
||||
is_me=None,
|
||||
id=None,
|
||||
classes=None,
|
||||
disabled=False
|
||||
):
|
||||
super().__init__(name=name, id=id, classes=classes, disabled=disabled)
|
||||
self.message = message
|
||||
self.is_me = is_me
|
||||
|
||||
def on_mount(self):
|
||||
container = self.query_one(Container)
|
||||
label = container.query_one(Label)
|
||||
if self.is_me:
|
||||
self.styles.padding = (0, 0, 0, 15)
|
||||
label.styles.text_align = "right"
|
||||
container.styles.align_horizontal = "right"
|
||||
label.styles.border = ("solid", "#4287f5")
|
||||
else:
|
||||
self.styles.padding = (0, 15, 0, 0)
|
||||
label.styles.text_align = "left"
|
||||
container.styles.align_horizontal = "left"
|
||||
label.styles.border = ("solid", "#ffffff")
|
||||
|
||||
def compose(self):
|
||||
with Container():
|
||||
yield Label(str(self.message))
|
Loading…
x
Reference in New Issue
Block a user