diff --git a/proto/fila/v1/service.proto b/proto/fila/v1/service.proto index fc0f710..7d1db79 100644 --- a/proto/fila/v1/service.proto +++ b/proto/fila/v1/service.proto @@ -6,20 +6,49 @@ import "fila/v1/messages.proto"; // Hot-path RPCs for producers and consumers. service FilaService { rpc Enqueue(EnqueueRequest) returns (EnqueueResponse); - rpc BatchEnqueue(BatchEnqueueRequest) returns (BatchEnqueueResponse); + rpc StreamEnqueue(stream StreamEnqueueRequest) returns (stream StreamEnqueueResponse); rpc Consume(ConsumeRequest) returns (stream ConsumeResponse); rpc Ack(AckRequest) returns (AckResponse); rpc Nack(NackRequest) returns (NackResponse); } -message EnqueueRequest { +// Individual message to enqueue. +message EnqueueMessage { string queue = 1; map headers = 2; bytes payload = 3; } +// Enqueue one or more messages. +message EnqueueRequest { + repeated EnqueueMessage messages = 1; +} + +// Per-message enqueue result. +message EnqueueResult { + oneof result { + string message_id = 1; + EnqueueError error = 2; + } +} + +// Typed enqueue error with structured error code. +message EnqueueError { + EnqueueErrorCode code = 1; + string message = 2; +} + +enum EnqueueErrorCode { + ENQUEUE_ERROR_CODE_UNSPECIFIED = 0; + ENQUEUE_ERROR_CODE_QUEUE_NOT_FOUND = 1; + ENQUEUE_ERROR_CODE_STORAGE = 2; + ENQUEUE_ERROR_CODE_LUA = 3; + ENQUEUE_ERROR_CODE_PERMISSION_DENIED = 4; +} + +// One result per input message. message EnqueueResponse { - string message_id = 1; + repeated EnqueueResult results = 1; } message ConsumeRequest { @@ -27,36 +56,87 @@ message ConsumeRequest { } message ConsumeResponse { - Message message = 1; // Single message (backward compatible, used when batch size is 1) - repeated Message messages = 2; // Batched messages (populated when server sends multiple at once) + repeated Message messages = 1; } -message AckRequest { +// Individual ack item. +message AckMessage { string queue = 1; string message_id = 2; } -message AckResponse {} +message AckRequest { + repeated AckMessage messages = 1; +} + +message AckResult { + oneof result { + AckSuccess success = 1; + AckError error = 2; + } +} -message NackRequest { +message AckSuccess {} + +message AckError { + AckErrorCode code = 1; + string message = 2; +} + +enum AckErrorCode { + ACK_ERROR_CODE_UNSPECIFIED = 0; + ACK_ERROR_CODE_MESSAGE_NOT_FOUND = 1; + ACK_ERROR_CODE_STORAGE = 2; + ACK_ERROR_CODE_PERMISSION_DENIED = 3; +} + +message AckResponse { + repeated AckResult results = 1; +} + +// Individual nack item. +message NackMessage { string queue = 1; string message_id = 2; string error = 3; } -message NackResponse {} +message NackRequest { + repeated NackMessage messages = 1; +} + +message NackResult { + oneof result { + NackSuccess success = 1; + NackError error = 2; + } +} -message BatchEnqueueRequest { - repeated EnqueueRequest messages = 1; +message NackSuccess {} + +message NackError { + NackErrorCode code = 1; + string message = 2; } -message BatchEnqueueResponse { - repeated BatchEnqueueResult results = 1; +enum NackErrorCode { + NACK_ERROR_CODE_UNSPECIFIED = 0; + NACK_ERROR_CODE_MESSAGE_NOT_FOUND = 1; + NACK_ERROR_CODE_STORAGE = 2; + NACK_ERROR_CODE_PERMISSION_DENIED = 3; } -message BatchEnqueueResult { - oneof result { - EnqueueResponse success = 1; - string error = 2; - } +message NackResponse { + repeated NackResult results = 1; +} + +// Stream enqueue — per-write batch with sequence tracking. +message StreamEnqueueRequest { + repeated EnqueueMessage messages = 1; + uint64 sequence_number = 2; +} + +message StreamEnqueueResponse { + uint64 sequence_number = 1; + repeated EnqueueResult results = 2; } diff --git a/src/main/java/dev/faisca/fila/Batcher.java b/src/main/java/dev/faisca/fila/Batcher.java index 486f745..0694d37 100644 --- a/src/main/java/dev/faisca/fila/Batcher.java +++ b/src/main/java/dev/faisca/fila/Batcher.java @@ -15,10 +15,11 @@ import java.util.concurrent.atomic.AtomicBoolean; /** - * Background batcher that coalesces individual enqueue calls into batch RPCs. + * Background batcher that coalesces individual enqueue calls into multi-message RPCs. * *

