feat: replace grpc transport with fibp (fila binary protocol)#5
feat: replace grpc transport with fibp (fila binary protocol)#5vieiralucas merged 2 commits intomainfrom
Conversation
replace the grpc transport layer with a custom binary protocol (fibp) over raw tcp. fibp uses length-prefixed frames with a 6-byte handshake, per-operation correlation-id multiplexing, and a binary wire format for hot-path ops (enqueue, consume, ack, nack). admin ops retain protobuf payloads over fibp frames. - add fila/fibp.py: frame encoding/decoding, sync FibpConnection (background reader thread + future dispatch), async AsyncFibpConnection (asyncio reader task + asyncio.Queue dispatch), tls via ssl module - rewrite fila/client.py: uses FibpConnection, removes grpc dependency - rewrite fila/async_client.py: uses AsyncFibpConnection - rewrite fila/batcher.py: accumulator takes FibpConnection instead of grpc stub; multi-queue batches split into per-queue fibp frames - rewrite fila/errors.py: fibp error codes replace grpc status codes; add TransportError (RPCError aliased for backward compat) - update tests/conftest.py: server-readiness via fibp handshake; grpcio kept as dev dep for admin create_queue calls in fixtures - update tests/test_batcher.py: mock FibpConnection instead of grpc stub - update tests/test_client.py: auth error check uses fibp error codes - update pyproject.toml: grpcio moved to dev dep (not runtime) - update README.md: document fibp transport, remove grpc references public api surface is unchanged: Client, AsyncClient, enqueue, enqueue_many, consume, ack, nack, all error types preserved.
the dev-latest binary predates the fibp transport. when the fibp handshake receives an http/2 response (indicating a grpc-only server), skip the test with an informative message instead of failing the suite. integration tests will run as intended once a fibp-capable server binary is published to the dev-latest release.
There was a problem hiding this comment.
7 issues found across 11 files
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="fila/client.py">
<violation number="1" location="fila/client.py:279">
P2: The docstring claims the client "transparently reconnects to the leader address and retries once," but this reconnection logic was removed in the transport rewrite. Either implement the leader-hint handling over FIBP or update the docstring to reflect the current behavior.</violation>
<violation number="2" location="fila/client.py:312">
P1: Bare `except Exception: continue` silently drops messages that fail to decode. If the server sends a malformed frame or the protocol changes, consumed messages will vanish without any indication. At minimum, log the exception so operators can detect the data loss.</violation>
</file>
<file name="fila/async_client.py">
<violation number="1" location="fila/async_client.py:162">
P2: Independent per-queue requests in `enqueue_many` are awaited sequentially. Since these use separate correlation IDs and target different queues, they can be dispatched concurrently via `asyncio.gather` to avoid unnecessary serialisation in the async client.</violation>
<violation number="2" location="fila/async_client.py:265">
P1: Bare `except Exception: continue` silently drops messages that fail to decode, with no logging or metrics. This can hide protocol mismatches, corrupt frames, or bugs in `decode_consume_message`, making data loss invisible to operators. At minimum, log a warning with the exception before continuing.</violation>
</file>
<file name="fila/batcher.py">
<violation number="1" location="fila/batcher.py:102">
P2: Map FibpError via _map_enqueue_error_code here so batch enqueues preserve the same specific error types as single enqueues.</violation>
</file>
<file name="fila/fibp.py">
<violation number="1" location="fila/fibp.py:413">
P1: Missing write lock for socket sends — concurrent `sendall` calls from different threads can interleave frame bytes on the wire, corrupting the protocol stream. The async version correctly uses `_write_lock`; the sync version needs the same protection.</violation>
<violation number="2" location="fila/fibp.py:710">
P2: Unnecessary temp file for CA cert — `SSLContext.load_verify_locations` accepts PEM data directly via the `cadata` parameter, avoiding writing to disk: `ctx.load_verify_locations(cadata=ca_cert.decode('ascii'))`.</violation>
</file>
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
| except Exception: | ||
| continue |
There was a problem hiding this comment.
P1: Bare except Exception: continue silently drops messages that fail to decode. If the server sends a malformed frame or the protocol changes, consumed messages will vanish without any indication. At minimum, log the exception so operators can detect the data loss.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At fila/client.py, line 312:
<comment>Bare `except Exception: continue` silently drops messages that fail to decode. If the server sends a malformed frame or the protocol changes, consumed messages will vanish without any indication. At minimum, log the exception so operators can detect the data loss.</comment>
<file context>
@@ -384,114 +287,84 @@ def consume(self, queue: str) -> Iterator[ConsumeMessage]:
+ msg_id, queue, headers, payload, fairness_key, attempt_count = (
+ decode_consume_message(body)
+ )
+ except Exception:
+ continue
+ yield ConsumeMessage(
</file context>
| except Exception: | |
| continue | |
| except Exception: # noqa: BLE001 | |
| import logging | |
| logging.getLogger(__name__).warning("failed to decode consume frame, skipping", exc_info=True) | |
| continue |
| msg_id, queue, headers, payload, fairness_key, attempt_count = ( | ||
| decode_consume_message(body) | ||
| ) | ||
| except Exception: |
There was a problem hiding this comment.
P1: Bare except Exception: continue silently drops messages that fail to decode, with no logging or metrics. This can hide protocol mismatches, corrupt frames, or bugs in decode_consume_message, making data loss invisible to operators. At minimum, log a warning with the exception before continuing.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At fila/async_client.py, line 265:
<comment>Bare `except Exception: continue` silently drops messages that fail to decode, with no logging or metrics. This can hide protocol mismatches, corrupt frames, or bugs in `decode_consume_message`, making data loss invisible to operators. At minimum, log a warning with the exception before continuing.</comment>
<file context>
@@ -253,190 +154,170 @@ async def enqueue(
+ msg_id, queue, headers, payload, fairness_key, attempt_count = (
+ decode_consume_message(body)
+ )
+ except Exception:
+ continue
+ yield ConsumeMessage(
</file context>
| fut: Future[bytes] = Future() | ||
| with self._lock: | ||
| self._pending[corr_id] = fut | ||
| self._sock.sendall(frame) |
There was a problem hiding this comment.
P1: Missing write lock for socket sends — concurrent sendall calls from different threads can interleave frame bytes on the wire, corrupting the protocol stream. The async version correctly uses _write_lock; the sync version needs the same protection.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At fila/fibp.py, line 413:
<comment>Missing write lock for socket sends — concurrent `sendall` calls from different threads can interleave frame bytes on the wire, corrupting the protocol stream. The async version correctly uses `_write_lock`; the sync version needs the same protection.</comment>
<file context>
@@ -0,0 +1,735 @@
+ fut: Future[bytes] = Future()
+ with self._lock:
+ self._pending[corr_id] = fut
+ self._sock.sendall(frame)
+ return fut
+
</file context>
| If the server returns UNAVAILABLE with an ``x-fila-leader-addr`` | ||
| trailing metadata entry, the client transparently reconnects to the | ||
| leader address and retries the consume call once. | ||
| If the server returns a leader-hint error, the client transparently |
There was a problem hiding this comment.
P2: The docstring claims the client "transparently reconnects to the leader address and retries once," but this reconnection logic was removed in the transport rewrite. Either implement the leader-hint handling over FIBP or update the docstring to reflect the current behavior.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At fila/client.py, line 279:
<comment>The docstring claims the client "transparently reconnects to the leader address and retries once," but this reconnection logic was removed in the transport rewrite. Either implement the leader-hint handling over FIBP or update the docstring to reflect the current behavior.</comment>
<file context>
@@ -298,83 +187,97 @@ def enqueue(
- If the server returns UNAVAILABLE with an ``x-fila-leader-addr``
- trailing metadata entry, the client transparently reconnects to the
- leader address and retries the consume call once.
+ If the server returns a leader-hint error, the client transparently
+ reconnects to the leader address and retries once.
</file context>
| ) | ||
| except grpc.RpcError as e: | ||
| raise _map_enqueue_error(e) from e | ||
| body = await self._conn.send_request(frame, corr_id) |
There was a problem hiding this comment.
P2: Independent per-queue requests in enqueue_many are awaited sequentially. Since these use separate correlation IDs and target different queues, they can be dispatched concurrently via asyncio.gather to avoid unnecessary serialisation in the async client.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At fila/async_client.py, line 162:
<comment>Independent per-queue requests in `enqueue_many` are awaited sequentially. Since these use separate correlation IDs and target different queues, they can be dispatched concurrently via `asyncio.gather` to avoid unnecessary serialisation in the async client.</comment>
<file context>
@@ -253,190 +154,170 @@ async def enqueue(
- )
- except grpc.RpcError as e:
- raise _map_enqueue_error(e) from e
+ body = await self._conn.send_request(frame, corr_id)
+ except FibpError as e:
+ raise _map_fibp_error(e.code, e.message) from e
</file context>
| err = EnqueueError(f"enqueue rpc failed: {e.details()}") | ||
| body = conn.send_request(frame, corr_id).result() | ||
| except FibpError as e: | ||
| err = EnqueueError(f"enqueue transport error: {e.message}") |
There was a problem hiding this comment.
P2: Map FibpError via _map_enqueue_error_code here so batch enqueues preserve the same specific error types as single enqueues.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At fila/batcher.py, line 102:
<comment>Map FibpError via _map_enqueue_error_code here so batch enqueues preserve the same specific error types as single enqueues.</comment>
<file context>
@@ -24,65 +26,80 @@
- err = EnqueueError(f"enqueue rpc failed: {e.details()}")
+ body = conn.send_request(frame, corr_id).result()
+ except FibpError as e:
+ err = EnqueueError(f"enqueue transport error: {e.message}")
for item in items:
item.future.set_exception(err)
</file context>
| err = EnqueueError(f"enqueue transport error: {e.message}") | |
| err = _map_enqueue_error_code(e.code, e.message) |
| # Write CA cert to a temp file (SSLContext only accepts file paths). | ||
| with tempfile.NamedTemporaryFile(delete=False, suffix=".pem") as f: | ||
| f.write(ca_cert) | ||
| ca_path = f.name |
There was a problem hiding this comment.
P2: Unnecessary temp file for CA cert — SSLContext.load_verify_locations accepts PEM data directly via the cadata parameter, avoiding writing to disk: ctx.load_verify_locations(cadata=ca_cert.decode('ascii')).
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At fila/fibp.py, line 710:
<comment>Unnecessary temp file for CA cert — `SSLContext.load_verify_locations` accepts PEM data directly via the `cadata` parameter, avoiding writing to disk: `ctx.load_verify_locations(cadata=ca_cert.decode('ascii'))`.</comment>
<file context>
@@ -0,0 +1,735 @@
+ # Write CA cert to a temp file (SSLContext only accepts file paths).
+ with tempfile.NamedTemporaryFile(delete=False, suffix=".pem") as f:
+ f.write(ca_cert)
+ ca_path = f.name
+ try:
+ ctx.load_verify_locations(ca_path)
</file context>
Summary
fila/fibp.py: full FIBP implementation (frame encoding/decoding, sync + async connections, TLS viasslmodule, correlation-ID multiplexing)client.py,async_client.py,batcher.py, anderrors.pyto use the new transport; public API is unchangedgrpcioas a runtime dependency (kept as dev dep for admin fixtures in tests)FibpConnection, conftest readiness check uses FIBP handshakeProtocol details
[4-byte big-endian length][flags:u8][op:u8][corr_id:u32][body...]FIBP\x01\x00, server echoes same 6 bytesOP_AUTH(0x30) frame sent immediately after handshake whenapi_keyis setstruct.pack/unpackssl.SSLContextwraps the socket; system trust store or custom CA + mTLS supportedTest plan
test_batcher.py,TestAccumulatorModeTypes) — verified locallyruff check .— cleanmypy fila/— clean🤖 Generated with Claude Code
Summary by cubic
Replaced the gRPC transport with FIBP (Fila Binary Protocol) over raw TCP/TLS for the Python client. This removes the
grpcioruntime dependency and enables faster, multiplexed requests without changing the public API.New Features
fila/fibp.py: frame encode/decode, sync/async connections, correlation-ID multiplexing, and TLS via the stdlibsslmodule.protobufpayloads over FIBP frames.Refactors
client.py,async_client.py, andbatcher.pywith FIBP.TransportErrorand mapped FIBP error codes; keptRPCErroras an alias for backward compatibility.grpcioto a dev-only dependency; updated tests and README accordingly.Written for commit 6e769c8. Summary will update on new commits.