Kaynağa Gözat

feat: integrated audit logs with telegram notifications

unknown 6 saat önce
ebeveyn
işleme
8ddd862e6b
2 değiştirilmiş dosya ile 57 ekleme ve 9 silme
  1. 3 0
      backend/config.py
  2. 54 9
      backend/services/audit_service.py

+ 3 - 0
backend/config.py

@@ -58,3 +58,6 @@ SMTP_PASS = os.getenv("SMTP_PASS", "")
 SMTP_FROM = os.getenv("SMTP_FROM", "hello@radionica3d.me")
 # Frontend URL for links in emails
 FRONTEND_URL = os.getenv("FRONTEND_URL", "https://radionica3d.me")
+
+# Telegram Notifications
+TELEGRAM_CHAT_ID = os.getenv("CHAT_ID")

+ 54 - 9
backend/services/audit_service.py

@@ -1,9 +1,39 @@
 import db
 import json
+import redis
+import config
 from fastapi import Request
 from typing import Optional, Any
 
 class AuditService:
+    def __init__(self):
+        # Redis connection for telegram queue
+        try:
+            self.redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)
+        except:
+            self.redis_client = None
+
+    def _format_details(self, details: Any) -> str:
+        if not details:
+            return ""
+        if isinstance(details, dict):
+            lines = []
+            for k, v in details.items():
+                lines.append(f"• <b>{k}</b>: {v}")
+            return "\n" + "\n".join(lines)
+        return str(details)
+
+    def _get_readable_action(self, action: str) -> str:
+        # Simple mapping for better readability
+        mapping = {
+            "warehouse_add_item": "📦 Добавление на склад",
+            "warehouse_update_item": "📝 Обновление склада",
+            "warehouse_delete_item": "🗑 Удаление со склада",
+            "order_status_update": "🔔 Статус заказа изменен",
+            "user_role_update": "👤 Смена роли пользователя"
+        }
+        return mapping.get(action, action)
+
     async def log(
         self, 
         user_id: int, 
@@ -15,30 +45,45 @@ class AuditService:
     ):
         ip_address = None
         if request:
-            # Try to get real IP if behind proxy
             ip_address = request.headers.get("X-Forwarded-For", request.client.host if request.client else None)
         
         details_str = None
         if details:
             if isinstance(details, (dict, list)):
-                # Convert details to JSON string, handling Decimals and other types
                 def decimal_default(obj):
-                    try:
-                        from decimal import Decimal
-                        if isinstance(obj, Decimal):
-                            return float(obj)
-                    except:
-                        pass
+                    from decimal import Decimal
+                    if isinstance(obj, Decimal):
+                        return float(obj)
                     return str(obj)
-
                 details_str = json.dumps(details, ensure_ascii=False, default=decimal_default)
             else:
                 details_str = str(details)
         
+        # 1. Save to DB
         query = """
             INSERT INTO audit_logs (user_id, action, target_type, target_id, details, ip_address)
             VALUES (%s, %s, %s, %s, %s, %s)
         """
         db.execute_commit(query, (user_id, action, target_type, target_id, details_str, ip_address))
 
+        # 2. Push to Telegram Queue if CHAT_ID is configured
+        if config.TELEGRAM_CHAT_ID and self.redis_client:
+            try:
+                readable_action = self._get_readable_action(action)
+                formatted_details = self._format_details(details)
+                
+                message = f"<b>Audit Log:</b> {readable_action}\n"
+                if target_type:
+                    message += f"<b>Target:</b> {target_type} (ID: {target_id})\n"
+                if formatted_details:
+                    message += f"<b>Details:</b>{formatted_details}"
+                
+                payload = {
+                    "id": config.TELEGRAM_CHAT_ID,
+                    "message": message
+                }
+                self.redis_client.rpush("messages_queue", json.dumps(payload, ensure_ascii=False))
+            except Exception as e:
+                print(f"FAILED TO PUSH TO TELEGRAM QUEUE: {e}")
+
 audit_service = AuditService()