:Added autocomplete for user names, improved ASCII-art cache handling and message updating. Fixed errors in reading and saving cache, and improved navigation in the autocomplete menu.

This commit is contained in:
wheelchairy 2025-03-27 13:00:52 +03:00
parent 4490ec3bfe
commit 6f7d2ffabf

View File

@ -137,28 +137,32 @@ class AsciiArtCache:
def get_cached_art(self, image_data):
"""Получает ASCII-арт из кэша или создает новый"""
cache_key = self.get_cache_key(image_data)
cache_file = os.path.join(self.cache_dir, f"{cache_key}.txt")
# Проверяем наличие в индексе
if cache_key in self.index:
# Проверяем наличие в кэше
if os.path.exists(cache_file):
try:
with open(os.path.join(self.cache_dir, cache_key + '.txt'), 'r') as f:
with open(cache_file, 'r') as f:
return f.read()
except FileNotFoundError:
pass
except Exception as e:
print(f"Ошибка чтения кэша ASCII: {e}")
# Если нет в кэше, создаем новый
ascii_art = image_to_ascii(image_data)
# Сохраняем в кэш
with open(os.path.join(self.cache_dir, cache_key + '.txt'), 'w') as f:
f.write(ascii_art)
# Обновляем индекс
self.index[cache_key] = {
'created_at': datetime.datetime.now().isoformat(),
'size': len(image_data)
}
self.save_index()
try:
with open(cache_file, 'w') as f:
f.write(ascii_art)
# Обновляем индекс
self.index[cache_key] = {
'created_at': datetime.datetime.now().isoformat(),
'size': len(image_data)
}
self.save_index()
except Exception as e:
print(f"Ошибка сохранения ASCII в кэш: {e}")
return ascii_art
@ -470,116 +474,118 @@ class SearchEdit(urwid.Edit):
return result
async def update_chat_list(self):
"""Обновляет список чатов"""
try:
# Сохраняем текущий фокус и ID выбранного чата
current_focus = self.chat_list.focus_position if self.chat_walker else 0
current_chat_id = self.current_chat_id
# Получаем диалоги
try:
dialogs = await self.telegram_client.get_dialogs(
limit=50,
folder=self.current_folder
)
except Exception as e:
print(f"Ошибка получения диалогов: {e}")
dialogs = []
# Фильтруем по поисковому запросу
search_query = normalize_text(self.search_edit.get_edit_text().lower())
if search_query:
filtered_dialogs = []
for dialog in dialogs:
try:
# Получаем имя
name = ""
if hasattr(dialog.entity, 'title') and dialog.entity.title:
name = dialog.entity.title
elif hasattr(dialog.entity, 'first_name'):
name = dialog.entity.first_name
if hasattr(dialog.entity, 'last_name') and dialog.entity.last_name:
name += f" {dialog.entity.last_name}"
name = normalize_text(name).lower()
# Получаем последнее сообщение
last_message = ""
if dialog.message and hasattr(dialog.message, 'message'):
last_message = normalize_text(dialog.message.message).lower()
# Проверяем совпадение
if search_query in name or search_query in last_message:
filtered_dialogs.append(dialog)
# Ограничиваем количество результатов для производительности
if len(filtered_dialogs) >= 20:
break
except Exception as e:
print(f"Ошибка фильтрации диалога: {e}")
dialogs = filtered_dialogs
# Очищаем список
self.chat_walker[:] = []
# Добавляем чаты
restored_focus = False
for i, dialog in enumerate(dialogs):
try:
# Получаем имя чата
name = ""
if hasattr(dialog.entity, 'title') and dialog.entity.title:
name = dialog.entity.title
elif hasattr(dialog.entity, 'first_name'):
name = dialog.entity.first_name
if hasattr(dialog.entity, 'last_name') and dialog.entity.last_name:
name += f" {dialog.entity.last_name}"
else:
name = "Без названия"
# Получаем последнее сообщение
message = ""
if dialog.message and hasattr(dialog.message, 'message'):
message = dialog.message.message
# Создаем виджет чата
chat = ChatWidget(
chat_id=dialog.id,
name=name,
message=message,
is_selected=(dialog.id == current_chat_id),
folder=1 if self.current_folder else 0
)
self.chat_walker.append(chat)
# Восстанавливаем фокус если это текущий чат
if dialog.id == current_chat_id and not restored_focus:
current_focus = i
restored_focus = True
except Exception as e:
print(f"Ошибка создания виджета чата: {e}")
# Восстанавливаем фокус
if self.chat_walker:
if current_focus >= len(self.chat_walker):
current_focus = len(self.chat_walker) - 1
self.chat_list.set_focus(max(0, current_focus))
self.selected_chat_index = current_focus
self.update_selected_chat()
except Exception as e:
print(f"Ошибка обновления чатов: {e}")
# В случае ошибки очищаем список
self.chat_walker[:] = []
class InputEdit(urwid.Edit):
def __init__(self, *args, **kwargs):
self.telegram_client = kwargs.pop('telegram_client', None)
self.chat_id = kwargs.pop('chat_id', None)
self.users_cache = {} # username -> (full_name, user_id)
self.completion_state = None # (start_pos, partial, matches, current_index)
self.parent = None
super().__init__(*args, **kwargs)
async def update_users_cache(self):
"""Обновляет кэш пользователей для текущего чата"""
try:
if not self.chat_id or not self.telegram_client:
return
# Получаем участников чата
participants = await self.telegram_client.get_participants(self.chat_id)
# Обновляем кэш
for user in participants:
if user.username:
full_name = f"{user.first_name}"
if user.last_name:
full_name += f" {user.last_name}"
self.users_cache[user.username.lower()] = (full_name, user.id)
except Exception as e:
print(f"Ошибка обновления кэша пользователей: {e}")
def find_username_matches(self, partial):
"""Находит совпадения для частичного имени пользователя"""
partial = partial.lower()
matches = []
for username in self.users_cache:
if username.startswith(partial):
full_name, _ = self.users_cache[username]
matches.append((username, full_name))
return sorted(matches, key=lambda x: len(x[0])) # Сортируем по длине юзернейма
def show_completion_menu(self, matches, start_pos, partial):
"""Показывает меню автодополнения"""
if not matches:
self.completion_state = None
return
# Сохраняем состояние автодополнения
self.completion_state = (start_pos, partial, matches, 0)
# Уведомляем родителя о необходимости показать меню
if self.parent and hasattr(self.parent, 'show_completion_overlay'):
self.parent.show_completion_overlay(matches)
def hide_completion_menu(self):
"""Скрывает меню автодополнения"""
self.completion_state = None
if self.parent and hasattr(self.parent, 'hide_completion_overlay'):
self.parent.hide_completion_overlay()
def keypress(self, size, key):
if key in ('esc', 'up', 'down'):
if self.completion_state:
if key == 'esc':
self.hide_completion_menu()
return None
elif key in ('up', 'down'):
# Передаем управление родителю для навигации по меню
if self.parent and hasattr(self.parent, 'handle_completion_navigation'):
return self.parent.handle_completion_navigation(key)
return key
return super().keypress(size, key)
if key == 'enter' and self.completion_state:
# Получаем выбранный вариант из родителя
if self.parent and hasattr(self.parent, 'get_selected_completion'):
selected = self.parent.get_selected_completion()
if selected is not None:
start_pos, partial, matches, _ = self.completion_state
username, _ = matches[selected]
# Обновляем текст
text = self.get_edit_text()
new_text = text[:start_pos] + "@" + username + " "
if len(text) > start_pos + len(partial) + 1:
new_text += text[start_pos + len(partial) + 1:]
self.set_edit_text(new_text)
self.set_edit_pos(len(new_text))
# Скрываем меню
self.hide_completion_menu()
return None
result = super().keypress(size, key)
# Проверяем, нужно ли показать меню автодополнения
if result is None and self.users_cache:
text = self.get_edit_text()
pos = self.edit_pos
# Ищем @ перед курсором
start_pos = text.rfind('@', 0, pos)
if start_pos != -1 and start_pos < pos:
partial = text[start_pos + 1:pos]
if partial:
matches = self.find_username_matches(partial)
if matches:
self.show_completion_menu(matches, start_pos, partial)
else:
self.hide_completion_menu()
else:
self.hide_completion_menu()
else:
self.hide_completion_menu()
return result
class TelegramTUI:
"""Основной класс приложения"""
@ -599,6 +605,8 @@ class TelegramTUI:
('error', 'light red', 'black'),
('message_selected', 'black', 'light gray'),
('input_disabled', 'dark gray', 'black'),
('completion_normal', 'white', 'black'),
('completion_focus', 'black', 'light gray'),
]
def __init__(self, telegram_client: TelegramClient):
@ -624,7 +632,7 @@ class TelegramTUI:
self.chat_list = urwid.ListBox(self.chat_walker)
self.message_walker = urwid.SimpleFocusListWalker([])
self.message_list = urwid.ListBox(self.message_walker)
self.input_edit = InputEdit(('header', "Сообщение: "))
self.input_edit = InputEdit(('header', "Сообщение: "), telegram_client=telegram_client)
# Создаем экраны
self.auth_widget = urwid.Filler(
@ -722,6 +730,10 @@ class TelegramTUI:
self.ascii_cache = AsciiArtCache()
self.media_cache = MediaCache(max_size_mb=1000) # 1GB по умолчанию
# Добавляем состояние для меню автодополнения
self.completion_listbox = None
self.completion_overlay = None
def switch_screen(self, screen_name: str):
"""Переключение между экранами"""
@ -914,26 +926,52 @@ class TelegramTUI:
media_type = 'gif'
if media_type:
# Загружаем медиа
# Для фото проверяем сначала кэш ASCII-арта
if media_type == 'photo':
# Получаем ID фото для кэша
photo_id = message.photo.id
cache_key = f"photo_{photo_id}"
# Проверяем кэш ASCII-арта
try:
with open(os.path.join('pics', f"{cache_key}.txt"), 'r') as f:
return f.read()
except FileNotFoundError:
# Если нет в кэше, загружаем и конвертируем
media_data = await self.telegram_client.download_media(message.media, bytes)
if media_data:
ascii_art = self.ascii_cache.get_cached_art(media_data)
return ascii_art
# Для остальных типов медиа
media_data = await self.telegram_client.download_media(message.media, bytes)
if media_data:
# Сохраняем в кэш
cached_path = self.media_cache.get_cached_file(media_data, media_type)
if cached_path:
if media_type == 'photo':
# Для фото создаем ASCII-арт
with open(cached_path, 'rb') as f:
return self.ascii_cache.get_cached_art(f.read())
else:
# Для других типов возвращаем описание
size_mb = len(media_data) / (1024 * 1024)
return f"[{media_type.upper()}: {size_mb:.1f}MB - {cached_path.name}]"
# Для других типов возвращаем описание
size_mb = len(media_data) / (1024 * 1024)
return f"[{media_type.upper()}: {size_mb:.1f}MB - {cached_path.name}]"
return None
except Exception as e:
print(f"Ошибка обработки медиа: {e}")
return f"[Ошибка обработки {media_type if media_type else 'медиа'}]"
async def message_update_loop(self):
"""Цикл обновления сообщений"""
while True:
try:
if self.current_chat_id:
current_time = datetime.datetime.now().timestamp()
if current_time - self.last_message_update_time >= self.message_update_interval:
await self.update_message_list(self.current_chat_id)
self.last_message_update_time = current_time
await asyncio.sleep(1)
except Exception as e:
print(f"Ошибка в цикле обновления сообщений: {e}")
await asyncio.sleep(1)
async def update_message_list(self, chat_id):
"""Обновляет список сообщений"""
try:
@ -941,40 +979,47 @@ class TelegramTUI:
self.message_walker[:] = []
return
# Сохраняем текущий фокус
# Сохраняем текущий фокус и ID выбранного сообщения
current_focus = self.message_list.focus_position if self.message_walker else None
selected_message_id = self.selected_message
# Получаем сообщения
messages = await self.telegram_client.get_messages(
entity=chat_id,
limit=30 # Уменьшаем лимит для стабильности
limit=30
)
# Получаем информацию о себе
me = await self.telegram_client.get_me()
# Сохраняем отслеживаемые сообщения
# Сохраняем отслеживаемые сообщения и их состояния
tracked_messages = {
msg_id: widget
for msg_id, widget in self.pending_messages.items()
}
# Сохраняем старые сообщения для сравнения
old_messages = {msg.message_id: msg for msg in self.message_walker}
# Сохраняем старые сообщения для переиспользования
old_messages = {
msg.message_id: msg
for msg in self.message_walker
}
# Очищаем список
self.message_walker[:] = []
# Создаем новый список сообщений
new_messages = []
new_focus = None
# Добавляем сообщения
for msg in reversed(messages):
for i, msg in enumerate(reversed(messages)):
try:
# Проверяем, было ли это сообщение раньше
old_message = old_messages.get(msg.id)
if old_message and not msg.photo:
# Переиспользуем существующий виджет для не-фото сообщений
self.message_walker.append(old_message)
# Проверяем, есть ли сообщение в старом списке
old_widget = old_messages.get(msg.id)
if old_widget:
# Переиспользуем существующий виджет
new_messages.append(old_widget)
if msg.id == selected_message_id:
new_focus = len(new_messages) - 1
continue
# Создаем новый виджет только для новых сообщений
is_me = False
if hasattr(msg, 'from_id') and msg.from_id:
if hasattr(msg.from_id, 'user_id'):
@ -983,13 +1028,14 @@ class TelegramTUI:
text = msg.message if hasattr(msg, 'message') else ""
media_data = None
# Обрабатываем все типы медиа
media_data = await self.process_media(msg)
if media_data:
if not text:
text = media_data
else:
text = media_data + "\n" + text
# Проверяем наличие медиа только для новых сообщений
if msg.media:
media_data = await self.process_media(msg)
if media_data:
if not text:
text = media_data
else:
text = media_data + "\n" + text
username = ""
if hasattr(msg, 'sender') and msg.sender:
@ -1017,44 +1063,38 @@ class TelegramTUI:
is_me=is_me,
send_time=msg.date.strftime("%H:%M"),
status=status,
is_selected=(msg.id == self.selected_message),
is_selected=(msg.id == selected_message_id),
media_data=media_data
)
if msg.id in tracked_messages:
self.pending_messages[msg.id] = message
self.message_walker.append(message)
new_messages.append(message)
if msg.id == selected_message_id:
new_focus = len(new_messages) - 1
except Exception as e:
print(f"Ошибка создания виджета сообщения: {e}")
# Восстанавливаем фокус
if current_focus is not None and current_focus < len(self.message_walker):
self.message_list.set_focus(current_focus)
elif self.message_walker:
self.message_list.set_focus(len(self.message_walker) - 1)
# Проверяем, изменился ли список сообщений
if new_messages:
current_messages = list(self.message_walker)
if len(current_messages) != len(new_messages) or any(a.message_id != b.message_id for a, b in zip(current_messages, new_messages)):
# Обновляем список только если есть изменения
self.message_walker[:] = new_messages
# Восстанавливаем фокус
if new_focus is not None:
self.message_list.set_focus(new_focus)
elif new_messages:
self.message_list.set_focus(len(new_messages) - 1)
except Exception as e:
print(f"Ошибка обновления сообщений: {e}")
self.message_walker[:] = []
async def update_message_status(self, message):
"""Обновляет статус сообщения"""
try:
if message.id in self.pending_messages:
widget = self.pending_messages[message.id]
# Определяем статус
if getattr(message, 'from_id', None):
widget.status = "vv" # Доставлено
else:
widget.status = "v" # Отправлено
widget.update_widget()
# Если сообщение доставлено, удаляем из отслеживания
if widget.status == "vv":
del self.pending_messages[message.id]
except Exception as e:
print(f"Ошибка обновления статуса: {e}")
# В случае ошибки НЕ очищаем список
# self.message_walker[:] = []
async def check_chat_permissions(self, chat_id):
"""Проверяет права на отправку сообщений в чате"""
try:
@ -1078,22 +1118,56 @@ class TelegramTUI:
# В случае ошибки разрешаем отправку для не-каналов
self.can_send_messages = not (hasattr(chat, 'broadcast') and chat.broadcast)
self.update_input_visibility()
def update_input_visibility(self):
"""Обновляет видимость поля ввода"""
if self.can_send_messages:
if self.replying_to:
self.input_edit = InputEdit(('header', f"Ответ на '{self.replying_to[:30]}...': "))
self.input_edit = InputEdit(
('header', f"Ответ на '{self.replying_to[:30]}...': "),
telegram_client=self.telegram_client,
chat_id=self.current_chat_id
)
else:
self.input_edit = InputEdit(('header', "Сообщение: "))
self.input_edit = InputEdit(
('header', "Сообщение: "),
telegram_client=self.telegram_client,
chat_id=self.current_chat_id
)
# Устанавливаем ссылку на родительский виджет
self.input_edit.parent = self
# Обновляем кэш пользователей
asyncio.create_task(self.input_edit.update_users_cache())
else:
self.input_edit = InputEdit(('input_disabled', "Отправка сообщений недоступна"))
self.input_edit = InputEdit(
('input_disabled', "Отправка сообщений недоступна"),
telegram_client=self.telegram_client,
chat_id=self.current_chat_id
)
self.input_edit.parent = self
self.input_edit.set_edit_text("")
# Обновляем правую панель
if len(self.right_panel.original_widget.widget_list) > 1:
self.right_panel.original_widget.widget_list[-1] = self.input_edit
async def update_message_status(self, message):
"""Обновляет статус сообщения"""
try:
if message.id in self.pending_messages:
widget = self.pending_messages[message.id]
# Определяем статус
if getattr(message, 'from_id', None):
widget.status = "✓✓" # Доставлено
else:
widget.status = "" # Отправлено
widget.update_widget()
# Если сообщение доставлено, удаляем из отслеживания
if widget.status == "✓✓":
del self.pending_messages[message.id]
except Exception as e:
print(f"Ошибка обновления статуса: {e}")
async def handle_chat_input(self, key):
"""Обработка ввода в экране чатов"""
try:
@ -1141,11 +1215,14 @@ class TelegramTUI:
# Проверяем права при открытии чата
await self.check_chat_permissions(focused.chat_id)
# Сбрасываем таймеры обновления перед загрузкой сообщений
self.last_message_update_time = 0
self.last_update_time = 0
await self.update_message_list(focused.chat_id)
self.focused_element = "input"
self.chat_widget.focus_position = 1
self.right_panel.original_widget.focus_position = 1
self.last_message_update_time = 0
except Exception as e:
print(f"Ошибка при открытии чата: {e}")
@ -1153,10 +1230,6 @@ class TelegramTUI:
message = self.input_edit.get_edit_text()
if message.strip():
try:
# Приостанавливаем автообновление на время отправки
self.last_update_time = datetime.datetime.now().timestamp()
self.last_message_update_time = self.last_update_time
# Создаем виджет сообщения до отправки
now = datetime.datetime.now()
msg_widget = MessageWidget(
@ -1168,6 +1241,10 @@ class TelegramTUI:
status="" # Отправляется
)
# Добавляем сообщение в список сразу
self.message_walker.append(msg_widget)
self.message_list.set_focus(len(self.message_walker) - 1)
# Отправляем сообщение
sent_message = await self.telegram_client.send_message(
self.current_chat_id,
@ -1179,9 +1256,9 @@ class TelegramTUI:
msg_widget.message_id = sent_message.id
self.pending_messages[sent_message.id] = msg_widget
# Добавляем сообщение в список
self.message_walker.append(msg_widget)
self.message_list.set_focus(len(self.message_walker) - 1)
# Обновляем статус
msg_widget.status = ""
msg_widget.update_widget()
# Очищаем поле ввода и сбрасываем ответ
self.input_edit.set_edit_text("")
@ -1189,8 +1266,14 @@ class TelegramTUI:
self.replying_to = None
self.update_input_visibility()
# Форсируем обновление списка сообщений
await self.update_message_list(self.current_chat_id)
except Exception as e:
print(f"Ошибка отправки сообщения: {e}")
# Удаляем виджет сообщения в случае ошибки
if msg_widget in self.message_walker:
self.message_walker.remove(msg_widget)
elif key == 'tab':
if self.focused_element == "search":
@ -1245,7 +1328,7 @@ class TelegramTUI:
# Восстанавливаем состояние в случае ошибки
self.focused_element = "chat_list"
self.chat_widget.focus_position = 0
def unhandled_input(self, key):
"""Обработка необработанных нажатий клавиш"""
if key in ('q', 'Q'):
@ -1256,7 +1339,7 @@ class TelegramTUI:
asyncio.create_task(self.handle_auth(key))
else:
asyncio.create_task(self.handle_chat_input(key))
async def start_auto_updates(self):
"""Запускает автоматическое обновление чатов и сообщений"""
if self.chat_update_task:
@ -1267,32 +1350,17 @@ class TelegramTUI:
async def chat_update_loop():
while True:
try:
# Обновляем только если не в процессе отправки сообщения
if not self.pending_messages:
current_time = datetime.datetime.now().timestamp()
if current_time - self.last_update_time >= self.update_interval:
await self.update_chat_list()
self.last_update_time = current_time
await asyncio.sleep(1)
current_time = datetime.datetime.now().timestamp()
if current_time - self.last_update_time >= self.update_interval:
await self.update_chat_list()
self.last_update_time = current_time
await asyncio.sleep(3)
except Exception as e:
print(f"Ошибка в цикле обновления чатов: {e}")
await asyncio.sleep(1)
async def message_update_loop():
while True:
try:
if self.current_chat_id and not self.pending_messages:
current_time = datetime.datetime.now().timestamp()
if current_time - self.last_message_update_time >= self.message_update_interval:
await self.update_message_list(self.current_chat_id)
self.last_message_update_time = current_time
await asyncio.sleep(0.5)
except Exception as e:
print(f"Ошибка в цикле обновления сообщений: {e}")
await asyncio.sleep(0.5)
await asyncio.sleep(3)
self.chat_update_task = asyncio.create_task(chat_update_loop())
self.message_update_task = asyncio.create_task(message_update_loop())
self.message_update_task = asyncio.create_task(self.message_update_loop())
async def stop_auto_updates(self):
"""Останавливает автоматическое обновление"""
@ -1339,6 +1407,60 @@ class TelegramTUI:
await self.telegram_client.disconnect()
print("Отключено от Telegram")
def show_completion_overlay(self, matches):
"""Показывает оверлей с меню автодополнения"""
# Создаем список вариантов
items = []
for username, full_name in matches:
text = f"@{username} ({full_name})"
items.append(urwid.AttrMap(
urwid.Text(text),
'completion_normal',
'completion_focus'
))
# Создаем меню
self.completion_listbox = urwid.ListBox(urwid.SimpleFocusListWalker(items))
box = urwid.LineBox(
urwid.BoxAdapter(self.completion_listbox, min(len(matches), 5)),
title="Автодополнение"
)
# Создаем оверлей
self.completion_overlay = urwid.Overlay(
box,
self.main_widget,
'center', ('relative', 50),
'middle', ('relative', 30)
)
# Обновляем главный виджет
self.main_widget = self.completion_overlay
def hide_completion_overlay(self):
"""Скрывает оверлей с меню автодополнения"""
if self.completion_overlay:
self.main_widget = self.completion_overlay.bottom_w
self.completion_overlay = None
self.completion_listbox = None
def handle_completion_navigation(self, key):
"""Обрабатывает навигацию по меню автодополнения"""
if self.completion_listbox:
if key == 'up' and self.completion_listbox.focus_position > 0:
self.completion_listbox.focus_position -= 1
return None
elif key == 'down' and self.completion_listbox.focus_position < len(self.completion_listbox.body) - 1:
self.completion_listbox.focus_position += 1
return None
return key
def get_selected_completion(self):
"""Возвращает индекс выбранного варианта автодополнения"""
if self.completion_listbox:
return self.completion_listbox.focus_position
return None
async def main():
# Загружаем переменные окружения
load_dotenv()