Skip to content

feat: add async 'for' loop support to LogScanner (#424)#438

Open
qzyu999 wants to merge 4 commits intoapache:mainfrom
qzyu999:feat/424-python-async-iterator
Open

feat: add async 'for' loop support to LogScanner (#424)#438
qzyu999 wants to merge 4 commits intoapache:mainfrom
qzyu999:feat/424-python-async-iterator

Conversation

@qzyu999
Copy link

@qzyu999 qzyu999 commented Mar 9, 2026

Purpose

Linked issue: close #424

This pull request completes Issue #424 by enabling standard cross-boundary native Python async for language built-ins over the high-performance PyO3 wrapped LogScanner stream instance.

Brief change log

Previously, PyFluss developers had to manually orchestrate while True polling loops over network boundaries using scanner.poll(timeout). This PR refactors the Python LogScanner iterator logic by implementing the async traversal natively via Rust __anext__ polling bindings and Python Generator __aiter__ context adapters:

  • State Independence: Refactored ScannerKind internals into a safely buffered Arc<tokio::sync::Mutex<ScannerState>>. This guarantees strict thread-safety and fulfills Rust's lifetime constraints enabling unboxed state transitions inside the python_async_runtimes tokio closure.
  • Asynchronous Execution: Polling evaluates non-blocking loops. PyFluss automatically maps Arrow records onto the .await future yield sequence smoothly without blocking event cycles or hardware threads directly!
  • Iterable Compliance: To correctly resolve runtime inspect.isasyncgen() compliance checks within strictly versioned Python 3.12+ engines (such as modern IPython Jupyter servers), __aiter__ dynamically generates a properly wrapped coroutine generator dynamically inside the codebase via py.run(). This completely masks the Python ecosystem's iterator type limitations automatically out-of-the-box.

Tests

  • [NEW] test_log_table.py::test_async_iterator: Integrated a testcontainers ecosystem confirming zero-configuration iteration capabilities function natively evaluating async for record in scanner perfectly without pipeline interruptions while yielding thousands of appended instances sequentially backwards matching existing legacy data frameworks.

API and Format

Yes, this expands the API natively extending capabilities allowing async for loops gracefully. Existing user logic leveraging explicit implementations of .poll_arrow() or legacy functions are untouched.

Documentation

Yes, I updated integration tests acting as live documentation proof demonstrating the capability natively.

@fresh-borzoni
Copy link
Contributor

fresh-borzoni commented Mar 10, 2026

@qzyu999 Ty for the PR, but I checked this branch out and integration tests for python still hang even when I run them locally. PTAL

@qzyu999
Copy link
Author

qzyu999 commented Mar 10, 2026

@qzyu999 Ty for the PR, but I checked this branch out and integration tests for python still hang even when I run them locally. PTAL

Hi @fresh-borzoni, applied the fix. Ran cargo fmt --all locally and it passed. Please run the CI again.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Python: add async 'for' loop support to LogScanner

2 participants