diff --git a/NEWS.md b/NEWS.md index 5b76473ec..b51e27c17 100644 --- a/NEWS.md +++ b/NEWS.md @@ -21,6 +21,7 @@ * Fix generation of IDs ranges in Reindex Upload for Subject, Classification and Contributor ([MSEARCH-907](https://folio-org.atlassian.net/browse/MSEARCH-907)) * Remove browse config caching ([MSEARCH-897](https://folio-org.atlassian.net/browse/MSEARCH-897)) * Fix the "Invalid reference" appears after updating ownership ([MSEARCH-915](https://folio-org.atlassian.net/browse/MSEARCH-915)) +* Fix an issue with interrupting the batch event processing due to SystemUserAuthorizationException ([MSEARCH-925](https://folio-org.atlassian.net/browse/MSEARCH-925)) ### Tech Dept * Recreate upload ranges each upload execution ([MSEARCH-934](https://folio-org.atlassian.net/browse/MSEARCH-934)) diff --git a/src/main/java/org/folio/search/integration/message/interceptor/PopulateInstanceBatchInterceptor.java b/src/main/java/org/folio/search/integration/message/interceptor/PopulateInstanceBatchInterceptor.java index 0bb94ba17..37351f399 100644 --- a/src/main/java/org/folio/search/integration/message/interceptor/PopulateInstanceBatchInterceptor.java +++ b/src/main/java/org/folio/search/integration/message/interceptor/PopulateInstanceBatchInterceptor.java @@ -16,6 +16,7 @@ import java.util.Map; import java.util.stream.Collectors; import java.util.stream.StreamSupport; +import lombok.extern.log4j.Log4j2; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -28,6 +29,7 @@ import org.folio.search.service.reindex.jdbc.MergeRangeRepository; import org.folio.search.service.reindex.jdbc.ReindexJdbcRepository; import org.folio.search.utils.SearchConverterUtils; +import org.folio.spring.exception.SystemUserAuthorizationException; import org.folio.spring.service.SystemUserScopedExecutionService; import org.springframework.core.Ordered; import org.springframework.core.annotation.Order; @@ -36,6 +38,7 @@ @Order(Ordered.LOWEST_PRECEDENCE) @Component +@Log4j2 public class PopulateInstanceBatchInterceptor implements BatchInterceptor { private final Map repositories; @@ -74,50 +77,33 @@ public ConsumerRecords intercept(ConsumerRecords records) { var batchByTenant = records.stream().collect(Collectors.groupingBy(ResourceEvent::getTenant)); - batchByTenant.forEach((tenant, batch) -> systemUserScopedExecutionService.executeSystemUserScoped(tenant, - () -> executionService.execute(() -> { - process(tenant, batch); - return null; - }))); - + batchByTenant.forEach((tenant, batch) -> { + try { + systemUserScopedExecutionService.executeSystemUserScoped(tenant, () -> executionService.execute(() -> { + process(tenant, batch); + return null; + })); + } catch (SystemUserAuthorizationException ex) { + log.warn("System user authorization failed. Skip processing batch for tenant {}: {}", + tenant, ex.getMessage(), ex); + } + }); } private void process(String tenant, List batch) { var recordByResource = batch.stream().collect(Collectors.groupingBy(ResourceEvent::getResourceName)); for (Map.Entry> recordCollection : recordByResource.entrySet()) { if (ResourceType.BOUND_WITH.getName().equals(recordCollection.getKey())) { - var repository = repositories.get(ReindexEntityType.INSTANCE); - for (ResourceEvent resourceEvent : recordCollection.getValue()) { - boolean bound = resourceEvent.getType() != ResourceEventType.DELETE; - var eventPayload = getEventPayload(resourceEvent); - var id = getString(eventPayload, INSTANCE_ID_FIELD); - repository.updateBoundWith(tenant, id, bound); - } + processBoundWithEvents(tenant, recordCollection); continue; } var repository = repositories.get(ReindexEntityType.fromValue(recordCollection.getKey())); if (repository != null) { - var recordByOperation = recordCollection.getValue().stream() - .filter(resourceEvent -> { - if (ResourceType.INSTANCE.getName().equals(resourceEvent.getResourceName())) { - return !startsWith(getResourceSource(resourceEvent), SOURCE_CONSORTIUM_PREFIX); - } - return true; - }) - .collect(Collectors.groupingBy(resourceEvent -> resourceEvent.getType() != ResourceEventType.DELETE)); - var resourceToSave = recordByOperation.getOrDefault(true, emptyList()).stream() - .map(SearchConverterUtils::getNewAsMap) - .toList(); - if (!resourceToSave.isEmpty()) { - repository.saveEntities(tenant, resourceToSave); - } - var idsToDrop = recordByOperation.getOrDefault(false, emptyList()).stream() - .map(ResourceEvent::getId) - .toList(); - if (!idsToDrop.isEmpty()) { - deleteEntities(tenant, recordCollection.getKey(), repository, idsToDrop); - } + var recordByOperation = getRecordByOperation(recordCollection); + saveEntities(tenant, recordByOperation, repository); + deleteEntities(tenant, recordCollection.getKey(), recordByOperation, repository); + if (ResourceType.INSTANCE.getName().equals(recordCollection.getKey())) { var noShadowCopiesInstanceEvents = recordByOperation.values().stream().flatMap(Collection::stream).toList(); instanceChildrenResourceService.persistChildren(tenant, noShadowCopiesInstanceEvents); @@ -127,11 +113,50 @@ private void process(String tenant, List batch) { } } - private void deleteEntities(String tenant, String resourceType, MergeRangeRepository repository, List ids) { - if (ResourceType.HOLDINGS.getName().equals(resourceType) || ResourceType.ITEM.getName().equals(resourceType)) { - repository.deleteEntitiesForTenant(ids, tenant); - } else { - repository.deleteEntities(ids); + private void processBoundWithEvents(String tenant, Map.Entry> recordCollection) { + var repository = repositories.get(ReindexEntityType.INSTANCE); + for (ResourceEvent resourceEvent : recordCollection.getValue()) { + boolean bound = resourceEvent.getType() != ResourceEventType.DELETE; + var eventPayload = getEventPayload(resourceEvent); + var id = getString(eventPayload, INSTANCE_ID_FIELD); + repository.updateBoundWith(tenant, id, bound); + } + } + + private Map> getRecordByOperation( + Map.Entry> recordCollection) { + + return recordCollection.getValue().stream() + .filter(resourceEvent -> { + if (ResourceType.INSTANCE.getName().equals(resourceEvent.getResourceName())) { + return !startsWith(getResourceSource(resourceEvent), SOURCE_CONSORTIUM_PREFIX); + } + return true; + }) + .collect(Collectors.groupingBy(resourceEvent -> resourceEvent.getType() != ResourceEventType.DELETE)); + } + + private void saveEntities(String tenant, Map> recordByOperation, + MergeRangeRepository repository) { + var resourceToSave = recordByOperation.getOrDefault(true, emptyList()).stream() + .map(SearchConverterUtils::getNewAsMap) + .toList(); + if (!resourceToSave.isEmpty()) { + repository.saveEntities(tenant, resourceToSave); + } + } + + private void deleteEntities(String tenant, String resourceType, + Map> recordByOperation, MergeRangeRepository repository) { + var idsToDrop = recordByOperation.getOrDefault(false, emptyList()).stream() + .map(ResourceEvent::getId) + .toList(); + if (!idsToDrop.isEmpty()) { + if (ResourceType.HOLDINGS.getName().equals(resourceType) || ResourceType.ITEM.getName().equals(resourceType)) { + repository.deleteEntitiesForTenant(idsToDrop, tenant); + } else { + repository.deleteEntities(idsToDrop); + } } } diff --git a/src/test/java/org/folio/search/integration/message/interceptor/PopulateInstanceBatchInterceptorTest.java b/src/test/java/org/folio/search/integration/message/interceptor/PopulateInstanceBatchInterceptorTest.java new file mode 100644 index 000000000..163330a0f --- /dev/null +++ b/src/test/java/org/folio/search/integration/message/interceptor/PopulateInstanceBatchInterceptorTest.java @@ -0,0 +1,100 @@ +package org.folio.search.integration.message.interceptor; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.function.Supplier; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.TopicPartition; +import org.folio.search.domain.dto.ResourceEvent; +import org.folio.search.service.InstanceChildrenResourceService; +import org.folio.search.service.consortium.ConsortiumTenantExecutor; +import org.folio.search.service.reindex.jdbc.ItemRepository; +import org.folio.spring.exception.SystemUserAuthorizationException; +import org.folio.spring.service.SystemUserScopedExecutionService; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class PopulateInstanceBatchInterceptorTest { + + private static final String TENANT_ID = "tenantId"; + + @Mock + private ConsortiumTenantExecutor executionService; + @Mock + private SystemUserScopedExecutionService systemUserScopedExecutionService; + @Mock + private InstanceChildrenResourceService instanceChildrenResourceService; + @Mock + private ItemRepository itemRepository; + @Mock + private Consumer consumer; + + private PopulateInstanceBatchInterceptor populateInstanceBatchInterceptor; + + @BeforeEach + void setUp() { + populateInstanceBatchInterceptor = new PopulateInstanceBatchInterceptor( + List.of(itemRepository), + executionService, + systemUserScopedExecutionService, + instanceChildrenResourceService + ); + } + + @Test + void shouldHandleSystemUserAuthorizationExceptionInIntercept() { + // Arrange + var resourceEvent = new ResourceEvent().tenant(TENANT_ID).resourceName("instance"); + var consumerRecord = new ConsumerRecord<>("topic", 0, 0L, "key", resourceEvent); + var records = new ConsumerRecords<>(Map.of(new TopicPartition("topic", 0), List.of(consumerRecord))); + + doThrow(new SystemUserAuthorizationException("Authorization failed")) + .when(systemUserScopedExecutionService).executeSystemUserScoped(eq(TENANT_ID), any()); + + // Act + populateInstanceBatchInterceptor.intercept(records, consumer); + + // Assert + verify(systemUserScopedExecutionService).executeSystemUserScoped(eq(TENANT_ID), any()); + verify(executionService, never()).execute(any()); + } + + @Test + void shouldProcessRecordsSuccessfullyInIntercept() { + // Arrange + doAnswer(invocation -> { + Supplier operation = invocation.getArgument(0); + return operation.get(); + }).when(executionService).execute(any(Supplier.class)); + + doAnswer(invocation -> { + Callable action = invocation.getArgument(1); + return action.call(); + }).when(systemUserScopedExecutionService).executeSystemUserScoped(any(String.class), any(Callable.class)); + + var resourceEvent = new ResourceEvent().tenant(TENANT_ID).resourceName("instance"); + var consumerRecord = new ConsumerRecord<>("topic", 0, 0L, "key", resourceEvent); + var records = new ConsumerRecords<>(Map.of(new TopicPartition("topic", 0), List.of(consumerRecord))); + + // Act + populateInstanceBatchInterceptor.intercept(records, consumer); + + // Assert + verify(systemUserScopedExecutionService).executeSystemUserScoped(eq(TENANT_ID), any()); + verify(executionService).execute(any()); + } +}