Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ High-performance CoAP server and client library for Zig, built on Linux io_uring
- DTLS 1.2 PSK handshake and encrypted transport
- Pre-allocated in-flight tracking, zero hot-path allocations

See the [protocol compliance roadmap](docs/ROADMAP.md) for planned features.

## Quick Start

### Server
Expand Down
13 changes: 10 additions & 3 deletions bench/client.zig
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const bench_psk: coap.Psk = .{

const SuiteConfig = struct {
host: []const u8 = "127.0.0.1",
bind_address: []const u8 = "0.0.0.0",
port: u16 = 5683,
warmup_count: u32 = 1_000,
window_size: u16 = 256,
Expand Down Expand Up @@ -165,7 +166,7 @@ pub fn main() !void {
if (need_restart) {
kill_server(&server_pid);
const psk: ?coap.Psk = if (s.use_dtls) bench_psk else null;
server_pid = try fork_server(port, srv_tc, psk, counters);
server_pid = try fork_server(port, srv_tc, psk, counters, config.bind_address);
std.Thread.sleep(150 * std.time.ns_per_ms);
current_group = group;
}
Expand Down Expand Up @@ -818,7 +819,7 @@ fn run_bench(

fn make_client_socket(host: []const u8, port: u16) !posix.socket_t {
const dest = try std.net.Address.parseIp(host, port);
const fd = try posix.socket(posix.AF.INET, posix.SOCK.DGRAM, 0);
const fd = try posix.socket(dest.any.family, posix.SOCK.DGRAM, 0);
errdefer posix.close(fd);
try posix.connect(fd, &dest.any, dest.getOsSockLen());

Expand Down Expand Up @@ -876,7 +877,7 @@ fn alloc_shared_counters() !*ServerCounters {
return ptr;
}

fn fork_server(port: u16, thread_count: u16, psk: ?coap.Psk, counters: ?*ServerCounters) !posix.pid_t {
fn fork_server(port: u16, thread_count: u16, psk: ?coap.Psk, counters: ?*ServerCounters, bind_address: []const u8) !posix.pid_t {
const pid = try posix.fork();
if (pid == 0) {
// Silence server log output (info/warn) so it doesn't break the
Expand All @@ -891,6 +892,7 @@ fn fork_server(port: u16, thread_count: u16, psk: ?coap.Psk, counters: ?*ServerC
std.heap.page_allocator,
.{
.port = port,
.bind_address = bind_address,
.buffer_count = 512,
.buffer_size = 1280,
.thread_count = thread_count,
Expand All @@ -906,6 +908,7 @@ fn fork_server(port: u16, thread_count: u16, psk: ?coap.Psk, counters: ?*ServerC
std.heap.page_allocator,
.{
.port = port,
.bind_address = bind_address,
.buffer_count = 512,
.buffer_size = 1280,
.thread_count = thread_count,
Expand Down Expand Up @@ -1141,6 +1144,7 @@ fn print_usage() void {
" --window <n> Client sliding window size (default: 256)\n" ++
" --threads <n> Thread count, 0 = nproc (default: 0)\n" ++
" --no-server Don't fork embedded echo server\n" ++
" --ipv6 Use IPv6 loopback (::1) instead of IPv4\n" ++
"\n" ++
"Filters:\n" ++
" --plain-only Skip DTLS scenarios\n" ++
Expand Down Expand Up @@ -1182,6 +1186,9 @@ fn parse_args() SuiteConfig {
config.thread_count = std.fmt.parseInt(u16, val, 10) catch 0;
} else if (std.mem.eql(u8, arg, "--no-server")) {
config.embedded_server = false;
} else if (std.mem.eql(u8, arg, "--ipv6")) {
config.host = "::1";
config.bind_address = "::";
} else if (std.mem.eql(u8, arg, "--plain-only")) {
config.filter_dtls = false;
} else if (std.mem.eql(u8, arg, "--dtls-only")) {
Expand Down
13 changes: 4 additions & 9 deletions docs/ROADMAP.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,13 @@ These are protocol violations or mandatory omissions in the base CoAP spec.
on first response via `poll()`, `waitForResponse()`, or `routeObserve()`.

### 1.4 IPv6 support (§1)
- **Status:** `[ ]` hardcoded IPv4
- **Status:** `[x]` done
- **Issue:** AF_INET hardcoded in Io.zig, Client.zig, Server.zig. `sockaddr.in`
cast in `decode_recv()` would overflow for `sockaddr_in6`. RFC 7252 treats
IPv6 as essential.
- **Impact:** Cannot deploy on IPv6-only networks (increasingly common in IoT).
- **Effort:** Medium. Requires:
- sockaddr union (in/in6) throughout
- Buffer sizing for 28-byte sockaddr_in6
- Address hashing changes in exchange.zig, rate_limiter.zig, Session.zig
- Dual-stack or family-specific socket creation
- **Perf note:** Larger sockaddr means more cache pressure in address hashing.
Use a compact address representation internally (hash, not raw sockaddr).
- **Resolution:** Family auto-detected from bind/host address. Dual-stack via
`IPV6_V6ONLY=0` when binding `"::"`. Family-aware address hashing in exchange,
rate_limiter, DTLS Cookie, and DTLS Session. Bench supports `--ipv6` flag.

### 1.5 Option order validation on decode (§5.4.6)
- **Status:** `[x]` done — structurally enforced by delta encoding
Expand Down
7 changes: 3 additions & 4 deletions src/Client.zig
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ const Client = @This();

/// Client configuration. All fields have sensible defaults.
pub const Config = struct {
/// Server IPv4 address. Default: `"127.0.0.1"`.
/// Server address (IPv4 or IPv6). Default: `"127.0.0.1"`.
host: []const u8 = "127.0.0.1",
/// Server UDP port. Default: 5683 (CoAP standard).
port: u16 = constants.port_default,
Expand Down Expand Up @@ -369,7 +369,7 @@ const empty_sentinel: u16 = 0xFFFF;
/// by the client and freed in `deinit()`.
///
/// Returns `error.InvalidConfig` if `max_in_flight` is 0 or `buffer_size` < 64.
/// Returns `error.UnsupportedAddressFamily` for non-IPv4 addresses.
/// Supports both IPv4 and IPv6 addresses.
pub fn init(allocator: std.mem.Allocator, config: Config) !Client {
if (config.max_in_flight == 0) return error.InvalidConfig;
if (config.buffer_size < 64) return error.InvalidConfig;
Expand All @@ -383,9 +383,8 @@ pub fn init(allocator: std.mem.Allocator, config: Config) !Client {
}

const dest = try std.net.Address.parseIp(effective_config.host, effective_config.port);
if (dest.any.family != posix.AF.INET) return error.UnsupportedAddressFamily;

const fd = try posix.socket(posix.AF.INET, posix.SOCK.DGRAM | posix.SOCK.NONBLOCK, 0);
const fd = try posix.socket(dest.any.family, posix.SOCK.DGRAM | posix.SOCK.NONBLOCK, 0);
errdefer posix.close(fd);
try posix.connect(fd, &dest.any, dest.getOsSockLen());

Expand Down
33 changes: 17 additions & 16 deletions src/Io.zig
Original file line number Diff line number Diff line change
Expand Up @@ -74,21 +74,21 @@ pub fn deinit(io: *Io, allocator: std.mem.Allocator) void {
/// Bind a UDP socket and provide buffers to the kernel pool.
pub fn setup(io: *Io, port: u16, bind_address: []const u8) !void {
const address = try std.net.Address.parseIp(bind_address, port);
const family = address.any.family;

// Only IPv4 is supported; IPv6 requires larger sockaddr buffers
// throughout the recv/send paths.
if (address.any.family != posix.AF.INET) return error.UnsupportedAddressFamily;

const fd = try posix.socket(
posix.AF.INET,
posix.SOCK.DGRAM,
0,
);
const fd = try posix.socket(family, posix.SOCK.DGRAM, 0);
io.fd_socket = fd;

try posix.setsockopt(fd, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
try posix.setsockopt(fd, posix.SOL.SOCKET, posix.SO.REUSEPORT, &std.mem.toBytes(@as(c_int, 1)));

// Enable dual-stack for IPv6 sockets (accept both v4 and v6 clients).
if (family == posix.AF.INET6) {
posix.setsockopt(fd, linux.IPPROTO.IPV6, linux.IPV6.V6ONLY, &std.mem.toBytes(@as(c_int, 0))) catch |err| {
log.debug("IPV6_V6ONLY: {}", .{err});
};
}

// Increase socket buffers for throughput.
const buf_size = std.mem.toBytes(@as(c_int, 4 * 1024 * 1024));
posix.setsockopt(fd, posix.SOL.SOCKET, posix.SO.SNDBUF, &buf_size) catch |err| {
Expand Down Expand Up @@ -202,14 +202,15 @@ pub fn decode_recv(io: *const Io, cqe: *const Cqe) !RecvResult {
return error.PayloadOutOfBounds;
}

const peer_addr: *linux.sockaddr.in =
@ptrCast(@alignCast(io.buffers.ptr + name_offset));
// Determine address family from the first 2 bytes of the name area.
const peer_family = std.mem.readInt(u16, io.buffers[name_offset..][0..2], .little);

// Port from sockaddr is in network byte order; initIp4 expects host order.
const net_address = std.net.Address.initIp4(
@bitCast(peer_addr.addr),
std.mem.bigToNative(u16, peer_addr.port),
);
const net_address: std.net.Address = if (peer_family == posix.AF.INET)
.{ .in = .{ .sa = @as(*const linux.sockaddr.in, @ptrCast(@alignCast(io.buffers.ptr + name_offset))).* } }
else if (peer_family == posix.AF.INET6)
.{ .in6 = .{ .sa = @as(*const linux.sockaddr.in6, @ptrCast(@alignCast(io.buffers.ptr + name_offset))).* } }
else
return error.UnsupportedAddressFamily;

return .{
.peer_address = net_address,
Expand Down
69 changes: 59 additions & 10 deletions src/Server.zig
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ exchange_lifetime_ms: u32,
running: std.atomic.Value(bool),

// Pre-allocated per-CQE response state.
addrs_response: []linux.sockaddr,
addrs_response: []std.net.Address,
msgs_response: []linux.msghdr_const,
iovs_response: []posix.iovec,
buffer_response: []u8,
Expand All @@ -122,7 +122,7 @@ buffer_response: []u8,
emergency_ack: []u8,

// Recv state.
addr_recv: linux.sockaddr,
addr_recv: linux.sockaddr.in6,
msg_recv: linux.msghdr,

// Eviction timer.
Expand Down Expand Up @@ -268,7 +268,7 @@ fn init_raw(
);

const addrs_response = try allocator.alloc(
linux.sockaddr,
std.net.Address,
batch,
);
errdefer allocator.free(addrs_response);
Expand Down Expand Up @@ -338,7 +338,7 @@ fn init_raw(
.iovs_response = iovs_response,
.buffer_response = buffer_response,
.emergency_ack = emergency_ack,
.addr_recv = std.mem.zeroes(linux.sockaddr),
.addr_recv = std.mem.zeroes(linux.sockaddr.in6),
.msg_recv = std.mem.zeroes(linux.msghdr),
.last_eviction_ns = 0,
.tick_count = 0,
Expand Down Expand Up @@ -387,8 +387,10 @@ pub fn deinit(server: *Server) void {
pub fn listen(server: *Server) !void {
try server.io.setup(server.config.port, server.config.bind_address);

server.msg_recv.name = &server.addr_recv;
server.msg_recv.namelen = @sizeOf(linux.sockaddr);
server.msg_recv.name = @ptrCast(&server.addr_recv);
// Set name buffer size based on actual socket family.
const bind_addr = try std.net.Address.parseIp(server.config.bind_address, server.config.port);
server.msg_recv.namelen = bind_addr.getOsSockLen();
server.msg_recv.controllen = 0;

try server.io.recv_multishot(&server.msg_recv);
Expand Down Expand Up @@ -802,8 +804,8 @@ fn handle_recv(
// Rate limiting in throttled mode.
if (server.load_level == .throttled) {
if (server.rate_limiter) |*rl| {
const ip = recv.peer_address.in.sa.addr;
if (!rl.allow(ip, server.tick_now_ns)) {
const addr_key = RateLimiter.AddrKey.fromAddress(recv.peer_address);
if (!rl.allow(addr_key, server.tick_now_ns)) {
const is_con_raw = recv.payload.len >= 1 and ((recv.payload[0] >> 4) & 0x03) == 0;
release_buffer_robust(&server.io, recv.buffer_id);
server.buffers_outstanding -|= 1;
Expand Down Expand Up @@ -1632,7 +1634,7 @@ fn send_raw(
peer_address: std.net.Address,
index: usize,
) !void {
server.addrs_response[index] = peer_address.any;
server.addrs_response[index] = peer_address;

server.iovs_response[index] = .{
.base = @ptrCast(@constCast(data.ptr)),
Expand All @@ -1641,7 +1643,7 @@ fn send_raw(

server.msgs_response[index] = .{
.name = @ptrCast(&server.addrs_response[index]),
.namelen = @sizeOf(linux.sockaddr),
.namelen = peer_address.getOsSockLen(),
.iov = @ptrCast(&server.iovs_response[index]),
.iovlen = 1,
.control = null,
Expand Down Expand Up @@ -2393,3 +2395,50 @@ test "recognized_options allows custom critical options" {
try testing.expectEqual(.content, response.code);
try testing.expectEqualSlices(u8, "ok", response.payload);
}

fn test_client_ip(host: []const u8, port: u16) !posix.socket_t {
const dest = try std.net.Address.parseIp(host, port);
const fd = try posix.socket(dest.any.family, posix.SOCK.DGRAM, 0);
const timeout = posix.timeval{ .sec = 1, .usec = 0 };
try posix.setsockopt(fd, posix.SOL.SOCKET, posix.SO.RCVTIMEO, std.mem.asBytes(&timeout));
try posix.connect(fd, &dest.any, dest.getOsSockLen());
return fd;
}

test "round-trip: NON echo via IPv6 loopback" {
const port: u16 = 19715;

var server = Server.init(testing.allocator, .{
.port = port,
.bind_address = "::1",
.buffer_count = 8,
.buffer_size = 1280,
.rate_limit_ip_count = 0,
}, echo_handler) catch return;
defer server.deinit();
try setup_for_test(&server);

const request_packet = coapz.Packet{
.kind = .non_confirmable,
.code = .get,
.msg_id = 0x1234,
.token = &.{ 0xAA, 0xBB },
.options = &.{},
.payload = "ipv6",
.data_buf = &.{},
};
const wire = try request_packet.write(testing.allocator);
defer testing.allocator.free(wire);

const client_fd = test_client_ip("::1", port) catch return;
defer posix.close(client_fd);

const raw = try send_tick_recv(&server, client_fd, wire);
defer testing.allocator.free(raw);

const response = try coapz.Packet.read(testing.allocator, raw);
defer response.deinit(testing.allocator);

try testing.expectEqual(.content, response.code);
try testing.expectEqualSlices(u8, "ipv6", response.payload);
}
10 changes: 8 additions & 2 deletions src/dtls/Cookie.zig
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,20 @@
///
/// Cookie = HMAC-SHA256(server_secret, client_addr_bytes || client_random)
const std = @import("std");
const posix = std.posix;
const HmacSha256 = std.crypto.auth.hmac.sha2.HmacSha256;

/// Generate a stateless DTLS cookie.
pub fn generate(server_secret: [32]u8, client_addr: std.net.Address, client_random: [32]u8) [32]u8 {
const addr_bytes: [16]u8 = @bitCast(client_addr.any);
var mac: [32]u8 = undefined;
var h = HmacSha256.init(&server_secret);
h.update(&addr_bytes);
// Hash the full sockaddr for the active family. Use pointer to
// client_addr directly (not a copy) to avoid dangling slice.
switch (client_addr.any.family) {
posix.AF.INET => h.update(std.mem.asBytes(&client_addr.in)),
posix.AF.INET6 => h.update(std.mem.asBytes(&client_addr.in6)),
else => h.update(std.mem.asBytes(&client_addr.in)),
}
h.update(&client_random);
h.final(&mac);
return mac;
Expand Down
18 changes: 13 additions & 5 deletions src/dtls/Session.zig
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
/// Pre-allocated open-addressed hash table with intrusive LRU doubly-linked
/// list and free list. Provides O(1) lookup, allocation, and eviction.
const std = @import("std");
const posix = std.posix;
const Sha256 = std.crypto.hash.sha2.Sha256;

pub const State = enum(u8) {
Expand Down Expand Up @@ -347,19 +348,26 @@ pub const SessionTable = struct {
};

fn addrHash(addr: std.net.Address) u64 {
const addr_bytes: [16]u8 = @bitCast(addr.any);
const bytes = switch (addr.any.family) {
posix.AF.INET => std.mem.asBytes(&addr.in),
posix.AF.INET6 => std.mem.asBytes(&addr.in6),
else => std.mem.asBytes(&addr.in),
};
var hash: u64 = 0xcbf29ce484222325; // FNV-1a 64-bit offset basis
for (addr_bytes) |b| {
for (bytes) |b| {
hash ^= b;
hash *%= 0x100000001b3; // FNV-1a 64-bit prime
}
return hash;
}

fn addrEqual(a: std.net.Address, b: std.net.Address) bool {
const ab: [16]u8 = @bitCast(a.any);
const bb: [16]u8 = @bitCast(b.any);
return std.mem.eql(u8, &ab, &bb);
if (a.any.family != b.any.family) return false;
return switch (a.any.family) {
posix.AF.INET => std.mem.eql(u8, std.mem.asBytes(&a.in), std.mem.asBytes(&b.in)),
posix.AF.INET6 => std.mem.eql(u8, std.mem.asBytes(&a.in6), std.mem.asBytes(&b.in6)),
else => false,
};
}

fn wrappingDistance(from: u32, to: u32, mask: u32) u32 {
Expand Down
Loading
Loading