srctree

Andrew Kelley parent f17ef5f9 e432dfc3
add wasm and websocket

build.zig added: 409, removed: 6, total 403
@@ -89,6 +89,37 @@ pub fn build(b: *std.Build) void {
.install_dir = .prefix,
.install_subdir = "www",
});
 
const wasm = b.addExecutable(.{
.name = "main",
.root_source_file = .{ .path = "client/main.zig" },
.target = b.resolveTargetQuery(.{
.cpu_arch = .wasm32,
.os_tag = .freestanding,
.cpu_features_add = std.Target.wasm.featureSet(&.{
.atomics,
.bulk_memory,
// .extended_const, not supported by Safari
.multivalue,
.mutable_globals,
.nontrapping_fptoint,
.reference_types,
//.relaxed_simd, not supported by Firefox or Safari
.sign_ext,
// observed to cause Error occured during wast conversion :
// Unknown operator: 0xfd058 in Firefox 117
//.simd128,
// .tail_call, not supported by Safari
}),
}),
.optimize = switch (optimize) {
.Debug => .Debug,
else => .ReleaseSmall,
},
});
wasm.rdynamic = true; // expose exported functions to wasm
wasm.entry = .disabled;
b.getInstallStep().dependOn(&b.addInstallFile(wasm.getEmittedBin(), "www/main.wasm").step);
}
 
