Skip to content

Commit

Permalink
NIFI-13199 Update ValidateRecord to avoid writing to FlowFiles that w…
Browse files Browse the repository at this point in the history
…ill be auto-terminated

Signed-off-by: Matt Burgess <mattyb149@apache.org>

This closes apache#8942
  • Loading branch information
jrsteinebrey authored and mattyb149 committed Jun 17, 2024
1 parent bc95799 commit dda4282
Showing 1 changed file with 53 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -324,64 +324,64 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
final SchemaValidationResult result = validator.validate(record);
recordCount++;

RecordSetWriter writer;
if (result.isValid()) {
validCount++;
if (validFlowFile == null) {
validFlowFile = session.create(flowFile);
}

validWriter = writer = createIfNecessary(validWriter, validRecordWriterFactory, session, validFlowFile, validationSchema);
validWriter = createIfNecessary(validWriter, validRecordWriterFactory, session, validFlowFile, validationSchema);

writeRecord(validWriter, record);
} else {
invalidCount++;
logValidationErrors(flowFile, recordCount, result);

if (invalidFlowFile == null) {
invalidFlowFile = session.create(flowFile);
}

invalidWriter = writer = createIfNecessary(invalidWriter, invalidRecordWriterFactory, session, invalidFlowFile, record.getSchema());

// Add all of the validation errors to our Set<ValidationError> but only keep up to MAX_VALIDATION_ERRORS because if
// we keep too many then we both use up a lot of heap and risk outputting so much information in the Provenance Event
// that it is too noisy to be useful.
for (final ValidationError validationError : result.getValidationErrors()) {
final Optional<String> fieldName = validationError.getFieldName();
if (!context.isAutoTerminated(REL_INVALID)) {
// If REL_INVALID is not autoTerminated, then create a flow file and calculate the invalid details.
// If it is autoTerminated, then skip doing work which will just be discarded.
if (invalidFlowFile == null) {
invalidFlowFile = session.create(flowFile);
}

switch (validationError.getType()) {
case EXTRA_FIELD:
if (fieldName.isPresent()) {
extraFields.add(fieldName.get());
} else {
invalidWriter = createIfNecessary(invalidWriter, invalidRecordWriterFactory, session, invalidFlowFile, record.getSchema());

// Add all of the validation errors to our Set<ValidationError> but only keep up to MAX_VALIDATION_ERRORS because if
// we keep too many then we both use up a lot of heap and risk outputting so much information in the Provenance Event
// that it is too noisy to be useful.
for (final ValidationError validationError : result.getValidationErrors()) {
final Optional<String> fieldName = validationError.getFieldName();

switch (validationError.getType()) {
case EXTRA_FIELD:
if (fieldName.isPresent()) {
extraFields.add(fieldName.get());
} else {
otherProblems.add(validationError.getExplanation());
}
break;
case MISSING_FIELD:
if (fieldName.isPresent()) {
missingFields.add(fieldName.get());
} else {
otherProblems.add(validationError.getExplanation());
}
break;
case INVALID_FIELD:
if (fieldName.isPresent()) {
invalidFields.add(fieldName.get());
} else {
otherProblems.add(validationError.getExplanation());
}
break;
case OTHER:
otherProblems.add(validationError.getExplanation());
}
break;
case MISSING_FIELD:
if (fieldName.isPresent()) {
missingFields.add(fieldName.get());
} else {
otherProblems.add(validationError.getExplanation());
}
break;
case INVALID_FIELD:
if (fieldName.isPresent()) {
invalidFields.add(fieldName.get());
} else {
otherProblems.add(validationError.getExplanation());
}
break;
case OTHER:
otherProblems.add(validationError.getExplanation());
break;
break;
}
}
}
}

if (writer instanceof RawRecordWriter) {
((RawRecordWriter) writer).writeRawRecord(record);
} else {
writer.write(record);
writeRecord(invalidWriter, record);
}
}
}

Expand Down Expand Up @@ -450,6 +450,16 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
session.remove(flowFile);
}

private void writeRecord(final RecordSetWriter writer, final Record record) throws IOException {
if (writer != null) {
if (writer instanceof RawRecordWriter) {
((RawRecordWriter) writer).writeRawRecord(record);
} else {
writer.write(record);
}
}
}

private void closeQuietly(final RecordSetWriter writer) {
if (writer != null) {
try {
Expand Down

0 comments on commit dda4282

Please sign in to comment.