141 lines
5.2 KiB
Python
141 lines
5.2 KiB
Python
import os
|
|
import shutil
|
|
import uuid
|
|
import datetime
|
|
from sqlalchemy.orm import Session
|
|
from typing import List, Optional
|
|
from fastapi import HTTPException, UploadFile
|
|
from app.models.non_conformity import NonConformity
|
|
from app.models.activity import Activity
|
|
from app.models.evidence import Evidence
|
|
from app.models.contractor import Contractor
|
|
from app.schemas.non_conformity import NonConformityCreate, NonConformityUpdate
|
|
from app.services.email_service import EmailService
|
|
|
|
# Directory where uploaded evidence files are written. It is a relative path,
# so it resolves against the process's current working directory.
UPLOAD_DIR = "uploads"
|
|
|
|
class NonConformityService:
    """Query, create, update, delete and notify on NonConformity records.

    Every method works through the SQLAlchemy session handed to the
    constructor and commits its own changes. Lookups that find nothing raise
    ``HTTPException`` with a 404 status.

    Annotations that name project or framework types are written as strings
    so they are not evaluated when the class body is executed.
    """

    def __init__(self, db: "Session"):
        """Store the database session used by all operations."""
        self._db = db

    def _contractor_email(self, contractor_id: int) -> Optional[str]:
        """Return the email of the given contractor, or None if unavailable.

        None is returned both when no contractor has that id and when the
        contractor exists but has an empty email.
        """
        contractor = (
            self._db.query(Contractor)
            .filter(Contractor.id == contractor_id)
            .first()
        )
        if contractor is not None and contractor.email:
            return contractor.email
        return None

    @staticmethod
    def _parse_captured_at(captured_at: Optional[str]) -> datetime.datetime:
        """Convert an ISO-8601 string to a datetime, defaulting to "now" (UTC).

        A trailing ``Z`` is rewritten to ``+00:00`` so that interpreters whose
        ``fromisoformat`` does not accept ``Z`` still parse it. Missing, empty
        or unparseable input yields the current UTC time as a naive datetime.

        NOTE(review): a parsed value carrying an offset is timezone-aware,
        while the fallback is naive; confirm the column tolerates both.
        """
        if captured_at:
            try:
                return datetime.datetime.fromisoformat(
                    captured_at.replace("Z", "+00:00")
                )
            except ValueError:
                # Malformed timestamp: fall through to the "now" default
                # rather than rejecting the upload.
                pass
        # Same naive-UTC value utcnow() produced, without the deprecated call.
        return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)

    @staticmethod
    def _remove_quietly(path: str) -> None:
        """Delete *path*, ignoring the case where it cannot be removed."""
        try:
            os.remove(path)
        except OSError:
            pass

    def get_ncs(
        self,
        activity_id: Optional[int] = None,
        status: Optional[str] = None,
    ) -> List["NonConformity"]:
        """Return non-conformities, optionally filtered.

        Args:
            activity_id: When not None, keep only rows for this activity.
            status: When non-empty, keep only rows with this status.
        """
        query = self._db.query(NonConformity)
        # Compare against None so that an explicit id of 0 is still applied
        # as a filter instead of being treated as "no filter".
        if activity_id is not None:
            query = query.filter(NonConformity.activity_id == activity_id)
        if status:
            query = query.filter(NonConformity.status == status)
        return query.all()

    def get_nc(self, nc_id: int) -> "NonConformity":
        """Return the non-conformity with the given id.

        Raises:
            HTTPException: 404 when no row has that id.
        """
        db_nc = (
            self._db.query(NonConformity)
            .filter(NonConformity.id == nc_id)
            .first()
        )
        if db_nc is None:
            raise HTTPException(status_code=404, detail="Non-Conformity not found")
        return db_nc

    def create_nc(self, nc: "NonConformityCreate") -> "NonConformity":
        """Persist a new non-conformity and return the refreshed row.

        When a contractor is given without a responsible email, the
        contractor's email (if any) is used instead. The incoming schema
        object is not modified.

        Raises:
            HTTPException: 404 when the referenced activity does not exist.
        """
        activity = (
            self._db.query(Activity)
            .filter(Activity.id == nc.activity_id)
            .first()
        )
        if activity is None:
            raise HTTPException(status_code=404, detail="Activity not found")

        payload = nc.model_dump()
        # Sync responsible email from contractor if not provided.
        if payload.get("contractor_id") and not payload.get("responsible_email"):
            email = self._contractor_email(payload["contractor_id"])
            if email:
                payload["responsible_email"] = email

        db_nc = NonConformity(**payload)
        self._db.add(db_nc)
        self._db.commit()
        self._db.refresh(db_nc)
        return db_nc

    def update_nc(self, nc_id: int, nc_update: "NonConformityUpdate") -> "NonConformity":
        """Apply the explicitly-set fields of *nc_update* to an existing row.

        When the update sets a contractor but no responsible email, the
        contractor's email (if any) is written as the responsible email.

        Raises:
            HTTPException: 404 when the non-conformity does not exist.
        """
        db_nc = self.get_nc(nc_id)
        changes = nc_update.model_dump(exclude_unset=True)

        # Sync responsible email if contractor_id changes and email was not
        # explicitly provided.
        if changes.get("contractor_id") and not changes.get("responsible_email"):
            email = self._contractor_email(changes["contractor_id"])
            if email:
                changes["responsible_email"] = email

        for field, value in changes.items():
            setattr(db_nc, field, value)

        self._db.commit()
        self._db.refresh(db_nc)
        return db_nc

    def upload_nc_evidence(
        self,
        nc_id: int,
        file: "UploadFile",
        description: Optional[str] = None,
        captured_at: Optional[str] = None,
    ) -> "Evidence":
        """Store an uploaded file and record it as evidence for a non-conformity.

        The file is saved under ``UPLOAD_DIR`` with a generated
        ``nc_<uuid><ext>`` name. If saving the file or committing the record
        fails, the file is removed again and the error is re-raised.

        Args:
            nc_id: Id of the non-conformity the evidence belongs to.
            file: The uploaded file; its extension and content type are kept.
            description: Optional free-text description.
            captured_at: Optional ISO-8601 timestamp; the current UTC time is
                used when it is missing or cannot be parsed.

        Raises:
            HTTPException: 404 when the non-conformity does not exist.
        """
        # Verify NC exists before touching the filesystem.
        self.get_nc(nc_id)

        # The client may omit the filename; treat that as "no extension".
        extension = os.path.splitext(file.filename or "")[1]
        file_path = os.path.join(UPLOAD_DIR, f"nc_{uuid.uuid4()}{extension}")

        # exist_ok avoids failing when another request creates the directory
        # between a separate existence check and the creation.
        os.makedirs(UPLOAD_DIR, exist_ok=True)
        try:
            with open(file_path, "wb") as buffer:
                shutil.copyfileobj(file.file, buffer)
        except Exception:
            # Do not leave a partially written file behind.
            self._remove_quietly(file_path)
            raise

        db_evidence = Evidence(
            non_conformity_id=nc_id,
            file_path=file_path,
            media_type=file.content_type,
            description=description,
            captured_at=self._parse_captured_at(captured_at),
        )
        try:
            self._db.add(db_evidence)
            self._db.commit()
        except Exception:
            # Reset the session and drop the file that no row now refers to.
            self._db.rollback()
            self._remove_quietly(file_path)
            raise
        self._db.refresh(db_evidence)

        return db_evidence

    def delete_nc(self, nc_id: int):
        """Delete a non-conformity and return a confirmation payload.

        Raises:
            HTTPException: 404 when the non-conformity does not exist.
        """
        db_nc = self.get_nc(nc_id)
        self._db.delete(db_nc)
        self._db.commit()
        return {"detail": "Non-Conformity deleted"}

    def notify_responsible(self, nc_id: int):
        """Email the responsible party and move the record to 'in-checking'.

        An access hash is generated and committed first if the record has
        none, so it is already persisted when the email is sent. The status
        is only updated after the send call returns.

        Returns:
            A dict with a message, the access hash and the new status.

        Raises:
            HTTPException: 404 when the non-conformity does not exist, 400
                when it has no responsible email.
        """
        db_nc = self.get_nc(nc_id)

        if not db_nc.responsible_email:
            raise HTTPException(
                status_code=400,
                detail="No responsible email configured for this Non-Conformity",
            )

        # Generate hash if it doesn't exist.
        if not db_nc.access_hash:
            db_nc.access_hash = str(uuid.uuid4())
            self._db.commit()

        # Send email.
        EmailService.send_nc_notification(
            db_nc.responsible_email, db_nc.access_hash, db_nc.description
        )

        # Update status to in-checking.
        db_nc.status = 'in-checking'
        self._db.commit()
        self._db.refresh(db_nc)

        return {
            "message": "Notification sent successfully",
            "access_hash": db_nc.access_hash,
            "status": db_nc.status,
        }
|