import logging
import secrets
import uuid
from datetime import datetime, timedelta
from typing import Optional, List

from fastapi import (
    APIRouter,
    Request,
    Depends,
    HTTPException,
    WebSocket,
    WebSocketDisconnect,
    Query,
)

from services.global_manager import global_manager
from services.rate_limit_service import rate_limit_service
import auth_utils
import db
import schemas
import session_utils
import locales
from dependencies import get_current_user, require_admin
import config
from services.email_service import send_verification_email, send_password_reset_email
from services.token_service import token_service

try:
    from google.oauth2 import id_token
    from google.auth.transport import requests as google_requests
except ImportError:
    # Google sign-in is optional; social_login reports a 500 when it is missing.
    id_token = None
    google_requests = None

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/auth", tags=["auth"])

# Columns returned for every user payload; password_hash is deliberately absent.
_USER_COLUMNS = (
    "id, email, first_name, last_name, phone, shipping_address, "
    "preferred_language, role, can_chat, is_active, is_company, "
    "company_name, company_pib, company_address, ip_address, created_at"
)

_GOOGLE_ISSUERS = ('accounts.google.com', 'https://accounts.google.com')


def _fetch_user_rows(user_id):
    """Return the result of selecting the public user columns for ``user_id``."""
    return db.execute_query(
        f"SELECT {_USER_COLUMNS} FROM users WHERE id = %s", (user_id,)
    )


def _client_ip(request, default=None):
    """Return the peer address of ``request``, or ``default`` when it is unknown."""
    return request.client.host if request.client else default


def _bearer_response(email, user_id, role):
    """Create an access token for the given identity and wrap it as a bearer payload."""
    access_token = auth_utils.create_access_token(
        data={"sub": email, "id": user_id, "role": role}
    )
    return {"access_token": access_token, "token_type": "bearer"}


@router.post("/register", response_model=schemas.UserResponse)
async def register(request: Request, user: schemas.UserCreate, lang: str = "en"):
    """Create an inactive account and email a verification token.

    The email is lower-cased before it is checked and stored, because
    ``login`` lower-cases the address it looks up.

    Raises:
        HTTPException: 400 when the email is already registered.
    """
    email = user.email.lower()

    existing_user = db.execute_query("SELECT id FROM users WHERE email = %s", (email,))
    if existing_user:
        raise HTTPException(
            status_code=400,
            detail=locales.translate_error("email_already_registered", lang),
        )

    ip_address = _client_ip(request)
    hashed_password = auth_utils.get_password_hash(user.password)

    # is_active is inserted as 0; verify_email flips it to 1.
    query = """
        INSERT INTO users (email, password_hash, first_name, last_name, phone,
                           shipping_address, preferred_language, role, ip_address,
                           is_company, company_name, company_pib, company_address,
                           is_active)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, 0)
    """
    params = (
        email,
        hashed_password,
        user.first_name,
        user.last_name,
        user.phone,
        user.shipping_address,
        user.preferred_language,
        'user',
        ip_address,
        user.is_company,
        user.company_name,
        user.company_pib,
        user.company_address,
    )
    user_id = db.execute_commit(query, params)

    # Generate Verification Token (Redis)
    token = token_service.create_verification_token(user_id)

    # Send Email
    send_verification_email(email, token, user.preferred_language or lang)

    new_user = _fetch_user_rows(user_id)
    return new_user[0]


@router.get("/verify-email")
async def verify_email(token: str, lang: str = "en"):
    """Activate the account bound to ``token`` and discard the token.

    ``lang`` is accepted for interface compatibility; the messages returned
    here are not localized.

    Raises:
        HTTPException: 400 when the token does not resolve to a user id.
    """
    user_id = token_service.verify_email_token(token)
    if not user_id:
        raise HTTPException(status_code=400, detail="Invalid or expired verification token")

    db.execute_commit("UPDATE users SET is_active = 1 WHERE id = %s", (user_id,))
    token_service.delete_verification_token(token)

    return {"message": "Email verified successfully. You can now log in."}


