Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 24 additions & 15 deletions src/test/java/dev/faisca/fila/FibpAdminClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@
/**
* Minimal FIBP admin client for test infrastructure.
*
* <p>Supports CreateQueue only. Admin operation payloads are protobuf-encoded (matching the
* server's fila-core admin dispatch). We hand-roll the minimal protobuf needed to avoid a test
* dependency on a protobuf runtime.
* <p>Supports CreateQueue only. Admin operation payloads use binary encoding (same wire format as
* data ops).
*
* <p>Supports both plaintext and TLS/mTLS connections.
*/
Expand Down Expand Up @@ -133,26 +132,36 @@ private void authenticate(String apiKey) throws IOException {
}

/**
* Encode a CreateQueueRequest protobuf message with just the name field.
* Encode a CreateQueue request using the binary wire format.
*
* <p>Protobuf encoding: field 1 (name, string) = tag 0x0A (field=1, wire_type=2) + varint(len) +
* utf8 bytes.
* <p>Wire format: queue_len:u16 + queue:utf8 + on_enqueue_len:u16 + on_enqueue:utf8 +
* on_failure_len:u16 + on_failure:utf8 + visibility_timeout_ms:u32
*/
private static byte[] encodeCreateQueueRequest(String name) {
private static byte[] encodeCreateQueueRequest(String name) throws IOException {
byte[] nameBytes = name.getBytes(StandardCharsets.UTF_8);
if (nameBytes.length > 0xFFFF) {
throw new IOException(
"queue name too long: UTF-8 length " + nameBytes.length + " exceeds u16 max (65535)");
}
ByteArrayOutputStream buf = new ByteArrayOutputStream();
buf.write(0x0A); // field 1, wire type 2 (length-delimited)
writeVarint(buf, nameBytes.length);
writeU16(buf, nameBytes.length);
Comment thread
cubic-dev-ai[bot] marked this conversation as resolved.
buf.write(nameBytes, 0, nameBytes.length);
writeU16(buf, 0); // on_enqueue: empty
writeU16(buf, 0); // on_failure: empty
writeU32(buf, 0); // visibility_timeout_ms: 0
return buf.toByteArray();
}

private static void writeVarint(ByteArrayOutputStream buf, int value) {
while ((value & ~0x7F) != 0) {
buf.write((value & 0x7F) | 0x80);
value >>>= 7;
}
buf.write(value);
private static void writeU16(ByteArrayOutputStream buf, int value) {
buf.write((value >> 8) & 0xFF);
buf.write(value & 0xFF);
}

private static void writeU32(ByteArrayOutputStream buf, int value) {
buf.write((value >> 24) & 0xFF);
buf.write((value >> 16) & 0xFF);
buf.write((value >> 8) & 0xFF);
buf.write(value & 0xFF);
}

private static KeyManagerFactory buildKeyManagerFactory(byte[] certPem, byte[] keyPem)
Expand Down
Loading