Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NIFI-13808: Record attributes and content for clone, upload and send. #9486

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1007,8 +1007,9 @@ public ProvenanceEventRecord next() {
// the representation of the FlowFile as it is committed, as this is the only way in which it really
// exists in our system -- all other representations are volatile representations that have not been
// exposed.
final boolean isUpdateAttributes = rawEvent.getEventType() != ProvenanceEventType.SEND && rawEvent.getEventType() != ProvenanceEventType.UPLOAD;
return enrich(rawEvent, flowFileRecordMap, checkpoint.records, isUpdateAttributes, commitNanos);
final boolean isUpdateAttributesAndContent = rawEvent.getEventType() != ProvenanceEventType.SEND && rawEvent.getEventType() != ProvenanceEventType.UPLOAD
&& rawEvent.getEventType() != ProvenanceEventType.CLONE;
return enrich(rawEvent, flowFileRecordMap, checkpoint.records, isUpdateAttributesAndContent, commitNanos);
} else if (autoTermIterator != null && autoTermIterator.hasNext()) {
return enrich(autoTermIterator.next(), flowFileRecordMap, checkpoint.records, true, commitNanos);
}
Expand Down Expand Up @@ -1085,13 +1086,13 @@ public ProvenanceEventRecord enrich(final ProvenanceEventRecord rawEvent, final

private ProvenanceEventRecord enrich(
final ProvenanceEventRecord rawEvent, final Map<String, FlowFileRecord> flowFileRecordMap, final Map<Long, StandardRepositoryRecord> records,
final boolean updateAttributes, final long commitNanos) {
final boolean updateAttributesAndContent, final long commitNanos) {
final ProvenanceEventBuilder recordBuilder = context.createProvenanceEventBuilder().fromEvent(rawEvent);
final FlowFileRecord eventFlowFile = flowFileRecordMap.get(rawEvent.getFlowFileUuid());
if (eventFlowFile != null) {
final StandardRepositoryRecord repoRecord = records.get(eventFlowFile.getId());

if (repoRecord.getCurrent() != null && repoRecord.getCurrentClaim() != null) {
if (updateAttributesAndContent && repoRecord.getCurrent() != null && repoRecord.getCurrentClaim() != null) {
final ContentClaim currentClaim = repoRecord.getCurrentClaim();
final long currentOffset = repoRecord.getCurrentClaimOffset();
final long size = eventFlowFile.getSize();
Expand All @@ -1100,7 +1101,7 @@ private ProvenanceEventRecord enrich(
recordBuilder.setCurrentContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), currentOffset + currentClaim.getOffset(), size);
}

if (repoRecord.getOriginal() != null && repoRecord.getOriginalClaim() != null) {
if (updateAttributesAndContent && repoRecord.getOriginal() != null && repoRecord.getOriginalClaim() != null) {
final ContentClaim originalClaim = repoRecord.getOriginalClaim();
final long originalOffset = repoRecord.getOriginal().getContentClaimOffset();
final long originalSize = repoRecord.getOriginal().getSize();
Expand All @@ -1114,7 +1115,7 @@ private ProvenanceEventRecord enrich(
recordBuilder.setSourceQueueIdentifier(originalQueue.getIdentifier());
}

if (updateAttributes) {
if (updateAttributesAndContent) {
recordBuilder.setAttributes(repoRecord.getOriginalAttributes(), repoRecord.getUpdatedAttributes());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,10 @@ public void clone(final FlowFile parent, final FlowFile child, final boolean ver
final ProvenanceEventBuilder eventBuilder = build(parent, ProvenanceEventType.CLONE);
eventBuilder.addChildFlowFile(child);
eventBuilder.addParentFlowFile(parent);
events.add(eventBuilder.build());
final ProvenanceEventRecord eventRecord = eventBuilder.build();
final ProvenanceEventRecord enriched = eventEnricher == null ? eventRecord : eventEnricher.enrich(eventRecord, parent, System.nanoTime());

events.add(enriched);
} catch (final Exception e) {
logger.error("Failed to generate Provenance Event", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

class StandardProvenanceReporterTest {

Expand All @@ -46,4 +51,19 @@ public void testDetailsRetainedWithDelegate() {
assertNotNull(record);
assertEquals("These are details", record.getDetails());
}
}

@Test
public void testEnrichEvents() {
final ProvenanceEventRepository mockRepo = Mockito.mock(ProvenanceEventRepository.class);
final ProvenanceEventEnricher enricher = Mockito.mock(ProvenanceEventEnricher.class);
final StandardProvenanceReporter reporter = new StandardProvenanceReporter(null, "1234", "TestProc", mockRepo, enricher);
Mockito.when(mockRepo.eventBuilder()).thenReturn(new StandardProvenanceEventRecord.Builder());

final FlowFile flowFile = new StandardFlowFileRecord.Builder().id(10L).addAttribute("uuid", "10").build();
final FlowFile childFlowFile = new StandardFlowFileRecord.Builder().id(11L).addAttribute("uuid", "11").build();
reporter.send(flowFile, "test://noop");
reporter.upload(flowFile, 0, "test://noop");
reporter.clone(flowFile, childFlowFile);
verify(enricher, times(3)).enrich(any(), eq(flowFile), anyLong());
}
}