Skip to content

Commit

Permalink
Add support for RoutingId and RouterHandover socket options
Browse files Browse the repository at this point in the history
Remove unused ZSocketOptionTag enum
Add `ZSocket - routing id` unit test
See #4
Thanks to https://github.com/7Zifle
  • Loading branch information
fkollmann committed May 5, 2024
1 parent 666a459 commit c6ab6ee
Showing 1 changed file with 125 additions and 22 deletions.
147 changes: 125 additions & 22 deletions src/classes/zsocket.zig
Original file line number Diff line number Diff line change
Expand Up @@ -132,19 +132,7 @@ pub const ZSocketType = enum(c_int) {
Push = c.ZMQ_PUSH,
};

pub const ZSocketOptionTag = enum {
ReceiveTimeout,
ReceiveHighWaterMark,
ReceiveBufferSize,

SendTimeout,
SendHighWaterMark,
SendBufferSize,

LingerTimeout,
};

pub const ZSocketOption = union(ZSocketOptionTag) {
pub const ZSocketOption = union(enum) {
/// ZMQ_RCVTIMEO: Maximum time before a recv operation returns with EAGAIN
///
/// Sets the timeout for receive operation on the socket.
Expand Down Expand Up @@ -251,6 +239,38 @@ pub const ZSocketOption = union(ZSocketOptionTag) {
///
/// For more details, see https://libzmq.readthedocs.io/en/latest/zmq_setsockopt.html
LingerTimeout: i32,

/// ZMQ_ROUTING_ID: Set socket routing id
///
/// The 'ZMQ_ROUTING_ID' option shall set the routing id of the specified 'socket' when connecting to a ROUTER socket.
///
/// A routing id must be at least one byte and at most 255 bytes long. Identities starting with a zero byte are reserved for
/// use by the 0MQ infrastructure.
///
/// If two clients use the same routing id when connecting to a ROUTER, the results shall
/// depend on the ZMQ_ROUTER_HANDOVER option setting. If that is not set (or set to the
/// default of zero), the ROUTER socket shall reject clients trying to connect with an
/// already-used routing id. If that option is set to 1, the ROUTER socket shall hand-over
/// the connection to the new client and disconnect the existing one.
///
/// For more details, see https://libzmq.readthedocs.io/en/latest/zmq_setsockopt.html
RoutingId: []u8,

/// ZMQ_ROUTER_HANDOVER: handle duplicate client routing ids on ROUTER sockets
///
/// If two clients use the same routing id when connecting to a ROUTER,
/// the results shall depend on the ZMQ_ROUTER_HANDOVER option setting.
///
/// If that is not set (or set to the default of false), the ROUTER socket shall reject
/// clients trying to connect with an already-used routing id.
///
/// If that option is set to true, the ROUTER socket shall hand-over the connection
/// to the new client and disconnect the existing one.
///
/// Default: false (reject client)
///
/// For more details, see https://libzmq.readthedocs.io/en/latest/zmq_setsockopt.html
RouterHandover: bool,
};

/// System level socket, which allows for opening outgoing and
Expand Down Expand Up @@ -492,20 +512,51 @@ pub const ZSocket = struct {
var result: c_int = 0;

switch (opt) {
.ReceiveTimeout => result = c.zmq_setsockopt(self.socket_, c.ZMQ_RCVTIMEO, &opt.ReceiveTimeout, @sizeOf(@TypeOf(opt.ReceiveTimeout))),
.ReceiveHighWaterMark => result = c.zmq_setsockopt(self.socket_, c.ZMQ_RCVHWM, &opt.ReceiveHighWaterMark, @sizeOf(@TypeOf(opt.ReceiveHighWaterMark))),
.ReceiveBufferSize => result = c.zmq_setsockopt(self.socket_, c.ZMQ_RCVBUF, &opt.ReceiveBufferSize, @sizeOf(@TypeOf(opt.ReceiveBufferSize))),
.ReceiveTimeout => {
result = c.zmq_setsockopt(self.socket_, c.ZMQ_RCVTIMEO, &opt.ReceiveTimeout, @sizeOf(@TypeOf(opt.ReceiveTimeout)));
},
.ReceiveHighWaterMark => {
result = c.zmq_setsockopt(self.socket_, c.ZMQ_RCVHWM, &opt.ReceiveHighWaterMark, @sizeOf(@TypeOf(opt.ReceiveHighWaterMark)));
},
.ReceiveBufferSize => {
result = c.zmq_setsockopt(self.socket_, c.ZMQ_RCVBUF, &opt.ReceiveBufferSize, @sizeOf(@TypeOf(opt.ReceiveBufferSize)));
},

.SendTimeout => result = c.zmq_setsockopt(self.socket_, c.ZMQ_SNDTIMEO, &opt.SendTimeout, @sizeOf(@TypeOf(opt.SendTimeout))),
.SendHighWaterMark => result = c.zmq_setsockopt(self.socket_, c.ZMQ_SNDHWM, &opt.SendHighWaterMark, @sizeOf(@TypeOf(opt.SendHighWaterMark))),
.SendBufferSize => result = c.zmq_setsockopt(self.socket_, c.ZMQ_SNDBUF, &opt.SendBufferSize, @sizeOf(@TypeOf(opt.SendBufferSize))),
.SendTimeout => {
result = c.zmq_setsockopt(self.socket_, c.ZMQ_SNDTIMEO, &opt.SendTimeout, @sizeOf(@TypeOf(opt.SendTimeout)));
},
.SendHighWaterMark => {
result = c.zmq_setsockopt(self.socket_, c.ZMQ_SNDHWM, &opt.SendHighWaterMark, @sizeOf(@TypeOf(opt.SendHighWaterMark)));
},
.SendBufferSize => {
result = c.zmq_setsockopt(self.socket_, c.ZMQ_SNDBUF, &opt.SendBufferSize, @sizeOf(@TypeOf(opt.SendBufferSize)));
},

.LingerTimeout => {
result = c.zmq_setsockopt(self.socket_, c.ZMQ_LINGER, &opt.LingerTimeout, @sizeOf(@TypeOf(opt.LingerTimeout)));
},

.RoutingId => {
result = c.zmq_setsockopt(self.socket_, c.ZMQ_ROUTING_ID, opt.RoutingId.ptr, opt.RoutingId.len);
},
.RouterHandover => {
const v: c_int = @intFromBool(opt.RouterHandover);

.LingerTimeout => result = c.zmq_setsockopt(self.socket_, c.ZMQ_LINGER, &opt.LingerTimeout, @sizeOf(@TypeOf(opt.LingerTimeout))),
result = c.zmq_setsockopt(self.socket_, c.ZMQ_ROUTER_HANDOVER, &v, @sizeOf(@TypeOf(v)));
},

//else => return error.UnknownOption,
}

if (result < 0) return error.SetFailed;
if (result < 0) {
switch (c.zmq_errno()) {
c.EINVAL => return error.OptionOrValueInvalid,
c.ETERM => return error.ZContextTerminated,
c.ENOTSOCK => return error.SocketInvalid,
c.EINTR => return error.Interrupted,
else => return error.SetFailed,
}
}
}

/// Get an option of the socket. See `ZSocketOption` for details.
Expand Down Expand Up @@ -551,10 +602,25 @@ pub const ZSocket = struct {
result = c.zmq_getsockopt(self.socket_, c.ZMQ_LINGER, &opt.LingerTimeout, &length);
},

.RoutingId => {
result = c.zmq_getsockopt(self.socket_, c.ZMQ_ROUTING_ID, opt.RoutingId.ptr, &opt.RoutingId.len);
},
.RouterHandover => {
return error.UnknownOption; // ZMQ_ROUTER_HANDOVER cannot be retrieved
},

//else => return error.UnknownOption,
}

if (result < 0) return error.GetFailed;
if (result < 0) {
switch (c.zmq_errno()) {
c.EINVAL => return error.OptionOrValueInvalid,
c.ETERM => return error.ZContextTerminated,
c.ENOTSOCK => return error.SocketInvalid,
c.EINTR => return error.Interrupted,
else => return error.GetFailed,
}
}
}

