srctree

Gregory Mullen parent 7efec8b2 ce3005d6
add basic client implementation

filename was Deleted added: 204, removed: 7, total 197
@@ -0,0 +1,78 @@
alloc: Allocator,
peer: std.net.Stream,
poller: Poller,
//reader: AnyReader,
writer: std.net.Stream.Writer,
drop: usize = 0,
 
pub const Client = @This();
 
pub const Poller = std.io.Poller(PEnum);
pub const PEnum = enum { srv };
 
pub fn init(a: Allocator, host: []const u8, port: u16) !Client {
const peer = try std.net.tcpConnectToHost(a, host, port);
const poller = std.io.poll(
a,
PEnum,
.{ .srv = .{ .handle = peer.handle } },
);
const c: Client = .{
.alloc = a,
.peer = peer,
.poller = poller,
.writer = peer.writer(),
};
return c;
}
 
/// might be rolled into init
pub fn connect(c: *Client) !void {
var w = c.writer.any();
try (Connect{}).send(&w);
}
 
pub fn recv(c: *Client) !Packet.Parsed {
var fifo = c.poller.fifo(.srv);
var ready = fifo.readableLength();
if (c.drop > 0 and ready >= c.drop) {
fifo.discard(c.drop);
c.drop = 0;
}
var poll_more = try c.poller.poll();
while (poll_more) {
ready = fifo.readableLength();
log.err("loop", .{});
 
if (ready < 6) {
poll_more = try c.poller.poll();
continue;
}
 
const pkt: Packet.FixedHeader = @bitCast(fifo.readItem().?);
var fr = fifo.reader();
var r = fr.any();
const reported = try Packet.unpackVarInt(&r);
while (ready < reported and poll_more) {
log.err(" getting more data... {}/{}", .{ ready, reported });
poll_more = try c.poller.poll();
ready = fifo.readableLength();
}
const payload = fifo.readableSliceOfLen(reported);
 
c.drop = reported;
return try Packet.parse(pkt, payload);
}
return error.StreamCrashed;
}
 
const Packet = @import("Packet.zig");
const Publish = @import("Publish.zig");
const Connect = @import("Connect.zig");
const Subscribe = @import("Subscribe.zig");
 
const std = @import("std");
const Allocator = std.mem.Allocator;
const log = std.log.scoped(.mqtt);
const AnyReader = std.io.AnyReader;
const AnyWriter = std.io.AnyWriter;
 
src/Connect.zig added: 204, removed: 7, total 197
@@ -21,6 +21,11 @@ pub const KeepAlive = packed struct(u16) {
seconds: u16 = 600,
};
 
pub fn parse(r: *AnyReader) !Connect {
_ = r;
@panic("not implemented");
}
 
pub fn send(c: Connect, any: *AnyWriter) !void {
const props = [_]u8{ 0x11, 0x00, 0x00, 0x00, 0x03 };
const client_id: []const u8 = c.client_id orelse
@@ -286,6 +291,12 @@ pub const Ack = struct {
// Authentication Data more than once. Refer to section 4.12 for more
// information about extended authentication.
};
 
pub fn parse(pkt: []const u8) !Ack {
log.err("Connect Ack not implemented", .{});
log.err("Connect Ack ({}) {any}", .{ pkt.len, pkt });
return .{};
}
};
 
const Packet = @import("Packet.zig");
@@ -293,3 +304,4 @@ const Packet = @import("Packet.zig");
const std = @import("std");
const log = std.log.scoped(.mqtt);
const AnyWriter = std.io.AnyWriter;
const AnyReader = std.io.AnyReader;
 
