From ed90540a84198ed31892613f653ed53e8ce88723 Mon Sep 17 00:00:00 2001 From: carlos-schmidt <18703981+carlos-schmidt@users.noreply.github.com> Date: Thu, 31 Oct 2024 10:56:28 +0100 Subject: [PATCH 1/3] Don't let pipelines of same type run simultaneously --- .../iosb/app/util/VariableRateScheduler.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/edc-extension4aas/src/main/java/de/fraunhofer/iosb/app/util/VariableRateScheduler.java b/edc-extension4aas/src/main/java/de/fraunhofer/iosb/app/util/VariableRateScheduler.java index 84c0cc49..9045c629 100644 --- a/edc-extension4aas/src/main/java/de/fraunhofer/iosb/app/util/VariableRateScheduler.java +++ b/edc-extension4aas/src/main/java/de/fraunhofer/iosb/app/util/VariableRateScheduler.java @@ -54,7 +54,7 @@ public VariableRateScheduler(int corePoolSize, Runnable runnable, Monitor monito public void scheduleAtVariableRate(Supplier rateSupplier) { schedule(() -> { try { - runnable.run(); + runRunnable(); } catch (Exception e) { monitor.severe("VariableRateScheduler stopped execution.", e); throw new EdcException(e); @@ -63,13 +63,17 @@ public void scheduleAtVariableRate(Supplier rateSupplier) { }, (long) rateSupplier.get(), TimeUnit.SECONDS); } + private synchronized void runRunnable() { + runnable.run(); + } + @Override public void created(Registry registry) { - runnable.run(); + runRunnable(); } @Override public void created(Service service) { - runnable.run(); + runRunnable(); } } From 93c34b39487821ab6906d758bf9b28feac25845d Mon Sep 17 00:00:00 2001 From: carlos-schmidt <18703981+carlos-schmidt@users.noreply.github.com> Date: Thu, 31 Oct 2024 10:56:58 +0100 Subject: [PATCH 2/3] Fix: leaked connection on error response (okhttp3) --- .../main/java/de/fraunhofer/iosb/app/aas/agent/AasAgent.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/edc-extension4aas/src/main/java/de/fraunhofer/iosb/app/aas/agent/AasAgent.java b/edc-extension4aas/src/main/java/de/fraunhofer/iosb/app/aas/agent/AasAgent.java index 217cde4e..0dbb40f2 100644 --- a/edc-extension4aas/src/main/java/de/fraunhofer/iosb/app/aas/agent/AasAgent.java +++ b/edc-extension4aas/src/main/java/de/fraunhofer/iosb/app/aas/agent/AasAgent.java @@ -61,6 +61,7 @@ protected Result> readElements(AasDataProcessor processor, AasProvid var responseResult = executeRequest(processor, dataAddress); if (responseResult.failed()) { + responseResult.getContent().close(); return Result.failure("Reading %s from %s failed: %s" .formatted(clazz.getName(), path, responseResult.getFailureDetail())); } @@ -76,8 +77,8 @@ protected Result> readElements(AasDataProcessor processor, AasProvid } private Result executeRequest(AasDataProcessor processor, AasDataAddress dataAddress) { - try { - return Result.success(processor.send(dataAddress)); + try(var response = processor.send(dataAddress)) { + return Result.success(response); } catch (IOException httpIOException) { return Result.failure(List.of(httpIOException.getClass().getSimpleName(), httpIOException.getMessage())); } From 0f188dd9f4b225dd668e09a154e5002680e96240 Mon Sep 17 00:00:00 2001 From: carlos-schmidt <18703981+carlos-schmidt@users.noreply.github.com> Date: Thu, 31 Oct 2024 11:10:33 +0100 Subject: [PATCH 3/3] Fix autoclosing responseBody --- .../iosb/app/aas/agent/AasAgent.java | 29 ++++++------------- 1 file changed, 9 insertions(+), 20 deletions(-) diff --git a/edc-extension4aas/src/main/java/de/fraunhofer/iosb/app/aas/agent/AasAgent.java b/edc-extension4aas/src/main/java/de/fraunhofer/iosb/app/aas/agent/AasAgent.java index 0dbb40f2..c172e792 100644 --- a/edc-extension4aas/src/main/java/de/fraunhofer/iosb/app/aas/agent/AasAgent.java +++ b/edc-extension4aas/src/main/java/de/fraunhofer/iosb/app/aas/agent/AasAgent.java @@ -22,7 +22,6 @@ import de.fraunhofer.iosb.app.pipeline.PipelineStep; import de.fraunhofer.iosb.dataplane.aas.spi.AasDataAddress; import de.fraunhofer.iosb.model.aas.AasProvider; -import okhttp3.Response; import okhttp3.ResponseBody; import org.eclipse.digitaltwin.aas4j.v3.dataformat.core.DeserializationException; import org.eclipse.digitaltwin.aas4j.v3.dataformat.json.JsonDeserializer; @@ -58,29 +57,19 @@ protected Result> readElements(AasDataProcessor processor, AasProvid .path(path) .build(); - var responseResult = executeRequest(processor, dataAddress); + try (var response = processor.send(dataAddress)) { - if (responseResult.failed()) { - responseResult.getContent().close(); - return Result.failure("Reading %s from %s failed: %s" - .formatted(clazz.getName(), path, responseResult.getFailureDetail())); - } - - var response = responseResult.getContent(); + if (response.isSuccessful() && response.body() != null) { + return readList(response.body(), clazz); + } - if (response.isSuccessful() && response.body() != null) { - return readList(response.body(), clazz); - } - - return Result.failure("Reading %s from %s failed: %s, %s" - .formatted(clazz.getSimpleName(), path, response.code(), response.message())); - } + return Result.failure("Reading %s from %s failed: %s, %s" + .formatted(clazz.getSimpleName(), path, response.code(), response.message())); - private Result executeRequest(AasDataProcessor processor, AasDataAddress dataAddress) { - try(var response = processor.send(dataAddress)) { - return Result.success(response); } catch (IOException httpIOException) { - return Result.failure(List.of(httpIOException.getClass().getSimpleName(), httpIOException.getMessage())); + return Result.failure("Reading %s from %s failed: %s %s" + .formatted(clazz.getName(), path, httpIOException.getClass().getSimpleName(), + httpIOException.getMessage())); } }