Skip to content

Commit

Permalink
fixed logger, changes to Channel, added mpmc (WIP)
Browse files Browse the repository at this point in the history
  • Loading branch information
ultd committed Aug 16, 2023
1 parent da9b490 commit ff28ae5
Show file tree
Hide file tree
Showing 9 changed files with 927 additions and 81 deletions.
2 changes: 1 addition & 1 deletion build.zig
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ pub fn build(b: *std.Build) void {
// This declares intent for the executable to be installed into the
// standard location when the user invokes the "install" step (the default
// step when running `zig build`).
b.installArtifact(exe);
// b.installArtifact(exe);

// This *creates* a Run step in the build graph, to be executed when another
// step is evaluated that depends on it. The next line below will establish
Expand Down
3 changes: 2 additions & 1 deletion src/cmd/cmd.zig
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ fn identity(_: []const []const u8) !void {

// gossip entrypoint
fn gossip(_: []const []const u8) !void {
var logger = Logger.init(gpa, .debug);
var arena = std.heap.ArenaAllocator.init(gpa);
var logger = Logger.init(arena.allocator(), .debug);
logger.spawn();

var gossip_port: u16 = @intCast(gossip_port_option.value.int.?);
Expand Down
1 change: 1 addition & 0 deletions src/lib.zig
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ pub const version = struct {

pub const sync = struct {
pub usingnamespace @import("sync/channel.zig");
pub usingnamespace @import("sync/mpmc.zig");
};

pub const utils = struct {
Expand Down
77 changes: 57 additions & 20 deletions src/sync/channel.zig
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ const Atomic = std.atomic.Atomic;
const Mutex = std.Thread.Mutex;
const Condition = std.Thread.Condition;
const testing = std.testing;
const assert = std.debug.assert;

/// A very basic mpmc channel implementation - TODO: replace with a legit channel impl
pub fn Channel(comptime T: type) type {
Expand All @@ -11,32 +12,36 @@ pub fn Channel(comptime T: type) type {
lock: Mutex = .{},
hasValue: Condition = .{},
closed: Atomic(bool) = Atomic(bool).init(false),
allocator: std.mem.Allocator,

const Self = @This();

pub fn init(allocator: std.mem.Allocator, init_capacity: usize) Self {
return .{
pub fn init(allocator: std.mem.Allocator, init_capacity: usize) *Self {
var self = allocator.create(Self) catch unreachable;
self.* = .{
.buffer = std.ArrayList(T).initCapacity(allocator, init_capacity) catch unreachable,
.allocator = allocator,
};
return self;
}

pub fn deinit(self: *Self) void {
self.buffer.deinit();
self.allocator.destroy(self);
}

pub fn send(self: *Self, value: T) void {
self.lock.lock();
defer self.lock.unlock();

self.buffer.append(value) catch unreachable;
self.lock.unlock();
self.hasValue.signal();
}

pub fn receive(self: *Self) ?T {
self.lock.lock();
defer self.lock.unlock();

while (self.buffer.items.len == 0 and !self.closed.load(std.atomic.Ordering.SeqCst)) {
while (self.buffer.items.len == 0 and !self.closed.load(.SeqCst)) {
self.hasValue.wait(&self.lock);
}

Expand All @@ -48,8 +53,36 @@ pub fn Channel(comptime T: type) type {
return self.buffer.pop();
}

/// `drain` func will remove all pending items from queue.
///
/// NOTE: Caller is responsible for calling `allocator.free` on the returned slice.
pub fn drain(self: *Self) ?[]T {
self.lock.lock();
defer self.lock.unlock();

while (self.buffer.items.len == 0 and !self.closed.load(.SeqCst)) {
self.hasValue.wait(&self.lock);
}

// channel closed so return null to signal no more items
if (self.buffer.items.len == 0) {
return null;
}

var num_items_to_drain = self.buffer.items.len;

var out = self.allocator.alloc(T, num_items_to_drain) catch @panic("could not alloc");
@memcpy(out, self.buffer.items);

self.buffer.shrinkRetainingCapacity(0);
assert(self.buffer.items.len == 0);
assert(num_items_to_drain == out.len);

return out;
}

pub fn close(self: *Self) void {
self.closed.store(true, std.atomic.Ordering.SeqCst);
self.closed.store(true, .SeqCst);
self.hasValue.broadcast();
}
};
Expand All @@ -64,34 +97,38 @@ const BlockChannel = Channel(Block);

const logger = std.log.scoped(.sync_channel_tests);

fn testReceiver(chan: *BlockChannel, recv_count: *usize) void {
while (true) {
_ = chan.receive() orelse break;
recv_count.* += 1;
fn testReceiver(chan: *BlockChannel, recv_count: *Atomic(usize), id: u8) void {
_ = id;
while (chan.receive()) |v| {
_ = v;
_ = recv_count.fetchAdd(1, .SeqCst);
}
}

fn testSender(chan: *BlockChannel, send_count: *usize) void {
fn testSender(chan: *BlockChannel, total_send: usize) void {
var i: usize = 0;
while (i < 10) : (i += 1) {
std.time.sleep(std.time.ns_per_ms * 100);
while (i < total_send) : (i += 1) {
chan.send(Block{ .num = @intCast(i) });
send_count.* += 1;
}
chan.close();
}

test "sync: channel works" {
test "sync.channel: channel works properly" {
var ch = BlockChannel.init(testing.allocator, 100);
defer ch.deinit();

var recv_count: usize = 0;
var send_count: usize = 0;
var join1 = try std.Thread.spawn(.{}, testReceiver, .{ &ch, &recv_count });
var join2 = try std.Thread.spawn(.{}, testSender, .{ &ch, &send_count });
var recv_count: Atomic(usize) = Atomic(usize).init(0);
var send_count: usize = 100_000;

var join1 = try std.Thread.spawn(.{}, testReceiver, .{ ch, &recv_count, 1 });
var join2 = try std.Thread.spawn(.{}, testSender, .{ ch, send_count });
var join3 = try std.Thread.spawn(.{}, testReceiver, .{ ch, &recv_count, 2 });
var join4 = try std.Thread.spawn(.{}, testReceiver, .{ ch, &recv_count, 3 });

join1.join();
join2.join();
join3.join();
join4.join();

try testing.expect(recv_count == send_count);
try testing.expectEqual(send_count, recv_count.value);
}
Loading

0 comments on commit ff28ae5

Please sign in to comment.