Skip to content

Commit

Permalink
M:N threading
Browse files Browse the repository at this point in the history
 * add std.atomic.QueueMpsc.isEmpty
 * make std.debug.global_allocator thread-safe
 * std.event.Loop: now you have to choose between
   - initSingleThreaded
   - initMultiThreaded
 * std.event.Loop multiplexes coroutines onto kernel threads
 * Remove std.event.Loop.stop. Instead the event loop run() function
   returns once there are no pending coroutines.
 * fix crash in ir.cpp for calling methods under some conditions
 * small progress self-hosted compiler, analyzing top level declarations
 * Introduce std.event.Lock for synchronizing coroutines
 * introduce std.event.Locked(T) for data that only 1 coroutine should
   modify at once.
 * make the self hosted compiler use multi threaded event loop
 * make std.heap.DirectAllocator thread-safe

See #174

TODO:
 * call sched_getaffinity instead of hard coding thread pool size 4
 * support for Windows and MacOS
 * #1194
 * #1197
  • Loading branch information
andrewrk committed Jul 7, 2018
1 parent d8295c1 commit eb326e1
Show file tree
Hide file tree
Showing 10 changed files with 833 additions and 114 deletions.
5 changes: 2 additions & 3 deletions src-self-hosted/main.zig
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,8 @@ fn buildOutputType(allocator: *Allocator, args: []const []const u8, out_type: Mo
const zig_lib_dir = introspect.resolveZigLibDir(allocator) catch os.exit(1);
defer allocator.free(zig_lib_dir);

var loop = try event.Loop.init(allocator);
var loop: event.Loop = undefined;
try loop.initMultiThreaded(allocator);

var module = try Module.create(
&loop,
Expand Down Expand Up @@ -493,8 +494,6 @@ async fn processBuildEvents(module: *Module, watch: bool) void {
switch (build_event) {
Module.Event.Ok => {
std.debug.warn("Build succeeded\n");
// for now we stop after 1
module.loop.stop();
return;
},
Module.Event.Error => |err| {
Expand Down
257 changes: 242 additions & 15 deletions src-self-hosted/module.zig
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ const std = @import("std");
const os = std.os;
const io = std.io;
const mem = std.mem;
const Allocator = mem.Allocator;
const Buffer = std.Buffer;
const llvm = @import("llvm.zig");
const c = @import("c.zig");
Expand All @@ -13,6 +14,7 @@ const ArrayList = std.ArrayList;
const errmsg = @import("errmsg.zig");
const ast = std.zig.ast;
const event = std.event;
const assert = std.debug.assert;

pub const Module = struct {
loop: *event.Loop,
Expand Down Expand Up @@ -81,6 +83,8 @@ pub const Module = struct {
link_out_file: ?[]const u8,
events: *event.Channel(Event),

exported_symbol_names: event.Locked(Decl.Table),

// TODO handle some of these earlier and report them in a way other than error codes
pub const BuildError = error{
OutOfMemory,
Expand Down Expand Up @@ -232,6 +236,7 @@ pub const Module = struct {
.test_name_prefix = null,
.emit_file_type = Emit.Binary,
.link_out_file = null,
.exported_symbol_names = event.Locked(Decl.Table).init(loop, Decl.Table.init(loop.allocator)),
});
}

Expand Down Expand Up @@ -272,38 +277,91 @@ pub const Module = struct {
return;
};
await (async self.events.put(Event.Ok) catch unreachable);
// for now we stop after 1
return;
}
}

async fn addRootSrc(self: *Module) !void {
const root_src_path = self.root_src_path orelse @panic("TODO handle null root src path");
// TODO async/await os.path.real
const root_src_real_path = os.path.real(self.a(), root_src_path) catch |err| {
try printError("unable to get real path '{}': {}", root_src_path, err);
return err;
};
errdefer self.a().free(root_src_real_path);

// TODO async/await readFileAlloc()
const source_code = io.readFileAlloc(self.a(), root_src_real_path) catch |err| {
try printError("unable to open '{}': {}", root_src_real_path, err);
return err;
};
errdefer self.a().free(source_code);

var tree = try std.zig.parse(self.a(), source_code);
defer tree.deinit();

//var it = tree.root_node.decls.iterator();
//while (it.next()) |decl_ptr| {
// const decl = decl_ptr.*;
// switch (decl.id) {
// ast.Node.Comptime => @panic("TODO"),
// ast.Node.VarDecl => @panic("TODO"),
// ast.Node.UseDecl => @panic("TODO"),
// ast.Node.FnDef => @panic("TODO"),
// ast.Node.TestDecl => @panic("TODO"),
// else => unreachable,
// }
//}
var parsed_file = ParsedFile{
.tree = try std.zig.parse(self.a(), source_code),
.realpath = root_src_real_path,
};
errdefer parsed_file.tree.deinit();

const tree = &parsed_file.tree;

// create empty struct for it
const decls = try Scope.Decls.create(self.a(), null);
errdefer decls.destroy();

var it = tree.root_node.decls.iterator(0);
while (it.next()) |decl_ptr| {
const decl = decl_ptr.*;
switch (decl.id) {
ast.Node.Id.Comptime => @panic("TODO"),
ast.Node.Id.VarDecl => @panic("TODO"),
ast.Node.Id.FnProto => {
const fn_proto = @fieldParentPtr(ast.Node.FnProto, "base", decl);

const name = if (fn_proto.name_token) |name_token| tree.tokenSlice(name_token) else {
@panic("TODO add compile error");
//try self.addCompileError(
// &parsed_file,
// fn_proto.fn_token,
// fn_proto.fn_token + 1,
// "missing function name",
//);
continue;
};

const fn_decl = try self.a().create(Decl.Fn{
.base = Decl{
.id = Decl.Id.Fn,
.name = name,
.visib = parseVisibToken(tree, fn_proto.visib_token),
.resolution = Decl.Resolution.Unresolved,
},
.value = Decl.Fn.Val{ .Unresolved = {} },
.fn_proto = fn_proto,
});
errdefer self.a().destroy(fn_decl);

// TODO make this parallel
try await try async self.addTopLevelDecl(tree, &fn_decl.base);
},
ast.Node.Id.TestDecl => @panic("TODO"),
else => unreachable,
}
}
}

async fn addTopLevelDecl(self: *Module, tree: *ast.Tree, decl: *Decl) !void {
const is_export = decl.isExported(tree);

{
const exported_symbol_names = await try async self.exported_symbol_names.acquire();
defer exported_symbol_names.release();

if (try exported_symbol_names.value.put(decl.name, decl)) |other_decl| {
@panic("TODO report compile error");
}
}
}

pub fn link(self: *Module, out_file: ?[]const u8) !void {
Expand Down Expand Up @@ -350,3 +408,172 @@ fn printError(comptime format: []const u8, args: ...) !void {
const out_stream = &stderr_file_out_stream.stream;
try out_stream.print(format, args);
}

fn parseVisibToken(tree: *ast.Tree, optional_token_index: ?ast.TokenIndex) Visib {
if (optional_token_index) |token_index| {
const token = tree.tokens.at(token_index);
assert(token.id == Token.Id.Keyword_pub);
return Visib.Pub;
} else {
return Visib.Private;
}
}

pub const Scope = struct {
id: Id,
parent: ?*Scope,

pub const Id = enum {
Decls,
Block,
};

pub const Decls = struct {
base: Scope,
table: Decl.Table,

pub fn create(a: *Allocator, parent: ?*Scope) !*Decls {
const self = try a.create(Decls{
.base = Scope{
.id = Id.Decls,
.parent = parent,
},
.table = undefined,
});
errdefer a.destroy(self);

self.table = Decl.Table.init(a);
errdefer self.table.deinit();

return self;
}

pub fn destroy(self: *Decls) void {
self.table.deinit();
self.table.allocator.destroy(self);
self.* = undefined;
}
};

pub const Block = struct {
base: Scope,
};
};

pub const Visib = enum {
Private,
Pub,
};

pub const Decl = struct {
id: Id,
name: []const u8,
visib: Visib,
resolution: Resolution,

pub const Table = std.HashMap([]const u8, *Decl, mem.hash_slice_u8, mem.eql_slice_u8);

pub fn isExported(base: *const Decl, tree: *ast.Tree) bool {
switch (base.id) {
Id.Fn => {
const fn_decl = @fieldParentPtr(Fn, "base", base);
return fn_decl.isExported(tree);
},
else => return false,
}
}

pub const Resolution = enum {
Unresolved,
InProgress,
Invalid,
Ok,
};

pub const Id = enum {
Var,
Fn,
CompTime,
};

pub const Var = struct {
base: Decl,
};

pub const Fn = struct {
base: Decl,
value: Val,
fn_proto: *const ast.Node.FnProto,

// TODO https://github.com/ziglang/zig/issues/683 and then make this anonymous
pub const Val = union {
Unresolved: void,
Ok: *Value.Fn,
};

pub fn externLibName(self: Fn, tree: *ast.Tree) ?[]const u8 {
return if (self.fn_proto.extern_export_inline_token) |tok_index| x: {
const token = tree.tokens.at(tok_index);
break :x switch (token.id) {
Token.Id.Extern => tree.tokenSlicePtr(token),
else => null,
};
} else null;
}

pub fn isExported(self: Fn, tree: *ast.Tree) bool {
if (self.fn_proto.extern_export_inline_token) |tok_index| {
const token = tree.tokens.at(tok_index);
return token.id == Token.Id.Keyword_export;
} else {
return false;
}
}
};

pub const CompTime = struct {
base: Decl,
};
};

pub const Value = struct {
pub const Fn = struct {};
};

pub const Type = struct {
id: Id,

pub const Id = enum {
Type,
Void,
Bool,
NoReturn,
Int,
Float,
Pointer,
Array,
Struct,
ComptimeFloat,
ComptimeInt,
Undefined,
Null,
Optional,
ErrorUnion,
ErrorSet,
Enum,
Union,
Fn,
Opaque,
Promise,
};

pub const Struct = struct {
base: Type,
decls: *Scope.Decls,
};
};

pub const ParsedFile = struct {
tree: ast.Tree,
realpath: []const u8,
};
2 changes: 1 addition & 1 deletion src/ir.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13278,7 +13278,7 @@ static TypeTableEntry *ir_analyze_instruction_call(IrAnalyze *ira, IrInstruction
FnTableEntry *fn_table_entry = fn_ref->value.data.x_bound_fn.fn;
IrInstruction *first_arg_ptr = fn_ref->value.data.x_bound_fn.first_arg;
return ir_analyze_fn_call(ira, call_instruction, fn_table_entry, fn_table_entry->type_entry,
nullptr, first_arg_ptr, is_comptime, call_instruction->fn_inline);
fn_ref, first_arg_ptr, is_comptime, call_instruction->fn_inline);
} else {
ir_add_error_node(ira, fn_ref->source_node,
buf_sprintf("type '%s' not a function", buf_ptr(&fn_ref->value.type->name)));
Expand Down
17 changes: 17 additions & 0 deletions std/atomic/queue_mpsc.zig
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ pub fn QueueMpsc(comptime T: type) type {

pub const Node = std.atomic.Stack(T).Node;

/// Not thread-safe. The call to init() must complete before any other functions are called.
/// No deinitialization required.
pub fn init() Self {
return Self{
.inboxes = []std.atomic.Stack(T){
Expand All @@ -26,12 +28,15 @@ pub fn QueueMpsc(comptime T: type) type {
};
}

/// Fully thread-safe. put() may be called from any thread at any time.
pub fn put(self: *Self, node: *Node) void {
const inbox_index = @atomicLoad(usize, &self.inbox_index, AtomicOrder.SeqCst);
const inbox = &self.inboxes[inbox_index];
inbox.push(node);
}

/// Must be called by only 1 consumer at a time. Every call to get() and isEmpty() must complete before
/// the next call to get().
pub fn get(self: *Self) ?*Node {
if (self.outbox.pop()) |node| {
return node;
Expand All @@ -43,6 +48,18 @@ pub fn QueueMpsc(comptime T: type) type {
}
return self.outbox.pop();
}

/// Must be called by only 1 consumer at a time. Every call to get() and isEmpty() must complete before
/// the next call to isEmpty().
pub fn isEmpty(self: *Self) bool {
if (!self.outbox.isEmpty()) return false;
const prev_inbox_index = @atomicRmw(usize, &self.inbox_index, AtomicRmwOp.Xor, 0x1, AtomicOrder.SeqCst);
const prev_inbox = &self.inboxes[prev_inbox_index];
while (prev_inbox.pop()) |node| {
self.outbox.push(node);
}
return self.outbox.isEmpty();
}
};
}

Expand Down
Loading

0 comments on commit eb326e1

Please sign in to comment.