From c1734c6ec5ef709ee4126b3474c7bee0a377a1fa Mon Sep 17 00:00:00 2001 From: Jarred Sumner Date: Sun, 28 Aug 2022 21:28:05 -0700 Subject: [PATCH] More reliable macOS event loop (#1166) * More reliable macOS event loop * Reduce CPU usage of idling * Add another implementation * Add benchmark Co-authored-by: Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> --- Makefile | 61 ++++- bench/snippets/set-timeout.mjs | 16 ++ build.zig | 22 ++ misctools/machbench.zig | 137 +++++++++++ src/bun.js/event_loop.zig | 17 +- src/bun_js.zig | 20 +- src/http_client_async.zig | 12 +- src/io/io_darwin.cpp | 65 +++++ src/io/io_darwin.zig | 304 ++++++++++++++++++------ src/io/io_linux.zig | 28 ++- src/network_thread.zig | 50 ++-- test/bun.js/ffi.test.fixture.callback.c | 11 +- test/bun.js/ffi.test.fixture.receiver.c | 8 +- 13 files changed, 607 insertions(+), 144 deletions(-) create mode 100644 bench/snippets/set-timeout.mjs create mode 100644 misctools/machbench.zig create mode 100644 src/io/io_darwin.cpp diff --git a/Makefile b/Makefile index 06ed00d1bf8445..4790030638abbf 100644 --- a/Makefile +++ b/Makefile @@ -243,6 +243,7 @@ SRC_FILES := $(wildcard $(SRC_DIR)/*.cpp) SRC_WEBCORE_FILES := $(wildcard $(SRC_DIR)/webcore/*.cpp) SRC_SQLITE_FILES := $(wildcard $(SRC_DIR)/sqlite/*.cpp) SRC_NODE_OS_FILES := $(wildcard $(SRC_DIR)/node_os/*.cpp) +SRC_IO_FILES := $(wildcard src/io/*.cpp) SRC_BUILTINS_FILES := $(wildcard src/bun.js/builtins/*.cpp) OBJ_FILES := $(patsubst $(SRC_DIR)/%.cpp,$(OBJ_DIR)/%.o,$(SRC_FILES)) @@ -250,15 +251,18 @@ WEBCORE_OBJ_FILES := $(patsubst $(SRC_DIR)/webcore/%.cpp,$(OBJ_DIR)/%.o,$(SRC_WE SQLITE_OBJ_FILES := $(patsubst $(SRC_DIR)/sqlite/%.cpp,$(OBJ_DIR)/%.o,$(SRC_SQLITE_FILES)) NODE_OS_OBJ_FILES := $(patsubst $(SRC_DIR)/node_os/%.cpp,$(OBJ_DIR)/%.o,$(SRC_NODE_OS_FILES)) BUILTINS_OBJ_FILES := $(patsubst src/bun.js/builtins/%.cpp,$(OBJ_DIR)/%.o,$(SRC_BUILTINS_FILES)) +IO_FILES := $(patsubst src/io/%.cpp,$(OBJ_DIR)/%.o,$(SRC_IO_FILES)) + DEBUG_OBJ_FILES := $(patsubst $(SRC_DIR)/%.cpp,$(DEBUG_OBJ_DIR)/%.o,$(SRC_FILES)) DEBUG_WEBCORE_OBJ_FILES := $(patsubst $(SRC_DIR)/webcore/%.cpp,$(DEBUG_OBJ_DIR)/%.o,$(SRC_WEBCORE_FILES)) DEBUG_SQLITE_OBJ_FILES := $(patsubst $(SRC_DIR)/sqlite/%.cpp,$(DEBUG_OBJ_DIR)/%.o,$(SRC_SQLITE_FILES)) DEBUG_NODE_OS_OBJ_FILES := $(patsubst $(SRC_DIR)/node_os/%.cpp,$(DEBUG_OBJ_DIR)/%.o,$(SRC_NODE_OS_FILES)) DEBUG_BUILTINS_OBJ_FILES := $(patsubst src/bun.js/builtins/%.cpp,$(DEBUG_OBJ_DIR)/%.o,$(SRC_BUILTINS_FILES)) +DEBUG_IO_FILES := $(patsubst src/io/%.cpp,$(DEBUG_OBJ_DIR)/%.o,$(SRC_IO_FILES)) -BINDINGS_OBJ := $(OBJ_FILES) $(WEBCORE_OBJ_FILES) $(SQLITE_OBJ_FILES) $(NODE_OS_OBJ_FILES) $(BUILTINS_OBJ_FILES) -DEBUG_BINDINGS_OBJ := $(DEBUG_OBJ_FILES) $(DEBUG_WEBCORE_OBJ_FILES) $(DEBUG_SQLITE_OBJ_FILES) $(DEBUG_NODE_OS_OBJ_FILES) $(DEBUG_BUILTINS_OBJ_FILES) +BINDINGS_OBJ := $(OBJ_FILES) $(WEBCORE_OBJ_FILES) $(SQLITE_OBJ_FILES) $(NODE_OS_OBJ_FILES) $(BUILTINS_OBJ_FILES) $(IO_FILES) +DEBUG_BINDINGS_OBJ := $(DEBUG_OBJ_FILES) $(DEBUG_WEBCORE_OBJ_FILES) $(DEBUG_SQLITE_OBJ_FILES) $(DEBUG_NODE_OS_OBJ_FILES) $(DEBUG_BUILTINS_OBJ_FILES) $(DEBUG_IO_FILES) MAC_INCLUDE_DIRS := -I$(WEBKIT_RELEASE_DIR)/JavaScriptCore/PrivateHeaders \ -I$(WEBKIT_RELEASE_DIR)/WTF/Headers \ @@ -751,9 +755,9 @@ generate-install-script: @esbuild --log-level=error --define:BUN_VERSION="\"$(PACKAGE_JSON_VERSION)\"" --define:process.env.NODE_ENV="\"production\"" --platform=node --format=cjs $(PACKAGES_REALPATH)/bun/install.ts > $(PACKAGES_REALPATH)/bun/install.js .PHONY: fetch -fetch: +fetch: $(IO_FILES) 
$(ZIG) build -Drelease-fast fetch-obj - $(CXX) $(PACKAGE_DIR)/fetch.o -g $(OPTIMIZATION_LEVEL) -o ./misctools/fetch $(DEFAULT_LINKER_FLAGS) -lc $(MINIMUM_ARCHIVE_FILES) + $(CXX) $(PACKAGE_DIR)/fetch.o -g $(OPTIMIZATION_LEVEL) -o ./misctools/fetch $(IO_FILES) $(DEFAULT_LINKER_FLAGS) -lc $(MINIMUM_ARCHIVE_FILES) rm -rf $(PACKAGE_DIR)/fetch.o .PHONY: sha @@ -763,19 +767,31 @@ sha: rm -rf $(PACKAGE_DIR)/sha.o .PHONY: fetch-debug -fetch-debug: +fetch-debug: $(IO_FILES) $(ZIG) build fetch-obj - $(CXX) $(DEBUG_PACKAGE_DIR)/fetch.o -g $(OPTIMIZATION_LEVEL) -o ./misctools/fetch $(DEFAULT_LINKER_FLAGS) -lc $(MINIMUM_ARCHIVE_FILES) + $(CXX) $(DEBUG_PACKAGE_DIR)/fetch.o -g $(OPTIMIZATION_LEVEL) -o ./misctools/fetch $(DEBUG_IO_FILES) $(DEFAULT_LINKER_FLAGS) -lc $(MINIMUM_ARCHIVE_FILES) + +.PHONY: machbench-debug +machbench-debug: $(IO_FILES) + $(ZIG) build machbench-obj + $(CXX) $(DEBUG_PACKAGE_DIR)/machbench.o -g $(OPTIMIZATION_LEVEL) -o ./misctools/machbench $(DEBUG_IO_FILES) $(DEFAULT_LINKER_FLAGS) -lc $(MINIMUM_ARCHIVE_FILES) + +.PHONY: machbench +machbench: $(IO_FILES) + $(ZIG) build -Drelease-fast machbench-obj + $(CXX) $(PACKAGE_DIR)/machbench.o -g $(OPTIMIZATION_LEVEL) -o ./misctools/machbench $(IO_FILES) $(DEFAULT_LINKER_FLAGS) -lc $(MINIMUM_ARCHIVE_FILES) + rm -rf $(PACKAGE_DIR)/machbench.o + .PHONY: httpbench-debug -httpbench-debug: +httpbench-debug: $(IO_FILES) $(ZIG) build httpbench-obj - $(CXX) $(DEBUG_PACKAGE_DIR)/httpbench.o -fuse-ld=lld -g -o ./misctools/http_bench $(DEFAULT_LINKER_FLAGS) -lc $(MINIMUM_ARCHIVE_FILES) + $(CXX) $(IO_FILES) $(DEBUG_PACKAGE_DIR)/httpbench.o -g -o ./misctools/http_bench $(DEFAULT_LINKER_FLAGS) -lc $(MINIMUM_ARCHIVE_FILES) .PHONY: httpbench-release -httpbench-release: +httpbench-release: $(IO_FILES) $(ZIG) build -Drelease-fast httpbench-obj - $(CXX) $(PACKAGE_DIR)/httpbench.o -march=native -mtune=native -fuse-ld=lld -g $(OPTIMIZATION_LEVEL) -o ./misctools/http_bench $(DEFAULT_LINKER_FLAGS) -lc $(MINIMUM_ARCHIVE_FILES) + $(CXX) $(PACKAGE_DIR)/httpbench.o -g $(OPTIMIZATION_LEVEL) -o ./misctools/httpbench $(IO_FILES) $(DEFAULT_LINKER_FLAGS) -lc $(MINIMUM_ARCHIVE_FILES) rm -rf $(PACKAGE_DIR)/httpbench.o .PHONY: check-glibc-version-dependency @@ -1288,12 +1304,12 @@ clean: clean-bindings (cd $(BUN_DEPS_DIR)/zlib && make clean) || echo ""; .PHONY: release-bindings -release-bindings: $(OBJ_DIR) $(OBJ_FILES) $(WEBCORE_OBJ_FILES) $(SQLITE_OBJ_FILES) $(NODE_OS_OBJ_FILES) $(BUILTINS_OBJ_FILES) +release-bindings: $(OBJ_DIR) $(OBJ_FILES) $(WEBCORE_OBJ_FILES) $(SQLITE_OBJ_FILES) $(NODE_OS_OBJ_FILES) $(BUILTINS_OBJ_FILES) $(IO_FILES) # Do not add $(DEBUG_DIR) to this list # It will break caching, causing you to have to wait for every .cpp file to rebuild. 
.PHONY: bindings -bindings: $(DEBUG_OBJ_FILES) $(DEBUG_WEBCORE_OBJ_FILES) $(DEBUG_SQLITE_OBJ_FILES) $(DEBUG_NODE_OS_OBJ_FILES) $(DEBUG_BUILTINS_OBJ_FILES) +bindings: $(DEBUG_OBJ_FILES) $(DEBUG_WEBCORE_OBJ_FILES) $(DEBUG_SQLITE_OBJ_FILES) $(DEBUG_NODE_OS_OBJ_FILES) $(DEBUG_BUILTINS_OBJ_FILES) $(DEBUG_IO_FILES) .PHONY: jsc-bindings-mac jsc-bindings-mac: bindings @@ -1474,6 +1490,16 @@ $(OBJ_DIR)/%.o: $(SRC_DIR)/sqlite/%.cpp $(EMIT_LLVM) \ -c -o $@ $< +$(OBJ_DIR)/%.o: src/io/%.cpp + $(CXX) $(CLANG_FLAGS) \ + $(MACOS_MIN_FLAG) \ + $(OPTIMIZATION_LEVEL) \ + -fno-exceptions \ + -fno-rtti \ + -ferror-limit=1000 \ + $(EMIT_LLVM) \ + -c -o $@ $< + $(OBJ_DIR)/%.o: $(SRC_DIR)/node_os/%.cpp $(CXX) $(CLANG_FLAGS) \ $(MACOS_MIN_FLAG) \ @@ -1518,6 +1544,17 @@ $(DEBUG_OBJ_DIR)/%.o: $(SRC_DIR)/webcore/%.cpp $(EMIT_LLVM_FOR_DEBUG) \ -g3 -c -o $@ $< +$(DEBUG_OBJ_DIR)/%.o: src/io/%.cpp + $(CXX) $(CLANG_FLAGS) \ + $(MACOS_MIN_FLAG) \ + $(DEBUG_OPTIMIZATION_LEVEL) \ + -fno-exceptions \ + -fno-rtti \ + -ferror-limit=1000 \ + $(EMIT_LLVM_FOR_DEBUG) \ + -g3 -c -o $@ $< + + # $(DEBUG_OBJ_DIR) is not included here because it breaks # detecting if a file needs to be rebuilt $(DEBUG_OBJ_DIR)/%.o: $(SRC_DIR)/sqlite/%.cpp diff --git a/bench/snippets/set-timeout.mjs b/bench/snippets/set-timeout.mjs new file mode 100644 index 00000000000000..be7ea9bc43868c --- /dev/null +++ b/bench/snippets/set-timeout.mjs @@ -0,0 +1,16 @@ +import { bench, run } from "../node_modules/mitata/src/cli.mjs"; + +bench("setTimeout(, 4) 100 times", async () => { + var i = 100; + while (--i >= 0) { + await new Promise((resolve, reject) => { + setTimeout(() => { + resolve(); + }, 4); + }); + } +}); + +setTimeout(() => { + run({}).then(() => {}); +}, 1); diff --git a/build.zig b/build.zig index a6518088f760ac..f88281fbedc0a8 100644 --- a/build.zig +++ b/build.zig @@ -463,6 +463,28 @@ pub fn build(b: *std.build.Builder) !void { headers_obj.addOptions("build_options", opts); } + { + const headers_step = b.step("machbench-obj", "Build Machbench tool (object files)"); + var headers_obj: *std.build.LibExeObjStep = b.addObject("machbench", "misctools/machbench.zig"); + defer headers_step.dependOn(&headers_obj.step); + try configureObjectStep(b, headers_obj, target, obj.main_pkg_path.?); + var opts = b.addOptions(); + opts.addOption( + bool, + "bindgen", + false, + ); + + opts.addOption( + bool, + "baseline", + is_baseline, + ); + opts.addOption([:0]const u8, "sha", git_sha); + opts.addOption(bool, "is_canary", is_canary); + headers_obj.addOptions("build_options", opts); + } + { const headers_step = b.step("fetch-obj", "Build fetch (object files)"); var headers_obj: *std.build.LibExeObjStep = b.addObject("fetch", "misctools/fetch.zig"); diff --git a/misctools/machbench.zig b/misctools/machbench.zig new file mode 100644 index 00000000000000..9c486f48ac3e3d --- /dev/null +++ b/misctools/machbench.zig @@ -0,0 +1,137 @@ +// most of this file is copy pasted from other files in misctools +const std = @import("std"); +const bun = @import("../src/global.zig"); +const string = bun.string; +const Output = bun.Output; +const Global = bun.Global; +const Environment = bun.Environment; +const strings = bun.strings; +const MutableString = bun.MutableString; +const stringZ = bun.stringZ; +const default_allocator = bun.default_allocator; +const C = bun.C; +const clap = @import("../src/deps/zig-clap/clap.zig"); +const AsyncIO = @import("io"); + +const URL = @import("../src/url.zig").URL; +const Headers = @import("http").Headers; +const Method = 
@import("../src/http/method.zig").Method; +const ColonListType = @import("../src/cli/colon_list_type.zig").ColonListType; +const HeadersTuple = ColonListType(string, noop_resolver); +const path_handler = @import("../src/resolver/resolve_path.zig"); +const NetworkThread = @import("http").NetworkThread; +const HTTP = @import("http"); +fn noop_resolver(in: string) !string { + return in; +} + +var waker: AsyncIO.Waker = undefined; + +fn spamMe(count: usize) void { + Output.Source.configureNamedThread("1"); + defer Output.flush(); + var timer = std.time.Timer.start() catch unreachable; + + var i: usize = 0; + while (i < count) : (i += 1) { + waker.wake() catch unreachable; + } + Output.prettyErrorln("[EVFILT_MACHPORT] Sent {any}", .{bun.fmt.fmtDuration(timer.read())}); +} +const thread_count = 1; +pub fn machMain(runs: usize) anyerror!void { + defer Output.flush(); + waker = try AsyncIO.Waker.init(bun.default_allocator); + + var args = try std.process.argsAlloc(bun.default_allocator); + const count = std.fmt.parseInt(usize, args[args.len - 1], 10) catch 1024; + var elapsed: u64 = 0; + + var remaining_runs: usize = runs; + while (remaining_runs > 0) : (remaining_runs -= 1) { + var threads: [thread_count]std.Thread = undefined; + var j: usize = 0; + while (j < thread_count) : (j += 1) { + threads[j] = try std.Thread.spawn(.{}, spamMe, .{count}); + } + + var timer = try std.time.Timer.start(); + var i: usize = 0; + while (i < count * thread_count) : (i += 1) { + i += try waker.wait(); + } + + j = 0; + while (j < thread_count) : (j += 1) { + threads[j].join(); + } + elapsed += timer.read(); + } + + Output.prettyErrorln("[EVFILT_MACHPORT] Recv {any}", .{bun.fmt.fmtDuration(elapsed)}); +} +var user_waker: AsyncIO.UserFilterWaker = undefined; + +fn spamMeUserFilter(count: usize) void { + Output.Source.configureNamedThread("2"); + defer Output.flush(); + var timer = std.time.Timer.start() catch unreachable; + var i: usize = 0; + while (i < count * thread_count) : (i += 1) { + user_waker.wake() catch unreachable; + } + + Output.prettyErrorln("[EVFILT_USER] Sent {any}", .{bun.fmt.fmtDuration(timer.read())}); +} +pub fn userMain(runs: usize) anyerror!void { + defer Output.flush(); + user_waker = try AsyncIO.UserFilterWaker.init(bun.default_allocator); + + var args = try std.process.argsAlloc(bun.default_allocator); + const count = std.fmt.parseInt(usize, args[args.len - 1], 10) catch 1024; + var remaining_runs = runs; + var elapsed: u64 = 0; + + while (remaining_runs > 0) : (remaining_runs -= 1) { + var threads: [thread_count]std.Thread = undefined; + var j: usize = 0; + while (j < thread_count) : (j += 1) { + threads[j] = try std.Thread.spawn(.{}, spamMeUserFilter, .{count}); + } + + var timer = try std.time.Timer.start(); + var i: usize = 0; + while (i < count) { + i += try user_waker.wait(); + } + + j = 0; + while (j < thread_count) : (j += 1) { + threads[j].join(); + } + elapsed += timer.read(); + } + + Output.prettyErrorln("[EVFILT_USER] Recv {any}", .{bun.fmt.fmtDuration(elapsed)}); + Output.flush(); +} + +pub fn main() anyerror!void { + var stdout_ = std.io.getStdOut(); + var stderr_ = std.io.getStdErr(); + var output_source = Output.Source.init(stdout_, stderr_); + Output.Source.set(&output_source); + + var args = try std.process.argsAlloc(bun.default_allocator); + const count = std.fmt.parseInt(usize, args[args.len - 1], 10) catch 1024; + Output.prettyErrorln("For {d} messages and {d} threads:", .{ count, thread_count }); + Output.flush(); + defer Output.flush(); + const runs = if 
(std.os.getenv("RUNS")) |run_count| try std.fmt.parseInt(usize, run_count, 10) else 1; + + if (std.os.getenv("NO_MACH") == null) + try machMain(runs); + + if (std.os.getenv("NO_USER") == null) + try userMain(runs); +} diff --git a/src/bun.js/event_loop.zig b/src/bun.js/event_loop.zig index 828e7559bf9388..9e60a75395e258 100644 --- a/src/bun.js/event_loop.zig +++ b/src/bun.js/event_loop.zig @@ -291,6 +291,8 @@ pub const Task = TaggedPointerUnion(.{ // TimeoutTasklet, }); +const AsyncIO = @import("io"); + pub const EventLoop = struct { ready_tasks_count: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0), pending_tasks_count: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0), @@ -300,6 +302,8 @@ pub const EventLoop = struct { concurrent_lock: Lock = Lock.init(), global: *JSGlobalObject = undefined, virtual_machine: *VirtualMachine = undefined, + waker: ?AsyncIO.Waker = null, + pub const Queue = std.fifo.LinearFifo(Task, .Dynamic); pub fn tickWithCount(this: *EventLoop) u32 { @@ -465,6 +469,12 @@ pub const EventLoop = struct { this.tasks.writeItem(task) catch unreachable; } + pub fn ensureWaker(this: *EventLoop) void { + if (this.waker == null) { + this.waker = AsyncIO.Waker.init(this.virtual_machine.allocator) catch unreachable; + } + } + pub fn enqueueTaskConcurrent(this: *EventLoop, task: Task) void { this.concurrent_lock.lock(); defer this.concurrent_lock.unlock(); @@ -472,7 +482,12 @@ pub const EventLoop = struct { if (this.virtual_machine.uws_event_loop) |loop| { loop.nextTick(*EventLoop, this, EventLoop.tick); } - _ = this.ready_tasks_count.fetchAdd(1, .Monotonic); + + if (this.ready_tasks_count.fetchAdd(1, .Monotonic) == 0) { + if (this.waker) |waker| { + waker.wake() catch unreachable; + } + } } }; diff --git a/src/bun_js.zig b/src/bun_js.zig index aad8e2a8b312e1..bb5f458f3e5d23 100644 --- a/src/bun_js.zig +++ b/src/bun_js.zig @@ -135,17 +135,25 @@ pub const Run = struct { this.vm.tick(); { - var i: usize = 0; + var any = false; while (this.vm.*.event_loop.pending_tasks_count.loadUnchecked() > 0 or this.vm.active_tasks > 0) { this.vm.tick(); - i +%= 1; - - if (i > 0 and i % 100 == 0) { - std.time.sleep(std.time.ns_per_ms); + any = true; + if (this.vm.active_tasks > 0) { + if (this.vm.event_loop.ready_tasks_count.load(.Monotonic) == 0) { + _ = this.vm.global.vm().runGC(false); + + if (this.vm.event_loop.ready_tasks_count.load(.Monotonic) == 0 and + this.vm.active_tasks > 0) + { + this.vm.event_loop.ensureWaker(); + _ = this.vm.event_loop.waker.?.wait() catch 0; + } + } } } - if (i > 0) { + if (any) { if (this.vm.log.msgs.items.len > 0) { if (Output.enable_ansi_colors) { this.vm.log.printForLogLevelWithEnableAnsiColors(Output.errorWriter(), true) catch {}; diff --git a/src/http_client_async.zig b/src/http_client_async.zig index a31ee237d67745..53c0d8d61cb303 100644 --- a/src/http_client_async.zig +++ b/src/http_client_async.zig @@ -42,11 +42,11 @@ pub fn onThreadStart(_: ?*anyopaque) ?*anyopaque { return null; } -pub fn onThreadStartNew(event_fd: os.fd_t) void { +pub fn onThreadStartNew(waker: AsyncIO.Waker) void { default_arena = Arena.init() catch unreachable; default_allocator = default_arena.allocator(); NetworkThread.address_list_cached = NetworkThread.AddressListCache.init(default_allocator); - AsyncIO.global = AsyncIO.init(1024, 0, event_fd) catch |err| { + AsyncIO.global = AsyncIO.init(1024, 0, waker) catch |err| { log: { if (comptime Environment.isLinux) { if (err == error.SystemOutdated) { @@ -115,16 +115,10 @@ pub fn onThreadStartNew(event_fd: os.fd_t) void { 
     AsyncIO.global_loaded = true;
     NetworkThread.global.io = &AsyncIO.global;
 
-    if (comptime !Environment.isLinux) {
-        NetworkThread.global.pool.io = &AsyncIO.global;
-    }
-
     Output.Source.configureNamedThread("HTTP");
     AsyncBIO.initBoringSSL();
-    if (comptime Environment.isLinux) {
-        NetworkThread.global.processEvents();
-    }
+    NetworkThread.global.processEvents();
 }
 
 pub inline fn getAllocator() std.mem.Allocator {
diff --git a/src/io/io_darwin.cpp b/src/io/io_darwin.cpp
new file mode 100644
index 00000000000000..74a7638123f06d
--- /dev/null
+++ b/src/io/io_darwin.cpp
@@ -0,0 +1,65 @@
+
+#ifdef __APPLE__
+
+#include <mach/mach.h>
+
+#include <sys/event.h>
+// errno
+#include <errno.h>
+
+extern "C" mach_port_t io_darwin_create_machport(uint64_t wakeup, int32_t fd,
+                                                 void *wakeup_buffer_,
+                                                 size_t nbytes) {
+
+  mach_port_t port;
+  // Create a Mach port that will be used to wake up the pump
+  kern_return_t kr =
+      mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE, &port);
+  if (kr != KERN_SUCCESS) {
+    return 0;
+  }
+
+  // Configure the event to directly receive the Mach message as part of the
+  // kevent64() call.
+  kevent64_s event{};
+  event.ident = port;
+  event.filter = EVFILT_MACHPORT;
+  event.flags = EV_ADD | EV_ENABLE;
+  event.fflags = MACH_RCV_MSG | MACH_RCV_OVERWRITE;
+  event.ext[0] = reinterpret_cast<uint64_t>(wakeup_buffer_);
+  event.ext[1] = nbytes;
+
+  while (true) {
+    int rv = kevent64(fd, &event, 1, NULL, 0, 0, NULL);
+    if (rv == -1) {
+      if (errno == EINTR) {
+        continue;
+      }
+
+      return 0;
+    }
+
+    return port;
+  }
+}
+
+extern "C" bool io_darwin_schedule_wakeup(mach_port_t waker) {
+  mach_msg_empty_send_t message{};
+  message.header.msgh_size = sizeof(message);
+  message.header.msgh_bits =
+      MACH_MSGH_BITS_REMOTE(MACH_MSG_TYPE_MAKE_SEND_ONCE);
+  message.header.msgh_remote_port = waker;
+  kern_return_t kr = mach_msg_send(&message.header);
+  if (kr != KERN_SUCCESS) {
+    // If io_darwin_schedule_wakeup() is being called by other threads faster
+    // than the pump can dispatch work, the kernel message queue for the wakeup
+    // port can fill. The kernel does return a SEND_ONCE right in the case of
+    // failure, which must be destroyed to avoid leaking.
+ mach_msg_destroy(&message.header); + return false; + } + + return true; +} + +#endif \ No newline at end of file diff --git a/src/io/io_darwin.zig b/src/io/io_darwin.zig index 5157b01983cdde..7906dd655072e3 100644 --- a/src/io/io_darwin.zig +++ b/src/io/io_darwin.zig @@ -478,7 +478,6 @@ const Time = @import("./time.zig").Time; const IO = @This(); -kq: os.fd_t, time: Time = .{}, io_inflight: usize = 0, timeouts: FIFO(Completion) = .{}, @@ -486,28 +485,176 @@ completed: FIFO(Completion) = .{}, io_pending: FIFO(Completion) = .{}, last_event_fd: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(32), pending_count: usize = 0, +waker: Waker = undefined, pub fn hasNoWork(this: *IO) bool { return this.pending_count == 0 and this.io_inflight == 0 and this.io_pending.peek() == null and this.completed.peek() == null and this.timeouts.peek() == null; } -pub fn init(_: u12, _: u32, _: anytype) !IO { - const kq = try os.kqueue(); - assert(kq > -1); - return IO{ .kq = kq }; +pub fn init(_: u12, _: u32, waker: Waker) !IO { + return IO{ + .waker = waker, + }; } +pub const Waker = struct { + kq: os.fd_t, + machport: *anyopaque = undefined, + machport_buf: []u8 = &.{}, + + const zeroed = std.mem.zeroes([16]Kevent64); + + pub fn wake(this: Waker) !void { + if (!io_darwin_schedule_wakeup(this.machport)) { + return error.WakeUpFailed; + } + } + + pub fn wait(this: Waker) !usize { + var events = zeroed; + + const count = std.os.system.kevent64( + this.kq, + &events, + 0, + &events, + events.len, + 0, + null, + ); + + if (count < 0) { + return asError(std.c.getErrno(count)); + } + + return @intCast(usize, count); + } + + extern fn io_darwin_create_machport( + *anyopaque, + os.fd_t, + *anyopaque, + usize, + ) ?*anyopaque; + + extern fn io_darwin_schedule_wakeup( + *anyopaque, + ) bool; + + pub fn init(allocator: std.mem.Allocator) !Waker { + const kq = try os.kqueue(); + assert(kq > -1); + var machport_buf = try allocator.alloc(u8, 1024); + const machport = io_darwin_create_machport( + machport_buf.ptr, + kq, + machport_buf.ptr, + 1024, + ) orelse return error.MachportCreationFailed; + + return Waker{ + .kq = kq, + .machport = machport, + .machport_buf = machport_buf, + }; + } +}; + +pub const UserFilterWaker = struct { + kq: os.fd_t, + ident: u64 = undefined, + + pub fn wake(this: UserFilterWaker) !void { + var events = zeroed; + events[0].ident = this.ident; + events[0].filter = c.EVFILT_USER; + events[0].data = 0; + events[0].fflags = c.NOTE_TRIGGER; + events[0].udata = 0; + const errno = std.os.system.kevent64( + this.kq, + &events, + 1, + &events, + events.len, + 0, + null, + ); + + if (errno < 0) { + return asError(std.c.getErrno(errno)); + } + } + + const zeroed = std.mem.zeroes([16]Kevent64); + + pub fn wait(this: UserFilterWaker) !u64 { + var events = zeroed; + events[0].ident = 123; + events[0].filter = c.EVFILT_USER; + events[0].flags = c.EV_ADD | c.EV_ENABLE; + events[0].data = 0; + events[0].udata = 0; + + const errno = std.os.system.kevent64( + this.kq, + &events, + 1, + &events, + events.len, + 0, + null, + ); + if (errno < 0) { + return asError(std.c.getErrno(errno)); + } + + return @intCast(u64, errno); + } + + pub fn init(_: std.mem.Allocator) !UserFilterWaker { + const kq = try os.kqueue(); + assert(kq > -1); + + var events = [1]Kevent64{std.mem.zeroes(Kevent64)}; + events[0].ident = 123; + events[0].filter = c.EVFILT_USER; + events[0].flags = c.EV_ADD | c.EV_ENABLE; + events[0].data = 0; + events[0].udata = 0; + var timespec = default_timespec; + const errno = std.os.system.kevent64( + kq, 
+            &events,
+            1,
+            &events,
+            @intCast(c_int, events.len),
+            0,
+            &timespec,
+        );
+
+        std.debug.assert(errno == 0);
+
+        return UserFilterWaker{
+            .kq = kq,
+            .ident = 123,
+        };
+    }
+};
+
 pub fn deinit(self: *IO) void {
-    assert(self.kq > -1);
-    os.close(self.kq);
-    self.kq = -1;
+    assert(self.waker.kq > -1);
+    os.close(self.waker.kq);
+    self.waker.kq = -1;
 }
 
 /// Pass all queued submissions to the kernel and peek for completions.
 pub fn tick(self: *IO) !void {
-    return self.flush(false);
+    return self.flush(.no_wait);
 }
 
+const Kevent64 = std.os.system.kevent64_s;
+
 /// Pass all queued submissions to the kernel and run for `nanoseconds`.
 /// The `nanoseconds` argument is a u63 to allow coercion to the i64 used
 /// in the __kernel_timespec struct.
@@ -536,64 +683,89 @@ pub fn run_for_ns(self: *IO, nanoseconds: u63) !void {
     // Loop until our timeout completion is processed above, which sets timed_out to true.
     // LLVM shouldn't be able to cache timed_out's value here since its address escapes above.
     while (!timed_out) {
-        try self.flush(true);
+        try self.flush(.wait_for_completion);
     }
 }
 
 const default_timespec = std.mem.zeroInit(os.timespec, .{});
 
-fn flush(self: *IO, wait_for_completions: bool) !void {
+pub fn wait(self: *IO, context: anytype, comptime function: anytype) void {
+    self.flush(.block) catch unreachable;
+    function(context);
+}
+
+fn flush(self: *IO, comptime _: @Type(.EnumLiteral)) !void {
     var io_pending = self.io_pending.peek();
-    var events: [512]os.Kevent = undefined;
+    var events: [2048]Kevent64 = undefined;
 
     // Check timeouts and fill events with completions in io_pending
     // (they will be submitted through kevent).
     // Timeouts are expired here and possibly pushed to the completed queue.
     const next_timeout = self.flush_timeouts();
-    const change_events = self.flush_io(&events, &io_pending);
 
-    // Only call kevent() if we need to submit io events or if we need to wait for completions.
-    if (change_events > 0 or self.completed.peek() == null) {
-        // Zero timeouts for kevent() implies a non-blocking poll
-        var ts = default_timespec;
-
-        // We need to wait (not poll) on kevent if there's nothing to submit or complete.
- // We should never wait indefinitely (timeout_ptr = null for kevent) given: - // - tick() is non-blocking (wait_for_completions = false) - // - run_for_ns() always submits a timeout - if (change_events == 0 and self.completed.peek() == null) { - if (wait_for_completions) { - const timeout_ns = next_timeout orelse @panic("kevent() blocking forever"); - ts.tv_nsec = @intCast(@TypeOf(ts.tv_nsec), timeout_ns % std.time.ns_per_s); - ts.tv_sec = @intCast(@TypeOf(ts.tv_sec), timeout_ns / std.time.ns_per_s); - } else if (self.io_inflight == 0) { - return; - } - } + // Flush any timeouts + { + var completed = self.completed; + self.completed = .{}; + if (completed.pop()) |first| { + (first.callback)(self, first); - const new_events = try os.kevent( - self.kq, - events[0..change_events], - events[0..events.len], - &ts, - ); + while (completed.pop()) |completion| + (completion.callback)(self, completion); - // Mark the io events submitted only after kevent() successfully processed them - self.io_pending.out = io_pending; - if (io_pending == null) { - self.io_pending.in = null; + return; } + } - self.io_inflight += change_events; - self.io_inflight -= new_events; + const change_events = self.flush_io(&events, &io_pending); - for (events[0..new_events]) |kevent| { - const completion = @intToPtr(*Completion, kevent.udata); - completion.next = null; - self.completed.push(completion); + // Zero timeouts for kevent() implies a non-blocking poll + var ts = default_timespec; + + // We need to wait (not poll) on kevent if there's nothing to submit or complete. + if (next_timeout) |timeout_ns| { + ts.tv_nsec = @intCast(@TypeOf(ts.tv_nsec), timeout_ns % std.time.ns_per_s); + ts.tv_sec = @intCast(@TypeOf(ts.tv_sec), timeout_ns / std.time.ns_per_s); + } + + const new_events_ = std.os.system.kevent64( + self.waker.kq, + &events, + @intCast(c_int, change_events), + &events, + @intCast(c_int, events.len), + 0, + if (next_timeout != null) &ts else null, + ); + + if (new_events_ < 0) { + return std.debug.panic("kevent() failed {s}", .{@tagName(std.c.getErrno(new_events_))}); + } + const new_events = @intCast(usize, new_events_); + + // Mark the io events submitted only after kevent() successfully processed them + self.io_pending.out = io_pending; + if (io_pending == null) { + self.io_pending.in = null; + } + + var new_io_inflight_events = new_events; + self.io_inflight += change_events; + + for (events[0..new_events]) |kevent| { + if (kevent.filter == c.EVFILT_MACHPORT) { + new_io_inflight_events -= 1; + continue; } + + const completion = @intToPtr(*Completion, kevent.udata); + completion.next = null; + self.completed.push(completion); } + // subtract machport events from io_inflight + self.io_inflight -= new_io_inflight_events; + var completed = self.completed; self.completed = .{}; while (completed.pop()) |completion| { @@ -601,7 +773,7 @@ fn flush(self: *IO, wait_for_completions: bool) !void { } } -fn flush_io(_: *IO, events: []os.Kevent, io_pending_top: *?*Completion) usize { +fn flush_io(_: *IO, events: []Kevent64, io_pending_top: *?*Completion) usize { for (events) |*kevent, flushed| { const completion = io_pending_top.* orelse return flushed; io_pending_top.* = completion.next; @@ -645,6 +817,7 @@ fn flush_io(_: *IO, events: []os.Kevent, io_pending_top: *?*Completion) usize { }; kevent.* = .{ + .ext = [2]u64{ 0, 0 }, .ident = @intCast(u32, event_info[0]), .filter = @intCast(i16, event_info[1]), .flags = @intCast(u16, event_info[2]), @@ -660,11 +833,13 @@ fn flush_io(_: *IO, events: []os.Kevent, 
io_pending_top: *?*Completion) usize { fn flush_timeouts(self: *IO) ?u64 { var min_timeout: ?u64 = null; var timeouts: ?*Completion = self.timeouts.peek(); + + // NOTE: We could cache `now` above the loop but monotonic() should be cheap to call. + const now: u64 = if (timeouts != null) self.time.monotonic() else 0; + while (timeouts) |completion| { timeouts = completion.next; - // NOTE: We could cache `now` above the loop but monotonic() should be cheap to call. - const now = self.time.monotonic(); const expires = completion.operation.timeout.expires; // NOTE: remove() could be O(1) here with a doubly-linked-list @@ -833,35 +1008,6 @@ pub fn eventfd(self: *IO) os.fd_t { return @intCast(os.fd_t, self.last_event_fd.fetchAdd(1, .Monotonic)); } -pub fn triggerEvent(event_fd: os.fd_t, completion: *Completion) !void { - var kevents = [1]os.Kevent{ - .{ - .ident = @intCast(usize, event_fd), - .filter = c.EVFILT_USER, - .flags = c.EV_ADD | c.EV_ENABLE | c.EV_ONESHOT, - .fflags = 0, - .data = 0, - .udata = @ptrToInt(completion), - }, - }; - - var change_events = [1]os.Kevent{ - .{ - .ident = @intCast(usize, event_fd), - .filter = c.EVFILT_USER, - .flags = c.EV_ADD | c.EV_ENABLE | c.EV_ONESHOT, - .fflags = 0, - .data = 0, - .udata = @ptrToInt(completion), - }, - }; - - const ret = try os.kevent(global.kq, &kevents, &change_events, null); - if (ret != 1) { - @panic("failed to trigger event"); - } -} - // -- NOT DONE YET pub fn event( self: *IO, diff --git a/src/io/io_linux.zig b/src/io/io_linux.zig index dfad7e7951b896..af53820530bf6a 100644 --- a/src/io/io_linux.zig +++ b/src/io/io_linux.zig @@ -498,7 +498,7 @@ pub fn wait(this: *@This(), ptr: anytype, comptime onReady: anytype) void { _ = this.ring.enter(submitted, 1, linux.IORING_ENTER_GETEVENTS) catch 0; } -pub fn init(entries_: u12, flags: u32, event_fd: os.fd_t) !IO { +pub fn init(entries_: u12, flags: u32, waker: Waker) !IO { var ring: IO_Uring = undefined; var entries = entries_; @@ -541,7 +541,7 @@ pub fn init(entries_: u12, flags: u32, event_fd: os.fd_t) !IO { break; } - return IO{ .ring = ring, .event_fd = event_fd }; + return IO{ .ring = ring, .event_fd = waker.fd }; } pub fn deinit(self: *IO) void { @@ -984,6 +984,30 @@ pub const Completion = struct { } }; +pub const Waker = struct { + fd: os.fd_t, + + pub fn init(_: std.mem.Allocator) !Waker { + return Waker{ + .fd = try os.eventfd(0, 0), + }; + } + + pub fn wait(this: Waker) !u64 { + var bytes: usize = 0; + _ = std.os.read(this.fd, @ptrCast(*[8]u8, &bytes)) catch 0; + return @intCast(u64, bytes); + } + + pub fn wake(this: Waker) !void { + var bytes: usize = 1; + _ = std.os.write( + this.fd, + @ptrCast(*[8]u8, &bytes), + ) catch 0; + } +}; + /// This union encodes the set of operations supported as well as their arguments. const Operation = union(enum) { accept: struct { diff --git a/src/network_thread.zig b/src/network_thread.zig index 3b48e436e9bd44..aa6fcf15db2e03 100644 --- a/src/network_thread.zig +++ b/src/network_thread.zig @@ -14,10 +14,9 @@ const Lock = @import("./lock.zig").Lock; const FIFO = @import("./io/fifo.zig").FIFO; /// Single-thread in this pool -pool: ThreadPool, io: *AsyncIO = undefined, thread: std.Thread = undefined, -event_fd: std.os.fd_t = 0, +waker: AsyncIO.Waker = undefined, queued_tasks_mutex: Lock = Lock.init(), queued_tasks: Batch = .{}, processing_tasks: Batch = .{}, @@ -44,11 +43,6 @@ pub fn processEvents(this: *@This()) void { } /// Should only be called on the HTTP thread! 
fn processEvents_(this: *@This()) !void { - { - var bytes: [8]u8 = undefined; - _ = std.os.read(this.event_fd, &bytes) catch 0; - } - while (true) { this.queueEvents(); @@ -82,20 +76,20 @@ fn processEvents_(this: *@This()) !void { } pub fn schedule(this: *@This(), batch: Batch) void { - if (comptime Environment.isLinux) { - if (batch.len == 0) - return; + if (batch.len == 0) + return; - { - this.queued_tasks_mutex.lock(); - defer this.queued_tasks_mutex.unlock(); - this.queued_tasks.push(batch); - } + { + this.queued_tasks_mutex.lock(); + defer this.queued_tasks_mutex.unlock(); + this.queued_tasks.push(batch); + } + if (comptime Environment.isLinux) { const one = @bitCast([8]u8, @as(usize, batch.len)); - _ = std.os.write(this.event_fd, &one) catch @panic("Failed to write to eventfd"); + _ = std.os.write(this.waker.fd, &one) catch @panic("Failed to write to eventfd"); } else { - this.pool.schedule(batch); + this.waker.wake() catch @panic("Failed to wake"); } } @@ -151,8 +145,6 @@ pub fn warmup() !void { if (has_warmed or global_loaded.load(.Monotonic) > 0) return; has_warmed = true; try init(); - if (comptime !Environment.isLinux) - global.pool.forceSpawn(); } pub fn init() !void { @@ -160,16 +152,20 @@ pub fn init() !void { AsyncIO.global_loaded = true; global = NetworkThread{ - .pool = ThreadPool.init(.{ .max_threads = 1, .stack_size = 64 * 1024 * 1024 }), .timer = try std.time.Timer.start(), }; - global.pool.on_thread_spawn = HTTP.onThreadStart; + if (comptime Environment.isLinux) { - const event_fd = try std.os.eventfd(0, std.os.linux.EFD.CLOEXEC | 0); - global.event_fd = event_fd; - global.thread = try std.Thread.spawn(.{ .stack_size = 64 * 1024 * 1024 }, HTTP.onThreadStartNew, .{ - @intCast(std.os.fd_t, event_fd), - }); - global.thread.detach(); + const fd = try std.os.eventfd(0, std.os.linux.EFD.CLOEXEC | 0); + global.waker = .{ .fd = fd }; + } else if (comptime Environment.isMac) { + global.waker = try AsyncIO.Waker.init(@import("./global.zig").default_allocator); + } else { + @compileLog("TODO: Waker"); } + + global.thread = try std.Thread.spawn(.{ .stack_size = 64 * 1024 * 1024 }, HTTP.onThreadStartNew, .{ + global.waker, + }); + global.thread.detach(); } diff --git a/test/bun.js/ffi.test.fixture.callback.c b/test/bun.js/ffi.test.fixture.callback.c index f6e74f8bab21a9..7de63120f53e42 100644 --- a/test/bun.js/ffi.test.fixture.callback.c +++ b/test/bun.js/ffi.test.fixture.callback.c @@ -111,9 +111,9 @@ static bool JSVALUE_IS_CELL(EncodedJSValue val) __attribute__((__always_inline__ static bool JSVALUE_IS_INT32(EncodedJSValue val) __attribute__((__always_inline__)); static bool JSVALUE_IS_NUMBER(EncodedJSValue val) __attribute__((__always_inline__)); -static uint64_t JSVALUE_TO_UINT64(void* globalObject, EncodedJSValue value) __attribute__((__always_inline__)); +static uint64_t JSVALUE_TO_UINT64(EncodedJSValue value) __attribute__((__always_inline__)); static int64_t JSVALUE_TO_INT64(EncodedJSValue value) __attribute__((__always_inline__)); -uint64_t JSVALUE_TO_UINT64_SLOW(void* globalObject, EncodedJSValue value); +uint64_t JSVALUE_TO_UINT64_SLOW(EncodedJSValue value); int64_t JSVALUE_TO_INT64_SLOW(EncodedJSValue value); EncodedJSValue UINT64_TO_JSVALUE_SLOW(void* globalObject, uint64_t val); @@ -207,7 +207,7 @@ static bool JSVALUE_TO_BOOL(EncodedJSValue val) { } -static uint64_t JSVALUE_TO_UINT64(void* globalObject, EncodedJSValue value) { +static uint64_t JSVALUE_TO_UINT64(EncodedJSValue value) { if (JSVALUE_IS_INT32(value)) { return (uint64_t)JSVALUE_TO_INT32(value); } @@ -216,7 
+216,7 @@ static uint64_t JSVALUE_TO_UINT64(void* globalObject, EncodedJSValue value) { return (uint64_t)JSVALUE_TO_DOUBLE(value); } - return JSVALUE_TO_UINT64_SLOW(globalObject, value); + return JSVALUE_TO_UINT64_SLOW(value); } static int64_t JSVALUE_TO_INT64(EncodedJSValue value) { if (JSVALUE_IS_INT32(value)) { @@ -268,6 +268,9 @@ ZIG_REPR_TYPE JSFunctionCall(void* globalObject, void* callFrame); bool my_callback_function(void* arg0); bool my_callback_function(void* arg0) { +#ifdef INJECT_BEFORE +INJECT_BEFORE; +#endif EncodedJSValue arguments[1] = { PTR_TO_JSVALUE(arg0) }; diff --git a/test/bun.js/ffi.test.fixture.receiver.c b/test/bun.js/ffi.test.fixture.receiver.c index 3e2939a26c6755..550d64b8191a15 100644 --- a/test/bun.js/ffi.test.fixture.receiver.c +++ b/test/bun.js/ffi.test.fixture.receiver.c @@ -112,9 +112,9 @@ static bool JSVALUE_IS_CELL(EncodedJSValue val) __attribute__((__always_inline__ static bool JSVALUE_IS_INT32(EncodedJSValue val) __attribute__((__always_inline__)); static bool JSVALUE_IS_NUMBER(EncodedJSValue val) __attribute__((__always_inline__)); -static uint64_t JSVALUE_TO_UINT64(void* globalObject, EncodedJSValue value) __attribute__((__always_inline__)); +static uint64_t JSVALUE_TO_UINT64(EncodedJSValue value) __attribute__((__always_inline__)); static int64_t JSVALUE_TO_INT64(EncodedJSValue value) __attribute__((__always_inline__)); -uint64_t JSVALUE_TO_UINT64_SLOW(void* globalObject, EncodedJSValue value); +uint64_t JSVALUE_TO_UINT64_SLOW(EncodedJSValue value); int64_t JSVALUE_TO_INT64_SLOW(EncodedJSValue value); EncodedJSValue UINT64_TO_JSVALUE_SLOW(void* globalObject, uint64_t val); @@ -208,7 +208,7 @@ static bool JSVALUE_TO_BOOL(EncodedJSValue val) { } -static uint64_t JSVALUE_TO_UINT64(void* globalObject, EncodedJSValue value) { +static uint64_t JSVALUE_TO_UINT64(EncodedJSValue value) { if (JSVALUE_IS_INT32(value)) { return (uint64_t)JSVALUE_TO_INT32(value); } @@ -217,7 +217,7 @@ static uint64_t JSVALUE_TO_UINT64(void* globalObject, EncodedJSValue value) { return (uint64_t)JSVALUE_TO_DOUBLE(value); } - return JSVALUE_TO_UINT64_SLOW(globalObject, value); + return JSVALUE_TO_UINT64_SLOW(value); } static int64_t JSVALUE_TO_INT64(EncodedJSValue value) { if (JSVALUE_IS_INT32(value)) {
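
Usage sketch (illustrative, not part of the diff): the snippet below shows how the Waker introduced by this patch is intended to be used, with one thread parked in wait() and any other thread calling wake(), mirroring misctools/machbench.zig. It assumes the "io" package mapping that build.zig sets up in this repo and a standalone build wiring; treat it as a sketch of the API rather than a definitive example.

// Minimal sketch of the new AsyncIO.Waker (assumes the "io" package from this repo's build.zig).
const std = @import("std");
const AsyncIO = @import("io");

var waker: AsyncIO.Waker = undefined;

fn producer() void {
    // wake() may be called from any thread: on macOS it sends a mach message to the
    // kqueue's EVFILT_MACHPORT; on Linux it writes to an eventfd.
    waker.wake() catch unreachable;
}

pub fn main() !void {
    waker = try AsyncIO.Waker.init(std.heap.page_allocator);

    var thread = try std.Thread.spawn(.{}, producer, .{});

    // wait() blocks in kevent64()/read() until at least one wakeup arrives,
    // replacing the old spin-and-sleep idle loop in bun_js.zig.
    _ = try waker.wait();

    thread.join();
}

The same pattern is what EventLoop.ensureWaker() and enqueueTaskConcurrent() rely on: the first pending wakeup (ready_tasks_count going 0 -> 1) triggers a wake(), and the JS thread sleeps in waker.wait() instead of polling.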