diff --git a/pinecone/db_control/models/__init__.py b/pinecone/db_control/models/__init__.py index eb958414..4ad33b20 100644 --- a/pinecone/db_control/models/__init__.py +++ b/pinecone/db_control/models/__init__.py @@ -22,6 +22,12 @@ ) from .schema_builder import SchemaBuilder from .deployment import ServerlessDeployment, ByocDeployment, PodDeployment, Deployment +from .compatibility_spec import ( + CompatibilitySpec, + ServerlessSpecCompat, + PodSpecCompat, + ByocSpecCompat, +) __all__ = [ @@ -51,4 +57,8 @@ "ByocDeployment", "PodDeployment", "Deployment", + "CompatibilitySpec", + "ServerlessSpecCompat", + "PodSpecCompat", + "ByocSpecCompat", ] diff --git a/pinecone/db_control/models/compatibility_spec.py b/pinecone/db_control/models/compatibility_spec.py new file mode 100644 index 00000000..c6891fec --- /dev/null +++ b/pinecone/db_control/models/compatibility_spec.py @@ -0,0 +1,131 @@ +"""Compatibility shim classes for backward compatibility with the old spec-based API. + +These classes map the new deployment-based API structure to the old spec-based +access patterns, ensuring existing code continues to work with the new API. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from typing import Dict + + +@dataclass +class ServerlessSpecCompat: + """Compatibility shim for serverless spec access. + + Provides the same interface as the old ServerlessSpec response for + backward compatibility. + + :param cloud: The cloud provider (e.g., "aws", "gcp", "azure"). + :param region: The cloud region (e.g., "us-east-1"). + """ + + cloud: str + region: str + + +@dataclass +class PodSpecCompat: + """Compatibility shim for pod spec access. + + Provides the same interface as the old PodSpec response for + backward compatibility. + + :param environment: The pod environment. + :param pod_type: The pod type (e.g., "p1.x1"). + :param replicas: Number of replicas. + :param shards: Number of shards. + :param pods: Total number of pods. + :param metadata_config: Metadata configuration dict. + :param source_collection: Source collection name, if any. + """ + + environment: str + pod_type: str + replicas: int + shards: int + pods: int + metadata_config: Dict[str, object] | None = None + source_collection: str | None = None + + +@dataclass +class ByocSpecCompat: + """Compatibility shim for BYOC spec access. + + Provides the same interface as the old ByocSpec response for + backward compatibility. + + :param environment: The BYOC environment identifier. + """ + + environment: str + + +class CompatibilitySpec: + """Compatibility wrapper that provides old-style spec access from new deployment data. + + This class wraps a deployment object from the alpha API and provides the old + `.spec.serverless` / `.spec.pod` / `.spec.byoc` access patterns for backward + compatibility. + + :param deployment: The deployment object from the API response. + """ + + def __init__(self, deployment: object): + self._deployment = deployment + + @property + def serverless(self) -> ServerlessSpecCompat | None: + """Get serverless spec if this is a serverless deployment. + + :returns: ServerlessSpecCompat if serverless deployment, None otherwise. + """ + deployment_type = getattr(self._deployment, "deployment_type", None) + if deployment_type == "serverless": + cloud = getattr(self._deployment, "cloud", "") + region = getattr(self._deployment, "region", "") + return ServerlessSpecCompat(cloud=cloud, region=region) + return None + + @property + def pod(self) -> PodSpecCompat | None: + """Get pod spec if this is a pod deployment. + + :returns: PodSpecCompat if pod deployment, None otherwise. + """ + deployment_type = getattr(self._deployment, "deployment_type", None) + if deployment_type == "pod": + environment = getattr(self._deployment, "environment", "") + pod_type = getattr(self._deployment, "pod_type", "p1.x1") + replicas = getattr(self._deployment, "replicas", 1) + shards = getattr(self._deployment, "shards", 1) + pods = getattr(self._deployment, "pods", 1) + metadata_config = getattr(self._deployment, "metadata_config", None) + source_collection = getattr(self._deployment, "source_collection", None) + return PodSpecCompat( + environment=environment, + pod_type=pod_type, + replicas=replicas, + shards=shards, + pods=pods, + metadata_config=metadata_config, + source_collection=source_collection, + ) + return None + + @property + def byoc(self) -> ByocSpecCompat | None: + """Get BYOC spec if this is a BYOC deployment. + + :returns: ByocSpecCompat if BYOC deployment, None otherwise. + """ + deployment_type = getattr(self._deployment, "deployment_type", None) + if deployment_type == "byoc": + environment = getattr(self._deployment, "environment", "") + return ByocSpecCompat(environment=environment) + return None diff --git a/pinecone/db_control/models/index_model.py b/pinecone/db_control/models/index_model.py index 769667df..5284b4c8 100644 --- a/pinecone/db_control/models/index_model.py +++ b/pinecone/db_control/models/index_model.py @@ -1,175 +1,187 @@ -from pinecone.core.openapi.db_control.model.index_model import IndexModel as OpenAPIIndexModel -from pinecone.core.openapi.db_control.model.index_spec import IndexSpec -from pinecone.core.openapi.db_control.model.serverless import Serverless -from pinecone.core.openapi.db_control.model.serverless_spec_response import ServerlessSpecResponse -from pinecone.core.openapi.db_control.model.read_capacity_response import ReadCapacityResponse -from pinecone.core.openapi.db_control.model.read_capacity_on_demand_spec_response import ( - ReadCapacityOnDemandSpecResponse, -) -from pinecone.core.openapi.db_control.model.read_capacity_dedicated_spec_response import ( - ReadCapacityDedicatedSpecResponse, -) -from pinecone.core.openapi.db_control.model.pod_based import PodBased -from pinecone.core.openapi.db_control.model.pod_spec import PodSpec -from pinecone.core.openapi.db_control.model.byoc import BYOC -from pinecone.core.openapi.db_control.model.byoc_spec import ByocSpec +"""IndexModel wrapper with backward compatibility shims. + +This module provides the IndexModel class that wraps the generated OpenAPI +IndexModel and provides backward compatibility for code written against the +old API structure. +""" + +from __future__ import annotations + import json +from typing import Literal, TYPE_CHECKING + +from pinecone.core.openapi.db_control.model.index_model import IndexModel as OpenAPIIndexModel from pinecone.utils.repr_overrides import custom_serializer -from pinecone.openapi_support.model_utils import deserialize_model + +from .compatibility_spec import CompatibilitySpec + +if TYPE_CHECKING: + pass class IndexModel: + """Wrapper for OpenAPI IndexModel that provides compatibility shims. + + This class wraps the generated OpenAPI IndexModel and provides backward + compatibility properties for code that was written against the old API + structure (using ``spec``, ``dimension``, ``metric`` at the top level). + + The new alpha API uses ``deployment`` instead of ``spec``, and stores + dimension/metric inside ``schema.fields``. This wrapper provides both + access patterns transparently. + + :param index: The underlying OpenAPI IndexModel instance. + + Example usage:: + + # New alpha API format with schema and deployment + index = pc.describe_index("my-index") + + # Old-style access still works via compatibility shims + print(index.spec.serverless.cloud) # Via CompatibilitySpec + print(index.dimension) # Extracted from schema.fields + print(index.metric) # Extracted from schema.fields + + # New-style access also works + print(index.deployment.cloud) + print(index.schema.fields) + """ + def __init__(self, index: OpenAPIIndexModel): self.index = index - self._spec_cache = None + self._spec_cache: CompatibilitySpec | None = None + self._vector_info_cache: ( + tuple[int | None, str | None, Literal["dense", "sparse"] | None] | None + ) = None - def __str__(self): + def __str__(self) -> str: return str(self.index) - def __getattr__(self, attr): + def __getattr__(self, attr: str) -> object: if attr == "spec": return self._get_spec() + if attr == "dimension": + return self._get_dimension() + if attr == "metric": + return self._get_metric() + if attr == "vector_type": + return self._get_vector_type() return getattr(self.index, attr) - def _get_spec(self): + def _get_spec(self) -> CompatibilitySpec | None: + """Get spec with backward compatibility from deployment data. + + Builds a CompatibilitySpec from the deployment data, providing the old + ``.spec.serverless`` / ``.spec.pod`` / ``.spec.byoc`` access patterns. + + :returns: CompatibilitySpec wrapper or None if no deployment. + """ if self._spec_cache is not None: return self._spec_cache - # Access _data_store directly to avoid OpenAPI model attribute resolution - spec_value = self.index._data_store.get("spec") - if spec_value is None: - # Fallback to getattr in case spec is stored differently - spec_value = getattr(self.index, "spec", None) - - if isinstance(spec_value, dict): - # Manually detect which oneOf schema to use and construct it directly - # This bypasses the broken oneOf matching logic in deserialize_model - # Get configuration from the underlying model if available - config = getattr(self.index, "_configuration", None) - path_to_item = getattr(self.index, "_path_to_item", ()) - # Convert to list if needed and append 'spec' to path_to_item for proper error reporting - if isinstance(path_to_item, (list, tuple)): - spec_path = list(path_to_item) + ["spec"] - else: - spec_path = ["spec"] - - # Check which oneOf key exists and construct the appropriate wrapper class - if "serverless" in spec_value: - # Deserialize the nested serverless dict to ServerlessSpecResponse - # (responses use ServerlessSpecResponse, not ServerlessSpec) - # First, handle nested read_capacity if present (it's also a oneOf with discriminator) - serverless_dict = dict(spec_value["serverless"]) - if "read_capacity" in serverless_dict and isinstance( - serverless_dict["read_capacity"], dict - ): - read_capacity_dict = serverless_dict["read_capacity"] - # Use discriminator to determine which ReadCapacity spec to use - mode = read_capacity_dict.get("mode") - if mode == "OnDemand": - read_capacity_spec = deserialize_model( - read_capacity_dict, - ReadCapacityOnDemandSpecResponse, - spec_path + ["serverless", "read_capacity"], - check_type=True, - configuration=config, - spec_property_naming=False, - ) - elif mode == "Dedicated": - read_capacity_spec = deserialize_model( - read_capacity_dict, - ReadCapacityDedicatedSpecResponse, - spec_path + ["serverless", "read_capacity"], - check_type=True, - configuration=config, - spec_property_naming=False, - ) - else: - # Fallback to ReadCapacityResponse (should use discriminator) - read_capacity_spec = deserialize_model( - read_capacity_dict, - ReadCapacityResponse, - spec_path + ["serverless", "read_capacity"], - check_type=True, - configuration=config, - spec_property_naming=False, - ) - serverless_dict["read_capacity"] = read_capacity_spec - - serverless_spec = deserialize_model( - serverless_dict, - ServerlessSpecResponse, - spec_path + ["serverless"], - check_type=True, - configuration=config, - spec_property_naming=False, - ) - # Instantiate Serverless wrapper, which IS the IndexSpec (oneOf union) - self._spec_cache = Serverless._new_from_openapi_data( - serverless=serverless_spec, - _check_type=True, - _path_to_item=spec_path, - _configuration=config, - _spec_property_naming=False, - ) - elif "pod" in spec_value: - # Deserialize the nested pod dict to PodSpec - pod_spec = deserialize_model( - spec_value["pod"], - PodSpec, - spec_path + ["pod"], - check_type=True, - configuration=config, - spec_property_naming=False, - ) - # Instantiate PodBased wrapper, which IS the IndexSpec (oneOf union) - self._spec_cache = PodBased._new_from_openapi_data( - pod=pod_spec, - _check_type=True, - _path_to_item=spec_path, - _configuration=config, - _spec_property_naming=False, - ) - elif "byoc" in spec_value: - # Deserialize the nested byoc dict to ByocSpec - byoc_spec = deserialize_model( - spec_value["byoc"], - ByocSpec, - spec_path + ["byoc"], - check_type=True, - configuration=config, - spec_property_naming=False, - ) - # Instantiate BYOC wrapper, which IS the IndexSpec (oneOf union) - self._spec_cache = BYOC._new_from_openapi_data( - byoc=byoc_spec, - _check_type=True, - _path_to_item=spec_path, - _configuration=config, - _spec_property_naming=False, - ) - else: - # Fallback: try deserialize_model (shouldn't happen with valid API responses) - self._spec_cache = deserialize_model( - spec_value, - IndexSpec, - spec_path, - check_type=True, - configuration=config, - spec_property_naming=False, - ) - elif spec_value is None: - self._spec_cache = None - else: - # Already an IndexSpec instance or some other object - self._spec_cache = spec_value - - return self._spec_cache - - def __getitem__(self, key): + deployment = self.index._data_store.get("deployment") + if deployment is not None: + self._spec_cache = CompatibilitySpec(deployment) + return self._spec_cache + + return None + + def _get_vector_field_info( + self, + ) -> tuple[int | None, str | None, Literal["dense", "sparse"] | None]: + """Extract dimension, metric, and vector_type from schema fields. + + Searches through the schema fields to find a vector field (dense_vector, + sparse_vector, or semantic_text) and extracts its properties. + + :returns: Tuple of (dimension, metric, vector_type). All values may be None + for FTS-only indexes. + """ + if self._vector_info_cache is not None: + return self._vector_info_cache + + schema = self.index._data_store.get("schema") + if schema is None: + result: tuple[int | None, str | None, Literal["dense", "sparse"] | None] = ( + None, + None, + None, + ) + self._vector_info_cache = result + return result + + fields = getattr(schema, "fields", None) + if fields is None: + result = (None, None, None) + self._vector_info_cache = result + return result + + # Look for vector fields in order of precedence + for field_config in fields.values(): + field_type = getattr(field_config, "type", None) + + if field_type == "dense_vector": + dimension: int | None = getattr(field_config, "dimension", None) + metric: str | None = getattr(field_config, "metric", None) + result = (dimension, metric, "dense") + self._vector_info_cache = result + return result + elif field_type == "sparse_vector": + metric = getattr(field_config, "metric", None) + result = (None, metric, "sparse") + self._vector_info_cache = result + return result + elif field_type == "semantic_text": + metric = getattr(field_config, "metric", None) + result = (None, metric, "dense") + self._vector_info_cache = result + return result + + # No vector fields found (FTS-only index) + result = (None, None, None) + self._vector_info_cache = result + return result + + def _get_dimension(self) -> int | None: + """Get the dimension of the index's vector field. + + Extracts dimension from schema.fields for dense vector fields. + + :returns: The dimension if a dense vector field exists, None otherwise. + """ + dimension, _, _ = self._get_vector_field_info() + return dimension + + def _get_metric(self) -> str | None: + """Get the metric of the index's vector field. + + Extracts metric from schema.fields for vector fields. + + :returns: The metric if a vector field exists, None otherwise. + """ + _, metric, _ = self._get_vector_field_info() + return metric + + def _get_vector_type(self) -> Literal["dense", "sparse"] | None: + """Get the vector type of the index. + + Derived from schema.fields based on the vector field type. + + :returns: "dense" for dense vectors, "sparse" for sparse vectors, None for FTS-only. + """ + _, _, vector_type = self._get_vector_field_info() + return vector_type + + def __getitem__(self, key: str) -> object: return self.__getattr__(key) - def __repr__(self): + def __repr__(self) -> str: return json.dumps(self.to_dict(), indent=4, default=custom_serializer) - def to_dict(self): - return self.index.to_dict() + def to_dict(self) -> dict[str, object]: + """Convert the IndexModel to a dictionary. + + :returns: Dictionary representation of the index. + """ + result: dict[str, object] = self.index.to_dict() + return result diff --git a/tests/unit/models/test_index_model.py b/tests/unit/models/test_index_model.py index 58d4288f..3e26ef8d 100644 --- a/tests/unit/models/test_index_model.py +++ b/tests/unit/models/test_index_model.py @@ -2,35 +2,282 @@ IndexModel as OpenApiIndexModel, IndexModelStatus, ) +from pinecone.core.openapi.db_control.model.schema import Schema +from pinecone.core.openapi.db_control.model.schema_fields import SchemaFields +from pinecone.core.openapi.db_control.model.deployment import Deployment from pinecone.db_control.models import IndexModel -from pinecone import CloudProvider, AwsRegion +from pinecone.db_control.models.compatibility_spec import ( + CompatibilitySpec, + ServerlessSpecCompat, + PodSpecCompat, + ByocSpecCompat, +) class TestIndexModel: - def test_index_model(self): + """Tests for IndexModel with the new alpha API format (2026-01.alpha).""" + + def test_index_model_serverless_deployment(self): + """Test IndexModel with serverless deployment in alpha format.""" + schema = Schema( + fields={ + "embedding": SchemaFields(type="dense_vector", dimension=1536, metric="cosine"), + "title": SchemaFields(type="string", filterable=True), + } + ) + deployment = Deployment(deployment_type="serverless", cloud="aws", region="us-east-1") openapi_model = OpenApiIndexModel( - name="test-index-1", - dimension=2, - metric="cosine", - host="https://test-index-1.pinecone.io", + name="test-alpha-index", + schema=schema, + deployment=deployment, + host="https://test-alpha-index.pinecone.io", status=IndexModelStatus(ready=True, state="Ready"), - deletion_protection="enabled", - spec={ - "serverless": { - "cloud": CloudProvider.AWS.value, - "region": AwsRegion.US_EAST_1.value, - } - }, ) wrapped = IndexModel(openapi_model) - assert wrapped.name == "test-index-1" - assert wrapped.dimension == 2 + # Test basic properties + assert wrapped.name == "test-alpha-index" + assert wrapped.host == "https://test-alpha-index.pinecone.io" + + # Test compatibility properties extracted from schema + assert wrapped.dimension == 1536 assert wrapped.metric == "cosine" - assert wrapped.host == "https://test-index-1.pinecone.io" - assert wrapped.status.ready == True - assert wrapped.status.state == "Ready" - assert wrapped.deletion_protection == "enabled" + assert wrapped.vector_type == "dense" + + # Test spec compatibility shim + assert wrapped.spec is not None + assert isinstance(wrapped.spec, CompatibilitySpec) + assert wrapped.spec.serverless is not None + assert isinstance(wrapped.spec.serverless, ServerlessSpecCompat) + assert wrapped.spec.serverless.cloud == "aws" + assert wrapped.spec.serverless.region == "us-east-1" + assert wrapped.spec.pod is None + assert wrapped.spec.byoc is None + + def test_index_model_pod_deployment(self): + """Test IndexModel with pod deployment in alpha format.""" + schema = Schema( + fields={ + "embedding": SchemaFields(type="dense_vector", dimension=768, metric="euclidean") + } + ) + deployment = Deployment( + deployment_type="pod", + environment="us-east-1-aws", + pod_type="p1.x1", + replicas=2, + shards=1, + pods=2, + ) + openapi_model = OpenApiIndexModel( + name="test-pod-index", + schema=schema, + deployment=deployment, + host="https://test-pod-index.pinecone.io", + status=IndexModelStatus(ready=True, state="Ready"), + ) + + wrapped = IndexModel(openapi_model) + + # Test compatibility properties + assert wrapped.dimension == 768 + assert wrapped.metric == "euclidean" + assert wrapped.vector_type == "dense" + + # Test spec compatibility shim + assert wrapped.spec is not None + assert wrapped.spec.pod is not None + assert isinstance(wrapped.spec.pod, PodSpecCompat) + assert wrapped.spec.pod.environment == "us-east-1-aws" + assert wrapped.spec.pod.pod_type == "p1.x1" + assert wrapped.spec.pod.replicas == 2 + assert wrapped.spec.pod.shards == 1 + assert wrapped.spec.pod.pods == 2 + assert wrapped.spec.serverless is None + assert wrapped.spec.byoc is None + + def test_index_model_byoc_deployment(self): + """Test IndexModel with BYOC deployment in alpha format.""" + schema = Schema( + fields={ + "embedding": SchemaFields(type="dense_vector", dimension=512, metric="dotproduct") + } + ) + deployment = Deployment(deployment_type="byoc", environment="aws-us-east-1-b92") + openapi_model = OpenApiIndexModel( + name="test-byoc-index", + schema=schema, + deployment=deployment, + host="https://test-byoc-index.pinecone.io", + status=IndexModelStatus(ready=True, state="Ready"), + ) + + wrapped = IndexModel(openapi_model) + + # Test compatibility properties + assert wrapped.dimension == 512 + assert wrapped.metric == "dotproduct" + assert wrapped.vector_type == "dense" + + # Test spec compatibility shim + assert wrapped.spec is not None + assert wrapped.spec.byoc is not None + assert isinstance(wrapped.spec.byoc, ByocSpecCompat) + assert wrapped.spec.byoc.environment == "aws-us-east-1-b92" + assert wrapped.spec.serverless is None + assert wrapped.spec.pod is None + + def test_index_model_sparse_vector(self): + """Test IndexModel with sparse vector field.""" + schema = Schema( + fields={"sparse_embedding": SchemaFields(type="sparse_vector", metric="dotproduct")} + ) + deployment = Deployment(deployment_type="serverless", cloud="aws", region="us-east-1") + openapi_model = OpenApiIndexModel( + name="test-sparse-index", + schema=schema, + deployment=deployment, + host="https://test-sparse-index.pinecone.io", + status=IndexModelStatus(ready=True, state="Ready"), + ) + + wrapped = IndexModel(openapi_model) + + # Sparse vectors don't have dimension + assert wrapped.dimension is None + assert wrapped.metric == "dotproduct" + assert wrapped.vector_type == "sparse" + + def test_index_model_fts_only(self): + """Test IndexModel with FTS-only index (no vector fields).""" + schema = Schema( + fields={ + "title": SchemaFields(type="string", full_text_searchable=True), + "content": SchemaFields(type="string", full_text_searchable=True), + "year": SchemaFields(type="integer", filterable=True), + } + ) + deployment = Deployment(deployment_type="serverless", cloud="aws", region="us-east-1") + openapi_model = OpenApiIndexModel( + name="test-fts-index", + schema=schema, + deployment=deployment, + host="https://test-fts-index.pinecone.io", + status=IndexModelStatus(ready=True, state="Ready"), + ) + + wrapped = IndexModel(openapi_model) + + # FTS-only indexes return None for vector properties + assert wrapped.dimension is None + assert wrapped.metric is None + assert wrapped.vector_type is None + + # spec still works + assert wrapped.spec is not None + assert wrapped.spec.serverless is not None + + def test_index_model_dict_access(self): + """Test dict-style access to IndexModel properties.""" + schema = Schema( + fields={"embedding": SchemaFields(type="dense_vector", dimension=1024, metric="cosine")} + ) + deployment = Deployment(deployment_type="serverless", cloud="gcp", region="us-central1") + openapi_model = OpenApiIndexModel( + name="test-dict-access", + schema=schema, + deployment=deployment, + host="https://test-dict-access.pinecone.io", + status=IndexModelStatus(ready=True, state="Ready"), + ) + + wrapped = IndexModel(openapi_model) + + # Test dict-style access + assert wrapped["name"] == "test-dict-access" + assert wrapped["dimension"] == 1024 + assert wrapped["metric"] == "cosine" + assert wrapped["vector_type"] == "dense" + + +class TestCompatibilitySpec: + """Direct tests for CompatibilitySpec class.""" + + def test_serverless_spec_compat(self): + """Test ServerlessSpecCompat dataclass.""" + spec = ServerlessSpecCompat(cloud="aws", region="us-west-2") + assert spec.cloud == "aws" + assert spec.region == "us-west-2" + + def test_pod_spec_compat(self): + """Test PodSpecCompat dataclass.""" + spec = PodSpecCompat( + environment="us-east-1-aws", + pod_type="s1.x2", + replicas=3, + shards=2, + pods=6, + metadata_config={"indexed": ["field1"]}, + source_collection="my-collection", + ) + assert spec.environment == "us-east-1-aws" + assert spec.pod_type == "s1.x2" + assert spec.replicas == 3 + assert spec.shards == 2 + assert spec.pods == 6 + assert spec.metadata_config == {"indexed": ["field1"]} + assert spec.source_collection == "my-collection" + + def test_byoc_spec_compat(self): + """Test ByocSpecCompat dataclass.""" + spec = ByocSpecCompat(environment="my-byoc-env") + assert spec.environment == "my-byoc-env" + + def test_compatibility_spec_serverless(self): + """Test CompatibilitySpec with serverless deployment.""" + + class MockDeployment: + deployment_type = "serverless" + cloud = "azure" + region = "eastus" + + compat = CompatibilitySpec(MockDeployment()) + assert compat.serverless is not None + assert compat.serverless.cloud == "azure" + assert compat.serverless.region == "eastus" + assert compat.pod is None + assert compat.byoc is None + + def test_compatibility_spec_pod(self): + """Test CompatibilitySpec with pod deployment.""" + + class MockDeployment: + deployment_type = "pod" + environment = "us-west-2-aws" + pod_type = "p2.x1" + replicas = 1 + shards = 1 + pods = 1 + metadata_config = None + source_collection = None + + compat = CompatibilitySpec(MockDeployment()) + assert compat.pod is not None + assert compat.pod.environment == "us-west-2-aws" + assert compat.pod.pod_type == "p2.x1" + assert compat.serverless is None + assert compat.byoc is None + + def test_compatibility_spec_byoc(self): + """Test CompatibilitySpec with BYOC deployment.""" + + class MockDeployment: + deployment_type = "byoc" + environment = "custom-env" - assert wrapped["name"] == "test-index-1" + compat = CompatibilitySpec(MockDeployment()) + assert compat.byoc is not None + assert compat.byoc.environment == "custom-env" + assert compat.serverless is None + assert compat.pod is None