import secrets from session_utils import r class TokenService: @staticmethod def generate_token(length: int = 32) -> str: return secrets.token_urlsafe(length) def create_verification_token(self, user_id: int, expires_seconds: int = 86400) -> str: """Create an email verification token (default 24h)""" token = self.generate_token() # Key: verify_token:TOKEN -> user_id r.setex(f"verify_token:{token}", expires_seconds, str(user_id)) return token def verify_email_token(self, token: str) -> int: """Verify token and return user_id, or None if invalid/expired""" user_id = r.get(f"verify_token:{token}") if user_id: # We don't delete immediately here? Usually yes, after successful verification. return int(user_id) return None def delete_verification_token(self, token: str): r.delete(f"verify_token:{token}") def create_reset_token(self, user_id: int, expires_seconds: int = 600) -> str: """Create a password reset token (default 10 minutes)""" token = self.generate_token() # Key: reset_token:TOKEN -> user_id r.setex(f"reset_token:{token}", expires_seconds, str(user_id)) # Track all reset tokens for this user to allow bulk cleanup r.sadd(f"user_reset_tokens:{user_id}", token) r.expire(f"user_reset_tokens:{user_id}", expires_seconds) return token def verify_reset_token(self, token: str) -> int: """Verify password reset token""" user_id = r.get(f"reset_token:{token}") return int(user_id) if user_id else None def cleanup_reset_tokens(self, user_id: int): """Delete all reset tokens associated with a user""" tokens = r.smembers(f"user_reset_tokens:{user_id}") if tokens: # Delete individual tokens for token in tokens: r.delete(f"reset_token:{token}") # Delete the set itself r.delete(f"user_reset_tokens:{user_id}") token_service = TokenService()