/// Destroy the socket and clean up
Expand Down Expand Up @@ -802,3 +868,40 @@ test "ZSocket - send timeout" {
// try again, the owner should be lost
try std.testing.expectError(error.MessageOwnershipLost, socket.send(&message, .{}));
}

test "ZSocket - routing id" {
const allocator = std.testing.allocator;

// create the context
var context = try zcontext.ZContext.init(allocator);
defer context.deinit();

// create the socket
var socket = try ZSocket.init(ZSocketType.Router, &context);
defer socket.deinit();

// set the routing id
{
var v = ZSocketOption{ .RoutingId = undefined };
try socket.getSocketOption(&v);
try std.testing.expectEqualStrings("", v.RoutingId);
}

try socket.setSocketOption(.{ .RoutingId = @constCast("myRoutingID") });

{
var routingId: [255]u8 = undefined;

var v = ZSocketOption{ .RoutingId = &routingId };
try socket.getSocketOption(&v);
try std.testing.expectEqualStrings("myRoutingID", v.RoutingId);
}

// set the router handover
try socket.setSocketOption(.{ .RouterHandover = true });

{
var v = ZSocketOption{ .RouterHandover = undefined };
try std.testing.expectError(error.UnknownOption, socket.getSocketOption(&v));
}
}

0 comments on commit c6ab6ee

Please sign in to comment.