sumaq/backend/routers/non_conformities.py

187 lines
6.1 KiB
Python

import os
import shutil
import uuid
import datetime
from fastapi import APIRouter, Depends, HTTPException, status, UploadFile, File
from sqlalchemy.orm import Session
from typing import List, Optional
from database import get_db
from models import NonConformity, User, Activity, Evidence, Contractor
from security import get_current_active_user
import schemas
router = APIRouter(
prefix="/non-conformities",
tags=["Non-Conformities"]
)
UPLOAD_DIR = "uploads"
if not os.path.exists(UPLOAD_DIR):
os.makedirs(UPLOAD_DIR)
@router.post("/", response_model=schemas.NonConformity)
def create_nc(
nc: schemas.NonConformityCreate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_active_user)
):
db_activity = db.query(Activity).filter(Activity.id == nc.activity_id).first()
if not db_activity:
raise HTTPException(status_code=404, detail="Activity not found")
# Sync responsible email from contractor if not provided
if nc.contractor_id and not nc.responsible_email:
contractor = db.query(Contractor).filter(Contractor.id == nc.contractor_id).first()
if contractor and contractor.email:
nc.responsible_email = contractor.email
db_nc = NonConformity(**nc.dict())
db.add(db_nc)
db.commit()
db.refresh(db_nc)
return db_nc
@router.get("/", response_model=List[schemas.NonConformity])
def read_ncs(
activity_id: Optional[int] = None,
status: Optional[str] = None,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_active_user)
):
query = db.query(NonConformity)
if activity_id:
query = query.filter(NonConformity.activity_id == activity_id)
if status:
query = query.filter(NonConformity.status == status)
return query.all()
@router.get("/{nc_id}", response_model=schemas.NonConformity)
def read_nc(
nc_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_active_user)
):
db_nc = db.query(NonConformity).filter(NonConformity.id == nc_id).first()
if db_nc is None:
raise HTTPException(status_code=404, detail="Non-Conformity not found")
return db_nc
@router.put("/{nc_id}", response_model=schemas.NonConformity)
@router.patch("/{nc_id}", response_model=schemas.NonConformity)
def update_nc(
nc_id: int,
nc: schemas.NonConformityUpdate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_active_user)
):
db_nc = db.query(NonConformity).filter(NonConformity.id == nc_id).first()
if not db_nc:
raise HTTPException(status_code=404, detail="Non-Conformity not found")
update_data = nc.dict(exclude_unset=True)
# Sync responsible email if contractor_id changes and email not explicitly provided
if 'contractor_id' in update_data and update_data['contractor_id']:
if 'responsible_email' not in update_data or not update_data['responsible_email']:
contractor = db.query(Contractor).filter(Contractor.id == update_data['contractor_id']).first()
if contractor and contractor.email:
update_data['responsible_email'] = contractor.email
for key, value in update_data.items():
setattr(db_nc, key, value)
db.commit()
db.refresh(db_nc)
return db_nc
@router.post("/{nc_id}/upload", response_model=schemas.Evidence)
async def upload_nc_evidence(
nc_id: int,
file: UploadFile = File(...),
description: Optional[str] = None,
captured_at: Optional[str] = None,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_active_user)
):
# Verify NC exists
db_nc = db.query(NonConformity).filter(NonConformity.id == nc_id).first()
if not db_nc:
raise HTTPException(status_code=404, detail="Non-Conformity not found")
# Generate unique filename
file_ext = os.path.splitext(file.filename)[1]
unique_filename = f"nc_{uuid.uuid4()}{file_ext}"
file_path = os.path.join(UPLOAD_DIR, unique_filename)
# Save file
with open(file_path, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
db_captured_at = None
if captured_at:
try:
db_captured_at = datetime.datetime.fromisoformat(captured_at.replace('Z', '+00:00'))
except:
db_captured_at = datetime.datetime.utcnow()
else:
db_captured_at = datetime.datetime.utcnow()
# Save to database
db_evidence = Evidence(
non_conformity_id=nc_id,
file_path=file_path,
media_type=file.content_type,
description=description,
captured_at=db_captured_at
)
db.add(db_evidence)
db.commit()
db.refresh(db_evidence)
return db_evidence
@router.delete("/{nc_id}")
def delete_nc(
nc_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_active_user)
):
db_nc = db.query(NonConformity).filter(NonConformity.id == nc_id).first()
if not db_nc:
raise HTTPException(status_code=404, detail="Non-Conformity not found")
db.delete(db_nc)
db.commit()
return {"detail": "Non-Conformity deleted"}
@router.post("/{nc_id}/notify")
def notify_responsible(
nc_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_active_user)
):
db_nc = db.query(NonConformity).filter(NonConformity.id == nc_id).first()
if not db_nc:
raise HTTPException(status_code=404, detail="Non-Conformity not found")
if not db_nc.responsible_email:
raise HTTPException(status_code=400, detail="No responsible email configured for this Non-Conformity")
# Generate hash if it doesn't exist
if not db_nc.access_hash:
db_nc.access_hash = str(uuid.uuid4())
db.commit()
db.refresh(db_nc)
# Send email
from services.email_service import EmailService
EmailService.send_nc_notification(db_nc.responsible_email, db_nc.access_hash, db_nc.description)
# Update status to in-checking
db_nc.status = 'in-checking'
db.commit()
db.refresh(db_nc)
return {"message": "Notification sent successfully", "access_hash": db_nc.access_hash, "status": db_nc.status}