Skip to content

Commit

Permalink
Fix interrupt handling in examples
Browse files Browse the repository at this point in the history
[skip ci]
  • Loading branch information
fkollmann committed Feb 19, 2024
1 parent 0e91081 commit cda2944
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 14 deletions.
21 changes: 20 additions & 1 deletion examples/dealer_rep_client/src/main.zig
Original file line number Diff line number Diff line change
@@ -1,6 +1,22 @@
const std = @import("std");
const zzmq = @import("zzmq");

var stopRunning_ = std.atomic.Atomic(bool).init(false);
const stopRunning = &stopRunning_;

fn sig_handler(sig: c_int) align(1) callconv(.C) void {
_ = sig;
std.log.info("Stopping...", .{});

stopRunning.store(true, .SeqCst);
}

const sig_ign = std.os.Sigaction{
.handler = .{ .handler = &sig_handler },
.mask = std.os.empty_sigset,
.flags = 0,
};

pub fn main() !void {
std.log.info("Connecting to the server...", .{});

Expand All @@ -20,9 +36,12 @@ pub fn main() !void {
try socket.setSocketOption(.{ .ReceiveBufferSize = 256 }); // keep it small
try socket.setSocketOption(.{ .SendTimeout = 500 });

try std.os.sigaction(std.os.SIG.INT, &sig_ign, null); // ZSocket.init() will re-assign interrupts
try std.os.sigaction(std.os.SIG.TERM, &sig_ign, null);

try socket.connect("tcp://127.0.0.1:5555");

while (true) {
while (!stopRunning.load(.SeqCst)) {
// Receive the request
{
var frame = try socket.receive();
Expand Down
29 changes: 16 additions & 13 deletions examples/dealer_rep_server/src/main.zig
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
const std = @import("std");
const zzmq = @import("zzmq");

fn senderThreadMain(socket: *zzmq.ZSocket, stop: *bool, allocator: std.mem.Allocator) !void {
var stopRunning_ = std.atomic.Atomic(bool).init(false);
const stopRunning = &stopRunning_;

fn senderThreadMain(socket: *zzmq.ZSocket, allocator: std.mem.Allocator) !void {
var index: usize = 0;

while (!stop.*) {
while (!stopRunning.load(.SeqCst)) {
index += 1;

std.log.info("Sending {}...", .{index});
Expand All @@ -15,11 +18,11 @@ fn senderThreadMain(socket: *zzmq.ZSocket, stop: *bool, allocator: std.mem.Alloc
var frame = try zzmq.ZFrame.initEmpty();
defer frame.deinit();

while (!stop.*) { // retry until a client connects
while (!stopRunning.load(.SeqCst)) { // retry until a client connects
socket.send(&frame, .{ .more = true }) catch continue;
break;
}
if (stop.*) return;
if (stopRunning.load(.SeqCst)) return;
}

// Send the request
Expand All @@ -38,11 +41,11 @@ fn senderThreadMain(socket: *zzmq.ZSocket, stop: *bool, allocator: std.mem.Alloc
}
}

var senderThreadStop = false;

fn sig_handler(sig: c_int) align(1) callconv(.C) void {
_ = sig;
senderThreadStop = true;
std.log.info("Stopping...", .{});

stopRunning.store(true, .SeqCst);
}

const sig_ign = std.os.Sigaction{
Expand All @@ -52,9 +55,6 @@ const sig_ign = std.os.Sigaction{
};

pub fn main() !void {
try std.os.sigaction(std.os.SIG.INT, &sig_ign, null);
try std.os.sigaction(std.os.SIG.TERM, &sig_ign, null);

std.log.info("Starting the server...", .{});

var gpa = std.heap.GeneralPurposeAllocator(.{}){};
Expand All @@ -73,16 +73,19 @@ pub fn main() !void {
try socket.setSocketOption(.{ .SendHighWaterMark = 50 });
try socket.setSocketOption(.{ .SendBufferSize = 256 }); // keep it small

try std.os.sigaction(std.os.SIG.INT, &sig_ign, null); // ZSocket.init() will re-assign interrupts
try std.os.sigaction(std.os.SIG.TERM, &sig_ign, null);

_ = try socket.bind("tcp://127.0.0.1:5555");

// start sending threads
const senderThread = try std.Thread.spawn(.{}, senderThreadMain, .{ socket, &senderThreadStop, allocator });
const senderThread = try std.Thread.spawn(.{}, senderThreadMain, .{ socket, allocator });
defer {
senderThreadStop = true;
stopRunning.store(true, .SeqCst);
senderThread.join();
}

while (!senderThreadStop) {
while (!stopRunning.load(.SeqCst)) {
// Wait for the next reply
{
var frame = socket.receive() catch continue;
Expand Down

0 comments on commit cda2944

Please sign in to comment.