srctree

Gregory Mullen parent ec2cd8a2 3314d308
more support for connack

src/Client.zig added: 160, removed: 35, total 125
@@ -9,12 +9,19 @@ writer: std.net.Stream.Writer,
drop: usize = 0,
last_tx: usize = 0,
heartbeat_interval: u16,
srv_topic_aliases: ?[]Alias = null,
cli_topic_aliases: ?[]Alias = null,
 
pub const Client = @This();
 
pub const Poller = std.io.Poller(PEnum);
pub const PEnum = enum { srv };
 
pub const Alias = struct {
num: u16,
alias: []const u8,
};
 
pub const Options = struct {
heartbeat_interval: u16 = 3600,
};
@@ -45,6 +52,7 @@ pub fn connect(c: *Client) !bool {
const pkt = try c.recv();
switch (pkt) {
.CONNACK => {
log.err("connack {any}", .{pkt});
// handle connack, and record settings
return true;
},
 
src/Connect.zig added: 160, removed: 35, total 125
@@ -227,10 +227,16 @@ pub const Properties = enum(u8) {
};
 
pub const Ack = struct {
pub const Properties = enum(u8) {
//This is the length of the Properties in the CONNACK packet Variable
//Header encoded as a Variable Byte Integer.
reason: Reason,
srv_opts: SrvOptions,
 
pub const SrvOptions = struct {
topic_alias_max: u16 = 0,
recv_max: ?u16 = null,
wildcards: bool = true,
};
 
pub const Property = enum(u8) {
expiry_interval = 17,
//Followed by the Four Byte Integer representing the Session Expiry
//Interval in seconds. It is a Protocol Error to include the Session
@@ -241,15 +247,8 @@ pub const Ack = struct {
//description of the use of Session Expiry Interval.
 
receive_maximum = 33,
//Followed by the Two Byte Integer representing the Receive Maximum
//value. It is a Protocol Error to include the Receive Maximum value
//more than once or for it to have the value 0. The Server uses this
//value to limit the number of QoS 1 and QoS 2 publications that it is
//willing to process concurrently for the Client. It does not provide a
//mechanism to limit the QoS 0 publications that the Client might try to
//send. If the Receive Maximum value is absent, then its value defaults
//to 65,535. Refer to section 4.9 Flow Control for details of how the
//Receive Maximum is used.
//It is a Protocol Error to include the Receive Maximum value more than
//once or for it to have the value 0.
 
maximum_qos = 36,
// (0x24) Byte, Identifier of the Maximum QoS. Followed by a Byte with a
@@ -323,19 +322,6 @@ pub const Ack = struct {
// used by any other Session currently in the Server [MQTT-3.2.2-16].
 
topic_alias_maximum = 34,
// (0x22) Byte, Identifier of the Topic Alias Maximum. Followed by the
// Two Byte Integer representing the Topic Alias Maximum value. It is a
// Protocol Error to include the Topic Alias Maximum value more than
// once. If the Topic Alias Maximum property is absent, the default
// value is 0. This value indicates the highest value that the Server
// will accept as a Topic Alias sent by the Client. The Server uses this
// value to limit the number of Topic Aliases that it is willing to hold
// on this Connection. The Client MUST NOT send a Topic Alias in a
// PUBLISH packet to the Server greater than this value [MQTT-3.2.2-17].
// A value of 0 indicates that the Server does not accept any Topic
// Aliases on this connection. If Topic Alias Maximum is absent or 0,
// the Client MUST NOT send any Topic Aliases on to the Server
// [MQTT-3.2.2-18].
 
reason_string = 31,
// (0x1F) Byte Identifier of the Reason String. Followed by the UTF-8
@@ -464,12 +450,142 @@ pub const Ack = struct {
// exchanged authentication data. It is a Protocol Error to include the
// Authentication Data more than once. Refer to section 4.12 for more
// information about extended authentication.
 
pub fn parse(r: *AnyReader, len: usize) !SrvOptions {
var remain = len;
var opts: SrvOptions = .{};
while (remain > 0) {
remain -|= 1;
const prop = intToEnum(Property, try r.readByte()) catch return error.InvalidProperty;
switch (prop) {
.topic_alias_maximum => {
remain -|= 2;
opts.topic_alias_max = try r.readInt(u16, .big);
},
.receive_maximum => {
remain -|= 2;
opts.recv_max = try r.readInt(u16, .big);
},
else => |pk| {
log.err("Not Implemented conn ack prop {s}", .{@tagName(pk)});
return error.NotSupported;
},
}
}
return opts;
}
};
 
pub fn parse(pkt: []const u8) !Ack {
log.err("Connect Ack not implemented", .{});
log.err("Connect Ack ({}) {any}", .{ pkt.len, pkt });
return .{};
pub const Flags = packed struct(u8) {
session_exists: bool,
reserved: u7,
};
 
pub const Reason = enum(u8) {
success = 0,
//The Connection is accepted.
 
unspecified_error = 128,
//The Server does not wish to reveal the reason for the failure, or none
//of the other Reason Codes apply.
 
malformed_packet = 129,
//Data within the CONNECT packet could not be correctly parsed.
 
protocol_error = 130,
//Data in the CONNECT packet does not conform to this specification.
 
implementation_specific_error = 131,
//The CONNECT is valid but is not accepted by this Server.
 
unsupported_protocol_version = 132,
//The Server does not support the version of the MQTT protocol requested
//by the Client.
 
client_identifier_not_valid = 133,
//The Client Identifier is a valid string but is not allowed by the
//Server.
 
bad_user_name_or_password = 134,
//The Server does not accept the User Name or Password specified by the
//Client
 
not_authorized = 135,
//The Client is not authorized to connect.
 
server_unavailable = 136,
//The MQTT Server is not available.
 
server_busy = 137,
//The Server is busy. Try again later.
 
banned = 138,
//This Client has been banned by administrative action. Contact the
//server administrator.
 
bad_authentication_method = 140,
//The authentication method is not supported or does not match the
//authentication method currently in use.
 
topic_name_invalid = 144,
//The Will Topic Name is not malformed, but is not accepted by this
//Server.
 
packet_too_large = 149,
//The CONNECT packet exceeded the maximum permissible size.
 
quota_exceeded = 151,
//An implementation or administrative imposed limit has been exceeded.
 
payload_format_invalid = 153,
//The Will Payload does not match the specified Payload Format
//Indicator.
 
retain_not_supported = 154,
//The Server does not support retained messages, and Will Retain was set
//to 1.
 
qos_not_supported = 155,
//The Server does not support the QoS set in Will QoS.
 
use_another_server = 156,
//The Client should temporarily use another server.
 
server_moved = 157,
//The Client should permanently use another server.
 
connection_rate_exceeded = 159,
//The connection rate limit has been exceeded.
};
 
pub fn parse(r: *AnyReader) !Ack {
const flags: Ack.Flags = @bitCast(try r.readByte());
if (flags.reserved != 0) return error.InvalidPacket; // TODO determine if
// strictly following the spec is better or if RFC 761 2.10 should take priority
const reason = intToEnum(Reason, try r.readByte()) catch return error.InvalidPacket;
if (reason != .success) {
log.err("Connect Ack error {}", .{reason});
return error.ConnectionRejected;
}
const proplen = try Packet.unpackVarInt(r);
const opts = try Property.parse(r, proplen);
 
return .{
.reason = reason,
.srv_opts = opts,
};
}
 
test "parse" {
const pkt = [_]u8{ 0, 0, 6, 34, 0, 10, 33, 0, 20 };
var fbs = std.io.fixedBufferStream(&pkt);
var fbr = fbs.reader();
var r = fbr.any();
const a = try Ack.parse(&r);
try std.testing.expectEqualDeep(a, Ack{
.reason = .success,
.srv_opts = .{ .topic_alias_max = 10, .recv_max = 20 },
});
}
};
 
@@ -498,3 +614,4 @@ const std = @import("std");
const log = std.log.scoped(.mqtt);
const AnyWriter = std.io.AnyWriter;
const AnyReader = std.io.AnyReader;
const intToEnum = std.meta.intToEnum;
 
src/Packet.zig added: 160, removed: 35, total 125
@@ -43,8 +43,8 @@ pub const ControlType = enum(u4) {
pub fn flags(cpt: ControlType) !Flags {
return switch (cpt) {
.reserved => error.InvalidCPT,
.CONNECT, .CONNACK, .PUBLISH, .PUBACK, .PUBREC, .PUBCOMP, .SUBACK => .{},
.UNSUBACK, .PINGREQ, .PINGRESP, .DISCONNECT, .AUTH => .{},
.CONNECT, .CONNACK, .PUBLISH, .PUBACK, .PUBREC, .PUBCOMP => .{},
.SUBACK, .UNSUBACK, .PINGREQ, .PINGRESP, .DISCONNECT, .AUTH => .{},
.PUBREL, .SUBSCRIBE, .UNSUBSCRIBE => .{ .qos = .at_least_once },
};
}
@@ -89,7 +89,7 @@ pub fn parse(header: FixedHeader, payload: []const u8) !Parsed {
var r = fbsr.any();
switch (header.kind) {
.CONNECT => return .{ .CONNECT = try Connect.parse(&r) },
.CONNACK => return .{ .CONNACK = try Connect.Ack.parse(payload) },
.CONNACK => return .{ .CONNACK = try Connect.Ack.parse(&r) },
.PUBLISH => return .{ .PUBLISH = try Publish.parse(payload, header.flags) },
.PUBACK => return .{ .PUBACK = try Publish.Ack.parse(&r) },
.SUBSCRIBE => return .{ .SUBSCRIBE = try Subscribe.parse(&r) },