From 48fcef591b2873d6d318b45e5b861a32ee850b69 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20Wi=C5=9Bniewski?= Date: Tue, 27 Dec 2022 00:33:32 +0100 Subject: [PATCH 1/7] Create callback with detailed datagram data during drop --- .../californium/scandium/DTLSConnector.java | 123 ++++++++++++++---- .../scandium/DatagramFilterExtended.java | 41 ++++++ 2 files changed, 137 insertions(+), 27 deletions(-) create mode 100644 scandium-core/src/main/java/org/eclipse/californium/scandium/DatagramFilterExtended.java diff --git a/scandium-core/src/main/java/org/eclipse/californium/scandium/DTLSConnector.java b/scandium-core/src/main/java/org/eclipse/californium/scandium/DTLSConnector.java index 23929c5b1d..a70ae7f178 100644 --- a/scandium-core/src/main/java/org/eclipse/californium/scandium/DTLSConnector.java +++ b/scandium-core/src/main/java/org/eclipse/californium/scandium/DTLSConnector.java @@ -1822,6 +1822,9 @@ protected void processDatagram(DatagramPacket packet, InetSocketAddress router) // other information. If not used, a value of zero is inserted. DROP_LOGGER.trace("Discarding record with {} bytes from [{}] without source-port", packet.getLength(), StringUtil.toLog(peerAddress)); + if (datagramFilter instanceof DatagramFilterExtended) { + ((DatagramFilterExtended) datagramFilter).onDrop(packet); + } if (health != null) { health.receivingRecord(true); } @@ -1831,6 +1834,9 @@ protected void processDatagram(DatagramPacket packet, InetSocketAddress router) if (!datagramFilter.onReceiving(packet)) { DROP_LOGGER.trace("Filter out packet with {} bytes from [{}]", packet.getLength(), StringUtil.toLog(peerAddress)); + if (datagramFilter instanceof DatagramFilterExtended) { + ((DatagramFilterExtended) datagramFilter).onDrop(packet); + } if (health != null) { health.receivingRecord(true); } @@ -1845,6 +1851,9 @@ protected void processDatagram(DatagramPacket packet, InetSocketAddress router) if (records.isEmpty()) { DROP_LOGGER.trace("Discarding malicious record with {} bytes from [{}]", packet.getLength(), StringUtil.toLog(peerAddress)); + if (datagramFilter instanceof DatagramFilterExtended) { + ((DatagramFilterExtended) datagramFilter).onDrop(packet); + } if (health != null) { health.receivingRecord(true); } @@ -1856,6 +1865,9 @@ protected void processDatagram(DatagramPacket packet, InetSocketAddress router) records.get(0).getType(), StringUtil.toLog(peerAddress)); LOGGER.debug("Execution shutdown while processing incoming records from peer: {}", StringUtil.toLog(peerAddress)); + if (datagramFilter instanceof DatagramFilterExtended) { + ((DatagramFilterExtended) datagramFilter).onDrop(packet); + } if (health != null) { health.receivingRecord(true); } @@ -1885,30 +1897,42 @@ protected void processRecords(final List records, if (dtlsRole == DtlsRole.CLIENT_ONLY) { DROP_LOGGER.trace("client-only, discarding {} CLIENT_HELLO from [{}]!", records.size(), StringUtil.toLog(peerAddress)); + if (datagramFilter instanceof DatagramFilterExtended) { + ((DatagramFilterExtended) datagramFilter).onDrop(firstRecord); + } if (health != null) { health.receivingRecord(true); } return; } - executeInbound(getExecutorService(), peerAddress, new LimitedRunnable(pendingInboundJobsCountdown) { + executeInbound(getExecutorService(), peerAddress, + new LimitedRunnable(pendingInboundJobsCountdown) { - @Override - public void run() { - try { - if (running.get()) { - if (MDC_SUPPORT) { - MDC.put("PEER", StringUtil.toString(firstRecord.getPeerAddress())); + @Override + public void run() { + try { + if (running.get()) { + if (MDC_SUPPORT) { + MDC.put("PEER", StringUtil.toString(firstRecord.getPeerAddress())); + } + processNewClientHello(firstRecord); + if (MDC_SUPPORT) { + MDC.clear(); + } + } + } finally { + onDequeueing(); } - processNewClientHello(firstRecord); - if (MDC_SUPPORT) { - MDC.clear(); + } + }, + new Runnable() { + @Override + public void run() { + if (datagramFilter instanceof DatagramFilterExtended) { + ((DatagramFilterExtended) datagramFilter).onDrop(firstRecord); } } - } finally { - onDequeueing(); - } - } - }); + }); return; } @@ -1916,6 +1940,9 @@ public void run() { final Connection connection = getConnection(peerAddress, connectionId, false); if (connection == null) { + if (datagramFilter instanceof DatagramFilterExtended) { + ((DatagramFilterExtended) datagramFilter).onDrop(firstRecord); + } if (health != null) { health.receivingRecord(true); } @@ -1934,19 +1961,27 @@ public void run() { for (final Record record : records) { record.setAddress(peerAddress, router); try { - if (!executeInbound(serialExecutor, peerAddress, new LimitedRunnable(pendingInboundJobsCountdown) { - - @Override - public void run() { - try { - if (running.get() && connection.isExecuting()) { - processRecord(record, connection); + if (!executeInbound(serialExecutor, peerAddress, + new LimitedRunnable(pendingInboundJobsCountdown) { + @Override + public void run() { + try { + if (running.get() && connection.isExecuting()) { + processRecord(record, connection); + } + } finally { + onDequeueing(); } - } finally { - onDequeueing(); } - } - })) { + }, + new Runnable() { + @Override + public void run() { + if (datagramFilter instanceof DatagramFilterExtended) { + ((DatagramFilterExtended) datagramFilter).onDrop(record); + } + } + })) { break; } } catch (RuntimeException e) { @@ -1969,7 +2004,7 @@ public void run() { * @since 3.5 */ @NoPublicAPI - protected boolean executeInbound(Executor executor, InetSocketAddress peer, LimitedRunnable job) { + protected boolean executeInbound(Executor executor, InetSocketAddress peer, LimitedRunnable job, Runnable onError) { try { job.execute(executor); return true; @@ -1982,6 +2017,7 @@ protected boolean executeInbound(Executor executor, InetSocketAddress peer, Limi LOGGER.debug("Execution rejected while processing record from peer [{}]", StringUtil.toLog(peer), e); } } + onError.run(); if (health != null) { health.receivingRecord(true); } @@ -2005,6 +2041,9 @@ public void processRecord(Record record, Connection connection) { DROP_LOGGER.debug("Drop received record {}, connection changed address {} => {}! (shift {}ms)", record.getType(), StringUtil.toLog(record.getPeerAddress()), StringUtil.toLog(connection.getPeerAddress()), delay); + if (datagramFilter instanceof DatagramFilterExtended) { + ((DatagramFilterExtended) datagramFilter).onDrop(record); + } if (health != null) { health.receivingRecord(true); } @@ -2027,6 +2066,9 @@ public void processRecord(Record record, Connection connection) { "Discarding {} record [epoch {}, rseqn {}] received from peer [{}], handshake expired!", record.getType(), epoch, record.getSequenceNumber(), StringUtil.toLog(record.getPeerAddress())); + if (datagramFilter instanceof DatagramFilterExtended) { + ((DatagramFilterExtended) datagramFilter).onDrop(record); + } if (health != null) { health.receivingRecord(true); } @@ -2046,6 +2088,9 @@ public void processRecord(Record record, Connection connection) { "Discarding {} record [epoch {}, rseqn {}] received from peer [{}] without an active dtls context", record.getType(), epoch, record.getSequenceNumber(), StringUtil.toLog(record.getPeerAddress())); + if (datagramFilter instanceof DatagramFilterExtended) { + ((DatagramFilterExtended) datagramFilter).onDrop(record); + } if (health != null) { health.receivingRecord(true); } @@ -2071,6 +2116,9 @@ public void processRecord(Record record, Connection connection) { record.getType(), epoch, record.getSequenceNumber(), StringUtil.toLog(record.getPeerAddress())); } + if (datagramFilter instanceof DatagramFilterExtended) { + ((DatagramFilterExtended) datagramFilter).onDrop(record); + } if (health != null) { health.receivingRecord(true); } @@ -2082,6 +2130,9 @@ public void processRecord(Record record, Connection connection) { if (epoch == 0) { DROP_LOGGER.debug("Discarding TLS_CID record received from peer [{}] during handshake", StringUtil.toLog(record.getPeerAddress())); + if (datagramFilter instanceof DatagramFilterExtended) { + ((DatagramFilterExtended) datagramFilter).onDrop(record); + } if (health != null) { health.receivingRecord(true); } @@ -2090,6 +2141,9 @@ public void processRecord(Record record, Connection connection) { } else if (epoch > 0 && connection.expectCid()) { DROP_LOGGER.debug("Discarding record received from peer [{}], CID required!", StringUtil.toLog(record.getPeerAddress())); + if (datagramFilter instanceof DatagramFilterExtended) { + ((DatagramFilterExtended) datagramFilter).onDrop(record); + } if (health != null) { health.receivingRecord(true); } @@ -2101,6 +2155,9 @@ public void processRecord(Record record, Connection connection) { if (!datagramFilter.onReceiving(record, connection)) { DROP_LOGGER.trace("Filter out record with {} bytes from [{}]", record.size(), StringUtil.toLog(record.getPeerAddress())); + if (datagramFilter instanceof DatagramFilterExtended) { + ((DatagramFilterExtended) datagramFilter).onDrop(record); + } if (health != null) { health.receivingRecord(true); } @@ -2140,6 +2197,9 @@ public void processRecord(Record record, Connection connection) { StringUtil.toLog(record.getPeerAddress())); } } catch (RuntimeException e) { + if (datagramFilter instanceof DatagramFilterExtended) { + ((DatagramFilterExtended) datagramFilter).onDrop(record); + } if (health != null) { health.receivingRecord(true); } @@ -2163,6 +2223,9 @@ public void processRecord(Record record, Connection connection) { } DROP_LOGGER.debug("Discarding {} received from peer [{}] caused by {}", record.getType(), StringUtil.toLog(record.getPeerAddress()), e.getMessage()); + if (datagramFilter instanceof DatagramFilterExtended) { + ((DatagramFilterExtended) datagramFilter).onDrop(record); + } if (health != null) { if (health instanceof DtlsHealthExtended2) { ((DtlsHealthExtended2) health).receivingMacError(); @@ -2173,6 +2236,9 @@ public void processRecord(Record record, Connection connection) { } catch (GeneralSecurityException e) { DROP_LOGGER.debug("Discarding {} received from peer [{}] caused by {}", record.getType(), StringUtil.toLog(record.getPeerAddress()), e.getMessage()); + if (datagramFilter instanceof DatagramFilterExtended) { + ((DatagramFilterExtended) datagramFilter).onDrop(record); + } if (health != null) { health.receivingRecord(true); } @@ -2230,6 +2296,9 @@ private void processApplicationDataRecord(final Record record, final Connection if (useNewerRecordFilter && !newest) { DROP_LOGGER.debug("Discarding reorderd {} record [epoch {}, rseqn {}] received from peer [{}]", record.getType(), record.getEpoch(), record.getSequenceNumber(), StringUtil.toLog(record.getPeerAddress())); + if (datagramFilter instanceof DatagramFilterExtended) { + ((DatagramFilterExtended) datagramFilter).onDrop(record); + } if (health != null) { health.receivingRecord(true); } diff --git a/scandium-core/src/main/java/org/eclipse/californium/scandium/DatagramFilterExtended.java b/scandium-core/src/main/java/org/eclipse/californium/scandium/DatagramFilterExtended.java new file mode 100644 index 0000000000..eb579ced49 --- /dev/null +++ b/scandium-core/src/main/java/org/eclipse/californium/scandium/DatagramFilterExtended.java @@ -0,0 +1,41 @@ +/******************************************************************************* + * Copyright (c) 2022 Bosch.IO GmbH and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v2.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v20.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.html. + * + * Contributors: + * Bosch IO.GmbH - initial creation + ******************************************************************************/ +package org.eclipse.californium.scandium; + +import org.eclipse.californium.elements.util.PublicAPIExtension; +import org.eclipse.californium.scandium.dtls.Record; + +import java.net.DatagramPacket; +import java.net.InetSocketAddress; + +/** + * Extension of DatagramFilter + */ +@PublicAPIExtension(type = DatagramFilter.class) +public interface DatagramFilterExtended { + + /** + * Called when a datagram packed is dropped. Allows to inject packet based action in form of callback + * @param packet the dropped datagram packet + */ + void onDrop(DatagramPacket packet); + /** + * Called when a record is dropped. Allows to inject record based action in form of callback + * @param record the dropped record + */ + void onDrop(Record record); + void onDrop(InetSocketAddress sourceAddress); +} From b9aac2840c2f1e4226ed32e3f46d65d6f7d13680 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20Wi=C5=9Bniewski?= Date: Tue, 27 Dec 2022 00:35:14 +0100 Subject: [PATCH 2/7] remove garbage --- .../org/eclipse/californium/scandium/DatagramFilterExtended.java | 1 - 1 file changed, 1 deletion(-) diff --git a/scandium-core/src/main/java/org/eclipse/californium/scandium/DatagramFilterExtended.java b/scandium-core/src/main/java/org/eclipse/californium/scandium/DatagramFilterExtended.java index eb579ced49..e19be9fb07 100644 --- a/scandium-core/src/main/java/org/eclipse/californium/scandium/DatagramFilterExtended.java +++ b/scandium-core/src/main/java/org/eclipse/californium/scandium/DatagramFilterExtended.java @@ -37,5 +37,4 @@ public interface DatagramFilterExtended { * @param record the dropped record */ void onDrop(Record record); - void onDrop(InetSocketAddress sourceAddress); } From 130e0c6a54619240324974033a13f955175b0456 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20Wi=C5=9Bniewski?= Date: Tue, 27 Dec 2022 09:23:35 +0100 Subject: [PATCH 3/7] fix copyright template --- .../californium/scandium/DatagramFilterExtended.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/scandium-core/src/main/java/org/eclipse/californium/scandium/DatagramFilterExtended.java b/scandium-core/src/main/java/org/eclipse/californium/scandium/DatagramFilterExtended.java index e19be9fb07..ba1e7ee7f0 100644 --- a/scandium-core/src/main/java/org/eclipse/californium/scandium/DatagramFilterExtended.java +++ b/scandium-core/src/main/java/org/eclipse/californium/scandium/DatagramFilterExtended.java @@ -1,17 +1,17 @@ /******************************************************************************* - * Copyright (c) 2022 Bosch.IO GmbH and others. - * + * Copyright (c) 2022 AVSystem and others. + * * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v2.0 * and Eclipse Distribution License v1.0 which accompany this distribution. - * + * * The Eclipse Public License is available at * http://www.eclipse.org/legal/epl-v20.html * and the Eclipse Distribution License is available at * http://www.eclipse.org/org/documents/edl-v10.html. - * + * * Contributors: - * Bosch IO.GmbH - initial creation + * Bartłomiej Wiśniewski (AVSystem) - initial creation ******************************************************************************/ package org.eclipse.californium.scandium; @@ -19,7 +19,6 @@ import org.eclipse.californium.scandium.dtls.Record; import java.net.DatagramPacket; -import java.net.InetSocketAddress; /** * Extension of DatagramFilter From 8e40a145d26f0aea467651e133cb46860df023ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20Wi=C5=9Bniewski?= Date: Tue, 27 Dec 2022 09:44:37 +0100 Subject: [PATCH 4/7] Extend LimitedRunable instead of passing Runnable to executeInBounds --- .../elements/util/LimitedRunnable.java | 7 ++ .../californium/scandium/DTLSConnector.java | 89 ++++++++++--------- 2 files changed, 52 insertions(+), 44 deletions(-) diff --git a/element-connector/src/main/java/org/eclipse/californium/elements/util/LimitedRunnable.java b/element-connector/src/main/java/org/eclipse/californium/elements/util/LimitedRunnable.java index 54d4af890d..1d952fc7d5 100644 --- a/element-connector/src/main/java/org/eclipse/californium/elements/util/LimitedRunnable.java +++ b/element-connector/src/main/java/org/eclipse/californium/elements/util/LimitedRunnable.java @@ -90,6 +90,12 @@ public boolean isOverflown() { return overflow; } + /** + * Handles {@code RejectedExecutionException} + * @param ex the thrown exception + */ + public abstract void onError(RejectedExecutionException ex); + /** * Execute this job. * @@ -101,6 +107,7 @@ public void execute(Executor executor) { executor.execute(this); } catch (RejectedExecutionException ex) { onDequeueing(); + onError(ex); throw ex; } } diff --git a/scandium-core/src/main/java/org/eclipse/californium/scandium/DTLSConnector.java b/scandium-core/src/main/java/org/eclipse/californium/scandium/DTLSConnector.java index a70ae7f178..8f1c53fa60 100644 --- a/scandium-core/src/main/java/org/eclipse/californium/scandium/DTLSConnector.java +++ b/scandium-core/src/main/java/org/eclipse/californium/scandium/DTLSConnector.java @@ -1905,34 +1905,32 @@ protected void processRecords(final List records, } return; } - executeInbound(getExecutorService(), peerAddress, - new LimitedRunnable(pendingInboundJobsCountdown) { + executeInbound(getExecutorService(), peerAddress, new LimitedRunnable(pendingInboundJobsCountdown) { - @Override - public void run() { - try { - if (running.get()) { - if (MDC_SUPPORT) { - MDC.put("PEER", StringUtil.toString(firstRecord.getPeerAddress())); - } - processNewClientHello(firstRecord); - if (MDC_SUPPORT) { - MDC.clear(); - } - } - } finally { - onDequeueing(); + @Override + public void run() { + try { + if (running.get()) { + if (MDC_SUPPORT) { + MDC.put("PEER", StringUtil.toString(firstRecord.getPeerAddress())); } - } - }, - new Runnable() { - @Override - public void run() { - if (datagramFilter instanceof DatagramFilterExtended) { - ((DatagramFilterExtended) datagramFilter).onDrop(firstRecord); + processNewClientHello(firstRecord); + if (MDC_SUPPORT) { + MDC.clear(); } } - }); + } finally { + onDequeueing(); + } + } + + @Override + public void onError(RejectedExecutionException ex) { + if (datagramFilter instanceof DatagramFilterExtended) { + ((DatagramFilterExtended) datagramFilter).onDrop(firstRecord); + } + } + }); return; } @@ -1961,27 +1959,25 @@ public void run() { for (final Record record : records) { record.setAddress(peerAddress, router); try { - if (!executeInbound(serialExecutor, peerAddress, - new LimitedRunnable(pendingInboundJobsCountdown) { - @Override - public void run() { - try { - if (running.get() && connection.isExecuting()) { - processRecord(record, connection); - } - } finally { - onDequeueing(); + if (!executeInbound(serialExecutor, peerAddress, new LimitedRunnable(pendingInboundJobsCountdown) { + @Override + public void run() { + try { + if (running.get() && connection.isExecuting()) { + processRecord(record, connection); } + } finally { + onDequeueing(); } - }, - new Runnable() { - @Override - public void run() { - if (datagramFilter instanceof DatagramFilterExtended) { - ((DatagramFilterExtended) datagramFilter).onDrop(record); - } + } + + @Override + public void onError(RejectedExecutionException ex) { + if (datagramFilter instanceof DatagramFilterExtended) { + ((DatagramFilterExtended) datagramFilter).onDrop(record); } - })) { + } + })) { break; } } catch (RuntimeException e) { @@ -2004,7 +2000,7 @@ public void run() { * @since 3.5 */ @NoPublicAPI - protected boolean executeInbound(Executor executor, InetSocketAddress peer, LimitedRunnable job, Runnable onError) { + protected boolean executeInbound(Executor executor, InetSocketAddress peer, LimitedRunnable job) { try { job.execute(executor); return true; @@ -2017,7 +2013,6 @@ protected boolean executeInbound(Executor executor, InetSocketAddress peer, Limi LOGGER.debug("Execution rejected while processing record from peer [{}]", StringUtil.toLog(peer), e); } } - onError.run(); if (health != null) { health.receivingRecord(true); } @@ -3084,6 +3079,9 @@ public void run() { onDequeueing(); } } + + @Override + public void onError(RejectedExecutionException ex) {} })) { message.onError(new IllegalStateException("Outbound message overflow!")); } @@ -3540,6 +3538,9 @@ public void run() { onDequeueing(); } } + + @Override + public void onError(RejectedExecutionException ex) {} }); } catch (RuntimeException e) { LOGGER.warn("Unexpected error occurred while processing handshake result [{}]", connection, e); From 2dd8e81e3eace2a7808efdf3fc3fb6e00d6cea3d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20Wi=C5=9Bniewski?= Date: Tue, 27 Dec 2022 15:08:40 +0100 Subject: [PATCH 5/7] default implementation of onError in LimetedRunnable --- .../eclipse/californium/elements/util/LimitedRunnable.java | 2 +- .../org/eclipse/californium/scandium/DTLSConnector.java | 6 ------ 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/element-connector/src/main/java/org/eclipse/californium/elements/util/LimitedRunnable.java b/element-connector/src/main/java/org/eclipse/californium/elements/util/LimitedRunnable.java index 1d952fc7d5..50b0d52286 100644 --- a/element-connector/src/main/java/org/eclipse/californium/elements/util/LimitedRunnable.java +++ b/element-connector/src/main/java/org/eclipse/californium/elements/util/LimitedRunnable.java @@ -94,7 +94,7 @@ public boolean isOverflown() { * Handles {@code RejectedExecutionException} * @param ex the thrown exception */ - public abstract void onError(RejectedExecutionException ex); + public void onError(RejectedExecutionException ex) {}; /** * Execute this job. diff --git a/scandium-core/src/main/java/org/eclipse/californium/scandium/DTLSConnector.java b/scandium-core/src/main/java/org/eclipse/californium/scandium/DTLSConnector.java index 8f1c53fa60..ec11815712 100644 --- a/scandium-core/src/main/java/org/eclipse/californium/scandium/DTLSConnector.java +++ b/scandium-core/src/main/java/org/eclipse/californium/scandium/DTLSConnector.java @@ -3079,9 +3079,6 @@ public void run() { onDequeueing(); } } - - @Override - public void onError(RejectedExecutionException ex) {} })) { message.onError(new IllegalStateException("Outbound message overflow!")); } @@ -3538,9 +3535,6 @@ public void run() { onDequeueing(); } } - - @Override - public void onError(RejectedExecutionException ex) {} }); } catch (RuntimeException e) { LOGGER.warn("Unexpected error occurred while processing handshake result [{}]", connection, e); From 6dfeffda8dc454e2e1f42bd59e760f2945c55a2e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20Wi=C5=9Bniewski?= Date: Wed, 28 Dec 2022 12:07:16 +0100 Subject: [PATCH 6/7] add missing datagramFilter.onDrop calls --- .../californium/scandium/DTLSConnector.java | 69 +++++++------------ 1 file changed, 24 insertions(+), 45 deletions(-) diff --git a/scandium-core/src/main/java/org/eclipse/californium/scandium/DTLSConnector.java b/scandium-core/src/main/java/org/eclipse/californium/scandium/DTLSConnector.java index ec11815712..89e7deaa08 100644 --- a/scandium-core/src/main/java/org/eclipse/californium/scandium/DTLSConnector.java +++ b/scandium-core/src/main/java/org/eclipse/californium/scandium/DTLSConnector.java @@ -1897,9 +1897,7 @@ protected void processRecords(final List records, if (dtlsRole == DtlsRole.CLIENT_ONLY) { DROP_LOGGER.trace("client-only, discarding {} CLIENT_HELLO from [{}]!", records.size(), StringUtil.toLog(peerAddress)); - if (datagramFilter instanceof DatagramFilterExtended) { - ((DatagramFilterExtended) datagramFilter).onDrop(firstRecord); - } + informListenerOfRecordDrop(firstRecord); if (health != null) { health.receivingRecord(true); } @@ -1926,9 +1924,7 @@ public void run() { @Override public void onError(RejectedExecutionException ex) { - if (datagramFilter instanceof DatagramFilterExtended) { - ((DatagramFilterExtended) datagramFilter).onDrop(firstRecord); - } + informListenerOfRecordDrop(firstRecord); } }); return; @@ -1938,9 +1934,7 @@ public void onError(RejectedExecutionException ex) { final Connection connection = getConnection(peerAddress, connectionId, false); if (connection == null) { - if (datagramFilter instanceof DatagramFilterExtended) { - ((DatagramFilterExtended) datagramFilter).onDrop(firstRecord); - } + informListenerOfRecordDrop(firstRecord); if (health != null) { health.receivingRecord(true); } @@ -1973,9 +1967,7 @@ public void run() { @Override public void onError(RejectedExecutionException ex) { - if (datagramFilter instanceof DatagramFilterExtended) { - ((DatagramFilterExtended) datagramFilter).onDrop(record); - } + informListenerOfRecordDrop(record); } })) { break; @@ -2036,9 +2028,7 @@ public void processRecord(Record record, Connection connection) { DROP_LOGGER.debug("Drop received record {}, connection changed address {} => {}! (shift {}ms)", record.getType(), StringUtil.toLog(record.getPeerAddress()), StringUtil.toLog(connection.getPeerAddress()), delay); - if (datagramFilter instanceof DatagramFilterExtended) { - ((DatagramFilterExtended) datagramFilter).onDrop(record); - } + informListenerOfRecordDrop(record); if (health != null) { health.receivingRecord(true); } @@ -2061,9 +2051,7 @@ public void processRecord(Record record, Connection connection) { "Discarding {} record [epoch {}, rseqn {}] received from peer [{}], handshake expired!", record.getType(), epoch, record.getSequenceNumber(), StringUtil.toLog(record.getPeerAddress())); - if (datagramFilter instanceof DatagramFilterExtended) { - ((DatagramFilterExtended) datagramFilter).onDrop(record); - } + informListenerOfRecordDrop(record); if (health != null) { health.receivingRecord(true); } @@ -2083,9 +2071,7 @@ public void processRecord(Record record, Connection connection) { "Discarding {} record [epoch {}, rseqn {}] received from peer [{}] without an active dtls context", record.getType(), epoch, record.getSequenceNumber(), StringUtil.toLog(record.getPeerAddress())); - if (datagramFilter instanceof DatagramFilterExtended) { - ((DatagramFilterExtended) datagramFilter).onDrop(record); - } + informListenerOfRecordDrop(record); if (health != null) { health.receivingRecord(true); } @@ -2111,9 +2097,7 @@ public void processRecord(Record record, Connection connection) { record.getType(), epoch, record.getSequenceNumber(), StringUtil.toLog(record.getPeerAddress())); } - if (datagramFilter instanceof DatagramFilterExtended) { - ((DatagramFilterExtended) datagramFilter).onDrop(record); - } + informListenerOfRecordDrop(record); if (health != null) { health.receivingRecord(true); } @@ -2125,9 +2109,7 @@ public void processRecord(Record record, Connection connection) { if (epoch == 0) { DROP_LOGGER.debug("Discarding TLS_CID record received from peer [{}] during handshake", StringUtil.toLog(record.getPeerAddress())); - if (datagramFilter instanceof DatagramFilterExtended) { - ((DatagramFilterExtended) datagramFilter).onDrop(record); - } + informListenerOfRecordDrop(record); if (health != null) { health.receivingRecord(true); } @@ -2136,9 +2118,7 @@ public void processRecord(Record record, Connection connection) { } else if (epoch > 0 && connection.expectCid()) { DROP_LOGGER.debug("Discarding record received from peer [{}], CID required!", StringUtil.toLog(record.getPeerAddress())); - if (datagramFilter instanceof DatagramFilterExtended) { - ((DatagramFilterExtended) datagramFilter).onDrop(record); - } + informListenerOfRecordDrop(record); if (health != null) { health.receivingRecord(true); } @@ -2150,9 +2130,7 @@ public void processRecord(Record record, Connection connection) { if (!datagramFilter.onReceiving(record, connection)) { DROP_LOGGER.trace("Filter out record with {} bytes from [{}]", record.size(), StringUtil.toLog(record.getPeerAddress())); - if (datagramFilter instanceof DatagramFilterExtended) { - ((DatagramFilterExtended) datagramFilter).onDrop(record); - } + informListenerOfRecordDrop(record); if (health != null) { health.receivingRecord(true); } @@ -2192,9 +2170,7 @@ public void processRecord(Record record, Connection connection) { StringUtil.toLog(record.getPeerAddress())); } } catch (RuntimeException e) { - if (datagramFilter instanceof DatagramFilterExtended) { - ((DatagramFilterExtended) datagramFilter).onDrop(record); - } + informListenerOfRecordDrop(record); if (health != null) { health.receivingRecord(true); } @@ -2218,9 +2194,7 @@ public void processRecord(Record record, Connection connection) { } DROP_LOGGER.debug("Discarding {} received from peer [{}] caused by {}", record.getType(), StringUtil.toLog(record.getPeerAddress()), e.getMessage()); - if (datagramFilter instanceof DatagramFilterExtended) { - ((DatagramFilterExtended) datagramFilter).onDrop(record); - } + informListenerOfRecordDrop(record); if (health != null) { if (health instanceof DtlsHealthExtended2) { ((DtlsHealthExtended2) health).receivingMacError(); @@ -2231,9 +2205,7 @@ public void processRecord(Record record, Connection connection) { } catch (GeneralSecurityException e) { DROP_LOGGER.debug("Discarding {} received from peer [{}] caused by {}", record.getType(), StringUtil.toLog(record.getPeerAddress()), e.getMessage()); - if (datagramFilter instanceof DatagramFilterExtended) { - ((DatagramFilterExtended) datagramFilter).onDrop(record); - } + informListenerOfRecordDrop(record); if (health != null) { health.receivingRecord(true); } @@ -2291,9 +2263,7 @@ private void processApplicationDataRecord(final Record record, final Connection if (useNewerRecordFilter && !newest) { DROP_LOGGER.debug("Discarding reorderd {} record [epoch {}, rseqn {}] received from peer [{}]", record.getType(), record.getEpoch(), record.getSequenceNumber(), StringUtil.toLog(record.getPeerAddress())); - if (datagramFilter instanceof DatagramFilterExtended) { - ((DatagramFilterExtended) datagramFilter).onDrop(record); - } + informListenerOfRecordDrop(record); if (health != null) { health.receivingRecord(true); } @@ -2329,6 +2299,7 @@ private void processApplicationDataRecord(final Record record, final Connection } else { DROP_LOGGER.debug("Discarding APPLICATION_DATA record received from peer [{}]", StringUtil.toLog(record.getPeerAddress())); + informListenerOfRecordDrop(record); } } @@ -3421,6 +3392,7 @@ public void dropReceivedRecord(Record record) { DROP_LOGGER.debug("Discarding {} record [epoch {}, rseqn {}] dropped by handshaker for peer [{}]", record.getType(), record.getEpoch(), record.getSequenceNumber(), StringUtil.toLog(record.getPeerAddress())); + informListenerOfRecordDrop(record); if (health != null) { health.receivingRecord(true); } @@ -3894,6 +3866,7 @@ public final void setAlertHandler(AlertHandler handler) { } private void discardRecord(final Record record, final Throwable cause) { + informListenerOfRecordDrop(record); if (health != null) { health.receivingRecord(true); } @@ -3910,6 +3883,12 @@ private void discardRecord(final Record record, final Throwable cause) { } } + private void informListenerOfRecordDrop(Record droppedRecord) { + if (datagramFilter instanceof DatagramFilterExtended) { + ((DatagramFilterExtended) datagramFilter).onDrop(droppedRecord); + } + } + @Override public String getProtocol() { return "DTLS"; From 82f2876e505db7c1440fc6790400036683e6d6ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20Wi=C5=9Bniewski?= Date: Wed, 28 Dec 2022 12:11:17 +0100 Subject: [PATCH 7/7] DatagramFilterExtended and LimitedRunnable#onError javadoc cosmetic changes --- .../californium/elements/util/LimitedRunnable.java | 2 ++ .../californium/scandium/DatagramFilterExtended.java | 8 +++++--- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/element-connector/src/main/java/org/eclipse/californium/elements/util/LimitedRunnable.java b/element-connector/src/main/java/org/eclipse/californium/elements/util/LimitedRunnable.java index 50b0d52286..f06f9d169a 100644 --- a/element-connector/src/main/java/org/eclipse/californium/elements/util/LimitedRunnable.java +++ b/element-connector/src/main/java/org/eclipse/californium/elements/util/LimitedRunnable.java @@ -93,6 +93,8 @@ public boolean isOverflown() { /** * Handles {@code RejectedExecutionException} * @param ex the thrown exception + * + * @since 3.8 */ public void onError(RejectedExecutionException ex) {}; diff --git a/scandium-core/src/main/java/org/eclipse/californium/scandium/DatagramFilterExtended.java b/scandium-core/src/main/java/org/eclipse/californium/scandium/DatagramFilterExtended.java index ba1e7ee7f0..d9d3c2eb4f 100644 --- a/scandium-core/src/main/java/org/eclipse/californium/scandium/DatagramFilterExtended.java +++ b/scandium-core/src/main/java/org/eclipse/californium/scandium/DatagramFilterExtended.java @@ -21,18 +21,20 @@ import java.net.DatagramPacket; /** - * Extension of DatagramFilter + * Extension of DatagramFilter. + * + * @since 3.8 */ @PublicAPIExtension(type = DatagramFilter.class) public interface DatagramFilterExtended { /** - * Called when a datagram packed is dropped. Allows to inject packet based action in form of callback + * Called when a datagram packed is dropped. Allows to inject packet based action in form of callback. * @param packet the dropped datagram packet */ void onDrop(DatagramPacket packet); /** - * Called when a record is dropped. Allows to inject record based action in form of callback + * Called when a record is dropped. Allows to inject record based action in form of callback. * @param record the dropped record */ void onDrop(Record record);