Skip to content

Commit

Permalink
Merge branch 'tweaks'
Browse files Browse the repository at this point in the history
  • Loading branch information
karlseguin committed Oct 6, 2024
2 parents 7364edb + c2257e0 commit e55f3f7
Show file tree
Hide file tree
Showing 10 changed files with 1,244 additions and 924 deletions.
24 changes: 12 additions & 12 deletions build.zig
Original file line number Diff line number Diff line change
Expand Up @@ -50,24 +50,24 @@ pub fn build(b: *std.Build) !void {
test_step.dependOn(&run_test.step);
}

const examples = [_]struct{
const examples = [_]struct {
file: []const u8,
name: []const u8,
libc: bool = false,
} {
.{.file = "examples/01_basic.zig", .name = "example_1"},
.{.file = "examples/02_handler.zig", .name = "example_2"},
.{.file = "examples/03_dispatch.zig", .name = "example_3"},
.{.file = "examples/04_action_context.zig", .name = "example_4"},
.{.file = "examples/05_request_takeover.zig", .name = "example_5"},
.{.file = "examples/06_middleware.zig", .name = "example_6"},
.{.file = "examples/07_advanced_routing.zig", .name = "example_7"},
.{.file = "examples/08_websocket.zig", .name = "example_8"},
.{.file = "examples/09_shutdown.zig", .name = "example_9", .libc = true},
}{
.{ .file = "examples/01_basic.zig", .name = "example_1" },
.{ .file = "examples/02_handler.zig", .name = "example_2" },
.{ .file = "examples/03_dispatch.zig", .name = "example_3" },
.{ .file = "examples/04_action_context.zig", .name = "example_4" },
.{ .file = "examples/05_request_takeover.zig", .name = "example_5" },
.{ .file = "examples/06_middleware.zig", .name = "example_6" },
.{ .file = "examples/07_advanced_routing.zig", .name = "example_7" },
.{ .file = "examples/08_websocket.zig", .name = "example_8" },
.{ .file = "examples/09_shutdown.zig", .name = "example_9", .libc = true },
};

{
for (examples) |ex| {
for (examples) |ex| {
const exe = b.addExecutable(.{
.name = ex.name,
.target = target,
Expand Down
25 changes: 2 additions & 23 deletions src/config.zig
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@ const httpz = @import("httpz.zig");
const request = @import("request.zig");
const response = @import("response.zig");

// don't like using CPU detection since hyperthread cores are marketing.
const DEFAULT_WORKERS = 2;

pub const Config = struct {
port: ?u16 = null,
address: ?[]const u8 = null,
Expand Down Expand Up @@ -60,31 +57,13 @@ pub const Config = struct {
};

pub fn threadPoolCount(self: *const Config) u32 {
const thread_count = self.thread_pool.count orelse 4;

// In blockingMode, we only have 1 worker (regardless of the
// config). We want to make blocking and nonblocking modes
// use the same number of threads, so we'll convert extra workers
// into thread pool threads.
// In blockingMode, the worker does relatively little work, and the
// thread pool threads do more, so this re-balancing makes some sense
// and can always be opted out of by explicitly setting
// config.workers.count = 1

if (httpz.blockingMode()) {
const worker_count = self.workerCount();
if (worker_count > 1) {
return thread_count + worker_count - 1;
}
}

return thread_count;
return self.thread_pool.count orelse 32;
}

pub fn workerCount(self: *const Config) u32 {
if (httpz.blockingMode()) {
return 1;
}
return self.workers.count orelse DEFAULT_WORKERS;
return self.workers.count orelse 1;
}
};
97 changes: 34 additions & 63 deletions src/httpz.zig
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ const log = std.log.scoped(.httpz);

const worker = @import("worker.zig");
const HTTPConn = worker.HTTPConn;
const ThreadPool = @import("thread_pool.zig").ThreadPool;

const build = @import("build");
const force_blocking: bool = if (@hasDecl(build, "httpz_blocking")) build.httpz_blocking else false;
Expand Down Expand Up @@ -246,24 +245,22 @@ pub fn Server(comptime H: type) type {
};

return struct {
const TP = if (blockingMode()) ThreadPool(worker.Blocking(*Self, WebsocketHandler).handleConnection) else ThreadPool(worker.NonBlocking(*Self, WebsocketHandler).processData);

handler: H,
config: Config,
arena: Allocator,
allocator: Allocator,
_router: Router(H, ActionArg),
_mut: Thread.Mutex,
_workers: []Worker,
_cond: Thread.Condition,
_thread_pool: *TP,
_signals: []posix.fd_t,
_listener: ?posix.socket_t,
_max_request_per_connection: usize,
_middlewares: []const Middleware(H),
_websocket_state: websocket.server.WorkerState,
_middleware_registry: std.SinglyLinkedList(Middleware(H)),

const Self = @This();
const Worker = if (blockingMode()) worker.Blocking(*Self, WebsocketHandler) else worker.NonBlocking(*Self, WebsocketHandler);

pub fn init(allocator: Allocator, config: Config, handler: H) !Self {
// Be mindful about where we pass this arena. Most things are able to
Expand All @@ -275,14 +272,6 @@ pub fn Server(comptime H: type) type {
arena.* = std.heap.ArenaAllocator.init(allocator);
errdefer arena.deinit();

const thread_pool = try TP.init(arena.allocator(), .{
.count = config.threadPoolCount(),
.backlog = config.thread_pool.backlog orelse 500,
.buffer_size = config.thread_pool.buffer_size orelse 32_768,
});

const signals: []posix.fd_t = if (blockingMode()) &.{} else try arena.allocator().alloc(posix.fd_t, config.workerCount());

const default_dispatcher = if (comptime Handler == void) defaultDispatcher else defaultDispatcherWithHandler;

// do not pass arena.allocator to WorkerState, it needs to be able to
Expand All @@ -305,26 +294,26 @@ pub fn Server(comptime H: type) type {
});
errdefer websocket_state.deinit();

const workers = try arena.allocator().alloc(Worker, config.workerCount());

return .{
.config = config,
.handler = handler,
.allocator = allocator,
.arena = arena.allocator(),
._mut = .{},
._cond = .{},
._workers = workers,
._listener = null,
._signals = signals,
._middlewares = &.{},
._middleware_registry = .{},
._thread_pool = thread_pool,
._websocket_state = websocket_state,
._router = try Router(H, ActionArg).init(arena.allocator(), default_dispatcher, handler),
._max_request_per_connection = config.timeout.request_count orelse MAX_REQUEST_COUNT,
};
}

pub fn deinit(self: *Self) void {
self._thread_pool.stop();
self._websocket_state.deinit();

var node = self._middleware_registry.first;
Expand Down Expand Up @@ -393,13 +382,15 @@ pub fn Server(comptime H: type) type {
}

self._listener = listener;

var workers = self._workers;
const allocator = self.allocator;

if (comptime blockingMode()) {
var w = try worker.Blocking(*Self, WebsocketHandler).init(allocator, self, &config);
defer w.deinit();
workers[0] = try worker.Blocking(*Self, WebsocketHandler).init(allocator, self, &config);
defer workers[0].deinit();

const thrd = try Thread.spawn(.{}, worker.Blocking(*Self, WebsocketHandler).listen, .{ &w, listener });
const thrd = try Thread.spawn(.{}, worker.Blocking(*Self, WebsocketHandler).listen, .{ &workers[0], listener });

// incase listenInNewThread was used and is waiting for us to start
self._cond.signal();
Expand All @@ -409,32 +400,22 @@ pub fn Server(comptime H: type) type {
// socket is closed.
thrd.join();
} else {
const Worker = worker.NonBlocking(*Self, WebsocketHandler);
var signals = self._signals;
const worker_count = signals.len;
const workers = try self.arena.alloc(Worker, worker_count);
const threads = try self.arena.alloc(Thread, worker_count);

var started: usize = 0;
defer for (0..started) |i| {
workers[i].deinit();
};

errdefer for (0..started) |i| {
// on success, these will be closed by a call to stop();
posix.close(signals[i]);
workers[i].stop();
};

defer {
for (0..started) |i| {
const threads = try self.arena.alloc(Thread, workers.len);
for (0..workers.len) |i| {
workers[i] = try Worker.init(allocator, self, &config);
errdefer {
workers[i].stop();
workers[i].deinit();
}
}

for (0..workers.len) |i| {
const pipe = try posix.pipe2(.{ .NONBLOCK = true });
signals[i] = pipe[1];
errdefer posix.close(pipe[1]);

workers[i] = try Worker.init(allocator, pipe, self, &config);
errdefer workers[i].deinit();

threads[i] = try Thread.spawn(.{}, Worker.run, .{ &workers[i], listener });
started += 1;
}
Expand All @@ -461,31 +442,22 @@ pub fn Server(comptime H: type) type {
}

pub fn stop(self: *Self) void {
// Order matters. When thread_pool.stop() is called, pending requests
// will continue to be processed. Only once the thread-pool queue is
// empty, does the thread-pool really stop.
// So we need to close the signal, which tells the worker to stop
// accepting / processing new requests.
{
self._mut.lock();
defer self._mut.unlock();
self._mut.lock();
defer self._mut.unlock();

for (self._signals) |s| {
posix.close(s);
}
for (self._workers) |*w| {
w.stop();
}

if (self._listener) |l| {
if (comptime blockingMode()) {
// necessary to unblock accept on linux
// (which might not be that necessary since, on Linux,
// NonBlocking should be used)
posix.shutdown(l, .recv) catch {};
}
posix.close(l);
if (self._listener) |l| {
if (comptime blockingMode()) {
// necessary to unblock accept on linux
// (which might not be that necessary since, on Linux,
// NonBlocking should be used)
posix.shutdown(l, .recv) catch {};
}
posix.close(l);
}
@atomicStore(usize, &self._max_request_per_connection, 0, .monotonic);
self._thread_pool.stop();
}

pub fn router(self: *Self, config: RouterConfig) *Router(H, ActionArg) {
Expand Down Expand Up @@ -538,7 +510,6 @@ pub fn Server(comptime H: type) type {
var req = Request.init(allocator, conn);
var res = Response.init(allocator, conn);


if (comptime std.meta.hasFn(Handler, "handle")) {
if (comptime @typeInfo(@TypeOf(Handler.handle)).@"fn".return_type != void) {
@compileError(@typeName(Handler) ++ ".handle must return 'void'");
Expand Down Expand Up @@ -577,7 +548,7 @@ pub fn Server(comptime H: type) type {

if (conn.handover == .unknown) {
// close is the default
conn.handover = if (req.canKeepAlive() and conn.request_count < @atomicLoad(usize, &self._max_request_per_connection, .monotonic)) .keepalive else .close;
conn.handover = if (req.canKeepAlive() and conn.request_count < self._max_request_per_connection) .keepalive else .close;
}

res.write() catch {
Expand Down Expand Up @@ -1188,7 +1159,7 @@ test "httpz: event stream" {

try t.expectEqual(818, res.status);
try t.expectEqual(true, res.headers.get("Content-Length") == null);
try t.expectString("text/event-stream", res.headers.get("Content-Type").?);
try t.expectString("text/event-stream; charset=UTF-8", res.headers.get("Content-Type").?);
try t.expectString("no-cache", res.headers.get("Cache-Control").?);
try t.expectString("keep-alive", res.headers.get("Connection").?);
try t.expectString("helloa message", res.body);
Expand Down
2 changes: 1 addition & 1 deletion src/key_value.zig
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const mem = std.mem;
const ascii = std.ascii;
const Allocator = std.mem.Allocator;

fn KeyValue(K: type, V: type, equalFn: fn (lhs: K, rhs: K) callconv(.Inline) bool, hashFn: fn (key: K) callconv(.Inline) u8) type {
fn KeyValue(K: type, V: type, equalFn: fn (lhs: K, rhs: K) callconv(.Inline) bool, hashFn: fn (key: K) callconv(.Inline) u8) type {
return struct {
len: usize,
keys: []K,
Expand Down
10 changes: 5 additions & 5 deletions src/metrics.zig
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ const m = @import("metrics");
var metrics = Metrics{
.connections = m.Counter(usize).Impl.init("httpz_connections", .{}),
.requests = m.Counter(usize).Impl.init("httpz_requests", .{}),
.timeout_active = m.Counter(usize).Impl.init("httpz_timeout_active", .{}),
.timeout_request = m.Counter(usize).Impl.init("httpz_timeout_request", .{}),
.timeout_keepalive = m.Counter(usize).Impl.init("httpz_timeout_keepalive", .{}),
.alloc_buffer_empty = m.Counter(usize).Impl.init("httpz_alloc_buffer_empty", .{}),
.alloc_buffer_large = m.Counter(usize).Impl.init("httpz_alloc_buffer_large", .{}),
Expand All @@ -25,7 +25,7 @@ const Metrics = struct {
requests: m.Counter(usize).Impl,

// number of connections that were timed out while service a request
timeout_active: m.Counter(usize).Impl,
timeout_request: m.Counter(usize).Impl,

// number of connections that were timed out while in keepalive
timeout_keepalive: m.Counter(usize).Impl,
Expand Down Expand Up @@ -58,7 +58,7 @@ const Metrics = struct {
pub fn write(writer: anytype) !void {
try metrics.connections.write(writer);
try metrics.requests.write(writer);
try metrics.timeout_active.write(writer);
try metrics.timeout_request.write(writer);
try metrics.timeout_keepalive.write(writer);
try metrics.alloc_buffer_empty.write(writer);
try metrics.alloc_buffer_large.write(writer);
Expand Down Expand Up @@ -89,8 +89,8 @@ pub fn request() void {
metrics.requests.incr();
}

pub fn timeoutActive(count: usize) void {
metrics.timeout_active.incrBy(count);
pub fn timeoutRequest(count: usize) void {
metrics.timeout_request.incrBy(count);
}

pub fn timeoutKeepalive(count: usize) void {
Expand Down
Loading

0 comments on commit e55f3f7

Please sign in to comment.