Skip to content

Commit

Permalink
formatting and pmd
Browse files Browse the repository at this point in the history
  • Loading branch information
alovew committed Aug 22, 2022
1 parent 5acd354 commit 9b70d23
Showing 1 changed file with 33 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ class StateMetricsTrackerTest {
private StateMetricsTracker stateMetricsTracker;
private static final String STREAM_1 = "stream1";
private static final String STREAM_2 = "stream2";
private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
private static final String SECOND_ZERO = "2022-01-01 12:00:00";
private static final String SECOND_ONE = "2022-01-01 12:00:01";
private static final String SECOND_TWO = "2022-01-01 12:00:02";
private static final String SECOND_FIVE = "2022-01-01 12:00:05";

@BeforeEach
void setup() {
Expand All @@ -45,26 +50,25 @@ void testStreamMaxandMeanSecondsBetweenStateMessageEmittedandCommitted()
final AirbyteStateMessage s2s1 = AirbyteMessageUtils.createStreamStateMessage(STREAM_2, 1);
final AirbyteStateMessage s2s2 = AirbyteMessageUtils.createStreamStateMessage(STREAM_2, 2);

final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
stateMetricsTracker.addState(s1s1, 0, LocalDateTime.parse("2022-01-01 12:00:00", formatter)); // stream 1 state
stateMetricsTracker.addState(s1s2, 1, LocalDateTime.parse("2022-01-01 12:00:01", formatter)); // stream 1 state
stateMetricsTracker.addState(s2s1, 0, LocalDateTime.parse("2022-01-01 12:00:02", formatter)); // stream 2 state
stateMetricsTracker.addState(s1s3, 2, LocalDateTime.parse("2022-01-01 12:00:03", formatter)); // stream 1 state
stateMetricsTracker.addState(s1s1, 0, LocalDateTime.parse(SECOND_ZERO, FORMATTER)); // stream 1 state
stateMetricsTracker.addState(s1s2, 1, LocalDateTime.parse(SECOND_ONE, FORMATTER)); // stream 1 state
stateMetricsTracker.addState(s2s1, 0, LocalDateTime.parse(SECOND_TWO, FORMATTER)); // stream 2 state
stateMetricsTracker.addState(s1s3, 2, LocalDateTime.parse("2022-01-01 12:00:03", FORMATTER)); // stream 1 state

// Committed up to 2nd state message in stream 1 - time to commit is 5 seconds (second 00 to second
// 05)
stateMetricsTracker.incrementTotalDestinationEmittedStateMessages();
stateMetricsTracker.updateStates(s1s2, 1, LocalDateTime.parse("2022-01-01 12:00:05", formatter));
stateMetricsTracker.updateStates(s1s2, 1, LocalDateTime.parse(SECOND_FIVE, FORMATTER));

// Committed final state message for stream 1 - time to commit is 7 seconds (second 03 to second 10)
stateMetricsTracker.incrementTotalDestinationEmittedStateMessages();
stateMetricsTracker.updateStates(s1s3, 2, LocalDateTime.parse("2022-01-01 12:00:10", formatter));
stateMetricsTracker.updateStates(s1s3, 2, LocalDateTime.parse("2022-01-01 12:00:10", FORMATTER));

stateMetricsTracker.addState(s2s2, 2, LocalDateTime.parse("2022-01-01 12:00:11", formatter));
stateMetricsTracker.addState(s2s2, 2, LocalDateTime.parse("2022-01-01 12:00:11", FORMATTER));

// Commit final state message for stream 2 - time to commit is 12 seconds (second 14 - second 02)
stateMetricsTracker.incrementTotalDestinationEmittedStateMessages();
stateMetricsTracker.updateStates(s2s2, 2, LocalDateTime.parse("2022-01-01 12:00:14", formatter));
stateMetricsTracker.updateStates(s2s2, 2, LocalDateTime.parse("2022-01-01 12:00:14", FORMATTER));

// max time across both streams was 12, mean time across all streams was (5 + 7 + 12)/3 == 24/3 == 8
assertEquals(12L, stateMetricsTracker.getMaxSecondsBetweenStateMessageEmittedAndCommitted());
Expand All @@ -79,18 +83,17 @@ void testGlobalMaxandMeanSecondsBetweenStateMessageEmittedandCommitted()
final AirbyteMessage s3 = AirbyteMessageUtils.createGlobalStateMessage(3, STREAM_1);

// 3 global state messages emitted
final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
stateMetricsTracker.addState(s1.getState(), 0, LocalDateTime.parse("2022-01-01 12:00:00", formatter));
stateMetricsTracker.addState(s2.getState(), 1, LocalDateTime.parse("2022-01-01 12:00:01", formatter));
stateMetricsTracker.addState(s3.getState(), 2, LocalDateTime.parse("2022-01-01 12:00:02", formatter));
stateMetricsTracker.addState(s1.getState(), 0, LocalDateTime.parse(SECOND_ZERO, FORMATTER));
stateMetricsTracker.addState(s2.getState(), 1, LocalDateTime.parse(SECOND_ONE, FORMATTER));
stateMetricsTracker.addState(s3.getState(), 2, LocalDateTime.parse(SECOND_TWO, FORMATTER));

// Committed up to 2nd state message - time to commit is 5 seconds (second 00 to second 05)
stateMetricsTracker.incrementTotalDestinationEmittedStateMessages();
stateMetricsTracker.updateStates(s2.getState(), 1, LocalDateTime.parse("2022-01-01 12:00:05", formatter));
stateMetricsTracker.updateStates(s2.getState(), 1, LocalDateTime.parse(SECOND_FIVE, FORMATTER));

// Committed final state message - time to commit is 7 seconds (second 02 to second 09)
stateMetricsTracker.incrementTotalDestinationEmittedStateMessages();
stateMetricsTracker.updateStates(s3.getState(), 2, LocalDateTime.parse("2022-01-01 12:00:09", formatter));
stateMetricsTracker.updateStates(s3.getState(), 2, LocalDateTime.parse("2022-01-01 12:00:09", FORMATTER));

assertEquals(7L, stateMetricsTracker.getMaxSecondsBetweenStateMessageEmittedAndCommitted());
assertEquals(6L, stateMetricsTracker.getMeanSecondsBetweenStateMessageEmittedAndCommitted());
Expand All @@ -105,26 +108,33 @@ void testStateMetricsTrackerOomExceptionThrown() throws StateMetricsTrackerOomEx
final AirbyteMessage s3 = AirbyteMessageUtils.createGlobalStateMessage(3, STREAM_1);

// 3 global state messages emitted
final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
stateMetricsTrackerOom.addState(s1.getState(), 0, LocalDateTime.parse("2022-01-01 12:00:00", formatter));
stateMetricsTrackerOom.addState(s2.getState(), 1, LocalDateTime.parse("2022-01-01 12:00:01", formatter));
stateMetricsTrackerOom.addState(s1.getState(), 0, LocalDateTime.parse(SECOND_ZERO, FORMATTER));
stateMetricsTrackerOom.addState(s2.getState(), 1, LocalDateTime.parse(SECOND_ONE, FORMATTER));

assertThrows(StateMetricsTrackerOomException.class,
() -> stateMetricsTrackerOom.addState(s3.getState(), 2, LocalDateTime.parse("2022-01-01 12:00:02", formatter)));
() -> stateMetricsTrackerOom.addState(s3.getState(), 2, LocalDateTime.parse(SECOND_TWO, FORMATTER)));

}

