-
Notifications
You must be signed in to change notification settings - Fork 408
fix(x402): implement background reconciliation for silent orphan payments (#588) #589
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
Meenbudha
wants to merge
1
commit into
GetBindu:main
Choose a base branch
from
Meenbudha:fix/588-x402-reconciliation
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+256
−1
Open
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,126 @@ | ||
| """Background worker for reconciling failed x402 payment settlements. | ||
|
|
||
| Queries the facilitator periodically to verify if transaction nonces | ||
| for failed tasks have been settled on-chain (reconciling transient timeouts). | ||
| """ | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import asyncio | ||
| from typing import Any | ||
|
|
||
| from x402 import PaymentPayload, PaymentRequirements | ||
| from x402.http import FacilitatorConfig, HTTPFacilitatorClient | ||
|
|
||
| from bindu.settings import app_settings | ||
| from bindu.utils.logging import get_logger | ||
|
|
||
| logger = get_logger("bindu.server.workers.x402_reconciliation") | ||
|
|
||
|
|
||
| async def run_x402_reconciliation_loop(storage: Any, interval_seconds: float = 30.0) -> None: | ||
| """Run the background periodic loop for x402 payment reconciliation.""" | ||
| logger.info("Starting x402 payment reconciliation background loop") | ||
| while True: | ||
| try: | ||
| await asyncio.sleep(interval_seconds) | ||
| await reconcile_failed_payments(storage) | ||
| except asyncio.CancelledError: | ||
| logger.info("x402 payment reconciliation loop cancelled") | ||
| break | ||
| except Exception as e: | ||
| logger.exception("Error in x402 payment reconciliation loop: %s", e) | ||
|
Meenbudha marked this conversation as resolved.
|
||
|
|
||
|
|
||
| async def reconcile_failed_payments(storage: Any) -> None: | ||
| """Scan recent tasks for failed settlements and check if they confirmed on-chain.""" | ||
| try: | ||
| # Check most recent 100 tasks in the database | ||
| tasks = await storage.list_tasks(length=100) | ||
| except Exception as e: | ||
| logger.warning("Failed to list tasks for x402 reconciliation: %s", e) | ||
| return | ||
|
|
||
| for task in tasks: | ||
| # We only reconcile tasks that are in a terminal 'failed' state | ||
| if task.get("status", {}).get("state") != "failed": | ||
| continue | ||
|
|
||
| metadata = task.get("metadata") or {} | ||
| payment_status = metadata.get(app_settings.x402.meta_status_key) | ||
|
|
||
| # Only process tasks whose payment status is specifically 'payment-failed' | ||
| if payment_status != app_settings.x402.status_failed: | ||
| continue | ||
|
|
||
| # Extract payment context from the initial message metadata to get the original payload | ||
| history = task.get("history") or [] | ||
| if not history: | ||
| continue | ||
|
|
||
| first_msg = history[0] | ||
| msg_metadata = first_msg.get("metadata") or {} | ||
| payment_context = msg_metadata.get("_payment_context") | ||
| if not payment_context: | ||
| continue | ||
|
|
||
| payment_payload_dict = payment_context.get("payment_payload") | ||
| payment_requirements_dict = payment_context.get("payment_requirements") | ||
|
|
||
| if not payment_payload_dict or not payment_requirements_dict: | ||
| continue | ||
|
|
||
| logger.info("Reconciling payment status for task: %s", task["id"]) | ||
|
|
||
| try: | ||
| # Reconstitute the Pydantic models from the stored payment context | ||
| payment_payload = PaymentPayload.model_validate(payment_payload_dict) | ||
| payment_requirements = PaymentRequirements.model_validate( | ||
| payment_requirements_dict | ||
| ) | ||
|
|
||
| # Build a facilitator client | ||
| facilitator = HTTPFacilitatorClient( | ||
| FacilitatorConfig(url=app_settings.x402.facilitator_url) | ||
| ) | ||
|
|
||
| # Re-attempt settle (this is idempotent on the facilitator/node side) | ||
| settle_response = await facilitator.settle( | ||
| payment_payload, payment_requirements | ||
| ) | ||
|
|
||
| if settle_response.success: | ||
| logger.info( | ||
| "Reconciliation succeeded: payment for task %s confirmed on-chain. " | ||
| "Updating payment status to payment-orphaned.", | ||
| task["id"], | ||
| ) | ||
| # Flip payment status to 'payment-orphaned' because payment went through | ||
| # but the task remained unexecuted (failed during initial worker state). | ||
| updated_metadata = { | ||
| **metadata, | ||
| app_settings.x402.meta_status_key: "payment-orphaned", | ||
| app_settings.x402.meta_receipts_key: [settle_response.model_dump()], | ||
| } | ||
| # Clear error reasons as we resolved this payment | ||
| updated_metadata[app_settings.x402.meta_error_key] = None | ||
|
|
||
| await storage.update_task( | ||
| task["id"], | ||
| state="failed", | ||
| metadata=updated_metadata, | ||
| ) | ||
| else: | ||
| logger.debug( | ||
| "Reconciliation check for task %s completed. On-chain settlement not confirmed: %s", | ||
| task["id"], | ||
| settle_response.error_reason, | ||
| ) | ||
|
|
||
| except Exception as e: | ||
| # Catch transient facilitator network/timeout errors so we try again next tick | ||
| logger.debug( | ||
| "Facilitator error during reconciliation check for task %s: %s", | ||
| task["id"], | ||
| e, | ||
| ) | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,109 @@ | ||
| from __future__ import annotations | ||
|
|
||
| from unittest.mock import AsyncMock, MagicMock, patch | ||
| from uuid import uuid4 | ||
|
|
||
| import pytest | ||
|
|
||
| from x402 import PaymentPayload, PaymentRequirements | ||
| from bindu.server.storage.memory_storage import InMemoryStorage | ||
| from bindu.server.workers.x402_reconciliation import reconcile_failed_payments | ||
| from bindu.settings import app_settings | ||
|
|
||
| REQUIREMENT = PaymentRequirements( | ||
| scheme="exact", | ||
| network="eip155:84532", | ||
| asset="0x036cbd53842c5426634e7929541ec2318f3dcf7e", | ||
| amount="1000000", # 1 USDC (6 decimals) | ||
| pay_to="0xa11ce000000000000000000000000000000a11ce", | ||
| max_timeout_seconds=60, | ||
| extra={"name": "USDC", "version": "2"}, | ||
| ) | ||
|
|
||
|
|
||
| def make_payload(nonce: str) -> PaymentPayload: | ||
| """Build a syntactically valid EIP-3009 payload for the canonical requirement.""" | ||
| return PaymentPayload( | ||
| x402_version=2, | ||
| payload={ | ||
| "signature": "0x" + "00" * 65, | ||
| "authorization": { | ||
| "from": "0xb0b0b0b0b0b0b0b0b0b0b0b0b0b0b0b0b0b0b0b0", | ||
| "to": REQUIREMENT.pay_to, | ||
| "value": REQUIREMENT.amount, | ||
| "validAfter": "0", | ||
| "validBefore": "9999999999", | ||
| "nonce": nonce, | ||
| }, | ||
| }, | ||
| accepted=REQUIREMENT, | ||
| ) | ||
|
|
||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_reconciliation_worker_success(): | ||
| """Verify that the reconciliation worker checks failed payments and marks successful ones as payment-orphaned.""" | ||
| storage = InMemoryStorage() | ||
| task_id = uuid4() | ||
| context_id = uuid4() | ||
| nonce = "0x" + "ab" * 32 | ||
|
|
||
| payload = make_payload(nonce) | ||
| payment_context = { | ||
| "payment_payload": payload.model_dump(by_alias=True), | ||
| "payment_requirements": REQUIREMENT.model_dump(by_alias=True), | ||
| "verify_response": {"is_valid": True, "invalid_reason": None}, | ||
| } | ||
|
|
||
| # Create initial message with payment context inside metadata | ||
| message = { | ||
| "task_id": task_id, | ||
| "context_id": context_id, | ||
| "message_id": uuid4(), | ||
| "role": "user", | ||
| "parts": [{"kind": "text", "text": "test task"}], | ||
| "metadata": { | ||
| "_payment_context": payment_context | ||
| } | ||
| } | ||
|
|
||
| # Submit task to storage | ||
| await storage.submit_task(context_id, message) # type: ignore[arg-type] # ty: ignore[invalid-argument-type] | ||
|
|
||
| # Initial metadata when payment failed during first settle attempt | ||
| metadata = { | ||
| app_settings.x402.meta_status_key: "payment-failed", | ||
| app_settings.x402.meta_error_key: "upstream timeout", | ||
| "x402_nonce": nonce, | ||
| "x402_network": "eip155:84532", | ||
| } | ||
|
|
||
| # Update task in storage to terminal failed state with failed payment metadata | ||
| await storage.update_task( | ||
| task_id, | ||
| state="failed", | ||
| metadata=metadata | ||
| ) | ||
|
|
||
| # Patch the facilitator at the boundary _settle_payment uses. | ||
| with patch("bindu.server.workers.x402_reconciliation.HTTPFacilitatorClient") as mock_fac_class: | ||
| mock_fac = AsyncMock() | ||
| response = MagicMock() | ||
| response.success = True | ||
| response.error_reason = None | ||
| response.model_dump = MagicMock(return_value={"transaction": "0xreconciledtx"}) | ||
| mock_fac.settle = AsyncMock(return_value=response) | ||
| mock_fac_class.return_value = mock_fac | ||
|
|
||
| # Run the reconciliation step | ||
| await reconcile_failed_payments(storage) | ||
|
|
||
| # Assert task is updated in storage | ||
| updated_task = await storage.load_task(task_id) | ||
| assert updated_task is not None | ||
| assert updated_task["status"]["state"] == "failed" | ||
|
|
||
| meta = updated_task.get("metadata") or {} | ||
| assert meta.get(app_settings.x402.meta_status_key) == "payment-orphaned" | ||
| assert meta.get(app_settings.x402.meta_receipts_key) == [{"transaction": "0xreconciledtx"}] | ||
| assert meta.get(app_settings.x402.meta_error_key) is None |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.