From eb6794ed71f8b8bdfd25483408702c8b32108a72 Mon Sep 17 00:00:00 2001 From: jhartmann Date: Thu, 1 Aug 2024 11:56:06 +0200 Subject: [PATCH 1/3] fix(irs-api):[#755] fix job callbacks called multiple times after completion --- CHANGELOG.md | 1 + .../job/CallbackResponderEventListener.java | 155 +++++++++++------- .../irs/connector/job/JobOrchestrator.java | 16 +- .../irs/IrsWireMockIntegrationTest.java | 45 ++++- .../eclipse/tractusx/irs/WiremockSupport.java | 31 +++- .../CallbackResponderEventListenerTest.java | 110 ++++++++++--- 6 files changed, 271 insertions(+), 87 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index dc692742ef..2a0aada3c6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ _**For better traceability add the corresponding GitHub issue number in each cha - Change policy to include full namespace `https://w3id.org/catenax/policy/` instead of `cx-policy:` in some remaining code places (in context of #794). - Fixed flaky test `InMemoryJobStoreTest.checkLastModifiedOnAfterCreation()` (PR#857). +- Fixed occasion where completed Job callbacks are called multiple times. #755 ### Changed diff --git a/irs-api/src/main/java/org/eclipse/tractusx/irs/aaswrapper/job/CallbackResponderEventListener.java b/irs-api/src/main/java/org/eclipse/tractusx/irs/aaswrapper/job/CallbackResponderEventListener.java index 37e30a3aa3..9cd6320397 100644 --- a/irs-api/src/main/java/org/eclipse/tractusx/irs/aaswrapper/job/CallbackResponderEventListener.java +++ b/irs-api/src/main/java/org/eclipse/tractusx/irs/aaswrapper/job/CallbackResponderEventListener.java @@ -26,6 +26,9 @@ import static org.eclipse.tractusx.irs.configuration.RestTemplateConfig.NO_ERROR_REST_TEMPLATE; import java.net.URI; +import java.time.Duration; +import java.time.LocalDateTime; +import java.time.ZoneOffset; import java.util.HashMap; import java.util.Map; import java.util.UUID; @@ -60,87 +63,124 @@ class CallbackResponderEventListener { public static final String INVALID_CALLBACK_URL = "Invalid callback url '{}'."; private final UrlValidator urlValidator; private final RestTemplate restTemplate; + private final Map completedCallbacks; /* package */ CallbackResponderEventListener( @Qualifier(NO_ERROR_REST_TEMPLATE) final RestTemplate noErrorRestTemplate) { this.urlValidator = new UrlValidator(UrlValidator.ALLOW_LOCAL_URLS); this.restTemplate = noErrorRestTemplate; + this.completedCallbacks = new HashMap<>(); } @Async @EventListener public void handleJobProcessingFinishedEvent(final JobProcessingFinishedEvent jobProcessingFinishedEvent) { - if (thereIsCallbackUrlRegistered(jobProcessingFinishedEvent.callbackUrl())) { - log.info("Processing of job has finished - attempting to notify job requestor"); - - final URI callbackUri = buildCallbackUri(jobProcessingFinishedEvent.callbackUrl(), - jobProcessingFinishedEvent.jobId(), JobState.valueOf(jobProcessingFinishedEvent.jobState())); - if (urlValidator.isValid(callbackUri.toString())) { - log.info("Got callback url {} for jobId {} with state {}", callbackUri, - jobProcessingFinishedEvent.jobId(), jobProcessingFinishedEvent.jobState()); - - try { - final ResponseEntity callbackResponse = restTemplate.getForEntity(callbackUri, Void.class); - log.info("Callback url pinged, received http status: {}, jobId {}", callbackResponse.getStatusCode(), jobProcessingFinishedEvent.jobId()); - } catch (final ResourceAccessException resourceAccessException) { - log.warn("Callback url is not reachable - connection timed out, jobId {}", jobProcessingFinishedEvent.jobId()); - } - } else { - log.warn(INVALID_CALLBACK_URL, callbackUri); - } + if (noCallbackUrlRegistered(jobProcessingFinishedEvent.callbackUrl())) { + return; + } + log.info("Processing of job has finished - attempting to notify job requestor"); + + final URI callbackUri = buildCallbackUri(jobProcessingFinishedEvent.callbackUrl(), + jobProcessingFinishedEvent.jobId(), JobState.valueOf(jobProcessingFinishedEvent.jobState())); + + if (!urlValidator.isValid(callbackUri.toString())) { + log.warn(INVALID_CALLBACK_URL, callbackUri); + return; } + + log.info("Got callback url '{}' for jobId '{}' with state '{}'", callbackUri, jobProcessingFinishedEvent.jobId(), + jobProcessingFinishedEvent.jobState()); + sendCallback(callbackUri, jobProcessingFinishedEvent.jobId()); } @Async @EventListener public void handleBatchProcessingFinishedEvent(final BatchProcessingFinishedEvent batchProcessingFinishedEvent) { - if (thereIsCallbackUrlRegistered(batchProcessingFinishedEvent.callbackUrl())) { - log.info("Processing of Batch has finished - attempting to notify requestor"); - - final URI callbackUri = buildCallbackUri(batchProcessingFinishedEvent.callbackUrl(), batchProcessingFinishedEvent.batchOrderId(), - batchProcessingFinishedEvent.batchId(), batchProcessingFinishedEvent.batchOrderState(), batchProcessingFinishedEvent.batchState()); - if (urlValidator.isValid(callbackUri.toString())) { - log.info("Got callback url {} for orderId {} with orderState {} and batchId {} with batchState {}", callbackUri, - batchProcessingFinishedEvent.batchOrderId(), batchProcessingFinishedEvent.batchOrderState(), batchProcessingFinishedEvent.batchId(), batchProcessingFinishedEvent.batchState()); - - try { - final ResponseEntity callbackResponse = restTemplate.getForEntity(callbackUri, Void.class); - log.info("Callback url pinged, received http status: {}, orderId {} batchId {}", callbackResponse.getStatusCode(), batchProcessingFinishedEvent.batchOrderId(), batchProcessingFinishedEvent.batchId()); - } catch (final ResourceAccessException resourceAccessException) { - log.warn("Callback url is not reachable - connection timed out, orderId {} batchId {}", batchProcessingFinishedEvent.batchOrderId(), batchProcessingFinishedEvent.batchId()); - } - } else { - log.warn(INVALID_CALLBACK_URL, callbackUri); - } + if (noCallbackUrlRegistered(batchProcessingFinishedEvent.callbackUrl())) { + return; } + log.info("Processing of Batch has finished - attempting to notify requestor"); + + final URI callbackUri = buildCallbackUri(batchProcessingFinishedEvent.callbackUrl(), + batchProcessingFinishedEvent.batchOrderId(), batchProcessingFinishedEvent.batchId(), + batchProcessingFinishedEvent.batchOrderState(), batchProcessingFinishedEvent.batchState()); + + if (!urlValidator.isValid(callbackUri.toString())) { + log.warn(INVALID_CALLBACK_URL, callbackUri); + return; + } + + log.info("Got callback url '{}' for orderId '{}' with orderState '{}' and batchId '{}' with batchState '{}'", callbackUri, + batchProcessingFinishedEvent.batchOrderId(), batchProcessingFinishedEvent.batchOrderState(), + batchProcessingFinishedEvent.batchId(), batchProcessingFinishedEvent.batchState()); + sendCallback(callbackUri, batchProcessingFinishedEvent.batchId().toString()); } @Async @EventListener - public void handleBatchOrderProcessingFinishedEvent(final BatchOrderProcessingFinishedEvent batchOrderProcessingFinishedEvent) { - if (thereIsCallbackUrlRegistered(batchOrderProcessingFinishedEvent.callbackUrl())) { - log.info("Processing of Batch Order has finished - attempting to notify requestor"); - - final URI callbackUri = buildCallbackUri(batchOrderProcessingFinishedEvent.callbackUrl(), batchOrderProcessingFinishedEvent.batchOrderId(), - null, batchOrderProcessingFinishedEvent.batchOrderState(), null); - if (urlValidator.isValid(callbackUri.toString())) { - log.info("Got callback url {} for orderId {} with orderState {}", callbackUri, - batchOrderProcessingFinishedEvent.batchOrderId(), batchOrderProcessingFinishedEvent.batchOrderState()); - - try { - final ResponseEntity callbackResponse = restTemplate.getForEntity(callbackUri, Void.class); - log.info("Callback url pinged, received http status: {}, orderId {}", callbackResponse.getStatusCode(), batchOrderProcessingFinishedEvent.batchOrderId()); - } catch (final ResourceAccessException resourceAccessException) { - log.warn("Callback url is not reachable - connection timed out, jobId {}", batchOrderProcessingFinishedEvent.batchOrderId()); - } - } else { - log.warn(INVALID_CALLBACK_URL, callbackUri); + public void handleBatchOrderProcessingFinishedEvent( + final BatchOrderProcessingFinishedEvent batchOrderProcessingFinishedEvent) { + if (noCallbackUrlRegistered(batchOrderProcessingFinishedEvent.callbackUrl())) { + return; + } + log.info("Processing of Batch Order has finished - attempting to notify requestor"); + + final URI callbackUri = buildCallbackUri(batchOrderProcessingFinishedEvent.callbackUrl(), + batchOrderProcessingFinishedEvent.batchOrderId(), null, + batchOrderProcessingFinishedEvent.batchOrderState(), null); + if (!urlValidator.isValid(callbackUri.toString())) { + log.warn(INVALID_CALLBACK_URL, callbackUri); + return; + } + + log.info("Got callback url '{}' for orderId '{}' with orderState '{}'", callbackUri, + batchOrderProcessingFinishedEvent.batchOrderId(), batchOrderProcessingFinishedEvent.batchOrderState()); + sendCallback(callbackUri, batchOrderProcessingFinishedEvent.batchOrderId().toString()); + + } + + private void sendCallback(final URI callbackUri, final String key) { + if (callbackNotSentYet(key)) { + addJobToSentCallbacks(key); + cleanupValuesOlderThan(Duration.ofHours(1)); + try { + final ResponseEntity callbackResponse = restTemplate.getForEntity(callbackUri, Void.class); + log.info("Callback url '{}' pinged, received http status: '{}'", callbackUri, + callbackResponse.getStatusCode()); + } catch (final ResourceAccessException resourceAccessException) { + log.warn("Callback url '{}' is not reachable - connection timed out.", callbackUri); } + } else { + log.info("Callback for url '{}' is already sent.", callbackUri); + } + } + + private void addJobToSentCallbacks(final String key) { + final LocalDateTime currentTime = LocalDateTime.now(); + synchronized (completedCallbacks) { + completedCallbacks.put(key, currentTime.toEpochSecond(ZoneOffset.UTC)); + } + + } + + private void cleanupValuesOlderThan(final Duration otherDuration) { + final LocalDateTime currentTime = LocalDateTime.now(); + synchronized (completedCallbacks) { + completedCallbacks.entrySet() + .removeIf(entry -> + Duration.between(LocalDateTime.ofEpochSecond(entry.getValue(), 0, ZoneOffset.UTC), + currentTime).compareTo(otherDuration) > 0); + } + } + + private boolean callbackNotSentYet(final String key) { + synchronized (completedCallbacks) { + return !completedCallbacks.containsKey(key); } } - private boolean thereIsCallbackUrlRegistered(final String callbackUrl) { - return StringUtils.isNotBlank(callbackUrl); + private boolean noCallbackUrlRegistered(final String callbackUrl) { + return !StringUtils.isNotBlank(callbackUrl); } @SuppressWarnings("PMD.UseConcurrentHashMap") @@ -155,7 +195,8 @@ private URI buildCallbackUri(final String callbackUrl, final String jobId, final } @SuppressWarnings("PMD.UseConcurrentHashMap") - private URI buildCallbackUri(final String callbackUrl, final UUID orderId, final UUID batchId, final ProcessingState orderState, final ProcessingState batchState) { + private URI buildCallbackUri(final String callbackUrl, final UUID orderId, final UUID batchId, + final ProcessingState orderState, final ProcessingState batchState) { final Map uriVariables = new HashMap<>(); uriVariables.put("orderId", orderId); uriVariables.put("batchId", batchId); diff --git a/irs-api/src/main/java/org/eclipse/tractusx/irs/connector/job/JobOrchestrator.java b/irs-api/src/main/java/org/eclipse/tractusx/irs/connector/job/JobOrchestrator.java index d07a12d581..f00c332eae 100644 --- a/irs-api/src/main/java/org/eclipse/tractusx/irs/connector/job/JobOrchestrator.java +++ b/irs-api/src/main/java/org/eclipse/tractusx/irs/connector/job/JobOrchestrator.java @@ -30,7 +30,6 @@ import java.util.List; import java.util.Optional; import java.util.UUID; -import java.util.stream.Collectors; import java.util.stream.Stream; import lombok.extern.slf4j.Slf4j; @@ -250,19 +249,22 @@ private void markJobInError(final MultiTransferJob job, final Throwable exceptio } private void publishJobProcessingFinishedEventIfFinished(final String jobId) { - jobStore.find(jobId).ifPresent(job -> { - if (job.getJob().getState().equals(JobState.COMPLETED) || job.getJob() - .getState() - .equals(JobState.ERROR)) { + jobStore.find(jobId).ifPresentOrElse(job -> { + if (JobState.COMPLETED.equals(job.getJob().getState()) || JobState.ERROR.equals(job.getJob().getState())) { + log.info("Publishing JobProcessingFinishedEvent for job '{}' with status '{}'.", job.getJobIdString(), + job.getJob().getState()); applicationEventPublisher.publishEvent( new JobProcessingFinishedEvent(job.getJobIdString(), job.getJob().getState().name(), job.getJobParameter().getCallbackUrl(), job.getBatchId())); + } else { + log.warn("Could not publish JobProcessingFinishedEvent. Job '{}' not in state COMPLETED or ERROR.", + jobId); } - }); + }, () -> log.warn("Could not publish JobProcessingFinishedEvent. Job '{}' not present.", jobId)); } private long startTransfers(final MultiTransferJob job, final Stream dataRequests) /* throws JobErrorDetails */ { - return dataRequests.map(r -> startTransfer(job, r)).collect(Collectors.counting()); + return dataRequests.map(r -> startTransfer(job, r)).toList().size(); } private TransferInitiateResponse startTransfer(final MultiTransferJob job, diff --git a/irs-api/src/test/java/org/eclipse/tractusx/irs/IrsWireMockIntegrationTest.java b/irs-api/src/test/java/org/eclipse/tractusx/irs/IrsWireMockIntegrationTest.java index 4f70fb6c7b..f685885016 100644 --- a/irs-api/src/test/java/org/eclipse/tractusx/irs/IrsWireMockIntegrationTest.java +++ b/irs-api/src/test/java/org/eclipse/tractusx/irs/IrsWireMockIntegrationTest.java @@ -51,6 +51,7 @@ import static org.eclipse.tractusx.irs.testing.wiremock.SubmodelFacadeWiremockSupport.PATH_TRANSFER; import java.time.Duration; +import java.util.ArrayList; import java.util.List; import java.util.Objects; @@ -189,6 +190,48 @@ void shouldStopJobAfterDepthIsReached() { assertThat(jobForJobId.getTombstones()).isEmpty(); } + @Test + void shouldSendOneCallbackAfterJobCompletion() { + // Arrange + final String globalAssetIdLevel1 = "globalAssetId"; + final String globalAssetIdLevel2 = "urn:uuid:7e4541ea-bb0f-464c-8cb3-021abccbfaf5"; + + WiremockSupport.successfulSemanticModelRequest(); + WiremockSupport.successfulSemanticHubRequests(); + WiremockSupport.successfulDiscovery(); + WiremockSupport.successfulCallbackRequest(); + + successfulRegistryAndDataRequest(globalAssetIdLevel1, "Cathode", TEST_BPN, "integrationtesting/batch-1.json", + "integrationtesting/singleLevelBomAsBuilt-1.json"); + successfulRegistryAndDataRequest(globalAssetIdLevel2, "Polyamid", TEST_BPN, "integrationtesting/batch-2.json", + "integrationtesting/singleLevelBomAsBuilt-2.json"); + + final RegisterJob request = WiremockSupport.jobRequest(globalAssetIdLevel1, TEST_BPN, 1, WiremockSupport.CALLBACK_URL); + + // Act + List startedJobs = new ArrayList<>(); + for (int i = 0; i < 50; i++) { + startedJobs.add(irsService.registerItemJob(request)); + } + + for (JobHandle jobHandle : startedJobs) { + assertThat(jobHandle.getId()).isNotNull(); + waitForCompletion(jobHandle); + } + + // Assert + for (JobHandle jobHandle : startedJobs) { + final Jobs jobForJobId = irsService.getJobForJobId(jobHandle.getId(), true); + + assertThat(jobForJobId.getJob().getState()).isEqualTo(JobState.COMPLETED); + assertThat(jobForJobId.getShells()).hasSize(2); + assertThat(jobForJobId.getRelationships()).hasSize(1); + assertThat(jobForJobId.getTombstones()).isEmpty(); + + WiremockSupport.verifyCallbackCall(jobHandle.getId().toString(), JobState.COMPLETED, 1); + } + } + @Test void shouldCreateTombstoneWhenDiscoveryServiceNotAvailable() { // Arrange @@ -423,7 +466,7 @@ private void failedNegotiation() { private void waitForCompletion(final JobHandle jobHandle) { Awaitility.await() .timeout(Duration.ofSeconds(35)) - .pollInterval(Duration.ofSeconds(1)) + .pollInterval(Duration.ofMillis(500)) .until(() -> irsService.getJobForJobId(jobHandle.getId(), false) .getJob() .getState() diff --git a/irs-api/src/test/java/org/eclipse/tractusx/irs/WiremockSupport.java b/irs-api/src/test/java/org/eclipse/tractusx/irs/WiremockSupport.java index 5c73b9dc13..0192d77d33 100644 --- a/irs-api/src/test/java/org/eclipse/tractusx/irs/WiremockSupport.java +++ b/irs-api/src/test/java/org/eclipse/tractusx/irs/WiremockSupport.java @@ -19,8 +19,10 @@ ********************************************************************************/ package org.eclipse.tractusx.irs; +import static com.github.tomakehurst.wiremock.client.WireMock.equalTo; import static com.github.tomakehurst.wiremock.client.WireMock.get; import static com.github.tomakehurst.wiremock.client.WireMock.getRequestedFor; +import static com.github.tomakehurst.wiremock.client.WireMock.matching; import static com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor; import static com.github.tomakehurst.wiremock.client.WireMock.stubFor; import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo; @@ -45,6 +47,7 @@ import org.eclipse.tractusx.irs.component.RegisterJob; import org.eclipse.tractusx.irs.component.assetadministrationshell.IdentifierKeyValuePair; import org.eclipse.tractusx.irs.component.enums.Direction; +import org.eclipse.tractusx.irs.component.enums.JobState; import org.eclipse.tractusx.irs.data.StringMapper; import org.eclipse.tractusx.irs.edc.client.configuration.JsonLdConfiguration; import org.eclipse.tractusx.irs.edc.client.model.EDRAuthCode; @@ -57,6 +60,8 @@ public class WiremockSupport { public static final String SUBMODEL_SUFFIX = "/\\$value"; + public static final String CALLBACK_URL = "http://localhost/callback?id={id}&state={state}"; + public static final String CALLBACK_PATH = "/callback"; public static EndpointDataReference createEndpointDataReference(final String contractAgreementId) { final EDRAuthCode edrAuthCode = EDRAuthCode.builder() @@ -94,6 +99,18 @@ static RegisterJob jobRequest(final String globalAssetId, final String bpn, fina .build(); } + static RegisterJob jobRequest(final String globalAssetId, final String bpn, final int depth, + final String callbackUrl) { + return RegisterJob.builder() + .key(PartChainIdentificationKey.builder().bpn(bpn).globalAssetId(globalAssetId).build()) + .depth(depth) + .aspects(List.of(BATCH_3_0_0, SINGLE_LEVEL_BOM_AS_BUILT_3_0_0)) + .collectAspects(true) + .direction(Direction.DOWNWARD) + .callbackUrl(callbackUrl) + .build(); + } + static void successfulDiscovery() { stubFor(DiscoveryServiceWiremockSupport.postDiscoveryFinder200()); stubFor(DiscoveryServiceWiremockSupport.postEdcDiscovery200()); @@ -134,10 +151,22 @@ static void verifyNegotiationCalls(final int times) { } static void successfulDataRequests(final String assetId, final String fileName) { - stubFor(get(urlPathMatching(DtrWiremockSupport.DATAPLANE_PUBLIC_PATH + "/" + assetId+ SUBMODEL_SUFFIX)).willReturn( + stubFor(get( + urlPathMatching(DtrWiremockSupport.DATAPLANE_PUBLIC_PATH + "/" + assetId + SUBMODEL_SUFFIX)).willReturn( responseWithStatus(200).withBodyFile(fileName))); } + static void successfulCallbackRequest() { + stubFor(get(urlPathEqualTo(CALLBACK_PATH)).withQueryParam("id", matching(".*")) + .withQueryParam("state", matching(".*")) + .willReturn(responseWithStatus(200))); + } + + static void verifyCallbackCall(final String jobId, final JobState state, final int times) { + verify(times, getRequestedFor(urlPathEqualTo(CALLBACK_PATH)).withQueryParam("id", equalTo(jobId)) + .withQueryParam("state", equalTo(state.toString()))); + } + static void successfulSemanticHubRequests() { SemanticHubWireMockSupport.semanticHubWillReturnBatchSchema(); SemanticHubWireMockSupport.semanticHubWillReturnSingleLevelBomAsBuiltSchema(); diff --git a/irs-api/src/test/java/org/eclipse/tractusx/irs/aaswrapper/job/CallbackResponderEventListenerTest.java b/irs-api/src/test/java/org/eclipse/tractusx/irs/aaswrapper/job/CallbackResponderEventListenerTest.java index 079ddf736c..8bc220f632 100644 --- a/irs-api/src/test/java/org/eclipse/tractusx/irs/aaswrapper/job/CallbackResponderEventListenerTest.java +++ b/irs-api/src/test/java/org/eclipse/tractusx/irs/aaswrapper/job/CallbackResponderEventListenerTest.java @@ -27,7 +27,8 @@ class CallbackResponderEventListenerTest { private final RestTemplate restTemplate = mock(RestTemplate.class); - private final CallbackResponderEventListener callbackResponderEventListener = new CallbackResponderEventListener(restTemplate); + private final CallbackResponderEventListener callbackResponderEventListener = new CallbackResponderEventListener( + restTemplate); @BeforeEach void mockRestTemplate() { @@ -40,8 +41,8 @@ void shouldCallCallbackUrlIfIsValidAndStateCompletedAndJobProcessingFinishedEven final String callbackUrlTemplate = "https://hostname.com/callback?id={id}&state={state}"; final String jobId = UUID.randomUUID().toString(); final JobState jobState = JobState.COMPLETED; - final JobProcessingFinishedEvent jobProcessingFinishedEvent = new JobProcessingFinishedEvent(jobId, jobState.name(), callbackUrlTemplate, - Optional.empty()); + final JobProcessingFinishedEvent jobProcessingFinishedEvent = new JobProcessingFinishedEvent(jobId, + jobState.name(), callbackUrlTemplate, Optional.empty()); // when callbackResponderEventListener.handleJobProcessingFinishedEvent(jobProcessingFinishedEvent); @@ -59,27 +60,34 @@ void shouldCallCallbackUrlIfIsValidAndStateCompletedAndBatchProcessingFinishedEv final UUID batchId = UUID.randomUUID(); final ProcessingState orderState = ProcessingState.COMPLETED; final ProcessingState batchState = ProcessingState.COMPLETED; - final BatchProcessingFinishedEvent batchProcessingFinishedEvent = new BatchProcessingFinishedEvent(orderId, batchId, orderState, batchState, 1, callbackUrlTemplate); + final BatchProcessingFinishedEvent batchProcessingFinishedEvent = new BatchProcessingFinishedEvent(orderId, + batchId, orderState, batchState, 1, callbackUrlTemplate); // when callbackResponderEventListener.handleBatchProcessingFinishedEvent(batchProcessingFinishedEvent); - final String expectedCallbackUrl = "https://hostname.com/callback?orderId=" + orderId + "&batchId=" + batchId + "&orderState=" + orderState + "&batchState=" + batchState; + final String expectedCallbackUrl = + "https://hostname.com/callback?orderId=" + orderId + "&batchId=" + batchId + "&orderState=" + orderState + + "&batchState=" + batchState; // then verify(this.restTemplate, times(1)).getForEntity(new URI(expectedCallbackUrl), Void.class); } @Test - void shouldCallCallbackUrlIfIsValidAndStateCompletedAndBatchOrderProcessingFinishedEvent() throws URISyntaxException { + void shouldCallCallbackUrlIfIsValidAndStateCompletedAndBatchOrderProcessingFinishedEvent() + throws URISyntaxException { // given final String callbackUrlTemplate = "https://hostname.com/callback?orderId={orderId}&batchId={batchId}&orderState={orderState}&batchState={batchState}"; final UUID orderId = UUID.randomUUID(); final ProcessingState orderState = ProcessingState.COMPLETED; - final BatchOrderProcessingFinishedEvent batchOrderProcessingFinishedEvent = new BatchOrderProcessingFinishedEvent(orderId, orderState, callbackUrlTemplate); + final BatchOrderProcessingFinishedEvent batchOrderProcessingFinishedEvent = new BatchOrderProcessingFinishedEvent( + orderId, orderState, callbackUrlTemplate); // when callbackResponderEventListener.handleBatchOrderProcessingFinishedEvent(batchOrderProcessingFinishedEvent); - final String expectedCallbackUrl = "https://hostname.com/callback?orderId=" + orderId + "&batchId=" + "&orderState=" + orderState + "&batchState="; + final String expectedCallbackUrl = + "https://hostname.com/callback?orderId=" + orderId + "&batchId=" + "&orderState=" + orderState + + "&batchState="; // then verify(this.restTemplate, times(1)).getForEntity(new URI(expectedCallbackUrl), Void.class); @@ -91,8 +99,8 @@ void shouldCallCallbackUrlIfIsValidAndStateError() throws URISyntaxException { final String callbackUrlTemplate = "http://qwerty.de/{id}/{state}"; final String jobId = UUID.randomUUID().toString(); final JobState jobState = JobState.ERROR; - final JobProcessingFinishedEvent jobProcessingFinishedEvent = new JobProcessingFinishedEvent(jobId, jobState.name(), callbackUrlTemplate, - Optional.empty()); + final JobProcessingFinishedEvent jobProcessingFinishedEvent = new JobProcessingFinishedEvent(jobId, + jobState.name(), callbackUrlTemplate, Optional.empty()); // when callbackResponderEventListener.handleJobProcessingFinishedEvent(jobProcessingFinishedEvent); @@ -108,8 +116,8 @@ void shouldCallCallbackUrlIfUrlIsValidAndWithoutPlaceholders() throws URISyntaxE final String callbackUrlTemplate = "https://hostname.com/"; final String jobId = UUID.randomUUID().toString(); final JobState jobState = JobState.COMPLETED; - final JobProcessingFinishedEvent jobProcessingFinishedEvent = new JobProcessingFinishedEvent(jobId, jobState.name(), callbackUrlTemplate, - Optional.empty()); + final JobProcessingFinishedEvent jobProcessingFinishedEvent = new JobProcessingFinishedEvent(jobId, + jobState.name(), callbackUrlTemplate, Optional.empty()); // when callbackResponderEventListener.handleJobProcessingFinishedEvent(jobProcessingFinishedEvent); @@ -124,8 +132,8 @@ void shouldCallCallbackUrlIfUrlIsValidAndWithOnePlaceholder() throws URISyntaxEx // given final String callbackUrlTemplate = "https://hostname.com/callback?id={id}"; final String jobId = UUID.randomUUID().toString(); - final JobProcessingFinishedEvent jobProcessingFinishedEvent = new JobProcessingFinishedEvent(jobId, JobState.COMPLETED.name(), callbackUrlTemplate, - Optional.empty()); + final JobProcessingFinishedEvent jobProcessingFinishedEvent = new JobProcessingFinishedEvent(jobId, + JobState.COMPLETED.name(), callbackUrlTemplate, Optional.empty()); // when callbackResponderEventListener.handleJobProcessingFinishedEvent(jobProcessingFinishedEvent); @@ -139,8 +147,8 @@ void shouldCallCallbackUrlIfUrlIsValidAndWithOnePlaceholder() throws URISyntaxEx void shouldNotCallCallbackUrlIfIsNotValidAndJobProcessingFinishedEvent() { // given final String callbackUrlTemplate = "wrongCallbackUrl/id={id}"; - final JobProcessingFinishedEvent jobProcessingFinishedEvent = new JobProcessingFinishedEvent(UUID.randomUUID().toString(), JobState.COMPLETED.name(), callbackUrlTemplate, - Optional.empty()); + final JobProcessingFinishedEvent jobProcessingFinishedEvent = new JobProcessingFinishedEvent( + UUID.randomUUID().toString(), JobState.COMPLETED.name(), callbackUrlTemplate, Optional.empty()); // when callbackResponderEventListener.handleJobProcessingFinishedEvent(jobProcessingFinishedEvent); @@ -153,8 +161,9 @@ void shouldNotCallCallbackUrlIfIsNotValidAndJobProcessingFinishedEvent() { void shouldNotCallCallbackUrlIfIsNotValidAndBatchProcessingFinishedEvent() { // given final String callbackUrlTemplate = "wrongCallbackUrl/id={id}"; - final BatchProcessingFinishedEvent batchProcessingFinishedEvent = new BatchProcessingFinishedEvent(UUID.randomUUID(), UUID.randomUUID(), ProcessingState.COMPLETED, - ProcessingState.COMPLETED, 1, callbackUrlTemplate); + final BatchProcessingFinishedEvent batchProcessingFinishedEvent = new BatchProcessingFinishedEvent( + UUID.randomUUID(), UUID.randomUUID(), ProcessingState.COMPLETED, ProcessingState.COMPLETED, 1, + callbackUrlTemplate); // when callbackResponderEventListener.handleBatchProcessingFinishedEvent(batchProcessingFinishedEvent); @@ -167,7 +176,8 @@ void shouldNotCallCallbackUrlIfIsNotValidAndBatchProcessingFinishedEvent() { void shouldNotCallCallbackUrlIfIsNotValidAndBatchOrderProcessingFinishedEvent() { // given final String callbackUrlTemplate = "wrongCallbackUrl/id={id}"; - final BatchOrderProcessingFinishedEvent batchOrderProcessingFinishedEvent = new BatchOrderProcessingFinishedEvent(UUID.randomUUID(), ProcessingState.COMPLETED, callbackUrlTemplate); + final BatchOrderProcessingFinishedEvent batchOrderProcessingFinishedEvent = new BatchOrderProcessingFinishedEvent( + UUID.randomUUID(), ProcessingState.COMPLETED, callbackUrlTemplate); // when callbackResponderEventListener.handleBatchOrderProcessingFinishedEvent(batchOrderProcessingFinishedEvent); @@ -211,8 +221,8 @@ void shouldNotCallCallbackUrlIfTldIsNotValidAndStateCompletedAndJobProcessingFin void shouldNotCallCallbackUrlIfCallbackUrlIsMissing() { // given final String emptyCallbackUrlTemplate = ""; - final JobProcessingFinishedEvent jobProcessingFinishedEvent = new JobProcessingFinishedEvent(UUID.randomUUID().toString(), JobState.COMPLETED.name(), emptyCallbackUrlTemplate, - Optional.empty()); + final JobProcessingFinishedEvent jobProcessingFinishedEvent = new JobProcessingFinishedEvent( + UUID.randomUUID().toString(), JobState.COMPLETED.name(), emptyCallbackUrlTemplate, Optional.empty()); // when callbackResponderEventListener.handleJobProcessingFinishedEvent(jobProcessingFinishedEvent); @@ -220,4 +230,62 @@ void shouldNotCallCallbackUrlIfCallbackUrlIsMissing() { // then verifyNoInteractions(this.restTemplate); } + + @Test + void shouldNotSendCallbackIfAlreadyPublished() throws URISyntaxException { + final String callbackUrlTemplate = "https://hostname.com/callback?id={id}"; + final String jobId = UUID.randomUUID().toString(); + final JobProcessingFinishedEvent jobProcessingFinishedEvent = new JobProcessingFinishedEvent(jobId, + JobState.COMPLETED.toString(), callbackUrlTemplate, Optional.empty()); + + // when + callbackResponderEventListener.handleJobProcessingFinishedEvent(jobProcessingFinishedEvent); + callbackResponderEventListener.handleJobProcessingFinishedEvent(jobProcessingFinishedEvent); + final String expectedCallbackUrl = "https://hostname.com/callback?id=" + jobId; + + // then + verify(this.restTemplate, times(1)).getForEntity(new URI(expectedCallbackUrl), Void.class); + } + + @Test + void shouldNotSendBatchCallbackIfAlreadyPublished() throws URISyntaxException { + final String callbackUrlTemplate = "https://hostname.com/callback?orderId={orderId}&batchId={batchId}&orderState={orderState}&batchState={batchState}"; + final UUID orderId = UUID.randomUUID(); + final UUID batchId = UUID.randomUUID(); + final ProcessingState orderState = ProcessingState.COMPLETED; + final ProcessingState batchState = ProcessingState.COMPLETED; + final BatchProcessingFinishedEvent batchProcessingFinishedEvent = new BatchProcessingFinishedEvent(orderId, + batchId, orderState, batchState, 1, callbackUrlTemplate); + + // when + callbackResponderEventListener.handleBatchProcessingFinishedEvent(batchProcessingFinishedEvent); + callbackResponderEventListener.handleBatchProcessingFinishedEvent(batchProcessingFinishedEvent); + final String expectedCallbackUrl = + "https://hostname.com/callback?orderId=" + orderId + "&batchId=" + batchId + "&orderState=" + orderState + + "&batchState=" + batchState; + + // then + verify(this.restTemplate, times(1)).getForEntity(new URI(expectedCallbackUrl), Void.class); + } + + @Test + void shouldNotSendBatchOrderCallbackIfAlreadyPublished() + throws URISyntaxException { + // given + final String callbackUrlTemplate = "https://hostname.com/callback?orderId={orderId}&batchId={batchId}&orderState={orderState}&batchState={batchState}"; + final UUID orderId = UUID.randomUUID(); + final ProcessingState orderState = ProcessingState.COMPLETED; + final BatchOrderProcessingFinishedEvent batchOrderProcessingFinishedEvent = new BatchOrderProcessingFinishedEvent( + orderId, orderState, callbackUrlTemplate); + + // when + callbackResponderEventListener.handleBatchOrderProcessingFinishedEvent(batchOrderProcessingFinishedEvent); + callbackResponderEventListener.handleBatchOrderProcessingFinishedEvent(batchOrderProcessingFinishedEvent); + final String expectedCallbackUrl = + "https://hostname.com/callback?orderId=" + orderId + "&batchId=" + "&orderState=" + orderState + + "&batchState="; + + // then + verify(this.restTemplate, times(1)).getForEntity(new URI(expectedCallbackUrl), Void.class); + } } From 73d973f3529bb4dd8a9fe9cb3cd26569aae6d9a2 Mon Sep 17 00:00:00 2001 From: jhartmann Date: Thu, 1 Aug 2024 15:07:50 +0200 Subject: [PATCH 2/3] fix(irs-api):[#755] use inline StringUtils.isblank for better readability --- .../aaswrapper/job/CallbackResponderEventListener.java | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/irs-api/src/main/java/org/eclipse/tractusx/irs/aaswrapper/job/CallbackResponderEventListener.java b/irs-api/src/main/java/org/eclipse/tractusx/irs/aaswrapper/job/CallbackResponderEventListener.java index 9cd6320397..8ad9ec6d42 100644 --- a/irs-api/src/main/java/org/eclipse/tractusx/irs/aaswrapper/job/CallbackResponderEventListener.java +++ b/irs-api/src/main/java/org/eclipse/tractusx/irs/aaswrapper/job/CallbackResponderEventListener.java @@ -75,7 +75,7 @@ class CallbackResponderEventListener { @Async @EventListener public void handleJobProcessingFinishedEvent(final JobProcessingFinishedEvent jobProcessingFinishedEvent) { - if (noCallbackUrlRegistered(jobProcessingFinishedEvent.callbackUrl())) { + if (StringUtils.isBlank(jobProcessingFinishedEvent.callbackUrl())) { return; } log.info("Processing of job has finished - attempting to notify job requestor"); @@ -96,7 +96,7 @@ public void handleJobProcessingFinishedEvent(final JobProcessingFinishedEvent jo @Async @EventListener public void handleBatchProcessingFinishedEvent(final BatchProcessingFinishedEvent batchProcessingFinishedEvent) { - if (noCallbackUrlRegistered(batchProcessingFinishedEvent.callbackUrl())) { + if (StringUtils.isBlank(batchProcessingFinishedEvent.callbackUrl())) { return; } log.info("Processing of Batch has finished - attempting to notify requestor"); @@ -120,7 +120,7 @@ public void handleBatchProcessingFinishedEvent(final BatchProcessingFinishedEven @EventListener public void handleBatchOrderProcessingFinishedEvent( final BatchOrderProcessingFinishedEvent batchOrderProcessingFinishedEvent) { - if (noCallbackUrlRegistered(batchOrderProcessingFinishedEvent.callbackUrl())) { + if (StringUtils.isBlank(batchOrderProcessingFinishedEvent.callbackUrl())) { return; } log.info("Processing of Batch Order has finished - attempting to notify requestor"); @@ -179,10 +179,6 @@ private boolean callbackNotSentYet(final String key) { } } - private boolean noCallbackUrlRegistered(final String callbackUrl) { - return !StringUtils.isNotBlank(callbackUrl); - } - @SuppressWarnings("PMD.UseConcurrentHashMap") private URI buildCallbackUri(final String callbackUrl, final String jobId, final JobState jobState) { final Map uriVariables = new HashMap<>(); From 072a0f1d0fca27322eee9100a423a4034196625c Mon Sep 17 00:00:00 2001 From: jhartmann Date: Thu, 1 Aug 2024 15:15:23 +0200 Subject: [PATCH 3/3] fix(irs-api):[#755] removed unnecessary comparisons, added description --- .../tractusx/irs/IrsWireMockIntegrationTest.java | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/irs-api/src/test/java/org/eclipse/tractusx/irs/IrsWireMockIntegrationTest.java b/irs-api/src/test/java/org/eclipse/tractusx/irs/IrsWireMockIntegrationTest.java index f685885016..ee2cf697c4 100644 --- a/irs-api/src/test/java/org/eclipse/tractusx/irs/IrsWireMockIntegrationTest.java +++ b/irs-api/src/test/java/org/eclipse/tractusx/irs/IrsWireMockIntegrationTest.java @@ -209,8 +209,12 @@ void shouldSendOneCallbackAfterJobCompletion() { final RegisterJob request = WiremockSupport.jobRequest(globalAssetIdLevel1, TEST_BPN, 1, WiremockSupport.CALLBACK_URL); // Act - List startedJobs = new ArrayList<>(); - for (int i = 0; i < 50; i++) { + final List startedJobs = new ArrayList<>(); + + // Start 50 jobs in parallel. The bug #755 occurred when multiple (>10 Jobs) were started at the same time. + // To definitely provoke the cases where callbacks were triggered multiple times, we start 50 jobs. + final int numberOfParallelJobs = 50; + for (int i = 0; i < numberOfParallelJobs; i++) { startedJobs.add(irsService.registerItemJob(request)); } @@ -221,13 +225,6 @@ void shouldSendOneCallbackAfterJobCompletion() { // Assert for (JobHandle jobHandle : startedJobs) { - final Jobs jobForJobId = irsService.getJobForJobId(jobHandle.getId(), true); - - assertThat(jobForJobId.getJob().getState()).isEqualTo(JobState.COMPLETED); - assertThat(jobForJobId.getShells()).hasSize(2); - assertThat(jobForJobId.getRelationships()).hasSize(1); - assertThat(jobForJobId.getTombstones()).isEmpty(); - WiremockSupport.verifyCallbackCall(jobHandle.getId().toString(), JobState.COMPLETED, 1); } }