# sumaq/backend/routers/non_conformities.py
#
# 143 lines
# 4.3 KiB
# Python

import os
import shutil
import uuid
import datetime
from fastapi import APIRouter, Depends, HTTPException, status, UploadFile, File
from sqlalchemy.orm import Session
from typing import List, Optional
from database import get_db
from models import NonConformity, User, Activity, Evidence
from security import get_current_active_user
import schemas
# Router that groups every endpoint in this module under the
# "/non-conformities" URL prefix and the "Non-Conformities" tag.
router = APIRouter(prefix="/non-conformities", tags=["Non-Conformities"])
# Directory (relative to the process working directory) that receives the
# files saved by the upload endpoint.
UPLOAD_DIR = "uploads"
# exist_ok=True replaces the separate "exists?" check, which could race when
# several workers import this module at the same time.
os.makedirs(UPLOAD_DIR, exist_ok=True)
@router.post("/", response_model=schemas.NonConformity)
def create_nc(
    nc: schemas.NonConformityCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_active_user),
):
    """Create a non-conformity attached to an existing activity.

    Args:
        nc: Payload for the new record; ``nc.activity_id`` must reference an
            existing ``Activity`` row.
        db: Database session supplied by the ``get_db`` dependency.
        current_user: Result of the ``get_current_active_user`` dependency;
            not otherwise used in the body.

    Returns:
        The newly persisted ``NonConformity``, refreshed from the database.

    Raises:
        HTTPException: 404 when the referenced activity does not exist.
    """
    activity_lookup = db.query(Activity).filter(Activity.id == nc.activity_id)
    parent_activity = activity_lookup.first()
    if not parent_activity:
        raise HTTPException(status_code=404, detail="Activity not found")

    payload = nc.dict()
    new_nc = NonConformity(**payload)
    db.add(new_nc)
    db.commit()
    # Reload so database-generated values are present on the returned object.
    db.refresh(new_nc)
    return new_nc
@router.get("/", response_model=List[schemas.NonConformity])
def read_ncs(
    activity_id: Optional[int] = None,
    status: Optional[str] = None,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_active_user),
):
    """List non-conformities, optionally filtered by activity and/or status.

    Args:
        activity_id: When provided, only rows with this ``activity_id`` are
            returned.
        status: When non-empty, only rows whose ``status`` equals this value
            are returned. The name shadows ``fastapi.status`` inside this
            function; it is kept because it is the caller-visible parameter
            name.
        db: Database session supplied by the ``get_db`` dependency.
        current_user: Result of the ``get_current_active_user`` dependency;
            not otherwise used in the body.

    Returns:
        Every ``NonConformity`` row matching the supplied filters.
    """
    query = db.query(NonConformity)
    # Compare against None rather than relying on truthiness, so an explicit
    # activity_id=0 is applied as a filter instead of being silently ignored.
    if activity_id is not None:
        query = query.filter(NonConformity.activity_id == activity_id)
    # An empty status string is still treated as "no filter".
    if status:
        query = query.filter(NonConformity.status == status)
    return query.all()
@router.get("/{nc_id}", response_model=schemas.NonConformity)
def read_nc(
    nc_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_active_user),
):
    """Fetch a single non-conformity by its primary key.

    Args:
        nc_id: Identifier taken from the URL path.
        db: Database session supplied by the ``get_db`` dependency.
        current_user: Result of the ``get_current_active_user`` dependency;
            not otherwise used in the body.

    Returns:
        The matching ``NonConformity`` row.

    Raises:
        HTTPException: 404 when no row has the given id.
    """
    by_id = db.query(NonConformity).filter(NonConformity.id == nc_id)
    record = by_id.first()
    if record is not None:
        return record
    raise HTTPException(status_code=404, detail="Non-Conformity not found")
