Skip to content

Commit

Permalink
Port CDM updates to 2.26.x branch (#6434)
Browse files Browse the repository at this point in the history
* DDF-6408 Improve Logging for CDM  (#6064) (#6122)

* Update the contentDirectoryMonitor appender configuration in org.ops4j.pax.logging.cfg to match log4j2.xml (#6344)

* Cdm Set Max File Size (#6358)

* Fix create commit before ingest

* Set max file size for in place directory monitoring to 1GB and made configurable

* CDM timeout update (#6339)

* DDF-6334 added logging & camel will expire messages. Also set max number of threads for CDM

* Update FrameworkProducer and TransformProducer to support timeouts

* Update CDM timeouts to use camel component timeouts

* Add failure retry

* Camel component test updates

Co-authored-by: Joseph Thweatt <joseph.thweatt@connexta.com>

* Cleanup

* reverting test updates to get build working

* Updating policy for CDM

Co-authored-by: Joseph Thweatt <joseph.thweatt@connexta.com>
Co-authored-by: Emily Berk <emmberk@users.noreply.github.com>
Co-authored-by: Chris Lockard <chris.lockard@connexta.com>
  • Loading branch information
4 people authored Dec 8, 2020
1 parent faa1716 commit 28bb9aa
Show file tree
Hide file tree
Showing 19 changed files with 717 additions and 69 deletions.
10 changes: 7 additions & 3 deletions catalog/core/catalog-core-camelcomponent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@
<version>${tika.thirdparty.bundle.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ddf.security.core</groupId>
<artifactId>security-core-api</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down Expand Up @@ -196,17 +200,17 @@
<limit implementation="org.codice.jacoco.LenientLimit">
<counter>INSTRUCTION</counter>
<value>COVEREDRATIO</value>
<minimum>0.68</minimum>
<minimum>0.72</minimum>
</limit>
<limit implementation="org.codice.jacoco.LenientLimit">
<counter>BRANCH</counter>
<value>COVEREDRATIO</value>
<minimum>0.60</minimum>
<minimum>0.65</minimum>
</limit>
<limit implementation="org.codice.jacoco.LenientLimit">
<counter>COMPLEXITY</counter>
<value>COVEREDRATIO</value>
<minimum>0.53</minimum>
<minimum>0.56</minimum>
</limit>
</limits>
</rule>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import ddf.mime.MimeTypeMapper;
import ddf.mime.MimeTypeToTransformerMapper;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import org.apache.camel.Endpoint;
import org.apache.camel.support.DefaultComponent;
import org.osgi.framework.BundleContext;
Expand Down Expand Up @@ -49,6 +50,8 @@ public class CatalogComponent extends DefaultComponent {

private MimeTypeMapper mimeTypeMapper;

private ExecutorService executor;

public CatalogComponent() {
super();
LOGGER.debug("INSIDE CatalogComponent constructor");
Expand All @@ -74,9 +77,10 @@ protected Endpoint createEndpoint(String uri, String remaining, Map<String, Obje

LOGGER.debug("transformerId = {}", transformerId);

Endpoint endpoint =
CatalogEndpoint endpoint =
new CatalogEndpoint(
uri, this, transformerId, mimeType, remaining, catalogFramework, mimeTypeMapper);
endpoint.setExecutor(executor);
try {
setProperties(endpoint, parameters);
} catch (Exception e) {
Expand Down Expand Up @@ -138,4 +142,12 @@ public void setCatalogFramework(CatalogFramework catalogFramework) {
public void setMimeTypeMapper(MimeTypeMapper mimeTypeMapper) {
this.mimeTypeMapper = mimeTypeMapper;
}

public void setExecutor(ExecutorService executor) {
this.executor = executor;
}

public void destroy() {
executor.shutdownNow();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import ddf.camel.component.catalog.queryresponsetransformer.QueryResponseTransformerProducer;
import ddf.catalog.CatalogFramework;
import ddf.mime.MimeTypeMapper;
import java.util.concurrent.ExecutorService;
import org.apache.camel.Consumer;
import org.apache.camel.ExchangePattern;
import org.apache.camel.MultipleConsumersSupport;
Expand Down Expand Up @@ -60,6 +61,8 @@ public class CatalogEndpoint extends DefaultEndpoint implements MultipleConsumer

private MimeTypeMapper mimeTypeMapper;

private ExecutorService executor;

/**
* Constructs a CatalogEndpoint for the specified custom <code>catalog</code> component.
*
Expand Down Expand Up @@ -204,4 +207,12 @@ public boolean isSingleton() {
public boolean isMultipleConsumersSupported() {
return true;
}

public void setExecutor(ExecutorService executor) {
this.executor = executor;
}

public ExecutorService getExecutor() {
return this.executor;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@
*/
package ddf.camel.component.catalog.framework;

import ddf.camel.component.catalog.CatalogEndpoint;
import ddf.catalog.CatalogFramework;
import ddf.catalog.data.Metacard;
import ddf.catalog.operation.CreateRequest;
import ddf.catalog.operation.CreateResponse;
import ddf.catalog.operation.DeleteRequest;
import ddf.catalog.operation.DeleteResponse;
import ddf.catalog.operation.Operation;
import ddf.catalog.operation.Update;
import ddf.catalog.operation.UpdateRequest;
import ddf.catalog.operation.UpdateResponse;
Expand All @@ -27,13 +29,22 @@
import ddf.catalog.operation.impl.UpdateRequestImpl;
import ddf.catalog.source.IngestException;
import ddf.catalog.source.SourceUnavailableException;
import ddf.security.SecurityConstants;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.camel.Endpoint;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.camel.Exchange;
import org.apache.camel.TypeConversionException;
import org.apache.camel.support.DefaultProducer;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.shiro.SecurityUtils;
import org.apache.shiro.subject.Subject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -99,8 +110,12 @@ public class FrameworkProducer extends DefaultProducer {

private static final String OPERATION_HEADER_KEY = "operation";

private static final String TIMEOUT_HEADER_KEY = "timeoutMilliseconds";

private CatalogFramework catalogFramework;

private ExecutorService executor;

private static final String CATALOG_RESPONSE_NULL = "Catalog response object is null";

/**
Expand All @@ -109,13 +124,14 @@ public class FrameworkProducer extends DefaultProducer {
* @param endpoint the Camel endpoint that created this consumer
* @param catalogFramework the DDF Catalog Framework to use
*/
public FrameworkProducer(Endpoint endpoint, CatalogFramework catalogFramework) {
public FrameworkProducer(CatalogEndpoint endpoint, CatalogFramework catalogFramework) {
super(endpoint);
this.catalogFramework = catalogFramework;
this.executor = endpoint.getExecutor();
}

@Override
public void process(Exchange exchange) throws FrameworkProducerException {
public void process(Exchange exchange) throws FrameworkProducerException, InterruptedException {
try {
LOGGER.debug("Entering process method");

Expand Down Expand Up @@ -148,7 +164,7 @@ public void process(Exchange exchange) throws FrameworkProducerException {
exchange.getIn().setBody(new ArrayList<Metacard>());
LOGGER.debug("Received a non-String as the operation type");
throw new FrameworkProducerException(cce);
} catch (SourceUnavailableException | IngestException e) {
} catch (SourceUnavailableException | IngestException | ExecutionException e) {
LOGGER.debug("Exception cataloging metacards", e);
throw new FrameworkProducerException(e);
}
Expand All @@ -165,7 +181,8 @@ public void process(Exchange exchange) throws FrameworkProducerException {
* @throws ddf.camel.component.catalog.framework.FrameworkProducerException
*/
private void create(final Exchange exchange)
throws SourceUnavailableException, IngestException, FrameworkProducerException {
throws SourceUnavailableException, IngestException, FrameworkProducerException,
InterruptedException, ExecutionException {
CreateResponse createResponse = null;

// read in data
Expand All @@ -188,7 +205,13 @@ private void create(final Exchange exchange)
}

LOGGER.debug("Making CREATE call to Catalog Framework...");
createResponse = catalogFramework.create(createRequest);
Object timeoutObj = exchange.getIn().getHeader(TIMEOUT_HEADER_KEY);
if (timeoutObj != null) {
createResponse =
processWithTimeout((long) timeoutObj, catalogFramework::create, createRequest);
} else {
createResponse = catalogFramework.create(createRequest);
}

if (createResponse == null) {
LOGGER.debug("CreateResponse is null from catalog framework");
Expand Down Expand Up @@ -218,6 +241,32 @@ private void create(final Exchange exchange)
processCatalogResponse(createResponse, exchange);
}

<T extends Operation, R> R processWithTimeout(
long timeout, CatalogFunction<T, R> catalog, T request)
throws InterruptedException, IngestTimeoutException, ExecutionException {
LOGGER.trace("Running catalog operation with timeout of {}", timeout);
Subject subject = SecurityUtils.getSubject();
if (subject instanceof ddf.security.Subject) {
request
.getProperties()
.put(SecurityConstants.SECURITY_SUBJECT, (ddf.security.Subject) subject);
}
try {
List<Future<R>> futures =
executor.invokeAll(
Collections.singleton((Callable<R>) () -> catalog.ingest(request)),
timeout,
TimeUnit.MILLISECONDS);
if (!futures.get(0).isDone()) {
LOGGER.warn("Ingest task was canceled due to timeout");
throw new IngestTimeoutException("The ingest request timed out 1");
}
return futures.get(0).get();
} catch (CancellationException e) {
throw new IngestTimeoutException("The ingest request timed out 2");
}
}

/**
* Updates metacard(s) in the catalog using the Catalog Framework.
*
Expand All @@ -229,7 +278,8 @@ private void create(final Exchange exchange)
* @throws ddf.camel.component.catalog.framework.FrameworkProducerException
*/
private void update(final Exchange exchange)
throws SourceUnavailableException, IngestException, FrameworkProducerException {
throws SourceUnavailableException, IngestException, FrameworkProducerException,
ExecutionException, InterruptedException {
UpdateResponse updateResponse = null;

// read in data from exchange
Expand Down Expand Up @@ -258,7 +308,13 @@ private void update(final Exchange exchange)
}

LOGGER.debug("Making UPDATE call to Catalog Framework...");
updateResponse = catalogFramework.update(updateRequest);
Object timeoutObj = exchange.getIn().getHeader(TIMEOUT_HEADER_KEY);
if (timeoutObj != null) {
updateResponse =
processWithTimeout((long) timeoutObj, catalogFramework::update, updateRequest);
} else {
updateResponse = catalogFramework.update(updateRequest);
}

if (updateResponse == null) {
LOGGER.debug("UpdateResponse is null from catalog framework");
Expand Down Expand Up @@ -534,4 +590,9 @@ private List<Metacard> readBodyDataAsMetacards(final Exchange exchange) {

return metacardsToProcess;
}

@FunctionalInterface
public interface CatalogFunction<T extends Operation, R> {
R ingest(T t) throws IngestException, SourceUnavailableException;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/**
* Copyright (c) Codice Foundation
*
* <p>This is free software: you can redistribute it and/or modify it under the terms of the GNU
* Lesser General Public License as published by the Free Software Foundation, either version 3 of
* the License, or any later version.
*
* <p>This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
* without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details. A copy of the GNU Lesser General Public
* License is distributed along with this program and can be found at
* <http://www.gnu.org/licenses/lgpl.html>.
*/
package ddf.camel.component.catalog.framework;

public class IngestTimeoutException extends FrameworkProducerException {
/** The constant serialVersionUID. */
private static final long serialVersionUID = 1L;

/** Instantiates a new {@code IngestTimeoutException}. */
public IngestTimeoutException() {
super();
}

/**
* Instantiates a new {@code IngestTimeoutException} with the provided message.
*
* @param message the message
*/
public IngestTimeoutException(String message) {
super(message);
}

/**
* Instantiates a new {@code IngestTimeoutException} with the provided message and throwable.
*
* @param message the message
* @param throwable the throwable
*/
public IngestTimeoutException(String message, Throwable throwable) {
super(message, throwable);
}

/**
* Instantiates a new {@code IngestTimeoutException} with the provided throwable.
*
* @param throwable the throwable
*/
public IngestTimeoutException(Throwable throwable) {
super(throwable);
}
}
Loading

0 comments on commit 28bb9aa

Please sign in to comment.