Skip to content

Commit

Permalink
Issue #225: Improved handling of broken connections; Improved handlin…
Browse files Browse the repository at this point in the history
…g of messages already queued for broken connections; Improved logging (again)
  • Loading branch information
hypfvieh committed Aug 15, 2023
1 parent 3715a4a commit 99a8552
Showing 1 changed file with 50 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,44 +2,31 @@

import org.freedesktop.dbus.Marshalling;
import org.freedesktop.dbus.connections.BusAddress;
import org.freedesktop.dbus.connections.transports.AbstractTransport;
import org.freedesktop.dbus.connections.transports.TransportBuilder;
import org.freedesktop.dbus.connections.transports.*;
import org.freedesktop.dbus.connections.transports.TransportBuilder.SaslAuthMode;
import org.freedesktop.dbus.connections.transports.TransportConnection;
import org.freedesktop.dbus.errors.AccessDenied;
import org.freedesktop.dbus.errors.Error;
import org.freedesktop.dbus.errors.MatchRuleInvalid;
import org.freedesktop.dbus.exceptions.DBusException;
import org.freedesktop.dbus.exceptions.DBusExecutionException;
import org.freedesktop.dbus.interfaces.DBus;
import org.freedesktop.dbus.interfaces.*;
import org.freedesktop.dbus.interfaces.DBus.NameOwnerChanged;
import org.freedesktop.dbus.interfaces.FatalException;
import org.freedesktop.dbus.interfaces.Introspectable;
import org.freedesktop.dbus.interfaces.Peer;
import org.freedesktop.dbus.messages.DBusSignal;
import org.freedesktop.dbus.messages.Message;
import org.freedesktop.dbus.messages.MethodCall;
import org.freedesktop.dbus.messages.MethodReturn;
import org.freedesktop.dbus.messages.*;
import org.freedesktop.dbus.types.UInt32;
import org.freedesktop.dbus.types.Variant;
import org.freedesktop.dbus.utils.Hexdump;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.*;
import java.lang.ref.WeakReference;
import java.lang.reflect.InvocationTargetException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.*;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

Expand Down Expand Up @@ -86,7 +73,7 @@ private void send(ConnectionStruct _connStruct, Message _msg) {
private void send(ConnectionStruct _connStruct, Message _msg, boolean _head) {

// send to all connections
if (null == _connStruct) {
if (_connStruct == null) {
LOGGER.trace("Queuing message {} for all connections", _msg);
for (ConnectionStruct d : conns.keySet()) {
if (d.connection == null || d.connection.getChannel() == null || !d.connection.getChannel().isConnected()) {
Expand All @@ -99,6 +86,9 @@ private void send(ConnectionStruct _connStruct, Message _msg, boolean _head) {
}
}
}
} else if (_connStruct.connection == null || _connStruct.connection.getChannel() == null || !_connStruct.connection.getChannel().isConnected()) {
// ignore messages addressed for invalid or disconnected end points
LOGGER.debug("Ignoring message for disconnected/invalid connection {}: {}", _connStruct, _msg);
} else {
LOGGER.trace("Queuing message {} for {}", _msg, _connStruct.unique);
if (_head) {
Expand Down Expand Up @@ -219,31 +209,44 @@ private void removeConnection(ConnectionStruct _c) {
DBusDaemonReaderThread oldThread = conns.remove(_c);

if (oldThread != null) {
LOGGER.debug("Terminating reader thread for {}", _c);
oldThread.terminate();

try {
if (null != _c.connection) {
if (_c.connection != null) {
_c.connection.close();
LOGGER.debug("Terminated connection {}", _c.connection);
}
} catch (IOException _exIo) {
LOGGER.trace("Error while closing socketchannel", _exIo);
LOGGER.debug("Error while closing socketchannel", _exIo);
}
}

synchronized (names) {
List<String> toRemove = new ArrayList<>();
for (String name : names.keySet()) {
if (names.get(name) == _c) {
toRemove.add(name);
try {
send(null, new NameOwnerChanged("/org/freedesktop/DBus", name, _c.unique, ""));
} catch (DBusException _ex) {
LOGGER.debug("Unable to change owner", _ex);
}
}
LOGGER.debug("Removing signal destination {}", _c);
synchronized (sigrecips) {
if (sigrecips.removeIf(e -> e.equals(_c))) {
LOGGER.debug("Removed one or more signal destinations for {}", _c);
}
}

LOGGER.debug("Removing name registration for {}", _c);
synchronized (names) {
List<String> toRemove = new ArrayList<>();

// find connection by name
for (String name : names.keySet()) {
if (names.get(name) == _c) {
toRemove.add(name);
}
for (String name : toRemove) {
names.remove(name);
}

// remove registered name and send signal to remaining connections
for (String name : toRemove) {
names.remove(name);
try {
send(null, new NameOwnerChanged("/org/freedesktop/DBus", name, _c.unique, ""));
} catch (DBusException _ex) {
LOGGER.debug("Unable to change owner", _ex);
}
}
}
Expand Down Expand Up @@ -766,10 +769,16 @@ public void run() {
connectionStruct.connection.getWriter().writeMessage(pollFirst.first);
} catch (IOException _ex) {
logger.debug("Disconnecting client due to previous exception", _ex);
// we had an exception, the connection can no longer be used,
// remove all messages before closing connection
removeAllMessagesForConnection(connectionStruct);
removeConnection(connectionStruct);
}
} else {
logger.warn("Connection to {} broken", connectionStruct.connection);
// we already know that this connection is no longer working: Remove all messages
// belonging to the broken connection
removeAllMessagesForConnection(connectionStruct);
removeConnection(connectionStruct);
}

Expand All @@ -784,6 +793,12 @@ public void run() {
logger.debug(">>>> Sender Thread terminated <<<<");
}

private void removeAllMessagesForConnection(ConnectionStruct _connectionStruct) {
if (outqueue.removeIf(e -> e.second.get().connection == _connectionStruct.connection)) {
logger.debug("Removed all messages addressed to broken connection {}", _connectionStruct);
}
}

public synchronized void terminate() {
running.set(false);
interrupt();
Expand Down Expand Up @@ -819,7 +834,7 @@ public void run() {
LOGGER.debug("Error reading message", _ex);
removeConnection(conn);
} catch (DBusException _ex) {
LOGGER.debug("", _ex);
LOGGER.debug("DBusException while reading message", _ex);
if (_ex instanceof FatalException) {
removeConnection(conn);
}
Expand Down

0 comments on commit 99a8552

Please sign in to comment.