diff --git a/proto/fila/v1/service.proto b/proto/fila/v1/service.proto index f14fdd0..fc0f710 100644 --- a/proto/fila/v1/service.proto +++ b/proto/fila/v1/service.proto @@ -6,6 +6,7 @@ import "fila/v1/messages.proto"; // Hot-path RPCs for producers and consumers. service FilaService { rpc Enqueue(EnqueueRequest) returns (EnqueueResponse); + rpc BatchEnqueue(BatchEnqueueRequest) returns (BatchEnqueueResponse); rpc Consume(ConsumeRequest) returns (stream ConsumeResponse); rpc Ack(AckRequest) returns (AckResponse); rpc Nack(NackRequest) returns (NackResponse); @@ -26,7 +27,8 @@ message ConsumeRequest { } message ConsumeResponse { - Message message = 1; + Message message = 1; // Single message (backward compatible, used when batch size is 1) + repeated Message messages = 2; // Batched messages (populated when server sends multiple at once) } message AckRequest { @@ -43,3 +45,18 @@ message NackRequest { } message NackResponse {} + +message BatchEnqueueRequest { + repeated EnqueueRequest messages = 1; +} + +message BatchEnqueueResponse { + repeated BatchEnqueueResult results = 1; +} + +message BatchEnqueueResult { + oneof result { + EnqueueResponse success = 1; + string error = 2; + } +} diff --git a/src/main/java/dev/faisca/fila/BatchEnqueueResult.java b/src/main/java/dev/faisca/fila/BatchEnqueueResult.java new file mode 100644 index 0000000..eab5f50 --- /dev/null +++ b/src/main/java/dev/faisca/fila/BatchEnqueueResult.java @@ -0,0 +1,57 @@ +package dev.faisca.fila; + +/** + * The result of a single message within a batch enqueue call. + * + *

