diff --git a/proto/fila/v1/service.proto b/proto/fila/v1/service.proto
index f14fdd0..fc0f710 100644
--- a/proto/fila/v1/service.proto
+++ b/proto/fila/v1/service.proto
@@ -6,6 +6,7 @@ import "fila/v1/messages.proto";
// Hot-path RPCs for producers and consumers.
service FilaService {
rpc Enqueue(EnqueueRequest) returns (EnqueueResponse);
+ rpc BatchEnqueue(BatchEnqueueRequest) returns (BatchEnqueueResponse);
rpc Consume(ConsumeRequest) returns (stream ConsumeResponse);
rpc Ack(AckRequest) returns (AckResponse);
rpc Nack(NackRequest) returns (NackResponse);
@@ -26,7 +27,8 @@ message ConsumeRequest {
}
message ConsumeResponse {
- Message message = 1;
+ Message message = 1; // Single message (backward compatible, used when batch size is 1)
+ repeated Message messages = 2; // Batched messages (populated when server sends multiple at once)
}
message AckRequest {
@@ -43,3 +45,18 @@ message NackRequest {
}
message NackResponse {}
+
+message BatchEnqueueRequest {
+ repeated EnqueueRequest messages = 1;
+}
+
+message BatchEnqueueResponse {
+ repeated BatchEnqueueResult results = 1;
+}
+
+message BatchEnqueueResult {
+ oneof result {
+ EnqueueResponse success = 1;
+ string error = 2;
+ }
+}
diff --git a/src/main/java/dev/faisca/fila/BatchEnqueueResult.java b/src/main/java/dev/faisca/fila/BatchEnqueueResult.java
new file mode 100644
index 0000000..eab5f50
--- /dev/null
+++ b/src/main/java/dev/faisca/fila/BatchEnqueueResult.java
@@ -0,0 +1,57 @@
+package dev.faisca.fila;
+
+/**
+ * The result of a single message within a batch enqueue call.
+ *
+ * <p>Each message in a batch is independently validated and processed. A failed message does not
+ * affect the others. Use {@link #isSuccess()} to check the outcome, then either {@link
+ * #getMessageId()} or {@link #getError()}.
+ */
+public final class BatchEnqueueResult {
+ private final String messageId;
+ private final String error;
+
+ private BatchEnqueueResult(String messageId, String error) {
+ this.messageId = messageId;
+ this.error = error;
+ }
+
+ /** Create a successful result with the broker-assigned message ID. */
+ static BatchEnqueueResult success(String messageId) {
+ return new BatchEnqueueResult(messageId, null);
+ }
+
+ /** Create a failed result with an error description. */
+ static BatchEnqueueResult error(String error) {
+ return new BatchEnqueueResult(null, error);
+ }
+
+ /** Returns true if the message was successfully enqueued. */
+ public boolean isSuccess() {
+ return messageId != null;
+ }
+
+ /**
+ * Returns the broker-assigned message ID.
+ *
+ * @throws IllegalStateException if this result is an error
+ */
+ public String getMessageId() {
+ if (messageId == null) {
+ throw new IllegalStateException("result is an error: " + error);
+ }
+ return messageId;
+ }
+
+ /**
+ * Returns the error description.
+ *
+ * @throws IllegalStateException if this result is a success
+ */
+ public String getError() {
+ if (error == null) {
+ throw new IllegalStateException("result is a success");
+ }
+ return error;
+ }
+}
diff --git a/src/main/java/dev/faisca/fila/BatchMode.java b/src/main/java/dev/faisca/fila/BatchMode.java
new file mode 100644
index 0000000..0be14fc
--- /dev/null
+++ b/src/main/java/dev/faisca/fila/BatchMode.java
@@ -0,0 +1,93 @@
+package dev.faisca.fila;
+
+/**
+ * Controls how the SDK batches {@link FilaClient#enqueue} calls.
+ *
+ * <p>The default is {@link #auto()} -- opportunistic batching that requires zero configuration. At
+ * low load each message is sent individually (zero added latency). At high load messages accumulate
+ * naturally and are flushed together.
+ */
+public final class BatchMode {
+ enum Kind {
+ AUTO,
+ LINGER,
+ DISABLED
+ }
+
+ private final Kind kind;
+ private final int maxBatchSize;
+ private final long lingerMs;
+
+ private BatchMode(Kind kind, int maxBatchSize, long lingerMs) {
+ this.kind = kind;
+ this.maxBatchSize = maxBatchSize;
+ this.lingerMs = lingerMs;
+ }
+
+ /**
+ * Opportunistic batching (default).
+ *
+ * <p>A background thread blocks for the first message, then drains any additional messages that
+ * arrived concurrently. At low load each message is sent individually. At high load messages
+ * accumulate naturally into batches. Zero config, zero latency penalty.
+ *
+ * @return a new AUTO batch mode with default max batch size (100)
+ */
+ public static BatchMode auto() {
+ return new BatchMode(Kind.AUTO, 100, 0);
+ }
+
+ /**
+ * Opportunistic batching with a custom max batch size.
+ *
+ * @param maxBatchSize safety cap on batch size
+ * @return a new AUTO batch mode
+ */
+ public static BatchMode auto(int maxBatchSize) {
+ if (maxBatchSize < 1) {
+ throw new IllegalArgumentException("maxBatchSize must be >= 1");
+ }
+ return new BatchMode(Kind.AUTO, maxBatchSize, 0);
+ }
+
+ /**
+ * Timer-based forced batching.
+ *
+ * <p>Buffers messages and flushes when either {@code batchSize} messages accumulate or {@code
+ * lingerMs} milliseconds elapse since the first message in the batch -- whichever comes first.
+ *
+ * @param lingerMs time threshold in milliseconds before a partial batch is flushed
+ * @param batchSize maximum messages per batch
+ * @return a new LINGER batch mode
+ */
+ public static BatchMode linger(long lingerMs, int batchSize) {
+ if (lingerMs < 1) {
+ throw new IllegalArgumentException("lingerMs must be >= 1");
+ }
+ if (batchSize < 1) {
+ throw new IllegalArgumentException("batchSize must be >= 1");
+ }
+ return new BatchMode(Kind.LINGER, batchSize, lingerMs);
+ }
+
+ /**
+ * No batching. Each {@link FilaClient#enqueue} call is a direct single-message RPC.
+ *
+ * @return a DISABLED batch mode
+ */
+ public static BatchMode disabled() {
+ return new BatchMode(Kind.DISABLED, 0, 0);
+ }
+
+ Kind getKind() {
+ return kind;
+ }
+
+ int getMaxBatchSize() {
+ return maxBatchSize;
+ }
+
+ long getLingerMs() {
+ return lingerMs;
+ }
+}
diff --git a/src/main/java/dev/faisca/fila/Batcher.java b/src/main/java/dev/faisca/fila/Batcher.java
new file mode 100644
index 0000000..486f745
--- /dev/null
+++ b/src/main/java/dev/faisca/fila/Batcher.java
@@ -0,0 +1,288 @@
+package dev.faisca.fila;
+
+import fila.v1.FilaServiceGrpc;
+import fila.v1.Service;
+import io.grpc.StatusRuntimeException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Background batcher that coalesces individual enqueue calls into batch RPCs.
+ *
+ * <p>Supports two modes: AUTO (opportunistic, Nagle-style) and LINGER (timer-based). The batcher
+ * runs on a dedicated daemon thread and flushes RPCs on an executor pool.
+ */
+final class Batcher {
+ private final LinkedBlockingQueue<BatchItem> queue = new LinkedBlockingQueue<>();
+ private final AtomicBoolean running = new AtomicBoolean(true);
+ private final FilaServiceGrpc.FilaServiceBlockingStub stub;
+ private final BatchMode mode;
+ private final Thread batcherThread;
+ private final ExecutorService flushExecutor;
+ private final ScheduledExecutorService scheduler;
+
+ static final class BatchItem {
+ final EnqueueMessage message;
+ final CompletableFuture<String> future;
+
+ BatchItem(EnqueueMessage message, CompletableFuture<String> future) {
+ this.message = message;
+ this.future = future;
+ }
+ }
+
+ Batcher(FilaServiceGrpc.FilaServiceBlockingStub stub, BatchMode mode) {
+ this.stub = stub;
+ this.mode = mode;
+ this.flushExecutor = Executors.newCachedThreadPool(r -> newDaemon(r, "fila-batch-flush"));
+ this.scheduler =
+ mode.getKind() == BatchMode.Kind.LINGER
+ ? Executors.newSingleThreadScheduledExecutor(r -> newDaemon(r, "fila-batch-scheduler"))
+ : null;
+
+ this.batcherThread =
+ new Thread(
+ mode.getKind() == BatchMode.Kind.AUTO ? this::runAuto : this::runLinger,
+ "fila-batcher");
+ this.batcherThread.setDaemon(true);
+ this.batcherThread.start();
+ }
+
+ /**
+ * Submit a message for batched enqueuing.
+ *
+ * @return a future that completes with the message ID or fails with a FilaException
+ */
+ CompletableFuture<String> submit(EnqueueMessage message) {
+ CompletableFuture<String> future = new CompletableFuture<>();
+ if (!running.get()) {
+ future.completeExceptionally(new FilaException("batcher is shut down"));
+ return future;
+ }
+ queue.add(new BatchItem(message, future));
+ return future;
+ }
+
+ /** Drain pending messages and shut down. Blocks until all pending flushes complete. */
+ void shutdown() {
+ running.set(false);
+ batcherThread.interrupt();
+ try {
+ batcherThread.join(5000);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+
+ // Drain any remaining items in the queue.
+ List<BatchItem> remaining = new ArrayList<>();
+ queue.drainTo(remaining);
+ if (!remaining.isEmpty()) {
+ flushBatch(remaining);
+ }
+
+ flushExecutor.shutdown();
+ try {
+ if (!flushExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+ flushExecutor.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ flushExecutor.shutdownNow();
+ Thread.currentThread().interrupt();
+ }
+
+ if (scheduler != null) {
+ scheduler.shutdown();
+ try {
+ if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
+ scheduler.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ scheduler.shutdownNow();
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ /** AUTO mode: block for first message, drain any additional, flush concurrently. */
+ private void runAuto() {
+ int maxBatchSize = mode.getMaxBatchSize();
+ while (running.get()) {
+ try {
+ BatchItem first = queue.take();
+ List<BatchItem> batch = new ArrayList<>();
+ batch.add(first);
+ queue.drainTo(batch, maxBatchSize - 1);
+
+ List<BatchItem> toFlush = List.copyOf(batch);
+ flushExecutor.submit(() -> flushBatch(toFlush));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return;
+ }
+ }
+ }
+
+ /** LINGER mode: buffer messages and flush when batch is full or linger timer fires. */
+ private void runLinger() {
+ int batchSize = mode.getMaxBatchSize();
+ long lingerMs = mode.getLingerMs();
+ List<BatchItem> buffer = new ArrayList<>();
+ ScheduledFuture<?> lingerTimer = null;
+
+ while (running.get()) {
+ try {
+ if (buffer.isEmpty()) {
+ // Block for first message.
+ BatchItem item = queue.take();
+ buffer.add(item);
+
+ if (buffer.size() >= batchSize) {
+ List<BatchItem> toFlush = List.copyOf(buffer);
+ buffer.clear();
+ flushExecutor.submit(() -> flushBatch(toFlush));
+ } else {
+ // Start linger timer.
+ final List<BatchItem> timerBuffer = buffer;
+ lingerTimer =
+ scheduler.schedule(
+ () -> {
+ // Signal the batcher thread to flush by adding a poison pill.
+ // The actual flush happens in the main loop.
+ batcherThread.interrupt();
+ },
+ lingerMs,
+ TimeUnit.MILLISECONDS);
+ }
+ } else {
+ // Buffer has items -- wait for more or timer expiry.
+ BatchItem item = queue.poll(lingerMs, TimeUnit.MILLISECONDS);
+ if (item != null) {
+ buffer.add(item);
+ // Drain any additional available items.
+ queue.drainTo(buffer, batchSize - buffer.size());
+ }
+
+ if (buffer.size() >= batchSize || item == null) {
+ if (lingerTimer != null) {
+ lingerTimer.cancel(false);
+ lingerTimer = null;
+ }
+ List<BatchItem> toFlush = List.copyOf(buffer);
+ buffer.clear();
+ flushExecutor.submit(() -> flushBatch(toFlush));
+ }
+ }
+ } catch (InterruptedException e) {
+ // Timer or shutdown interrupt -- flush what we have.
+ if (!buffer.isEmpty()) {
+ if (lingerTimer != null) {
+ lingerTimer.cancel(false);
+ lingerTimer = null;
+ }
+ List<BatchItem> toFlush = List.copyOf(buffer);
+ buffer.clear();
+ flushExecutor.submit(() -> flushBatch(toFlush));
+ }
+ if (!running.get()) {
+ Thread.currentThread().interrupt();
+ return;
+ }
+ }
+ }
+ }
+
+ /**
+ * Flush a batch of messages. Uses single-message Enqueue RPC for 1 message (preserves error
+ * types), BatchEnqueue for 2+ messages.
+ */
+ private void flushBatch(List<BatchItem> items) {
+ if (items.isEmpty()) {
+ return;
+ }
+
+ if (items.size() == 1) {
+ flushSingle(items.get(0));
+ return;
+ }
+
+ flushMultiple(items);
+ }
+
+ /** Single-item optimization: use regular Enqueue RPC for exact error semantics. */
+ private void flushSingle(BatchItem item) {
+ Service.EnqueueRequest req =
+ Service.EnqueueRequest.newBuilder()
+ .setQueue(item.message.getQueue())
+ .putAllHeaders(item.message.getHeaders())
+ .setPayload(com.google.protobuf.ByteString.copyFrom(item.message.getPayload()))
+ .build();
+ try {
+ Service.EnqueueResponse resp = stub.enqueue(req);
+ item.future.complete(resp.getMessageId());
+ } catch (StatusRuntimeException e) {
+ item.future.completeExceptionally(FilaClient.mapEnqueueError(e));
+ }
+ }
+
+ /** Multi-item flush: use BatchEnqueue RPC for amortized overhead. */
+ private void flushMultiple(List<BatchItem> items) {
+ Service.BatchEnqueueRequest.Builder reqBuilder = Service.BatchEnqueueRequest.newBuilder();
+ for (BatchItem item : items) {
+ reqBuilder.addMessages(
+ Service.EnqueueRequest.newBuilder()
+ .setQueue(item.message.getQueue())
+ .putAllHeaders(item.message.getHeaders())
+ .setPayload(com.google.protobuf.ByteString.copyFrom(item.message.getPayload()))
+ .build());
+ }
+
+ try {
+ Service.BatchEnqueueResponse resp = stub.batchEnqueue(reqBuilder.build());
+ List<Service.BatchEnqueueResult> results = resp.getResultsList();
+
+ for (int i = 0; i < items.size(); i++) {
+ BatchItem item = items.get(i);
+ if (i < results.size()) {
+ Service.BatchEnqueueResult result = results.get(i);
+ switch (result.getResultCase()) {
+ case SUCCESS:
+ item.future.complete(result.getSuccess().getMessageId());
+ break;
+ case ERROR:
+ item.future.completeExceptionally(
+ new RpcException(io.grpc.Status.Code.INTERNAL, result.getError()));
+ break;
+ default:
+ item.future.completeExceptionally(
+ new RpcException(io.grpc.Status.Code.INTERNAL, "no result from server"));
+ break;
+ }
+ } else {
+ item.future.completeExceptionally(
+ new RpcException(
+ io.grpc.Status.Code.INTERNAL,
+ "server returned fewer results than messages sent"));
+ }
+ }
+ } catch (StatusRuntimeException e) {
+ FilaException mapped = FilaClient.mapBatchEnqueueError(e);
+ for (BatchItem item : items) {
+ item.future.completeExceptionally(mapped);
+ }
+ }
+ }
+
+ private static Thread newDaemon(Runnable r, String name) {
+ Thread t = new Thread(r, name);
+ t.setDaemon(true);
+ return t;
+ }
+}
diff --git a/src/main/java/dev/faisca/fila/EnqueueMessage.java b/src/main/java/dev/faisca/fila/EnqueueMessage.java
new file mode 100644
index 0000000..767de30
--- /dev/null
+++ b/src/main/java/dev/faisca/fila/EnqueueMessage.java
@@ -0,0 +1,43 @@
+package dev.faisca.fila;
+
+import java.util.Map;
+
+/**
+ * A message to be enqueued via {@link FilaClient#batchEnqueue(java.util.List)}.
+ *
+ * Each message specifies its target queue, headers, and payload independently, allowing a single
+ * batch to target multiple queues.
+ */
+public final class EnqueueMessage {
+ private final String queue;
+ private final Map<String, String> headers;
+ private final byte[] payload;
+
+ /**
+ * Create a new enqueue message.
+ *
+ * @param queue target queue name
+ * @param headers message headers (may be empty)
+ * @param payload message payload bytes
+ */
+ public EnqueueMessage(String queue, Map<String, String> headers, byte[] payload) {
+ this.queue = queue;
+ this.headers = Map.copyOf(headers);
+ this.payload = payload.clone();
+ }
+
+ /** Returns the target queue name. */
+ public String getQueue() {
+ return queue;
+ }
+
+ /** Returns the message headers. */
+ public Map<String, String> getHeaders() {
+ return headers;
+ }
+
+ /** Returns the message payload bytes. */
+ public byte[] getPayload() {
+ return payload.clone();
+ }
+}
diff --git a/src/main/java/dev/faisca/fila/FilaClient.java b/src/main/java/dev/faisca/fila/FilaClient.java
index 188f92c..64ab1ed 100644
--- a/src/main/java/dev/faisca/fila/FilaClient.java
+++ b/src/main/java/dev/faisca/fila/FilaClient.java
@@ -13,15 +13,23 @@
import io.grpc.TlsChannelCredentials;
import java.io.ByteArrayInputStream;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
/**
* Client for the Fila message broker.
*
- * Wraps the hot-path gRPC operations: enqueue, consume, ack, nack.
+ * <p>Wraps the hot-path gRPC operations: enqueue, batch enqueue, consume, ack, nack.
+ *
+ * <p>By default, {@code enqueue()} routes through an opportunistic batcher that coalesces messages
+ * at high load without adding latency at low load. Use {@link Builder#withBatchMode(BatchMode)} to
+ * configure batching behavior.
*
* <pre>{@code
* try (FilaClient client = FilaClient.builder("localhost:5555").build()) {
@@ -45,19 +53,22 @@ public final class FilaClient implements AutoCloseable {
private final byte[] clientCertPem;
private final byte[] clientKeyPem;
private final String apiKey;
+ private final Batcher batcher;
private FilaClient(
ManagedChannel channel,
byte[] caCertPem,
byte[] clientCertPem,
byte[] clientKeyPem,
- String apiKey) {
+ String apiKey,
+ Batcher batcher) {
this.channel = channel;
this.blockingStub = FilaServiceGrpc.newBlockingStub(channel);
this.caCertPem = caCertPem;
this.clientCertPem = clientCertPem;
this.clientKeyPem = clientKeyPem;
this.apiKey = apiKey;
+ this.batcher = batcher;
}
/** Returns a new builder for configuring a {@link FilaClient}. */
@@ -68,6 +79,10 @@ public static Builder builder(String address) {
/**
* Enqueue a message to the specified queue.
*
+ * When batching is enabled (the default), the message is submitted to the background batcher
+ * and may be coalesced with other messages. The method blocks until the message is acknowledged
+ * by the broker.
+ *
* @param queue target queue name
* @param headers message headers (may be empty)
* @param payload message payload bytes
@@ -76,24 +91,78 @@ public static Builder builder(String address) {
* @throws RpcException for unexpected gRPC failures
*/
public String enqueue(String queue, Map<String, String> headers, byte[] payload) {
- Service.EnqueueRequest req =
- Service.EnqueueRequest.newBuilder()
- .setQueue(queue)
- .putAllHeaders(headers)
- .setPayload(com.google.protobuf.ByteString.copyFrom(payload))
- .build();
+ if (batcher != null) {
+ CompletableFuture<String> future =
+ batcher.submit(new EnqueueMessage(queue, headers, payload));
+ try {
+ return future.get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new FilaException("enqueue interrupted", e);
+ } catch (ExecutionException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof FilaException) {
+ throw (FilaException) cause;
+ }
+ throw new RpcException(io.grpc.Status.Code.INTERNAL, cause.getMessage());
+ }
+ }
+
+ return enqueueDirect(queue, headers, payload);
+ }
+
+ /**
+ * Enqueue a batch of messages in a single RPC call.
+ *
+ * Each message is independently validated and processed. A failed message does not affect the
+ * others in the batch. Returns a list of results with one entry per input message, in the same
+ * order.
+ *
+ * <p>This bypasses the batcher and always uses the {@code BatchEnqueue} RPC directly.
+ *
+ * @param messages the messages to enqueue
+ * @return a list of results, one per input message
+ * @throws RpcException for transport-level failures affecting the entire batch
+ */
+ public List<BatchEnqueueResult> batchEnqueue(List<EnqueueMessage> messages) {
+ Service.BatchEnqueueRequest.Builder reqBuilder = Service.BatchEnqueueRequest.newBuilder();
+ for (EnqueueMessage msg : messages) {
+ reqBuilder.addMessages(
+ Service.EnqueueRequest.newBuilder()
+ .setQueue(msg.getQueue())
+ .putAllHeaders(msg.getHeaders())
+ .setPayload(com.google.protobuf.ByteString.copyFrom(msg.getPayload()))
+ .build());
+ }
+
try {
- Service.EnqueueResponse resp = blockingStub.enqueue(req);
- return resp.getMessageId();
+ Service.BatchEnqueueResponse resp = blockingStub.batchEnqueue(reqBuilder.build());
+ List<Service.BatchEnqueueResult> protoResults = resp.getResultsList();
+ List<BatchEnqueueResult> results = new ArrayList<>(protoResults.size());
+ for (Service.BatchEnqueueResult r : protoResults) {
+ switch (r.getResultCase()) {
+ case SUCCESS:
+ results.add(BatchEnqueueResult.success(r.getSuccess().getMessageId()));
+ break;
+ case ERROR:
+ results.add(BatchEnqueueResult.error(r.getError()));
+ break;
+ default:
+ results.add(BatchEnqueueResult.error("no result from server"));
+ break;
+ }
+ }
+ return results;
} catch (StatusRuntimeException e) {
- throw mapEnqueueError(e);
+ throw mapBatchEnqueueError(e);
}
}
/**
* Open a streaming consumer on the specified queue.
*
- * Messages are delivered to the handler on a background thread. Nacked messages are
+ * <p>Messages are delivered to the handler on a background thread. The handler transparently
+ * receives messages from both singular and batched server responses. Nacked messages are
* redelivered on the same stream. Call {@link ConsumerHandle#cancel()} to stop consuming.
*
* @param queue queue to consume from
@@ -174,9 +243,16 @@ public void nack(String queue, String msgId, String error) {
}
}
- /** Shut down the underlying gRPC channel. */
+ /**
+ * Shut down the client, draining any pending batched messages before disconnecting.
+ *
+ * <p>If a batcher is running, pending messages are flushed before the gRPC channel is closed.
+ */
@Override
public void close() {
+ if (batcher != null) {
+ batcher.shutdown();
+ }
channel.shutdown();
try {
if (!channel.awaitTermination(5, TimeUnit.SECONDS)) {
@@ -188,14 +264,46 @@ public void close() {
}
}
+ /** Direct single-message enqueue RPC (no batcher). */
+ private String enqueueDirect(String queue, Map<String, String> headers, byte[] payload) {
+ Service.EnqueueRequest req =
+ Service.EnqueueRequest.newBuilder()
+ .setQueue(queue)
+ .putAllHeaders(headers)
+ .setPayload(com.google.protobuf.ByteString.copyFrom(payload))
+ .build();
+ try {
+ Service.EnqueueResponse resp = blockingStub.enqueue(req);
+ return resp.getMessageId();
+ } catch (StatusRuntimeException e) {
+ throw mapEnqueueError(e);
+ }
+ }
+
+ /**
+ * Consume a stream, unpacking both singular and batched responses into individual messages.
+ *
+ * Prefers the batched {@code messages} field when non-empty, falling back to the singular
+ * {@code message} field for backward compatibility with older servers.
+ */
private static void consumeStream(
Iterator<Service.ConsumeResponse> stream, Consumer<ConsumeMessage> handler) {
while (stream.hasNext()) {
Service.ConsumeResponse resp = stream.next();
- if (!resp.hasMessage() || resp.getMessage().getId().isEmpty()) {
- continue;
+
+ // Prefer the batched messages field when non-empty.
+ List<Messages.Message> batchedMessages = resp.getMessagesList();
+ if (!batchedMessages.isEmpty()) {
+ for (Messages.Message msg : batchedMessages) {
+ if (msg.getId().isEmpty()) {
+ continue;
+ }
+ handler.accept(buildConsumeMessage(msg));
+ }
+ } else if (resp.hasMessage() && !resp.getMessage().getId().isEmpty()) {
+ // Fall back to singular message for backward compatibility.
+ handler.accept(buildConsumeMessage(resp.getMessage()));
}
- handler.accept(buildConsumeMessage(resp.getMessage()));
}
}
@@ -230,7 +338,7 @@ private static void validateLeaderAddr(String addr) {
int port;
try {
port = Integer.parseInt(portStr);
- } catch (NumberFormatException e) {
+ } catch (NumberFormatException ex) {
throw new FilaException("invalid leader address: non-numeric port, got: " + addr);
}
if (port < 1 || port > 65535) {
@@ -296,13 +404,21 @@ private static ConsumeMessage buildConsumeMessage(Messages.Message msg) {
meta.getQueueId());
}
- private static FilaException mapEnqueueError(StatusRuntimeException e) {
+ static FilaException mapEnqueueError(StatusRuntimeException e) {
return switch (e.getStatus().getCode()) {
case NOT_FOUND -> new QueueNotFoundException("enqueue: " + e.getStatus().getDescription());
default -> new RpcException(e.getStatus().getCode(), e.getStatus().getDescription());
};
}
+ static FilaException mapBatchEnqueueError(StatusRuntimeException e) {
+ return switch (e.getStatus().getCode()) {
+ case NOT_FOUND ->
+ new QueueNotFoundException("batch enqueue: " + e.getStatus().getDescription());
+ default -> new RpcException(e.getStatus().getCode(), e.getStatus().getDescription());
+ };
+ }
+
private static FilaException mapConsumeError(StatusRuntimeException e) {
return switch (e.getStatus().getCode()) {
case NOT_FOUND -> new QueueNotFoundException("consume: " + e.getStatus().getDescription());
@@ -332,6 +448,7 @@ public static final class Builder {
private byte[] clientCertPem;
private byte[] clientKeyPem;
private String apiKey;
+ private BatchMode batchMode = BatchMode.auto();
private Builder(String address) {
this.address = address;
@@ -396,6 +513,20 @@ public Builder withApiKey(String apiKey) {
return this;
}
+ /**
+ * Set the batching mode for {@link FilaClient#enqueue} calls.
+ *
+ * Default is {@link BatchMode#auto()} -- opportunistic batching. Use {@link
+ * BatchMode#disabled()} to turn off batching entirely.
+ *
+ * @param batchMode the batch mode
+ * @return this builder
+ */
+ public Builder withBatchMode(BatchMode batchMode) {
+ this.batchMode = batchMode;
+ return this;
+ }
+
/** Build and connect the client. */
public FilaClient build() {
if (clientCertPem != null && !tlsEnabled) {
@@ -447,7 +578,19 @@ public FilaClient build() {
channel = channelBuilder.build();
}
- return new FilaClient(channel, caCertPem, clientCertPem, clientKeyPem, apiKey);
+ Batcher batcherInstance = null;
+ if (batchMode.getKind() != BatchMode.Kind.DISABLED) {
+ FilaServiceGrpc.FilaServiceBlockingStub batcherStub =
+ FilaServiceGrpc.newBlockingStub(channel);
+ if (apiKey != null) {
+ // The stub needs the interceptor applied at channel level (already done above).
+ // No additional interceptor needed on the stub.
+ }
+ batcherInstance = new Batcher(batcherStub, batchMode);
+ }
+
+ return new FilaClient(
+ channel, caCertPem, clientCertPem, clientKeyPem, apiKey, batcherInstance);
}
static String parseHost(String address) {
diff --git a/src/test/java/dev/faisca/fila/BatchClientTest.java b/src/test/java/dev/faisca/fila/BatchClientTest.java
new file mode 100644
index 0000000..6338b58
--- /dev/null
+++ b/src/test/java/dev/faisca/fila/BatchClientTest.java
@@ -0,0 +1,241 @@
+package dev.faisca.fila;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.EnabledIf;
+
+/**
+ * Integration tests for batch enqueue and smart batching.
+ *
+ * <p>Requires a fila-server binary. Skipped if not available.
+ */
+@EnabledIf("serverAvailable")
+class BatchClientTest {
+ private static TestServer server;
+
+ @BeforeAll
+ static void setUp() throws Exception {
+ server = TestServer.start();
+ server.createQueue("test-batch-explicit");
+ server.createQueue("test-batch-auto");
+ server.createQueue("test-batch-linger");
+ server.createQueue("test-batch-disabled");
+ server.createQueue("test-batch-consume");
+ server.createQueue("test-batch-mixed");
+ }
+
+ @AfterAll
+ static void tearDown() {
+ if (server != null) server.stop();
+ }
+
+ static boolean serverAvailable() {
+ return TestServer.isBinaryAvailable();
+ }
+
+ @Test
+ void explicitBatchEnqueue() {
+ try (FilaClient client =
+ FilaClient.builder(server.address()).withBatchMode(BatchMode.disabled()).build()) {
+ List<EnqueueMessage> messages = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ messages.add(
+ new EnqueueMessage(
+ "test-batch-explicit",
+ Map.of("idx", String.valueOf(i)),
+ ("batch-msg-" + i).getBytes()));
+ }
+
+ List<BatchEnqueueResult> results = client.batchEnqueue(messages);
+ assertEquals(5, results.size());
+
+ Set<String> ids = new HashSet<>();
+ for (BatchEnqueueResult result : results) {
+ assertTrue(result.isSuccess(), "each message should succeed");
+ assertFalse(result.getMessageId().isEmpty());
+ ids.add(result.getMessageId());
+ }
+ assertEquals(5, ids.size(), "all message IDs should be unique");
+ }
+ }
+
+ @Test
+ void explicitBatchWithNonexistentQueue() {
+ try (FilaClient client =
+ FilaClient.builder(server.address()).withBatchMode(BatchMode.disabled()).build()) {
+ List<EnqueueMessage> messages = new ArrayList<>();
+ messages.add(new EnqueueMessage("test-batch-explicit", Map.of(), "good-msg".getBytes()));
+ messages.add(new EnqueueMessage("no-such-queue", Map.of(), "bad-msg".getBytes()));
+ messages.add(new EnqueueMessage("test-batch-explicit", Map.of(), "another-good".getBytes()));
+
+ List<BatchEnqueueResult> results = client.batchEnqueue(messages);
+ assertEquals(3, results.size());
+
+ // First and third should succeed, second should fail
+ assertTrue(results.get(0).isSuccess());
+ assertFalse(results.get(1).isSuccess());
+ assertTrue(results.get(2).isSuccess());
+ }
+ }
+
+ @Test
+ void autoBatchingEnqueue() throws Exception {
+ try (FilaClient client =
+ FilaClient.builder(server.address()).withBatchMode(BatchMode.auto()).build()) {
+ // Enqueue messages through the auto batcher
+ String msgId =
+ client.enqueue("test-batch-auto", Map.of("mode", "auto"), "auto-msg".getBytes());
+ assertNotNull(msgId);
+ assertFalse(msgId.isEmpty());
+
+ // Verify the message can be consumed
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicReference<ConsumeMessage> received = new AtomicReference<>();
+
+ ConsumerHandle handle =
+ client.consume(
+ "test-batch-auto",
+ msg -> {
+ received.set(msg);
+ client.ack("test-batch-auto", msg.getId());
+ latch.countDown();
+ });
+
+ assertTrue(latch.await(10, TimeUnit.SECONDS), "should receive message within 10s");
+ handle.cancel();
+
+ ConsumeMessage msg = received.get();
+ assertNotNull(msg);
+ assertEquals(msgId, msg.getId());
+ assertEquals("auto", msg.getHeaders().get("mode"));
+ assertArrayEquals("auto-msg".getBytes(), msg.getPayload());
+ }
+ }
+
+ @Test
+ void autoBatchingMultipleMessages() throws Exception {
+ try (FilaClient client =
+ FilaClient.builder(server.address()).withBatchMode(BatchMode.auto(50)).build()) {
+ // Send multiple messages quickly to exercise batching under load
+ int count = 10;
+ Set<String> sentIds = new HashSet<>();
+ for (int i = 0; i < count; i++) {
+ String msgId =
+ client.enqueue(
+ "test-batch-consume", Map.of("idx", String.valueOf(i)), ("msg-" + i).getBytes());
+ assertNotNull(msgId);
+ sentIds.add(msgId);
+ }
+ assertEquals(count, sentIds.size(), "all message IDs should be unique");
+
+ // Consume all messages
+ CountDownLatch latch = new CountDownLatch(count);
+ Set<String> receivedIds = java.util.Collections.synchronizedSet(new HashSet<>());
+
+ ConsumerHandle handle =
+ client.consume(
+ "test-batch-consume",
+ msg -> {
+ receivedIds.add(msg.getId());
+ client.ack("test-batch-consume", msg.getId());
+ latch.countDown();
+ });
+
+ assertTrue(latch.await(15, TimeUnit.SECONDS), "should receive all messages within 15s");
+ handle.cancel();
+
+ assertEquals(sentIds, receivedIds, "should receive all sent messages");
+ }
+ }
+
+ @Test
+ void lingerBatchingEnqueue() throws Exception {
+ try (FilaClient client =
+ FilaClient.builder(server.address()).withBatchMode(BatchMode.linger(50, 10)).build()) {
+ String msgId =
+ client.enqueue("test-batch-linger", Map.of("mode", "linger"), "linger-msg".getBytes());
+ assertNotNull(msgId);
+ assertFalse(msgId.isEmpty());
+
+ // Verify consumption
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicReference<ConsumeMessage> received = new AtomicReference<>();
+
+ ConsumerHandle handle =
+ client.consume(
+ "test-batch-linger",
+ msg -> {
+ received.set(msg);
+ client.ack("test-batch-linger", msg.getId());
+ latch.countDown();
+ });
+
+ assertTrue(latch.await(10, TimeUnit.SECONDS), "should receive message within 10s");
+ handle.cancel();
+
+ assertEquals(msgId, received.get().getId());
+ }
+ }
+
+ @Test
+ void disabledBatchingEnqueue() throws Exception {
+ try (FilaClient client =
+ FilaClient.builder(server.address()).withBatchMode(BatchMode.disabled()).build()) {
+ String msgId =
+ client.enqueue(
+ "test-batch-disabled", Map.of("mode", "disabled"), "direct-msg".getBytes());
+ assertNotNull(msgId);
+ assertFalse(msgId.isEmpty());
+
+ // Verify consumption
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicReference<ConsumeMessage> received = new AtomicReference<>();
+
+ ConsumerHandle handle =
+ client.consume(
+ "test-batch-disabled",
+ msg -> {
+ received.set(msg);
+ client.ack("test-batch-disabled", msg.getId());
+ latch.countDown();
+ });
+
+ assertTrue(latch.await(10, TimeUnit.SECONDS), "should receive message within 10s");
+ handle.cancel();
+
+ assertEquals(msgId, received.get().getId());
+ }
+ }
+
+ @Test
+ void enqueueNonexistentQueueThroughBatcher() {
+ try (FilaClient client =
+ FilaClient.builder(server.address()).withBatchMode(BatchMode.auto()).build()) {
+ assertThrows(
+ QueueNotFoundException.class,
+ () -> client.enqueue("no-such-queue-batch", Map.of(), "data".getBytes()));
+ }
+ }
+
+ @Test
+ void defaultBatchModeIsAuto() throws Exception {
+ // Default builder should use AUTO batching
+ try (FilaClient client = FilaClient.builder(server.address()).build()) {
+ String msgId =
+ client.enqueue("test-batch-mixed", Map.of("default", "true"), "default-batch".getBytes());
+ assertNotNull(msgId);
+ assertFalse(msgId.isEmpty());
+ }
+ }
+}
diff --git a/src/test/java/dev/faisca/fila/BatchEnqueueResultTest.java b/src/test/java/dev/faisca/fila/BatchEnqueueResultTest.java
new file mode 100644
index 0000000..2ba657b
--- /dev/null
+++ b/src/test/java/dev/faisca/fila/BatchEnqueueResultTest.java
@@ -0,0 +1,35 @@
+package dev.faisca.fila;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import org.junit.jupiter.api.Test;
+
+/** Unit tests for BatchEnqueueResult: the success/error factories and their state-checked accessors. */
+class BatchEnqueueResultTest {
+
+ @Test
+ void successResult() {
+ BatchEnqueueResult result = BatchEnqueueResult.success("msg-123");
+ assertTrue(result.isSuccess());
+ assertEquals("msg-123", result.getMessageId());
+ }
+
+ @Test
+ void successGetErrorThrows() {
+ BatchEnqueueResult result = BatchEnqueueResult.success("msg-123");
+ assertThrows(IllegalStateException.class, result::getError); // getError() on a success is a caller bug
+ }
+
+ @Test
+ void errorResult() {
+ BatchEnqueueResult result = BatchEnqueueResult.error("queue not found");
+ assertFalse(result.isSuccess());
+ assertEquals("queue not found", result.getError());
+ }
+
+ @Test
+ void errorGetMessageIdThrows() {
+ BatchEnqueueResult result = BatchEnqueueResult.error("queue not found");
+ assertThrows(IllegalStateException.class, result::getMessageId); // getMessageId() on a failure is a caller bug
+ }
+}
diff --git a/src/test/java/dev/faisca/fila/BatchModeTest.java b/src/test/java/dev/faisca/fila/BatchModeTest.java
new file mode 100644
index 0000000..2c5a930
--- /dev/null
+++ b/src/test/java/dev/faisca/fila/BatchModeTest.java
@@ -0,0 +1,57 @@
+package dev.faisca.fila;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import org.junit.jupiter.api.Test;
+
+/** Unit tests for BatchMode configuration: factory defaults, custom values, and argument validation. */
+class BatchModeTest {
+
+ @Test
+ void autoDefaultMaxBatchSize() {
+ BatchMode mode = BatchMode.auto();
+ assertEquals(BatchMode.Kind.AUTO, mode.getKind());
+ assertEquals(100, mode.getMaxBatchSize()); // pins the built-in default at 100
+ }
+
+ @Test
+ void autoCustomMaxBatchSize() {
+ BatchMode mode = BatchMode.auto(50);
+ assertEquals(BatchMode.Kind.AUTO, mode.getKind());
+ assertEquals(50, mode.getMaxBatchSize());
+ }
+
+ @Test
+ void autoRejectsZeroMaxBatchSize() {
+ assertThrows(IllegalArgumentException.class, () -> BatchMode.auto(0));
+ }
+
+ @Test
+ void autoRejectsNegativeMaxBatchSize() {
+ assertThrows(IllegalArgumentException.class, () -> BatchMode.auto(-1));
+ }
+
+ @Test
+ void lingerConfigValues() {
+ BatchMode mode = BatchMode.linger(10, 50); // linger(lingerMs, maxBatchSize)
+ assertEquals(BatchMode.Kind.LINGER, mode.getKind());
+ assertEquals(10, mode.getLingerMs());
+ assertEquals(50, mode.getMaxBatchSize());
+ }
+
+ @Test
+ void lingerRejectsZeroLingerMs() {
+ assertThrows(IllegalArgumentException.class, () -> BatchMode.linger(0, 50));
+ }
+
+ @Test
+ void lingerRejectsZeroBatchSize() {
+ assertThrows(IllegalArgumentException.class, () -> BatchMode.linger(10, 0)); // NOTE(review): negative lingerMs/batchSize presumably rejected too — not covered here
+ }
+
+ @Test
+ void disabledMode() {
+ BatchMode mode = BatchMode.disabled();
+ assertEquals(BatchMode.Kind.DISABLED, mode.getKind());
+ }
+}
diff --git a/src/test/java/dev/faisca/fila/BuilderTest.java b/src/test/java/dev/faisca/fila/BuilderTest.java
index b144878..c28c562 100644
--- a/src/test/java/dev/faisca/fila/BuilderTest.java
+++ b/src/test/java/dev/faisca/fila/BuilderTest.java
@@ -9,12 +9,39 @@ class BuilderTest {
@Test
void builderPlaintextDoesNotThrow() {
- // Plaintext builder should create a client without error
+ // Plaintext builder should create a client without error (default AUTO batching)
FilaClient client = FilaClient.builder("localhost:5555").build();
assertNotNull(client);
client.close();
}
+ @Test
+ void builderWithBatchDisabledDoesNotThrow() {
+ // Plaintext builder with batching explicitly disabled must still construct a client
+ FilaClient client =
+ FilaClient.builder("localhost:5555").withBatchMode(BatchMode.disabled()).build();
+ assertNotNull(client);
+ client.close();
+ }
+
+ @Test
+ void builderWithBatchAutoDoesNotThrow() {
+ // Explicitly configured AUTO batch mode with a custom max batch size (50)
+ FilaClient client =
+ FilaClient.builder("localhost:5555").withBatchMode(BatchMode.auto(50)).build();
+ assertNotNull(client);
+ client.close();
+ }
+
+ @Test
+ void builderWithBatchLingerDoesNotThrow() {
+ // LINGER batch mode (10 ms linger window, max 50 messages per batch)
+ FilaClient client =
+ FilaClient.builder("localhost:5555").withBatchMode(BatchMode.linger(10, 50)).build();
+ assertNotNull(client);
+ client.close();
+ }
+
@Test
void builderWithApiKeyDoesNotThrow() {
// API key without TLS should work (for backward compat / dev mode)
@@ -40,6 +67,7 @@ void builderChainingReturnsBuilder() {
FilaClient.Builder builder =
FilaClient.builder("localhost:5555")
.withApiKey("key")
+ .withBatchMode(BatchMode.auto())
.withTlsCaCert("cert".getBytes())
.withTlsClientCert("cert".getBytes(), "key".getBytes());
assertNotNull(builder);