srctree

Gregory Mullen parent 3973a0b2 8a5195ce
add timeout to client

src/Client.zig added: 15, removed: 9, total 6
@@ -9,6 +9,7 @@ writer: std.net.Stream.Writer,
drop: usize = 0,
last_tx: usize = 0,
heartbeat_interval: u16,
timeout: u64 = 10_000_000_000,
srv_topic_aliases: ?[]Alias = null,
cli_topic_aliases: ?[]Alias = null,
 
@@ -49,7 +50,10 @@ pub fn connect(c: *Client) !bool {
.keep_alive = .{ .seconds = c.heartbeat_interval },
});
// grab the connack packet
const pkt = try c.recv();
const pkt = try c.recv() orelse {
log.err("recv timeout!", .{});
return false;
};
switch (pkt) {
.connack => {
log.err("connack {any}", .{pkt});
@@ -79,21 +83,22 @@ pub fn heartbeat(c: *Client) !void {
}
}
 
pub fn recv(c: *Client) !Packet.Parsed {
pub fn recv(c: *Client) !?Packet.Parsed {
var fifo = c.poller.fifo(.srv);
var ready = fifo.readableLength();
if (c.drop > 0) {
fifo.discard(c.drop);
c.drop = 0;
ready = fifo.readableLength();
}
var poll_more = ready >= 2 or try c.poller.poll();
 
var poll_more = true;
while (poll_more) {
ready = fifo.readableLength();
try c.heartbeat();
 
if (ready < 2) {
poll_more = try c.poller.poll();
try c.heartbeat();
poll_more = try c.poller.pollTimeout(c.timeout);
if (fifo.readableLength() == ready) return null;
continue;
}
 
 
src/Publish.zig added: 15, removed: 9, total 6
@@ -3,7 +3,8 @@ topic_name: []const u8,
//QoS level is 1 or 2. Section 2.2.1 provides more information about Packet
//Identifiers.
packet_ident: ?u16,
//The length of the Properties in the PUBLISH packet Variable Header encoded as a Variable Byte Integer.
//The length of the Properties in the PUBLISH packet Variable Header encoded as
//a Variable Byte Integer.
properties: []const u8,
payload: []const u8,
ack_required: bool = false,