diff --git a/web/api/v1/admin.py b/web/api/v1/admin.py index 5ce7e89..19b28b9 100644 --- a/web/api/v1/admin.py +++ b/web/api/v1/admin.py @@ -6,15 +6,15 @@ router = APIRouter(prefix="/admin", tags=["admin"]) -security = HTTPBearer() +security = HTTPBearer(auto_error=False) -async def verify_admin_token( +async def verify_remote_reload_token( credentials: HTTPAuthorizationCredentials = Depends(security), admin_service: AdminService = Depends(get_admin_service) ) -> None: """ - Verify the bearer token for admin authentication. + Verify the bearer token for remote reload authentication. Args: credentials: HTTP authorization credentials from request header @@ -24,9 +24,15 @@ async def verify_admin_token( 401: Invalid or missing token 500: Server configuration error (token not configured) """ + if credentials is None or credentials.scheme.lower() != "bearer": + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Missing or invalid authentication token" + ) + token = credentials.credentials try: - if not await admin_service.verify_token(token): + if not await admin_service.verify_remote_reload_token(token): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authentication token" @@ -52,7 +58,7 @@ async def verify_admin_token( } ) async def refresh_remotes( - _: None = Depends(verify_admin_token), + _: None = Depends(verify_remote_reload_token), admin_service: AdminService = Depends(get_admin_service) ): """ diff --git a/web/core/config.py b/web/core/config.py index f322b0e..9bfa170 100644 --- a/web/core/config.py +++ b/web/core/config.py @@ -2,8 +2,13 @@ Application configuration and settings. """ import os +import logging from pathlib import Path from functools import lru_cache +from typing import Optional + + +logger = logging.getLogger(__name__) class Settings: @@ -62,21 +67,40 @@ def remotes_json_path(self) -> str: """Path to remotes.json configuration.""" return os.path.join(self.base_dir, 'configs', 'remotes.json') - @property - def admin_token_file_path(self) -> str: - """Path to admin token secret file.""" - return os.path.join(self.base_dir, 'secrets', 'reload_token') - @property def enable_inbuilt_builder(self) -> bool: """Whether to enable the inbuilt builder.""" return os.getenv('CBS_ENABLE_INBUILT_BUILDER', '1') == '1' @property - def admin_token_env(self) -> str: - """Token required to reload remotes.json via API.""" - env = os.getenv('CBS_REMOTES_RELOAD_TOKEN', '') - return env if env != '' else None + def remote_reload_token(self) -> Optional[str]: + """ + Get remote reload token from file or environment variable. + + Tries to read token from file first, falls back to environment variable. + + Returns: + The authorization token if found, None otherwise + """ + token_file_path = os.path.join(self.base_dir, 'secrets', 'reload_token') + + try: + # Try to read the secret token from the file + with open(token_file_path, 'r') as file: + token = file.read().strip() + return token + except (FileNotFoundError, PermissionError): + # If the file does not exist or no permission, check environment + env_token = os.getenv('CBS_REMOTES_RELOAD_TOKEN', '') + return env_token if env_token != '' else None + except Exception as e: + logger.error( + f"Unexpected error reading token file at {token_file_path}: {e}. " + "Checking environment for token." + ) + # For any other error, fall back to environment variable + env_token = os.getenv('CBS_REMOTES_RELOAD_TOKEN', None) + return env_token if env_token != '' else None @lru_cache() diff --git a/web/services/admin.py b/web/services/admin.py index 83aa79f..6ba95c4 100644 --- a/web/services/admin.py +++ b/web/services/admin.py @@ -2,11 +2,11 @@ Admin service for handling administrative operations. """ import logging -from typing import Optional, List +from typing import List -from fastapi import Request +from fastapi import Depends, Request +from web.core.config import get_settings, Settings -from core.config import get_settings logger = logging.getLogger(__name__) @@ -14,66 +14,31 @@ class AdminService: """Service for managing administrative operations.""" - def __init__(self, versions_fetcher=None): + def __init__(self, remote_reload_token: str, versions_fetcher=None): """ Initialize the admin service. Args: + remote_reload_token: Remote reload authentication token versions_fetcher: VersionsFetcher instance for managing remotes """ + self.remote_reload_token = remote_reload_token self.versions_fetcher = versions_fetcher - self.settings = get_settings() - def get_auth_token(self) -> Optional[str]: + async def verify_remote_reload_token(self, token: str) -> bool: """ - Retrieve the authorization token from file or environment. - - Returns: - The authorization token if found, None otherwise - """ - try: - # Try to read the secret token from the file - token_file_path = self.settings.admin_token_file_path - with open(token_file_path, 'r') as file: - token = file.read().strip() - return token - except (FileNotFoundError, PermissionError) as e: - logger.error( - f"Couldn't open token file at " - f"{self.settings.admin_token_file_path}: {e}. " - "Checking environment for token." - ) - # If the file does not exist or no permission, check environment - return self.settings.admin_token_env - except Exception as e: - logger.error( - f"Unexpected error reading token file at " - f"{self.settings.admin_token_file_path}: {e}. " - "Checking environment for token." - ) - # For any other error, fall back to environment variable - return self.settings.admin_token_env - - async def verify_token(self, token: str) -> bool: - """ - Verify that the provided token matches the expected admin token. + Verify that the provided token matches the expected remote reload token. Args: token: The token to verify Returns: True if token is valid, False otherwise - - Raises: - RuntimeError: If admin token is not configured on server """ - expected_token = self.get_auth_token() - - if expected_token is None: - logger.error("No admin token configured") - raise RuntimeError("Admin token not configured on server") + if not token: + return False - return token == expected_token + return token == self.remote_reload_token async def refresh_remotes(self) -> List[str]: """ @@ -102,14 +67,29 @@ async def refresh_remotes(self) -> List[str]: return remotes_refreshed -def get_admin_service(request: Request) -> AdminService: +def get_admin_service( + request: Request, + settings: Settings = Depends(get_settings) +) -> AdminService: """ Get AdminService instance with dependencies from app state. Args: request: FastAPI Request object + settings: Application settings Returns: AdminService instance initialized with app state dependencies + + Raises: + RuntimeError: If remote reload token is not configured """ - return AdminService(versions_fetcher=request.app.state.versions_fetcher) + remote_reload_token = settings.remote_reload_token + + if remote_reload_token is None: + raise RuntimeError("Remote reload token not configured on server") + + return AdminService( + remote_reload_token=remote_reload_token, + versions_fetcher=request.app.state.versions_fetcher + )