Each message in a batch is independently validated and processed. A failed message does not + * affect the others. Use {@link #isSuccess()} to check the outcome, then either {@link + * #getMessageId()} or {@link #getError()}. + */ +public final class BatchEnqueueResult { + private final String messageId; + private final String error; + + private BatchEnqueueResult(String messageId, String error) { + this.messageId = messageId; + this.error = error; + } + + /** Create a successful result with the broker-assigned message ID. */ + static BatchEnqueueResult success(String messageId) { + return new BatchEnqueueResult(messageId, null); + } + + /** Create a failed result with an error description. */ + static BatchEnqueueResult error(String error) { + return new BatchEnqueueResult(null, error); + } + + /** Returns true if the message was successfully enqueued. */ + public boolean isSuccess() { + return messageId != null; + } + + /** + * Returns the broker-assigned message ID. + * + * @throws IllegalStateException if this result is an error + */ + public String getMessageId() { + if (messageId == null) { + throw new IllegalStateException("result is an error: " + error); + } + return messageId; + } + + /** + * Returns the error description. + * + * @throws IllegalStateException if this result is a success + */ + public String getError() { + if (error == null) { + throw new IllegalStateException("result is a success"); + } + return error; + } +} diff --git a/src/main/java/dev/faisca/fila/BatchMode.java b/src/main/java/dev/faisca/fila/BatchMode.java new file mode 100644 index 0000000..0be14fc --- /dev/null +++ b/src/main/java/dev/faisca/fila/BatchMode.java @@ -0,0 +1,93 @@ +package dev.faisca.fila; + +/** + * Controls how the SDK batches {@link FilaClient#enqueue} calls. + * + *

The default is {@link #auto()} -- opportunistic batching that requires zero configuration. At + * low load each message is sent individually (zero added latency). At high load messages accumulate + * naturally and are flushed together. + */ +public final class BatchMode { + enum Kind { + AUTO, + LINGER, + DISABLED + } + + private final Kind kind; + private final int maxBatchSize; + private final long lingerMs; + + private BatchMode(Kind kind, int maxBatchSize, long lingerMs) { + this.kind = kind; + this.maxBatchSize = maxBatchSize; + this.lingerMs = lingerMs; + } + + /** + * Opportunistic batching (default). + * + *

A background thread blocks for the first message, then drains any additional messages that + * arrived concurrently. At low load each message is sent individually. At high load messages + * accumulate naturally into batches. Zero config, zero latency penalty. + * + * @return a new AUTO batch mode with default max batch size (100) + */ + public static BatchMode auto() { + return new BatchMode(Kind.AUTO, 100, 0); + } + + /** + * Opportunistic batching with a custom max batch size. + * + * @param maxBatchSize safety cap on batch size + * @return a new AUTO batch mode + */ + public static BatchMode auto(int maxBatchSize) { + if (maxBatchSize < 1) { + throw new IllegalArgumentException("maxBatchSize must be >= 1"); + } + return new BatchMode(Kind.AUTO, maxBatchSize, 0); + } + + /** + * Timer-based forced batching. + * + *

Buffers messages and flushes when either {@code batchSize} messages accumulate or {@code + * lingerMs} milliseconds elapse since the first message in the batch -- whichever comes first. + * + * @param lingerMs time threshold in milliseconds before a partial batch is flushed + * @param batchSize maximum messages per batch + * @return a new LINGER batch mode + */ + public static BatchMode linger(long lingerMs, int batchSize) { + if (lingerMs < 1) { + throw new IllegalArgumentException("lingerMs must be >= 1"); + } + if (batchSize < 1) { + throw new IllegalArgumentException("batchSize must be >= 1"); + } + return new BatchMode(Kind.LINGER, batchSize, lingerMs); + } + + /** + * No batching. Each {@link FilaClient#enqueue} call is a direct single-message RPC. + * + * @return a DISABLED batch mode + */ + public static BatchMode disabled() { + return new BatchMode(Kind.DISABLED, 0, 0); + } + + Kind getKind() { + return kind; + } + + int getMaxBatchSize() { + return maxBatchSize; + } + + long getLingerMs() { + return lingerMs; + } +} diff --git a/src/main/java/dev/faisca/fila/Batcher.java b/src/main/java/dev/faisca/fila/Batcher.java new file mode 100644 index 0000000..486f745 --- /dev/null +++ b/src/main/java/dev/faisca/fila/Batcher.java @@ -0,0 +1,288 @@ +package dev.faisca.fila; + +import fila.v1.FilaServiceGrpc; +import fila.v1.Service; +import io.grpc.StatusRuntimeException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Background batcher that coalesces individual enqueue calls into batch RPCs. + * + *

Supports two modes: AUTO (opportunistic, Nagle-style) and LINGER (timer-based). The batcher + * runs on a dedicated daemon thread and flushes RPCs on an executor pool. + */ +final class Batcher { + private final LinkedBlockingQueue queue = new LinkedBlockingQueue<>(); + private final AtomicBoolean running = new AtomicBoolean(true); + private final FilaServiceGrpc.FilaServiceBlockingStub stub; + private final BatchMode mode; + private final Thread batcherThread; + private final ExecutorService flushExecutor; + private final ScheduledExecutorService scheduler; + + static final class BatchItem { + final EnqueueMessage message; + final CompletableFuture future; + + BatchItem(EnqueueMessage message, CompletableFuture future) { + this.message = message; + this.future = future; + } + } + + Batcher(FilaServiceGrpc.FilaServiceBlockingStub stub, BatchMode mode) { + this.stub = stub; + this.mode = mode; + this.flushExecutor = Executors.newCachedThreadPool(r -> newDaemon(r, "fila-batch-flush")); + this.scheduler = + mode.getKind() == BatchMode.Kind.LINGER + ? Executors.newSingleThreadScheduledExecutor(r -> newDaemon(r, "fila-batch-scheduler")) + : null; + + this.batcherThread = + new Thread( + mode.getKind() == BatchMode.Kind.AUTO ? this::runAuto : this::runLinger, + "fila-batcher"); + this.batcherThread.setDaemon(true); + this.batcherThread.start(); + } + + /** + * Submit a message for batched enqueuing. + * + * @return a future that completes with the message ID or fails with a FilaException + */ + CompletableFuture submit(EnqueueMessage message) { + CompletableFuture future = new CompletableFuture<>(); + if (!running.get()) { + future.completeExceptionally(new FilaException("batcher is shut down")); + return future; + } + queue.add(new BatchItem(message, future)); + return future; + } + + /** Drain pending messages and shut down. Blocks until all pending flushes complete. */ + void shutdown() { + running.set(false); + batcherThread.interrupt(); + try { + batcherThread.join(5000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + // Drain any remaining items in the queue. + List remaining = new ArrayList<>(); + queue.drainTo(remaining); + if (!remaining.isEmpty()) { + flushBatch(remaining); + } + + flushExecutor.shutdown(); + try { + if (!flushExecutor.awaitTermination(5, TimeUnit.SECONDS)) { + flushExecutor.shutdownNow(); + } + } catch (InterruptedException e) { + flushExecutor.shutdownNow(); + Thread.currentThread().interrupt(); + } + + if (scheduler != null) { + scheduler.shutdown(); + try { + if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) { + scheduler.shutdownNow(); + } + } catch (InterruptedException e) { + scheduler.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + } + + /** AUTO mode: block for first message, drain any additional, flush concurrently. */ + private void runAuto() { + int maxBatchSize = mode.getMaxBatchSize(); + while (running.get()) { + try { + BatchItem first = queue.take(); + List batch = new ArrayList<>(); + batch.add(first); + queue.drainTo(batch, maxBatchSize - 1); + + List toFlush = List.copyOf(batch); + flushExecutor.submit(() -> flushBatch(toFlush)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + } + } + + /** LINGER mode: buffer messages and flush when batch is full or linger timer fires. */ + private void runLinger() { + int batchSize = mode.getMaxBatchSize(); + long lingerMs = mode.getLingerMs(); + List buffer = new ArrayList<>(); + ScheduledFuture lingerTimer = null; + + while (running.get()) { + try { + if (buffer.isEmpty()) { + // Block for first message. + BatchItem item = queue.take(); + buffer.add(item); + + if (buffer.size() >= batchSize) { + List toFlush = List.copyOf(buffer); + buffer.clear(); + flushExecutor.submit(() -> flushBatch(toFlush)); + } else { + // Start linger timer. + final List timerBuffer = buffer; + lingerTimer = + scheduler.schedule( + () -> { + // Signal the batcher thread to flush by adding a poison pill. + // The actual flush happens in the main loop. + batcherThread.interrupt(); + }, + lingerMs, + TimeUnit.MILLISECONDS); + } + } else { + // Buffer has items -- wait for more or timer expiry. + BatchItem item = queue.poll(lingerMs, TimeUnit.MILLISECONDS); + if (item != null) { + buffer.add(item); + // Drain any additional available items. + queue.drainTo(buffer, batchSize - buffer.size()); + } + + if (buffer.size() >= batchSize || item == null) { + if (lingerTimer != null) { + lingerTimer.cancel(false); + lingerTimer = null; + } + List toFlush = List.copyOf(buffer); + buffer.clear(); + flushExecutor.submit(() -> flushBatch(toFlush)); + } + } + } catch (InterruptedException e) { + // Timer or shutdown interrupt -- flush what we have. + if (!buffer.isEmpty()) { + if (lingerTimer != null) { + lingerTimer.cancel(false); + lingerTimer = null; + } + List toFlush = List.copyOf(buffer); + buffer.clear(); + flushExecutor.submit(() -> flushBatch(toFlush)); + } + if (!running.get()) { + Thread.currentThread().interrupt(); + return; + } + } + } + } + + /** + * Flush a batch of messages. Uses single-message Enqueue RPC for 1 message (preserves error + * types), BatchEnqueue for 2+ messages. + */ + private void flushBatch(List items) { + if (items.isEmpty()) { + return; + } + + if (items.size() == 1) { + flushSingle(items.get(0)); + return; + } + + flushMultiple(items); + } + + /** Single-item optimization: use regular Enqueue RPC for exact error semantics. */ + private void flushSingle(BatchItem item) { + Service.EnqueueRequest req = + Service.EnqueueRequest.newBuilder() + .setQueue(item.message.getQueue()) + .putAllHeaders(item.message.getHeaders()) + .setPayload(com.google.protobuf.ByteString.copyFrom(item.message.getPayload())) + .build(); + try { + Service.EnqueueResponse resp = stub.enqueue(req); + item.future.complete(resp.getMessageId()); + } catch (StatusRuntimeException e) { + item.future.completeExceptionally(FilaClient.mapEnqueueError(e)); + } + } + + /** Multi-item flush: use BatchEnqueue RPC for amortized overhead. */ + private void flushMultiple(List items) { + Service.BatchEnqueueRequest.Builder reqBuilder = Service.BatchEnqueueRequest.newBuilder(); + for (BatchItem item : items) { + reqBuilder.addMessages( + Service.EnqueueRequest.newBuilder() + .setQueue(item.message.getQueue()) + .putAllHeaders(item.message.getHeaders()) + .setPayload(com.google.protobuf.ByteString.copyFrom(item.message.getPayload())) + .build()); + } + + try { + Service.BatchEnqueueResponse resp = stub.batchEnqueue(reqBuilder.build()); + List results = resp.getResultsList(); + + for (int i = 0; i < items.size(); i++) { + BatchItem item = items.get(i); + if (i < results.size()) { + Service.BatchEnqueueResult result = results.get(i); + switch (result.getResultCase()) { + case SUCCESS: + item.future.complete(result.getSuccess().getMessageId()); + break; + case ERROR: + item.future.completeExceptionally( + new RpcException(io.grpc.Status.Code.INTERNAL, result.getError())); + break; + default: + item.future.completeExceptionally( + new RpcException(io.grpc.Status.Code.INTERNAL, "no result from server")); + break; + } + } else { + item.future.completeExceptionally( + new RpcException( + io.grpc.Status.Code.INTERNAL, + "server returned fewer results than messages sent")); + } + } + } catch (StatusRuntimeException e) { + FilaException mapped = FilaClient.mapBatchEnqueueError(e); + for (BatchItem item : items) { + item.future.completeExceptionally(mapped); + } + } + } + + private static Thread newDaemon(Runnable r, String name) { + Thread t = new Thread(r, name); + t.setDaemon(true); + return t; + } +} diff --git a/src/main/java/dev/faisca/fila/EnqueueMessage.java b/src/main/java/dev/faisca/fila/EnqueueMessage.java new file mode 100644 index 0000000..767de30 --- /dev/null +++ b/src/main/java/dev/faisca/fila/EnqueueMessage.java @@ -0,0 +1,43 @@ +package dev.faisca.fila; + +import java.util.Map; + +/** + * A message to be enqueued via {@link FilaClient#batchEnqueue(java.util.List)}. + * + *

Each message specifies its target queue, headers, and payload independently, allowing a single + * batch to target multiple queues. + */ +public final class EnqueueMessage { + private final String queue; + private final Map headers; + private final byte[] payload; + + /** + * Create a new enqueue message. + * + * @param queue target queue name + * @param headers message headers (may be empty) + * @param payload message payload bytes + */ + public EnqueueMessage(String queue, Map headers, byte[] payload) { + this.queue = queue; + this.headers = Map.copyOf(headers); + this.payload = payload.clone(); + } + + /** Returns the target queue name. */ + public String getQueue() { + return queue; + } + + /** Returns the message headers. */ + public Map getHeaders() { + return headers; + } + + /** Returns the message payload bytes. */ + public byte[] getPayload() { + return payload.clone(); + } +} diff --git a/src/main/java/dev/faisca/fila/FilaClient.java b/src/main/java/dev/faisca/fila/FilaClient.java index 188f92c..64ab1ed 100644 --- a/src/main/java/dev/faisca/fila/FilaClient.java +++ b/src/main/java/dev/faisca/fila/FilaClient.java @@ -13,15 +13,23 @@ import io.grpc.TlsChannelCredentials; import java.io.ByteArrayInputStream; import java.io.IOException; +import java.util.ArrayList; import java.util.Iterator; +import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; /** * Client for the Fila message broker. * - *

Wraps the hot-path gRPC operations: enqueue, consume, ack, nack. + *

Wraps the hot-path gRPC operations: enqueue, batch enqueue, consume, ack, nack. + * + *

By default, {@code enqueue()} routes through an opportunistic batcher that coalesces messages + * at high load without adding latency at low load. Use {@link Builder#withBatchMode(BatchMode)} to + * configure batching behavior. * *

{@code
  * try (FilaClient client = FilaClient.builder("localhost:5555").build()) {
@@ -45,19 +53,22 @@ public final class FilaClient implements AutoCloseable {
   private final byte[] clientCertPem;
   private final byte[] clientKeyPem;
   private final String apiKey;
+  private final Batcher batcher;
 
   private FilaClient(
       ManagedChannel channel,
       byte[] caCertPem,
       byte[] clientCertPem,
       byte[] clientKeyPem,
-      String apiKey) {
+      String apiKey,
+      Batcher batcher) {
     this.channel = channel;
     this.blockingStub = FilaServiceGrpc.newBlockingStub(channel);
     this.caCertPem = caCertPem;
     this.clientCertPem = clientCertPem;
     this.clientKeyPem = clientKeyPem;
     this.apiKey = apiKey;
+    this.batcher = batcher;
   }
 
   /** Returns a new builder for configuring a {@link FilaClient}. */
@@ -68,6 +79,10 @@ public static Builder builder(String address) {
   /**
    * Enqueue a message to the specified queue.
    *
+   * 

When batching is enabled (the default), the message is submitted to the background batcher + * and may be coalesced with other messages. The method blocks until the message is acknowledged + * by the broker. + * * @param queue target queue name * @param headers message headers (may be empty) * @param payload message payload bytes @@ -76,24 +91,78 @@ public static Builder builder(String address) { * @throws RpcException for unexpected gRPC failures */ public String enqueue(String queue, Map headers, byte[] payload) { - Service.EnqueueRequest req = - Service.EnqueueRequest.newBuilder() - .setQueue(queue) - .putAllHeaders(headers) - .setPayload(com.google.protobuf.ByteString.copyFrom(payload)) - .build(); + if (batcher != null) { + CompletableFuture future = + batcher.submit(new EnqueueMessage(queue, headers, payload)); + try { + return future.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new FilaException("enqueue interrupted", e); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + if (cause instanceof FilaException) { + throw (FilaException) cause; + } + throw new RpcException(io.grpc.Status.Code.INTERNAL, cause.getMessage()); + } + } + + return enqueueDirect(queue, headers, payload); + } + + /** + * Enqueue a batch of messages in a single RPC call. + * + *

Each message is independently validated and processed. A failed message does not affect the + * others in the batch. Returns a list of results with one entry per input message, in the same + * order. + * + *

This bypasses the batcher and always uses the {@code BatchEnqueue} RPC directly. + * + * @param messages the messages to enqueue + * @return a list of results, one per input message + * @throws RpcException for transport-level failures affecting the entire batch + */ + public List batchEnqueue(List messages) { + Service.BatchEnqueueRequest.Builder reqBuilder = Service.BatchEnqueueRequest.newBuilder(); + for (EnqueueMessage msg : messages) { + reqBuilder.addMessages( + Service.EnqueueRequest.newBuilder() + .setQueue(msg.getQueue()) + .putAllHeaders(msg.getHeaders()) + .setPayload(com.google.protobuf.ByteString.copyFrom(msg.getPayload())) + .build()); + } + try { - Service.EnqueueResponse resp = blockingStub.enqueue(req); - return resp.getMessageId(); + Service.BatchEnqueueResponse resp = blockingStub.batchEnqueue(reqBuilder.build()); + List protoResults = resp.getResultsList(); + List results = new ArrayList<>(protoResults.size()); + for (Service.BatchEnqueueResult r : protoResults) { + switch (r.getResultCase()) { + case SUCCESS: + results.add(BatchEnqueueResult.success(r.getSuccess().getMessageId())); + break; + case ERROR: + results.add(BatchEnqueueResult.error(r.getError())); + break; + default: + results.add(BatchEnqueueResult.error("no result from server")); + break; + } + } + return results; } catch (StatusRuntimeException e) { - throw mapEnqueueError(e); + throw mapBatchEnqueueError(e); } } /** * Open a streaming consumer on the specified queue. * - *

Messages are delivered to the handler on a background thread. Nacked messages are + *

Messages are delivered to the handler on a background thread. The handler transparently + * receives messages from both singular and batched server responses. Nacked messages are * redelivered on the same stream. Call {@link ConsumerHandle#cancel()} to stop consuming. * * @param queue queue to consume from @@ -174,9 +243,16 @@ public void nack(String queue, String msgId, String error) { } } - /** Shut down the underlying gRPC channel. */ + /** + * Shut down the client, draining any pending batched messages before disconnecting. + * + *

If a batcher is running, pending messages are flushed before the gRPC channel is closed. + */ @Override public void close() { + if (batcher != null) { + batcher.shutdown(); + } channel.shutdown(); try { if (!channel.awaitTermination(5, TimeUnit.SECONDS)) { @@ -188,14 +264,46 @@ public void close() { } } + /** Direct single-message enqueue RPC (no batcher). */ + private String enqueueDirect(String queue, Map headers, byte[] payload) { + Service.EnqueueRequest req = + Service.EnqueueRequest.newBuilder() + .setQueue(queue) + .putAllHeaders(headers) + .setPayload(com.google.protobuf.ByteString.copyFrom(payload)) + .build(); + try { + Service.EnqueueResponse resp = blockingStub.enqueue(req); + return resp.getMessageId(); + } catch (StatusRuntimeException e) { + throw mapEnqueueError(e); + } + } + + /** + * Consume a stream, unpacking both singular and batched responses into individual messages. + * + *

Prefers the batched {@code messages} field when non-empty, falling back to the singular + * {@code message} field for backward compatibility with older servers. + */ private static void consumeStream( Iterator stream, Consumer handler) { while (stream.hasNext()) { Service.ConsumeResponse resp = stream.next(); - if (!resp.hasMessage() || resp.getMessage().getId().isEmpty()) { - continue; + + // Prefer the batched messages field when non-empty. + List batchedMessages = resp.getMessagesList(); + if (!batchedMessages.isEmpty()) { + for (Messages.Message msg : batchedMessages) { + if (msg.getId().isEmpty()) { + continue; + } + handler.accept(buildConsumeMessage(msg)); + } + } else if (resp.hasMessage() && !resp.getMessage().getId().isEmpty()) { + // Fall back to singular message for backward compatibility. + handler.accept(buildConsumeMessage(resp.getMessage())); } - handler.accept(buildConsumeMessage(resp.getMessage())); } } @@ -230,7 +338,7 @@ private static void validateLeaderAddr(String addr) { int port; try { port = Integer.parseInt(portStr); - } catch (NumberFormatException e) { + } catch (NumberFormatException ex) { throw new FilaException("invalid leader address: non-numeric port, got: " + addr); } if (port < 1 || port > 65535) { @@ -296,13 +404,21 @@ private static ConsumeMessage buildConsumeMessage(Messages.Message msg) { meta.getQueueId()); } - private static FilaException mapEnqueueError(StatusRuntimeException e) { + static FilaException mapEnqueueError(StatusRuntimeException e) { return switch (e.getStatus().getCode()) { case NOT_FOUND -> new QueueNotFoundException("enqueue: " + e.getStatus().getDescription()); default -> new RpcException(e.getStatus().getCode(), e.getStatus().getDescription()); }; } + static FilaException mapBatchEnqueueError(StatusRuntimeException e) { + return switch (e.getStatus().getCode()) { + case NOT_FOUND -> + new QueueNotFoundException("batch enqueue: " + e.getStatus().getDescription()); + default -> new RpcException(e.getStatus().getCode(), e.getStatus().getDescription()); + }; + } + private static FilaException mapConsumeError(StatusRuntimeException e) { return switch (e.getStatus().getCode()) { case NOT_FOUND -> new QueueNotFoundException("consume: " + e.getStatus().getDescription()); @@ -332,6 +448,7 @@ public static final class Builder { private byte[] clientCertPem; private byte[] clientKeyPem; private String apiKey; + private BatchMode batchMode = BatchMode.auto(); private Builder(String address) { this.address = address; @@ -396,6 +513,20 @@ public Builder withApiKey(String apiKey) { return this; } + /** + * Set the batching mode for {@link FilaClient#enqueue} calls. + * + *

Default is {@link BatchMode#auto()} -- opportunistic batching. Use {@link + * BatchMode#disabled()} to turn off batching entirely. + * + * @param batchMode the batch mode + * @return this builder + */ + public Builder withBatchMode(BatchMode batchMode) { + this.batchMode = batchMode; + return this; + } + /** Build and connect the client. */ public FilaClient build() { if (clientCertPem != null && !tlsEnabled) { @@ -447,7 +578,19 @@ public FilaClient build() { channel = channelBuilder.build(); } - return new FilaClient(channel, caCertPem, clientCertPem, clientKeyPem, apiKey); + Batcher batcherInstance = null; + if (batchMode.getKind() != BatchMode.Kind.DISABLED) { + FilaServiceGrpc.FilaServiceBlockingStub batcherStub = + FilaServiceGrpc.newBlockingStub(channel); + if (apiKey != null) { + // The stub needs the interceptor applied at channel level (already done above). + // No additional interceptor needed on the stub. + } + batcherInstance = new Batcher(batcherStub, batchMode); + } + + return new FilaClient( + channel, caCertPem, clientCertPem, clientKeyPem, apiKey, batcherInstance); } static String parseHost(String address) { diff --git a/src/test/java/dev/faisca/fila/BatchClientTest.java b/src/test/java/dev/faisca/fila/BatchClientTest.java new file mode 100644 index 0000000..6338b58 --- /dev/null +++ b/src/test/java/dev/faisca/fila/BatchClientTest.java @@ -0,0 +1,241 @@ +package dev.faisca.fila; + +import static org.junit.jupiter.api.Assertions.*; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.EnabledIf; + +/** + * Integration tests for batch enqueue and smart batching. + * + *

Requires a fila-server binary. Skipped if not available. + */ +@EnabledIf("serverAvailable") +class BatchClientTest { + private static TestServer server; + + @BeforeAll + static void setUp() throws Exception { + server = TestServer.start(); + server.createQueue("test-batch-explicit"); + server.createQueue("test-batch-auto"); + server.createQueue("test-batch-linger"); + server.createQueue("test-batch-disabled"); + server.createQueue("test-batch-consume"); + server.createQueue("test-batch-mixed"); + } + + @AfterAll + static void tearDown() { + if (server != null) server.stop(); + } + + static boolean serverAvailable() { + return TestServer.isBinaryAvailable(); + } + + @Test + void explicitBatchEnqueue() { + try (FilaClient client = + FilaClient.builder(server.address()).withBatchMode(BatchMode.disabled()).build()) { + List messages = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + messages.add( + new EnqueueMessage( + "test-batch-explicit", + Map.of("idx", String.valueOf(i)), + ("batch-msg-" + i).getBytes())); + } + + List results = client.batchEnqueue(messages); + assertEquals(5, results.size()); + + Set ids = new HashSet<>(); + for (BatchEnqueueResult result : results) { + assertTrue(result.isSuccess(), "each message should succeed"); + assertFalse(result.getMessageId().isEmpty()); + ids.add(result.getMessageId()); + } + assertEquals(5, ids.size(), "all message IDs should be unique"); + } + } + + @Test + void explicitBatchWithNonexistentQueue() { + try (FilaClient client = + FilaClient.builder(server.address()).withBatchMode(BatchMode.disabled()).build()) { + List messages = new ArrayList<>(); + messages.add(new EnqueueMessage("test-batch-explicit", Map.of(), "good-msg".getBytes())); + messages.add(new EnqueueMessage("no-such-queue", Map.of(), "bad-msg".getBytes())); + messages.add(new EnqueueMessage("test-batch-explicit", Map.of(), "another-good".getBytes())); + + List results = client.batchEnqueue(messages); + assertEquals(3, results.size()); + + // First and third should succeed, second should fail + assertTrue(results.get(0).isSuccess()); + assertFalse(results.get(1).isSuccess()); + assertTrue(results.get(2).isSuccess()); + } + } + + @Test + void autoBatchingEnqueue() throws Exception { + try (FilaClient client = + FilaClient.builder(server.address()).withBatchMode(BatchMode.auto()).build()) { + // Enqueue messages through the auto batcher + String msgId = + client.enqueue("test-batch-auto", Map.of("mode", "auto"), "auto-msg".getBytes()); + assertNotNull(msgId); + assertFalse(msgId.isEmpty()); + + // Verify the message can be consumed + CountDownLatch latch = new CountDownLatch(1); + AtomicReference received = new AtomicReference<>(); + + ConsumerHandle handle = + client.consume( + "test-batch-auto", + msg -> { + received.set(msg); + client.ack("test-batch-auto", msg.getId()); + latch.countDown(); + }); + + assertTrue(latch.await(10, TimeUnit.SECONDS), "should receive message within 10s"); + handle.cancel(); + + ConsumeMessage msg = received.get(); + assertNotNull(msg); + assertEquals(msgId, msg.getId()); + assertEquals("auto", msg.getHeaders().get("mode")); + assertArrayEquals("auto-msg".getBytes(), msg.getPayload()); + } + } + + @Test + void autoBatchingMultipleMessages() throws Exception { + try (FilaClient client = + FilaClient.builder(server.address()).withBatchMode(BatchMode.auto(50)).build()) { + // Send multiple messages quickly to exercise batching under load + int count = 10; + Set sentIds = new HashSet<>(); + for (int i = 0; i < count; i++) { + String msgId = + client.enqueue( + "test-batch-consume", Map.of("idx", String.valueOf(i)), ("msg-" + i).getBytes()); + assertNotNull(msgId); + sentIds.add(msgId); + } + assertEquals(count, sentIds.size(), "all message IDs should be unique"); + + // Consume all messages + CountDownLatch latch = new CountDownLatch(count); + Set receivedIds = java.util.Collections.synchronizedSet(new HashSet<>()); + + ConsumerHandle handle = + client.consume( + "test-batch-consume", + msg -> { + receivedIds.add(msg.getId()); + client.ack("test-batch-consume", msg.getId()); + latch.countDown(); + }); + + assertTrue(latch.await(15, TimeUnit.SECONDS), "should receive all messages within 15s"); + handle.cancel(); + + assertEquals(sentIds, receivedIds, "should receive all sent messages"); + } + } + + @Test + void lingerBatchingEnqueue() throws Exception { + try (FilaClient client = + FilaClient.builder(server.address()).withBatchMode(BatchMode.linger(50, 10)).build()) { + String msgId = + client.enqueue("test-batch-linger", Map.of("mode", "linger"), "linger-msg".getBytes()); + assertNotNull(msgId); + assertFalse(msgId.isEmpty()); + + // Verify consumption + CountDownLatch latch = new CountDownLatch(1); + AtomicReference received = new AtomicReference<>(); + + ConsumerHandle handle = + client.consume( + "test-batch-linger", + msg -> { + received.set(msg); + client.ack("test-batch-linger", msg.getId()); + latch.countDown(); + }); + + assertTrue(latch.await(10, TimeUnit.SECONDS), "should receive message within 10s"); + handle.cancel(); + + assertEquals(msgId, received.get().getId()); + } + } + + @Test + void disabledBatchingEnqueue() throws Exception { + try (FilaClient client = + FilaClient.builder(server.address()).withBatchMode(BatchMode.disabled()).build()) { + String msgId = + client.enqueue( + "test-batch-disabled", Map.of("mode", "disabled"), "direct-msg".getBytes()); + assertNotNull(msgId); + assertFalse(msgId.isEmpty()); + + // Verify consumption + CountDownLatch latch = new CountDownLatch(1); + AtomicReference received = new AtomicReference<>(); + + ConsumerHandle handle = + client.consume( + "test-batch-disabled", + msg -> { + received.set(msg); + client.ack("test-batch-disabled", msg.getId()); + latch.countDown(); + }); + + assertTrue(latch.await(10, TimeUnit.SECONDS), "should receive message within 10s"); + handle.cancel(); + + assertEquals(msgId, received.get().getId()); + } + } + + @Test + void enqueueNonexistentQueueThroughBatcher() { + try (FilaClient client = + FilaClient.builder(server.address()).withBatchMode(BatchMode.auto()).build()) { + assertThrows( + QueueNotFoundException.class, + () -> client.enqueue("no-such-queue-batch", Map.of(), "data".getBytes())); + } + } + + @Test + void defaultBatchModeIsAuto() throws Exception { + // Default builder should use AUTO batching + try (FilaClient client = FilaClient.builder(server.address()).build()) { + String msgId = + client.enqueue("test-batch-mixed", Map.of("default", "true"), "default-batch".getBytes()); + assertNotNull(msgId); + assertFalse(msgId.isEmpty()); + } + } +} diff --git a/src/test/java/dev/faisca/fila/BatchEnqueueResultTest.java b/src/test/java/dev/faisca/fila/BatchEnqueueResultTest.java new file mode 100644 index 0000000..2ba657b --- /dev/null +++ b/src/test/java/dev/faisca/fila/BatchEnqueueResultTest.java @@ -0,0 +1,35 @@ +package dev.faisca.fila; + +import static org.junit.jupiter.api.Assertions.*; + +import org.junit.jupiter.api.Test; + +/** Unit tests for BatchEnqueueResult. */ +class BatchEnqueueResultTest { + + @Test + void successResult() { + BatchEnqueueResult result = BatchEnqueueResult.success("msg-123"); + assertTrue(result.isSuccess()); + assertEquals("msg-123", result.getMessageId()); + } + + @Test + void successGetErrorThrows() { + BatchEnqueueResult result = BatchEnqueueResult.success("msg-123"); + assertThrows(IllegalStateException.class, result::getError); + } + + @Test + void errorResult() { + BatchEnqueueResult result = BatchEnqueueResult.error("queue not found"); + assertFalse(result.isSuccess()); + assertEquals("queue not found", result.getError()); + } + + @Test + void errorGetMessageIdThrows() { + BatchEnqueueResult result = BatchEnqueueResult.error("queue not found"); + assertThrows(IllegalStateException.class, result::getMessageId); + } +} diff --git a/src/test/java/dev/faisca/fila/BatchModeTest.java b/src/test/java/dev/faisca/fila/BatchModeTest.java new file mode 100644 index 0000000..2c5a930 --- /dev/null +++ b/src/test/java/dev/faisca/fila/BatchModeTest.java @@ -0,0 +1,57 @@ +package dev.faisca.fila; + +import static org.junit.jupiter.api.Assertions.*; + +import org.junit.jupiter.api.Test; + +/** Unit tests for BatchMode configuration. */ +class BatchModeTest { + + @Test + void autoDefaultMaxBatchSize() { + BatchMode mode = BatchMode.auto(); + assertEquals(BatchMode.Kind.AUTO, mode.getKind()); + assertEquals(100, mode.getMaxBatchSize()); + } + + @Test + void autoCustomMaxBatchSize() { + BatchMode mode = BatchMode.auto(50); + assertEquals(BatchMode.Kind.AUTO, mode.getKind()); + assertEquals(50, mode.getMaxBatchSize()); + } + + @Test + void autoRejectsZeroMaxBatchSize() { + assertThrows(IllegalArgumentException.class, () -> BatchMode.auto(0)); + } + + @Test + void autoRejectsNegativeMaxBatchSize() { + assertThrows(IllegalArgumentException.class, () -> BatchMode.auto(-1)); + } + + @Test + void lingerConfigValues() { + BatchMode mode = BatchMode.linger(10, 50); + assertEquals(BatchMode.Kind.LINGER, mode.getKind()); + assertEquals(10, mode.getLingerMs()); + assertEquals(50, mode.getMaxBatchSize()); + } + + @Test + void lingerRejectsZeroLingerMs() { + assertThrows(IllegalArgumentException.class, () -> BatchMode.linger(0, 50)); + } + + @Test + void lingerRejectsZeroBatchSize() { + assertThrows(IllegalArgumentException.class, () -> BatchMode.linger(10, 0)); + } + + @Test + void disabledMode() { + BatchMode mode = BatchMode.disabled(); + assertEquals(BatchMode.Kind.DISABLED, mode.getKind()); + } +} diff --git a/src/test/java/dev/faisca/fila/BuilderTest.java b/src/test/java/dev/faisca/fila/BuilderTest.java index b144878..c28c562 100644 --- a/src/test/java/dev/faisca/fila/BuilderTest.java +++ b/src/test/java/dev/faisca/fila/BuilderTest.java @@ -9,12 +9,39 @@ class BuilderTest { @Test void builderPlaintextDoesNotThrow() { - // Plaintext builder should create a client without error + // Plaintext builder should create a client without error (default AUTO batching) FilaClient client = FilaClient.builder("localhost:5555").build(); assertNotNull(client); client.close(); } + @Test + void builderWithBatchDisabledDoesNotThrow() { + // Plaintext builder with batching disabled + FilaClient client = + FilaClient.builder("localhost:5555").withBatchMode(BatchMode.disabled()).build(); + assertNotNull(client); + client.close(); + } + + @Test + void builderWithBatchAutoDoesNotThrow() { + // Explicit AUTO batch mode + FilaClient client = + FilaClient.builder("localhost:5555").withBatchMode(BatchMode.auto(50)).build(); + assertNotNull(client); + client.close(); + } + + @Test + void builderWithBatchLingerDoesNotThrow() { + // LINGER batch mode + FilaClient client = + FilaClient.builder("localhost:5555").withBatchMode(BatchMode.linger(10, 50)).build(); + assertNotNull(client); + client.close(); + } + @Test void builderWithApiKeyDoesNotThrow() { // API key without TLS should work (for backward compat / dev mode) @@ -40,6 +67,7 @@ void builderChainingReturnsBuilder() { FilaClient.Builder builder = FilaClient.builder("localhost:5555") .withApiKey("key") + .withBatchMode(BatchMode.auto()) .withTlsCaCert("cert".getBytes()) .withTlsClientCert("cert".getBytes(), "key".getBytes()); assertNotNull(builder);