more support for connack

@@ -9,12 +9,19 @@ writer:,
drop: usize = 0,
last_tx: usize = 0,
heartbeat_interval: u16,
srv_topic_aliases: ?[]Alias = null,
cli_topic_aliases: ?[]Alias = null,
pub const Client = @This();
pub const Poller =;
pub const PEnum = enum { srv };
pub const Alias = struct {
num: u16,
alias: []const u8,
pub const Options = struct {
heartbeat_interval: u16 = 3600,
const pkt = try c.recv();
switch (pkt) {
log.err("connack {any}", .{pkt});
// handle connack, and record settings
return true;
pub const Ack = struct {
pub const Properties = enum(u8) {
//This is the length of the Properties in the CONNACK packet Variable
//Header encoded as a Variable Byte Integer.
reason: Reason,
srv_opts: SrvOptions,
pub const SrvOptions = struct {
topic_alias_max: u16 = 0,
recv_max: ?u16 = null,
wildcards: bool = true,
pub const Property = enum(u8) {
expiry_interval = 17,
//Followed by the Four Byte Integer representing the Session Expiry
//Interval in seconds. It is a Protocol Error to include the Session
//description of the use of Session Expiry Interval.
receive_maximum = 33,
//Followed by the Two Byte Integer representing the Receive Maximum
//value. It is a Protocol Error to include the Receive Maximum value
//more than once or for it to have the value 0. The Server uses this
//value to limit the number of QoS 1 and QoS 2 publications that it is
//willing to process concurrently for the Client. It does not provide a
//mechanism to limit the QoS 0 publications that the Client might try to
//send. If the Receive Maximum value is absent, then its value defaults
//to 65,535. Refer to section 4.9 Flow Control for details of how the
//Receive Maximum is used.
//It is a Protocol Error to include the Receive Maximum value more than
//once or for it to have the value 0.
maximum_qos = 36,
// (0x24) Byte, Identifier of the Maximum QoS. Followed by a Byte with a
// used by any other Session currently in the Server [MQTT-3.2.2-16].
topic_alias_maximum = 34,
// (0x22) Byte, Identifier of the Topic Alias Maximum. Followed by the
// Two Byte Integer representing the Topic Alias Maximum value. It is a
// Protocol Error to include the Topic Alias Maximum value more than
// once. If the Topic Alias Maximum property is absent, the default
// value is 0. This value indicates the highest value that the Server
// will accept as a Topic Alias sent by the Client. The Server uses this
// value to limit the number of Topic Aliases that it is willing to hold
// on this Connection. The Client MUST NOT send a Topic Alias in a
// PUBLISH packet to the Server greater than this value [MQTT-3.2.2-17].
// A value of 0 indicates that the Server does not accept any Topic
// Aliases on this connection. If Topic Alias Maximum is absent or 0,
// the Client MUST NOT send any Topic Aliases on to the Server
// [MQTT-3.2.2-18].
reason_string = 31,
// (0x1F) Byte Identifier of the Reason String. Followed by the UTF-8
// exchanged authentication data. It is a Protocol Error to include the
// Authentication Data more than once. Refer to section 4.12 for more
// information about extended authentication.
pub fn parse(r: *AnyReader, len: usize) !SrvOptions {
var remain = len;
var opts: SrvOptions = .{};
while (remain > 0) {
remain -|= 1;
const prop = intToEnum(Property, try r.readByte()) catch return error.InvalidProperty;
switch (prop) {
.topic_alias_maximum => {
remain -|= 2;
opts.topic_alias_max = try r.readInt(u16, .big);
.receive_maximum => {
remain -|= 2;
opts.recv_max = try r.readInt(u16, .big);
else => |pk| {
log.err("Not Implemented conn ack prop {s}", .{@tagName(pk)});
return error.NotSupported;
return opts;
pub fn parse(pkt: []const u8) !Ack {
log.err("Connect Ack not implemented", .{});
log.err("Connect Ack ({}) {any}", .{ pkt.len, pkt });
return .{};
pub const Flags = packed struct(u8) {
session_exists: bool,
reserved: u7,
pub const Reason = enum(u8) {
success = 0,
//The Connection is accepted.
unspecified_error = 128,
//The Server does not wish to reveal the reason for the failure, or none
//of the other Reason Codes apply.
malformed_packet = 129,
//Data within the CONNECT packet could not be correctly parsed.
protocol_error = 130,
//Data in the CONNECT packet does not conform to this specification.
implementation_specific_error = 131,
//The CONNECT is valid but is not accepted by this Server.
unsupported_protocol_version = 132,
//The Server does not support the version of the MQTT protocol requested
//by the Client.
client_identifier_not_valid = 133,
//The Client Identifier is a valid string but is not allowed by the
bad_user_name_or_password = 134,
//The Server does not accept the User Name or Password specified by the
not_authorized = 135,
//The Client is not authorized to connect.
server_unavailable = 136,
//The MQTT Server is not available.
server_busy = 137,
//The Server is busy. Try again later.
banned = 138,
//This Client has been banned by administrative action. Contact the
//server administrator.
bad_authentication_method = 140,
//The authentication method is not supported or does not match the
//authentication method currently in use.
topic_name_invalid = 144,
//The Will Topic Name is not malformed, but is not accepted by this
packet_too_large = 149,
//The CONNECT packet exceeded the maximum permissible size.
quota_exceeded = 151,
//An implementation or administrative imposed limit has been exceeded.
payload_format_invalid = 153,
//The Will Payload does not match the specified Payload Format
retain_not_supported = 154,
//The Server does not support retained messages, and Will Retain was set
//to 1.
qos_not_supported = 155,
//The Server does not support the QoS set in Will QoS.
use_another_server = 156,
//The Client should temporarily use another server.
server_moved = 157,
//The Client should permanently use another server.
connection_rate_exceeded = 159,
//The connection rate limit has been exceeded.
pub fn parse(r: *AnyReader) !Ack {
const flags: Ack.Flags = @bitCast(try r.readByte());
if (flags.reserved != 0) return error.InvalidPacket; // TODO determine if
// strictly following the spec is better or if RFC 761 2.10 should take priority
const reason = intToEnum(Reason, try r.readByte()) catch return error.InvalidPacket;
if (reason != .success) {
log.err("Connect Ack error {}", .{reason});
return error.ConnectionRejected;
const proplen = try Packet.unpackVarInt(r);
const opts = try Property.parse(r, proplen);
return .{
.reason = reason,
.srv_opts = opts,
test "parse" {
const pkt = [_]u8{ 0, 0, 6, 34, 0, 10, 33, 0, 20 };
var fbs =;
var fbr = fbs.reader();
var r = fbr.any();
const a = try Ack.parse(&r);
try std.testing.expectEqualDeep(a, Ack{
.reason = .success,
.srv_opts = .{ .topic_alias_max = 10, .recv_max = 20 },
const log = std.log.scoped(.mqtt);
const AnyWriter =;
const AnyReader =;
const intToEnum = std.meta.intToEnum;
pub fn flags(cpt: ControlType) !Flags {
return switch (cpt) {
.reserved => error.InvalidCPT,
.PUBREL, .SUBSCRIBE, .UNSUBSCRIBE => .{ .qos = .at_least_once },
switch (header.kind) {
.CONNECT => return .{ .CONNECT = try Connect.parse(&r) },
.CONNACK => return .{ .CONNACK = try Connect.Ack.parse(payload) },
.CONNACK => return .{ .CONNACK = try Connect.Ack.parse(&r) },
.PUBLISH => return .{ .PUBLISH = try Publish.parse(payload, header.flags) },
.PUBACK => return .{ .PUBACK = try Publish.Ack.parse(&r) },
.SUBSCRIBE => return .{ .SUBSCRIBE = try Subscribe.parse(&r) },