srctree

Gregory Mullen parent ce3005d6 3ecc57f6
prevent client from timing out

src/Client.zig added: 63, removed: 17, total 46
@@ -3,7 +3,11 @@ peer: std.net.Stream,
poller: Poller,
//reader: AnyReader,
writer: std.net.Stream.Writer,
/// The packet payload is left in the fifo, between calls to recv to avoid a
/// spurious alloc. Callers that need to retain the information should duplicate
/// data from the response as needed.
drop: usize = 0,
last_tx: usize = 0,
 
pub const Client = @This();
 
@@ -28,8 +32,22 @@ pub fn init(a: Allocator, host: []const u8, port: u16) !Client {
 
/// might be rolled into init
pub fn connect(c: *Client) !void {
var w = c.writer.any();
try (Connect{}).send(&w);
try c.send(Connect{});
}
 
/// if your type provides a compatible `send()` function delivering via this
/// allows the client to avoid sending additional heartbeats
pub fn send(c: *Client, packet: anytype) !void {
var any = c.writer.any();
c.last_tx = @intCast(std.time.timestamp());
return try packet.send(&any);
}
 
pub fn heartbeat(c: *Client) !void {
if (std.time.timestamp() > c.last_tx + 500) {
log.err("sending heartbeat", .{});
try c.send(Ping.Req{});
}
}
 
pub fn recv(c: *Client) !Packet.Parsed {
@@ -41,8 +59,8 @@ pub fn recv(c: *Client) !Packet.Parsed {
}
var poll_more = try c.poller.poll();
while (poll_more) {
try c.heartbeat();
ready = fifo.readableLength();
log.err("loop", .{});
 
if (ready < 6) {
poll_more = try c.poller.poll();
@@ -70,6 +88,7 @@ const Packet = @import("Packet.zig");
const Publish = @import("Publish.zig");
const Connect = @import("Connect.zig");
const Subscribe = @import("Subscribe.zig");
const Ping = @import("Ping.zig");
 
const std = @import("std");
const Allocator = std.mem.Allocator;
 
src/Packet.zig added: 63, removed: 17, total 46
@@ -75,8 +75,8 @@ pub const Parsed = union(ControlType) {
SUBACK: Subscribe.Ack,
UNSUBSCRIBE: void,
UNSUBACK: void,
PINGREQ: void,
PINGRESP: void,
PINGREQ: Ping.Req,
PINGRESP: Ping.Resp,
DISCONNECT: void,
AUTH: void,
};
@@ -92,7 +92,9 @@ pub fn parse(header: FixedHeader, payload: []const u8) !Parsed {
.PUBACK => return .{ .PUBACK = try Publish.Ack.parse(&r) },
.SUBSCRIBE => return .{ .SUBSCRIBE = try Subscribe.parse(&r) },
.SUBACK => return .{ .SUBACK = try Subscribe.Ack.parse(&r) },
.PUBREC, .PUBREL, .PUBCOMP, .UNSUBSCRIBE, .UNSUBACK, .PINGREQ, .PINGRESP, .DISCONNECT, .AUTH => {
.PINGREQ => return .{ .PINGREQ = try Ping.Req.parse(payload) },
.PINGRESP => return .{ .PINGRESP = try Ping.Resp.parse(payload) },
.PUBREC, .PUBREL, .PUBCOMP, .UNSUBSCRIBE, .UNSUBACK, .DISCONNECT, .AUTH => {
log.err("not implemented parser for {}", .{header.kind});
@panic("not implemented");
},
@@ -116,7 +118,7 @@ pub fn writeVarInt(requested: usize, any: *AnyWriter) !usize {
var written: usize = 0;
var len = requested;
if (len > 0xffffff7f) return error.PayloadTooLarge;
while (len > 0) {
while (written == 0 or len > 0) {
const byte: u8 = @truncate(len & 0x7f);
len >>= 7;
try any.writeByte(byte | if (len > 0) 0x80 else @as(u8, 0x00));
@@ -167,6 +169,7 @@ test unpackVarInt {
const Publish = @import("Publish.zig");
const Connect = @import("Connect.zig");
const Subscribe = @import("Subscribe.zig");
const Ping = @import("Ping.zig");
 
const std = @import("std");
const log = std.log.scoped(.mqtt);
 
filename was Deleted added: 63, removed: 17, total 46
@@ -0,0 +1,23 @@
pub const Req = struct {
pub fn send(_: Req, any: *AnyWriter) !void {
return try (Packet{ .header = .{ .kind = .PINGREQ }, .body = &[0]u8{} }).send(any);
}
 
pub fn parse(_: []const u8) !Req {
return .{};
}
};
 
pub const Resp = struct {
pub fn send(_: Resp, any: *AnyWriter) !void {
return try (Packet{ .header = .{ .kind = .PINGRESP }, .body = &[0]u8{} }).send(any);
}
 
pub fn parse(_: []const u8) !Resp {
return .{};
}
};
 
const std = @import("std");
const Packet = @import("Packet.zig");
const AnyWriter = std.io.AnyWriter;
 
src/Publish.zig added: 63, removed: 17, total 46
@@ -6,6 +6,7 @@ packet_ident: ?u16,
//The length of the Properties in the PUBLISH packet Variable Header encoded as a Variable Byte Integer.
properties: []const u8,
payload: []const u8,
ack_required: bool = false,
 
const Publish = @This();
 
@@ -151,22 +152,21 @@ pub const Properties = enum(u8) {
pub fn parse(publ: []const u8, flags: Packet.ControlType.Flags) !Publish {
var fbs = std.io.fixedBufferStream(publ);
var r = fbs.reader();
log.err("{s} <> {any}", .{ publ, publ });
const slen = try r.readInt(u16, .big);
const topic = publ[2..][0..slen];
try fbs.seekBy(slen);
var pktid: ?u16 = null;
var ack_required = false;
switch (flags.qos) {
.at_most_once => {
log.err(" expecting {s}", .{"nop"});
},
.at_most_once => {},
.at_least_once => {
log.err(" expecting {s}", .{"PUBACK"});
ack_required = true;
pktid = try r.readInt(u16, .big);
//try Publish.Ack.send(pktid.?, .success, &any);
},
.exactly_once => {
log.err(" expecting {s}", .{"PUBREC"});
log.err(" expecting {s} (not implemented)", .{"PUBREC"});
ack_required = true;
},
.invalid => @panic("unreachable"),
}
@@ -178,6 +178,7 @@ pub fn parse(publ: []const u8, flags: Packet.ControlType.Flags) !Publish {
.packet_ident = pktid,
.properties = props,
.payload = publ[fbs.pos..],
.ack_required = ack_required,
};
}