@@ -8,13 +8,18 @@ writer: std.net.Stream.Writer,
/// data from the response as needed.
drop: usize = 0,
last_tx: usize = 0,
heartbeat_interval: u16,
pub const Client = @This();
pub const Poller = std.io.Poller(PEnum);
pub const PEnum = enum { srv };
pub fn init(a: Allocator, host: []const u8, port: u16) !Client {
pub const Options = struct {
heartbeat_interval: u16 = 3600,
};
pub fn init(a: Allocator, host: []const u8, port: u16, opts: Options) !Client {
const peer = try std.net.tcpConnectToHost(a, host, port);
const poller = std.io.poll(
a,
@@ -26,13 +31,28 @@ pub fn init(a: Allocator, host: []const u8, port: u16) !Client {
.peer = peer,
.poller = poller,
.writer = peer.writer(),
.heartbeat_interval = opts.heartbeat_interval,
};
return c;
}
/// might be rolled into init
pub fn connect(c: *Client) !void {
try c.send(Connect{});
pub fn connect(c: *Client) !bool {
try c.send(Connect{
.keep_alive = .{ .seconds = c.heartbeat_interval },
});
// grab the connack packet
const pkt = try c.recv();
switch (pkt) {
.CONNACK => {
// handle connack, and record settings
return true;
},
else => {
log.err("Unexpected packet => {any}", .{pkt});
return false;
},
}
}
/// if your type provides a compatible `send()` function delivering via this
@@ -44,7 +64,8 @@ pub fn send(c: *Client, packet: anytype) !void {
}
pub fn heartbeat(c: *Client) !void {
if (std.time.timestamp() > c.last_tx + 500) {
const beat_delay: u16 = @truncate(@as(usize, c.heartbeat_interval) * 90 / 100);
if (std.time.timestamp() > c.last_tx + beat_delay) {
log.err("sending heartbeat", .{});
try c.send(Ping.Req{});
}
@@ -84,6 +105,10 @@ pub fn recv(c: *Client) !Packet.Parsed {
return error.StreamCrashed;
}
test Client {
std.testing.refAllDecls(@This());
}
const Packet = @import("Packet.zig");
const Publish = @import("Publish.zig");
const Connect = @import("Connect.zig");