Supports two modes: AUTO (opportunistic, Nagle-style) and LINGER (timer-based). The batcher - * runs on a dedicated daemon thread and flushes RPCs on an executor pool. + * runs on a dedicated daemon thread and flushes RPCs on an executor pool. Uses the unified Enqueue + * RPC with repeated messages for all batch sizes. */ final class Batcher { private final LinkedBlockingQueue queue = new LinkedBlockingQueue<>(); @@ -199,45 +200,16 @@ private void runLinger() { } } - /** - * Flush a batch of messages. Uses single-message Enqueue RPC for 1 message (preserves error - * types), BatchEnqueue for 2+ messages. - */ + /** Flush a batch of messages via the unified Enqueue RPC. */ private void flushBatch(List items) { if (items.isEmpty()) { return; } - if (items.size() == 1) { - flushSingle(items.get(0)); - return; - } - - flushMultiple(items); - } - - /** Single-item optimization: use regular Enqueue RPC for exact error semantics. */ - private void flushSingle(BatchItem item) { - Service.EnqueueRequest req = - Service.EnqueueRequest.newBuilder() - .setQueue(item.message.getQueue()) - .putAllHeaders(item.message.getHeaders()) - .setPayload(com.google.protobuf.ByteString.copyFrom(item.message.getPayload())) - .build(); - try { - Service.EnqueueResponse resp = stub.enqueue(req); - item.future.complete(resp.getMessageId()); - } catch (StatusRuntimeException e) { - item.future.completeExceptionally(FilaClient.mapEnqueueError(e)); - } - } - - /** Multi-item flush: use BatchEnqueue RPC for amortized overhead. */ - private void flushMultiple(List items) { - Service.BatchEnqueueRequest.Builder reqBuilder = Service.BatchEnqueueRequest.newBuilder(); + Service.EnqueueRequest.Builder reqBuilder = Service.EnqueueRequest.newBuilder(); for (BatchItem item : items) { reqBuilder.addMessages( - Service.EnqueueRequest.newBuilder() + Service.EnqueueMessage.newBuilder() .setQueue(item.message.getQueue()) .putAllHeaders(item.message.getHeaders()) .setPayload(com.google.protobuf.ByteString.copyFrom(item.message.getPayload())) @@ -245,20 +217,19 @@ private void flushMultiple(List items) { } try { - Service.BatchEnqueueResponse resp = stub.batchEnqueue(reqBuilder.build()); - List results = resp.getResultsList(); + Service.EnqueueResponse resp = stub.enqueue(reqBuilder.build()); + List results = resp.getResultsList(); for (int i = 0; i < items.size(); i++) { BatchItem item = items.get(i); if (i < results.size()) { - Service.BatchEnqueueResult result = results.get(i); + Service.EnqueueResult result = results.get(i); switch (result.getResultCase()) { - case SUCCESS: - item.future.complete(result.getSuccess().getMessageId()); + case MESSAGE_ID: + item.future.complete(result.getMessageId()); break; case ERROR: - item.future.completeExceptionally( - new RpcException(io.grpc.Status.Code.INTERNAL, result.getError())); + item.future.completeExceptionally(mapEnqueueResultError(result.getError())); break; default: item.future.completeExceptionally( @@ -273,13 +244,23 @@ private void flushMultiple(List items) { } } } catch (StatusRuntimeException e) { - FilaException mapped = FilaClient.mapBatchEnqueueError(e); + FilaException mapped = FilaClient.mapEnqueueError(e); for (BatchItem item : items) { item.future.completeExceptionally(mapped); } } } + private static FilaException mapEnqueueResultError(Service.EnqueueError error) { + return switch (error.getCode()) { + case ENQUEUE_ERROR_CODE_QUEUE_NOT_FOUND -> + new QueueNotFoundException("enqueue: " + error.getMessage()); + case ENQUEUE_ERROR_CODE_PERMISSION_DENIED -> + new RpcException(io.grpc.Status.Code.PERMISSION_DENIED, error.getMessage()); + default -> new RpcException(io.grpc.Status.Code.INTERNAL, error.getMessage()); + }; + } + private static Thread newDaemon(Runnable r, String name) { Thread t = new Thread(r, name); t.setDaemon(true); diff --git a/src/main/java/dev/faisca/fila/EnqueueMessage.java b/src/main/java/dev/faisca/fila/EnqueueMessage.java index 767de30..80d63ec 100644 --- a/src/main/java/dev/faisca/fila/EnqueueMessage.java +++ b/src/main/java/dev/faisca/fila/EnqueueMessage.java @@ -3,10 +3,10 @@ import java.util.Map; /** - * A message to be enqueued via {@link FilaClient#batchEnqueue(java.util.List)}. + * A message to be enqueued via {@link FilaClient#enqueueMany(java.util.List)}. * *

Each message specifies its target queue, headers, and payload independently, allowing a single - * batch to target multiple queues. + * call to target multiple queues. */ public final class EnqueueMessage { private final String queue; diff --git a/src/main/java/dev/faisca/fila/BatchEnqueueResult.java b/src/main/java/dev/faisca/fila/EnqueueResult.java similarity index 62% rename from src/main/java/dev/faisca/fila/BatchEnqueueResult.java rename to src/main/java/dev/faisca/fila/EnqueueResult.java index eab5f50..3bea453 100644 --- a/src/main/java/dev/faisca/fila/BatchEnqueueResult.java +++ b/src/main/java/dev/faisca/fila/EnqueueResult.java @@ -1,29 +1,29 @@ package dev.faisca.fila; /** - * The result of a single message within a batch enqueue call. + * The result of a single message within an enqueue call. * - *

Each message in a batch is independently validated and processed. A failed message does not - * affect the others. Use {@link #isSuccess()} to check the outcome, then either {@link - * #getMessageId()} or {@link #getError()}. + *

Each message in a multi-message enqueue is independently validated and processed. A failed + * message does not affect the others. Use {@link #isSuccess()} to check the outcome, then either + * {@link #getMessageId()} or {@link #getError()}. */ -public final class BatchEnqueueResult { +public final class EnqueueResult { private final String messageId; private final String error; - private BatchEnqueueResult(String messageId, String error) { + private EnqueueResult(String messageId, String error) { this.messageId = messageId; this.error = error; } /** Create a successful result with the broker-assigned message ID. */ - static BatchEnqueueResult success(String messageId) { - return new BatchEnqueueResult(messageId, null); + static EnqueueResult success(String messageId) { + return new EnqueueResult(messageId, null); } /** Create a failed result with an error description. */ - static BatchEnqueueResult error(String error) { - return new BatchEnqueueResult(null, error); + static EnqueueResult error(String error) { + return new EnqueueResult(null, error); } /** Returns true if the message was successfully enqueued. */ diff --git a/src/main/java/dev/faisca/fila/FilaClient.java b/src/main/java/dev/faisca/fila/FilaClient.java index 64ab1ed..8be44cb 100644 --- a/src/main/java/dev/faisca/fila/FilaClient.java +++ b/src/main/java/dev/faisca/fila/FilaClient.java @@ -25,7 +25,7 @@ /** * Client for the Fila message broker. * - *

Wraps the hot-path gRPC operations: enqueue, batch enqueue, consume, ack, nack. + *

Wraps the hot-path gRPC operations: enqueue, consume, ack, nack. * *

By default, {@code enqueue()} routes through an opportunistic batcher that coalesces messages * at high load without adding latency at low load. Use {@link Builder#withBatchMode(BatchMode)} to @@ -112,23 +112,23 @@ public String enqueue(String queue, Map headers, byte[] payload) } /** - * Enqueue a batch of messages in a single RPC call. + * Enqueue multiple messages in a single RPC call. * *

Each message is independently validated and processed. A failed message does not affect the * others in the batch. Returns a list of results with one entry per input message, in the same * order. * - *

This bypasses the batcher and always uses the {@code BatchEnqueue} RPC directly. + *

This bypasses the batcher and always uses the {@code Enqueue} RPC directly. * * @param messages the messages to enqueue * @return a list of results, one per input message * @throws RpcException for transport-level failures affecting the entire batch */ - public List batchEnqueue(List messages) { - Service.BatchEnqueueRequest.Builder reqBuilder = Service.BatchEnqueueRequest.newBuilder(); + public List enqueueMany(List messages) { + Service.EnqueueRequest.Builder reqBuilder = Service.EnqueueRequest.newBuilder(); for (EnqueueMessage msg : messages) { reqBuilder.addMessages( - Service.EnqueueRequest.newBuilder() + Service.EnqueueMessage.newBuilder() .setQueue(msg.getQueue()) .putAllHeaders(msg.getHeaders()) .setPayload(com.google.protobuf.ByteString.copyFrom(msg.getPayload())) @@ -136,25 +136,25 @@ public List batchEnqueue(List messages) { } try { - Service.BatchEnqueueResponse resp = blockingStub.batchEnqueue(reqBuilder.build()); - List protoResults = resp.getResultsList(); - List results = new ArrayList<>(protoResults.size()); - for (Service.BatchEnqueueResult r : protoResults) { + Service.EnqueueResponse resp = blockingStub.enqueue(reqBuilder.build()); + List protoResults = resp.getResultsList(); + List results = new ArrayList<>(protoResults.size()); + for (Service.EnqueueResult r : protoResults) { switch (r.getResultCase()) { - case SUCCESS: - results.add(BatchEnqueueResult.success(r.getSuccess().getMessageId())); + case MESSAGE_ID: + results.add(EnqueueResult.success(r.getMessageId())); break; case ERROR: - results.add(BatchEnqueueResult.error(r.getError())); + results.add(EnqueueResult.error(r.getError().getMessage())); break; default: - results.add(BatchEnqueueResult.error("no result from server")); + results.add(EnqueueResult.error("no result from server")); break; } } return results; } catch (StatusRuntimeException e) { - throw mapBatchEnqueueError(e); + throw mapEnqueueError(e); } } @@ -162,8 +162,8 @@ public List batchEnqueue(List messages) { * Open a streaming consumer on the specified queue. * *

Messages are delivered to the handler on a background thread. The handler transparently - * receives messages from both singular and batched server responses. Nacked messages are - * redelivered on the same stream. Call {@link ConsumerHandle#cancel()} to stop consuming. + * receives messages from batched server responses. Nacked messages are redelivered on the same + * stream. Call {@link ConsumerHandle#cancel()} to stop consuming. * * @param queue queue to consume from * @param handler callback invoked for each message @@ -212,9 +212,23 @@ public ConsumerHandle consume(String queue, Consumer handler) { */ public void ack(String queue, String msgId) { Service.AckRequest req = - Service.AckRequest.newBuilder().setQueue(queue).setMessageId(msgId).build(); + Service.AckRequest.newBuilder() + .addMessages( + Service.AckMessage.newBuilder().setQueue(queue).setMessageId(msgId).build()) + .build(); try { - blockingStub.ack(req); + Service.AckResponse resp = blockingStub.ack(req); + List results = resp.getResultsList(); + if (results.size() != 1) { + throw new RpcException(io.grpc.Status.Code.INTERNAL, "no result from server"); + } + Service.AckResult first = results.get(0); + if (first.getResultCase() == Service.AckResult.ResultCase.ERROR) { + throw mapAckResultError(first.getError()); + } + if (first.getResultCase() == Service.AckResult.ResultCase.RESULT_NOT_SET) { + throw new RpcException(io.grpc.Status.Code.INTERNAL, "no result from server"); + } } catch (StatusRuntimeException e) { throw mapAckError(e); } @@ -232,12 +246,26 @@ public void ack(String queue, String msgId) { public void nack(String queue, String msgId, String error) { Service.NackRequest req = Service.NackRequest.newBuilder() - .setQueue(queue) - .setMessageId(msgId) - .setError(error) + .addMessages( + Service.NackMessage.newBuilder() + .setQueue(queue) + .setMessageId(msgId) + .setError(error) + .build()) .build(); try { - blockingStub.nack(req); + Service.NackResponse resp = blockingStub.nack(req); + List results = resp.getResultsList(); + if (results.size() != 1) { + throw new RpcException(io.grpc.Status.Code.INTERNAL, "no result from server"); + } + Service.NackResult first = results.get(0); + if (first.getResultCase() == Service.NackResult.ResultCase.ERROR) { + throw mapNackResultError(first.getError()); + } + if (first.getResultCase() == Service.NackResult.ResultCase.RESULT_NOT_SET) { + throw new RpcException(io.grpc.Status.Code.INTERNAL, "no result from server"); + } } catch (StatusRuntimeException e) { throw mapNackError(e); } @@ -268,41 +296,45 @@ public void close() { private String enqueueDirect(String queue, Map headers, byte[] payload) { Service.EnqueueRequest req = Service.EnqueueRequest.newBuilder() - .setQueue(queue) - .putAllHeaders(headers) - .setPayload(com.google.protobuf.ByteString.copyFrom(payload)) + .addMessages( + Service.EnqueueMessage.newBuilder() + .setQueue(queue) + .putAllHeaders(headers) + .setPayload(com.google.protobuf.ByteString.copyFrom(payload)) + .build()) .build(); try { Service.EnqueueResponse resp = blockingStub.enqueue(req); - return resp.getMessageId(); + List results = resp.getResultsList(); + if (results.isEmpty()) { + throw new RpcException(io.grpc.Status.Code.INTERNAL, "no result from server"); + } + Service.EnqueueResult first = results.get(0); + switch (first.getResultCase()) { + case MESSAGE_ID: + return first.getMessageId(); + case ERROR: + throw mapEnqueueResultError(first.getError()); + default: + throw new RpcException(io.grpc.Status.Code.INTERNAL, "no result from server"); + } } catch (StatusRuntimeException e) { throw mapEnqueueError(e); } } - /** - * Consume a stream, unpacking both singular and batched responses into individual messages. - * - *

Prefers the batched {@code messages} field when non-empty, falling back to the singular - * {@code message} field for backward compatibility with older servers. - */ + /** Consume a stream, unpacking batched responses into individual messages. */ private static void consumeStream( Iterator stream, Consumer handler) { while (stream.hasNext()) { Service.ConsumeResponse resp = stream.next(); - // Prefer the batched messages field when non-empty. - List batchedMessages = resp.getMessagesList(); - if (!batchedMessages.isEmpty()) { - for (Messages.Message msg : batchedMessages) { - if (msg.getId().isEmpty()) { - continue; - } - handler.accept(buildConsumeMessage(msg)); + List messages = resp.getMessagesList(); + for (Messages.Message msg : messages) { + if (msg.getId().isEmpty()) { + continue; } - } else if (resp.hasMessage() && !resp.getMessage().getId().isEmpty()) { - // Fall back to singular message for backward compatibility. - handler.accept(buildConsumeMessage(resp.getMessage())); + handler.accept(buildConsumeMessage(msg)); } } } @@ -411,11 +443,13 @@ static FilaException mapEnqueueError(StatusRuntimeException e) { }; } - static FilaException mapBatchEnqueueError(StatusRuntimeException e) { - return switch (e.getStatus().getCode()) { - case NOT_FOUND -> - new QueueNotFoundException("batch enqueue: " + e.getStatus().getDescription()); - default -> new RpcException(e.getStatus().getCode(), e.getStatus().getDescription()); + private static FilaException mapEnqueueResultError(Service.EnqueueError error) { + return switch (error.getCode()) { + case ENQUEUE_ERROR_CODE_QUEUE_NOT_FOUND -> + new QueueNotFoundException("enqueue: " + error.getMessage()); + case ENQUEUE_ERROR_CODE_PERMISSION_DENIED -> + new RpcException(io.grpc.Status.Code.PERMISSION_DENIED, error.getMessage()); + default -> new RpcException(io.grpc.Status.Code.INTERNAL, error.getMessage()); }; } @@ -433,6 +467,16 @@ private static FilaException mapAckError(StatusRuntimeException e) { }; } + private static FilaException mapAckResultError(Service.AckError error) { + return switch (error.getCode()) { + case ACK_ERROR_CODE_MESSAGE_NOT_FOUND -> + new MessageNotFoundException("ack: " + error.getMessage()); + case ACK_ERROR_CODE_PERMISSION_DENIED -> + new RpcException(io.grpc.Status.Code.PERMISSION_DENIED, error.getMessage()); + default -> new RpcException(io.grpc.Status.Code.INTERNAL, error.getMessage()); + }; + } + private static FilaException mapNackError(StatusRuntimeException e) { return switch (e.getStatus().getCode()) { case NOT_FOUND -> new MessageNotFoundException("nack: " + e.getStatus().getDescription()); @@ -440,6 +484,16 @@ private static FilaException mapNackError(StatusRuntimeException e) { }; } + private static FilaException mapNackResultError(Service.NackError error) { + return switch (error.getCode()) { + case NACK_ERROR_CODE_MESSAGE_NOT_FOUND -> + new MessageNotFoundException("nack: " + error.getMessage()); + case NACK_ERROR_CODE_PERMISSION_DENIED -> + new RpcException(io.grpc.Status.Code.PERMISSION_DENIED, error.getMessage()); + default -> new RpcException(io.grpc.Status.Code.INTERNAL, error.getMessage()); + }; + } + /** Builder for {@link FilaClient}. */ public static final class Builder { private final String address; diff --git a/src/test/java/dev/faisca/fila/BatchClientTest.java b/src/test/java/dev/faisca/fila/BatchClientTest.java index 6338b58..7214cd7 100644 --- a/src/test/java/dev/faisca/fila/BatchClientTest.java +++ b/src/test/java/dev/faisca/fila/BatchClientTest.java @@ -16,7 +16,7 @@ import org.junit.jupiter.api.condition.EnabledIf; /** - * Integration tests for batch enqueue and smart batching. + * Integration tests for enqueueMany and smart batching. * *

Requires a fila-server binary. Skipped if not available. */ @@ -45,7 +45,7 @@ static boolean serverAvailable() { } @Test - void explicitBatchEnqueue() { + void explicitEnqueueMany() { try (FilaClient client = FilaClient.builder(server.address()).withBatchMode(BatchMode.disabled()).build()) { List messages = new ArrayList<>(); @@ -57,11 +57,11 @@ void explicitBatchEnqueue() { ("batch-msg-" + i).getBytes())); } - List results = client.batchEnqueue(messages); + List results = client.enqueueMany(messages); assertEquals(5, results.size()); Set ids = new HashSet<>(); - for (BatchEnqueueResult result : results) { + for (EnqueueResult result : results) { assertTrue(result.isSuccess(), "each message should succeed"); assertFalse(result.getMessageId().isEmpty()); ids.add(result.getMessageId()); @@ -71,7 +71,7 @@ void explicitBatchEnqueue() { } @Test - void explicitBatchWithNonexistentQueue() { + void explicitEnqueueManyWithNonexistentQueue() { try (FilaClient client = FilaClient.builder(server.address()).withBatchMode(BatchMode.disabled()).build()) { List messages = new ArrayList<>(); @@ -79,7 +79,7 @@ void explicitBatchWithNonexistentQueue() { messages.add(new EnqueueMessage("no-such-queue", Map.of(), "bad-msg".getBytes())); messages.add(new EnqueueMessage("test-batch-explicit", Map.of(), "another-good".getBytes())); - List results = client.batchEnqueue(messages); + List results = client.enqueueMany(messages); assertEquals(3, results.size()); // First and third should succeed, second should fail diff --git a/src/test/java/dev/faisca/fila/BatchEnqueueResultTest.java b/src/test/java/dev/faisca/fila/EnqueueResultTest.java similarity index 61% rename from src/test/java/dev/faisca/fila/BatchEnqueueResultTest.java rename to src/test/java/dev/faisca/fila/EnqueueResultTest.java index 2ba657b..d22a317 100644 --- a/src/test/java/dev/faisca/fila/BatchEnqueueResultTest.java +++ b/src/test/java/dev/faisca/fila/EnqueueResultTest.java @@ -4,32 +4,32 @@ import org.junit.jupiter.api.Test; -/** Unit tests for BatchEnqueueResult. */ -class BatchEnqueueResultTest { +/** Unit tests for EnqueueResult. */ +class EnqueueResultTest { @Test void successResult() { - BatchEnqueueResult result = BatchEnqueueResult.success("msg-123"); + EnqueueResult result = EnqueueResult.success("msg-123"); assertTrue(result.isSuccess()); assertEquals("msg-123", result.getMessageId()); } @Test void successGetErrorThrows() { - BatchEnqueueResult result = BatchEnqueueResult.success("msg-123"); + EnqueueResult result = EnqueueResult.success("msg-123"); assertThrows(IllegalStateException.class, result::getError); } @Test void errorResult() { - BatchEnqueueResult result = BatchEnqueueResult.error("queue not found"); + EnqueueResult result = EnqueueResult.error("queue not found"); assertFalse(result.isSuccess()); assertEquals("queue not found", result.getError()); } @Test void errorGetMessageIdThrows() { - BatchEnqueueResult result = BatchEnqueueResult.error("queue not found"); + EnqueueResult result = EnqueueResult.error("queue not found"); assertThrows(IllegalStateException.class, result::getMessageId); } }