import json
from decimal import Decimal
from typing import Any, Optional

import redis
from fastapi import Request

import config
import db


def _json_default(obj: Any) -> Any:
    """Fallback serializer for values ``json.dumps`` cannot encode natively.

    ``Decimal`` values become floats; anything else is stringified.
    """
    if isinstance(obj, Decimal):
        return float(obj)
    return str(obj)


class AuditService:
    """Record audit events in the database and mirror them to a Redis queue.

    Each call to :meth:`log` inserts one row into ``audit_logs`` and, when
    ``config.TELEGRAM_CHAT_ID`` is set and a Redis client exists, pushes a
    human-readable message onto the ``messages_queue`` Redis list.
    """

    # Human-readable labels for known action codes; unknown codes are shown as-is.
    _ACTION_LABELS = {
        "warehouse_add_item": "📦 Добавление на склад",
        "warehouse_update_item": "📝 Обновление склада",
        "warehouse_delete_item": "🗑 Удаление со склада",
        "order_status_update": "🔔 Статус заказа изменен",
        "user_role_update": "👤 Смена роли пользователя",
    }

    _INSERT_QUERY = """
        INSERT INTO audit_logs (user_id, action, target_type, target_id, details, ip_address)
        VALUES (%s, %s, %s, %s, %s, %s)
    """

    def __init__(self):
        """Create the Redis client used for the Telegram message queue.

        If the client cannot be constructed, ``redis_client`` is set to
        ``None`` and queue pushes are skipped.
        """
        # NOTE(review): redis-py appears to open connections lazily, so this
        # guard probably only covers construction errors, not an unreachable
        # server -- confirm against the redis-py docs.
        try:
            self.redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)
        except Exception:
            # Narrowed from a bare ``except`` so KeyboardInterrupt/SystemExit
            # are no longer swallowed.
            self.redis_client = None

    def _format_details(self, details: Any) -> str:
        """Render ``details`` for the outgoing message.

        Returns an empty string for falsy input, a newline-prefixed bullet
        list (one ``• key: value`` line per item) for a dict, and
        ``str(details)`` for anything else.
        """
        if not details:
            return ""
        if isinstance(details, dict):
            bullets = [f"• {key}: {value}" for key, value in details.items()]
            return "\n" + "\n".join(bullets)
        return str(details)

    def _get_readable_action(self, action: str) -> str:
        """Return the display label for ``action``, or ``action`` itself if unmapped."""
        return self._ACTION_LABELS.get(action, action)

    @staticmethod
    def _client_ip(request: Optional[Request]) -> Optional[str]:
        """Extract the client address from ``request``.

        Prefers the first entry of the ``X-Forwarded-For`` header and falls
        back to the socket peer address; returns ``None`` when neither is
        available.
        """
        if request is None:
            return None
        peer_host = request.client.host if request.client else None
        forwarded = request.headers.get("X-Forwarded-For")
        if forwarded:
            # The header may carry a comma-separated proxy chain; only the
            # first entry is the originating client, so store that one alone.
            first_hop = forwarded.split(",")[0].strip()
            if first_hop:
                return first_hop
        return peer_host

    @staticmethod
    def _serialize_details(details: Optional[Any]) -> Optional[str]:
        """Convert ``details`` to the string stored in the ``details`` column.

        Falsy input yields ``None``; dicts and lists are JSON-encoded
        (non-ASCII kept verbatim); any other value is passed through ``str``.
        """
        if not details:
            return None
        if isinstance(details, (dict, list)):
            return json.dumps(details, ensure_ascii=False, default=_json_default)
        return str(details)

    def _build_message(
        self,
        action: str,
        target_type: Optional[str],
        target_id: Optional[int],
        details: Optional[Any],
    ) -> str:
        """Compose the text pushed to the message queue for one audit event."""
        parts = [f"Audit Log: {self._get_readable_action(action)}\n"]
        if target_type:
            parts.append(f"Target: {target_type} (ID: {target_id})\n")
        formatted_details = self._format_details(details)
        if formatted_details:
            parts.append(f"Details:{formatted_details}")
        return "".join(parts)

    async def log(
        self,
        user_id: int,
        action: str,
        target_type: Optional[str] = None,
        target_id: Optional[int] = None,
        details: Optional[Any] = None,
        request: Optional[Request] = None
    ):
        """Persist an audit event and, if configured, queue a notification.

        Args:
            user_id: Identifier of the acting user.
            action: Action code, e.g. ``"warehouse_add_item"``.
            target_type: Optional kind of the affected entity.
            target_id: Optional identifier of the affected entity.
            details: Optional extra data; dicts and lists are stored as JSON.
            request: Optional incoming request used to derive the client IP.

        Errors from the database insert propagate to the caller. Failures
        while pushing to the queue are printed and otherwise ignored, so a
        queue problem never fails the audited operation.
        """
        ip_address = self._client_ip(request)
        details_str = self._serialize_details(details)

        # 1. Save to DB
        db.execute_commit(
            self._INSERT_QUERY,
            (user_id, action, target_type, target_id, details_str, ip_address),
        )

        # 2. Push to Telegram Queue if CHAT_ID is configured
        if config.TELEGRAM_CHAT_ID and self.redis_client:
            try:
                payload = {
                    "id": config.TELEGRAM_CHAT_ID,
                    "message": self._build_message(action, target_type, target_id, details),
                }
                self.redis_client.rpush("messages_queue", json.dumps(payload, ensure_ascii=False))
            except Exception as e:
                # Best-effort: the DB row is already committed at this point.
                print(f"FAILED TO PUSH TO TELEGRAM QUEUE: {e}")


# Shared module-level instance.
audit_service = AuditService()