Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bindu/server/applications.py
Original file line number Diff line number Diff line change
Expand Up @@ -775,7 +775,7 @@ def _create_auth_middleware(self) -> Middleware:

if provider == "hydra":
logger.info("Hydra OAuth2 authentication enabled")
return Middleware(HydraMiddleware, auth_config=app_settings.hydra) # type: ignore[arg-type] # ty: ignore[invalid-argument-type]
return Middleware(HydraMiddleware, auth_config=app_settings.hydra) # type: ignore[arg-type]
else:
logger.error(f"Unknown authentication provider: {provider}")
raise ValueError(
Expand Down
20 changes: 20 additions & 0 deletions bindu/server/task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@

from __future__ import annotations

import asyncio
import uuid
from contextlib import AsyncExitStack
from dataclasses import dataclass, field
Expand Down Expand Up @@ -119,6 +120,7 @@ class TaskManager:

_aexit_stack: AsyncExitStack | None = field(default=None, init=False)
_workers: list[ManifestWorker] = field(default_factory=list, init=False)
_reconciliation_task: asyncio.Task | None = field(default=None, init=False)
_push_manager: PushNotificationManager = field(init=False)
_message_handlers: MessageHandlers = field(init=False)
_task_handlers: TaskHandlers = field(init=False)
Expand Down Expand Up @@ -150,6 +152,14 @@ async def __aenter__(self) -> TaskManager:
self._workers.append(worker)
await self._aexit_stack.enter_async_context(worker.run())

# Start background x402 payment reconciliation loop
from .workers.x402_reconciliation import run_x402_reconciliation_loop

self._reconciliation_task = asyncio.create_task(
run_x402_reconciliation_loop(self.storage),
name="x402-reconciliation-loop",
)

# Initialize handlers after workers are created
self._message_handlers = MessageHandlers(
scheduler=self.scheduler,
Expand Down Expand Up @@ -179,6 +189,16 @@ def is_running(self) -> bool:

async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
"""Clean up resources and stop all components."""
if self._reconciliation_task:
self._reconciliation_task.cancel()
try:
await self._reconciliation_task
except asyncio.CancelledError:
pass
except Exception as e:
logger.warning("x402 reconciliation task exited with error: %s", e)
self._reconciliation_task = None

Comment thread
Meenbudha marked this conversation as resolved.
if self._aexit_stack is None:
raise RuntimeError("TaskManager was not properly initialized.")
await self._aexit_stack.__aexit__(exc_type, exc_value, traceback)
Expand Down
126 changes: 126 additions & 0 deletions bindu/server/workers/x402_reconciliation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
"""Background worker for reconciling failed x402 payment settlements.

Queries the facilitator periodically to verify if transaction nonces
for failed tasks have been settled on-chain (reconciling transient timeouts).
"""

from __future__ import annotations

import asyncio
from typing import Any

from x402 import PaymentPayload, PaymentRequirements
from x402.http import FacilitatorConfig, HTTPFacilitatorClient

from bindu.settings import app_settings
from bindu.utils.logging import get_logger

logger = get_logger("bindu.server.workers.x402_reconciliation")


async def run_x402_reconciliation_loop(storage: Any, interval_seconds: float = 30.0) -> None:
"""Run the background periodic loop for x402 payment reconciliation."""
logger.info("Starting x402 payment reconciliation background loop")
while True:
try:
await asyncio.sleep(interval_seconds)
await reconcile_failed_payments(storage)
except asyncio.CancelledError:
logger.info("x402 payment reconciliation loop cancelled")
break
except Exception as e:
logger.exception("Error in x402 payment reconciliation loop: %s", e)
Comment thread
Meenbudha marked this conversation as resolved.


async def reconcile_failed_payments(storage: Any) -> None:
"""Scan recent tasks for failed settlements and check if they confirmed on-chain."""
try:
# Check most recent 100 tasks in the database
tasks = await storage.list_tasks(length=100)
except Exception as e:
logger.warning("Failed to list tasks for x402 reconciliation: %s", e)
return

for task in tasks:
# We only reconcile tasks that are in a terminal 'failed' state
if task.get("status", {}).get("state") != "failed":
continue

metadata = task.get("metadata") or {}
payment_status = metadata.get(app_settings.x402.meta_status_key)

# Only process tasks whose payment status is specifically 'payment-failed'
if payment_status != app_settings.x402.status_failed:
continue

# Extract payment context from the initial message metadata to get the original payload
history = task.get("history") or []
if not history:
continue

first_msg = history[0]
msg_metadata = first_msg.get("metadata") or {}
payment_context = msg_metadata.get("_payment_context")
if not payment_context:
continue

payment_payload_dict = payment_context.get("payment_payload")
payment_requirements_dict = payment_context.get("payment_requirements")

if not payment_payload_dict or not payment_requirements_dict:
continue

logger.info("Reconciling payment status for task: %s", task["id"])

try:
# Reconstitute the Pydantic models from the stored payment context
payment_payload = PaymentPayload.model_validate(payment_payload_dict)
payment_requirements = PaymentRequirements.model_validate(
payment_requirements_dict
)

# Build a facilitator client
facilitator = HTTPFacilitatorClient(
FacilitatorConfig(url=app_settings.x402.facilitator_url)
)

# Re-attempt settle (this is idempotent on the facilitator/node side)
settle_response = await facilitator.settle(
payment_payload, payment_requirements
)

if settle_response.success:
logger.info(
"Reconciliation succeeded: payment for task %s confirmed on-chain. "
"Updating payment status to payment-orphaned.",
task["id"],
)
# Flip payment status to 'payment-orphaned' because payment went through
# but the task remained unexecuted (failed during initial worker state).
updated_metadata = {
**metadata,
app_settings.x402.meta_status_key: "payment-orphaned",
app_settings.x402.meta_receipts_key: [settle_response.model_dump()],
}
# Clear error reasons as we resolved this payment
updated_metadata[app_settings.x402.meta_error_key] = None

await storage.update_task(
task["id"],
state="failed",
metadata=updated_metadata,
)
else:
logger.debug(
"Reconciliation check for task %s completed. On-chain settlement not confirmed: %s",
task["id"],
settle_response.error_reason,
)

except Exception as e:
# Catch transient facilitator network/timeout errors so we try again next tick
logger.debug(
"Facilitator error during reconciliation check for task %s: %s",
task["id"],
e,
)
109 changes: 109 additions & 0 deletions tests/integration/x402/test_reconciliation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
from __future__ import annotations

from unittest.mock import AsyncMock, MagicMock, patch
from uuid import uuid4

import pytest

from x402 import PaymentPayload, PaymentRequirements
from bindu.server.storage.memory_storage import InMemoryStorage
from bindu.server.workers.x402_reconciliation import reconcile_failed_payments
from bindu.settings import app_settings

REQUIREMENT = PaymentRequirements(
scheme="exact",
network="eip155:84532",
asset="0x036cbd53842c5426634e7929541ec2318f3dcf7e",
amount="1000000", # 1 USDC (6 decimals)
pay_to="0xa11ce000000000000000000000000000000a11ce",
max_timeout_seconds=60,
extra={"name": "USDC", "version": "2"},
)


def make_payload(nonce: str) -> PaymentPayload:
"""Build a syntactically valid EIP-3009 payload for the canonical requirement."""
return PaymentPayload(
x402_version=2,
payload={
"signature": "0x" + "00" * 65,
"authorization": {
"from": "0xb0b0b0b0b0b0b0b0b0b0b0b0b0b0b0b0b0b0b0b0",
"to": REQUIREMENT.pay_to,
"value": REQUIREMENT.amount,
"validAfter": "0",
"validBefore": "9999999999",
"nonce": nonce,
},
},
accepted=REQUIREMENT,
)


@pytest.mark.asyncio
async def test_reconciliation_worker_success():
"""Verify that the reconciliation worker checks failed payments and marks successful ones as payment-orphaned."""
storage = InMemoryStorage()
task_id = uuid4()
context_id = uuid4()
nonce = "0x" + "ab" * 32

payload = make_payload(nonce)
payment_context = {
"payment_payload": payload.model_dump(by_alias=True),
"payment_requirements": REQUIREMENT.model_dump(by_alias=True),
"verify_response": {"is_valid": True, "invalid_reason": None},
}

# Create initial message with payment context inside metadata
message = {
"task_id": task_id,
"context_id": context_id,
"message_id": uuid4(),
"role": "user",
"parts": [{"kind": "text", "text": "test task"}],
"metadata": {
"_payment_context": payment_context
}
}

# Submit task to storage
await storage.submit_task(context_id, message) # type: ignore[arg-type] # ty: ignore[invalid-argument-type]

# Initial metadata when payment failed during first settle attempt
metadata = {
app_settings.x402.meta_status_key: "payment-failed",
app_settings.x402.meta_error_key: "upstream timeout",
"x402_nonce": nonce,
"x402_network": "eip155:84532",
}

# Update task in storage to terminal failed state with failed payment metadata
await storage.update_task(
task_id,
state="failed",
metadata=metadata
)

# Patch the facilitator at the boundary _settle_payment uses.
with patch("bindu.server.workers.x402_reconciliation.HTTPFacilitatorClient") as mock_fac_class:
mock_fac = AsyncMock()
response = MagicMock()
response.success = True
response.error_reason = None
response.model_dump = MagicMock(return_value={"transaction": "0xreconciledtx"})
mock_fac.settle = AsyncMock(return_value=response)
mock_fac_class.return_value = mock_fac

# Run the reconciliation step
await reconcile_failed_payments(storage)

# Assert task is updated in storage
updated_task = await storage.load_task(task_id)
assert updated_task is not None
assert updated_task["status"]["state"] == "failed"

meta = updated_task.get("metadata") or {}
assert meta.get(app_settings.x402.meta_status_key) == "payment-orphaned"
assert meta.get(app_settings.x402.meta_receipts_key) == [{"transaction": "0xreconciledtx"}]
assert meta.get(app_settings.x402.meta_error_key) is None