diff --git a/build.zig b/build.zig index 53ce90f..16c800b 100644 --- a/build.zig +++ b/build.zig @@ -6,7 +6,7 @@ pub fn build(b: *std.Build) !void { const target = b.standardTargetOptions(.{}); const optimize = b.standardOptimizeOption(.{}); - _ = b.addModule("xev", .{ .root_source_file = b.path("src/main.zig") }); + _ = b.addModule("xev", .{ .root_source_file = b.path("src/lib.zig") }); const man_pages = b.option( bool, @@ -60,7 +60,7 @@ pub fn build(b: *std.Build) !void { // we can easily run it manually without digging through the cache. const test_exe = b.addTest(.{ .name = "xev-test", - .root_source_file = b.path("src/main.zig"), + .root_source_file = b.path("src/lib.zig"), .target = target, .optimize = optimize, }); diff --git a/src/backend/epoll.zig b/src/backend/epoll.zig index ae4ec7d..8ad50be 100644 --- a/src/backend/epoll.zig +++ b/src/backend/epoll.zig @@ -5,7 +5,7 @@ const posix = std.posix; const queue = @import("../queue.zig"); const queue_mpsc = @import("../queue_mpsc.zig"); const heap = @import("../heap.zig"); -const main = @import("../main.zig"); +const main = @import("../lib.zig"); const xev = main.Epoll; const ThreadPool = main.ThreadPool; diff --git a/src/backend/io_uring.zig b/src/backend/io_uring.zig index e9e2d02..3b67da3 100644 --- a/src/backend/io_uring.zig +++ b/src/backend/io_uring.zig @@ -3,7 +3,7 @@ const assert = std.debug.assert; const linux = std.os.linux; const posix = std.posix; const queue = @import("../queue.zig"); -const xev = @import("../main.zig").IO_Uring; +const xev = @import("../lib.zig").IO_Uring; pub const Loop = struct { ring: linux.IoUring, diff --git a/src/backend/iocp.zig b/src/backend/iocp.zig index 90d0063..266bf5b 100644 --- a/src/backend/iocp.zig +++ b/src/backend/iocp.zig @@ -4,7 +4,7 @@ const assert = std.debug.assert; const windows = @import("../windows.zig"); const queue = @import("../queue.zig"); const heap = @import("../heap.zig"); -const xev = @import("../main.zig").IOCP; +const xev = @import("../lib.zig").IOCP; const posix = std.posix; const log = std.log.scoped(.libxev_iocp); diff --git a/src/backend/kqueue.zig b/src/backend/kqueue.zig index 834f916..d2f8426 100644 --- a/src/backend/kqueue.zig +++ b/src/backend/kqueue.zig @@ -7,7 +7,7 @@ const posix = std.posix; const queue = @import("../queue.zig"); const queue_mpsc = @import("../queue_mpsc.zig"); const heap = @import("../heap.zig"); -const main = @import("../main.zig"); +const main = @import("../lib.zig"); const xev = main.Kqueue; const ThreadPool = main.ThreadPool; diff --git a/src/backend/wasi_poll.zig b/src/backend/wasi_poll.zig index 9b6f915..f49724e 100644 --- a/src/backend/wasi_poll.zig +++ b/src/backend/wasi_poll.zig @@ -5,7 +5,7 @@ const wasi = std.os.wasi; const posix = std.posix; const queue = @import("../queue.zig"); const heap = @import("../heap.zig"); -const xev = @import("../main.zig").WasiPoll; +const xev = @import("../lib.zig").WasiPoll; pub const Loop = struct { pub const threaded = std.Target.wasm.featureSetHas(builtin.cpu.features, .atomics); diff --git a/src/c_api.zig b/src/c_api.zig index c5172ff..bc1169e 100644 --- a/src/c_api.zig +++ b/src/c_api.zig @@ -9,7 +9,7 @@ const std = @import("std"); const builtin = @import("builtin"); const assert = std.debug.assert; -const xev = @import("main.zig"); +const xev = @import("lib.zig"); export fn xev_loop_init(loop: *xev.Loop) c_int { // TODO: overflow diff --git a/src/lib.zig b/src/lib.zig new file mode 100644 index 0000000..0b22124 --- /dev/null +++ b/src/lib.zig @@ -0,0 +1,137 @@ +const std = @import("std"); +const builtin = @import("builtin"); + +pub const Xev = @import("xev.zig").Xev; + +/// System-specific interfaces. Note that they are always pub for +/// all systems but if you reference them and force them to be analyzed +/// the proper system APIs must exist. Due to Zig's lazy analysis, if you +/// don't use any interface it will NOT be compiled (yay!). +pub const IO_Uring = Xev(.io_uring, @import("backend/io_uring.zig")); +pub const Epoll = Xev(.epoll, @import("backend/epoll.zig")); +pub const Kqueue = Xev(.kqueue, @import("backend/kqueue.zig")); +pub const WasiPoll = Xev(.wasi_poll, @import("backend/wasi_poll.zig")); +pub const IOCP = Xev(.iocp, @import("backend/iocp.zig")); + +/// Generic thread pool implementation. +pub const ThreadPool = @import("ThreadPool.zig"); + +/// This stream (lowercase s) can be used as a namespace to access +/// Closeable, Writeable, Readable, etc. so that custom streams +/// can be constructed. +pub const stream = @import("watcher/stream.zig"); + +/// The backend types. +pub const Backend = enum { + io_uring, + epoll, + kqueue, + wasi_poll, + iocp, + + /// Returns a recommend default backend from inspecting the system. + pub fn default() Backend { + return @as(?Backend, switch (builtin.os.tag) { + .linux => .io_uring, + .ios, .macos => .kqueue, + .wasi => .wasi_poll, + .windows => .iocp, + else => null, + }) orelse { + @compileLog(builtin.os); + @compileError("no default backend for this target"); + }; + } + + /// Returns the Api (return value of Xev) for the given backend type. + pub fn Api(comptime self: Backend) type { + return switch (self) { + .io_uring => IO_Uring, + .epoll => Epoll, + .kqueue => Kqueue, + .wasi_poll => WasiPoll, + .iocp => IOCP, + }; + } +}; + +pub const backend = Backend.default(); + +const T = backend.Api(); +pub const Sys = T.Sys; + +const loop = @import("loop.zig"); + +/// The core loop APIs. +pub const Loop = Sys.Loop; +pub const Completion = Sys.Completion; +pub const Result = Sys.Result; +pub const ReadBuffer = Sys.ReadBuffer; +pub const WriteBuffer = Sys.WriteBuffer; +pub const Options = loop.Options; +pub const RunMode = loop.RunMode; +pub const CallbackAction = loop.CallbackAction; +pub const CompletionState = loop.CompletionState; + +/// Error types +pub const AcceptError = Sys.AcceptError; +pub const CancelError = Sys.CancelError; +pub const CloseError = Sys.CloseError; +pub const ConnectError = Sys.ConnectError; +pub const ShutdownError = Sys.ShutdownError; +pub const WriteError = Sys.WriteError; +pub const ReadError = Sys.ReadError; + +const Self = @This(); + +/// The high-level helper interfaces that make it easier to perform +/// common tasks. These may not work with all possible Loop implementations. +pub const Async = @import("watcher/async.zig").Async(Self); +pub const File = @import("watcher/file.zig").File(Self); +pub const Process = @import("watcher/process.zig").Process(Self); +pub const Stream = stream.GenericStream(Self); +pub const Timer = @import("watcher/timer.zig").Timer(Self); +pub const TCP = @import("watcher/tcp.zig").TCP(Self); +pub const UDP = @import("watcher/udp.zig").UDP(Self); + +/// The callback of the main Loop operations. Higher level interfaces may +/// use a different callback mechanism. +pub const Callback = *const fn ( + userdata: ?*anyopaque, + loop: *Loop, + completion: *Completion, + result: Result, +) CallbackAction; + +pub const noopCallback = T.noopCallback; + +test { + // Tested on all platforms + _ = @import("heap.zig"); + _ = @import("queue.zig"); + _ = @import("queue_mpsc.zig"); + _ = ThreadPool; + + // Test the C API + if (builtin.os.tag != .wasi) _ = @import("c_api.zig"); + + // OS-specific tests + switch (builtin.os.tag) { + .linux => { + _ = Epoll; + _ = IO_Uring; + _ = @import("linux/timerfd.zig"); + }, + + .wasi => { + //_ = WasiPoll; + _ = @import("backend/wasi_poll.zig"); + }, + + .windows => { + _ = @import("backend/iocp.zig"); + }, + + else => {}, + } +} diff --git a/src/loop.zig b/src/loop.zig index 6e324f5..a644341 100644 --- a/src/loop.zig +++ b/src/loop.zig @@ -3,7 +3,7 @@ const std = @import("std"); const assert = std.debug.assert; -const xev = @import("main.zig"); +const xev = @import("lib.zig"); /// Common options across backends. Not all options apply to all backends. /// Read the doc comment for individual fields to learn what backends they diff --git a/src/main.zig b/src/main.zig deleted file mode 100644 index 74f451a..0000000 --- a/src/main.zig +++ /dev/null @@ -1,174 +0,0 @@ -const std = @import("std"); -const builtin = @import("builtin"); - -/// The low-level IO interfaces using the recommended compile-time -/// interface for the target system. -const xev = Backend.default().Api(); -pub usingnamespace xev; -//pub usingnamespace Epoll; - -/// System-specific interfaces. Note that they are always pub for -/// all systems but if you reference them and force them to be analyzed -/// the proper system APIs must exist. Due to Zig's lazy analysis, if you -/// don't use any interface it will NOT be compiled (yay!). -pub const IO_Uring = Xev(.io_uring, @import("backend/io_uring.zig")); -pub const Epoll = Xev(.epoll, @import("backend/epoll.zig")); -pub const Kqueue = Xev(.kqueue, @import("backend/kqueue.zig")); -pub const WasiPoll = Xev(.wasi_poll, @import("backend/wasi_poll.zig")); -pub const IOCP = Xev(.iocp, @import("backend/iocp.zig")); - -/// Generic thread pool implementation. -pub const ThreadPool = @import("ThreadPool.zig"); - -/// This stream (lowercase s) can be used as a namespace to access -/// Closeable, Writeable, Readable, etc. so that custom streams -/// can be constructed. -pub const stream = @import("watcher/stream.zig"); - -/// The backend types. -pub const Backend = enum { - io_uring, - epoll, - kqueue, - wasi_poll, - iocp, - - /// Returns a recommend default backend from inspecting the system. - pub fn default() Backend { - return @as(?Backend, switch (builtin.os.tag) { - .linux => .io_uring, - .ios, .macos => .kqueue, - .wasi => .wasi_poll, - .windows => .iocp, - else => null, - }) orelse { - @compileLog(builtin.os); - @compileError("no default backend for this target"); - }; - } - - /// Returns the Api (return value of Xev) for the given backend type. - pub fn Api(comptime self: Backend) type { - return switch (self) { - .io_uring => IO_Uring, - .epoll => Epoll, - .kqueue => Kqueue, - .wasi_poll => WasiPoll, - .iocp => IOCP, - }; - } -}; - -/// Creates the Xev API based on a backend type. -/// -/// For the default backend type for your system (i.e. io_uring on Linux), -/// this is the main API you interact with. It is `usingnamespaced` into -/// the "xev" package so you'd use types such as `xev.Loop`, `xev.Completion`, -/// etc. -/// -/// Unless you're using a custom or specific backend type, you do NOT ever -/// need to call the Xev function itself. -pub fn Xev(comptime be: Backend, comptime T: type) type { - return struct { - const Self = @This(); - const loop = @import("loop.zig"); - - /// The backend that this is. This is supplied at comptime so - /// it is up to the caller to say the right thing. This lets custom - /// implementations also "quack" like an implementation. - pub const backend = be; - - /// The core loop APIs. - pub const Loop = T.Loop; - pub const Completion = T.Completion; - pub const Result = T.Result; - pub const ReadBuffer = T.ReadBuffer; - pub const WriteBuffer = T.WriteBuffer; - pub const Options = loop.Options; - pub const RunMode = loop.RunMode; - pub const CallbackAction = loop.CallbackAction; - pub const CompletionState = loop.CompletionState; - - /// Error types - pub const AcceptError = T.AcceptError; - pub const CancelError = T.CancelError; - pub const CloseError = T.CloseError; - pub const ConnectError = T.ConnectError; - pub const ShutdownError = T.ShutdownError; - pub const WriteError = T.WriteError; - pub const ReadError = T.ReadError; - - /// The high-level helper interfaces that make it easier to perform - /// common tasks. These may not work with all possible Loop implementations. - pub const Async = @import("watcher/async.zig").Async(Self); - pub const File = @import("watcher/file.zig").File(Self); - pub const Process = @import("watcher/process.zig").Process(Self); - pub const Stream = stream.GenericStream(Self); - pub const Timer = @import("watcher/timer.zig").Timer(Self); - pub const TCP = @import("watcher/tcp.zig").TCP(Self); - pub const UDP = @import("watcher/udp.zig").UDP(Self); - - /// The callback of the main Loop operations. Higher level interfaces may - /// use a different callback mechanism. - pub const Callback = *const fn ( - userdata: ?*anyopaque, - loop: *Loop, - completion: *Completion, - result: Result, - ) CallbackAction; - - /// A way to access the raw type. - pub const Sys = T; - - /// A callback that does nothing and immediately disarms. This - /// implements xev.Callback and is the default value for completions. - pub fn noopCallback( - _: ?*anyopaque, - _: *Loop, - _: *Completion, - _: Result, - ) CallbackAction { - return .disarm; - } - - test { - @import("std").testing.refAllDecls(@This()); - } - - test "completion is zero-able" { - const c: Completion = .{}; - _ = c; - } - }; -} - -test { - // Tested on all platforms - _ = @import("heap.zig"); - _ = @import("queue.zig"); - _ = @import("queue_mpsc.zig"); - _ = ThreadPool; - - // Test the C API - if (builtin.os.tag != .wasi) _ = @import("c_api.zig"); - - // OS-specific tests - switch (builtin.os.tag) { - .linux => { - _ = Epoll; - _ = IO_Uring; - _ = @import("linux/timerfd.zig"); - }, - - .wasi => { - //_ = WasiPoll; - _ = @import("backend/wasi_poll.zig"); - }, - - .windows => { - _ = @import("backend/iocp.zig"); - }, - - else => {}, - } -} diff --git a/src/watcher/file.zig b/src/watcher/file.zig index 0d31d05..2d76f3f 100644 --- a/src/watcher/file.zig +++ b/src/watcher/file.zig @@ -3,7 +3,7 @@ const builtin = @import("builtin"); const common = @import("common.zig"); const assert = std.debug.assert; const posix = std.posix; -const main = @import("../main.zig"); +const main = @import("../lib.zig"); const stream = @import("stream.zig"); /// File operations. @@ -32,13 +32,25 @@ pub fn File(comptime xev: type) type { /// The underlying file fd: FdType, - pub usingnamespace stream.Stream(xev, Self, .{ + const S = stream.Stream(xev, Self, .{ .close = true, .read = .read, .write = .write, .threadpool = true, }); + pub const CloseError = S.CloseError; + pub const close = S.close; + + pub const ReadError = S.ReadError; + pub const read = S.read; + + pub const WriteError = S.WriteError; + pub const WriteQueue = S.WriteQueue; + pub const WriteRequest = S.WriteRequest; + pub const queueWrite = S.queueWrite; + pub const write = S.write; + /// Initialize a File from a std.fs.File. pub fn init(file: std.fs.File) !Self { return .{ diff --git a/src/watcher/stream.zig b/src/watcher/stream.zig index 7f5df6f..363afe6 100644 --- a/src/watcher/stream.zig +++ b/src/watcher/stream.zig @@ -19,7 +19,8 @@ pub const Options = struct { }; /// Creates a stream type that is meant to be embedded within other -/// types using "usingnamespace". A stream is something that supports read, +/// types using "usingnamespace" (or by using its decls directly). +/// A stream is something that supports read, /// write, close, etc. The exact operations supported are defined by the /// "options" struct. /// @@ -28,10 +29,21 @@ pub const Options = struct { /// - decl named "initFd" to initialize a new T from a fd /// pub fn Stream(comptime xev: type, comptime T: type, comptime options: Options) type { + const C = if (options.close) Closeable(xev, T, options); + const R = if (options.read != .none) Readable(xev, T, options); + const W = if (options.write != .none) Writeable(xev, T, options); return struct { - pub usingnamespace if (options.close) Closeable(xev, T, options) else struct {}; - pub usingnamespace if (options.read != .none) Readable(xev, T, options) else struct {}; - pub usingnamespace if (options.write != .none) Writeable(xev, T, options) else struct {}; + pub const CloseError = if (options.close) C.CloseError; + pub const close = if (options.close) C.close; + + pub const ReadError = if (options.read != .none) R.ReadError; + pub const read = if (options.read != .none) R.read; + + pub const WriteError = if (options.write != .none) W.WriteError; + pub const WriteQueue = if (options.write != .none) W.WriteQueue; + pub const WriteRequest = if (options.write != .none) W.WriteRequest; + pub const queueWrite = if (options.write != .none) W.queueWrite; + pub const write = if (options.write != .none) W.write; }; } @@ -258,7 +270,7 @@ pub fn Writeable(comptime xev: type, comptime T: type, comptime options: Options // Initialize our completion req.* = .{ .full_write_buffer = buf }; // Must be kept in sync with partial write logic inside the callback - self.write_init(&req.completion, buf); + write_init(self, &req.completion, buf); req.completion.userdata = q; req.completion.callback = (struct { fn callback( @@ -289,7 +301,7 @@ pub fn Writeable(comptime xev: type, comptime T: type, comptime options: Options if (written_len < queued_len) { // Write remainder of the buffer, reusing the same completion const rem_buf = writeBufferRemainder(cb_res.buf, written_len); - cb_res.writer.write_init(&req_inner.completion, rem_buf); + write_init(cb_res.writer, &req_inner.completion, rem_buf); req_inner.completion.userdata = q_inner; req_inner.completion.callback = callback; l_inner.add(&req_inner.completion); @@ -364,7 +376,7 @@ pub fn Writeable(comptime xev: type, comptime T: type, comptime options: Options r: WriteError!usize, ) xev.CallbackAction, ) void { - self.write_init(c, buf); + write_init(self, c, buf); c.userdata = userdata; c.callback = (struct { fn callback( @@ -508,12 +520,24 @@ pub fn GenericStream(comptime xev: type) type { /// The underlying file fd: std.posix.fd_t, - pub usingnamespace Stream(xev, Self, .{ + const S = Stream(xev, Self, .{ .close = true, .read = .read, .write = .write, }); + pub const CloseError = S.CloseError; + pub const close = S.close; + + pub const ReadError = S.ReadError; + pub const read = S.read; + + pub const WriteError = S.WriteError; + pub const WriteQueue = S.WriteQueue; + pub const WriteRequest = S.WriteRequest; + pub const queueWrite = S.queueWrite; + pub const write = S.write; + /// Initialize a generic stream from a file descriptor. pub fn initFd(fd: std.posix.fd_t) Self { return .{ diff --git a/src/watcher/tcp.zig b/src/watcher/tcp.zig index e923b50..720d4c3 100644 --- a/src/watcher/tcp.zig +++ b/src/watcher/tcp.zig @@ -20,12 +20,24 @@ pub fn TCP(comptime xev: type) type { fd: FdType, - pub usingnamespace stream.Stream(xev, Self, .{ + const S = stream.Stream(xev, Self, .{ .close = true, .read = .recv, .write = .send, }); + pub const CloseError = S.CloseError; + pub const close = S.close; + + pub const ReadError = S.ReadError; + pub const read = S.read; + + pub const WriteError = S.WriteError; + pub const WriteQueue = S.WriteQueue; + pub const WriteRequest = S.WriteRequest; + pub const queueWrite = S.queueWrite; + pub const write = S.write; + /// Initialize a new TCP with the family from the given address. Only /// the family is used, the actual address has no impact on the created /// resource. diff --git a/src/watcher/udp.zig b/src/watcher/udp.zig index e8eeff9..6c62afb 100644 --- a/src/watcher/udp.zig +++ b/src/watcher/udp.zig @@ -42,12 +42,15 @@ fn UDPSendto(comptime xev: type) type { userdata: ?*anyopaque, }; - pub usingnamespace stream.Stream(xev, Self, .{ + const S = stream.Stream(xev, Self, .{ .close = true, .read = .none, .write = .none, }); + pub const CloseError = S.CloseError; + pub const close = S.close; + /// Initialize a new UDP with the family from the given address. Only /// the family is used, the actual address has no impact on the created /// resource. @@ -223,12 +226,15 @@ fn UDPSendtoIOCP(comptime xev: type) type { userdata: ?*anyopaque, }; - pub usingnamespace stream.Stream(xev, Self, .{ + const S = stream.Stream(xev, Self, .{ .close = true, .read = .none, .write = .none, }); + pub const CloseError = S.CloseError; + pub const close = S.close; + /// Initialize a new UDP with the family from the given address. Only /// the family is used, the actual address has no impact on the created /// resource. @@ -421,12 +427,15 @@ fn UDPSendMsg(comptime xev: type) type { }, }; - pub usingnamespace stream.Stream(xev, Self, .{ + const S = stream.Stream(xev, Self, .{ .close = true, .read = .none, .write = .none, }); + pub const CloseError = S.CloseError; + pub const close = S.close; + /// Initialize a new UDP with the family from the given address. Only /// the family is used, the actual address has no impact on the created /// resource. diff --git a/src/xev.zig b/src/xev.zig new file mode 100644 index 0000000..76d294f --- /dev/null +++ b/src/xev.zig @@ -0,0 +1,75 @@ +const Backend = @import("lib.zig").Backend; + +pub fn Xev(comptime be: Backend, comptime T: type) type { + return struct { + const Self = @This(); + const loop = @import("loop.zig"); + + /// The backend that this is. This is supplied at comptime so + /// it is up to the caller to say the right thing. This lets custom + /// implementations also "quack" like an implementation. + pub const backend = be; + + /// The core loop APIs. + pub const Loop = T.Loop; + pub const Completion = T.Completion; + pub const Result = T.Result; + pub const ReadBuffer = T.ReadBuffer; + pub const WriteBuffer = T.WriteBuffer; + pub const Options = loop.Options; + pub const RunMode = loop.RunMode; + pub const CallbackAction = loop.CallbackAction; + pub const CompletionState = loop.CompletionState; + + /// Error types + pub const AcceptError = T.AcceptError; + pub const CancelError = T.CancelError; + pub const CloseError = T.CloseError; + pub const ConnectError = T.ConnectError; + pub const ShutdownError = T.ShutdownError; + pub const WriteError = T.WriteError; + pub const ReadError = T.ReadError; + + /// The high-level helper interfaces that make it easier to perform + /// common tasks. These may not work with all possible Loop implementations. + pub const Async = @import("watcher/async.zig").Async(Self); + pub const File = @import("watcher/file.zig").File(Self); + pub const Process = @import("watcher/process.zig").Process(Self); + pub const Stream = @import("watcher/stream.zig").GenericStream(Self); + pub const Timer = @import("watcher/timer.zig").Timer(Self); + pub const TCP = @import("watcher/tcp.zig").TCP(Self); + pub const UDP = @import("watcher/udp.zig").UDP(Self); + + /// The callback of the main Loop operations. Higher level interfaces may + /// use a different callback mechanism. + pub const Callback = *const fn ( + userdata: ?*anyopaque, + loop: *Loop, + completion: *Completion, + result: Result, + ) CallbackAction; + + /// A way to access the raw type. + pub const Sys = T; + + /// A callback that does nothing and immediately disarms. This + /// implements xev.Callback and is the default value for completions. + pub fn noopCallback( + _: ?*anyopaque, + _: *Loop, + _: *Completion, + _: Result, + ) CallbackAction { + return .disarm; + } + + test { + @import("std").testing.refAllDecls(@This()); + } + + test "completion is zero-able" { + const c: Completion = .{}; + _ = c; + } + }; +}