| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156 |
- from fastapi import APIRouter, Depends, HTTPException, Request
- from typing import List, Optional
- from pydantic import BaseModel
- from datetime import datetime
- import db
- import mysql.connector
- import auth_utils
- from services.audit_service import audit_service
- router = APIRouter(prefix="/blog", tags=["blog"])
class PostBase(BaseModel):
    # Fields shared by every blog-post schema in this module.
    #
    # Title, excerpt and content each exist in four suffixed variants
    # (_en, _me, _ru, _ua -- presumably language codes; confirm with the
    # content team). Only the _en title and _en content are required; every
    # other variant is optional and defaults to None.
    #
    # Documented with comments rather than a class docstring so the schema
    # description generated from this model stays exactly as it was.

    # Identifier used by the read endpoint's "WHERE slug = %s" lookup.
    slug: str

    # --- titles ---
    title_en: str
    title_me: Optional[str] = None
    title_ru: Optional[str] = None
    title_ua: Optional[str] = None

    # --- excerpts (all optional) ---
    excerpt_en: Optional[str] = None
    excerpt_me: Optional[str] = None
    excerpt_ru: Optional[str] = None
    excerpt_ua: Optional[str] = None

    # --- bodies ---
    content_en: str
    content_me: Optional[str] = None
    content_ru: Optional[str] = None
    content_ua: Optional[str] = None

    # --- metadata ---
    category: Optional[str] = None
    image_url: Optional[str] = None
    # New posts are drafts unless the client explicitly publishes them.
    is_published: bool = False
class PostCreate(PostBase):
    # Request body accepted by the create endpoint; it adds nothing to
    # PostBase and exists so the create payload has its own named schema.
    ...
class PostUpdate(PostBase):
    # Request body accepted by the update endpoint. It inherits PostBase
    # unchanged, so an update must supply the same required fields as a
    # create (slug, title_en, content_en).
    ...
class Post(PostBase):
    # Response schema: the PostBase fields plus the row id and the two
    # timestamps read back from the posts table.

    id: int
    created_at: datetime
    updated_at: datetime

    class Config:
        # NOTE(review): from_attributes is the pydantic v2 spelling of the
        # "build from object attributes" switch -- confirm against the
        # installed pydantic version.
        from_attributes = True
@router.get("", response_model=List[Post])
async def get_posts(published_only: bool = True):
    # List posts, newest first (ORDER BY created_at DESC).
    #
    # published_only: when true (the default) only rows with
    #   is_published = TRUE are selected; when false every row is returned.
    # Returns whatever db.execute_query yields for the assembled SELECT.
    #
    # Kept as comments rather than a docstring so the generated OpenAPI
    # description of this route is unchanged.
    # NOTE(review): no authentication is applied here, so any caller can pass
    # published_only=false -- confirm that exposing drafts is intended.
    fragments = ["SELECT * FROM posts"]
    if published_only:
        fragments.append(" WHERE is_published = TRUE")
    fragments.append(" ORDER BY created_at DESC")

    sql = "".join(fragments)
    return db.execute_query(sql)
@router.get("/{id_or_slug}", response_model=Post)
async def get_post(id_or_slug: str):
    """Fetch a single post by slug, falling back to its numeric id.

    Args:
        id_or_slug: Path segment. It is first matched against ``posts.slug``;
            only if no row matches and the value consists solely of decimal
            digits is it then matched against ``posts.id``.

    Returns:
        The first matching row from ``db.execute_query``.

    Raises:
        HTTPException: 404 when neither lookup finds a row.
    """
    # The slug lookup runs first, so a post whose slug is all digits takes
    # precedence over the post with that numeric id.
    by_slug = db.execute_query("SELECT * FROM posts WHERE slug = %s", (id_or_slug,))
    if by_slug:
        return by_slug[0]

    # str.isdigit() also accepts characters such as "²" that int() cannot
    # parse, which turned a malformed URL into an unhandled ValueError (500).
    # isdecimal() only admits characters int() understands.
    if id_or_slug.isdecimal():
        try:
            post_id = int(id_or_slug)
        except ValueError:
            # e.g. a digit string longer than the interpreter's int-parsing
            # limit; treat it as "no such id" rather than crashing.
            post_id = None

        if post_id is not None:
            by_id = db.execute_query("SELECT * FROM posts WHERE id = %s", (post_id,))
            if by_id:
                return by_id[0]

    raise HTTPException(status_code=404, detail="Post not found")
