Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 104 additions & 28 deletions src/aleph/sdk/client/authenticated_http.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import hashlib
import json
import logging
Expand Down Expand Up @@ -116,35 +117,81 @@ async def storage_push(self, content: Mapping) -> str:
resp.raise_for_status()
return (await resp.json()).get("hash")

async def ipfs_push_file(self, file_content: bytes) -> str:
async def ipfs_push_file(
self, file_content: bytes, max_retries: int = 3, retry_delay: float = 1.0
) -> str:
"""
Push a file to the IPFS service.

:param file_content: The file content to upload
:param max_retries: Maximum number of retries on 404 (for network propagation delays)
:param retry_delay: Initial delay between retries in seconds (doubles each retry)
"""
data = aiohttp.FormData()
data.add_field("file", BytesIO(file_content))

url = "/api/v0/ipfs/add_file"
logger.debug(f"Pushing file to IPFS on {url}")

async with self.http_session.post(url, data=data) as resp:
resp.raise_for_status()
return (await resp.json()).get("hash")
last_error: Optional[aiohttp.ClientResponseError] = None
for attempt in range(max_retries + 1):
data = aiohttp.FormData()
data.add_field("file", BytesIO(file_content))

async with self.http_session.post(url, data=data) as resp:
if resp.status == 404 and attempt < max_retries:
delay = retry_delay * (2**attempt)
logger.debug(
f"Got 404 on file upload, retrying in {delay}s "
f"(attempt {attempt + 1}/{max_retries})"
)
await asyncio.sleep(delay)
continue
resp.raise_for_status()
return (await resp.json()).get("hash")

# This should not be reached, but just in case
if last_error:
raise last_error
raise aiohttp.ClientResponseError(
request_info=None, # type: ignore
history=(),
status=404,
message="File upload failed after retries",
)

async def storage_push_file(self, file_content: bytes) -> Optional[str]:
async def storage_push_file(
self, file_content: bytes, max_retries: int = 3, retry_delay: float = 1.0
) -> Optional[str]:
"""
Push a file to the storage service.
"""
data = aiohttp.FormData()
data.add_field("file", BytesIO(file_content))

:param file_content: The file content to upload
:param max_retries: Maximum number of retries on 404 (for network propagation delays)
:param retry_delay: Initial delay between retries in seconds (doubles each retry)
"""
url = "/api/v0/storage/add_file"
logger.debug(f"Posting file on {url}")

async with self.http_session.post(url, data=data) as resp:
resp.raise_for_status()
return (await resp.json()).get("hash")
for attempt in range(max_retries + 1):
data = aiohttp.FormData()
data.add_field("file", BytesIO(file_content))

async with self.http_session.post(url, data=data) as resp:
if resp.status == 404 and attempt < max_retries:
delay = retry_delay * (2**attempt)
logger.debug(
f"Got 404 on file upload, retrying in {delay}s "
f"(attempt {attempt + 1}/{max_retries})"
)
await asyncio.sleep(delay)
continue
resp.raise_for_status()
return (await resp.json()).get("hash")

raise aiohttp.ClientResponseError(
request_info=None, # type: ignore
history=(),
status=404,
message="File upload failed after retries",
)

@staticmethod
def _log_publication_status(publication_status: Mapping[str, Any]):
Expand Down Expand Up @@ -619,10 +666,18 @@ async def _storage_push_file_with_message(
store_content: StoreContent,
channel: Optional[str] = settings.DEFAULT_CHANNEL,
sync: bool = False,
max_retries: int = 3,
retry_delay: float = 1.0,
) -> Tuple[StoreMessage, MessageStatus]:
"""Push a file to the storage service."""
data = aiohttp.FormData()
"""Push a file to the storage service.

:param file_content: The file content to upload
:param store_content: The store content metadata
:param channel: The channel to publish to
:param sync: Whether to wait for the message to be processed
:param max_retries: Maximum number of retries on 404 (for network propagation delays)
:param retry_delay: Initial delay between retries in seconds (doubles each retry)
"""
# Prepare the STORE message
message = await self.generate_signed_message(
message_type=MessageType.store,
Expand All @@ -633,23 +688,44 @@ async def _storage_push_file_with_message(
"message": message.model_dump(exclude_none=True),
"sync": sync,
}
data.add_field(
"metadata",
json.dumps(metadata, default=extended_json_encoder),
content_type="application/json",
)
# Add the file
data.add_field("file", BytesIO(file_content))
metadata_json = json.dumps(metadata, default=extended_json_encoder)

url = "/api/v0/storage/add_file"
logger.debug(f"Posting file on {url}")

async with self.http_session.post(url, data=data) as resp:
resp.raise_for_status()
message_status = (
MessageStatus.PENDING if resp.status == 202 else MessageStatus.PROCESSED
for attempt in range(max_retries + 1):
# FormData must be recreated for each attempt as it's consumed by the request
data = aiohttp.FormData()
data.add_field(
"metadata",
metadata_json,
content_type="application/json",
)
return message, message_status # type: ignore
data.add_field("file", BytesIO(file_content))

async with self.http_session.post(url, data=data) as resp:
if resp.status == 404 and attempt < max_retries:
delay = retry_delay * (2**attempt)
logger.debug(
f"Got 404 on file upload, retrying in {delay}s "
f"(attempt {attempt + 1}/{max_retries})"
)
await asyncio.sleep(delay)
continue
resp.raise_for_status()
message_status = (
MessageStatus.PENDING
if resp.status == 202
else MessageStatus.PROCESSED
)
return message, message_status # type: ignore

raise aiohttp.ClientResponseError(
request_info=None, # type: ignore
history=(),
status=404,
message="File upload failed after retries",
)

async def _upload_file_native(
self,
Expand Down
Loading