srctree

Gregory Mullen parent 39cd0052 4ae6a451
move downstream exclusively into request

inlinesplit
src/frame.zig added: 143, removed: 124, total 19
@@ -10,9 +10,6 @@
alloc: Allocator,
/// Base Request object from the client.
request: *const Request,
/// downsteam writer based on which ever server accepted the client request and
/// created this Frame
downstream: Downstream,
/// Request URI as received by Verse
uri: Router.UriIterator,
 
@@ -48,16 +45,6 @@ pub const SendError = error{
HeadersFinished,
} || NetworkError;
 
pub const Downstream = union(enum) {
buffer: Buffered,
http: Stream,
zwsgi: Stream,
 
pub const Error = Stream.WriteError || Buffered.Error;
 
const Buffered = std.io.BufferedWriter(ONESHOT_SIZE, Stream.Writer);
};
 
/// sendPage is the default way to respond in verse using the Template system.
/// sendPage will flush headers to the client before sending Page data
pub fn sendPage(frame: *Frame, page: anytype) NetworkError!void {
@@ -70,31 +57,28 @@ pub fn sendPage(frame: *Frame, page: anytype) NetworkError!void {
 
try frame.sendRawSlice("\r\n");
 
switch (frame.downstream) {
.http, .zwsgi => |stream| {
var vec_s: [2048]IOVec = @splat(undefined);
var vecs: []IOVec = vec_s[0..];
const required = page.iovecCountAll();
if (required > vec_s.len) {
vecs = frame.alloc.alloc(IOVec, required) catch @panic("OOM");
}
var stkfb = std.heap.stackFallback(0xffff, frame.alloc);
const stkalloc = stkfb.get();
const vec = page.ioVec(vecs, stkalloc) catch |iovec_err| {
log.err("Error building iovec ({}) fallback to writer", .{iovec_err});
const w = stream.writer();
page.format("{}", .{}, w) catch |err| switch (err) {
else => log.err("Page Build Error {}", .{err}),
};
return;
};
stream.writevAll(@ptrCast(vec)) catch |err| switch (err) {
else => log.err("iovec write error Error {} len {}", .{ err, vec.len }),
};
if (required > 2048) frame.alloc.free(vecs);
},
.buffer => @panic("not implemented"),
const stream = frame.request.downstream;
 
var vec_s: [2048]IOVec = @splat(undefined);
var vecs: []IOVec = vec_s[0..];
const required = page.iovecCountAll();
if (required > vec_s.len) {
vecs = frame.alloc.alloc(IOVec, required) catch @panic("OOM");
}
var stkfb = std.heap.stackFallback(0xffff, frame.alloc);
const stkalloc = stkfb.get();
const vec = page.ioVec(vecs, stkalloc) catch |iovec_err| {
log.err("Error building iovec ({}) fallback to writer", .{iovec_err});
const w = stream.writer();
page.format("{}", .{}, w) catch |err| switch (err) {
else => log.err("Page Build Error {}", .{err}),
};
return;
};
stream.writevAll(@ptrCast(vec)) catch |err| switch (err) {
else => log.err("iovec write error Error {} len {}", .{ err, vec.len }),
};
if (required > 2048) frame.alloc.free(vecs);
}
 
/// sendRawSlice will allow you to send data directly to the client. It will not
@@ -189,10 +173,6 @@ pub fn init(a: Allocator, req: *const Request, auth: Auth.Provider) !Frame {
return .{
.alloc = a,
.request = req,
.downstream = switch (req.raw) {
.zwsgi => |z| .{ .zwsgi = z.*.conn.stream },
.http => .{ .http = req.raw.http.server.connection.stream },
},
.uri = try splitUri(req.uri),
.auth_provider = auth,
.headers = Headers.init(),
@@ -225,55 +205,51 @@ pub fn sendHeaders(vrs: *Frame) SendError!void {
return SendError.HeadersFinished;
}
 
switch (vrs.downstream) {
.http, .zwsgi => |stream| {
var vect = VecList(HEADER_VEC_COUNT).init();
const stream = vrs.request.downstream;
var vect = VecList(HEADER_VEC_COUNT).init();
 
const h_resp = vrs.HttpHeader("HTTP/1.1");
try vect.append(h_resp);
const h_resp = vrs.HttpHeader("HTTP/1.1");
try vect.append(h_resp);
 
// Default headers
const s_name = "Server: verse/" ++ build_version ++ "\r\n";
try vect.append(s_name);
// Default headers
const s_name = "Server: verse/" ++ build_version ++ "\r\n";
try vect.append(s_name);
 
if (vrs.content_type) |ct| {
try vect.append("Content-Type: ");
switch (ct.base) {
inline else => |tag, name| {
try vect.append(@tagName(name));
try vect.append("/");
try vect.append(@tagName(tag));
},
}
if (ct.parameter) |param| {
const pre = "; charset=";
try vect.append(pre);
const tag = @tagName(param);
try vect.append(tag);
}
try vect.append("\r\n");
//"text/html; charset=utf-8"); // Firefox is trash
}
 
var itr = vrs.headers.iterator();
while (itr.next()) |header| {
try vect.append(header.name);
try vect.append(": ");
try vect.append(header.value);
try vect.append("\r\n");
}
 
for (vrs.cookie_jar.cookies.items) |cookie| {
const used = try cookie.writeVec(vect.vect[vect.length..]);
vect.length += used;
try vect.append("\r\n");
}
 
stream.writevAll(@ptrCast(vect.vect[0..vect.length])) catch return error.IOWriteFailure;
},
.buffer => @panic("not implemented"),
if (vrs.content_type) |ct| {
try vect.append("Content-Type: ");
switch (ct.base) {
inline else => |tag, name| {
try vect.append(@tagName(name));
try vect.append("/");
try vect.append(@tagName(tag));
},
}
if (ct.parameter) |param| {
const pre = "; charset=";
try vect.append(pre);
const tag = @tagName(param);
try vect.append(tag);
}
try vect.append("\r\n");
//"text/html; charset=utf-8"); // Firefox is trash
}
 
var itr = vrs.headers.iterator();
while (itr.next()) |header| {
try vect.append(header.name);
try vect.append(": ");
try vect.append(header.value);
try vect.append("\r\n");
}
 
for (vrs.cookie_jar.cookies.items) |cookie| {
const used = try cookie.writeVec(vect.vect[vect.length..]);
vect.length += used;
try vect.append("\r\n");
}
 
stream.writevAll(@ptrCast(vect.vect[0..vect.length])) catch return error.IOWriteFailure;
 
vrs.headers_done = true;
}
 
@@ -305,27 +281,17 @@ fn writeAll(vrs: Frame, data: []const u8) !void {
}
}
 
fn writevAll(vrs: Frame, vect: []IOVec) !void {
switch (vrs.downstream) {
.zwsgi, .http => |stream| try stream.writevAll(@ptrCast(vect)),
.buffer => @panic("not implemented"),
}
fn writevAll(f: Frame, vect: []IOVec) !void {
return f.request.downstream.writevAll(vect);
}
 
// Raw writer, use with caution!
fn write(vrs: Frame, data: []const u8) Downstream.Error!usize {
return switch (vrs.downstream) {
.zwsgi => |*w| try w.write(data),
.http => |*w| try w.write(data),
.buffer => try vrs.write(data),
};
return vrs.request.downstream.write(data);
}
 
fn flush(vrs: *Frame) Downstream.Error!void {
switch (vrs.downstream) {
.buffer => |*w| try w.flush(),
.http, .zwsgi => {},
}
fn flush(f: *Frame) Downstream.Error!void {
return f.request.downstream.flush();
}
 
fn HttpHeader(vrs: *Frame, comptime ver: []const u8) [:0]const u8 {
@@ -359,7 +325,7 @@ fn HttpHeader(vrs: *Frame, comptime ver: []const u8) [:0]const u8 {
}
 
pub fn dumpDebugData(frame: *const Frame) void {
switch (frame.request.raw) {
switch (frame.request.downstream) {
.zwsgi => |zw| {
var itr = zw.known.iterator();
while (itr.next()) |entry| {
@@ -378,6 +344,7 @@ pub fn dumpDebugData(frame: *const Frame) void {
std.debug.print("DumpDebug request header => {s} -> {s}\n", .{ header.name, header.value });
}
},
.buffer => |_| @panic("not implemented"),
}
if (frame.request.data.post) |post_data| {
std.debug.print("post data => '''{s}'''\n", .{post_data.rawpost});
@@ -402,6 +369,7 @@ const IOVec = @import("iovec.zig").IOVec;
 
const Server = @import("server.zig");
const Request = @import("request.zig");
pub const Downstream = Request.DownstreamGateway;
const RequestData = @import("request-data.zig");
const Template = @import("template.zig");
const Router = @import("router.zig");
 
src/request.zig added: 143, removed: 124, total 19
@@ -16,22 +16,73 @@ headers: Headers,
cookie_jar: Cookies.Jar,
/// POST or QUERY data
data: Data,
/// TODO this is unstable and likely to be removed
raw: DownstreamGateway,
/// downstream connection to the client.
downstream: DownstreamGateway,
 
const Request = @This();
 
pub const Data = @import("request-data.zig");
pub const UserAgent = @import("user-agent.zig");
const Headers = @import("headers.zig");
const Cookies = @import("cookies.zig");
const zWSGIRequest = @import("zwsgi.zig").zWSGIRequest;
const zWSGIParam = @import("zwsgi.zig").zWSGIParam;
 
pub const DownstreamGateway = union(Downstream) {
zwsgi: *zWSGIRequest,
http: *std.http.Server.Request,
buffer: *std.io.FixedBufferStream([]u8),
 
pub const Error = std.net.Stream.WriteError || std.io.FixedBufferStream([]u8).WriteError;
pub const Writer = std.io.GenericWriter(DownstreamGateway, Error, write);
 
pub fn writer(ds: DownstreamGateway) Writer {
return .{
.context = ds,
};
}
 
fn untypedWrite(ptr: *const anyopaque, bytes: []const u8) anyerror!usize {
const ds: *const DownstreamGateway = @alignCast(@ptrCast(ptr));
return try ds.write(bytes);
}
 
pub fn writeAll(ds: DownstreamGateway, data: []const u8) Error!void {
var index: usize = 0;
while (index < data.len) {
index += try write(ds, data[index..]);
}
}
 
pub fn writevAll(ds: DownstreamGateway, vect: []IOVec) Error!void {
switch (ds) {
.zwsgi => |z| try z.conn.stream.writevAll(@ptrCast(vect)),
.http => |h| try h.server.connection.stream.writevAll(@ptrCast(vect)),
.buffer => @panic("not implemented"),
}
}
 
// Raw writer, use with caution!
pub fn write(ds: DownstreamGateway, data: []const u8) Error!usize {
return switch (ds) {
.zwsgi => |z| try z.conn.stream.write(data),
.http => |h| try h.server.connection.stream.write(data),
.buffer => try ds.write(data),
};
}
 
pub fn flush(ds: DownstreamGateway) Error!void {
switch (ds) {
.buffer => |_| {}, // TODO implement flush for buffered writer
.http, .zwsgi => {},
}
}
};
 
const Downstream = enum {
zwsgi,
http,
buffer,
};
 
pub const Host = []const u8;
@@ -116,11 +167,6 @@ pub const Protocol = union(enum) {
pub const default: Protocol = .{ .http = .@"1.1" };
};
 
const Headers = @import("headers.zig");
const Cookies = @import("cookies.zig");
const zWSGIRequest = @import("zwsgi.zig").zWSGIRequest;
const zWSGIParam = @import("zwsgi.zig").zWSGIParam;
 
fn initCommon(
a: Allocator,
remote_addr: RemoteAddr,
@@ -136,7 +182,7 @@ fn initCommon(
cookies: ?[]const u8,
proto: []const u8,
data: Data,
raw: DownstreamGateway,
downstream: DownstreamGateway,
secure: bool,
) !Request {
var method = _method;
@@ -154,7 +200,7 @@ fn initCommon(
.headers = headers,
.host = host,
.method = method,
.raw = raw,
.downstream = downstream,
.referer = referer,
.remote_addr = remote_addr,
.uri = uri,
@@ -298,3 +344,5 @@ const eql = std.mem.eql;
const eqlIgnoreCase = std.ascii.eqlIgnoreCase;
const allocPrint = std.fmt.allocPrint;
const bufPrint = std.fmt.bufPrint;
 
const IOVec = @import("iovec.zig").IOVec;
 
src/router.zig added: 143, removed: 124, total 19
@@ -353,10 +353,11 @@ pub fn defaultBuilder(vrs: *Frame, build: BuildFn) void {
// dumping the information is likely to help with debugging the
// error.
log.err("Abusive {} because {}\n", .{ vrs.request, err });
var itr = vrs.request.raw.http.iterateHeaders();
while (itr.next()) |vars| {
log.err("Abusive var '{s}' => '''{s}'''\n", .{ vars.name, vars.value });
}
// TODO fix me
//var itr = vrs.request.raw.http.iterateHeaders();
//while (itr.next()) |vars| {
// log.err("Abusive var '{s}' => '''{s}'''\n", .{ vars.name, vars.value });
//}
},
}
};
 
src/websocket.zig added: 143, removed: 124, total 19
@@ -50,15 +50,17 @@ fn respond(f: *Frame, key: []const u8) WriteError!void {
pub fn send(ws: Websocket, msg: []const u8) WriteError!void {
const m = Message.init(msg, .text);
 
_ = switch (ws.frame.downstream) {
.zwsgi, .http => |stream| stream.writev(m.toVec()[0..3]),
else => unreachable,
} catch return error.IOWriteFailure;
return ws.frame.request.downstream.writevAll(
@ptrCast(@constCast(m.toVec()[0..3])),
) catch |err| switch (err) {
else => return error.IOWriteFailure,
};
}
 
pub fn recieve(ws: *Websocket, buffer: []align(8) u8) Error!Message {
var reader = switch (ws.frame.downstream) {
.zwsgi, .http => |stream| stream.reader(),
var reader = switch (ws.frame.request.downstream) {
.zwsgi => |z| z.conn.stream.reader(),
.http => |h| h.server.connection.stream.reader(),
else => unreachable,
};
return Message.read(reader.any(), buffer) catch error.IOReadFailure;