srctree

Gregory Mullen parent a5059493 c29e11e9
add support for threads

inlinesplit
src/http.zig added: 64, removed: 15, total 49
@@ -5,10 +5,12 @@ router: Router,
auth: Auth.Provider,
 
listen_addr: std.net.Address,
max_request_size: usize = 0xffff,
request_buffer: [0xffff]u8 = undefined,
running: bool = true,
alive: bool = false,
threads: ?struct {
count: usize,
pool: std.Thread.Pool,
} = null,
 
const HTTP = @This();
 
@@ -25,6 +27,14 @@ pub fn init(a: Allocator, router: Router, opts: Options, sopts: VServer.Options)
.router = router,
.auth = sopts.auth,
.listen_addr = try std.net.Address.parseIp(opts.host, opts.port),
.threads = if (sopts.threads) |tcount| brk: {
var pool: std.Thread.Pool = undefined;
try pool.init(.{ .allocator = a, .n_jobs = tcount });
break :brk .{
.count = tcount,
.pool = pool,
};
} else null,
};
}
 
@@ -35,21 +45,40 @@ pub fn serve(http: *HTTP) !void {
log.info("HTTP Server listening on port: {any}", .{http.listen_addr.getPort()});
http.alive = true;
while (http.running) {
try http.once(&srv);
const conn = try srv.accept();
if (http.threads) |*threads| {
try threads.pool.spawn(onceThreaded, .{ http, conn });
} else {
try http.once(conn);
}
}
log.info("Normal HTTPD shutdown", .{});
}
 
pub fn once(http: *HTTP, srv: *net.Server) !void {
fn onceThreaded(http: *HTTP, acpt: net.Server.Connection) void {
once(http, acpt) catch |err| switch (err) {
error.HttpRequestTruncated => {
log.err("HttpRequestTruncated in threaded mode", .{});
},
else => {
log.err("Unexpected endpoint error {} in threaded mode", .{err});
http.running = false;
},
};
}
 
pub fn once(http: *HTTP, sconn: net.Server.Connection) !void {
var conn = sconn;
defer conn.stream.close();
 
var request_buffer: [0xfffff]u8 = undefined;
log.info("HTTP connection from {}", .{conn.address});
var hsrv = std.http.Server.init(conn, &request_buffer);
 
var arena = std.heap.ArenaAllocator.init(http.alloc);
defer arena.deinit();
const a = arena.allocator();
 
var conn = try srv.accept();
defer conn.stream.close();
 
log.info("HTTP connection from {}", .{conn.address});
var hsrv = std.http.Server.init(conn, &http.request_buffer);
 
var hreq = try hsrv.receiveHead();
const reqdata = try requestData(a, &hreq);
const req = try Request.initHttp(a, &hreq, reqdata);
 
src/server.zig added: 64, removed: 15, total 49
@@ -25,6 +25,7 @@ pub const Interface = union(RunModes) {
pub const Options = struct {
mode: RunMode = .{ .http = .{} },
auth: Auth.Provider = .invalid,
threads: ?u16 = null,
};
 
pub fn init(a: Allocator, router: Router, opts: Options) !Server {
 
src/zwsgi.zig added: 64, removed: 15, total 49
@@ -5,6 +5,7 @@ alloc: Allocator,
router: Router,
options: Options,
auth: Auth.Provider,
threads: ?u16,
 
unix_file: []const u8,
 
@@ -20,8 +21,9 @@ pub fn init(a: Allocator, router: Router, opts: Options, sopts: Server.Options)
.alloc = a,
.unix_file = opts.file,
.router = router,
.auth = sopts.auth,
.options = opts,
.auth = sopts.auth,
.threads = sopts.threads,
};
}
 
@@ -63,6 +65,11 @@ pub fn serve(z: *zWSGI) !void {
}
log.warn("Unix server listening", .{});
 
var thr_pool: std.Thread.Pool = undefined;
if (z.threads) |thread_count| {
try thr_pool.init(.{ .allocator = z.alloc, .n_jobs = thread_count });
}
 
while (running) {
var pollfds = [1]std.os.linux.pollfd{
.{ .fd = server.stream.handle, .events = std.math.maxInt(i16), .revents = 0 },
@@ -70,7 +77,12 @@ pub fn serve(z: *zWSGI) !void {
const ready = try std.posix.poll(&pollfds, 100);
if (ready == 0) continue;
const acpt = try server.accept();
try once(z, acpt);
 
if (z.threads) |_| {
try thr_pool.spawn(onceThreaded, .{ z, acpt });
} else {
try once(z, acpt);
}
}
log.warn("closing, and cleaning up", .{});
}
@@ -108,6 +120,13 @@ pub fn once(z: *const zWSGI, acpt: net.Server.Connection) !void {
z.router.builder(&frame, routed_endpoint);
}
 
fn onceThreaded(z: *const zWSGI, acpt: net.Server.Connection) void {
once(z, acpt) catch |err| {
log.err("Unexpected endpoint error {} in threaded mode", .{err});
running = false;
};
}
 
export fn sig_cb(sig: c_int, _: *const siginfo_t, _: ?*const anyopaque) callconv(.C) void {
switch (sig) {
std.posix.SIG.INT => {