From 28bb9aa64779515f96b516cc9cca1632d85ac35d Mon Sep 17 00:00:00 2001 From: Aaron Ilovici Date: Tue, 8 Dec 2020 11:43:00 -0500 Subject: [PATCH] Port CDM updates to 2.26.x branch (#6434) * DDF-6408 Improve Logging for CDM (#6064) (#6122) * Update the contentDirectoryMonitor appender configuration in org.ops4j.pax.logging.cfg to match log4j2.xml (#6344) * Cdm Set Max File Size (#6358) * Fix create commit before ingest * Set max file size for in place directory monitoring to 1GB and made configurable * CDM timeout update (#6339) * DDF-6334 added logging & camel will expire messages. Also set max number of threads for CDM * Update FrameworkProducer and TransformProducer to support timeouts * Update CDM timeouts to use camel component timeouts * Add failure retry * Camel component test updates Co-authored-by: Joseph Thweatt * Cleanup * reverting test updates to get build working * Updating policy for CDM Co-authored-by: Joseph Thweatt Co-authored-by: Emily Berk Co-authored-by: Chris Lockard --- .../core/catalog-core-camelcomponent/pom.xml | 10 +- .../component/catalog/CatalogComponent.java | 14 +- .../component/catalog/CatalogEndpoint.java | 11 + .../catalog/framework/FrameworkProducer.java | 77 ++++++- .../framework/IngestTimeoutException.java | 52 +++++ .../transformer/TransformerProducer.java | 40 +++- .../TransformerTimeoutException.java | 53 +++++ .../OSGI-INF/blueprint/blueprint.xml | 14 +- .../framework/FrameworkProducerTest.java | 190 ++++++++++++++++++ .../InputTransformerProducerTest.java | 73 ++++++- .../monitor/AsyncFileAlterationObserver.java | 133 ++++++++++-- .../monitor/ContentDirectoryMonitor.java | 35 +++- .../DurableFileAlterationListener.java | 10 + .../DurableFileSystemFileConsumer.java | 2 +- .../CompletionSynchronization.java | 2 +- .../AsyncFileAlterationObserverTest.java | 63 ++++-- .../monitor/ContentDirectoryMonitorTest.java | 2 +- .../resources/etc/org.ops4j.pax.logging.cfg | 3 +- .../resources/security/configurations.policy | 2 +- 19 files changed, 717 insertions(+), 69 deletions(-) create mode 100644 catalog/core/catalog-core-camelcomponent/src/main/java/ddf/camel/component/catalog/framework/IngestTimeoutException.java create mode 100644 catalog/core/catalog-core-camelcomponent/src/main/java/ddf/camel/component/catalog/transformer/TransformerTimeoutException.java create mode 100644 catalog/core/catalog-core-camelcomponent/src/test/java/ddf/camel/component/catalog/framework/FrameworkProducerTest.java diff --git a/catalog/core/catalog-core-camelcomponent/pom.xml b/catalog/core/catalog-core-camelcomponent/pom.xml index c1bf8675d1ea..6979fa44fb22 100644 --- a/catalog/core/catalog-core-camelcomponent/pom.xml +++ b/catalog/core/catalog-core-camelcomponent/pom.xml @@ -115,6 +115,10 @@ ${tika.thirdparty.bundle.version} test + + ddf.security.core + security-core-api + @@ -196,17 +200,17 @@ INSTRUCTION COVEREDRATIO - 0.68 + 0.72 BRANCH COVEREDRATIO - 0.60 + 0.65 COMPLEXITY COVEREDRATIO - 0.53 + 0.56 diff --git a/catalog/core/catalog-core-camelcomponent/src/main/java/ddf/camel/component/catalog/CatalogComponent.java b/catalog/core/catalog-core-camelcomponent/src/main/java/ddf/camel/component/catalog/CatalogComponent.java index ec3ca7ce851e..c64f1243b47e 100644 --- a/catalog/core/catalog-core-camelcomponent/src/main/java/ddf/camel/component/catalog/CatalogComponent.java +++ b/catalog/core/catalog-core-camelcomponent/src/main/java/ddf/camel/component/catalog/CatalogComponent.java @@ -18,6 +18,7 @@ import ddf.mime.MimeTypeMapper; import ddf.mime.MimeTypeToTransformerMapper; import java.util.Map; +import java.util.concurrent.ExecutorService; import org.apache.camel.Endpoint; import org.apache.camel.support.DefaultComponent; import org.osgi.framework.BundleContext; @@ -49,6 +50,8 @@ public class CatalogComponent extends DefaultComponent { private MimeTypeMapper mimeTypeMapper; + private ExecutorService executor; + public CatalogComponent() { super(); LOGGER.debug("INSIDE CatalogComponent constructor"); @@ -74,9 +77,10 @@ protected Endpoint createEndpoint(String uri, String remaining, Mapcatalog component. * @@ -204,4 +207,12 @@ public boolean isSingleton() { public boolean isMultipleConsumersSupported() { return true; } + + public void setExecutor(ExecutorService executor) { + this.executor = executor; + } + + public ExecutorService getExecutor() { + return this.executor; + } } diff --git a/catalog/core/catalog-core-camelcomponent/src/main/java/ddf/camel/component/catalog/framework/FrameworkProducer.java b/catalog/core/catalog-core-camelcomponent/src/main/java/ddf/camel/component/catalog/framework/FrameworkProducer.java index aac1ed47a09a..8ff4d5c350d6 100644 --- a/catalog/core/catalog-core-camelcomponent/src/main/java/ddf/camel/component/catalog/framework/FrameworkProducer.java +++ b/catalog/core/catalog-core-camelcomponent/src/main/java/ddf/camel/component/catalog/framework/FrameworkProducer.java @@ -13,12 +13,14 @@ */ package ddf.camel.component.catalog.framework; +import ddf.camel.component.catalog.CatalogEndpoint; import ddf.catalog.CatalogFramework; import ddf.catalog.data.Metacard; import ddf.catalog.operation.CreateRequest; import ddf.catalog.operation.CreateResponse; import ddf.catalog.operation.DeleteRequest; import ddf.catalog.operation.DeleteResponse; +import ddf.catalog.operation.Operation; import ddf.catalog.operation.Update; import ddf.catalog.operation.UpdateRequest; import ddf.catalog.operation.UpdateResponse; @@ -27,13 +29,22 @@ import ddf.catalog.operation.impl.UpdateRequestImpl; import ddf.catalog.source.IngestException; import ddf.catalog.source.SourceUnavailableException; +import ddf.security.SecurityConstants; import java.util.ArrayList; +import java.util.Collections; import java.util.List; -import org.apache.camel.Endpoint; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import org.apache.camel.Exchange; import org.apache.camel.TypeConversionException; import org.apache.camel.support.DefaultProducer; import org.apache.commons.collections4.CollectionUtils; +import org.apache.shiro.SecurityUtils; +import org.apache.shiro.subject.Subject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -99,8 +110,12 @@ public class FrameworkProducer extends DefaultProducer { private static final String OPERATION_HEADER_KEY = "operation"; + private static final String TIMEOUT_HEADER_KEY = "timeoutMilliseconds"; + private CatalogFramework catalogFramework; + private ExecutorService executor; + private static final String CATALOG_RESPONSE_NULL = "Catalog response object is null"; /** @@ -109,13 +124,14 @@ public class FrameworkProducer extends DefaultProducer { * @param endpoint the Camel endpoint that created this consumer * @param catalogFramework the DDF Catalog Framework to use */ - public FrameworkProducer(Endpoint endpoint, CatalogFramework catalogFramework) { + public FrameworkProducer(CatalogEndpoint endpoint, CatalogFramework catalogFramework) { super(endpoint); this.catalogFramework = catalogFramework; + this.executor = endpoint.getExecutor(); } @Override - public void process(Exchange exchange) throws FrameworkProducerException { + public void process(Exchange exchange) throws FrameworkProducerException, InterruptedException { try { LOGGER.debug("Entering process method"); @@ -148,7 +164,7 @@ public void process(Exchange exchange) throws FrameworkProducerException { exchange.getIn().setBody(new ArrayList()); LOGGER.debug("Received a non-String as the operation type"); throw new FrameworkProducerException(cce); - } catch (SourceUnavailableException | IngestException e) { + } catch (SourceUnavailableException | IngestException | ExecutionException e) { LOGGER.debug("Exception cataloging metacards", e); throw new FrameworkProducerException(e); } @@ -165,7 +181,8 @@ public void process(Exchange exchange) throws FrameworkProducerException { * @throws ddf.camel.component.catalog.framework.FrameworkProducerException */ private void create(final Exchange exchange) - throws SourceUnavailableException, IngestException, FrameworkProducerException { + throws SourceUnavailableException, IngestException, FrameworkProducerException, + InterruptedException, ExecutionException { CreateResponse createResponse = null; // read in data @@ -188,7 +205,13 @@ private void create(final Exchange exchange) } LOGGER.debug("Making CREATE call to Catalog Framework..."); - createResponse = catalogFramework.create(createRequest); + Object timeoutObj = exchange.getIn().getHeader(TIMEOUT_HEADER_KEY); + if (timeoutObj != null) { + createResponse = + processWithTimeout((long) timeoutObj, catalogFramework::create, createRequest); + } else { + createResponse = catalogFramework.create(createRequest); + } if (createResponse == null) { LOGGER.debug("CreateResponse is null from catalog framework"); @@ -218,6 +241,32 @@ private void create(final Exchange exchange) processCatalogResponse(createResponse, exchange); } + R processWithTimeout( + long timeout, CatalogFunction catalog, T request) + throws InterruptedException, IngestTimeoutException, ExecutionException { + LOGGER.trace("Running catalog operation with timeout of {}", timeout); + Subject subject = SecurityUtils.getSubject(); + if (subject instanceof ddf.security.Subject) { + request + .getProperties() + .put(SecurityConstants.SECURITY_SUBJECT, (ddf.security.Subject) subject); + } + try { + List> futures = + executor.invokeAll( + Collections.singleton((Callable) () -> catalog.ingest(request)), + timeout, + TimeUnit.MILLISECONDS); + if (!futures.get(0).isDone()) { + LOGGER.warn("Ingest task was canceled due to timeout"); + throw new IngestTimeoutException("The ingest request timed out 1"); + } + return futures.get(0).get(); + } catch (CancellationException e) { + throw new IngestTimeoutException("The ingest request timed out 2"); + } + } + /** * Updates metacard(s) in the catalog using the Catalog Framework. * @@ -229,7 +278,8 @@ private void create(final Exchange exchange) * @throws ddf.camel.component.catalog.framework.FrameworkProducerException */ private void update(final Exchange exchange) - throws SourceUnavailableException, IngestException, FrameworkProducerException { + throws SourceUnavailableException, IngestException, FrameworkProducerException, + ExecutionException, InterruptedException { UpdateResponse updateResponse = null; // read in data from exchange @@ -258,7 +308,13 @@ private void update(final Exchange exchange) } LOGGER.debug("Making UPDATE call to Catalog Framework..."); - updateResponse = catalogFramework.update(updateRequest); + Object timeoutObj = exchange.getIn().getHeader(TIMEOUT_HEADER_KEY); + if (timeoutObj != null) { + updateResponse = + processWithTimeout((long) timeoutObj, catalogFramework::update, updateRequest); + } else { + updateResponse = catalogFramework.update(updateRequest); + } if (updateResponse == null) { LOGGER.debug("UpdateResponse is null from catalog framework"); @@ -534,4 +590,9 @@ private List readBodyDataAsMetacards(final Exchange exchange) { return metacardsToProcess; } + + @FunctionalInterface + public interface CatalogFunction { + R ingest(T t) throws IngestException, SourceUnavailableException; + } } diff --git a/catalog/core/catalog-core-camelcomponent/src/main/java/ddf/camel/component/catalog/framework/IngestTimeoutException.java b/catalog/core/catalog-core-camelcomponent/src/main/java/ddf/camel/component/catalog/framework/IngestTimeoutException.java new file mode 100644 index 000000000000..0d4819a72d8b --- /dev/null +++ b/catalog/core/catalog-core-camelcomponent/src/main/java/ddf/camel/component/catalog/framework/IngestTimeoutException.java @@ -0,0 +1,52 @@ +/** + * Copyright (c) Codice Foundation + * + *

This is free software: you can redistribute it and/or modify it under the terms of the GNU + * Lesser General Public License as published by the Free Software Foundation, either version 3 of + * the License, or any later version. + * + *

This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; + * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. A copy of the GNU Lesser General Public + * License is distributed along with this program and can be found at + * . + */ +package ddf.camel.component.catalog.framework; + +public class IngestTimeoutException extends FrameworkProducerException { + /** The constant serialVersionUID. */ + private static final long serialVersionUID = 1L; + + /** Instantiates a new {@code IngestTimeoutException}. */ + public IngestTimeoutException() { + super(); + } + + /** + * Instantiates a new {@code IngestTimeoutException} with the provided message. + * + * @param message the message + */ + public IngestTimeoutException(String message) { + super(message); + } + + /** + * Instantiates a new {@code IngestTimeoutException} with the provided message and throwable. + * + * @param message the message + * @param throwable the throwable + */ + public IngestTimeoutException(String message, Throwable throwable) { + super(message, throwable); + } + + /** + * Instantiates a new {@code IngestTimeoutException} with the provided throwable. + * + * @param throwable the throwable + */ + public IngestTimeoutException(Throwable throwable) { + super(throwable); + } +} diff --git a/catalog/core/catalog-core-camelcomponent/src/main/java/ddf/camel/component/catalog/transformer/TransformerProducer.java b/catalog/core/catalog-core-camelcomponent/src/main/java/ddf/camel/component/catalog/transformer/TransformerProducer.java index 6f3c01816ff4..1c2028ca7483 100644 --- a/catalog/core/catalog-core-camelcomponent/src/main/java/ddf/camel/component/catalog/transformer/TransformerProducer.java +++ b/catalog/core/catalog-core-camelcomponent/src/main/java/ddf/camel/component/catalog/transformer/TransformerProducer.java @@ -18,6 +18,14 @@ import ddf.catalog.transform.CatalogTransformerException; import ddf.mime.MimeTypeToTransformerMapper; import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import javax.activation.MimeTypeParseException; import org.apache.camel.Exchange; import org.apache.camel.Message; @@ -38,6 +46,10 @@ public abstract class TransformerProducer extends DefaultProducer { private CatalogEndpoint endpoint; + private ExecutorService executor; + + private static final String TIMEOUT_HEADER_KEY = "timeoutMilliseconds"; + /** * Constructs the {@link org.apache.camel.Producer} for the custom Camel CatalogComponent. This * producer would map to a Camel route node with a URI like catalog:inputtransformer @@ -48,6 +60,7 @@ public abstract class TransformerProducer extends DefaultProducer { public TransformerProducer(CatalogEndpoint endpoint) { super(endpoint); this.endpoint = endpoint; + this.executor = endpoint.getExecutor(); LOGGER.debug( "\"INSIDE InputTransformerProducer constructor for {}", endpoint.getTransformerId()); @@ -60,7 +73,8 @@ public TransformerProducer(CatalogEndpoint endpoint) { */ @Override public void process(Exchange exchange) - throws CatalogTransformerException, MimeTypeParseException, IOException { + throws CatalogTransformerException, MimeTypeParseException, IOException, InterruptedException, + ExecutionException { LOGGER.trace("ENTERING: process"); LOGGER.debug("exchange pattern = {}", exchange.getPattern()); @@ -99,8 +113,28 @@ public void process(Exchange exchange) Object metacard; if (mapper != null) { LOGGER.debug("Got a MimeTypeToTransformerMapper service"); - - metacard = transform(in, mimeType, transformerId, mapper); + Object timeoutObj = exchange.getIn().getHeader(TIMEOUT_HEADER_KEY); + final String type = mimeType; + final String transformId = transformerId; + if (timeoutObj != null) { + try { + List> futures = + executor.invokeAll( + Collections.singleton( + (Callable) () -> transform(in, type, transformId, mapper)), + (long) timeoutObj, + TimeUnit.MILLISECONDS); + if (!futures.get(0).isDone()) { + LOGGER.warn("Ingest task was canceled due to timeout"); + throw new TransformerTimeoutException("The ingest request timed out 1"); + } + metacard = futures.get(0).get(); + } catch (CancellationException e) { + throw new TransformerTimeoutException("The ingest request timed out 2"); + } + } else { + metacard = transform(in, mimeType, transformerId, mapper); + } } else { LOGGER.debug("Did not find a MimeTypeToTransformerMapper service"); throw new CatalogTransformerException("Did not find a MimeTypeToTransformerMapper service"); diff --git a/catalog/core/catalog-core-camelcomponent/src/main/java/ddf/camel/component/catalog/transformer/TransformerTimeoutException.java b/catalog/core/catalog-core-camelcomponent/src/main/java/ddf/camel/component/catalog/transformer/TransformerTimeoutException.java new file mode 100644 index 000000000000..c941df9e030a --- /dev/null +++ b/catalog/core/catalog-core-camelcomponent/src/main/java/ddf/camel/component/catalog/transformer/TransformerTimeoutException.java @@ -0,0 +1,53 @@ +/** + * Copyright (c) Codice Foundation + * + *

This is free software: you can redistribute it and/or modify it under the terms of the GNU + * Lesser General Public License as published by the Free Software Foundation, either version 3 of + * the License, or any later version. + * + *

This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; + * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. A copy of the GNU Lesser General Public + * License is distributed along with this program and can be found at + * . + */ +package ddf.camel.component.catalog.transformer; + +public class TransformerTimeoutException extends RuntimeException { + + /** The constant serialVersionUID. */ + private static final long serialVersionUID = 1L; + + /** Instantiates a new {@code FrameworkProducerException}. */ + public TransformerTimeoutException() { + super(); + } + + /** + * Instantiates a new {@code FrameworkProducerException} with the provided message. + * + * @param message the message + */ + public TransformerTimeoutException(String message) { + super(message); + } + + /** + * Instantiates a new {@code FrameworkProducerException} with the provided message and throwable. + * + * @param message the message + * @param throwable the throwable + */ + public TransformerTimeoutException(String message, Throwable throwable) { + super(message, throwable); + } + + /** + * Instantiates a new {@code FrameworkProducerException} with the provided throwable. + * + * @param throwable the throwable + */ + public TransformerTimeoutException(Throwable throwable) { + super(throwable); + } +} diff --git a/catalog/core/catalog-core-camelcomponent/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/catalog/core/catalog-core-camelcomponent/src/main/resources/OSGI-INF/blueprint/blueprint.xml index 72447b112c6a..07e7738addcd 100644 --- a/catalog/core/catalog-core-camelcomponent/src/main/resources/OSGI-INF/blueprint/blueprint.xml +++ b/catalog/core/catalog-core-camelcomponent/src/main/resources/OSGI-INF/blueprint/blueprint.xml @@ -30,11 +30,12 @@ - + + diff --git a/catalog/core/catalog-core-camelcomponent/src/test/java/ddf/camel/component/catalog/framework/FrameworkProducerTest.java b/catalog/core/catalog-core-camelcomponent/src/test/java/ddf/camel/component/catalog/framework/FrameworkProducerTest.java new file mode 100644 index 000000000000..a0b8917962b7 --- /dev/null +++ b/catalog/core/catalog-core-camelcomponent/src/test/java/ddf/camel/component/catalog/framework/FrameworkProducerTest.java @@ -0,0 +1,190 @@ +/** + * Copyright (c) Codice Foundation + * + *

This is free software: you can redistribute it and/or modify it under the terms of the GNU + * Lesser General Public License as published by the Free Software Foundation, either version 3 of + * the License, or any later version. + * + *

This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; + * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. A copy of the GNU Lesser General Public + * License is distributed along with this program and can be found at + * . + */ +package ddf.camel.component.catalog.framework; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import ddf.camel.component.catalog.CatalogComponent; +import ddf.camel.component.catalog.CatalogEndpoint; +import ddf.catalog.CatalogFramework; +import ddf.catalog.data.impl.MetacardImpl; +import ddf.catalog.operation.CreateRequest; +import ddf.catalog.operation.DeleteRequest; +import ddf.catalog.operation.DeleteResponse; +import ddf.catalog.operation.UpdateRequest; +import ddf.security.Subject; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.shiro.SecurityUtils; +import org.apache.shiro.mgt.SecurityManager; +import org.junit.Before; +import org.junit.Test; + +public class FrameworkProducerTest { + + private static final String CREATE_OPERATION = "CREATE"; + + private static final String UPDATE_OPERATION = "UPDATE"; + + private static final String DELETE_OPERATION = "DELETE"; + + private static final String OPERATION_HEADER_KEY = "operation"; + + private static final String TIMEOUT_HEADER_KEY = "timeoutMilliseconds"; + + CatalogEndpoint catalogEndpoint; + + List> futures; + + FrameworkProducer frameworkProducer; + + CatalogFramework catalogFramework; + + @Before + public void setup() { + SecurityUtils.setSecurityManager(mock(SecurityManager.class)); + when(SecurityUtils.getSubject()).thenReturn(mock(Subject.class)); + } + + private void setupFrameworkProducer(boolean timeout) throws Exception { + catalogFramework = mock(CatalogFramework.class); + catalogEndpoint = mock(CatalogEndpoint.class); + + CatalogComponent catalogComponent = mock(CatalogComponent.class); + when(catalogEndpoint.getComponent()).thenReturn(catalogComponent); + + futures = new ArrayList<>(); + Future future = mock(Future.class); + when(future.isDone()).thenReturn(!timeout); + futures.add(future); + + ExecutorService executorService = mock(ExecutorService.class); + when(executorService.invokeAll(any(), anyLong(), any(TimeUnit.class))) + .thenAnswer( + invocationOnMock -> { + ((Callable) ((Set) invocationOnMock.getArguments()[0]).iterator().next()).call(); + return futures; + }); + when(catalogEndpoint.getExecutor()).thenReturn(executorService); + + frameworkProducer = new FrameworkProducer(catalogEndpoint, catalogFramework); + } + + @Test + public void testFrameworkProducerCreate() throws Exception { + setupFrameworkProducer(false); + + Exchange mockExchange = mock(Exchange.class); + Message message = mock(Message.class); + + when(mockExchange.getIn()).thenReturn(message); + when(mockExchange.getOut()).thenReturn(message); + when(mockExchange.getIn().getHeader(OPERATION_HEADER_KEY)).thenReturn(CREATE_OPERATION); + when(mockExchange.getIn().getHeader(TIMEOUT_HEADER_KEY)).thenReturn(1000L); + + when(mockExchange.getIn().getBody()).thenReturn(new MetacardImpl()); + when(mockExchange.getIn().getBody(any())).thenReturn(new MetacardImpl()); + + frameworkProducer.process(mockExchange); + verify(catalogFramework).create(any(CreateRequest.class)); + } + + @Test(expected = IngestTimeoutException.class) + public void testFrameworkProducerCreateTimeout() throws Exception { + setupFrameworkProducer(true); + + Exchange mockExchange = mock(Exchange.class); + Message message = mock(Message.class); + + when(mockExchange.getIn()).thenReturn(message); + when(mockExchange.getOut()).thenReturn(message); + when(mockExchange.getIn().getHeader(OPERATION_HEADER_KEY)).thenReturn(CREATE_OPERATION); + when(mockExchange.getIn().getHeader(TIMEOUT_HEADER_KEY)).thenReturn(1000L); + + when(mockExchange.getIn().getBody()).thenReturn(new MetacardImpl()); + when(mockExchange.getIn().getBody(any())).thenReturn(new MetacardImpl()); + + frameworkProducer.process(mockExchange); + } + + @Test + public void testFrameworkProducerUpdate() throws Exception { + setupFrameworkProducer(false); + + Exchange mockExchange = mock(Exchange.class); + Message message = mock(Message.class); + + when(mockExchange.getIn()).thenReturn(message); + when(mockExchange.getOut()).thenReturn(message); + when(mockExchange.getIn().getHeader(OPERATION_HEADER_KEY)).thenReturn(UPDATE_OPERATION); + when(mockExchange.getIn().getHeader(TIMEOUT_HEADER_KEY)).thenReturn(1000L); + + when(mockExchange.getIn().getBody()).thenReturn(new MetacardImpl()); + when(mockExchange.getIn().getBody(any())).thenReturn(new MetacardImpl()); + + frameworkProducer.process(mockExchange); + verify(catalogFramework).update(any(UpdateRequest.class)); + } + + @Test(expected = IngestTimeoutException.class) + public void testFrameworkProducerUpdateTimeout() throws Exception { + setupFrameworkProducer(true); + + Exchange mockExchange = mock(Exchange.class); + Message message = mock(Message.class); + + when(mockExchange.getIn()).thenReturn(message); + when(mockExchange.getOut()).thenReturn(message); + when(mockExchange.getIn().getHeader(OPERATION_HEADER_KEY)).thenReturn(UPDATE_OPERATION); + when(mockExchange.getIn().getHeader(TIMEOUT_HEADER_KEY)).thenReturn(1000L); + + when(mockExchange.getIn().getBody()).thenReturn(new MetacardImpl()); + when(mockExchange.getIn().getBody(any())).thenReturn(new MetacardImpl()); + + frameworkProducer.process(mockExchange); + } + + @Test + public void testFrameworkProducerDelete() throws Exception { + setupFrameworkProducer(false); + + Exchange mockExchangeDelete = mock(Exchange.class); + Message message = mock(Message.class); + List ids = Collections.singletonList("metacardId"); + when(message.getBody()).thenReturn(ids); + when(message.getBody(any())).thenReturn(ids); + + when(mockExchangeDelete.getIn()).thenReturn(message); + when(mockExchangeDelete.getOut()).thenReturn(message); + when(mockExchangeDelete.getIn().getHeader(OPERATION_HEADER_KEY)).thenReturn(DELETE_OPERATION); + when(mockExchangeDelete.getIn().getHeader(TIMEOUT_HEADER_KEY)).thenReturn(1000L); + DeleteResponse response = mock(DeleteResponse.class); + when(response.getDeletedMetacards()).thenReturn(Collections.singletonList(new MetacardImpl())); + when(catalogFramework.delete(any())).thenReturn(response); + frameworkProducer.process(mockExchangeDelete); + verify(catalogFramework).delete(any(DeleteRequest.class)); + } +} diff --git a/catalog/core/catalog-core-camelcomponent/src/test/java/ddf/camel/component/catalog/inputtransformer/InputTransformerProducerTest.java b/catalog/core/catalog-core-camelcomponent/src/test/java/ddf/camel/component/catalog/inputtransformer/InputTransformerProducerTest.java index 9bee8bd7d2c6..7c4b66aa6838 100644 --- a/catalog/core/catalog-core-camelcomponent/src/test/java/ddf/camel/component/catalog/inputtransformer/InputTransformerProducerTest.java +++ b/catalog/core/catalog-core-camelcomponent/src/test/java/ddf/camel/component/catalog/inputtransformer/InputTransformerProducerTest.java @@ -17,11 +17,14 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Matchers.anyLong; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import ddf.camel.component.catalog.CatalogComponent; import ddf.camel.component.catalog.CatalogEndpoint; +import ddf.camel.component.catalog.transformer.TransformerTimeoutException; import ddf.catalog.data.Metacard; import ddf.catalog.transform.CatalogTransformerException; import ddf.catalog.transform.InputTransformer; @@ -29,8 +32,16 @@ import ddf.mime.MimeTypeToTransformerMapper; import java.io.IOException; import java.io.InputStream; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import javax.activation.MimeType; +import org.apache.camel.Exchange; import org.apache.camel.Message; import org.junit.Before; import org.junit.Test; @@ -50,23 +61,49 @@ public class InputTransformerProducerTest { private Message message; + CatalogEndpoint catalogEndpoint; + + List> futures; + private InputTransformerProducer inputTransformerProducer; private MimeTypeToTransformerMapper mimeTypeToTransformerMapper; private MimeTypeMapper mimeTypeMapper; - @Before - public void setup() throws Exception { - message = mock(Message.class); - - CatalogEndpoint catalogEndpoint = mock(CatalogEndpoint.class); + private void setupCatalogEndpoint(boolean timeout) throws Exception { + catalogEndpoint = mock(CatalogEndpoint.class); mimeTypeMapper = mock(MimeTypeMapper.class); when(catalogEndpoint.getMimeTypeMapper()).thenReturn(mimeTypeMapper); + CatalogComponent catalogComponent = mock(CatalogComponent.class); + when(catalogEndpoint.getComponent()).thenReturn(catalogComponent); + when(catalogComponent.getMimeTypeToTransformerMapper()).thenReturn(mimeTypeToTransformerMapper); + + futures = new ArrayList<>(); + Future future = mock(Future.class); + when(future.isDone()).thenReturn(!timeout); + futures.add(future); + + ExecutorService executorService = mock(ExecutorService.class); + when(executorService.invokeAll(any(), anyLong(), any(TimeUnit.class))) + .thenAnswer( + invocationOnMock -> { + ((Callable) ((Set) invocationOnMock.getArguments()[0]).iterator().next()).call(); + return futures; + }); + when(catalogEndpoint.getExecutor()).thenReturn(executorService); + inputTransformerProducer = new InputTransformerProducer(catalogEndpoint); + } + @Before + public void setup() throws Exception { + message = mock(Message.class); mimeTypeToTransformerMapper = mock(MimeTypeToTransformerMapper.class); + + setupCatalogEndpoint(false); + InputTransformer inputTransformer = mock(InputTransformer.class); when(inputTransformer.transform(any(InputStream.class))).thenReturn(mock(Metacard.class)); when(mimeTypeToTransformerMapper.findMatches(any(Class.class), any(MimeType.class))) @@ -136,4 +173,30 @@ public void testMessageInputStreamIsClosed() throws Exception { inputTransformerProducer.transform(message, "", "", mimeTypeToTransformerMapper); verify(is).close(); } + + @Test(expected = TransformerTimeoutException.class) + public void testTransformTimeout() throws Exception { + setupCatalogEndpoint(true); + + Exchange mockExchange = mock(Exchange.class); + + when(mockExchange.getIn()).thenReturn(message); + when(mockExchange.getOut()).thenReturn(message); + when(mockExchange.getOut()).thenReturn(message); + when(mockExchange.getIn().getHeader("timeoutMilliseconds")).thenReturn(1000L); + + inputTransformerProducer.process(mockExchange); + } + + @Test + public void testTransformNoTimeout() throws Exception { + Exchange mockExchange = mock(Exchange.class); + + when(mockExchange.getIn()).thenReturn(message); + when(mockExchange.getOut()).thenReturn(message); + when(mockExchange.getOut()).thenReturn(message); + + inputTransformerProducer.process(mockExchange); + verify(message).setBody(any(Metacard.class)); + } } diff --git a/catalog/core/catalog-core-directorymonitor/src/main/java/org/codice/ddf/catalog/content/monitor/AsyncFileAlterationObserver.java b/catalog/core/catalog-core-directorymonitor/src/main/java/org/codice/ddf/catalog/content/monitor/AsyncFileAlterationObserver.java index f539d393fc29..57e709b9ff41 100644 --- a/catalog/core/catalog-core-directorymonitor/src/main/java/org/codice/ddf/catalog/content/monitor/AsyncFileAlterationObserver.java +++ b/catalog/core/catalog-core-directorymonitor/src/main/java/org/codice/ddf/catalog/content/monitor/AsyncFileAlterationObserver.java @@ -18,11 +18,14 @@ import com.google.common.annotations.VisibleForTesting; import java.io.File; import java.util.Arrays; +import java.util.Date; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import javax.annotation.Nullable; import org.apache.commons.io.FileUtils; @@ -57,8 +60,14 @@ public class AsyncFileAlterationObserver { private static final Logger LOGGER = LoggerFactory.getLogger(CDM_LOGGER_NAME); - private static final int LOGGING_TIME_DELAY = 500; - private static final int LOGGING_TIME_INTERVAL = 5000; + private static final int INGEST_CHECK_TIME_DELAY = 500; + + private static final int INGEST_CHECK_TIME_INTERVAL = 5000; + + private static final String FAILURE_RETRY_PERIOD_KEY = + "org.codice.ddf.catalog.content.monitor.failureRetryPeriod"; + + private static final long DEFAULT_FAILURE_RETRY_PERIOD = TimeUnit.HOURS.toMillis(12); private final AsyncFileEntry rootFile; private AsyncFileAlterationListener listener = null; @@ -69,6 +78,10 @@ public class AsyncFileAlterationObserver { private Timer timer; + private long lastFailureRetry = new Date().getTime(); + + Map failedFiles = new ConcurrentHashMap<>(); + private boolean isProcessing = false; public AsyncFileAlterationObserver(File fileToObserve, ObjectPersistentStore serializer) { @@ -125,13 +138,15 @@ public void initialize() throws IllegalStateException { public void initializePeriodicLogging() { if (timer == null) { timer = new Timer(); - timer.scheduleAtFixedRate(new LogProcessing(), LOGGING_TIME_DELAY, LOGGING_TIME_INTERVAL); + timer.scheduleAtFixedRate( + new StatusTask(), INGEST_CHECK_TIME_DELAY, INGEST_CHECK_TIME_INTERVAL); } } public void destroy() { + serializer.store(rootFile.getName(), rootFile); rootFile.destroy(); - + LOGGER.debug("Destroying AsyncFileAlterationObserver and timer"); if (timer != null) { timer.cancel(); timer.purge(); @@ -159,8 +174,10 @@ public boolean checkAndNotify() { AsyncFileAlterationListener listenerCopy; synchronized (processingLock) { if (!processing.isEmpty()) { - LOGGER.debug( - "{} files are still processing. Waiting until the list is empty", processing.size()); + LOGGER.trace( + "{} files are still processing in {}. Waiting until the list is empty.", + processing.size(), + rootFile.getFile().getPath()); return false; } else if (isProcessing) { LOGGER.debug("Another thread is currently running, returning until next poll"); @@ -205,7 +222,9 @@ AsyncFileEntry getRootFile() { * @param entry The file entry */ private void doCreate(AsyncFileEntry entry, final AsyncFileAlterationListener listenerCopy) { - + if (failedAndNotUpdated(entry.getFile())) { + return; + } processing.add(entry); if (!entry.getFile().isDirectory()) { @@ -215,13 +234,11 @@ private void doCreate(AsyncFileEntry entry, final AsyncFileAlterationListener li } else { // Directories are always committed and added to the parent IF they // don't already exist - + commitCreate(entry, true); File[] children = listFiles(entry.getFile()); for (File child : children) { doCreate(new AsyncFileEntry(entry, child), listenerCopy); } - - commitCreate(entry, true); } } @@ -244,6 +261,7 @@ private void commitCreate(AsyncFileEntry entry, boolean success) { entry.getParent().map(AsyncFileEntry::getName).orElse("parent")); } else { LOGGER.debug("Create task failed for {}", entry.getName()); + failedFiles.put(entry.getFile().getPath(), entry); } } finally { onFinish(entry); @@ -256,10 +274,10 @@ private void commitCreate(AsyncFileEntry entry, boolean success) { * @param entry The previous file system entry */ private void doMatch(AsyncFileEntry entry, final AsyncFileAlterationListener listenerCopy) { - if (!entry.hasChanged()) { + + if (!entry.hasChanged() || !entry.getFile().exists() || failedAndNotUpdated(entry.getFile())) { return; } - processing.add(entry); LOGGER.trace("{} has changed", entry.getName()); @@ -280,12 +298,13 @@ private void doMatch(AsyncFileEntry entry, final AsyncFileAlterationListener lis */ private void commitMatch(AsyncFileEntry entry, boolean success) { try { + entry.commit(); + LOGGER.debug("{} committed", entry.getName()); if (success) { LOGGER.trace("commitMatch({},{}): Starting...", entry.getName(), success); - entry.commit(); - LOGGER.debug("{} committed", entry.getName()); } else { LOGGER.debug("Match task failed for {}", entry.getName()); + failedFiles.put(entry.getFile().getPath(), entry); } } finally { onFinish(entry); @@ -298,6 +317,12 @@ private void commitMatch(AsyncFileEntry entry, boolean success) { * @param entry The file entry */ private void doDelete(AsyncFileEntry entry, final AsyncFileAlterationListener listenerCopy) { + // There is a case where a delete would be ignored if prior to the delete there + // was a failed update. In this case a restart of the directory monitor would clear the issue. + if (failedFiles.containsKey(entry.getFile().getPath())) { + return; + } + if (!entry.isDirectory()) { processing.add(entry); listenerCopy.onFileDelete( @@ -332,12 +357,34 @@ private void commitDelete(AsyncFileEntry entry, boolean success) { entry.getParent().map(AsyncFileEntry::getName).orElse("parent")); } else { LOGGER.debug("Delete task failed for {}", entry.getName()); + failedFiles.put(entry.getFile().getPath(), entry); } } finally { onFinish(entry); } } + /** + * Verify whether {@link File} has previously failed processing time AND has not been updated + * + * @param file + * @return true if the file has failed processing before and has not been updated since + */ + private boolean failedAndNotUpdated(File file) { + if (file.isDirectory() || !failedFiles.containsKey(file.getPath())) { + return false; + } + + long lastModified = failedFiles.get(file.getPath()).getFile().lastModified(); + boolean updated = lastModified != file.lastModified(); + + if (updated) { + failedFiles.remove(file.getPath()); + } + + return !updated; + } + /** * Steps file by file comparing the snapshot state to the current state of the directory being * monitored. @@ -362,6 +409,7 @@ private void checkAndNotify( doCreate(new AsyncFileEntry(parent, files[c]), listenerCopy); c++; } + if (c < files.length && entry.compareToFile(files[c]) == 0) { doMatch(entry, listenerCopy); checkAndNotify(entry, entry.getChildren(), listFiles(files[c]), listenerCopy); @@ -418,22 +466,67 @@ private void onFinish(AsyncFileEntry entry) { synchronized (processingLock) { processing.remove(entry); if (processing.isEmpty()) { - LOGGER.debug("All files finished processing"); + LOGGER.debug("All files finished processing for {}", rootFile.getFile().getPath()); serializer.store(rootFile.getName(), rootFile); isProcessing = false; + logFailedIngests(); } } } - private class LogProcessing extends TimerTask { + private void logFailedIngests() { + if (LOGGER.isDebugEnabled()) { + if (!failedFiles.isEmpty()) { + String failedFilesStr = + failedFiles.values().stream() + .map(AsyncFileEntry::getName) + .collect(Collectors.joining(", ")); + LOGGER.debug( + "Total failed ingests {} in {}. Failed files: {}", + failedFiles.size(), + rootFile.getFile().getPath(), + failedFilesStr); + } + } + } + + @VisibleForTesting + void checkFailureRetry() { + long retry = Long.getLong(FAILURE_RETRY_PERIOD_KEY, DEFAULT_FAILURE_RETRY_PERIOD); + if (retry > 0 && lastFailureRetry + retry < new Date().getTime()) { + lastFailureRetry = new Date().getTime(); + LOGGER.info( + "Retrying failed ingests. Next retry will be in {} minutes", + TimeUnit.MILLISECONDS.toMinutes(retry)); + failedFiles.clear(); + } + } + + @VisibleForTesting + void setLastFailureRetry(long time) { + lastFailureRetry = time; + } + + /** Processing and logging operations which should be done periodically */ + private class StatusTask extends TimerTask { /** Log files still in processing at scheduled intervals */ @Override public void run() { - if (LOGGER.isDebugEnabled() && !processing.isEmpty()) { - String files = - processing.stream().map(AsyncFileEntry::getName).collect(Collectors.joining(", ")); - LOGGER.debug("{} files being processed: {}", processing.size(), files); + checkFailureRetry(); + if (!processing.isEmpty()) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "{} files being processed in '{}' directory", + processing.size(), + rootFile.getFile().getPath()); + } + if (LOGGER.isTraceEnabled()) { + logFailedIngests(); + String files = + processing.stream().map(AsyncFileEntry::getName).collect(Collectors.joining(", ")); + LOGGER.trace("Files processing in {}: {}", rootFile.getFile().getPath(), files); + } } } } diff --git a/catalog/core/catalog-core-directorymonitor/src/main/java/org/codice/ddf/catalog/content/monitor/ContentDirectoryMonitor.java b/catalog/core/catalog-core-directorymonitor/src/main/java/org/codice/ddf/catalog/content/monitor/ContentDirectoryMonitor.java index 661e8ca025c9..7b3473d2f7f6 100644 --- a/catalog/core/catalog-core-directorymonitor/src/main/java/org/codice/ddf/catalog/content/monitor/ContentDirectoryMonitor.java +++ b/catalog/core/catalog-core-directorymonitor/src/main/java/org/codice/ddf/catalog/content/monitor/ContentDirectoryMonitor.java @@ -14,6 +14,7 @@ package org.codice.ddf.catalog.content.monitor; import static ddf.catalog.Constants.CDM_LOGGER_NAME; +import static org.apache.camel.LoggingLevel.DEBUG; import ddf.catalog.Constants; import ddf.catalog.data.AttributeRegistry; @@ -33,8 +34,10 @@ import net.jodah.failsafe.RetryPolicy; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; +import org.apache.camel.Predicate; import org.apache.camel.Processor; import org.apache.camel.ServiceStatus; +import org.apache.camel.builder.PredicateBuilder; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.model.FromDefinition; import org.apache.camel.model.ModelCamelContext; @@ -71,6 +74,8 @@ public class ContentDirectoryMonitor implements DirectoryMonitor { private Security security; + private static final long MAX_FILE_SIZE = 1_073_741_824; + private final int maxRetries; private final int delayBetweenRetries; @@ -203,9 +208,8 @@ private Object configure() { *

Only remove routes that this Content Directory Monitor created since the same CamelContext * is shared across all Content Directory Monitors. */ - @SuppressWarnings( - "squid:S1172" /* The code parameter is required in blueprint-cm-1.0.7. See https://issues.apache.org/jira/browse/ARIES-1436. */) - public void destroy(int code) { + public void destroy() { + LOGGER.debug("Shutting down CDM for {}", this.monitoredDirectory); CompletableFuture.runAsync(this::removeRoutes, configurationExecutor); } @@ -213,7 +217,7 @@ private void removeRoutes() { if (routeBuilder == null) { return; } - + LOGGER.debug("CDM Removing routes"); for (RouteDefinition routeDef : routeBuilder.getRouteCollection().getRoutes()) { try { String routeId = routeDef.getId(); @@ -401,14 +405,29 @@ public void configure() throws Exception { routeDefinition.setHeader(Constants.ATTRIBUTE_OVERRIDES_KEY).constant(attributeOverrides); } - ThreadsDefinition td = routeDefinition.threads(numThreads).process(systemSubjectBinder); + ThreadsDefinition td = + routeDefinition + .threads(numThreads, numThreads) + .maxQueueSize(numThreads * 2) + .process(systemSubjectBinder); if (processingMechanism.equals(IN_PLACE)) { + String maxSize = + System.getProperty( + "org.codice.ddf.catalog.content.monitor.maxFileSizeBytes", + String.valueOf(MAX_FILE_SIZE)); + Predicate sizeLimit = simple("${file:size} < " + maxSize); + Predicate createOrUpdateOperation = + simple("${in.headers.operation} == 'CREATE' || ${in.headers.operation} == 'UPDATE'"); + td.choice() - .when( - simple( - "${in.headers.operation} == 'CREATE' || ${in.headers.operation} == 'UPDATE'")) + .when(PredicateBuilder.and(sizeLimit, createOrUpdateOperation)) .to("catalog:inputtransformer") .process(new InPlaceMetacardProcessor(attributeRegistry)) + .otherwise() + .log( + DEBUG, + CDM_LOGGER, + "Ignoring file ${file:name} with size ${file:size} because it is too big") .end() .to("catalog:framework"); } else { diff --git a/catalog/core/catalog-core-directorymonitor/src/main/java/org/codice/ddf/catalog/content/monitor/DurableFileAlterationListener.java b/catalog/core/catalog-core-directorymonitor/src/main/java/org/codice/ddf/catalog/content/monitor/DurableFileAlterationListener.java index 3bec6c36dbf6..6cbe991593a3 100644 --- a/catalog/core/catalog-core-directorymonitor/src/main/java/org/codice/ddf/catalog/content/monitor/DurableFileAlterationListener.java +++ b/catalog/core/catalog-core-directorymonitor/src/main/java/org/codice/ddf/catalog/content/monitor/DurableFileAlterationListener.java @@ -39,6 +39,8 @@ public class DurableFileAlterationListener implements AsyncFileAlterationListener, FileAlterationListener { + public static final int DEFAULT_EXPIRATION_TIME = 300_000; + private static final Logger LOGGER = LoggerFactory.getLogger(CDM_LOGGER_NAME); private static final String FILE_EXTENSION_HEADER = "org.codice.ddf.camel.FileExtension"; @@ -49,6 +51,12 @@ public class DurableFileAlterationListener private static final String CATALOG_DELETE = "DELETE"; + private static final String TIMEOUT_HEADER_KEY = "timeoutMilliseconds"; + + private long expirationTime = + Long.getLong( + "org.codice.ddf.catalog.content.monitor.expirationTime", DEFAULT_EXPIRATION_TIME); + private FileSystemPersistenceProvider productToMetacardIdMap; private AbstractDurableFileConsumer consumer; @@ -92,6 +100,7 @@ private void fileUpdate(File file, Synchronization cb) { .addHeader(OPERATION_HEADER, CATALOG_UPDATE) .addHeader(FILE_EXTENSION_HEADER, FilenameUtils.getExtension(file.getName())) .addHeader(Core.RESOURCE_URI, fileUri) + .addHeader(TIMEOUT_HEADER_KEY, expirationTime) .addHeader("org.codice.ddf.camel.transformer.MetacardUpdateId", metacardId) .addSynchronization( new FileToMetacardMappingSynchronization(fileUri, productToMetacardIdMap)) @@ -114,6 +123,7 @@ private void fileCreate(File file, Synchronization cb) { .addHeader(OPERATION_HEADER, "CREATE") .addHeader(FILE_EXTENSION_HEADER, FilenameUtils.getExtension(file.getName())) .addHeader(Core.RESOURCE_URI, fileUri) + .addHeader(TIMEOUT_HEADER_KEY, expirationTime) .addSynchronization( new FileToMetacardMappingSynchronization(fileUri, productToMetacardIdMap)) .addSynchronization(cb) diff --git a/catalog/core/catalog-core-directorymonitor/src/main/java/org/codice/ddf/catalog/content/monitor/DurableFileSystemFileConsumer.java b/catalog/core/catalog-core-directorymonitor/src/main/java/org/codice/ddf/catalog/content/monitor/DurableFileSystemFileConsumer.java index c26ebd29e7f1..95ac971bc1d7 100644 --- a/catalog/core/catalog-core-directorymonitor/src/main/java/org/codice/ddf/catalog/content/monitor/DurableFileSystemFileConsumer.java +++ b/catalog/core/catalog-core-directorymonitor/src/main/java/org/codice/ddf/catalog/content/monitor/DurableFileSystemFileConsumer.java @@ -65,7 +65,6 @@ protected void initialize(String fileName) { } if (observer == null && fileName != null) { - observer = AsyncFileAlterationObserver.load(new File(fileName), jsonSerializer); // Backwards Compatibility @@ -108,6 +107,7 @@ private AsyncFileAlterationObserver backwardsCompatibility(String fileName) { @Override public void shutdown() { + LOGGER.debug("Shutting down DurableFileSystemFileConsumer and listener/observer"); super.shutdown(); listener.destroy(); diff --git a/catalog/core/catalog-core-directorymonitor/src/main/java/org/codice/ddf/catalog/content/monitor/synchronizations/CompletionSynchronization.java b/catalog/core/catalog-core-directorymonitor/src/main/java/org/codice/ddf/catalog/content/monitor/synchronizations/CompletionSynchronization.java index 0157a8af84d4..49c502f98253 100644 --- a/catalog/core/catalog-core-directorymonitor/src/main/java/org/codice/ddf/catalog/content/monitor/synchronizations/CompletionSynchronization.java +++ b/catalog/core/catalog-core-directorymonitor/src/main/java/org/codice/ddf/catalog/content/monitor/synchronizations/CompletionSynchronization.java @@ -53,7 +53,7 @@ public final void onFailure(Exchange exchange) { "a network error occurred, The file [{}] failed to process", asyncFileEntry.getName()); } else if (exchange != null && exchange.getException() != null) { LOGGER.debug( - "Exchange {} failed synchronization on exception", + "Exchange {} failed synchronization on exception: {}", exchange.getExchangeId(), exchange.getException()); } diff --git a/catalog/core/catalog-core-directorymonitor/src/test/java/org/codice/ddf/catalog/content/monitor/AsyncFileAlterationObserverTest.java b/catalog/core/catalog-core-directorymonitor/src/test/java/org/codice/ddf/catalog/content/monitor/AsyncFileAlterationObserverTest.java index 8f4c2964de1d..62fdd3ef047b 100644 --- a/catalog/core/catalog-core-directorymonitor/src/test/java/org/codice/ddf/catalog/content/monitor/AsyncFileAlterationObserverTest.java +++ b/catalog/core/catalog-core-directorymonitor/src/test/java/org/codice/ddf/catalog/content/monitor/AsyncFileAlterationObserverTest.java @@ -29,6 +29,7 @@ import java.io.File; import java.io.IOException; import java.nio.charset.Charset; +import java.util.Date; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @@ -37,6 +38,7 @@ import java.util.stream.Stream; import org.apache.camel.spi.Synchronization; import org.apache.commons.io.FileUtils; +import org.codice.junit.rules.RestoreSystemProperties; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -50,6 +52,9 @@ @RunWith(JUnit4.class) public class AsyncFileAlterationObserverTest { + @Rule + public final RestoreSystemProperties restoreSystemProperties = new RestoreSystemProperties(); + private static String dummyData = "The duck may swim on the lake..."; private static String changedData = "the duck."; @@ -235,8 +240,7 @@ public void testFileCreation() throws Exception { public void testCreationFailure() throws Exception { File[] files = initFiles(1, monitoredDirectory, "file00"); - int toFail = 2; - timesToFail.set(toFail); + timesToFail.set(files.length); observer.checkAndNotify(); observer.checkAndNotify(); @@ -244,11 +248,11 @@ public void testCreationFailure() throws Exception { observer.checkAndNotify(); observer.checkAndNotify(); - verify(fileListener, times(files.length + failures)) + verify(fileListener, times(files.length)) .onFileCreate(any(File.class), any(Synchronization.class)); verify(fileListener, never()).onFileChange(any(File.class), any(Synchronization.class)); verify(fileListener, never()).onFileDelete(any(File.class), any(Synchronization.class)); - assertThat(failures, is(toFail)); + assertThat(failures, is(files.length)); } @Test @@ -290,7 +294,7 @@ public void testOneChangeWithError() throws Exception { observer.checkAndNotify(); init(); - int toFail = 2; + int toFail = 1; timesToFail.set(toFail); Stream.of(files).forEach(this::changeData); @@ -299,7 +303,7 @@ public void testOneChangeWithError() throws Exception { } verify(fileListener, never()).onFileCreate(any(File.class), any(Synchronization.class)); - verify(fileListener, times(files.length + failures)) + verify(fileListener, times(files.length)) .onFileChange(any(File.class), any(Synchronization.class)); verify(fileListener, never()).onFileDelete(any(File.class), any(Synchronization.class)); assertThat(failures, is(toFail)); @@ -347,9 +351,9 @@ public void testFileDeleteWithError() throws Exception { verify(fileListener, never()).onFileCreate(any(File.class), any(Synchronization.class)); verify(fileListener, never()).onFileChange(any(File.class), any(Synchronization.class)); - verify(fileListener, times(files.length + failures)) + verify(fileListener, times(files.length)) .onFileDelete(any(File.class), any(Synchronization.class)); - assertThat(failures, is(toFail)); + assertThat(failures, is(files.length)); } @Test @@ -656,7 +660,7 @@ public void testCreateNestedDirectoryWithErrors() throws Exception { observer.checkAndNotify(); } - int totalNoFiles = childFiles.length + grandchildFiles.length + files.length + failures; + int totalNoFiles = childFiles.length + grandchildFiles.length + files.length; observer.checkAndNotify(); @@ -892,7 +896,7 @@ public void testMovingDirectoryWithErrorsAndDelays() throws Exception { Mockito.verify(fileListener, atLeast(0)).onFileCreate(propertyKeyCaptor.capture(), any()); Mockito.verify(fileListener, atLeast(0)).onFileDelete(propertyKeyCaptor.capture(), any()); - assertThat(propertyKeyCaptor.getAllValues().size(), is(grandchildFiles.length * 2 + failures)); + assertThat(propertyKeyCaptor.getAllValues().size(), is(grandchildFiles.length * 2)); assertThat(failures, is(toFail)); @@ -939,7 +943,7 @@ public void testCreateWithErrors() throws Exception { // Just in case there was a straggler who was unable to successfully finish observer.checkAndNotify(); - verify(fileListener, times(files.length + failures)) + verify(fileListener, times(files.length)) .onFileCreate(any(File.class), any(Synchronization.class)); verify(fileListener, never()).onFileChange(any(File.class), any(Synchronization.class)); verify(fileListener, never()).onFileDelete(any(File.class), any(Synchronization.class)); @@ -969,7 +973,7 @@ public void testDeleteWithErrors() throws Exception { verify(fileListener, never()).onFileCreate(any(File.class), any(Synchronization.class)); verify(fileListener, never()).onFileChange(any(File.class), any(Synchronization.class)); - verify(fileListener, times(files.length + failures)) + verify(fileListener, times(files.length)) .onFileDelete(any(File.class), any(Synchronization.class)); assertThat(failures, is(files.length)); } @@ -1042,7 +1046,7 @@ public void contentMonitorTest() throws Exception { delayLatch.await(timeout, TimeUnit.MILLISECONDS); assertThat(failures, is(toFail)); - verify(fileListener, times(totalSize + failures)) + verify(fileListener, times(totalSize)) .onFileCreate(any(File.class), any(Synchronization.class)); verify(fileListener, never()).onFileChange(any(File.class), any(Synchronization.class)); verify(fileListener, never()).onFileDelete(any(File.class), any(Synchronization.class)); @@ -1066,7 +1070,7 @@ public void contentMonitorTest() throws Exception { assertThat(failures, is(files.length)); verify(fileListener, never()).onFileCreate(any(File.class), any(Synchronization.class)); - verify(fileListener, times(files.length + failures)) + verify(fileListener, times(files.length)) .onFileChange(any(File.class), any(Synchronization.class)); verify(fileListener, never()).onFileDelete(any(File.class), any(Synchronization.class)); assertThat(failures, is(files.length)); @@ -1205,6 +1209,37 @@ public void testloadNull() { AsyncFileAlterationObserver.load(new File("File"), null); } + @Test + public void testCheckFailureRetry() { + System.setProperty( + "org.codice.ddf.catalog.content.monitor.failureRetryPeriod", + "" + TimeUnit.MINUTES.toMillis(5)); + observer.failedFiles.put("key", new AsyncFileEntry(new File("."))); + observer.setLastFailureRetry(new Date().getTime() - TimeUnit.MINUTES.toMillis(6)); + observer.checkFailureRetry(); + assertThat(observer.failedFiles.size(), is(0)); + } + + @Test + public void testCheckFailureRetryNever() { + System.setProperty("org.codice.ddf.catalog.content.monitor.failureRetryPeriod", "-1"); + observer.failedFiles.put("key", new AsyncFileEntry(new File("."))); + observer.setLastFailureRetry(new Date().getTime() - TimeUnit.MINUTES.toMillis(6)); + observer.checkFailureRetry(); + assertThat(observer.failedFiles.size(), is(1)); + } + + @Test + public void testCheckFailureRetryDefault() { + observer.failedFiles.put("key", new AsyncFileEntry(new File("."))); + observer.setLastFailureRetry(new Date().getTime() - TimeUnit.MINUTES.toMillis(60 * 11 + 55)); + observer.checkFailureRetry(); + assertThat(observer.failedFiles.size(), is(1)); + observer.setLastFailureRetry(new Date().getTime() - TimeUnit.MINUTES.toMillis(60 * 12 + 5)); + observer.checkFailureRetry(); + assertThat(observer.failedFiles.size(), is(0)); + } + private void initNestedDirectory(int child, int grand, int topLevel, int gSibling) throws Exception { childDir = new File(monitoredDirectory, "child001"); diff --git a/catalog/core/catalog-core-directorymonitor/src/test/java/org/codice/ddf/catalog/content/monitor/ContentDirectoryMonitorTest.java b/catalog/core/catalog-core-directorymonitor/src/test/java/org/codice/ddf/catalog/content/monitor/ContentDirectoryMonitorTest.java index 0a8574fe5a5e..5951dc1d1a5e 100644 --- a/catalog/core/catalog-core-directorymonitor/src/test/java/org/codice/ddf/catalog/content/monitor/ContentDirectoryMonitorTest.java +++ b/catalog/core/catalog-core-directorymonitor/src/test/java/org/codice/ddf/catalog/content/monitor/ContentDirectoryMonitorTest.java @@ -99,7 +99,7 @@ public void setup() throws Exception { @After public void destroy() throws Exception { - monitor.destroy(0); + monitor.destroy(); camelContext.stop(); } diff --git a/distribution/ddf-common/src/main/resources/etc/org.ops4j.pax.logging.cfg b/distribution/ddf-common/src/main/resources/etc/org.ops4j.pax.logging.cfg index a42cdc33f9f2..53b9a6c5033e 100644 --- a/distribution/ddf-common/src/main/resources/etc/org.ops4j.pax.logging.cfg +++ b/distribution/ddf-common/src/main/resources/etc/org.ops4j.pax.logging.cfg @@ -180,9 +180,8 @@ log4j2.appender.contentDirectoryMonitor.name = cdm log4j2.appender.contentDirectoryMonitor.fileName = ${karaf.log}/cdm.log log4j2.appender.contentDirectoryMonitor.filePattern = ${karaf.log}/cdm.log-%d{yyyy-MM-dd-HH}-%i.log.gz log4j2.appender.contentDirectoryMonitor.append = true -log4j2.appender.contentDirectoryMonitor.ignoreExceptions = false log4j2.appender.contentDirectoryMonitor.layout.type = PatternLayout -log4j2.appender.contentDirectoryMonitor.layout.pattern = [%-5p] %d{ISO8601} | %-16.16t | %-15.20c{1} | %m%n +log4j2.appender.contentDirectoryMonitor.layout.pattern = ${log4j2.pattern} log4j2.appender.contentDirectoryMonitor.policies.type = Policies log4j2.appender.contentDirectoryMonitor.policies.size.type = SizeBasedTriggeringPolicy log4j2.appender.contentDirectoryMonitor.policies.size.size = 20MB diff --git a/distribution/ddf-common/src/main/resources/security/configurations.policy b/distribution/ddf-common/src/main/resources/security/configurations.policy index 876b96a297dd..8e8e2e380d11 100644 --- a/distribution/ddf-common/src/main/resources/security/configurations.policy +++ b/distribution/ddf-common/src/main/resources/security/configurations.policy @@ -56,7 +56,7 @@ priority "grant"; // to be monitored must ALSO be added to the URL Resource Reader section of this file. // // DO NOT MODIFY THE NEXT LINE. It specifies which modules are granted permission. -grant codeBase "file:/catalog-core-directorymonitor/org.apache.camel.camel-core/org.apache.camel.camel-blueprint/catalog-core-camelcomponent/catalog-core-urlresourcereader/com.google.guava/catalog-core-standardframework/org.apache.tika.core" { +grant codeBase "file:/catalog-core-directorymonitor/org.apache.camel.camel-core/org.apache.camel.camel-blueprint/catalog-core-camelcomponent/catalog-core-urlresourcereader/com.google.guava/catalog-core-standardframework/org.apache.tika.core/platform-util/org.apache.camel.camel-api/org.apache.camel.camel-base/org.apache.camel.camel-file/org.apache.camel.camel-support/org.apache.camel.camel-util/org.apache.camel.karaf.camel-core-osgi" { // EXAMPLE. The two lines that begin with "permission" enable the Content Directory