# token_service.py
import secrets
from typing import Optional

from session_utils import r
  3. class TokenService:
  4. @staticmethod
  5. def generate_token(length: int = 32) -> str:
  6. return secrets.token_urlsafe(length)
  7. def create_verification_token(self, user_id: int, expires_seconds: int = 86400) -> str:
  8. """Create an email verification token (default 24h)"""
  9. token = self.generate_token()
  10. # Key: verify_token:TOKEN -> user_id
  11. r.setex(f"verify_token:{token}", expires_seconds, str(user_id))
  12. return token
  13. def verify_email_token(self, token: str) -> int:
  14. """Verify token and return user_id, or None if invalid/expired"""
  15. user_id = r.get(f"verify_token:{token}")
  16. if user_id:
  17. # We don't delete immediately here? Usually yes, after successful verification.
  18. return int(user_id)
  19. return None
  20. def delete_verification_token(self, token: str):
  21. r.delete(f"verify_token:{token}")
  22. def create_reset_token(self, user_id: int, expires_seconds: int = 600) -> str:
  23. """Create a password reset token (default 10 minutes)"""
  24. token = self.generate_token()
  25. # Key: reset_token:TOKEN -> user_id
  26. r.setex(f"reset_token:{token}", expires_seconds, str(user_id))
  27. # Track all reset tokens for this user to allow bulk cleanup
  28. r.sadd(f"user_reset_tokens:{user_id}", token)
  29. r.expire(f"user_reset_tokens:{user_id}", expires_seconds)
  30. return token
  31. def verify_reset_token(self, token: str) -> int:
  32. """Verify password reset token"""
  33. user_id = r.get(f"reset_token:{token}")
  34. return int(user_id) if user_id else None
  35. def cleanup_reset_tokens(self, user_id: int):
  36. """Delete all reset tokens associated with a user"""
  37. tokens = r.smembers(f"user_reset_tokens:{user_id}")
  38. if tokens:
  39. # Delete individual tokens
  40. for token in tokens:
  41. r.delete(f"reset_token:{token}")
  42. # Delete the set itself
  43. r.delete(f"user_reset_tokens:{user_id}")
  44. token_service = TokenService()