Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions web/api/v1/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@


router = APIRouter(prefix="/admin", tags=["admin"])
security = HTTPBearer()
security = HTTPBearer(auto_error=False)


async def verify_admin_token(
async def verify_remote_reload_token(
credentials: HTTPAuthorizationCredentials = Depends(security),
admin_service: AdminService = Depends(get_admin_service)
) -> None:
"""
Verify the bearer token for admin authentication.
Verify the bearer token for remote reload authentication.

Args:
credentials: HTTP authorization credentials from request header
Expand All @@ -24,9 +24,15 @@ async def verify_admin_token(
401: Invalid or missing token
500: Server configuration error (token not configured)
"""
if credentials is None or credentials.scheme.lower() != "bearer":
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Missing or invalid authentication token"
)

token = credentials.credentials
try:
if not await admin_service.verify_token(token):
if not await admin_service.verify_remote_reload_token(token):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication token"
Expand All @@ -52,7 +58,7 @@ async def verify_admin_token(
}
)
async def refresh_remotes(
_: None = Depends(verify_admin_token),
_: None = Depends(verify_remote_reload_token),
admin_service: AdminService = Depends(get_admin_service)
):
"""
Expand Down
42 changes: 33 additions & 9 deletions web/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,13 @@
Application configuration and settings.
"""
import os
import logging
from pathlib import Path
from functools import lru_cache
from typing import Optional


logger = logging.getLogger(__name__)


class Settings:
Expand Down Expand Up @@ -62,21 +67,40 @@ def remotes_json_path(self) -> str:
"""Path to remotes.json configuration."""
return os.path.join(self.base_dir, 'configs', 'remotes.json')

@property
def admin_token_file_path(self) -> str:
"""Path to admin token secret file."""
return os.path.join(self.base_dir, 'secrets', 'reload_token')

@property
def enable_inbuilt_builder(self) -> bool:
"""Whether to enable the inbuilt builder."""
return os.getenv('CBS_ENABLE_INBUILT_BUILDER', '1') == '1'

@property
def admin_token_env(self) -> str:
"""Token required to reload remotes.json via API."""
env = os.getenv('CBS_REMOTES_RELOAD_TOKEN', '')
return env if env != '' else None
def remote_reload_token(self) -> Optional[str]:
"""
Get remote reload token from file or environment variable.

Tries to read token from file first, falls back to environment variable.

Returns:
The authorization token if found, None otherwise
"""
token_file_path = os.path.join(self.base_dir, 'secrets', 'reload_token')

try:
# Try to read the secret token from the file
with open(token_file_path, 'r') as file:
token = file.read().strip()
return token
except (FileNotFoundError, PermissionError):
# If the file does not exist or no permission, check environment
env_token = os.getenv('CBS_REMOTES_RELOAD_TOKEN', '')
return env_token if env_token != '' else None
except Exception as e:
logger.error(
f"Unexpected error reading token file at {token_file_path}: {e}. "
"Checking environment for token."
)
# For any other error, fall back to environment variable
env_token = os.getenv('CBS_REMOTES_RELOAD_TOKEN', None)
return env_token if env_token != '' else None


@lru_cache()
Expand Down
76 changes: 28 additions & 48 deletions web/services/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,78 +2,43 @@
Admin service for handling administrative operations.
"""
import logging
from typing import Optional, List
from typing import List

from fastapi import Request
from fastapi import Depends, Request
from web.core.config import get_settings, Settings

from core.config import get_settings

logger = logging.getLogger(__name__)


class AdminService:
"""Service for managing administrative operations."""

def __init__(self, versions_fetcher=None):
def __init__(self, remote_reload_token: str, versions_fetcher=None):
"""
Initialize the admin service.

Args:
remote_reload_token: Remote reload authentication token
versions_fetcher: VersionsFetcher instance for managing remotes
"""
self.remote_reload_token = remote_reload_token
self.versions_fetcher = versions_fetcher
self.settings = get_settings()

def get_auth_token(self) -> Optional[str]:
async def verify_remote_reload_token(self, token: str) -> bool:
"""
Retrieve the authorization token from file or environment.

Returns:
The authorization token if found, None otherwise
"""
try:
# Try to read the secret token from the file
token_file_path = self.settings.admin_token_file_path
with open(token_file_path, 'r') as file:
token = file.read().strip()
return token
except (FileNotFoundError, PermissionError) as e:
logger.error(
f"Couldn't open token file at "
f"{self.settings.admin_token_file_path}: {e}. "
"Checking environment for token."
)
# If the file does not exist or no permission, check environment
return self.settings.admin_token_env
except Exception as e:
logger.error(
f"Unexpected error reading token file at "
f"{self.settings.admin_token_file_path}: {e}. "
"Checking environment for token."
)
# For any other error, fall back to environment variable
return self.settings.admin_token_env

async def verify_token(self, token: str) -> bool:
"""
Verify that the provided token matches the expected admin token.
Verify that the provided token matches the expected remote reload token.

Args:
token: The token to verify

Returns:
True if token is valid, False otherwise

Raises:
RuntimeError: If admin token is not configured on server
"""
expected_token = self.get_auth_token()

if expected_token is None:
logger.error("No admin token configured")
raise RuntimeError("Admin token not configured on server")
if not token:
return False

return token == expected_token
return token == self.remote_reload_token

async def refresh_remotes(self) -> List[str]:
"""
Expand Down Expand Up @@ -102,14 +67,29 @@ async def refresh_remotes(self) -> List[str]:
return remotes_refreshed


def get_admin_service(request: Request) -> AdminService:
def get_admin_service(
request: Request,
settings: Settings = Depends(get_settings)
) -> AdminService:
"""
Get AdminService instance with dependencies from app state.

Args:
request: FastAPI Request object
settings: Application settings

Returns:
AdminService instance initialized with app state dependencies

Raises:
RuntimeError: If remote reload token is not configured
"""
return AdminService(versions_fetcher=request.app.state.versions_fetcher)
remote_reload_token = settings.remote_reload_token

if remote_reload_token is None:
raise RuntimeError("Remote reload token not configured on server")

return AdminService(
remote_reload_token=remote_reload_token,
versions_fetcher=request.app.state.versions_fetcher
)
Loading