audit_service.py 3.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. import db
  2. import json
  3. import redis
  4. import config
  5. from fastapi import Request
  6. from typing import Optional, Any
  7. class AuditService:
  8. def __init__(self):
  9. # Redis connection for telegram queue
  10. try:
  11. self.redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)
  12. except:
  13. self.redis_client = None
  14. def _format_details(self, details: Any) -> str:
  15. if not details:
  16. return ""
  17. if isinstance(details, dict):
  18. lines = []
  19. for k, v in details.items():
  20. lines.append(f"• <b>{k}</b>: {v}")
  21. return "\n" + "\n".join(lines)
  22. return str(details)
  23. def _get_readable_action(self, action: str) -> str:
  24. # Simple mapping for better readability
  25. mapping = {
  26. "warehouse_add_item": "📦 Добавление на склад",
  27. "warehouse_update_item": "📝 Обновление склада",
  28. "warehouse_delete_item": "🗑 Удаление со склада",
  29. "order_status_update": "🔔 Статус заказа изменен",
  30. "user_role_update": "👤 Смена роли пользователя"
  31. }
  32. return mapping.get(action, action)
  33. async def log(
  34. self,
  35. user_id: int,
  36. action: str,
  37. target_type: Optional[str] = None,
  38. target_id: Optional[int] = None,
  39. details: Optional[Any] = None,
  40. request: Optional[Request] = None
  41. ):
  42. ip_address = None
  43. if request:
  44. ip_address = request.headers.get("X-Forwarded-For", request.client.host if request.client else None)
  45. details_str = None
  46. if details:
  47. if isinstance(details, (dict, list)):
  48. def decimal_default(obj):
  49. from decimal import Decimal
  50. if isinstance(obj, Decimal):
  51. return float(obj)
  52. return str(obj)
  53. details_str = json.dumps(details, ensure_ascii=False, default=decimal_default)
  54. else:
  55. details_str = str(details)
  56. # 1. Save to DB
  57. query = """
  58. INSERT INTO audit_logs (user_id, action, target_type, target_id, details, ip_address)
  59. VALUES (%s, %s, %s, %s, %s, %s)
  60. """
  61. db.execute_commit(query, (user_id, action, target_type, target_id, details_str, ip_address))
  62. # 2. Push to Telegram Queue if CHAT_ID is configured
  63. if config.TELEGRAM_CHAT_ID and self.redis_client:
  64. try:
  65. readable_action = self._get_readable_action(action)
  66. formatted_details = self._format_details(details)
  67. message = f"<b>Audit Log:</b> {readable_action}\n"
  68. if target_type:
  69. message += f"<b>Target:</b> {target_type} (ID: {target_id})\n"
  70. if formatted_details:
  71. message += f"<b>Details:</b>{formatted_details}"
  72. payload = {
  73. "id": config.TELEGRAM_CHAT_ID,
  74. "message": message
  75. }
  76. self.redis_client.rpush("messages_queue", json.dumps(payload, ensure_ascii=False))
  77. except Exception as e:
  78. print(f"FAILED TO PUSH TO TELEGRAM QUEUE: {e}")
  79. audit_service = AuditService()