Skip to content

feat: migrate python sdk from grpc to fibp binary protocol#11

Open
vieiralucas wants to merge 19 commits intomainfrom
feat/21.2-binary-protocol
Open

feat: migrate python sdk from grpc to fibp binary protocol#11
vieiralucas wants to merge 19 commits intomainfrom
feat/21.2-binary-protocol

Conversation

@vieiralucas
Copy link
Copy Markdown
Member

@vieiralucas vieiralucas commented Apr 4, 2026

Summary

  • Replace entire gRPC transport layer with native FIBP (Fila Binary Protocol) implementation
  • Add fila/fibp/ module with primitives (Reader/Writer), opcodes (42 opcodes, 18 error codes), and codec (encode/decode for all opcodes)
  • Add fila/conn.py with synchronous Connection and async AsyncConnection classes handling TCP framing, handshake, keepalive (Ping/Pong), and continuation frames
  • Rewrite client.py and async_client.py to use FIBP connections with leader-hint reconnection
  • Rewrite batcher.py (Auto/Linger accumulators) to use Connection instead of gRPC stubs
  • Add admin methods (create/delete queue, stats, config, redrive) and auth methods (API key CRUD, ACL management) to both sync and async clients
  • Add new error types mapped from FIBP error codes: UnauthorizedError, ForbiddenError, NotLeaderError, ChannelFullError, etc.
  • Add ConsumeMessage fields: weight, throttle_keys, enqueued_at, leased_at
  • Delete fila/v1/ (generated protobuf), proto/ directory
  • Remove grpcio and protobuf from dependencies (zero external deps now)
  • Bump version to 0.3.0
  • 49 unit tests passing (34 new codec/primitives tests + rewritten batcher/type tests)

Test plan

  • python -m pytest tests/test_fibp.py -- 34 codec/primitives unit tests
  • python -m pytest tests/test_batcher.py -- 12 batcher unit tests with mock connections
  • python -m pytest tests/test_enqueue_integration.py::TestAccumulatorModeTypes -- 4 type unit tests
  • ruff check fila/ tests/ -- all checks pass
  • Integration tests (test_client.py, test_enqueue_integration.py) require a running fila-server with FIBP support

Summary by cubic

