import db
import json
import redis
import config
from typing import Optional, Any
class AuditService:
def __init__(self):
# Redis connection for telegram queue
try:
self.redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)
except:
self.redis_client = None
def _format_details(self, details: Any) -> str:
if not details:
return ""
if isinstance(details, dict):
lines = []
for k, v in details.items():
lines.append(f"• {k}: {v}")
return "\n" + "\n".join(lines)
return str(details)
def _get_readable_action(self, action: str) -> str:
# Simple mapping for better readability
mapping = {
"warehouse_add_item": "📦 Добавление на склад",
"warehouse_update_item": "📝 Обновление склада",
"warehouse_delete_item": "🗑 Удаление со склада",
"order_status_update": "🔔 Статус заказа изменен",
"user_role_update": "👤 Смена роли пользователя",
"user_register": "🆕 Новая регистрация",
"user_login": "🔑 Вход в систему",
"order_created": "🛒 Новый заказ оформлен",
"order_review": "⭐ Оставлен отзыв",
"update_order": "✏️ Заказ отредактирован",
"delete_order_entirely": "🔥 Заказ полностью удален",
"admin_create_user": "👤 Пользователь создан админом",
"admin_update_user": "✏️ Пользователь отредактирован админом",
"password_reset_success": "🔐 Пароль успешно сброшен",
"upload_order_photo": "📸 Загружено фото заказа",
"update_photo_visibility": "👁️ Изменена видимость фото",
"delete_photo": "🗑️ Удалено фото из портфолио",
"create_material": "🏗️ Создан новый материал",
"update_material": "📝 Изменен материал",
"delete_material": "❌ Удален материал",
"create_service": "🛠️ Создана новая услуга",
"update_service": "📝 Изменена услуга",
"delete_service": "❌ Удалена услуга"
}
return mapping.get(action, action)
async def log(
self,
user_id: int,
action: str,
target_type: Optional[str] = None,
target_id: Optional[int] = None,
details: Optional[Any] = None
):
details_str = None
if details:
if isinstance(details, (dict, list)):
def decimal_default(obj):
from decimal import Decimal
if isinstance(obj, Decimal):
return float(obj)
return str(obj)
details_str = json.dumps(details, ensure_ascii=False, default=decimal_default)
else:
details_str = str(details)
# 1. Save to DB
query = """
INSERT INTO audit_logs (user_id, action, target_type, target_id, details)
VALUES (%s, %s, %s, %s, %s)
"""
db.execute_commit(query, (user_id, action, target_type, target_id, details_str))
# 2. Push to Telegram Queue if CHAT_ID is configured
if config.TELEGRAM_CHAT_ID and self.redis_client:
try:
readable_action = self._get_readable_action(action)
formatted_details = self._format_details(details)
message = f"Audit Log: {readable_action}\n"
if target_type:
message += f"Target: {target_type} (ID: {target_id})\n"
if formatted_details:
message += f"Details:{formatted_details}"
payload = {
"id": config.TELEGRAM_CHAT_ID,
"message": message
}
self.redis_client.rpush("messages_queue", json.dumps(payload, ensure_ascii=False))
except Exception as e:
print(f"FAILED TO PUSH TO TELEGRAM QUEUE: {e}")
audit_service = AuditService()