A few weeks have passed since you built your data flow with DataFlow Designer to filter out critical syslog events to a dedicated Kafka topic. Now that everyone has better visibility into real-time health, management wants to do historical analysis on the data. Your company is evaluating Apache Iceberg to build an open data lakehouse and you are tasked with building a flow that ingests the most critical syslog events into an Iceberg table. Visit the Cloudera YouTube channel for a video walkthrough of this use case.
- On the CDP Public Cloud Home Page, navigate to DataFlow
- Navigate to the ReadyFlow Gallery
- Explore the ReadyFlow Gallery
- Search for the “Kafka to Iceberg” ReadyFlow.
- Click on “Create New Draft” to open the ReadyFlow in the Designer
- Select the only available workspace and give your draft a name
- Click "Create". You will be forwarded to the Designer
- Start a Test Session by either clicking on the start a test session link in the banner or going to Flow Options and selecting Start in the Test Session section.
- In the Test Session creation wizard, confirm the latest NiFi version is selected and click Start Test Session. Notice how the status at the top now says “Initializing Test Session”.
Note: Test Session initialization should take about 5-10 minutes.
The flow consists of three processors and looks very promising for our use case: the first processor reads data from a Kafka topic, the second gives us the option to batch up events and create larger files, and the PutIceberg processor then writes those files out to Iceberg. All we have to do now is customize the flow's configuration for our use case.
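If it helps to see the shape of the flow outside the NiFi UI, here is a minimal, hedged Python sketch of the consume → batch → write logic these three processors implement. It is purely illustrative: the helper names `consume_events` and `write_batch_to_iceberg` are hypothetical placeholders, and NiFi's actual processors handle schemas, retries, and state for you.

```python
import time

# Thresholds mirroring the MergeRecords behavior described later in this guide:
# flush a batch after roughly 5 minutes, or once about 1 GB has accumulated.
MAX_BIN_AGE_SECONDS = 5 * 60
MAX_BIN_SIZE_BYTES = 1024 ** 3

def run_pipeline(consume_events, write_batch_to_iceberg):
    """Illustrative only: consume (ConsumeFromKafka), batch (MergeRecords),
    then write each batch out as one larger file (WriteToIceberg)."""
    batch, batch_bytes, started = [], 0, time.monotonic()
    for event in consume_events():            # events assumed to be raw bytes
        batch.append(event)
        batch_bytes += len(event)
        too_old = time.monotonic() - started >= MAX_BIN_AGE_SECONDS
        too_big = batch_bytes >= MAX_BIN_SIZE_BYTES
        if too_old or too_big:                # bin is "full": flush it
            write_batch_to_iceberg(batch)
            batch, batch_bytes, started = [], 0, time.monotonic()
```

Batching before the write is what keeps the Iceberg table from accumulating many tiny files.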
- a. Navigate to Flow Options → Parameters
- b. Select all parameters that show No value set and provide the following values (an optional Kafka connectivity check is sketched after these steps):
Note: The parameter values that need to be copied from the Trial Manager homepage are found by selecting Manage Trial in the upper right corner and then selecting Configurations. See screenshots below.
| Name | Value |
| --- | --- |
| CDP Workload User | srv_nifi-kafka-ingest |
| CDP Workload User Password | <Copy the value for 'nifi-kafka-ingest-password' from Trial Manager homepage> |
| Data Input Format | JSON |
| Hive Catalog Namespace | <There is a value set. Change from 'default' to 'syslog'> |
| Iceberg Table Name | syslog_critical_archive |
| Kafka Broker Endpoint | <Comma-separated list of Kafka Broker addresses. Copy the value for 'kafka_broker' from Trial Manager homepage> |
| Kafka Consumer Group ID | cdf |
| Kafka Source Topic | syslog_critical |
| Schema Name | syslog |
| Schema Registry Hostname | <Hostname of Schema Registry service. Copy the value for 'schema_registry_host_name' from Trial Manager homepage> |
- c. Click Apply Changes to save the parameter values
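If you want to sanity-check the Kafka parameter values from outside NiFi, a small consumer script can confirm that the broker endpoint, credentials, and topic line up. This is a hedged sketch using the kafka-python package; it assumes the trial brokers accept SASL/PLAIN over TLS with the CDP workload user credentials, which may differ in your environment. Replace the placeholder broker list and password with the values you copied from the Trial Manager homepage.

```python
from kafka import KafkaConsumer  # pip install kafka-python

consumer = KafkaConsumer(
    "syslog_critical",                            # Kafka Source Topic
    bootstrap_servers="<kafka_broker value>",     # comma-separated broker list
    security_protocol="SASL_SSL",                 # assumption: SASL over TLS
    sasl_mechanism="PLAIN",
    sasl_plain_username="srv_nifi-kafka-ingest",  # CDP Workload User
    sasl_plain_password="<nifi-kafka-ingest-password>",
    group_id="cdf-smoke-test",                    # separate group so the flow's 'cdf' offsets stay untouched
    auto_offset_reset="earliest",
    consumer_timeout_ms=10000,                    # give up after 10 s of silence
)

# Print one raw event and stop; seeing a JSON syslog record here means the
# connection parameters are good.
for message in consumer:
    print(message.value[:300])
    break
consumer.close()
```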
- a. After your test session has started successfully, navigate to Flow Options → Services
- b. Select the CDP_Schema_Registry service and click the Enable Service and Referencing Components action
- c. Start from the top of the list and enable all remaining Controller Services as needed
- d. Make sure all services have been enabled
Navigate back to the Flow Designer canvas by clicking on the Back To Flow Designer link at the top of the screen.
- Stop the ConsumeFromKafka processor if it has been started automatically, using the right-click action menu or the Stop button in the configuration drawer.
Our data warehouse team has created an Iceberg table into which they want us to ingest the critical syslog data. A challenge we face is that not all column names in the Iceberg table match our syslog record schema, so we have to add functionality to our flow that changes the schema of our syslog records. To do this, we will use the JoltTransformRecord processor.
- Add a new JoltTransformRecord processor by dragging the processor icon onto the canvas.
- In the Add Processor window, select the JoltTransformRecord type and name the processor TransformSchema.
- Validate that your new processor now appears on the canvas.
- Create a connection from ConsumeFromKafka to TransformSchema by hovering over the ConsumeFromKafka processor and dragging the arrow that appears to TransformSchema. Pick the success relationship to connect. Now connect the success relationship of TransformSchema to the MergeRecords processor.
- Now that we have connected our new TransformSchema processor, we can delete the original connection between ConsumeFromKafka and MergeRecords. Make sure that the ConsumeFromKafka processor is stopped, then right-click the success_ConsumeFromKafka-MergeRecords connection and select Delete. If the connection contains queued data, you have to empty it first by right-clicking and selecting Empty Queue. Now all syslog events that we receive will go through the TransformSchema processor.
- To make sure that our schema transformation works, we have to create a new Record Writer service and use it as the Record Writer for the TransformSchema processor. Select the TransformSchema processor and open the configuration panel. Scroll to the Properties section, click the three-dot menu in the Record Writer row and select Add Service to create a new Record Writer.
- Select AvroRecordSetWriter, name it TransformedSchemaWriter and click Add. Click Apply in the configuration panel to save your changes.
- Now click the three-dot menu again and select Go To Service to configure our new Avro Record Writer.
- To configure our new Avro Record Writer, provide the following values:
| Name | Description | Value |
| --- | --- | --- |
| Schema Write Strategy | Specify whether/how CDF should write schema information | Embed Avro Schema |
| Schema Access Strategy | Specify how CDF identifies the schema to apply | Use ‘Schema Name’ Property |
| Schema Registry | Specify the Schema Registry that stores our schema | CDP_Schema_Registry |
| Schema Name | The schema name to look up in the Schema Registry | syslog_transformed |
- Convert the value that you provided for Schema Name into a parameter. Click on the three-dot menu in the Schema Name row and select Convert To Parameter.
- Give the parameter the name Schema Name Transformed and click “Add”. You have now created a new parameter from a value that can be reused elsewhere in your data flow.
- Apply your configuration changes and enable the service by clicking the power icon. Now that we have configured our new Schema Writer, we can return to the Flow Designer canvas.
- In the configuration drawer, scroll down to the Referencing Components section and click on TransformSchema to get back to the canvas.
- In the properties section, provide the following values:
| Name | Description | Value |
| --- | --- | --- |
| Record Reader | Service used to parse incoming events | AvroReader |
| Record Writer | Service used to format outgoing events | TransformedSchemaWriter |
| Jolt Specification | The specification that describes how to modify the incoming JSON data. We are standardizing on lower case field names and renaming the timestamp field to event_timestamp. | <Copy from below> |
```json
[
  {
    "operation": "shift",
    "spec": {
      "appName": "appname",
      "timestamp": "event_timestamp",
      "structuredData": {
        "SDID": {
          "eventId": "structureddata.sdid.eventid",
          "eventSource": "structureddata.sdid.eventsource",
          "iut": "structureddata.sdid.iut"
        }
      },
      "*": {
        "@": "&"
      }
    }
  }
]
```
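To see what this shift spec does to a single record, here is a small, hedged Python illustration. It is not a Jolt engine; it simply applies the same renames by hand. The sample fields other than appName, timestamp, and structuredData.SDID.* are hypothetical, so your syslog records may contain different pass-through fields.

```python
def transform(record: dict) -> dict:
    """Mimic the shift spec: lowercase/rename the listed fields, nest the SDID
    values under structureddata.sdid, and pass every other field through."""
    out = {}
    for key, value in record.items():
        if key == "appName":
            out["appname"] = value
        elif key == "timestamp":
            out["event_timestamp"] = value
        elif key == "structuredData":
            sdid = value.get("SDID", {})
            out["structureddata"] = {"sdid": {
                "eventid": sdid.get("eventId"),
                "eventsource": sdid.get("eventSource"),
                "iut": sdid.get("iut"),
            }}
        else:
            out[key] = value          # the "*" rule: keep remaining fields as-is
    return out

sample = {
    "severity": "2",                  # hypothetical pass-through field
    "hostname": "host1.example.com",  # hypothetical pass-through field
    "appName": "sshd",
    "timestamp": "2024-01-01T10:15:30.000Z",
    "structuredData": {"SDID": {"eventId": "1011", "eventSource": "Application", "iut": "3"}},
}
print(transform(sample))
# {'severity': '2', 'hostname': 'host1.example.com', 'appname': 'sshd',
#  'event_timestamp': '2024-01-01T10:15:30.000Z',
#  'structureddata': {'sdid': {'eventid': '1011', 'eventsource': 'Application', 'iut': '3'}}}
```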
- Scroll to Relationships, select Terminate for the failure and original relationships, and click Apply.
- Start your ConsumeFromKafka and TransformSchema processors.
- Validate that the transformed data matches our Iceberg table schema. Once events are queuing up in the connection between TransformSchema and MergeRecords, right-click the connection and select List Queue.
- Select any of the queued files and click the book icon to open it in the Data Viewer.
- Notice how all field names have been transformed to lower case and how the timestamp field has been renamed to event_timestamp.
Now that we have verified that our schema is being transformed as needed, it’s time to start the remaining processors and write our events into the Iceberg table. The MergeRecords processor is configured to batch events up to increase efficiency when writing to Iceberg. The final processor, WriteToIceberg, takes our Avro records and writes them into a Parquet-formatted table.
- Select the MergeRecords processor and explore its configuration. It is configured to batch events for at least 5 minutes or until the queued events reach a Maximum Bin Size of 1 GB.
- Start the MergeRecords processor and verify that it batches up events and writes them out after 5 minutes. Tip: You can change the Max Bin Age configuration to something like “30 sec” to speed up processing.
- Select the WriteToIceberg processor and explore its configuration. Notice how it relies on several parameters to establish a connection to the right database and table.
- Start the WriteToIceberg processor and verify that it writes records successfully to Iceberg. If the metrics on the processor increase and you don’t see any warnings or events being written to the failure_WriteToIceberg connection, your writes are successful!
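If you'd like a second opinion from outside NiFi, you can scan the table directly. The following is a hedged sketch using PyIceberg; the catalog name, catalog type, and metastore URI are placeholders that depend on how your environment exposes the Hive catalog, so adjust them (or simply query syslog.syslog_critical_archive with your preferred SQL engine).

```python
# pip install pyiceberg with the hive and pandas/pyarrow extras
from pyiceberg.catalog import load_catalog

# Placeholder catalog configuration; substitute your Hive Metastore URI.
catalog = load_catalog(
    "trial",
    **{"type": "hive", "uri": "thrift://<hive-metastore-host>:9083"},
)

table = catalog.load_table("syslog.syslog_critical_archive")

# Pull the current snapshot into pandas and peek at a few rows to confirm
# that the flow is landing records in the table.
df = table.scan().to_pandas()
print(len(df), "rows so far")
print(df.head())
```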
Congratulations! With this you have completed the second use case. Feel free to publish your flow to the catalog and create a deployment.
Info:
The completed flow definition for this use case can be found in the Catalog under the name "Use Case 2 - Cloudera - Write critical syslog events to Iceberg". If you run into issues during your flow development, it may be helpful to select this flow in the Catalog and select "Create New Draft" to open it in the Flow Designer to compare to your own.