Skip to content

Commit

Permalink
NIFI-14284 ConsumeKinesisStream migrate property names to new convent…
Browse files Browse the repository at this point in the history
…ion (#9741)

Signed-off-by: David Handermann <exceptionfactory@apache.org>
  • Loading branch information
dariuszseweryn authored Feb 25, 2025
1 parent d018675 commit 71d244d
Showing 1 changed file with 33 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.migration.PropertyConfiguration;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
Expand Down Expand Up @@ -169,31 +170,27 @@ public class ConsumeKinesisStream extends AbstractAwsAsyncProcessor<KinesisAsync
);

static final PropertyDescriptor KINESIS_STREAM_NAME = new PropertyDescriptor.Builder()
.name("kinesis-stream-name")
.displayName("Amazon Kinesis Stream Name")
.name("Amazon Kinesis Stream Name")
.description("The name of Kinesis Stream")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();

public static final PropertyDescriptor APPLICATION_NAME = new PropertyDescriptor.Builder()
.displayName("Application Name")
.name("amazon-kinesis-stream-application-name")
.name("Application Name")
.description("The Kinesis stream reader application name.")
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.required(true).build();

public static final PropertyDescriptor INITIAL_STREAM_POSITION = new PropertyDescriptor.Builder()
.displayName("Initial Stream Position")
.name("amazon-kinesis-stream-initial-position")
.name("Initial Stream Position")
.description("Initial position to read Kinesis streams.")
.allowableValues(LATEST, TRIM_HORIZON, AT_TIMESTAMP)
.defaultValue(LATEST.getValue())
.required(true).build();

public static final PropertyDescriptor STREAM_POSITION_TIMESTAMP = new PropertyDescriptor.Builder()
.displayName("Stream Position Timestamp")
.name("amazon-kinesis-stream-position-timestamp")
.name("Stream Position Timestamp")
.description("Timestamp position in stream from which to start reading Kinesis Records. " +
"Required if " + INITIAL_STREAM_POSITION.getDescription() + " is " + AT_TIMESTAMP.getDisplayName() + ". " +
"Uses the Timestamp Format to parse value into a Date.")
Expand All @@ -202,8 +199,7 @@ public class ConsumeKinesisStream extends AbstractAwsAsyncProcessor<KinesisAsync
.required(false).build();