Migrated the Python SDK from gRPC to the FIBP (Fila Binary Protocol) over TCP, aligning codecs and result types with the wire spec. Hardened frame parsing and consume behavior per review feedback.

  • New Features

    • Native FIBP codec and sync/async connections with handshake, keepalive, continuation frames.
    • Clients speak FIBP with leader‑hint reconnect; consume handles Delivery‑before‑ConsumeOk via a pushback buffer.
    • enqueue_many returns per‑message results; background accumulators (AccumulatorMode.AUTO/Linger) run on the new connection.
    • Admin/auth: queue CRUD, stats/config/redrive, API key CRUD, ACL management (sync and async) with spec‑aligned codecs and typed results (StatsResult, QueueInfo, AclEntry, ApiKeyInfo).
    • All result frames decode error_code; errors map to typed exceptions (UnauthorizedError, ForbiddenError, NotLeaderError, ChannelFullError, etc.).
    • ConsumeMessage adds weight, throttle_keys, enqueued_at, leased_at.
    • Delivery decode diagnostics added; mypy dict types tightened.
    • Correct hot‑path opcode assignments: Enqueue(0x10), EnqueueResult(0x11), Consume(0x12), ConsumeOk(0x13), Delivery(0x14), CancelConsume(0x15), Ack(0x16), AckResult(0x17), Nack(0x18), NackResult(0x19).
    • Hardening: bounds checks in string/bytes readers to catch truncated frames; batcher now fails unresolved futures if server returns fewer results; removed wrapper that swallowed decode errors in consume; subscribe pushback only accepts Delivery and raises on unexpected opcodes; broadened auth rejection test to accept any FilaError.
    • Version bumped to 0.3.0.
  • Migration

    • Requires a FIBP‑enabled fila‑server on port 5555.
    • Remove grpcio/protobuf and any fila.v1.* or proto/* imports; the SDK has no external deps now.
    • API key is sent during the FIBP handshake (not via gRPC metadata).
    • If you rely on batching, use AccumulatorMode or Linger; defaults remain sensible.
    • For per‑message enqueue errors, read them from enqueue_many results; single enqueue preserves exception mapping via EnqueueError and typed errors.

Written for commit 17a36b0. Summary will update on new commits.

feat: transparent leader hint reconnect on consume
add batch_enqueue() for explicit multi-message RPCs, smart batching via
BatchMode (AUTO/DISABLED/Linger) that routes enqueue() through a background
batcher thread, and delivery batching that unpacks ConsumeResponse.messages
repeated field. update proto to include BatchEnqueue RPC and ConsumeResponse
batched messages field. single-item optimization uses singular Enqueue RPC
to preserve error types. close() drains pending messages before disconnecting.
Update Python SDK for the unified proto API:
- Enqueue RPC now uses repeated EnqueueMessage/EnqueueResult
- BatchEnqueue RPC removed; enqueue_many() replaces batch_enqueue()
- Ack/Nack use repeated AckMessage/NackMessage with per-message results
- ConsumeResponse only has repeated messages field
- BatchMode renamed to AccumulatorMode, BatchEnqueueResult to EnqueueResult
- BatchEnqueueError renamed to EnqueueError
- Linger.batch_size renamed to Linger.max_messages
- Per-message error codes mapped to typed SDK exceptions
  (e.g. ENQUEUE_ERROR_CODE_QUEUE_NOT_FOUND -> QueueNotFoundError)
- All 31 tests pass (16 unit, 15 integration)
feat: 30.2 — unified api surface
- remove unused imports (EnqueueError in client/async_client, Any in
  batcher, threading in test_batcher) — fixes ruff F401
- move MessageNotFoundError/RPCError to top-level imports, eliminating
  inline imports that triggered ruff I001 (unsorted import blocks)
- move ConsumeMessage/EnqueueResult into TYPE_CHECKING block in
  async_client — fixes ruff TC001
- fix EnqueueError docstring to reflect that it is also raised as
  fallback for per-message errors via _map_enqueue_result_error
- strengthen test_flush_single to assert actual request content sent to
  Enqueue, not just call count
- downgrade GRPC_GENERATED_VERSION from 1.78.1 to 1.78.0 in all
  generated grpc stubs (grpcio 1.78.1 was yanked from PyPI)
replace the entire grpc transport layer with a native fibp implementation.
the sdk now communicates directly over tcp using the fila binary protocol,
removing the grpcio and protobuf dependencies entirely.

- add fila/fibp/ module: primitives (Reader/Writer), opcodes, codec
- add fila/conn.py: sync Connection and async AsyncConnection classes
- rewrite client.py and async_client.py to use fibp connections
- rewrite batcher.py to use Connection instead of grpc stubs
- rewrite errors.py with fibp error code mapping (18 error codes)
- add admin methods: create/delete queue, stats, config, redrive
- add auth methods: create/revoke api key, acl management
- add new error types: UnauthorizedError, ForbiddenError, NotLeaderError, etc.
- add ConsumeMessage fields: weight, throttle_keys, enqueued_at, leased_at
- delete fila/v1/ (generated protobuf) and proto/ directories
- remove grpcio/protobuf from dependencies, bump version to 0.3.0
- add test_fibp.py with 34 codec/primitives unit tests
- rewrite test_batcher.py for fibp mock connections
- update integration tests and conftest.py for fibp
Copy link
Copy Markdown

@cubic-dev-ai cubic-dev-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

7 issues found across 32 files

Prompt for AI agents (unresolved issues)

Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.


<file name="fila/fibp/primitives.py">

<violation number="1" location="fila/fibp/primitives.py:138">
P1: `read_string` is missing a bounds check for the declared length, so truncated frames can be parsed as valid data and move the cursor past the buffer.</violation>

<violation number="2" location="fila/fibp/primitives.py:146">
P1: `read_bytes` does not verify that the requested byte length is available before slicing, allowing truncated payloads to be accepted silently.</violation>
</file>

<file name="fila/conn.py">

<violation number="1" location="fila/conn.py:54">
P1: `ssl.SSLContext.load_cert_chain()` does not accept `certdata`/`keydata` keyword arguments. Its signature is `load_cert_chain(certfile, keyfile=None, password=None)` and it expects file paths, not in-memory bytes. This will raise `TypeError` at runtime when mutual TLS is configured.</violation>
</file>

<file name="fila/batcher.py">

<violation number="1" location="fila/batcher.py:94">
P1: Futures left unresolved when server returns fewer results than items sent. If `len(results) < len(items)`, the trailing items' futures will never complete, causing callers to hang indefinitely. Add a fallback after the loop to set an exception on any unmatched futures.</violation>
</file>

<file name="tests/test_client.py">

<violation number="1" location="tests/test_client.py:210">
P2: The auth rejection test is over-broad: catching `FilaError` allows unrelated errors to satisfy the assertion, so the test can pass without actually verifying unauthorized access.</violation>
</file>

<file name="fila/async_client.py">

<violation number="1" location="fila/async_client.py:73">
P2: The docstring references `await AsyncClient.create("localhost:5555")` but no `create` classmethod exists. Users following this example will get `AttributeError`. Should match the README which shows direct construction (`AsyncClient(...)`) or the context manager pattern.</violation>
</file>

<file name="fila/client.py">

<violation number="1" location="fila/client.py:298">
P2: `consumer_id` is accepted but never used — the generator has no cleanup path to call `cancel_consume(consumer_id)` when the caller stops iterating, leaving an orphaned server-side consumer until the connection closes.</violation>
</file>

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.

Comment thread fila/fibp/primitives.py
Comment thread fila/fibp/primitives.py
Comment thread fila/conn.py Outdated
Comment thread fila/batcher.py
Comment thread tests/test_client.py Outdated
Comment thread fila/async_client.py Outdated
Comment thread fila/client.py
the initial codec used generic string maps for admin frames, but the
protocol spec uses typed fields. this aligns all encode/decode functions
with the actual wire format from docs/protocol.md:

- create_queue: [string name][optional on_enqueue][optional on_failure][u64 timeout]
- get_stats_result: typed fields (depth, in_flight, etc.) not a string map
- list_queues_result: [u8 error][u32 nodes][u16 count][per: name, depth, ...]
- set_config: [string key][string value] (not queue + map)
- redrive: [string dlq][u64 count] (not source + dest + u32)
- create_api_key: [string name][u64 expires][bool superadmin]
- set_acl: [key_id][u16 count][per: kind, pattern] (not patterns list)
- all result frames now decode their error_code prefix

also updates types.py with properly structured StatsResult, QueueInfo,
AclEntry, ApiKeyInfo to match the wire format fields.
- add pushback buffer to Connection/AsyncConnection for subscribe() to
  handle servers that send Delivery directly without ConsumeOk
- fix mypy errors: annotate struct.unpack_from return values, fix
  _request_with_leader_retry return type to FrameHeader, move ssl import
  to TYPE_CHECKING block, remove unused make_ssl_context function
Copy link
Copy Markdown

@cubic-dev-ai cubic-dev-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1 issue found across 4 files (changes from recent commits).

Prompt for AI agents (unresolved issues)

Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.


<file name="fila/conn.py">

<violation number="1" location="fila/conn.py:194">
P2: `subscribe()` should only fallback-push back `DELIVERY` frames; it currently accepts any non-error opcode and can silently mask protocol mismatches.</violation>
</file>

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.

Comment thread fila/conn.py Outdated
the hot-path opcodes were out of order. the server uses sequential
assignment starting from 0x10: Enqueue(0x10), EnqueueResult(0x11),
Consume(0x12), ConsumeOk(0x13), Delivery(0x14), CancelConsume(0x15),
Ack(0x16), AckResult(0x17), Nack(0x18), NackResult(0x19).
Copy link
Copy Markdown

@cubic-dev-ai cubic-dev-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1 issue found across 2 files (changes from recent commits).

Prompt for AI agents (unresolved issues)

Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.


<file name="fila/client.py">

<violation number="1" location="fila/client.py:292">
P1: Decoding failures are converted to `ConnectionError`, then swallowed by the outer handler, causing the consumer stream to terminate silently.</violation>
</file>

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.

Comment thread fila/client.py Outdated
- add bounds checks in Reader.read_string() and read_bytes() to detect
  truncated frames instead of silently parsing corrupt data
- add fallback in batcher _flush_many() to fail unresolved futures when
  server returns fewer results than items sent
- remove diagnostic ConnectionError wrapper in _consume_iter that would
  silently swallow decode failures
- tighten subscribe() pushback to only accept Delivery frames, raise
  ConnectionError for unexpected opcodes
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant