Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions lib/fila/batcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ def drain_nonblocking(batch)

# Flush a batch of items via the FIBP transport.
# Groups items by queue to produce one frame per queue.
def flush_batch(items) # rubocop:disable Metrics/AbcSize
def flush_batch(items)
# Group by queue, preserving per-item result queues
groups = items.each_with_index.group_by { |item, _| item.message[:queue] }

Expand All @@ -145,7 +145,9 @@ def flush_batch(items) # rubocop:disable Metrics/AbcSize
rescue Transport::ConnectionClosed => e
broadcast_error(items, RPCError.new(0, "connection closed: #{e.message}"))
rescue StandardError => e
broadcast_error(items, Fila::Error.new(e.message))
# Re-broadcast the original exception so callers see the specific type
# (e.g. QueueNotFoundError, RPCError) rather than a generic Fila::Error.
broadcast_error(items, e)
end

# Convert an EnqueueResult into a String (message_id) or Exception.
Expand Down
2 changes: 1 addition & 1 deletion lib/fila/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ def consume_stream(queue, &block)
when Transport::ConnectionClosed then break
when Exception then raise frame
when String
msg = Codec.decode_consume_push(frame)
msg = Codec.decode_consume_push(frame, queue_name: queue)
block.call(msg) if msg
end
end
Expand Down
165 changes: 103 additions & 62 deletions lib/fila/codec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,93 @@ module Fila
module Codec # rubocop:disable Metrics/ModuleLength
module_function

# -----------------------------------------------------------------------
# Low-level read/write primitives
#
# These are listed first so that module_function makes them available as
# both public module methods (Codec.read_u16 etc.) and private instance
# methods. All higher-level encode/decode methods rely on them via
# implicit self, which only works when they are module functions too.
# -----------------------------------------------------------------------

# Encode a string as a length-prefixed UTF-8 fragment.
#
# @param str [String]
# @return [String] binary fragment (len:u16BE | utf8-bytes)
def encode_str16(str)
  utf8 = str.encode('UTF-8').b
  # Append in place onto the freshly packed length prefix.
  [utf8.bytesize].pack('n') << utf8
end

# Decode a length-prefixed UTF-8 string starting at +pos+.
#
# @param buf [String] binary buffer
# @param pos [Integer] byte offset
# @return [Array(String, Integer)] decoded string and new offset
def read_str16(buf, pos)
  len, body_start = read_u16(buf, pos)
  text = buf.byteslice(body_start, len).force_encoding('UTF-8')
  [text, body_start + len]
end

# Decode a header table: count:u8 followed by +count+ key/value string pairs.
#
# @param buf [String] binary buffer
# @param pos [Integer] byte offset
# @return [Array(Hash, Integer)] decoded headers hash and new offset
def read_headers(buf, pos)
  count, pos = read_u8(buf, pos)
  # Each pair advances +pos+ past its key and value; to_h keeps the last
  # value for a duplicated key, matching plain Hash assignment.
  pairs = count.times.map do
    key, pos = read_str16(buf, pos)
    value, pos = read_str16(buf, pos)
    [key, value]
  end
  [pairs.to_h, pos]
end

# Decode a single unsigned byte at +pos+.
#
# @param buf [String] binary buffer
# @param pos [Integer] byte offset
# @return [Array(Integer, Integer)] decoded u8 value and new offset
def read_u8(buf, pos)
  value = buf.getbyte(pos)
  [value, pos + 1]
end

# Decode a big-endian unsigned 16-bit integer at +pos+.
#
# @param buf [String] binary buffer
# @param pos [Integer] byte offset
# @return [Array(Integer, Integer)] decoded big-endian u16 value and new offset
def read_u16(buf, pos)
  value = buf.byteslice(pos, 2).unpack1('n')
  [value, pos + 2]
end

# Decode a big-endian unsigned 32-bit integer at +pos+.
#
# @param buf [String] binary buffer
# @param pos [Integer] byte offset
# @return [Array(Integer, Integer)] decoded big-endian u32 value and new offset
def read_u32(buf, pos)
  value = buf.byteslice(pos, 4).unpack1('N')
  [value, pos + 4]
end

# -----------------------------------------------------------------------
# Single-message encoder
#
# Wire format: header_count:u8 |
# headers: (key_len:u16BE+key, val_len:u16BE+val)* |
# payload_len:u32BE | payload
# -----------------------------------------------------------------------

# Encode a single message as a binary fragment.
#
# Wire format: header_count:u8 |
#              headers: (key_len:u16BE+key, val_len:u16BE+val)* |
#              payload_len:u32BE | payload
#
# @param msg [Hash] with :payload and optional :headers
# @return [String] binary fragment
# @raise [ArgumentError] if there are more headers than a u8 count can hold
def encode_message(msg)
  headers = msg[:headers] || {}
  # The wire count is one byte; more than 255 headers would be silently
  # truncated by pack('C') and corrupt the frame, so fail fast instead.
  raise ArgumentError, "too many headers: #{headers.size} (max 255)" if headers.size > 255

  buf = [headers.size].pack('C')
  headers.each do |key, val|
    # String#<< appends in place; += would reallocate the buffer each time.
    buf << encode_str16(key.to_s)
    buf << encode_str16(val.to_s)
  end
  payload_b = (msg[:payload] || '').b
  buf << [payload_b.bytesize].pack('N') << payload_b
  buf
end

# -----------------------------------------------------------------------
# Enqueue request
#
# queue_len:u16BE | queue:utf8
# msg_count:u16BE
# messages... (each: header_count:u8 |
# headers: (key_len:u16BE+key, val_len:u16BE+val)* |
# payload_len:u32BE | payload)
# messages... (each encoded by encode_message)
# -----------------------------------------------------------------------

# @param queue [String]
Expand Down Expand Up @@ -72,29 +151,33 @@ def encode_consume(queue, initial_credits: 256)
#
# Frame payload: msg_count:u16BE | messages...
# Each message: msg_id_len:u16BE+msg_id | fk_len:u16BE+fk |
# attempt_count:u32BE | queue_id_len:u16BE+queue_id |
# header_count:u8 | headers | payload_len:u32BE | payload
# attempt_count:u32BE |
# header_count:u8 | headers: (key_len:u16BE+key, val_len:u16BE+val)* |
# payload_len:u32BE | payload
#
# Note: the queue name is NOT included in the wire frame; it is passed in
# by the caller (from the original consume request) via +queue_name+.
#
# @param payload [String] raw binary frame payload
# @param queue_name [String] name of the queue being consumed
# @return [ConsumeMessage, nil] the first message in the frame
def decode_consume_push(payload)
def decode_consume_push(payload, queue_name: '')
pos = 0
_msg_count, pos = read_u16(payload, pos)
msg_id, pos = read_str16(payload, pos)
fairness_key, pos = read_str16(payload, pos)
attempt_count, pos = read_u32(payload, pos)
queue_id, pos = read_str16(payload, pos)
headers, pos = read_headers(payload, pos)
pay_len, pos = read_u32(payload, pos)
body = payload.byteslice(pos, pay_len)
_msg_count, pos = read_u16(payload, pos)
msg_id, pos = read_str16(payload, pos)
fairness_key, pos = read_str16(payload, pos)
attempt_count, pos = read_u32(payload, pos)
headers, pos = read_headers(payload, pos)
pay_len, pos = read_u32(payload, pos)
body = payload.byteslice(pos, pay_len)

ConsumeMessage.new(
id: msg_id,
headers: headers,
payload: body,
fairness_key: fairness_key,
attempt_count: attempt_count,
queue: queue_id
queue: queue_name
)
end

Expand Down Expand Up @@ -159,53 +242,11 @@ def encode_nack(items)
end

# Decode a nack response (same shape as ack response).
alias decode_nack_response decode_ack_response

private

def encode_message(msg)
headers = msg[:headers] || {}
buf = [headers.size].pack('C')
headers.each do |key, val|
buf += encode_str16(key.to_s)
buf += encode_str16(val.to_s)
end
payload_b = (msg[:payload] || '').b
buf += [payload_b.bytesize].pack('N') + payload_b
buf
end

def encode_str16(str)
bytes = str.encode('UTF-8').b
[bytes.bytesize].pack('n') + bytes
end

def read_str16(buf, pos)
len, pos = read_u16(buf, pos)
[buf.byteslice(pos, len).force_encoding('UTF-8'), pos + len]
end

def read_headers(buf, pos)
count, pos = read_u8(buf, pos)
headers = {}
count.times do
key, pos = read_str16(buf, pos)
val, pos = read_str16(buf, pos)
headers[key] = val
end
[headers, pos]
end

def read_u8(buf, pos)
[buf.getbyte(pos), pos + 1]
end

def read_u16(buf, pos)
[buf.byteslice(pos, 2).unpack1('n'), pos + 2]
end

def read_u32(buf, pos)
[buf.byteslice(pos, 4).unpack1('N'), pos + 4]
#
# module_function does not propagate to aliases, so we define this
# explicitly to ensure it is callable as Codec.decode_nack_response.
def decode_nack_response(payload)
  # Nack responses share the ack-response wire shape, so reuse its decoder.
  # Defined as a real method rather than `alias` because module_function
  # does not re-expose aliases as module methods (per the note above the def).
  decode_ack_response(payload)
