diff --git a/src/aleph/sdk/client/authenticated_http.py b/src/aleph/sdk/client/authenticated_http.py index 4528a5b7..3f12eb5a 100644 --- a/src/aleph/sdk/client/authenticated_http.py +++ b/src/aleph/sdk/client/authenticated_http.py @@ -1,3 +1,4 @@ +import asyncio import hashlib import json import logging @@ -116,35 +117,81 @@ async def storage_push(self, content: Mapping) -> str: resp.raise_for_status() return (await resp.json()).get("hash") - async def ipfs_push_file(self, file_content: bytes) -> str: + async def ipfs_push_file( + self, file_content: bytes, max_retries: int = 3, retry_delay: float = 1.0 + ) -> str: """ Push a file to the IPFS service. :param file_content: The file content to upload + :param max_retries: Maximum number of retries on 404 (for network propagation delays) + :param retry_delay: Initial delay between retries in seconds (doubles each retry) """ - data = aiohttp.FormData() - data.add_field("file", BytesIO(file_content)) - url = "/api/v0/ipfs/add_file" logger.debug(f"Pushing file to IPFS on {url}") - async with self.http_session.post(url, data=data) as resp: - resp.raise_for_status() - return (await resp.json()).get("hash") + last_error: Optional[aiohttp.ClientResponseError] = None + for attempt in range(max_retries + 1): + data = aiohttp.FormData() + data.add_field("file", BytesIO(file_content)) + + async with self.http_session.post(url, data=data) as resp: + if resp.status == 404 and attempt < max_retries: + delay = retry_delay * (2**attempt) + logger.debug( + f"Got 404 on file upload, retrying in {delay}s " + f"(attempt {attempt + 1}/{max_retries})" + ) + await asyncio.sleep(delay) + continue + resp.raise_for_status() + return (await resp.json()).get("hash") + + # This should not be reached, but just in case + if last_error: + raise last_error + raise aiohttp.ClientResponseError( + request_info=None, # type: ignore + history=(), + status=404, + message="File upload failed after retries", + ) - async def storage_push_file(self, file_content: bytes) -> Optional[str]: + async def storage_push_file( + self, file_content: bytes, max_retries: int = 3, retry_delay: float = 1.0 + ) -> Optional[str]: """ Push a file to the storage service. - """ - data = aiohttp.FormData() - data.add_field("file", BytesIO(file_content)) + :param file_content: The file content to upload + :param max_retries: Maximum number of retries on 404 (for network propagation delays) + :param retry_delay: Initial delay between retries in seconds (doubles each retry) + """ url = "/api/v0/storage/add_file" logger.debug(f"Posting file on {url}") - async with self.http_session.post(url, data=data) as resp: - resp.raise_for_status() - return (await resp.json()).get("hash") + for attempt in range(max_retries + 1): + data = aiohttp.FormData() + data.add_field("file", BytesIO(file_content)) + + async with self.http_session.post(url, data=data) as resp: + if resp.status == 404 and attempt < max_retries: + delay = retry_delay * (2**attempt) + logger.debug( + f"Got 404 on file upload, retrying in {delay}s " + f"(attempt {attempt + 1}/{max_retries})" + ) + await asyncio.sleep(delay) + continue + resp.raise_for_status() + return (await resp.json()).get("hash") + + raise aiohttp.ClientResponseError( + request_info=None, # type: ignore + history=(), + status=404, + message="File upload failed after retries", + ) @staticmethod def _log_publication_status(publication_status: Mapping[str, Any]): @@ -619,10 +666,18 @@ async def _storage_push_file_with_message( store_content: StoreContent, channel: Optional[str] = settings.DEFAULT_CHANNEL, sync: bool = False, + max_retries: int = 3, + retry_delay: float = 1.0, ) -> Tuple[StoreMessage, MessageStatus]: - """Push a file to the storage service.""" - data = aiohttp.FormData() + """Push a file to the storage service. + :param file_content: The file content to upload + :param store_content: The store content metadata + :param channel: The channel to publish to + :param sync: Whether to wait for the message to be processed + :param max_retries: Maximum number of retries on 404 (for network propagation delays) + :param retry_delay: Initial delay between retries in seconds (doubles each retry) + """ # Prepare the STORE message message = await self.generate_signed_message( message_type=MessageType.store, @@ -633,23 +688,44 @@ async def _storage_push_file_with_message( "message": message.model_dump(exclude_none=True), "sync": sync, } - data.add_field( - "metadata", - json.dumps(metadata, default=extended_json_encoder), - content_type="application/json", - ) - # Add the file - data.add_field("file", BytesIO(file_content)) + metadata_json = json.dumps(metadata, default=extended_json_encoder) url = "/api/v0/storage/add_file" logger.debug(f"Posting file on {url}") - async with self.http_session.post(url, data=data) as resp: - resp.raise_for_status() - message_status = ( - MessageStatus.PENDING if resp.status == 202 else MessageStatus.PROCESSED + for attempt in range(max_retries + 1): + # FormData must be recreated for each attempt as it's consumed by the request + data = aiohttp.FormData() + data.add_field( + "metadata", + metadata_json, + content_type="application/json", ) - return message, message_status # type: ignore + data.add_field("file", BytesIO(file_content)) + + async with self.http_session.post(url, data=data) as resp: + if resp.status == 404 and attempt < max_retries: + delay = retry_delay * (2**attempt) + logger.debug( + f"Got 404 on file upload, retrying in {delay}s " + f"(attempt {attempt + 1}/{max_retries})" + ) + await asyncio.sleep(delay) + continue + resp.raise_for_status() + message_status = ( + MessageStatus.PENDING + if resp.status == 202 + else MessageStatus.PROCESSED + ) + return message, message_status # type: ignore + + raise aiohttp.ClientResponseError( + request_info=None, # type: ignore + history=(), + status=404, + message="File upload failed after retries", + ) async def _upload_file_native( self,