@@ -1,5 +1,5 @@
const IoUring = @This();
const std = @import("../../std.zig");
const std = @import("std");
const builtin = @import("builtin");
const assert = std.debug.assert;
const mem = std.mem;
@@ -1440,6 +1440,229 @@ pub const CompletionQueue = struct {
}
};
/// Group of application provided buffers. Uses the newer type of buffer
/// registration, called ring mapped buffers, supported since kernel 5.19.
/// Buffers are identified by a buffer group ID, and within that group, a
/// buffer ID. An IoUring can have multiple buffer groups, each with a unique
/// group ID.
///
/// In `init` the application provides a contiguous block of memory `buffers`
/// for `buffers_count` buffers of size `buffer_size`. The application can then
/// submit a `recv` operation without providing a buffer upfront. Once the
/// operation is ready to receive data, a buffer is picked automatically and
/// the resulting CQE will contain the buffer ID in `cqe.buffer_id()`. Use the
/// `get` method to get the buffer for the buffer ID identified by the CQE.
/// Once the application has processed the buffer, it may hand ownership back
/// to the kernel by calling `put`, allowing the cycle to repeat.
///
/// Depending on the rate of arrival of data, it is possible that a given
/// buffer group will run out of buffers before those in CQEs can be put back
/// to the kernel. If this happens, `cqe.err()` will return `.NOBUFS`.
///
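/// A minimal receive round trip might look like this (sketch only; `ring`,
/// `fd`, `user_data` and a suitably sized `buffers` slice are assumed to be
/// set up by the application, and the group ID and sizes are examples):
///
///   var buf_grp = try BufferGroup.init(&ring, 1, buffers, 4096, 16);
///   defer buf_grp.deinit();
///   _ = try buf_grp.recv(user_data, fd, 0);
///   _ = try ring.submit();
///   const cqe = try ring.copy_cqe();
///   const data = try buf_grp.get_cqe(cqe); // borrowed until put
///   // ... process data ...
///   buf_grp.put(try cqe.buffer_id()); // hand the buffer back to the kernel
///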
pub const BufferGroup = struct {
/// Parent ring for which this group is registered.
ring: *IoUring,
/// Pointer to the memory shared with the kernel.
/// `buffers_count` `io_uring_buf` structures are shared with the kernel.
/// The first `io_uring_buf` is overlaid by the `io_uring_buf_ring` struct.
br: *align(mem.page_size) linux.io_uring_buf_ring,
/// Contiguous block of memory of size (buffers_count * buffer_size).
buffers: []u8,
/// Size of each buffer in buffers.
buffer_size: u32,
/// Number of buffers in `buffers`, number of `io_uring_buf` structures in `br`.
buffers_count: u16,
/// ID of this group; must be unique within the ring.
group_id: u16,
pub fn init(
ring: *IoUring,
group_id: u16,
buffers: []u8,
buffer_size: u32,
buffers_count: u16,
) !BufferGroup {
assert(buffers.len == buffers_count * buffer_size);
const br = try setup_buf_ring(ring.fd, buffers_count, group_id);
buf_ring_init(br);
const mask = buf_ring_mask(buffers_count);
var i: u16 = 0;
while (i < buffers_count) : (i += 1) {
const start = buffer_size * i;
const buf = buffers[start .. start + buffer_size];
buf_ring_add(br, buf, i, mask, i);
}
buf_ring_advance(br, buffers_count);
return BufferGroup{
.ring = ring,
.group_id = group_id,
.br = br,
.buffers = buffers,
.buffer_size = buffer_size,
.buffers_count = buffers_count,
};
}
/// Prepare a recv operation which will select a buffer from this group.
pub fn recv(self: *BufferGroup, user_data: u64, fd: os.fd_t, flags: u32) !*linux.io_uring_sqe {
var sqe = try self.ring.get_sqe();
sqe.prep_rw(.RECV, fd, 0, 0, 0);
sqe.rw_flags = flags;
sqe.flags |= linux.IOSQE_BUFFER_SELECT;
sqe.buf_index = self.group_id;
sqe.user_data = user_data;
return sqe;
}
/// Prepare a multishot recv operation which will select a buffer from this group.
pub fn recv_multishot(self: *BufferGroup, user_data: u64, fd: os.fd_t, flags: u32) !*linux.io_uring_sqe {
var sqe = try self.recv(user_data, fd, flags);
sqe.ioprio |= linux.IORING_RECV_MULTISHOT;
return sqe;
}
/// Get a buffer by buffer ID.
pub fn get(self: *BufferGroup, buffer_id: u16) []u8 {
const head = self.buffer_size * buffer_id;
return self.buffers[head .. head + self.buffer_size];
}
/// Get a buffer by CQE, sliced to the received length.
pub fn get_cqe(self: *BufferGroup, cqe: linux.io_uring_cqe) ![]u8 {
const buffer_id = try cqe.buffer_id();
const used_len = @as(usize, @intCast(cqe.res));
return self.get(buffer_id)[0..used_len];
}
/// Release a buffer back to the kernel.
pub fn put(self: *BufferGroup, buffer_id: u16) void {
const mask = buf_ring_mask(self.buffers_count);
const buffer = self.get(buffer_id);
buf_ring_add(self.br, buffer, buffer_id, mask, 0);
buf_ring_advance(self.br, 1);
}
/// Release the buffer identified by a CQE back to the kernel.
pub fn put_cqe(self: *BufferGroup, cqe: linux.io_uring_cqe) !void {
self.put(try cqe.buffer_id());
}
pub fn deinit(self: *BufferGroup) void {
free_buf_ring(self.ring.fd, self.br, self.buffers_count, self.group_id);
}
};
/// Registers a shared buffer ring to be used with provided buffers.
/// `entries` `io_uring_buf` structures are memory mapped and shared with the kernel.
/// `fd` is the `IoUring.fd` for which the provided buffer ring is being registered.
/// `entries` is the number of entries requested in the buffer ring; must be a power of 2.
/// `group_id` is the chosen buffer group ID, unique within the ring.
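///
/// Example (sketch, assuming an initialised `ring`): register a ring of 16
/// entries for group 0, initialise it, and free it when done:
///
///   const br = try setup_buf_ring(ring.fd, 16, 0);
///   defer free_buf_ring(ring.fd, br, 16, 0);
///   buf_ring_init(br);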
pub fn setup_buf_ring(fd: os.fd_t, entries: u16, group_id: u16) !*align(mem.page_size) linux.io_uring_buf_ring {
if (entries == 0 or entries > 1 << 15) return error.EntriesNotInRange;
if (!std.math.isPowerOfTwo(entries)) return error.EntriesNotPowerOfTwo;
const mmap_size = @as(usize, entries) * @sizeOf(linux.io_uring_buf);
const mmap = try os.mmap(
null,
mmap_size,
os.PROT.READ | os.PROT.WRITE,
.{ .TYPE = .PRIVATE, .ANONYMOUS = true },
-1,
0,
);
errdefer os.munmap(mmap);
assert(mmap.len == mmap_size);
const br: *align(mem.page_size) linux.io_uring_buf_ring = @ptrCast(mmap.ptr);
try register_buf_ring(fd, @intFromPtr(br), entries, group_id);
return br;
}
fn register_buf_ring(fd: os.fd_t, addr: u64, entries: u32, group_id: u16) !void {
var reg = mem.zeroInit(linux.io_uring_buf_reg, .{
.ring_addr = addr,
.ring_entries = entries,
.bgid = group_id,
});
const res = linux.io_uring_register(
fd,
.REGISTER_PBUF_RING,
@as(*const anyopaque, @ptrCast(&reg)),
1,
);
try handle_register_buf_ring_result(res);
}
fn unregister_buf_ring(fd: os.fd_t, group_id: u16) !void {
var reg = mem.zeroInit(linux.io_uring_buf_reg, .{
.bgid = group_id,
});
const res = linux.io_uring_register(
fd,
.UNREGISTER_PBUF_RING,
@as(*const anyopaque, @ptrCast(&reg)),
1,
);
try handle_register_buf_ring_result(res);
}
fn handle_register_buf_ring_result(res: usize) !void {
switch (linux.getErrno(res)) {
.SUCCESS => {},
.INVAL => return error.ArgumentsInvalid,
else => |errno| return os.unexpectedErrno(errno),
}
}
/// Unregisters and frees a previously registered shared buffer ring, returned from `setup_buf_ring`.
pub fn free_buf_ring(fd: os.fd_t, br: *align(mem.page_size) linux.io_uring_buf_ring, entries: u32, group_id: u16) void {
unregister_buf_ring(fd, group_id) catch {};
var mmap: []align(mem.page_size) u8 = undefined;
mmap.ptr = @ptrCast(br);
mmap.len = entries * @sizeOf(linux.io_uring_buf);
os.munmap(mmap);
}
/// Initialises `br` so that it is ready to be used.
pub fn buf_ring_init(br: *linux.io_uring_buf_ring) void {
br.tail = 0;
}
/// Calculates the appropriate size mask for a buffer ring.
/// `entries` is the number of ring entries as specified in `setup_buf_ring`.
pub fn buf_ring_mask(entries: u16) u16 {
return entries - 1;
}
/// Adds `buffer` to the `br` buffer ring.
/// `buffer_id` is the identifier which will be returned in the CQE.
/// `buffer_offset` is the offset to insert at from the current tail.
/// If just one buffer is provided before the ring tail is committed with
/// `buf_ring_advance`, then the offset should be 0.
/// If buffers are provided in a loop before being committed, the offset must be
/// incremented by one for each buffer added.
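///
/// For example (sketch, with `buf0`..`buf2` being application provided slices),
/// providing three buffers before a single commit:
///
///   buf_ring_add(br, buf0, 0, mask, 0);
///   buf_ring_add(br, buf1, 1, mask, 1);
///   buf_ring_add(br, buf2, 2, mask, 2);
///   buf_ring_advance(br, 3);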
pub fn buf_ring_add(
br: *linux.io_uring_buf_ring,
buffer: []u8,
buffer_id: u16,
mask: u16,
buffer_offset: u16,
) void {
const bufs: [*]linux.io_uring_buf = @ptrCast(br);
const buf: *linux.io_uring_buf = &bufs[(br.tail +% buffer_offset) & mask];
buf.addr = @intFromPtr(buffer.ptr);
buf.len = @intCast(buffer.len);
buf.bid = buffer_id;
}
/// Make `count` new buffers visible to the kernel. Called after
/// `buf_ring_add` has been called `count` times to fill in new buffers.
pub fn buf_ring_advance(br: *linux.io_uring_buf_ring, count: u16) void {
const tail: u16 = br.tail +% count;
@atomicStore(u16, &br.tail, tail, .release);
}
test "structs/offsets/entries" {
if (builtin.os.tag != .linux) return error.SkipZigTest;
@@ -3652,7 +3875,7 @@ test "waitid" {
try testing.expectEqual(7, siginfo.fields.common.second.sigchld.status);
}
/// For use in tests. Returns SkipZigTest if the kernel version is less than required.
inline fn skipKernelLessThan(required: std.SemanticVersion) !void {
if (builtin.os.tag != .linux) return error.SkipZigTest;
@@ -3668,3 +3891,342 @@ inline fn skipKernelLessThan(required: std.SemanticVersion) !void {
current.pre = null; // don't check pre field
if (required.order(current) == .gt) return error.SkipZigTest;
}
test BufferGroup {
if (builtin.os.tag != .linux) return error.SkipZigTest;
// Init IoUring
var ring = IoUring.init(16, 0) catch |err| switch (err) {
error.SystemOutdated => return error.SkipZigTest,
error.PermissionDenied => return error.SkipZigTest,
else => return err,
};
defer ring.deinit();
// Init buffer group for ring
const group_id: u16 = 1; // buffers group id
const buffers_count: u16 = 1; // number of buffers in buffer group
const buffer_size: usize = 128; // size of each buffer in group
const buffers = try testing.allocator.alloc(u8, buffers_count * buffer_size);
defer testing.allocator.free(buffers);
var buf_grp = BufferGroup.init(
&ring,
group_id,
buffers,
buffer_size,
buffers_count,
) catch |err| switch (err) {
// kernel older than 5.19
error.ArgumentsInvalid => return error.SkipZigTest,
else => return err,
};
defer buf_grp.deinit();
// Create client/server fds
const fds = try createSocketTestHarness(&ring);
defer fds.close();
const data = [_]u8{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0xa, 0xb, 0xc, 0xd, 0xe };
// Client sends data
{
_ = try ring.send(1, fds.client, data[0..], 0);
const submitted = try ring.submit();
try testing.expectEqual(1, submitted);
const cqe_send = try ring.copy_cqe();
if (cqe_send.err() == .INVAL) return error.SkipZigTest;
try testing.expectEqual(linux.io_uring_cqe{ .user_data = 1, .res = data.len, .flags = 0 }, cqe_send);
}
// Server uses buffer group receive
{
// Submit recv operation; a buffer will be chosen from the buffer group
_ = try buf_grp.recv(2, fds.server, 0);
const submitted = try ring.submit();
try testing.expectEqual(1, submitted);
// ... when we have completion for recv operation
const cqe = try ring.copy_cqe();
try testing.expectEqual(2, cqe.user_data); // matches submitted user_data
try testing.expect(cqe.res >= 0); // success
try testing.expectEqual(os.E.SUCCESS, cqe.err());
try testing.expectEqual(data.len, @as(usize, @intCast(cqe.res))); // cqe.res holds received data len
// Read buffer_id and used buffer len from cqe
const buffer_id = try cqe.buffer_id();
const len: usize = @intCast(cqe.res);
// Get buffer from pool
const buf = buf_grp.get(buffer_id)[0..len];
try testing.expectEqualSlices(u8, &data, buf);
// Release the buffer back to the kernel when the application is done with it
buf_grp.put(buffer_id);
}
}
test "ring mapped buffers recv" {
if (builtin.os.tag != .linux) return error.SkipZigTest;
var ring = IoUring.init(16, 0) catch |err| switch (err) {
error.SystemOutdated => return error.SkipZigTest,
error.PermissionDenied => return error.SkipZigTest,
else => return err,
};
defer ring.deinit();
// init buffer group
const group_id: u16 = 1; // buffers group id
const buffers_count: u16 = 2; // number of buffers in buffer group
const buffer_size: usize = 4; // size of each buffer in group
const buffers = try testing.allocator.alloc(u8, buffers_count * buffer_size);
defer testing.allocator.free(buffers);
var buf_grp = BufferGroup.init(
&ring,
group_id,
buffers,
buffer_size,
buffers_count,
) catch |err| switch (err) {
// kernel older than 5.19
error.ArgumentsInvalid => return error.SkipZigTest,
else => return err,
};
defer buf_grp.deinit();
// create client/server fds
const fds = try createSocketTestHarness(&ring);
defer fds.close();
// for random user_data in sqe/cqe
var prng = std.rand.DefaultPrng.init(0);
var rnd = prng.random();
var round: usize = 4; // repeat send/recv cycle round times
while (round > 0) : (round -= 1) {
// client sends data
const data = [_]u8{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0xa, 0xb, 0xc, 0xd, 0xe };
{
const user_data = rnd.int(u64);
_ = try ring.send(user_data, fds.client, data[0..], 0);
try testing.expectEqual(@as(u32, 1), try ring.submit());
const cqe_send = try ring.copy_cqe();
if (cqe_send.err() == .INVAL) return error.SkipZigTest;
try testing.expectEqual(linux.io_uring_cqe{ .user_data = user_data, .res = data.len, .flags = 0 }, cqe_send);
}
// server reads data into provided buffers
// there are 2 buffers of size 4, so each read gets only a chunk of the data
// we read four chunks of 4, 4, 4 and 3 bytes
var chunk: []const u8 = data[0..buffer_size]; // first chunk
const id1 = try expect_buf_grp_recv(&ring, &buf_grp, fds.server, rnd.int(u64), chunk);
chunk = data[buffer_size .. buffer_size * 2]; // second chunk
const id2 = try expect_buf_grp_recv(&ring, &buf_grp, fds.server, rnd.int(u64), chunk);
// both buffers provided to the kernel are used, so we get a 'no more
// buffers' error until we put buffers back to the kernel
{
const user_data = rnd.int(u64);
_ = try buf_grp.recv(user_data, fds.server, 0);
try testing.expectEqual(@as(u32, 1), try ring.submit());
const cqe = try ring.copy_cqe();
try testing.expectEqual(user_data, cqe.user_data);
try testing.expect(cqe.res < 0); // fail
try testing.expectEqual(os.E.NOBUFS, cqe.err());
try testing.expect(cqe.flags & linux.IORING_CQE_F_BUFFER == 0); // IORING_CQE_F_BUFFER flag is set on success only
try testing.expectError(error.NoBufferSelected, cqe.buffer_id());
}
// put buffers back to the kernel
buf_grp.put(id1);
buf_grp.put(id2);
chunk = data[buffer_size * 2 .. buffer_size * 3]; // third chunk
const id3 = try expect_buf_grp_recv(&ring, &buf_grp, fds.server, rnd.int(u64), chunk);
buf_grp.put(id3);
chunk = data[buffer_size * 3 ..]; // last chunk
const id4 = try expect_buf_grp_recv(&ring, &buf_grp, fds.server, rnd.int(u64), chunk);
buf_grp.put(id4);
}
}
test "ring mapped buffers multishot recv" {
if (builtin.os.tag != .linux) return error.SkipZigTest;
var ring = IoUring.init(16, 0) catch |err| switch (err) {
error.SystemOutdated => return error.SkipZigTest,
error.PermissionDenied => return error.SkipZigTest,
else => return err,
};
defer ring.deinit();
// init buffer group
const group_id: u16 = 1; // buffers group id
const buffers_count: u16 = 2; // number of buffers in buffer group
const buffer_size: usize = 4; // size of each buffer in group
const buffers = try testing.allocator.alloc(u8, buffers_count * buffer_size);
defer testing.allocator.free(buffers);
var buf_grp = BufferGroup.init(
&ring,
group_id,
buffers,
buffer_size,
buffers_count,
) catch |err| switch (err) {
// kernel older than 5.19
error.ArgumentsInvalid => return error.SkipZigTest,
else => return err,
};
defer buf_grp.deinit();
// create client/server fds
const fds = try createSocketTestHarness(&ring);
defer fds.close();
// for random user_data in sqe/cqe
var prng = std.rand.DefaultPrng.init(0);
var rnd = prng.random();
var round: usize = 4; // repeat send/recv cycle round times
while (round > 0) : (round -= 1) {
// client sends data
const data = [_]u8{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0xa, 0xb, 0xc, 0xd, 0xe };
{
const user_data = rnd.int(u64);
_ = try ring.send(user_data, fds.client, data[0..], 0);
try testing.expectEqual(@as(u32, 1), try ring.submit());
const cqe_send = try ring.copy_cqe();
if (cqe_send.err() == .INVAL) return error.SkipZigTest;
try testing.expectEqual(linux.io_uring_cqe{ .user_data = user_data, .res = data.len, .flags = 0 }, cqe_send);
}
// start multishot recv
var recv_user_data = rnd.int(u64);
_ = try buf_grp.recv_multishot(recv_user_data, fds.server, 0);
try testing.expectEqual(@as(u32, 1), try ring.submit()); // submit
// server reads data into provided buffers
// there are 2 buffers of size 4, so each read gets only a chunk of the data
// we read four chunks of 4, 4, 4 and 3 bytes
var chunk: []const u8 = data[0..buffer_size]; // first chunk
const cqe1 = try expect_buf_grp_cqe(&ring, &buf_grp, recv_user_data, chunk);
try testing.expect(cqe1.flags & linux.IORING_CQE_F_MORE > 0);
chunk = data[buffer_size .. buffer_size * 2]; // second chunk
const cqe2 = try expect_buf_grp_cqe(&ring, &buf_grp, recv_user_data, chunk);
try testing.expect(cqe2.flags & linux.IORING_CQE_F_MORE > 0);
// both buffers provided to the kernel are used, so we get a 'no more
// buffers' error until we put buffers back to the kernel
{
const cqe = try ring.copy_cqe();
try testing.expectEqual(recv_user_data, cqe.user_data);
try testing.expect(cqe.res < 0); // fail
try testing.expectEqual(os.E.NOBUFS, cqe.err());
try testing.expect(cqe.flags & linux.IORING_CQE_F_BUFFER == 0); // IORING_CQE_F_BUFFER flag is set on success only
// IORING_CQE_F_MORE is not set,
// indicating that the multishot recv is finished
try testing.expect(cqe.flags & linux.IORING_CQE_F_MORE == 0);
try testing.expectError(error.NoBufferSelected, cqe.buffer_id());
}
// put buffers back to the kernel
buf_grp.put(try cqe1.buffer_id());
buf_grp.put(try cqe2.buffer_id());
// restart multishot
recv_user_data = rnd.int(u64);
_ = try buf_grp.recv_multishot(recv_user_data, fds.server, 0);
try testing.expectEqual(@as(u32, 1), try ring.submit()); // submit
chunk = data[buffer_size * 2 .. buffer_size * 3]; // third chunk
const cqe3 = try expect_buf_grp_cqe(&ring, &buf_grp, recv_user_data, chunk);
try testing.expect(cqe3.flags & linux.IORING_CQE_F_MORE > 0);
buf_grp.put(try cqe3.buffer_id());
chunk = data[buffer_size * 3 ..]; // last chunk
const cqe4 = try expect_buf_grp_cqe(&ring, &buf_grp, recv_user_data, chunk);
try testing.expect(cqe4.flags & linux.IORING_CQE_F_MORE > 0);
buf_grp.put(try cqe4.buffer_id());
// cancel pending multishot recv operation
{
const cancel_user_data = rnd.int(u64);
_ = try ring.cancel(cancel_user_data, recv_user_data, 0);
try testing.expectEqual(@as(u32, 1), try ring.submit());
// expect completion of cancel operation and completion of recv operation
var cqe_cancel = try ring.copy_cqe();
if (cqe_cancel.err() == .INVAL) return error.SkipZigTest;
var cqe_recv = try ring.copy_cqe();
if (cqe_recv.err() == .INVAL) return error.SkipZigTest;
// don't depend on order of completions
if (cqe_cancel.user_data == recv_user_data and cqe_recv.user_data == cancel_user_data) {
const a = cqe_cancel;
const b = cqe_recv;
cqe_cancel = b;
cqe_recv = a;
}
// Note on different kernel results:
// on older kernels (tested with v6.0.16, v6.1.57, v6.2.12, v6.4.16)
// cqe_cancel.err() == .NOENT
// cqe_recv.err() == .NOBUFS
// on newer kernels (tested with v6.5.0, v6.5.7)
// cqe_cancel.err() == .SUCCESS
// cqe_recv.err() == .CANCELED
// Upstream reference: https://github.com/axboe/liburing/issues/984
// cancel operation succeeds (or returns NOENT on older kernels)
try testing.expectEqual(cancel_user_data, cqe_cancel.user_data);
try testing.expect(cqe_cancel.err() == .NOENT or cqe_cancel.err() == .SUCCESS);
// recv operation fails with CANCELED (or NOBUFS on older kernels)
try testing.expectEqual(recv_user_data, cqe_recv.user_data);
try testing.expect(cqe_recv.res < 0);
try testing.expect(cqe_recv.err() == .NOBUFS or cqe_recv.err() == .CANCELED);
try testing.expect(cqe_recv.flags & linux.IORING_CQE_F_MORE == 0);
}
}
}
// Prepare and submit a recv using the buffer group.
// Test that the buffer from the group, pointed to by the CQE, matches expected.
fn expect_buf_grp_recv(
ring: *IoUring,
buf_grp: *BufferGroup,
fd: os.fd_t,
user_data: u64,
expected: []const u8,
) !u16 {
// prepare and submit recv
const sqe = try buf_grp.recv(user_data, fd, 0);
try testing.expect(sqe.flags & linux.IOSQE_BUFFER_SELECT == linux.IOSQE_BUFFER_SELECT);
try testing.expect(sqe.buf_index == buf_grp.group_id);
try testing.expectEqual(@as(u32, 1), try ring.submit()); // submit
const cqe = try expect_buf_grp_cqe(ring, buf_grp, user_data, expected);
return try cqe.buffer_id();
}
fn expect_buf_grp_cqe(
ring: *IoUring,
buf_grp: *BufferGroup,
user_data: u64,
expected: []const u8,
) !linux.io_uring_cqe {
// get cqe
const cqe = try ring.copy_cqe();
try testing.expectEqual(user_data, cqe.user_data);
try testing.expect(cqe.res >= 0); // success
try testing.expect(cqe.flags & linux.IORING_CQE_F_BUFFER == linux.IORING_CQE_F_BUFFER); // IORING_CQE_F_BUFFER flag is set
try testing.expectEqual(expected.len, @as(usize, @intCast(cqe.res)));
try testing.expectEqual(os.E.SUCCESS, cqe.err());
// get buffer from pool
const buffer_id = try cqe.buffer_id();
const len = @as(usize, @intCast(cqe.res));
const buf = buf_grp.get(buffer_id)[0..len];
try testing.expectEqualSlices(u8, expected, buf);
return cqe;
}