diff --git a/.rubocop.yml b/.rubocop.yml index 106174c..ea1e5b8 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -3,9 +3,11 @@ AllCops: NewCops: enable SuggestExtensions: false Exclude: - - 'lib/fila/proto/**/*' - 'vendor/**/*' +Metrics/ClassLength: + Max: 300 + Metrics/MethodLength: Max: 25 Exclude: diff --git a/README.md b/README.md index 9c59120..e3e1d09 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # fila-ruby -Ruby client SDK for the [Fila](https://github.com/faisca/fila) message broker. +Ruby client SDK for the [Fila](https://github.com/faisca/fila) message broker using the FIBP binary protocol. ## Installation @@ -44,21 +44,9 @@ end client.close ``` -### TLS (system trust store) - -```ruby -require "fila" - -# TLS using the OS system trust store (e.g., server uses a public CA). -client = Fila::Client.new("localhost:5555", tls: true) -``` - ### TLS (custom CA) ```ruby -require "fila" - -# TLS with an explicit CA certificate (e.g., private/self-signed CA). client = Fila::Client.new("localhost:5555", ca_cert: File.read("ca.pem") ) @@ -67,16 +55,6 @@ client = Fila::Client.new("localhost:5555", ### mTLS (mutual TLS) ```ruby -require "fila" - -# Mutual TLS with system trust store. -client = Fila::Client.new("localhost:5555", - tls: true, - client_cert: File.read("client.pem"), - client_key: File.read("client-key.pem") -) - -# Mutual TLS with explicit CA certificate. client = Fila::Client.new("localhost:5555", ca_cert: File.read("ca.pem"), client_cert: File.read("client.pem"), @@ -87,9 +65,6 @@ client = Fila::Client.new("localhost:5555", ### API Key Authentication ```ruby -require "fila" - -# API key sent as Bearer token on every request. client = Fila::Client.new("localhost:5555", api_key: "fila_your_api_key_here" ) @@ -98,9 +73,6 @@ client = Fila::Client.new("localhost:5555", ### mTLS + API Key ```ruby -require "fila" - -# Full security: mTLS transport + API key authentication. client = Fila::Client.new("localhost:5555", ca_cert: File.read("ca.pem"), client_cert: File.read("client.pem"), @@ -109,9 +81,73 @@ client = Fila::Client.new("localhost:5555", ) ``` +### Batch Enqueue + +```ruby +results = client.enqueue_many([ + { queue: "orders", payload: "order-1", headers: { "tenant" => "acme" } }, + { queue: "orders", payload: "order-2" }, +]) + +results.each do |r| + if r.success? + puts "Enqueued: #{r.message_id}" + else + puts "Failed: #{r.error}" + end +end +``` + +### Admin Operations + +```ruby +# Create a queue. +client.create_queue(name: "my-queue") + +# Delete a queue. +client.delete_queue(queue: "my-queue") + +# Get queue statistics. +stats = client.get_stats(queue: "my-queue") + +# List all queues. +queues = client.list_queues + +# Runtime configuration. +client.set_config(key: "queues.my-queue.visibility_timeout_ms", value: "30000") +value = client.get_config(key: "queues.my-queue.visibility_timeout_ms") +entries = client.list_config(prefix: "queues.") + +# Redrive DLQ messages. +count = client.redrive(dlq_queue: "my-queue-dlq", count: 100) +``` + +### Auth Operations + +```ruby +# Create an API key. +result = client.create_api_key(name: "my-key", is_superadmin: false) +puts result[:key] + +# Revoke an API key. +client.revoke_api_key(key_id: result[:key_id]) + +# List API keys. +keys = client.list_api_keys + +# Set ACL permissions. +client.set_acl(key_id: "key-id", permissions: [ + { kind: "produce", pattern: "orders.*" }, + { kind: "consume", pattern: "orders.*" }, +]) + +# Get ACL permissions. +acl = client.get_acl(key_id: "key-id") +``` + ## API -### `Fila::Client.new(addr, tls: false, ca_cert: nil, client_cert: nil, client_key: nil, api_key: nil)` +### `Fila::Client.new(addr, ...)` Connect to a Fila broker at the given address (e.g., `"localhost:5555"`). @@ -122,47 +158,46 @@ Connect to a Fila broker at the given address (e.g., `"localhost:5555"`). | `ca_cert:` | `String` or `nil` | PEM-encoded CA certificate for TLS (implies `tls: true`) | | `client_cert:` | `String` or `nil` | PEM-encoded client certificate for mTLS | | `client_key:` | `String` or `nil` | PEM-encoded client private key for mTLS | -| `api_key:` | `String` or `nil` | API key for Bearer token authentication | - -When no TLS/auth options are provided, the client connects over plaintext (backward compatible). When `tls: true` is set without `ca_cert:`, the OS system trust store is used for server certificate verification. +| `api_key:` | `String` or `nil` | API key sent during FIBP handshake | +| `batch_mode:` | `Symbol` | `:auto` (default), `:linger`, or `:disabled` | -### `client.enqueue(queue:, headers:, payload:)` +### `client.enqueue(queue:, payload:, headers: nil)` Enqueue a message. Returns the broker-assigned message ID (UUIDv7). +### `client.enqueue_many(messages)` + +Enqueue multiple messages in a single request. Returns an array of `Fila::EnqueueResult`. + ### `client.consume(queue:) { |msg| ... }` -Open a streaming consumer. Yields `Fila::ConsumeMessage` objects as they become available. If no block is given, returns an `Enumerator`. Nacked messages are redelivered on the same stream. +Open a streaming consumer. Yields `Fila::ConsumeMessage` objects. If no block is given, returns an `Enumerator`. ### `client.ack(queue:, msg_id:)` -Acknowledge a successfully processed message. The message is permanently removed. +Acknowledge a successfully processed message. ### `client.nack(queue:, msg_id:, error:)` -Negatively acknowledge a failed message. The message is requeued or routed to the dead-letter queue based on the queue's configuration. +Negatively acknowledge a failed message. ### `client.close` -Close the underlying gRPC channel. +Drain pending batches and close the TCP connection. ## Error Handling Per-operation error classes are raised for specific failure modes: -```ruby -begin - client.enqueue(queue: "missing-queue", payload: "test") -rescue Fila::QueueNotFoundError => e - # handle queue not found -end - -begin - client.ack(queue: "my-queue", msg_id: "missing-id") -rescue Fila::MessageNotFoundError => e - # handle message not found -end -``` +| Error Class | Description | +|---|---| +| `Fila::QueueNotFoundError` | Queue does not exist | +| `Fila::MessageNotFoundError` | Message not found or not leased | +| `Fila::QueueAlreadyExistsError` | Queue already exists | +| `Fila::AuthenticationError` | Missing or invalid API key | +| `Fila::ForbiddenError` | Insufficient permissions | +| `Fila::NotLeaderError` | Not the leader (includes leader hint) | +| `Fila::RPCError` | Transport or protocol failure | ## License diff --git a/fila-client.gemspec b/fila-client.gemspec index 98cbfad..1881081 100644 --- a/fila-client.gemspec +++ b/fila-client.gemspec @@ -7,15 +7,13 @@ Gem::Specification.new do |spec| spec.version = Fila::VERSION spec.authors = ['Faisca'] spec.summary = 'Ruby client SDK for the Fila message broker' - spec.description = "Idiomatic Ruby client wrapping Fila's gRPC API for enqueue, consume, ack, and nack operations." + spec.description = 'Idiomatic Ruby client for the Fila message broker using the FIBP binary protocol.' spec.homepage = 'https://github.com/faiscadev/fila-ruby' spec.license = 'AGPL-3.0-or-later' spec.required_ruby_version = '>= 3.1' - spec.files = Dir['lib/**/*.rb', 'proto/**/*.proto', 'LICENSE', 'README.md'] + spec.files = Dir['lib/**/*.rb', 'LICENSE', 'README.md'] spec.require_paths = ['lib'] - spec.add_dependency 'google-protobuf', '~> 4.0' - spec.add_dependency 'grpc', '~> 1.60' spec.metadata['rubygems_mfa_required'] = 'true' end diff --git a/lib/fila.rb b/lib/fila.rb index 91e321c..39e9462 100644 --- a/lib/fila.rb +++ b/lib/fila.rb @@ -3,4 +3,9 @@ require_relative 'fila/version' require_relative 'fila/errors' require_relative 'fila/consume_message' +require_relative 'fila/enqueue_result' +require_relative 'fila/fibp/opcodes' +require_relative 'fila/fibp/codec' +require_relative 'fila/fibp/connection' +require_relative 'fila/batcher' require_relative 'fila/client' diff --git a/lib/fila/batcher.rb b/lib/fila/batcher.rb new file mode 100644 index 0000000..1616013 --- /dev/null +++ b/lib/fila/batcher.rb @@ -0,0 +1,198 @@ +# frozen_string_literal: true + +module Fila + # Background batcher that collects enqueue messages and flushes them + # in batches via the FIBP binary protocol. Supports auto (opportunistic) + # and linger (timer-based) modes. + # + # @api private + class Batcher + # An item queued for batching, pairing a message hash with its result slot. + BatchItem = Struct.new(:message, :result_queue, keyword_init: true) + + attr_writer :conn + + # @param conn [Fila::FIBP::Connection] FIBP connection + # @param mode [Symbol] :auto or :linger + # @param max_batch_size [Integer] cap on batch size (auto mode) + # @param batch_size [Integer] batch size threshold (linger mode) + # @param linger_ms [Integer] linger time in ms (linger mode) + def initialize(conn:, mode:, max_batch_size: 100, batch_size: 100, linger_ms: 10) + @conn = conn + @mode = mode + @max_batch_size = mode == :auto ? max_batch_size : batch_size + @linger_ms = linger_ms + @queue = Queue.new + @stopped = false + @mutex = Mutex.new + + @thread = Thread.new { run_loop } + @thread.abort_on_exception = true + end + + # Submit a message for batched sending. Blocks until the batch + # containing this message is flushed and the result is available. + # + # @param message [Hash] message hash with :queue, :headers, :payload + # @return [String] message ID on success + # @raise [Fila::QueueNotFoundError] if the queue does not exist + # @raise [Fila::RPCError] for unexpected failures + def submit(message) + result_queue = Queue.new + item = BatchItem.new(message: message, result_queue: result_queue) + + @mutex.synchronize do + raise Fila::Error, 'batcher is closed' if @stopped + + @queue.push(item) + end + + outcome = result_queue.pop + case outcome + when String then outcome + when Exception then raise outcome + else raise Fila::Error, "unexpected batcher result: #{outcome.inspect}" + end + end + + # Drain pending messages and stop the background thread. + def close + @mutex.synchronize { @stopped = true } + @queue.push(:shutdown) + @thread.join + end + + private + + def run_loop + case @mode + when :auto then run_auto_loop + when :linger then run_linger_loop + end + end + + def run_auto_loop + loop do + first = @queue.pop + break if first == :shutdown + + batch = [first] + drain_nonblocking(batch) + flush_batch(batch) + end + end + + def run_linger_loop + loop do + first = @queue.pop + break if first == :shutdown + + batch = [first] + deadline = current_time_ms + @linger_ms + + while batch.size < @max_batch_size + remaining_ms = deadline - current_time_ms + break if remaining_ms <= 0 + + begin + item = pop_with_timeout(remaining_ms) + break if item == :shutdown + + batch << item + rescue ThreadError + break + end + end + + flush_batch(batch) + end + end + + def drain_nonblocking(batch) + while batch.size < @max_batch_size + begin + item = @queue.pop(true) + if item == :shutdown + @queue.push(:shutdown) + break + end + batch << item + rescue ThreadError + break + end + end + end + + # Flush a batch of items via FIBP Enqueue. + def flush_batch(items) + opcode, resp = send_enqueue_batch(items) + handle_flush_error(items, resp) && return if opcode == FIBP::Opcodes::ERROR + + dispatch_results(items, resp) + rescue StandardError => e + broadcast_error(items, Fila::Error.new(e.message)) + end + + def send_enqueue_batch(items) + payload = encode_enqueue_batch(items.map(&:message)) + @conn.request(FIBP::Opcodes::ENQUEUE, payload) + end + + def handle_flush_error(items, resp) + reader = FIBP::Codec::Reader.new(resp) + broadcast_error(items, RPCError.new(reader.read_u8, reader.read_string)) + end + + def dispatch_results(items, resp) + reader = FIBP::Codec::Reader.new(resp) + count = reader.read_u32 + items.each_with_index do |item, idx| + outcome = if idx < count + result_to_outcome(reader.read_u8, reader.read_string) + else + Fila::Error.new('no result from server') + end + item.result_queue.push(outcome) + end + end + + def encode_enqueue_batch(messages) + buf = FIBP::Codec.encode_u32(messages.size) + messages.each do |m| + buf << FIBP::Codec.encode_string(m[:queue]) + buf << FIBP::Codec.encode_map(m[:headers] || {}) + buf << FIBP::Codec.encode_bytes(m[:payload]) + end + buf + end + + def result_to_outcome(code, msg_id) + case code + when FIBP::ErrorCodes::OK then msg_id + when FIBP::ErrorCodes::QUEUE_NOT_FOUND + QueueNotFoundError.new('enqueue: queue not found') + else + RPCError.new(code, 'enqueue failed') + end + end + + def broadcast_error(items, err) + items.each { |item| item.result_queue.push(err) } + end + + def current_time_ms + (Process.clock_gettime(Process::CLOCK_MONOTONIC) * 1000).to_i + end + + def pop_with_timeout(timeout_ms) + deadline = current_time_ms + timeout_ms + loop do + return @queue.pop(true) + rescue ThreadError + raise if current_time_ms >= deadline + + sleep(0.001) + end + end + end +end diff --git a/lib/fila/client.rb b/lib/fila/client.rb index dbcdc73..3ebc9c8 100644 --- a/lib/fila/client.rb +++ b/lib/fila/client.rb @@ -1,27 +1,25 @@ # frozen_string_literal: true -require 'grpc' - -# Add proto directory to load path so generated requires resolve correctly. -$LOAD_PATH.unshift(File.expand_path('proto', __dir__)) unless $LOAD_PATH.include?(File.expand_path('proto', __dir__)) - -require_relative 'proto/fila/v1/service_services_pb' require_relative 'errors' require_relative 'consume_message' +require_relative 'enqueue_result' +require_relative 'batcher' +require_relative 'fibp/opcodes' +require_relative 'fibp/codec' +require_relative 'fibp/connection' module Fila - # Client for the Fila message broker. - # - # Wraps the hot-path gRPC operations: enqueue, consume, ack, nack. + # Client for the Fila message broker over the FIBP binary protocol. # - # @example Plain-text (no auth) + # @example Plain-text, default auto-batching # client = Fila::Client.new("localhost:5555") # - # @example TLS with system trust store - # client = Fila::Client.new("localhost:5555", tls: true) + # @example Batching disabled + # client = Fila::Client.new("localhost:5555", batch_mode: :disabled) # # @example TLS with custom CA - # client = Fila::Client.new("localhost:5555", ca_cert: File.read("ca.pem")) + # client = Fila::Client.new("localhost:5555", + # ca_cert: File.read("ca.pem")) # # @example mTLS + API key # client = Fila::Client.new("localhost:5555", @@ -29,95 +27,110 @@ module Fila # client_cert: File.read("client.pem"), # client_key: File.read("client-key.pem"), # api_key: "fila_abc123") - class Client - # Connect to a Fila broker at the given address. - # - # @param addr [String] broker address in "host:port" format (e.g., "localhost:5555") - # @param tls [Boolean] enable TLS using the OS system trust store (default: false) - # @param ca_cert [String, nil] PEM-encoded CA certificate for TLS verification (implies tls: true) - # @param client_cert [String, nil] PEM-encoded client certificate for mTLS - # @param client_key [String, nil] PEM-encoded client private key for mTLS - # @param api_key [String, nil] API key for Bearer token authentication - def initialize(addr, tls: false, ca_cert: nil, client_cert: nil, client_key: nil, api_key: nil) + class Client # rubocop:disable Metrics/ClassLength + BATCH_MODES = %i[auto linger disabled].freeze + + private_constant :BATCH_MODES + + # @param addr [String] server address (host:port) + # @param tls [Boolean] enable TLS with system trust store + # @param ca_cert [String, nil] PEM-encoded CA certificate + # @param client_cert [String, nil] PEM-encoded client certificate (mTLS) + # @param client_key [String, nil] PEM-encoded client key (mTLS) + # @param api_key [String, nil] API key for authentication + # @param batch_mode [Symbol] :auto (default), :linger, or :disabled + # @param max_batch_size [Integer] max batch size for auto mode + # @param batch_size [Integer] batch size for linger mode + # @param linger_ms [Integer] linger time in ms for linger mode + def initialize( # rubocop:disable Metrics/ParameterLists + addr, tls: false, ca_cert: nil, client_cert: nil, client_key: nil, + api_key: nil, batch_mode: :auto, max_batch_size: 100, + batch_size: 100, linger_ms: 10 + ) + validate_batch_mode(batch_mode) + @addr = addr + @tls = tls + @ca_cert = ca_cert + @client_cert = client_cert + @client_key = client_key @api_key = api_key - credentials = build_credentials(tls: tls, ca_cert: ca_cert, client_cert: client_cert, client_key: client_key) - @stub = ::Fila::V1::FilaService::Stub.new(addr, credentials) + + @conn = build_connection(addr) + @batcher = start_batcher(batch_mode, max_batch_size, batch_size, linger_ms) end - # Close the underlying gRPC channel. + # Drain pending batched messages and disconnect. def close - # grpc-ruby doesn't expose a direct channel close on stubs; - # the channel is garbage-collected. This is a no-op for API symmetry. + @batcher&.close + @batcher = nil + @conn&.close + @conn = nil end - # Enqueue a message to the specified queue. + # Enqueue a single message to a queue. # # @param queue [String] target queue name - # @param headers [Hash, nil] optional message headers - # @param payload [String] message payload bytes - # @return [String] broker-assigned message ID (UUIDv7) - # @raise [QueueNotFoundError] if the queue does not exist - # @raise [RPCError] for unexpected gRPC failures + # @param payload [String] message payload + # @param headers [Hash, nil] optional headers + # @return [String] broker-assigned message ID def enqueue(queue:, payload:, headers: nil) - req = ::Fila::V1::EnqueueRequest.new( - queue: queue, - headers: headers || {}, - payload: payload - ) - resp = @stub.enqueue(req, metadata: call_metadata) - resp.message_id - rescue GRPC::NotFound => e - raise QueueNotFoundError, "enqueue: #{e.details}" - rescue GRPC::BadStatus => e - raise RPCError.new(e.code, e.details) + msg = { queue: queue, headers: headers || {}, payload: payload } + + if @batcher + @batcher.submit(msg) + else + enqueue_single(msg) + end end - # Open a streaming consumer on the specified queue. - # - # Yields messages as they become available. Nil message frames (keepalive - # signals) are skipped automatically. Nacked messages are redelivered on - # the same stream. - # - # If no block is given, returns an Enumerator. + # Enqueue multiple messages in a single request. # - # @param queue [String] queue to consume from - # @yield [ConsumeMessage] each message received from the broker - # @return [Enumerator] if no block given - # @raise [QueueNotFoundError] if the queue does not exist - # @raise [RPCError] for unexpected gRPC failures + # @param messages [Array] messages with :queue, :payload, :headers + # @return [Array] + def enqueue_many(messages) + return [] if messages.empty? + + payload = encode_enqueue_batch(messages) + opcode, resp = @conn.request(FIBP::Opcodes::ENQUEUE, payload) + + raise_from_error_frame(resp) if opcode == FIBP::Opcodes::ERROR + + decode_enqueue_results(resp) + end + + # Open a streaming consumer. Yields messages as they arrive. + # Returns an Enumerator if no block given. def consume(queue:, &block) return enum_for(:consume, queue: queue) unless block - req = ::Fila::V1::ConsumeRequest.new(queue: queue) - stream = @stub.consume(req, metadata: call_metadata) - stream.each do |resp| - msg = resp.message - next if msg.nil? || msg.id.empty? - - block.call(build_consume_message(msg)) - end - rescue GRPC::Cancelled - # Stream cancelled — normal when consumer breaks out of the loop. - rescue GRPC::NotFound => e - raise QueueNotFoundError, "consume: #{e.details}" - rescue GRPC::BadStatus => e - raise RPCError.new(e.code, e.details) + consume_with_redirect(queue: queue, redirected: false, &block) end # Acknowledge a successfully processed message. # # @param queue [String] queue the message belongs to # @param msg_id [String] ID of the message to acknowledge - # @raise [MessageNotFoundError] if the message does not exist - # @raise [RPCError] for unexpected gRPC failures def ack(queue:, msg_id:) - req = ::Fila::V1::AckRequest.new(queue: queue, message_id: msg_id) - @stub.ack(req, metadata: call_metadata) - nil - rescue GRPC::NotFound => e - raise MessageNotFoundError, "ack: #{e.details}" - rescue GRPC::BadStatus => e - raise RPCError.new(e.code, e.details) + payload = FIBP::Codec.encode_u32(1) + + FIBP::Codec.encode_string(queue) + + FIBP::Codec.encode_string(msg_id) + opcode, resp = @conn.request(FIBP::Opcodes::ACK, payload) + + raise_from_error_frame(resp) if opcode == FIBP::Opcodes::ERROR + + reader = FIBP::Codec::Reader.new(resp) + count = reader.read_u32 + raise RPCError.new(0xFF, 'no result from server') if count.zero? + + code = reader.read_u8 + return if code == FIBP::ErrorCodes::OK + + case code + when FIBP::ErrorCodes::MESSAGE_NOT_FOUND + raise MessageNotFoundError, 'ack: message not found' + else + raise RPCError.new(code, 'ack failed') + end end # Negatively acknowledge a message that failed processing. @@ -125,72 +138,527 @@ def ack(queue:, msg_id:) # @param queue [String] queue the message belongs to # @param msg_id [String] ID of the message to nack # @param error [String] description of the failure - # @raise [MessageNotFoundError] if the message does not exist - # @raise [RPCError] for unexpected gRPC failures def nack(queue:, msg_id:, error:) - req = ::Fila::V1::NackRequest.new(queue: queue, message_id: msg_id, error: error) - @stub.nack(req, metadata: call_metadata) - nil - rescue GRPC::NotFound => e - raise MessageNotFoundError, "nack: #{e.details}" - rescue GRPC::BadStatus => e - raise RPCError.new(e.code, e.details) + payload = FIBP::Codec.encode_u32(1) + + FIBP::Codec.encode_string(queue) + + FIBP::Codec.encode_string(msg_id) + + FIBP::Codec.encode_string(error) + opcode, resp = @conn.request(FIBP::Opcodes::NACK, payload) + + raise_from_error_frame(resp) if opcode == FIBP::Opcodes::ERROR + + reader = FIBP::Codec::Reader.new(resp) + count = reader.read_u32 + raise RPCError.new(0xFF, 'no result from server') if count.zero? + + code = reader.read_u8 + return if code == FIBP::ErrorCodes::OK + + case code + when FIBP::ErrorCodes::MESSAGE_NOT_FOUND + raise MessageNotFoundError, 'nack: message not found' + else + raise RPCError.new(code, 'nack failed') + end end - private + # --- Admin operations --- + + # Create a queue. + # + # @param name [String] queue name + # @param on_enqueue_script [String, nil] Lua enqueue script + # @param on_failure_script [String, nil] Lua failure script + # @param visibility_timeout_ms [Integer] visibility timeout (0 = server default) + # @return [String] created queue ID + def create_queue(name:, on_enqueue_script: nil, on_failure_script: nil, visibility_timeout_ms: 0) + payload = FIBP::Codec.encode_string(name) + + FIBP::Codec.encode_optional_string(on_enqueue_script) + + FIBP::Codec.encode_optional_string(on_failure_script) + + FIBP::Codec.encode_u64(visibility_timeout_ms) + opcode, resp = @conn.request(FIBP::Opcodes::CREATE_QUEUE, payload) + + raise_from_error_frame(resp) if opcode == FIBP::Opcodes::ERROR + + reader = FIBP::Codec::Reader.new(resp) + code = reader.read_u8 + queue_id = reader.read_string + + case code + when FIBP::ErrorCodes::OK then queue_id + when FIBP::ErrorCodes::QUEUE_ALREADY_EXISTS + raise QueueAlreadyExistsError, "queue '#{name}' already exists" + else + raise RPCError.new(code, 'create queue failed') + end + end - # Build gRPC channel credentials from the provided TLS options. + # Delete a queue. # - # When +ca_cert+ is provided, it is used for server verification (implies TLS). - # When +tls+ is true without +ca_cert+, the OS system trust store is used. - # When neither is set and no client certs are given, plaintext is used. + # @param queue [String] queue name + def delete_queue(queue:) + payload = FIBP::Codec.encode_string(queue) + opcode, resp = @conn.request(FIBP::Opcodes::DELETE_QUEUE, payload) + + raise_from_error_frame(resp) if opcode == FIBP::Opcodes::ERROR + + reader = FIBP::Codec::Reader.new(resp) + code = reader.read_u8 + return if code == FIBP::ErrorCodes::OK + + raise_for_error_code(code, 'delete queue') + end + + # Get statistics for a queue. # - # @return [Symbol, GRPC::Core::ChannelCredentials] credentials object - def build_credentials(tls:, ca_cert:, client_cert:, client_key:) - tls_enabled = tls || ca_cert - validate_tls_options(tls_enabled, client_cert, client_key) - return :this_channel_is_insecure unless tls_enabled + # @param queue [String] queue name + # @return [Hash] queue statistics + def get_stats(queue:) + payload = FIBP::Codec.encode_string(queue) + opcode, resp = @conn.request(FIBP::Opcodes::GET_STATS, payload) + + raise_from_error_frame(resp) if opcode == FIBP::Opcodes::ERROR + + decode_stats_result(resp) + end + + # List all queues. + # + # @return [Hash] with :cluster_node_count and :queues array + def list_queues + opcode, resp = @conn.request(FIBP::Opcodes::LIST_QUEUES, '') + + raise_from_error_frame(resp) if opcode == FIBP::Opcodes::ERROR + + decode_list_queues_result(resp) + end + + # Set a runtime config key. + def set_config(key:, value:) + payload = FIBP::Codec.encode_string(key) + FIBP::Codec.encode_string(value) + opcode, resp = @conn.request(FIBP::Opcodes::SET_CONFIG, payload) + + raise_from_error_frame(resp) if opcode == FIBP::Opcodes::ERROR + + reader = FIBP::Codec::Reader.new(resp) + code = reader.read_u8 + return if code == FIBP::ErrorCodes::OK + + raise_for_error_code(code, 'set config') + end + + # Get a runtime config value. + def get_config(key:) + payload = FIBP::Codec.encode_string(key) + opcode, resp = @conn.request(FIBP::Opcodes::GET_CONFIG, payload) + + raise_from_error_frame(resp) if opcode == FIBP::Opcodes::ERROR + + reader = FIBP::Codec::Reader.new(resp) + code = reader.read_u8 + raise_for_error_code(code, 'get config') unless code == FIBP::ErrorCodes::OK + + reader.read_string + end + + # List config keys by prefix. + def list_config(prefix:) + payload = FIBP::Codec.encode_string(prefix) + opcode, resp = @conn.request(FIBP::Opcodes::LIST_CONFIG, payload) + + raise_from_error_frame(resp) if opcode == FIBP::Opcodes::ERROR + + reader = FIBP::Codec::Reader.new(resp) + code = reader.read_u8 + raise_for_error_code(code, 'list config') unless code == FIBP::ErrorCodes::OK + + count = reader.read_u16 + Array.new(count) do + { key: reader.read_string, value: reader.read_string } + end + end + + # Redrive messages from a DLQ back to their parent queue. + def redrive(dlq_queue:, count:) + payload = FIBP::Codec.encode_string(dlq_queue) + FIBP::Codec.encode_u64(count) + opcode, resp = @conn.request(FIBP::Opcodes::REDRIVE, payload) + + raise_from_error_frame(resp) if opcode == FIBP::Opcodes::ERROR + + reader = FIBP::Codec::Reader.new(resp) + code = reader.read_u8 + raise_for_error_code(code, 'redrive') unless code == FIBP::ErrorCodes::OK + + reader.read_u64 + end + + # --- Auth operations --- + + # Create an API key. + def create_api_key(name:, expires_at_ms: 0, is_superadmin: false) + payload = FIBP::Codec.encode_string(name) + + FIBP::Codec.encode_u64(expires_at_ms) + + FIBP::Codec.encode_bool(is_superadmin) + opcode, resp = @conn.request(FIBP::Opcodes::CREATE_API_KEY, payload) + + raise_from_error_frame(resp) if opcode == FIBP::Opcodes::ERROR + + reader = FIBP::Codec::Reader.new(resp) + code = reader.read_u8 + raise_for_error_code(code, 'create api key') unless code == FIBP::ErrorCodes::OK + + { key_id: reader.read_string, key: reader.read_string, is_superadmin: reader.read_bool } + end + + # Revoke an API key. + def revoke_api_key(key_id:) + payload = FIBP::Codec.encode_string(key_id) + opcode, resp = @conn.request(FIBP::Opcodes::REVOKE_API_KEY, payload) + + raise_from_error_frame(resp) if opcode == FIBP::Opcodes::ERROR + + reader = FIBP::Codec::Reader.new(resp) + code = reader.read_u8 + return if code == FIBP::ErrorCodes::OK + + raise_for_error_code(code, 'revoke api key') + end + + # List all API keys. + def list_api_keys + opcode, resp = @conn.request(FIBP::Opcodes::LIST_API_KEYS, '') + + raise_from_error_frame(resp) if opcode == FIBP::Opcodes::ERROR + + reader = FIBP::Codec::Reader.new(resp) + code = reader.read_u8 + raise_for_error_code(code, 'list api keys') unless code == FIBP::ErrorCodes::OK + + count = reader.read_u16 + Array.new(count) do + { + key_id: reader.read_string, name: reader.read_string, + created_at_ms: reader.read_u64, expires_at_ms: reader.read_u64, + is_superadmin: reader.read_bool + } + end + end + + # Set ACL permissions for an API key. + def set_acl(key_id:, permissions:) + payload = FIBP::Codec.encode_string(key_id) + + FIBP::Codec.encode_u16(permissions.size) + permissions.each do |perm| + payload += FIBP::Codec.encode_string(perm[:kind]) + + FIBP::Codec.encode_string(perm[:pattern]) + end + opcode, resp = @conn.request(FIBP::Opcodes::SET_ACL, payload) + + raise_from_error_frame(resp) if opcode == FIBP::Opcodes::ERROR + + reader = FIBP::Codec::Reader.new(resp) + code = reader.read_u8 + return if code == FIBP::ErrorCodes::OK + + raise_for_error_code(code, 'set acl') + end + + # Get ACL permissions for an API key. + def get_acl(key_id:) + payload = FIBP::Codec.encode_string(key_id) + opcode, resp = @conn.request(FIBP::Opcodes::GET_ACL, payload) + + raise_from_error_frame(resp) if opcode == FIBP::Opcodes::ERROR + + reader = FIBP::Codec::Reader.new(resp) + code = reader.read_u8 + raise_for_error_code(code, 'get acl') unless code == FIBP::ErrorCodes::OK + + key_id_resp = reader.read_string + is_superadmin = reader.read_bool + perm_count = reader.read_u16 + permissions = Array.new(perm_count) do + { kind: reader.read_string, pattern: reader.read_string } + end + { key_id: key_id_resp, is_superadmin: is_superadmin, permissions: permissions } + end + + ERROR_CODE_TO_CLASS = { + FIBP::ErrorCodes::QUEUE_NOT_FOUND => QueueNotFoundError, + FIBP::ErrorCodes::MESSAGE_NOT_FOUND => MessageNotFoundError, + FIBP::ErrorCodes::QUEUE_ALREADY_EXISTS => QueueAlreadyExistsError, + FIBP::ErrorCodes::UNAUTHORIZED => AuthenticationError, + FIBP::ErrorCodes::FORBIDDEN => ForbiddenError, + FIBP::ErrorCodes::API_KEY_NOT_FOUND => ApiKeyNotFoundError + }.freeze + + private_constant :ERROR_CODE_TO_CLASS + + private + + def validate_batch_mode(mode) + return if BATCH_MODES.include?(mode) + + raise ArgumentError, "invalid batch_mode: #{mode.inspect}, must be one of #{BATCH_MODES.inspect}" + end - build_channel_credentials(ca_cert, client_cert, client_key) + def build_connection(addr) + host, port_str = addr.split(':') + port = port_str.to_i + validate_tls_options + FIBP::Connection.new( + host: host, port: port, + tls: @tls, ca_cert: @ca_cert, + client_cert: @client_cert, client_key: @client_key, + api_key: @api_key + ) end - def validate_tls_options(tls_enabled, client_cert, client_key) - return if tls_enabled || (!client_cert && !client_key) + def validate_tls_options + tls_enabled = @tls || @ca_cert + return if tls_enabled || (!@client_cert && !@client_key) raise ArgumentError, 'tls: true or ca_cert is required when client_cert or client_key is provided' end - def build_channel_credentials(ca_cert, client_cert, client_key) - has_client_certs = client_cert && client_key + def start_batcher(mode, max_batch_size, batch_size, linger_ms) + return nil if mode == :disabled + + Batcher.new( + conn: @conn, + mode: mode, + max_batch_size: max_batch_size, + batch_size: batch_size, + linger_ms: linger_ms + ) + end + + def enqueue_single(msg) + payload = encode_enqueue_batch([msg]) + opcode, resp = @conn.request(FIBP::Opcodes::ENQUEUE, payload) + + raise_from_error_frame(resp) if opcode == FIBP::Opcodes::ERROR + + reader = FIBP::Codec::Reader.new(resp) + count = reader.read_u32 + raise RPCError.new(0xFF, 'no result from server') if count.zero? - if ca_cert - GRPC::Core::ChannelCredentials.new(ca_cert, client_key, client_cert) - elsif has_client_certs - GRPC::Core::ChannelCredentials.new(nil, client_key, client_cert) + code = reader.read_u8 + msg_id = reader.read_string + + case code + when FIBP::ErrorCodes::OK then msg_id + when FIBP::ErrorCodes::QUEUE_NOT_FOUND + raise QueueNotFoundError, 'enqueue: queue not found' else - GRPC::Core::ChannelCredentials.new + raise RPCError.new(code, 'enqueue failed') end end - # Return metadata hash for gRPC calls, including Bearer token when api_key is set. - # - # @return [Hash] metadata hash (may be empty) - def call_metadata - return {} unless @api_key + def encode_enqueue_batch(messages) + payload = FIBP::Codec.encode_u32(messages.size) + messages.each do |m| + payload << FIBP::Codec.encode_string(m[:queue]) + payload << FIBP::Codec.encode_map(m[:headers] || {}) + payload << FIBP::Codec.encode_bytes(m[:payload]) + end + payload + end + + def decode_enqueue_results(resp) + reader = FIBP::Codec::Reader.new(resp) + count = reader.read_u32 + Array.new(count) do + code = reader.read_u8 + msg_id = reader.read_string + if code == FIBP::ErrorCodes::OK + EnqueueResult.new(message_id: msg_id) + else + EnqueueResult.new(error: error_name(code)) + end + end + end + + def consume_with_redirect(queue:, redirected:, &) + payload = FIBP::Codec.encode_string(queue) + delivery_queue = Queue.new + done = [false] # mutable container for closure capture + rid = nil + + rid, response = subscribe_to_queue(payload, delivery_queue, done) + check_consume_response(response) + consume_delivery_loop(delivery_queue, &) + rescue NotLeaderError => e + rid = handle_not_leader_redirect(e, rid, done, redirected, queue, &) + rescue LocalJumpError + nil # Consumer break + ensure + done[0] = true + @conn&.cancel_consume(rid) if rid + end + + def handle_not_leader_redirect(error, rid, done, redirected, queue, &) + raise error if redirected || error.leader_addr.nil? - { 'authorization' => "Bearer #{@api_key}" } + done[0] = true + @conn&.cancel_consume(rid) if rid + reconnect_to(error.leader_addr) + consume_with_redirect(queue: queue, redirected: true, &) + nil + end + + def subscribe_to_queue(payload, delivery_queue, done) + @conn.subscribe(FIBP::Opcodes::CONSUME, payload) do |_opcode, del_payload| + delivery_queue.push(del_payload) unless done[0] + end + end + + def check_consume_response(response) + opcode, resp_payload = response + raise_from_error_frame(resp_payload) if opcode == FIBP::Opcodes::ERROR + end + + def consume_delivery_loop(delivery_queue, &block) + loop do + del_payload = delivery_queue.pop + break if del_payload.nil? + + process_delivery(del_payload, &block) + end + end + + def process_delivery(payload, &block) + reader = FIBP::Codec::Reader.new(payload) + count = reader.read_u32 + count.times do + msg = decode_delivery_message(reader) + block.call(msg) + end end - def build_consume_message(msg) - metadata = msg.metadata + def decode_delivery_message(reader) + msg_id = reader.read_string + queue = reader.read_string + headers = reader.read_map + payload = reader.read_bytes + fairness_key = reader.read_string + _weight = reader.read_u32 + _throttle_keys = reader.read_string_array + attempt_count = reader.read_u32 + _enqueued_at = reader.read_u64 + _leased_at = reader.read_u64 + ConsumeMessage.new( - id: msg.id, - headers: msg.headers.to_h, - payload: msg.payload, - fairness_key: metadata&.fairness_key.to_s, - attempt_count: metadata&.attempt_count.to_i, - queue: metadata&.queue_id.to_s + id: msg_id, + headers: headers, + payload: payload, + fairness_key: fairness_key, + attempt_count: attempt_count, + queue: queue ) end + + def reconnect_to(addr) + @conn&.close + @conn = build_connection(addr) + @batcher.conn = @conn if @batcher + end + + def raise_from_error_frame(resp) + reader = FIBP::Codec::Reader.new(resp) + code = reader.read_u8 + message = reader.read_string + metadata = reader.remaining.positive? ? reader.read_map : {} + + raise NotLeaderError.new(message, leader_addr: metadata['leader_addr']) if code == FIBP::ErrorCodes::NOT_LEADER + + klass = ERROR_CODE_TO_CLASS[code] + raise klass, message if klass + + raise RPCError.new(code, message) + end + + def raise_for_error_code(code, context) + case code + when FIBP::ErrorCodes::QUEUE_NOT_FOUND + raise QueueNotFoundError, "#{context}: queue not found" + when FIBP::ErrorCodes::MESSAGE_NOT_FOUND + raise MessageNotFoundError, "#{context}: message not found" + when FIBP::ErrorCodes::QUEUE_ALREADY_EXISTS + raise QueueAlreadyExistsError, "#{context}: queue already exists" + when FIBP::ErrorCodes::UNAUTHORIZED + raise AuthenticationError, "#{context}: unauthorized" + when FIBP::ErrorCodes::FORBIDDEN + raise ForbiddenError, "#{context}: forbidden" + when FIBP::ErrorCodes::API_KEY_NOT_FOUND + raise ApiKeyNotFoundError, "#{context}: api key not found" + else + raise RPCError.new(code, "#{context} failed") + end + end + + def decode_stats_result(resp) + reader = FIBP::Codec::Reader.new(resp) + code = reader.read_u8 + raise_for_error_code(code, 'get stats') unless code == FIBP::ErrorCodes::OK + + result = read_stats_base(reader) + result[:per_key_stats] = read_per_key_stats(reader) + result[:per_throttle_stats] = read_per_throttle_stats(reader) + result + end + + def read_stats_base(reader) + { + depth: reader.read_u64, in_flight: reader.read_u64, + active_fairness_keys: reader.read_u64, active_consumers: reader.read_u32, + quantum: reader.read_u32, leader_node_id: reader.read_u64, + replication_count: reader.read_u32 + } + end + + def read_per_key_stats(reader) + Array.new(reader.read_u16) do + { key: reader.read_string, pending_count: reader.read_u64, + current_deficit: reader.read_i64, weight: reader.read_u32 } + end + end + + def read_per_throttle_stats(reader) + Array.new(reader.read_u16) do + { key: reader.read_string, tokens: reader.read_f64, + rate_per_second: reader.read_f64, burst: reader.read_f64 } + end + end + + def decode_list_queues_result(resp) + reader = FIBP::Codec::Reader.new(resp) + code = reader.read_u8 + raise_for_error_code(code, 'list queues') unless code == FIBP::ErrorCodes::OK + + cluster_node_count = reader.read_u32 + queue_count = reader.read_u16 + queues = Array.new(queue_count) do + { + name: reader.read_string, + depth: reader.read_u64, + in_flight: reader.read_u64, + active_consumers: reader.read_u32, + leader_node_id: reader.read_u64 + } + end + + { cluster_node_count: cluster_node_count, queues: queues } + end + + def error_name(code) + case code + when FIBP::ErrorCodes::QUEUE_NOT_FOUND then 'queue not found' + when FIBP::ErrorCodes::MESSAGE_NOT_FOUND then 'message not found' + when FIBP::ErrorCodes::UNAUTHORIZED then 'unauthorized' + when FIBP::ErrorCodes::FORBIDDEN then 'forbidden' + else "error code 0x#{code.to_s(16).rjust(2, '0')}" + end + end end end diff --git a/lib/fila/enqueue_result.rb b/lib/fila/enqueue_result.rb new file mode 100644 index 0000000..ce8b3db --- /dev/null +++ b/lib/fila/enqueue_result.rb @@ -0,0 +1,37 @@ +# frozen_string_literal: true + +module Fila + # Result of a single message within an enqueue_many call. + # + # Each message is independently validated and processed. + # A failed message does not affect the others. + # + # @example + # results = client.enqueue_many(messages) + # results.each do |r| + # if r.success? + # puts "Enqueued: #{r.message_id}" + # else + # puts "Failed: #{r.error}" + # end + # end + class EnqueueResult + # @return [String, nil] broker-assigned message ID on success + attr_reader :message_id + + # @return [String, nil] error description on failure + attr_reader :error + + # @param message_id [String, nil] message ID if successful + # @param error [String, nil] error string if failed + def initialize(message_id: nil, error: nil) + @message_id = message_id + @error = error + end + + # @return [Boolean] true if the message was successfully enqueued + def success? + !@message_id.nil? + end + end +end diff --git a/lib/fila/errors.rb b/lib/fila/errors.rb index 2f3dcb8..b6307b8 100644 --- a/lib/fila/errors.rb +++ b/lib/fila/errors.rb @@ -10,16 +10,41 @@ class QueueNotFoundError < Error; end # Raised when the specified message does not exist. class MessageNotFoundError < Error; end - # Raised for unexpected gRPC failures, preserving status code and message. + # Raised when the queue already exists. + class QueueAlreadyExistsError < Error; end + + # Raised when authentication fails (missing or invalid API key). + class AuthenticationError < Error; end + + # Raised when the client lacks permission for the operation. + class ForbiddenError < Error; end + + # Raised when the server is not the leader for the target queue. + class NotLeaderError < Error + # @return [String, nil] address of the current leader + attr_reader :leader_addr + + # @param message [String] error message + # @param leader_addr [String, nil] leader address hint + def initialize(message, leader_addr: nil) + @leader_addr = leader_addr + super(message) + end + end + + # Raised when the API key is not found. + class ApiKeyNotFoundError < Error; end + + # Raised for transport or protocol failures, preserving the error code. class RPCError < Error - # @return [Integer] gRPC status code + # @return [Integer] FIBP error code attr_reader :code - # @param code [Integer] gRPC status code + # @param code [Integer] error code # @param message [String] error message def initialize(code, message) @code = code - super("rpc error (code = #{code}): #{message}") + super("rpc error (code = 0x#{code.to_s(16).rjust(2, '0')}): #{message}") end end end diff --git a/lib/fila/fibp/codec.rb b/lib/fila/fibp/codec.rb new file mode 100644 index 0000000..c5f87df --- /dev/null +++ b/lib/fila/fibp/codec.rb @@ -0,0 +1,172 @@ +# frozen_string_literal: true + +module Fila + module FIBP + # Low-level encoding/decoding primitives for the Fila binary protocol. + # + # All multi-byte integers are big-endian (network byte order). + # Strings are length-prefixed with u16; bytes with u32; maps with u16 count. + module Codec + module_function + + # --- Encoding primitives --- + + def encode_u8(val) + [val].pack('C') + end + + def encode_u16(val) + [val].pack('n') + end + + def encode_u32(val) + [val].pack('N') + end + + def encode_u64(val) + [val >> 32, val & 0xFFFFFFFF].pack('NN') + end + + def encode_i64(val) + [val >> 32, val & 0xFFFFFFFF].pack('NN') + end + + def encode_f64(val) + [val].pack('G') + end + + def encode_bool(val) + [val ? 1 : 0].pack('C') + end + + def encode_string(str) + bytes = str.to_s.encode('UTF-8').b + [bytes.bytesize].pack('n') + bytes + end + + def encode_bytes(data) + raw = data.to_s.b + [raw.bytesize].pack('N') + raw + end + + def encode_map(hash) + hash ||= {} + buf = [hash.size].pack('n') + hash.each do |k, v| + buf += encode_string(k) + encode_string(v) + end + buf + end + + def encode_string_array(arr) + arr ||= [] + buf = [arr.size].pack('n') + arr.each { |s| buf += encode_string(s) } + buf + end + + def encode_optional_string(val) + if val.nil? + encode_u8(0) + else + encode_u8(1) + encode_string(val) + end + end + + # Build a frame: [u32 frame_length][u8 opcode][u8 flags][u32 request_id][payload] + def encode_frame(opcode, request_id, payload, flags: 0) + body = [opcode, flags, request_id].pack('CCN') + payload + [body.bytesize].pack('N') + body + end + + # --- Decoding primitives --- + + # Reader wraps a binary string with a cursor for sequential reads. + class Reader + attr_reader :pos + + def initialize(data) + @data = data.b + @pos = 0 + end + + def remaining + @data.bytesize - @pos + end + + def read_raw(size) + raise ProtocolError, "unexpected end of frame (need #{size}, have #{remaining})" if remaining < size + + slice = @data.byteslice(@pos, size) + @pos += size + slice + end + + def read_u8 + read_raw(1).unpack1('C') + end + + def read_u16 + read_raw(2).unpack1('n') + end + + def read_u32 + read_raw(4).unpack1('N') + end + + def read_u64 + hi, lo = read_raw(8).unpack('NN') + (hi << 32) | lo + end + + def read_i64 + hi, lo = read_raw(8).unpack('NN') + val = (hi << 32) | lo + val >= (1 << 63) ? val - (1 << 64) : val + end + + def read_f64 + read_raw(8).unpack1('G') + end + + def read_bool # rubocop:disable Naming/PredicateMethod + read_u8 != 0 + end + + def read_string + len = read_u16 + read_raw(len).force_encoding('UTF-8') + end + + def read_bytes + len = read_u32 + read_raw(len) + end + + def read_map + count = read_u16 + hash = {} + count.times do + k = read_string + v = read_string + hash[k] = v + end + hash + end + + def read_string_array + count = read_u16 + Array.new(count) { read_string } + end + + def read_optional_string + present = read_u8 + present == 1 ? read_string : nil + end + end + end + + # Raised for protocol-level decode errors. + class ProtocolError < Fila::Error; end + end +end diff --git a/lib/fila/fibp/connection.rb b/lib/fila/fibp/connection.rb new file mode 100644 index 0000000..97dfcbe --- /dev/null +++ b/lib/fila/fibp/connection.rb @@ -0,0 +1,315 @@ +# frozen_string_literal: true + +require 'socket' +require 'openssl' +require 'monitor' + +module Fila + module FIBP + # TCP connection with TLS support, FIBP framing, handshake, and + # request/response correlation. Supports both synchronous + # request/response and asynchronous delivery streaming. + class Connection + PROTOCOL_VERSION = 1 + DEFAULT_MAX_FRAME_SIZE = 16 * 1024 * 1024 # 16 MiB + + attr_reader :node_id, :max_frame_size + + # @param host [String] server hostname or IP + # @param port [Integer] server port + # @param tls [Boolean] enable TLS with system trust store + # @param ca_cert [String, nil] PEM CA certificate + # @param client_cert [String, nil] PEM client certificate (mTLS) + # @param client_key [String, nil] PEM client key (mTLS) + # @param api_key [String, nil] API key for handshake auth + def initialize(host:, port:, tls: false, ca_cert: nil, client_cert: nil, client_key: nil, api_key: nil) # rubocop:disable Metrics/ParameterLists + @host = host + @port = port + @api_key = api_key + @tls_enabled = tls || !ca_cert.nil? + @ca_cert = ca_cert + @client_cert = client_cert + @client_key = client_key + + @request_id_counter = 0 + @pending = {} + @delivery_callbacks = {} + @monitor = Monitor.new + @read_monitor = Monitor.new + @write_monitor = Monitor.new + @closed = false + @continuation_buffers = {} + + connect! + perform_handshake + start_reader_thread + end + + # Send a request frame and wait for the response. + # + # @param opcode [Integer] request opcode + # @param payload [String] encoded payload bytes + # @return [Array(Integer, String)] [response_opcode, response_payload] + def request(opcode, payload) + raise Fila::Error, 'connection closed' if @closed + + rid = next_request_id + result_queue = Queue.new + @monitor.synchronize { @pending[rid] = result_queue } + + send_frame(opcode, rid, payload) + + response = result_queue.pop + raise response if response.is_a?(Exception) + + response + end + + # Register a delivery callback for a consume subscription. + # + # @param request_id [Integer] the consume request ID + # @param callback [Proc] called with (opcode, payload) for each delivery + # @return [Integer] the request_id + def subscribe(opcode, payload, &callback) + raise Fila::Error, 'connection closed' if @closed + + rid = next_request_id + result_queue = Queue.new + @monitor.synchronize do + @pending[rid] = result_queue + @delivery_callbacks[rid] = callback + end + + send_frame(opcode, rid, payload) + + # Wait for ConsumeOk or Error + response = result_queue.pop + raise response if response.is_a?(Exception) + + [rid, response] + end + + # Cancel a consume subscription. + def cancel_consume(request_id) + send_frame(Opcodes::CANCEL_CONSUME, request_id, '') + @monitor.synchronize { @delivery_callbacks.delete(request_id) } + end + + # Close the connection gracefully. + def close + return if @closed + + @closed = true + begin + send_frame(Opcodes::DISCONNECT, 0, '') + rescue StandardError + nil + end + @socket&.close + @reader_thread&.join(2) + + # Unblock any waiting requests. + @monitor.synchronize do + @pending.each_value { |q| q.push(Fila::Error.new('connection closed')) } + @pending.clear + @delivery_callbacks.clear + end + end + + private + + def connect! + tcp = TCPSocket.new(@host, @port) + tcp.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) + + @socket = if @tls_enabled + wrap_tls(tcp) + else + tcp + end + end + + def wrap_tls(tcp) + ctx = OpenSSL::SSL::SSLContext.new + ctx.min_version = OpenSSL::SSL::TLS1_2_VERSION + + if @ca_cert + store = OpenSSL::X509::Store.new + store.add_cert(OpenSSL::X509::Certificate.new(@ca_cert)) + ctx.cert_store = store + ctx.verify_mode = OpenSSL::SSL::VERIFY_PEER + else + ctx.set_params # uses system trust store + end + + if @client_cert && @client_key + ctx.cert = OpenSSL::X509::Certificate.new(@client_cert) + ctx.key = OpenSSL::PKey.read(@client_key) + end + + ssl = OpenSSL::SSL::SSLSocket.new(tcp, ctx) + ssl.hostname = @host + ssl.sync_close = true + ssl.connect + ssl + end + + def perform_handshake + payload = Codec.encode_u16(PROTOCOL_VERSION) + payload += Codec.encode_optional_string(@api_key) + + send_frame(Opcodes::HANDSHAKE, 0, payload) + + opcode, resp_payload = read_frame + if opcode == Opcodes::ERROR + reader = Codec::Reader.new(resp_payload) + code = reader.read_u8 + message = reader.read_string + raise_protocol_error(code, message, reader) + end + + unless opcode == Opcodes::HANDSHAKE_OK + raise ProtocolError, "expected HandshakeOk, got opcode 0x#{opcode.to_s(16)}" + end + + reader = Codec::Reader.new(resp_payload) + _version = reader.read_u16 + @node_id = reader.read_u64 + max_frame = reader.read_u32 + @max_frame_size = max_frame.zero? ? DEFAULT_MAX_FRAME_SIZE : max_frame + end + + def start_reader_thread + @reader_thread = Thread.new { reader_loop } + @reader_thread.abort_on_exception = false + end + + def reader_loop + until @closed + opcode, payload, request_id = read_frame_with_id + break if opcode.nil? + + handle_incoming(opcode, payload, request_id) + end + rescue IOError, Errno::ECONNRESET, OpenSSL::SSL::SSLError + # Connection closed + ensure + @closed = true + @monitor.synchronize do + err = Fila::Error.new('connection closed') + @pending.each_value { |q| q.push(err) } + @pending.clear + end + end + + def handle_incoming(opcode, payload, request_id) + if opcode == Opcodes::PING + send_frame(Opcodes::PONG, request_id, '') + return + end + + if opcode == Opcodes::DELIVERY + cb = @monitor.synchronize { @delivery_callbacks[request_id] } + cb&.call(opcode, payload) + return + end + + queue = @monitor.synchronize { @pending.delete(request_id) } + queue&.push([opcode, payload]) + end + + def send_frame(opcode, request_id, payload) + frame = Codec.encode_frame(opcode, request_id, payload) + @write_monitor.synchronize { write_all(frame) } + end + + def write_all(data) + total = 0 + while total < data.bytesize + written = @socket.write(data.byteslice(total..)) + total += written + end + end + + def read_frame + opcode, payload, _rid = read_frame_with_id + [opcode, payload] + end + + def read_frame_with_id + @read_monitor.synchronize do + loop do + opcode, flags, request_id, payload = read_raw_frame + return [nil, nil, nil] if opcode.nil? + + next buffer_continuation(request_id, opcode, payload) if flags.anybits?(FLAG_CONTINUATION) + + opcode, payload = reassemble_continuation(request_id, opcode, payload) + return [opcode, payload, request_id] + end + end + end + + def read_raw_frame + length_bytes = read_exact(4) + return [nil, nil, nil, nil] if length_bytes.nil? + + body = read_exact(length_bytes.unpack1('N')) + return [nil, nil, nil, nil] if body.nil? + + [body.getbyte(0), body.getbyte(1), body.byteslice(2, 4).unpack1('N'), body.byteslice(6..)] + end + + def buffer_continuation(request_id, opcode, payload) + @continuation_buffers[request_id] ||= { opcode: opcode, data: +''.b } + @continuation_buffers[request_id][:data] << payload + end + + def reassemble_continuation(request_id, opcode, payload) + if @continuation_buffers.key?(request_id) + buf = @continuation_buffers.delete(request_id) + [buf[:opcode], buf[:data] + payload] + else + [opcode, payload] + end + end + + def read_exact(size) + buf = +''.b + while buf.bytesize < size + chunk = @socket.read(size - buf.bytesize) + return nil if chunk.nil? || chunk.empty? + + buf << chunk + end + buf + end + + def next_request_id + @monitor.synchronize do + @request_id_counter = (@request_id_counter + 1) & 0xFFFFFFFF + @request_id_counter + end + end + + def raise_protocol_error(code, message, reader) + metadata = reader.remaining.positive? ? reader.read_map : {} + case code + when ErrorCodes::UNAUTHORIZED + raise Fila::AuthenticationError, message + when ErrorCodes::FORBIDDEN + raise Fila::ForbiddenError, message + when ErrorCodes::NOT_LEADER + leader_addr = metadata['leader_addr'] + raise Fila::NotLeaderError.new(message, leader_addr: leader_addr) + when ErrorCodes::QUEUE_NOT_FOUND + raise Fila::QueueNotFoundError, message + when ErrorCodes::UNSUPPORTED_VERSION + raise ProtocolError, "unsupported version: #{message}" + else + raise Fila::RPCError.new(code, message) + end + end + end + end +end diff --git a/lib/fila/fibp/opcodes.rb b/lib/fila/fibp/opcodes.rb new file mode 100644 index 0000000..c426144 --- /dev/null +++ b/lib/fila/fibp/opcodes.rb @@ -0,0 +1,83 @@ +# frozen_string_literal: true + +module Fila + module FIBP + # Protocol opcodes matching the server's fila-fibp crate. + module Opcodes + # Control opcodes (0x00-0x0F) + HANDSHAKE = 0x01 + HANDSHAKE_OK = 0x02 + PING = 0x03 + PONG = 0x04 + DISCONNECT = 0x05 + + # Hot-path opcodes (0x10-0x1F) + ENQUEUE = 0x10 + ENQUEUE_RESULT = 0x11 + CONSUME = 0x12 + CONSUME_OK = 0x13 + DELIVERY = 0x14 + CANCEL_CONSUME = 0x15 + ACK = 0x16 + ACK_RESULT = 0x17 + NACK = 0x18 + NACK_RESULT = 0x19 + + # Error opcode + ERROR = 0xFE + + # Admin opcodes (0xFD downward) + CREATE_QUEUE = 0xFD + CREATE_QUEUE_RESULT = 0xFC + DELETE_QUEUE = 0xFB + DELETE_QUEUE_RESULT = 0xFA + GET_STATS = 0xF9 + GET_STATS_RESULT = 0xF8 + LIST_QUEUES = 0xF7 + LIST_QUEUES_RESULT = 0xF6 + SET_CONFIG = 0xF5 + SET_CONFIG_RESULT = 0xF4 + GET_CONFIG = 0xF3 + GET_CONFIG_RESULT = 0xF2 + LIST_CONFIG = 0xF1 + LIST_CONFIG_RESULT = 0xF0 + REDRIVE = 0xEF + REDRIVE_RESULT = 0xEE + CREATE_API_KEY = 0xED + CREATE_API_KEY_RESULT = 0xEC + REVOKE_API_KEY = 0xEB + REVOKE_API_KEY_RESULT = 0xEA + LIST_API_KEYS = 0xE9 + LIST_API_KEYS_RESULT = 0xE8 + SET_ACL = 0xE7 + SET_ACL_RESULT = 0xE6 + GET_ACL = 0xE5 + GET_ACL_RESULT = 0xE4 + end + + # Protocol error codes. + module ErrorCodes + OK = 0x00 + QUEUE_NOT_FOUND = 0x01 + MESSAGE_NOT_FOUND = 0x02 + QUEUE_ALREADY_EXISTS = 0x03 + LUA_COMPILATION = 0x04 + STORAGE_ERROR = 0x05 + NOT_A_DLQ = 0x06 + PARENT_QUEUE_NOT_FOUND = 0x07 + INVALID_CONFIG_VALUE = 0x08 + CHANNEL_FULL = 0x09 + UNAUTHORIZED = 0x0A + FORBIDDEN = 0x0B + NOT_LEADER = 0x0C + UNSUPPORTED_VERSION = 0x0D + INVALID_FRAME = 0x0E + API_KEY_NOT_FOUND = 0x0F + NODE_NOT_READY = 0x10 + INTERNAL_ERROR = 0xFF + end + + # Continuation flag in frame flags byte. + FLAG_CONTINUATION = 0x01 + end +end diff --git a/lib/fila/proto/fila/v1/admin_pb.rb b/lib/fila/proto/fila/v1/admin_pb.rb deleted file mode 100644 index 5138c83..0000000 --- a/lib/fila/proto/fila/v1/admin_pb.rb +++ /dev/null @@ -1,49 +0,0 @@ -# frozen_string_literal: true -# Generated by the protocol buffer compiler. DO NOT EDIT! -# source: fila/v1/admin.proto - -require 'google/protobuf' - - -descriptor_data = "\n\x13\x66ila/v1/admin.proto\x12\x07\x66ila.v1\"H\n\x12\x43reateQueueRequest\x12\x0c\n\x04name\x18\x01 \x01(\t\x12$\n\x06\x63onfig\x18\x02 \x01(\x0b\x32\x14.fila.v1.QueueConfig\"b\n\x0bQueueConfig\x12\x19\n\x11on_enqueue_script\x18\x01 \x01(\t\x12\x19\n\x11on_failure_script\x18\x02 \x01(\t\x12\x1d\n\x15visibility_timeout_ms\x18\x03 \x01(\x04\"\'\n\x13\x43reateQueueResponse\x12\x10\n\x08queue_id\x18\x01 \x01(\t\"#\n\x12\x44\x65leteQueueRequest\x12\r\n\x05queue\x18\x01 \x01(\t\"\x15\n\x13\x44\x65leteQueueResponse\".\n\x10SetConfigRequest\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t\"\x13\n\x11SetConfigResponse\"\x1f\n\x10GetConfigRequest\x12\x0b\n\x03key\x18\x01 \x01(\t\"\"\n\x11GetConfigResponse\x12\r\n\x05value\x18\x01 \x01(\t\")\n\x0b\x43onfigEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t\"#\n\x11ListConfigRequest\x12\x0e\n\x06prefix\x18\x01 \x01(\t\"P\n\x12ListConfigResponse\x12%\n\x07\x65ntries\x18\x01 \x03(\x0b\x32\x14.fila.v1.ConfigEntry\x12\x13\n\x0btotal_count\x18\x02 \x01(\r\" \n\x0fGetStatsRequest\x12\r\n\x05queue\x18\x01 \x01(\t\"b\n\x13PerFairnessKeyStats\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x15\n\rpending_count\x18\x02 \x01(\x04\x12\x17\n\x0f\x63urrent_deficit\x18\x03 \x01(\x03\x12\x0e\n\x06weight\x18\x04 \x01(\r\"Z\n\x13PerThrottleKeyStats\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x0e\n\x06tokens\x18\x02 \x01(\x01\x12\x17\n\x0frate_per_second\x18\x03 \x01(\x01\x12\r\n\x05\x62urst\x18\x04 \x01(\x01\"\x9f\x02\n\x10GetStatsResponse\x12\r\n\x05\x64\x65pth\x18\x01 \x01(\x04\x12\x11\n\tin_flight\x18\x02 \x01(\x04\x12\x1c\n\x14\x61\x63tive_fairness_keys\x18\x03 \x01(\x04\x12\x18\n\x10\x61\x63tive_consumers\x18\x04 \x01(\r\x12\x0f\n\x07quantum\x18\x05 \x01(\r\x12\x33\n\rper_key_stats\x18\x06 \x03(\x0b\x32\x1c.fila.v1.PerFairnessKeyStats\x12\x38\n\x12per_throttle_stats\x18\x07 \x03(\x0b\x32\x1c.fila.v1.PerThrottleKeyStats\x12\x16\n\x0eleader_node_id\x18\x08 \x01(\x04\x12\x19\n\x11replication_count\x18\t \x01(\r\"2\n\x0eRedriveRequest\x12\x11\n\tdlq_queue\x18\x01 \x01(\t\x12\r\n\x05\x63ount\x18\x02 \x01(\x04\"#\n\x0fRedriveResponse\x12\x10\n\x08redriven\x18\x01 \x01(\x04\"\x13\n\x11ListQueuesRequest\"m\n\tQueueInfo\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\r\n\x05\x64\x65pth\x18\x02 \x01(\x04\x12\x11\n\tin_flight\x18\x03 \x01(\x04\x12\x18\n\x10\x61\x63tive_consumers\x18\x04 \x01(\r\x12\x16\n\x0eleader_node_id\x18\x05 \x01(\x04\"T\n\x12ListQueuesResponse\x12\"\n\x06queues\x18\x01 \x03(\x0b\x32\x12.fila.v1.QueueInfo\x12\x1a\n\x12\x63luster_node_count\x18\x02 \x01(\r\"Q\n\x13\x43reateApiKeyRequest\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x15\n\rexpires_at_ms\x18\x02 \x01(\x04\x12\x15\n\ris_superadmin\x18\x03 \x01(\x08\"J\n\x14\x43reateApiKeyResponse\x12\x0e\n\x06key_id\x18\x01 \x01(\t\x12\x0b\n\x03key\x18\x02 \x01(\t\x12\x15\n\ris_superadmin\x18\x03 \x01(\x08\"%\n\x13RevokeApiKeyRequest\x12\x0e\n\x06key_id\x18\x01 \x01(\t\"\x16\n\x14RevokeApiKeyResponse\"\x14\n\x12ListApiKeysRequest\"o\n\nApiKeyInfo\x12\x0e\n\x06key_id\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\x15\n\rcreated_at_ms\x18\x03 \x01(\x04\x12\x15\n\rexpires_at_ms\x18\x04 \x01(\x04\x12\x15\n\ris_superadmin\x18\x05 \x01(\x08\"8\n\x13ListApiKeysResponse\x12!\n\x04keys\x18\x01 \x03(\x0b\x32\x13.fila.v1.ApiKeyInfo\".\n\rAclPermission\x12\x0c\n\x04kind\x18\x01 \x01(\t\x12\x0f\n\x07pattern\x18\x02 \x01(\t\"L\n\rSetAclRequest\x12\x0e\n\x06key_id\x18\x01 \x01(\t\x12+\n\x0bpermissions\x18\x02 \x03(\x0b\x32\x16.fila.v1.AclPermission\"\x10\n\x0eSetAclResponse\"\x1f\n\rGetAclRequest\x12\x0e\n\x06key_id\x18\x01 \x01(\t\"d\n\x0eGetAclResponse\x12\x0e\n\x06key_id\x18\x01 \x01(\t\x12+\n\x0bpermissions\x18\x02 \x03(\x0b\x32\x16.fila.v1.AclPermission\x12\x15\n\ris_superadmin\x18\x03 \x01(\x08\x32\x8e\x07\n\tFilaAdmin\x12H\n\x0b\x43reateQueue\x12\x1b.fila.v1.CreateQueueRequest\x1a\x1c.fila.v1.CreateQueueResponse\x12H\n\x0b\x44\x65leteQueue\x12\x1b.fila.v1.DeleteQueueRequest\x1a\x1c.fila.v1.DeleteQueueResponse\x12\x42\n\tSetConfig\x12\x19.fila.v1.SetConfigRequest\x1a\x1a.fila.v1.SetConfigResponse\x12\x42\n\tGetConfig\x12\x19.fila.v1.GetConfigRequest\x1a\x1a.fila.v1.GetConfigResponse\x12\x45\n\nListConfig\x12\x1a.fila.v1.ListConfigRequest\x1a\x1b.fila.v1.ListConfigResponse\x12?\n\x08GetStats\x12\x18.fila.v1.GetStatsRequest\x1a\x19.fila.v1.GetStatsResponse\x12<\n\x07Redrive\x12\x17.fila.v1.RedriveRequest\x1a\x18.fila.v1.RedriveResponse\x12\x45\n\nListQueues\x12\x1a.fila.v1.ListQueuesRequest\x1a\x1b.fila.v1.ListQueuesResponse\x12K\n\x0c\x43reateApiKey\x12\x1c.fila.v1.CreateApiKeyRequest\x1a\x1d.fila.v1.CreateApiKeyResponse\x12K\n\x0cRevokeApiKey\x12\x1c.fila.v1.RevokeApiKeyRequest\x1a\x1d.fila.v1.RevokeApiKeyResponse\x12H\n\x0bListApiKeys\x12\x1b.fila.v1.ListApiKeysRequest\x1a\x1c.fila.v1.ListApiKeysResponse\x12\x39\n\x06SetAcl\x12\x16.fila.v1.SetAclRequest\x1a\x17.fila.v1.SetAclResponse\x12\x39\n\x06GetAcl\x12\x16.fila.v1.GetAclRequest\x1a\x17.fila.v1.GetAclResponseb\x06proto3" - -pool = ::Google::Protobuf::DescriptorPool.generated_pool -pool.add_serialized_file(descriptor_data) - -module Fila - module V1 - CreateQueueRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.CreateQueueRequest").msgclass - QueueConfig = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.QueueConfig").msgclass - CreateQueueResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.CreateQueueResponse").msgclass - DeleteQueueRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.DeleteQueueRequest").msgclass - DeleteQueueResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.DeleteQueueResponse").msgclass - SetConfigRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.SetConfigRequest").msgclass - SetConfigResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.SetConfigResponse").msgclass - GetConfigRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.GetConfigRequest").msgclass - GetConfigResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.GetConfigResponse").msgclass - ConfigEntry = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.ConfigEntry").msgclass - ListConfigRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.ListConfigRequest").msgclass - ListConfigResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.ListConfigResponse").msgclass - GetStatsRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.GetStatsRequest").msgclass - PerFairnessKeyStats = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.PerFairnessKeyStats").msgclass - PerThrottleKeyStats = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.PerThrottleKeyStats").msgclass - GetStatsResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.GetStatsResponse").msgclass - RedriveRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.RedriveRequest").msgclass - RedriveResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.RedriveResponse").msgclass - ListQueuesRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.ListQueuesRequest").msgclass - QueueInfo = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.QueueInfo").msgclass - ListQueuesResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.ListQueuesResponse").msgclass - CreateApiKeyRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.CreateApiKeyRequest").msgclass - CreateApiKeyResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.CreateApiKeyResponse").msgclass - RevokeApiKeyRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.RevokeApiKeyRequest").msgclass - RevokeApiKeyResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.RevokeApiKeyResponse").msgclass - ListApiKeysRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.ListApiKeysRequest").msgclass - ApiKeyInfo = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.ApiKeyInfo").msgclass - ListApiKeysResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.ListApiKeysResponse").msgclass - AclPermission = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.AclPermission").msgclass - SetAclRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.SetAclRequest").msgclass - SetAclResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.SetAclResponse").msgclass - GetAclRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.GetAclRequest").msgclass - GetAclResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.GetAclResponse").msgclass - end -end diff --git a/lib/fila/proto/fila/v1/admin_services_pb.rb b/lib/fila/proto/fila/v1/admin_services_pb.rb deleted file mode 100644 index 71de19e..0000000 --- a/lib/fila/proto/fila/v1/admin_services_pb.rb +++ /dev/null @@ -1,39 +0,0 @@ -# Generated by the protocol buffer compiler. DO NOT EDIT! -# Source: fila/v1/admin.proto for package 'fila.v1' - -require 'grpc' -require 'fila/v1/admin_pb' - -module Fila - module V1 - module FilaAdmin - # Admin RPCs for operators and the CLI. - class Service - - include ::GRPC::GenericService - - self.marshal_class_method = :encode - self.unmarshal_class_method = :decode - self.service_name = 'fila.v1.FilaAdmin' - - rpc :CreateQueue, ::Fila::V1::CreateQueueRequest, ::Fila::V1::CreateQueueResponse - rpc :DeleteQueue, ::Fila::V1::DeleteQueueRequest, ::Fila::V1::DeleteQueueResponse - rpc :SetConfig, ::Fila::V1::SetConfigRequest, ::Fila::V1::SetConfigResponse - rpc :GetConfig, ::Fila::V1::GetConfigRequest, ::Fila::V1::GetConfigResponse - rpc :ListConfig, ::Fila::V1::ListConfigRequest, ::Fila::V1::ListConfigResponse - rpc :GetStats, ::Fila::V1::GetStatsRequest, ::Fila::V1::GetStatsResponse - rpc :Redrive, ::Fila::V1::RedriveRequest, ::Fila::V1::RedriveResponse - rpc :ListQueues, ::Fila::V1::ListQueuesRequest, ::Fila::V1::ListQueuesResponse - # API key management. CreateApiKey bypasses auth (bootstrap); others require a valid key. - rpc :CreateApiKey, ::Fila::V1::CreateApiKeyRequest, ::Fila::V1::CreateApiKeyResponse - rpc :RevokeApiKey, ::Fila::V1::RevokeApiKeyRequest, ::Fila::V1::RevokeApiKeyResponse - rpc :ListApiKeys, ::Fila::V1::ListApiKeysRequest, ::Fila::V1::ListApiKeysResponse - # Per-key ACL management. - rpc :SetAcl, ::Fila::V1::SetAclRequest, ::Fila::V1::SetAclResponse - rpc :GetAcl, ::Fila::V1::GetAclRequest, ::Fila::V1::GetAclResponse - end - - Stub = Service.rpc_stub_class - end - end -end diff --git a/lib/fila/proto/fila/v1/messages_pb.rb b/lib/fila/proto/fila/v1/messages_pb.rb deleted file mode 100644 index ae3674f..0000000 --- a/lib/fila/proto/fila/v1/messages_pb.rb +++ /dev/null @@ -1,21 +0,0 @@ -# frozen_string_literal: true -# Generated by the protocol buffer compiler. DO NOT EDIT! -# source: fila/v1/messages.proto - -require 'google/protobuf' - -require 'google/protobuf/timestamp_pb' - - -descriptor_data = "\n\x16\x66ila/v1/messages.proto\x12\x07\x66ila.v1\x1a\x1fgoogle/protobuf/timestamp.proto\"\xe2\x01\n\x07Message\x12\n\n\x02id\x18\x01 \x01(\t\x12.\n\x07headers\x18\x02 \x03(\x0b\x32\x1d.fila.v1.Message.HeadersEntry\x12\x0f\n\x07payload\x18\x03 \x01(\x0c\x12*\n\x08metadata\x18\x04 \x01(\x0b\x32\x18.fila.v1.MessageMetadata\x12.\n\ntimestamps\x18\x05 \x01(\x0b\x32\x1a.fila.v1.MessageTimestamps\x1a.\n\x0cHeadersEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"w\n\x0fMessageMetadata\x12\x14\n\x0c\x66\x61irness_key\x18\x01 \x01(\t\x12\x0e\n\x06weight\x18\x02 \x01(\r\x12\x15\n\rthrottle_keys\x18\x03 \x03(\t\x12\x15\n\rattempt_count\x18\x04 \x01(\r\x12\x10\n\x08queue_id\x18\x05 \x01(\t\"s\n\x11MessageTimestamps\x12/\n\x0b\x65nqueued_at\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12-\n\tleased_at\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestampb\x06proto3" - -pool = ::Google::Protobuf::DescriptorPool.generated_pool -pool.add_serialized_file(descriptor_data) - -module Fila - module V1 - Message = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.Message").msgclass - MessageMetadata = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.MessageMetadata").msgclass - MessageTimestamps = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.MessageTimestamps").msgclass - end -end diff --git a/lib/fila/proto/fila/v1/service_pb.rb b/lib/fila/proto/fila/v1/service_pb.rb deleted file mode 100644 index 7eba33f..0000000 --- a/lib/fila/proto/fila/v1/service_pb.rb +++ /dev/null @@ -1,26 +0,0 @@ -# frozen_string_literal: true -# Generated by the protocol buffer compiler. DO NOT EDIT! -# source: fila/v1/service.proto - -require 'google/protobuf' - -require 'fila/v1/messages_pb' - - -descriptor_data = "\n\x15\x66ila/v1/service.proto\x12\x07\x66ila.v1\x1a\x16\x66ila/v1/messages.proto\"\x97\x01\n\x0e\x45nqueueRequest\x12\r\n\x05queue\x18\x01 \x01(\t\x12\x35\n\x07headers\x18\x02 \x03(\x0b\x32$.fila.v1.EnqueueRequest.HeadersEntry\x12\x0f\n\x07payload\x18\x03 \x01(\x0c\x1a.\n\x0cHeadersEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"%\n\x0f\x45nqueueResponse\x12\x12\n\nmessage_id\x18\x01 \x01(\t\"\x1f\n\x0e\x43onsumeRequest\x12\r\n\x05queue\x18\x01 \x01(\t\"4\n\x0f\x43onsumeResponse\x12!\n\x07message\x18\x01 \x01(\x0b\x32\x10.fila.v1.Message\"/\n\nAckRequest\x12\r\n\x05queue\x18\x01 \x01(\t\x12\x12\n\nmessage_id\x18\x02 \x01(\t\"\r\n\x0b\x41\x63kResponse\"?\n\x0bNackRequest\x12\r\n\x05queue\x18\x01 \x01(\t\x12\x12\n\nmessage_id\x18\x02 \x01(\t\x12\r\n\x05\x65rror\x18\x03 \x01(\t\"\x0e\n\x0cNackResponse2\xf2\x01\n\x0b\x46ilaService\x12<\n\x07\x45nqueue\x12\x17.fila.v1.EnqueueRequest\x1a\x18.fila.v1.EnqueueResponse\x12>\n\x07\x43onsume\x12\x17.fila.v1.ConsumeRequest\x1a\x18.fila.v1.ConsumeResponse0\x01\x12\x30\n\x03\x41\x63k\x12\x13.fila.v1.AckRequest\x1a\x14.fila.v1.AckResponse\x12\x33\n\x04Nack\x12\x14.fila.v1.NackRequest\x1a\x15.fila.v1.NackResponseb\x06proto3" - -pool = ::Google::Protobuf::DescriptorPool.generated_pool -pool.add_serialized_file(descriptor_data) - -module Fila - module V1 - EnqueueRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.EnqueueRequest").msgclass - EnqueueResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.EnqueueResponse").msgclass - ConsumeRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.ConsumeRequest").msgclass - ConsumeResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.ConsumeResponse").msgclass - AckRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.AckRequest").msgclass - AckResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.AckResponse").msgclass - NackRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.NackRequest").msgclass - NackResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.NackResponse").msgclass - end -end diff --git a/lib/fila/proto/fila/v1/service_services_pb.rb b/lib/fila/proto/fila/v1/service_services_pb.rb deleted file mode 100644 index 941aba4..0000000 --- a/lib/fila/proto/fila/v1/service_services_pb.rb +++ /dev/null @@ -1,28 +0,0 @@ -# Generated by the protocol buffer compiler. DO NOT EDIT! -# Source: fila/v1/service.proto for package 'fila.v1' - -require 'grpc' -require 'fila/v1/service_pb' - -module Fila - module V1 - module FilaService - # Hot-path RPCs for producers and consumers. - class Service - - include ::GRPC::GenericService - - self.marshal_class_method = :encode - self.unmarshal_class_method = :decode - self.service_name = 'fila.v1.FilaService' - - rpc :Enqueue, ::Fila::V1::EnqueueRequest, ::Fila::V1::EnqueueResponse - rpc :Consume, ::Fila::V1::ConsumeRequest, stream(::Fila::V1::ConsumeResponse) - rpc :Ack, ::Fila::V1::AckRequest, ::Fila::V1::AckResponse - rpc :Nack, ::Fila::V1::NackRequest, ::Fila::V1::NackResponse - end - - Stub = Service.rpc_stub_class - end - end -end diff --git a/lib/fila/version.rb b/lib/fila/version.rb index 33019cf..efdd9b7 100644 --- a/lib/fila/version.rb +++ b/lib/fila/version.rb @@ -1,5 +1,5 @@ # frozen_string_literal: true module Fila - VERSION = '0.1.0' + VERSION = '0.5.0' end diff --git a/proto/fila/v1/admin.proto b/proto/fila/v1/admin.proto deleted file mode 100644 index 886e58d..0000000 --- a/proto/fila/v1/admin.proto +++ /dev/null @@ -1,197 +0,0 @@ -syntax = "proto3"; -package fila.v1; - -// Admin RPCs for operators and the CLI. -service FilaAdmin { - rpc CreateQueue(CreateQueueRequest) returns (CreateQueueResponse); - rpc DeleteQueue(DeleteQueueRequest) returns (DeleteQueueResponse); - rpc SetConfig(SetConfigRequest) returns (SetConfigResponse); - rpc GetConfig(GetConfigRequest) returns (GetConfigResponse); - rpc ListConfig(ListConfigRequest) returns (ListConfigResponse); - rpc GetStats(GetStatsRequest) returns (GetStatsResponse); - rpc Redrive(RedriveRequest) returns (RedriveResponse); - rpc ListQueues(ListQueuesRequest) returns (ListQueuesResponse); - - // API key management. CreateApiKey bypasses auth (bootstrap); others require a valid key. - rpc CreateApiKey(CreateApiKeyRequest) returns (CreateApiKeyResponse); - rpc RevokeApiKey(RevokeApiKeyRequest) returns (RevokeApiKeyResponse); - rpc ListApiKeys(ListApiKeysRequest) returns (ListApiKeysResponse); - - // Per-key ACL management. - rpc SetAcl(SetAclRequest) returns (SetAclResponse); - rpc GetAcl(GetAclRequest) returns (GetAclResponse); -} - -message CreateQueueRequest { - string name = 1; - QueueConfig config = 2; -} - -message QueueConfig { - string on_enqueue_script = 1; - string on_failure_script = 2; - uint64 visibility_timeout_ms = 3; -} - -message CreateQueueResponse { - string queue_id = 1; -} - -message DeleteQueueRequest { - string queue = 1; -} - -message DeleteQueueResponse {} - -message SetConfigRequest { - string key = 1; - string value = 2; -} - -message SetConfigResponse {} - -message GetConfigRequest { - string key = 1; -} - -message GetConfigResponse { - string value = 1; -} - -message ConfigEntry { - string key = 1; - string value = 2; -} - -message ListConfigRequest { - string prefix = 1; -} - -message ListConfigResponse { - repeated ConfigEntry entries = 1; - uint32 total_count = 2; -} - -message GetStatsRequest { - string queue = 1; -} - -message PerFairnessKeyStats { - string key = 1; - uint64 pending_count = 2; - int64 current_deficit = 3; - uint32 weight = 4; -} - -message PerThrottleKeyStats { - string key = 1; - double tokens = 2; - double rate_per_second = 3; - double burst = 4; -} - -message GetStatsResponse { - uint64 depth = 1; - uint64 in_flight = 2; - uint64 active_fairness_keys = 3; - uint32 active_consumers = 4; - uint32 quantum = 5; - repeated PerFairnessKeyStats per_key_stats = 6; - repeated PerThrottleKeyStats per_throttle_stats = 7; - // Cluster fields (0 when not in cluster mode). - uint64 leader_node_id = 8; - uint32 replication_count = 9; -} - -message RedriveRequest { - string dlq_queue = 1; - uint64 count = 2; -} - -message RedriveResponse { - uint64 redriven = 1; -} - -message ListQueuesRequest {} - -message QueueInfo { - string name = 1; - uint64 depth = 2; - uint64 in_flight = 3; - uint32 active_consumers = 4; - uint64 leader_node_id = 5; -} - -message ListQueuesResponse { - repeated QueueInfo queues = 1; - uint32 cluster_node_count = 2; -} - -// --- API Key Management --- - -message CreateApiKeyRequest { - /// Human-readable label for the key. - string name = 1; - /// Optional Unix timestamp (milliseconds) after which the key expires. - /// 0 means no expiration. - uint64 expires_at_ms = 2; - /// When true, the key bypasses all ACL checks (superadmin). - bool is_superadmin = 3; -} - -message CreateApiKeyResponse { - /// Opaque key ID for management operations (revoke, list, set-acl). - string key_id = 1; - /// Plaintext API key. Returned once — store it securely. - string key = 2; - /// Whether this key has superadmin privileges. - bool is_superadmin = 3; -} - -message RevokeApiKeyRequest { - string key_id = 1; -} - -message RevokeApiKeyResponse {} - -message ListApiKeysRequest {} - -message ApiKeyInfo { - string key_id = 1; - string name = 2; - uint64 created_at_ms = 3; - /// 0 means no expiration. - uint64 expires_at_ms = 4; - bool is_superadmin = 5; -} - -message ListApiKeysResponse { - repeated ApiKeyInfo keys = 1; -} - -// --- ACL Management --- - -/// A single permission grant: kind (produce/consume/admin) + queue pattern. -message AclPermission { - /// One of: "produce", "consume", "admin". - string kind = 1; - /// Queue name or wildcard ("*" or "orders.*"). - string pattern = 2; -} - -message SetAclRequest { - string key_id = 1; - repeated AclPermission permissions = 2; -} - -message SetAclResponse {} - -message GetAclRequest { - string key_id = 1; -} - -message GetAclResponse { - string key_id = 1; - repeated AclPermission permissions = 2; - bool is_superadmin = 3; -} diff --git a/proto/fila/v1/messages.proto b/proto/fila/v1/messages.proto deleted file mode 100644 index a0709cf..0000000 --- a/proto/fila/v1/messages.proto +++ /dev/null @@ -1,28 +0,0 @@ -syntax = "proto3"; -package fila.v1; - -import "google/protobuf/timestamp.proto"; - -// Core message envelope persisted in the broker. -message Message { - string id = 1; - map headers = 2; - bytes payload = 3; - MessageMetadata metadata = 4; - MessageTimestamps timestamps = 5; -} - -// Broker-assigned scheduling metadata. -message MessageMetadata { - string fairness_key = 1; - uint32 weight = 2; - repeated string throttle_keys = 3; - uint32 attempt_count = 4; - string queue_id = 5; -} - -// Lifecycle timestamps attached to every message. -message MessageTimestamps { - google.protobuf.Timestamp enqueued_at = 1; - google.protobuf.Timestamp leased_at = 2; -} diff --git a/proto/fila/v1/service.proto b/proto/fila/v1/service.proto deleted file mode 100644 index f14fdd0..0000000 --- a/proto/fila/v1/service.proto +++ /dev/null @@ -1,45 +0,0 @@ -syntax = "proto3"; -package fila.v1; - -import "fila/v1/messages.proto"; - -// Hot-path RPCs for producers and consumers. -service FilaService { - rpc Enqueue(EnqueueRequest) returns (EnqueueResponse); - rpc Consume(ConsumeRequest) returns (stream ConsumeResponse); - rpc Ack(AckRequest) returns (AckResponse); - rpc Nack(NackRequest) returns (NackResponse); -} - -message EnqueueRequest { - string queue = 1; - map headers = 2; - bytes payload = 3; -} - -message EnqueueResponse { - string message_id = 1; -} - -message ConsumeRequest { - string queue = 1; -} - -message ConsumeResponse { - Message message = 1; -} - -message AckRequest { - string queue = 1; - string message_id = 2; -} - -message AckResponse {} - -message NackRequest { - string queue = 1; - string message_id = 2; - string error = 3; -} - -message NackResponse {} diff --git a/test/test_batch.rb b/test/test_batch.rb new file mode 100644 index 0000000..295c6ce --- /dev/null +++ b/test/test_batch.rb @@ -0,0 +1,264 @@ +# frozen_string_literal: true + +require 'test_helper' + +return unless FILA_SERVER_AVAILABLE + +class TestEnqueueMany < Minitest::Test + def setup + @server = TestServerHelper.start + @client = Fila::Client.new(@server[:addr], batch_mode: :disabled) + end + + def teardown + @client&.close + TestServerHelper.stop(@server) if @server + end + + def test_enqueue_many_multiple_messages + TestServerHelper.create_queue(@server, 'many-test') + + messages = 5.times.map do |i| + { queue: 'many-test', payload: "many-msg-#{i}", headers: { 'index' => i.to_s } } + end + + results = @client.enqueue_many(messages) + + assert_equal 5, results.size + results.each do |r| + assert r.success?, "expected success but got error: #{r.error}" + refute_empty r.message_id + end + + # Verify all messages are consumable. + received_ids = [] + @client.consume(queue: 'many-test') do |msg| + received_ids << msg.id + @client.ack(queue: 'many-test', msg_id: msg.id) + break if received_ids.size >= 5 + end + assert_equal 5, received_ids.size + end + + def test_enqueue_many_single_message + TestServerHelper.create_queue(@server, 'many-single') + + results = @client.enqueue_many( + [{ queue: 'many-single', payload: 'solo' }] + ) + + assert_equal 1, results.size + assert results.first.success? + refute_empty results.first.message_id + end + + def test_enqueue_many_empty_array + results = @client.enqueue_many([]) + assert_equal 0, results.size + end + + def test_enqueue_many_mixed_success_and_failure + TestServerHelper.create_queue(@server, 'many-mixed') + + messages = [ + { queue: 'many-mixed', payload: 'good-1' }, + { queue: 'no-such-queue-xyz', payload: 'bad' }, + { queue: 'many-mixed', payload: 'good-2' } + ] + + results = @client.enqueue_many(messages) + assert_equal 3, results.size + + assert results[0].success?, 'first message should succeed' + refute results[1].success?, 'second message should fail (nonexistent queue)' + assert results[1].error, 'second message should have error description' + assert results[2].success?, 'third message should succeed' + end +end + +class TestEnqueueResult < Minitest::Test + def test_success_result + r = Fila::EnqueueResult.new(message_id: 'abc-123') + assert r.success? + assert_equal 'abc-123', r.message_id + assert_nil r.error + end + + def test_error_result + r = Fila::EnqueueResult.new(error: 'queue not found') + refute r.success? + assert_nil r.message_id + assert_equal 'queue not found', r.error + end +end + +class TestAutoBatching < Minitest::Test + def setup + @server = TestServerHelper.start + # Default batch_mode is :auto + @client = Fila::Client.new(@server[:addr]) + end + + def teardown + @client&.close + TestServerHelper.stop(@server) if @server + end + + def test_auto_batch_enqueue_single + TestServerHelper.create_queue(@server, 'auto-single') + + msg_id = @client.enqueue(queue: 'auto-single', payload: 'auto-msg') + assert msg_id + refute_empty msg_id + + received = false + @client.consume(queue: 'auto-single') do |msg| + assert_equal msg_id, msg.id + assert_equal 'auto-msg', msg.payload + @client.ack(queue: 'auto-single', msg_id: msg.id) + received = true + break + end + assert received + end + + def test_auto_batch_enqueue_concurrent + TestServerHelper.create_queue(@server, 'auto-concurrent') + + # Fire multiple enqueues concurrently to exercise batching. + threads = 10.times.map do |i| + Thread.new do + @client.enqueue(queue: 'auto-concurrent', payload: "msg-#{i}") + end + end + ids = threads.map(&:value) + + assert_equal 10, ids.size + ids.each do |id| + assert id + refute_empty id + end + + # Consume all messages. + received = [] + @client.consume(queue: 'auto-concurrent') do |msg| + received << msg.id + @client.ack(queue: 'auto-concurrent', msg_id: msg.id) + break if received.size >= 10 + end + assert_equal 10, received.size + end + + def test_auto_batch_nonexistent_queue_raises + assert_raises(Fila::QueueNotFoundError) do + @client.enqueue(queue: 'no-such-queue-auto', payload: 'fail') + end + end +end + +class TestLingerBatching < Minitest::Test + def setup + @server = TestServerHelper.start + @client = Fila::Client.new(@server[:addr], batch_mode: :linger, linger_ms: 50, batch_size: 10) + end + + def teardown + @client&.close + TestServerHelper.stop(@server) if @server + end + + def test_linger_batch_enqueue + TestServerHelper.create_queue(@server, 'linger-test') + + msg_id = @client.enqueue(queue: 'linger-test', payload: 'linger-msg') + assert msg_id + refute_empty msg_id + end + + def test_linger_batch_concurrent + TestServerHelper.create_queue(@server, 'linger-concurrent') + + threads = 5.times.map do |i| + Thread.new do + @client.enqueue(queue: 'linger-concurrent', payload: "linger-#{i}") + end + end + ids = threads.map(&:value) + + assert_equal 5, ids.size + ids.each { |id| refute_empty id } + end +end + +class TestDisabledBatching < Minitest::Test + def setup + @server = TestServerHelper.start + @client = Fila::Client.new(@server[:addr], batch_mode: :disabled) + end + + def teardown + @client&.close + TestServerHelper.stop(@server) if @server + end + + def test_disabled_batch_enqueue_direct + TestServerHelper.create_queue(@server, 'disabled-test') + + msg_id = @client.enqueue(queue: 'disabled-test', payload: 'direct-msg') + assert msg_id + refute_empty msg_id + end + + def test_disabled_nonexistent_queue_raises + assert_raises(Fila::QueueNotFoundError) do + @client.enqueue(queue: 'no-such-queue-disabled', payload: 'fail') + end + end +end + +class TestBatchModeValidation < Minitest::Test + def test_invalid_batch_mode_raises + assert_raises(ArgumentError) do + Fila::Client.new('localhost:5555', batch_mode: :invalid) + end + end +end + +class TestCloseFlush < Minitest::Test + def setup + @server = TestServerHelper.start + end + + def teardown + TestServerHelper.stop(@server) if @server + end + + def test_close_drains_pending_messages + TestServerHelper.create_queue(@server, 'close-drain') + client = Fila::Client.new(@server[:addr]) + + # Enqueue a message, then close immediately. + msg_id = client.enqueue(queue: 'close-drain', payload: 'drain-me') + refute_empty msg_id + + client.close + + # Verify the message was persisted. + verify_client = Fila::Client.new(@server[:addr], batch_mode: :disabled) + received = false + verify_client.consume(queue: 'close-drain') do |msg| + assert_equal msg_id, msg.id + verify_client.ack(queue: 'close-drain', msg_id: msg.id) + received = true + break + end + assert received + verify_client.close + end + + def test_double_close_is_safe + client = Fila::Client.new(@server[:addr]) + client.close + client.close # Should not raise. + end +end diff --git a/test/test_client.rb b/test/test_client.rb index 7d0a01f..f39fe44 100644 --- a/test/test_client.rb +++ b/test/test_client.rb @@ -11,6 +11,7 @@ def setup end def teardown + @client&.close TestServerHelper.stop(@server) if @server end diff --git a/test/test_helper.rb b/test/test_helper.rb index c88a349..635c496 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -3,12 +3,10 @@ require 'minitest/autorun' require 'tmpdir' require 'socket' -require 'grpc' +require 'timeout' $LOAD_PATH.unshift File.expand_path('../lib', __dir__) -$LOAD_PATH.unshift File.expand_path('../lib/fila/proto', __dir__) require 'fila' -require_relative '../lib/fila/proto/fila/v1/admin_services_pb' FILA_SERVER_BIN = ENV.fetch('FILA_SERVER_BIN') do File.join(__dir__, '..', '..', 'fila', 'target', 'release', 'fila-server') @@ -25,10 +23,9 @@ def self.find_free_port # Start a fila-server instance. # - # @param tls_config [Hash, nil] optional TLS configuration with keys: - # :ca_cert_path, :server_cert_path, :server_key_path + # @param tls_config [Hash, nil] optional TLS configuration # @param bootstrap_apikey [String, nil] optional bootstrap API key - # @return [Hash] server info with :addr, :pid, :data_dir, :admin_stub + # @return [Hash] server info with :addr, :pid, :data_dir, etc. def self.start(tls_config: nil, bootstrap_apikey: nil) port = find_free_port addr = "127.0.0.1:#{port}" @@ -62,32 +59,30 @@ def self.start(tls_config: nil, bootstrap_apikey: nil) ) end - # Build credentials for admin stub. - # client_ca_cert_path is always needed to verify server cert; ca_cert_path is only for mTLS. - credentials = :this_channel_is_insecure + # Build connection options for admin operations. + conn_opts = { tls: false } if tls_config ca_path = tls_config[:client_ca_cert_path] || tls_config[:ca_cert_path] if ca_path - ca_cert = File.read(ca_path) - client_key = tls_config[:client_key_path] ? File.read(tls_config[:client_key_path]) : nil - client_cert = tls_config[:client_cert_path] ? File.read(tls_config[:client_cert_path]) : nil - credentials = GRPC::Core::ChannelCredentials.new(ca_cert, client_key, client_cert) + conn_opts[:ca_cert] = File.read(ca_path) + conn_opts[:client_cert] = File.read(tls_config[:client_cert_path]) if tls_config[:client_cert_path] + conn_opts[:client_key] = File.read(tls_config[:client_key_path]) if tls_config[:client_key_path] end end - - admin_metadata = {} - admin_metadata['authorization'] = "Bearer #{bootstrap_apikey}" if bootstrap_apikey + conn_opts[:api_key] = bootstrap_apikey if bootstrap_apikey # Wait for server ready. deadline = Time.now + 10 ready = false + last_error = nil while Time.now < deadline begin - try_list_queues(addr, credentials: credentials, metadata: admin_metadata) + Timeout.timeout(3) { try_list_queues(addr, conn_opts) } ready = true break - rescue StandardError - sleep 0.05 + rescue StandardError => e + last_error = e + sleep 0.1 end end @@ -100,17 +95,15 @@ def self.start(tls_config: nil, bootstrap_apikey: nil) '' end FileUtils.rm_rf(data_dir) - raise "fila-server failed to start within 10s on #{addr}\nConfig:\n#{toml}\nStderr:\n#{stderr_output}" + raise "fila-server failed to start within 10s on #{addr}\nConfig:\n#{toml}\n" \ + "Stderr:\n#{stderr_output}\nLast error: #{last_error&.class}: #{last_error&.message}" end - admin_stub = ::Fila::V1::FilaAdmin::Stub.new(addr, credentials) - { addr: addr, pid: pid, data_dir: data_dir, - admin_stub: admin_stub, - admin_metadata: admin_metadata + conn_opts: conn_opts } end @@ -123,12 +116,18 @@ def self.stop(server) end def self.create_queue(server, name) - req = ::Fila::V1::CreateQueueRequest.new(name: name, config: {}) - server[:admin_stub].create_queue(req, metadata: server[:admin_metadata] || {}) + client = nil + client = Fila::Client.new(server[:addr], batch_mode: :disabled, **server[:conn_opts]) + client.create_queue(name: name) + ensure + client&.close end - def self.try_list_queues(addr, credentials: :this_channel_is_insecure, metadata: {}) - stub = ::Fila::V1::FilaAdmin::Stub.new(addr, credentials) - stub.list_queues(::Fila::V1::ListQueuesRequest.new, metadata: metadata) + def self.try_list_queues(addr, conn_opts) + client = nil + client = Fila::Client.new(addr, batch_mode: :disabled, **conn_opts) + client.list_queues + ensure + client&.close end end diff --git a/test/test_tls_auth.rb b/test/test_tls_auth.rb index 7aad862..ca61f59 100644 --- a/test/test_tls_auth.rb +++ b/test/test_tls_auth.rb @@ -79,6 +79,7 @@ def setup end def teardown + @client&.close TestServerHelper.stop(@server) if @server end @@ -91,14 +92,16 @@ def test_enqueue_with_api_key end def test_enqueue_without_api_key_rejected - client_no_key = Fila::Client.new(@server[:addr]) TestServerHelper.create_queue(@server, 'auth-reject-queue') - # Without API key, the server should reject the request with Unauthenticated. - err = assert_raises(Fila::RPCError) do + client_no_key = nil + err = assert_raises(Fila::AuthenticationError) do + client_no_key = Fila::Client.new(@server[:addr]) client_no_key.enqueue(queue: 'auth-reject-queue', payload: 'should fail') end - assert_equal 16, err.code # GRPC::Core::StatusCodes::UNAUTHENTICATED + assert_match(/api key required|unauthorized/i, err.message) + ensure + client_no_key&.close end def test_consume_with_api_key @@ -121,8 +124,6 @@ def setup @cert_dir = Dir.mktmpdir('fila-certs-') @certs = CertHelper.generate_certs(@cert_dir) - # Server-only TLS: omit ca_cert_path so server does not require client certs. - # client_ca_cert_path is used by the test client to verify the server cert. @server = TestServerHelper.start( tls_config: { server_cert_path: @certs[:server_cert], @@ -138,6 +139,7 @@ def setup end def teardown + @client&.close TestServerHelper.stop(@server) if @server FileUtils.rm_rf(@cert_dir) if @cert_dir end @@ -189,6 +191,7 @@ def setup end def teardown + @client&.close TestServerHelper.stop(@server) if @server FileUtils.rm_rf(@cert_dir) if @cert_dir end @@ -208,8 +211,6 @@ def setup @certs = CertHelper.generate_certs(@cert_dir) @bootstrap_key = 'tls-bootstrap-key-67890' - # Server-only TLS + API key: omit ca_cert_path so server does not require client certs. - # client_ca_cert_path is used by the test client to verify the server cert. @server = TestServerHelper.start( tls_config: { server_cert_path: @certs[:server_cert], @@ -227,6 +228,7 @@ def setup end def teardown + @client&.close TestServerHelper.stop(@server) if @server FileUtils.rm_rf(@cert_dir) if @cert_dir end @@ -240,16 +242,19 @@ def test_enqueue_with_tls_and_api_key end def test_no_api_key_over_tls_rejected - client_no_key = Fila::Client.new( - @server[:addr], - ca_cert: File.read(@certs[:ca_cert]) - ) TestServerHelper.create_queue(@server, 'tls-auth-reject-queue') - err = assert_raises(Fila::RPCError) do + client_no_key = nil + err = assert_raises(Fila::AuthenticationError) do + client_no_key = Fila::Client.new( + @server[:addr], + ca_cert: File.read(@certs[:ca_cert]) + ) client_no_key.enqueue(queue: 'tls-auth-reject-queue', payload: 'should fail') end - assert_equal 16, err.code # UNAUTHENTICATED + assert_match(/api key required|unauthorized/i, err.message) + ensure + client_no_key&.close end end @@ -260,6 +265,7 @@ def setup end def teardown + @client&.close TestServerHelper.stop(@server) if @server end