From 78c4640a1b00568b80dd80a80b7ad3aefc9e8f2e Mon Sep 17 00:00:00 2001 From: cauriol Date: Wed, 28 Jan 2026 17:41:06 +0100 Subject: [PATCH 01/26] feat: add zarr store --- .../eodag/extensions/data_download.py | 217 ++++++++++++++++-- 1 file changed, 199 insertions(+), 18 deletions(-) diff --git a/stac_fastapi/eodag/extensions/data_download.py b/stac_fastapi/eodag/extensions/data_download.py index e585f1dd..0e13ebaa 100644 --- a/stac_fastapi/eodag/extensions/data_download.py +++ b/stac_fastapi/eodag/extensions/data_download.py @@ -19,8 +19,10 @@ import glob import logging +import mimetypes import os from io import BufferedReader +from pathlib import Path from shutil import make_archive, rmtree from typing import Annotated, Iterator, Optional, Union, cast @@ -29,8 +31,8 @@ from eodag.api.product._product import EOProduct from eodag.api.product.metadata_mapping import ONLINE_STATUS, STAGING_STATUS, get_metadata_path_value from eodag.utils.exceptions import EodagError -from fastapi import APIRouter, FastAPI, Path, Request -from fastapi.responses import RedirectResponse, StreamingResponse +from fastapi import APIRouter, FastAPI, Path as PathParam, Request +from fastapi.responses import JSONResponse, RedirectResponse, StreamingResponse from stac_fastapi.api.errors import NotFoundError from stac_fastapi.api.routes import create_async_endpoint from stac_fastapi.types.extension import ApiExtension @@ -82,17 +84,81 @@ def _file_to_stream( }, ) + def _read_file_chunks(self, opened_file: BufferedReader, chunk_size: int = 64 * 1024) -> Iterator[bytes]: + """Yield file chunks without deleting.""" + try: + while True: + data = opened_file.read(chunk_size) + if not data: + break + yield data + finally: + opened_file.close() + def _read_file_chunks_and_delete(self, opened_file: BufferedReader, chunk_size: int = 64 * 1024) -> Iterator[bytes]: """Yield file chunks and delete file when finished.""" - while True: - data = opened_file.read(chunk_size) - if not data: - 
opened_file.close() - os.remove(opened_file.name) - logger.debug("%s deleted after streaming complete", opened_file.name) - break - yield data - yield data + try: + while True: + data = opened_file.read(chunk_size) + if not data: + break + yield data + finally: + opened_file.close() + os.remove(opened_file.name) + logger.debug("%s deleted after streaming complete", opened_file.name) + + def _list_zarr_files( + self, + zarr_store_path: str, + federation_backend: str, + collection_id: str, + item_id: str, + ) -> dict: + """List all files in a Zarr store.""" + files = [] + store_path = Path(zarr_store_path).resolve() + + try: + for file_path in store_path.rglob("*"): + if file_path.is_file(): + # Get relative path from store root + rel_path = file_path.relative_to(store_path) + files.append( + { + "path": str(rel_path), + "size": file_path.stat().st_size, + "url": f"/data/{federation_backend}/{collection_id}/{item_id}/{rel_path}", + } + ) + + # Sort files by path + files.sort(key=lambda x: x["path"]) + + return { + "type": "zarr-file-index", + "item_id": item_id, + "collection_id": collection_id, + "backend": federation_backend, + "store_path": str(store_path), + "file_count": len(files), + "files": files, + } + except Exception as e: + logger.error(f"Failed to list Zarr files for {item_id}: {e}") + raise + + def get_data_with_file( + self, + federation_backend: str, + collection_id: str, + item_id: str, + asset_name: Optional[str], + request: Request, + file_path: str, + ) -> Union[StreamingResponse, RedirectResponse, JSONResponse]: + """Download data with file path (wrapper for get_data).""" + return self.get_data(federation_backend, collection_id, item_id, asset_name, request, file_path) def get_data( self, @@ -101,7 +167,8 @@ def get_data( item_id: str, asset_name: Optional[str], request: Request, - ) -> Union[StreamingResponse, RedirectResponse]: + file_path: Optional[str] = None, + ) -> Union[StreamingResponse, RedirectResponse, JSONResponse]: """Download an 
asset""" dag = cast(EODataAccessGateway, request.app.state.dag) # type: ignore @@ -186,7 +253,7 @@ def get_data( raise NotFoundError(f"Item {item_id} does not exist. Please order it first") from e raise NotFoundError(e) from e - if product.downloader_auth and asset_name and asset_name != "downloadLink": + if product.downloader_auth and asset_name and asset_name not in ["downloadLink", "zarr"]: asset_values = product.assets[asset_name] # return presigned url if available try: @@ -197,6 +264,93 @@ def get_data( except EodagError: logger.info("Presigned url could not be fetched for %s", asset_name) + # Handle zarr store - return file listing or individual file + if asset_name == "zarr": + logger.debug("Accessing Zarr store for product %s", item_id) + try: + # Check if zarr asset exists + if "zarr" not in product.assets: + logger.error(f"No zarr asset found for product {item_id}") + logger.error(f"Available assets: {list(product.assets.keys())}") + raise NotFoundError(f"No zarr asset found for product '{item_id}'. 
Available assets: {list(product.assets.keys())}") + + logger.debug(f"Downloading zarr store for product {item_id}") + zarr_path = dag.download(product, extract=False, asset="zarr") + logger.debug(f"Zarr store downloaded to: {zarr_path}") + + if not Path(zarr_path).exists(): + logger.error(f"Zarr store path does not exist: {zarr_path}") + raise NotFoundError(f"Zarr store not found at {zarr_path}") + + # If file_path is provided, handle it specially + if file_path: + # Check if requesting the index/listing + if file_path == "index": + logger.debug(f"Returning Zarr index for product {item_id}") + zarr_index = self._list_zarr_files(zarr_path, federation_backend, collection_id, item_id) + return JSONResponse(content=zarr_index) + + # Otherwise, serve the requested file + logger.debug(f"Retrieving zarr file: {file_path}") + file_full_path = Path(zarr_path) / file_path + + # Security check: ensure path doesn't escape zarr store + try: + file_full_path.resolve().relative_to(Path(zarr_path).resolve()) + except ValueError: + logger.error(f"Path traversal attempt detected: {file_path}") + raise NotFoundError(f"Invalid file path: {file_path}") + + if not file_full_path.exists(): + logger.error(f"File not found: {file_full_path}") + raise NotFoundError(f"File not found: {file_path}") + + if not file_full_path.is_file(): + logger.error(f"Path is not a file: {file_full_path}") + raise NotFoundError(f"Path is not a file: {file_path}") + + # Stream the file + filename = os.path.basename(str(file_full_path)) + + # Determine content type and whether to download or display inline + content_type, _ = mimetypes.guess_type(str(file_full_path)) + if content_type is None: + content_type = "application/octet-stream" + + # For text-based formats (JSON, XML, etc), display inline in browser + # For binary formats (zarr, etc), force download + headers = { + "cache-control": "public, max-age=86400", + } + + if content_type.startswith("text/") or content_type in ["application/json", 
"application/xml"]: + # Display inline + headers["content-type"] = content_type + else: + # Force download + headers["content-disposition"] = f"attachment; filename={filename}" + headers["content-type"] = content_type + + return StreamingResponse( + content=self._read_file_chunks(open(str(file_full_path), "rb")), + headers=headers, + ) + + # If no file_path, try presigned URL or error + logger.debug(f"Attempting to get presigned URL for zarr asset") + try: + asset_values = product.assets["zarr"] + presigned_url = product.downloader_auth.presign_url(asset_values) + return RedirectResponse(presigned_url, status_code=302) + except (NotImplementedError, AttributeError): + logger.info("Presigned URLs not supported for zarr asset") + raise NotFoundError(f"Use /data/{federation_backend}/{collection_id}/{item_id}/zarr/index to list files or access individual files directly") + except NotFoundError: + raise + except Exception as e: + logger.error(f"Failed to access Zarr store: {type(e).__name__}: {e}", exc_info=True) + raise NotFoundError(f"Failed to retrieve Zarr store for item '{item_id}': {str(e)}") from e + try: s = product.downloader.stream_download( product, @@ -218,12 +372,23 @@ def get_data( @attr.s class DataDownloadUri(APIRequest): - """Download data.""" + """Download data without file path.""" + + federation_backend: Annotated[str, PathParam(description="Federation backend name")] = attr.ib() + collection_id: Annotated[str, PathParam(description="Collection ID")] = attr.ib() + item_id: Annotated[str, PathParam(description="Item ID")] = attr.ib() + asset_name: Annotated[str, PathParam(description="Asset name (e.g., 'zarr')")] = attr.ib() + - federation_backend: Annotated[str, Path(description="Federation backend name")] = attr.ib() - collection_id: Annotated[str, Path(description="Collection ID")] = attr.ib() - item_id: Annotated[str, Path(description="Item ID")] = attr.ib() - asset_name: Annotated[str, Path(description="Item ID")] = attr.ib() +@attr.s +class 
DataDownloadUriWithFile(APIRequest): + """Download data with file path.""" + + federation_backend: Annotated[str, PathParam(description="Federation backend name")] = attr.ib() + collection_id: Annotated[str, PathParam(description="Collection ID")] = attr.ib() + item_id: Annotated[str, PathParam(description="Item ID")] = attr.ib() + asset_name: Annotated[str, PathParam(description="Asset name (e.g., 'zarr')")] = attr.ib() + file_path: Annotated[str, PathParam(description="File path within zarr store")] = attr.ib() @attr.s @@ -250,6 +415,7 @@ def register(self, app: FastAPI) -> None: :returns: None """ self.router.prefix = app.state.router_prefix + # Route for /data/{backend}/{collection}/{item}/{asset_name} self.router.add_api_route( name="Download data", path="/data/{federation_backend}/{collection_id}/{item_id}/{asset_name}", @@ -263,4 +429,19 @@ def register(self, app: FastAPI) -> None: }, endpoint=create_async_endpoint(self.client.get_data, DataDownloadUri), ) + + # Route for /data/{backend}/{collection}/{item}/{asset_name}/{file_path} + self.router.add_api_route( + name="Download data with file path", + path="/data/{federation_backend}/{collection_id}/{item_id}/{asset_name}/{file_path:path}", + methods=["GET"], + responses={ + 200: { + "content": { + "application/octet-stream": {}, + }, + } + }, + endpoint=create_async_endpoint(self.client.get_data_with_file, DataDownloadUriWithFile), + ) app.include_router(self.router, tags=["Data download"]) From 354b7cf247a99f9f0d6dee70f91b7ad54d81a292 Mon Sep 17 00:00:00 2001 From: cauriol Date: Thu, 29 Jan 2026 16:00:50 +0100 Subject: [PATCH 02/26] fix: clean --- .../eodag/extensions/data_download.py | 60 ++++++++----------- 1 file changed, 25 insertions(+), 35 deletions(-) diff --git a/stac_fastapi/eodag/extensions/data_download.py b/stac_fastapi/eodag/extensions/data_download.py index 0e13ebaa..6e26f200 100644 --- a/stac_fastapi/eodag/extensions/data_download.py +++ b/stac_fastapi/eodag/extensions/data_download.py @@ 
-22,7 +22,7 @@ import mimetypes import os from io import BufferedReader -from pathlib import Path +from pathlib import Path as FilePath from shutil import make_archive, rmtree from typing import Annotated, Iterator, Optional, Union, cast @@ -31,7 +31,7 @@ from eodag.api.product._product import EOProduct from eodag.api.product.metadata_mapping import ONLINE_STATUS, STAGING_STATUS, get_metadata_path_value from eodag.utils.exceptions import EodagError -from fastapi import APIRouter, FastAPI, Path as PathParam, Request +from fastapi import APIRouter, FastAPI, Path, Request from fastapi.responses import JSONResponse, RedirectResponse, StreamingResponse from stac_fastapi.api.errors import NotFoundError from stac_fastapi.api.routes import create_async_endpoint @@ -97,16 +97,15 @@ def _read_file_chunks(self, opened_file: BufferedReader, chunk_size: int = 64 * def _read_file_chunks_and_delete(self, opened_file: BufferedReader, chunk_size: int = 64 * 1024) -> Iterator[bytes]: """Yield file chunks and delete file when finished.""" - try: - while True: - data = opened_file.read(chunk_size) - if not data: - break - yield data - finally: - opened_file.close() - os.remove(opened_file.name) - logger.debug("%s deleted after streaming complete", opened_file.name) + while True: + data = opened_file.read(chunk_size) + if not data: + opened_file.close() + os.remove(opened_file.name) + logger.debug("%s deleted after streaming complete", opened_file.name) + break + yield data + yield data def _list_zarr_files( self, @@ -117,7 +116,7 @@ def _list_zarr_files( ) -> dict: """List all files in a Zarr store.""" files = [] - store_path = Path(zarr_store_path).resolve() + store_path = FilePath(zarr_store_path).resolve() try: for file_path in store_path.rglob("*"): @@ -278,7 +277,7 @@ def get_data( zarr_path = dag.download(product, extract=False, asset="zarr") logger.debug(f"Zarr store downloaded to: {zarr_path}") - if not Path(zarr_path).exists(): + if not FilePath(zarr_path).exists(): 
logger.error(f"Zarr store path does not exist: {zarr_path}") raise NotFoundError(f"Zarr store not found at {zarr_path}") @@ -292,11 +291,11 @@ def get_data( # Otherwise, serve the requested file logger.debug(f"Retrieving zarr file: {file_path}") - file_full_path = Path(zarr_path) / file_path + file_full_path = FilePath(zarr_path) / file_path # Security check: ensure path doesn't escape zarr store try: - file_full_path.resolve().relative_to(Path(zarr_path).resolve()) + file_full_path.resolve().relative_to(FilePath(zarr_path).resolve()) except ValueError: logger.error(f"Path traversal attempt detected: {file_path}") raise NotFoundError(f"Invalid file path: {file_path}") @@ -336,15 +335,6 @@ def get_data( headers=headers, ) - # If no file_path, try presigned URL or error - logger.debug(f"Attempting to get presigned URL for zarr asset") - try: - asset_values = product.assets["zarr"] - presigned_url = product.downloader_auth.presign_url(asset_values) - return RedirectResponse(presigned_url, status_code=302) - except (NotImplementedError, AttributeError): - logger.info("Presigned URLs not supported for zarr asset") - raise NotFoundError(f"Use /data/{federation_backend}/{collection_id}/{item_id}/zarr/index to list files or access individual files directly") except NotFoundError: raise except Exception as e: @@ -372,23 +362,23 @@ def get_data( @attr.s class DataDownloadUri(APIRequest): - """Download data without file path.""" + """Download data.""" - federation_backend: Annotated[str, PathParam(description="Federation backend name")] = attr.ib() - collection_id: Annotated[str, PathParam(description="Collection ID")] = attr.ib() - item_id: Annotated[str, PathParam(description="Item ID")] = attr.ib() - asset_name: Annotated[str, PathParam(description="Asset name (e.g., 'zarr')")] = attr.ib() + federation_backend: Annotated[str, Path(description="Federation backend name")] = attr.ib() + collection_id: Annotated[str, Path(description="Collection ID")] = attr.ib() + item_id: 
Annotated[str, Path(description="Item ID")] = attr.ib() + asset_name: Annotated[str, Path(description="Item ID")] = attr.ib() @attr.s class DataDownloadUriWithFile(APIRequest): """Download data with file path.""" - federation_backend: Annotated[str, PathParam(description="Federation backend name")] = attr.ib() - collection_id: Annotated[str, PathParam(description="Collection ID")] = attr.ib() - item_id: Annotated[str, PathParam(description="Item ID")] = attr.ib() - asset_name: Annotated[str, PathParam(description="Asset name (e.g., 'zarr')")] = attr.ib() - file_path: Annotated[str, PathParam(description="File path within zarr store")] = attr.ib() + federation_backend: Annotated[str, Path(description="Federation backend name")] = attr.ib() + collection_id: Annotated[str, Path(description="Collection ID")] = attr.ib() + item_id: Annotated[str, Path(description="Item ID")] = attr.ib() + asset_name: Annotated[str, Path(description="Asset name")] = attr.ib() + file_path: Annotated[str, Path(description="File path within zarr store")] = attr.ib() @attr.s From baee4d92fc0a3715d24def96c1303d17c110bdc9 Mon Sep 17 00:00:00 2001 From: cauriol Date: Mon, 2 Feb 2026 17:31:49 +0100 Subject: [PATCH 03/26] fix: add return for the /zarr endpoint --- stac_fastapi/eodag/extensions/data_download.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/stac_fastapi/eodag/extensions/data_download.py b/stac_fastapi/eodag/extensions/data_download.py index 6e26f200..5ddf25ee 100644 --- a/stac_fastapi/eodag/extensions/data_download.py +++ b/stac_fastapi/eodag/extensions/data_download.py @@ -334,7 +334,13 @@ def get_data( content=self._read_file_chunks(open(str(file_full_path), "rb")), headers=headers, ) - + try: + asset_values = product.assets["zarr"] + presigned_url = product.downloader_auth.presign_url(asset_values) + return RedirectResponse(presigned_url, status_code=302) + except (NotImplementedError, AttributeError): + logger.info("Presigned URLs not supported for 
zarr asset") + raise NotFoundError(f"Use /data/{federation_backend}/{collection_id}/{item_id}/zarr/index to list files or access individual files directly") except NotFoundError: raise except Exception as e: From 31ead61d9a8e1975a21548a9da78cec9ee65a360 Mon Sep 17 00:00:00 2001 From: cauriol Date: Wed, 4 Feb 2026 17:44:27 +0100 Subject: [PATCH 04/26] fix: add some tests --- stac_fastapi/eodag/models/item.py | 7 ++ tests/test_zarr.py | 119 ++++++++++++++++++++++++++++++ 2 files changed, 126 insertions(+) create mode 100644 tests/test_zarr.py diff --git a/stac_fastapi/eodag/models/item.py b/stac_fastapi/eodag/models/item.py index 1f1eb5c7..4d556748 100644 --- a/stac_fastapi/eodag/models/item.py +++ b/stac_fastapi/eodag/models/item.py @@ -149,6 +149,13 @@ def create_stac_item( "type": mime_type, }, } + if "zarr" in product.assets and asset_proxy_url: + feature["assets"]["Zarr index"] = { + "title": "Download link", + "href": asset_proxy_url + "/zarr/index", + # TODO: download link is not always a ZIP archive + "type": mime_type, + } feature_model = model.model_validate( { diff --git a/tests/test_zarr.py b/tests/test_zarr.py new file mode 100644 index 00000000..376610ed --- /dev/null +++ b/tests/test_zarr.py @@ -0,0 +1,119 @@ +# -*- coding: utf-8 -*- +# Copyright 2025, CS GROUP - France, https://www.cs-soprasteria.com +# +# This file is part of stac-fastapi-eodag project +# https://www.github.com/CS-SI/stac-fastapi-eodag +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+"""Zarr tests.""" + +from eodag import SearchResult +from eodag.api.product import EOProduct +from eodag.config import PluginConfig +from eodag.plugins.download.http import HTTPDownload + + +async def test_items_response_includes_zarr_index_asset(request_valid, defaults, mock_search, mock_search_result): + """Items response should include a Zarr index asset when zarr is present.""" + search_result = mock_search_result + product = search_result[0] + product.assets.update({"zarr": {"href": "https://data/internal_fdp/example.zarr"}}) + + mock_search.return_value = search_result + response = await request_valid(f"search?collections={defaults.collection}", search_result=search_result) + + item = response["features"][0] + assert "zarr" in item["assets"] + assert item["assets"]["zarr"]["href"] == f"http://testserver/data/peps/{item['collection']}/{item['id']}/zarr" + assert "Zarr index" in item["assets"] + assert ( + item["assets"]["Zarr index"]["href"] + == f"http://testserver/data/peps/{item['collection']}/{item['id']}/zarr/index" + ) + + +async def test_zarr_index_listing(request_valid_raw, defaults, mock_search, mock_download, tmp_dir): + """Zarr index should list all files in the store.""" + collection = defaults.collection + item_id = "dummy_id" + product = EOProduct( + "peps", + dict( + geometry="POINT (0 0)", + title="dummy_product", + id=item_id, + ), + collection=collection, + ) + product.assets.update({"zarr": {"href": "https://data/internal_fdp/example.zarr"}}) + config = PluginConfig() + config.priority = 0 + downloader = HTTPDownload("peps", config) + product.register_downloader(downloader=downloader, authenticator=None) + + store = tmp_dir / "store.zarr" + (store / "group").mkdir(parents=True) + (store / "group" / "foo.txt").write_text("hello", encoding="utf-8") + (store / "root.txt").write_text("root", encoding="utf-8") + + mock_search.return_value = SearchResult([product]) + mock_download.return_value = str(store) + + response = await request_valid_raw( 
+ f"data/peps/{collection}/{item_id}/zarr/index", + search_result=SearchResult([product]), + ) + payload = response.json() + + assert payload["type"] == "zarr-file-index" + assert payload["item_id"] == item_id + assert payload["collection_id"] == collection + assert payload["backend"] == "peps" + assert payload["file_count"] == 2 + assert [f["path"] for f in payload["files"]] == ["group/foo.txt", "root.txt"] + assert payload["files"][0]["url"] == f"/data/peps/{collection}/{item_id}/group/foo.txt" + + +async def test_zarr_file_download(request_valid_raw, defaults, mock_search, mock_download, tmp_dir): + """Zarr file should be retrievable by path.""" + collection = defaults.collection + item_id = "dummy_id" + product = EOProduct( + "peps", + dict( + geometry="POINT (0 0)", + title="dummy_product", + id=item_id, + ), + collection=collection, + ) + product.assets.update({"zarr": {"href": "https://data/internal_fdp/example.zarr"}}) + config = PluginConfig() + config.priority = 0 + downloader = HTTPDownload("peps", config) + product.register_downloader(downloader=downloader, authenticator=None) + + store = tmp_dir / "store.zarr" + (store / "group").mkdir(parents=True) + (store / "group" / "foo.txt").write_text("hello", encoding="utf-8") + + mock_search.return_value = SearchResult([product]) + mock_download.return_value = str(store) + + response = await request_valid_raw( + f"data/peps/{collection}/{item_id}/zarr/group/foo.txt", + search_result=SearchResult([product]), + ) + + assert response.content == b"hello" + assert response.headers["content-type"].startswith("text/plain") From 2c9e5fffb4a9607bf1d95b1609d5bb28d598e064 Mon Sep 17 00:00:00 2001 From: cauriol Date: Thu, 5 Feb 2026 14:15:23 +0100 Subject: [PATCH 05/26] fix: pre commit --- .../eodag/extensions/data_download.py | 238 ++++++++++-------- tests/test_zarr.py | 18 +- 2 files changed, 148 insertions(+), 108 deletions(-) diff --git a/stac_fastapi/eodag/extensions/data_download.py 
b/stac_fastapi/eodag/extensions/data_download.py index 5ddf25ee..ef1be19b 100644 --- a/stac_fastapi/eodag/extensions/data_download.py +++ b/stac_fastapi/eodag/extensions/data_download.py @@ -24,7 +24,7 @@ from io import BufferedReader from pathlib import Path as FilePath from shutil import make_archive, rmtree -from typing import Annotated, Iterator, Optional, Union, cast +from typing import Annotated, Iterator, Optional, TypedDict, Union, cast import attr from eodag.api.core import EODataAccessGateway @@ -50,9 +50,136 @@ logger = logging.getLogger(__name__) +class ZarrFileEntry(TypedDict): + path: str + size: int + url: str + + class BaseDataDownloadClient: """Defines a pattern for implementing the data download extension.""" + def _try_presign_asset( + self, + product: EOProduct, + asset_name: Optional[str], + auth: Optional[dict], + ) -> Optional[RedirectResponse]: + """Return a presigned URL redirect when available.""" + if product.downloader_auth and asset_name and asset_name not in ["downloadLink", "zarr"]: + asset_values = product.assets[asset_name] + # return presigned url if available + try: + presigned_url = product.downloader_auth.presign_url(asset_values) + return RedirectResponse(presigned_url, status_code=302) + except NotImplementedError: + logger.info("Presigned urls not supported for %s with auth %s", product.downloader, auth) + except EodagError: + logger.info("Presigned url could not be fetched for %s", asset_name) + return None + + def _handle_zarr( + self, + dag: EODataAccessGateway, + product: EOProduct, + federation_backend: str, + collection_id: str, + item_id: str, + file_path: Optional[str], + ) -> Union[StreamingResponse, RedirectResponse, JSONResponse]: + """Handle Zarr store listing or file streaming.""" + logger.debug("Accessing Zarr store for product %s", item_id) + try: + # Check if zarr asset exists + if "zarr" not in product.assets: + logger.error(f"No zarr asset found for product {item_id}") + logger.error(f"Available assets: 
{list(product.assets.keys())}") + raise NotFoundError( + f"No zarr asset found for product '{item_id}'. Available assets: {list(product.assets.keys())}" + ) + + logger.debug(f"Downloading zarr store for product {item_id}") + zarr_path = dag.download(product, extract=False, asset="zarr") + logger.debug(f"Zarr store downloaded to: {zarr_path}") + + if not FilePath(zarr_path).exists(): + logger.error(f"Zarr store path does not exist: {zarr_path}") + raise NotFoundError(f"Zarr store not found at {zarr_path}") + + # If file_path is provided, handle it specially + if file_path: + # Check if requesting the index/listing + if file_path == "index": + logger.debug(f"Returning Zarr index for product {item_id}") + zarr_index = self._list_zarr_files(zarr_path, federation_backend, collection_id, item_id) + return JSONResponse(content=zarr_index) + + # Otherwise, serve the requested file + logger.debug(f"Retrieving zarr file: {file_path}") + file_full_path = FilePath(zarr_path) / file_path + + # Security check: ensure path doesn't escape zarr store + try: + file_full_path.resolve().relative_to(FilePath(zarr_path).resolve()) + except ValueError as err: + logger.error(f"Path traversal attempt detected: {file_path}") + raise NotFoundError(f"Invalid file path: {file_path}") from err + + if not file_full_path.exists(): + logger.error(f"File not found: {file_full_path}") + raise NotFoundError(f"File not found: {file_path}") + + if not file_full_path.is_file(): + logger.error(f"Path is not a file: {file_full_path}") + raise NotFoundError(f"Path is not a file: {file_path}") + + # Stream the file + filename = os.path.basename(str(file_full_path)) + + # Determine content type and whether to download or display inline + content_type, _ = mimetypes.guess_type(str(file_full_path)) + if content_type is None: + content_type = "application/octet-stream" + + # For text-based formats (JSON, XML, etc), display inline in browser + # For binary formats (zarr, etc), force download + headers = { + 
"cache-control": "public, max-age=86400", + } + + if content_type.startswith("text/") or content_type in ["application/json", "application/xml"]: + # Display inline + headers["content-type"] = content_type + else: + # Force download + headers["content-disposition"] = f"attachment; filename={filename}" + headers["content-type"] = content_type + + return StreamingResponse( + content=self._read_file_chunks(open(str(file_full_path), "rb")), + headers=headers, + ) + try: + if not product.downloader_auth: + raise NotFoundError( + f"Use /data/{federation_backend}/{collection_id}/{item_id}/zarr/index to list files " + "or access individual files directly" + ) + asset_values = product.assets["zarr"] + presigned_url = product.downloader_auth.presign_url(asset_values) + return RedirectResponse(presigned_url, status_code=302) + except NotImplementedError as err: + logger.info("Presigned URLs not supported for zarr asset") + raise NotFoundError( + f"Use /data/{federation_backend}/{collection_id}/{item_id}/zarr/index to list files \ + or access individual files directly" + ) from err + except NotFoundError: + raise + except Exception as e: + logger.error(f"Failed to access Zarr store: {type(e).__name__}: {e}", exc_info=True) + raise NotFoundError(f"Failed to retrieve Zarr store for item '{item_id}': {str(e)}") from e + def _file_to_stream( self, file_path: str, @@ -115,7 +242,7 @@ def _list_zarr_files( item_id: str, ) -> dict: """List all files in a Zarr store.""" - files = [] + files: list[ZarrFileEntry] = [] store_path = FilePath(zarr_store_path).resolve() try: @@ -124,11 +251,11 @@ def _list_zarr_files( # Get relative path from store root rel_path = file_path.relative_to(store_path) files.append( - { - "path": str(rel_path), - "size": file_path.stat().st_size, - "url": f"/data/{federation_backend}/{collection_id}/{item_id}/{rel_path}", - } + ZarrFileEntry( + path=str(rel_path), + size=file_path.stat().st_size, + 
url=f"/data/{federation_backend}/{collection_id}/{item_id}/zarr/{rel_path}", + ) ) # Sort files by path @@ -252,100 +379,13 @@ def get_data( raise NotFoundError(f"Item {item_id} does not exist. Please order it first") from e raise NotFoundError(e) from e - if product.downloader_auth and asset_name and asset_name not in ["downloadLink", "zarr"]: - asset_values = product.assets[asset_name] - # return presigned url if available - try: - presigned_url = product.downloader_auth.presign_url(asset_values) - return RedirectResponse(presigned_url, status_code=302) - except NotImplementedError: - logger.info("Presigned urls not supported for %s with auth %s", product.downloader, auth) - except EodagError: - logger.info("Presigned url could not be fetched for %s", asset_name) + presigned_response = self._try_presign_asset(product, asset_name, auth) + if presigned_response: + return presigned_response # Handle zarr store - return file listing or individual file if asset_name == "zarr": - logger.debug("Accessing Zarr store for product %s", item_id) - try: - # Check if zarr asset exists - if "zarr" not in product.assets: - logger.error(f"No zarr asset found for product {item_id}") - logger.error(f"Available assets: {list(product.assets.keys())}") - raise NotFoundError(f"No zarr asset found for product '{item_id}'. 
Available assets: {list(product.assets.keys())}") - - logger.debug(f"Downloading zarr store for product {item_id}") - zarr_path = dag.download(product, extract=False, asset="zarr") - logger.debug(f"Zarr store downloaded to: {zarr_path}") - - if not FilePath(zarr_path).exists(): - logger.error(f"Zarr store path does not exist: {zarr_path}") - raise NotFoundError(f"Zarr store not found at {zarr_path}") - - # If file_path is provided, handle it specially - if file_path: - # Check if requesting the index/listing - if file_path == "index": - logger.debug(f"Returning Zarr index for product {item_id}") - zarr_index = self._list_zarr_files(zarr_path, federation_backend, collection_id, item_id) - return JSONResponse(content=zarr_index) - - # Otherwise, serve the requested file - logger.debug(f"Retrieving zarr file: {file_path}") - file_full_path = FilePath(zarr_path) / file_path - - # Security check: ensure path doesn't escape zarr store - try: - file_full_path.resolve().relative_to(FilePath(zarr_path).resolve()) - except ValueError: - logger.error(f"Path traversal attempt detected: {file_path}") - raise NotFoundError(f"Invalid file path: {file_path}") - - if not file_full_path.exists(): - logger.error(f"File not found: {file_full_path}") - raise NotFoundError(f"File not found: {file_path}") - - if not file_full_path.is_file(): - logger.error(f"Path is not a file: {file_full_path}") - raise NotFoundError(f"Path is not a file: {file_path}") - - # Stream the file - filename = os.path.basename(str(file_full_path)) - - # Determine content type and whether to download or display inline - content_type, _ = mimetypes.guess_type(str(file_full_path)) - if content_type is None: - content_type = "application/octet-stream" - - # For text-based formats (JSON, XML, etc), display inline in browser - # For binary formats (zarr, etc), force download - headers = { - "cache-control": "public, max-age=86400", - } - - if content_type.startswith("text/") or content_type in ["application/json", 
"application/xml"]: - # Display inline - headers["content-type"] = content_type - else: - # Force download - headers["content-disposition"] = f"attachment; filename={filename}" - headers["content-type"] = content_type - - return StreamingResponse( - content=self._read_file_chunks(open(str(file_full_path), "rb")), - headers=headers, - ) - try: - asset_values = product.assets["zarr"] - presigned_url = product.downloader_auth.presign_url(asset_values) - return RedirectResponse(presigned_url, status_code=302) - except (NotImplementedError, AttributeError): - logger.info("Presigned URLs not supported for zarr asset") - raise NotFoundError(f"Use /data/{federation_backend}/{collection_id}/{item_id}/zarr/index to list files or access individual files directly") - except NotFoundError: - raise - except Exception as e: - logger.error(f"Failed to access Zarr store: {type(e).__name__}: {e}", exc_info=True) - raise NotFoundError(f"Failed to retrieve Zarr store for item '{item_id}': {str(e)}") from e + return self._handle_zarr(dag, product, federation_backend, collection_id, item_id, file_path) try: s = product.downloader.stream_download( @@ -425,7 +465,7 @@ def register(self, app: FastAPI) -> None: }, endpoint=create_async_endpoint(self.client.get_data, DataDownloadUri), ) - + # Route for /data/{backend}/{collection}/{item}/{asset_name}/{file_path} self.router.add_api_route( name="Download data with file path", diff --git a/tests/test_zarr.py b/tests/test_zarr.py index 376610ed..dc4b7912 100644 --- a/tests/test_zarr.py +++ b/tests/test_zarr.py @@ -73,15 +73,15 @@ async def test_zarr_index_listing(request_valid_raw, defaults, mock_search, mock f"data/peps/{collection}/{item_id}/zarr/index", search_result=SearchResult([product]), ) - payload = response.json() - - assert payload["type"] == "zarr-file-index" - assert payload["item_id"] == item_id - assert payload["collection_id"] == collection - assert payload["backend"] == "peps" - assert payload["file_count"] == 2 - assert 
[f["path"] for f in payload["files"]] == ["group/foo.txt", "root.txt"] - assert payload["files"][0]["url"] == f"/data/peps/{collection}/{item_id}/group/foo.txt" + res = response.json() + + assert res["type"] == "zarr-file-index" + assert res["item_id"] == item_id + assert res["collection_id"] == collection + assert res["backend"] == "peps" + assert res["file_count"] == 2 + assert [f["path"] for f in res["files"]] == ["group/foo.txt", "root.txt"] + assert res["files"][0]["url"] == f"/data/peps/{collection}/{item_id}/zarr/group/foo.txt" async def test_zarr_file_download(request_valid_raw, defaults, mock_search, mock_download, tmp_dir): From 5a96857b2cbc846e8ca6330ba308604c43652001 Mon Sep 17 00:00:00 2001 From: cauriol Date: Thu, 5 Feb 2026 16:05:25 +0100 Subject: [PATCH 06/26] fix: tests --- stac_fastapi/eodag/extensions/data_download.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/stac_fastapi/eodag/extensions/data_download.py b/stac_fastapi/eodag/extensions/data_download.py index ef1be19b..57b614ea 100644 --- a/stac_fastapi/eodag/extensions/data_download.py +++ b/stac_fastapi/eodag/extensions/data_download.py @@ -249,10 +249,10 @@ def _list_zarr_files( for file_path in store_path.rglob("*"): if file_path.is_file(): # Get relative path from store root - rel_path = file_path.relative_to(store_path) + rel_path = file_path.relative_to(store_path).as_posix() files.append( ZarrFileEntry( - path=str(rel_path), + path=rel_path, size=file_path.stat().st_size, url=f"/data/{federation_backend}/{collection_id}/{item_id}/zarr/{rel_path}", ) From 93287127ee767c13302977981fa53afe6f4fe36a Mon Sep 17 00:00:00 2001 From: cauriol Date: Thu, 5 Feb 2026 16:10:29 +0100 Subject: [PATCH 07/26] fix: add docstring --- stac_fastapi/eodag/extensions/data_download.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/stac_fastapi/eodag/extensions/data_download.py b/stac_fastapi/eodag/extensions/data_download.py index 57b614ea..7fc93366 100644 --- 
a/stac_fastapi/eodag/extensions/data_download.py +++ b/stac_fastapi/eodag/extensions/data_download.py @@ -51,6 +51,8 @@ class ZarrFileEntry(TypedDict): + """Zarr file listing item.""" + path: str size: int url: str From 905b5a3bc196d52a01617435951b5384a2814c54 Mon Sep 17 00:00:00 2001 From: cauriol Date: Thu, 5 Feb 2026 16:21:17 +0100 Subject: [PATCH 08/26] fix: clean code --- stac_fastapi/eodag/extensions/data_download.py | 13 +------------ stac_fastapi/eodag/models/item.py | 3 +-- 2 files changed, 2 insertions(+), 14 deletions(-) diff --git a/stac_fastapi/eodag/extensions/data_download.py b/stac_fastapi/eodag/extensions/data_download.py index 7fc93366..f088408e 100644 --- a/stac_fastapi/eodag/extensions/data_download.py +++ b/stac_fastapi/eodag/extensions/data_download.py @@ -158,7 +158,7 @@ def _handle_zarr( headers["content-type"] = content_type return StreamingResponse( - content=self._read_file_chunks(open(str(file_full_path), "rb")), + content=self._read_file_chunks_and_delete(open(str(file_full_path), "rb")), headers=headers, ) try: @@ -213,17 +213,6 @@ def _file_to_stream( }, ) - def _read_file_chunks(self, opened_file: BufferedReader, chunk_size: int = 64 * 1024) -> Iterator[bytes]: - """Yield file chunks without deleting.""" - try: - while True: - data = opened_file.read(chunk_size) - if not data: - break - yield data - finally: - opened_file.close() - def _read_file_chunks_and_delete(self, opened_file: BufferedReader, chunk_size: int = 64 * 1024) -> Iterator[bytes]: """Yield file chunks and delete file when finished.""" while True: diff --git a/stac_fastapi/eodag/models/item.py b/stac_fastapi/eodag/models/item.py index 4d556748..f3d49c35 100644 --- a/stac_fastapi/eodag/models/item.py +++ b/stac_fastapi/eodag/models/item.py @@ -153,8 +153,7 @@ def create_stac_item( feature["assets"]["Zarr index"] = { "title": "Download link", "href": asset_proxy_url + "/zarr/index", - # TODO: download link is not always a ZIP archive - "type": mime_type, + "type": 
"application/json", } feature_model = model.model_validate( From 77ef555ed17f7c7047cd516f25eafcd065868247 Mon Sep 17 00:00:00 2001 From: cauriol Date: Mon, 9 Feb 2026 17:56:23 +0100 Subject: [PATCH 09/26] fix: zarr index link title --- stac_fastapi/eodag/models/item.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stac_fastapi/eodag/models/item.py b/stac_fastapi/eodag/models/item.py index f3d49c35..8803375e 100644 --- a/stac_fastapi/eodag/models/item.py +++ b/stac_fastapi/eodag/models/item.py @@ -151,7 +151,7 @@ def create_stac_item( } if "zarr" in product.assets and asset_proxy_url: feature["assets"]["Zarr index"] = { - "title": "Download link", + "title": "Zarr store index", "href": asset_proxy_url + "/zarr/index", "type": "application/json", } From c2fd2d7b2d6e47423bf4460e4b34a17a0036ce64 Mon Sep 17 00:00:00 2001 From: cauriol Date: Wed, 18 Feb 2026 09:47:03 +0100 Subject: [PATCH 10/26] feat: use stream download for the index endpoint and zarr/path endpoint --- .../eodag/extensions/data_download.py | 243 +++++++++--------- 1 file changed, 123 insertions(+), 120 deletions(-) diff --git a/stac_fastapi/eodag/extensions/data_download.py b/stac_fastapi/eodag/extensions/data_download.py index f088408e..b5805123 100644 --- a/stac_fastapi/eodag/extensions/data_download.py +++ b/stac_fastapi/eodag/extensions/data_download.py @@ -21,16 +21,20 @@ import logging import mimetypes import os +import re +import tempfile from io import BufferedReader -from pathlib import Path as FilePath from shutil import make_archive, rmtree from typing import Annotated, Iterator, Optional, TypedDict, Union, cast +from urllib.parse import quote +from zipfile import BadZipFile, ZipFile import attr from eodag.api.core import EODataAccessGateway from eodag.api.product._product import EOProduct from eodag.api.product.metadata_mapping import ONLINE_STATUS, STAGING_STATUS, get_metadata_path_value from eodag.utils.exceptions import EodagError +from eodag.utils import 
StreamResponse from fastapi import APIRouter, FastAPI, Path, Request from fastapi.responses import JSONResponse, RedirectResponse, StreamingResponse from stac_fastapi.api.errors import NotFoundError @@ -58,6 +62,14 @@ class ZarrFileEntry(TypedDict): url: str +class StreamFileEntry(TypedDict): + """Stream file listing item.""" + + path: str + size: Optional[int] + url: str + + class BaseDataDownloadClient: """Defines a pattern for implementing the data download extension.""" @@ -82,105 +94,98 @@ def _try_presign_asset( def _handle_zarr( self, - dag: EODataAccessGateway, product: EOProduct, + stream: StreamResponse, federation_backend: str, collection_id: str, item_id: str, file_path: Optional[str], + asset_name: Optional[str], ) -> Union[StreamingResponse, RedirectResponse, JSONResponse]: """Handle Zarr store listing or file streaming.""" - logger.debug("Accessing Zarr store for product %s", item_id) - try: - # Check if zarr asset exists - if "zarr" not in product.assets: - logger.error(f"No zarr asset found for product {item_id}") - logger.error(f"Available assets: {list(product.assets.keys())}") - raise NotFoundError( - f"No zarr asset found for product '{item_id}'. 
Available assets: {list(product.assets.keys())}" - ) - - logger.debug(f"Downloading zarr store for product {item_id}") - zarr_path = dag.download(product, extract=False, asset="zarr") - logger.debug(f"Zarr store downloaded to: {zarr_path}") - - if not FilePath(zarr_path).exists(): - logger.error(f"Zarr store path does not exist: {zarr_path}") - raise NotFoundError(f"Zarr store not found at {zarr_path}") - - # If file_path is provided, handle it specially - if file_path: - # Check if requesting the index/listing - if file_path == "index": - logger.debug(f"Returning Zarr index for product {item_id}") - zarr_index = self._list_zarr_files(zarr_path, federation_backend, collection_id, item_id) - return JSONResponse(content=zarr_index) - - # Otherwise, serve the requested file - logger.debug(f"Retrieving zarr file: {file_path}") - file_full_path = FilePath(zarr_path) / file_path - - # Security check: ensure path doesn't escape zarr store - try: - file_full_path.resolve().relative_to(FilePath(zarr_path).resolve()) - except ValueError as err: - logger.error(f"Path traversal attempt detected: {file_path}") - raise NotFoundError(f"Invalid file path: {file_path}") from err - - if not file_full_path.exists(): - logger.error(f"File not found: {file_full_path}") - raise NotFoundError(f"File not found: {file_path}") - - if not file_full_path.is_file(): - logger.error(f"Path is not a file: {file_full_path}") - raise NotFoundError(f"Path is not a file: {file_path}") - - # Stream the file - filename = os.path.basename(str(file_full_path)) - - # Determine content type and whether to download or display inline - content_type, _ = mimetypes.guess_type(str(file_full_path)) - if content_type is None: - content_type = "application/octet-stream" - - # For text-based formats (JSON, XML, etc), display inline in browser - # For binary formats (zarr, etc), force download - headers = { - "cache-control": "public, max-age=86400", + if file_path == "index": + stream_file_url_prefix = 
f"/data/{federation_backend}/{collection_id}/{item_id}/{asset_name}" + stream_files = self._list_stream_files( + cast(Iterator[bytes], stream.content), + stream.headers, + stream.media_type, + stream_file_url_prefix, + ) + return JSONResponse( + content={ + "type": "stream-file-index", + "item_id": item_id, + "collection_id": collection_id, + "backend": federation_backend, + "media_type": stream.media_type, + "file_count": len(stream_files), + "files": stream_files, } + ) + elif file_path: + filename = file_path.split("/")[-1] + guessed_content_type, _ = mimetypes.guess_type(filename) + content_type = ( + guessed_content_type + or "application/octet-stream" + ) + stream_content_type = (stream.media_type or stream.headers.get("content-type", "")).lower() + if "application/zip" in stream_content_type: + temp_zip_path: Optional[str] = None + try: + with tempfile.NamedTemporaryFile(delete=False, suffix=".zip") as temp_file: + temp_zip_path = temp_file.name + for chunk in stream.content: + if chunk: + temp_file.write(chunk) + + with ZipFile(temp_zip_path) as zip_file: + normalized_requested_path = file_path.strip("/") + member_name = next( + ( + name + for name in zip_file.namelist() + if name.strip("/") == normalized_requested_path + or name.strip("/").endswith(f"/{normalized_requested_path}") + ), + None, + ) + if not member_name: + raise NotFoundError(f"File not found in zarr archive: {file_path}") + + extracted_temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=f"_{filename}") + extracted_temp_path = extracted_temp_file.name + with extracted_temp_file: + with zip_file.open(member_name) as source: + while True: + chunk = source.read(64 * 1024) + if not chunk: + break + extracted_temp_file.write(chunk) + finally: + if temp_zip_path and os.path.exists(temp_zip_path): + os.remove(temp_zip_path) + + headers = {"cache-control": "public, max-age=86400"} if content_type.startswith("text/") or content_type in ["application/json", "application/xml"]: - # Display 
inline headers["content-type"] = content_type else: - # Force download - headers["content-disposition"] = f"attachment; filename={filename}" headers["content-type"] = content_type + headers["content-disposition"] = f"attachment; filename={filename}" return StreamingResponse( - content=self._read_file_chunks_and_delete(open(str(file_full_path), "rb")), + content=self._read_file_chunks_and_delete(open(extracted_temp_path, "rb")), headers=headers, + media_type=content_type, ) - try: - if not product.downloader_auth: - raise NotFoundError( - f"Use /data/{federation_backend}/{collection_id}/{item_id}/zarr/index to list files " - "or access individual files directly" - ) - asset_values = product.assets["zarr"] - presigned_url = product.downloader_auth.presign_url(asset_values) - return RedirectResponse(presigned_url, status_code=302) - except NotImplementedError as err: - logger.info("Presigned URLs not supported for zarr asset") - raise NotFoundError( - f"Use /data/{federation_backend}/{collection_id}/{item_id}/zarr/index to list files \ - or access individual files directly" - ) from err - except NotFoundError: - raise - except Exception as e: - logger.error(f"Failed to access Zarr store: {type(e).__name__}: {e}", exc_info=True) - raise NotFoundError(f"Failed to retrieve Zarr store for item '{item_id}': {str(e)}") from e + # else: + # asset_values = product.assets["zarr"] + # presigned_url = product.downloader_auth.presign_url(asset_values) + # return RedirectResponse(presigned_url, status_code=302) + + + def _file_to_stream( self, @@ -225,45 +230,43 @@ def _read_file_chunks_and_delete(self, opened_file: BufferedReader, chunk_size: yield data yield data - def _list_zarr_files( + def _list_stream_files( self, - zarr_store_path: str, - federation_backend: str, - collection_id: str, - item_id: str, - ) -> dict: - """List all files in a Zarr store.""" - files: list[ZarrFileEntry] = [] - store_path = FilePath(zarr_store_path).resolve() - - try: - for file_path in 
store_path.rglob("*"): - if file_path.is_file(): - # Get relative path from store root - rel_path = file_path.relative_to(store_path).as_posix() - files.append( - ZarrFileEntry( - path=rel_path, - size=file_path.stat().st_size, - url=f"/data/{federation_backend}/{collection_id}/{item_id}/zarr/{rel_path}", + content: Iterator[bytes], + headers: dict[str, str], + media_type: Optional[str], + file_url_prefix: str, + ) -> list[StreamFileEntry]: + """List files contained in a streamed response.""" + normalized_media_type = (media_type or headers.get("content-type", "")).lower() + files: list[StreamFileEntry] = [] + + if "application/zip" in normalized_media_type: + temp_path: Optional[str] = None + try: + with tempfile.NamedTemporaryFile(delete=False, suffix=".zip") as temp_file: + temp_path = temp_file.name + for chunk in content: + if chunk: + temp_file.write(chunk) + + with ZipFile(temp_path) as zip_file: + for info in zip_file.infolist(): + quoted_path = quote(info.filename.lstrip("/"), safe="/") + files.append( + StreamFileEntry( + path=info.filename, + size=info.file_size, + url=f"{file_url_prefix}/{quoted_path}", + ) ) - ) - - # Sort files by path - files.sort(key=lambda x: x["path"]) - - return { - "type": "zarr-file-index", - "item_id": item_id, - "collection_id": collection_id, - "backend": federation_backend, - "store_path": str(store_path), - "file_count": len(files), - "files": files, - } - except Exception as e: - logger.error(f"Failed to list Zarr files for {item_id}: {e}") - raise + return files + except BadZipFile: + logger.warning("Could not inspect ZIP stream, falling back to headers") + finally: + if temp_path and os.path.exists(temp_path): + os.remove(temp_path) + def get_data_with_file( self, From 34569455d4fff41bc0e0254c8efba54429fab4ab Mon Sep 17 00:00:00 2001 From: cauriol Date: Thu, 19 Mar 2026 14:36:26 +0100 Subject: [PATCH 11/26] feat: zarr for destinee --- .../eodag/extensions/data_download.py | 20 +++++++++++++++++++ 
stac_fastapi/eodag/models/item.py | 2 +- 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/stac_fastapi/eodag/extensions/data_download.py b/stac_fastapi/eodag/extensions/data_download.py index b5805123..2b7104f7 100644 --- a/stac_fastapi/eodag/extensions/data_download.py +++ b/stac_fastapi/eodag/extensions/data_download.py @@ -18,6 +18,7 @@ """Data-download extension.""" import glob +from itertools import product import logging import mimetypes import os @@ -36,6 +37,7 @@ from eodag.utils.exceptions import EodagError from eodag.utils import StreamResponse from fastapi import APIRouter, FastAPI, Path, Request +import requests from fastapi.responses import JSONResponse, RedirectResponse, StreamingResponse from stac_fastapi.api.errors import NotFoundError from stac_fastapi.api.routes import create_async_endpoint @@ -373,6 +375,24 @@ def get_data( raise NotFoundError(f"Item {item_id} does not exist. Please order it first") from e raise NotFoundError(e) from e + zarr_asset_name = next( + (name for name in product.assets if name.endswith(".zarr")), None + ) + if zarr_asset_name == asset_name and "DT_CLIMATE_ADAPTATION" in collection_id: + asset_values = product.assets[zarr_asset_name] + base_url = asset_values["href"] + target_url = f"{base_url.rstrip('/')}/{file_path.lstrip('/')}" + + username = "anonymous" + r = requests.get(target_url, auth=(username, auth.refresh_token), stream=True) + + return StreamingResponse( + r.iter_content(chunk_size=1024*1024), + status_code=r.status_code, + media_type=r.headers.get("Content-Type", "application/octet-stream"), + headers={k: v for k, v in r.headers.items() if k.lower() not in ["content-encoding", "transfer-encoding"]} + ) + presigned_response = self._try_presign_asset(product, asset_name, auth) if presigned_response: return presigned_response diff --git a/stac_fastapi/eodag/models/item.py b/stac_fastapi/eodag/models/item.py index 8803375e..c43389a0 100644 --- a/stac_fastapi/eodag/models/item.py +++ 
b/stac_fastapi/eodag/models/item.py @@ -149,7 +149,7 @@ def create_stac_item( "type": mime_type, }, } - if "zarr" in product.assets and asset_proxy_url: + if any("zarr" in key for key in product.assets) and asset_proxy_url: feature["assets"]["Zarr index"] = { "title": "Zarr store index", "href": asset_proxy_url + "/zarr/index", From 756d3a6255923eecc855bd7d6895d69f98b8c333 Mon Sep 17 00:00:00 2001 From: cauriol Date: Mon, 23 Mar 2026 09:40:01 +0100 Subject: [PATCH 12/26] fix: auth for desp cache --- stac_fastapi/eodag/extensions/data_download.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/stac_fastapi/eodag/extensions/data_download.py b/stac_fastapi/eodag/extensions/data_download.py index 2b7104f7..7f030f51 100644 --- a/stac_fastapi/eodag/extensions/data_download.py +++ b/stac_fastapi/eodag/extensions/data_download.py @@ -383,8 +383,7 @@ def get_data( base_url = asset_values["href"] target_url = f"{base_url.rstrip('/')}/{file_path.lstrip('/')}" - username = "anonymous" - r = requests.get(target_url, auth=(username, auth.refresh_token), stream=True) + r = requests.get(target_url, auth=auth, stream=True) return StreamingResponse( r.iter_content(chunk_size=1024*1024), From 33beb2a4b6999828f606ceecc1d31991a63e4c25 Mon Sep 17 00:00:00 2001 From: cauriol Date: Mon, 23 Mar 2026 15:56:38 +0100 Subject: [PATCH 13/26] fix: zarr index --- .../eodag/extensions/data_download.py | 236 +++++++++--------- 1 file changed, 118 insertions(+), 118 deletions(-) diff --git a/stac_fastapi/eodag/extensions/data_download.py b/stac_fastapi/eodag/extensions/data_download.py index 7f030f51..acfa6c97 100644 --- a/stac_fastapi/eodag/extensions/data_download.py +++ b/stac_fastapi/eodag/extensions/data_download.py @@ -19,6 +19,7 @@ import glob from itertools import product +import json import logging import mimetypes import os @@ -31,6 +32,7 @@ from zipfile import BadZipFile, ZipFile import attr +import zarr from eodag.api.core import EODataAccessGateway from 
eodag.api.product._product import EOProduct from eodag.api.product.metadata_mapping import ONLINE_STATUS, STAGING_STATUS, get_metadata_path_value @@ -97,97 +99,110 @@ def _try_presign_asset( def _handle_zarr( self, product: EOProduct, - stream: StreamResponse, + base_url: str, federation_backend: str, collection_id: str, item_id: str, file_path: Optional[str], asset_name: Optional[str], + auth: Optional[dict] = None, ) -> Union[StreamingResponse, RedirectResponse, JSONResponse]: """Handle Zarr store listing or file streaming.""" - if file_path == "index": - stream_file_url_prefix = f"/data/{federation_backend}/{collection_id}/{item_id}/{asset_name}" - stream_files = self._list_stream_files( - cast(Iterator[bytes], stream.content), - stream.headers, - stream.media_type, - stream_file_url_prefix, + # List all files in the zarr store + try: + stream_files = self._list_zarr_files_from_metadata( + base_url, auth, federation_backend, collection_id, item_id, asset_name ) + return JSONResponse( content={ "type": "stream-file-index", "item_id": item_id, "collection_id": collection_id, "backend": federation_backend, - "media_type": stream.media_type, "file_count": len(stream_files), "files": stream_files, } ) - elif file_path: - filename = file_path.split("/")[-1] - guessed_content_type, _ = mimetypes.guess_type(filename) - content_type = ( - guessed_content_type - or "application/octet-stream" - ) + except Exception as e: + logger.error(f"Failed to list zarr files: {e}") + raise NotFoundError(f"Failed to list zarr store files: {e}") from e + - stream_content_type = (stream.media_type or stream.headers.get("content-type", "")).lower() - if "application/zip" in stream_content_type: - temp_zip_path: Optional[str] = None + def _list_zarr_files_from_metadata( + self, + base_url: str, + auth: Optional[dict], + federation_backend: str, + collection_id: str, + item_id: str, + asset_name: str, + ) -> list[StreamFileEntry]: + """List all files from zarr store by parsing 
.zmetadata.""" + import fsspec + import base64 + + files: list[StreamFileEntry] = [] + + try: + # Build headers for authentication if auth is provided + headers = {} + # if auth and isinstance(auth, dict) and "refresh_token" in auth: + auth_str = f"anonymous:{auth.refresh_token}" + headers["Authorization"] = "Basic " + base64.b64encode(auth_str.encode("utf-8")).decode("utf-8") + # Get mapper with fsspec + mapper = fsspec.get_mapper( + base_url, + client_kwargs={ + "headers": headers, + "trust_env": False + } + ) + + # Read .zmetadata + if ".zmetadata" in mapper: + meta = json.loads(mapper[".zmetadata"]) + logger.debug(f"Found {len(meta['metadata'])} entries in .zmetadata") + key = meta["metadata"].keys() + # Add .zmetadata file itself to the listing to allow clients to read it and get metadata for all files in the store + quoted_path = quote(".zmetadata", safe="/") + files.append( + StreamFileEntry( + path=quoted_path, + url=f"{federation_backend}/{collection_id}/{item_id}/{asset_name}/{quoted_path}", + ) + ) + else: # try zarr v3 metadata file + """TO DO + .zmetadata is present for zarr v2, but not for v3, we should support both. + For Zarr v3, there is no .zmetata but zarr.json file instead, we can try to read it and parse the metadata from it to list files in the store. 
+ for exemple: + elif "zarr.json" in mapper: + meta = json.loads(mapper["zarr.json"]) + logger.debug(f"Found {len(meta['metadata'])} entries in zarr.json") + key = meta["metadata"].keys() + """ + + + # Iterate through all files in the metadata + for file_path in key: try: - with tempfile.NamedTemporaryFile(delete=False, suffix=".zip") as temp_file: - temp_zip_path = temp_file.name - for chunk in stream.content: - if chunk: - temp_file.write(chunk) - - with ZipFile(temp_zip_path) as zip_file: - normalized_requested_path = file_path.strip("/") - member_name = next( - ( - name - for name in zip_file.namelist() - if name.strip("/") == normalized_requested_path - or name.strip("/").endswith(f"/{normalized_requested_path}") - ), - None, + quoted_path = quote(file_path, safe="/") + files.append( + StreamFileEntry( + path=file_path, + url=f"{federation_backend}/{collection_id}/{item_id}/{asset_name}/{quoted_path}", ) - if not member_name: - raise NotFoundError(f"File not found in zarr archive: {file_path}") - - extracted_temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=f"_{filename}") - extracted_temp_path = extracted_temp_file.name - with extracted_temp_file: - with zip_file.open(member_name) as source: - while True: - chunk = source.read(64 * 1024) - if not chunk: - break - extracted_temp_file.write(chunk) - finally: - if temp_zip_path and os.path.exists(temp_zip_path): - os.remove(temp_zip_path) - - headers = {"cache-control": "public, max-age=86400"} - if content_type.startswith("text/") or content_type in ["application/json", "application/xml"]: - headers["content-type"] = content_type - else: - headers["content-type"] = content_type - headers["content-disposition"] = f"attachment; filename={filename}" - - return StreamingResponse( - content=self._read_file_chunks_and_delete(open(extracted_temp_path, "rb")), - headers=headers, - media_type=content_type, - ) - # else: - # asset_values = product.assets["zarr"] - # presigned_url = 
product.downloader_auth.presign_url(asset_values) - # return RedirectResponse(presigned_url, status_code=302) - - - + ) + except Exception as e: + logger.debug(f"Could not get metadata for {file_path}: {e}") + + logger.debug(f"Listed {len(files)} zarr files") + return files + + except Exception as e: + logger.error(f"Failed to list zarr files from metadata: {e}") + raise def _file_to_stream( self, @@ -232,44 +247,6 @@ def _read_file_chunks_and_delete(self, opened_file: BufferedReader, chunk_size: yield data yield data - def _list_stream_files( - self, - content: Iterator[bytes], - headers: dict[str, str], - media_type: Optional[str], - file_url_prefix: str, - ) -> list[StreamFileEntry]: - """List files contained in a streamed response.""" - normalized_media_type = (media_type or headers.get("content-type", "")).lower() - files: list[StreamFileEntry] = [] - - if "application/zip" in normalized_media_type: - temp_path: Optional[str] = None - try: - with tempfile.NamedTemporaryFile(delete=False, suffix=".zip") as temp_file: - temp_path = temp_file.name - for chunk in content: - if chunk: - temp_file.write(chunk) - - with ZipFile(temp_path) as zip_file: - for info in zip_file.infolist(): - quoted_path = quote(info.filename.lstrip("/"), safe="/") - files.append( - StreamFileEntry( - path=info.filename, - size=info.file_size, - url=f"{file_url_prefix}/{quoted_path}", - ) - ) - return files - except BadZipFile: - logger.warning("Could not inspect ZIP stream, falling back to headers") - finally: - if temp_path and os.path.exists(temp_path): - os.remove(temp_path) - - def get_data_with_file( self, federation_backend: str, @@ -378,20 +355,43 @@ def get_data( zarr_asset_name = next( (name for name in product.assets if name.endswith(".zarr")), None ) - if zarr_asset_name == asset_name and "DT_CLIMATE_ADAPTATION" in collection_id: + if zarr_asset_name: asset_values = product.assets[zarr_asset_name] + base_url = asset_values["href"] - target_url = 
f"{base_url.rstrip('/')}/{file_path.lstrip('/')}" + if file_path == "index": + logger.debug(f"Listing zarr files for: {base_url}, auth available: {auth is not None}") + return self._handle_zarr(product, base_url, federation_backend, collection_id, item_id, file_path, asset_name, auth) - r = requests.get(target_url, auth=auth, stream=True) + if asset_name == "zarr" and file_path != "index": + # request data/{backend}/{collection}/{item}/zarr/{file_path} to stream a specific file in the zarr store + base_url = base_url + "/" + file_path.lstrip("/") + + r = requests.get( + base_url, + auth=auth, + stream=True + ) + data = r.json() + return JSONResponse( + content=data) + + if zarr_asset_name == asset_name: + target_url = f"{base_url.rstrip('/')}/{file_path.lstrip('/')}" + + r = requests.get( + target_url, + auth=auth, + stream=True + ) - return StreamingResponse( - r.iter_content(chunk_size=1024*1024), - status_code=r.status_code, - media_type=r.headers.get("Content-Type", "application/octet-stream"), - headers={k: v for k, v in r.headers.items() if k.lower() not in ["content-encoding", "transfer-encoding"]} - ) - + return StreamingResponse( + r.iter_content(chunk_size=1024*1024), + status_code=r.status_code, + media_type=r.headers.get("Content-Type", "application/octet-stream"), + headers={k: v for k, v in r.headers.items() if k.lower() not in ["content-encoding", "transfer-encoding"]} + ) + presigned_response = self._try_presign_asset(product, asset_name, auth) if presigned_response: return presigned_response From 22431f42edeb09685d8db60bbf2ad76fcedbdf83 Mon Sep 17 00:00:00 2001 From: cauriol Date: Tue, 24 Mar 2026 09:46:27 +0100 Subject: [PATCH 14/26] fix: remove downloadlink for zarr asset --- stac_fastapi/eodag/models/item.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stac_fastapi/eodag/models/item.py b/stac_fastapi/eodag/models/item.py index c43389a0..1bef7ec9 100644 --- a/stac_fastapi/eodag/models/item.py +++ 
b/stac_fastapi/eodag/models/item.py @@ -126,7 +126,7 @@ def create_stac_item( feature["assets"][k]["alternate"] = {"origin": origin} # TODO: remove downloadLink asset after EODAG assets rework - if download_link := product.properties.get("eodag:download_link"): + if download_link := product.properties.get("eodag:download_link") and not any("zarr" in key for key in product.assets): origin_href = download_link if asset_proxy_url: download_link = asset_proxy_url + "/downloadLink" From 4e8c91446ad1b4293d96b189b4856fe68538ccfa Mon Sep 17 00:00:00 2001 From: cauriol Date: Tue, 24 Mar 2026 10:29:06 +0100 Subject: [PATCH 15/26] fix: clean code --- .../eodag/extensions/data_download.py | 152 ++++++++---------- stac_fastapi/eodag/models/item.py | 4 +- 2 files changed, 67 insertions(+), 89 deletions(-) diff --git a/stac_fastapi/eodag/extensions/data_download.py b/stac_fastapi/eodag/extensions/data_download.py index acfa6c97..a742d605 100644 --- a/stac_fastapi/eodag/extensions/data_download.py +++ b/stac_fastapi/eodag/extensions/data_download.py @@ -18,28 +18,21 @@ """Data-download extension.""" import glob -from itertools import product import json import logging -import mimetypes import os -import re -import tempfile from io import BufferedReader from shutil import make_archive, rmtree from typing import Annotated, Iterator, Optional, TypedDict, Union, cast from urllib.parse import quote -from zipfile import BadZipFile, ZipFile import attr -import zarr +import requests from eodag.api.core import EODataAccessGateway from eodag.api.product._product import EOProduct from eodag.api.product.metadata_mapping import ONLINE_STATUS, STAGING_STATUS, get_metadata_path_value from eodag.utils.exceptions import EodagError -from eodag.utils import StreamResponse from fastapi import APIRouter, FastAPI, Path, Request -import requests from fastapi.responses import JSONResponse, RedirectResponse, StreamingResponse from stac_fastapi.api.errors import NotFoundError from stac_fastapi.api.routes 
import create_async_endpoint @@ -99,35 +92,56 @@ def _try_presign_asset( def _handle_zarr( self, product: EOProduct, - base_url: str, + zarr_asset_name: str, federation_backend: str, collection_id: str, item_id: str, file_path: Optional[str], asset_name: Optional[str], auth: Optional[dict] = None, - ) -> Union[StreamingResponse, RedirectResponse, JSONResponse]: - """Handle Zarr store listing or file streaming.""" - # List all files in the zarr store - try: - stream_files = self._list_zarr_files_from_metadata( - base_url, auth, federation_backend, collection_id, item_id, asset_name - ) - - return JSONResponse( - content={ - "type": "stream-file-index", - "item_id": item_id, - "collection_id": collection_id, - "backend": federation_backend, - "file_count": len(stream_files), - "files": stream_files, - } + ): + asset_values = product.assets[zarr_asset_name] + base_url = asset_values["href"] + if file_path == "index": + try: + stream_files = self._list_zarr_files_from_metadata( + base_url, auth, federation_backend, collection_id, item_id, asset_name + ) + + return JSONResponse( + content={ + "type": "stream-file-index", + "item_id": item_id, + "collection_id": collection_id, + "backend": federation_backend, + "file_count": len(stream_files), + "files": stream_files, + } + ) + except Exception as e: + logger.error(f"Failed to list zarr files: {e}") + raise NotFoundError(f"Failed to list zarr store files: {e}") from e + if asset_name == "zarr" and file_path != "index": + # request data/{backend}/{collection}/{item}/zarr/{file_path} + # to stream a specific file in the zarr store + base_url = base_url + "/" + file_path.lstrip("/") + + r = requests.get(base_url, auth=auth, stream=True) + data = r.json() + return JSONResponse(content=data) + if zarr_asset_name == asset_name: + target_url = f"{base_url.rstrip('/')}/{file_path.lstrip('/')}" + + r = requests.get(target_url, auth=auth, stream=True) + + return StreamingResponse( + r.iter_content(chunk_size=1024 * 1024), + 
status_code=r.status_code, + media_type=r.headers.get("Content-Type", "application/octet-stream"), + headers={ + k: v for k, v in r.headers.items() if k.lower() not in ["content-encoding", "transfer-encoding"] + }, ) - except Exception as e: - logger.error(f"Failed to list zarr files: {e}") - raise NotFoundError(f"Failed to list zarr store files: {e}") from e - def _list_zarr_files_from_metadata( self, @@ -139,11 +153,12 @@ def _list_zarr_files_from_metadata( asset_name: str, ) -> list[StreamFileEntry]: """List all files from zarr store by parsing .zmetadata.""" - import fsspec import base64 - + + import fsspec + files: list[StreamFileEntry] = [] - + try: # Build headers for authentication if auth is provided headers = {} @@ -151,31 +166,27 @@ def _list_zarr_files_from_metadata( auth_str = f"anonymous:{auth.refresh_token}" headers["Authorization"] = "Basic " + base64.b64encode(auth_str.encode("utf-8")).decode("utf-8") # Get mapper with fsspec - mapper = fsspec.get_mapper( - base_url, - client_kwargs={ - "headers": headers, - "trust_env": False - } - ) - + mapper = fsspec.get_mapper(base_url, client_kwargs={"headers": headers, "trust_env": False}) + # Read .zmetadata if ".zmetadata" in mapper: meta = json.loads(mapper[".zmetadata"]) logger.debug(f"Found {len(meta['metadata'])} entries in .zmetadata") key = meta["metadata"].keys() - # Add .zmetadata file itself to the listing to allow clients to read it and get metadata for all files in the store + # Add .zmetadata file itself to the listing to allow clients to read it and + # get metadata for all files in the store quoted_path = quote(".zmetadata", safe="/") files.append( - StreamFileEntry( - path=quoted_path, - url=f"{federation_backend}/{collection_id}/{item_id}/{asset_name}/{quoted_path}", - ) + StreamFileEntry( + path=quoted_path, + url=f"{federation_backend}/{collection_id}/{item_id}/{asset_name}/{quoted_path}", ) + ) else: # try zarr v3 metadata file """TO DO .zmetadata is present for zarr v2, but not for v3, 
we should support both. - For Zarr v3, there is no .zmetata but zarr.json file instead, we can try to read it and parse the metadata from it to list files in the store. + For Zarr v3, there is no .zmetata but zarr.json file instead, + we can try to read it and parse the metadata from it to list files in the store. for exemple: elif "zarr.json" in mapper: meta = json.loads(mapper["zarr.json"]) @@ -183,7 +194,6 @@ def _list_zarr_files_from_metadata( key = meta["metadata"].keys() """ - # Iterate through all files in the metadata for file_path in key: try: @@ -196,10 +206,10 @@ def _list_zarr_files_from_metadata( ) except Exception as e: logger.debug(f"Could not get metadata for {file_path}: {e}") - + logger.debug(f"Listed {len(files)} zarr files") return files - + except Exception as e: logger.error(f"Failed to list zarr files from metadata: {e}") raise @@ -352,46 +362,12 @@ def get_data( raise NotFoundError(f"Item {item_id} does not exist. Please order it first") from e raise NotFoundError(e) from e - zarr_asset_name = next( - (name for name in product.assets if name.endswith(".zarr")), None + zarr_asset_name = next((name for name in product.assets if name.endswith(".zarr")), None) + if zarr_asset_name: + return self._handle_zarr( + product, zarr_asset_name, federation_backend, collection_id, item_id, file_path, asset_name, auth ) - if zarr_asset_name: - asset_values = product.assets[zarr_asset_name] - - base_url = asset_values["href"] - if file_path == "index": - logger.debug(f"Listing zarr files for: {base_url}, auth available: {auth is not None}") - return self._handle_zarr(product, base_url, federation_backend, collection_id, item_id, file_path, asset_name, auth) - - if asset_name == "zarr" and file_path != "index": - # request data/{backend}/{collection}/{item}/zarr/{file_path} to stream a specific file in the zarr store - base_url = base_url + "/" + file_path.lstrip("/") - - r = requests.get( - base_url, - auth=auth, - stream=True - ) - data = r.json() - return 
JSONResponse( - content=data) - - if zarr_asset_name == asset_name: - target_url = f"{base_url.rstrip('/')}/{file_path.lstrip('/')}" - - r = requests.get( - target_url, - auth=auth, - stream=True - ) - return StreamingResponse( - r.iter_content(chunk_size=1024*1024), - status_code=r.status_code, - media_type=r.headers.get("Content-Type", "application/octet-stream"), - headers={k: v for k, v in r.headers.items() if k.lower() not in ["content-encoding", "transfer-encoding"]} - ) - presigned_response = self._try_presign_asset(product, asset_name, auth) if presigned_response: return presigned_response diff --git a/stac_fastapi/eodag/models/item.py b/stac_fastapi/eodag/models/item.py index 1bef7ec9..d973e63a 100644 --- a/stac_fastapi/eodag/models/item.py +++ b/stac_fastapi/eodag/models/item.py @@ -126,7 +126,9 @@ def create_stac_item( feature["assets"][k]["alternate"] = {"origin": origin} # TODO: remove downloadLink asset after EODAG assets rework - if download_link := product.properties.get("eodag:download_link") and not any("zarr" in key for key in product.assets): + if download_link := product.properties.get("eodag:download_link") and not any( + "zarr" in key for key in product.assets + ): origin_href = download_link if asset_proxy_url: download_link = asset_proxy_url + "/downloadLink" From a60bb6b2acf99e768c5bac032afe179d989764a4 Mon Sep 17 00:00:00 2001 From: cauriol Date: Tue, 24 Mar 2026 10:59:08 +0100 Subject: [PATCH 16/26] fix: url in index --- stac_fastapi/eodag/extensions/data_download.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/stac_fastapi/eodag/extensions/data_download.py b/stac_fastapi/eodag/extensions/data_download.py index a742d605..b052cad2 100644 --- a/stac_fastapi/eodag/extensions/data_download.py +++ b/stac_fastapi/eodag/extensions/data_download.py @@ -93,6 +93,7 @@ def _handle_zarr( self, product: EOProduct, zarr_asset_name: str, + url: str, federation_backend: str, collection_id: str, item_id: str, @@ -105,7 
+106,7 @@ def _handle_zarr( if file_path == "index": try: stream_files = self._list_zarr_files_from_metadata( - base_url, auth, federation_backend, collection_id, item_id, asset_name + base_url, url, auth, federation_backend, collection_id, item_id, asset_name ) return JSONResponse( @@ -146,6 +147,7 @@ def _handle_zarr( def _list_zarr_files_from_metadata( self, base_url: str, + url: str, auth: Optional[dict], federation_backend: str, collection_id: str, @@ -179,7 +181,7 @@ def _list_zarr_files_from_metadata( files.append( StreamFileEntry( path=quoted_path, - url=f"{federation_backend}/{collection_id}/{item_id}/{asset_name}/{quoted_path}", + url=f"{url}/{federation_backend}/{collection_id}/{item_id}/{asset_name}/{quoted_path}", ) ) else: # try zarr v3 metadata file @@ -201,7 +203,7 @@ def _list_zarr_files_from_metadata( files.append( StreamFileEntry( path=file_path, - url=f"{federation_backend}/{collection_id}/{item_id}/{asset_name}/{quoted_path}", + url=f"{url}/{federation_backend}/{collection_id}/{item_id}/{asset_name}/{quoted_path}", ) ) except Exception as e: @@ -364,8 +366,9 @@ def get_data( zarr_asset_name = next((name for name in product.assets if name.endswith(".zarr")), None) if zarr_asset_name: + url = request.base_url._url + "data" return self._handle_zarr( - product, zarr_asset_name, federation_backend, collection_id, item_id, file_path, asset_name, auth + product, zarr_asset_name, url, federation_backend, collection_id, item_id, file_path, asset_name, auth ) presigned_response = self._try_presign_asset(product, asset_name, auth) From 048f6e10a418f1cbc370d2ec0a5004afbac553f8 Mon Sep 17 00:00:00 2001 From: jlahovnik Date: Tue, 24 Mar 2026 12:08:05 +0100 Subject: [PATCH 17/26] fix: add missing libraries to dependencies --- pyproject.toml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pyproject.toml b/pyproject.toml index cb82a9af..6466d34f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,6 +24,9 @@ dependencies = [ "starlette",
"typing_extensions", "shapely", + "fsspec", + "aiohttp" + ] [project.urls] From 0808929e7caee481d6767046ffa5d44ac61117fb Mon Sep 17 00:00:00 2001 From: jlahovnik Date: Tue, 24 Mar 2026 12:21:06 +0100 Subject: [PATCH 18/26] fix: add missing brackets --- stac_fastapi/eodag/models/item.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stac_fastapi/eodag/models/item.py b/stac_fastapi/eodag/models/item.py index d973e63a..e005b26f 100644 --- a/stac_fastapi/eodag/models/item.py +++ b/stac_fastapi/eodag/models/item.py @@ -126,7 +126,7 @@ def create_stac_item( feature["assets"][k]["alternate"] = {"origin": origin} # TODO: remove downloadLink asset after EODAG assets rework - if download_link := product.properties.get("eodag:download_link") and not any( + if (download_link := product.properties.get("eodag:download_link")) and not any( "zarr" in key for key in product.assets ): origin_href = download_link From ea5eacbdb8b9e38c2bd754035e53d3b5654650aa Mon Sep 17 00:00:00 2001 From: cauriol Date: Tue, 24 Mar 2026 13:01:47 +0100 Subject: [PATCH 19/26] fix:add _get_auth_headers --- stac_fastapi/eodag/extensions/data_download.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/stac_fastapi/eodag/extensions/data_download.py b/stac_fastapi/eodag/extensions/data_download.py index b052cad2..4718e283 100644 --- a/stac_fastapi/eodag/extensions/data_download.py +++ b/stac_fastapi/eodag/extensions/data_download.py @@ -70,6 +70,14 @@ class StreamFileEntry(TypedDict): class BaseDataDownloadClient: """Defines a pattern for implementing the data download extension.""" + @staticmethod + def _get_auth_headers(auth: object) -> dict[str, str]: + """Return headers exposed by an auth object when available.""" + get_auth_headers = getattr(auth, "get_auth_headers", None) + if callable(get_auth_headers): + return cast(dict[str, str], get_auth_headers()) + return {} + def _try_presign_asset( self, product: EOProduct, @@ -155,18 +163,13 @@ def 
_list_zarr_files_from_metadata( asset_name: str, ) -> list[StreamFileEntry]: """List all files from zarr store by parsing .zmetadata.""" - import base64 - import fsspec files: list[StreamFileEntry] = [] try: # Build headers for authentication if auth is provided - headers = {} - # if auth and isinstance(auth, dict) and "refresh_token" in auth: - auth_str = f"anonymous:{auth.refresh_token}" - headers["Authorization"] = "Basic " + base64.b64encode(auth_str.encode("utf-8")).decode("utf-8") + headers = self._get_auth_headers(auth) if auth is not None else {} # Get mapper with fsspec mapper = fsspec.get_mapper(base_url, client_kwargs={"headers": headers, "trust_env": False}) From 7f3f0025f36c6ecb7387ecdeb2330a377316bd1c Mon Sep 17 00:00:00 2001 From: cauriol Date: Thu, 26 Mar 2026 09:14:04 +0100 Subject: [PATCH 20/26] feat: add tests --- pyproject.toml | 3 +- .../eodag/extensions/data_download.py | 10 +- tests/conftest.py | 33 ++++ tests/test_zarr.py | 166 +++++++++++++----- 4 files changed, 162 insertions(+), 50 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 6466d34f..fdcc3217 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,7 +25,8 @@ dependencies = [ "typing_extensions", "shapely", "fsspec", - "aiohttp" + "aiohttp", + "requests" ] diff --git a/stac_fastapi/eodag/extensions/data_download.py b/stac_fastapi/eodag/extensions/data_download.py index 4718e283..c8c57579 100644 --- a/stac_fastapi/eodag/extensions/data_download.py +++ b/stac_fastapi/eodag/extensions/data_download.py @@ -51,14 +51,6 @@ logger = logging.getLogger(__name__) -class ZarrFileEntry(TypedDict): - """Zarr file listing item.""" - - path: str - size: int - url: str - - class StreamFileEntry(TypedDict): """Stream file listing item.""" @@ -108,7 +100,7 @@ def _handle_zarr( file_path: Optional[str], asset_name: Optional[str], auth: Optional[dict] = None, - ): + ) -> Union[JSONResponse, StreamingResponse]: asset_values = product.assets[zarr_asset_name] base_url = asset_values["href"] 
if file_path == "index": diff --git a/tests/conftest.py b/tests/conftest.py index 788eafe6..3fc405e2 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -49,6 +49,7 @@ from stac_fastapi.eodag.app import api, stac_metadata_model from stac_fastapi.eodag.config import get_settings from stac_fastapi.eodag.dag import init_dag +from stac_fastapi.eodag.extensions.data_download import BaseDataDownloadClient from tests import TEST_RESOURCES_PATH @@ -526,6 +527,38 @@ def mock_http_base_stream_download(mocker): return mocker.patch.object(HTTPDownload, "stream_download") +@pytest.fixture(scope="function") +def mock_base_data_download_get_data(mocker): + """ + Mocks the `get_data` method of the data download client. + """ + return mocker.patch.object(BaseDataDownloadClient, "get_data") + + +@pytest.fixture(scope="function") +def mock_list_zarr_files_from_metadata(mocker): + """ + Mocks the `_list_zarr_files_from_metadata` method of the data download client. + """ + return mocker.patch.object(BaseDataDownloadClient, "_list_zarr_files_from_metadata") + + +@pytest.fixture(scope="function") +def mock_data_download_requests_get(mocker): + """ + Mocks the `requests.get` call used by the data download extension. + """ + return mocker.patch("stac_fastapi.eodag.extensions.data_download.requests.get") + + +@pytest.fixture(scope="function") +def mock_item_get_settings(mocker): + """ + Mocks the `get_settings` function used by STAC item creation. + """ + return mocker.patch("stac_fastapi.eodag.models.item.get_settings") + + @pytest.fixture(scope="function") def mock_order(mocker): """ diff --git a/tests/test_zarr.py b/tests/test_zarr.py index dc4b7912..547dbea2 100644 --- a/tests/test_zarr.py +++ b/tests/test_zarr.py @@ -17,24 +17,74 @@ # limitations under the License. 
"""Zarr tests.""" +import json +from unittest import mock + from eodag import SearchResult from eodag.api.product import EOProduct from eodag.config import PluginConfig from eodag.plugins.download.http import HTTPDownload +from stac_fastapi.eodag.app import stac_metadata_model +from stac_fastapi.eodag.extensions.data_download import BaseDataDownloadClient +from stac_fastapi.eodag.models.item import create_stac_item + + +def test_get_data_with_file_delegates_to_get_data(mock_base_data_download_get_data): + """get_data_with_file must delegate to get_data with the provided file path.""" + client = BaseDataDownloadClient() + request = mock.Mock() + expected_response = object() + mock_base_data_download_get_data.return_value = expected_response + + response = client.get_data_with_file( + "desp_cache", + "collection", + "item", + "example.zarr", + request, + "group/foo.txt", + ) + + assert response is expected_response + mock_base_data_download_get_data.assert_called_once_with( + "desp_cache", + "collection", + "item", + "example.zarr", + request, + "group/foo.txt", + ) + -async def test_items_response_includes_zarr_index_asset(request_valid, defaults, mock_search, mock_search_result): - """Items response should include a Zarr index asset when zarr is present.""" +def test_items_response_includes_zarr_index_asset(defaults, mock_search_result, mock_item_get_settings): + """create_stac_item should include a Zarr index asset when a Zarr asset is present.""" search_result = mock_search_result product = search_result[0] - product.assets.update({"zarr": {"href": "https://data/internal_fdp/example.zarr"}}) - - mock_search.return_value = search_result - response = await request_valid(f"search?collections={defaults.collection}", search_result=search_result) + product.assets.update({"example.zarr": {"href": "https://data/peps/example.zarr"}}) + request = mock.Mock() + request.app.state.dag.collections_config = {} + mock_item_get_settings.return_value = mock.Mock( + 
download_base_url="http://testserver/", + auto_order_whitelist=[], + keep_origin_url=False, + origin_url_blacklist=[], + ) + response = create_stac_item( + product, + stac_metadata_model, + lambda extension_name: extension_name == "DataDownload", + request, + extension_names=[], + ) - item = response["features"][0] - assert "zarr" in item["assets"] - assert item["assets"]["zarr"]["href"] == f"http://testserver/data/peps/{item['collection']}/{item['id']}/zarr" + item = response + assert "example.zarr" in item["assets"] + assert "download_link" not in item["assets"] + assert ( + item["assets"]["example.zarr"]["href"] + == f"http://testserver/data/peps/{item['collection']}/{item['id']}/example.zarr" + ) assert "Zarr index" in item["assets"] assert ( item["assets"]["Zarr index"]["href"] @@ -42,10 +92,14 @@ async def test_items_response_includes_zarr_index_asset(request_valid, defaults, ) -async def test_zarr_index_listing(request_valid_raw, defaults, mock_search, mock_download, tmp_dir): - """Zarr index should list all files in the store.""" +async def test_zarr_index_listing( + defaults, + mock_list_zarr_files_from_metadata, +): + """get_data should return the streamed file index for a .zarr asset.""" collection = defaults.collection item_id = "dummy_id" + client = BaseDataDownloadClient() product = EOProduct( "peps", dict( @@ -55,39 +109,58 @@ async def test_zarr_index_listing(request_valid_raw, defaults, mock_search, mock ), collection=collection, ) - product.assets.update({"zarr": {"href": "https://data/internal_fdp/example.zarr"}}) + product.assets.update({"example.zarr": {"href": "https://data/peps/example.zarr"}}) config = PluginConfig() config.priority = 0 downloader = HTTPDownload("peps", config) product.register_downloader(downloader=downloader, authenticator=None) - store = tmp_dir / "store.zarr" - (store / "group").mkdir(parents=True) - (store / "group" / "foo.txt").write_text("hello", encoding="utf-8") - (store / "root.txt").write_text("root", 
encoding="utf-8") - - mock_search.return_value = SearchResult([product]) - mock_download.return_value = str(store) - - response = await request_valid_raw( - f"data/peps/{collection}/{item_id}/zarr/index", - search_result=SearchResult([product]), + dag = mock.Mock() + dag.search.return_value = SearchResult([product]) + request = mock.Mock() + request.app.state.dag = dag + request.base_url._url = "http://testserver/" + mock_list_zarr_files_from_metadata.return_value = [ + {"path": ".zmetadata", "size": None, "url": f"/data/peps/{collection}/{item_id}/example.zarr/.zmetadata"}, + {"path": "group/foo.txt", "size": None, "url": f"/data/peps/{collection}/{item_id}/example.zarr/group/foo.txt"}, + ] + + response = client.get_data( + "peps", + collection, + item_id, + "example.zarr", + request, + "index", ) - res = response.json() + res = json.loads(response.body) - assert res["type"] == "zarr-file-index" + assert res["type"] == "stream-file-index" assert res["item_id"] == item_id assert res["collection_id"] == collection assert res["backend"] == "peps" assert res["file_count"] == 2 - assert [f["path"] for f in res["files"]] == ["group/foo.txt", "root.txt"] - assert res["files"][0]["url"] == f"/data/peps/{collection}/{item_id}/zarr/group/foo.txt" + assert [f["path"] for f in res["files"]] == [".zmetadata", "group/foo.txt"] + assert res["files"][1]["url"] == f"/data/peps/{collection}/{item_id}/example.zarr/group/foo.txt" + mock_list_zarr_files_from_metadata.assert_called_once_with( + "https://data/peps/example.zarr", + "http://testserver/data", + None, + "peps", + collection, + item_id, + "example.zarr", + ) -async def test_zarr_file_download(request_valid_raw, defaults, mock_search, mock_download, tmp_dir): - """Zarr file should be retrievable by path.""" +async def test_zarr_file_display( + defaults, + mock_data_download_requests_get, +): + """get_data_with_file should request streaming for a file inside a .zarr asset.""" collection = defaults.collection item_id = 
"dummy_id" + client = BaseDataDownloadClient() product = EOProduct( "peps", dict( @@ -97,23 +170,36 @@ async def test_zarr_file_download(request_valid_raw, defaults, mock_search, mock ), collection=collection, ) - product.assets.update({"zarr": {"href": "https://data/internal_fdp/example.zarr"}}) + product.assets.update({"example.zarr": {"href": "https://data/peps/example.zarr"}}) config = PluginConfig() config.priority = 0 downloader = HTTPDownload("peps", config) product.register_downloader(downloader=downloader, authenticator=None) - store = tmp_dir / "store.zarr" - (store / "group").mkdir(parents=True) - (store / "group" / "foo.txt").write_text("hello", encoding="utf-8") - - mock_search.return_value = SearchResult([product]) - mock_download.return_value = str(store) + dag = mock.Mock() + dag.search.return_value = SearchResult([product]) + request = mock.Mock() + request.app.state.dag = dag + request.base_url._url = "http://testserver/" + mock_data_download_requests_get.return_value = mock.Mock( + status_code=200, + headers={"Content-Type": "text/plain"}, + ) + mock_data_download_requests_get.return_value.iter_content.return_value = iter([b"hello"]) - response = await request_valid_raw( - f"data/peps/{collection}/{item_id}/zarr/group/foo.txt", - search_result=SearchResult([product]), + response = client.get_data_with_file( + "peps", + collection, + item_id, + "example.zarr", + request, + "group/foo.txt", ) - assert response.content == b"hello" + assert response.status_code == 200 assert response.headers["content-type"].startswith("text/plain") + mock_data_download_requests_get.assert_called_once_with( + "https://data/peps/example.zarr/group/foo.txt", + auth=None, + stream=True, + ) From 4909f87c122567e65cd3886f4b02264d7d461d8a Mon Sep 17 00:00:00 2001 From: cauriol Date: Wed, 1 Apr 2026 11:58:26 +0200 Subject: [PATCH 21/26] fix: zarr handled by EODAG --- pyproject.toml | 3 - .../eodag/extensions/data_download.py | 119 ++++-------------- tests/conftest.py | 8 +- 
tests/test_zarr.py | 5 - 4 files changed, 25 insertions(+), 110 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index fdcc3217..417d68cf 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,9 +24,6 @@ dependencies = [ "starlette", "typing_extensions", "shapely", - "fsspec", - "aiohttp", - "requests" ] diff --git a/stac_fastapi/eodag/extensions/data_download.py b/stac_fastapi/eodag/extensions/data_download.py index c8c57579..950c5770 100644 --- a/stac_fastapi/eodag/extensions/data_download.py +++ b/stac_fastapi/eodag/extensions/data_download.py @@ -18,16 +18,14 @@ """Data-download extension.""" import glob -import json import logging import os from io import BufferedReader from shutil import make_archive, rmtree -from typing import Annotated, Iterator, Optional, TypedDict, Union, cast +from typing import Annotated, Iterator, Optional, Union, cast from urllib.parse import quote import attr -import requests from eodag.api.core import EODataAccessGateway from eodag.api.product._product import EOProduct from eodag.api.product.metadata_mapping import ONLINE_STATUS, STAGING_STATUS, get_metadata_path_value @@ -51,25 +49,9 @@ logger = logging.getLogger(__name__) -class StreamFileEntry(TypedDict): - """Stream file listing item.""" - - path: str - size: Optional[int] - url: str - - class BaseDataDownloadClient: """Defines a pattern for implementing the data download extension.""" - @staticmethod - def _get_auth_headers(auth: object) -> dict[str, str]: - """Return headers exposed by an auth object when available.""" - get_auth_headers = getattr(auth, "get_auth_headers", None) - if callable(get_auth_headers): - return cast(dict[str, str], get_auth_headers()) - return {} - def _try_presign_asset( self, product: EOProduct, @@ -105,9 +87,17 @@ def _handle_zarr( base_url = asset_values["href"] if file_path == "index": try: - stream_files = self._list_zarr_files_from_metadata( - base_url, url, auth, federation_backend, collection_id, item_id, asset_name - ) + paths = 
product.list_zarr_files_from_metadata(base_url, auth) + + files = [ + { + "path": path, + "url": ( + f"{url}/{federation_backend}/{collection_id}/{item_id}/{asset_name}/{quote(path, safe='/')}" + ), + } + for path in paths + ] return JSONResponse( content={ @@ -115,8 +105,8 @@ def _handle_zarr( "item_id": item_id, "collection_id": collection_id, "backend": federation_backend, - "file_count": len(stream_files), - "files": stream_files, + "file_count": len(files), + "files": files, } ) except Exception as e: @@ -127,13 +117,13 @@ def _handle_zarr( # to stream a specific file in the zarr store base_url = base_url + "/" + file_path.lstrip("/") - r = requests.get(base_url, auth=auth, stream=True) + r = product.request_asset(url=base_url, auth=auth) data = r.json() return JSONResponse(content=data) if zarr_asset_name == asset_name: target_url = f"{base_url.rstrip('/')}/{file_path.lstrip('/')}" - r = requests.get(target_url, auth=auth, stream=True) + r = product.request_asset(url=target_url, auth=auth) return StreamingResponse( r.iter_content(chunk_size=1024 * 1024), @@ -144,73 +134,6 @@ def _handle_zarr( }, ) - def _list_zarr_files_from_metadata( - self, - base_url: str, - url: str, - auth: Optional[dict], - federation_backend: str, - collection_id: str, - item_id: str, - asset_name: str, - ) -> list[StreamFileEntry]: - """List all files from zarr store by parsing .zmetadata.""" - import fsspec - - files: list[StreamFileEntry] = [] - - try: - # Build headers for authentication if auth is provided - headers = self._get_auth_headers(auth) if auth is not None else {} - # Get mapper with fsspec - mapper = fsspec.get_mapper(base_url, client_kwargs={"headers": headers, "trust_env": False}) - - # Read .zmetadata - if ".zmetadata" in mapper: - meta = json.loads(mapper[".zmetadata"]) - logger.debug(f"Found {len(meta['metadata'])} entries in .zmetadata") - key = meta["metadata"].keys() - # Add .zmetadata file itself to the listing to allow clients to read it and - # get metadata 
for all files in the store - quoted_path = quote(".zmetadata", safe="/") - files.append( - StreamFileEntry( - path=quoted_path, - url=f"{url}/{federation_backend}/{collection_id}/{item_id}/{asset_name}/{quoted_path}", - ) - ) - else: # try zarr v3 metadata file - """TO DO - .zmetadata is present for zarr v2, but not for v3, we should support both. - For Zarr v3, there is no .zmetata but zarr.json file instead, - we can try to read it and parse the metadata from it to list files in the store. - for exemple: - elif "zarr.json" in mapper: - meta = json.loads(mapper["zarr.json"]) - logger.debug(f"Found {len(meta['metadata'])} entries in zarr.json") - key = meta["metadata"].keys() - """ - - # Iterate through all files in the metadata - for file_path in key: - try: - quoted_path = quote(file_path, safe="/") - files.append( - StreamFileEntry( - path=file_path, - url=f"{url}/{federation_backend}/{collection_id}/{item_id}/{asset_name}/{quoted_path}", - ) - ) - except Exception as e: - logger.debug(f"Could not get metadata for {file_path}: {e}") - - logger.debug(f"Listed {len(files)} zarr files") - return files - - except Exception as e: - logger.error(f"Failed to list zarr files from metadata: {e}") - raise - def _file_to_stream( self, file_path: str, @@ -314,7 +237,11 @@ def get_data( raise NotFoundError(f"Item {item_id} does not exist. Please order it first") from e raise NotFoundError(e) from e - auth = product.downloader_auth.authenticate() if product.downloader_auth else None + try: + auth = product.downloader_auth.authenticate() if product.downloader_auth else None + except Exception as e: + logger.error(f"Authentication failed: {e}") + raise NotAvailableError("Token not ready, authentication failed. 
Please try again later.") from e if product.downloader is None: logger.error("No downloader available for %s", product) @@ -370,10 +297,6 @@ def get_data( if presigned_response: return presigned_response - # Handle zarr store - return file listing or individual file - if asset_name == "zarr": - return self._handle_zarr(dag, product, federation_backend, collection_id, item_id, file_path) - try: s = product.downloader.stream_download( product, diff --git a/tests/conftest.py b/tests/conftest.py index 3fc405e2..2068f2f8 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -538,17 +538,17 @@ def mock_base_data_download_get_data(mocker): @pytest.fixture(scope="function") def mock_list_zarr_files_from_metadata(mocker): """ - Mocks the `_list_zarr_files_from_metadata` method of the data download client. + Mocks the `list_zarr_files_from_metadata` method of EOProduct. """ - return mocker.patch.object(BaseDataDownloadClient, "_list_zarr_files_from_metadata") + return mocker.patch("eodag.api.product._product.EOProduct.list_zarr_files_from_metadata") @pytest.fixture(scope="function") def mock_data_download_requests_get(mocker): """ - Mocks the `requests.get` call used by the data download extension. + Mocks the `requests.get` call used by EOProduct.request_asset. 
""" - return mocker.patch("stac_fastapi.eodag.extensions.data_download.requests.get") + return mocker.patch("eodag.api.product._product.requests.get") @pytest.fixture(scope="function") diff --git a/tests/test_zarr.py b/tests/test_zarr.py index 547dbea2..6246540d 100644 --- a/tests/test_zarr.py +++ b/tests/test_zarr.py @@ -144,12 +144,7 @@ async def test_zarr_index_listing( assert res["files"][1]["url"] == f"/data/peps/{collection}/{item_id}/example.zarr/group/foo.txt" mock_list_zarr_files_from_metadata.assert_called_once_with( "https://data/peps/example.zarr", - "http://testserver/data", None, - "peps", - collection, - item_id, - "example.zarr", ) From 90484340e008bd232c49ec47d9f029aa39474dae Mon Sep 17 00:00:00 2001 From: cauriol Date: Thu, 2 Apr 2026 12:01:51 +0200 Subject: [PATCH 22/26] fix: remove peps --- tests/test_zarr.py | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/tests/test_zarr.py b/tests/test_zarr.py index 6246540d..5d049bb1 100644 --- a/tests/test_zarr.py +++ b/tests/test_zarr.py @@ -61,7 +61,7 @@ def test_items_response_includes_zarr_index_asset(defaults, mock_search_result, """create_stac_item should include a Zarr index asset when a Zarr asset is present.""" search_result = mock_search_result product = search_result[0] - product.assets.update({"example.zarr": {"href": "https://data/peps/example.zarr"}}) + product.assets.update({"example.zarr": {"href": "https://data/cop_dataspace/example.zarr"}}) request = mock.Mock() request.app.state.dag.collections_config = {} mock_item_get_settings.return_value = mock.Mock( @@ -83,12 +83,12 @@ def test_items_response_includes_zarr_index_asset(defaults, mock_search_result, assert "download_link" not in item["assets"] assert ( item["assets"]["example.zarr"]["href"] - == f"http://testserver/data/peps/{item['collection']}/{item['id']}/example.zarr" + == f"http://testserver/data/cop_dataspace/{item['collection']}/{item['id']}/example.zarr" ) assert "Zarr 
index" in item["assets"] assert ( item["assets"]["Zarr index"]["href"] - == f"http://testserver/data/peps/{item['collection']}/{item['id']}/zarr/index" + == f"http://testserver/data/cop_dataspace/{item['collection']}/{item['id']}/zarr/index" ) @@ -101,7 +101,7 @@ async def test_zarr_index_listing( item_id = "dummy_id" client = BaseDataDownloadClient() product = EOProduct( - "peps", + "cop_dataspace", dict( geometry="POINT (0 0)", title="dummy_product", @@ -109,10 +109,10 @@ async def test_zarr_index_listing( ), collection=collection, ) - product.assets.update({"example.zarr": {"href": "https://data/peps/example.zarr"}}) + product.assets.update({"example.zarr": {"href": "https://data/cop_dataspace/example.zarr"}}) config = PluginConfig() config.priority = 0 - downloader = HTTPDownload("peps", config) + downloader = HTTPDownload("cop_dataspace", config) product.register_downloader(downloader=downloader, authenticator=None) dag = mock.Mock() @@ -121,12 +121,12 @@ async def test_zarr_index_listing( request.app.state.dag = dag request.base_url._url = "http://testserver/" mock_list_zarr_files_from_metadata.return_value = [ - {"path": ".zmetadata", "size": None, "url": f"/data/peps/{collection}/{item_id}/example.zarr/.zmetadata"}, - {"path": "group/foo.txt", "size": None, "url": f"/data/peps/{collection}/{item_id}/example.zarr/group/foo.txt"}, + {"path": ".zmetadata", "size": None, "url": f"/data/cop_dataspace/{collection}/{item_id}/example.zarr/.zmetadata"}, + {"path": "group/foo.txt", "size": None, "url": f"/data/cop_dataspace/{collection}/{item_id}/example.zarr/group/foo.txt"}, ] response = client.get_data( - "peps", + "cop_dataspace", collection, item_id, "example.zarr", @@ -138,12 +138,12 @@ async def test_zarr_index_listing( assert res["type"] == "stream-file-index" assert res["item_id"] == item_id assert res["collection_id"] == collection - assert res["backend"] == "peps" + assert res["backend"] == "cop_dataspace" assert res["file_count"] == 2 assert [f["path"] for 
f in res["files"]] == [".zmetadata", "group/foo.txt"] - assert res["files"][1]["url"] == f"/data/peps/{collection}/{item_id}/example.zarr/group/foo.txt" + assert res["files"][1]["url"] == f"/data/cop_dataspace/{collection}/{item_id}/example.zarr/group/foo.txt" mock_list_zarr_files_from_metadata.assert_called_once_with( - "https://data/peps/example.zarr", + "https://data/cop_dataspace/example.zarr", None, ) @@ -157,7 +157,7 @@ async def test_zarr_file_display( item_id = "dummy_id" client = BaseDataDownloadClient() product = EOProduct( - "peps", + "cop_dataspace", dict( geometry="POINT (0 0)", title="dummy_product", @@ -165,10 +165,10 @@ async def test_zarr_file_display( ), collection=collection, ) - product.assets.update({"example.zarr": {"href": "https://data/peps/example.zarr"}}) + product.assets.update({"example.zarr": {"href": "https://data/cop_dataspace/example.zarr"}}) config = PluginConfig() config.priority = 0 - downloader = HTTPDownload("peps", config) + downloader = HTTPDownload("cop_dataspace", config) product.register_downloader(downloader=downloader, authenticator=None) dag = mock.Mock() @@ -183,7 +183,7 @@ async def test_zarr_file_display( mock_data_download_requests_get.return_value.iter_content.return_value = iter([b"hello"]) response = client.get_data_with_file( - "peps", + "cop_dataspace", collection, item_id, "example.zarr", @@ -194,7 +194,7 @@ async def test_zarr_file_display( assert response.status_code == 200 assert response.headers["content-type"].startswith("text/plain") mock_data_download_requests_get.assert_called_once_with( - "https://data/peps/example.zarr/group/foo.txt", + "https://data/cop_dataspace/example.zarr/group/foo.txt", auth=None, stream=True, ) From 6107b637e35f923ff5275b98711d37d254513888 Mon Sep 17 00:00:00 2001 From: cauriol Date: Thu, 2 Apr 2026 15:24:33 +0200 Subject: [PATCH 23/26] fix: tests zarr --- stac_fastapi/eodag/extensions/data_download.py | 6 +++--- tests/test_zarr.py | 14 +++++++------- 2 files changed, 10 
insertions(+), 10 deletions(-) diff --git a/stac_fastapi/eodag/extensions/data_download.py b/stac_fastapi/eodag/extensions/data_download.py index 950c5770..f1ccb952 100644 --- a/stac_fastapi/eodag/extensions/data_download.py +++ b/stac_fastapi/eodag/extensions/data_download.py @@ -87,7 +87,7 @@ def _handle_zarr( base_url = asset_values["href"] if file_path == "index": try: - paths = product.list_zarr_files_from_metadata(base_url, auth) + paths = product.list_zarr_files_from_metadata(base_url) files = [ { @@ -117,13 +117,13 @@ def _handle_zarr( # to stream a specific file in the zarr store base_url = base_url + "/" + file_path.lstrip("/") - r = product.request_asset(url=base_url, auth=auth) + r = product.request_asset(url=base_url) data = r.json() return JSONResponse(content=data) if zarr_asset_name == asset_name: target_url = f"{base_url.rstrip('/')}/{file_path.lstrip('/')}" - r = product.request_asset(url=target_url, auth=auth) + r = product.request_asset(url=target_url) return StreamingResponse( r.iter_content(chunk_size=1024 * 1024), diff --git a/tests/test_zarr.py b/tests/test_zarr.py index 5d049bb1..61c3d480 100644 --- a/tests/test_zarr.py +++ b/tests/test_zarr.py @@ -121,8 +121,8 @@ async def test_zarr_index_listing( request.app.state.dag = dag request.base_url._url = "http://testserver/" mock_list_zarr_files_from_metadata.return_value = [ - {"path": ".zmetadata", "size": None, "url": f"/data/cop_dataspace/{collection}/{item_id}/example.zarr/.zmetadata"}, - {"path": "group/foo.txt", "size": None, "url": f"/data/cop_dataspace/{collection}/{item_id}/example.zarr/group/foo.txt"}, + ".zmetadata", + "group/foo.txt", ] response = client.get_data( @@ -141,11 +141,11 @@ async def test_zarr_index_listing( assert res["backend"] == "cop_dataspace" assert res["file_count"] == 2 assert [f["path"] for f in res["files"]] == [".zmetadata", "group/foo.txt"] - assert res["files"][1]["url"] == f"/data/cop_dataspace/{collection}/{item_id}/example.zarr/group/foo.txt" - 
mock_list_zarr_files_from_metadata.assert_called_once_with( - "https://data/cop_dataspace/example.zarr", - None, + assert ( + res["files"][1]["url"] + == f"http://testserver/data/cop_dataspace/{collection}/{item_id}/example.zarr/group/foo.txt" ) + mock_list_zarr_files_from_metadata.assert_called_once_with("https://data/cop_dataspace/example.zarr") async def test_zarr_file_display( @@ -195,6 +195,6 @@ async def test_zarr_file_display( assert response.headers["content-type"].startswith("text/plain") mock_data_download_requests_get.assert_called_once_with( "https://data/cop_dataspace/example.zarr/group/foo.txt", - auth=None, + headers={}, stream=True, ) From 35100895497f7df3b02cdda0bda9514e2344cfdf Mon Sep 17 00:00:00 2001 From: cauriol Date: Fri, 10 Apr 2026 14:47:39 +0200 Subject: [PATCH 24/26] fix: zarr --- .../eodag/extensions/data_download.py | 82 ++++--------------- stac_fastapi/eodag/models/item.py | 10 +-- tests/conftest.py | 8 -- tests/test_zarr.py | 66 +-------------- 4 files changed, 17 insertions(+), 149 deletions(-) diff --git a/stac_fastapi/eodag/extensions/data_download.py b/stac_fastapi/eodag/extensions/data_download.py index f1ccb952..e613f26c 100644 --- a/stac_fastapi/eodag/extensions/data_download.py +++ b/stac_fastapi/eodag/extensions/data_download.py @@ -23,7 +23,6 @@ from io import BufferedReader from shutil import make_archive, rmtree from typing import Annotated, Iterator, Optional, Union, cast -from urllib.parse import quote import attr from eodag.api.core import EODataAccessGateway @@ -71,69 +70,6 @@ def _try_presign_asset( logger.info("Presigned url could not be fetched for %s", asset_name) return None - def _handle_zarr( - self, - product: EOProduct, - zarr_asset_name: str, - url: str, - federation_backend: str, - collection_id: str, - item_id: str, - file_path: Optional[str], - asset_name: Optional[str], - auth: Optional[dict] = None, - ) -> Union[JSONResponse, StreamingResponse]: - asset_values = product.assets[zarr_asset_name] - base_url 
= asset_values["href"] - if file_path == "index": - try: - paths = product.list_zarr_files_from_metadata(base_url) - - files = [ - { - "path": path, - "url": ( - f"{url}/{federation_backend}/{collection_id}/{item_id}/{asset_name}/{quote(path, safe='/')}" - ), - } - for path in paths - ] - - return JSONResponse( - content={ - "type": "stream-file-index", - "item_id": item_id, - "collection_id": collection_id, - "backend": federation_backend, - "file_count": len(files), - "files": files, - } - ) - except Exception as e: - logger.error(f"Failed to list zarr files: {e}") - raise NotFoundError(f"Failed to list zarr store files: {e}") from e - if asset_name == "zarr" and file_path != "index": - # request data/{backend}/{collection}/{item}/zarr/{file_path} - # to stream a specific file in the zarr store - base_url = base_url + "/" + file_path.lstrip("/") - - r = product.request_asset(url=base_url) - data = r.json() - return JSONResponse(content=data) - if zarr_asset_name == asset_name: - target_url = f"{base_url.rstrip('/')}/{file_path.lstrip('/')}" - - r = product.request_asset(url=target_url) - - return StreamingResponse( - r.iter_content(chunk_size=1024 * 1024), - status_code=r.status_code, - media_type=r.headers.get("Content-Type", "application/octet-stream"), - headers={ - k: v for k, v in r.headers.items() if k.lower() not in ["content-encoding", "transfer-encoding"] - }, - ) - def _file_to_stream( self, file_path: str, @@ -286,11 +222,21 @@ def get_data( raise NotFoundError(f"Item {item_id} does not exist. 
Please order it first") from e raise NotFoundError(e) from e - zarr_asset_name = next((name for name in product.assets if name.endswith(".zarr")), None) + zarr_asset_name = next((name for name in product.assets if (name.endswith("zarr") and asset_name!="downloadLink")), None) if zarr_asset_name: - url = request.base_url._url + "data" - return self._handle_zarr( - product, zarr_asset_name, url, federation_backend, collection_id, item_id, file_path, asset_name, auth + asset_values = product.assets[zarr_asset_name] + base_url = asset_values["href"] + target_url = f"{base_url.rstrip('/')}/{file_path.lstrip('/')}" + + r = product.request_asset(url=target_url) + + return StreamingResponse( + r.iter_content(chunk_size=1024 * 1024), + status_code=r.status_code, + media_type=r.headers.get("Content-Type", "application/octet-stream"), + headers={ + k: v for k, v in r.headers.items() if k.lower() not in ["content-encoding", "transfer-encoding"] + }, ) presigned_response = self._try_presign_asset(product, asset_name, auth) diff --git a/stac_fastapi/eodag/models/item.py b/stac_fastapi/eodag/models/item.py index e005b26f..1f1eb5c7 100644 --- a/stac_fastapi/eodag/models/item.py +++ b/stac_fastapi/eodag/models/item.py @@ -126,9 +126,7 @@ def create_stac_item( feature["assets"][k]["alternate"] = {"origin": origin} # TODO: remove downloadLink asset after EODAG assets rework - if (download_link := product.properties.get("eodag:download_link")) and not any( - "zarr" in key for key in product.assets - ): + if download_link := product.properties.get("eodag:download_link"): origin_href = download_link if asset_proxy_url: download_link = asset_proxy_url + "/downloadLink" @@ -151,12 +149,6 @@ def create_stac_item( "type": mime_type, }, } - if any("zarr" in key for key in product.assets) and asset_proxy_url: - feature["assets"]["Zarr index"] = { - "title": "Zarr store index", - "href": asset_proxy_url + "/zarr/index", - "type": "application/json", - } feature_model = model.model_validate( { 
diff --git a/tests/conftest.py b/tests/conftest.py index 2068f2f8..eeb56373 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -535,14 +535,6 @@ def mock_base_data_download_get_data(mocker): return mocker.patch.object(BaseDataDownloadClient, "get_data") -@pytest.fixture(scope="function") -def mock_list_zarr_files_from_metadata(mocker): - """ - Mocks the `list_zarr_files_from_metadata` method of EOProduct. - """ - return mocker.patch("eodag.api.product._product.EOProduct.list_zarr_files_from_metadata") - - @pytest.fixture(scope="function") def mock_data_download_requests_get(mocker): """ diff --git a/tests/test_zarr.py b/tests/test_zarr.py index 61c3d480..87d19eb5 100644 --- a/tests/test_zarr.py +++ b/tests/test_zarr.py @@ -17,7 +17,6 @@ # limitations under the License. """Zarr tests.""" -import json from unittest import mock from eodag import SearchResult @@ -57,8 +56,8 @@ def test_get_data_with_file_delegates_to_get_data(mock_base_data_download_get_da ) -def test_items_response_includes_zarr_index_asset(defaults, mock_search_result, mock_item_get_settings): - """create_stac_item should include a Zarr index asset when a Zarr asset is present.""" +def test_items_response_includes_zarr_asset(defaults, mock_search_result, mock_item_get_settings): + """create_stac_item should include a Zarr asset when present.""" search_result = mock_search_result product = search_result[0] product.assets.update({"example.zarr": {"href": "https://data/cop_dataspace/example.zarr"}}) @@ -85,67 +84,6 @@ def test_items_response_includes_zarr_index_asset(defaults, mock_search_result, item["assets"]["example.zarr"]["href"] == f"http://testserver/data/cop_dataspace/{item['collection']}/{item['id']}/example.zarr" ) - assert "Zarr index" in item["assets"] - assert ( - item["assets"]["Zarr index"]["href"] - == f"http://testserver/data/cop_dataspace/{item['collection']}/{item['id']}/zarr/index" - ) - - -async def test_zarr_index_listing( - defaults, - mock_list_zarr_files_from_metadata, 
-): - """get_data should return the streamed file index for a .zarr asset.""" - collection = defaults.collection - item_id = "dummy_id" - client = BaseDataDownloadClient() - product = EOProduct( - "cop_dataspace", - dict( - geometry="POINT (0 0)", - title="dummy_product", - id=item_id, - ), - collection=collection, - ) - product.assets.update({"example.zarr": {"href": "https://data/cop_dataspace/example.zarr"}}) - config = PluginConfig() - config.priority = 0 - downloader = HTTPDownload("cop_dataspace", config) - product.register_downloader(downloader=downloader, authenticator=None) - - dag = mock.Mock() - dag.search.return_value = SearchResult([product]) - request = mock.Mock() - request.app.state.dag = dag - request.base_url._url = "http://testserver/" - mock_list_zarr_files_from_metadata.return_value = [ - ".zmetadata", - "group/foo.txt", - ] - - response = client.get_data( - "cop_dataspace", - collection, - item_id, - "example.zarr", - request, - "index", - ) - res = json.loads(response.body) - - assert res["type"] == "stream-file-index" - assert res["item_id"] == item_id - assert res["collection_id"] == collection - assert res["backend"] == "cop_dataspace" - assert res["file_count"] == 2 - assert [f["path"] for f in res["files"]] == [".zmetadata", "group/foo.txt"] - assert ( - res["files"][1]["url"] - == f"http://testserver/data/cop_dataspace/{collection}/{item_id}/example.zarr/group/foo.txt" - ) - mock_list_zarr_files_from_metadata.assert_called_once_with("https://data/cop_dataspace/example.zarr") async def test_zarr_file_display( From b301dbbe469b81fb71d793a6b528cd7684049081 Mon Sep 17 00:00:00 2001 From: cauriol Date: Mon, 13 Apr 2026 09:48:57 +0200 Subject: [PATCH 25/26] fix: pre commit --- stac_fastapi/eodag/extensions/data_download.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/stac_fastapi/eodag/extensions/data_download.py b/stac_fastapi/eodag/extensions/data_download.py index e613f26c..6a76da1e 100644 --- 
a/stac_fastapi/eodag/extensions/data_download.py +++ b/stac_fastapi/eodag/extensions/data_download.py @@ -222,7 +222,9 @@ def get_data( raise NotFoundError(f"Item {item_id} does not exist. Please order it first") from e raise NotFoundError(e) from e - zarr_asset_name = next((name for name in product.assets if (name.endswith("zarr") and asset_name!="downloadLink")), None) + zarr_asset_name = next( + (name for name in product.assets if (name.endswith("zarr") and asset_name != "downloadLink")), None + ) if zarr_asset_name: asset_values = product.assets[zarr_asset_name] base_url = asset_values["href"] From d74acfbe536bc3be421188e0937099106c74d806 Mon Sep 17 00:00:00 2001 From: cauriol Date: Mon, 13 Apr 2026 11:29:27 +0200 Subject: [PATCH 26/26] fix: remove donwloadlink for zarr --- stac_fastapi/eodag/models/item.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/stac_fastapi/eodag/models/item.py b/stac_fastapi/eodag/models/item.py index 1f1eb5c7..0c40a5ea 100644 --- a/stac_fastapi/eodag/models/item.py +++ b/stac_fastapi/eodag/models/item.py @@ -126,7 +126,9 @@ def create_stac_item( feature["assets"][k]["alternate"] = {"origin": origin} # TODO: remove downloadLink asset after EODAG assets rework - if download_link := product.properties.get("eodag:download_link"): + if (download_link := product.properties.get("eodag:download_link")) and not any( + "zarr" in key for key in product.assets + ): origin_href = download_link if asset_proxy_url: download_link = asset_proxy_url + "/downloadLink"