Skip to content

Commit

Permalink
Merge branch 'apache:main' into NIFI-13397-A
Browse files Browse the repository at this point in the history
  • Loading branch information
jrsteinebrey authored Jun 17, 2024
2 parents ac76a1f + cbe52a3 commit 8a7e29c
Show file tree
Hide file tree
Showing 45 changed files with 1,652 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ public enum MiNiFiProperties {
C2_KEEP_ALIVE_DURATION("c2.rest.keepAliveDuration", "5 min", false, true, TIME_PERIOD_VALIDATOR),
C2_REST_HTTP_HEADERS("c2.rest.http.headers", "Accept:application/json", false, true, VALID),
C2_CONFIG_DIRECTORY("c2.config.directory", "./conf", false, true, VALID),
C2_RUNTIME_MANIFEST_IDENTIFIER("c2.runtime.manifest.identifier", "", false, true, VALID),
C2_RUNTIME_TYPE("c2.runtime.type", "", false, true, VALID),
C2_RUNTIME_MANIFEST_IDENTIFIER("c2.runtime.manifest.identifier", "minifi", false, true, VALID),
C2_RUNTIME_TYPE("c2.runtime.type", "minifi-java", false, true, VALID),
C2_ASSET_DIRECTORY("c2.asset.directory", "./asset", false, true, VALID),
C2_SECURITY_TRUSTSTORE_LOCATION("c2.security.truststore.location", "", false, false, VALID),
C2_SECURITY_TRUSTSTORE_PASSWORD("c2.security.truststore.password", "", true, false, VALID),
Expand Down
4 changes: 2 additions & 2 deletions nifi-commons/nifi-metrics/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-jvm</artifactId>
<version>4.2.25</version>
<version>4.2.26</version>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>4.2.25</version>
<version>4.2.26</version>
</dependency>
</dependencies>
</project>
2 changes: 1 addition & 1 deletion nifi-commons/nifi-property-protection-azure/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>msal4j</artifactId>
<version>1.15.0</version>
<version>1.15.1</version>
</dependency>
</dependencies>
</project>
7 changes: 7 additions & 0 deletions nifi-commons/nifi-record-path/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -125,5 +125,12 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>

<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-json-record-utils</artifactId>
<version>2.0.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,13 @@

package org.apache.nifi.record.path;

import org.apache.nifi.json.JsonRecordSource;
import org.apache.nifi.json.JsonSchemaInference;
import org.apache.nifi.json.JsonTreeRowRecordReader;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.record.path.exception.RecordPathException;
import org.apache.nifi.schema.inference.TimeValueInference;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.MapRecord;
Expand All @@ -31,7 +37,10 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.IllegalCharsetNameException;
import java.nio.charset.StandardCharsets;
import java.sql.Date;
Expand Down Expand Up @@ -241,11 +250,67 @@ public void testDescendantField() {
}

@Test
public void testParent() {
public void testDescendantFieldWithArrayOfRecords() throws IOException, MalformedRecordException {
final String recordJson = """
{
"container" : {
"id" : "0",
"metadata" : {
"filename" : "file1.pdf",
"page.count" : "165"
},
"textElement" : null,
"containers" : [ {
"id" : "1",
"title" : null,
"metadata" : {
"end.page" : 1,
"start.page" : 1
},
"textElement" : {
"text" : "Table of Contents",
"metadata" : { }
},
"containers" : [ ]
} ]
}
}
""";

final JsonSchemaInference schemaInference = new JsonSchemaInference(new TimeValueInference("MM/dd/yyyy", "HH:mm:ss", "MM/dd/yyyy HH:mm:ss"));
final JsonRecordSource jsonRecordSource = new JsonRecordSource(new ByteArrayInputStream(recordJson.getBytes(StandardCharsets.UTF_8)));
final RecordSchema schema = schemaInference.inferSchema(jsonRecordSource);

final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(new ByteArrayInputStream(recordJson.getBytes(StandardCharsets.UTF_8)), Mockito.mock(ComponentLog.class),
schema, "MM/dd/yyyy", "HH:mm:ss", "MM/dd/yyyy HH:mm:ss");
final Record record = reader.nextRecord();

final List<FieldValue> fieldValues = RecordPath.compile("//textElement[./text = 'Table of Contents']/metadata/insertion").evaluate(record).getSelectedFields().toList();
assertEquals(1, fieldValues.size());
fieldValues.getFirst().updateValue("Hello");
record.incorporateInactiveFields();

final Record container = (Record) record.getValue("container");
final Object[] containers = (Object[]) container.getValue("containers");
final Record textElement = (Record) (((Record) containers[0]).getValue("textElement"));
final Record metadata = (Record) textElement.getValue("metadata");
assertEquals("Hello", metadata.getValue("insertion"));

final List<RecordField> metadataFields = metadata.getSchema().getFields();
assertEquals(1, metadataFields.size());
assertEquals("insertion", metadataFields.getFirst().getFieldName());
}

private Record createAccountRecord(final int id, final double balance) {
final Map<String, Object> accountValues = new HashMap<>();
accountValues.put("id", 1);
accountValues.put("balance", 123.45D);
final Record accountRecord = new MapRecord(getAccountSchema(), accountValues);
accountValues.put("id", id);
accountValues.put("balance", balance);
return new MapRecord(getAccountSchema(), accountValues);
}

@Test
public void testParent() {
final Record accountRecord = createAccountRecord(1, 123.45D);

final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
final Map<String, Object> values = new HashMap<>();
Expand Down Expand Up @@ -2234,9 +2299,24 @@ private List<RecordField> getDefaultFields() {
final DataType accountsType = RecordFieldType.ARRAY.getArrayDataType(accountDataType);
final RecordField accountsField = new RecordField("accounts", accountsType);
fields.add(accountsField);

final DataType bankType = RecordFieldType.CHOICE.getChoiceDataType(
RecordFieldType.STRING.getDataType(),
RecordFieldType.RECORD.getRecordDataType(getBankSchema())
);
final RecordField banksField = new RecordField("banks", RecordFieldType.ARRAY.getArrayDataType(bankType));
fields.add(banksField);

return fields;
}

private RecordSchema getBankSchema() {
final DataType accountDataType = RecordFieldType.RECORD.getRecordDataType(getAccountSchema());
final DataType accountsType = RecordFieldType.ARRAY.getArrayDataType(accountDataType);
final RecordSchema bankSchema = new SimpleRecordSchema(List.of(new RecordField("accounts", accountsType)));
return bankSchema;
}

private RecordSchema getAccountSchema() {
final List<RecordField> accountFields = new ArrayList<>();
accountFields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1884,9 +1884,43 @@ private static Optional<DataType> getWiderRecordType(final RecordDataType thisRe
return Optional.of(otherRecordDataType);
}

// Check if all fields in 'thisSchema' are equal to or wider than all fields in 'otherSchema'
if (isRecordWider(thisSchema, otherSchema)) {
return Optional.of(thisRecordDataType);
}
if (isRecordWider(otherSchema, thisSchema)) {
return Optional.of(otherRecordDataType);
}

return Optional.empty();
}

public static boolean isRecordWider(final RecordSchema potentiallyWider, final RecordSchema potentiallyNarrower) {
final List<RecordField> narrowerFields = potentiallyNarrower.getFields();

for (final RecordField narrowerField : narrowerFields) {
final Optional<RecordField> widerField = potentiallyWider.getField(narrowerField.getFieldName());
if (widerField.isEmpty()) {
return false;
}

if (widerField.get().getDataType().equals(narrowerField.getDataType())) {
continue;
}

final Optional<DataType> widerType = getWiderType(widerField.get().getDataType(), narrowerField.getDataType());
if (widerType.isEmpty()) {
return false;
}

if (!widerType.get().equals(widerField.get().getDataType())) {
return false;
}
}

return true;
}

private static boolean isDecimalType(final RecordFieldType fieldType) {
return switch (fieldType) {
case FLOAT, DOUBLE, DECIMAL -> true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,67 @@ public void testWiderRecordWhenAllFieldsContainedWithin() {
assertEquals(((RecordDataType) widerType.get()).getChildSchema(), widerRecord.getSchema());
}

@Test
public void testWiderRecordWhenChildRecordHasAllFieldsContainedWithin() {
final Record jane = DataTypeUtils.toRecord(Map.of(
"name", "Jane"
), "");

final Record smallRecord = DataTypeUtils.toRecord(Map.of(
"firstName", "John",
"lastName", "Doe",
"child", jane,
"age", 30), "");

final Record janeWithAge = DataTypeUtils.toRecord(Map.of(
"name", "Jane",
"age", 2
), "");

final Record widerRecord = DataTypeUtils.toRecord(Map.of(
"firstName", "John",
"lastName", "Doe",
"fullName", "John Doe",
"child", janeWithAge,
"age", 30), "");

final Optional<DataType> widerType = DataTypeUtils.getWiderType(RecordFieldType.RECORD.getRecordDataType(smallRecord.getSchema()),
RecordFieldType.RECORD.getRecordDataType(widerRecord.getSchema()));
assertTrue(widerType.isPresent());
assertEquals(((RecordDataType) widerType.get()).getChildSchema(), widerRecord.getSchema());
}

@Test
public void testIsRecordWiderWithExtraField() {
final Record jane = DataTypeUtils.toRecord(Map.of(
), "");

final Record smallRecord = DataTypeUtils.toRecord(Map.of(
"firstName", "John",
"lastName", "Doe",
"child", jane,
"age", 30), "");

final Record janeWithAge = DataTypeUtils.toRecord(Map.of(
"name", "Jane",
"age", 2
), "");

final Record widerRecord = DataTypeUtils.toRecord(Map.of(
"firstName", "John",
"lastName", "Doe",
"fullName", "John Doe",
"child", janeWithAge,
"age", 30), "");

assertFalse(DataTypeUtils.isRecordWider(smallRecord.getSchema(), widerRecord.getSchema()));
assertTrue(DataTypeUtils.isRecordWider(widerRecord.getSchema(), smallRecord.getSchema()));

assertFalse(DataTypeUtils.isRecordWider(jane.getSchema(), janeWithAge.getSchema()));
assertTrue(DataTypeUtils.isRecordWider(janeWithAge.getSchema(), jane.getSchema()));
}


@Test
public void testWiderRecordDifferingFields() {
final Record firstRecord = DataTypeUtils.toRecord(Map.of(
Expand Down
35 changes: 19 additions & 16 deletions nifi-docs/src/main/asciidoc/toolkit-guide.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ The following are available commands:
nifi pg-start
nifi pg-stop
nifi pg-create
nifi pg-export
nifi pg-get-version
nifi pg-stop-version-control
nifi pg-change-version
Expand All @@ -92,6 +91,7 @@ The following are available commands:
nifi pg-get-param-context
nifi pg-set-param-context
nifi pg-replace
nifi pg-export
nifi get-services
nifi get-service
nifi create-service
Expand All @@ -106,6 +106,12 @@ The following are available commands:
nifi export-reporting-tasks
nifi export-reporting-task
nifi import-reporting-tasks
nifi create-flow-analysis-rule
nifi get-flow-analysis-rules
nifi get-flow-analysis-rule
nifi enable-flow-analysis-rules
nifi disable-flow-analysis-rules
nifi delete-flow-analysis-rule
nifi list-users
nifi create-user
nifi list-user-groups
Expand Down Expand Up @@ -296,21 +302,18 @@ For example, typing tab at an empty prompt should display possible commands for
Typing "nifi " and then a tab will show the sub-commands for NiFi:

#> nifi
change-version-processor delete-reporting-task get-policy merge-param-context pg-replace update-user-group
cluster-summary disable-services get-reg-client-id offload-node pg-set-param-context
connect-node disconnect-node get-reporting-task pg-change-version pg-start
create-param-context enable-services get-reporting-tasks pg-connect pg-status
create-param-provider export-param-context get-root-id pg-create pg-stop
create-reg-client export-reporting-task get-service pg-create-service pg-stop-version-control
create-reporting-task export-reporting-tasks get-services pg-disable-services remove-inherited-param-contexts
create-service fetch-params import-param-context pg-enable-services set-inherited-param-contexts
create-user get-access-token import-reporting-tasks pg-export set-param
create-user-group get-access-token-spnego list-param-contexts pg-get-all-versions set-param-provider-property
current-user get-controller-configuration list-param-providers pg-get-param-context start-reporting-tasks
delete-node get-node list-reg-clients pg-get-services stop-reporting-tasks
delete-param get-nodes list-user-groups pg-get-version update-controller-configuration
delete-param-context get-param-context list-users pg-import update-policy
delete-param-provider get-param-provider logout-access-token pg-list update-reg-client
change-version-processor delete-flow-analysis-rule export-reporting-task get-policy list-user-groups pg-export pg-stop-version-control
cluster-summary delete-node export-reporting-tasks get-reg-client-id list-users pg-get-all-versions remove-inherited-param-contexts
connect-node delete-param fetch-params get-reporting-task logout-access-token pg-get-param-context set-inherited-param-contexts
create-flow-analysis-rule delete-param-context get-access-token get-reporting-tasks merge-param-context pg-get-services set-param
create-param-context delete-param-provider get-access-token-spnego get-root-id offload-node pg-get-version set-param-provider-property
create-param-provider delete-reporting-task get-controller-configuration get-service pg-change-all-versions pg-import start-reporting-tasks
create-reg-client disable-flow-analysis-rules get-flow-analysis-rule get-services pg-change-version pg-list stop-reporting-tasks
create-reporting-task disable-services get-flow-analysis-rules import-param-context pg-connect pg-replace update-controller-configuration
create-service disconnect-node get-node import-reporting-tasks pg-create pg-set-param-context update-policy
create-user enable-flow-analysis-rules get-nodes list-param-contexts pg-create-service pg-start update-reg-client
create-user-group enable-services get-param-context list-param-providers pg-disable-services pg-status update-user-group
current-user export-param-context get-param-provider list-reg-clients pg-enable-services pg-stop

Arguments that represent a path to a file, such as `-p` or when setting a properties file in the session, will auto-complete the path being typed:

Expand Down
2 changes: 1 addition & 1 deletion nifi-extension-bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@
<dependency>
<groupId>org.eclipse.jdt</groupId>
<artifactId>ecj</artifactId>
<version>3.37.0</version>
<version>3.38.0</version>
<scope>provided</scope>
</dependency>
<!-- Jetty EE10 Glassfish JSTL and deps -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.6.6</version>
<version>3.6.7</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
Expand Down Expand Up @@ -191,7 +191,7 @@
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>3.6.6</version>
<version>3.6.7</version>
<scope>test</scope>
</dependency>
</dependencies>
Expand Down
2 changes: 1 addition & 1 deletion nifi-extension-bundles/nifi-azure-bundle/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
<properties>
<!-- when changing the Azure SDK version, also update msal4j to the version that is required by azure-identity -->
<azure.sdk.bom.version>1.2.24</azure.sdk.bom.version>
<msal4j.version>1.15.0</msal4j.version>
<msal4j.version>1.15.1</msal4j.version>
<qpid.proton.version>0.34.1</qpid.proton.version>
</properties>

Expand Down
2 changes: 1 addition & 1 deletion nifi-extension-bundles/nifi-box-bundle/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
<dependency>
<groupId>com.box</groupId>
<artifactId>box-java-sdk</artifactId>
<version>4.9.1</version>
<version>4.10.0</version>
</dependency>
</dependencies>
</dependencyManagement>
Expand Down
Loading

0 comments on commit 8a7e29c

Please sign in to comment.