Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 34 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

Python client SDK for the [Fila](https://github.com/faiscadev/fila) message broker.

Uses the FIBP (Fila Binary Protocol) transport — a length-prefixed binary
framing protocol over raw TCP, replacing the previous gRPC transport.

## Installation

```bash
Expand Down Expand Up @@ -102,7 +105,22 @@ client = Client(
)
```

The API key is sent as `authorization: Bearer <key>` metadata on every RPC.
The API key is sent as a FIBP AUTH frame immediately after the handshake.

## Accumulator Modes

```python
from fila import Client, AccumulatorMode, Linger

# AUTO (default): opportunistic accumulation via background thread.
client = Client("localhost:5555")

# DISABLED: each enqueue() sends a direct FIBP frame.
client = Client("localhost:5555", accumulator_mode=AccumulatorMode.DISABLED)

# LINGER: timer-based forced accumulation.
client = Client("localhost:5555", accumulator_mode=Linger(linger_ms=10, max_messages=100))
```

## API

Expand All @@ -114,24 +132,31 @@ Connect to a Fila broker. Both support context manager protocol.

Enqueue a message. Returns the broker-assigned message ID.

### `client.enqueue_many(messages) -> list[EnqueueResult]`

Enqueue multiple messages in one call. `messages` is a list of
`(queue, headers, payload)` tuples. Returns per-message results.

### `client.consume(queue) -> Iterator[ConsumeMessage]`

Open a streaming consumer. Returns an iterator (sync) or async iterator (async) that yields messages as they become available.
Open a streaming consumer. Returns an iterator (sync) or async iterator (async)
that yields messages as they become available.

### `client.ack(queue, msg_id)`

Acknowledge a successfully processed message. The message is permanently removed.

### `client.nack(queue, msg_id, error)`

Negatively acknowledge a failed message. The message is requeued or routed to the dead-letter queue based on the queue's configuration.
Negatively acknowledge a failed message. The message is requeued or routed to
the dead-letter queue based on the queue's configuration.

## Error Handling

Per-operation exception classes:

```python
from fila import QueueNotFoundError, MessageNotFoundError
from fila import QueueNotFoundError, MessageNotFoundError, TransportError

try:
client.enqueue("missing-queue", None, b"test")
Expand All @@ -142,6 +167,11 @@ try:
client.ack("my-queue", "missing-id")
except MessageNotFoundError:
print("Message does not exist")

try:
client.enqueue("my-queue", None, b"test")
except TransportError as e:
print(f"Transport error (code={e.code}): {e.message}")
```

## License
Expand Down
2 changes: 2 additions & 0 deletions fila/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
MessageNotFoundError,
QueueNotFoundError,
RPCError,
TransportError,
)
from fila.types import AccumulatorMode, ConsumeMessage, EnqueueResult, Linger

Expand All @@ -23,4 +24,5 @@
"MessageNotFoundError",
"QueueNotFoundError",
"RPCError",
"TransportError",
]
Loading
Loading