AndreaBozzo
left a comment
While I'm very much in favor of the idea behind this, there are a few concerns about the implementation.
    mut state: AvroReaderState<R>,
) -> impl Stream<Item = Result<RecordBatch, ArrowError>> + Send + 'static {
    async_stream::try_stream! {
        if state.meta.range.start >= state.meta.range.end {
Isn't try_stream! sequential? There's no actual prefetching or concurrent work happening. Did I miss something?
This implementation is entirely pull-based. The reader API exposes the stream, which the application can drive from a dedicated async task (one record batch at a time) if that is beneficial.
There is a comment in the code about not being able to use the inner spawn helper because of other constraints on the trait design.
(Responding because I had input on this change.)
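The pull-based design described above leaves any read-ahead to the caller. Here is a minimal sketch of that idea using only the standard library, under loud assumptions: a plain iterator stands in for the record-batch stream, an OS thread stands in for the dedicated async task, and a bounded channel limits how far the producer runs ahead. `Batch`, `spawn_prefetch`, and `total_rows` are hypothetical names for illustration and are not part of arrow-avro.

```rust
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

/// Hypothetical stand-in for a decoded record batch.
type Batch = Vec<u64>;

/// Drives a pull-based source on a dedicated thread and returns a receiver.
/// Up to `buffer` batches can sit in the channel, so the producer blocks
/// once it is that far ahead instead of reading everything eagerly.
fn spawn_prefetch<I>(source: I, buffer: usize) -> Receiver<Result<Batch, String>>
where
    I: Iterator<Item = Result<Batch, String>> + Send + 'static,
{
    let (tx, rx) = sync_channel(buffer);
    thread::spawn(move || {
        for item in source {
            let failed = item.is_err();
            // Stop when the consumer has dropped the receiver,
            // or right after forwarding the first error.
            if tx.send(item).is_err() || failed {
                break;
            }
        }
    });
    rx
}

/// Consumes everything the producer sends and sums the row counts,
/// returning the first error encountered.
fn total_rows(rx: Receiver<Result<Batch, String>>) -> Result<usize, String> {
    let mut rows = 0;
    for item in rx {
        rows += item?.len();
    }
    Ok(rows)
}
```

Calling `total_rows(spawn_prefetch(batches.into_iter(), 2))` lets the producer thread work ahead while the consumer processes the current batch. With an async runtime the same shape would presumably use a spawned task and an async bounded channel instead.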
In my end-to-end benchmark, prefetching is done by the object-store implementation, specifically in the call to get_opts.
tokio = { version = "1.0", optional = true, default-features = false, features = ["macros", "rt", "io-util"] }
tokio-util = { version = "0.7.18", default-features = false, features = ["io"] }
async-stream = "0.3.6"
Async stuff should be feature-gated, I think.
Do you mean feature-gating AsyncFileReader entirely (which wasn't feature-gated before) or having two implementations of it (which I dislike)?
These dependencies should be optional and enabled by the "async" feature, like tokio above.
AsyncFileReader was indeed feature-gated before, so I'll make these two dependencies optional and add them to the async feature.
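On the Rust side, optional dependencies are usually paired with a `#[cfg(feature = ...)]` attribute on the code that uses them. Here is a minimal sketch, assuming the feature is named `async` as mentioned above. The module and function names are hypothetical, and the body is a placeholder rather than the real reader.

```rust
/// Compiled only when the crate is built with the `async` feature, so a
/// default build never references the optional dependencies.
#[cfg(feature = "async")]
pub mod async_reader {
    /// Placeholder for the stream-based reader entry point (hypothetical).
    pub fn describe() -> &'static str {
        "async reader available"
    }
}

/// Reports whether async support was compiled into this build.
pub fn async_support_enabled() -> bool {
    cfg!(feature = "async")
}
```

In the manifest, this would presumably mean marking `tokio-util` and `async-stream` as `optional = true` and listing them under the `async` feature alongside `tokio`.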
Which issue does this PR close?
Rationale for this change
Implement an async streaming reader for Avro, similar to how DataFusion handles JSON and CSV scanning.
There are two main advantages:
What changes are included in this PR?
Change the Avro reader implementation to use async streams.
Are these changes tested?
Yes, added tests.
Are there any user-facing changes?
Users can now implement get_stream as part of the AsyncFileReader trait.