public static final PropertyDescriptor TIMESTAMP_FORMAT = new PropertyDescriptor.Builder()
.displayName("Timestamp Format")
.name("amazon-kinesis-stream-timestamp-format")
.name("Timestamp Format")
.description("Format to use for parsing the " + STREAM_POSITION_TIMESTAMP.getDisplayName() + " into a Date " +
"and converting the Kinesis Record's Approximate Arrival Timestamp into a FlowFile attribute.")
.addValidator((subject, input, context) -> {
Expand All @@ -223,65 +219,57 @@ public class ConsumeKinesisStream extends AbstractAwsAsyncProcessor<KinesisAsync
.required(true).build();

public static final PropertyDescriptor FAILOVER_TIMEOUT = new PropertyDescriptor.Builder()
.displayName("Failover Timeout")
.name("amazon-kinesis-stream-failover-timeout")
.name("Failover Timeout")
.description("Kinesis Client Library failover timeout")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("30 secs")
.required(true).build();

public static final PropertyDescriptor GRACEFUL_SHUTDOWN_TIMEOUT = new PropertyDescriptor.Builder()
.displayName("Graceful Shutdown Timeout")
.name("amazon-kinesis-stream-graceful-shutdown-timeout")
.name("Graceful Shutdown Timeout")
.description("Kinesis Client Library graceful shutdown timeout")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("20 secs")
.required(true).build();

public static final PropertyDescriptor CHECKPOINT_INTERVAL = new PropertyDescriptor.Builder()
.displayName("Checkpoint Interval")
.name("amazon-kinesis-stream-checkpoint-interval")
.name("Checkpoint Interval")
.description("Interval between Kinesis checkpoints")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("3 secs")
.required(true).build();

public static final PropertyDescriptor NUM_RETRIES = new PropertyDescriptor.Builder()
.displayName("Retry Count")
.name("amazon-kinesis-stream-retry-count")
.name("Retry Count")
.description("Number of times to retry a Kinesis operation (process record, checkpoint, shutdown)")
.addValidator(StandardValidators.INTEGER_VALIDATOR)
.defaultValue("10")
.required(true).build();

public static final PropertyDescriptor RETRY_WAIT = new PropertyDescriptor.Builder()
.displayName("Retry Wait")
.name("amazon-kinesis-stream-retry-wait")
.name("Retry Wait")
.description("Interval between Kinesis operation retries (process record, checkpoint, shutdown)")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("1 sec")
.required(true).build();

public static final PropertyDescriptor DYNAMODB_ENDPOINT_OVERRIDE = new PropertyDescriptor.Builder()
.displayName("DynamoDB Override")
.name("amazon-kinesis-stream-dynamodb-override")
.name("DynamoDB Override")
.description("DynamoDB override to use non-AWS deployments")
.addValidator(StandardValidators.URL_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.required(false).build();

public static final PropertyDescriptor REPORT_CLOUDWATCH_METRICS = new PropertyDescriptor.Builder()
.displayName("Report Metrics to CloudWatch")
.name("amazon-kinesis-stream-cloudwatch-flag")
.name("Report Metrics to CloudWatch")
.description("Whether to report Kinesis usage metrics to CloudWatch.")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.allowableValues("true", "false")
.defaultValue("false")
.required(true).build();

public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
.name("amazon-kinesis-stream-record-reader")
.displayName("Record Reader")
.name("Record Reader")
.description("The Record Reader to use for reading received messages." +
" The Kinesis Stream name can be referred to by Expression Language '${" +
AbstractKinesisRecordProcessor.KINESIS_RECORD_SCHEMA_KEY + "}' to access a schema." +
Expand All @@ -292,8 +280,7 @@ public class ConsumeKinesisStream extends AbstractAwsAsyncProcessor<KinesisAsync
.build();

public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
.name("amazon-kinesis-stream-record-writer")
.displayName("Record Writer")
.name("Record Writer")
.description("The Record Writer to use for serializing Records to an output FlowFile." +
" The Kinesis Stream name can be referred to by Expression Language '${" +
AbstractKinesisRecordProcessor.KINESIS_RECORD_SCHEMA_KEY + "}' to access a schema." +
Expand Down Expand Up @@ -405,6 +392,24 @@ public void onPropertyModified(final PropertyDescriptor descriptor, final String
}
}

@Override
public void migrateProperties(final PropertyConfiguration config) {
config.renameProperty("kinesis-stream-name", "Amazon Kinesis Stream Name");
config.renameProperty("amazon-kinesis-stream-application-name", "Application Name");
config.renameProperty("amazon-kinesis-stream-initial-position", "Initial Stream Position");
config.renameProperty("amazon-kinesis-stream-position-timestamp", "Stream Position Timestamp");
config.renameProperty("amazon-kinesis-stream-timestamp-format", "Timestamp Format");
config.renameProperty("amazon-kinesis-stream-failover-timeout", "Failover Timeout");
config.renameProperty("amazon-kinesis-stream-graceful-shutdown-timeout", "Graceful Shutdown Timeout");
config.renameProperty("amazon-kinesis-stream-checkpoint-interval", "Checkpoint Interval");
config.renameProperty("amazon-kinesis-stream-retry-count", "Retry Count");
config.renameProperty("amazon-kinesis-stream-retry-wait", "Retry Wait");
config.renameProperty("amazon-kinesis-stream-dynamodb-override", "DynamoDB Override");
config.renameProperty("amazon-kinesis-stream-cloudwatch-flag", "Report Metrics to CloudWatch");
config.renameProperty("amazon-kinesis-stream-record-reader", "Record Reader");
config.renameProperty("amazon-kinesis-stream-record-writer", "Record Writer");
}

@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
final PropertyDescriptor.Builder builder = new PropertyDescriptor.Builder()
Expand Down

0 comments on commit 71d244d

Please sign in to comment.