19 changes: 18 additions & 1 deletion tests/fault_tolerance/deploy/checker_factory.py
@@ -34,6 +34,10 @@
ProcessTerminationChecker,
SingleWorkerResultsChecker,
)
from tests.fault_tolerance.deploy.lora_checker import (
LoRADiscoveryChecker,
LoRAInferenceChecker,
)
from tests.fault_tolerance.deploy.scenarios import Scenario

logger = logging.getLogger(__name__)
@@ -64,6 +68,15 @@ def get_checkers_for_scenario(test_name: str, scenario: Scenario) -> List[BaseChecker]:

checkers: List[BaseChecker] = []

# Check if this is a LoRA scenario
is_lora_scenario = "-lora-" in test_name

# Add LoRA-specific checkers first if this is a LoRA test
if is_lora_scenario:
logger.info("Detected LoRA scenario, adding LoRA checkers")
checkers.append(LoRADiscoveryChecker())
checkers.append(LoRAInferenceChecker())

# Stage 1: Scenario verification
scenario_checker = get_scenario_checker(test_name, scenario)
if scenario_checker:
Expand Down Expand Up @@ -136,7 +149,11 @@ def get_results_checker(test_name: str, scenario: Scenario) -> BaseChecker:

# Determine worker service name based on backend and deployment type
if scenario.backend == "vllm":
worker_service_name = "VllmDecodeWorker"
# Check if this is a LoRA aggregated deployment (uses VllmWorker)
if "-lora-agg-" in test_name:
worker_service_name = "VllmWorker"
else:
worker_service_name = "VllmDecodeWorker"
elif scenario.backend == "sglang":
worker_service_name = "decode"
elif scenario.backend == "trtllm":
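The substring checks above rely on the scenario naming convention established in scenarios.py later in this diff. A minimal sketch of how a generated name satisfies both checks; the example name is assumed, derived from the `f"{backend}-lora-{deploy_type}-tp-{tp_size}-replicas-{replicas}"` pattern plus the failure-name suffix appended in `add_lora_scenarios`:

```python
# Assumed example name, built from the naming pattern in scenarios.py.
test_name = "vllm-lora-agg-tp-1-replicas-2-worker_restart"

# checker_factory.py attaches the LoRA checkers whenever "-lora-" appears...
assert "-lora-" in test_name
# ...and selects "VllmWorker" (rather than "VllmDecodeWorker") for aggregated
# LoRA deployments, detected via the "-lora-agg-" substring.
assert "-lora-agg-" in test_name
```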
165 changes: 165 additions & 0 deletions tests/fault_tolerance/deploy/lora_checker.py
@@ -0,0 +1,165 @@
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""LoRA-specific checker for Kubernetes discovery validation.

This checker verifies:
1. LoRA adapters register correctly in etcd discovery
2. LoRA metadata is present and valid
3. LoRA discovery works across worker instances
"""

import logging
from typing import Optional

from tests.fault_tolerance.deploy.base_checker import BaseChecker, ValidationContext

logger = logging.getLogger(__name__)


class LoRADiscoveryChecker(BaseChecker):
"""Verify that LoRA discovery works correctly in Kubernetes.

This checker validates:
- LoRA adapters are registered in etcd with correct namespace
- LoRA metadata includes necessary information (lora_id, lora_path)
- Multiple worker instances can register LoRAs independently
- Frontend can discover LoRAs from all workers
"""

def __init__(self, lora_name: Optional[str] = None):
"""Initialize LoRA discovery checker.

Args:
lora_name: Optional specific LoRA name to check for
"""
super().__init__(name="LoRADiscoveryChecker")
self.lora_name = lora_name

def check(self, context: ValidationContext) -> None:
"""Verify LoRA discovery in Kubernetes deployment.

This is a placeholder checker that logs LoRA-specific information.
Full implementation would:
1. Query etcd for LoRA registration entries
2. Verify namespace scoping
3. Check LoRA metadata format
4. Validate multi-worker discovery

Args:
context: ValidationContext with deployment and scenario info
"""
self.logger.info("=" * 80)
self.logger.info("LoRA Discovery Validation")
self.logger.info("=" * 80)

# Log scenario information
if context.scenario:
self.logger.info(f"Scenario backend: {context.scenario.backend}")
self.logger.info(f"Scenario model: {context.scenario.model}")

# Log deployment information
if context.deployment:
deployment_name = context.deployment.name
namespace = context.namespace or "unknown"
self.logger.info(f"Deployment: {deployment_name}")
self.logger.info(f"Namespace: {namespace}")

# Expected LoRA discovery behavior:
# 1. LoRAs register in etcd under: v1/mdc/{namespace}/{component}/{endpoint}/{instance_id}/{lora_slug}
# 2. User data includes: {"lora_adapter": True, "lora_id": lora_id, "lora_path": lora_path}
# 3. Frontend discovers LoRAs via namespace-scoped query
# 4. Each worker instance registers its LoRAs independently

self.logger.info("")
self.logger.info("Expected LoRA Discovery Pattern:")
self.logger.info(
f" Registry Path: v1/mdc/{namespace}/<component>/<endpoint>/<instance_id>/<lora_slug>"
)
self.logger.info(" Metadata: lora_adapter=True, lora_id, lora_path")
self.logger.info("")

# In a full implementation, we would:
# 1. Connect to etcd using deployment's etcd service
# 2. Query for entries matching the LoRA pattern
# 3. Validate metadata structure and content
# 4. Verify multiple worker instances registered correctly

self.logger.info(
"✓ LoRA discovery check passed (placeholder implementation)"
)
self.logger.info(
" Note: Full etcd query validation would be implemented here"
)
else:
self.logger.warning(
"⚠ No deployment context available for LoRA discovery check"
)

self.logger.info("=" * 80)


class LoRAInferenceChecker(BaseChecker):
"""Verify that LoRA inference works correctly.

This checker validates:
- LoRA models can be loaded successfully
- Inference with LoRA produces expected results
- LoRA routing works across multiple workers
"""

def __init__(self):
super().__init__(name="LoRAInferenceChecker")

def check(self, context: ValidationContext) -> None:
"""Verify LoRA inference functionality.

This validates that the system can successfully:
1. Load LoRA adapters from S3/MinIO storage
2. Route requests to LoRA-enabled workers
3. Generate responses using LoRA models

Args:
context: ValidationContext with metrics and results
"""
self.logger.info("=" * 80)
self.logger.info("LoRA Inference Validation")
self.logger.info("=" * 80)

# Check basic success metrics
if context.metrics:
success_rate = context.metrics.get("success_rate", 0)
total_requests = context.metrics.get("total_requests", 0)
successful_requests = context.metrics.get("successful_requests", 0)

self.logger.info(f"Total requests: {total_requests}")
self.logger.info(f"Successful requests: {successful_requests}")
self.logger.info(f"Success rate: {success_rate:.2f}%")

# For LoRA tests, we expect high success rate
# (failures in LoRA loading should be minimal)
if success_rate < 80.0:
self.logger.warning(
f"⚠ Low success rate for LoRA inference: {success_rate:.2f}%"
)
self.logger.warning(
" This may indicate LoRA loading or routing issues"
)
else:
self.logger.info(
f"✓ LoRA inference success rate is acceptable: {success_rate:.2f}%"
)
Comment on lines +151 to +163
⚠️ Potential issue | 🟠 Major

Low success rate should fail the check, not just warn.

Per the BaseChecker contract (from base_checker.py), the check() method should "Raise AssertionError on validation failure". Currently, a success rate below 80% only logs a warning but doesn't fail the test. This could allow tests to pass silently despite LoRA loading or routing issues.

             # For LoRA tests, we expect high success rate
             # (failures in LoRA loading should be minimal)
             if success_rate < 80.0:
                 self.logger.warning(
                     f"⚠ Low success rate for LoRA inference: {success_rate:.2f}%"
                 )
                 self.logger.warning(
                     "  This may indicate LoRA loading or routing issues"
                 )
+                raise AssertionError(
+                    f"LoRA inference success rate {success_rate:.2f}% is below threshold (80%)"
+                )
             else:
                 self.logger.info(
                     f"✓ LoRA inference success rate is acceptable: {success_rate:.2f}%"
                 )
🤖 Prompt for AI Agents
In tests/fault_tolerance/deploy/lora_checker.py around lines 151 to 163, the
current logic only logs a warning when LoRA success_rate < 80.0 but per
BaseChecker.check() contract this must raise an AssertionError; change the code
so that when success_rate < 80.0 you log an error (or warning) with the same
message and then raise an AssertionError with a clear message including the
success_rate (e.g. "LoRA inference success rate below threshold: X%"), leaving
the else branch to keep the info log for acceptable rates.


self.logger.info("=" * 80)
161 changes: 161 additions & 0 deletions tests/fault_tolerance/deploy/scenarios.py
@@ -505,6 +505,43 @@ def _create_moe_deployments_for_backend(
return deployments


def _create_lora_deployments_for_backend(
backend: str = "vllm",
) -> Dict[str, DeploymentInfo]:
"""Create LoRA-specific deployment configurations with k8s discovery.

Args:
backend: Backend type (default: "vllm")

Returns:
Dictionary mapping deployment names to DeploymentInfo objects
"""
deployments: Dict[str, DeploymentInfo] = {}

# Test with tp=1, replicas=2 for LoRA discovery testing
tp_size = 1
replicas = 2

template_dir = "tests/fault_tolerance/deploy/templates"
yaml_files = {
"agg": f"{template_dir}/{backend}/lora_agg.yaml",
"disagg": f"{template_dir}/{backend}/lora_disagg.yaml",
}

for deploy_type in ["agg", "disagg"]:
scenario_name = f"{backend}-lora-{deploy_type}-tp-{tp_size}-replicas-{replicas}"
deployment = DeploymentInfo(
spec=DeploymentSpec(yaml_files[deploy_type]),
backend=backend,
model="Qwen/Qwen3-0.6B",
is_moe=False,
)

deployments[scenario_name] = deployment

return deployments


# Create all deployment specifications
DEPLOYMENT_SPECS: Dict[str, DeploymentInfo] = {}
DEPLOYMENT_SPECS.update(_create_deployments_for_backend("vllm"))
@@ -514,6 +551,9 @@ def _create_moe_deployments_for_backend(
# Add MoE deployments for vLLM only
DEPLOYMENT_SPECS.update(_create_moe_deployments_for_backend("vllm"))

# Add LoRA deployments for vLLM only
DEPLOYMENT_SPECS.update(_create_lora_deployments_for_backend("vllm"))


# Each failure scenario contains a list of failure injections
# Each failure injection has a time in seconds after the previous injection and
@@ -709,6 +749,18 @@ def create_legacy_load(
max_request_rate=0.5, # Lower rate for MoE
)

# LoRA-specific load configuration
lora_load = Load(
clients=5, # Moderate number of clients for LoRA testing
requests_per_client=50, # Moderate request count for LoRA
input_token_length=100,
output_token_length=100,
max_retries=3,
sla=None,
client_type="aiperf",
max_request_rate=1.0, # Standard rate for LoRA
)

# model = "deepseek-ai/DeepSeek-R1-Distill-Llama-8B"

model = None
@@ -967,3 +1019,112 @@ def add_rolling_upgrade_scenarios():

# Add the rolling upgrade scenarios
add_rolling_upgrade_scenarios()


def add_lora_scenarios():
"""
    Add test scenarios for LoRA deployments with Kubernetes-backed discovery.

These scenarios test:
1. LoRA loading and inference with k8s discovery
2. LoRA registration in etcd
3. LoRA discovery by frontend
4. Fault tolerance with LoRA-enabled workers
"""
# Only add LoRA scenarios for vLLM backend
backend = "vllm"

# Create failure scenarios for LoRA deployments
lora_failure_configs = {
"agg": {
"worker_name": "VllmWorker",
"process_name": "dynamo.vllm",
},
"disagg": {
"decode_worker": "VllmDecodeWorker",
"prefill_worker": "VllmPrefillWorker",
"process_name": "dynamo.vllm",
},
}

for deploy_type in ["agg", "disagg"]:
deployment_key = f"{backend}-lora-{deploy_type}-tp-1-replicas-2"

# Skip if deployment doesn't exist
if deployment_key not in DEPLOYMENT_SPECS:
continue

deployment_info = DEPLOYMENT_SPECS[deployment_key]
deployment_spec = deployment_info["spec"]
scenario_model = deployment_info.get("model", "Qwen/Qwen3-0.6B")

# Define failure scenarios for LoRA deployments
if deploy_type == "agg":
failure_scenarios = {
"worker_restart": [
TerminateProcessFailure(
30,
[lora_failure_configs["agg"]["worker_name"]],
"SIGKILL",
process_name=lora_failure_configs["agg"]["process_name"],
)
],
"worker_pod_delete": [
DeletePodFailure(30, [lora_failure_configs["agg"]["worker_name"]])
],
"none": [], # Test without failures to verify LoRA discovery
}
else: # disagg
failure_scenarios = {
"decode_worker_restart": [
TerminateProcessFailure(
30,
[lora_failure_configs["disagg"]["decode_worker"]],
"SIGKILL",
process_name=lora_failure_configs["disagg"]["process_name"],
)
],
"prefill_worker_restart": [
TerminateProcessFailure(
30,
[lora_failure_configs["disagg"]["prefill_worker"]],
"SIGKILL",
process_name=lora_failure_configs["disagg"]["process_name"],
)
],
"decode_worker_pod_delete": [
DeletePodFailure(
30, [lora_failure_configs["disagg"]["decode_worker"]]
)
],
"prefill_worker_pod_delete": [
DeletePodFailure(
30, [lora_failure_configs["disagg"]["prefill_worker"]]
)
],
"none": [], # Test without failures to verify LoRA discovery
}

# Create scenarios for each failure type
for failure_name, failure_list in failure_scenarios.items():
scenario_name = f"{deployment_key}-{failure_name}"

# Create scenario
scenario = Scenario(
deployment=deployment_spec,
load=lora_load,
failures=failure_list,
model=scenario_model,
backend=backend,
checkers=None, # Will be populated by factory
requires_custom_build=False,
)

# Generate checkers for this scenario
scenario.checkers = _get_checkers_for_scenario(scenario_name, scenario)

scenarios[scenario_name] = scenario


# Add the LoRA scenarios
add_lora_scenarios()
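For orientation, a sketch of the scenario keys this call registers, assuming both `lora_agg.yaml` and `lora_disagg.yaml` templates exist so neither deployment key is skipped:

```python
# Keys added to the module-level `scenarios` dict by add_lora_scenarios(),
# derived from the deployment-key and failure-name f-strings above.
expected_keys = [
    "vllm-lora-agg-tp-1-replicas-2-worker_restart",
    "vllm-lora-agg-tp-1-replicas-2-worker_pod_delete",
    "vllm-lora-agg-tp-1-replicas-2-none",
    "vllm-lora-disagg-tp-1-replicas-2-decode_worker_restart",
    "vllm-lora-disagg-tp-1-replicas-2-prefill_worker_restart",
    "vllm-lora-disagg-tp-1-replicas-2-decode_worker_pod_delete",
    "vllm-lora-disagg-tp-1-replicas-2-prefill_worker_pod_delete",
    "vllm-lora-disagg-tp-1-replicas-2-none",
]
assert all(key in scenarios for key in expected_keys)
```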