Skip to content

Commit

Permalink
Fix NullPointerException in congestion control.
Browse files Browse the repository at this point in the history
Add getRemoteSocketAddress to Exchange and use that.

fixes: issue 2093

Signed-off-by: Achim Kraus <achim.kraus@cloudcoap.net>
  • Loading branch information
boaks committed Dec 19, 2022
1 parent 27ac3a0 commit ef16394
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -337,10 +337,9 @@ public enum Origin {
* @param peersIdentity peer's identity. Usually that's the peer's
* {@link InetSocketAddress}.
* @param origin the origin of the request (LOCAL or REMOTE)
* @param executor executor to be used for exchanges. Maybe {@code null} for
* unit tests.
* @throws NullPointerException if request is {@code null}
* @since 3.0 (added peersIdentity)
* @param executor executor to be used for exchanges.
* @throws NullPointerException if request or executor is {@code null}
* @since 3.0 (added peersIdentity, executor adapted to mandatory)
*/
public Exchange(Request request, Object peersIdentity, Origin origin, Executor executor) {
this(request, peersIdentity, origin, executor, null, false);
Expand Down Expand Up @@ -511,6 +510,32 @@ public boolean isOfLocalOrigin() {
return origin == Origin.LOCAL;
}

/**
* Get remote socket address.
*
* Get remote socket address of current request.
*
* @return current remote socket address
* @throws IllegalArgumentException if corresponding endpoint context is
* missing
* @since 3.8
*/
public InetSocketAddress getRemoteSocketAddress() {
EndpointContext remoteEndpoint;
if ((origin == Origin.LOCAL)) {
remoteEndpoint = currentRequest.getDestinationContext();
if (remoteEndpoint == null) {
throw new IllegalArgumentException("Outgoing request must have destination context");
}
} else {
remoteEndpoint = currentRequest.getSourceContext();
if (remoteEndpoint == null) {
throw new IllegalArgumentException("Incoming request must have source context");
}
}
return remoteEndpoint.getPeerAddress();
}

/**
* Indicate to keep the original request in the exchange store. Intended to
* be used for observe request with blockwise response to be able to react
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,13 +196,7 @@ public void destroy() {
* @see #createRemoteEndpoint(InetSocketAddress)
*/
protected RemoteEndpoint getRemoteEndpoint(Exchange exchange) {
Message message;
if (exchange.isOfLocalOrigin()) {
message = exchange.getCurrentRequest();
} else {
message = exchange.getCurrentResponse();
}
InetSocketAddress remoteSocketAddress = message.getDestinationContext().getPeerAddress();
InetSocketAddress remoteSocketAddress = exchange.getRemoteSocketAddress();
synchronized (remoteEndpoints) {
RemoteEndpoint remoteEndpoint = remoteEndpoints.get(remoteSocketAddress);
if (remoteEndpoint == null) {
Expand Down Expand Up @@ -262,14 +256,16 @@ public RtoType getExchangeEstimatorState(Exchange exchange) {
* response is postponed and put into a queue.
*/
private boolean processResponse(RemoteEndpoint endpoint, Exchange exchange, Response response) {
Type messageType = response.getType();

exchange.setCurrentResponse(response);
if (!response.isNotification()) {
if (messageType == Type.CON) {
if (response.isConfirmable()) {
return checkNSTART(endpoint, exchange);
} else {
return true;
}
}

// Check, if there's space in the notifies queue
int size;
boolean start = false;
Expand Down Expand Up @@ -306,16 +302,16 @@ private boolean processResponse(RemoteEndpoint endpoint, Exchange exchange, Resp
private boolean checkNSTART(RemoteEndpoint endpoint, Exchange exchange) {
boolean send = false;
boolean queued = false;
Type type;
Message message;
String messageType;
Queue<Exchange> queue;
if (exchange.isOfLocalOrigin()) {
messageType = "req.-";
type = exchange.getCurrentRequest().getType();
message = exchange.getCurrentRequest();
queue = endpoint.getRequestQueue();
} else {
messageType = "resp.-";
type = exchange.getCurrentResponse().getType();
message = exchange.getCurrentResponse();
queue = endpoint.getResponseQueue();
}
int size;
Expand All @@ -333,16 +329,8 @@ private boolean checkNSTART(RemoteEndpoint endpoint, Exchange exchange) {
}
}
if (send) {
Message message;
if (exchange.isOfLocalOrigin()) {
// it's a request
message = exchange.getCurrentRequest();
} else {
// it's a response
message = exchange.getCurrentResponse();
}
message.addMessageObserver(new TimeoutTask(endpoint, exchange));
LOGGER.trace("{}send {}{}", tag, messageType, type);
LOGGER.trace("{}send {}{}", tag, messageType, message.getType());
if (statistic != null) {
statistic.sendRequest();
}
Expand All @@ -352,7 +340,7 @@ private boolean checkNSTART(RemoteEndpoint endpoint, Exchange exchange) {
statistic.queueRequest();
}
} else {
LOGGER.debug("{}drop {}{}, queue full {}", tag, messageType, type, size);
LOGGER.debug("{}drop {}{}, queue full {}", tag, messageType, message.getType(), size);
}
return false;
}
Expand Down Expand Up @@ -465,6 +453,7 @@ public void sendRequest(Exchange exchange, Request request) {
// process ReliabilityLayer
prepareRequest(exchange, request);
RemoteEndpoint endpoint = getRemoteEndpoint(exchange);
exchange.setCurrentRequest(request);
if (checkNSTART(endpoint, exchange)) {
endpoint.checkAging();
LOGGER.debug("{}send request", tag);
Expand Down

0 comments on commit ef16394

Please sign in to comment.