@validate_and_convert_errors
def upsert_documents(self, namespace: str, documents: list[dict[str, Any]]) -> UpsertResponse:
    """Upsert documents into a namespace.

    This operation upserts flat JSON documents into a namespace. Documents are indexed
    based on the configured index schema. Each document must have an ``_id`` field.

    Args:
        namespace: The namespace to upsert documents into.
        documents: A list of flat JSON documents to upsert. Each document must have an
            ``_id`` field and fields that match the index's schema configuration.

    Returns:
        UpsertResponse: Object containing the number of documents upserted. If the API
        result carries no ``upserted_count``, the number of documents submitted is
        reported instead.

    Raises:
        ValueError: If ``namespace`` is ``None`` or ``documents`` is empty.

    Examples:

    .. code-block:: python

        from pinecone import Pinecone

        pc = Pinecone()
        index = pc.Index(host="example-index-host")

        # Upsert documents with pre-computed vectors
        index.upsert_documents(
            namespace="movies",
            documents=[
                {
                    "_id": "movie-1",
                    "title": "Return of the Pink Panther",
                    "year": 1986,
                    "genre": "comedy",
                    "embedding": [0.1, 0.2, 0.3, ...]  # matches schema field name
                },
                {
                    "_id": "movie-2",
                    "title": "The Pink Panther Strikes Again",
                    "year": 1976,
                    "genre": "comedy",
                    "embedding": [0.3, 0.4, 0.5, ...]
                }
            ]
        )

    """
    # Validate before touching the API so bad input never reaches the network.
    if namespace is None:
        raise ValueError("Namespace is required when upserting documents")
    if not documents:
        raise ValueError("At least one document is required")

    # Imported at call time rather than at module import time.
    from pinecone.core.openapi.db_data.model.document_upsert_request import (
        DocumentUpsertRequest,
    )

    request = DocumentUpsertRequest(value=documents)
    result = self.document_api.upsert_documents(namespace, request)

    from pinecone.utils.response_info import extract_response_info

    # A missing attribute and an explicit None are treated alike: fall back to
    # response info built from an empty header mapping.
    response_info = getattr(result, "_response_info", None)
    if response_info is None:
        response_info = extract_response_info({})

    # The result may omit the count; report the number of documents sent instead.
    upserted_count = getattr(result, "upserted_count", None)
    if upserted_count is None:
        upserted_count = len(documents)

    return UpsertResponse(upserted_count=upserted_count, _response_info=response_info)
@validate_and_convert_errors
async def upsert_documents(
    self, namespace: str, documents: list[dict[str, Any]]
) -> UpsertResponse:
    """Upsert documents into a namespace.

    This operation upserts flat JSON documents into a namespace. Documents are indexed
    based on the configured index schema. Each document must have an ``_id`` field.

    Args:
        namespace: The namespace to upsert documents into.
        documents: A list of flat JSON documents to upsert. Each document must have an
            ``_id`` field and fields that match the index's schema configuration.

    Returns:
        UpsertResponse: Object containing the number of documents upserted. If the API
        result carries no ``upserted_count``, the number of documents submitted is
        reported instead.

    Raises:
        ValueError: If ``namespace`` is ``None`` or ``documents`` is empty.

    Examples:

    .. code-block:: python

        import asyncio
        from pinecone import Pinecone

        async def main():
            pc = Pinecone()
            async with pc.IndexAsyncio(host="example-index-host") as index:
                # Upsert documents with pre-computed vectors
                await index.upsert_documents(
                    namespace="movies",
                    documents=[
                        {
                            "_id": "movie-1",
                            "title": "Return of the Pink Panther",
                            "year": 1986,
                            "genre": "comedy",
                            "embedding": [0.1, 0.2, 0.3, ...]  # matches schema field name
                        },
                        {
                            "_id": "movie-2",
                            "title": "The Pink Panther Strikes Again",
                            "year": 1976,
                            "genre": "comedy",
                            "embedding": [0.3, 0.4, 0.5, ...]
                        }
                    ]
                )

        asyncio.run(main())

    """
    # Validate before touching the API so bad input never reaches the network.
    if namespace is None:
        raise ValueError("Namespace is required when upserting documents")
    if not documents:
        raise ValueError("At least one document is required")

    # Imported at call time rather than at module import time.
    from pinecone.core.openapi.db_data.model.document_upsert_request import (
        DocumentUpsertRequest,
    )

    request = DocumentUpsertRequest(value=documents)
    result = await self.document_api.upsert_documents(namespace, request)

    from pinecone.utils.response_info import extract_response_info

    # A missing attribute and an explicit None are treated alike: fall back to
    # response info built from an empty header mapping.
    response_info = getattr(result, "_response_info", None)
    if response_info is None:
        response_info = extract_response_info({})

    # The result may omit the count; report the number of documents sent instead.
    upserted_count = getattr(result, "upserted_count", None)
    if upserted_count is None:
        upserted_count = len(documents)

    return UpsertResponse(upserted_count=upserted_count, _response_info=response_info)