fn installBin(b: *std.Build, bin: *std.Build.Step.Compile) void {
 
filename was Deleted added: 409, removed: 6, total 403
@@ -0,0 +1,55 @@
const std = @import("std");
const log = std.log;
const assert = std.debug.assert;
const gpa = std.heap.wasm_allocator;
 
const js = struct {
extern "js" fn log(ptr: [*]const u8, len: usize) void;
extern "js" fn panic(ptr: [*]const u8, len: usize) noreturn;
};
 
pub const std_options: std.Options = .{
.logFn = logFn,
.log_level = .debug,
};
 
pub fn panic(msg: []const u8, st: ?*std.builtin.StackTrace, addr: ?usize) noreturn {
_ = st;
_ = addr;
log.err("panic: {s}", .{msg});
@trap();
}
 
fn logFn(
comptime message_level: log.Level,
comptime scope: @TypeOf(.enum_literal),
comptime format: []const u8,
args: anytype,
) void {
const level_txt = comptime message_level.asText();
const prefix2 = if (scope == .default) ": " else "(" ++ @tagName(scope) ++ "): ";
var buf: [500]u8 = undefined;
const line = std.fmt.bufPrint(&buf, level_txt ++ prefix2 ++ format, args) catch l: {
buf[buf.len - 3 ..][0..3].* = "...".*;
break :l &buf;
};
js.log(line.ptr, line.len);
}
 
export fn alloc(n: usize) [*]u8 {
const slice = gpa.alloc(u8, n) catch @panic("OOM");
return slice.ptr;
}
 
var message_buffer: std.ArrayListUnmanaged(u8) = .{};
 
/// Resizes the message buffer to be the correct length; returns the pointer to
/// the query string.
export fn message_begin(len: usize) [*]u8 {
message_buffer.resize(gpa, len) catch @panic("OOM");
return message_buffer.items.ptr;
}
 
export fn message_end() void {
log.debug("got message: {s}", .{message_buffer.items});
}
 
filename was Deleted added: 409, removed: 6, total 403
@@ -0,0 +1,212 @@
//! See https://tools.ietf.org/html/rfc6455
 
const builtin = @import("builtin");
const std = @import("std");
const WebSocket = @This();
const assert = std.debug.assert;
const native_endian = builtin.cpu.arch.endian();
 
key: []const u8,
request: *std.http.Server.Request,
recv_fifo: std.fifo.LinearFifo(u8, .Slice),
reader: std.io.AnyReader,
response: std.http.Server.Response,
 
pub const InitError = error{WebSocketUpgradeMissingKey} ||
std.http.Server.Request.ReaderError;
 
pub fn init(
ws: *WebSocket,
request: *std.http.Server.Request,
send_buffer: []u8,
recv_buffer: []align(4) u8,
) InitError!bool {
var sec_websocket_key: ?[]const u8 = null;
var upgrade_websocket: bool = false;
var it = request.iterateHeaders();
while (it.next()) |header| {
if (std.ascii.eqlIgnoreCase(header.name, "sec-websocket-key")) {
sec_websocket_key = header.value;
} else if (std.ascii.eqlIgnoreCase(header.name, "upgrade")) {
if (!std.mem.eql(u8, header.value, "websocket"))
return false;
upgrade_websocket = true;
}
}
if (!upgrade_websocket)
return false;
 
const key = sec_websocket_key orelse return error.WebSocketUpgradeMissingKey;
 
var sha1 = std.crypto.hash.Sha1.init(.{});
sha1.update(key);
sha1.update("258EAFA5-E914-47DA-95CA-C5AB0DC85B11");
var digest: [std.crypto.hash.Sha1.digest_length]u8 = undefined;
sha1.final(&digest);
var base64_digest: [28]u8 = undefined;
assert(std.base64.standard.Encoder.encode(&base64_digest, &digest).len == base64_digest.len);
 
request.head.content_length = std.math.maxInt(u64);
 
ws.* = .{
.key = key,
.recv_fifo = std.fifo.LinearFifo(u8, .Slice).init(recv_buffer),
.reader = try request.reader(),
.response = request.respondStreaming(.{
.send_buffer = send_buffer,
.respond_options = .{
.status = .switching_protocols,
.extra_headers = &.{
.{ .name = "upgrade", .value = "websocket" },
.{ .name = "connection", .value = "upgrade" },
.{ .name = "sec-websocket-accept", .value = &base64_digest },
},
.transfer_encoding = .none,
},
}),
.request = request,
};
return true;
}
 
pub const Header0 = packed struct(u8) {
opcode: Opcode,
rsv3: u1 = 0,
rsv2: u1 = 0,
rsv1: u1 = 0,
fin: bool,
};
 
pub const Header1 = packed struct(u8) {
payload_len: enum(u7) {
len16 = 126,
len64 = 127,
_,
},
mask: bool,
};
 
pub const Opcode = enum(u4) {
continuation = 0,
text = 1,
binary = 2,
connection_close = 8,
ping = 9,
pong = 10,
_,
};
 
pub const ReadSmallTextMessageError = error{
ConnectionClose,
UnexpectedOpCode,
MessageTooBig,
};
 
/// Reads the next message from the WebSocket stream, failing if the message does not fit
/// into `recv_buffer`.
pub fn readSmallMessage(ws: *WebSocket) ReadSmallTextMessageError![]u8 {
const header_bytes = (try recv(ws, 2))[0..2];
const h0: Header0 = @bitCast(header_bytes[0]);
const h1: Header1 = @bitCast(header_bytes[1]);
 
switch (h0.opcode) {
.text, .binary => {},
.connection_close => return error.ConnectionClose,
else => return error.UnexpectedOpCode,
}
 
if (!h0.fin) return error.MessageTooBig;
if (!h1.mask) return error.MissingMaskBit;
 
const len: usize = switch (h1.payload_len) {
.len16 => try recvReadInt(ws, u16),
.len64 => std.math.cast(usize, try recvReadInt(ws, u64)) orelse return error.MessageTooBig,
else => @intFromEnum(h1.payload_len),
};
if (len > ws.recv_fifo.buf.len) return error.MessageTooBig;
 
const mask: u32 = @bitCast((try recv(ws, 4))[0..4].*);
const payload = try recv(ws, len);
 
// The last item may contain a partial word of unused data.
const u32_payload: []u32 = std.mem.bytesAsSlice(u32, payload);
for (u32_payload) |*elem| elem.* ^= mask;
 
return payload;
}
 
fn recv(ws: *WebSocket, n: usize) ![]u8 {
const result = try recvPeek(ws, n);
ws.recv_fifo.discard(n);
return result;
}
 
fn recvPeek(ws: *WebSocket, n: usize) ![]u8 {
assert(n <= ws.recv_fifo.buf.len);
if (n > ws.recv_fifo.count) {
const small_buf = ws.recv_fifo.writableSlice();
const needed = n - ws.recv_fifo.count;
const buf = if (small_buf.len >= needed) small_buf else b: {
ws.recv_fifo.realign();
break :b ws.recv_fifo.writableSlice();
};
try ws.reader.readNoEof(buf);
ws.recv_fifo.update(buf.len);
}
return ws.recv_fifo.readableSliceOfLen(n);
}
 
fn recvReadInt(ws: *WebSocket, comptime I: type) I {
const unswapped: I = @bitCast((try recv(ws, @sizeOf(I))[0..@sizeOf(I)]).*);
return switch (native_endian) {
.little => @byteSwap(unswapped),
.big => unswapped,
};
}
 
pub const WriteError = std.http.Server.Response.WriteError;
 
pub fn writeMessage(ws: *WebSocket, message: []const std.posix.iovec_const) WriteError!void {
const total_len = l: {
var total_len: u64 = 0;
for (message) |iovec| total_len += iovec.iov_len;
break :l total_len;
};
 
var header_buf: [2 + 8]u8 = undefined;
header_buf[0] = @bitCast(@as(Header0, .{
.opcode = .binary,
.fin = true,
}));
const header = switch (total_len) {
0...125 => blk: {
header_buf[1] = @bitCast(@as(Header1, .{
.payload_len = @enumFromInt(total_len),
.mask = false,
}));
break :blk header_buf[0..2];
},
126...0xffff => blk: {
header_buf[1] = @bitCast(@as(Header1, .{
.payload_len = .len16,
.mask = false,
}));
std.mem.writeInt(u16, header_buf[2..4], @intCast(total_len), .big);
break :blk header_buf[0..4];
},
else => blk: {
header_buf[1] = @bitCast(@as(Header1, .{
.payload_len = .len64,
.mask = false,
}));
std.mem.writeInt(u64, header_buf[2..10], total_len, .big);
break :blk header_buf[0..10];
},
};
 
const response = &ws.response;
try response.writeAll(header);
for (message) |iovec|
try response.writeAll(iovec.iov_base[0..iovec.iov_len]);
try response.flush();
}
 
server/main.zig added: 409, removed: 6, total 403
@@ -6,6 +6,7 @@ const Config = @import("Config.zig");
const Allocator = std.mem.Allocator;
const assert = std.debug.assert;
const Db = @import("Db.zig");
const WebSocket = @import("WebSocket.zig");
const StaticHttpFileServer = @import("StaticHttpFileServer");
 
const usage =
@@ -171,13 +172,45 @@ pub const Server = struct {
 
fn handleConnection(s: *Server, connection: std.net.Server.Connection) !void {
var read_buffer: [0x4000]u8 = undefined;
var send_buffer: [0x4000]u8 = undefined;
var web_socket: WebSocket = undefined;
var ws_recv_buffer: [0x4000]u8 align(4) = undefined;
var http_server = std.http.Server.init(connection, &read_buffer);
 
while (http_server.state == .ready) {
var request = try http_server.receiveHead();
try s.static_http_file_server.serve(&request);
if (try web_socket.init(&request, &send_buffer, &ws_recv_buffer)) {
try handleWebSocket(s, &web_socket);
} else {
try s.static_http_file_server.serve(&request);
}
}
}
 
fn handleWebSocket(s: *Server, ws: *WebSocket) !void {
const send_thread = std.Thread.spawn(.{}, websocketSendLoop, .{ s, ws }) catch |err| {
std.log.err("spawning websocket handler thread failed: {s}", .{@errorName(err)});
return err;
};
defer send_thread.join();
 
while (true) {}
}
 
fn websocketSendLoop(s: *Server, ws: *WebSocket) !void {
const files = std.mem.sliceAsBytes(s.db.files.items);
const directories = std.mem.sliceAsBytes(s.db.directories.items);
const albums = std.mem.sliceAsBytes(s.db.albums.keys());
const string_bytes = s.db.string_bytes.items;
 
const iovecs = [_]std.posix.iovec_const{
.{ .iov_base = files.ptr, .iov_len = files.len },
.{ .iov_base = directories.ptr, .iov_len = directories.len },
.{ .iov_base = albums.ptr, .iov_len = albums.len },
.{ .iov_base = string_bytes.ptr, .iov_len = string_bytes.len },
};
try ws.writeMessage(&iovecs);
}
};
 
fn scanDir(db: *Db, gpa: Allocator, db_dir: Db.Path.Index, it: *std.fs.Dir.Iterator) anyerror!void {
 
www/main.js added: 409, removed: 6, total 403
@@ -1,3 +1,75 @@
(function(){
let ws = null;
let wasm_exports = null;
const text_decoder = new TextDecoder();
const text_encoder = new TextEncoder();
 
WebAssembly.instantiateStreaming(fetch("main.wasm"), {
js: {
log: function(ptr, len) {
const msg = decodeString(ptr, len);
console.log(msg);
},
panic: function (ptr, len) {
const msg = decodeString(ptr, len);
throw new Error("panic: " + msg);
},
},
}).then(function(obj) {
wasm_exports = obj.instance.exports;
window.wasm = obj; // for debugging
 
connectWebSocket();
});
 
function connectWebSocket() {
const host = window.document.location.host;
const pathname = window.document.location.pathname;
const isHttps = window.document.location.protocol === 'https:';
const match = host.match(/^(.+):(\d+)$/);
const defaultPort = isHttps ? 443 : 80;
const port = match ? parseInt(match[2], 10) : defaultPort;
const hostName = match ? match[1] : host;
const wsProto = isHttps ? "wss:" : "ws:";
const wsUrl = wsProto + '//' + hostName + ':' + port + pathname;
ws = new WebSocket(wsUrl);
ws.addEventListener('message', onWebSocketMessage, false);
ws.addEventListener('error', timeoutThenCreateNew, false);
ws.addEventListener('close', timeoutThenCreateNew, false);
ws.addEventListener('open', onWebSocketOpen, false);
}
 
function onWebSocketOpen() {
console.log("web socket opened");
}
 
function onWebSocketClose() {
console.log("web socket closed");
}
 
function onWebSocketMessage(ev) {
wasmOnMessage(ev.data);
}
 
function timeoutThenCreateNew() {
ws.removeEventListener('message', onWebSocketMessage, false);
ws.removeEventListener('error', timeoutThenCreateNew, false);
ws.removeEventListener('close', timeoutThenCreateNew, false);
ws.removeEventListener('open', onWebSocketOpen, false);
onWebSocketClose();
setTimeout(connectWebSocket, 1000);
}
 
function wasmOnMessage(data) {
const len = data.size;
const ptr = wasm_exports.message_begin(len);
const wasmArray = new Uint8Array(wasm_exports.memory.buffer, ptr, len);
wasmArray.set(data);
wasm_exports.message_end();
}
 
function decodeString(ptr, len) {
if (len === 0) return "";
return text_decoder.decode(new Uint8Array(wasm_exports.memory.buffer, ptr, len));
}
})();