diff --git a/README.md b/README.md index cac361e..ca903eb 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,11 @@ This Zig library provides a ZeroMQ client. -It is implemented as a wrapper of the "High-level C Binding for ZeroMQ" ([CZMQ](http://czmq.zeromq.org)). +It is implemented based on the C API of [libzmq](https://libzmq.readthedocs.io/en/latest/). +The interface is highly inspired by [CZMQ](http://czmq.zeromq.org) and [goczmq](https://github.com/zeromq/goczmq). + +It was originally based on the "High-level C Binding for ZeroMQ" ([CZMQ](http://czmq.zeromq.org)), +but later moved to using [libzmq](https://libzmq.readthedocs.io/en/latest/) directly, to provide zero-copy message support. **IMPORTANT: The library is currently still work in progress!!** @@ -12,7 +16,7 @@ It is implemented as a wrapper of the "High-level C Binding for ZeroMQ" ([CZMQ]( ### Minimal Example -Since this library is basically a 1:1 wrapper of CZMQ, please refer to the [CZMQ documentation](http://czmq.zeromq.org) to get a better understanding on how the library works. +This repository holds various example within the `examples` folder. Please feel free to also have a look at the various unit tests in this library (esp. [ZSocket](src/classes/zsocket.zig)). Running the server (also see [full example](https://github.com/nine-lives-later/zzmq/tree/main/examples/hello_world_server)): @@ -23,20 +27,26 @@ const zzmq = @import("zzmq"); var socket = try zzmq.ZSocket.init(allocator, zzmq.ZSocketType.Pair); defer socket.deinit(); -const port = try socket.bind("tcp://127.0.0.1:!"); +try socket.bind("tcp://127.0.0.1:*"); + +std.log.info("Endpoint: {s}", .{try socket.endpoint()}); // send a message -var frame = try zzmq.ZFrame.init(data); -defer frame.deinit(); +var message = try zzmq.ZMessage.initUnmanaged(data, null); +defer message.deinit(); -try socket.send(&frame, .{}); +try socket.send(&message, .{}); ``` + Running the client (also see [full example](https://github.com/nine-lives-later/zzmq/tree/main/examples/hello_world_client)): ```zig const zzmq = @import("zzmq"); -var socket = try zzmq.ZSocket.init(allocator, zzmq.ZSocketType.Pair); +var context = try zzmq.ZContext.init(allocator); +defer context.deinit(); + +var socket = try zzmq.ZSocket.init(zzmq.ZSocketType.Pair, &context); defer socket.deinit(); const endpoint = try std.fmt.allocPrint(allocator, "tcp://127.0.0.1:{}", .{port}); @@ -45,10 +55,10 @@ defer allocator.free(endpoint); try socket.connect(endpoint); // receive a message -var frame = try socket.receive(); -defer frame.deinit(); +var message = try socket.receive(.{}); +defer message.deinit(); -const data = try frame.data(); +const data = try message.data(); ``` @@ -83,24 +93,26 @@ const zzmq = b.dependency("zzmq", .{ // `exe.root_module.addImport` instead of `exe.addModule` exe.addModule("zzmq", zzmq.module("zzmq")); -exe.linkSystemLibrary("czmq"); +exe.linkSystemLibrary("zmq"); exe.linkLibC(); ``` ### Installing local dependencies -Installing [CZMQ](http://czmq.zeromq.org) development library version 4.0 or higher is also required: +Installing [libzmq](https://zeromq.org/download/) development library version 4.1 or higher is also required: ```sh # Building on Ubuntu, PoP_OS, ZorinOS, etc. -sudo apt install libczmq-dev +sudo apt install libzmq5-dev # Running on Ubuntu, PoP_OS, ZorinOS, etc. -sudo apt install libczmq +sudo apt install libzmq5 ``` See the [unit test Dockerfile](test.Dockerfile) on how to install it into an Alpine Docker image. +To retrieve the version of the libzmq library actually being used, call `ZContext.version()`. + ## Contributing ### Zig Version Branches @@ -121,7 +133,7 @@ The library can be tested locally by running: `zig build test`. Implementation done by [Felix Kollmann](https://github.com/fkollmann). -Based on the work of [CZMQ](http://czmq.zeromq.org), inspired by [goczmq](https://github.com/zeromq/goczmq). +Inspired by [CZMQ](http://czmq.zeromq.org) and [goczmq](https://github.com/zeromq/goczmq). ## License diff --git a/build.zig b/build.zig index 1248ce8..129d166 100644 --- a/build.zig +++ b/build.zig @@ -14,7 +14,7 @@ pub fn build(b: *std.Build) !void { .optimize = optimize, }); - lib_test.linkSystemLibrary("czmq"); + lib_test.linkSystemLibrary("zmq"); lib_test.linkLibC(); const run_test = b.addRunArtifact(lib_test); diff --git a/build_and_test.sh b/build_and_test.sh index beb643f..9a7b7be 100755 --- a/build_and_test.sh +++ b/build_and_test.sh @@ -3,4 +3,6 @@ set -e zig build test --summary all +#zig test src/zzmq.zig -lc -lzmq + zig fmt . > /dev/null diff --git a/libzmq/.gitignore b/libzmq/.gitignore new file mode 100644 index 0000000..ea1472e --- /dev/null +++ b/libzmq/.gitignore @@ -0,0 +1 @@ +output/ diff --git a/libzmq/Dockerfile b/libzmq/Dockerfile new file mode 100644 index 0000000..8082f66 --- /dev/null +++ b/libzmq/Dockerfile @@ -0,0 +1,20 @@ +FROM alpine:3.19 as builder + +RUN apk add --no-cache g++ gcc cmake make musl-dev + +# add the pre-processed source package (note: this is not the raw source code from Git!) +ADD https://github.com/zeromq/libzmq/releases/download/v4.3.5/zeromq-4.3.5.tar.gz /tmp/source.tgz + +WORKDIR /build + +RUN tar -xzf /tmp/source.tgz --strip-components=1 + +RUN ./configure --prefix=/build/output +RUN make install + + + +# copy the build output +FROM alpine:3.19 + +COPY --from=builder /build/output /build/output diff --git a/libzmq/build_and_extract.sh b/libzmq/build_and_extract.sh new file mode 100755 index 0000000..67b2886 --- /dev/null +++ b/libzmq/build_and_extract.sh @@ -0,0 +1,15 @@ +#!/bin/sh + +set -e + +IMAGE=zzmq_libzmq_345345345 + +DOCKER_BUILDKIT=1 docker build . -t $IMAGE + +if [ -d output ]; then + rm -rf output +fi + +mkdir -p output + +docker run -v "$PWD/output:/mnt" $IMAGE sh -c "cp -rf /build/output/* /mnt/ && chown $(id -u):$(id -g) -R /mnt/" diff --git a/src/classes/zcontext.zig b/src/classes/zcontext.zig new file mode 100644 index 0000000..46cf3e5 --- /dev/null +++ b/src/classes/zcontext.zig @@ -0,0 +1,79 @@ +const std = @import("std"); +const c = @import("../zmq.zig").c; + +/// Version information of the `libzmq` in use. +pub const ZVersion = struct { + major: u16, + minor: u16, + patch: u16, +}; + +/// Creates a new ZermoMQ context. +/// +/// Multiple contextes can exist independently, e.g. for libraries. +/// +/// A 0MQ 'context' is thread safe and may be shared among as many application threads as necessary, +/// without any additional locking required on the part of the caller. +pub const ZContext = struct { + allocator_: std.mem.Allocator, + ctx_: *anyopaque, + + pub fn init(allocator: std.mem.Allocator) !ZContext { + // check the libzmq version 4.x + if (ZContext.version().major != 4) { + return error.LibZmqVersionMismatch; + } + + // try creating the socket, early + var s = c.zmq_ctx_new() orelse { + switch (c.zmq_errno()) { + c.EMFILE => return error.MaxOpenFilesExceeded, + else => return error.ContextCreateFailed, + } + }; + errdefer { + c.zmq_ctx_term(s); + } + + // done + return .{ + .allocator_ = allocator, + .ctx_ = s, + }; + } + + /// Destroy the socket and clean up + pub fn deinit(self: *ZContext) void { + _ = c.zmq_ctx_term(self.ctx_); + } + + /// Returns the version of the `libzmq` shared library. + pub fn version() ZVersion { + var major: c_int = undefined; + var minor: c_int = undefined; + var patch: c_int = undefined; + + c.zmq_version(&major, &minor, &patch); + + return .{ + .major = @intCast(major), + .minor = @intCast(minor), + .patch = @intCast(patch), + }; + } +}; + +test "ZContext - roundtrip" { + const allocator = std.testing.allocator; + + var incoming = try ZContext.init(allocator); + defer incoming.deinit(); +} + +test "ZContext - version" { + const v = ZContext.version(); + + std.log.info("Version: {}.{}.{}", .{ v.major, v.minor, v.patch }); + + try std.testing.expectEqual(@as(u16, 4), v.major); +} diff --git a/src/classes/zframe.zig b/src/classes/zframe.zig deleted file mode 100644 index 391df8d..0000000 --- a/src/classes/zframe.zig +++ /dev/null @@ -1,127 +0,0 @@ -const std = @import("std"); -const c = @import("../czmq.zig").c; - -pub const ZFrame = struct { - frame: *c.zframe_t, - frameOwned: bool = true, - - /// Creates a frame based on the provided data. - /// - /// The data is being copied into the frame. - pub fn init(d: []const u8) !ZFrame { - return ZFrame{ - .frame = c.zframe_new(&d[0], d.len) orelse return error.FrameAllocFailed, - }; - } - - /// Creates an empty frame, e.g. for delimiter frames. - pub fn initEmpty() !ZFrame { - return ZFrame{ - .frame = c.zframe_new_empty() orelse return error.FrameAllocFailed, - }; - } - - /// Retrieves a slice to the data stored within the frame. - /// - /// Example: - /// allocator.dupe(u8, frame.data()); - pub fn data(self: *const ZFrame) ![]const u8 { - if (!self.frameOwned) return error.FrameOwnershipLost; - - const s = c.zframe_size(self.frame); - if (s <= 0) { - return ""; - } - - const d = c.zframe_data(self.frame); - - return d[0..s]; - } - - /// Retrieves a size of data within the frame. - pub fn size(self: *const ZFrame) !usize { - if (!self.frameOwned) return error.FrameOwnershipLost; - - return c.zframe_size(self.frame); - } - - /// Creates a copy of the frame. - pub fn clone(self: *ZFrame) !ZFrame { - if (!self.frameOwned) return error.FrameOwnershipLost; - - return ZFrame{ - .frame = c.zframe_dup(self.frame) orelse return error.FrameAllocFailed, - }; - } - - /// When *receiving* frames of message, this function returns - /// true, if more frames are available to be received, - /// as part of a multi-part message. - pub fn hasMore(self: *ZFrame) !bool { - if (!self.frameOwned) return error.FrameOwnershipLost; - - return c.zframe_more(self.frame) != 0; - } - - /// Destroys the frame and performs clean up. - pub fn deinit(self: *ZFrame) void { - if (self.frameOwned) { - var d: ?*c.zframe_t = self.frame; - - c.zframe_destroy(&d); - - self.frameOwned = false; - } - } -}; - -test "ZFrame - create and destroy" { - const msg = "hello world"; - - var data = try ZFrame.init(msg); - defer data.deinit(); - - try std.testing.expectEqual(msg.len, try data.size()); - try std.testing.expectEqualStrings(msg, try data.data()); - - // create and test a clone - var clone = try data.clone(); - defer clone.deinit(); - - try std.testing.expectEqual(msg.len, try clone.size()); - try std.testing.expectEqualStrings(msg, try clone.data()); -} - -test "ZFrame - empty and destroy" { - var data = try ZFrame.initEmpty(); - defer data.deinit(); - - try std.testing.expectEqual(@as(usize, 0), try data.size()); - try std.testing.expectEqualStrings("", try data.data()); - - // create and test a clone - var clone = try data.clone(); - defer clone.deinit(); - - try std.testing.expectEqual(@as(usize, 0), try clone.size()); - try std.testing.expectEqualStrings("", try clone.data()); -} - -test "ZFrame - ownership lost" { - var data = try ZFrame.initEmpty(); - defer data.deinit(); - - try std.testing.expectEqual(true, data.frameOwned); - try std.testing.expectEqual(@as(usize, 0), try data.size()); - - // force loosing ownership - data.frameOwned = false; - - try std.testing.expectError(error.FrameOwnershipLost, data.size()); - try std.testing.expectError(error.FrameOwnershipLost, data.data()); - try std.testing.expectError(error.FrameOwnershipLost, data.clone()); - try std.testing.expectError(error.FrameOwnershipLost, data.hasMore()); - - // restore ownership to not leak memory - data.frameOwned = true; -} diff --git a/src/classes/zmessage.zig b/src/classes/zmessage.zig new file mode 100644 index 0000000..3048213 --- /dev/null +++ b/src/classes/zmessage.zig @@ -0,0 +1,312 @@ +const std = @import("std"); +const c = @import("../zmq.zig").c; + +const ZMessageType = enum { + /// The message content memory is managed internally by Zig. + /// + /// Since the content is reference-counted, it is safe + /// to call `deinit()` after queueing its content for sending. + /// + /// The main use case is outgoing messages. + Internal, + + /// The message content memory is managed by libzmq. + /// + /// Since the content ownership is tracked, it is safe to + /// call `deinit()` after queueing its content for sending. + /// + /// The main use case is incoming messages. + External, +}; + +const ZMessageImpl = union(ZMessageType) { + Internal: ZMessageInternal, + External: ZMessageExternal, +}; + +pub const ZMessage = struct { + impl_: ZMessageImpl, + + /// Creates a message based on the provided data. + /// + /// The data is being copied into the message. + pub fn init(allocator: std.mem.Allocator, d: []const u8) !ZMessage { + return .{ .impl_ = .{ + .Internal = try ZMessageInternal.init(allocator, d), + } }; + } + + /// Creates a message based on the provided data. + /// + /// If an allocator is provided, it will be used to free + /// the provided data. If not, the caller is responsible for + /// freeing the memory at some point. + pub fn initUnmanaged(d: []const u8, allocator: ?std.mem.Allocator) !ZMessage { + return .{ .impl_ = .{ + .Internal = try ZMessageInternal.initUnmanaged(d, allocator), + } }; + } + + /// Takes the ownership over the provided raw message. + /// + /// Note: the message argument must be initalized, using `zmq_msg_init()` or other similar functions. + /// + /// The ownership can be lost when sending the message. + pub fn initExternal(message: *c.zmq_msg_t) !ZMessage { + return .{ .impl_ = .{ + .External = try ZMessageExternal.init(message), + } }; + } + + /// Creates an empty message, e.g. for delimiters. + /// + /// The ownership can be lost when sending the message. + pub fn initExternalEmpty() !ZMessage { + return .{ .impl_ = .{ + .External = try ZMessageExternal.initEmpty(), + } }; + } + + /// Retrieves a slice to the data stored within the message. + pub fn data(self: *const ZMessage) ![]const u8 { + switch (self.impl_) { + .Internal => return self.impl_.Internal.data(), + .External => return try self.impl_.External.data(), + } + } + + /// Retrieves a size of data within the message. + pub fn size(self: *const ZMessage) !usize { + switch (self.impl_) { + .Internal => return self.impl_.Internal.size(), + .External => return try self.impl_.External.size(), + } + } + + /// Allocates (or moves ownership of) external representation the message. + /// + /// This is used for tranferring ownership of data to ZeroMQ internal functions + /// like `zmq_msg_send()`. + pub fn allocExternal(self: *ZMessage) !ZMessageExternal { + switch (self.impl_) { + .Internal => return try self.impl_.Internal.allocExternal(), + .External => { + var cpy = self.impl_.External; + + self.impl_.External.msgOwned_ = false; // ownership moved away + + return cpy; + }, + } + } + + /// Destroys the message and performs clean up. + pub fn deinit(self: *ZMessage) void { + switch (self.impl_) { + .Internal => self.impl_.Internal.deinit(), + .External => self.impl_.External.deinit(), + } + } +}; + +const ZMessageInternal = struct { + data_: []const u8, + refCounter_: usize = 1, // starting with Zig 0.12, replace by std.Thread.Atomic(usize) + allocator_: ?std.mem.Allocator = null, + + /// Creates a message based on the provided data. + /// + /// The data is being copied into the message. + pub fn init(allocator: std.mem.Allocator, d: []const u8) !ZMessageInternal { + const d2 = try allocator.dupe(u8, d); + errdefer allocator.free(d2); + + return try initUnmanaged(d2, allocator); + } + + /// Creates a message based on the provided data. + /// + /// If an allocator is provided, it will be used to free + /// the provided data. If not, the caller is responsible for + /// freeing the memory at some point. + pub fn initUnmanaged(d: []const u8, allocator: ?std.mem.Allocator) !ZMessageInternal { + return .{ + .data_ = d, + .allocator_ = allocator, + }; + } + + fn allocExternalFree(d: ?*anyopaque, hint: ?*anyopaque) callconv(.C) void { + _ = d; + + var r: *ZMessageInternal = @alignCast(@ptrCast(hint.?)); + + r.refRelease(); + } + + /// Creates a new external message object which points to the + /// data stored within this internal message. + pub fn allocExternal(self: *ZMessageInternal) !ZMessageExternal { + var message: c.zmq_msg_t = undefined; + + if (self.data_.len <= 0) { + const result = c.zmq_msg_init(&message); + if (result < 0) { + return error.FrameInitFailed; + } + } else { + const result = c.zmq_msg_init_data(&message, @constCast(&self.data_[0]), self.data_.len, &allocExternalFree, self); + if (result < 0) { + return error.FrameInitFailed; + } + } + + self.refAdd(); // increase reference count + + return try ZMessageExternal.init(&message); + } + + /// Retrieves a slice to the data stored within the message. + pub fn data(self: *const ZMessageInternal) []const u8 { + return self.data_; + } + + /// Retrieves a size of data within the message. + pub fn size(self: *const ZMessageInternal) usize { + return self.data_.len; + } + + fn refAdd(self: *ZMessageInternal) void { + _ = @atomicRmw(usize, &self.refCounter_, .Add, 1, .SeqCst); + } + + fn refRelease(self: *ZMessageInternal) void { + const prev = @atomicRmw(usize, &self.refCounter_, .Sub, 1, .SeqCst); + + if (prev == 1) { // it's now zero + if (self.allocator_) |a| { + a.free(self.data_); + } + } + } + + /// Destroys the message and performs clean up. + pub fn deinit(self: *ZMessageInternal) void { + self.refRelease(); + } +}; + +test "ZMessageInternal - create and destroy" { + const allocator = std.testing.allocator; + const msg = "hello world"; + + var data = try ZMessageInternal.init(allocator, msg); + defer data.deinit(); + + try std.testing.expectEqual(msg.len, data.size()); + try std.testing.expectEqualStrings(msg, data.data()); +} + +test "ZMessageInternal - create unmanaged" { + const allocator = std.testing.allocator; + const msg = try allocator.dupe(u8, "Hello World!"); // duplicate to track the memory + defer allocator.free(msg); + + var data = try ZMessageInternal.initUnmanaged(msg, null); + defer data.deinit(); + + try std.testing.expectEqual(msg.len, data.size()); + try std.testing.expectEqualStrings(msg, data.data()); +} + +const ZMessageExternal = struct { + msg_: c.zmq_msg_t, + msgOwned_: bool = true, + + /// Takes the ownership over the provided raw message. + /// + /// Note: the message argument must be initalized, using `zmq_msg_init()` or other similar functions. + pub fn init(message: *c.zmq_msg_t) !ZMessageExternal { + return .{ + .msg_ = message.*, + }; + } + + /// Creates an empty message, e.g. for delimiters. + pub fn initEmpty() !ZMessageExternal { + var r = ZMessageExternal{ + .msg_ = undefined, + }; + + const result = c.zmq_msg_init(&r.msg_); + if (result < 0) { + return error.MessageAllocFailed; + } + + return r; + } + + /// Retrieves a slice to the data stored within the message. + /// + /// Example: + /// allocator.dupe(u8, message.data()); + pub fn data(self: *const ZMessageExternal) ![]const u8 { + if (!self.msgOwned_) return error.MessageOwnershipLost; + + const m: *c.zmq_msg_t = @constCast(&self.msg_); + const s = c.zmq_msg_size(m); + if (s <= 0) { + return ""; + } + + const d = c.zmq_msg_data(m).?; + const dd: [*c]u8 = @ptrCast(d); + + return dd[0..s]; + } + + /// Retrieves a size of data within the message. + pub fn size(self: *const ZMessageExternal) !usize { + if (!self.msgOwned_) return error.MessageOwnershipLost; + + const m: *c.zmq_msg_t = @constCast(&self.msg_); + + return c.zmq_msg_size(m); + } + + /// Moves ownership of raw representation of the message. + /// + /// The returned object must either be freed by using `c.zmq_msg_close()` + /// or by providing it to functions like `c.zmq_msg_send()`. + pub fn move(self: *ZMessageExternal) ![*c]c.zmq_msg_t { + if (!self.msgOwned_) return error.MessageOwnershipLost; + + self.msgOwned_ = false; + + return &self.msg_; // returning a pointer is fine, here + } + + /// Destroys the message and performs clean up. + pub fn deinit(self: *ZMessageExternal) void { + if (self.msgOwned_) { + _ = c.zmq_msg_close(&self.msg_); + + self.msgOwned_ = false; + } + } +}; + +test "ZMessageExternal - ownership lost" { + var data = try ZMessageExternal.initEmpty(); + defer data.deinit(); + + try std.testing.expectEqual(true, data.msgOwned_); + try std.testing.expectEqual(@as(usize, 0), try data.size()); + + // force loosing ownership + data.msgOwned_ = false; + defer data.msgOwned_ = true; // restore ownership to not leak memory + + try std.testing.expectError(error.MessageOwnershipLost, data.size()); + try std.testing.expectError(error.MessageOwnershipLost, data.data()); +} diff --git a/src/classes/zsocket.zig b/src/classes/zsocket.zig index 25567f1..fc8fb43 100644 --- a/src/classes/zsocket.zig +++ b/src/classes/zsocket.zig @@ -1,13 +1,54 @@ const std = @import("std"); -const zframe = @import("zframe.zig"); -const c = @import("../czmq.zig").c; +const zcontext = @import("zcontext.zig"); +const zmessage = @import("zmessage.zig"); +const c = @import("../zmq.zig").c; + +pub const ZMessageReceived = struct { + message_: zmessage.ZMessage = undefined, + hasMore_: bool = false, + + /// Takes the ownership over the provided raw message. + /// + /// Note: the message argument must be initalized, using `zmq_msg_init()` or other similar functions. + /// + /// The ownership can be lost when sending the message. + pub fn init(message: *c.zmq_msg_t, hasMoreParts: bool) !ZMessageReceived { + return .{ + .message_ = try zmessage.ZMessage.initExternal(message), + .hasMore_ = hasMoreParts, + }; + } + + /// Retrieves a slice to the data stored within the message. + pub fn data(self: *const ZMessageReceived) ![]const u8 { + return self.message_.data(); + } + + /// Retrieves a size of data within the message. + pub fn size(self: *const ZMessageReceived) !usize { + return self.message_.size(); + } + + /// Returns true, if the message is part of a multi-message + /// and more message parts are available. + /// + /// To retrieve the next part, call `socket.receive()` again. + pub fn hasMore(self: *const ZMessageReceived) bool { + return self.hasMore_; + } + + /// Destroys the message and performs clean up. + pub fn deinit(self: *ZMessageReceived) void { + self.message_.deinit(); + } +}; pub const ZSocketType = enum(c_int) { /// A socket of type ZMQ_PAIR can only be connected to a single peer at any one time. /// /// No message routing or filtering is performed on messages sent over a ZMQ_PAIR socket. /// - /// For more details, see https://libzmq.readthedocs.io/en/latest/zmq_socket.html . + /// For more details, see https://libzmq.readthedocs.io/en/zeromq3-x/zmq_socket.html . Pair = c.ZMQ_PAIR, /// A socket of type ZMQ_PUB is used by a publisher to distribute data. @@ -15,7 +56,7 @@ pub const ZSocketType = enum(c_int) { /// Messages sent are distributed in a fan out fashion to all connected peers. /// The zmq_recv function is not implemented for this socket type. /// - /// For more details, see https://libzmq.readthedocs.io/en/latest/zmq_socket.html . + /// For more details, see https://libzmq.readthedocs.io/en/zeromq3-x/zmq_socket.html . Pub = c.ZMQ_PUB, /// A socket of type ZMQ_SUB is used by a subscriber to subscribe to data distributed by a publisher. @@ -24,21 +65,21 @@ pub const ZSocketType = enum(c_int) { /// of zmq_setsockopt to specify which messages to subscribe to. /// The zmq_send() function is not implemented for this socket type. /// - /// For more details, see https://libzmq.readthedocs.io/en/latest/zmq_socket.html . + /// For more details, see https://libzmq.readthedocs.io/en/zeromq3-x/zmq_socket.html . Sub = c.ZMQ_SUB, /// Same as ZMQ_PUB except that you can receive subscriptions from the peers in form of incoming messages. /// /// Subscription message is a byte 1 (for subscriptions) or byte 0 (for unsubscriptions) followed by the subscription body. /// - /// For more details, see https://libzmq.readthedocs.io/en/latest/zmq_socket.html . + /// For more details, see https://libzmq.readthedocs.io/en/zeromq3-x/zmq_socket.html . XPub = c.ZMQ_XPUB, /// Same as ZMQ_SUB except that you subscribe by sending subscription messages to the socket. /// /// Subscription message is a byte 1 (for subscriptions) or byte 0 (for unsubscriptions) followed by the subscription body. /// - /// For more details, see https://libzmq.readthedocs.io/en/latest/zmq_socket.html . + /// For more details, see https://libzmq.readthedocs.io/en/zeromq3-x/zmq_socket.html . XSub = c.ZMQ_XSUB, /// A socket of type ZMQ_REQ is used by a client to send requests to and receive replies from a service. @@ -47,7 +88,7 @@ pub const ZSocketType = enum(c_int) { /// and subsequent zmq_recv(reply) calls. Each request sent is round-robined among all services, /// and each reply received is matched with the last issued request. /// - /// For more details, see https://libzmq.readthedocs.io/en/latest/zmq_socket.html . + /// For more details, see https://libzmq.readthedocs.io/en/zeromq3-x/zmq_socket.html . Req = c.ZMQ_REQ, /// A socket of type ZMQ_REP is used by a service to receive requests from and send replies to a client. @@ -56,14 +97,14 @@ pub const ZSocketType = enum(c_int) { /// Each request received is fair-queued from among all clients, and each reply sent is routed to the client that /// issued the last request. If the original requester doesn’t exist any more the reply is silently discarded. /// - /// For more details, see https://libzmq.readthedocs.io/en/latest/zmq_socket.html . + /// For more details, see https://libzmq.readthedocs.io/en/zeromq3-x/zmq_socket.html . Rep = c.ZMQ_REP, /// A socket of type ZMQ_DEALER is an advanced pattern used for extending request/reply sockets. /// /// Each message sent is round-robined among all connected peers, and each message received is fair-queued from all connected peers. /// - /// For more details, see https://libzmq.readthedocs.io/en/latest/zmq_socket.html . + /// For more details, see https://libzmq.readthedocs.io/en/zeromq3-x/zmq_socket.html . Dealer = c.ZMQ_DEALER, /// A socket of type ZMQ_ROUTER is an advanced socket type used for extending request/reply sockets. @@ -72,14 +113,14 @@ pub const ZSocketType = enum(c_int) { /// of the originating peer to the message before passing it to the application. /// Messages received are fair-queued from among all connected peers. /// - /// For more details, see https://libzmq.readthedocs.io/en/latest/zmq_socket.html . + /// For more details, see https://libzmq.readthedocs.io/en/zeromq3-x/zmq_socket.html . Router = c.ZMQ_ROUTER, /// A socket of type ZMQ_PULL is used by a pipeline node to receive messages from upstream pipeline nodes. /// /// Messages are fair-queued from among all connected upstream nodes. The zmq_send() function is not implemented for this socket type. /// - /// For more details, see https://libzmq.readthedocs.io/en/latest/zmq_socket.html . + /// For more details, see https://libzmq.readthedocs.io/en/zeromq3-x/zmq_socket.html . Pull = c.ZMQ_PULL, /// A socket of type ZMQ_PUSH is used by a pipeline node to send messages to downstream pipeline nodes. @@ -87,7 +128,7 @@ pub const ZSocketType = enum(c_int) { /// Messages are round-robined to all connected downstream nodes. /// The zmq_recv() function is not implemented for this socket type. /// - /// For more details, see https://libzmq.readthedocs.io/en/latest/zmq_socket.html . + /// For more details, see https://libzmq.readthedocs.io/en/zeromq3-x/zmq_socket.html . Push = c.ZMQ_PUSH, }; @@ -99,6 +140,8 @@ pub const ZSocketOptionTag = enum { SendTimeout, SendHighWaterMark, SendBufferSize, + + LingerTimeout, }; pub const ZSocketOption = union(ZSocketOptionTag) { @@ -127,7 +170,7 @@ pub const ZSocketOption = union(ZSocketOptionTag) { /// /// Refer to the individual socket descriptions in zmq_socket(3) for details on the exact action taken for each socket type. /// - /// Unit: message count + /// Unit: message count (0 = unlimited) /// Default: 1000 /// /// For more details, see https://libzmq.readthedocs.io/en/latest/zmq_setsockopt.html @@ -176,7 +219,7 @@ pub const ZSocketOption = union(ZSocketOptionTag) { /// Refer to the individual socket descriptions /// in zmq_socket(3) for details on the exact action taken for each socket type. /// - /// Unit: message count + /// Unit: message count (0 = unlimited) /// Default: 1000 /// /// For more details, see https://libzmq.readthedocs.io/en/latest/zmq_setsockopt.html @@ -195,6 +238,19 @@ pub const ZSocketOption = union(ZSocketOptionTag) { /// /// For more details, see https://libzmq.readthedocs.io/en/latest/zmq_setsockopt.html SendBufferSize: i32, + + /// ZMQ_LINGER: Set linger period for socket shutdown + /// + /// The 'ZMQ_LINGER' option shall set the linger period for the specified 'socket'. + /// The linger period determines how long pending messages which have yet to be sent + /// to a peer shall linger in memory after a socket is disconnected with zmq_disconnect or + /// closed with zmq_close, and further affects the termination of the socket’s context with zmq_ctx_term. + /// + /// Unit: milliseconds (0 = don't wait, -1 = infinite) + /// Default: 0 (don't wait) + /// + /// For more details, see https://libzmq.readthedocs.io/en/latest/zmq_setsockopt.html + LingerTimeout: i32, }; /// System level socket, which allows for opening outgoing and @@ -203,88 +259,106 @@ pub const ZSocketOption = union(ZSocketOptionTag) { /// It is either used to *bind* to a local port, or *connect* /// to a remote (or local) port. pub const ZSocket = struct { - allocator: std.mem.Allocator, - selfArena: std.heap.ArenaAllocator, - socket: *c.zsock_t, - type: []const u8, - endpoint: ?[]const u8, + allocator_: std.mem.Allocator, + selfArena_: std.heap.ArenaAllocator, + socket_: *anyopaque, + endpoint_: ?[]const u8, - pub fn init(allocator: std.mem.Allocator, socketType: ZSocketType) !*ZSocket { + pub fn init(socketType: ZSocketType, context: *zcontext.ZContext) !*ZSocket { // try creating the socket, early - var s = c.zsock_new(@intFromEnum(socketType)) orelse return error.SocketCreateFailed; - errdefer { - var ss: ?*c.zsock_t = s; - c.zsock_destroy(&ss); - } + var s = c.zmq_socket(context.ctx_, @intFromEnum(socketType)) orelse return error.SocketCreateFailed; + errdefer _ = c.zmq_close(s); // create the managed object + const allocator = context.allocator_; + var selfArena = std.heap.ArenaAllocator.init(allocator); errdefer selfArena.deinit(); const selfAllocator = selfArena.allocator(); var r = try selfAllocator.create(ZSocket); - r.allocator = allocator; - r.selfArena = selfArena; - r.socket = s; - r.endpoint = null; - - // get the socket type as string - const typeStrZ = std.mem.span(c.zsock_type_str(s)); + r.allocator_ = allocator; + r.selfArena_ = selfArena; + r.socket_ = s; + r.endpoint_ = null; - r.type = try selfAllocator.dupe(u8, typeStrZ[0..typeStrZ.len]); // copy to managed memory + // set docket defaults + try r.setSocketOption(.{ .SendHighWaterMark = 1000 }); + try r.setSocketOption(.{ .ReceiveHighWaterMark = 1000 }); + try r.setSocketOption(.{ .LingerTimeout = 0 }); // done return r; } - /// Bind a socket to a endpoint. For tcp:// endpoints, supports - /// ephemeral ports, if you specify the port number as "*". By default - /// zsock uses the IANA designated range from C000 (49152) to FFFF (65535). - /// To override this range, follow the "*" with "[first-last]". Either or - /// both first and last may be empty. To bind to a random port within the - /// range, use "!" in place of "*". + /// Returns the actual endpoint being used for this socket. /// - /// Examples: - /// tcp://127.0.0.1:* bind to first free port from C000 up - /// tcp://127.0.0.1:! bind to random port from C000 to FFFF - /// tcp://127.0.0.1:*[60000-] bind to first free port from 60000 up - /// tcp://127.0.0.1:![-60000] bind to random port from C000 to 60000 - /// tcp://127.0.0.1:![55000-55999] bind to random port from 55000 to 55999 + /// When binding TCP and IPC transports, it will also contain + /// the actual port being used. + pub fn endpoint(self: *const ZSocket) ![]const u8 { + if (self.endpoint_) |e| { + return e; + } + + return error.NotBoundOrConnected; + } + + /// Bind a socket to a endpoint. For tcp:// endpoints, supports + /// ephemeral ports, if you specify the port number as "*". /// - /// On success, returns the actual port number used, for tcp:// endpoints, - /// and 0 for other transports. Note that when using - /// ephemeral ports, a port may be reused by different services without - /// clients being aware. Protocols that run on ephemeral ports should take - /// this into account. + /// Call `endpoint()` to retrieve the actual endpoint being used. + /// + /// Examples: + /// tcp://127.0.0.1:* bind to first free port from C000 up /// /// Example: /// var socket = ZSocket.init(ZSocketType.Pair); /// defer socket.deinit(); /// - /// const port = try socket.bind("tcp://127.0.0.1:!"); + /// const port = try socket.bind("tcp://127.0.0.1:*"); /// - /// For more details, see https://libzmq.readthedocs.io/en/latest/zmq_bind.html . - pub fn bind(self: *ZSocket, ep: []const u8) !u16 { - const epZ = try self.allocator.dupeZ(u8, ep); - defer self.allocator.free(epZ); - - const result = c.zsock_bind(self.socket, "%s", &epZ[0]); + /// For more details, see https://libzmq.readthedocs.io/en/zeromq3-x/zmq_tcp.html + pub fn bind(self: *ZSocket, ep: []const u8) !void { + const epZ = try self.allocator_.dupeZ(u8, ep); + defer self.allocator_.free(epZ); + var result = c.zmq_bind(self.socket_, &epZ[0]); if (result < 0) { - return error.SocketBindFailed; + switch (c.zmq_errno()) { + c.EINVAL => return error.EndpointInvalid, + c.EPROTONOSUPPORT => return error.TransportUnsupported, + c.ENOCOMPATPROTO => return error.TransportIncompatible, + c.EADDRINUSE => return error.AddressAlreadyInUse, + c.EADDRNOTAVAIL => return error.AddressNotLocal, + c.ENODEV => return error.AddressInterfaceInvalid, + c.EMTHREAD => return error.IOThreadsExceeded, + else => return error.SocketBindFailed, + } } + errdefer _ = c.zmq_unbind(self.socket_, &epZ[0]); // retrieve endpoint value - const selfAllocator = self.selfArena.allocator(); + var lastEndpoint: [256]u8 = undefined; + var lastEndpointLen: usize = @sizeOf(@TypeOf(lastEndpoint)); - if (self.endpoint) |e| { - selfAllocator.free(e); + lastEndpoint[0] = 0; // set 0 terminator + + result = c.zmq_getsockopt(self.socket_, c.ZMQ_LAST_ENDPOINT, &lastEndpoint[0], &lastEndpointLen); + if (result < 0) { + return error.GetEndpointFailed; } - self.endpoint = try selfAllocator.dupe(u8, ep); // copy to managed memory + const selfAllocator = self.selfArena_.allocator(); - // done - return @intCast(result); + if (self.endpoint_) |e| { // free existing value + selfAllocator.free(e); + } + + if (lastEndpoint[0] != 0 and lastEndpointLen > 0) { + self.endpoint_ = try selfAllocator.dupe(u8, lastEndpoint[0 .. lastEndpointLen - 1]); // cut terminating 0 + } else { + self.endpoint_ = try selfAllocator.dupe(u8, ep); // copy to managed memory + } } /// Connect a socket to an endpoint @@ -295,209 +369,319 @@ pub const ZSocket = struct { /// /// try socket.connect("tcp://127.0.0.1:54321"); /// - /// For more details, see https://libzmq.readthedocs.io/en/latest/zmq_connect.html . + /// For more details, see https://libzmq.readthedocs.io/en/zeromq3-x/zmq_connect.html . pub fn connect(self: *ZSocket, ep: []const u8) !void { - const epZ = try self.allocator.dupeZ(u8, ep); - defer self.allocator.free(epZ); + const epZ = try self.allocator_.dupeZ(u8, ep); + defer self.allocator_.free(epZ); - const result = c.zsock_connect(self.socket, "%s", &epZ[0]); + const result = c.zmq_connect(self.socket_, &epZ[0]); if (result < 0) { - return error.SocketConnectFailed; + switch (c.zmq_errno()) { + c.EINVAL => return error.EndpointInvalid, + c.EPROTONOSUPPORT => return error.TransportUnsupported, + c.ENOCOMPATPROTO => return error.TransportIncompatible, + c.EMTHREAD => return error.IOThreadsExceeded, + else => return error.SocketConnectFailed, + } } // retrieve endpoint value - const selfAllocator = self.selfArena.allocator(); + const selfAllocator = self.selfArena_.allocator(); - if (self.endpoint) |e| { + if (self.endpoint_) |e| { // free existing value selfAllocator.free(e); } - self.endpoint = try selfAllocator.dupe(u8, ep); // copy to managed memory + self.endpoint_ = try selfAllocator.dupe(u8, ep); // copy to managed memory } - /// Send a frame to a socket. + /// Send a message to a socket. /// - /// Note: The frame will lose ownership and become invalid, unless `options.reuse` is true. + /// Note: The message will lose ownership and become invalid, unless `options.reuse` is true. /// /// Example: - /// var frame = try ZFrame.init(data); - /// defer frame.deinit(); + /// var message = try ZMessage.init(data); + /// defer message.deinit(); /// - /// try socket.send(&frame, .{}); - pub fn send(self: *ZSocket, frame: *zframe.ZFrame, options: struct { - /// Indicates that this frame is part of a multi-part message - /// and more frames will be sent. + /// try socket.send(&message, .{}); + pub fn send(self: *ZSocket, message: *zmessage.ZMessage, options: struct { + /// Indicates that this message is part of a multi-part message + /// and more messages will be sent. /// - /// On the receiving side, will cause `frame.hasMore()` to return true. + /// On the receiving side, will cause `message.hasMore()` to return true. more: bool = false, - /// Indicates that the provided frame shall not loose - /// ownership over its data. - /// - /// Important: This requires the frame data to live as long - /// as it takes for the frame to be sent. - /// Consider setting `dontwait` to true. - reuse: bool = false, - - /// Do not wait for the frame to be sent, but return immediately. + /// Do not wait for the message to be sent, but return immediately. dontwait: bool = false, }) !void { - var f: ?*c.zframe_t = frame.frame; - var flags: c_int = 0; - if (options.more) flags |= c.ZFRAME_MORE; - if (options.reuse) flags |= c.ZFRAME_REUSE; - if (options.dontwait) flags |= c.ZFRAME_DONTWAIT; + if (options.more) flags |= c.ZMQ_SNDMORE; + if (options.dontwait) flags |= c.ZMQ_DONTWAIT; - const result = c.zframe_send(&f, self.socket, flags); - if (result < 0) { - return error.SendFrameFailed; - } + var messageExt = try message.allocExternal(); + defer messageExt.deinit(); - // frame lost ownership of internal object - if (!options.reuse) { - frame.frameOwned = false; + const result = c.zmq_msg_send(try messageExt.move(), self.socket_, flags); + if (result < 0) { + switch (c.zmq_errno()) { + c.EAGAIN => return error.NonBlockingQueueFull, + c.ENOTSUP => return error.SocketTypeUnsupported, + c.EFSM => return error.SocketStateInvalid, + c.EINTR => return error.Interrupted, + c.EFAULT => return error.MessageInvalid, + else => return error.SendMessageFailed, + } } } - /// Receive a single frame of a message from the socket. + /// Receive a single message of a message from the socket. /// Does a blocking recv, if you want to not block then use /// zpoller or zloop. /// - /// The caller must invoke `deinit()` on the returned frame. + /// The caller must invoke `deinit()` on the returned message. /// - /// If receiving a multi-part message, then `frame.hasMore()` will return true + /// If receiving a multi-part message, then `message.hasMore()` will return true /// and the another receive call should be invoked. /// /// Example: - /// var frame = try socket.receive(); - /// defer frame.deinit(); - /// - /// const data = try frame.data(); - pub fn receive(self: *ZSocket) !zframe.ZFrame { - var frame = c.zframe_recv(self.socket); - if (frame == null) { - return error.ReceiveFrameInterrupted; + /// var message = try socket.receive(.{}); + /// defer message.deinit(); + /// + /// const data = try message.data(); + pub fn receive(self: *ZSocket, options: struct { + /// Do not wait for the message to be received, but return immediately. + dontwait: bool = false, + }) !ZMessageReceived { + var flags: c_int = 0; + if (options.dontwait) flags |= c.ZMQ_DONTWAIT; + + var message: c.zmq_msg_t = undefined; + var result = c.zmq_msg_init(&message); + if (result < 0) { + return error.InitMessageFailed; + } + + result = c.zmq_msg_recv(&message, self.socket_, flags); + if (result < 0) { + switch (c.zmq_errno()) { + c.EAGAIN => return error.NonBlockingQueueEmpty, + c.ENOTSUP => return error.SocketTypeUnsupported, + c.EFSM => return error.SocketStateInvalid, + c.EINTR => return error.Interrupted, + c.EFAULT => return error.MessageInvalid, + else => return error.ReceiveMessageFailed, + } } - return zframe.ZFrame{ .frame = frame.? }; + // check if this is a multi-part message + var hasMore: c_int = undefined; + var hasMoreLen: usize = @sizeOf(@TypeOf(hasMore)); + + result = c.zmq_getsockopt(self.socket_, c.ZMQ_RCVMORE, &hasMore, &hasMoreLen); + if (result < 0) { + return error.CheckForMoreFailed; + } + + return ZMessageReceived.init(&message, hasMore != 0); } /// Set an option on the socket. See `ZSocketOption` for details. pub fn setSocketOption(self: *ZSocket, opt: ZSocketOption) !void { + var result: c_int = 0; + switch (opt) { - .ReceiveTimeout => c.zsock_set_rcvtimeo(self.socket, @intCast(opt.ReceiveTimeout)), - .ReceiveHighWaterMark => c.zsock_set_rcvhwm(self.socket, @intCast(opt.ReceiveHighWaterMark)), - .ReceiveBufferSize => c.zsock_set_rcvbuf(self.socket, @intCast(opt.ReceiveBufferSize)), + .ReceiveTimeout => result = c.zmq_setsockopt(self.socket_, c.ZMQ_RCVTIMEO, &opt.ReceiveTimeout, @sizeOf(@TypeOf(opt.ReceiveTimeout))), + .ReceiveHighWaterMark => result = c.zmq_setsockopt(self.socket_, c.ZMQ_RCVHWM, &opt.ReceiveHighWaterMark, @sizeOf(@TypeOf(opt.ReceiveHighWaterMark))), + .ReceiveBufferSize => result = c.zmq_setsockopt(self.socket_, c.ZMQ_RCVBUF, &opt.ReceiveBufferSize, @sizeOf(@TypeOf(opt.ReceiveBufferSize))), + + .SendTimeout => result = c.zmq_setsockopt(self.socket_, c.ZMQ_SNDTIMEO, &opt.SendTimeout, @sizeOf(@TypeOf(opt.SendTimeout))), + .SendHighWaterMark => result = c.zmq_setsockopt(self.socket_, c.ZMQ_SNDHWM, &opt.SendHighWaterMark, @sizeOf(@TypeOf(opt.SendHighWaterMark))), + .SendBufferSize => result = c.zmq_setsockopt(self.socket_, c.ZMQ_SNDBUF, &opt.SendBufferSize, @sizeOf(@TypeOf(opt.SendBufferSize))), - .SendTimeout => c.zsock_set_sndtimeo(self.socket, @intCast(opt.SendTimeout)), - .SendHighWaterMark => c.zsock_set_sndhwm(self.socket, @intCast(opt.SendHighWaterMark)), - .SendBufferSize => c.zsock_set_sndbuf(self.socket, @intCast(opt.SendBufferSize)), + .LingerTimeout => result = c.zmq_setsockopt(self.socket_, c.ZMQ_LINGER, &opt.LingerTimeout, @sizeOf(@TypeOf(opt.LingerTimeout))), //else => return error.UnknownOption, } + + if (result < 0) return error.SetFailed; } /// Get an option of the socket. See `ZSocketOption` for details. pub fn getSocketOption(self: *ZSocket, opt: *ZSocketOption) !void { + var result: c_int = 0; + switch (opt.*) { - .ReceiveTimeout => opt.ReceiveTimeout = c.zsock_rcvtimeo(self.socket), - .ReceiveHighWaterMark => opt.ReceiveHighWaterMark = c.zsock_rcvhwm(self.socket), - .ReceiveBufferSize => opt.ReceiveBufferSize = c.zsock_rcvbuf(self.socket), + .ReceiveTimeout => { + var length: usize = @sizeOf(@TypeOf(opt.ReceiveTimeout)); + + result = c.zmq_getsockopt(self.socket_, c.ZMQ_RCVTIMEO, &opt.ReceiveTimeout, &length); + }, + .ReceiveHighWaterMark => { + var length: usize = @sizeOf(@TypeOf(opt.ReceiveHighWaterMark)); + + result = c.zmq_getsockopt(self.socket_, c.ZMQ_RCVHWM, &opt.ReceiveHighWaterMark, &length); + }, + .ReceiveBufferSize => { + var length: usize = @sizeOf(@TypeOf(opt.ReceiveBufferSize)); + + result = c.zmq_getsockopt(self.socket_, c.ZMQ_RCVBUF, &opt.ReceiveBufferSize, &length); + }, + + .SendTimeout => { + var length: usize = @sizeOf(@TypeOf(opt.SendTimeout)); + + result = c.zmq_getsockopt(self.socket_, c.ZMQ_SNDTIMEO, &opt.SendTimeout, &length); + }, + .SendHighWaterMark => { + var length: usize = @sizeOf(@TypeOf(opt.SendHighWaterMark)); + + result = c.zmq_getsockopt(self.socket_, c.ZMQ_SNDHWM, &opt.SendHighWaterMark, &length); + }, + .SendBufferSize => { + var length: usize = @sizeOf(@TypeOf(opt.SendBufferSize)); - .SendTimeout => opt.SendTimeout = c.zsock_sndtimeo(self.socket), - .SendHighWaterMark => opt.SendHighWaterMark = c.zsock_sndhwm(self.socket), - .SendBufferSize => opt.SendBufferSize = c.zsock_sndbuf(self.socket), + result = c.zmq_getsockopt(self.socket_, c.ZMQ_SNDBUF, &opt.SendBufferSize, &length); + }, + + .LingerTimeout => { + var length: usize = @sizeOf(@TypeOf(opt.LingerTimeout)); + + result = c.zmq_getsockopt(self.socket_, c.ZMQ_LINGER, &opt.LingerTimeout, &length); + }, //else => return error.UnknownOption, } + + if (result < 0) return error.GetFailed; } /// Destroy the socket and clean up pub fn deinit(self: *ZSocket) void { - var socket: ?*c.zsock_t = self.socket; - - c.zsock_destroy(&socket); + _ = c.zmq_close(self.socket_); // clean-up arena - var arena = self.selfArena; // prevent seg fault + var arena = self.selfArena_; // prevent seg fault arena.deinit(); } }; -test "ZSocket - roundtrip" { +test "ZSocket - roundtrip single" { const allocator = std.testing.allocator; + // create the context + var context = try zcontext.ZContext.init(allocator); + defer context.deinit(); + // bind the incoming socket - var incoming = try ZSocket.init(allocator, ZSocketType.Pair); + var incoming = try ZSocket.init(ZSocketType.Pair, &context); defer incoming.deinit(); - const port = try incoming.bind("tcp://127.0.0.1:!"); - try std.testing.expect(port >= 0xC000); - try std.testing.expect(incoming.endpoint != null); + try incoming.bind("tcp://127.0.0.1:*"); + try std.testing.expect(incoming.endpoint_ != null); + + std.log.info("Endpoint: {s}", .{try incoming.endpoint()}); // connect to the socket - var outgoing = try ZSocket.init(allocator, ZSocketType.Pair); + var outgoing = try ZSocket.init(ZSocketType.Pair, &context); defer outgoing.deinit(); - const endpoint = try std.fmt.allocPrint(allocator, "tcp://127.0.0.1:{}", .{port}); - defer allocator.free(endpoint); + try outgoing.connect(try incoming.endpoint()); + try std.testing.expect(outgoing.endpoint_ != null); - try outgoing.connect(endpoint); - try std.testing.expect(outgoing.endpoint != null); + // send a message + const msg = "hello world"; + + var outgoingData = try zmessage.ZMessage.initUnmanaged(msg, null); + defer outgoingData.deinit(); + try std.testing.expectEqual(msg.len, try outgoingData.size()); + try std.testing.expectEqualStrings(msg, try outgoingData.data()); + + // send the message + try outgoing.send(&outgoingData, .{ .dontwait = true }); + + // receive the message + var incomingData = try incoming.receive(.{}); + defer incomingData.deinit(); + + try std.testing.expectEqual(msg.len, try incomingData.size()); + try std.testing.expectEqualStrings(msg, try incomingData.data()); + try std.testing.expectEqual(false, incomingData.hasMore()); +} + +test "ZSocket - roundtrip multi-part" { + const allocator = std.testing.allocator; + + // create the context + var context = try zcontext.ZContext.init(allocator); + defer context.deinit(); + + // bind the incoming socket + var incoming = try ZSocket.init(ZSocketType.Pair, &context); + defer incoming.deinit(); + + try incoming.bind("tcp://127.0.0.1:*"); + try std.testing.expect(incoming.endpoint_ != null); + + std.log.info("Endpoint: {s}", .{try incoming.endpoint()}); + + // connect to the socket + var outgoing = try ZSocket.init(ZSocketType.Pair, &context); + defer outgoing.deinit(); + + try outgoing.connect(try incoming.endpoint()); + try std.testing.expect(outgoing.endpoint_ != null); // send a message const msg = "hello world"; - var outgoingData = try zframe.ZFrame.init(msg); + var outgoingData = try zmessage.ZMessage.initUnmanaged(msg, null); defer outgoingData.deinit(); - try std.testing.expectEqual(true, outgoingData.frameOwned); try std.testing.expectEqual(msg.len, try outgoingData.size()); try std.testing.expectEqualStrings(msg, try outgoingData.data()); - // send the first frame - try outgoing.send(&outgoingData, .{ .dontwait = true, .reuse = true, .more = true }); - try std.testing.expectEqual(true, outgoingData.frameOwned); + // send the first message + try outgoing.send(&outgoingData, .{ .dontwait = true, .more = true }); - // send the second frame (reusing the previous one) + // send the second message (reusing the previous one) try outgoing.send(&outgoingData, .{ .dontwait = true }); - try std.testing.expectEqual(false, outgoingData.frameOwned); - // receive the first frame of the message - var incomingData = try incoming.receive(); + // receive the first message of the message + var incomingData = try incoming.receive(.{}); defer incomingData.deinit(); try std.testing.expectEqual(msg.len, try incomingData.size()); try std.testing.expectEqualStrings(msg, try incomingData.data()); - try std.testing.expectEqual(true, try incomingData.hasMore()); + try std.testing.expectEqual(true, incomingData.hasMore()); - // receive the second frame - var incomingData2 = try incoming.receive(); + // receive the second message + var incomingData2 = try incoming.receive(.{}); defer incomingData2.deinit(); try std.testing.expectEqual(msg.len, try incomingData2.size()); try std.testing.expectEqualStrings(msg, try incomingData2.data()); - try std.testing.expectEqual(false, try incomingData2.hasMore()); + try std.testing.expectEqual(false, incomingData2.hasMore()); } test "ZSocket - roundtrip json" { const allocator = std.testing.allocator; + // create the context + var context = try zcontext.ZContext.init(allocator); + defer context.deinit(); + // bind the incoming socket - var incoming = try ZSocket.init(allocator, ZSocketType.Pair); + var incoming = try ZSocket.init(ZSocketType.Pair, &context); defer incoming.deinit(); - const port = try incoming.bind("tcp://127.0.0.1:!"); - try std.testing.expect(port >= 0xC000); + try incoming.bind("tcp://127.0.0.1:*"); + + std.log.info("Endpoint: {s}", .{try incoming.endpoint()}); // connect to the socket - var outgoing = try ZSocket.init(allocator, ZSocketType.Pair); + var outgoing = try ZSocket.init(ZSocketType.Pair, &context); defer outgoing.deinit(); - const endpoint = try std.fmt.allocPrint(allocator, "tcp://127.0.0.1:{}", .{port}); - defer allocator.free(endpoint); - - try outgoing.connect(endpoint); + try outgoing.connect(try incoming.endpoint()); // send a message const Obj = struct { @@ -526,13 +710,13 @@ test "ZSocket - roundtrip json" { const msg = try std.json.stringifyAlloc(allocator, &outgoingObj, .{}); defer allocator.free(msg); - var outgoingData = try zframe.ZFrame.init(msg); + var outgoingData = try zmessage.ZMessage.initUnmanaged(msg, null); defer outgoingData.deinit(); try outgoing.send(&outgoingData, .{ .dontwait = true }); - // receive the first frame of the message - var incomingData = try incoming.receive(); + // receive the first message of the message + var incomingData = try incoming.receive(.{}); defer incomingData.deinit(); const incomingObj = try std.json.parseFromSlice(Obj, allocator, try incomingData.data(), .{}); @@ -544,8 +728,12 @@ test "ZSocket - roundtrip json" { test "ZSocket - receive timeout" { const allocator = std.testing.allocator; + // create the context + var context = try zcontext.ZContext.init(allocator); + defer context.deinit(); + // create the socket - var incoming = try ZSocket.init(allocator, ZSocketType.Rep); + var incoming = try ZSocket.init(ZSocketType.Rep, &context); defer incoming.deinit(); // set the receive timeout @@ -564,18 +752,23 @@ test "ZSocket - receive timeout" { } // bind the port - const port = try incoming.bind("tcp://127.0.0.1:!"); - try std.testing.expect(port >= 0xC000); + try incoming.bind("tcp://127.0.0.1:*"); + + std.log.info("Endpoint: {s}", .{try incoming.endpoint()}); // try to receive the message - try std.testing.expectError(error.ReceiveFrameInterrupted, incoming.receive()); + try std.testing.expectError(error.NonBlockingQueueEmpty, incoming.receive(.{})); } test "ZSocket - send timeout" { const allocator = std.testing.allocator; + // create the context + var context = try zcontext.ZContext.init(allocator); + defer context.deinit(); + // create the socket - var socket = try ZSocket.init(allocator, ZSocketType.Pair); + var socket = try ZSocket.init(ZSocketType.Pair, &context); defer socket.deinit(); // set the send timeout @@ -594,12 +787,13 @@ test "ZSocket - send timeout" { } // bind the port - const port = try socket.bind("tcp://127.0.0.1:!"); - try std.testing.expect(port >= 0xC000); + try socket.bind("tcp://127.0.0.1:*"); + + std.log.info("Endpoint: {s}", .{try socket.endpoint()}); // try to send the message - var frame = try zframe.ZFrame.initEmpty(); - defer frame.deinit(); + var message = try zmessage.ZMessage.initExternalEmpty(); + defer message.deinit(); - try std.testing.expectError(error.SendFrameFailed, socket.send(&frame, .{})); + try std.testing.expectError(error.NonBlockingQueueFull, socket.send(&message, .{})); } diff --git a/src/czmq.zig b/src/zmq.zig similarity index 69% rename from src/czmq.zig rename to src/zmq.zig index a311fdf..06e0e90 100644 --- a/src/czmq.zig +++ b/src/zmq.zig @@ -1,4 +1,4 @@ pub const c = @cImport({ - @cInclude("czmq.h"); + @cInclude("zmq.h"); @cInclude("string.h"); }); diff --git a/src/zzmq.zig b/src/zzmq.zig index 85a9e68..8333fcb 100644 --- a/src/zzmq.zig +++ b/src/zzmq.zig @@ -1,12 +1,18 @@ const std = @import("std"); +const zcontext = @import("classes/zcontext.zig"); const zsocket = @import("classes/zsocket.zig"); -const zframe = @import("classes/zframe.zig"); +const zmessage = @import("classes/zmessage.zig"); + +pub const ZContext = zcontext.ZContext; +pub const ZVersion = zcontext.ZVersion; pub const ZSocket = zsocket.ZSocket; pub const ZSocketType = zsocket.ZSocketType; +pub const ZSocketOption = zsocket.ZSocketOption; +pub const ZMessageReceived = zsocket.ZMessageReceived; -pub const ZFrame = zframe.ZFrame; +pub const ZMessage = zmessage.ZMessage; test { std.testing.refAllDeclsRecursive(@This()); diff --git a/test.Dockerfile b/test.Dockerfile index 6b98708..3f74a82 100644 --- a/test.Dockerfile +++ b/test.Dockerfile @@ -23,32 +23,12 @@ RUN make install FROM cpp_base as czmq_builder -ARG CZMQ_VERSION=4.2.1 - -# add the pre-processed source package (note: this is not the raw source code from Git!) -ADD https://github.com/zeromq/czmq/releases/download/v${CZMQ_VERSION}/czmq-${CZMQ_VERSION}.tar.gz /tmp/source.tgz - -COPY --from=libzmq_builder /build/output/ /usr/ - -WORKDIR /build - -RUN tar -xzf /tmp/source.tgz --strip-components=1 - -RUN ./configure --prefix=/build/output -RUN make install - - - - -FROM alpine:3.19 as builder - # install Zig 0.11 from Alpine edge repo: https://pkgs.alpinelinux.org/package/edge/testing/x86_64/zig RUN echo "@testing https://dl-cdn.alpinelinux.org/alpine/edge/testing" >> /etc/apk/repositories RUN apk add --no-cache zig@testing~=0.11.0 # install dependencies (keep in sync with other images above) COPY --from=libzmq_builder /build/output/ /usr/ -COPY --from=czmq_builder /build/output/ /usr/ COPY . /build/