Skip to content

Commit

Permalink
NIFI-14009 Moved Toolkit Client code in its own nifi-toolkit-client m…
Browse files Browse the repository at this point in the history
…odule

This closes #9522

Signed-off-by: David Handermann <exceptionfactory@apache.org>
  • Loading branch information
pvillard31 authored and exceptionfactory committed Nov 18, 2024
1 parent 20c7913 commit ddef74b
Show file tree
Hide file tree
Showing 229 changed files with 1,603 additions and 1,570 deletions.
8 changes: 1 addition & 7 deletions nifi-system-tests/nifi-system-test-suite/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -162,15 +162,9 @@
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-toolkit-cli</artifactId>
<artifactId>nifi-toolkit-client</artifactId>
<version>2.1.0-SNAPSHOT</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.inject</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.scheduling.ExecutionNode;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.toolkit.cli.impl.client.nifi.ConnectionClient;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClient;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
import org.apache.nifi.toolkit.cli.impl.client.nifi.ProcessorClient;
import org.apache.nifi.toolkit.cli.impl.client.nifi.VersionsClient;
import org.apache.nifi.toolkit.client.ConnectionClient;
import org.apache.nifi.toolkit.client.NiFiClient;
import org.apache.nifi.toolkit.client.NiFiClientException;
import org.apache.nifi.toolkit.client.ProcessorClient;
import org.apache.nifi.toolkit.client.VersionsClient;
import org.apache.nifi.web.api.dto.AssetReferenceDTO;
import org.apache.nifi.web.api.dto.BundleDTO;
import org.apache.nifi.web.api.dto.ConfigVerificationResultDTO;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@

