chat.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Depends, Query, HTTPException
  2. from services.chat_manager import manager
  3. from services.global_manager import global_manager
  4. import db
  5. import auth_utils
  6. import datetime
  7. import schemas
  8. import locales
  9. router = APIRouter(tags=["chat"])
  10. # In-memory storage for flood control: {user_id: timestamp}
  11. last_message_times = {}
  12. @router.get("/orders/{order_id}/messages")
  13. async def get_order_messages(order_id: int, token: str = Depends(auth_utils.oauth2_scheme)):
  14. payload = auth_utils.decode_token(token)
  15. if not payload: raise HTTPException(status_code=401, detail="Invalid token")
  16. role = payload.get("role")
  17. user_id = payload.get("id")
  18. # Fetch user chat status
  19. user_info = db.execute_query("SELECT can_chat FROM users WHERE id = %s", (user_id,))
  20. can_chat = user_info[0]['can_chat'] if user_info else False
  21. order = db.execute_query("SELECT user_id FROM orders WHERE id = %s", (order_id,))
  22. if not order: raise HTTPException(status_code=404, detail="Order not found")
  23. if role != 'admin':
  24. if order[0]['user_id'] != user_id: raise HTTPException(status_code=403, detail="Not authorized")
  25. if not can_chat: raise HTTPException(status_code=403, detail="Chat access disabled for your account")
  26. messages = db.execute_query("SELECT id, is_from_admin, message, created_at FROM order_messages WHERE order_id = %s ORDER BY created_at ASC", (order_id,))
  27. for msg in messages:
  28. if msg.get('created_at'): msg['created_at'] = msg['created_at'].isoformat()
  29. msg['is_from_admin'] = bool(msg['is_from_admin'])
  30. # Mark messages as read
  31. if role == 'admin':
  32. db.execute_commit("UPDATE order_messages SET is_read = TRUE WHERE order_id = %s AND is_from_admin = FALSE AND is_read = FALSE", (order_id,))
  33. await global_manager.notify_admins()
  34. await global_manager.notify_order_read(order_id)
  35. else:
  36. db.execute_commit("UPDATE order_messages SET is_read = TRUE WHERE order_id = %s AND is_from_admin = TRUE AND is_read = FALSE", (order_id,))
  37. await global_manager.notify_user(user_id)
  38. return messages
  39. @router.post("/orders/{order_id}/messages")
  40. async def post_order_message(order_id: int, data: schemas.MessageCreate, token: str = Depends(auth_utils.oauth2_scheme), lang: str = "en"):
  41. payload = auth_utils.decode_token(token)
  42. if not payload: raise HTTPException(status_code=401, detail="Invalid token")
  43. role = payload.get("role")
  44. user_id = payload.get("id")
  45. # Flood control for non-admin users
  46. if role != 'admin':
  47. now = datetime.datetime.utcnow().timestamp()
  48. last_time = last_message_times.get(user_id, 0)
  49. if now - last_time < 10:
  50. raise HTTPException(status_code=429, detail=locales.translate_error("flood_control", lang))
  51. last_message_times[user_id] = now
  52. message = data.message.strip()
  53. if not message: raise HTTPException(status_code=400, detail="Empty message")
  54. role = payload.get("role")
  55. user_id = payload.get("id")
  56. is_admin = (role == 'admin')
  57. if not is_admin:
  58. user_info = db.execute_query("SELECT can_chat FROM users WHERE id = %s", (user_id,))
  59. if not user_info or not user_info[0]['can_chat']:
  60. raise HTTPException(status_code=403, detail="Chat access disabled")
  61. order = db.execute_query("SELECT user_id FROM orders WHERE id = %s", (order_id,))
  62. if not order: raise HTTPException(status_code=404, detail="Order not found")
  63. if not is_admin and order[0]['user_id'] != user_id: raise HTTPException(status_code=403, detail="Not authorized")
  64. query = "INSERT INTO order_messages (order_id, user_id, is_from_admin, message) VALUES (%s, %s, %s, %s)"
  65. msg_id = db.execute_commit(query, (order_id, user_id, is_admin, message))
  66. now = datetime.datetime.utcnow().isoformat()
  67. await manager.broadcast_to_order(order_id, {"id": msg_id, "is_from_admin": is_admin, "message": message, "created_at": now})
  68. if is_admin:
  69. await global_manager.notify_user(order[0]['user_id'])
  70. else:
  71. from services import event_hooks
  72. await global_manager.notify_admins()
  73. await global_manager.notify_admins_new_message(order_id, message)
  74. event_hooks.on_message_received(order_id, user_id, message)
  75. return {"id": msg_id, "status": "sent"}
  76. @router.websocket("/ws/chat/{order_id}")
  77. async def ws_chat(websocket: WebSocket, order_id: int, token: str = Query(...)):
  78. payload = auth_utils.decode_token(token)
  79. if not payload:
  80. await websocket.close(code=4001)
  81. return
  82. role = payload.get("role")
  83. user_id = payload.get("id")
  84. if role != 'admin':
  85. user_info = db.execute_query("SELECT can_chat FROM users WHERE id = %s", (user_id,))
  86. if not user_info or not user_info[0]['can_chat']:
  87. await websocket.close(code=4003)
  88. return
  89. order = db.execute_query("SELECT user_id FROM orders WHERE id = %s", (order_id,))
  90. if not order:
  91. await websocket.close(code=4004)
  92. return
  93. if role != 'admin' and order[0]['user_id'] != user_id:
  94. await websocket.close(code=4003)
  95. return
  96. await manager.connect(websocket, order_id, role)
  97. try:
  98. while True:
  99. data = await websocket.receive_text()
  100. if data == "typing":
  101. await manager.broadcast_to_order(order_id, {"type": "typing", "is_admin": role == 'admin'})
  102. elif data == "stop_typing":
  103. await manager.broadcast_to_order(order_id, {"type": "stop_typing", "is_admin": role == 'admin'})
  104. elif data == "read":
  105. if role == 'admin':
  106. db.execute_commit("UPDATE order_messages SET is_read = TRUE WHERE order_id = %s AND is_from_admin = FALSE AND is_read = FALSE", (order_id,))
  107. await global_manager.notify_admins()
  108. await global_manager.notify_order_read(order_id)
  109. else:
  110. db.execute_commit("UPDATE order_messages SET is_read = TRUE WHERE order_id = %s AND is_from_admin = TRUE AND is_read = FALSE", (order_id,))
  111. await global_manager.notify_user(user_id)
  112. except WebSocketDisconnect:
  113. manager.disconnect(websocket, order_id)