@Test
void testStateMetricsTrackerNoStateMatchExceptionThrown() throws StateMetricsTrackerNoStateMatchException {
void testStateMetricsTrackerNoStateMatchExceptionThrown() throws StateMetricsTrackerNoStateMatchException, StateMetricsTrackerOomException {
final AirbyteMessage s1 = AirbyteMessageUtils.createGlobalStateMessage(1, STREAM_1);
final AirbyteMessage s2 = AirbyteMessageUtils.createGlobalStateMessage(2, STREAM_1);
final AirbyteMessage s3 = AirbyteMessageUtils.createGlobalStateMessage(3, STREAM_1);

// destination emits state message hash that cannot be found in the list of source state messages
// destination emits state message hash when there are no source state message hashes stored
stateMetricsTracker.incrementTotalDestinationEmittedStateMessages();
final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
assertThrows(StateMetricsTrackerNoStateMatchException.class,
() -> stateMetricsTracker.updateStates(s2.getState(), 4, LocalDateTime.parse("2022-01-01 12:00:05", formatter)));
() -> stateMetricsTracker.updateStates(s1.getState(), 4, LocalDateTime.parse(SECOND_FIVE, FORMATTER)));

stateMetricsTracker.addState(s1.getState(), 0, LocalDateTime.parse(SECOND_ZERO, FORMATTER));
stateMetricsTracker.addState(s2.getState(), 1, LocalDateTime.parse(SECOND_ONE, FORMATTER));
stateMetricsTracker.addState(s3.getState(), 2, LocalDateTime.parse(SECOND_TWO, FORMATTER));

// destination emits a state message hash that does not correspond to any source state message
// hashes
assertThrows(StateMetricsTrackerNoStateMatchException.class,
() -> stateMetricsTracker.updateStates(s3.getState(), 4, LocalDateTime.parse(SECOND_FIVE, FORMATTER)));
}

}

0 comments on commit 9b70d23

Please sign in to comment.