"""MySQL access helpers: a shared connection pool plus thin query wrappers."""

import os
from contextlib import closing, suppress
from typing import List, Dict, Any, Optional

import mysql.connector
from mysql.connector import pooling

# Database configuration (using environment variables for production).
# NOTE(review): the DB_PASS fallback below is a credential committed to source
# control. It is kept so existing deployments keep working, but it should be
# rotated and the default removed once DB_PASS is set in every environment.
DB_CONFIG = {
    "host": os.getenv("DB_HOST", "127.0.0.1"),
    "user": os.getenv("DB_USER", "radionica"),
    "password": os.getenv("DB_PASS", "NY9B9VLifDC9ehZ"),
    "database": os.getenv("DB_NAME", "radionica3d"),
    # os.getenv returns strings, so the default is a string as well.
    "port": int(os.getenv("DB_PORT", "3306")),
}

# Connection pool, created once at import time. When creation fails the module
# stays importable and get_connection() falls back to direct connections.
connection_pool: Optional[pooling.MySQLConnectionPool]
try:
    connection_pool = pooling.MySQLConnectionPool(
        pool_name="radionica_pool",
        pool_size=10,
        pool_reset_session=True,
        **DB_CONFIG,
    )
    print("Database connection pool created successfully")
except mysql.connector.Error as err:
    print(f"Error creating connection pool: {err}")
    connection_pool = None


def get_connection():
    """Return a database connection.

    The connection comes from the module-level pool when the pool was created
    successfully; otherwise a new direct connection is opened from DB_CONFIG.
    The caller is responsible for calling ``close()`` on the result.
    """
    if connection_pool is not None:
        return connection_pool.get_connection()
    return mysql.connector.connect(**DB_CONFIG)


def _rollback_quietly(conn) -> None:
    """Roll back the open transaction on *conn*, ignoring a failed rollback."""
    # Best effort on purpose: this runs while another exception is already
    # propagating, and that original error is the one the caller should see.
    with suppress(mysql.connector.Error):
        conn.rollback()


def execute_query(query: str, params: tuple = ()) -> List[Dict[str, Any]]:
    """Execute a SELECT query and return the rows as a list of dictionaries.

    Args:
        query: SQL text, using placeholders for any values.
        params: Values bound to the placeholders in *query*.

    Returns:
        Every row fetched by the query, each as a column-name -> value dict.
    """
    # closing() guarantees the connection is closed even when creating the
    # cursor fails, and the cursor is closed before the connection.
    with closing(get_connection()) as conn, \
            closing(conn.cursor(dictionary=True)) as cursor:
        cursor.execute(query, params)
        return cursor.fetchall()


def execute_commit(query: str, params: tuple = ()) -> int:
    """Execute an INSERT/UPDATE/DELETE query, commit, and return the last ID.

    Args:
        query: SQL text, using placeholders for any values.
        params: Values bound to the placeholders in *query*.

    Returns:
        The cursor's ``lastrowid`` after the statement ran.

    Raises:
        Exception: Whatever the execute or commit raised; the transaction is
            rolled back before the exception propagates.
    """
    with closing(get_connection()) as conn, closing(conn.cursor()) as cursor:
        try:
            cursor.execute(query, params)
            conn.commit()
        except Exception:
            _rollback_quietly(conn)
            raise
        return cursor.lastrowid


def execute_batch(query: str, params_list: List[tuple]):
    """Execute one statement for many parameter sets in a single transaction.

    Args:
        query: SQL text, using placeholders for any values.
        params_list: One parameter tuple per execution of *query*.

    Raises:
        Exception: Whatever the execute or commit raised; the whole batch is
            rolled back before the exception propagates.
    """
    if not params_list:
        # Nothing to execute, so do not take a connection from the pool.
        return
    with closing(get_connection()) as conn, closing(conn.cursor()) as cursor:
        try:
            cursor.executemany(query, params_list)
            conn.commit()
        except Exception:
            _rollback_quietly(conn)
            raise