@abstractmethod
async def upsert_documents(
    self, namespace: str, documents: list[dict[str, Any]]
) -> UpsertResponse:
    """Upsert documents into a namespace.

    This operation upserts flat JSON documents into a namespace. Documents are indexed
    based on the configured index schema. Each document must have an ``_id`` field.

    Args:
        namespace: The namespace to upsert documents into.
        documents: A list of flat JSON documents to upsert. Each document must have an
            ``_id`` field and fields that match the index's schema configuration.

    Returns:
        UpsertResponse: Object containing the number of documents upserted.

    Raises:
        ValueError: If ``namespace`` is ``None`` or ``documents`` is empty.

    Examples:

    .. code-block:: python

        import asyncio
        from pinecone import Pinecone

        async def main():
            pc = Pinecone()
            async with pc.IndexAsyncio(host="example-index-host") as index:
                # Upsert documents with pre-computed vectors
                await index.upsert_documents(
                    namespace="movies",
                    documents=[
                        {
                            "_id": "movie-1",
                            "title": "Return of the Pink Panther",
                            "year": 1986,
                            "genre": "comedy",
                            "embedding": [0.1, 0.2, 0.3, ...]  # matches schema field name
                        },
                        {
                            "_id": "movie-2",
                            "title": "The Pink Panther Strikes Again",
                            "year": 1976,
                            "genre": "comedy",
                            "embedding": [0.3, 0.4, 0.5, ...]
                        }
                    ]
                )

        asyncio.run(main())

    """
    pass
@abstractmethod
def upsert_documents(self, namespace: str, documents: list[dict[str, Any]]) -> UpsertResponse:
    """Write a batch of flat JSON documents into ``namespace``.

    Each document is indexed according to the schema configured for the index and
    must carry an ``_id`` field.

    Args:
        namespace: Target namespace for the upsert.
        documents: Flat JSON documents to write. Every entry needs an ``_id`` plus
            fields matching the index's schema configuration.

    Returns:
        UpsertResponse: Carries the number of documents upserted.

    Examples:

    .. code-block:: python

        from pinecone import Pinecone

        pc = Pinecone()
        index = pc.Index(host="example-index-host")

        # Upsert documents with pre-computed vectors
        index.upsert_documents(
            namespace="movies",
            documents=[
                {
                    "_id": "movie-1",
                    "title": "Return of the Pink Panther",
                    "year": 1986,
                    "genre": "comedy",
                    "embedding": [0.1, 0.2, 0.3, ...]  # matches schema field name
                },
                {
                    "_id": "movie-2",
                    "title": "The Pink Panther Strikes Again",
                    "year": 1976,
                    "genre": "comedy",
                    "embedding": [0.3, 0.4, 0.5, ...]
                }
            ]
        )

    """
    pass
"""Tests for upsert_documents functionality."""

import pytest
from unittest.mock import MagicMock, patch

from pinecone.db_data.dataclasses import UpsertResponse
from pinecone.core.openapi.db_data.model.document_upsert_request import DocumentUpsertRequest