end
end
end
69 changes: 55 additions & 14 deletions lib/fila/transport.rb
Original file line number Diff line number Diff line change
Expand Up @@ -100,25 +100,43 @@ def request(opcode, payload)
# Register a push queue for consume-stream server-push frames.
# Returns corr_id used to issue the consume request.
#
# The server sends two kinds of frames after a consume request:
# 1. An ACK frame (flags=0, corr_id=<request corr_id>, empty payload)
# confirming the consume subscription was registered.
# 2. Push frames (FLAG_SERVER_PUSH set, corr_id=0) carrying messages.
#
# We use a one-shot queue for the ACK (to block until the subscription is
# confirmed) and register +push_q+ at corr_id=0 for ongoing push frames.
#
# @param payload [String] consume request payload
# @param push_q [Queue] messages pushed here as they arrive
# @return [Integer] corr_id
def start_consume(payload, push_q)
corr_id = next_corr_id
ack_q = Queue.new

@mutex.synchronize do
raise ConnectionClosed, 'connection is closed' if @closed

@pending[corr_id] = push_q
@pending[corr_id] = ack_q # consume ACK routed here (one-shot)
@pending[0] = push_q # server-push frames carry corr_id=0
Comment thread
cubic-dev-ai[bot] marked this conversation as resolved.
end

write_frame(OP_CONSUME, corr_id, payload)

# Wait for the ACK so we know the subscription is active before returning.
outcome = ack_q.pop
raise outcome if outcome.is_a?(Exception)

corr_id
end

# Remove the consume push queue and stop dispatching to it.
def stop_consume(corr_id)
@mutex.synchronize { @pending.delete(corr_id) }
@mutex.synchronize do
@pending.delete(corr_id)
@pending.delete(0)
end
end

# Close the connection.
Expand Down Expand Up @@ -169,8 +187,8 @@ def perform_handshake
end

def send_auth
key_bytes = @api_key.encode('UTF-8').b
payload = [key_bytes.bytesize].pack('n') + key_bytes
# The FIBP AUTH frame payload is the raw API key bytes — no length prefix.
payload = @api_key.encode('UTF-8').b
request(OP_AUTH, payload)
end

Expand Down Expand Up @@ -216,16 +234,36 @@ def dispatch_frame(frame)
dest.push(result)
end

# Parse an OP_ERROR frame payload.
#
# FIBP OP_ERROR frames carry a plain UTF-8 message (no numeric code
# prefix). We map well-known error messages to typed Ruby exceptions so
# callers can rescue specific error classes.
def parse_error_frame(payload)
err_code = payload.byteslice(0, 2).unpack1('n')
msg_len = payload.byteslice(2, 2).unpack1('n')
msg = payload.byteslice(4, msg_len).force_encoding('UTF-8')
case err_code
when ERR_QUEUE_NOT_FOUND then QueueNotFoundError.new(msg)
when ERR_MESSAGE_NOT_FOUND then MessageNotFoundError.new(msg)
when ERR_UNAUTHENTICATED then RPCError.new(ERR_UNAUTHENTICATED, msg)
else RPCError.new(err_code, msg)
end
msg = payload.force_encoding('UTF-8')
error_from_message(msg)
end

# Map a plain-text FIBP error message to the appropriate Ruby exception.
def error_from_message(msg)
  if auth_error?(msg)
    RPCError.new(ERR_UNAUTHENTICATED, msg)
  elsif queue_not_found_error?(msg)
    QueueNotFoundError.new(msg)
  elsif message_not_found_error?(msg)
    MessageNotFoundError.new(msg)
  else
    # Unrecognized messages fall back to a generic RPC error with code 0.
    RPCError.new(0, msg)
  end
end

# True when +msg+ contains any marker the server uses for auth failures.
def auth_error?(msg)
  ['authentication', 'unauthenticated', 'api key', 'OP_AUTH'].any? do |marker|
    msg.include?(marker)
  end
end

# True when +msg+ indicates a missing queue.
def queue_not_found_error?(msg)
  ['queue not found', 'queue does not exist'].any? { |marker| msg.include?(marker) }
end

# True when +msg+ indicates a missing message or lease.
def message_not_found_error?(msg)
  ['message not found', 'lease not found'].any? { |marker| msg.include?(marker) }
end

def write_frame(opcode, corr_id, payload)
Expand Down Expand Up @@ -254,9 +292,12 @@ def read_raw(num_bytes)
buf
end

# corr_id=0 is permanently reserved for server-push frames. Regular
# request IDs cycle through 1..0xFFFFFFFF so they never collide.
def next_corr_id
@mutex.synchronize do
@corr_seq = (@corr_seq + 1) & 0xFFFFFFFF
@corr_seq += 1
@corr_seq = 1 if @corr_seq > 0xFFFFFFFF
@corr_seq
end
end
Expand Down
Loading
Loading