sumaq/backend/app/security.py

75 lines
2.9 KiB
Python

import hashlib
import os
from datetime import datetime, timedelta, timezone
from typing import Optional

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from passlib.context import CryptContext
from sqlalchemy.orm import Session

from app.db.database import get_db
from app.models.models import User
import app.schemas
# JWT signing secret. Taken from the JWT_SECRET_KEY environment variable when
# that variable is set to a non-empty value, so deployments do not have to rely
# on a key committed to source control. The literal is kept only as a
# backward-compatible fallback for environments that have not set the variable.
SECRET_KEY = (
    os.environ.get("JWT_SECRET_KEY")
    or "Bt50MaUvRYJ28UOIberyBlRVQCcKiYzVF2JHOFKjbBQq5xoOpowyxjY1tCOEzYEL"
)

# Algorithm used both to sign and to verify tokens in this module.
ALGORITHM = "HS256"

# Intended access-token lifetime in minutes.
# NOTE(review): not referenced in the visible part of this module; presumably
# callers build the `expires_delta` for create_access_token from it -- confirm.
ACCESS_TOKEN_EXPIRE_MINUTES = 300

# passlib context used for hashing and verifying passwords (bcrypt scheme).
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# FastAPI dependency that extracts the bearer token from incoming requests.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
def verify_password(plain_password, hashed_password):
    """Return True if ``plain_password`` matches the stored ``hashed_password``.

    Two candidates are tried, in order:

    1. The SHA-256 hex digest of the password (the current scheme, which keeps
       the value handed to bcrypt within its 72-byte limit).
    2. The raw password (legacy scheme, for accounts hashed before the
       pre-hashing step was introduced).

    Args:
        plain_password: The password supplied by the user, as ``str``.
        hashed_password: The hash stored for the account.

    Returns:
        ``True`` when either candidate verifies against the hash, otherwise
        ``False``. A ``ValueError`` raised by the hashing backend (for example
        an over-long secret, or a stored hash it cannot parse) is treated as a
        failed match for that candidate rather than propagated.
    """
    pre_hashed = hashlib.sha256(plain_password.encode()).hexdigest()

    # Both attempts are guarded identically: previously only the legacy attempt
    # was inside a try block, so a malformed stored hash made the first call
    # raise instead of reporting a failed login.
    for candidate in (pre_hashed, plain_password):
        try:
            if pwd_context.verify(candidate, hashed_password):
                return True
        except ValueError:
            continue
    return False
def get_password_hash(password):
    """Hash ``password`` for storage.

    The password is first reduced to its SHA-256 hex digest so that input of
    any length stays under bcrypt's 72-byte limit; that digest is then hashed
    with the module's password context.

    Args:
        password: The plaintext password, as ``str``.

    Returns:
        The hash produced by ``pwd_context.hash`` for the SHA-256 hex digest.
    """
    hasher = hashlib.sha256()
    hasher.update(password.encode())
    return pwd_context.hash(hasher.hexdigest())
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Create a signed JWT containing ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed in the token. The dict is copied, so the caller's
            object is not modified.
        expires_delta: Lifetime of the token. When ``None`` (the default) the
            token expires 15 minutes from now.

    Returns:
        The encoded token, signed with ``SECRET_KEY`` using ``ALGORITHM``.
    """
    # Compare against None rather than truthiness: timedelta(0) is falsy, and
    # an explicitly requested zero lifetime must not fall back to the default.
    lifetime = expires_delta if expires_delta is not None else timedelta(minutes=15)

    # Timezone-aware "now" in UTC; datetime.utcnow() is deprecated and returns
    # a naive value.
    expire = datetime.now(timezone.utc) + lifetime

    to_encode = data.copy()
    to_encode["exp"] = expire
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
async def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
    """Resolve the bearer token of the current request to a ``User`` row.

    Args:
        token: Bearer token supplied by the ``oauth2_scheme`` dependency.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        The first ``User`` whose ``email`` equals the token's ``sub`` claim.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when the
            token cannot be decoded, carries no ``sub`` claim, or no user with
            that email exists.
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        subject: str = claims.get("sub")
        if subject is None:
            raise unauthorized
        token_data = app.schemas.TokenData(email=subject)
    except JWTError:
        # Any decoding/validation failure is reported as the same 401.
        raise unauthorized

    matched_user = db.query(User).filter(User.email == token_data.email).first()
    if matched_user is None:
        raise unauthorized
    return matched_user
async def get_current_active_user(current_user: User = Depends(get_current_user)):
    """Return the authenticated user, rejecting accounts that are not active.

    Args:
        current_user: User supplied by the ``get_current_user`` dependency.

    Returns:
        ``current_user`` unchanged when its ``is_active`` attribute is truthy.

    Raises:
        HTTPException: 400 with detail ``"Inactive user"`` otherwise.
    """
    if current_user.is_active:
        return current_user
    raise HTTPException(status_code=400, detail="Inactive user")