@router.post("", response_model=Post)
async def create_post(
    post: PostCreate,
    request: Request,
    token: str = Depends(auth_utils.oauth2_scheme),
):
    """Insert a new post and return the stored row.

    Args:
        post: Field values for the new row.
        request: Incoming request, forwarded to the audit log.
        token: Credential supplied by ``auth_utils.oauth2_scheme``; its decoded
            payload must carry ``role == 'admin'``.

    Returns:
        The newly inserted row, re-read from the ``posts`` table.

    Raises:
        HTTPException: 403 when the token does not decode or the role is not
            admin; 400 when the database rejects the insert or the readback;
            500 when the inserted row cannot be read back.
    """
    payload = auth_utils.decode_token(token)
    if not payload or payload.get("role") != 'admin':
        raise HTTPException(status_code=403, detail="Admin role required")

    query = """
        INSERT INTO posts (
            slug, title_en, title_me, title_ru, title_ua,
            excerpt_en, excerpt_me, excerpt_ru, excerpt_ua,
            content_en, content_me, content_ru, content_ua,
            category, image_url, is_published
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """
    values = (
        post.slug, post.title_en, post.title_me, post.title_ru, post.title_ua,
        post.excerpt_en, post.excerpt_me, post.excerpt_ru, post.excerpt_ua,
        post.content_en, post.content_me, post.content_ru, post.content_ua,
        post.category, post.image_url, post.is_published,
    )

    # Only the database calls sit inside the try: a failure while writing the
    # audit entry must not be reported to the client as a bad request for a
    # post that was in fact created.
    try:
        # The value returned by execute_commit is used as the new row's id.
        new_id = db.execute_commit(query, values)
        rows = db.execute_query("SELECT * FROM posts WHERE id = %s", (new_id,))
    except mysql.connector.Error as err:
        raise HTTPException(status_code=400, detail=str(err)) from err

    await audit_service.log(
        user_id=payload.get("id"),
        action="create_blog_post",
        target_type="blog_post",
        target_id=new_id,
        details={"slug": post.slug, "title": post.title_en},
        request=request,
    )

    if not rows:
        # Previously this surfaced as a bare IndexError on rows[0].
        raise HTTPException(
            status_code=500,
            detail="Post was created but could not be reloaded",
        )
    return rows[0]
@router.put("/{post_id}", response_model=Post)
async def update_post(
    post_id: int,
    post: PostUpdate,
    request: Request,
    token: str = Depends(auth_utils.oauth2_scheme),
):
    """Overwrite every editable column of an existing post.

    Args:
        post_id: Primary key of the row to update.
        post: Complete replacement values; all columns are rewritten.
        request: Incoming request, forwarded to the audit log.
        token: Credential supplied by ``auth_utils.oauth2_scheme``; its decoded
            payload must carry ``role == 'admin'``.

    Returns:
        The row as re-read from the ``posts`` table after the update.

    Raises:
        HTTPException: 403 when the token does not decode or the role is not
            admin; 400 when the database rejects the update or the readback;
            404 when no row with ``post_id`` exists.
    """
    payload = auth_utils.decode_token(token)
    if not payload or payload.get("role") != 'admin':
        raise HTTPException(status_code=403, detail="Admin role required")

    query = """
        UPDATE posts SET
            slug=%s, title_en=%s, title_me=%s, title_ru=%s, title_ua=%s,
            excerpt_en=%s, excerpt_me=%s, excerpt_ru=%s, excerpt_ua=%s,
            content_en=%s, content_me=%s, content_ru=%s, content_ua=%s,
            category=%s, image_url=%s, is_published=%s
        WHERE id = %s
    """
    values = (
        post.slug, post.title_en, post.title_me, post.title_ru, post.title_ua,
        post.excerpt_en, post.excerpt_me, post.excerpt_ru, post.excerpt_ua,
        post.content_en, post.content_me, post.content_ru, post.content_ua,
        post.category, post.image_url, post.is_published, post_id,
    )

    # Map database errors to 400 exactly as the create endpoint does; without
    # this a rejected update escaped as an unhandled 500.
    try:
        db.execute_commit(query, values)
        rows = db.execute_query("SELECT * FROM posts WHERE id = %s", (post_id,))
    except mysql.connector.Error as err:
        raise HTTPException(status_code=400, detail=str(err)) from err

    # Existence is checked after the UPDATE: for an unknown id the statement
    # matches no row, and the empty readback becomes a 404.
    if not rows:
        raise HTTPException(status_code=404, detail="Post not found")

    await audit_service.log(
        user_id=payload.get("id"),
        action="update_blog_post",
        target_type="blog_post",
        target_id=post_id,
        details={"slug": post.slug, "title": post.title_en},
        request=request,
    )

    return rows[0]
@router.delete("/{post_id}")
async def delete_post(
    post_id: int,
    request: Request,
    token: str = Depends(auth_utils.oauth2_scheme),
):
    """Delete a post by id and record the deletion in the audit log.

    Args:
        post_id: Primary key of the row to delete.
        request: Incoming request, forwarded to the audit log.
        token: Credential supplied by ``auth_utils.oauth2_scheme``; its decoded
            payload must carry ``role == 'admin'``.

    Returns:
        ``{"message": "Post deleted successfully"}``.

    Raises:
        HTTPException: 403 when the token does not decode or the role is not
            admin; 404 when no row with ``post_id`` exists.
    """
    payload = auth_utils.decode_token(token)
    if not payload or payload.get("role") != 'admin':
        raise HTTPException(status_code=403, detail="Admin role required")

    # execute_commit's return value is not used as a row count here, so look
    # the row up first. Without this check a request for an unknown id
    # reported success and wrote an audit entry for a deletion that never
    # happened.
    existing = db.execute_query("SELECT id FROM posts WHERE id = %s", (post_id,))
    if not existing:
        raise HTTPException(status_code=404, detail="Post not found")

    db.execute_commit("DELETE FROM posts WHERE id = %s", (post_id,))

    await audit_service.log(
        user_id=payload.get("id"),
        action="delete_blog_post",
        target_type="blog_post",
        target_id=post_id,
        request=request,
    )

    return {"message": "Post deleted successfully"}
|