@@ -9,6 +9,7 @@ writer: std.net.Stream.Writer,
drop: usize = 0,
last_tx: usize = 0,
heartbeat_interval: u16,
timeout: u64 = 10_000_000_000,
srv_topic_aliases: ?[]Alias = null,
cli_topic_aliases: ?[]Alias = null,
@@ -49,7 +50,10 @@ pub fn connect(c: *Client) !bool {
.keep_alive = .{ .seconds = c.heartbeat_interval },
});
// grab the connack packet
const pkt = try c.recv();
const pkt = try c.recv() orelse {
log.err("recv timeout!", .{});
return false;
};
switch (pkt) {
.connack => {
log.err("connack {any}", .{pkt});
@@ -79,21 +83,22 @@ pub fn heartbeat(c: *Client) !void {
}
}
pub fn recv(c: *Client) !Packet.Parsed {
pub fn recv(c: *Client) !?Packet.Parsed {
var fifo = c.poller.fifo(.srv);
var ready = fifo.readableLength();
if (c.drop > 0) {
fifo.discard(c.drop);
c.drop = 0;
ready = fifo.readableLength();
}
var poll_more = ready >= 2 or try c.poller.poll();
var poll_more = true;
while (poll_more) {
ready = fifo.readableLength();
try c.heartbeat();
if (ready < 2) {
poll_more = try c.poller.poll();
try c.heartbeat();
poll_more = try c.poller.pollTimeout(c.timeout);
if (fifo.readableLength() == ready) return null;
continue;
}