src/Packet.zig added: 204, removed: 7, total 197
@@ -32,7 +32,7 @@ pub const ControlType = enum(u4) {
DISCONNECT = 14,
AUTH = 15,
 
const Flags = packed struct(u4) {
pub const Flags = packed struct(u4) {
retain: bool = false,
qos: QOS = .at_most_once,
dup: bool = false,
@@ -55,6 +55,55 @@ pub const QOS = enum(u2) {
invalid = 3,
};
 
// TODO find better name
pub const Header = struct {
header: FixedHeader,
length: usize,
};
 
// TODO find better name
pub const Parsed = union(ControlType) {
reserved: void,
CONNECT: Connect,
CONNACK: Connect.Ack,
PUBLISH: Publish,
PUBACK: Publish.Ack,
PUBREC: void,
PUBREL: void,
PUBCOMP: void,
SUBSCRIBE: Subscribe,
SUBACK: Subscribe.Ack,
UNSUBSCRIBE: void,
UNSUBACK: void,
PINGREQ: void,
PINGRESP: void,
DISCONNECT: void,
AUTH: void,
};
 
pub fn parse(header: FixedHeader, payload: []const u8) !Parsed {
var fbs = std.io.fixedBufferStream(payload);
var fbsr = fbs.reader();
var r = fbsr.any();
switch (header.kind) {
.CONNECT => return .{ .CONNECT = try Connect.parse(&r) },
.CONNACK => return .{ .CONNACK = try Connect.Ack.parse(payload) },
.PUBLISH => return .{ .PUBLISH = try Publish.parse(payload, header.flags) },
.PUBACK => return .{ .PUBACK = try Publish.Ack.parse(&r) },
.SUBSCRIBE => return .{ .SUBSCRIBE = try Subscribe.parse(&r) },
.SUBACK => return .{ .SUBACK = try Subscribe.Ack.parse(&r) },
.PUBREC, .PUBREL, .PUBCOMP, .UNSUBSCRIBE, .UNSUBACK, .PINGREQ, .PINGRESP, .DISCONNECT, .AUTH => {
log.err("not implemented parser for {}", .{header.kind});
@panic("not implemented");
},
else => |els| {
log.err("not implemented parser for {}", .{els});
unreachable;
},
}
unreachable;
}
 
pub fn send(p: Packet, any: *AnyWriter) !void {
try any.writeByte(@bitCast(p.header));
_ = try writeVarInt(p.body.len, any);
@@ -115,6 +164,10 @@ test unpackVarInt {
try std.testing.expectEqual(result, 129);
}
 
const Publish = @import("Publish.zig");
const Connect = @import("Connect.zig");
const Subscribe = @import("Subscribe.zig");
 
const std = @import("std");
const log = std.log.scoped(.mqtt);
const AnyWriter = std.io.AnyWriter;
 
src/Publish.zig added: 204, removed: 7, total 197
@@ -148,6 +148,39 @@ pub const Properties = enum(u8) {
// [MQTT-3.3.2-20].
};
 
pub fn parse(publ: []const u8, flags: Packet.ControlType.Flags) !Publish {
var fbs = std.io.fixedBufferStream(publ);
var r = fbs.reader();
log.err("{s} <> {any}", .{ publ, publ });
const slen = try r.readInt(u16, .big);
const topic = publ[2..][0..slen];
try fbs.seekBy(slen);
var pktid: ?u16 = null;
switch (flags.qos) {
.at_most_once => {
log.err(" expecting {s}", .{"nop"});
},
.at_least_once => {
log.err(" expecting {s}", .{"PUBACK"});
pktid = try r.readInt(u16, .big);
//try Publish.Ack.send(pktid.?, .success, &any);
},
.exactly_once => {
log.err(" expecting {s}", .{"PUBREC"});
},
.invalid => @panic("unreachable"),
}
const props = publ[2 + slen .. 2 + slen]; // TODO implement
try fbs.seekBy(1); // short VLI
 
return .{
.topic_name = topic,
.packet_ident = pktid,
.properties = props,
.payload = publ[fbs.pos..],
};
}
 
pub fn send(p: Publish, any: *AnyWriter) !void {
var buffer: [0x4000]u8 = undefined;
var fbs = std.io.fixedBufferStream(&buffer);
@@ -186,6 +219,11 @@ pub const Ack = struct {
_,
};
 
pub fn parse(r: *AnyReader) !Ack {
_ = r;
@panic("not implemented");
}
 
pub fn send(pkt_id: u16, code: Reason, any: *AnyWriter) !void {
var buffer: [0x4000]u8 = undefined;
var fbs = std.io.fixedBufferStream(&buffer);
@@ -205,3 +243,4 @@ const Packet = @import("Packet.zig");
const std = @import("std");
const log = std.log.scoped(.mqtt);
const AnyWriter = std.io.AnyWriter;
const AnyReader = std.io.AnyReader;
 
src/Subscribe.zig added: 204, removed: 7, total 197
@@ -2,6 +2,11 @@ channels: []const []const u8,
 
const Subscribe = @This();
 
pub fn parse(r: *AnyReader) !Subscribe {
_ = r;
@panic("not implemented");
}
 
pub fn send(s: Subscribe, any: *AnyWriter) !void {
var buffer: [0x4000]u8 = undefined;
var fbs = std.io.fixedBufferStream(&buffer);
@@ -22,8 +27,16 @@ pub fn send(s: Subscribe, any: *AnyWriter) !void {
try pkt.send(any);
}
 
pub const Ack = struct {
pub fn parse(r: *AnyReader) !Ack {
_ = r;
return .{};
}
};
 
const Packet = @import("Packet.zig");
 
const std = @import("std");
const log = std.log.scoped(.mqtt);
const AnyWriter = std.io.AnyWriter;
const AnyReader = std.io.AnyReader;
 
src/mqtt.zig added: 204, removed: 7, total 197
@@ -2,6 +2,8 @@
//! https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html
//! At least the parts that make sense
 
pub const Client = @import("Client.zig");
 
pub const Packet = @import("Packet.zig");
pub const Publish = @import("Publish.zig");
pub const Connect = @import("Connect.zig");