@router.post("/login", response_model=schemas.Token)
async def login(request: Request, user_data: schemas.UserLogin, lang: str = "en"):
    """Authenticate with email and password and return a bearer token.

    Raises:
        HTTPException: 429 when rate limited; 403 when a captcha is required
            but missing or invalid, or when the account is not active;
            401 when the credentials do not match.
    """
    ip = _client_ip(request, "unknown")
    email = user_data.email.lower()

    # 1. Check Global Rate Limit
    if rate_limit_service.is_rate_limited(email, ip):
        raise HTTPException(
            status_code=429,
            detail=locales.translate_error("too_many_attempts", lang),
        )

    # 2. Check if Captcha is Required
    if rate_limit_service.is_captcha_required(email, ip):
        if not user_data.captcha_token:
            raise HTTPException(
                status_code=403,
                detail=locales.translate_error("captcha_required", lang),
            )

        # 3. Verify Captcha
        if not await rate_limit_service.verify_captcha(user_data.captcha_token):
            raise HTTPException(
                status_code=403,
                detail=locales.translate_error("invalid_token", lang),
            )

    # 4. Attempt Authentication
    user = db.execute_query("SELECT * FROM users WHERE email = %s", (email,))
    if not user or not auth_utils.verify_password(user_data.password, user[0]['password_hash']):
        # Log failure
        rate_limit_service.record_failed_attempt(email, ip)
        raise HTTPException(
            status_code=401,
            detail=locales.translate_error("incorrect_credentials", lang),
        )

    account = user[0]
    if not account.get('is_active', True):
        raise HTTPException(
            status_code=403,
            detail=locales.translate_error("account_not_active", lang),
        )

    # 5. Success - Reset Rate Limits
    rate_limit_service.reset_attempts(email, ip)

    return _bearer_response(account['email'], account['id'], account['role'])


@router.post("/social-login", response_model=schemas.Token)
async def social_login(request: Request, data: schemas.SocialLogin):
    """Sign in (or auto-register) a user from a verified Google ID token.

    The identity is taken from the verified token only. Providers other than
    ``'google'`` are rejected: accepting them would mean issuing a token for a
    client-supplied email with no proof of ownership.

    Raises:
        HTTPException: 400 for an unsupported provider or a missing email;
            500 when Google auth is not configured; 401 when the token fails
            verification; 403 when the matching account is not active.
    """
    # 1. Verify token (Google is the only provider verified server-side)
    if data.provider != 'google':
        raise HTTPException(status_code=400, detail="Unsupported social login provider")

    if not id_token or not config.GOOGLE_CLIENT_ID:
        # Keep configuration details in the server log, not in the response.
        logger.error(
            "Google Auth not configured: id_token_lib=%s, client_id_set=%s",
            id_token is not None,
            bool(config.GOOGLE_CLIENT_ID),
        )
        raise HTTPException(status_code=500, detail="Google Auth not configured on server")

    first_name = data.first_name
    last_name = data.last_name

    try:
        # Verify the ID token
        idinfo = id_token.verify_oauth2_token(
            data.token, google_requests.Request(), config.GOOGLE_CLIENT_ID
        )

        # ID token is valid. Get user's Google info
        if idinfo['iss'] not in _GOOGLE_ISSUERS:
            raise ValueError('Wrong issuer.')

        email = idinfo['email'].lower()
        first_name = idinfo.get('given_name', first_name)
        last_name = idinfo.get('family_name', last_name)
    except Exception as e:
        # Any verification or claim-lookup failure is reported as a bad token.
        logger.warning("Google Token Verification Error: %s", e)
        raise HTTPException(status_code=401, detail="Invalid Google token") from e

    if not email:
        raise HTTPException(status_code=400, detail="Email is required")

    # 2. Proceed with login/registration
    user = db.execute_query(
        "SELECT id, email, role, is_active FROM users WHERE email = %s", (email,)
    )

    if user:
        account = user[0]
        if not account.get('is_active', True):
            raise HTTPException(status_code=403, detail="Your account has been suspended.")
        return _bearer_response(account['email'], account['id'], account['role'])

    # First sign-in: create an already-active account with a random password
    # that is never disclosed, so password login needs a reset first.
    ip_address = _client_ip(request)
    hashed_password = auth_utils.get_password_hash(secrets.token_urlsafe(32))

    query = """
        INSERT INTO users (email, password_hash, first_name, last_name,
                           preferred_language, role, ip_address, is_active)
        VALUES (%s, %s, %s, %s, %s, %s, %s, 1)
    """
    params = (
        email,
        hashed_password,
        first_name,
        last_name,
        data.preferred_language,
        'user',
        ip_address,
    )
    user_id = db.execute_commit(query, params)

    return _bearer_response(email, user_id, 'user')


