chat_manager.py 2.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758
  1. import json
  2. import datetime
  3. import db
  4. from typing import Dict, List, Any
  5. from fastapi import WebSocket
  6. class ChatConnectionManager:
  7. def __init__(self):
  8. # order_id -> list of active websockets
  9. # order_id -> list of dicts: {"ws": websocket, "role": str}
  10. self.active_connections: Dict[int, List[Dict[str, Any]]] = {}
  11. async def connect(self, websocket: WebSocket, order_id: int, role: str):
  12. await websocket.accept()
  13. if order_id not in self.active_connections:
  14. self.active_connections[order_id] = []
  15. self.active_connections[order_id].append({"ws": websocket, "role": role})
  16. await self.broadcast_presence(order_id)
  17. def disconnect(self, websocket: WebSocket, order_id: int):
  18. if order_id in self.active_connections:
  19. self.active_connections[order_id] = [c for c in self.active_connections[order_id] if c["ws"] != websocket]
  20. if not self.active_connections[order_id]:
  21. del self.active_connections[order_id]
  22. else:
  23. import asyncio
  24. try:
  25. asyncio.create_task(self.broadcast_presence(order_id))
  26. except Exception:
  27. pass
  28. async def broadcast_presence(self, order_id: int):
  29. if order_id in self.active_connections:
  30. roles = [c["role"] for c in self.active_connections[order_id]]
  31. payload = json.dumps({"type": "presence", "online_roles": list(set(roles))})
  32. for connection in self.active_connections[order_id]:
  33. try:
  34. await connection["ws"].send_text(payload)
  35. except:
  36. pass
  37. async def broadcast_to_order(self, order_id: int, message: Any):
  38. if order_id in self.active_connections:
  39. # We must serialize datetime to JSON string
  40. if isinstance(message, dict) and 'created_at' in message:
  41. if isinstance(message['created_at'], datetime.datetime):
  42. message['created_at'] = message['created_at'].isoformat()
  43. if "type" not in message:
  44. message["type"] = "message"
  45. payload = json.dumps(message)
  46. for connection in self.active_connections[order_id]:
  47. try:
  48. await connection["ws"].send_text(payload)
  49. except:
  50. pass
# Module-level ChatConnectionManager instance, created once when this module
# is imported.
manager = ChatConnectionManager()