Skip to content

Commit

Permalink
Merge pull request #6 from lightpanda-io/wrapper
Browse files Browse the repository at this point in the history
Wrapper
  • Loading branch information
francisbouvier authored Nov 21, 2024
2 parents 2fc7a24 + 2831fbe commit ed7ae07
Show file tree
Hide file tree
Showing 9 changed files with 247 additions and 110 deletions.
1 change: 0 additions & 1 deletion .github/workflows/zig-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,3 @@ jobs:
fetch-depth: 0

- run: zig build test
- run: zig build run
38 changes: 6 additions & 32 deletions build.zig
Original file line number Diff line number Diff line change
Expand Up @@ -15,47 +15,21 @@ pub fn build(b: *std.Build) void {
// set a preferred release mode, allowing the user to decide how to optimize.
const optimize = b.standardOptimizeOption(.{});

const exe = b.addExecutable(.{
.name = "zigio",
.root_source_file = b.path("src/main.zig"),
const tests = b.addTest(.{
.root_source_file = b.path("src/tests.zig"),
.test_runner = b.path("src/test_runner.zig"),
.target = target,
.optimize = optimize,
});

// This *creates* a Run step in the build graph, to be executed when another
// step is evaluated that depends on it. The next line below will establish
// such a dependency.
const run_cmd = b.addRunArtifact(exe);

// By making the run step depend on the install step, it will be run from the
// installation directory rather than directly from within the cache directory.
// This is not necessary, however, if the application depends on other installed
// files, this ensures they will be present and in the expected location.
run_cmd.step.dependOn(b.getInstallStep());

// This allows the user to pass arguments to the application in the build
// command itself, like this: `zig build run -- arg1 arg2 etc`
const run_tests = b.addRunArtifact(tests);
if (b.args) |args| {
run_cmd.addArgs(args);
run_tests.addArgs(args);
}

// This creates a build step. It will be visible in the `zig build --help` menu,
// and can be selected like this: `zig build run`
// This will evaluate the `run` step rather than the default, which is "install".
const run_step = b.step("run", "Run the app");
run_step.dependOn(&run_cmd.step);

const exe_unit_tests = b.addTest(.{
.root_source_file = b.path("src/main.zig"),
.target = target,
.optimize = optimize,
});

const run_exe_unit_tests = b.addRunArtifact(exe_unit_tests);

// Similar to creating the run step earlier, this exposes a `test` step to
// the `zig build --help` menu, providing a way for the user to request
// running the unit tests.
const test_step = b.step("test", "Run unit tests");
test_step.dependOn(&run_exe_unit_tests.step);
test_step.dependOn(&run_tests.step);
}
67 changes: 67 additions & 0 deletions src/Blocking.zig
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
const std = @import("std");

// Blocking is an example implementation of an IO API
// following the zig-async-io model.
// As it name suggests in this implementation all operations are
// in fact blocking, the async API is just faked.
pub const Blocking = @This();

pub const Completion = void;

pub const ConnectError = std.posix.ConnectError;
pub const SendError = std.posix.WriteError;
pub const RecvError = std.posix.ReadError;

pub fn connect(
_: *Blocking,
comptime CtxT: type,
ctx: *CtxT,
_: *Completion,
comptime cbk: fn (ctx: *CtxT, _: *Completion, res: ConnectError!void) void,
socket: std.posix.socket_t,
address: std.net.Address,
) void {
std.posix.connect(socket, &address.any, address.getOsSockLen()) catch |err| {
cbk(ctx, @constCast(&{}), err);
return;
};
cbk(ctx, @constCast(&{}), {});
}

pub fn onConnect(_: *Blocking, _: ConnectError!void) void {}

pub fn send(
_: *Blocking,
comptime CtxT: type,
ctx: *CtxT,
_: *Completion,
comptime cbk: fn (ctx: *CtxT, _: *Completion, res: SendError!usize) void,
socket: std.posix.socket_t,
buf: []const u8,
) void {
const len = std.posix.write(socket, buf) catch |err| {
cbk(ctx, @constCast(&{}), err);
return;
};
cbk(ctx, @constCast(&{}), len);
}

pub fn onSend(_: *Blocking, _: SendError!usize) void {}

pub fn recv(
_: *Blocking,
comptime CtxT: type,
ctx: *CtxT,
_: *Completion,
comptime cbk: fn (ctx: *CtxT, _: *Completion, res: RecvError!usize) void,
socket: std.posix.socket_t,
buf: []u8,
) void {
const len = std.posix.read(socket, buf) catch |err| {
cbk(ctx, @constCast(&{}), err);
return;
};
cbk(ctx, @constCast(&{}), len);
}

pub fn onRecv(_: *Blocking, _: RecvError!usize) void {}
192 changes: 140 additions & 52 deletions src/io.zig
Original file line number Diff line number Diff line change
@@ -1,59 +1,147 @@
const std = @import("std");

pub const Ctx = @import("std/http/Client.zig").Ctx;
pub const Cbk = @import("std/http/Client.zig").Cbk;

pub const Blocking = struct {
pub fn connect(
_: *Blocking,
comptime ctxT: type,
ctx: *ctxT,
comptime cbk: Cbk,
socket: std.posix.socket_t,
address: std.net.Address,
) void {
std.posix.connect(socket, &address.any, address.getOsSockLen()) catch |err| {
std.posix.close(socket);
cbk(ctx, err) catch |e| {
ctx.setErr(e);
};
};
cbk(ctx, {}) catch |e| ctx.setErr(e);
// IO is a type defined via a root declaration.
// It must implements the following methods:
// - connect, onConnect
// - send, onSend
// - recv, onRecv
// It must also define the following types:
// - Completion
// - ConnectError
// - SendError
// - RecvError
// see Blocking.io for an implementation example.
pub const IO = blk: {
const root = @import("root");
if (@hasDecl(root, "IO")) {
break :blk root.IO;
}
@compileError("no IO API defined at root");
};

// Wrapper for a base IO API.
pub fn Wrapper(IO_T: type) type {
return struct {
io: *IO_T,
completion: IO_T.Completion,

const Self = @This();

pub fn init(io: *IO_T) Self {
return .{ .io = io, .completion = undefined };
}

// NOTE: Business methods connect, send, recv expect a Ctx
// who should reference the base IO API in Ctx.io field

// NOTE: Ctx is already known (ie. @import("std/http/Client.zig").Ctx)
// but we require to provide its type (comptime) as argument
// to avoid dependency loop
// ie. Wrapper requiring Ctx and Ctx requiring Wrapper

fn Cbk(comptime Ctx: type) type {
return *const fn (ctx: *Ctx, res: anyerror!void) anyerror!void;
}

pub fn send(
_: *Blocking,
comptime ctxT: type,
ctx: *ctxT,
comptime cbk: Cbk,
socket: std.posix.socket_t,
buf: []const u8,
) void {
const len = std.posix.write(socket, buf) catch |err| {
cbk(ctx, err) catch |e| {
return ctx.setErr(e);
pub fn connect(
self: *Self,
comptime Ctx: type,
ctx: *Ctx,
comptime cbk: Cbk(Ctx),
socket: std.posix.socket_t,
address: std.net.Address,
) void {
self.io.connect(Ctx, ctx, &self.completion, onConnect(Ctx, cbk), socket, address);
}

fn onConnectFn(comptime Ctx: type) type {
return fn (
ctx: *Ctx,
_: *IO_T.Completion,
result: IO_T.ConnectError!void,
) void;
}
fn onConnect(comptime Ctx: type, comptime cbk: Cbk(Ctx)) onConnectFn(Ctx) {
const s = struct {
fn on(
ctx: *Ctx,
_: *IO_T.Completion,
result: IO_T.ConnectError!void,
) void {
ctx.io.io.onConnect(result); // base IO callback
_ = result catch |err| return ctx.setErr(err);
cbk(ctx, {}) catch |err| return ctx.setErr(err);
}
};
return ctx.setErr(err);
};
ctx.setLen(len);
cbk(ctx, {}) catch |e| ctx.setErr(e);
}
return s.on;
}

pub fn recv(
_: *Blocking,
comptime ctxT: type,
ctx: *ctxT,
comptime cbk: Cbk,
socket: std.posix.socket_t,
buf: []u8,
) void {
const len = std.posix.read(socket, buf) catch |err| {
cbk(ctx, err) catch |e| {
return ctx.setErr(e);
pub fn send(
self: *Self,
comptime Ctx: type,
ctx: *Ctx,
comptime cbk: Cbk(Ctx),
socket: std.posix.socket_t,
buf: []const u8,
) void {
self.io.send(Ctx, ctx, &self.completion, onSend(Ctx, cbk), socket, buf);
}

fn onSendFn(comptime Ctx: type) type {
return fn (
ctx: *Ctx,
_: *IO_T.Completion,
result: IO_T.SendError!usize,
) void;
}
fn onSend(comptime Ctx: type, comptime cbk: Cbk(Ctx)) onSendFn(Ctx) {
const s = struct {
fn on(
ctx: *Ctx,
_: *IO_T.Completion,
result: IO_T.SendError!usize,
) void {
ctx.io.io.onSend(result); // base IO callback
const len = result catch |err| return ctx.setErr(err);
ctx.setLen(len);
cbk(ctx, {}) catch |e| ctx.setErr(e);
}
};
return ctx.setErr(err);
};
ctx.setLen(len);
cbk(ctx, {}) catch |e| ctx.setErr(e);
}
};
return s.on;
}

pub fn recv(
self: *Self,
comptime Ctx: type,
ctx: *Ctx,
comptime cbk: Cbk(Ctx),
socket: std.posix.socket_t,
buf: []u8,
) void {
self.io.recv(Ctx, ctx, &self.completion, onRecv(Ctx, cbk), socket, buf);
}

fn onRecvFn(comptime Ctx: type) type {
return fn (
ctx: *Ctx,
_: *IO_T.Completion,
result: IO_T.RecvError!usize,
) void;
}
fn onRecv(comptime Ctx: type, comptime cbk: Cbk(Ctx)) onRecvFn(Ctx) {
const s = struct {
fn do(
ctx: *Ctx,
_: *IO_T.Completion,
result: IO_T.RecvError!usize,
) void {
ctx.io.io.onRecv(result); // base IO callback
const len = result catch |err| return ctx.setErr(err);
ctx.setLen(len);
cbk(ctx, {}) catch |err| return ctx.setErr(err);
}
};
return s.do;
}
};
}
2 changes: 2 additions & 0 deletions src/lib.zig
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub const Wrapper = @import("io.zig").Wrapper;
pub const Client = @import("std/http/Client.zig");
9 changes: 4 additions & 5 deletions src/std/http/Client.zig
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ const proto = @import("protocol.zig");
const tls23 = @import("../../tls.zig/main.zig");
const VecPut = @import("../../tls.zig/connection.zig").VecPut;
const GenericStack = @import("../../stack.zig").Stack;
const async_io = @import("../../io.zig");
const Loop = async_io.Blocking;
pub const IO = @import("../../io.zig").IO;

const cipher = @import("../../tls.zig/cipher.zig");

Expand Down Expand Up @@ -2390,7 +2389,7 @@ pub const Ctx = struct {

userData: *anyopaque = undefined,

loop: *Loop,
io: *IO,
data: Data,
stack: ?*Stack = null,
err: ?anyerror = null,
Expand Down Expand Up @@ -2419,7 +2418,7 @@ pub const Ctx = struct {
_tls_write_index: usize = 0,
_tls_write_buf: [cipher.max_ciphertext_record_len]u8 = undefined,

pub fn init(loop: *Loop, req: *Request) !Ctx {
pub fn init(io: *IO, req: *Request) !Ctx {
const connection = try req.client.allocator.create(Connection);
connection.* = .{
.stream = undefined,
Expand All @@ -2430,7 +2429,7 @@ pub const Ctx = struct {
};
return .{
.req = req,
.loop = loop,
.io = io,
.data = .{ .conn = connection },
};
}
Expand Down
6 changes: 3 additions & 3 deletions src/std/net.zig
Original file line number Diff line number Diff line change
Expand Up @@ -1908,7 +1908,7 @@ pub const Stream = struct {
ctx: *Ctx,
comptime cbk: Cbk,
) !void {
return ctx.loop.recv(Ctx, ctx, cbk, self.handle, buffer);
return ctx.io.recv(Ctx, ctx, cbk, self.handle, buffer);
}

pub fn async_readv(
Expand All @@ -1924,7 +1924,7 @@ pub const Stream = struct {

// TODO: why not take a buffer here?
pub fn async_write(self: Stream, buffer: []const u8, ctx: *Ctx, comptime cbk: Cbk) void {
return ctx.loop.send(Ctx, ctx, cbk, self.handle, buffer);
return ctx.io.send(Ctx, ctx, cbk, self.handle, buffer);
}

fn onWriteAll(ctx: *Ctx, res: anyerror!void) anyerror!void {
Expand Down Expand Up @@ -2033,7 +2033,7 @@ pub fn async_tcpConnectToAddress(address: std.net.Address, ctx: *Ctx, comptime c
ctx.data.socket = sockfd;
ctx.push(cbk) catch |e| return ctx.pop(e);

ctx.loop.connect(
ctx.io.connect(
Ctx,
ctx,
setStream,
Expand Down
Loading

0 comments on commit ed7ae07

Please sign in to comment.