@router.post("/logout")
async def logout(user: dict = Depends(get_current_user)):
    """Delete the caller's session when the user payload carries a ``sid``."""
    sid = user.get("sid")
    if sid:
        session_utils.delete_session(sid)
    return {"message": "Successfully logged out"}


@router.post("/forgot-password")
async def forgot_password(request: schemas.ForgotPassword, lang: str = "en"):
    """Create a reset token for the account and email it.

    The lookup uses the lower-cased email, matching ``login``.

    Raises:
        HTTPException: 404 when no account has this email.
    """
    # NOTE(review): the 404 lets callers probe which emails are registered.
    # Returning the same success message either way would close that, but it
    # changes the API contract, so it is left for a deliberate decision.
    user = db.execute_query(
        "SELECT id, preferred_language FROM users WHERE email = %s",
        (request.email.lower(),),
    )
    if not user:
        raise HTTPException(status_code=404, detail="Email not found")

    # Generate Reset Token (Redis - 10 min TTL)
    token = token_service.create_reset_token(user[0]['id'])

    # Send Email
    user_lang = user[0]['preferred_language'] or lang
    send_password_reset_email(request.email, token, user_lang)

    return {"message": "Reset instructions sent to your email"}


@router.post("/reset-password")
async def reset_password(request: schemas.ResetPassword):
    """Set a new password for the user identified by a reset token.

    Raises:
        HTTPException: 400 when the token does not resolve to a user id.
    """
    user_id = token_service.verify_reset_token(request.token)
    if not user_id:
        raise HTTPException(status_code=400, detail="Invalid or expired reset token")

    hashed_password = auth_utils.get_password_hash(request.new_password)
    db.execute_commit(
        "UPDATE users SET password_hash = %s WHERE id = %s", (hashed_password, user_id)
    )

    # Successful reset - Cleanup ALL reset tokens for this user
    token_service.cleanup_reset_tokens(user_id)

    return {"message": "Password updated successfully"}


@router.get("/me", response_model=schemas.UserResponse)
async def get_me(user: dict = Depends(get_current_user)):
    """Return the caller's own user record.

    Raises:
        HTTPException: 404 when the authenticated id has no row.
    """
    user_data = _fetch_user_rows(user.get("id"))
    if not user_data:
        raise HTTPException(status_code=404, detail="User not found")
    return user_data[0]


@router.put("/me", response_model=schemas.UserResponse)
async def update_me(data: schemas.UserUpdate, user: dict = Depends(get_current_user)):
    """Apply the fields explicitly set in ``data`` to the caller's own record.

    Raises:
        HTTPException: 404 when the authenticated id has no row.
    """
    user_id = user.get("id")

    # NOTE(review): every field declared on schemas.UserUpdate becomes a
    # writable column here - confirm it exposes no privileged columns.
    changes = data.dict(exclude_unset=True)
    if changes:
        # Column names come from the schema's fields; quote them the same way
        # admin_update_user does.
        assignments = ", ".join(f"`{field}` = %s" for field in changes)
        db.execute_commit(
            f"UPDATE users SET {assignments} WHERE id = %s",
            (*changes.values(), user_id),
        )

    rows = _fetch_user_rows(user_id)
    if not rows:
        raise HTTPException(status_code=404, detail="User not found")
    return rows[0]


