From eb326e15530dd6dca4ccbe7dbfde7bf048de813e Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Thu, 5 Jul 2018 15:09:02 -0400 Subject: [PATCH] M:N threading * add std.atomic.QueueMpsc.isEmpty * make std.debug.global_allocator thread-safe * std.event.Loop: now you have to choose between - initSingleThreaded - initMultiThreaded * std.event.Loop multiplexes coroutines onto kernel threads * Remove std.event.Loop.stop. Instead the event loop run() function returns once there are no pending coroutines. * fix crash in ir.cpp for calling methods under some conditions * small progress self-hosted compiler, analyzing top level declarations * Introduce std.event.Lock for synchronizing coroutines * introduce std.event.Locked(T) for data that only 1 coroutine should modify at once. * make the self hosted compiler use multi threaded event loop * make std.heap.DirectAllocator thread-safe See #174 TODO: * call sched_getaffinity instead of hard coding thread pool size 4 * support for Windows and MacOS * #1194 * #1197 --- src-self-hosted/main.zig | 5 +- src-self-hosted/module.zig | 257 +++++++++++++++- src/ir.cpp | 2 +- std/atomic/queue_mpsc.zig | 17 ++ std/debug/index.zig | 7 +- std/event.zig | 580 ++++++++++++++++++++++++++++++++----- std/heap.zig | 30 +- std/mem.zig | 2 +- std/os/index.zig | 39 ++- std/os/linux/index.zig | 8 + 10 files changed, 833 insertions(+), 114 deletions(-) diff --git a/src-self-hosted/main.zig b/src-self-hosted/main.zig index d17fc94c82a0..fe94a4460a4f 100644 --- a/src-self-hosted/main.zig +++ b/src-self-hosted/main.zig @@ -384,7 +384,8 @@ fn buildOutputType(allocator: *Allocator, args: []const []const u8, out_type: Mo const zig_lib_dir = introspect.resolveZigLibDir(allocator) catch os.exit(1); defer allocator.free(zig_lib_dir); - var loop = try event.Loop.init(allocator); + var loop: event.Loop = undefined; + try loop.initMultiThreaded(allocator); var module = try Module.create( &loop, @@ -493,8 +494,6 @@ async fn processBuildEvents(module: *Module, watch: bool) void { switch (build_event) { Module.Event.Ok => { std.debug.warn("Build succeeded\n"); - // for now we stop after 1 - module.loop.stop(); return; }, Module.Event.Error => |err| { diff --git a/src-self-hosted/module.zig b/src-self-hosted/module.zig index cf27c826c8d3..5ce1a7965a93 100644 --- a/src-self-hosted/module.zig +++ b/src-self-hosted/module.zig @@ -2,6 +2,7 @@ const std = @import("std"); const os = std.os; const io = std.io; const mem = std.mem; +const Allocator = mem.Allocator; const Buffer = std.Buffer; const llvm = @import("llvm.zig"); const c = @import("c.zig"); @@ -13,6 +14,7 @@ const ArrayList = std.ArrayList; const errmsg = @import("errmsg.zig"); const ast = std.zig.ast; const event = std.event; +const assert = std.debug.assert; pub const Module = struct { loop: *event.Loop, @@ -81,6 +83,8 @@ pub const Module = struct { link_out_file: ?[]const u8, events: *event.Channel(Event), + exported_symbol_names: event.Locked(Decl.Table), + // TODO handle some of these earlier and report them in a way other than error codes pub const BuildError = error{ OutOfMemory, @@ -232,6 +236,7 @@ pub const Module = struct { .test_name_prefix = null, .emit_file_type = Emit.Binary, .link_out_file = null, + .exported_symbol_names = event.Locked(Decl.Table).init(loop, Decl.Table.init(loop.allocator)), }); } @@ -272,38 +277,91 @@ pub const Module = struct { return; }; await (async self.events.put(Event.Ok) catch unreachable); + // for now we stop after 1 + return; } } async fn addRootSrc(self: *Module) !void { const root_src_path = self.root_src_path orelse @panic("TODO handle null root src path"); + // TODO async/await os.path.real const root_src_real_path = os.path.real(self.a(), root_src_path) catch |err| { try printError("unable to get real path '{}': {}", root_src_path, err); return err; }; errdefer self.a().free(root_src_real_path); + // TODO async/await readFileAlloc() const source_code = io.readFileAlloc(self.a(), root_src_real_path) catch |err| { try printError("unable to open '{}': {}", root_src_real_path, err); return err; }; errdefer self.a().free(source_code); - var tree = try std.zig.parse(self.a(), source_code); - defer tree.deinit(); - - //var it = tree.root_node.decls.iterator(); - //while (it.next()) |decl_ptr| { - // const decl = decl_ptr.*; - // switch (decl.id) { - // ast.Node.Comptime => @panic("TODO"), - // ast.Node.VarDecl => @panic("TODO"), - // ast.Node.UseDecl => @panic("TODO"), - // ast.Node.FnDef => @panic("TODO"), - // ast.Node.TestDecl => @panic("TODO"), - // else => unreachable, - // } - //} + var parsed_file = ParsedFile{ + .tree = try std.zig.parse(self.a(), source_code), + .realpath = root_src_real_path, + }; + errdefer parsed_file.tree.deinit(); + + const tree = &parsed_file.tree; + + // create empty struct for it + const decls = try Scope.Decls.create(self.a(), null); + errdefer decls.destroy(); + + var it = tree.root_node.decls.iterator(0); + while (it.next()) |decl_ptr| { + const decl = decl_ptr.*; + switch (decl.id) { + ast.Node.Id.Comptime => @panic("TODO"), + ast.Node.Id.VarDecl => @panic("TODO"), + ast.Node.Id.FnProto => { + const fn_proto = @fieldParentPtr(ast.Node.FnProto, "base", decl); + + const name = if (fn_proto.name_token) |name_token| tree.tokenSlice(name_token) else { + @panic("TODO add compile error"); + //try self.addCompileError( + // &parsed_file, + // fn_proto.fn_token, + // fn_proto.fn_token + 1, + // "missing function name", + //); + continue; + }; + + const fn_decl = try self.a().create(Decl.Fn{ + .base = Decl{ + .id = Decl.Id.Fn, + .name = name, + .visib = parseVisibToken(tree, fn_proto.visib_token), + .resolution = Decl.Resolution.Unresolved, + }, + .value = Decl.Fn.Val{ .Unresolved = {} }, + .fn_proto = fn_proto, + }); + errdefer self.a().destroy(fn_decl); + + // TODO make this parallel + try await try async self.addTopLevelDecl(tree, &fn_decl.base); + }, + ast.Node.Id.TestDecl => @panic("TODO"), + else => unreachable, + } + } + } + + async fn addTopLevelDecl(self: *Module, tree: *ast.Tree, decl: *Decl) !void { + const is_export = decl.isExported(tree); + + { + const exported_symbol_names = await try async self.exported_symbol_names.acquire(); + defer exported_symbol_names.release(); + + if (try exported_symbol_names.value.put(decl.name, decl)) |other_decl| { + @panic("TODO report compile error"); + } + } } pub fn link(self: *Module, out_file: ?[]const u8) !void { @@ -350,3 +408,172 @@ fn printError(comptime format: []const u8, args: ...) !void { const out_stream = &stderr_file_out_stream.stream; try out_stream.print(format, args); } + +fn parseVisibToken(tree: *ast.Tree, optional_token_index: ?ast.TokenIndex) Visib { + if (optional_token_index) |token_index| { + const token = tree.tokens.at(token_index); + assert(token.id == Token.Id.Keyword_pub); + return Visib.Pub; + } else { + return Visib.Private; + } +} + +pub const Scope = struct { + id: Id, + parent: ?*Scope, + + pub const Id = enum { + Decls, + Block, + }; + + pub const Decls = struct { + base: Scope, + table: Decl.Table, + + pub fn create(a: *Allocator, parent: ?*Scope) !*Decls { + const self = try a.create(Decls{ + .base = Scope{ + .id = Id.Decls, + .parent = parent, + }, + .table = undefined, + }); + errdefer a.destroy(self); + + self.table = Decl.Table.init(a); + errdefer self.table.deinit(); + + return self; + } + + pub fn destroy(self: *Decls) void { + self.table.deinit(); + self.table.allocator.destroy(self); + self.* = undefined; + } + }; + + pub const Block = struct { + base: Scope, + }; +}; + +pub const Visib = enum { + Private, + Pub, +}; + +pub const Decl = struct { + id: Id, + name: []const u8, + visib: Visib, + resolution: Resolution, + + pub const Table = std.HashMap([]const u8, *Decl, mem.hash_slice_u8, mem.eql_slice_u8); + + pub fn isExported(base: *const Decl, tree: *ast.Tree) bool { + switch (base.id) { + Id.Fn => { + const fn_decl = @fieldParentPtr(Fn, "base", base); + return fn_decl.isExported(tree); + }, + else => return false, + } + } + + pub const Resolution = enum { + Unresolved, + InProgress, + Invalid, + Ok, + }; + + pub const Id = enum { + Var, + Fn, + CompTime, + }; + + pub const Var = struct { + base: Decl, + }; + + pub const Fn = struct { + base: Decl, + value: Val, + fn_proto: *const ast.Node.FnProto, + + // TODO https://github.com/ziglang/zig/issues/683 and then make this anonymous + pub const Val = union { + Unresolved: void, + Ok: *Value.Fn, + }; + + pub fn externLibName(self: Fn, tree: *ast.Tree) ?[]const u8 { + return if (self.fn_proto.extern_export_inline_token) |tok_index| x: { + const token = tree.tokens.at(tok_index); + break :x switch (token.id) { + Token.Id.Extern => tree.tokenSlicePtr(token), + else => null, + }; + } else null; + } + + pub fn isExported(self: Fn, tree: *ast.Tree) bool { + if (self.fn_proto.extern_export_inline_token) |tok_index| { + const token = tree.tokens.at(tok_index); + return token.id == Token.Id.Keyword_export; + } else { + return false; + } + } + }; + + pub const CompTime = struct { + base: Decl, + }; +}; + +pub const Value = struct { + pub const Fn = struct {}; +}; + +pub const Type = struct { + id: Id, + + pub const Id = enum { + Type, + Void, + Bool, + NoReturn, + Int, + Float, + Pointer, + Array, + Struct, + ComptimeFloat, + ComptimeInt, + Undefined, + Null, + Optional, + ErrorUnion, + ErrorSet, + Enum, + Union, + Fn, + Opaque, + Promise, + }; + + pub const Struct = struct { + base: Type, + decls: *Scope.Decls, + }; +}; + +pub const ParsedFile = struct { + tree: ast.Tree, + realpath: []const u8, +}; diff --git a/src/ir.cpp b/src/ir.cpp index 98b1bd85ad1a..3fc8306339a9 100644 --- a/src/ir.cpp +++ b/src/ir.cpp @@ -13278,7 +13278,7 @@ static TypeTableEntry *ir_analyze_instruction_call(IrAnalyze *ira, IrInstruction FnTableEntry *fn_table_entry = fn_ref->value.data.x_bound_fn.fn; IrInstruction *first_arg_ptr = fn_ref->value.data.x_bound_fn.first_arg; return ir_analyze_fn_call(ira, call_instruction, fn_table_entry, fn_table_entry->type_entry, - nullptr, first_arg_ptr, is_comptime, call_instruction->fn_inline); + fn_ref, first_arg_ptr, is_comptime, call_instruction->fn_inline); } else { ir_add_error_node(ira, fn_ref->source_node, buf_sprintf("type '%s' not a function", buf_ptr(&fn_ref->value.type->name))); diff --git a/std/atomic/queue_mpsc.zig b/std/atomic/queue_mpsc.zig index 8030565d7ae1..bc0a94258b48 100644 --- a/std/atomic/queue_mpsc.zig +++ b/std/atomic/queue_mpsc.zig @@ -15,6 +15,8 @@ pub fn QueueMpsc(comptime T: type) type { pub const Node = std.atomic.Stack(T).Node; + /// Not thread-safe. The call to init() must complete before any other functions are called. + /// No deinitialization required. pub fn init() Self { return Self{ .inboxes = []std.atomic.Stack(T){ @@ -26,12 +28,15 @@ pub fn QueueMpsc(comptime T: type) type { }; } + /// Fully thread-safe. put() may be called from any thread at any time. pub fn put(self: *Self, node: *Node) void { const inbox_index = @atomicLoad(usize, &self.inbox_index, AtomicOrder.SeqCst); const inbox = &self.inboxes[inbox_index]; inbox.push(node); } + /// Must be called by only 1 consumer at a time. Every call to get() and isEmpty() must complete before + /// the next call to get(). pub fn get(self: *Self) ?*Node { if (self.outbox.pop()) |node| { return node; @@ -43,6 +48,18 @@ pub fn QueueMpsc(comptime T: type) type { } return self.outbox.pop(); } + + /// Must be called by only 1 consumer at a time. Every call to get() and isEmpty() must complete before + /// the next call to isEmpty(). + pub fn isEmpty(self: *Self) bool { + if (!self.outbox.isEmpty()) return false; + const prev_inbox_index = @atomicRmw(usize, &self.inbox_index, AtomicRmwOp.Xor, 0x1, AtomicOrder.SeqCst); + const prev_inbox = &self.inboxes[prev_inbox_index]; + while (prev_inbox.pop()) |node| { + self.outbox.push(node); + } + return self.outbox.isEmpty(); + } }; } diff --git a/std/debug/index.zig b/std/debug/index.zig index 57b2dfc30025..a5e1c313f03d 100644 --- a/std/debug/index.zig +++ b/std/debug/index.zig @@ -11,6 +11,11 @@ const builtin = @import("builtin"); pub const FailingAllocator = @import("failing_allocator.zig").FailingAllocator; +pub const runtime_safety = switch (builtin.mode) { + builtin.Mode.Debug, builtin.Mode.ReleaseSafe => true, + builtin.Mode.ReleaseFast, builtin.Mode.ReleaseSmall => false, +}; + /// Tries to write to stderr, unbuffered, and ignores any error returned. /// Does not append a newline. /// TODO atomic/multithread support @@ -1098,7 +1103,7 @@ fn readILeb128(in_stream: var) !i64 { /// This should only be used in temporary test programs. pub const global_allocator = &global_fixed_allocator.allocator; -var global_fixed_allocator = std.heap.FixedBufferAllocator.init(global_allocator_mem[0..]); +var global_fixed_allocator = std.heap.ThreadSafeFixedBufferAllocator.init(global_allocator_mem[0..]); var global_allocator_mem: [100 * 1024]u8 = undefined; // TODO make thread safe diff --git a/std/event.zig b/std/event.zig index c6ac04a9d030..2d69d0cb1643 100644 --- a/std/event.zig +++ b/std/event.zig @@ -11,53 +11,69 @@ pub const TcpServer = struct { handleRequestFn: async<*mem.Allocator> fn (*TcpServer, *const std.net.Address, *const std.os.File) void, loop: *Loop, - sockfd: i32, + sockfd: ?i32, accept_coro: ?promise, listen_address: std.net.Address, waiting_for_emfile_node: PromiseNode, + listen_resume_node: event.Loop.ResumeNode, const PromiseNode = std.LinkedList(promise).Node; - pub fn init(loop: *Loop) !TcpServer { - const sockfd = try std.os.posixSocket(posix.AF_INET, posix.SOCK_STREAM | posix.SOCK_CLOEXEC | posix.SOCK_NONBLOCK, posix.PROTO_tcp); - errdefer std.os.close(sockfd); - + pub fn init(loop: *Loop) TcpServer { // TODO can't initialize handler coroutine here because we need well defined copy elision return TcpServer{ .loop = loop, - .sockfd = sockfd, + .sockfd = null, .accept_coro = null, .handleRequestFn = undefined, .waiting_for_emfile_node = undefined, .listen_address = undefined, + .listen_resume_node = event.Loop.ResumeNode{ + .id = event.Loop.ResumeNode.Id.Basic, + .handle = undefined, + }, }; } - pub fn listen(self: *TcpServer, address: *const std.net.Address, handleRequestFn: async<*mem.Allocator> fn (*TcpServer, *const std.net.Address, *const std.os.File) void) !void { + pub fn listen( + self: *TcpServer, + address: *const std.net.Address, + handleRequestFn: async<*mem.Allocator> fn (*TcpServer, *const std.net.Address, *const std.os.File) void, + ) !void { self.handleRequestFn = handleRequestFn; - try std.os.posixBind(self.sockfd, &address.os_addr); - try std.os.posixListen(self.sockfd, posix.SOMAXCONN); - self.listen_address = std.net.Address.initPosix(try std.os.posixGetSockName(self.sockfd)); + const sockfd = try std.os.posixSocket(posix.AF_INET, posix.SOCK_STREAM | posix.SOCK_CLOEXEC | posix.SOCK_NONBLOCK, posix.PROTO_tcp); + errdefer std.os.close(sockfd); + self.sockfd = sockfd; + + try std.os.posixBind(sockfd, &address.os_addr); + try std.os.posixListen(sockfd, posix.SOMAXCONN); + self.listen_address = std.net.Address.initPosix(try std.os.posixGetSockName(sockfd)); self.accept_coro = try async TcpServer.handler(self); errdefer cancel self.accept_coro.?; - try self.loop.addFd(self.sockfd, self.accept_coro.?); - errdefer self.loop.removeFd(self.sockfd); + self.listen_resume_node.handle = self.accept_coro.?; + try self.loop.addFd(sockfd, &self.listen_resume_node); + errdefer self.loop.removeFd(sockfd); + } + + /// Stop listening + pub fn close(self: *TcpServer) void { + self.loop.removeFd(self.sockfd.?); + std.os.close(self.sockfd.?); } pub fn deinit(self: *TcpServer) void { - self.loop.removeFd(self.sockfd); if (self.accept_coro) |accept_coro| cancel accept_coro; - std.os.close(self.sockfd); + if (self.sockfd) |sockfd| std.os.close(sockfd); } pub async fn handler(self: *TcpServer) void { while (true) { var accepted_addr: std.net.Address = undefined; - if (std.os.posixAccept(self.sockfd, &accepted_addr.os_addr, posix.SOCK_NONBLOCK | posix.SOCK_CLOEXEC)) |accepted_fd| { + if (std.os.posixAccept(self.sockfd.?, &accepted_addr.os_addr, posix.SOCK_NONBLOCK | posix.SOCK_CLOEXEC)) |accepted_fd| { var socket = std.os.File.openHandle(accepted_fd); _ = async self.handleRequestFn(self, accepted_addr, socket) catch |err| switch (err) { error.OutOfMemory => { @@ -95,32 +111,65 @@ pub const TcpServer = struct { pub const Loop = struct { allocator: *mem.Allocator, - keep_running: bool, next_tick_queue: std.atomic.QueueMpsc(promise), os_data: OsData, + dispatch_lock: u8, // TODO make this a bool + pending_event_count: usize, + extra_threads: []*std.os.Thread, + final_resume_node: ResumeNode, - const OsData = switch (builtin.os) { - builtin.Os.linux => struct { - epollfd: i32, - }, - else => struct {}, + pub const NextTickNode = std.atomic.QueueMpsc(promise).Node; + + pub const ResumeNode = struct { + id: Id, + handle: promise, + + pub const Id = enum { + Basic, + Stop, + EventFd, + }; + + pub const EventFd = struct { + base: ResumeNode, + eventfd: i32, + }; }; - pub const NextTickNode = std.atomic.QueueMpsc(promise).Node; + /// After initialization, call run(). + /// TODO copy elision / named return values so that the threads referencing *Loop + /// have the correct pointer value. + fn initSingleThreaded(self: *Loop, allocator: *mem.Allocator) !void { + return self.initInternal(allocator, 1); + } /// The allocator must be thread-safe because we use it for multiplexing /// coroutines onto kernel threads. - pub fn init(allocator: *mem.Allocator) !Loop { - var self = Loop{ - .keep_running = true, + /// After initialization, call run(). + /// TODO copy elision / named return values so that the threads referencing *Loop + /// have the correct pointer value. + fn initMultiThreaded(self: *Loop, allocator: *mem.Allocator) !void { + // TODO check the actual cpu core count + return self.initInternal(allocator, 4); + } + + /// Thread count is the total thread count. The thread pool size will be + /// max(thread_count - 1, 0) + fn initInternal(self: *Loop, allocator: *mem.Allocator, thread_count: usize) !void { + self.* = Loop{ + .pending_event_count = 0, .allocator = allocator, .os_data = undefined, .next_tick_queue = std.atomic.QueueMpsc(promise).init(), + .dispatch_lock = 1, // start locked so threads go directly into epoll wait + .extra_threads = undefined, + .final_resume_node = ResumeNode{ + .id = ResumeNode.Id.Stop, + .handle = undefined, + }, }; - try self.initOsData(); + try self.initOsData(thread_count); errdefer self.deinitOsData(); - - return self; } /// must call stop before deinit @@ -128,13 +177,70 @@ pub const Loop = struct { self.deinitOsData(); } - const InitOsDataError = std.os.LinuxEpollCreateError; + const InitOsDataError = std.os.LinuxEpollCreateError || mem.Allocator.Error || std.os.LinuxEventFdError || + std.os.SpawnThreadError || std.os.LinuxEpollCtlError; + + const wakeup_bytes = []u8{0x1} ** 8; - fn initOsData(self: *Loop) InitOsDataError!void { + fn initOsData(self: *Loop, thread_count: usize) InitOsDataError!void { switch (builtin.os) { builtin.Os.linux => { - self.os_data.epollfd = try std.os.linuxEpollCreate(std.os.linux.EPOLL_CLOEXEC); + const extra_thread_count = thread_count - 1; + self.os_data.available_eventfd_resume_nodes = std.atomic.Stack(ResumeNode.EventFd).init(); + self.os_data.eventfd_resume_nodes = try self.allocator.alloc( + std.atomic.Stack(ResumeNode.EventFd).Node, + extra_thread_count, + ); + errdefer self.allocator.free(self.os_data.eventfd_resume_nodes); + + errdefer { + while (self.os_data.available_eventfd_resume_nodes.pop()) |node| std.os.close(node.data.eventfd); + } + for (self.os_data.eventfd_resume_nodes) |*eventfd_node| { + eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{ + .data = ResumeNode.EventFd{ + .base = ResumeNode{ + .id = ResumeNode.Id.EventFd, + .handle = undefined, + }, + .eventfd = try std.os.linuxEventFd(1, posix.EFD_CLOEXEC | posix.EFD_NONBLOCK), + }, + .next = undefined, + }; + self.os_data.available_eventfd_resume_nodes.push(eventfd_node); + } + + self.os_data.epollfd = try std.os.linuxEpollCreate(posix.EPOLL_CLOEXEC); errdefer std.os.close(self.os_data.epollfd); + + self.os_data.final_eventfd = try std.os.linuxEventFd(0, posix.EFD_CLOEXEC | posix.EFD_NONBLOCK); + errdefer std.os.close(self.os_data.final_eventfd); + + self.os_data.final_eventfd_event = posix.epoll_event{ + .events = posix.EPOLLIN, + .data = posix.epoll_data{ .ptr = @ptrToInt(&self.final_resume_node) }, + }; + try std.os.linuxEpollCtl( + self.os_data.epollfd, + posix.EPOLL_CTL_ADD, + self.os_data.final_eventfd, + &self.os_data.final_eventfd_event, + ); + self.extra_threads = try self.allocator.alloc(*std.os.Thread, extra_thread_count); + errdefer self.allocator.free(self.extra_threads); + + var extra_thread_index: usize = 0; + errdefer { + while (extra_thread_index != 0) { + extra_thread_index -= 1; + // writing 8 bytes to an eventfd cannot fail + std.os.posixWrite(self.os_data.final_eventfd, wakeup_bytes) catch unreachable; + self.extra_threads[extra_thread_index].wait(); + } + } + while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) { + self.extra_threads[extra_thread_index] = try std.os.spawnThread(self, workerRun); + } }, else => {}, } @@ -142,65 +248,154 @@ pub const Loop = struct { fn deinitOsData(self: *Loop) void { switch (builtin.os) { - builtin.Os.linux => std.os.close(self.os_data.epollfd), + builtin.Os.linux => { + std.os.close(self.os_data.final_eventfd); + while (self.os_data.available_eventfd_resume_nodes.pop()) |node| std.os.close(node.data.eventfd); + std.os.close(self.os_data.epollfd); + self.allocator.free(self.os_data.eventfd_resume_nodes); + self.allocator.free(self.extra_threads); + }, else => {}, } } - pub fn addFd(self: *Loop, fd: i32, prom: promise) !void { + /// resume_node must live longer than the promise that it holds a reference to. + pub fn addFd(self: *Loop, fd: i32, resume_node: *ResumeNode) !void { + _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst); + errdefer { + _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); + } + try self.addFdNoCounter(fd, resume_node); + } + + fn addFdNoCounter(self: *Loop, fd: i32, resume_node: *ResumeNode) !void { var ev = std.os.linux.epoll_event{ .events = std.os.linux.EPOLLIN | std.os.linux.EPOLLOUT | std.os.linux.EPOLLET, - .data = std.os.linux.epoll_data{ .ptr = @ptrToInt(prom) }, + .data = std.os.linux.epoll_data{ .ptr = @ptrToInt(resume_node) }, }; try std.os.linuxEpollCtl(self.os_data.epollfd, std.os.linux.EPOLL_CTL_ADD, fd, &ev); } pub fn removeFd(self: *Loop, fd: i32) void { + self.removeFdNoCounter(fd); + _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); + } + + fn removeFdNoCounter(self: *Loop, fd: i32) void { std.os.linuxEpollCtl(self.os_data.epollfd, std.os.linux.EPOLL_CTL_DEL, fd, undefined) catch {}; } - async fn waitFd(self: *Loop, fd: i32) !void { + + pub async fn waitFd(self: *Loop, fd: i32) !void { defer self.removeFd(fd); + var resume_node = ResumeNode{ + .id = ResumeNode.Id.Basic, + .handle = undefined, + }; suspend |p| { - try self.addFd(fd, p); + resume_node.handle = p; + try self.addFd(fd, &resume_node); } + var a = &resume_node; // TODO better way to explicitly put memory in coro frame } - pub fn stop(self: *Loop) void { - // TODO make atomic - self.keep_running = false; - // TODO activate an fd in the epoll set which should cancel all the promises - } - - /// bring your own linked list node. this means it can't fail. + /// Bring your own linked list node. This means it can't fail. pub fn onNextTick(self: *Loop, node: *NextTickNode) void { + _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst); self.next_tick_queue.put(node); } pub fn run(self: *Loop) void { - while (self.keep_running) { - // TODO multiplex the next tick queue and the epoll event results onto a thread pool - while (self.next_tick_queue.get()) |node| { - resume node.data; - } - if (!self.keep_running) break; - - self.dispatchOsEvents(); + _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst); + self.workerRun(); + for (self.extra_threads) |extra_thread| { + extra_thread.wait(); } } - fn dispatchOsEvents(self: *Loop) void { - switch (builtin.os) { - builtin.Os.linux => { - var events: [16]std.os.linux.epoll_event = undefined; - const count = std.os.linuxEpollWait(self.os_data.epollfd, events[0..], -1); - for (events[0..count]) |ev| { - const p = @intToPtr(promise, ev.data.ptr); - resume p; + fn workerRun(self: *Loop) void { + start_over: while (true) { + if (@atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst) == 0) { + while (self.next_tick_queue.get()) |next_tick_node| { + const handle = next_tick_node.data; + if (self.next_tick_queue.isEmpty()) { + // last node, just resume it + _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst); + resume handle; + _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); + continue :start_over; + } + + // non-last node, stick it in the epoll set so that + // other threads can get to it + if (self.os_data.available_eventfd_resume_nodes.pop()) |resume_stack_node| { + const eventfd_node = &resume_stack_node.data; + eventfd_node.base.handle = handle; + // the pending count is already accounted for + self.addFdNoCounter(eventfd_node.eventfd, &eventfd_node.base) catch |_| { + // fine, we didn't need it anyway + _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst); + self.os_data.available_eventfd_resume_nodes.push(resume_stack_node); + resume handle; + _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); + continue :start_over; + }; + } else { + // threads are too busy, can't add another eventfd to wake one up + _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst); + resume handle; + _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); + continue :start_over; + } } - }, - else => {}, + + const pending_event_count = @atomicLoad(usize, &self.pending_event_count, AtomicOrder.SeqCst); + if (pending_event_count == 0) { + // cause all the threads to stop + // writing 8 bytes to an eventfd cannot fail + std.os.posixWrite(self.os_data.final_eventfd, wakeup_bytes) catch unreachable; + return; + } + + _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst); + } + + // only process 1 event so we don't steal from other threads + var events: [1]std.os.linux.epoll_event = undefined; + const count = std.os.linuxEpollWait(self.os_data.epollfd, events[0..], -1); + for (events[0..count]) |ev| { + const resume_node = @intToPtr(*ResumeNode, ev.data.ptr); + const handle = resume_node.handle; + const resume_node_id = resume_node.id; + switch (resume_node_id) { + ResumeNode.Id.Basic => {}, + ResumeNode.Id.Stop => return, + ResumeNode.Id.EventFd => { + const event_fd_node = @fieldParentPtr(ResumeNode.EventFd, "base", resume_node); + self.removeFdNoCounter(event_fd_node.eventfd); + const stack_node = @fieldParentPtr(std.atomic.Stack(ResumeNode.EventFd).Node, "data", event_fd_node); + self.os_data.available_eventfd_resume_nodes.push(stack_node); + }, + } + resume handle; + if (resume_node_id == ResumeNode.Id.EventFd) { + _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); + } + } } } + + const OsData = switch (builtin.os) { + builtin.Os.linux => struct { + epollfd: i32, + // pre-allocated eventfds. all permanently active. + // this is how we send promises to be resumed on other threads. + available_eventfd_resume_nodes: std.atomic.Stack(ResumeNode.EventFd), + eventfd_resume_nodes: []std.atomic.Stack(ResumeNode.EventFd).Node, + final_eventfd: i32, + final_eventfd_event: posix.epoll_event, + }, + else => struct {}, + }; }; /// many producer, many consumer, thread-safe, lock-free, runtime configurable buffer size @@ -304,9 +499,7 @@ pub fn Channel(comptime T: type) type { // TODO integrate this function with named return values // so we can get rid of this extra result copy var result: T = undefined; - var debug_handle: usize = undefined; suspend |handle| { - debug_handle = @ptrToInt(handle); var my_tick_node = Loop.NextTickNode{ .next = undefined, .data = handle, @@ -438,9 +631,8 @@ test "listen on a port, send bytes, receive bytes" { const self = @fieldParentPtr(Self, "tcp_server", tcp_server); var socket = _socket.*; // TODO https://github.com/ziglang/zig/issues/733 defer socket.close(); - const next_handler = async errorableHandler(self, _addr, socket) catch |err| switch (err) { - error.OutOfMemory => @panic("unable to handle connection: out of memory"), - }; + // TODO guarantee elision of this allocation + const next_handler = async errorableHandler(self, _addr, socket) catch unreachable; (await next_handler) catch |err| { std.debug.panic("unable to handle connection: {}\n", err); }; @@ -461,17 +653,18 @@ test "listen on a port, send bytes, receive bytes" { const ip4addr = std.net.parseIp4("127.0.0.1") catch unreachable; const addr = std.net.Address.initIp4(ip4addr, 0); - var loop = try Loop.init(std.debug.global_allocator); - var server = MyServer{ .tcp_server = try TcpServer.init(&loop) }; + var loop: Loop = undefined; + try loop.initSingleThreaded(std.debug.global_allocator); + var server = MyServer{ .tcp_server = TcpServer.init(&loop) }; defer server.tcp_server.deinit(); try server.tcp_server.listen(addr, MyServer.handler); - const p = try async doAsyncTest(&loop, server.tcp_server.listen_address); + const p = try async doAsyncTest(&loop, server.tcp_server.listen_address, &server.tcp_server); defer cancel p; loop.run(); } -async fn doAsyncTest(loop: *Loop, address: *const std.net.Address) void { +async fn doAsyncTest(loop: *Loop, address: *const std.net.Address, server: *TcpServer) void { errdefer @panic("test failure"); var socket_file = try await try async event.connect(loop, address); @@ -481,7 +674,7 @@ async fn doAsyncTest(loop: *Loop, address: *const std.net.Address) void { const amt_read = try socket_file.read(buf[0..]); const msg = buf[0..amt_read]; assert(mem.eql(u8, msg, "hello from server\n")); - loop.stop(); + server.close(); } test "std.event.Channel" { @@ -490,7 +683,9 @@ test "std.event.Channel" { const allocator = &da.allocator; - var loop = try Loop.init(allocator); + var loop: Loop = undefined; + // TODO make a multi threaded test + try loop.initSingleThreaded(allocator); defer loop.deinit(); const channel = try Channel(i32).create(&loop, 0); @@ -515,11 +710,248 @@ async fn testChannelGetter(loop: *Loop, channel: *Channel(i32)) void { const value2_promise = try async channel.get(); const value2 = await value2_promise; assert(value2 == 4567); - - loop.stop(); } async fn testChannelPutter(channel: *Channel(i32)) void { await (async channel.put(1234) catch @panic("out of memory")); await (async channel.put(4567) catch @panic("out of memory")); } + +/// Thread-safe async/await lock. +/// Does not make any syscalls - coroutines which are waiting for the lock are suspended, and +/// are resumed when the lock is released, in order. +pub const Lock = struct { + loop: *Loop, + shared_bit: u8, // TODO make this a bool + queue: Queue, + queue_empty_bit: u8, // TODO make this a bool + + const Queue = std.atomic.QueueMpsc(promise); + + pub const Held = struct { + lock: *Lock, + + pub fn release(self: Held) void { + // Resume the next item from the queue. + if (self.lock.queue.get()) |node| { + self.lock.loop.onNextTick(node); + return; + } + + // We need to release the lock. + _ = @atomicRmw(u8, &self.lock.queue_empty_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst); + _ = @atomicRmw(u8, &self.lock.shared_bit, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst); + + // There might be a queue item. If we know the queue is empty, we can be done, + // because the other actor will try to obtain the lock. + // But if there's a queue item, we are the actor which must loop and attempt + // to grab the lock again. + if (@atomicLoad(u8, &self.lock.queue_empty_bit, AtomicOrder.SeqCst) == 1) { + return; + } + + while (true) { + const old_bit = @atomicRmw(u8, &self.lock.shared_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst); + if (old_bit != 0) { + // We did not obtain the lock. Great, the queue is someone else's problem. + return; + } + + // Resume the next item from the queue. + if (self.lock.queue.get()) |node| { + self.lock.loop.onNextTick(node); + return; + } + + // Release the lock again. + _ = @atomicRmw(u8, &self.lock.queue_empty_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst); + _ = @atomicRmw(u8, &self.lock.shared_bit, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst); + + // Find out if we can be done. + if (@atomicLoad(u8, &self.lock.queue_empty_bit, AtomicOrder.SeqCst) == 1) { + return; + } + } + } + }; + + pub fn init(loop: *Loop) Lock { + return Lock{ + .loop = loop, + .shared_bit = 0, + .queue = Queue.init(), + .queue_empty_bit = 1, + }; + } + + /// Must be called when not locked. Not thread safe. + /// All calls to acquire() and release() must complete before calling deinit(). + pub fn deinit(self: *Lock) void { + assert(self.shared_bit == 0); + while (self.queue.get()) |node| cancel node.data; + } + + pub async fn acquire(self: *Lock) Held { + var my_tick_node: Loop.NextTickNode = undefined; + + s: suspend |handle| { + my_tick_node.data = handle; + self.queue.put(&my_tick_node); + + // At this point, we are in the queue, so we might have already been resumed and this coroutine + // frame might be destroyed. For the rest of the suspend block we cannot access the coroutine frame. + + // We set this bit so that later we can rely on the fact, that if queue_empty_bit is 1, some actor + // will attempt to grab the lock. + _ = @atomicRmw(u8, &self.queue_empty_bit, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst); + + while (true) { + const old_bit = @atomicRmw(u8, &self.shared_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst); + if (old_bit != 0) { + // We did not obtain the lock. Trust that our queue entry will resume us, and allow + // suspend to complete. + break; + } + // We got the lock. However we might have already been resumed from the queue. + if (self.queue.get()) |node| { + // Whether this node is us or someone else, we tail resume it. + resume node.data; + break; + } else { + // We already got resumed, and there are none left in the queue, which means that + // we aren't even supposed to hold the lock right now. + _ = @atomicRmw(u8, &self.queue_empty_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst); + _ = @atomicRmw(u8, &self.shared_bit, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst); + + // There might be a queue item. If we know the queue is empty, we can be done, + // because the other actor will try to obtain the lock. + // But if there's a queue item, we are the actor which must loop and attempt + // to grab the lock again. + if (@atomicLoad(u8, &self.queue_empty_bit, AtomicOrder.SeqCst) == 1) { + break; + } else { + continue; + } + } + unreachable; + } + } + + // TODO this workaround to force my_tick_node to be in the coroutine frame should + // not be necessary + var trash1 = &my_tick_node; + + return Held{ .lock = self }; + } +}; + +/// Thread-safe async/await lock that protects one piece of data. +/// Does not make any syscalls - coroutines which are waiting for the lock are suspended, and +/// are resumed when the lock is released, in order. +pub fn Locked(comptime T: type) type { + return struct { + lock: Lock, + private_data: T, + + const Self = this; + + pub const HeldLock = struct { + value: *T, + held: Lock.Held, + + pub fn release(self: HeldLock) void { + self.held.release(); + } + }; + + pub fn init(loop: *Loop, data: T) Self { + return Self{ + .lock = Lock.init(loop), + .private_data = data, + }; + } + + pub fn deinit(self: *Self) void { + self.lock.deinit(); + } + + pub async fn acquire(self: *Self) HeldLock { + return HeldLock{ + // TODO guaranteed allocation elision + .held = await (async self.lock.acquire() catch unreachable), + .value = &self.private_data, + }; + } + }; +} + +test "std.event.Lock" { + var da = std.heap.DirectAllocator.init(); + defer da.deinit(); + + const allocator = &da.allocator; + + var loop: Loop = undefined; + try loop.initMultiThreaded(allocator); + defer loop.deinit(); + + var lock = Lock.init(&loop); + defer lock.deinit(); + + const handle = try async testLock(&loop, &lock); + defer cancel handle; + loop.run(); + + assert(mem.eql(i32, shared_test_data, [1]i32{3 * 10} ** 10)); +} + +async fn testLock(loop: *Loop, lock: *Lock) void { + const handle1 = async lockRunner(lock) catch @panic("out of memory"); + var tick_node1 = Loop.NextTickNode{ + .next = undefined, + .data = handle1, + }; + loop.onNextTick(&tick_node1); + + const handle2 = async lockRunner(lock) catch @panic("out of memory"); + var tick_node2 = Loop.NextTickNode{ + .next = undefined, + .data = handle2, + }; + loop.onNextTick(&tick_node2); + + const handle3 = async lockRunner(lock) catch @panic("out of memory"); + var tick_node3 = Loop.NextTickNode{ + .next = undefined, + .data = handle3, + }; + loop.onNextTick(&tick_node3); + + await handle1; + await handle2; + await handle3; + + // TODO this is to force tick node memory to be in the coro frame + // there should be a way to make it explicit where the memory is + var a = &tick_node1; + var b = &tick_node2; + var c = &tick_node3; +} + +var shared_test_data = [1]i32{0} ** 10; +var shared_test_index: usize = 0; + +async fn lockRunner(lock: *Lock) void { + suspend; // resumed by onNextTick + + var i: usize = 0; + while (i < 10) : (i += 1) { + const handle = await (async lock.acquire() catch @panic("out of memory")); + defer handle.release(); + + shared_test_index = 0; + while (shared_test_index < shared_test_data.len) : (shared_test_index += 1) { + shared_test_data[shared_test_index] = shared_test_data[shared_test_index] + 1; + } + } +} diff --git a/std/heap.zig b/std/heap.zig index 2e02733da1cd..bcace34afe38 100644 --- a/std/heap.zig +++ b/std/heap.zig @@ -38,7 +38,7 @@ fn cFree(self: *Allocator, old_mem: []u8) void { } /// This allocator makes a syscall directly for every allocation and free. -/// TODO make this thread-safe. The windows implementation will need some atomics. +/// Thread-safe and lock-free. pub const DirectAllocator = struct { allocator: Allocator, heap_handle: ?HeapHandle, @@ -74,34 +74,34 @@ pub const DirectAllocator = struct { const alloc_size = if (alignment <= os.page_size) n else n + alignment; const addr = p.mmap(null, alloc_size, p.PROT_READ | p.PROT_WRITE, p.MAP_PRIVATE | p.MAP_ANONYMOUS, -1, 0); if (addr == p.MAP_FAILED) return error.OutOfMemory; - if (alloc_size == n) return @intToPtr([*]u8, addr)[0..n]; - var aligned_addr = addr & ~usize(alignment - 1); - aligned_addr += alignment; + const aligned_addr = (addr & ~usize(alignment - 1)) + alignment; - //We can unmap the unused portions of our mmap, but we must only - // pass munmap bytes that exist outside our allocated pages or it - // will happily eat us too + // We can unmap the unused portions of our mmap, but we must only + // pass munmap bytes that exist outside our allocated pages or it + // will happily eat us too. - //Since alignment > page_size, we are by definition on a page boundry + // Since alignment > page_size, we are by definition on a page boundary. const unused_start = addr; const unused_len = aligned_addr - 1 - unused_start; - var err = p.munmap(unused_start, unused_len); - debug.assert(p.getErrno(err) == 0); + const err = p.munmap(unused_start, unused_len); + assert(p.getErrno(err) == 0); - //It is impossible that there is an unoccupied page at the top of our - // mmap. + // It is impossible that there is an unoccupied page at the top of our + // mmap. return @intToPtr([*]u8, aligned_addr)[0..n]; }, Os.windows => { const amt = n + alignment + @sizeOf(usize); - const heap_handle = self.heap_handle orelse blk: { + const optional_heap_handle = @atomicLoad(?HeapHandle, ?self.heap_handle, builtin.AtomicOrder.SeqCst); + const heap_handle = optional_heap_handle orelse blk: { const hh = os.windows.HeapCreate(os.windows.HEAP_NO_SERIALIZE, amt, 0) orelse return error.OutOfMemory; - self.heap_handle = hh; - break :blk hh; + const other_hh = @cmpxchgStrong(?HeapHandle, &self.heap_handle, null, hh, builtin.AtomicOrder.SeqCst, builtin.AtomicOrder.SeqCst) orelse break :blk hh; + _ = os.windows.HeapDestroy(hh); + break :blk other_hh; }; const ptr = os.windows.HeapAlloc(heap_handle, 0, amt) orelse return error.OutOfMemory; const root_addr = @ptrToInt(ptr); diff --git a/std/mem.zig b/std/mem.zig index b52d3e9f68d3..555e1e249db3 100644 --- a/std/mem.zig +++ b/std/mem.zig @@ -6,7 +6,7 @@ const builtin = @import("builtin"); const mem = this; pub const Allocator = struct { - const Error = error{OutOfMemory}; + pub const Error = error{OutOfMemory}; /// Allocate byte_count bytes and return them in a slice, with the /// slice's pointer aligned at least to alignment bytes. diff --git a/std/os/index.zig b/std/os/index.zig index 52b36c351cbf..74a1b64f6ea1 100644 --- a/std/os/index.zig +++ b/std/os/index.zig @@ -2309,6 +2309,30 @@ pub fn linuxEpollWait(epfd: i32, events: []linux.epoll_event, timeout: i32) usiz } } +pub const LinuxEventFdError = error{ + InvalidFlagValue, + SystemResources, + ProcessFdQuotaExceeded, + SystemFdQuotaExceeded, + + Unexpected, +}; + +pub fn linuxEventFd(initval: u32, flags: u32) LinuxEventFdError!i32 { + const rc = posix.eventfd(initval, flags); + const err = posix.getErrno(rc); + switch (err) { + 0 => return @intCast(i32, rc), + else => return unexpectedErrorPosix(err), + + posix.EINVAL => return LinuxEventFdError.InvalidFlagValue, + posix.EMFILE => return LinuxEventFdError.ProcessFdQuotaExceeded, + posix.ENFILE => return LinuxEventFdError.SystemFdQuotaExceeded, + posix.ENODEV => return LinuxEventFdError.SystemResources, + posix.ENOMEM => return LinuxEventFdError.SystemResources, + } +} + pub const PosixGetSockNameError = error{ /// Insufficient resources were available in the system to perform the operation. SystemResources, @@ -2605,10 +2629,17 @@ pub fn spawnThread(context: var, comptime startFn: var) SpawnThreadError!*Thread const MainFuncs = struct { extern fn linuxThreadMain(ctx_addr: usize) u8 { - if (@sizeOf(Context) == 0) { - return startFn({}); - } else { - return startFn(@intToPtr(*const Context, ctx_addr).*); + const arg = if (@sizeOf(Context) == 0) {} else @intToPtr(*const Context, ctx_addr).*; + + switch (@typeId(@typeOf(startFn).ReturnType)) { + builtin.TypeId.Int => { + return startFn(arg); + }, + builtin.TypeId.Void => { + startFn(arg); + return 0; + }, + else => @compileError("expected return type of startFn to be 'u8', 'noreturn', 'void', or '!void'"), } } extern fn posixThreadMain(ctx: ?*c_void) ?*c_void { diff --git a/std/os/linux/index.zig b/std/os/linux/index.zig index 65aa659c8201..1c15be4887b3 100644 --- a/std/os/linux/index.zig +++ b/std/os/linux/index.zig @@ -523,6 +523,10 @@ pub const CLONE_NEWPID = 0x20000000; pub const CLONE_NEWNET = 0x40000000; pub const CLONE_IO = 0x80000000; +pub const EFD_SEMAPHORE = 1; +pub const EFD_CLOEXEC = O_CLOEXEC; +pub const EFD_NONBLOCK = O_NONBLOCK; + pub const MS_RDONLY = 1; pub const MS_NOSUID = 2; pub const MS_NODEV = 4; @@ -1221,6 +1225,10 @@ pub fn epoll_wait(epoll_fd: i32, events: [*]epoll_event, maxevents: u32, timeout return syscall4(SYS_epoll_wait, @intCast(usize, epoll_fd), @ptrToInt(events), @intCast(usize, maxevents), @intCast(usize, timeout)); } +pub fn eventfd(count: u32, flags: u32) usize { + return syscall2(SYS_eventfd2, count, flags); +} + pub fn timerfd_create(clockid: i32, flags: u32) usize { return syscall2(SYS_timerfd_create, @intCast(usize, clockid), @intCast(usize, flags)); }