Compare commits

..

9 Commits

Author SHA1 Message Date
wheelchairy
cca1249526 UNSTABLE | Work! I'm working on a message replay 2025-03-27 11:52:43 +03:00
wheelchairy
6d11fee925 UNSTABLE | Messages work 2025-03-27 11:32:33 +03:00
wheelchairy
74efe83f07 UNSTABLE | ok 2025-03-27 11:30:35 +03:00
wheelchairy
4992e71948 UNSTABLE | switch to urwid (fixing chat list) 2025-03-27 11:23:20 +03:00
Lain
fca2d580b0
Delete venv directory 2025-03-27 11:16:17 +03:00
wheelchairy
7e58d94786 UNSTABLE 2025-03-27 11:15:56 +03:00
wheelchairy
ef6e69bc38 UNSTABLE | Switching to Urwid 2025-03-27 11:14:34 +03:00
wheelchairy
82d5642a00 UNSTABLE | Switching to Urwid 2025-03-27 11:14:12 +03:00
wheelchairy
9ac58a6bfe UNSTABLE | Backup 2025-03-27 10:55:08 +03:00
15 changed files with 883 additions and 918 deletions

View File

@ -2,6 +2,10 @@
# Их использование крайне нежелательно!
# Получите свои API-ключи приложения на https://my.telegram.org/apps и вставьте их ниже.
# Спасибо за понимание!
API_ID=2040
API_HASH=b18441a1ff607e10a989891a5462e627
# Настройки приложения
APP_NAME=Telegram TUI
APP_VERSION=1.0
DEVICE_MODEL=Desktop

20
.gitignore vendored
View File

@ -1,7 +1,19 @@
test.py
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
venv/
# Telegram session files
*.session
*.session-journal
__pycache__
*/__pycache__
tokens.py
# Environment
.env
# IDE
.vscode/
.idea/

View File