@router.get("/admin/users")
async def admin_get_users(page: int = 1, size: int = 50, search: Optional[str] = None, admin: dict = Depends(require_admin)):
    """Return one page of users, newest id first, with optional search.

    ``search`` is matched as a substring against email, first name, last name
    and phone. ``page`` and ``size`` are raised to 1 when smaller, so the
    query never receives a negative LIMIT or OFFSET.

    Returns:
        dict with ``users``, ``total`` (count matching the same filter),
        ``page`` and ``size`` (the values actually used).
    """
    page = max(page, 1)
    size = max(size, 1)
    offset = (page - 1) * size

    base_query = f"SELECT {_USER_COLUMNS} FROM users"
    count_query = "SELECT COUNT(*) as total FROM users"
    params = []

    if search and search.strip():
        where_clause = (
            " WHERE email LIKE %s OR first_name LIKE %s"
            " OR last_name LIKE %s OR phone LIKE %s"
        )
        base_query += where_clause
        count_query += where_clause
        pattern = f"%{search.strip()}%"
        params = [pattern] * 4

    base_query += " ORDER BY id DESC LIMIT %s OFFSET %s"

    users = db.execute_query(base_query, tuple(params + [size, offset]))
    total = db.execute_query(count_query, tuple(params))[0]['total']

    return {"users": users, "total": total, "page": page, "size": size}


@router.post("/admin/users", response_model=schemas.UserResponse)
async def admin_create_user(data: schemas.UserCreate, admin: dict = Depends(require_admin)):
    """Create a ``'user'``-role account with chat enabled, on behalf of an admin.

    The email is lower-cased before it is checked and stored, matching
    ``login``. No verification email is sent from here.

    Raises:
        HTTPException: 400 when the email is already registered.
    """
    email = data.email.lower()

    existing_user = db.execute_query("SELECT id FROM users WHERE email = %s", (email,))
    if existing_user:
        raise HTTPException(status_code=400, detail="Email already registered")

    hashed_password = auth_utils.get_password_hash(data.password)
    user_id = db.execute_commit(
        "INSERT INTO users (email, password_hash, first_name, last_name, phone, role, can_chat) VALUES (%s, %s, %s, %s, %s, %s, %s)",
        (email, hashed_password, data.first_name, data.last_name, data.phone, 'user', True),
    )

    user = _fetch_user_rows(user_id)
    return user[0]
@router.patch("/users/{target_id}/admin", response_model=schemas.UserResponse)
async def admin_update_user(target_id: int, data: schemas.AdminUserUpdate, admin: dict = Depends(require_admin)):
    """Apply the fields explicitly set in ``data`` to the user ``target_id``.

    A ``password`` field is hashed and stored as ``password_hash``; an
    explicit ``None`` password means "leave it unchanged". When the update
    sets ``is_active`` to a falsy value, the user is kicked from active
    sessions.

    Raises:
        HTTPException: 404 when no user has ``target_id``.
    """
    update_dict = data.dict(exclude_unset=True)

    # Handle password hashing
    if "password" in update_dict:
        password = update_dict.pop("password")
        # An explicit null must not reach the hasher; treat it as "no change".
        if password is not None:
            update_dict["password_hash"] = auth_utils.get_password_hash(password)

    if update_dict:
        assignments = ", ".join(f"`{field}` = %s" for field in update_dict)
        db.execute_commit(
            f"UPDATE users SET {assignments} WHERE id = %s",
            (*update_dict.values(), target_id),
        )

    user = db.execute_query("SELECT id, email, first_name, last_name, phone, shipping_address, preferred_language, role, can_chat, is_active, is_company, company_name, company_pib, company_address, ip_address, created_at FROM users WHERE id = %s", (target_id,))
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    # If user was deactivated, kick from active sessions. Falsiness (rather
    # than `is False`) also covers a deactivation sent as 0.
    if "is_active" in update_dict and not update_dict["is_active"]:
        await global_manager.kick_user(target_id)

    return user[0]


# WebSocket implementation moved to main.py to handle path prefixing issues
# @router.websocket("/ws/global")
# async def ws_global(websocket: WebSocket, token: str = Query(...)):
#     ...