1b50cd2cc5
The hardcoded placeholder '$2b$12$abc...' wasn't a valid bcrypt
hash (checksum had wrong base64 length), so passlib raised
ValueError on every user-not-found login attempt:
ValueError: malformed bcrypt hash (checksum must be ...)
Replace with hash_password('dummy-not-a-real-password') computed
once at module import. Same constant-time intent (verify cost is
identical to a real password check) but actually valid input.
95 lines
3.5 KiB
Python
95 lines
3.5 KiB
Python
"""
|
|
Local username/password authentication strategy.
|
|
|
|
Wraps the existing bcrypt-based login path so all auth flows go through
|
|
the same orchestrator. Account-lockout and failed-attempt counters live
|
|
on the User row (failed_login_attempts) — locking logic stays the same
|
|
as before this refactor.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from datetime import datetime, timedelta
|
|
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.auth.jwt_handler import hash_password, verify_password
|
|
from app.auth.strategies.base import AuthError, AuthStrategy, ExternalIdentity
|
|
from app.models.user import AuthProvider, User
|
|
|
|
logger = logging.getLogger(__name__)

# Lockout policy: how long (in minutes) a locked account stays locked, and the
# failed-login count at which the lock applies.
ACCOUNT_LOCKOUT_DURATION_MIN = 15
ACCOUNT_LOCKOUT_THRESHOLD = 5

# Stand-in bcrypt hash that is verified when the requested account cannot be
# used, so that branch pays the same verification cost as a real password
# check. It is produced once, at import time, from a throwaway plaintext that
# is never any user's secret.
_DUMMY_HASH = hash_password("dummy-not-a-real-password")
|
|
|
|
|
|
class LocalAuthStrategy(AuthStrategy):
    """
    Username/password authentication for `auth_provider=LOCAL` users only.

    The username is matched with an exact equality filter. Every failure is
    reported through `AuthError`; only the lockout case sets an explicit
    `safe_message`, the other failures rely on AuthError's default (presumably
    a generic message, to prevent enumeration — confirm in AuthError).
    """

    name = AuthProvider.LOCAL

    def __init__(self, db: Session):
        """Keep the SQLAlchemy session used for user lookups and commits."""
        self.db = db

    @property
    def is_enabled(self) -> bool:
        """Always True: local auth is the fallback / emergency admin path."""
        return True

    def authenticate_credentials(self, username: str, password: str) -> ExternalIdentity:
        """
        Verify a username/password pair against the local user table.

        Side effects: a wrong password increments `failed_login_attempts` and
        commits; a successful login resets the counter to 0 and commits.

        Args:
            username: Login name, matched exactly against `User.username`.
            password: Plaintext password supplied by the caller.

        Returns:
            An `ExternalIdentity` for the authenticated local user.

        Raises:
            AuthError: unknown user, non-local account, inactive account,
                locked account, or wrong/missing password.
        """
        # Look up user but DO NOT branch behaviour based on existence —
        # every early-exit below pays one bcrypt verify to keep timing flat.
        user = self.db.query(User).filter(User.username == username).first()

        if user is None or user.auth_provider != AuthProvider.LOCAL:
            # Constant-time dummy verify to prevent timing-based enumeration.
            verify_password("anything", _DUMMY_HASH)
            raise AuthError(
                detail=f"unknown user '{username}'"
                if user is None
                else f"user '{username}' is not a local account (provider={user.auth_provider.value})",
            )

        if not user.is_active:
            # Pay the same bcrypt cost here too; otherwise a fast response
            # would reveal that the account exists but is disabled.
            verify_password("anything", _DUMMY_HASH)
            raise AuthError(detail=f"inactive user '{username}'")

        # A NULL counter (e.g. a row without a default) counts as zero
        # instead of crashing the comparison / increment below.
        attempts = user.failed_login_attempts or 0

        # Account lockout
        if attempts >= ACCOUNT_LOCKOUT_THRESHOLD:
            # If updated_at is older than lockout window, auto-reset.
            unlock_after = (user.updated_at or user.created_at) + timedelta(
                minutes=ACCOUNT_LOCKOUT_DURATION_MIN
            )
            # Build "now" with the stored timestamp's tzinfo so naive and
            # aware columns both compare cleanly; for a naive timestamp this
            # is exactly datetime.now().
            # NOTE(review): naive timestamps are compared against local time —
            # confirm the column is not stored as naive UTC.
            if datetime.now(unlock_after.tzinfo) < unlock_after:
                raise AuthError(
                    detail=f"user '{username}' is locked until {unlock_after.isoformat()}",
                    safe_message="Account temporarily locked. Try again later.",
                )
            attempts = 0
            user.failed_login_attempts = 0  # auto-reset

        # Password check
        if user.password_hash:
            password_ok = verify_password(password, user.password_hash)
        else:
            # No stored hash: still burn one verify so this case is not
            # distinguishable by response time, then fail.
            verify_password("anything", _DUMMY_HASH)
            password_ok = False

        if not password_ok:
            user.failed_login_attempts = attempts + 1
            self.db.commit()
            raise AuthError(detail=f"bad password for '{username}'")

        # Success path: reset counter
        user.failed_login_attempts = 0
        self.db.commit()

        return ExternalIdentity(
            provider=AuthProvider.LOCAL,
            external_id=str(user.id),
            username=user.username,
            email=user.email,
            display_name=user.username,
            groups=[],
        )
|