Skip to content

Commit

Permalink
socket_manager: add feature to take over another server
Browse files Browse the repository at this point in the history
Another process can take over UDP/TCP sockets without downtime.

    server = ServerEngine::SocketManager::Server.take_over_another_server(path)

This starts a new server that has all UDP/TCP sockets of the
existing server.
The old process should stop without removing the file for the
socket after the new process starts.

This may not be the primary use case assumed by ServerEngine, but
we need this feature to replace both the server and the workers
with a new process without downtime.
Currently, ServerEngine does not provide this feature for
network servers.

At the moment, I assume that the application side uses this
feature ad hoc, but, in the future, this could be used to support
live reload for entire network servers.

ref: fluent/fluentd#4622

Signed-off-by: Daijiro Fukuda <fukuda@clear-code.com>
Co-authored-by: Shizuo Fujita <fujita@clear-code.com>
  • Loading branch information
daipom and Watson1978 committed Oct 21, 2024
1 parent 5e9d11e commit 13b3d39
Show file tree
Hide file tree
Showing 5 changed files with 279 additions and 65 deletions.
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -413,8 +413,14 @@ se = ServerEngine.create(MyServer, MyWorker, {
se.run
```

See also [examples](https://github.com/fluent/serverengine/tree/master/examples).
Other features:

- `socket_manager_server = SocketManager::Server.take_over_another_server(path)`
- It starts a new manager server that has all UDP/TCP sockets of the existing manager.
- It means that another process can take over UDP/TCP sockets without downtime.
- The old process should stop without removing the file for the socket after the new process starts.

See also [examples](https://github.com/fluent/serverengine/tree/master/examples).

## Module API

Expand Down
16 changes: 12 additions & 4 deletions lib/serverengine/socket_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,22 @@ def self.open(path = nil)
end
end

def initialize(path)
def self.take_over_another_server(path)
raise NotImplementedError, "Not supported on Windows." if ServerEngine.windows?
server = new(path, start: false)
server.take_over_another_server
server
end

def initialize(path, start: true)
@tcp_sockets = {}
@udp_sockets = {}
@mutex = Mutex.new
@path = start_server(path)
@path = start ? start_server(path) : path
end

attr_reader :path
attr_reader :tcp_sockets, :udp_sockets # for tests

def new_client
Client.new(@path)
Expand Down Expand Up @@ -159,9 +167,9 @@ def process_peer(peer)
res = SocketManager.recv_peer(peer)
return if res.nil?

pid, method, bind, port = *res
pid, method, *opts = res
begin
send_socket(peer, pid, method, bind, port)
send_socket(peer, pid, method, *opts)
rescue => e
SocketManager.send_peer(peer, e)
end
Expand Down
129 changes: 89 additions & 40 deletions lib/serverengine/socket_manager_unix.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,67 @@ def recv_udp(family, peer, sent)
end

module ServerModule
def start_server(path)
unless @server
# return absolute path so that client can connect to this path
# when client changed working directory
path = File.expand_path(path)

begin
old_umask = File.umask(0077) # Protect unix socket from other users
@server = UNIXServer.new(path)
ensure
File.umask(old_umask)
end
end

@thread = Thread.new do
begin
while peer = @server.accept
Thread.new(peer, &method(:process_peer)) # process_peer calls send_socket
end
rescue => e
unless @server.closed?
ServerEngine.dump_uncaught_error(e)
end
end
end

return path
end

def take_over_another_server
another_server = UNIXSocket.new(@path)
begin
idx = 0
while true
SocketManager.send_peer(another_server, [Process.pid, :get_listening_tcp, idx])
key = SocketManager.recv_peer(another_server)
break if key.nil?
@tcp_sockets[key] = another_server.recv_io TCPServer
idx += 1
end

idx = 0
while true
SocketManager.send_peer(another_server, [Process.pid, :get_listening_udp, idx])
key = SocketManager.recv_peer(another_server)
break if key.nil?
@udp_sockets[key] = another_server.recv_io UDPSocket
idx += 1
end

SocketManager.send_peer(another_server, [Process.pid, :get_unix])
res = SocketManager.recv_peer(another_server)
raise res if res.is_a?(Exception)
@server = another_server.recv_io UNIXServer

start_server(@path)
ensure
another_server.close
end
end

private

def listen_tcp_new(bind_ip, port)
Expand Down Expand Up @@ -76,33 +137,6 @@ def listen_udp_new(bind_ip, port)
UDPSocket.for_fd(usock.fileno)
end

def start_server(path)
# return absolute path so that client can connect to this path
# when client changed working directory
path = File.expand_path(path)

begin
old_umask = File.umask(0077) # Protect unix socket from other users
@server = UNIXServer.new(path)
ensure
File.umask(old_umask)
end

@thread = Thread.new do
begin
while peer = @server.accept
Thread.new(peer, &method(:process_peer)) # process_peer calls send_socket
end
rescue => e
unless @server.closed?
ServerEngine.dump_uncaught_error(e)
end
end
end

return path
end

def stop_server
@tcp_sockets.reject! {|key,lsock| lsock.close; true }
@udp_sockets.reject! {|key,usock| usock.close; true }
Expand All @@ -111,19 +145,34 @@ def stop_server
@thread.join if RUBY_VERSION >= "2.2"
end

def send_socket(peer, pid, method, bind, port)
sock = case method
when :listen_tcp
listen_tcp(bind, port)
when :listen_udp
listen_udp(bind, port)
else
raise ArgumentError, "Unknown method: #{method.inspect}"
end

SocketManager.send_peer(peer, nil)

peer.send_io sock
def send_socket(peer, pid, method, *opts)
case method
when :listen_tcp
bind, port = opts
sock = listen_tcp(bind, port)
SocketManager.send_peer(peer, nil)
peer.send_io sock
when :listen_udp
bind, port = opts
sock = listen_udp(bind, port)
SocketManager.send_peer(peer, nil)
peer.send_io sock
when :get_listening_tcp
idx, = opts
key = @tcp_sockets.keys[idx]
SocketManager.send_peer(peer, key)
peer.send_io(@tcp_sockets.values[idx]) if key
when :get_listening_udp
idx, = opts
key = @udp_sockets.keys[idx]
SocketManager.send_peer(peer, key)
peer.send_io(@udp_sockets.values[idx]) if key
when :get_unix
SocketManager.send_peer(peer, nil)
peer.send_io @server
else
raise ArgumentError, "Unknown method: #{method.inspect}"
end
end
end

Expand Down
40 changes: 20 additions & 20 deletions lib/serverengine/socket_manager_win.rb
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,26 @@ def recv_udp(family, peer, sent)
end

module ServerModule
def start_server(addr)
# We need to take care about selecting an available port.
# By passing `nil` or `0` as `addr`, an available port is automatically selected.
# However, we should consider using NamedPipe instead of TCPServer.
@server = TCPServer.new("127.0.0.1", addr)
@thread = Thread.new do
begin
while peer = @server.accept
Thread.new(peer, &method(:process_peer)) # process_peer calls send_socket
end
rescue => e
unless @server.closed?
ServerEngine.dump_uncaught_error(e)
end
end
end

return @server.addr[1]
end

private

TCP_OPTIONS = [Socket::SOCK_STREAM, Socket::IPPROTO_TCP, TCPServer, true]
Expand Down Expand Up @@ -107,26 +127,6 @@ def htons(h)
[h].pack("S").unpack("n")[0]
end

def start_server(addr)
# We need to take care about selecting an available port.
# By passing `nil` or `0` as `addr`, an available port is automatically selected.
# However, we should consider using NamedPipe instead of TCPServer.
@server = TCPServer.new("127.0.0.1", addr)
@thread = Thread.new do
begin
while peer = @server.accept
Thread.new(peer, &method(:process_peer)) # process_peer calls send_socket
end
rescue => e
unless @server.closed?
ServerEngine.dump_uncaught_error(e)
end
end
end

return @server.addr[1]
end

def stop_server
@tcp_sockets.reject! {|key,lsock| lsock.close; true }
@udp_sockets.reject! {|key,usock| usock.close; true }
Expand Down
Loading

0 comments on commit 13b3d39

Please sign in to comment.