Skip to content

Commit

Permalink
NIFI-5922: Bug fixes; initialize, setup, and enable controller servic…
Browse files Browse the repository at this point in the history
…es; code cleanup
  • Loading branch information
markap14 authored and SamHjelmfelt committed May 17, 2019
1 parent 74c43b7 commit a72d559
Show file tree
Hide file tree
Showing 10 changed files with 188 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ public class NiFiStateless {
public static void main(final String[] args) throws IOException, ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {

String nifi_home = System.getenv("NIFI_HOME");
if(nifi_home == null || nifi_home.equals(""))
if(nifi_home == null || nifi_home.equals("")) {
nifi_home = ".";
}

final File libDir = new File(nifi_home+"/lib");
final File statelesslibDir = new File(nifi_home+"/stateless-lib");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public boolean validate() {
boolean hasSuccessOutputPort = this.successOutputPorts.contains(relationship);

if (!(hasChildren || hasAutoterminate || hasFailureOutputPort || hasSuccessOutputPort)) {
getLogger().error("Component: {}, Relationship: {}, needs either auto terminate, child processors, or an output port", new Object[] {toString(), relationship.getName()});
getLogger().error("Component: {}, Relationship: {}, either needs to be auto-terminated or connected to another component", new Object[] {toString(), relationship.getName()});
return false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@
import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.controller.exception.ControllerServiceInstantiationException;
import org.apache.nifi.controller.exception.ProcessorInstantiationException;
import org.apache.nifi.logging.ComponentLog;
Expand All @@ -33,6 +36,8 @@
import org.apache.nifi.registry.flow.VersionedControllerService;
import org.apache.nifi.registry.flow.VersionedProcessor;
import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.MalformedURLException;
import java.net.URL;
Expand All @@ -44,6 +49,7 @@
import java.util.Set;

public class ComponentFactory {
private static final Logger logger = LoggerFactory.getLogger(ComponentFactory.class);
private final ExtensionManager extensionManager;

public ComponentFactory(final ExtensionManager extensionManager) {
Expand All @@ -69,7 +75,7 @@ public StatelessProcessorWrapper createProcessor(final VersionedProcessor versio
final ClassLoader detectedClassLoader = extensionManager.createInstanceClassLoader(type, identifier, bundle,
classpathUrls == null ? Collections.emptySet() : classpathUrls);

System.out.println("Setting context class loader to " + detectedClassLoader + " (parent = " + detectedClassLoader.getParent() + ") to create " + type);
logger.debug("Setting context class loader to {} (parent = {}) to create {}", detectedClassLoader, detectedClassLoader.getParent(), type);
final Class<?> rawClass = Class.forName(type, true, detectedClassLoader);
Thread.currentThread().setContextClassLoader(detectedClassLoader);

Expand Down Expand Up @@ -138,12 +144,14 @@ private Set<URL> getAdditionalClasspathResources(final List<PropertyDescriptor>
}


public ControllerService createControllerService(final VersionedControllerService versionedControllerService, final VariableRegistry variableRegistry) {
return createControllerService(versionedControllerService, variableRegistry, null);
public ControllerService createControllerService(final VersionedControllerService versionedControllerService, final VariableRegistry variableRegistry,
final ControllerServiceLookup serviceLookup, final StateManager stateManager) {
return createControllerService(versionedControllerService, variableRegistry, null, serviceLookup, stateManager);
}


private ControllerService createControllerService(final VersionedControllerService versionedControllerService, final VariableRegistry variableRegistry, final Set<URL> classpathUrls) {
private ControllerService createControllerService(final VersionedControllerService versionedControllerService, final VariableRegistry variableRegistry, final Set<URL> classpathUrls,
final ControllerServiceLookup serviceLookup, final StateManager stateManager) {

final String type = versionedControllerService.getType();
final String identifier = versionedControllerService.getIdentifier();
Expand All @@ -161,22 +169,24 @@ private ControllerService createControllerService(final VersionedControllerServi
final ClassLoader detectedClassLoader = extensionManager.createInstanceClassLoader(type, identifier, bundle,
classpathUrls == null ? Collections.emptySet() : classpathUrls);

System.out.println("Setting context class loader to " + detectedClassLoader + " (parent = " + detectedClassLoader.getParent() + ") to create " + type);
logger.debug("Setting context class loader to {} (parent = {}) to create {}", detectedClassLoader, detectedClassLoader.getParent(), type);
final Class<?> rawClass = Class.forName(type, true, detectedClassLoader);
Thread.currentThread().setContextClassLoader(detectedClassLoader);

final Object extensionInstance = rawClass.newInstance();
final ComponentLog componentLog = new SLF4JComponentLog(extensionInstance);

final ControllerService service = (ControllerService) extensionInstance;
final ControllerServiceInitializationContext initializationContext = new StatelessControllerServiceInitializationContext(identifier, service, serviceLookup, stateManager);
service.initialize(initializationContext);

// If no classpath urls were provided, check if we need to add additional classpath URL's based on configured properties.
if (classpathUrls == null) {
final Set<URL> additionalClasspathUrls = getAdditionalClasspathResources(service.getPropertyDescriptors(), service.getIdentifier(), versionedControllerService.getProperties(),
variableRegistry, componentLog);

if (!additionalClasspathUrls.isEmpty()) {
return createControllerService(versionedControllerService, variableRegistry, additionalClasspathUrls);
return createControllerService(versionedControllerService, variableRegistry, additionalClasspathUrls, serviceLookup, stateManager);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,15 @@
public class StatelessControllerServiceConfiguration {

private final ControllerService service;
private final String name;

private final AtomicBoolean enabled = new AtomicBoolean(false);
private String annotationData;
private Map<PropertyDescriptor, String> properties = new HashMap<>();

public StatelessControllerServiceConfiguration(final ControllerService service) {
public StatelessControllerServiceConfiguration(final ControllerService service, final String name) {
this.service = service;
this.name = name;
}

public ControllerService getService() {
Expand Down Expand Up @@ -75,4 +78,8 @@ public String getAnnotationData() {
public Map<PropertyDescriptor, String> getProperties() {
return Collections.unmodifiableMap(properties);
}

public String getName() {
return name;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.stateless.core;

import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.controller.NodeTypeProvider;
import org.apache.nifi.logging.ComponentLog;

import java.io.File;

public class StatelessControllerServiceInitializationContext implements ControllerServiceInitializationContext {
private final ComponentLog logger;
private final String processorId;
private final ControllerServiceLookup controllerServiceLookup;
private final StateManager stateManager;

public StatelessControllerServiceInitializationContext(final String id, final ControllerService controllerService, final ControllerServiceLookup serviceLookup, final StateManager stateManager) {
processorId = id;
logger = new SLF4JComponentLog(controllerService);
controllerServiceLookup = serviceLookup;
this.stateManager = stateManager;
}

public String getIdentifier() {
return processorId;
}

public ComponentLog getLogger() {
return logger;
}

@Override
public StateManager getStateManager() {
return stateManager;
}

public ControllerServiceLookup getControllerServiceLookup() {
return controllerServiceLookup;
}

public NodeTypeProvider getNodeTypeProvider() {
return new NodeTypeProvider() {
public boolean isClustered() {
return false;
}

public boolean isPrimary() {
return false;
}
};
}

public String getKerberosServicePrincipal() {
return null; //this needs to be wired in.
}

public File getKerberosServiceKeytab() {
return null; //this needs to be wired in.
}

public File getKerberosConfigurationFile() {
return null; //this needs to be wired in.
}}
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.reporting.InitializationException;

import java.lang.reflect.InvocationTargetException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
Expand All @@ -47,15 +49,15 @@ public Map<String, StatelessControllerServiceConfiguration> getControllerService
}


public void addControllerService(final ControllerService service) throws InitializationException {
public void addControllerService(final ControllerService service, final String serviceName) throws InitializationException {
final String identifier = service.getIdentifier();
final SLF4JComponentLog logger = new SLF4JComponentLog(service);
controllerServiceLoggers.put(identifier, logger);

StatelessStateManager serviceStateManager = new StatelessStateManager();
controllerServiceStateManagers.put(identifier, serviceStateManager);

final StatelessProcessContext initContext = new StatelessProcessContext(requireNonNull(service), this, requireNonNull(identifier), logger, serviceStateManager);
final StatelessProcessContext initContext = new StatelessProcessContext(requireNonNull(service), this, serviceName, logger, serviceStateManager);
service.initialize(initContext);

try {
Expand All @@ -64,7 +66,7 @@ public void addControllerService(final ControllerService service) throws Initial
throw new InitializationException(e);
}

final StatelessControllerServiceConfiguration config = new StatelessControllerServiceConfiguration(service);
final StatelessControllerServiceConfiguration config = new StatelessControllerServiceConfiguration(service, serviceName);
controllerServiceMap.put(identifier, config);
}

Expand Down Expand Up @@ -116,7 +118,33 @@ public String getControllerServiceName(final String serviceIdentifier) {
return status == null ? null : serviceIdentifier;
}

public void enableControllerService(final ControllerService service, VariableRegistry registry) throws InvocationTargetException, IllegalAccessException {

public void enableControllerServices(final VariableRegistry variableRegistry) {
for (final StatelessControllerServiceConfiguration config : controllerServiceMap.values()) {
final ControllerService service = config.getService();
final Collection<ValidationResult> validationResults = validate(service, config.getName(), variableRegistry);
if (!validationResults.isEmpty()) {
throw new RuntimeException("Failed to enable Controller Service {id=" + service.getIdentifier() + ", name=" + config.getName() + ", type=" + service.getClass() + "} because " +
"validation failed: " + validationResults);
}

try {
enableControllerService(service, variableRegistry);
} catch (IllegalAccessException| InvocationTargetException e) {
throw new RuntimeException("Failed to enable Controller Service {id=" + service.getIdentifier() + ", name=" + config.getName() + ", type=" + service.getClass() + "}", e);
}
}
}

public Collection<ValidationResult> validate(final ControllerService service, final String serviceName, final VariableRegistry variableRegistry) {
final StateManager stateManager = controllerServiceStateManagers.get(service.getIdentifier());
final SLF4JComponentLog logger = controllerServiceLoggers.get(service.getIdentifier());
final StatelessProcessContext processContext = new StatelessProcessContext(service, this, serviceName, logger, stateManager, variableRegistry);
final StatelessValidationContext validationContext = new StatelessValidationContext(processContext, this, stateManager, variableRegistry);
return service.validate(validationContext);
}

private void enableControllerService(final ControllerService service, final VariableRegistry registry) throws InvocationTargetException, IllegalAccessException {
final StatelessControllerServiceConfiguration configuration = getConfiguration(service.getIdentifier());
if (configuration == null) {
throw new IllegalArgumentException("Controller Service " + service + " is not known");
Expand All @@ -125,6 +153,7 @@ public void enableControllerService(final ControllerService service, VariableReg
if (configuration.isEnabled()) {
throw new IllegalStateException("Cannot enable Controller Service " + service + " because it is not disabled");
}

final ConfigurationContext configContext = new StatelessConfigurationContext(service, configuration.getProperties(), this, registry);
ReflectionUtils.invokeMethodsWithAnnotation(OnEnabled.class, service, configContext);

Expand All @@ -150,8 +179,8 @@ private StatelessControllerServiceConfiguration getControllerServiceConfigToUpda
return configuration;
}

public ValidationResult setControllerServiceProperty(final ControllerService service, final PropertyDescriptor property, final StatelessProcessContext context, final VariableRegistry registry, final
String value) {
public ValidationResult setControllerServiceProperty(final ControllerService service, final PropertyDescriptor property, final StatelessProcessContext context,
final VariableRegistry registry, final String value) {
final StatelessStateManager serviceStateManager = controllerServiceStateManagers.get(service.getIdentifier());
if (serviceStateManager == null) {
throw new IllegalStateException("Controller service " + service + " has not been added to this TestRunner via the #addControllerService method");
Expand Down
Loading

0 comments on commit a72d559

Please sign in to comment.