srctree

Gregory Mullen parent 857935c3 7efec8b2
turn mqtt into a lib

build.zig added: 695, removed: 283, total 412
@@ -5,17 +5,8 @@ pub fn build(b: *std.Build) void {
 
const optimize = b.standardOptimizeOption(.{});
 
const lib = b.addStaticLibrary(.{
.name = "zig^2bee",
.root_source_file = b.path("src/root.zig"),
.target = target,
.optimize = optimize,
});
 
b.installArtifact(lib);
 
const exe = b.addExecutable(.{
.name = "zig^2bee",
.name = "mqtt",
.root_source_file = b.path("src/main.zig"),
.target = target,
.optimize = optimize,
@@ -34,14 +25,6 @@ pub fn build(b: *std.Build) void {
const run_step = b.step("run", "Run the app");
run_step.dependOn(&run_cmd.step);
 
const lib_unit_tests = b.addTest(.{
.root_source_file = b.path("src/root.zig"),
.target = target,
.optimize = optimize,
});
 
const run_lib_unit_tests = b.addRunArtifact(lib_unit_tests);
 
const exe_unit_tests = b.addTest(.{
.root_source_file = b.path("src/main.zig"),
.target = target,
@@ -51,6 +34,5 @@ pub fn build(b: *std.Build) void {
const run_exe_unit_tests = b.addRunArtifact(exe_unit_tests);
 
const test_step = b.step("test", "Run unit tests");
test_step.dependOn(&run_lib_unit_tests.step);
test_step.dependOn(&run_exe_unit_tests.step);
}
 
filename was Deleted added: 695, removed: 283, total 412
@@ -0,0 +1,295 @@
client_id: ?[]const u8 = null,
flags: Flags = .{},
keep_alive: KeepAlive = .{},
 
const Connect = @This();
 
pub const MQTT_VERSION = 5;
 
pub const Flags = packed struct(u8) {
reserved: bool = false,
// 3.1.2.4 requires this be 0
clean_start: bool = true,
will_flag: bool = false,
will_qos: u2 = 0,
will_retain: bool = false,
password: bool = false,
username: bool = false,
};
 
pub const KeepAlive = packed struct(u16) {
seconds: u16 = 600,
};
 
pub fn send(c: Connect, any: *AnyWriter) !void {
const props = [_]u8{ 0x11, 0x00, 0x00, 0x00, 0x03 };
const client_id: []const u8 = c.client_id orelse
"generic_mqtt_client";
 
var buffer: [0x80]u8 = undefined;
var fbs =
std.io.fixedBufferStream(&buffer);
var w = fbs.writer();
 
try w.writeInt(u16, 4, .big);
try w.writeAll("MQTT");
try w.writeByte(MQTT_VERSION);
try w.writeByte(@bitCast(c.flags));
try w.writeInt(u16, @bitCast(c.keep_alive), .big);
try w.writeByte(@intCast(props.len));
try w.writeAll(&props);
try w.writeInt(u16, @intCast(client_id.len), .big);
try w.writeAll(client_id);
 
const pkt: Packet = .{ .header = .{ .kind = .CONNECT }, .body = fbs.getWritten() };
 
log.warn("writing connect packet", .{});
try pkt.send(any);
}
 
pub const Ack = struct {
pub const Properties = enum(u8) {
//This is the length of the Properties in the CONNACK packet Variable
//Header encoded as a Variable Byte Integer.
 
expiry_interval = 17,
//Followed by the Four Byte Integer representing the Session Expiry
//Interval in seconds. It is a Protocol Error to include the Session
//Expiry Interval more than once. If the Session Expiry Interval is
//absent the value in the CONNECT Packet used. The server uses this
//property to inform the Client that it is using a value other than that
//sent by the Client in the CONNACK. Refer to section 3.1.2.11.2 for a
//description of the use of Session Expiry Interval.
 
receive_maximum = 33,
//Followed by the Two Byte Integer representing the Receive Maximum
//value. It is a Protocol Error to include the Receive Maximum value
//more than once or for it to have the value 0. The Server uses this
//value to limit the number of QoS 1 and QoS 2 publications that it is
//willing to process concurrently for the Client. It does not provide a
//mechanism to limit the QoS 0 publications that the Client might try to
//send. If the Receive Maximum value is absent, then its value defaults
//to 65,535. Refer to section 4.9 Flow Control for details of how the
//Receive Maximum is used.
 
maximum_qos = 36,
// (0x24) Byte, Identifier of the Maximum QoS. Followed by a Byte with a
// value of either 0 or 1. It is a Protocol Error to include Maximum QoS
// more than once, or to have a value other than 0 or 1. If the Maximum
// QoS is absent, the Client uses a Maximum QoS of 2. If a Server does
// not support QoS 1 or QoS 2 PUBLISH packets it MUST send a Maximum QoS
// in the CONNACK packet specifying the highest QoS it supports
// [MQTT-3.2.2-9]. A Server that does not support QoS 1 or QoS 2 PUBLISH
// packets MUST still accept SUBSCRIBE packets containing a Requested
// QoS of 0, 1 or 2 [MQTT-3.2.2-10]. If a Client receives a Maximum QoS
// from a Server, it MUST NOT send PUBLISH packets at a QoS level
// exceeding the Maximum QoS level specified [MQTT-3.2.2-11]. It is a
// Protocol Error if the Server receives a PUBLISH packet with a QoS
// greater than the Maximum QoS it specified. In this case use
// DISCONNECT with Reason Code 0x9B (QoS not supported) as described in
// section 4.13 Handling errors. If a Server receives a CONNECT packet
// containing a Will QoS that exceeds its capabilities, it MUST reject
// the connection. It SHOULD use a CONNACK packet with Reason Code 0x9B
// (QoS not supported) as described in section 4.13 Handling errors, and
// MUST close the Network Connection [MQTT-3.2.2-12]. Non-normative
// comment A Client does not need to support QoS 1 or QoS 2 PUBLISH
// packets. If this is the case, the Client simply restricts the maximum
// QoS field in any SUBSCRIBE commands it sends to a value it can
// support.
 
retain_available = 37,
// (0x25) Byte, Identifier of Retain Available. Followed by a Byte
// field. If present, this byte declares whether the Server supports
// retained messages. A value of 0 means that retained messages are not
// supported. A value of 1 means retained messages are supported. If not
// present, then retained messages are supported. It is a Protocol Error
// to include Retain Available more than once or to use a value other
// than 0 or 1. If a Server receives a CONNECT packet containing a Will
// Message with the Will Retain set to 1, and it does not support
// retained messages, the Server MUST reject the connection request. It
// SHOULD send CONNACK with Reason Code 0x9A (Retain not supported) and
// then it MUST close the Network Connection [MQTT-3.2.2-13]. A Client
// receiving Retain Available set to 0 from the Server MUST NOT send a
// PUBLISH packet with the RETAIN flag set to 1 [MQTT-3.2.2-14]. If the
// Server receives such a packet, this is a Protocol Error. The Server
// SHOULD send a DISCONNECT with Reason Code of 0x9A (Retain not
// supported) as described in section 4.13.
 
maximum_packet_size = 39,
// (0x27) Byte, Identifier of the Maximum Packet Size. Followed by a
// Four Byte Integer representing the Maximum Packet Size the Server is
// willing to accept. If the Maximum Packet Size is not present, there
// is no limit on the packet size imposed beyond the limitations in the
// protocol as a result of the remaining length encoding and the
// protocol header sizes. It is a Protocol Error to include the Maximum
// Packet Size more than once, or for the value to be set to zero. The
// packet size is the total number of bytes in an MQTT Control Packet,
// as defined in section 2.1.4. The Server uses the Maximum Packet Size
// to inform the Client that it will not process packets whose size
// exceeds this limit. The Client MUST NOT send packets exceeding
// Maximum Packet Size to the Server [MQTT-3.2.2-15]. If a Server
// receives a packet whose size exceeds this limit, this is a Protocol
// Error, the Server uses DISCONNECT with Reason Code 0x95 (Packet too
// large), as described in section 4.13.
 
assigned_client_identifier = 18,
// (0x12) Byte, Identifier of the Assigned Client Identifier. Followed
// by the UTF-8 string which is the Assigned Client Identifier. It is a
// Protocol Error to include the Assigned Client Identifier more than
// once. The Client Identifier which was assigned by the Server because
// a zero length Client Identifier was found in the CONNECT packet. If
// the Client connects using a zero length Client Identifier, the Server
// MUST respond with a CONNACK containing an Assigned Client Identifier.
// The Assigned Client Identifier MUST be a new Client Identifier not
// used by any other Session currently in the Server [MQTT-3.2.2-16].
 
topic_alias_maximum = 34,
// (0x22) Byte, Identifier of the Topic Alias Maximum. Followed by the
// Two Byte Integer representing the Topic Alias Maximum value. It is a
// Protocol Error to include the Topic Alias Maximum value more than
// once. If the Topic Alias Maximum property is absent, the default
// value is 0. This value indicates the highest value that the Server
// will accept as a Topic Alias sent by the Client. The Server uses this
// value to limit the number of Topic Aliases that it is willing to hold
// on this Connection. The Client MUST NOT send a Topic Alias in a
// PUBLISH packet to the Server greater than this value [MQTT-3.2.2-17].
// A value of 0 indicates that the Server does not accept any Topic
// Aliases on this connection. If Topic Alias Maximum is absent or 0,
// the Client MUST NOT send any Topic Aliases on to the Server
// [MQTT-3.2.2-18].
 
reason_string = 31,
// (0x1F) Byte Identifier of the Reason String. Followed by the UTF-8
// Encoded String representing the reason associated with this response.
// This Reason String is a human readable string designed for
// diagnostics and SHOULD NOT be parsed by the Client. The Server uses
// this value to give additional information to the Client. The Server
// MUST NOT send this property if it would increase the size of the
// CONNACK packet beyond the Maximum Packet Size specified by the Client
// [MQTT-3.2.2-19]. It is a Protocol Error to include the Reason String
// more than once. Non-normative comment Proper uses for the reason
// string in the Client would include using this information in an
// exception thrown by the Client code, or writing this string to a log.
 
user_property = 38,
// (0x26) Byte, Identifier of User Property. Followed by a UTF-8 String
// Pair. This property can be used to provide additional information to
// the Client including diagnostic information. The Server MUST NOT send
// this property if it would increase the size of the CONNACK packet
// beyond the Maximum Packet Size specified by the Client
// [MQTT-3.2.2-20]. The User Property is allowed to appear multiple
// times to represent multiple name, value pairs. The same name is
// allowed to appear more than once. The content and meaning of this
// property is not defined by this specification. The receiver of a
// CONNACK containing this property MAY ignore it.
 
wildcard_subscription_available = 40,
// (0x28) Byte, Identifier of Wildcard Subscription Available. Followed
// by a Byte field. If present, this byte declares whether the Server
// supports Wildcard Subscriptions. A value is 0 means that Wildcard
// Subscriptions are not supported. A value of 1 means Wildcard
// Subscriptions are supported. If not present, then Wildcard
// Subscriptions are supported. It is a Protocol Error to include the
// Wildcard Subscription Available more than once or to send a value
// other than 0 or 1. If the Server receives a SUBSCRIBE packet
// containing a Wildcard Subscription and it does not support Wildcard
// Subscriptions, this is a Protocol Error. The Server uses DISCONNECT
// with Reason Code 0xA2 (Wildcard Subscriptions not supported) as
// described in section 4.13. If a Server supports Wildcard
// Subscriptions, it can still reject a particular subscribe request
// containing a Wildcard Subscription. In this case the Server MAY send
// a SUBACK Control Packet with a Reason Code 0xA2 (Wildcard
// Subscriptions not supported).
 
subscription_identifiers_available = 41,
// (0x29) Byte, Identifier of Subscription Identifier Available.
// Followed by a Byte field. If present, this byte declares whether the
// Server supports Subscription Identifiers. A value is 0 means that
// Subscription Identifiers are not supported. A value of 1 means
// Subscription Identifiers are supported. If not present, then
// Subscription Identifiers are supported. It is a Protocol Error to
// include the Subscription Identifier Available more than once, or to
// send a value other than 0 or 1. If the Server receives a SUBSCRIBE
// packet containing Subscription Identifier and it does not support
// Subscription Identifiers, this is a Protocol Error. The Server uses
// DISCONNECT with Reason Code of 0xA1 (Subscription Identifiers not
// supported) as described in section 4.13.
 
shared_subscription_available = 42,
// (0x2A) Byte, Identifier of Shared Subscription Available. Followed by
// a Byte field. If present, this byte declares whether the Server
// supports Shared Subscriptions. A value is 0 means that Shared
// Subscriptions are not supported. A value of 1 means Shared
// Subscriptions are supported. If not present, then Shared
// Subscriptions are supported. It is a Protocol Error to include the
// Shared Subscription Available more than once or to send a value other
// than 0 or 1. If the Server receives a SUBSCRIBE packet containing
// Shared Subscriptions and it does not support Shared Subscriptions,
// this is a Protocol Error. The Server uses DISCONNECT with Reason Code
// 0x9E (Shared Subscriptions not supported) as described in section
// 4.13.
 
server_keep_alive = 19,
// (0x13) Byte, Identifier of the Server Keep Alive. Followed by a Two
// Byte Integer with the Keep Alive time assigned by the Server. If the
// Server sends a Server Keep Alive on the CONNACK packet, the Client
// MUST use this value instead of the Keep Alive value the Client sent
// on CONNECT [MQTT-3.2.2-21]. If the Server does not send the Server
// Keep Alive, the Server MUST use the Keep Alive value set by the
// Client on CONNECT [MQTT-3.2.2-22]. It is a Protocol Error to include
// the Server Keep Alive more than once. Non-normative comment The
// primary use of the Server Keep Alive is for the Server to inform the
// Client that it will disconnect the Client for inactivity sooner than
// the Keep Alive specified by the Client.
 
response_information = 26,
// (0x1A) Byte, Identifier of the Response Information. Followed by a
// UTF-8 Encoded String which is used as the basis for creating a
// Response Topic. The way in which the Client creates a Response Topic
// from the Response Information is not defined by this specification.
// It is a Protocol Error to include the Response Information more than
// once. If the Client sends a Request Response Information with a value
// 1, it is OPTIONAL for the Server to send the Response Information in
// the CONNACK. Non-normative comment A common use of this is to pass a
// globally unique portion of the topic tree which is reserved for this
// Client for at least the lifetime of its Session. This often cannot
// just be a random name as both the requesting Client and the
// responding Client need to be authorized to use it. It is normal to
// use this as the root of a topic tree for a particular Client. For the
// Server to return this information, it normally needs to be correctly
// configured. Using this mechanism allows this configuration to be done
// once in the Server rather than in each Client. Refer to section 4.10
// for more information about Request / Response.
 
server_reference = 28,
// (0x1C) Byte, Identifier of the Server Reference. Followed by a UTF-8
// Encoded String which can be used by the Client to identify another
// Server to use. It is a Protocol Error to include the Server Reference
// more than once. The Server uses a Server Reference in either a
// CONNACK or DISCONNECT packet with Reason code of 0x9C (Use another
// server) or Reason Code 0x9D (Server moved) as described in section
// 4.13. Refer to section 4.11 Server redirection for information about
// how Server Reference is used.
 
authentication_method = 21,
// (0x15) Byte, Identifier of the Authentication Method. Followed by a
// UTF-8 Encoded String containing the name of the authentication
// method. It is a Protocol Error to include the Authentication Method
// more than once. Refer to section 4.12 for more information about
// extended authentication.
 
authentication_data = 22,
// (0x16) Byte, Identifier of the Authentication Data. Followed by
// Binary Data containing authentication data. The contents of this data
// are defined by the authentication method and the state of already
// exchanged authentication data. It is a Protocol Error to include the
// Authentication Data more than once. Refer to section 4.12 for more
// information about extended authentication.
};
};
 
const Packet = @import("Packet.zig");
 
const std = @import("std");
const log = std.log.scoped(.mqtt);
const AnyWriter = std.io.AnyWriter;
 
filename was Deleted added: 695, removed: 283, total 412
@@ -0,0 +1,121 @@
//! Builds and sends a packet. The MQTT v5 defined layout is always
//! FixedHeader u8 | CODE | Flags |
//! VarLen Int u8-u32 | u1 cont & u7 | ... up to 3 additional bytes
//! payload | remaining bytes |
//!
 
header: FixedHeader,
body: []const u8,
 
const Packet = @This();
 
pub const FixedHeader = packed struct(u8) {
flags: ControlType.Flags = .{},
kind: ControlType,
};
 
pub const ControlType = enum(u4) {
reserved = 0,
CONNECT = 1,
CONNACK = 2,
PUBLISH = 3,
PUBACK = 4,
PUBREC = 5,
PUBREL = 6,
PUBCOMP = 7,
SUBSCRIBE = 8,
SUBACK = 9,
UNSUBSCRIBE = 10,
UNSUBACK = 11,
PINGREQ = 12,
PINGRESP = 13,
DISCONNECT = 14,
AUTH = 15,
 
const Flags = packed struct(u4) {
retain: bool = false,
qos: QOS = .at_most_once,
dup: bool = false,
};
/// MQTT 5.0 -- 2.1.3
pub fn flags(cpt: ControlType) !Flags {
return switch (cpt) {
.reserved => error.InvalidCPT,
.CONNECT, .CONNACK, .PUBLISH, .PUBACK, .PUBREC, .PUBCOMP, .SUBACK => .{},
.UNSUBACK, .PINGREQ, .PINGRESP, .DISCONNECT, .AUTH => .{},
.PUBREL, .SUBSCRIBE, .UNSUBSCRIBE => .{ .qos = .at_least_once },
};
}
};
 
pub const QOS = enum(u2) {
at_most_once = 0,
at_least_once = 1,
exactly_once = 2,
invalid = 3,
};
 
pub fn send(p: Packet, any: *AnyWriter) !void {
try any.writeByte(@bitCast(p.header));
_ = try writeVarInt(p.body.len, any);
try any.writeAll(p.body);
log.err("debug: {s}", .{p.body});
log.err("debug: {any}", .{p.body});
}
 
pub fn writeVarInt(requested: usize, any: *AnyWriter) !usize {
var written: usize = 0;
var len = requested;
if (len > 0xffffff7f) return error.PayloadTooLarge;
while (len > 0) {
const byte: u8 = @truncate(len & 0x7f);
len >>= 7;
try any.writeByte(byte | if (len > 0) 0x80 else @as(u8, 0x00));
written += 1;
}
return written;
}
 
pub fn unpackVarInt(any: *AnyReader) !usize {
var current: u8 = try any.readByte();
var result: usize = current & 127;
var mult: usize = 128;
while (current > 127) {
current = try any.readByte();
result += @as(usize, (current & 127)) * mult;
mult *= 128;
if (mult > 128 * 128 * 128) return error.InvalidIntSize;
}
 
return result;
}
 
test unpackVarInt {
var buffer = [_]u8{ 0, 0, 0, 0 };
var fbs = std.io.fixedBufferStream(&buffer);
var r = fbs.reader().any();
 
var result = unpackVarInt(&r);
try std.testing.expectEqual(fbs.pos, 1);
try std.testing.expectEqual(result, 0);
fbs.reset();
buffer = [4]u8{ 127, 0, 0, 0 };
result = unpackVarInt(&r);
try std.testing.expectEqual(fbs.pos, 1);
try std.testing.expectEqual(result, 127);
fbs.reset();
buffer = [4]u8{ 128, 1, 0, 0 };
result = unpackVarInt(&r);
try std.testing.expectEqual(fbs.pos, 2);
try std.testing.expectEqual(result, 128);
fbs.reset();
buffer = [4]u8{ 129, 1, 0, 0 };
result = unpackVarInt(&r);
try std.testing.expectEqual(fbs.pos, 2);
try std.testing.expectEqual(result, 129);
}
 
const std = @import("std");
const log = std.log.scoped(.mqtt);
const AnyWriter = std.io.AnyWriter;
const AnyReader = std.io.AnyReader;
 
filename was Deleted added: 695, removed: 283, total 412
@@ -0,0 +1,207 @@
topic_name: []const u8,
//The Packet Identifier field is only present in PUBLISH packets where the
//QoS level is 1 or 2. Section 2.2.1 provides more information about Packet
//Identifiers.
packet_ident: ?u16,
//The length of the Properties in the PUBLISH packet Variable Header encoded as a Variable Byte Integer.
properties: []const u8,
payload: []const u8,
 
const Publish = @This();
 
pub const Properties = enum(u8) {
payload_format = 1,
// 3.3.2.3.2 Payload Format Indicator 1 (0x01) Byte, Identifier of the
// Payload Format Indicator. Followed by the value of the Payload Forma t
// Indicator, either of: · 0 (0x00) Byte Indicates that the
// Payload is unspecified bytes, which is equivalent to not sending a
// Payload Format Indicator. · 1 (0x01) Byte Indicates that the
// Payload is UTF-8 Encoded Character Data. The UTF-8 data in the Payload
// MUST be well-formed UTF-8 as defined by the Unicode specification
// [Unicode] and restated in RFC 3629 [RFC3629]. A Server MUST send the
// Payload Format Indicator unaltered to all subscribers receiving the
// Application Message [MQTT-3.3.2-4]. The receiver MAY validate that the
// Payload is of the format indicated, and if it is not send a PUBACK,
// PUBREC, or DISCONNECT with Reason Code of 0x99 (Payload format
// invalid) as described in section 4.13. Refer to section 5.4.9 for
// information about security issues in validating the payload format.
 
msg_expire = 2,
// 3.3.2.3.3 Message Expiry Interval` 2 (0x02) Byte, Identifier of the
// Message Expiry Interval. Followed by the Four Byte Integer representing
// the Message Expiry Interval. If present, the Four Byte value is the
// lifetime of the Application Message in seconds. If the Message Expiry
// Interval has passed and the Server has not managed to start onward
// delivery to a matching subscriber, then it MUST delete the copy of the
// message for that subscriber [MQTT-3.3.2-5]. If absent, the Application
// Message does not expire. The PUBLISH packet sent to a Client by the Server
// MUST contain a Message Expiry Interval set to the received value minus the
// time that the Application Message has been waiting in the Server
// [MQTT-3.3.2-6]. Refer to section 4.1 for details and limitations of stored
// state.
 
topic_alias = 35,
// 3.3.2.3.4 Topic Alias 35 (0x23) Byte, Identifier of the Topic Alias.
// Followed by the Two Byte integer representing the Topic Alias value. It is
// a Protocol Error to include the Topic Alias value more than once. A Topic
// Alias is an integer value that is used to identify the Topic instead of
// using the Topic Name. This reduces the size of the PUBLISH packet, and is
// useful when the Topic Names are long and the same Topic Names are used
// repetitively within a Network Connection. The sender decides whether to
// use a Topic Alias and chooses the value. It sets a Topic Alias mapping by
// including a non-zero length Topic Name and a Topic Alias in the PUBLISH
// packet. The receiver processes the PUBLISH as normal but also sets the
// specified Topic Alias mapping to this Topic Name. If a Topic Alias mapping
// has been set at the receiver, a sender can send a PUBLISH packet that
// contains that Topic Alias and a zero length Topic Name. The receiver then
// treats the incoming PUBLISH as if it had contained the Topic Name of the
// Topic Alias. A sender can modify the Topic Alias mapping by sending
// another PUBLISH in the same Network Connection with the same Topic Alias
// value and a different non-zero length Topic Name. Topic Alias mappings
// exist only within a Network Connection and last only for the lifetime of
// that Network Connection. A receiver MUST NOT carry forward any Topic Alias
// mappings from one Network Connection to another [MQTT-3.3.2-7]. A Topic
// Alias of 0 is not permitted. A sender MUST NOT send a PUBLISH packet
// containing a Topic Alias which has the value 0 [MQTT-3.3.2-8]. A Client
// MUST NOT send a PUBLISH packet with a Topic Alias greater than the Topic
// Alias Maximum value returned by the Server in the CONNACK packet
// [MQTT-3.3.2-9]. A Client MUST accept all Topic Alias values greater than 0
// and less than or equal to the Topic Alias Maximum value that it sent in
// the CONNECT packet [MQTT-3.3.2-10]. A Server MUST NOT send a PUBLISH
// packet with a Topic Alias greater than the Topic Alias Maximum value sent
// by the Client in the CONNECT packet [MQTT-3.3.2-11]. A Server MUST accept
// all Topic Alias values greater than 0 and less than or equal to the Topic
// Alias Maximum value that it returned in the CONNACK packet
// [MQTT-3.3.2-12]. The Topic Alias mappings used by the Client and Server
// are independent from each other. Thus, when a Client sends a PUBLISH
// containing a Topic Alias value of 1 to a Server and the Server sends a
// PUBLISH with a Topic Alias value of 1 to that Client they will in general
// be referring to different Topics.
 
response_topic = 8,
// 3.3.2.3.5 Response Topic 8 (0x08) Byte, Identifier of the Response Topic.
// Followed by a UTF-8 Encoded String which is used as the Topic Name for a
// response message. The Response Topic MUST be a UTF-8 Encoded String as
// defined in section 1.5.4 [MQTT-3.3.2-13]. The Response Topic MUST NOT
// contain wildcard characters [MQTT-3.3.2-14]. It is a Protocol Error to
// include the Response Topic more than once. The presence of a Response
// Topic identifies the Message as a Request. Refer to section 4.10 for more
// information about Request / Response. The Server MUST send the Response
// Topic unaltered to all subscribers receiving the Application Message
// [MQTT-3.3.2-15]. Non-normative comment: The receiver of an Application
// Message with a Response Topic sends a response by using the Response Topic
// as the Topic Name of a PUBLISH. If the Request Message contains a
// Correlation Data, the receiver of the Request Message should also include
// this Correlation Data as a property in the PUBLISH packet of the Response
// Message.
 
correlation_data = 9,
// 3.3.2.3.6 Correlation Data 9 (0x09) Byte, Identifier of the Correlation
// Data. Followed by Binary Data. The Correlation Data is used by the sender
// of the Request Message to identify which request the Response Message is
// for when it is received. It is a Protocol Error to include Correlation
// Data more than once. If the Correlation Data is not present, the Requester
// does not require any correlation data. The Server MUST send the
// Correlation Data unaltered to all subscribers receiving the Application
// Message [MQTT-3.3.2-16]. The value of the Correlation Data only has
// meaning to the sender of the Request Message and receiver of the Response
// Message. Non-normative comment The receiver of an Application Message
// which contains both a Response Topic and a Correlation Data sends a
// response by using the Response Topic as the Topic Name of a PUBLISH. The
// Client should also send the Correlation Data unaltered as part of the
// PUBLISH of the responses. Non-normative comment If the Correlation Data
// contains information which can cause application failures if modified by
// the Client responding to the request, it should be encrypted and/or hashed
// to allow any alteration to be detected. Refer to section 4.10 for more
// information about Request / Response
 
user_property = 38,
// 3.3.2.3.7 User Property 38 (0x26) Byte, Identifier of the User Property.
// Followed by a UTF-8 String Pair. The User Property is allowed to appear
// multiple times to represent multiple name, value pairs. The same name is
// allowed to appear more than once. The Server MUST send all User Properties
// unaltered in a PUBLISH packet when forwarding the Application Message to a
// Client [MQTT-3.3.2-17]. The Server MUST maintain the order of User
// Properties when forwarding the Application Message [MQTT-3.3.2-18].
// Non-normative comment This property is intended to provide a means of
// transferring application layer name-value tags whose meaning and
// interpretation are known only by the application programs responsible for
// sending and receiving them.
 
sub_ident = 11,
// 3.3.2.3.8 Subscription Identifier 11 (0x0B), Identifier of the
// Subscription Identifier. Followed by a Variable Byte Integer representing
// the identifier of the subscription. The Subscription Identifier can have
// the value of 1 to 268,435,455. It is a Protocol Error if the Subscription
// Identifier has a value of 0. Multiple Subscription Identifiers will be
// included if the publication is the result of a match to more than one
// subscription, in this case their order is not significant.
 
content_type = 3,
// 3.3.2.3.9 Content Type 3 (0x03) Identifier of the Content Type. Followed
// by a UTF-8 Encoded String describing the content of the Application
// Message. The Content Type MUST be a UTF-8 Encoded String as defined in
// section 1.5.4 [MQTT-3.3.2-19]. It is a Protocol Error to include the
// Content Type more than once. The value of the Content Type is defined by
// the sending and receiving application. A Server MUST send the Content Type
// unaltered to all subscribers receiving the Application Message
// [MQTT-3.3.2-20].
};
 
pub fn send(p: Publish, any: *AnyWriter) !void {
var buffer: [0x4000]u8 = undefined;
var fbs = std.io.fixedBufferStream(&buffer);
var w = fbs.writer();
 
try w.writeInt(u16, @intCast(p.topic_name.len), .big);
try w.writeAll(p.topic_name);
if (p.packet_ident) |pid| {
try w.writeInt(u16, pid, .big);
}
try w.writeByte(0); // properties not implemented
try w.writeAll(p.payload);
 
const pkt: Packet = .{
.header = .{
.kind = .PUBLISH,
.flags = .{ .qos = if (p.packet_ident != null) .at_least_once else .at_most_once },
},
.body = fbs.getWritten(),
};
 
try pkt.send(any);
}
 
pub const Ack = struct {
pub const Reason = enum(u8) {
success = 0,
no_match = 16,
error_nos = 128,
internal_error = 131,
not_authorized = 135,
topic_name_invalid = 144,
packet_id_in_use = 145,
over_quota = 151,
payload_format_invalid = 153,
_,
};
 
pub fn send(pkt_id: u16, code: Reason, any: *AnyWriter) !void {
var buffer: [0x4000]u8 = undefined;
var fbs = std.io.fixedBufferStream(&buffer);
var w = fbs.writer();
 
try w.writeInt(u16, pkt_id, .big);
try w.writeByte(@intFromEnum(code));
try w.writeByte(0); //property length;
 
const pkt: Packet = .{ .header = .{ .kind = .PUBACK }, .body = fbs.getWritten() };
try pkt.send(any);
}
};
 
const Packet = @import("Packet.zig");
 
const std = @import("std");
const log = std.log.scoped(.mqtt);
const AnyWriter = std.io.AnyWriter;
 
filename was Deleted added: 695, removed: 283, total 412
@@ -0,0 +1,29 @@
channels: []const []const u8,
 
const Subscribe = @This();
 
pub fn send(s: Subscribe, any: *AnyWriter) !void {
var buffer: [0x4000]u8 = undefined;
var fbs = std.io.fixedBufferStream(&buffer);
var w = fbs.writer();
 
log.err("writing subscribe packet", .{});
try w.writeInt(u16, 10, .big);
try w.writeByte(0); // No props
for (s.channels) |ch| {
try w.writeInt(u16, @intCast(ch.len), .big);
try w.writeAll(ch);
try w.writeByte(0x01); // options
}
const pkt: Packet = .{
.header = .{ .kind = .SUBSCRIBE, .flags = try Packet.ControlType.SUBSCRIBE.flags() },
.body = fbs.getWritten(),
};
try pkt.send(any);
}
 
const Packet = @import("Packet.zig");
 
const std = @import("std");
const log = std.log.scoped(.mqtt);
const AnyWriter = std.io.AnyWriter;
 
src/main.zig added: 695, removed: 283, total 412
@@ -2,210 +2,7 @@
//! https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html
//! At least the parts that make sense
 
const ControlType = enum(u4) {
reserved = 0,
CONNECT = 1,
CONNACK = 2,
PUBLISH = 3,
PUBACK = 4,
PUBREC = 5,
PUBREL = 6,
PUBCOMP = 7,
SUBSCRIBE = 8,
SUBACK = 9,
UNSUBSCRIBE = 10,
UNSUBACK = 11,
PINGREQ = 12,
PINGRESP = 13,
DISCONNECT = 14,
AUTH = 15,
 
const Flags = packed struct(u4) {
retain: bool = false,
qos: QOS = .at_most_once,
dup: bool = false,
};
/// MQTT 5.0 -- 2.1.3
pub fn flags(cpt: ControlType) !Flags {
return switch (cpt) {
.reserved => error.InvalidCPT,
.CONNECT, .CONNACK, .PUBLISH, .PUBACK, .PUBREC, .PUBCOMP, .SUBACK => .{},
.UNSUBACK, .PINGREQ, .PINGRESP, .DISCONNECT, .AUTH => .{},
.PUBREL, .SUBSCRIBE, .UNSUBSCRIBE => .{ .qos = .at_least_once },
};
}
};
 
const CPT = ControlType;
 
const FixedHeader = packed struct(u8) {
flags: ControlType.Flags = .{},
kind: ControlType,
};
 
const QOS = enum(u2) {
at_most_once = 0,
at_least_once = 1,
exactly_once = 2,
invalid = 3,
};
 
const Connect = struct {
client_id: ?[]const u8 = null,
flags: Flags = .{},
 
pub const Flags = packed struct(u8) {
reserved: bool = false, // 3.1.2.4 requires this be 0
clean_start: bool = true,
will_flag: bool = false,
will_qos: u2 = 0,
will_retain: bool = false,
password: bool = false,
username: bool = false,
};
 
const KeepAlive = packed struct(u16) {
seconds: u16 = 600,
};
pub fn send(c: Connect, w: *std.net.Stream.Writer) !void {
const props = [_]u8{ 0x11, 0x00, 0x00, 0x00, 0x03 };
const client_id: []const u8 = c.client_id orelse "generic_mqtt_client";
 
log.err("writing connect packet", .{});
try w.writeByte(@bitCast(FixedHeader{ .kind = .CONNECT }));
try w.writeByte(@intCast(10 + 1 + props.len + 2 + client_id.len));
try w.writeInt(u16, 4, .big);
try w.writeAll("MQTT");
try w.writeByte(0x05); // version
try w.writeByte(@bitCast(c.flags));
try w.writeInt(u16, @bitCast(KeepAlive{}), .big);
try w.writeByte(@intCast(props.len));
try w.writeAll(&props);
try w.writeInt(u16, @intCast(client_id.len), .big);
try w.writeAll(client_id);
}
};
 
const Publish = struct {
topic_name: []const u8,
//The Packet Identifier field is only present in PUBLISH packets where the
//QoS level is 1 or 2. Section 2.2.1 provides more information about Packet
//Identifiers.
packet_ident: ?u16,
//The length of the Properties in the PUBLISH packet Variable Header encoded as a Variable Byte Integer.
properties: []const u8,
 
pub const Properties = enum(u8) {
payload_format = 1,
// 3.3.2.3.2 Payload Format Indicator
//1 (0x01) Byte, Identifier of the Payload Format Indicator.
//Followed by the value of the Payload Forma t Indicator, either of:
//· 0 (0x00) Byte Indicates that the Payload is unspecified bytes, which is equivalent to not sending a Payload Format Indicator.
//· 1 (0x01) Byte Indicates that the Payload is UTF-8 Encoded Character Data. The UTF-8 data in the Payload MUST be well-formed UTF-8 as defined by the Unicode specification [Unicode] and restated in RFC 3629 [RFC3629].
//A Server MUST send the Payload Format Indicator unaltered to all subscribers receiving the Application Message [MQTT-3.3.2-4]. The receiver MAY validate that the Payload is of the format indicated, and if it is not send a PUBACK, PUBREC, or DISCONNECT with Reason Code of 0x99 (Payload format invalid) as described in section 4.13. Refer to section 5.4.9 for information about security issues in validating the payload format.
msg_expire = 2,
//3.3.2.3.3 Message Expiry Interval`
//2 (0x02) Byte, Identifier of the Message Expiry Interval.
//Followed by the Four Byte Integer representing the Message Expiry Interval.
//If present, the Four Byte value is the lifetime of the Application Message in seconds. If the Message Expiry Interval has passed and the Server has not managed to start onward delivery to a matching subscriber, then it MUST delete the copy of the message for that subscriber [MQTT-3.3.2-5].
//If absent, the Application Message does not expire.
//The PUBLISH packet sent to a Client by the Server MUST contain a Message Expiry Interval set to the received value minus the time that the Application Message has been waiting in the Server [MQTT-3.3.2-6]. Refer to section 4.1 for details and limitations of stored state.
topic_alias = 35,
//3.3.2.3.4 Topic Alias
//35 (0x23) Byte, Identifier of the Topic Alias.
//Followed by the Two Byte integer representing the Topic Alias value. It is a Protocol Error to include the Topic Alias value more than once.
//A Topic Alias is an integer value that is used to identify the Topic instead of using the Topic Name. This reduces the size of the PUBLISH packet, and is useful when the Topic Names are long and the same Topic Names are used repetitively within a Network Connection.
//The sender decides whether to use a Topic Alias and chooses the value. It sets a Topic Alias mapping by including a non-zero length Topic Name and a Topic Alias in the PUBLISH packet. The receiver processes the PUBLISH as normal but also sets the specified Topic Alias mapping to this Topic Name.
//If a Topic Alias mapping has been set at the receiver, a sender can send a PUBLISH packet that contains that Topic Alias and a zero length Topic Name. The receiver then treats the incoming PUBLISH as if it had contained the Topic Name of the Topic Alias.
//A sender can modify the Topic Alias mapping by sending another PUBLISH in the same Network Connection with the same Topic Alias value and a different non-zero length Topic Name.
//Topic Alias mappings exist only within a Network Connection and last only for the lifetime of that Network Connection. A receiver MUST NOT carry forward any Topic Alias mappings from one Network Connection to another [MQTT-3.3.2-7].
//A Topic Alias of 0 is not permitted. A sender MUST NOT send a PUBLISH packet containing a Topic Alias which has the value 0 [MQTT-3.3.2-8].
//A Client MUST NOT send a PUBLISH packet with a Topic Alias greater than the Topic Alias Maximum value returned by the Server in the CONNACK packet [MQTT-3.3.2-9]. A Client MUST accept all Topic Alias values greater than 0 and less than or equal to the Topic Alias Maximum value that it sent in the CONNECT packet [MQTT-3.3.2-10].
//A Server MUST NOT send a PUBLISH packet with a Topic Alias greater than the Topic Alias Maximum value sent by the Client in the CONNECT packet [MQTT-3.3.2-11]. A Server MUST accept all Topic Alias values greater than 0 and less than or equal to the Topic Alias Maximum value that it returned in the CONNACK packet [MQTT-3.3.2-12].
//The Topic Alias mappings used by the Client and Server are independent from each other. Thus, when a Client sends a PUBLISH containing a Topic Alias value of 1 to a Server and the Server sends a PUBLISH with a Topic Alias value of 1 to that Client they will in general be referring to different Topics.
response_topic = 8,
//3.3.2.3.5 Response Topic
//8 (0x08) Byte, Identifier of the Response Topic.
//Followed by a UTF-8 Encoded String which is used as the Topic Name for a response message. The Response Topic MUST be a UTF-8 Encoded String as defined in section 1.5.4 [MQTT-3.3.2-13]. The Response Topic MUST NOT contain wildcard characters [MQTT-3.3.2-14]. It is a Protocol Error to include the Response Topic more than once. The presence of a Response Topic identifies the Message as a Request.
//Refer to section 4.10 for more information about Request / Response.
//The Server MUST send the Response Topic unaltered to all subscribers receiving the Application Message [MQTT-3.3.2-15].
//Non-normative comment:
//The receiver of an Application Message with a Response Topic sends a response by using the Response Topic as the Topic Name of a PUBLISH. If the Request Message contains a Correlation Data, the receiver of the Request Message should also include this Correlation Data as a property in the PUBLISH packet of the Response Message.
correlation_data = 9,
//3.3.2.3.6 Correlation Data
//9 (0x09) Byte, Identifier of the Correlation Data.
//Followed by Binary Data. The Correlation Data is used by the sender of the Request Message to identify which request the Response Message is for when it is received. It is a Protocol Error to include Correlation Data more than once. If the Correlation Data is not present, the Requester does not require any correlation data.
//The Server MUST send the Correlation Data unaltered to all subscribers receiving the Application Message [MQTT-3.3.2-16]. The value of the Correlation Data only has meaning to the sender of the Request Message and receiver of the Response Message.
//Non-normative comment
//The receiver of an Application Message which contains both a Response Topic and a Correlation Data sends a response by using the Response Topic as the Topic Name of a PUBLISH. The Client should also send the Correlation Data unaltered as part of the PUBLISH of the responses.
//Non-normative comment
//If the Correlation Data contains information which can cause application failures if modified by the Client responding to the request, it should be encrypted and/or hashed to allow any alteration to be detected.
//Refer to section 4.10 for more information about Request / Response
user_property = 38,
//3.3.2.3.7 User Property
//38 (0x26) Byte, Identifier of the User Property.
//Followed by a UTF-8 String Pair. The User Property is allowed to appear multiple times to represent multiple name, value pairs. The same name is allowed to appear more than once.
//The Server MUST send all User Properties unaltered in a PUBLISH packet when forwarding the Application Message to a Client [MQTT-3.3.2-17]. The Server MUST maintain the order of User Properties when forwarding the Application Message [MQTT-3.3.2-18].
//Non-normative comment
//This property is intended to provide a means of transferring application layer name-value tags whose meaning and interpretation are known only by the application programs responsible for sending and receiving them.
sub_ident = 11,
//3.3.2.3.8 Subscription Identifier
//11 (0x0B), Identifier of the Subscription Identifier.
//Followed by a Variable Byte Integer representing the identifier of the subscription.
//The Subscription Identifier can have the value of 1 to 268,435,455. It is a Protocol Error if the Subscription Identifier has a value of 0. Multiple Subscription Identifiers will be included if the publication is the result of a match to more than one subscription, in this case their order is not significant.
content_type = 3,
//3.3.2.3.9 Content Type
//3 (0x03) Identifier of the Content Type.
//Followed by a UTF-8 Encoded String describing the content of the Application Message. The Content Type MUST be a UTF-8 Encoded String as defined in section 1.5.4 [MQTT-3.3.2-19].
//It is a Protocol Error to include the Content Type more than once. The value of the Content Type is defined by the sending and receiving application.
//A Server MUST send the Content Type unaltered to all subscribers receiving the Application Message [MQTT-3.3.2-20].
};
};
 
pub const Subscribe = struct {
channels: []const []const u8,
 
pub fn send(s: Subscribe, w: *std.net.Stream.Writer) !void {
log.err("writing subscribe packet", .{});
try w.writeByte(@bitCast(FixedHeader{ .kind = .SUBSCRIBE, .flags = try CPT.flags(.SUBSCRIBE) }));
var total: u8 = 0;
for (s.channels) |ch| {
total += @intCast(ch.len + 1);
}
try w.writeByte(@intCast(2 + 1 + 2 + total));
try w.writeInt(u16, 10, .big);
try w.writeByte(0); // No props
for (s.channels) |ch| {
try w.writeInt(u16, @intCast(ch.len), .big);
try w.writeAll(ch);
try w.writeByte(0x01); // options
}
}
};
 
const PublishAck = struct {
pub const Reason = enum(u8) {
success = 0,
no_match = 16,
error_nos = 128,
internal_error = 131,
not_authorized = 135,
topic_name_invalid = 144,
packet_id_in_use = 145,
over_quota = 151,
payload_format_invalid = 153,
_,
};
 
pub fn send(pkt_id: u16, code: PublishAck.Reason, w: *std.net.Stream.Writer) !void {
try w.writeByte(@bitCast(FixedHeader{ .kind = .PUBACK }));
try w.writeByte(4);
try w.writeInt(u16, pkt_id, .big);
try w.writeByte(@intFromEnum(code));
try w.writeByte(0); //property length;
log.err(" (PUBACK sent)", .{});
}
};
const mqtt = @import("mqtt.zig");
 
pub fn main() !void {
log.err("startup", .{});
@@ -216,8 +13,9 @@ pub fn main() !void {
return e;
};
var w = conn.writer();
var any = w.any();
 
try (Connect{}).send(&w);
try (mqtt.Connect{}).send(&any);
 
var poller = std.io.poll(
a,
@@ -226,18 +24,21 @@ pub fn main() !void {
);
 
var poll_more = try poller.poll();
while (poll_more) {
var fifo = poller.fifo(.srv);
var fifo = poller.fifo(.srv);
while (fifo.readableLength() > 0 or try poller.poll()) {
var ready = fifo.readableLength();
 
if (ready < 6) {
poll_more = try poller.poll();
continue;
}
 
log.err("", .{});
const pkt: FixedHeader = @bitCast(fifo.readItem() orelse unreachable);
var used: u3 = 0;
const reported = unpackVarInt(fifo.readableSliceOfLen(4), &used);
fifo.discard(used);
const pkt: mqtt.Packet.FixedHeader = @bitCast(fifo.readItem() orelse unreachable);
 
var r = fifo.reader();
var anyr = r.any();
const reported = try mqtt.Packet.unpackVarInt(&anyr);
 
ready = fifo.readableLength();
while (ready < reported and poll_more) {
@@ -246,17 +47,16 @@ pub fn main() !void {
ready = fifo.readableLength();
}
 
var r = fifo.reader();
switch (pkt.kind) {
.CONNACK => {
log.err("CONNACK ({}/{}) ", .{ reported, ready });
fifo.discard(@min(ready, reported));
try (Subscribe{ .channels = &.{"zigbee2mqtt/#"} }).send(&w);
try (mqtt.Subscribe{ .channels = &.{""} }).send(&any);
},
.PUBLISH => {
const slen = try r.readInt(u16, .big);
const topic = fifo.readableSliceOfLen(slen);
log.err("PUBLISH [{s}]", .{topic});
log.err("PUBLISH [{s}] [{any}]", .{ topic, topic });
fifo.discard(slen);
var pktid: ?u16 = null;
switch (pkt.flags.qos) {
@@ -266,7 +66,7 @@ pub fn main() !void {
.at_least_once => {
log.err(" expecting {s}", .{"PUBACK"});
pktid = try r.readInt(u16, .big);
try PublishAck.send(pktid.?, .success, &w);
try mqtt.Publish.Ack.send(pktid.?, .success, &any);
},
.exactly_once => {
log.err(" expecting {s}", .{"PUBREC"});
@@ -275,6 +75,10 @@ pub fn main() !void {
}
 
const drop = @min(ready, reported) - 2 - slen - if (pktid != null) 2 else @as(usize, 0);
 
const contents = fifo.readableSliceOfLen(drop);
log.err(">{s}<", .{contents});
 
log.err(" discarding {}", .{drop});
fifo.discard(drop);
},
@@ -300,48 +104,8 @@ pub fn main() !void {
log.err("end going to exit", .{});
}
 
fn unpackVarInt(buf: []const u8, used: *u3) usize {
std.debug.assert(used.* == 0);
var current: u8 = buf[used.*];
used.* += 1;
var result: usize = current & 127;
var mult: usize = 128;
while (current > 127) {
current = buf[used.*];
used.* += 1;
result += @as(usize, (current & 127)) * mult;
mult *= 128;
if (mult > 128 * 128 * 128) @panic("invalid var int");
}
 
return result;
}
 
test unpackVarInt {
var used: u3 = 0;
 
var result = unpackVarInt([4]u8{ 0, 0, 0, 0 }, &used);
try std.testing.expectEqual(used, 1);
try std.testing.expectEqual(result, 0);
used = 0;
result = unpackVarInt([4]u8{ 127, 0, 0, 0 }, &used);
try std.testing.expectEqual(used, 1);
try std.testing.expectEqual(result, 127);
used = 0;
result = unpackVarInt([4]u8{ 128, 1, 0, 0 }, &used);
try std.testing.expectEqual(used, 2);
try std.testing.expectEqual(result, 128);
used = 0;
result = unpackVarInt([4]u8{ 129, 1, 0, 0 }, &used);
try std.testing.expectEqual(used, 2);
try std.testing.expectEqual(result, 129);
}
 
test "simple test" {
var list = std.ArrayList(i32).init(std.testing.allocator);
defer list.deinit(); // try commenting this out and see if zig detects the memory leak!
try list.append(42);
try std.testing.expectEqual(@as(i32, 42), list.pop());
test "main" {
std.testing.refAllDecls(@This());
}
 
const std = @import("std");
 
filename was Deleted added: 695, removed: 283, total 412
@@ -0,0 +1,14 @@
//! makes a general attempt to follow
//! https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html
//! At least the parts that make sense
 
pub const Packet = @import("Packet.zig");
pub const Publish = @import("Publish.zig");
pub const Connect = @import("Connect.zig");
pub const Subscribe = @import("Subscribe.zig");
 
const mqtt = @This();
 
test mqtt {
@import("std").testing.refAllDecls(mqtt);
}