Skip to content

Commit

Permalink
NIFI-13745 Added StateManager Support to Python Processor API (#9419)
Browse files Browse the repository at this point in the history
Signed-off-by: David Handermann <exceptionfactory@apache.org>
  • Loading branch information
pgyori authored Nov 12, 2024
1 parent f15b7ca commit 2f3dd01
Show file tree
Hide file tree
Showing 8 changed files with 419 additions and 1 deletion.
58 changes: 58 additions & 0 deletions nifi-docs/src/main/asciidoc/python-developer-guide.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,64 @@ that there are no longer any invocations of the `transform` method running when



[[state-manager]]
=== State Manager

The `context` object that is the parameter of the `transform()`, `create()`, `onScheduled()` and `onStopped()` methods
provides access to the State Manager through the `getStateManager()` method. The State Manager is responsible for providing
a simple API for storing and retrieving state. For more information on the State Manager, refer to the NiFi Developer's Guide.

The Python StateManager object returned by `context.getStateManager()` provides access to the underlying Java StateManager.
With the Python StateManager, the state is handled in the form of Python dictionaries (as opposed to Java Maps).
Just like in Java, StateManager can handle both `LOCAL` and `CLUSTER` state.

Should an error occur accessing the state using the StateManager's methods, a StateException is thrown that can be caught and
handled in the Python Processor's code. This enables the Python developer to implement the Processor in such a way that
when a state-related error occurs, the Processor can continue its operation without disruption.
If a StateException is thrown by StateManager but not caught in the Processor's code, the ProcessSession is rolled back
and an error is logged.

Below is an example Processor that uses StateManager. This example assumes that the Processor is executed on
the primary node only, and on a single thread.
----
from nifiapi.componentstate import Scope, StateManager, StateException
from nifiapi.flowfilesource import FlowFileSource, FlowFileSourceResult
class CreateFlowFile(FlowFileSource):
    class Java:
        implements = ['org.apache.nifi.python.processor.FlowFileSource']

    class ProcessorDetails:
        version = '0.0.1-SNAPSHOT'
        description = '''A Python processor that creates FlowFiles and uses StateManager.'''
        tags = ['test', 'python', 'source', 'state']

    def __init__(self, **kwargs):
        pass

    def onScheduled(self, context):
        # Read the CLUSTER-scoped state once and keep it as a Python dictionary.
        # If the state cannot be read, log a warning and start from an empty state.
        try:
            stored_state = context.getStateManager().getState(Scope.CLUSTER)
            self.state = stored_state.toMap()
        except StateException as e:
            self.logger.warn('Failed to read processor state. ' + str(e))
            self.state = {}

    def create(self, context):
        # The counter is stored as a string; a missing key counts as '0'.
        next_number = int(self.state.get('FlowFileNumber', '0')) + 1
        updated_state = {'FlowFileNumber': str(next_number)}
        try:
            context.getStateManager().setState(updated_state, Scope.CLUSTER)
        except StateException as e:
            # Keep running: the cached state is left untouched when saving fails.
            self.logger.warn('Failed to save state. ' + str(e))
        else:
            self.state = updated_state
        return FlowFileSourceResult(relationship='success', attributes=updated_state, contents=None)
----



[[documenting_use_cases]]
== Documenting Use Cases

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
} catch (final Py4JNetworkException e) {
throw new ProcessException("Failed to communicate with Python Process", e);
} catch (final Exception e) {
getLogger().error("Failed to create FlowFile {}", e);
getLogger().error("Failed to create FlowFile", e);
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.nifi.python.processor;

import org.apache.nifi.annotation.behavior.DefaultRunDuration;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.SupportsSensitiveDynamicProperties;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
Expand All @@ -26,6 +27,7 @@
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
Expand All @@ -44,6 +46,8 @@

@SupportsBatching(defaultDuration = DefaultRunDuration.TWENTY_FIVE_MILLIS)
@SupportsSensitiveDynamicProperties
@Stateful(scopes = {Scope.LOCAL, Scope.CLUSTER},
description = "Python processors can store and retrieve state using the State Management APIs. Consult the State Manager section of the Developer's Guide for more details.")
public abstract class PythonProcessorProxy<T extends PythonProcessor> extends AbstractProcessor implements AsyncLoadedProcessor {
private final String processorType;
private volatile PythonProcessorInitializationContext initContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.nifi.components.AsyncLoadedProcessor;
import org.apache.nifi.components.AsyncLoadedProcessor.LoadState;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.flowfile.FlowFile;
Expand All @@ -31,6 +32,7 @@
import org.apache.nifi.python.PythonProcessConfig;
import org.apache.nifi.python.PythonProcessorDetails;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.state.MockStateManager;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
Expand Down Expand Up @@ -610,6 +612,76 @@ public void testCreateNothing() {
runner.assertTransferCount("success", 0);
}

/**
 * Runs the TestStateManager processor in "setState" mode and checks that the
 * expected key/value pair is stored in CLUSTER scope and that one FlowFile is
 * routed to "success".
 */
@Test
public void testStateManagerSetState() {
    final TestRunner testRunner = createStateManagerTesterProcessor("setState");
    final MockStateManager mockStateManager = testRunner.getStateManager();

    // Precondition: nothing has been stored before the processor is triggered.
    mockStateManager.assertStateNotSet();

    testRunner.run();

    final Map<String, String> expectedState = Map.of("state_key_1", "state_value_1");
    mockStateManager.assertStateSet(Scope.CLUSTER);
    mockStateManager.assertStateEquals(expectedState, Scope.CLUSTER);
    testRunner.assertTransferCount("success", 1);
}

/**
 * Runs the TestStateManager processor in "getState" mode against a pre-populated
 * CLUSTER state and checks that the state entries are copied onto the emitted
 * FlowFile as attributes.
 *
 * @throws IOException if seeding the mock state fails
 */
@Test
public void testStateManagerGetState() throws IOException {
    final TestRunner runner = createStateManagerTesterProcessor("getState");

    final MockStateManager stateManager = initializeStateManager(runner);

    runner.run();

    // The processor reads the state and adds the key-value pairs
    // to the FlowFile as attributes. The stored state must be unchanged.
    stateManager.assertStateEquals(Map.of("state_key_1", "state_value_1"), Scope.CLUSTER);

    // Assert the count first so that a missing FlowFile is reported as a clear
    // assertion failure rather than an IndexOutOfBoundsException from get(0).
    runner.assertTransferCount("success", 1);
    final MockFlowFile flowFile = runner.getFlowFilesForRelationship("success").get(0);
    flowFile.assertAttributeEquals("state_key_1", "state_value_1");
}

/**
 * Runs the TestStateManager processor in "replace" mode against a pre-populated
 * CLUSTER state and checks that the state has been swapped for the new
 * key/value pair and that one FlowFile is routed to "success".
 *
 * @throws IOException if seeding the mock state fails
 */
@Test
public void testStateManagerReplace() throws IOException {
    final TestRunner runner = createStateManagerTesterProcessor("replace");

    final MockStateManager stateManager = initializeStateManager(runner);

    runner.run();

    // Parameterized type instead of a raw Map, so the call is checked at compile time.
    final Map<String, String> finalState = Map.of("state_key_2", "state_value_2");
    stateManager.assertStateEquals(finalState, Scope.CLUSTER);
    runner.assertTransferCount("success", 1);
}

/**
 * Runs the TestStateManager processor in "clear" mode against a pre-populated
 * CLUSTER state and checks that the state is empty afterwards and that one
 * FlowFile is routed to "success".
 *
 * @throws IOException if seeding the mock state fails
 */
@Test
public void testStateManagerClear() throws IOException {
    final TestRunner testRunner = createStateManagerTesterProcessor("clear");
    final MockStateManager seededStateManager = initializeStateManager(testRunner);

    testRunner.run();

    final Map<String, String> emptyState = Map.of();
    seededStateManager.assertStateEquals(emptyState, Scope.CLUSTER);
    testRunner.assertTransferCount("success", 1);
}

/**
 * Use-case tested: an exception raised by StateManager can be caught in the
 * Python code, and execution continues without producing errors.
 */
@Test
public void testStateManagerExceptionHandling() {
    final TestRunner testRunner = createProcessor("TestStateManagerException");
    waitForValid(testRunner);

    // Make every CLUSTER-scoped setState call on the mock fail.
    final MockStateManager failingStateManager = testRunner.getStateManager();
    failingStateManager.setFailOnStateSet(Scope.CLUSTER, true);

    testRunner.run();

    testRunner.assertTransferCount("success", 1);
    final MockFlowFile output = testRunner.getFlowFilesForRelationship("success").get(0);
    output.assertAttributeEquals("exception_msg", "Set state failed");
}

/**
 * Controller service interface that exposes a single string lookup operation.
 * NOTE(review): presumably implemented by a test-only service elsewhere in this
 * class - confirm against the rest of the file.
 */
public interface StringLookupService extends ControllerService {
/**
 * Looks up a string for the given coordinates.
 *
 * @param coordinates string key/value pairs identifying what to look up
 * @return the looked-up value wrapped in an Optional; presumably empty when
 *         nothing matches - TODO confirm against implementations
 */
Optional<String> lookup(Map<String, String> coordinates);
}
Expand Down Expand Up @@ -690,4 +762,18 @@ private void testSourceProcessor(final String processorName,
output.assertContentEquals(expectedContent);
}

/**
 * Creates a runner for the "TestStateManager" Python processor, selects which
 * StateManager method it should exercise, and waits until the runner is valid.
 *
 * @param methodToTest value for the "StateManager Method To Test" property
 * @return the configured runner
 */
private TestRunner createStateManagerTesterProcessor(final String methodToTest) {
    final TestRunner stateManagerRunner = createProcessor("TestStateManager");
    stateManagerRunner.setProperty("StateManager Method To Test", methodToTest);

    waitForValid(stateManagerRunner);
    return stateManagerRunner;
}

/**
 * Seeds the runner's mock state manager with a single CLUSTER-scoped entry
 * ("state_key_1" = "state_value_1") and returns it.
 *
 * @param runner the runner whose state manager is seeded
 * @return the seeded mock state manager
 * @throws IOException if the mock state manager fails to store the state
 */
private MockStateManager initializeStateManager(final TestRunner runner) throws IOException {
    final MockStateManager stateManager = runner.getStateManager();
    // Parameterized type instead of a raw Map, so the call is checked at compile time.
    final Map<String, String> initialState = Map.of("state_key_1", "state_value_1");
    stateManager.setState(initialState, Scope.CLUSTER);
    return stateManager;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from nifiapi.componentstate import Scope, StateManager, StateException
from nifiapi.flowfilesource import FlowFileSource, FlowFileSourceResult
from nifiapi.properties import PropertyDescriptor, StandardValidators, PropertyDependency

# Source processor that performs one StateManager operation per invocation.
# The operation is chosen by the 'StateManager Method To Test' property and
# every state call uses Scope.CLUSTER.
class TestStateManager(FlowFileSource):
    class Java:
        implements = ['org.apache.nifi.python.processor.FlowFileSource']

    class ProcessorDetails:
        version = '0.0.1-SNAPSHOT'
        description = '''A Python source processor that uses StateManager.'''
        tags = ['text', 'test', 'python', 'source']

    # Selects which StateManager method create() calls.
    METHOD_TO_TEST = PropertyDescriptor(
        name='StateManager Method To Test',
        description='''The name of StateManager's method that should be tested.''',
        required=True
    )

    property_descriptors = [METHOD_TO_TEST]

    def __init__(self, **kwargs):
        pass

    def getPropertyDescriptors(self):
        return self.property_descriptors

    def create(self, context):
        requested_method = context.getProperty(self.METHOD_TO_TEST).getValue()
        attributes = None
        manager = context.getStateManager()

        # The property value is matched case-insensitively.
        selected = requested_method.lower()
        if selected == 'setstate':
            manager.setState({'state_key_1': 'state_value_1'}, Scope.CLUSTER)
        elif selected == 'getstate':
            # Expose the stored state as FlowFile attributes.
            attributes = manager.getState(Scope.CLUSTER).toMap()
        elif selected == 'replace':
            current_state = manager.getState(Scope.CLUSTER)
            manager.replace(current_state, {'state_key_2': 'state_value_2'}, Scope.CLUSTER)
        elif selected == 'clear':
            manager.clear(Scope.CLUSTER)
        # Any other value performs no state operation.

        return FlowFileSourceResult(relationship='success', attributes=attributes, contents='Output FlowFile Contents')
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from nifiapi.componentstate import Scope, StateManager, StateException
from nifiapi.flowfilesource import FlowFileSource, FlowFileSourceResult

# Source processor that tries to store CLUSTER-scoped state and, when a
# StateException is raised, reports its message as a FlowFile attribute.
class TestStateManagerException(FlowFileSource):
    class Java:
        implements = ['org.apache.nifi.python.processor.FlowFileSource']

    class ProcessorDetails:
        version = '0.0.1-SNAPSHOT'
        description = '''A Python source processor that uses StateManager.'''
        tags = ['text', 'test', 'python', 'source']

    def __init__(self, **kwargs):
        pass

    def create(self, context):
        manager = context.getStateManager()
        attributes = None
        state_to_store = {'state_key_1': 'state_value_1'}
        try:
            manager.setState(state_to_store, Scope.CLUSTER)
        except StateException as error:
            # Catch the failure so a FlowFile is still produced.
            attributes = {'exception_msg': str(error)}

        return FlowFileSourceResult(relationship='success', attributes=attributes, contents='Output FlowFile Contents')
Loading

0 comments on commit 2f3dd01

Please sign in to comment.