class TestDocumentUpsertRequest:
    """Construction checks for the ``DocumentUpsertRequest`` model."""

    def test_request_creation_with_documents(self):
        """A request built from several documents exposes them through ``value``."""
        payload = [
            {"_id": "doc1", "title": "Test Title", "embedding": [0.1, 0.2, 0.3]},
            {"_id": "doc2", "title": "Another Title", "embedding": [0.4, 0.5, 0.6]},
        ]
        built = DocumentUpsertRequest(value=payload)
        assert built.value == payload

    def test_request_with_single_document(self):
        """A one-element document list is kept as a one-element ``value``."""
        only_doc = {"_id": "doc1", "title": "Test", "embedding": [0.1, 0.2]}
        built = DocumentUpsertRequest(value=[only_doc])
        assert len(built.value) == 1
        assert built.value[0]["_id"] == "doc1"

    def test_request_with_various_field_types(self):
        """Integer, float, boolean and list fields are readable back from ``value``."""
        mixed_doc = {
            "_id": "doc1",
            "title": "Test",
            "year": 2020,
            "rating": 8.5,
            "active": True,
            "tags": ["action", "comedy"],
            "embedding": [0.1, 0.2, 0.3],
        }
        stored = DocumentUpsertRequest(value=[mixed_doc]).value[0]
        assert stored["year"] == 2020
        assert stored["rating"] == 8.5
        assert stored["active"] is True
        assert stored["tags"] == ["action", "comedy"]


class TestUpsertDocumentsValidation:
    """Argument validation performed by ``Index.upsert_documents``."""

    def test_namespace_required(self):
        """A ``None`` namespace is rejected with ``ValueError``."""
        from pinecone.db_data.index import Index

        def skip_init(self, *args, **kwargs):
            return None

        # Build an Index without running its real constructor.
        with patch.object(Index, "__init__", skip_init):
            bare_index = Index.__new__(Index)
            bare_index._document_api = None

            with pytest.raises(ValueError, match="Namespace is required"):
                bare_index.upsert_documents(namespace=None, documents=[{"_id": "1"}])

    def test_documents_required(self):
        """An empty document list is rejected with ``ValueError``."""
        from pinecone.db_data.index import Index

        def skip_init(self, *args, **kwargs):
            return None

        # Build an Index without running its real constructor.
        with patch.object(Index, "__init__", skip_init):
            bare_index = Index.__new__(Index)
            bare_index._document_api = None

            with pytest.raises(ValueError, match="At least one document is required"):
                bare_index.upsert_documents(namespace="test", documents=[])


class TestUpsertDocumentsResponse:
    """Field access on the ``UpsertResponse`` returned by upsert_documents."""

    def test_upsert_response_with_count(self):
        """``upserted_count`` passed to the constructor is readable as an attribute."""
        from pinecone.utils.response_info import extract_response_info

        info = extract_response_info({})
        built = UpsertResponse(upserted_count=5, _response_info=info)
        assert built.upserted_count == 5

    def test_upsert_response_access(self):
        """``upserted_count`` is reachable both as an attribute and by key."""
        from pinecone.utils.response_info import extract_response_info

        info = extract_response_info({})
        built = UpsertResponse(upserted_count=10, _response_info=info)
        assert built.upserted_count == 10
        assert built["upserted_count"] == 10
class TestUpsertDocumentsIntegration:
    """Integration-style tests for upsert_documents with mocked API."""

    def test_upsert_documents_calls_api(self):
        """upsert_documents forwards the namespace and a request wrapping the documents."""
        from pinecone.db_data.index import Index

        documents = [{"_id": "doc1", "title": "Test 1"}, {"_id": "doc2", "title": "Test 2"}]

        with patch.object(Index, "__init__", lambda self, *args, **kwargs: None):
            index = Index.__new__(Index)

            # Mock the document_api
            mock_response = MagicMock()
            mock_response.upserted_count = 2
            mock_response._response_info = None
            mock_api = MagicMock()
            mock_api.upsert_documents.return_value = mock_response
            index._document_api = mock_api

            result = index.upsert_documents(namespace="test-namespace", documents=documents)

            # Exactly one call, made with two positional arguments.
            mock_api.upsert_documents.assert_called_once()
            namespace_arg, request_arg = mock_api.upsert_documents.call_args.args
            assert namespace_arg == "test-namespace"
            assert isinstance(request_arg, DocumentUpsertRequest)
            # The request must carry the caller's documents, not just have the right type.
            assert request_arg.value == documents

            # The server-reported count is passed through.
            assert result.upserted_count == 2

    def test_upsert_documents_uses_document_count_as_fallback(self):
        """Test fallback to document count when server doesn't return count."""
        from pinecone.db_data.index import Index

        documents = [
            {"_id": "1", "text": "a"},
            {"_id": "2", "text": "b"},
            {"_id": "3", "text": "c"},
        ]

        with patch.object(Index, "__init__", lambda self, *args, **kwargs: None):
            index = Index.__new__(Index)

            # Mock the document_api with no upserted_count
            mock_response = MagicMock()
            mock_response.upserted_count = None
            mock_response._response_info = None
            mock_api = MagicMock()
            mock_api.upsert_documents.return_value = mock_response
            index._document_api = mock_api

            result = index.upsert_documents(namespace="test", documents=documents)

            mock_api.upsert_documents.assert_called_once()
            # Should fall back to document count
            assert result.upserted_count == 3
