rate_limit_service.py 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. import time
  2. from session_utils import r
  3. # Configuration
  4. MAX_ATTEMPTS_BEFORE_CAPTCHA = 3
  5. RATE_LIMIT_LOGIN_WINDOW = 3600 # 1 hour
  6. MAX_LOGIN_ATTEMPTS_PER_WINDOW = 20
  7. class RateLimitService:
  8. def get_login_attempts(self, email: str, ip: str) -> int:
  9. """Get the number of failed login attempts for this email and IP"""
  10. # We use a combined key to prevent distributed brute force on one account
  11. # and IP-based spraying.
  12. count_ip = r.get(f"login_fail_ip:{ip}") or 0
  13. count_email = r.get(f"login_fail_email:{email}") or 0
  14. return max(int(count_ip), int(count_email))
  15. def record_failed_attempt(self, email: str, ip: str):
  16. """Increment failure counts for IP and Email"""
  17. # Increment IP failures (expires in 1 hour)
  18. r.incr(f"login_fail_ip:{ip}")
  19. r.expire(f"login_fail_ip:{ip}", RATE_LIMIT_LOGIN_WINDOW)
  20. # Increment Email failures (expires in 1 hour)
  21. r.incr(f"login_fail_email:{email}")
  22. r.expire(f"login_fail_email:{email}", RATE_LIMIT_LOGIN_WINDOW)
  23. def reset_attempts(self, email: str, ip: str):
  24. """Reset counts after successful login"""
  25. r.delete(f"login_fail_ip:{ip}")
  26. r.delete(f"login_fail_email:{email}")
  27. def is_captcha_required(self, email: str, ip: str) -> bool:
  28. """Check if captcha should be presented to the user"""
  29. return self.get_login_attempts(email, ip) >= MAX_ATTEMPTS_BEFORE_CAPTCHA
  30. def is_rate_limited(self, email: str, ip: str) -> bool:
  31. """Check if global rate limit for these identifiers is exceeded"""
  32. import config
  33. if config.TESTING:
  34. return False
  35. return self.get_login_attempts(email, ip) >= MAX_LOGIN_ATTEMPTS_PER_WINDOW
  36. async def verify_captcha(self, captcha_token: str) -> bool:
  37. """
  38. Stub for captcha verification logic.
  39. In production, this would call Google reCAPTCHA or Cloudflare Turnstile API.
  40. """
  41. if not captcha_token:
  42. return False
  43. # For demonstration purposes, we'll accept 'valid-captcha-token'
  44. # In a real app, we'd use requests.post to a verification endpoint.
  45. if captcha_token == "valid-captcha-token":
  46. return True
  47. # Verification failing for everything else for now to show the logic
  48. return False
  49. def is_order_flooding(self, email: str, ip: str) -> bool:
  50. """Check if user/ip is placing orders too frequently (limit: 1 per minute)"""
  51. import config
  52. if config.TESTING:
  53. return False
  54. if r.exists(f"order_limit_ip:{ip}") or r.exists(f"order_limit_email:{email}"):
  55. return True
  56. return False
  57. def record_order_placement(self, email: str, ip: str):
  58. """Record order placement and set 60s cooldown"""
  59. r.setex(f"order_limit_ip:{ip}", 60, "1")
  60. r.setex(f"order_limit_email:{email}", 60, "1")
  61. rate_limit_service = RateLimitService()