diff --git a/application/src/ext-test/java/org/opentripplanner/ext/siri/updater/azure/AbstractAzureSiriUpdaterTest.java b/application/src/ext-test/java/org/opentripplanner/ext/siri/updater/azure/AbstractAzureSiriUpdaterTest.java new file mode 100644 index 00000000000..f4cfa8ceb80 --- /dev/null +++ b/application/src/ext-test/java/org/opentripplanner/ext/siri/updater/azure/AbstractAzureSiriUpdaterTest.java @@ -0,0 +1,393 @@ +package org.opentripplanner.ext.siri.updater.azure; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.mockito.Mockito.*; + +import com.azure.core.util.ExpandableStringEnum; +import com.azure.messaging.servicebus.ServiceBusErrorContext; +import com.azure.messaging.servicebus.ServiceBusErrorSource; +import com.azure.messaging.servicebus.ServiceBusException; +import com.azure.messaging.servicebus.ServiceBusFailureReason; +import com.azure.messaging.servicebus.ServiceBusReceivedMessageContext; +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.net.URISyntaxException; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.stream.Stream; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.ArgumentCaptor; +import org.mockito.InOrder; +import org.opentripplanner.framework.io.OtpHttpClientException; + +class AbstractAzureSiriUpdaterTest { + + private SiriAzureUpdaterParameters mockConfig; + private AbstractAzureSiriUpdater updater; + private AbstractAzureSiriUpdater.CheckedRunnable task; + + @BeforeEach + public void setUp() throws Exception { + mockConfig = mock(SiriAzureUpdaterParameters.class); + when(mockConfig.configRef()).thenReturn("testConfigRef"); + when(mockConfig.getAuthenticationType()).thenReturn(AuthenticationType.SharedAccessKey); + when(mockConfig.getFullyQualifiedNamespace()).thenReturn("testNamespace"); + when(mockConfig.getServiceBusUrl()).thenReturn("testServiceBusUrl"); + when(mockConfig.getTopicName()).thenReturn("testTopic"); + when(mockConfig.getDataInitializationUrl()).thenReturn("http://testurl.com"); + when(mockConfig.getTimeout()).thenReturn(5000); + when(mockConfig.feedId()).thenReturn("testFeedId"); + when(mockConfig.getAutoDeleteOnIdle()).thenReturn(Duration.ofHours(1)); + when(mockConfig.getPrefetchCount()).thenReturn(10); + when(mockConfig.isFuzzyTripMatching()).thenReturn(true); + + // Create a spy on AbstractAzureSiriUpdater with the mock configuration + updater = spy(new AbstractAzureSiriUpdater(mockConfig) { + @Override + protected void messageConsumer(ServiceBusReceivedMessageContext messageContext) { + } + + @Override + protected void errorConsumer(ServiceBusErrorContext errorContext) { + } + + @Override + protected void initializeData(String url, + Consumer consumer + ) throws URISyntaxException { + } + }); + + task = mock(AbstractAzureSiriUpdater.CheckedRunnable.class); + } + + /** + * Tests the retry mechanism when a retryable ServiceBusException is thrown multiple times + * and checks that it follows the backoff sequence. + */ + @Test + void testExecuteWithRetry_FullBackoffSequence() throws Throwable { + final int totalRunCalls = 10; // 9 failures + 1 success + final int totalSleepCalls = 9; // 9 retries + + doNothing().when(updater).sleep(anyInt()); + + // Configure the task to throw a retryable exception for 9 attempts and then succeed + doThrow(createServiceBusException(ServiceBusFailureReason.SERVICE_BUSY)) + .doThrow(createServiceBusException(ServiceBusFailureReason.SERVICE_BUSY)) + .doThrow(createServiceBusException(ServiceBusFailureReason.SERVICE_BUSY)) + .doThrow(createServiceBusException(ServiceBusFailureReason.SERVICE_BUSY)) + .doThrow(createServiceBusException(ServiceBusFailureReason.SERVICE_BUSY)) + .doThrow(createServiceBusException(ServiceBusFailureReason.SERVICE_BUSY)) + .doThrow(createServiceBusException(ServiceBusFailureReason.SERVICE_BUSY)) + .doThrow(createServiceBusException(ServiceBusFailureReason.SERVICE_BUSY)) + .doThrow(createServiceBusException(ServiceBusFailureReason.SERVICE_BUSY)) + .doNothing() // Succeed on the 10th attempt + .when(task).run(); + + updater.executeWithRetry(task, "Test Task"); + + verify(updater, times(totalSleepCalls)).sleep(anyInt()); + + InOrder inOrder = inOrder(updater); + inOrder.verify(updater).sleep(1000); + inOrder.verify(updater).sleep(2000); + inOrder.verify(updater).sleep(4000); + inOrder.verify(updater).sleep(8000); + inOrder.verify(updater).sleep(16000); + inOrder.verify(updater).sleep(32000); + + for (int i = 0; i < 3; i++) { + inOrder.verify(updater).sleep(60000); + } + + verify(task, times(totalRunCalls)).run(); + } + + /** + * Tests the executeWithRetry method when a non-retryable exception is thrown. + * Ensures that no further retries are attempted and sleep is not called. + */ + @Test + public void testExecuteWithRetry_NonRetryableException() throws Throwable { + doNothing().when(updater).sleep(anyInt()); + + ServiceBusException serviceBusException = createServiceBusException(ServiceBusFailureReason.MESSAGE_SIZE_EXCEEDED); + + doThrow(serviceBusException).when(task).run(); + + try { + updater.executeWithRetry(task, "Test Task"); + } catch (ServiceBusException e) { + assertEquals(ServiceBusFailureReason.MESSAGE_SIZE_EXCEEDED, e.getReason(), "Exception should have reason MESSAGE_SIZE_EXCEEDED"); + } + + verify(updater, never()).sleep(anyInt()); + verify(task, times(1)).run(); + } + + /** + * Tests the executeWithRetry method when the task fails multiple times with retryable exceptions + * and then succeeds, ensuring that sleep is called the expected number of times with correct durations. + */ + @Test + public void testExecuteWithRetry_MultipleRetriesThenSuccess() throws Throwable { + final int retriesBeforeSuccess = 3; + CountDownLatch latch = new CountDownLatch(retriesBeforeSuccess); + + doThrow(createServiceBusException(ServiceBusFailureReason.SERVICE_BUSY)) + .doThrow(createServiceBusException(ServiceBusFailureReason.SERVICE_BUSY)) + .doThrow(createServiceBusException(ServiceBusFailureReason.SERVICE_BUSY)) + .doNothing() + .when(task).run(); + + doAnswer(invocation -> { + latch.countDown(); + return null; + }).when(updater).sleep(anyInt()); + + updater.executeWithRetry(task, "Test Task"); + + boolean completed = latch.await(5, TimeUnit.SECONDS); + assertTrue(completed, "Expected sleep calls were not made."); + + ArgumentCaptor sleepCaptor = ArgumentCaptor.forClass(Integer.class); + verify(updater, times(retriesBeforeSuccess)).sleep(sleepCaptor.capture()); + + var sleepDurations = sleepCaptor.getAllValues(); + long[] expectedBackoffSequence = {1000, 2000, 4000}; + + for (int i = 0; i < expectedBackoffSequence.length; i++) { + assertEquals(expectedBackoffSequence[i], Long.valueOf(sleepDurations.get(i)), + "Backoff duration mismatch at retry " + (i + 1)); + } + + verify(task, times(retriesBeforeSuccess + 1)).run(); + } + + /** + * Tests the executeWithRetry method when the task succeeds on the first attempt. + * Ensures that no sleep calls are made. + */ + @Test + public void testExecuteWithRetry_ImmediateSuccess() throws Throwable { + doNothing().when(task).run(); + doNothing().when(updater).sleep(anyInt()); + + updater.executeWithRetry(task, "Test Task"); + + verify(updater, never()).sleep(anyInt()); + verify(task, times(1)).run(); + } + + /** + * Tests the executeWithRetry method when the task fails once with a retryable exception + * and then succeeds on the first retry. + */ + @Test + public void testExecuteWithRetry_OneRetryThenSuccess() throws Throwable { + final int expectedSleepCalls = 1; + CountDownLatch latch = new CountDownLatch(expectedSleepCalls); + + doThrow(createServiceBusException(ServiceBusFailureReason.SERVICE_BUSY)) + .doNothing() + .when(task).run(); + + doAnswer(invocation -> { + if (invocation.getArgument(0).equals(1000)) { + latch.countDown(); + } + return null; + }).when(updater).sleep(anyInt()); + + updater.executeWithRetry(task, "Test Task"); + + boolean completed = latch.await(5, TimeUnit.SECONDS); + assertTrue(completed, "Expected sleep call was not made."); + + verify(updater, times(expectedSleepCalls)).sleep(1000); + verify(task, times(2)).run(); + } + + /** + * Parameterized test to verify that shouldRetry returns the correct value for each ServiceBusFailureReason. + * + * @param reason The ServiceBusFailureReason to test. + * @param expectedRetry The expected result of shouldRetry. + */ + @ParameterizedTest(name = "shouldRetry with reason {0} should return {1}") + @MethodSource("provideServiceBusFailureReasons") + @DisplayName("Test shouldRetry for all ServiceBusFailureReason values") + void testShouldRetry_ServiceBusFailureReasons(ServiceBusFailureReason reason, boolean expectedRetry) throws Exception { + ServiceBusException serviceBusException = createServiceBusException(reason); + boolean result = updater.shouldRetry(serviceBusException); + assertEquals(expectedRetry, result, "shouldRetry should return " + expectedRetry + " for reason " + reason); + } + + /** + * Test that shouldRetry returns false for non-ServiceBus exceptions. + */ + @Test + @DisplayName("shouldRetry should return false for non-ServiceBus exceptions") + public void testShouldRetry_NonServiceBusException() { + Exception genericException = new Exception("Generic exception"); + boolean result = updater.shouldRetry(genericException); + assertFalse(result, "shouldRetry should return false for non-ServiceBus exceptions"); + } + + /** + * Test that shouldRetry handles all ServiceBusFailureReason values. + * Since enums are closed, this test ensures that the parameterized tests cover all existing values. + */ + @Test + @DisplayName("shouldRetry covers all ServiceBusFailureReason values") + public void testShouldRetry_CoversAllReasons() { + long enumCount = getExpandableStringEnumValues(ServiceBusFailureReason.class).size(); + long testCaseCount = provideServiceBusFailureReasons().count(); + assertEquals(enumCount, testCaseCount, "All ServiceBusFailureReason values should be covered by tests."); + } + + @Test + void testExecuteWithRetry_InterruptedException() throws Throwable { + final int expectedRunCalls = 2; + final int expectedSleepCalls = 1; + + doThrow(createServiceBusException(ServiceBusFailureReason.SERVICE_BUSY)) + .doThrow(new InterruptedException("Sleep interrupted")) + .when(task).run(); + + doNothing().when(updater).sleep(1000); + + InterruptedException thrownException = assertThrows(InterruptedException.class, () -> { + updater.executeWithRetry(task, "Test Task"); + }, "Expected executeWithRetry to throw InterruptedException"); + + assertEquals("Sleep interrupted", thrownException.getMessage(), "Exception message should match"); + verify(updater, times(expectedSleepCalls)).sleep(1000); + verify(task, times(expectedRunCalls)).run(); + assertTrue(Thread.currentThread().isInterrupted(), "Thread should be interrupted"); + } + + @Test + void testExecuteWithRetry_OtpHttpClientException() throws Throwable { + final int retryAttempts = 3; + final int expectedSleepCalls = retryAttempts; + + doThrow(new OtpHttpClientException("could not get historical data")) + .doThrow(new OtpHttpClientException("could not get historical data")) + .doThrow(new OtpHttpClientException("could not get historical data")) + .doNothing() + .when(task).run(); + + doNothing().when(updater).sleep(anyInt()); + + updater.executeWithRetry(task, "Test Task"); + + ArgumentCaptor sleepCaptor = ArgumentCaptor.forClass(Integer.class); + verify(updater, times(expectedSleepCalls)).sleep(sleepCaptor.capture()); + + List sleepDurations = sleepCaptor.getAllValues(); + List expectedBackoffSequence = Arrays.asList(1000, 2000, 4000); + + for (int i = 0; i < retryAttempts; i++) { + assertEquals(expectedBackoffSequence.get(i), sleepDurations.get(i), + "Backoff duration mismatch at retry " + (i + 1)); + } + + verify(task, times(retryAttempts + 1)).run(); + } + + @Test + void testExecuteWithRetry_UnexpectedException() throws Throwable { + doNothing().when(updater).sleep(anyInt()); + + Exception unexpectedException = new NullPointerException("Unexpected null value"); + doThrow(unexpectedException).when(task).run(); + + Exception thrown = assertThrows(NullPointerException.class, () -> { + updater.executeWithRetry(task, "Test Task"); + }, "Expected executeWithRetry to throw NullPointerException"); + + assertEquals("Unexpected null value", thrown.getMessage(), "Exception message should match"); + verify(updater, never()).sleep(anyInt()); + verify(task, times(1)).run(); + } + + /** + * Provides test arguments for each ServiceBusFailureReason and the expected shouldRetry outcome. + * + * @return Stream of Arguments containing ServiceBusFailureReason and expected boolean. + */ + private static Stream provideServiceBusFailureReasons() { + return Stream.of( + // Retryable (Transient) Errors + Arguments.of(ServiceBusFailureReason.SERVICE_BUSY, true), + Arguments.of(ServiceBusFailureReason.SERVICE_TIMEOUT, true), + Arguments.of(ServiceBusFailureReason.SERVICE_COMMUNICATION_ERROR, true), + Arguments.of(ServiceBusFailureReason.MESSAGE_LOCK_LOST, true), + Arguments.of(ServiceBusFailureReason.SESSION_LOCK_LOST, true), + Arguments.of(ServiceBusFailureReason.SESSION_CANNOT_BE_LOCKED, true), + Arguments.of(ServiceBusFailureReason.QUOTA_EXCEEDED, true), + Arguments.of(ServiceBusFailureReason.GENERAL_ERROR, true), + Arguments.of(ServiceBusFailureReason.UNAUTHORIZED, true), + + // Non-Retryable Errors + Arguments.of(ServiceBusFailureReason.MESSAGING_ENTITY_NOT_FOUND, false), + Arguments.of(ServiceBusFailureReason.MESSAGING_ENTITY_DISABLED, false), + Arguments.of(ServiceBusFailureReason.MESSAGE_SIZE_EXCEEDED, false), + Arguments.of(ServiceBusFailureReason.MESSAGE_NOT_FOUND, false), + Arguments.of(ServiceBusFailureReason.MESSAGING_ENTITY_ALREADY_EXISTS, false) + ); + } + + /** + * Helper method to create a ServiceBusException with a specified failure reason. + * + * @param reason The ServiceBusFailureReason to set. + * @return A ServiceBusException instance with the specified reason. + */ + private ServiceBusException createServiceBusException(ServiceBusFailureReason reason) { + ServiceBusException exception = new ServiceBusException(new Throwable(), ServiceBusErrorSource.RECEIVE); + try { + Field reasonField = ServiceBusException.class.getDeclaredField("reason"); + reasonField.setAccessible(true); + reasonField.set(exception, reason); + } catch (NoSuchFieldException | IllegalAccessException e) { + throw new RuntimeException("Failed to set ServiceBusFailureReason via reflection", e); + } + return exception; + } + + /** + * Helper method to retrieve all instances of an ExpandableStringEnum subclass. + * + * @param clazz The class of the ExpandableStringEnum subclass. + * @param The type parameter extending ExpandableStringEnum. + * @return A Collection of all registered instances. + */ + private static > Collection getExpandableStringEnumValues(Class clazz) { + try { + Method valuesMethod = ExpandableStringEnum.class.getDeclaredMethod("values", Class.class); + valuesMethod.setAccessible(true); + @SuppressWarnings("unchecked") + Collection values = (Collection) valuesMethod.invoke(null, clazz); + return values; + } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException("Failed to retrieve values from ExpandableStringEnum.", e); + } + } +} \ No newline at end of file diff --git a/application/src/ext/java/org/opentripplanner/ext/siri/updater/azure/AbstractAzureSiriUpdater.java b/application/src/ext/java/org/opentripplanner/ext/siri/updater/azure/AbstractAzureSiriUpdater.java index 7a1b32d36e5..545b216dd56 100644 --- a/application/src/ext/java/org/opentripplanner/ext/siri/updater/azure/AbstractAzureSiriUpdater.java +++ b/application/src/ext/java/org/opentripplanner/ext/siri/updater/azure/AbstractAzureSiriUpdater.java @@ -1,5 +1,6 @@ package org.opentripplanner.ext.siri.updater.azure; + import com.azure.identity.DefaultAzureCredentialBuilder; import com.azure.messaging.servicebus.ServiceBusClientBuilder; import com.azure.messaging.servicebus.ServiceBusErrorContext; @@ -18,11 +19,14 @@ import java.time.temporal.ChronoUnit; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.hc.client5.http.classic.methods.HttpGet; import org.opentripplanner.framework.application.ApplicationShutdownSupport; +import org.opentripplanner.framework.io.OtpHttpClientException; import org.opentripplanner.framework.io.OtpHttpClientFactory; import org.opentripplanner.updater.spi.GraphUpdater; import org.opentripplanner.updater.spi.HttpHeaders; @@ -34,6 +38,35 @@ public abstract class AbstractAzureSiriUpdater implements GraphUpdater { + /** + * custom functional interface that allows throwing checked exceptions, thereby + * preserving the exception's intent and type. + */ + @FunctionalInterface + interface CheckedRunnable { + void run() throws Exception; + } + + private static final Set RETRYABLE_REASONS = Set.of( + ServiceBusFailureReason.GENERAL_ERROR, + ServiceBusFailureReason.QUOTA_EXCEEDED, + ServiceBusFailureReason.SERVICE_BUSY, + ServiceBusFailureReason.SERVICE_COMMUNICATION_ERROR, + ServiceBusFailureReason.SERVICE_TIMEOUT, + ServiceBusFailureReason.UNAUTHORIZED, + ServiceBusFailureReason.MESSAGE_LOCK_LOST, + ServiceBusFailureReason.SESSION_LOCK_LOST, + ServiceBusFailureReason.SESSION_CANNOT_BE_LOCKED + ); + + private static final Set NON_RETRYABLE_REASONS = Set.of( + ServiceBusFailureReason.MESSAGING_ENTITY_NOT_FOUND, + ServiceBusFailureReason.MESSAGING_ENTITY_DISABLED, + ServiceBusFailureReason.MESSAGE_SIZE_EXCEEDED, + ServiceBusFailureReason.MESSAGE_NOT_FOUND, + ServiceBusFailureReason.MESSAGING_ENTITY_ALREADY_EXISTS + ); + private final Logger LOG = LoggerFactory.getLogger(getClass()); private final AuthenticationType authenticationType; private final String fullyQualifiedNamespace; @@ -64,17 +97,31 @@ public abstract class AbstractAzureSiriUpdater implements GraphUpdater { protected final int timeout; public AbstractAzureSiriUpdater(SiriAzureUpdaterParameters config) { - this.configRef = config.configRef(); - this.authenticationType = config.getAuthenticationType(); - this.fullyQualifiedNamespace = config.getFullyQualifiedNamespace(); - this.serviceBusUrl = config.getServiceBusUrl(); - this.topicName = config.getTopicName(); - this.dataInitializationUrl = config.getDataInitializationUrl(); + this.configRef = Objects.requireNonNull(config.configRef(), "configRef must not be null"); + this.authenticationType = Objects.requireNonNull(config.getAuthenticationType(), "authenticationType must not be null"); + this.topicName = Objects.requireNonNull(config.getTopicName(), "topicName must not be null"); + this.dataInitializationUrl = Objects.requireNonNull(config.getDataInitializationUrl(), "dataInitializationUrl must not be null"); this.timeout = config.getTimeout(); - this.feedId = config.feedId(); + this.feedId = Objects.requireNonNull(config.feedId(), "feedId must not be null"); this.autoDeleteOnIdle = config.getAutoDeleteOnIdle(); this.prefetchCount = config.getPrefetchCount(); this.fuzzyTripMatching = config.isFuzzyTripMatching(); + + if (authenticationType == AuthenticationType.FederatedIdentity) { + this.fullyQualifiedNamespace = Objects.requireNonNull( + config.getFullyQualifiedNamespace(), + "fullyQualifiedNamespace must not be null when using FederatedIdentity authentication" + ); + this.serviceBusUrl = null; + } else if (authenticationType == AuthenticationType.SharedAccessKey) { + this.serviceBusUrl = Objects.requireNonNull( + config.getServiceBusUrl(), + "serviceBusUrl must not be null when using SharedAccessKey authentication" + ); + this.fullyQualifiedNamespace = null; + } else { + throw new IllegalArgumentException("Unsupported authentication type: " + authenticationType); + } } /** @@ -96,10 +143,6 @@ public void setup(WriteToGraphCallback writeToGraphCallback) { @Override public void run() { - Objects.requireNonNull(topicName, "'topic' must be set"); - Objects.requireNonNull(serviceBusUrl, "'servicebus-url' must be set"); - Objects.requireNonNull(feedId, "'feedId' must be set"); - Preconditions.checkState(feedId.length() > 0, "'feedId' must be set"); // In Kubernetes this should be the POD identifier subscriptionName = System.getenv("HOSTNAME"); @@ -107,6 +150,134 @@ public void run() { subscriptionName = "otp-" + UUID.randomUUID(); } + try { + executeWithRetry( + this::setupSubscription, + "Setting up Service Bus subscription to topic" + ); + + executeWithRetry( + () -> initializeData(dataInitializationUrl, messageConsumer), + "Initializing historical Siri data" + ); + + executeWithRetry( + this::startEventProcessor, + "Starting Service Bus event processor" + ); + + setPrimed(); + + ApplicationShutdownSupport.addShutdownHook( + "azure-siri-updater-shutdown", + () -> { + LOG.info("Calling shutdownHook on AbstractAzureSiriUpdater"); + if (eventProcessor != null) { + eventProcessor.close(); + } + if (serviceBusAdmin != null) { + serviceBusAdmin.deleteSubscription(topicName, subscriptionName).block(); + LOG.info("Subscription '{}' deleted on topic '{}'.", subscriptionName, topicName); + } + } + ); + + } catch (ServiceBusException e) { + LOG.error("Service Bus encountered an error during setup: {}", e.getMessage(), e); + } catch (URISyntaxException e) { + LOG.error("Invalid URI provided for Service Bus setup: {}", e.getMessage(), e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.warn("Updater was interrupted during setup."); + } catch (Exception e) { + LOG.error("An unexpected error occurred during setup: {}", e.getMessage(), e); + } + } + + /** + * Sleeps. This is to be able to mock testing + * @param millis number of milliseconds + * @throws InterruptedException if sleep is interrupted + */ + protected void sleep(int millis) throws InterruptedException { + Thread.sleep(millis); + } + + /** + * Executes a task with retry logic. Retries indefinitely for retryable exceptions with exponential backoff. + * Does not retry for InterruptedException and propagates it + * @param task The task to execute. + * @param description A description of the task for logging purposes. + * @throws InterruptedException If the thread is interrupted while waiting between retries. + */ + protected void executeWithRetry(CheckedRunnable task, String description) throws Exception { + int sleepPeriod = 1000; // Start with 1-second delay + int attemptCounter = 1; + + while (true) { + try { + task.run(); + LOG.info("{} succeeded.", description); + return; + } catch (InterruptedException ie) { + LOG.warn("{} was interrupted during execution.", description); + Thread.currentThread().interrupt(); // Restore interrupted status + throw ie; + } catch (Exception e) { + LOG.warn("{} failed. Error: {} (Attempt {})", description, e.getMessage(), attemptCounter); + + if (!shouldRetry(e)) { + LOG.error("{} encountered a non-retryable error: {}.", description, e.getMessage()); + throw e; // Stop retries if the error is non-retryable + } + + LOG.warn("{} will retry in {} ms.", description, sleepPeriod); + attemptCounter++; + try { + sleep(sleepPeriod); + } catch (InterruptedException ie){ + LOG.warn("{} was interrupted during sleep.", description); + Thread.currentThread().interrupt(); // Restore interrupted status + throw ie; + } + sleepPeriod = Math.min(sleepPeriod * 2, 60 * 1000); // Exponential backoff with a cap at 60 seconds + } + } + } + + protected boolean shouldRetry(Exception e) { + if (e instanceof ServiceBusException sbException) { + ServiceBusFailureReason reason = sbException.getReason(); + + if (RETRYABLE_REASONS.contains(reason)) { + + LOG.warn("Transient error encountered: {}. Retrying...", reason); + return true; + + } else if (NON_RETRYABLE_REASONS.contains(reason)) { + + LOG.error("Non-recoverable error encountered: {}. Not retrying.", reason); + return false; + + } else { + LOG.warn("Unhandled ServiceBusFailureReason: {}. Retrying by default.", reason); + return true; + } + } + else if (ExceptionUtils.hasCause(e, OtpHttpClientException.class)){ + // retry for OtpHttpClientException as it is thrown if historical data can't be read at the moment + return true; + } + + LOG.warn("Non-ServiceBus exception encountered: {}. Not retrying.", e.getClass().getName()); + return false; + } + + /** + * Sets up the Service Bus subscription, including checking old subscription, deleting if necessary, + * and creating a new subscription. + */ + private void setupSubscription() throws ServiceBusException, URISyntaxException { // Client with permissions to create subscription if (authenticationType == AuthenticationType.FederatedIdentity) { serviceBusAdmin = @@ -121,40 +292,50 @@ public void run() { } // Set options - var options = new CreateSubscriptionOptions(); - options.setDefaultMessageTimeToLive(Duration.of(25, ChronoUnit.HOURS)); - // Set subscription to be deleted if idle for a certain time, so that orphaned instances doesn't linger. - options.setAutoDeleteOnIdle(autoDeleteOnIdle); + CreateSubscriptionOptions options = new CreateSubscriptionOptions() + .setDefaultMessageTimeToLive(Duration.of(25, ChronoUnit.HOURS)) + .setAutoDeleteOnIdle(autoDeleteOnIdle); // Make sure there is no old subscription on serviceBus - if ( - Boolean.TRUE.equals( - serviceBusAdmin.getSubscriptionExists(topicName, subscriptionName).block() - ) - ) { - LOG.info("Subscription {} already exists", subscriptionName); + if ( Boolean.TRUE.equals( serviceBusAdmin.getSubscriptionExists(topicName, subscriptionName).block())) { + LOG.info("Subscription '{}' already exists. Deleting existing subscription.", subscriptionName); serviceBusAdmin.deleteSubscription(topicName, subscriptionName).block(); LOG.info("Service Bus deleted subscription {}.", subscriptionName); } serviceBusAdmin.createSubscription(topicName, subscriptionName, options).block(); - LOG.info("Service Bus created subscription {}", subscriptionName); + LOG.info("{} created subscription {}", getClass().getSimpleName(), subscriptionName); + } - // Initialize historical Siri data - initializeData(); + /** + * Starts the Service Bus event processor. + */ + private void startEventProcessor() throws ServiceBusException { + ServiceBusClientBuilder clientBuilder = new ServiceBusClientBuilder(); - eventProcessor = - new ServiceBusClientBuilder() - .connectionString(serviceBusUrl) - .processor() - .topicName(topicName) - .subscriptionName(subscriptionName) - .receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE) - .disableAutoComplete() // Receive and delete does not need autocomplete - .prefetchCount(prefetchCount) - .processError(errorConsumer) - .processMessage(messageConsumer) - .buildProcessorClient(); + if (authenticationType == AuthenticationType.FederatedIdentity) { + Preconditions.checkNotNull(fullyQualifiedNamespace, "fullyQualifiedNamespace must be set for FederatedIdentity authentication"); + clientBuilder + .fullyQualifiedNamespace(fullyQualifiedNamespace) + .credential(new DefaultAzureCredentialBuilder().build()); + } else if (authenticationType == AuthenticationType.SharedAccessKey) { + Preconditions.checkNotNull(serviceBusUrl, "serviceBusUrl must be set for SharedAccessKey authentication"); + clientBuilder + .connectionString(serviceBusUrl); + } else { + throw new IllegalArgumentException("Unsupported authentication type: " + authenticationType); + } + + eventProcessor = clientBuilder + .processor() + .topicName(topicName) + .subscriptionName(subscriptionName) + .receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE) + .disableAutoComplete() // Receive and delete does not need autocomplete + .prefetchCount(prefetchCount) + .processError(errorConsumer) + .processMessage(messageConsumer) + .buildProcessorClient(); eventProcessor.start(); LOG.info( @@ -163,20 +344,9 @@ public void run() { subscriptionName, prefetchCount ); - - setPrimed(); - - ApplicationShutdownSupport.addShutdownHook( - "azure-siri-updater-shutdown", - () -> { - LOG.info("Calling shutdownHook on AbstractAzureSiriUpdater"); - eventProcessor.close(); - serviceBusAdmin.deleteSubscription(topicName, subscriptionName).block(); - LOG.info("Subscription '{}' deleted on topic '{}'.", subscriptionName, topicName); - } - ); } + @Override public boolean isPrimed() { return this.isPrimed; @@ -221,37 +391,6 @@ boolean fuzzyTripMatching() { return fuzzyTripMatching; } - /** - * InitializeData - wrapping method that calls an implementation of initialize data - and blocks readiness till finished - */ - private void initializeData() { - int sleepPeriod = 1000; - int attemptCounter = 1; - boolean otpIsShuttingDown = false; - - while (!otpIsShuttingDown) { - try { - initializeData(dataInitializationUrl, messageConsumer); - break; - } catch (Exception e) { - sleepPeriod = Math.min(sleepPeriod * 2, 60 * 1000); - - LOG.warn( - "Caught exception while initializing data will retry after {} ms - attempt {}. ({})", - sleepPeriod, - attemptCounter++, - e.toString() - ); - try { - Thread.sleep(sleepPeriod); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - otpIsShuttingDown = true; - LOG.info("OTP is shutting down, cancelling attempt to initialize Azure SIRI Updater."); - } - } - } - } protected abstract void initializeData( String url, @@ -260,7 +399,7 @@ protected abstract void initializeData( /** * Make some sensible logging on error and if Service Bus is busy, sleep for some time before try again to get messages. - * This code snippet is taken from Microsoft example https://docs.microsoft.com/sv-se/azure/service-bus-messaging/service-bus-java-how-to-use-queues. + * This code snippet is taken from Microsoft example .... * @param errorContext Context for errors handled by the ServiceBusProcessorClient. */ protected void defaultErrorConsumer(ServiceBusErrorContext errorContext) { @@ -270,19 +409,15 @@ protected void defaultErrorConsumer(ServiceBusErrorContext errorContext) { errorContext.getEntityPath() ); - if (!(errorContext.getException() instanceof ServiceBusException)) { + if (!(errorContext.getException() instanceof ServiceBusException e)) { LOG.error("Non-ServiceBusException occurred!", errorContext.getException()); return; } - var e = (ServiceBusException) errorContext.getException(); var reason = e.getReason(); - if ( - reason == ServiceBusFailureReason.MESSAGING_ENTITY_DISABLED || - reason == ServiceBusFailureReason.MESSAGING_ENTITY_NOT_FOUND || - reason == ServiceBusFailureReason.UNAUTHORIZED - ) { + if (reason == ServiceBusFailureReason.MESSAGING_ENTITY_DISABLED || + reason == ServiceBusFailureReason.MESSAGING_ENTITY_NOT_FOUND) { LOG.error( "An unrecoverable error occurred. Stopping processing with reason {} {}", reason, @@ -290,8 +425,9 @@ protected void defaultErrorConsumer(ServiceBusErrorContext errorContext) { ); } else if (reason == ServiceBusFailureReason.MESSAGE_LOCK_LOST) { LOG.error("Message lock lost for message", e); - } else if (reason == ServiceBusFailureReason.SERVICE_BUSY) { - LOG.error("Service Bus is busy, wait and try again"); + } else if (reason == ServiceBusFailureReason.SERVICE_BUSY || + reason == ServiceBusFailureReason.UNAUTHORIZED) { + LOG.error("Service Bus is busy or unauthorized, wait and try again"); try { // Choosing an arbitrary amount of time to wait until trying again. TimeUnit.SECONDS.sleep(5); diff --git a/application/src/main/java/org/opentripplanner/standalone/config/routerconfig/updaters/azure/SiriAzureUpdaterConfig.java b/application/src/main/java/org/opentripplanner/standalone/config/routerconfig/updaters/azure/SiriAzureUpdaterConfig.java index ddb6a967f92..28fb7bf9549 100644 --- a/application/src/main/java/org/opentripplanner/standalone/config/routerconfig/updaters/azure/SiriAzureUpdaterConfig.java +++ b/application/src/main/java/org/opentripplanner/standalone/config/routerconfig/updaters/azure/SiriAzureUpdaterConfig.java @@ -33,14 +33,10 @@ public static void populateConfig( .asString(null) ); parameters.setTopicName( - c.of("topic").since(V2_2).summary("Service Bus topic to connect to.").asString(null) + c.of("topic").since(V2_2).summary("Service Bus topic to connect to.").asString() ); parameters.setFeedId( - c - .of("feedId") - .since(V2_2) - .summary("The ID of the feed to apply the updates to.") - .asString(null) + c.of("feedId").since(V2_2).summary("The ID of the feed to apply the updates to.").asString() ); parameters.setAutoDeleteOnIdle( c diff --git a/doc/user/sandbox/siri/SiriAzureUpdater.md b/doc/user/sandbox/siri/SiriAzureUpdater.md index c8e7f4d9255..898e70d7b84 100644 --- a/doc/user/sandbox/siri/SiriAzureUpdater.md +++ b/doc/user/sandbox/siri/SiriAzureUpdater.md @@ -28,12 +28,12 @@ To enable the SIRI updater you need to add it to the updaters section of the `ro | [authenticationType](#u__11__authenticationType) | `enum` | Which authentication type to use | *Optional* | `"sharedaccesskey"` | 2.5 | | autoDeleteOnIdle | `duration` | The time after which an inactive subscription is removed. | *Optional* | `"PT1H"` | 2.5 | | [customMidnight](#u__11__customMidnight) | `integer` | Time on which time breaks into new day. | *Optional* | `0` | 2.2 | -| feedId | `string` | The ID of the feed to apply the updates to. | *Optional* | | 2.2 | +| feedId | `string` | The ID of the feed to apply the updates to. | *Required* | | 2.2 | | [fullyQualifiedNamespace](#u__11__fullyQualifiedNamespace) | `string` | Service Bus fully qualified namespace used for authentication. | *Optional* | | 2.5 | | fuzzyTripMatching | `boolean` | Whether to apply fuzzyTripMatching on the updates | *Optional* | `false` | 2.2 | | prefetchCount | `integer` | The number of messages to fetch from the subscription at a time. | *Optional* | `10` | 2.5 | | [servicebus-url](#u__11__servicebus_url) | `string` | Service Bus connection used for authentication. | *Optional* | | 2.2 | -| topic | `string` | Service Bus topic to connect to. | *Optional* | | 2.2 | +| topic | `string` | Service Bus topic to connect to. | *Required* | | 2.2 | | history | `object` | Configuration for fetching historical data on startup | *Optional* | | 2.2 | |    fromDateTime | `string` | Datetime boundary for historical data | *Optional* | `"-P1D"` | 2.2 | |    timeout | `integer` | Timeout in milliseconds | *Optional* | `300000` | na | @@ -116,12 +116,12 @@ Has to be present for authenticationMethod SharedAccessKey. This should be Prima | [authenticationType](#u__10__authenticationType) | `enum` | Which authentication type to use | *Optional* | `"sharedaccesskey"` | 2.5 | | autoDeleteOnIdle | `duration` | The time after which an inactive subscription is removed. | *Optional* | `"PT1H"` | 2.5 | | [customMidnight](#u__10__customMidnight) | `integer` | Time on which time breaks into new day. | *Optional* | `0` | 2.2 | -| feedId | `string` | The ID of the feed to apply the updates to. | *Optional* | | 2.2 | +| feedId | `string` | The ID of the feed to apply the updates to. | *Required* | | 2.2 | | [fullyQualifiedNamespace](#u__10__fullyQualifiedNamespace) | `string` | Service Bus fully qualified namespace used for authentication. | *Optional* | | 2.5 | | fuzzyTripMatching | `boolean` | Whether to apply fuzzyTripMatching on the updates | *Optional* | `false` | 2.2 | | prefetchCount | `integer` | The number of messages to fetch from the subscription at a time. | *Optional* | `10` | 2.5 | | [servicebus-url](#u__10__servicebus_url) | `string` | Service Bus connection used for authentication. | *Optional* | | 2.2 | -| topic | `string` | Service Bus topic to connect to. | *Optional* | | 2.2 | +| topic | `string` | Service Bus topic to connect to. | *Required* | | 2.2 | | history | `object` | Configuration for fetching historical data on startup | *Optional* | | 2.2 | |    fromDateTime | `string` | Datetime boundary for historical data. | *Optional* | `"-P1D"` | 2.2 | |    timeout | `integer` | Timeout in milliseconds | *Optional* | `300000` | na |