Skip to content

Commit

Permalink
Allow flushing collectors to only emit when complete.
Browse files Browse the repository at this point in the history
Without this option, a flushing collector (e.g. `EqualsFilter`) always
emits its value when being flushed, regardless of whether it's already
complete.

Only applies to a subset of all flushing collectors since most of them
are never "complete"; `All`, on the other hand, has this condition
already built-in.
  • Loading branch information
blackwinter committed Sep 14, 2020
1 parent 2528980 commit 8bf5bcb
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,16 @@
*/
public abstract class AbstractFlushingCollect extends AbstractCollect {

private boolean flushIncomplete = true;

public final void setFlushIncomplete(final boolean flushIncomplete) {
this.flushIncomplete = flushIncomplete;
}

@Override
public final void flush(final int recordCount, final int entityCount) {
if (isSameRecord(recordCount) && sameEntityConstraintSatisfied(entityCount)) {
if(isConditionMet()) {
if(isConditionMet() && (flushIncomplete || isComplete())) {
emit();
}
if (getReset()) {
Expand Down
6 changes: 6 additions & 0 deletions metamorph/src/main/resources/schemata/metamorph.xsd
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,8 @@
<attribute name="name" type="string" use="required" />
<attribute name="value" type="string" use="required" />

<attribute name="flushIncomplete" type="boolean" use="optional"
default="true" />
<attribute name="reset" type="boolean" use="optional"
default="false" />
<attribute name="sameEntity" type="boolean" use="optional"
Expand All @@ -301,6 +303,8 @@
<attribute name="name" type="string" use="required" />
<attribute name="value" type="string" use="required" />

<attribute name="flushIncomplete" type="boolean" use="optional"
default="true" />
<attribute name="reset" type="boolean" use="optional"
default="false" />
<attribute name="sameEntity" type="boolean" use="optional"
Expand Down Expand Up @@ -442,6 +446,8 @@
have an entity-name element otherwise an empty name is emitted</documentation>
</annotation>
</attribute>
<attribute name="flushIncomplete" type="boolean" use="optional"
default="true" />
<attribute name="reset" type="boolean" use="optional"
default="false" />
<attribute name="sameEntity" type="boolean" use="optional"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package org.metafacture.metamorph.collectors;

import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.verifyNoMoreInteractions;

import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -165,6 +166,38 @@ public void shouldEmitCurrentValueOnFlushEvent() {
ordered.verifyNoMoreInteractions();
}

@Test
public void shouldNotEmitCurrentValueOnFlushEventIfIncomplete() {
metamorph = InlineMorph.in(this)
.with("<rules>")
.with(" <combine name='combi' value='${one}${two}' flushWith='e' flushIncomplete='false' reset='true'>")
.with(" <data source='e.l' name='one' />")
.with(" <data source='e.m' name='two' />")
.with(" </combine>")
.with("</rules>")
.createConnectedTo(receiver);

metamorph.startRecord("1");
metamorph.startEntity("e");
metamorph.literal("l", "1");
metamorph.endEntity();
metamorph.startEntity("e");
metamorph.literal("l", "2");
metamorph.literal("m", "2");
metamorph.endEntity();
metamorph.startEntity("e");
metamorph.literal("l", "3");
metamorph.endEntity();
metamorph.endRecord();

final InOrder ordered = inOrder(receiver);
ordered.verify(receiver).startRecord("1");
ordered.verify(receiver).literal("combi", "22");
ordered.verify(receiver).endRecord();
ordered.verifyNoMoreInteractions();
verifyNoMoreInteractions(receiver);
}

@Test
public void shouldPostprocessCombinedValue() {
metamorph = InlineMorph.in(this)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verifyNoMoreInteractions;

import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -119,6 +120,40 @@ public void shouldEmitEnityOnFlushEvent() {
ordered.verifyNoMoreInteractions();
}

@Test
public void shouldNotEmitEnityOnFlushEventIfIncomplete() {
metamorph = InlineMorph.in(this)
.with("<rules>")
.with(" <entity name='entity' flushWith='record' flushIncomplete='false'>")
.with(" <data source='d1' name='l1' />")
.with(" <data source='d2' name='l2' />")
.with(" </entity>")
.with("</rules>")
.createConnectedTo(receiver);

metamorph.startRecord("1");
metamorph.literal("d1", "a");
metamorph.literal("d1", "b");
metamorph.literal("d2", "c");
metamorph.endRecord();
metamorph.startRecord("2");
metamorph.literal("d2", "c");
metamorph.endRecord();

final InOrder ordered = inOrder(receiver);
ordered.verify(receiver).startRecord("1");
ordered.verify(receiver).startEntity("entity");
ordered.verify(receiver).literal("l1", "a");
ordered.verify(receiver).literal("l1", "b");
ordered.verify(receiver).literal("l2", "c");
ordered.verify(receiver).endEntity();
ordered.verify(receiver).endRecord();
ordered.verify(receiver).startRecord("2");
ordered.verify(receiver).endRecord();
ordered.verifyNoMoreInteractions();
verifyNoMoreInteractions(receiver);
}

@Test
public void shouldEmitEntityOnEachFlushEvent() {
metamorph = InlineMorph.in(this)
Expand Down Expand Up @@ -488,7 +523,7 @@ public void shouldEmitEntityContentsAgainIfResetIsFalse() {
}

@Test
public void shouldNotEmitEntityContentsAgainIfResetIsFalse() {
public void shouldNotEmitEntityContentsAgainIfResetIsTrue() {
metamorph = InlineMorph.in(this)
.with("<rules>")
.with(" <entity name='entity' reset='true'>")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verifyNoMoreInteractions;

import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -233,4 +234,85 @@ public void shouldFireIfLiteralsInEntitiesAreReceivedThatAreNotListedInStatement
ordered.verifyNoMoreInteractions();
}

@Test
public void shouldFireOnFlush() {
metamorph = InlineMorph.in(this)
.with("<rules>")
.with(" <equalsFilter name='equalsFiltered' value='${one}' flushWith='field1.data2'>")
.with(" <data source='field1.data1' name='one' />")
.with(" <data source='field1.data2' name='two' />")
.with(" </equalsFilter>")
.with("</rules>")
.createConnectedTo(receiver);

metamorph.startRecord("1");
metamorph.startEntity("field1");
metamorph.literal("data1", "a");
metamorph.endEntity();
metamorph.endRecord();
metamorph.startRecord("2");
metamorph.startEntity("field1");
metamorph.literal("data2", "a");
metamorph.endEntity();
metamorph.endRecord();
metamorph.startRecord("3");
metamorph.startEntity("field1");
metamorph.literal("data1", "a");
metamorph.literal("data2", "a");
metamorph.endEntity();
metamorph.endRecord();

final InOrder ordered = inOrder(receiver);
ordered.verify(receiver).startRecord("1");
ordered.verify(receiver).endRecord();
ordered.verify(receiver).startRecord("2");
ordered.verify(receiver).literal("equalsFiltered", "");
ordered.verify(receiver).endRecord();
ordered.verify(receiver).startRecord("3");
ordered.verify(receiver).literal("equalsFiltered", "a");
ordered.verify(receiver).endRecord();
ordered.verifyNoMoreInteractions();
verifyNoMoreInteractions(receiver);
}

@Test
public void shouldNotFireOnFlushIfIncomplete() {
metamorph = InlineMorph.in(this)
.with("<rules>")
.with(" <equalsFilter name='equalsFiltered' value='${one}' flushWith='field1.data2' flushIncomplete='false'>")
.with(" <data source='field1.data1' name='one' />")
.with(" <data source='field1.data2' name='two' />")
.with(" </equalsFilter>")
.with("</rules>")
.createConnectedTo(receiver);

metamorph.startRecord("1");
metamorph.startEntity("field1");
metamorph.literal("data1", "a");
metamorph.endEntity();
metamorph.endRecord();
metamorph.startRecord("2");
metamorph.startEntity("field1");
metamorph.literal("data2", "a");
metamorph.endEntity();
metamorph.endRecord();
metamorph.startRecord("3");
metamorph.startEntity("field1");
metamorph.literal("data1", "a");
metamorph.literal("data2", "a");
metamorph.endEntity();
metamorph.endRecord();

final InOrder ordered = inOrder(receiver);
ordered.verify(receiver).startRecord("1");
ordered.verify(receiver).endRecord();
ordered.verify(receiver).startRecord("2");
ordered.verify(receiver).endRecord();
ordered.verify(receiver).startRecord("3");
ordered.verify(receiver).literal("equalsFiltered", "a");
ordered.verify(receiver).endRecord();
ordered.verifyNoMoreInteractions();
verifyNoMoreInteractions(receiver);
}

}

0 comments on commit 8bf5bcb

Please sign in to comment.