feat: migrate python sdk from grpc to fibp binary protocol #11
Open

vieiralucas wants to merge 19 commits into main from
Conversation
feat: transparent leader hint reconnect on consume
add batch_enqueue() for explicit multi-message RPCs, smart batching via BatchMode (AUTO/DISABLED/Linger) that routes enqueue() through a background batcher thread, and delivery batching that unpacks ConsumeResponse.messages repeated field. update proto to include BatchEnqueue RPC and ConsumeResponse batched messages field. single-item optimization uses singular Enqueue RPC to preserve error types. close() drains pending messages before disconnecting.
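The linger-style routing described above, where enqueue() hands messages to a background batcher that flushes on a size threshold or a timeout, can be sketched as a toy accumulator. This is not the SDK's actual batcher; the class name, thresholds, and flush callback are illustrative.

```python
import threading

class LingerAccumulator:
    """Toy accumulator: flush when max_messages is reached or linger_ms elapses."""

    def __init__(self, flush_fn, max_messages=2, linger_ms=500):
        self._flush_fn = flush_fn      # called with a list of pending messages
        self._max = max_messages
        self._linger = linger_ms / 1000.0
        self._pending = []
        self._lock = threading.Lock()
        self._timer = None

    def enqueue(self, msg):
        with self._lock:
            self._pending.append(msg)
            if len(self._pending) >= self._max:
                self._flush_locked()
            elif self._timer is None:
                # first message of a batch arms the linger timer
                self._timer = threading.Timer(self._linger, self._on_timer)
                self._timer.start()

    def _on_timer(self):
        with self._lock:
            self._flush_locked()

    def _flush_locked(self):
        if self._timer is not None:
            self._timer.cancel()
            self._timer = None
        if self._pending:
            self._flush_fn(self._pending)
            self._pending = []

    def close(self):
        # drain pending messages before disconnecting, as the commit describes
        with self._lock:
            self._flush_locked()

batches = []
acc = LingerAccumulator(batches.append, max_messages=2, linger_ms=500)
for m in ("a", "b", "c"):
    acc.enqueue(m)
acc.close()
# "a" and "b" flush as one batch of 2; "c" is drained by close()
```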
Update Python SDK for the unified proto API:
- Enqueue RPC now uses repeated EnqueueMessage/EnqueueResult
- BatchEnqueue RPC removed; enqueue_many() replaces batch_enqueue()
- Ack/Nack use repeated AckMessage/NackMessage with per-message results
- ConsumeResponse only has the repeated messages field
- BatchMode renamed to AccumulatorMode, BatchEnqueueResult to EnqueueResult
- BatchEnqueueError renamed to EnqueueError
- Linger.batch_size renamed to Linger.max_messages
- Per-message error codes mapped to typed SDK exceptions (e.g. ENQUEUE_ERROR_CODE_QUEUE_NOT_FOUND -> QueueNotFoundError)
- All 31 tests pass (16 unit, 15 integration)
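The per-message error mapping described above can be sketched as a lookup table with a generic fallback. The code name and the exception classes below come from the commit message, but the table itself and the helper's signature are illustrative, not the SDK's actual implementation.

```python
# Hypothetical sketch of per-message error mapping.
class FilaError(Exception): ...
class QueueNotFoundError(FilaError): ...
class EnqueueError(FilaError): ...

# error-code string -> typed SDK exception
_ENQUEUE_ERRORS = {
    "ENQUEUE_ERROR_CODE_QUEUE_NOT_FOUND": QueueNotFoundError,
}

def _map_enqueue_result_error(code, detail):
    # fall back to the generic EnqueueError for unrecognized codes
    exc = _ENQUEUE_ERRORS.get(code, EnqueueError)
    return exc(detail)

err = _map_enqueue_result_error("ENQUEUE_ERROR_CODE_QUEUE_NOT_FOUND", "no such queue")
fallback = _map_enqueue_result_error("SOME_UNKNOWN_CODE", "oops")
```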
feat: 30.2 — unified api surface
- remove unused imports (EnqueueError in client/async_client, Any in batcher, threading in test_batcher); fixes ruff F401
- move MessageNotFoundError/RPCError to top-level imports, eliminating inline imports that triggered ruff I001 (unsorted import blocks)
- move ConsumeMessage/EnqueueResult into the TYPE_CHECKING block in async_client; fixes ruff TC001
- fix the EnqueueError docstring to reflect that it is also raised as a fallback for per-message errors via _map_enqueue_result_error
- strengthen test_flush_single to assert the actual request content sent to Enqueue, not just the call count
- downgrade GRPC_GENERATED_VERSION from 1.78.1 to 1.78.0 in all generated grpc stubs (grpcio 1.78.1 was yanked from PyPI)
replace the entire grpc transport layer with a native fibp implementation. the sdk now communicates directly over tcp using the fila binary protocol, removing the grpcio and protobuf dependencies entirely.
- add fila/fibp/ module: primitives (Reader/Writer), opcodes, codec
- add fila/conn.py: sync Connection and async AsyncConnection classes
- rewrite client.py and async_client.py to use fibp connections
- rewrite batcher.py to use Connection instead of grpc stubs
- rewrite errors.py with fibp error code mapping (18 error codes)
- add admin methods: create/delete queue, stats, config, redrive
- add auth methods: create/revoke api key, acl management
- add new error types: UnauthorizedError, ForbiddenError, NotLeaderError, etc.
- add ConsumeMessage fields: weight, throttle_keys, enqueued_at, leased_at
- delete fila/v1/ (generated protobuf) and proto/ directories
- remove grpcio/protobuf from dependencies, bump version to 0.3.0
- add test_fibp.py with 34 codec/primitives unit tests
- rewrite test_batcher.py for fibp mock connections
- update integration tests and conftest.py for fibp
7 issues found across 32 files
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="fila/fibp/primitives.py">
<violation number="1" location="fila/fibp/primitives.py:138">
P1: `read_string` is missing a bounds check for the declared length, so truncated frames can be parsed as valid data and move the cursor past the buffer.</violation>
<violation number="2" location="fila/fibp/primitives.py:146">
P1: `read_bytes` does not verify that the requested byte length is available before slicing, allowing truncated payloads to be accepted silently.</violation>
</file>
<file name="fila/conn.py">
<violation number="1" location="fila/conn.py:54">
P1: `ssl.SSLContext.load_cert_chain()` does not accept `certdata`/`keydata` keyword arguments. Its signature is `load_cert_chain(certfile, keyfile=None, password=None)` and it expects file paths, not in-memory bytes. This will raise `TypeError` at runtime when mutual TLS is configured.</violation>
</file>
<file name="fila/batcher.py">
<violation number="1" location="fila/batcher.py:94">
P1: Futures left unresolved when server returns fewer results than items sent. If `len(results) < len(items)`, the trailing items' futures will never complete, causing callers to hang indefinitely. Add a fallback after the loop to set an exception on any unmatched futures.</violation>
</file>
<file name="tests/test_client.py">
<violation number="1" location="tests/test_client.py:210">
P2: The auth rejection test is over-broad: catching `FilaError` allows unrelated errors to satisfy the assertion, so the test can pass without actually verifying unauthorized access.</violation>
</file>
<file name="fila/async_client.py">
<violation number="1" location="fila/async_client.py:73">
P2: The docstring references `await AsyncClient.create("localhost:5555")` but no `create` classmethod exists. Users following this example will get `AttributeError`. Should match the README which shows direct construction (`AsyncClient(...)`) or the context manager pattern.</violation>
</file>
<file name="fila/client.py">
<violation number="1" location="fila/client.py:298">
P2: `consumer_id` is accepted but never used — the generator has no cleanup path to call `cancel_consume(consumer_id)` when the caller stops iterating, leaving an orphaned server-side consumer until the connection closes.</violation>
</file>
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
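The `load_cert_chain` finding above is easy to verify against Python's standard library: the method only accepts file paths (`certfile`, `keyfile`, `password`), so passing in-memory bytes via `certdata`/`keydata` keywords raises `TypeError`. One common workaround, sketched below with a hypothetical helper name, is to spill the PEM bytes to temporary files first.

```python
import os
import ssl
import tempfile

ctx = ssl.create_default_context()
try:
    # the call shape flagged by the review: in-memory bytes via keyword args
    ctx.load_cert_chain(certdata=b"...", keydata=b"...")
    accepted_bytes_kwargs = True
except TypeError:
    # load_cert_chain(certfile, keyfile=None, password=None) wants file paths
    accepted_bytes_kwargs = False

def load_cert_chain_from_bytes(ctx, cert_pem, key_pem):
    """Hypothetical workaround: write PEM bytes to temp files, then load them."""
    cert_file = tempfile.NamedTemporaryFile(delete=False, suffix=".pem")
    key_file = tempfile.NamedTemporaryFile(delete=False, suffix=".pem")
    try:
        cert_file.write(cert_pem); cert_file.close()
        key_file.write(key_pem); key_file.close()
        ctx.load_cert_chain(certfile=cert_file.name, keyfile=key_file.name)
    finally:
        os.unlink(cert_file.name)
        os.unlink(key_file.name)
```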
the initial codec used generic string maps for admin frames, but the protocol spec uses typed fields. this aligns all encode/decode functions with the actual wire format from docs/protocol.md:
- create_queue: [string name][optional on_enqueue][optional on_failure][u64 timeout]
- get_stats_result: typed fields (depth, in_flight, etc.), not a string map
- list_queues_result: [u8 error][u32 nodes][u16 count][per: name, depth, ...]
- set_config: [string key][string value] (not queue + map)
- redrive: [string dlq][u64 count] (not source + dest + u32)
- create_api_key: [string name][u64 expires][bool superadmin]
- set_acl: [key_id][u16 count][per: kind, pattern] (not patterns list)
- all result frames now decode their error_code prefix

also updates types.py with properly structured StatsResult, QueueInfo, AclEntry, ApiKeyInfo to match the wire format fields.
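The create_queue layout above can be sketched as a small encoder. The field order matches the commit message, but the string length prefix (u16, big-endian) and the presence byte for optionals are assumptions here; the actual encoding lives in docs/protocol.md.

```python
import struct

def write_string(buf: bytearray, s: str) -> None:
    data = s.encode("utf-8")
    buf += struct.pack(">H", len(data))   # assumed u16 length prefix
    buf += data

def write_optional_string(buf: bytearray, s) -> None:
    if s is None:
        buf.append(0)                     # assumed presence byte: absent
    else:
        buf.append(1)                     # assumed presence byte: present
        write_string(buf, s)

def encode_create_queue(name, on_enqueue=None, on_failure=None, timeout=0):
    # [string name][optional on_enqueue][optional on_failure][u64 timeout]
    buf = bytearray()
    write_string(buf, name)
    write_optional_string(buf, on_enqueue)
    write_optional_string(buf, on_failure)
    buf += struct.pack(">Q", timeout)
    return bytes(buf)

frame = encode_create_queue("orders", timeout=30_000)
```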
- add a pushback buffer to Connection/AsyncConnection so subscribe() can handle servers that send Delivery directly without ConsumeOk
- fix mypy errors: annotate struct.unpack_from return values, fix the _request_with_leader_retry return type to FrameHeader, move the ssl import to a TYPE_CHECKING block, remove the unused make_ssl_context function
1 issue found across 4 files (changes from recent commits).
<file name="fila/conn.py">
<violation number="1" location="fila/conn.py:194">
P2: `subscribe()` should only fallback-push back `DELIVERY` frames; it currently accepts any non-error opcode and can silently mask protocol mismatches.</violation>
</file>
the hot-path opcodes were out of order. the server uses sequential assignment starting from 0x10: Enqueue(0x10), EnqueueResult(0x11), Consume(0x12), ConsumeOk(0x13), Delivery(0x14), CancelConsume(0x15), Ack(0x16), AckResult(0x17), Nack(0x18), NackResult(0x19).
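The corrected sequential assignment can be written down directly as an enum; the member names are spelled out here for readability and may differ slightly from the SDK's actual opcodes module.

```python
from enum import IntEnum

class Opcode(IntEnum):
    # hot-path opcodes, assigned sequentially from 0x10 to match the server
    ENQUEUE = 0x10
    ENQUEUE_RESULT = 0x11
    CONSUME = 0x12
    CONSUME_OK = 0x13
    DELIVERY = 0x14
    CANCEL_CONSUME = 0x15
    ACK = 0x16
    ACK_RESULT = 0x17
    NACK = 0x18
    NACK_RESULT = 0x19
```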
1 issue found across 2 files (changes from recent commits).
<file name="fila/client.py">
<violation number="1" location="fila/client.py:292">
P1: Decoding failures are converted to `ConnectionError`, then swallowed by the outer handler, causing the consumer stream to terminate silently.</violation>
</file>
- add bounds checks in Reader.read_string() and read_bytes() to detect truncated frames instead of silently parsing corrupt data
- add a fallback in the batcher's _flush_many() to fail unresolved futures when the server returns fewer results than items sent
- remove the diagnostic ConnectionError wrapper in _consume_iter that would silently swallow decode failures
- tighten subscribe() pushback to only accept Delivery frames, raising ConnectionError for unexpected opcodes
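The bounds-check fix in the first bullet can be sketched as follows: every read first verifies that the declared length is actually available, so a truncated frame fails loudly instead of moving the cursor past the buffer. The u16 string length prefix and the error class name are assumptions, not the SDK's exact code.

```python
import struct

class TruncatedFrameError(ValueError):
    """Raised when a frame declares more data than the buffer holds."""

class Reader:
    def __init__(self, data: bytes):
        self._data = data
        self._pos = 0

    def _require(self, n: int) -> None:
        # bounds check: refuse to read past the end of the buffer
        if self._pos + n > len(self._data):
            raise TruncatedFrameError(
                f"need {n} bytes at offset {self._pos}, "
                f"have {len(self._data) - self._pos}"
            )

    def read_bytes(self, n: int) -> bytes:
        self._require(n)
        out = self._data[self._pos : self._pos + n]
        self._pos += n
        return out

    def read_string(self) -> str:
        self._require(2)
        (length,) = struct.unpack_from(">H", self._data, self._pos)  # assumed u16 prefix
        self._pos += 2
        return self.read_bytes(length).decode("utf-8")

ok = Reader(b"\x00\x05hello").read_string()   # well-formed frame
try:
    Reader(b"\x00\x05hel").read_string()      # declares 5 bytes, only 3 present
    truncated_raised = False
except TruncatedFrameError:
    truncated_raised = True
```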
Summary
- fila/fibp/ module with primitives (Reader/Writer), opcodes (42 opcodes, 18 error codes), and codec (encode/decode for all opcodes)
- fila/conn.py with synchronous Connection and async AsyncConnection classes handling TCP framing, handshake, keepalive (Ping/Pong), and continuation frames
- client.py and async_client.py rewritten to use FIBP connections with leader-hint reconnection
- batcher.py (Auto/Linger accumulators) rewritten to use Connection instead of gRPC stubs
- new error types: UnauthorizedError, ForbiddenError, NotLeaderError, ChannelFullError, etc.
- new ConsumeMessage fields: weight, throttle_keys, enqueued_at, leased_at
- deleted fila/v1/ (generated protobuf) and the proto/ directory
- removed grpcio and protobuf from dependencies (zero external deps now)

Test plan

- python -m pytest tests/test_fibp.py -- 34 codec/primitives unit tests
- python -m pytest tests/test_batcher.py -- 12 batcher unit tests with mock connections
- python -m pytest tests/test_enqueue_integration.py::TestAccumulatorModeTypes -- 4 type unit tests
- ruff check fila/ tests/ -- all checks pass

Summary by cubic
Migrated the Python SDK from gRPC to the FIBP (Fila Binary Protocol) over TCP, aligning codecs and result types with the wire spec. Hardened frame parsing and consume behavior per review feedback.
New Features
- enqueue_many returns per-message results; background accumulators (AccumulatorMode.AUTO / Linger) run on the new connection.
- typed admin results (StatsResult, QueueInfo, AclEntry, ApiKeyInfo).
- result frames decode their error_code prefix; errors map to typed exceptions (UnauthorizedError, ForbiddenError, NotLeaderError, ChannelFullError, etc.).
- ConsumeMessage adds weight, throttle_keys, enqueued_at, leased_at.
- all error types derive from FilaError.

Migration
- remove grpcio/protobuf and any fila.v1.* or proto/* imports; the SDK has no external deps now.
- batching is configured via AccumulatorMode or Linger; defaults remain sensible.
- check per-message enqueue_many results; single enqueue preserves exception mapping via EnqueueError and typed errors.

Written for commit 17a36b0. Summary will update on new commits.