@@ -3,7 +3,11 @@ peer: std.net.Stream,
poller: Poller,
//reader: AnyReader,
writer: std.net.Stream.Writer,
/// The packet payload is left in the fifo, between calls to recv to avoid a
/// spurious alloc. Callers that need to retain the information should duplicate
/// data from the response as needed.
drop: usize = 0,
last_tx: usize = 0,
pub const Client = @This();
@@ -28,8 +32,22 @@ pub fn init(a: Allocator, host: []const u8, port: u16) !Client {
/// might be rolled into init
pub fn connect(c: *Client) !void {
var w = c.writer.any();
try (Connect{}).send(&w);
try c.send(Connect{});
}
/// if your type provides a compatible `send()` function delivering via this
/// allows the client to avoid sending additional heartbeats
pub fn send(c: *Client, packet: anytype) !void {
var any = c.writer.any();
c.last_tx = @intCast(std.time.timestamp());
return try packet.send(&any);
}
pub fn heartbeat(c: *Client) !void {
if (std.time.timestamp() > c.last_tx + 500) {
log.err("sending heartbeat", .{});
try c.send(Ping.Req{});
}
}
pub fn recv(c: *Client) !Packet.Parsed {
@@ -41,8 +59,8 @@ pub fn recv(c: *Client) !Packet.Parsed {
}
var poll_more = try c.poller.poll();
while (poll_more) {
try c.heartbeat();
ready = fifo.readableLength();
log.err("loop", .{});
if (ready < 6) {
poll_more = try c.poller.poll();
@@ -70,6 +88,7 @@ const Packet = @import("Packet.zig");
const Publish = @import("Publish.zig");
const Connect = @import("Connect.zig");
const Subscribe = @import("Subscribe.zig");
const Ping = @import("Ping.zig");
const std = @import("std");
const Allocator = std.mem.Allocator;