# rate_limit_service.py (2.8 KB)
  1. import time
  2. from session_utils import r
  3. # Configuration
  4. MAX_ATTEMPTS_BEFORE_CAPTCHA = 3
  5. RATE_LIMIT_LOGIN_WINDOW = 3600 # 1 hour
  6. MAX_LOGIN_ATTEMPTS_PER_WINDOW = 20
  7. class RateLimitService:
  8. def get_login_attempts(self, email: str, ip: str) -> int:
  9. """Get the number of failed login attempts for this email and IP"""
  10. # We use a combined key to prevent distributed brute force on one account
  11. # and IP-based spraying.
  12. count_ip = r.get(f"login_fail_ip:{ip}") or 0
  13. count_email = r.get(f"login_fail_email:{email}") or 0
  14. return max(int(count_ip), int(count_email))
  15. def record_failed_attempt(self, email: str, ip: str):
  16. """Increment failure counts for IP and Email"""
  17. # Increment IP failures (expires in 1 hour)
  18. r.incr(f"login_fail_ip:{ip}")
  19. r.expire(f"login_fail_ip:{ip}", RATE_LIMIT_LOGIN_WINDOW)
  20. # Increment Email failures (expires in 1 hour)
  21. r.incr(f"login_fail_email:{email}")
  22. r.expire(f"login_fail_email:{email}", RATE_LIMIT_LOGIN_WINDOW)
  23. def reset_attempts(self, email: str, ip: str):
  24. """Reset counts after successful login"""
  25. r.delete(f"login_fail_ip:{ip}")
  26. r.delete(f"login_fail_email:{email}")
  27. def is_captcha_required(self, email: str, ip: str) -> bool:
  28. """Check if captcha should be presented to the user"""
  29. return self.get_login_attempts(email, ip) >= MAX_ATTEMPTS_BEFORE_CAPTCHA
  30. def is_rate_limited(self, email: str, ip: str) -> bool:
  31. """Check if global rate limit for these identifiers is exceeded"""
  32. return self.get_login_attempts(email, ip) >= MAX_LOGIN_ATTEMPTS_PER_WINDOW
  33. async def verify_captcha(self, captcha_token: str) -> bool:
  34. """
  35. Stub for captcha verification logic.
  36. In production, this would call Google reCAPTCHA or Cloudflare Turnstile API.
  37. """
  38. if not captcha_token:
  39. return False
  40. # For demonstration purposes, we'll accept 'valid-captcha-token'
  41. # In a real app, we'd use requests.post to a verification endpoint.
  42. if captcha_token == "valid-captcha-token":
  43. return True
  44. # Verification failing for everything else for now to show the logic
  45. return False
  46. def is_order_flooding(self, email: str, ip: str) -> bool:
  47. """Check if user/ip is placing orders too frequently (limit: 1 per minute)"""
  48. if r.exists(f"order_limit_ip:{ip}") or r.exists(f"order_limit_email:{email}"):
  49. return True
  50. return False
  51. def record_order_placement(self, email: str, ip: str):
  52. """Record order placement and set 60s cooldown"""
  53. r.setex(f"order_limit_ip:{ip}", 60, "1")
  54. r.setex(f"order_limit_email:{email}", 60, "1")
  55. rate_limit_service = RateLimitService()