class TestUpsertDocumentsAsyncio:
    """Tests for async upsert_documents."""

    @pytest.mark.asyncio
    async def test_async_upsert_documents_calls_api(self):
        """Async upsert_documents awaits the API once with the namespace and request."""
        from unittest.mock import AsyncMock

        from pinecone.db_data.index_asyncio import _IndexAsyncio

        documents = [{"_id": "doc1", "title": "Test 1"}, {"_id": "doc2", "title": "Test 2"}]

        with patch.object(_IndexAsyncio, "__init__", lambda self, *args, **kwargs: None):
            index = _IndexAsyncio.__new__(_IndexAsyncio)

            mock_response = MagicMock()
            mock_response.upserted_count = 2
            mock_response._response_info = None

            # AsyncMock makes the call awaitable and records how it was awaited.
            mock_api = MagicMock()
            mock_api.upsert_documents = AsyncMock(return_value=mock_response)
            index._document_api = mock_api

            result = await index.upsert_documents(
                namespace="test-namespace", documents=documents
            )

            mock_api.upsert_documents.assert_awaited_once()
            namespace_arg, request_arg = mock_api.upsert_documents.await_args.args
            assert namespace_arg == "test-namespace"
            assert isinstance(request_arg, DocumentUpsertRequest)
            assert request_arg.value == documents

            # The server-reported count is passed through.
            assert result.upserted_count == 2

    @pytest.mark.asyncio
    async def test_async_upsert_documents_uses_document_count_as_fallback(self):
        """Async path falls back to the document count when no count is returned."""
        from unittest.mock import AsyncMock

        from pinecone.db_data.index_asyncio import _IndexAsyncio

        documents = [
            {"_id": "1", "text": "a"},
            {"_id": "2", "text": "b"},
            {"_id": "3", "text": "c"},
        ]

        with patch.object(_IndexAsyncio, "__init__", lambda self, *args, **kwargs: None):
            index = _IndexAsyncio.__new__(_IndexAsyncio)

            mock_response = MagicMock()
            mock_response.upserted_count = None
            mock_response._response_info = None

            mock_api = MagicMock()
            mock_api.upsert_documents = AsyncMock(return_value=mock_response)
            index._document_api = mock_api

            result = await index.upsert_documents(namespace="test", documents=documents)

            assert result.upserted_count == 3

    @pytest.mark.asyncio
    async def test_async_namespace_required(self):
        """Test that namespace is required for async method."""
        from pinecone.db_data.index_asyncio import _IndexAsyncio

        with patch.object(_IndexAsyncio, "__init__", lambda self, *args, **kwargs: None):
            index = _IndexAsyncio.__new__(_IndexAsyncio)
            index._document_api = None

            with pytest.raises(ValueError, match="Namespace is required"):
                await index.upsert_documents(namespace=None, documents=[{"_id": "1"}])

    @pytest.mark.asyncio
    async def test_async_documents_required(self):
        """Test that documents list is required for async method."""
        from pinecone.db_data.index_asyncio import _IndexAsyncio

        with patch.object(_IndexAsyncio, "__init__", lambda self, *args, **kwargs: None):
            index = _IndexAsyncio.__new__(_IndexAsyncio)
            index._document_api = None

            with pytest.raises(ValueError, match="At least one document is required"):
                await index.upsert_documents(namespace="test", documents=[])