Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port CDM updates to 2.26.x branch #6434

Merged
merged 7 commits into from
Dec 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions catalog/core/catalog-core-camelcomponent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@
<version>${tika.thirdparty.bundle.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ddf.security.core</groupId>
<artifactId>security-core-api</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down Expand Up @@ -196,17 +200,17 @@
<limit implementation="org.codice.jacoco.LenientLimit">
<counter>INSTRUCTION</counter>
<value>COVEREDRATIO</value>
<minimum>0.68</minimum>
<minimum>0.72</minimum>
</limit>
<limit implementation="org.codice.jacoco.LenientLimit">
<counter>BRANCH</counter>
<value>COVEREDRATIO</value>
<minimum>0.60</minimum>
<minimum>0.65</minimum>
</limit>
<limit implementation="org.codice.jacoco.LenientLimit">
<counter>COMPLEXITY</counter>
<value>COVEREDRATIO</value>
<minimum>0.53</minimum>
<minimum>0.56</minimum>
</limit>
</limits>
</rule>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import ddf.mime.MimeTypeMapper;
import ddf.mime.MimeTypeToTransformerMapper;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import org.apache.camel.Endpoint;
import org.apache.camel.support.DefaultComponent;
import org.osgi.framework.BundleContext;
Expand Down Expand Up @@ -49,6 +50,8 @@ public class CatalogComponent extends DefaultComponent {

private MimeTypeMapper mimeTypeMapper;

private ExecutorService executor;

public CatalogComponent() {
super();
LOGGER.debug("INSIDE CatalogComponent constructor");
Expand All @@ -74,9 +77,10 @@ protected Endpoint createEndpoint(String uri, String remaining, Map<String, Obje

LOGGER.debug("transformerId = {}", transformerId);

Endpoint endpoint =
CatalogEndpoint endpoint =
new CatalogEndpoint(
uri, this, transformerId, mimeType, remaining, catalogFramework, mimeTypeMapper);
endpoint.setExecutor(executor);
try {
setProperties(endpoint, parameters);
} catch (Exception e) {
Expand Down Expand Up @@ -138,4 +142,12 @@ public void setCatalogFramework(CatalogFramework catalogFramework) {
public void setMimeTypeMapper(MimeTypeMapper mimeTypeMapper) {
this.mimeTypeMapper = mimeTypeMapper;
}

public void setExecutor(ExecutorService executor) {
this.executor = executor;
}

public void destroy() {
executor.shutdownNow();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import ddf.camel.component.catalog.queryresponsetransformer.QueryResponseTransformerProducer;
import ddf.catalog.CatalogFramework;
import ddf.mime.MimeTypeMapper;
import java.util.concurrent.ExecutorService;
import org.apache.camel.Consumer;
import org.apache.camel.ExchangePattern;
import org.apache.camel.MultipleConsumersSupport;
Expand Down Expand Up @@ -60,6 +61,8 @@ public class CatalogEndpoint extends DefaultEndpoint implements MultipleConsumer

private MimeTypeMapper mimeTypeMapper;

private ExecutorService executor;

/**
* Constructs a CatalogEndpoint for the specified custom <code>catalog</code> component.
*
Expand Down Expand Up @@ -204,4 +207,12 @@ public boolean isSingleton() {
public boolean isMultipleConsumersSupported() {
return true;
}

public void setExecutor(ExecutorService executor) {
this.executor = executor;
}

public ExecutorService getExecutor() {
return this.executor;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@
*/
package ddf.camel.component.catalog.framework;

import ddf.camel.component.catalog.CatalogEndpoint;
import ddf.catalog.CatalogFramework;
import ddf.catalog.data.Metacard;
import ddf.catalog.operation.CreateRequest;
import ddf.catalog.operation.CreateResponse;
import ddf.catalog.operation.DeleteRequest;
import ddf.catalog.operation.DeleteResponse;
import ddf.catalog.operation.Operation;
import ddf.catalog.operation.Update;
import ddf.catalog.operation.UpdateRequest;
import ddf.catalog.operation.UpdateResponse;
Expand All @@ -27,13 +29,22 @@
import ddf.catalog.operation.impl.UpdateRequestImpl;
import ddf.catalog.source.IngestException;
import ddf.catalog.source.SourceUnavailableException;
import ddf.security.SecurityConstants;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.camel.Endpoint;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.camel.Exchange;
import org.apache.camel.TypeConversionException;
import org.apache.camel.support.DefaultProducer;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.shiro.SecurityUtils;
import org.apache.shiro.subject.Subject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -99,8 +110,12 @@ public class FrameworkProducer extends DefaultProducer {

private static final String OPERATION_HEADER_KEY = "operation";

private static final String TIMEOUT_HEADER_KEY = "timeoutMilliseconds";

private CatalogFramework catalogFramework;

private ExecutorService executor;

private static final String CATALOG_RESPONSE_NULL = "Catalog response object is null";

/**
Expand All @@ -109,13 +124,14 @@ public class FrameworkProducer extends DefaultProducer {
* @param endpoint the Camel endpoint that created this consumer
* @param catalogFramework the DDF Catalog Framework to use
*/
public FrameworkProducer(Endpoint endpoint, CatalogFramework catalogFramework) {
public FrameworkProducer(CatalogEndpoint endpoint, CatalogFramework catalogFramework) {
super(endpoint);
this.catalogFramework = catalogFramework;
this.executor = endpoint.getExecutor();
}

@Override
public void process(Exchange exchange) throws FrameworkProducerException {
public void process(Exchange exchange) throws FrameworkProducerException, InterruptedException {
try {
LOGGER.debug("Entering process method");

Expand Down Expand Up @@ -148,7 +164,7 @@ public void process(Exchange exchange) throws FrameworkProducerException {
exchange.getIn().setBody(new ArrayList<Metacard>());
LOGGER.debug("Received a non-String as the operation type");
throw new FrameworkProducerException(cce);
} catch (SourceUnavailableException | IngestException e) {
} catch (SourceUnavailableException | IngestException | ExecutionException e) {
LOGGER.debug("Exception cataloging metacards", e);
throw new FrameworkProducerException(e);
}
Expand All @@ -165,7 +181,8 @@ public void process(Exchange exchange) throws FrameworkProducerException {
* @throws ddf.camel.component.catalog.framework.FrameworkProducerException
*/
private void create(final Exchange exchange)
throws SourceUnavailableException, IngestException, FrameworkProducerException {
throws SourceUnavailableException, IngestException, FrameworkProducerException,
InterruptedException, ExecutionException {
CreateResponse createResponse = null;

// read in data
Expand All @@ -188,7 +205,13 @@ private void create(final Exchange exchange)
}

LOGGER.debug("Making CREATE call to Catalog Framework...");
createResponse = catalogFramework.create(createRequest);
Object timeoutObj = exchange.getIn().getHeader(TIMEOUT_HEADER_KEY);
if (timeoutObj != null) {
createResponse =
processWithTimeout((long) timeoutObj, catalogFramework::create, createRequest);
} else {
createResponse = catalogFramework.create(createRequest);
}

if (createResponse == null) {
LOGGER.debug("CreateResponse is null from catalog framework");
Expand Down Expand Up @@ -218,6 +241,32 @@ private void create(final Exchange exchange)
processCatalogResponse(createResponse, exchange);
}

<T extends Operation, R> R processWithTimeout(
long timeout, CatalogFunction<T, R> catalog, T request)
throws InterruptedException, IngestTimeoutException, ExecutionException {
LOGGER.trace("Running catalog operation with timeout of {}", timeout);
Subject subject = SecurityUtils.getSubject();
if (subject instanceof ddf.security.Subject) {
request
.getProperties()
.put(SecurityConstants.SECURITY_SUBJECT, (ddf.security.Subject) subject);
}
try {
List<Future<R>> futures =
executor.invokeAll(
Collections.singleton((Callable<R>) () -> catalog.ingest(request)),
timeout,
TimeUnit.MILLISECONDS);
if (!futures.get(0).isDone()) {
LOGGER.warn("Ingest task was canceled due to timeout");
throw new IngestTimeoutException("The ingest request timed out 1");
}
return futures.get(0).get();
} catch (CancellationException e) {
throw new IngestTimeoutException("The ingest request timed out 2");
}
}

/**
* Updates metacard(s) in the catalog using the Catalog Framework.
*
Expand All @@ -229,7 +278,8 @@ private void create(final Exchange exchange)
* @throws ddf.camel.component.catalog.framework.FrameworkProducerException
*/
private void update(final Exchange exchange)
throws SourceUnavailableException, IngestException, FrameworkProducerException {
throws SourceUnavailableException, IngestException, FrameworkProducerException,
ExecutionException, InterruptedException {
UpdateResponse updateResponse = null;

// read in data from exchange
Expand Down Expand Up @@ -258,7 +308,13 @@ private void update(final Exchange exchange)
}

LOGGER.debug("Making UPDATE call to Catalog Framework...");
updateResponse = catalogFramework.update(updateRequest);
Object timeoutObj = exchange.getIn().getHeader(TIMEOUT_HEADER_KEY);
if (timeoutObj != null) {
updateResponse =
processWithTimeout((long) timeoutObj, catalogFramework::update, updateRequest);
} else {
updateResponse = catalogFramework.update(updateRequest);
}

if (updateResponse == null) {
LOGGER.debug("UpdateResponse is null from catalog framework");
Expand Down Expand Up @@ -534,4 +590,9 @@ private List<Metacard> readBodyDataAsMetacards(final Exchange exchange) {

return metacardsToProcess;
}

@FunctionalInterface
public interface CatalogFunction<T extends Operation, R> {
R ingest(T t) throws IngestException, SourceUnavailableException;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/**
* Copyright (c) Codice Foundation
*
* <p>This is free software: you can redistribute it and/or modify it under the terms of the GNU
* Lesser General Public License as published by the Free Software Foundation, either version 3 of
* the License, or any later version.
*
* <p>This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
* without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details. A copy of the GNU Lesser General Public
* License is distributed along with this program and can be found at
* <http://www.gnu.org/licenses/lgpl.html>.
*/
package ddf.camel.component.catalog.framework;

public class IngestTimeoutException extends FrameworkProducerException {
/** The constant serialVersionUID. */
private static final long serialVersionUID = 1L;

/** Instantiates a new {@code IngestTimeoutException}. */
public IngestTimeoutException() {
super();
}

/**
* Instantiates a new {@code IngestTimeoutException} with the provided message.
*
* @param message the message
*/
public IngestTimeoutException(String message) {
super(message);
}

/**
* Instantiates a new {@code IngestTimeoutException} with the provided message and throwable.
*
* @param message the message
* @param throwable the throwable
*/
public IngestTimeoutException(String message, Throwable throwable) {
super(message, throwable);
}

/**
* Instantiates a new {@code IngestTimeoutException} with the provided throwable.
*
* @param throwable the throwable
*/
public IngestTimeoutException(Throwable throwable) {
super(throwable);
}
}
Loading