| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354 |
- import secrets
- from session_utils import r
- class TokenService:
- @staticmethod
- def generate_token(length: int = 32) -> str:
- return secrets.token_urlsafe(length)
- def create_verification_token(self, user_id: int, expires_seconds: int = 86400) -> str:
- """Create an email verification token (default 24h)"""
- token = self.generate_token()
- # Key: verify_token:TOKEN -> user_id
- r.setex(f"verify_token:{token}", expires_seconds, str(user_id))
- return token
- def verify_email_token(self, token: str) -> int:
- """Verify token and return user_id, or None if invalid/expired"""
- user_id = r.get(f"verify_token:{token}")
- if user_id:
- # We don't delete immediately here? Usually yes, after successful verification.
- return int(user_id)
- return None
- def delete_verification_token(self, token: str):
- r.delete(f"verify_token:{token}")
- def create_reset_token(self, user_id: int, expires_seconds: int = 600) -> str:
- """Create a password reset token (default 10 minutes)"""
- token = self.generate_token()
- # Key: reset_token:TOKEN -> user_id
- r.setex(f"reset_token:{token}", expires_seconds, str(user_id))
-
- # Track all reset tokens for this user to allow bulk cleanup
- r.sadd(f"user_reset_tokens:{user_id}", token)
- r.expire(f"user_reset_tokens:{user_id}", expires_seconds)
-
- return token
- def verify_reset_token(self, token: str) -> int:
- """Verify password reset token"""
- user_id = r.get(f"reset_token:{token}")
- return int(user_id) if user_id else None
- def cleanup_reset_tokens(self, user_id: int):
- """Delete all reset tokens associated with a user"""
- tokens = r.smembers(f"user_reset_tokens:{user_id}")
- if tokens:
- # Delete individual tokens
- for token in tokens:
- r.delete(f"reset_token:{token}")
- # Delete the set itself
- r.delete(f"user_reset_tokens:{user_id}")
- token_service = TokenService()
|