| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107 |
- import db
- import json
- import redis
- import config
- from fastapi import Request
- from typing import Optional, Any
- class AuditService:
- def __init__(self):
- # Redis connection for telegram queue
- try:
- self.redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)
- except:
- self.redis_client = None
- def _format_details(self, details: Any) -> str:
- if not details:
- return ""
- if isinstance(details, dict):
- lines = []
- for k, v in details.items():
- lines.append(f"• <b>{k}</b>: {v}")
- return "\n" + "\n".join(lines)
- return str(details)
- def _get_readable_action(self, action: str) -> str:
- # Simple mapping for better readability
- mapping = {
- "warehouse_add_item": "📦 Добавление на склад",
- "warehouse_update_item": "📝 Обновление склада",
- "warehouse_delete_item": "🗑 Удаление со склада",
- "order_status_update": "🔔 Статус заказа изменен",
- "user_role_update": "👤 Смена роли пользователя",
- "user_register": "🆕 Новая регистрация",
- "user_login": "🔑 Вход в систему",
- "order_created": "🛒 Новый заказ оформлен",
- "order_review": "⭐ Оставлен отзыв",
- "update_order": "✏️ Заказ отредактирован",
- "delete_order_entirely": "🔥 Заказ полностью удален",
- "admin_create_user": "👤 Пользователь создан админом",
- "admin_update_user": "✏️ Пользователь отредактирован админом",
- "password_reset_success": "🔐 Пароль успешно сброшен",
- "upload_order_photo": "📸 Загружено фото заказа",
- "update_photo_visibility": "👁️ Изменена видимость фото",
- "delete_photo": "🗑️ Удалено фото из портфолио",
- "create_material": "🏗️ Создан новый материал",
- "update_material": "📝 Изменен материал",
- "delete_material": "❌ Удален материал",
- "create_service": "🛠️ Создана новая услуга",
- "update_service": "📝 Изменена услуга",
- "delete_service": "❌ Удалена услуга"
- }
- return mapping.get(action, action)
- async def log(
- self,
- user_id: int,
- action: str,
- target_type: Optional[str] = None,
- target_id: Optional[int] = None,
- details: Optional[Any] = None,
- request: Optional[Request] = None
- ):
- ip_address = None
- if request:
- ip_address = request.headers.get("X-Forwarded-For", request.client.host if request.client else None)
-
- details_str = None
- if details:
- if isinstance(details, (dict, list)):
- def decimal_default(obj):
- from decimal import Decimal
- if isinstance(obj, Decimal):
- return float(obj)
- return str(obj)
- details_str = json.dumps(details, ensure_ascii=False, default=decimal_default)
- else:
- details_str = str(details)
-
- # 1. Save to DB
- query = """
- INSERT INTO audit_logs (user_id, action, target_type, target_id, details, ip_address)
- VALUES (%s, %s, %s, %s, %s, %s)
- """
- db.execute_commit(query, (user_id, action, target_type, target_id, details_str, ip_address))
- # 2. Push to Telegram Queue if CHAT_ID is configured
- if config.TELEGRAM_CHAT_ID and self.redis_client:
- try:
- readable_action = self._get_readable_action(action)
- formatted_details = self._format_details(details)
-
- message = f"<b>Audit Log:</b> {readable_action}\n"
- if target_type:
- message += f"<b>Target:</b> {target_type} (ID: {target_id})\n"
- if formatted_details:
- message += f"<b>Details:</b>{formatted_details}"
-
- payload = {
- "id": config.TELEGRAM_CHAT_ID,
- "message": message
- }
- self.redis_client.rpush("messages_queue", json.dumps(payload, ensure_ascii=False))
- except Exception as e:
- print(f"FAILED TO PUSH TO TELEGRAM QUEUE: {e}")
- audit_service = AuditService()
|