Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion proto/fila/v1/service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import "fila/v1/messages.proto";
// Hot-path RPCs for producers and consumers.
service FilaService {
rpc Enqueue(EnqueueRequest) returns (EnqueueResponse);
rpc BatchEnqueue(BatchEnqueueRequest) returns (BatchEnqueueResponse);
rpc Consume(ConsumeRequest) returns (stream ConsumeResponse);
rpc Ack(AckRequest) returns (AckResponse);
rpc Nack(NackRequest) returns (NackResponse);
Expand All @@ -26,7 +27,8 @@ message ConsumeRequest {
}

message ConsumeResponse {
Message message = 1;
Message message = 1; // Single message (backward compatible, used when batch size is 1)
repeated Message messages = 2; // Batched messages (populated when server sends multiple at once)
}

message AckRequest {
Expand All @@ -43,3 +45,18 @@ message NackRequest {
}

message NackResponse {}

message BatchEnqueueRequest {
repeated EnqueueRequest messages = 1;
}

message BatchEnqueueResponse {
repeated BatchEnqueueResult results = 1;
}

message BatchEnqueueResult {
oneof result {
EnqueueResponse success = 1;
string error = 2;
}
}
57 changes: 57 additions & 0 deletions src/main/java/dev/faisca/fila/BatchEnqueueResult.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package dev.faisca.fila;

/**
* The result of a single message within a batch enqueue call.
*
* <p>Each message in a batch is independently validated and processed. A failed message does not
* affect the others. Use {@link #isSuccess()} to check the outcome, then either {@link
* #getMessageId()} or {@link #getError()}.
*/
public final class BatchEnqueueResult {
private final String messageId;
private final String error;

private BatchEnqueueResult(String messageId, String error) {
this.messageId = messageId;
this.error = error;
}

/** Create a successful result with the broker-assigned message ID. */
static BatchEnqueueResult success(String messageId) {
return new BatchEnqueueResult(messageId, null);
}

/** Create a failed result with an error description. */
static BatchEnqueueResult error(String error) {
return new BatchEnqueueResult(null, error);
}

/** Returns true if the message was successfully enqueued. */
public boolean isSuccess() {
return messageId != null;
}

/**
* Returns the broker-assigned message ID.
*
* @throws IllegalStateException if this result is an error
*/
public String getMessageId() {
if (messageId == null) {
throw new IllegalStateException("result is an error: " + error);
}
return messageId;
}

/**
* Returns the error description.
*
* @throws IllegalStateException if this result is a success
*/
public String getError() {
if (error == null) {
throw new IllegalStateException("result is a success");
}
return error;
}
}
93 changes: 93 additions & 0 deletions src/main/java/dev/faisca/fila/BatchMode.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package dev.faisca.fila;

/**
* Controls how the SDK batches {@link FilaClient#enqueue} calls.
*
* <p>The default is {@link #auto()} -- opportunistic batching that requires zero configuration. At
* low load each message is sent individually (zero added latency). At high load messages accumulate
* naturally and are flushed together.
*/
public final class BatchMode {
enum Kind {
AUTO,
LINGER,
DISABLED
}

private final Kind kind;
private final int maxBatchSize;
private final long lingerMs;

private BatchMode(Kind kind, int maxBatchSize, long lingerMs) {
this.kind = kind;
this.maxBatchSize = maxBatchSize;
this.lingerMs = lingerMs;
}

/**
* Opportunistic batching (default).
*
* <p>A background thread blocks for the first message, then drains any additional messages that
* arrived concurrently. At low load each message is sent individually. At high load messages
* accumulate naturally into batches. Zero config, zero latency penalty.
*
* @return a new AUTO batch mode with default max batch size (100)
*/
public static BatchMode auto() {
return new BatchMode(Kind.AUTO, 100, 0);
}

/**
* Opportunistic batching with a custom max batch size.
*
* @param maxBatchSize safety cap on batch size
* @return a new AUTO batch mode
*/
public static BatchMode auto(int maxBatchSize) {
if (maxBatchSize < 1) {
throw new IllegalArgumentException("maxBatchSize must be >= 1");
}
return new BatchMode(Kind.AUTO, maxBatchSize, 0);
}

/**
* Timer-based forced batching.
*
* <p>Buffers messages and flushes when either {@code batchSize} messages accumulate or {@code
* lingerMs} milliseconds elapse since the first message in the batch -- whichever comes first.
*
* @param lingerMs time threshold in milliseconds before a partial batch is flushed
* @param batchSize maximum messages per batch
* @return a new LINGER batch mode
*/
public static BatchMode linger(long lingerMs, int batchSize) {
if (lingerMs < 1) {
throw new IllegalArgumentException("lingerMs must be >= 1");
}
if (batchSize < 1) {
throw new IllegalArgumentException("batchSize must be >= 1");
}
return new BatchMode(Kind.LINGER, batchSize, lingerMs);
}

/**
* No batching. Each {@link FilaClient#enqueue} call is a direct single-message RPC.
*
* @return a DISABLED batch mode
*/
public static BatchMode disabled() {
return new BatchMode(Kind.DISABLED, 0, 0);
}

Kind getKind() {
return kind;
}

int getMaxBatchSize() {
return maxBatchSize;
}

long getLingerMs() {
return lingerMs;
}
}
Loading
Loading