Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 98 additions & 18 deletions proto/fila/v1/service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -6,57 +6,137 @@ import "fila/v1/messages.proto";
// Hot-path RPCs for producers and consumers.
service FilaService {
rpc Enqueue(EnqueueRequest) returns (EnqueueResponse);
rpc BatchEnqueue(BatchEnqueueRequest) returns (BatchEnqueueResponse);
rpc StreamEnqueue(stream StreamEnqueueRequest) returns (stream StreamEnqueueResponse);
rpc Consume(ConsumeRequest) returns (stream ConsumeResponse);
rpc Ack(AckRequest) returns (AckResponse);
rpc Nack(NackRequest) returns (NackResponse);
}

message EnqueueRequest {
// Individual message to enqueue.
message EnqueueMessage {
string queue = 1;
map<string, string> headers = 2;
bytes payload = 3;
}

// Enqueue one or more messages.
message EnqueueRequest {
repeated EnqueueMessage messages = 1;
}

// Per-message enqueue result.
message EnqueueResult {
oneof result {
string message_id = 1;
EnqueueError error = 2;
}
}

// Typed enqueue error with structured error code.
message EnqueueError {
EnqueueErrorCode code = 1;
string message = 2;
}

enum EnqueueErrorCode {
ENQUEUE_ERROR_CODE_UNSPECIFIED = 0;
ENQUEUE_ERROR_CODE_QUEUE_NOT_FOUND = 1;
ENQUEUE_ERROR_CODE_STORAGE = 2;
ENQUEUE_ERROR_CODE_LUA = 3;
ENQUEUE_ERROR_CODE_PERMISSION_DENIED = 4;
}

// One result per input message.
message EnqueueResponse {
string message_id = 1;
repeated EnqueueResult results = 1;
}

message ConsumeRequest {
string queue = 1;
}

message ConsumeResponse {
Message message = 1; // Single message (backward compatible, used when batch size is 1)
repeated Message messages = 2; // Batched messages (populated when server sends multiple at once)
repeated Message messages = 1;
}

message AckRequest {
// Individual ack item.
message AckMessage {
string queue = 1;
string message_id = 2;
}

message AckResponse {}
message AckRequest {
repeated AckMessage messages = 1;
}

message AckResult {
oneof result {
AckSuccess success = 1;
AckError error = 2;
}
}

message NackRequest {
message AckSuccess {}

message AckError {
AckErrorCode code = 1;
string message = 2;
}

enum AckErrorCode {
ACK_ERROR_CODE_UNSPECIFIED = 0;
ACK_ERROR_CODE_MESSAGE_NOT_FOUND = 1;
ACK_ERROR_CODE_STORAGE = 2;
ACK_ERROR_CODE_PERMISSION_DENIED = 3;
}

message AckResponse {
repeated AckResult results = 1;
}

// Individual nack item.
message NackMessage {
string queue = 1;
string message_id = 2;
string error = 3;
}

message NackResponse {}
message NackRequest {
repeated NackMessage messages = 1;
}

message NackResult {
oneof result {
NackSuccess success = 1;
NackError error = 2;
}
}

message BatchEnqueueRequest {
repeated EnqueueRequest messages = 1;
message NackSuccess {}

message NackError {
NackErrorCode code = 1;
string message = 2;
}

message BatchEnqueueResponse {
repeated BatchEnqueueResult results = 1;
enum NackErrorCode {
NACK_ERROR_CODE_UNSPECIFIED = 0;
NACK_ERROR_CODE_MESSAGE_NOT_FOUND = 1;
NACK_ERROR_CODE_STORAGE = 2;
NACK_ERROR_CODE_PERMISSION_DENIED = 3;
}

message BatchEnqueueResult {
oneof result {
EnqueueResponse success = 1;
string error = 2;
}
message NackResponse {
repeated NackResult results = 1;
}

// Stream enqueue — per-write batch with sequence tracking.
message StreamEnqueueRequest {
repeated EnqueueMessage messages = 1;
uint64 sequence_number = 2;
}

message StreamEnqueueResponse {
uint64 sequence_number = 1;
repeated EnqueueResult results = 2;
}
65 changes: 23 additions & 42 deletions src/main/java/dev/faisca/fila/Batcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Background batcher that coalesces individual enqueue calls into batch RPCs.
* Background batcher that coalesces individual enqueue calls into multi-message RPCs.
*
* <p>Supports two modes: AUTO (opportunistic, Nagle-style) and LINGER (timer-based). The batcher
* runs on a dedicated daemon thread and flushes RPCs on an executor pool.
* runs on a dedicated daemon thread and flushes RPCs on an executor pool. Uses the unified Enqueue
* RPC with repeated messages for all batch sizes.
*/
final class Batcher {
private final LinkedBlockingQueue<BatchItem> queue = new LinkedBlockingQueue<>();
Expand Down Expand Up @@ -199,66 +200,36 @@ private void runLinger() {
}
}

/**
* Flush a batch of messages. Uses single-message Enqueue RPC for 1 message (preserves error
* types), BatchEnqueue for 2+ messages.
*/
/** Flush a batch of messages via the unified Enqueue RPC. */
private void flushBatch(List<BatchItem> items) {
if (items.isEmpty()) {
return;
}

if (items.size() == 1) {
flushSingle(items.get(0));
return;
}

flushMultiple(items);
}

/** Single-item optimization: use regular Enqueue RPC for exact error semantics. */
private void flushSingle(BatchItem item) {
Service.EnqueueRequest req =
Service.EnqueueRequest.newBuilder()
.setQueue(item.message.getQueue())
.putAllHeaders(item.message.getHeaders())
.setPayload(com.google.protobuf.ByteString.copyFrom(item.message.getPayload()))
.build();
try {
Service.EnqueueResponse resp = stub.enqueue(req);
item.future.complete(resp.getMessageId());
} catch (StatusRuntimeException e) {
item.future.completeExceptionally(FilaClient.mapEnqueueError(e));
}
}

/** Multi-item flush: use BatchEnqueue RPC for amortized overhead. */
private void flushMultiple(List<BatchItem> items) {
Service.BatchEnqueueRequest.Builder reqBuilder = Service.BatchEnqueueRequest.newBuilder();
Service.EnqueueRequest.Builder reqBuilder = Service.EnqueueRequest.newBuilder();
for (BatchItem item : items) {
reqBuilder.addMessages(
Service.EnqueueRequest.newBuilder()
Service.EnqueueMessage.newBuilder()
.setQueue(item.message.getQueue())
.putAllHeaders(item.message.getHeaders())
.setPayload(com.google.protobuf.ByteString.copyFrom(item.message.getPayload()))
.build());
}

try {
Service.BatchEnqueueResponse resp = stub.batchEnqueue(reqBuilder.build());
List<Service.BatchEnqueueResult> results = resp.getResultsList();
Service.EnqueueResponse resp = stub.enqueue(reqBuilder.build());
List<Service.EnqueueResult> results = resp.getResultsList();

for (int i = 0; i < items.size(); i++) {
BatchItem item = items.get(i);
if (i < results.size()) {
Service.BatchEnqueueResult result = results.get(i);
Service.EnqueueResult result = results.get(i);
switch (result.getResultCase()) {
case SUCCESS:
item.future.complete(result.getSuccess().getMessageId());
case MESSAGE_ID:
item.future.complete(result.getMessageId());
break;
case ERROR:
item.future.completeExceptionally(
new RpcException(io.grpc.Status.Code.INTERNAL, result.getError()));
item.future.completeExceptionally(mapEnqueueResultError(result.getError()));
break;
default:
item.future.completeExceptionally(
Expand All @@ -273,13 +244,23 @@ private void flushMultiple(List<BatchItem> items) {
}
}
} catch (StatusRuntimeException e) {
FilaException mapped = FilaClient.mapBatchEnqueueError(e);
FilaException mapped = FilaClient.mapEnqueueError(e);
for (BatchItem item : items) {
item.future.completeExceptionally(mapped);
}
}
}

private static FilaException mapEnqueueResultError(Service.EnqueueError error) {
return switch (error.getCode()) {
case ENQUEUE_ERROR_CODE_QUEUE_NOT_FOUND ->
new QueueNotFoundException("enqueue: " + error.getMessage());
case ENQUEUE_ERROR_CODE_PERMISSION_DENIED ->
new RpcException(io.grpc.Status.Code.PERMISSION_DENIED, error.getMessage());
default -> new RpcException(io.grpc.Status.Code.INTERNAL, error.getMessage());
};
}

private static Thread newDaemon(Runnable r, String name) {
Thread t = new Thread(r, name);
t.setDaemon(true);
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/dev/faisca/fila/EnqueueMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
import java.util.Map;

/**
* A message to be enqueued via {@link FilaClient#batchEnqueue(java.util.List)}.
* A message to be enqueued via {@link FilaClient#enqueueMany(java.util.List)}.
*
* <p>Each message specifies its target queue, headers, and payload independently, allowing a single
* batch to target multiple queues.
* call to target multiple queues.
*/
public final class EnqueueMessage {
private final String queue;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,29 +1,29 @@
package dev.faisca.fila;

/**
* The result of a single message within a batch enqueue call.
* The result of a single message within an enqueue call.
*
* <p>Each message in a batch is independently validated and processed. A failed message does not
* affect the others. Use {@link #isSuccess()} to check the outcome, then either {@link
* #getMessageId()} or {@link #getError()}.
* <p>Each message in a multi-message enqueue is independently validated and processed. A failed
* message does not affect the others. Use {@link #isSuccess()} to check the outcome, then either
* {@link #getMessageId()} or {@link #getError()}.
*/
public final class BatchEnqueueResult {
public final class EnqueueResult {
private final String messageId;
private final String error;

private BatchEnqueueResult(String messageId, String error) {
private EnqueueResult(String messageId, String error) {
this.messageId = messageId;
this.error = error;
}

/** Create a successful result with the broker-assigned message ID. */
static BatchEnqueueResult success(String messageId) {
return new BatchEnqueueResult(messageId, null);
static EnqueueResult success(String messageId) {
return new EnqueueResult(messageId, null);
}

/** Create a failed result with an error description. */
static BatchEnqueueResult error(String error) {
return new BatchEnqueueResult(null, error);
static EnqueueResult error(String error) {
return new EnqueueResult(null, error);
}

/** Returns true if the message was successfully enqueued. */
Expand Down
Loading
Loading