import org.apache.nifi.cluster.coordination.node.ClusterRoles;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClient;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientConfig;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
import org.apache.nifi.toolkit.cli.impl.client.nifi.RequestConfig;
import org.apache.nifi.toolkit.cli.impl.client.nifi.impl.JerseyNiFiClient;
import org.apache.nifi.toolkit.client.NiFiClient;
import org.apache.nifi.toolkit.client.NiFiClientConfig;
import org.apache.nifi.toolkit.client.NiFiClientException;
import org.apache.nifi.toolkit.client.RequestConfig;
import org.apache.nifi.toolkit.client.impl.JerseyNiFiClient;
import org.apache.nifi.web.api.dto.NodeDTO;
import org.apache.nifi.web.api.dto.status.ConnectionStatusSnapshotDTO;
import org.apache.nifi.web.api.dto.status.ProcessGroupStatusSnapshotDTO;
Expand Down Expand Up @@ -74,7 +74,7 @@ public abstract class NiFiSystemIT implements NiFiInstanceProvider {
private final ConcurrentMap<String, Long> lastLogTimestamps = new ConcurrentHashMap<>();

private static final String QUEUE_SIZE_LOGGING_KEY = "Queue Sizes";
// Group ID | Source Name | Dest Name | Conn Name | Queue Size |
// Group ID | Source Name | Dest Name | Conn Name | Queue Size |
private static final String QUEUE_SIZES_FORMAT = "| %1$-36.36s | %2$-30.30s | %3$-30.30s | %4$-30.30s | %5$-30.30s |";

public static final RequestConfig DO_NOT_REPLICATE = () -> Collections.singletonMap("request-replicated", Boolean.TRUE.toString());
Expand Down Expand Up @@ -136,7 +136,6 @@ protected boolean isAllowFactoryReuse() {
return true;
}


@AfterAll
public static void cleanup() {
logger.info("Beginning cleanup");
Expand All @@ -155,8 +154,10 @@ public void teardown() throws Exception {
logger.info("Beginning teardown");

try {
// In some cases a test can pass, but still leave a clustered instance with one of the nodes in a bad state, if the instance then gets reused
// it will cause later tests to fail, so it is better to destroy the environment if the cluster is in a bad state at the end of a test
// In some cases a test can pass, but still leave a clustered instance with one
// of the nodes in a bad state, if the instance then gets reused
// it will cause later tests to fail, so it is better to destroy the environment
// if the cluster is in a bad state at the end of a test
final NiFiInstance nifiInstance = nifiRef.get();
if (nifiInstance != null && nifiInstance.isClustered() && (!isCoordinatorElected() || !allNodesConnected(nifiInstance.getNumberOfNodes()))) {
logger.info("Clustered environment is in a bad state, will completely tear down the environments and start with a clean environment for the next test.");
Expand All @@ -179,8 +180,10 @@ public void teardown() throws Exception {
instanceCache.poison(nifiRef.get());
cleanup();
} else if (destroyFlowFailure != null) {
// If unable to destroy the flow, we need to shutdown the instance and delete the flow and completely recreate the environment.
// Otherwise, we will be left in an unknown state for the next test, and that can cause cascading failures that are very difficult
// If unable to destroy the flow, we need to shutdown the instance and delete
// the flow and completely recreate the environment.
// Otherwise, we will be left in an unknown state for the next test, and that
// can cause cascading failures that are very difficult
// to understand and troubleshoot.
logger.info("Because there was a failure when destroying the flow, will completely tear down the environments and start with a clean environment for the next test.");
instanceCache.poison(nifiRef.get());
Expand Down Expand Up @@ -214,18 +217,18 @@ public NiFiInstanceFactory getInstanceFactory() {

public NiFiInstanceFactory createStandaloneInstanceFactory() {
return new SpawnedStandaloneNiFiInstanceFactory(
new InstanceConfiguration.Builder()
.bootstrapConfig("src/test/resources/conf/default/bootstrap.conf")
.instanceDirectory("target/standalone-instance")
.overrideNifiProperties(getNifiPropertiesOverrides())
.unpackPythonExtensions(false)
.build());
new InstanceConfiguration.Builder()
.bootstrapConfig("src/test/resources/conf/default/bootstrap.conf")
.instanceDirectory("target/standalone-instance")
.overrideNifiProperties(getNifiPropertiesOverrides())
.unpackPythonExtensions(false)
.build());
}

public NiFiInstanceFactory createTwoNodeInstanceFactory() {
return new SpawnedClusterNiFiInstanceFactory(
"src/test/resources/conf/clustered/node1/bootstrap.conf",
"src/test/resources/conf/clustered/node2/bootstrap.conf");
"src/test/resources/conf/clustered/node1/bootstrap.conf",
"src/test/resources/conf/clustered/node2/bootstrap.conf");
}

public NiFiInstanceFactory createPythonicInstanceFactory() {
Expand Down Expand Up @@ -329,14 +332,14 @@ protected NiFiClientUtil getClientUtil() {

protected NiFiClient createClient(final int port) {
final NiFiClientConfig clientConfig = new NiFiClientConfig.Builder()
.baseUrl("http://localhost:" + port)
.connectTimeout(30000)
.readTimeout(30000)
.build();
.baseUrl("http://localhost:" + port)
.connectTimeout(30000)
.readTimeout(30000)
.build();

return new JerseyNiFiClient.Builder()
.config(clientConfig)
.build();
.config(clientConfig)
.build();
}

protected int getClientApiPort() {
Expand Down Expand Up @@ -418,7 +421,9 @@ protected void waitForNodeStatus(final NodeDTO nodeDto, final String status) thr
final ClusterEntity clusterEntity = getNifiClient().getControllerClient().getNodes();
final Collection<NodeDTO> nodes = clusterEntity.getCluster().getNodes();
final NodeDTO nodeDtoMatch = nodes.stream()
.filter(n -> n.getApiPort().equals(nodeDto.getApiPort())).findFirst().get();
.filter(n -> n.getApiPort().equals(nodeDto.getApiPort()))
.findFirst()
.get();
return nodeDtoMatch.getStatus().equals(status);
} catch (final Exception e) {
logger.error("Failed to determine node status", e);
Expand Down Expand Up @@ -509,11 +514,11 @@ protected void logQueueSizes() throws NiFiClientException, IOException {

logger.info("Dump of Queue Sizes:");
final String headerLine = String.format(QUEUE_SIZES_FORMAT,
"Group ID",
"Source Name",
"Destination Name",
"Connection Name",
"Queued");
"Group ID",
"Source Name",
"Destination Name",
"Connection Name",
"Queued");
logger.info(headerLine);

for (final ConnectionStatusSnapshotEntity connectionStatus : connectionStatuses) {
Expand All @@ -523,11 +528,11 @@ protected void logQueueSizes() throws NiFiClientException, IOException {
}

final String formatted = String.format(QUEUE_SIZES_FORMAT,
statusSnapshotDto.getGroupId(),
statusSnapshotDto.getSourceName(),
statusSnapshotDto.getDestinationName(),
statusSnapshotDto.getName(),
statusSnapshotDto.getQueued());
statusSnapshotDto.getGroupId(),
statusSnapshotDto.getSourceName(),
statusSnapshotDto.getDestinationName(),
statusSnapshotDto.getName(),
statusSnapshotDto.getQueued());
logger.info(formatted);
}

Expand Down Expand Up @@ -572,16 +577,19 @@ public NodeDTO getNodeDtoByNodeIndex(final int nodeIndex) throws NiFiClientExcep

public NodeDTO getNodeDtoByApiPort(final int apiPort) throws NiFiClientException, IOException {
final ClusterEntity clusterEntity = getNifiClient().getControllerClient().getNodes();
final NodeDTO node2Dto = clusterEntity.getCluster().getNodes().stream()
.filter(nodeDto -> nodeDto.getApiPort() == apiPort)
.findAny()
.orElseThrow(() -> new RuntimeException("Could not locate Node 2"));
final NodeDTO node2Dto = clusterEntity.getCluster()
.getNodes()
.stream()
.filter(nodeDto -> nodeDto.getApiPort() == apiPort)
.findAny()
.orElseThrow(() -> new RuntimeException("Could not locate Node 2"));

return node2Dto;
}

/**
* Disconnects a node from the cluster
*
* @param nodeIndex the 1-based index of the node
*/
protected void disconnectNode(final int nodeIndex) throws NiFiClientException, IOException, InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
import org.apache.nifi.bootstrap.configuration.ConfigurationProvider;
import org.apache.nifi.bootstrap.configuration.StandardConfigurationProvider;
import org.apache.nifi.registry.security.util.KeystoreType;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClient;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientConfig;
import org.apache.nifi.toolkit.cli.impl.client.nifi.impl.JerseyNiFiClient;
import org.apache.nifi.toolkit.client.NiFiClient;
import org.apache.nifi.toolkit.client.NiFiClientConfig;
import org.apache.nifi.toolkit.client.impl.JerseyNiFiClient;
import org.apache.nifi.util.file.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -295,7 +295,6 @@ public void stop() {
}
}


private void cleanup() throws IOException {
if (instanceDirectory.exists()) {
FileUtils.deleteFile(instanceDirectory, true);
Expand Down Expand Up @@ -387,20 +386,20 @@ public NiFiClient createClient() throws IOException {
final String truststoreType = nifiProperties.getProperty("nifi.security.truststoreType");

final NiFiClientConfig clientConfig = new NiFiClientConfig.Builder()
.baseUrl("http://localhost:" + webPort)
.connectTimeout(30000)
.readTimeout(30000)
.keystoreFilename(getAbsolutePath(nifiProperties.getProperty("nifi.security.keystore")))
.keystorePassword(nifiProperties.getProperty("nifi.security.keystorePasswd"))
.keystoreType(keystoreType == null ? null : KeystoreType.valueOf(keystoreType))
.truststoreFilename(getAbsolutePath(nifiProperties.getProperty("nifi.security.truststore")))
.truststorePassword(nifiProperties.getProperty("nifi.security.truststorePasswd"))
.truststoreType(truststoreType == null ? null : KeystoreType.valueOf(truststoreType))
.build();
.baseUrl("http://localhost:" + webPort)
.connectTimeout(30000)
.readTimeout(30000)
.keystoreFilename(getAbsolutePath(nifiProperties.getProperty("nifi.security.keystore")))
.keystorePassword(nifiProperties.getProperty("nifi.security.keystorePasswd"))
.keystoreType(keystoreType == null ? null : KeystoreType.valueOf(keystoreType))
.truststoreFilename(getAbsolutePath(nifiProperties.getProperty("nifi.security.truststore")))
.truststorePassword(nifiProperties.getProperty("nifi.security.truststorePasswd"))
.truststoreType(truststoreType == null ? null : KeystoreType.valueOf(truststoreType))
.build();

return new JerseyNiFiClient.Builder()
.config(clientConfig)
.build();
.config(clientConfig)
.build();
}

private String getAbsolutePath(final String filename) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.nifi.tests.system.classloaders;

import org.apache.nifi.tests.system.NiFiSystemIT;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
import org.apache.nifi.toolkit.client.NiFiClientException;
import org.apache.nifi.web.api.entity.ConnectionEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.junit.jupiter.api.Test;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import org.apache.nifi.tests.system.NiFiInstanceFactory;
import org.apache.nifi.tests.system.NiFiSystemIT;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
import org.apache.nifi.toolkit.client.NiFiClientException;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.api.entity.ConnectionEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.tests.system.NiFiInstanceFactory;
import org.apache.nifi.tests.system.NiFiSystemIT;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
import org.apache.nifi.toolkit.client.NiFiClientException;
import org.apache.nifi.web.api.dto.ControllerServiceDTO;
import org.apache.nifi.web.api.dto.PortDTO;
import org.apache.nifi.web.api.dto.ProcessorDTO;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.nifi.tests.system.NiFiInstanceFactory;
import org.apache.nifi.tests.system.NiFiSystemIT;
import org.apache.nifi.tests.system.SpawnedClusterNiFiInstanceFactory;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
import org.apache.nifi.toolkit.client.NiFiClientException;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.junit.jupiter.api.Test;

Expand Down
Loading

0 comments on commit ddef74b

Please sign in to comment.