import time

from session_utils import r

# Configuration
MAX_ATTEMPTS_BEFORE_CAPTCHA = 3
RATE_LIMIT_LOGIN_WINDOW = 3600  # 1 hour
MAX_LOGIN_ATTEMPTS_PER_WINDOW = 20
ORDER_COOLDOWN_SECONDS = 60  # minimum gap between two orders per IP / email


def _login_fail_keys(email: str, ip: str):
    """Return the ``(ip_key, email_key)`` pair used for failed-login counters."""
    return f"login_fail_ip:{ip}", f"login_fail_email:{email}"


def _order_limit_keys(email: str, ip: str):
    """Return the ``(ip_key, email_key)`` pair used for order cooldown markers."""
    return f"order_limit_ip:{ip}", f"order_limit_email:{email}"


class RateLimitService:
    """Login throttling, captcha gating and order cooldowns backed by ``r``.

    ``r`` is imported from ``session_utils``; it is presumably a Redis client
    (the calls used here are ``get``/``incr``/``expire``/``delete``/``exists``/
    ``setex``) -- confirm against ``session_utils``.
    """

    def get_login_attempts(self, email: str, ip: str) -> int:
        """Return the larger of the failed-login counts for *ip* and *email*.

        Two independent counters are kept (one per IP, one per email). Taking
        the maximum means either a distributed attack on one account or one
        IP spraying many accounts pushes the result up.

        Args:
            email: Email address the login was attempted for.
            ip: Client IP address of the attempt.

        Returns:
            The higher of the two counters; a missing counter counts as 0.
        """
        ip_key, email_key = _login_fail_keys(email, ip)
        # A missing key yields a falsy value, which is treated as zero.
        ip_failures = r.get(ip_key) or 0
        email_failures = r.get(email_key) or 0
        return max(int(ip_failures), int(email_failures))

    def record_failed_attempt(self, email: str, ip: str) -> None:
        """Increment the failure counters for *ip* and *email*.

        Each counter's expiry is (re)set to ``RATE_LIMIT_LOGIN_WINDOW`` seconds
        on every call.

        Args:
            email: Email address the failed login was attempted for.
            ip: Client IP address of the failed attempt.
        """
        # NOTE(review): incr and expire are issued as two separate calls, so a
        # failure between them would leave a counter without an expiry.
        # Consider a transaction/pipeline if ``r`` supports one.
        for key in _login_fail_keys(email, ip):
            r.incr(key)
            r.expire(key, RATE_LIMIT_LOGIN_WINDOW)

    def reset_attempts(self, email: str, ip: str) -> None:
        """Delete both failure counters after a successful login.

        Args:
            email: Email address that just logged in successfully.
            ip: Client IP address of the successful login.
        """
        # NOTE(review): this also clears the per-IP counter, so one successful
        # login from an IP wipes that IP's failures against other accounts.
        # Confirm this is the intended policy.
        for key in _login_fail_keys(email, ip):
            r.delete(key)

    def is_captcha_required(self, email: str, ip: str) -> bool:
        """Return True once failures reach ``MAX_ATTEMPTS_BEFORE_CAPTCHA``."""
        return self.get_login_attempts(email, ip) >= MAX_ATTEMPTS_BEFORE_CAPTCHA

    def is_rate_limited(self, email: str, ip: str) -> bool:
        """Return True once failures reach ``MAX_LOGIN_ATTEMPTS_PER_WINDOW``."""
        return self.get_login_attempts(email, ip) >= MAX_LOGIN_ATTEMPTS_PER_WINDOW

    async def verify_captcha(self, captcha_token: str) -> bool:
        """Stub for captcha verification.

        In production this would call the Google reCAPTCHA or Cloudflare
        Turnstile verification API. For demonstration purposes only the
        literal token ``'valid-captcha-token'`` is accepted.

        Args:
            captcha_token: Token submitted by the client; may be empty.

        Returns:
            True only for the demonstration token, False for anything else
            (including an empty or missing token).
        """
        if not captcha_token:
            return False
        # Everything except the demo token fails for now, to show the logic.
        return captcha_token == "valid-captcha-token"

    def is_order_flooding(self, email: str, ip: str) -> bool:
        """Return True while a cooldown marker exists for *ip* or *email*.

        Markers are written by ``record_order_placement`` and live for
        ``ORDER_COOLDOWN_SECONDS`` seconds.

        Args:
            email: Email address placing the order.
            ip: Client IP address placing the order.
        """
        ip_key, email_key = _order_limit_keys(email, ip)
        # ``or`` short-circuits: the email key is only checked if the IP is clear.
        return bool(r.exists(ip_key) or r.exists(email_key))

    def record_order_placement(self, email: str, ip: str) -> None:
        """Start the order cooldown for both *ip* and *email*.

        Args:
            email: Email address that placed the order.
            ip: Client IP address that placed the order.
        """
        for key in _order_limit_keys(email, ip):
            r.setex(key, ORDER_COOLDOWN_SECONDS, "1")


rate_limit_service = RateLimitService()