@router.put("/{nc_id}", response_model=schemas.NonConformity)
@router.patch("/{nc_id}", response_model=schemas.NonConformity)
def update_nc(
    nc_id: int,
    nc: schemas.NonConformityUpdate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_active_user),
):
    """Apply a partial update to a non-conformity (served for PUT and PATCH).

    Args:
        nc_id: Identifier taken from the URL path.
        nc: Update payload; only the fields explicitly set on it are written.
        db: Database session supplied by the ``get_db`` dependency.
        current_user: Result of the ``get_current_active_user`` dependency;
            not otherwise used in the body.

    Returns:
        The updated ``NonConformity``, refreshed from the database.

    Raises:
        HTTPException: 404 when no row has the given id.
    """
    by_id = db.query(NonConformity).filter(NonConformity.id == nc_id)
    target = by_id.first()
    if not target:
        raise HTTPException(status_code=404, detail="Non-Conformity not found")

    # exclude_unset leaves out fields the payload never set, so they keep
    # their stored values.
    changes = nc.dict(exclude_unset=True)
    for field_name, new_value in changes.items():
        setattr(target, field_name, new_value)

    db.commit()
    db.refresh(target)
    return target
@router.post("/{nc_id}/upload", response_model=schemas.Evidence)
async def upload_nc_evidence(
    nc_id: int,
    file: UploadFile = File(...),
    description: Optional[str] = None,
    captured_at: Optional[str] = None,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_active_user),
):
    """Store an uploaded file as evidence for a non-conformity.

    The file is written into ``UPLOAD_DIR`` under a generated
    ``nc_<uuid><ext>`` name, and an ``Evidence`` row pointing at it is
    created.

    Args:
        nc_id: Identifier of the non-conformity the evidence belongs to.
        file: The uploaded file; its original extension (if any) is kept.
        description: Optional free-text description stored with the evidence.
        captured_at: Optional ISO-8601 timestamp (a ``Z`` is rewritten to
            ``+00:00`` before parsing). Missing or unparseable values fall
            back to the current UTC time.
        db: Database session supplied by the ``get_db`` dependency.
        current_user: Result of the ``get_current_active_user`` dependency;
            not otherwise used in the body.

    Returns:
        The newly persisted ``Evidence`` row, refreshed from the database.

    Raises:
        HTTPException: 404 when no non-conformity has the given id.
    """
    # Verify the non-conformity exists before touching the filesystem.
    db_nc = db.query(NonConformity).filter(NonConformity.id == nc_id).first()
    if not db_nc:
        raise HTTPException(status_code=404, detail="Non-Conformity not found")

    # Generate a unique filename. The upload may arrive without a filename,
    # in which case the stored file simply gets no extension.
    file_ext = os.path.splitext(file.filename or "")[1]
    unique_filename = f"nc_{uuid.uuid4()}{file_ext}"
    file_path = os.path.join(UPLOAD_DIR, unique_filename)

    # Save the file.
    # NOTE(review): this is a blocking copy inside an async endpoint; consider
    # offloading it if large uploads are expected.
    with open(file_path, "wb") as buffer:
        shutil.copyfileobj(file.file, buffer)

    db_captured_at = None
    if captured_at:
        try:
            # fromisoformat() only accepts a trailing "Z" on newer Python
            # versions, so normalise it to an explicit UTC offset first.
            db_captured_at = datetime.datetime.fromisoformat(
                captured_at.replace('Z', '+00:00')
            )
        except ValueError:
            # Unparseable timestamp: fall through to the default below.
            db_captured_at = None
    if db_captured_at is None:
        # NOTE(review): utcnow() is naive, while a parsed value carrying an
        # offset is timezone-aware — confirm the column copes with both.
        db_captured_at = datetime.datetime.utcnow()

    # Save to database.
    db_evidence = Evidence(
        non_conformity_id=nc_id,
        file_path=file_path,
        media_type=file.content_type,
        description=description,
        captured_at=db_captured_at,
    )
    db.add(db_evidence)
    db.commit()
    db.refresh(db_evidence)
    return db_evidence
@router.delete("/{nc_id}")
def delete_nc(
    nc_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_active_user),
):
    """Delete a non-conformity by its primary key.

    Args:
        nc_id: Identifier taken from the URL path.
        db: Database session supplied by the ``get_db`` dependency.
        current_user: Result of the ``get_current_active_user`` dependency;
            not otherwise used in the body.

    Returns:
        A dict with a ``detail`` confirmation message.

    Raises:
        HTTPException: 404 when no row has the given id.
    """
    by_id = db.query(NonConformity).filter(NonConformity.id == nc_id)
    doomed = by_id.first()
    if not doomed:
        raise HTTPException(status_code=404, detail="Non-Conformity not found")

    db.delete(doomed)
    db.commit()
    confirmation = {"detail": "Non-Conformity deleted"}
    return confirmation