| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475 |
- import time
- from session_utils import r
- # Configuration
- MAX_ATTEMPTS_BEFORE_CAPTCHA = 3
- RATE_LIMIT_LOGIN_WINDOW = 3600 # 1 hour
- MAX_LOGIN_ATTEMPTS_PER_WINDOW = 20
- class RateLimitService:
- def get_login_attempts(self, email: str, ip: str) -> int:
- """Get the number of failed login attempts for this email and IP"""
- # We use a combined key to prevent distributed brute force on one account
- # and IP-based spraying.
- count_ip = r.get(f"login_fail_ip:{ip}") or 0
- count_email = r.get(f"login_fail_email:{email}") or 0
- return max(int(count_ip), int(count_email))
- def record_failed_attempt(self, email: str, ip: str):
- """Increment failure counts for IP and Email"""
- # Increment IP failures (expires in 1 hour)
- r.incr(f"login_fail_ip:{ip}")
- r.expire(f"login_fail_ip:{ip}", RATE_LIMIT_LOGIN_WINDOW)
-
- # Increment Email failures (expires in 1 hour)
- r.incr(f"login_fail_email:{email}")
- r.expire(f"login_fail_email:{email}", RATE_LIMIT_LOGIN_WINDOW)
- def reset_attempts(self, email: str, ip: str):
- """Reset counts after successful login"""
- r.delete(f"login_fail_ip:{ip}")
- r.delete(f"login_fail_email:{email}")
- def is_captcha_required(self, email: str, ip: str) -> bool:
- """Check if captcha should be presented to the user"""
- return self.get_login_attempts(email, ip) >= MAX_ATTEMPTS_BEFORE_CAPTCHA
- def is_rate_limited(self, email: str, ip: str) -> bool:
- """Check if global rate limit for these identifiers is exceeded"""
- import config
- if config.TESTING:
- return False
- return self.get_login_attempts(email, ip) >= MAX_LOGIN_ATTEMPTS_PER_WINDOW
- async def verify_captcha(self, captcha_token: str) -> bool:
- """
- Stub for captcha verification logic.
- In production, this would call Google reCAPTCHA or Cloudflare Turnstile API.
- """
- if not captcha_token:
- return False
-
- # For demonstration purposes, we'll accept 'valid-captcha-token'
- # In a real app, we'd use requests.post to a verification endpoint.
- if captcha_token == "valid-captcha-token":
- return True
-
- # Verification failing for everything else for now to show the logic
- return False
- def is_order_flooding(self, email: str, ip: str) -> bool:
- """Check if user/ip is placing orders too frequently (limit: 1 per minute)"""
- import config
- if config.TESTING:
- return False
-
- if r.exists(f"order_limit_ip:{ip}") or r.exists(f"order_limit_email:{email}"):
- return True
- return False
- def record_order_placement(self, email: str, ip: str):
- """Record order placement and set 60s cooldown"""
- r.setex(f"order_limit_ip:{ip}", 60, "1")
- r.setex(f"order_limit_email:{email}", 60, "1")
- rate_limit_service = RateLimitService()
|