85 lines
3.3 KiB
Python
85 lines
3.3 KiB
Python
from sqlalchemy.orm import Session
|
|
from typing import List, Optional
|
|
from fastapi import HTTPException
|
|
from app.models.project import Project
|
|
from app.models.specialty import Specialty
|
|
from app.models.contractor import Contractor
|
|
from app.schemas.project import ProjectCreate
|
|
|
|
class ProjectService:
    """CRUD operations for ``Project`` rows on top of a SQLAlchemy session.

    Lookup failures and rule violations are reported by raising
    ``fastapi.HTTPException`` (404 for missing rows, 400 for a duplicate
    project code).
    """

    def __init__(self, db: Session):
        """Store the session that every query in this service goes through."""
        self._db = db

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _ensure_code_available(self, code, exclude_id: Optional[int] = None) -> None:
        """Raise 400 if another project already uses ``code``.

        Args:
            code: Project code to check.
            exclude_id: Id of a project that may legitimately keep this code
                (the one being updated). ``None`` checks against every row.

        Raises:
            HTTPException: 400 when a conflicting project exists.
        """
        query = self._db.query(Project).filter(Project.code == code)
        if exclude_id is not None:
            query = query.filter(Project.id != exclude_id)
        if query.first() is not None:
            raise HTTPException(status_code=400, detail="Project code already exists")

    def _require_parent(self, parent_id: int) -> Project:
        """Return the project with id ``parent_id`` or raise 404.

        Raises:
            HTTPException: 404 with detail "Parent project not found".
        """
        parent = self._db.query(Project).filter(Project.id == parent_id).first()
        if parent is None:
            raise HTTPException(status_code=404, detail="Parent project not found")
        return parent

    def _load_by_ids(self, model, ids) -> list:
        """Fetch all rows of ``model`` whose ``id`` is in ``ids``.

        Ids that match no row are silently dropped: only existing rows are
        returned and no error is raised for the missing ones.
        """
        if not ids:
            # Nothing to look up; skip the round trip for an empty IN ().
            return []
        return self._db.query(model).filter(model.id.in_(ids)).all()

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def get_projects(self, skip: int = 0, limit: int = 100) -> List[Project]:
        """Return one page of projects.

        Args:
            skip: Number of rows to skip.
            limit: Maximum number of rows to return.

        Returns:
            Projects ordered by id, so consecutive pages neither overlap nor
            skip rows (OFFSET/LIMIT without ORDER BY has no defined order).
        """
        return (
            self._db.query(Project)
            .order_by(Project.id)
            .offset(skip)
            .limit(limit)
            .all()
        )

    def get_project(self, project_id: int) -> Project:
        """Return the project with the given id.

        Raises:
            HTTPException: 404 with detail "Project not found".
        """
        db_project = self._db.query(Project).filter(Project.id == project_id).first()
        if db_project is None:
            raise HTTPException(status_code=404, detail="Project not found")
        return db_project

    def create_project(self, project: ProjectCreate) -> Project:
        """Create and persist a new project.

        A ``parent_id`` of ``0`` or ``None`` means "no parent", matching
        ``update_project``; it is stored as ``None``. ``specialty_ids`` and
        ``contractor_ids`` are resolved to rows and attached when non-empty.

        Args:
            project: Payload describing the project to create.

        Returns:
            The newly committed and refreshed project.

        Raises:
            HTTPException: 400 if the code is already taken; 404 if the
                requested parent project does not exist.
        """
        self._ensure_code_available(project.code)

        # Treat 0 like None so a placeholder id is never written to the row.
        parent_id = project.parent_id or None
        if parent_id is not None:
            self._require_parent(parent_id)

        project_data = project.model_dump(exclude={'specialty_ids', 'contractor_ids'})
        if 'parent_id' in project_data:
            project_data['parent_id'] = parent_id
        db_project = Project(**project_data)

        if project.specialty_ids:
            db_project.specialties = self._load_by_ids(Specialty, project.specialty_ids)
        if project.contractor_ids:
            db_project.contractors = self._load_by_ids(Contractor, project.contractor_ids)

        self._db.add(db_project)
        self._db.commit()
        self._db.refresh(db_project)
        return db_project

    def update_project(self, project_id: int, project: ProjectCreate) -> Project:
        """Overwrite an existing project with the values in ``project``.

        Every validation and lookup runs before the row is modified. With
        autoflush enabled (SQLAlchemy's default), a query issued after the
        row was mutated would flush the pending change first, so an invalid
        ``parent_id`` or duplicate ``code`` could surface as a database
        integrity error instead of the intended HTTP error.

        Args:
            project_id: Id of the project to update.
            project: New values. ``parent_id`` of ``0`` or ``None`` clears
                the parent. ``specialty_ids`` / ``contractor_ids`` replace
                the current associations when not ``None`` (an empty list
                clears them); ``None`` leaves them untouched.

        Returns:
            The committed and refreshed project.

        Raises:
            HTTPException: 404 if the project or the requested parent does
                not exist; 400 if the code belongs to another project.
        """
        db_project = self.get_project(project_id)

        # --- validate and read everything first -------------------------
        self._ensure_code_available(project.code, exclude_id=project_id)

        parent_id = project.parent_id or None
        if parent_id is not None:
            self._require_parent(parent_id)

        replace_specialties = project.specialty_ids is not None
        replace_contractors = project.contractor_ids is not None
        specialties = (
            self._load_by_ids(Specialty, project.specialty_ids)
            if replace_specialties else None
        )
        contractors = (
            self._load_by_ids(Contractor, project.contractor_ids)
            if replace_contractors else None
        )

        # --- then mutate -------------------------------------------------
        simple_fields = project.model_dump(
            exclude={'specialty_ids', 'contractor_ids', 'parent_id'}
        )
        for key, value in simple_fields.items():
            setattr(db_project, key, value)
        db_project.parent_id = parent_id

        if replace_specialties:
            db_project.specialties = specialties
        if replace_contractors:
            db_project.contractors = contractors

        self._db.commit()
        self._db.refresh(db_project)
        return db_project

    def delete_project(self, project_id: int) -> None:
        """Delete the project with the given id.

        Raises:
            HTTPException: 404 if the project does not exist.
        """
        db_project = self.get_project(project_id)
        self._db.delete(db_project)
        self._db.commit()
        return None
|