import db import json import redis import config from typing import Optional, Any class AuditService: def __init__(self): # Redis connection for telegram queue try: self.redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True) except: self.redis_client = None def _format_details(self, details: Any) -> str: if not details: return "" if isinstance(details, dict): lines = [] for k, v in details.items(): lines.append(f"• {k}: {v}") return "\n" + "\n".join(lines) return str(details) def _get_readable_action(self, action: str) -> str: # Simple mapping for better readability mapping = { "warehouse_add_item": "📦 Добавление на склад", "warehouse_update_item": "📝 Обновление склада", "warehouse_delete_item": "🗑 Удаление со склада", "order_status_update": "🔔 Статус заказа изменен", "user_role_update": "👤 Смена роли пользователя", "user_register": "🆕 Новая регистрация", "user_login": "🔑 Вход в систему", "order_created": "🛒 Новый заказ оформлен", "order_review": "⭐ Оставлен отзыв", "update_order": "✏️ Заказ отредактирован", "delete_order_entirely": "🔥 Заказ полностью удален", "admin_create_user": "👤 Пользователь создан админом", "admin_update_user": "✏️ Пользователь отредактирован админом", "password_reset_success": "🔐 Пароль успешно сброшен", "upload_order_photo": "📸 Загружено фото заказа", "update_photo_visibility": "👁️ Изменена видимость фото", "delete_photo": "🗑️ Удалено фото из портфолио", "create_material": "🏗️ Создан новый материал", "update_material": "📝 Изменен материал", "delete_material": "❌ Удален материал", "create_service": "🛠️ Создана новая услуга", "update_service": "📝 Изменена услуга", "delete_service": "❌ Удалена услуга" } return mapping.get(action, action) async def log( self, user_id: int, action: str, target_type: Optional[str] = None, target_id: Optional[int] = None, details: Optional[Any] = None ): details_str = None if details: if isinstance(details, (dict, list)): def decimal_default(obj): from decimal import Decimal if isinstance(obj, Decimal): return float(obj) return str(obj) details_str = json.dumps(details, ensure_ascii=False, default=decimal_default) else: details_str = str(details) # 1. Save to DB query = """ INSERT INTO audit_logs (user_id, action, target_type, target_id, details) VALUES (%s, %s, %s, %s, %s) """ db.execute_commit(query, (user_id, action, target_type, target_id, details_str)) # 2. Push to Telegram Queue if CHAT_ID is configured if config.TELEGRAM_CHAT_ID and self.redis_client: try: readable_action = self._get_readable_action(action) formatted_details = self._format_details(details) message = f"Audit Log: {readable_action}\n" if target_type: message += f"Target: {target_type} (ID: {target_id})\n" if formatted_details: message += f"Details:{formatted_details}" payload = { "id": config.TELEGRAM_CHAT_ID, "message": message } self.redis_client.rpush("messages_queue", json.dumps(payload, ensure_ascii=False)) except Exception as e: print(f"FAILED TO PUSH TO TELEGRAM QUEUE: {e}") audit_service = AuditService()