Skip to content

Commit 466f30f

Browse files
committed
docs(async): explain the asynchronous streaming steps
1 parent e76f92c commit 466f30f

File tree

1 file changed

+6
-3
lines changed

1 file changed

+6
-3
lines changed

mellea/stdlib/base.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -292,15 +292,16 @@ async def astream(self) -> str:
292292

293293
# Type of the chunk depends on the backend.
294294
chunks: list[Any | None] = []
295+
296+
# Step 1: collect all immediately available chunks
295297
while True:
296298
try:
297299
item = self._async_queue.get_nowait()
298300
chunks.append(item)
299301
except asyncio.QueueEmpty:
300-
# We've exhausted the current items in the queue.
301302
break
302303

303-
# Make sure we always get the minimum chunk size.
304+
# Step 2: Loop forever until it collects a certain number of chunks or it collects a None (a sentinel value) or an Exception
304305
while len(chunks) <= self._chunk_size:
305306
if len(chunks) > 0:
306307
if chunks[-1] is None or isinstance(chunks[-1], Exception):
@@ -312,7 +313,7 @@ async def astream(self) -> str:
312313
item = await self._async_queue.get()
313314
chunks.append(item)
314315

315-
# Process the sentinel value if it's there.
316+
# Step 3: If Step 2 stopped because of the sentinel value, cancel other tasks
316317
if chunks[-1] is None:
317318
chunks.pop() # Remove the sentinel value.
318319
self._computed = True
@@ -334,10 +335,12 @@ async def astream(self) -> str:
334335
# chunks. We should investigate allowing recovery in the future.
335336
raise chunks[-1]
336337

338+
# Step 4: process collected chunks
337339
for chunk in chunks:
338340
assert self._process is not None
339341
await self._process(self, chunk)
340342

343+
# Step 5: run postprocess
341344
if self._computed:
342345
assert self._post_process is not None
343346
await self._post_process(self)

0 commit comments

Comments
 (0)