@ -1,26 +1,26 @@
# Тальк
# Telegram TUI Client
Тальк — клиент Telegram с текстовым пользовательским интерфейсом, написанный на Python, Telethon и Textual.
## Требования
- Python 3.12
- pyenv (рекомендуется для управления версиями Python)
Консольный клиент Telegram на базе urwid с поддержкой:
- Просмотра чатов и сообщений
- Поиска по чатам
- Навигации с помощью клавиатуры
- Поддержки папок (Архив)
- Корректного отображения эмодзи и Unicode
## Установка
1. Установите Python 3.12 с помощью pyenv:
1. Клонируйте репозиторий:
```bash
pyenv install 3.12
pyenv local 3.12
git clone https://github.com/yourusername/talc.git
cd talc
```
2. Создайте и активируйте виртуальное окружение:
2. Создайте виртуальное окружение и активируйте его:
```bash
python -m venv .venv
source .venv/bin/activate # для Linux/macOS
python -m venv venv
source venv/bin/activate # Linux/macOS
# или
.venv\Scripts\activate # для Windows
venv\Scripts\activate # Windows
```
3. Установите зависимости:
@ -28,15 +28,50 @@ source .venv/bin/activate # для Linux/macOS
pip install -r requirements.txt
```
4. Настройте переменные окружения:
4. Скопируйте `.env.example` в `.env`:
```bash
cp .env.example .env
# Отредактируйте .env файл, добавив свои API ключи
# Получите ключи на https://my.telegram.org/apps
```
5. Получите API ключи на https://my.telegram.org/apps и добавьте их в `.env`
## Запуск
1. Активируйте виртуальное окружение:
```bash
python src/app.py
source venv/bin/activate # Linux/macOS
# или
venv\Scripts\activate # Windows
```
2. Запустите приложение:
```bash
python main_urwid.py
```
## Управление
- Tab: Переключение фокуса между поиском и списком чатов
- ↑↓: Выбор чата
- Enter: Открыть выбранный чат
- Esc: Вернуться к списку чатов
- /: Быстрый доступ к поиску
- []: Переключение между основными чатами и архивом
- Q: Выход
## Структура проекта
```
talc/
├── main_urwid.py # Основной файл запуска
├── requirements.txt # Зависимости проекта
├── .env.example # Пример конфигурации
├── .env # Конфигурация (не включена в git)
└── urwid_client/ # Основной код приложения
├── __init__.py
└── telegram_tui.py # Реализация клиента
```
## Лицензия
MIT

View File

@ -1,8 +0,0 @@
#!/usr/bin/python
"""Файл инициализации приложения"""
from src.app import TelegramTUI
if __name__ == "__main__":
tg = TelegramTUI()
tg.run()

16
main_urwid.py Normal file
View File

@ -0,0 +1,16 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Telegram TUI Client
Консольный клиент Telegram на базе urwid
"""
import asyncio
import os
from urwid_client.telegram_tui import main
if __name__ == "__main__":
try:
asyncio.run(main())
except Exception as e:
print(f"Ошибка при запуске приложения: {e}")

View File

@ -1,6 +1,5 @@
textual
telethon
python-dotenv
emoji
Pillow
pywhatkit
urwid>=2.1.2
telethon>=1.34.0
python-dotenv>=1.0.0
emoji>=2.10.1
nest_asyncio>=1.6.0

View File

@ -1,54 +0,0 @@
"""Главный файл приложения"""
import os
import sys
from dotenv import load_dotenv
from telethon import TelegramClient, events
from textual.app import App
from rich.console import Console
from src.screens import AuthScreen, ChatScreen
# Настройка консоли для корректной работы с Unicode
console = Console(force_terminal=True, color_system="auto")
sys.stdout = console
load_dotenv()
api_id = os.getenv("API_ID")
api_hash = os.getenv("API_HASH")
if not api_id or not api_hash:
raise ValueError(
"API_ID и API_HASH не найдены в .env файле. "
"Пожалуйста, скопируйте .env.example в .env и заполните свои ключи."
)
api_id = int(api_id)
class TelegramTUI(App):
"""Класс приложения"""
CSS_PATH = "style.tcss"
TITLE = "Telegram TUI"
def __init__(self):
super().__init__()
self.console = console
async def on_mount(self) -> None:
self.telegram_client = TelegramClient("user", api_id, api_hash)
await self.telegram_client.connect()
chat_screen = ChatScreen(telegram_client=self.telegram_client)
self.install_screen(chat_screen, name="chats")
if not await self.telegram_client.is_user_authorized():
auth_screen = AuthScreen(telegram_client=self.telegram_client)
self.install_screen(auth_screen, name="auth")
self.push_screen("auth")
else:
self.push_screen("chats")
async def on_exit_app(self):
await self.telegram_client.disconnect()
return super()._on_exit_app()

View File

@ -1,228 +0,0 @@
"""Файл с кастомными экранами приложения"""
from textual.screen import Screen
from textual.widgets import Label, Input, Footer, Static, ContentSwitcher
from textual.containers import Vertical, Horizontal, VerticalScroll
from textual.events import Key
from telethon.errors import SessionPasswordNeededError
from telethon import TelegramClient, events
from src.widgets import Dialog, Chat, normalize_text
from textual import log
class AuthScreen(Screen):
"""Класс экрана логина в аккаунт"""
def __init__(
self,
name = None,
id = None,
classes = None,
telegram_client: TelegramClient | None = None
):
super().__init__(name, id, classes)
self.client = telegram_client
def on_mount(self):
self.ac = self.query_one("#auth_container")
def compose(self):
with Vertical(id="auth_container"):
yield Label(normalize_text("Добро пожаловать в Telegram TUI"))
yield Input(placeholder=normalize_text("Номер телефона"), id="phone")
yield Input(placeholder=normalize_text("Код"), id="code", disabled=True)
yield Input(
placeholder=normalize_text("Пароль"),
id="password",
password=True,
disabled=True
)
async def on_input_submitted(self, event: Input.Submitted) -> None:
if event.input.id == "phone":
self.phone = normalize_text(event.value)
self.ac.query_one("#phone").disabled = True
self.ac.query_one("#code").disabled = False
await self.client.send_code_request(phone=self.phone)
elif event.input.id == "code":
try:
self.code = normalize_text(event.value)
self.ac.query_one("#code").disabled = True
await self.client.sign_in(phone=self.phone, code=self.code)
self.app.pop_screen()
self.app.push_screen("chats")
except SessionPasswordNeededError:
self.ac.query_one("#code").disabled = True
self.ac.query_one("#password").disabled = False
elif event.input.id == "password":
self.password = normalize_text(event.value)
await self.client.sign_in(password=self.password)
await self.client.start()
self.app.pop_screen()
self.app.push_screen("chats")
class ChatScreen(Screen):
"""Класс экрана чатов, он же основной экран приложения"""
def __init__(
self,
name = None,
id = None,
classes = None,
telegram_client: TelegramClient | None = None
):
super().__init__(name, id, classes)
self.telegram_client = telegram_client
self.search_query = ""
self.selected_chat_index = 0
self.chats = []
self.focused_element = "search" # search, chat_list, dialog
async def on_mount(self):
self.limit = 100
self.chat_container = self\
.query_one("#main_container")\
.query_one("#chats")\
.query_one("#chat_container")
self.search_input = self.query_one("#search_input")
self.help_label = self.query_one("#help_label")
log("Первоначальная загрузка виджетов чатов...")
self.mount_chats(
len(
await self.telegram_client.get_dialogs(
limit=self.limit, archived=False
)
)
)
log("Первоначальная загрузка виджетов чата завершена")
self.is_chat_update_blocked = False
await self.update_chat_list()
log("Первоначальная загрузка чатов завершена")
for event in (
events.NewMessage,
events.MessageDeleted,
events.MessageEdited
):
self.telegram_client.on(event())(self.update_chat_list)
def mount_chats(self, limit: int):
log("Загрузка виджетов чатов...")
chats_amount = len(self.chat_container.query(Chat))
if limit > chats_amount:
for i in range(limit - chats_amount):
chat = Chat(id=f"chat-{i + chats_amount + 1}")
self.chat_container.mount(chat)
elif limit < chats_amount:
for i in range(chats_amount - limit):
self.chat_container.query(Chat).last().remove()
log("Виджеты чатов загружены")
async def update_chat_list(self, event = None):
log("Запрос обновления чатов")
if not self.is_chat_update_blocked:
self.is_chat_update_blocked = True
dialogs = await self.telegram_client.get_dialogs(
limit=self.limit, archived=False
)
log("Получены диалоги")
# Фильтруем диалоги по поисковому запросу
if self.search_query:
dialogs = [
d for d in dialogs
if self.search_query.lower() in normalize_text(d.name).lower()
]
limit = len(dialogs)
self.mount_chats(limit)
for i in range(limit):
chat = self.chat_container.query_one(f"#chat-{i + 1}")
chat.username = normalize_text(str(dialogs[i].name))
chat.msg = normalize_text(str(dialogs[i].message.message))
chat.peer_id = dialogs[i].id
chat.is_selected = (i == self.selected_chat_index)
chat.is_focused = (self.focused_element == "chat_list" and i == self.selected_chat_index)
self.is_chat_update_blocked = False
log("Чаты обновлены")
else:
log("Обновление чатов невозможно: уже выполняется")
def on_input_changed(self, event: Input.Changed) -> None:
if event.input.id == "search_input":
self.search_query = normalize_text(event.value)
self.selected_chat_index = 0
self.update_chat_list()
def on_key(self, event: Key) -> None:
if event.key == "tab":
# Переключаем фокус между элементами
if self.focused_element == "search":
self.focused_element = "chat_list"
self.search_input.blur()
self.update_chat_list()
elif self.focused_element == "chat_list":
self.focused_element = "search"
self.search_input.focus()
self.update_chat_list()
return
if self.focused_element == "search":
return
chats = self.chat_container.query(Chat)
if not chats:
return
if event.key == "up":
self.selected_chat_index = max(0, self.selected_chat_index - 1)
for i, chat in enumerate(chats):
chat.is_selected = (i == self.selected_chat_index)
chat.is_focused = (i == self.selected_chat_index)
# Прокручиваем к выбранному чату
selected_chat = chats[self.selected_chat_index]
self.chat_container.scroll_to(selected_chat, animate=False)
elif event.key == "down":
self.selected_chat_index = min(len(chats) - 1, self.selected_chat_index + 1)
for i, chat in enumerate(chats):
chat.is_selected = (i == self.selected_chat_index)
chat.is_focused = (i == self.selected_chat_index)
# Прокручиваем к выбранному чату
selected_chat = chats[self.selected_chat_index]
self.chat_container.scroll_to(selected_chat, animate=False)
elif event.key == "enter":
chats[self.selected_chat_index].on_click()
elif event.key == "escape":
# Возвращаемся к списку чатов
self.app.pop_screen()
self.app.push_screen("chats")
elif event.key == "/":
# Фокус на поиск
self.focused_element = "search"
self.search_input.focus()
self.update_chat_list()
def compose(self):
yield Footer()
with Horizontal(id="main_container"):
with Vertical(id="chats"):
yield Label(
"Навигация: Tab - переключение фокуса, ↑↓ - выбор чата, Enter - открыть, Esc - назад, / - поиск",
id="help_label",
classes="help-text"
)
yield Input(placeholder=normalize_text("Поиск чатов..."), id="search_input")
yield VerticalScroll(id="chat_container")
yield ContentSwitcher(id="dialog_switcher")
#yield Dialog(telegram_client=self.telegram_client)

View File

@ -1,64 +0,0 @@
#chats {
width: 30%;
}
#dialog {
width: 70%;
}
Chat {
height: 3;
}
Rule {
color: #FFFFFF;
}
Message {
height: auto;
width: auto;
}
Message Container {
height: auto;
}
#input_place {
height: 3;
width: 70%;
align-horizontal: center;
}
#msg_input {
width: 65%;
}
#auth_container{
align: center middle;
}
.is_me_true Container {
padding: 0 0 0 15;
align: right middle;
}
.is_me_true Static {
border: solid $primary;
width: auto;
height: auto;
text-align: right;
min-width: 11;
}
.is_me_false Container {
padding: 0 15 0 0;
align: left middle;
}
.is_me_false Static {
border: solid $foreground;
width: auto;
height: auto;
text-align: left;
min-width: 11;
}

View File

@ -1,416 +0,0 @@
"""Файл с кастомными виджетами приложения"""
from textual.containers import Horizontal, Vertical, Container, VerticalScroll
from textual.widget import Widget
from textual.reactive import Reactive
from textual.widgets import Input, Button, Label, Static, ContentSwitcher
from textual.app import ComposeResult, RenderResult
from textual.keys import Keys
from telethon import TelegramClient, events, utils
import datetime
import unicodedata
import re
import emoji
import os
import tempfile
from PIL import Image
import pywhatkit as kit
from textual import log
def remove_emoji(text: str) -> str:
"""Удаляет эмодзи из текста"""
if not text:
return ""
return emoji.replace_emoji(text, '')
def normalize_text(text: str) -> str:
"""Нормализует текст для корректного отображения"""
if not text:
return ""
# Удаляем эмодзи
text = remove_emoji(text)
# Удаляем все управляющие символы
text = ''.join(char for char in text if unicodedata.category(char)[0] != 'C')
# Нормализуем Unicode
text = unicodedata.normalize('NFKC', text)
# Заменяем специальные символы на их ASCII-эквиваленты
text = text.replace('', '-').replace('', '-').replace('', '...')
# Удаляем все непечатаемые символы
text = ''.join(char for char in text if char.isprintable())
return text
def safe_ascii(text: str) -> str:
"""Преобразует текст в безопасный ASCII-формат"""
if not text:
return ""
# Удаляем эмодзи
text = remove_emoji(text)
# Оставляем только ASCII символы и пробелы
return ''.join(char for char in text if ord(char) < 128 or char.isspace())
def convert_image_to_ascii(image_path: str, width: int = 50) -> str:
"""Конвертирует изображение в ASCII-арт"""
try:
# Открываем изображение
img = Image.open(image_path)
# Конвертируем в RGB если нужно
if img.mode != 'RGB':
img = img.convert('RGB')
# Изменяем размер, сохраняя пропорции
aspect_ratio = img.height / img.width
height = int(width * aspect_ratio * 0.5) # * 0.5 потому что символы выше чем шире
img = img.resize((width, height))
# Конвертируем в ASCII
ascii_str = kit.image_to_ascii_art(image_path, output_file=None)
# Очищаем временный файл
os.remove(image_path)
return ascii_str
except Exception as e:
log(f"Ошибка конвертации изображения: {e}")
return "Ошибка загрузки изображения"
class Chat(Static):
"""Класс виджета чата для панели чатов"""
DEFAULT_CSS = """
Chat {
width: 100%;
height: auto;
min-height: 3;
padding: 1 2;
border: solid $accent;
margin: 1 0;
background: $surface;
}
Chat:hover {
background: $accent 20%;
}
Chat.-selected {
background: $accent 30%;
}
Chat:focus {
background: $accent 40%;
border: double $accent;
}
.chat-avatar {
width: 3;
height: 3;
content-align: center middle;
border: solid $accent;
}
.chat-content {
width: 100%;
margin-left: 1;
}
.chat-name {
color: $text;
text-style: bold;
}
.chat-message {
color: $text-muted;
}
"""
def __init__(
self,
name: str | None = None,
id: str | None = None,
classes: str | None = None,
disabled: bool = False
) -> None:
super().__init__(name=name, id=id, classes=classes, disabled=disabled)
self.can_focus = True
self._username = ""
self._msg = ""
self._peer_id = 0
self._is_selected = False
def on_mount(self) -> None:
self.switcher = self.screen.query_one("#dialog_switcher", ContentSwitcher)
@property
def username(self) -> str:
return self._username
@username.setter
def username(self, value: str) -> None:
self._username = value
self.refresh()
@property
def msg(self) -> str:
return self._msg
@msg.setter
def msg(self, value: str) -> None:
self._msg = value
self.refresh()
@property
def peer_id(self) -> int:
return self._peer_id
@peer_id.setter
def peer_id(self, value: int) -> None:
self._peer_id = value
@property
def is_selected(self) -> bool:
return self._is_selected
@is_selected.setter
def is_selected(self, value: bool) -> None:
self._is_selected = value
self.set_class(value, "-selected")
def on_focus(self) -> None:
# Снимаем выделение со всех чатов
for chat in self.screen.query(Chat):
chat.is_selected = False
# Выделяем текущий чат
self.is_selected = True
# Прокручиваем к этому чату
self.screen.chat_container.scroll_to(self, animate=False)
def on_click(self) -> None:
self.focus()
dialog_id = f"dialog-{str(self.peer_id)}"
try:
self.switcher.mount(Dialog(
telegram_client=self.app.telegram_client,
chat_id=self.peer_id,
id=dialog_id
))
except Exception as e:
log(f"Ошибка открытия диалога: {e}")
return
self.switcher.current = dialog_id
def on_key(self, event: Keys) -> None:
if event.key == "enter":
self.on_click()
elif event.key == "up" and self.id != "chat-1":
# Фокусируемся на предыдущем чате
prev_chat = self.screen.chat_container.query_one(f"#chat-{int(self.id.split('-')[1]) - 1}")
if prev_chat:
prev_chat.focus()
elif event.key == "down":
# Фокусируемся на следующем чате
next_chat = self.screen.chat_container.query_one(f"#chat-{int(self.id.split('-')[1]) + 1}")
if next_chat:
next_chat.focus()
def compose(self) -> ComposeResult:
"""Компонуем виджет чата"""
with Horizontal():
# Аватар (первая буква имени)
first_letter = normalize_text(self._username[:1].upper()) or "?"
yield Static(first_letter, classes="chat-avatar")
# Контент (имя и сообщение)
with Vertical(classes="chat-content"):
name = normalize_text(self._username) or "Без названия"
msg = normalize_text(self._msg) or "Нет сообщений"
msg = msg[:50] + "..." if len(msg) > 50 else msg
yield Static(name, classes="chat-name")
yield Static(msg, classes="chat-message")
class Dialog(Widget):
"""Класс окна диалога"""
def __init__(
self,
id=None,
classes=None,
disabled=None,
telegram_client: TelegramClient | None = None,
chat_id = None
) -> None:
super().__init__(id=id, classes=classes, disabled=disabled)
self.telegram_client = telegram_client
self.chat_id = chat_id
self.is_msg_update_blocked = False
self.messages_loaded = 0
self.is_loading = False
async def on_mount(self) -> None:
self.limit = 50 # Увеличиваем начальное количество сообщений
self.messages_loaded = self.limit
self.msg_input = self.query_one("#msg_input")
self.dialog = self.query_one(Vertical).query_one("#dialog")
self.load_more_btn = self.query_one("#load_more")
self.me = await self.telegram_client.get_me()
self.dialog.scroll_end(animate=False)
await self.update_dialog()
for event in (
events.NewMessage,
events.MessageDeleted,
events.MessageEdited
):
self.telegram_client.on(
event(chats=(self.chat_id))
)(self.update_dialog)
def mount_messages(self, limit: int) -> None:
print("Загрузка виджетов сообщений...")
msg_amount = len(self.dialog.query(Message))
if limit > msg_amount:
for i in range(limit - msg_amount):
self.dialog.mount(
Message(id=f"msg-{i + msg_amount + 1}"),
before=0
)
elif limit < msg_amount:
for i in range(msg_amount - limit):
self.dialog.query(Message).last().remove()
async def update_dialog(self, event = None) -> None:
log("Запрос обновления сообщений")
if not self.is_msg_update_blocked:
self.is_msg_update_blocked = True
messages = await self.telegram_client.get_messages(
entity=self.chat_id, limit=self.limit
)
log("Получены сообщения")
limit = len(messages)
self.mount_messages(limit)
for i in range(limit):
msg = self.dialog.query_one(f"#msg-{i + 1}")
msg.message = ""
# Обрабатываем изображения
if messages[i].media:
try:
# Скачиваем изображение
image_data = await self.telegram_client.download_media(
messages[i].media,
bytes
)
if image_data:
await msg.set_image(image_data)
except Exception as e:
log(f"Ошибка загрузки изображения: {e}")
# Обрабатываем текст
if str(messages[i].message):
msg.message = normalize_text(str(messages[i].message))
try:
is_me = messages[i].from_id.user_id == self.me.id
except:
is_me = False
msg.is_me = is_me
msg.username = normalize_text(utils.get_display_name(messages[i].sender))
msg.send_time = messages[i]\
.date\
.astimezone(datetime.timezone.utc)\
.strftime("%H:%M")
self.is_msg_update_blocked = False
log("Сообщения обновлены")
else:
log("Обновление сообщений невозможно: уже выполняется")
async def load_more_messages(self) -> None:
if not self.is_loading:
self.is_loading = True
self.load_more_btn.disabled = True
self.load_more_btn.label = "Загрузка..."
try:
messages = await self.telegram_client.get_messages(
entity=self.chat_id,
limit=self.limit,
offset_id=self.messages_loaded
)
if messages:
self.messages_loaded += len(messages)
self.mount_messages(self.messages_loaded)
await self.update_dialog()
finally:
self.is_loading = False
self.load_more_btn.disabled = False
self.load_more_btn.label = "Загрузить еще"
def compose(self) -> ComposeResult:
with Vertical():
yield Button("Загрузить еще", id="load_more", variant="default")
yield VerticalScroll(id="dialog")
with Horizontal(id="input_place"):
yield Input(placeholder="Сообщение", id="msg_input")
yield Button(label=">", id="send", variant="primary")
async def on_button_pressed(self, event) -> None:
if event.button.id == "load_more":
await self.load_more_messages()
else:
await self.send_message()
async def on_input_submitted(self, event = None) -> None:
await self.send_message()
async def send_message(self) -> None:
try:
await self.telegram_client.send_message(
self.chat_id,
normalize_text(str(self.msg_input.value))
)
except ValueError:
self.app.notify("Ошибка отправки")
self.msg_input.value = ""
await self.update_dialog()
class Message(Widget):
"""Класс виджета сообщений для окна диалога"""
message: Reactive[str] = Reactive("", recompose=True)
is_me: Reactive[bool] = Reactive(False, recompose=True)
username: Reactive[str] = Reactive("", recompose=True)
send_time: Reactive[str] = Reactive("", recompose=True)
def __init__(
self,
name=None,
id=None,
classes=None,
disabled=False
) -> None:
super().__init__(name=name, id=id, classes=classes, disabled=disabled)
def on_mount(self) -> None:
pass
def compose(self) -> ComposeResult:
static = Static(normalize_text(self.message))
static.border_title = normalize_text(self.username) * (not self.is_me)
static.border_subtitle = self.send_time
with Container():
yield static
if self.is_me:
self.classes = "is_me_true"
else:
self.classes = "is_me_false"

View File

@ -1,119 +0,0 @@
/* Основные стили */
Screen {
background: $surface;
color: $text;
}
/* Стили для подсказки */
.help-text {
color: $text-muted;
text-align: center;
padding: 1;
background: $surface;
border: solid $accent;
margin: 1;
}
/* Стили для чатов */
.chat-item {
padding: 1 2;
height: auto;
min-height: 3;
border: solid $accent;
margin: 1 0;
transition: background 500ms;
}
.chat-item:hover {
background: $accent 20%;
}
.chat-item.selected {
background: $accent 30%;
border: solid $accent;
}
.chat-item.focused {
background: $accent 40%;
border: solid $accent;
outline: solid $accent;
}
.chat-item.selected.focused {
background: $accent 50%;
}
#chat_container {
height: 100%;
border: solid $accent;
background: $surface;
}
/* Стили для диалога */
#dialog {
height: 100%;
border: solid $accent;
background: $surface;
}
#input_place {
height: auto;
padding: 1;
background: $surface;
border: solid $accent;
}
/* Стили для сообщений */
.message {
margin: 1 0;
padding: 1;
border: solid $accent;
}
.message.is_me_true {
background: $accent 20%;
margin-left: 20%;
}
.message.is_me_false {
background: $surface;
margin-right: 20%;
}
/* Стили для ASCII-арта */
.ascii-art {
font-family: monospace;
white-space: pre;
margin: 1 0;
padding: 1;
background: $surface;
border: solid $accent;
}
/* Стили для поиска */
#search_input {
margin: 1;
border: solid $accent;
}
/* Стили для кнопок */
Button {
margin: 1;
min-width: 10;
}
Button#load_more {
width: 100%;
margin: 0;
border: none;
background: $accent 20%;
}
Button#load_more:hover {
background: $accent 30%;
}
Button#load_more:disabled {
background: $accent 10%;
color: $text-muted;
}

View File

@ -0,0 +1,4 @@
# Telegram API ключи
# Получите их на https://my.telegram.org/apps
API_ID=123456
API_HASH=abcdef1234567890abcdef1234567890

38
urwid_client/README.md Normal file
View File

@ -0,0 +1,38 @@
# Telegram TUI Client
Консольный клиент Telegram на базе urwid с поддержкой:
- Просмотра чатов и сообщений
- Поиска по чатам
- Навигации с помощью клавиатуры
- Поддержки папок (Архив)
- Корректного отображения эмодзи и Unicode
## Установка
1. Установите зависимости:
```bash
pip install telethon urwid python-dotenv nest-asyncio emoji
```
2. Скопируйте `.env.example` в `.env`:
```bash
cp .env.example .env
```
3. Получите API ключи на https://my.telegram.org/apps и добавьте их в `.env`
## Запуск
```bash
python telegram_tui.py
```
## Управление
- Tab: Переключение фокуса между поиском и списком чатов
- ↑↓: Выбор чата
- Enter: Открыть выбранный чат
- Esc: Вернуться к списку чатов
- /: Быстрый доступ к поиску
- []: Переключение между основными чатами и архивом
- Q: Выход

9
urwid_client/__init__.py Normal file
View File

@ -0,0 +1,9 @@
# -*- coding: utf-8 -*-
"""
Telegram TUI Client
Консольный клиент Telegram на базе urwid
"""
from .telegram_tui import main, TelegramTUI
__all__ = ['main', 'TelegramTUI']

View File

@ -0,0 +1,737 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Telegram TUI Client
Консольный клиент Telegram на базе urwid
"""
import urwid
import asyncio
import os
import nest_asyncio
import unicodedata
import emoji
from telethon import TelegramClient, events, utils
from telethon.errors import SessionPasswordNeededError
from dotenv import load_dotenv
import datetime
# Разрешаем вложенные event loops
nest_asyncio.apply()
def normalize_text(text: str) -> str:
"""Нормализует текст для корректного отображения"""
if not text:
return ""
try:
# Преобразуем в строку, если это не строка
text = str(text)
# Удаляем эмодзи
text = emoji.replace_emoji(text, '')
# Нормализуем Unicode
text = unicodedata.normalize('NFKC', text)
# Заменяем специальные символы на их ASCII-эквиваленты
text = text.replace('', '-').replace('', '-').replace('', '...')
# Удаляем все управляющие символы, кроме новой строки и табуляции
text = ''.join(char for char in text if unicodedata.category(char)[0] != 'C'
or char in ('\n', '\t'))
# Удаляем множественные пробелы
text = ' '.join(text.split())
return text
except Exception as e:
print(f"Ошибка нормализации текста: {e}")
return "Ошибка отображения"
class ChatWidget(urwid.WidgetWrap):
"""Виджет чата"""
def __init__(self, chat_id, name, message="", is_selected=False, folder=0):
self.chat_id = chat_id
self.name = normalize_text(name)
self.message = normalize_text(message)
self.is_selected = is_selected
self.folder = folder
self.has_focus = False
# Создаем содержимое виджета
self.update_widget()
super().__init__(self.widget)
def update_widget(self):
"""Обновляет внешний вид виджета"""
# Подготавливаем данные
name = self.name if self.name else "Без названия"
msg = self.message if self.message else "Нет сообщений"
if len(msg) > 50:
msg = msg[:47] + "..."
# Добавляем метку папки если нужно
if self.folder == 1:
name += " [Архив]"
# Получаем первую букву для аватара
first_letter = next((c for c in name if c.isprintable()), "?")
# Определяем стиль
if self.has_focus:
style = 'selected'
elif self.is_selected:
style = 'chat_selected'
else:
style = 'chat'
# Создаем виджеты
avatar = urwid.AttrMap(
urwid.Text(f" {first_letter} ", align='center'),
style
)
content = urwid.Pile([
urwid.AttrMap(
urwid.Text(name),
style
),
urwid.AttrMap(
urwid.Text(msg),
style
)
])
self.widget = urwid.AttrMap(
urwid.Columns([
('fixed', 3, avatar),
content
]),
None
)
def selectable(self):
return True
def render(self, size, focus=False):
if self.has_focus != focus:
self.has_focus = focus
self.update_widget()
return super().render(size, focus)
def keypress(self, size, key):
if key == 'enter':
return key
elif key == 'tab':
return key
elif key in ('up', 'down'):
return key
return key
class MessageWidget(urwid.WidgetWrap):
"""Виджет сообщения"""
def __init__(self, text="", username="", is_me=False, send_time=""):
self.text = normalize_text(text)
self.username = normalize_text(username)
self.is_me = is_me
self.send_time = send_time
# Создаем содержимое виджета
self.update_widget()
super().__init__(self.widget)
def update_widget(self):
"""Обновляет внешний вид виджета"""
# Подготавливаем текст
text = self.text if self.text else "Пустое сообщение"
username = self.username if self.username else "Неизвестный"
# Создаем заголовок
header = urwid.Columns([
urwid.Text(username),
('fixed', 5, urwid.Text(self.send_time, align='right'))
])
# Создаем виджет
self.widget = urwid.AttrMap(
urwid.Pile([
urwid.AttrMap(header, 'chat_name'),
urwid.Text(text)
]),
'message_me' if self.is_me else 'message_other'
)
def selectable(self):
return False
class SearchEdit(urwid.Edit):
def __init__(self, *args, **kwargs):
self.search_callback = kwargs.pop('search_callback', None)
super().__init__(*args, **kwargs)
def keypress(self, size, key):
if key in ('up', 'down', 'esc', 'enter'):
return key
result = super().keypress(size, key)
# Вызываем поиск при каждом изменении текста
if self.search_callback and result is None:
asyncio.create_task(self.search_callback())
return result
class InputEdit(urwid.Edit):
def keypress(self, size, key):
if key in ('esc', 'up', 'down'):
return key
return super().keypress(size, key)
class TelegramTUI:
"""Основной класс приложения"""
palette = [
('header', 'white', 'dark blue', 'bold'),
('footer', 'white', 'dark blue', 'bold'),
('bg', 'white', 'black'),
('selected', 'black', 'light gray'),
('chat', 'white', 'black'),
('chat_selected', 'black', 'light gray'),
('chat_name', 'light cyan', 'black', 'bold'),
('chat_message', 'light gray', 'black'),
('message_me', 'light green', 'black'),
('message_other', 'white', 'black'),
('help', 'yellow', 'black'),
('error', 'light red', 'black'),
]
def __init__(self, telegram_client: TelegramClient):
self.telegram_client = telegram_client
self.current_screen = 'auth' # auth или chats
self.phone = None
self.code = None
self.password = None
self.auth_step = 'phone' # phone, code или password
# Создаем виджеты авторизации
self.phone_edit = urwid.Edit(('header', "Номер телефона: "))
self.code_edit = urwid.Edit(('header', "Код: "))
self.password_edit = urwid.Edit(('header', "Пароль: "), mask='*')
self.error_text = urwid.Text(('error', ""))
# Создаем виджеты чатов
self.search_edit = SearchEdit(
('header', "Поиск: "),
search_callback=self.update_chat_list
)
self.chat_walker = urwid.SimpleFocusListWalker([])
self.chat_list = urwid.ListBox(self.chat_walker)
self.message_walker = urwid.SimpleFocusListWalker([])
self.message_list = urwid.ListBox(self.message_walker)
self.input_edit = InputEdit(('header', "Сообщение: "))
# Создаем экраны
self.auth_widget = urwid.Filler(
urwid.Pile([
urwid.Text(('header', "\nДобро пожаловать в Telegram TUI\n"), align='center'),
urwid.Divider(),
self.phone_edit,
self.code_edit,
self.password_edit,
urwid.Divider(),
self.error_text,
urwid.Text(('help', "Нажмите Enter для подтверждения"), align='center')
])
)
# Создаем левую панель (чаты)
self.left_panel = urwid.LineBox(
urwid.Pile([
('pack', urwid.Text(('help', "Tab - переключение фокуса, ↑↓ - выбор чата, Enter - открыть чат, Esc - назад, / - поиск"), align='center')),
('pack', self.search_edit),
urwid.BoxAdapter(self.chat_list, 30) # Фиксированная высота для списка чатов
])
)
# Создаем правую панель (сообщения)
self.right_panel = urwid.LineBox(
urwid.Pile([
self.message_list,
('pack', self.input_edit)
])
)
# Создаем основной виджет чатов
self.chat_widget = urwid.Columns([
('weight', 30, self.left_panel),
('weight', 70, self.right_panel)
])
# Создаем основной виджет
self.main_widget = urwid.Frame(
self.auth_widget,
header=urwid.AttrMap(
urwid.Text(' Telegram TUI', align='center'),
'header'
),
footer=urwid.AttrMap(
urwid.Text(' Q: Выход | Tab: Переключение фокуса | Enter: Выбор/Отправка | Esc: Назад', align='center'),
'footer'
)
)
# Состояние чатов
self.current_folder = None
self.folders = []
self.chats = []
self.selected_chat_index = 0
self.focused_element = "chat_list" # chat_list, search, messages, input
self.current_chat_id = None
# Добавляем таймеры обновления
self.chat_update_task = None
self.message_update_task = None
self.last_update_time = 0
self.update_interval = 3 # секунды для чатов
self.message_update_interval = 1 # секунда для сообщений
self.last_message_update_time = 0
def switch_screen(self, screen_name: str):
"""Переключение между экранами"""
self.current_screen = screen_name
if screen_name == 'auth':
self.main_widget.body = self.auth_widget
elif screen_name == 'chats':
self.main_widget.body = self.chat_widget
async def handle_auth(self, key):
"""Обработка авторизации"""
if key != 'enter':
return
try:
if self.auth_step == 'phone':
phone = normalize_text(self.phone_edit.get_edit_text())
if phone:
self.phone = phone
await self.telegram_client.send_code_request(phone=phone)
self.auth_step = 'code'
self.error_text.set_text(('help', "Код отправлен"))
elif self.auth_step == 'code':
code = normalize_text(self.code_edit.get_edit_text())
if code:
try:
await self.telegram_client.sign_in(phone=self.phone, code=code)
self.switch_screen('chats')
await self.update_chat_list()
except SessionPasswordNeededError:
self.auth_step = 'password'
self.error_text.set_text(('help', "Требуется пароль"))
elif self.auth_step == 'password':
password = self.password_edit.get_edit_text()
if password:
await self.telegram_client.sign_in(password=password)
self.switch_screen('chats')
await self.update_chat_list()
except Exception as e:
self.error_text.set_text(('error', str(e)))
async def update_chat_list(self):
"""Обновляет список чатов"""
try:
# Получаем папки
if not self.folders:
try:
# Проверяем наличие архива
self.folders = [1] if await self.telegram_client.get_dialogs(limit=1, folder=1) else []
except Exception as e:
print(f"Ошибка получения папок: {e}")
self.folders = []
# Получаем диалоги
try:
dialogs = await self.telegram_client.get_dialogs(
limit=100,
folder=self.current_folder
)
except Exception as e:
print(f"Ошибка получения диалогов: {e}")
dialogs = []
# Фильтруем по поисковому запросу
search_query = normalize_text(self.search_edit.get_edit_text().lower())
if search_query:
filtered_dialogs = []
for dialog in dialogs:
try:
# Поиск по имени
name = ""
if hasattr(dialog.entity, 'title') and dialog.entity.title:
name = dialog.entity.title
elif hasattr(dialog.entity, 'first_name'):
name = dialog.entity.first_name
if hasattr(dialog.entity, 'last_name') and dialog.entity.last_name:
name += f" {dialog.entity.last_name}"
# Поиск по последнему сообщению
last_message = ""
if dialog.message and hasattr(dialog.message, 'message'):
last_message = dialog.message.message
# Если есть совпадение, добавляем диалог
if (search_query in normalize_text(name).lower() or
search_query in normalize_text(last_message).lower()):
filtered_dialogs.append(dialog)
except Exception as e:
print(f"Ошибка фильтрации диалога: {e}")
dialogs = filtered_dialogs
# Очищаем список
self.chat_walker[:] = []
# Добавляем чаты
for i, dialog in enumerate(dialogs):
try:
# Получаем имя и сообщение
entity = dialog.entity
# Определяем имя чата
if hasattr(entity, 'title') and entity.title:
name = entity.title
elif hasattr(entity, 'first_name'):
name = entity.first_name
if hasattr(entity, 'last_name') and entity.last_name:
name += f" {entity.last_name}"
else:
name = "Без названия"
# Получаем последнее сообщение
if dialog.message:
message = dialog.message.message if hasattr(dialog.message, 'message') else ""
else:
message = ""
chat = ChatWidget(
chat_id=dialog.id,
name=name,
message=message,
is_selected=(i == self.selected_chat_index),
folder=1 if self.current_folder else 0
)
self.chat_walker.append(chat)
except Exception as e:
print(f"Ошибка создания виджета чата: {e}")
# Обновляем фокус
if self.chat_walker:
self.selected_chat_index = min(self.selected_chat_index, len(self.chat_walker) - 1)
self.chat_list.set_focus(self.selected_chat_index)
self.update_selected_chat()
except Exception as e:
print(f"Ошибка обновления чатов: {e}")
def update_selected_chat(self):
"""Обновляет выделение выбранного чата"""
try:
for i, chat in enumerate(self.chat_walker):
was_selected = chat.is_selected
chat.is_selected = (i == self.selected_chat_index)
if was_selected != chat.is_selected:
chat.update_widget()
except Exception as e:
print(f"Ошибка обновления выделения: {e}")
async def update_message_list(self, chat_id):
"""Обновляет список сообщений"""
try:
# Получаем сообщения
messages = await self.telegram_client.get_messages(
entity=chat_id,
limit=50
)
# Получаем информацию о себе
me = await self.telegram_client.get_me()
# Очищаем список
self.message_walker[:] = []
# Добавляем сообщения
for msg in reversed(messages):
try:
# Определяем, отправлено ли сообщение нами
is_me = False
if hasattr(msg, 'from_id') and msg.from_id:
if hasattr(msg.from_id, 'user_id'):
is_me = msg.from_id.user_id == me.id
# Получаем текст сообщения
text = msg.message if hasattr(msg, 'message') else "Медиа"
# Получаем имя отправителя
username = ""
if hasattr(msg, 'sender') and msg.sender:
if hasattr(msg.sender, 'first_name'):
username = msg.sender.first_name
if hasattr(msg.sender, 'last_name') and msg.sender.last_name:
username += f" {msg.sender.last_name}"
elif hasattr(msg.sender, 'title'):
username = msg.sender.title
# Если не удалось получить имя, используем Me/Другой
if not username:
username = "Я" if is_me else "Неизвестный"
message = MessageWidget(
text=text,
username=username,
is_me=is_me,
send_time=msg.date.strftime("%H:%M")
)
self.message_walker.append(message)
except Exception as e:
print(f"Ошибка создания виджета сообщения: {e}")
# Прокручиваем к последнему сообщению
if self.message_walker:
self.message_list.set_focus(len(self.message_walker) - 1)
except Exception as e:
print(f"Ошибка обновления сообщений: {e}")
async def handle_chat_input(self, key):
"""Обработка ввода в экране чатов"""
if key == 'tab':
if self.focused_element == "search":
self.focused_element = "chat_list"
self.left_panel.original_widget.focus_position = 2
# Обновляем список при переключении на чаты
await self.update_chat_list()
elif self.focused_element == "chat_list":
if self.current_chat_id:
self.focused_element = "messages"
self.chat_widget.focus_position = 1
self.right_panel.original_widget.focus_position = 0
# Обновляем сообщения при переключении на них
await self.update_message_list(self.current_chat_id)
else:
self.focused_element = "search"
self.left_panel.original_widget.focus_position = 1
elif self.focused_element == "messages":
self.focused_element = "input"
self.right_panel.original_widget.focus_position = 1
elif self.focused_element == "input":
self.focused_element = "search"
self.chat_widget.focus_position = 0
self.left_panel.original_widget.focus_position = 1
# Обновляем поиск при переключении на него
await self.update_chat_list()
elif key in ('up', 'down'):
if self.focused_element == "chat_list" and self.chat_walker:
if key == 'up':
if self.chat_list.focus_position > 0:
self.chat_list.focus_position -= 1
else:
if self.chat_list.focus_position < len(self.chat_walker) - 1:
self.chat_list.focus_position += 1
# Обновляем выделение
self.selected_chat_index = self.chat_list.focus_position
self.update_selected_chat()
# Если чат открыт, обновляем его содержимое
if self.current_chat_id:
focused = self.chat_walker[self.selected_chat_index]
self.current_chat_id = focused.chat_id
await self.update_message_list(focused.chat_id)
elif self.focused_element == "messages" and self.message_walker:
if key == 'up':
if self.message_list.focus_position > 0:
self.message_list.focus_position -= 1
else:
if self.message_list.focus_position < len(self.message_walker) - 1:
self.message_list.focus_position += 1
elif key == 'enter':
if self.focused_element == "search":
await self.update_chat_list()
self.focused_element = "chat_list"
self.left_panel.original_widget.focus_position = 2
elif self.focused_element == "chat_list" and self.chat_walker:
try:
focused = self.chat_walker[self.chat_list.focus_position]
self.current_chat_id = focused.chat_id
self.selected_chat_index = self.chat_list.focus_position
await self.update_message_list(focused.chat_id)
self.focused_element = "input"
self.chat_widget.focus_position = 1
self.right_panel.original_widget.focus_position = 1
# Сбрасываем время последнего обновления сообщений
self.last_message_update_time = 0
except Exception as e:
print(f"Ошибка при открытии чата: {e}")
elif self.focused_element == "input" and self.current_chat_id:
message = self.input_edit.get_edit_text()
if message.strip():
try:
await self.telegram_client.send_message(self.current_chat_id, message)
self.input_edit.set_edit_text("")
# Сразу обновляем сообщения после отправки
self.last_message_update_time = 0
await self.update_message_list(self.current_chat_id)
except Exception as e:
print(f"Ошибка отправки сообщения: {e}")
elif key == 'esc':
if self.focused_element in ("input", "messages"):
# Закрываем текущий чат
self.current_chat_id = None
self.message_walker[:] = []
self.input_edit.set_edit_text("")
self.focused_element = "chat_list"
self.chat_widget.focus_position = 0
self.left_panel.original_widget.focus_position = 2
elif self.focused_element == "search":
self.search_edit.set_edit_text("")
await self.update_chat_list()
self.focused_element = "chat_list"
self.left_panel.original_widget.focus_position = 2
def unhandled_input(self, key):
"""Обработка необработанных нажатий клавиш"""
if key in ('q', 'Q'):
raise urwid.ExitMainLoop()
# Создаем задачу для асинхронной обработки
if self.current_screen == 'auth':
asyncio.create_task(self.handle_auth(key))
else:
asyncio.create_task(self.handle_chat_input(key))
async def start_auto_updates(self):
"""Запускает автоматическое обновление чатов и сообщений"""
if self.chat_update_task:
self.chat_update_task.cancel()
if self.message_update_task:
self.message_update_task.cancel()
async def chat_update_loop():
while True:
try:
current_time = datetime.datetime.now().timestamp()
if current_time - self.last_update_time >= self.update_interval:
await self.update_chat_list()
self.last_update_time = current_time
await asyncio.sleep(1)
except Exception as e:
print(f"Ошибка в цикле обновления чатов: {e}")
await asyncio.sleep(1)
async def message_update_loop():
while True:
try:
if self.current_chat_id:
current_time = datetime.datetime.now().timestamp()
if current_time - self.last_message_update_time >= self.message_update_interval:
await self.update_message_list(self.current_chat_id)
self.last_message_update_time = current_time
await asyncio.sleep(0.5)
except Exception as e:
print(f"Ошибка в цикле обновления сообщений: {e}")
await asyncio.sleep(0.5)
self.chat_update_task = asyncio.create_task(chat_update_loop())
self.message_update_task = asyncio.create_task(message_update_loop())
async def stop_auto_updates(self):
"""Останавливает автоматическое обновление"""
if self.chat_update_task:
self.chat_update_task.cancel()
self.chat_update_task = None
if self.message_update_task:
self.message_update_task.cancel()
self.message_update_task = None
async def run(self):
"""Запуск приложения"""
try:
# Подключаемся к Telegram
await self.telegram_client.connect()
print("Подключено к Telegram")
# Проверяем авторизацию
if await self.telegram_client.is_user_authorized():
self.switch_screen('chats')
await self.update_chat_list()
# Запускаем автообновление
await self.start_auto_updates()
else:
self.switch_screen('auth')
# Создаем event loop для urwid
event_loop = urwid.AsyncioEventLoop(loop=asyncio.get_event_loop())
# Запускаем интерфейс
urwid.MainLoop(
self.main_widget,
self.palette,
event_loop=event_loop,
unhandled_input=self.unhandled_input
).run()
except Exception as e:
print(f"Ошибка при запуске приложения: {e}")
finally:
# Останавливаем автообновление
await self.stop_auto_updates()
if self.telegram_client and self.telegram_client.is_connected():
await self.telegram_client.disconnect()
print("Отключено от Telegram")
async def main():
# Загружаем переменные окружения
load_dotenv()
# Проверяем наличие API ключей
api_id = os.getenv("API_ID")
api_hash = os.getenv("API_HASH")
if not api_id or not api_hash:
print("API_ID и API_HASH не найдены в .env файле.")
print("Пожалуйста, скопируйте .env.example в .env и заполните свои ключи.")
return
# Преобразуем API_ID в число
api_id = int(api_id)
# Инициализируем клиент Telegram
session_file = "talc.session"
# Создаем клиент
client = TelegramClient(
session_file,
api_id=api_id,
api_hash=api_hash,
system_version="macOS 14.3.1",
device_model="MacBook",
app_version="1.0"
)
# Создаем и запускаем приложение
app = TelegramTUI(client)
await app.run()
if __name__ == "__main__":
try:
asyncio.run(main())
except Exception as e:
print(f"Ошибка при запуске приложения: {e}")