diff --git a/nifi-stateless/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/NiFiStateless.java b/nifi-stateless/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/NiFiStateless.java index 778881bd4ad17..74b697ca39268 100644 --- a/nifi-stateless/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/NiFiStateless.java +++ b/nifi-stateless/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/NiFiStateless.java @@ -43,8 +43,9 @@ public class NiFiStateless { public static void main(final String[] args) throws IOException, ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException { String nifi_home = System.getenv("NIFI_HOME"); - if(nifi_home == null || nifi_home.equals("")) + if(nifi_home == null || nifi_home.equals("")) { nifi_home = "."; + } final File libDir = new File(nifi_home+"/lib"); final File statelesslibDir = new File(nifi_home+"/stateless-lib"); diff --git a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/AbstractStatelessComponent.java b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/AbstractStatelessComponent.java index 67151df04e1f6..3028e9f7f6a4d 100644 --- a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/AbstractStatelessComponent.java +++ b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/AbstractStatelessComponent.java @@ -87,7 +87,7 @@ public boolean validate() { boolean hasSuccessOutputPort = this.successOutputPorts.contains(relationship); if (!(hasChildren || hasAutoterminate || hasFailureOutputPort || hasSuccessOutputPort)) { - getLogger().error("Component: {}, Relationship: {}, needs either auto terminate, child processors, or an output port", new Object[] {toString(), relationship.getName()}); + getLogger().error("Component: {}, Relationship: {}, either needs to be auto-terminated or connected to another component", new Object[] {toString(), relationship.getName()}); return false; } } diff --git a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/ComponentFactory.java b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/ComponentFactory.java index 2fc3459f8a916..d0acdc710cd4b 100644 --- a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/ComponentFactory.java +++ b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/ComponentFactory.java @@ -21,7 +21,10 @@ import org.apache.nifi.bundle.Bundle; import org.apache.nifi.bundle.BundleCoordinate; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.StateManager; import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.controller.ControllerServiceInitializationContext; +import org.apache.nifi.controller.ControllerServiceLookup; import org.apache.nifi.controller.exception.ControllerServiceInstantiationException; import org.apache.nifi.controller.exception.ProcessorInstantiationException; import org.apache.nifi.logging.ComponentLog; @@ -33,6 +36,8 @@ import org.apache.nifi.registry.flow.VersionedControllerService; import org.apache.nifi.registry.flow.VersionedProcessor; import org.apache.nifi.util.file.classloader.ClassLoaderUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.net.MalformedURLException; import java.net.URL; @@ -44,6 +49,7 @@ import java.util.Set; public class ComponentFactory { + private static final Logger logger = LoggerFactory.getLogger(ComponentFactory.class); private final ExtensionManager extensionManager; public ComponentFactory(final ExtensionManager extensionManager) { @@ -69,7 +75,7 @@ public StatelessProcessorWrapper createProcessor(final VersionedProcessor versio final ClassLoader detectedClassLoader = extensionManager.createInstanceClassLoader(type, identifier, bundle, classpathUrls == null ? Collections.emptySet() : classpathUrls); - System.out.println("Setting context class loader to " + detectedClassLoader + " (parent = " + detectedClassLoader.getParent() + ") to create " + type); + logger.debug("Setting context class loader to {} (parent = {}) to create {}", detectedClassLoader, detectedClassLoader.getParent(), type); final Class rawClass = Class.forName(type, true, detectedClassLoader); Thread.currentThread().setContextClassLoader(detectedClassLoader); @@ -138,12 +144,14 @@ private Set getAdditionalClasspathResources(final List } - public ControllerService createControllerService(final VersionedControllerService versionedControllerService, final VariableRegistry variableRegistry) { - return createControllerService(versionedControllerService, variableRegistry, null); + public ControllerService createControllerService(final VersionedControllerService versionedControllerService, final VariableRegistry variableRegistry, + final ControllerServiceLookup serviceLookup, final StateManager stateManager) { + return createControllerService(versionedControllerService, variableRegistry, null, serviceLookup, stateManager); } - private ControllerService createControllerService(final VersionedControllerService versionedControllerService, final VariableRegistry variableRegistry, final Set classpathUrls) { + private ControllerService createControllerService(final VersionedControllerService versionedControllerService, final VariableRegistry variableRegistry, final Set classpathUrls, + final ControllerServiceLookup serviceLookup, final StateManager stateManager) { final String type = versionedControllerService.getType(); final String identifier = versionedControllerService.getIdentifier(); @@ -161,7 +169,7 @@ private ControllerService createControllerService(final VersionedControllerServi final ClassLoader detectedClassLoader = extensionManager.createInstanceClassLoader(type, identifier, bundle, classpathUrls == null ? Collections.emptySet() : classpathUrls); - System.out.println("Setting context class loader to " + detectedClassLoader + " (parent = " + detectedClassLoader.getParent() + ") to create " + type); + logger.debug("Setting context class loader to {} (parent = {}) to create {}", detectedClassLoader, detectedClassLoader.getParent(), type); final Class rawClass = Class.forName(type, true, detectedClassLoader); Thread.currentThread().setContextClassLoader(detectedClassLoader); @@ -169,6 +177,8 @@ private ControllerService createControllerService(final VersionedControllerServi final ComponentLog componentLog = new SLF4JComponentLog(extensionInstance); final ControllerService service = (ControllerService) extensionInstance; + final ControllerServiceInitializationContext initializationContext = new StatelessControllerServiceInitializationContext(identifier, service, serviceLookup, stateManager); + service.initialize(initializationContext); // If no classpath urls were provided, check if we need to add additional classpath URL's based on configured properties. if (classpathUrls == null) { @@ -176,7 +186,7 @@ private ControllerService createControllerService(final VersionedControllerServi variableRegistry, componentLog); if (!additionalClasspathUrls.isEmpty()) { - return createControllerService(versionedControllerService, variableRegistry, additionalClasspathUrls); + return createControllerService(versionedControllerService, variableRegistry, additionalClasspathUrls, serviceLookup, stateManager); } } diff --git a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessControllerServiceConfiguration.java b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessControllerServiceConfiguration.java index e2d2a74acccca..5cd32cf43ed87 100644 --- a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessControllerServiceConfiguration.java +++ b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessControllerServiceConfiguration.java @@ -27,12 +27,15 @@ public class StatelessControllerServiceConfiguration { private final ControllerService service; + private final String name; + private final AtomicBoolean enabled = new AtomicBoolean(false); private String annotationData; private Map properties = new HashMap<>(); - public StatelessControllerServiceConfiguration(final ControllerService service) { + public StatelessControllerServiceConfiguration(final ControllerService service, final String name) { this.service = service; + this.name = name; } public ControllerService getService() { @@ -75,4 +78,8 @@ public String getAnnotationData() { public Map getProperties() { return Collections.unmodifiableMap(properties); } + + public String getName() { + return name; + } } diff --git a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessControllerServiceInitializationContext.java b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessControllerServiceInitializationContext.java new file mode 100644 index 0000000000000..b0801f01f3e73 --- /dev/null +++ b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessControllerServiceInitializationContext.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.stateless.core; + +import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.controller.ControllerServiceInitializationContext; +import org.apache.nifi.controller.ControllerServiceLookup; +import org.apache.nifi.controller.NodeTypeProvider; +import org.apache.nifi.logging.ComponentLog; + +import java.io.File; + +public class StatelessControllerServiceInitializationContext implements ControllerServiceInitializationContext { + private final ComponentLog logger; + private final String processorId; + private final ControllerServiceLookup controllerServiceLookup; + private final StateManager stateManager; + + public StatelessControllerServiceInitializationContext(final String id, final ControllerService controllerService, final ControllerServiceLookup serviceLookup, final StateManager stateManager) { + processorId = id; + logger = new SLF4JComponentLog(controllerService); + controllerServiceLookup = serviceLookup; + this.stateManager = stateManager; + } + + public String getIdentifier() { + return processorId; + } + + public ComponentLog getLogger() { + return logger; + } + + @Override + public StateManager getStateManager() { + return stateManager; + } + + public ControllerServiceLookup getControllerServiceLookup() { + return controllerServiceLookup; + } + + public NodeTypeProvider getNodeTypeProvider() { + return new NodeTypeProvider() { + public boolean isClustered() { + return false; + } + + public boolean isPrimary() { + return false; + } + }; + } + + public String getKerberosServicePrincipal() { + return null; //this needs to be wired in. + } + + public File getKerberosServiceKeytab() { + return null; //this needs to be wired in. + } + + public File getKerberosConfigurationFile() { + return null; //this needs to be wired in. + }} diff --git a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessControllerServiceLookup.java b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessControllerServiceLookup.java index 00b1c4862b7a1..f220ed87277b8 100644 --- a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessControllerServiceLookup.java +++ b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessControllerServiceLookup.java @@ -21,6 +21,7 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.state.StateManager; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.ControllerServiceLookup; @@ -28,6 +29,7 @@ import org.apache.nifi.reporting.InitializationException; import java.lang.reflect.InvocationTargetException; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -47,7 +49,7 @@ public Map getControllerService } - public void addControllerService(final ControllerService service) throws InitializationException { + public void addControllerService(final ControllerService service, final String serviceName) throws InitializationException { final String identifier = service.getIdentifier(); final SLF4JComponentLog logger = new SLF4JComponentLog(service); controllerServiceLoggers.put(identifier, logger); @@ -55,7 +57,7 @@ public void addControllerService(final ControllerService service) throws Initial StatelessStateManager serviceStateManager = new StatelessStateManager(); controllerServiceStateManagers.put(identifier, serviceStateManager); - final StatelessProcessContext initContext = new StatelessProcessContext(requireNonNull(service), this, requireNonNull(identifier), logger, serviceStateManager); + final StatelessProcessContext initContext = new StatelessProcessContext(requireNonNull(service), this, serviceName, logger, serviceStateManager); service.initialize(initContext); try { @@ -64,7 +66,7 @@ public void addControllerService(final ControllerService service) throws Initial throw new InitializationException(e); } - final StatelessControllerServiceConfiguration config = new StatelessControllerServiceConfiguration(service); + final StatelessControllerServiceConfiguration config = new StatelessControllerServiceConfiguration(service, serviceName); controllerServiceMap.put(identifier, config); } @@ -116,7 +118,33 @@ public String getControllerServiceName(final String serviceIdentifier) { return status == null ? null : serviceIdentifier; } - public void enableControllerService(final ControllerService service, VariableRegistry registry) throws InvocationTargetException, IllegalAccessException { + + public void enableControllerServices(final VariableRegistry variableRegistry) { + for (final StatelessControllerServiceConfiguration config : controllerServiceMap.values()) { + final ControllerService service = config.getService(); + final Collection validationResults = validate(service, config.getName(), variableRegistry); + if (!validationResults.isEmpty()) { + throw new RuntimeException("Failed to enable Controller Service {id=" + service.getIdentifier() + ", name=" + config.getName() + ", type=" + service.getClass() + "} because " + + "validation failed: " + validationResults); + } + + try { + enableControllerService(service, variableRegistry); + } catch (IllegalAccessException| InvocationTargetException e) { + throw new RuntimeException("Failed to enable Controller Service {id=" + service.getIdentifier() + ", name=" + config.getName() + ", type=" + service.getClass() + "}", e); + } + } + } + + public Collection validate(final ControllerService service, final String serviceName, final VariableRegistry variableRegistry) { + final StateManager stateManager = controllerServiceStateManagers.get(service.getIdentifier()); + final SLF4JComponentLog logger = controllerServiceLoggers.get(service.getIdentifier()); + final StatelessProcessContext processContext = new StatelessProcessContext(service, this, serviceName, logger, stateManager, variableRegistry); + final StatelessValidationContext validationContext = new StatelessValidationContext(processContext, this, stateManager, variableRegistry); + return service.validate(validationContext); + } + + private void enableControllerService(final ControllerService service, final VariableRegistry registry) throws InvocationTargetException, IllegalAccessException { final StatelessControllerServiceConfiguration configuration = getConfiguration(service.getIdentifier()); if (configuration == null) { throw new IllegalArgumentException("Controller Service " + service + " is not known"); @@ -125,6 +153,7 @@ public void enableControllerService(final ControllerService service, VariableReg if (configuration.isEnabled()) { throw new IllegalStateException("Cannot enable Controller Service " + service + " because it is not disabled"); } + final ConfigurationContext configContext = new StatelessConfigurationContext(service, configuration.getProperties(), this, registry); ReflectionUtils.invokeMethodsWithAnnotation(OnEnabled.class, service, configContext); @@ -150,8 +179,8 @@ private StatelessControllerServiceConfiguration getControllerServiceConfigToUpda return configuration; } - public ValidationResult setControllerServiceProperty(final ControllerService service, final PropertyDescriptor property, final StatelessProcessContext context, final VariableRegistry registry, final - String value) { + public ValidationResult setControllerServiceProperty(final ControllerService service, final PropertyDescriptor property, final StatelessProcessContext context, + final VariableRegistry registry, final String value) { final StatelessStateManager serviceStateManager = controllerServiceStateManagers.get(service.getIdentifier()); if (serviceStateManager == null) { throw new IllegalStateException("Controller service " + service + " has not been added to this TestRunner via the #addControllerService method"); diff --git a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessFlow.java b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessFlow.java index a2a8f042f5c40..2b17376fa6b03 100644 --- a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessFlow.java +++ b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessFlow.java @@ -19,11 +19,10 @@ import com.google.gson.JsonArray; import com.google.gson.JsonElement; import com.google.gson.JsonObject; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.StateManager; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.exception.ProcessorInstantiationException; -import org.apache.nifi.stateless.bootstrap.ExtensionDiscovery; -import org.apache.nifi.stateless.bootstrap.InMemoryFlowFile; -import org.apache.nifi.stateless.bootstrap.RunnableFlow; import org.apache.nifi.nar.ExtensionManager; import org.apache.nifi.processor.Relationship; import org.apache.nifi.registry.VariableDescriptor; @@ -40,6 +39,9 @@ import org.apache.nifi.registry.flow.VersionedRemoteProcessGroup; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.security.util.SslContextFactory; +import org.apache.nifi.stateless.bootstrap.ExtensionDiscovery; +import org.apache.nifi.stateless.bootstrap.InMemoryFlowFile; +import org.apache.nifi.stateless.bootstrap.RunnableFlow; import javax.net.ssl.SSLContext; import java.io.File; @@ -117,12 +119,36 @@ public StatelessFlow(final VersionedProcessGroup flow, final ExtensionManager ex final Set controllerServices = flow.getControllerServices(); for (final VersionedControllerService versionedControllerService : controllerServices) { - final ControllerService service = componentFactory.createControllerService(versionedControllerService, variableRegistry); - serviceLookup.addControllerService(service); + final StateManager stateManager = new StatelessStateManager(); + + final ControllerService service = componentFactory.createControllerService(versionedControllerService, variableRegistry, serviceLookup, stateManager); + serviceLookup.addControllerService(service, versionedControllerService.getName()); + serviceLookup.setControllerServiceAnnotationData(service, versionedControllerService.getAnnotationData()); + + final SLF4JComponentLog logger = new SLF4JComponentLog(service); + final StatelessProcessContext processContext = new StatelessProcessContext(service, serviceLookup, versionedControllerService.getName(), logger, stateManager, variableRegistry); + + final Map versionedPropertyValues = versionedControllerService.getProperties(); + for (final Map.Entry entry : versionedPropertyValues.entrySet()) { + final String propertyName = entry.getKey(); + final String propertyValue = entry.getValue(); + final PropertyDescriptor descriptor = service.getPropertyDescriptor(propertyName); + + serviceLookup.setControllerServiceProperty(service, descriptor, processContext, variableRegistry, propertyValue); + } + + for (final PropertyDescriptor descriptor : service.getPropertyDescriptors()) { + final String versionedPropertyValue = versionedPropertyValues.get(descriptor.getName()); + if (versionedPropertyValue == null && descriptor.getDefaultValue() != null) { + serviceLookup.setControllerServiceProperty(service, descriptor, processContext, variableRegistry, descriptor.getDefaultValue()); + } + } } - final Map componentMap = new HashMap<>(); + serviceLookup.enableControllerServices(variableRegistry); + + final Map componentMap = new HashMap<>(); for (final VersionedConnection connection : connections) { boolean isInputPortConnection = false; @@ -249,10 +275,9 @@ public StatelessFlow(final VersionedProcessGroup flow, final ExtensionManager ex } } - roots = componentMap.entrySet() + roots = componentMap.values() .stream() - .filter(e -> e.getValue().getParents().isEmpty()) - .map(Map.Entry::getValue) + .filter(statelessComponent -> statelessComponent.getParents().isEmpty()) .collect(Collectors.toList()); } diff --git a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessProcessContext.java b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessProcessContext.java index 7566b5cd3217f..3766b0741bb1e 100644 --- a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessProcessContext.java +++ b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessProcessContext.java @@ -70,29 +70,25 @@ public class StatelessProcessContext implements SchedulingContext, ControllerSer private final StatelessControllerServiceLookup lookup; - public StatelessProcessContext(final ConfigurableComponent component, final StatelessControllerServiceLookup lookup, final String componentName, final StateManager stateManager, final VariableRegistry - variableRegistry) { + public StatelessProcessContext(final ConfigurableComponent component, final StatelessControllerServiceLookup lookup, final String componentName, final StateManager stateManager, + final VariableRegistry variableRegistry) { this(component, lookup, componentName, new SLF4JComponentLog(component), stateManager, variableRegistry); } - public StatelessProcessContext(final ConfigurableComponent component, final StatelessControllerServiceLookup lookup, final String componentName, final SLF4JComponentLog logger, final StatelessStateManager - statemanager) { + public StatelessProcessContext(final ConfigurableComponent component, final StatelessControllerServiceLookup lookup, final String componentName, final SLF4JComponentLog logger, + final StatelessStateManager statemanager) { this(component, lookup, componentName, logger, statemanager, VariableRegistry.EMPTY_REGISTRY); } - public StatelessProcessContext(final ConfigurableComponent component, - final StatelessControllerServiceLookup lookup, - final String componentName, - final SLF4JComponentLog logger, - final StateManager stateManager, - final VariableRegistry variableRegistry) { + public StatelessProcessContext(final ConfigurableComponent component, final StatelessControllerServiceLookup lookup, final String componentName, + final SLF4JComponentLog logger, final StateManager stateManager, final VariableRegistry variableRegistry) { this.component = Objects.requireNonNull(component); this.componentName = componentName == null ? "" : componentName; this.inputRequirement = component.getClass().getAnnotation(InputRequirement.class); this.lookup = lookup; this.stateManager = stateManager; this.variableRegistry = variableRegistry; - this.identifier = "ProcessContext-" + this.hashCode(); + this.identifier = component.getIdentifier(); this.logger = logger; } diff --git a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessProcessSession.java b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessProcessSession.java index 431b5cf477294..a6c5dec6bccea 100644 --- a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessProcessSession.java +++ b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessProcessSession.java @@ -777,12 +777,13 @@ public OutputStream write(FlowFile flowFile) { throw new IllegalArgumentException("Cannot export a flow file that I did not create"); } - final StatelessFlowFile StatelessFlowFile = validateState(flowFile); + validateState(flowFile); final ByteArrayOutputStream baos = new ByteArrayOutputStream() { @Override public void close() throws IOException { super.close(); final StatelessFlowFile newFlowFile = new StatelessFlowFile((StatelessFlowFile) flowFile, materializeContent); + newFlowFile.setData(toByteArray()); currentVersions.put(newFlowFile.getId(), newFlowFile); } }; diff --git a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessProcessorWrapper.java b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessProcessorWrapper.java index dd4f4a82976f7..108b2e2f6c752 100644 --- a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessProcessorWrapper.java +++ b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessProcessorWrapper.java @@ -107,8 +107,8 @@ private void initialize() { //Validate context final Collection validationResult = context.validate(); if (validationResult.stream().anyMatch(a -> !a.isValid()) || !this.validate()) { - throw new IllegalArgumentException("Processor is not valid: " - + String.join("\n", validationResult.stream().map(ValidationResult::toString).collect(Collectors.toList()))); + throw new IllegalArgumentException(processor + " is not valid: " + + validationResult.stream().map(ValidationResult::toString).collect(Collectors.joining("\n"))); } try (final CloseableNarLoader c = withNarClassLoader()) { @@ -147,7 +147,7 @@ public boolean runRecursive(final Queue output) { final AtomicBoolean nextStepCalled = new AtomicBoolean(false); try { - logger.info("Running " + this.processor.getClass().getSimpleName() + ".onTrigger with " + inputQueue.size() + " FlowFiles"); + logger.debug("Running {}.onTrigger with {} FlowFiles", new Object[] {this.processor.getClass().getSimpleName(), inputQueue.size()}); try (final CloseableNarLoader c = withNarClassLoader()) { // Trigger processor with the appropriate class loader processor.onTrigger(context, () -> {