Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Relax Type and Dedupe frequency #28758

Merged
merged 11 commits into from
Aug 1, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,21 @@
*/
public class TypeAndDedupeOperationValve extends ConcurrentHashMap<AirbyteStreamNameNamespacePair, Long> {

private static final long TWO_MINUTES_MILLIS = 1000 * 60 * 2;

private static final long FIVE_MINUTES_MILLIS = 1000 * 60 * 5;

private static final long TEN_MINUTES_MILLIS = 1000 * 60 * 10;

// 15 minutes is the maximum amount of time allowed between checkpoints as defined by
// The Airbyte Protocol
private static final long ONE_MINUTES_MILLIS = 1000 * 60 * 1;
private static final long FIFTEEN_MINUTES_MILLIS = 1000 * 60 * 15;
private static final long ONE_HOUR_MILLIS = 1000 * 60 * 60 * 1;
private static final long TWO_HOURS_MILLIS = 1000 * 60 * 60 * 2;
private static final long FOUR_HOURS_MILLIS = 1000 * 60 * 60 * 4;

// New users of airbyte likely want to see data flowing into their tables as soon as possible
// New users of airbyte likely want to see data flowing into their tables as soon as possible, and
// we want to catch new errors which might appear early within an incremental sync.
// However, as their destination tables grow in size, typing and de-duping data becomes an expensive
// operation
// operation.
// To strike a balance between showing data quickly and not slowing down the entire sync, we use an
// increasing
// interval based approach. This is not fancy, just hard coded intervals.
private static final List<Long> typeAndDedupeIncreasingIntervals = List.of(
TWO_MINUTES_MILLIS,
FIVE_MINUTES_MILLIS,
TEN_MINUTES_MILLIS,
FIFTEEN_MINUTES_MILLIS);
// increasing interval based approach, from 0 up to 4 hours.
// This is not fancy, just hard coded intervals.
private static final List<Long> typeAndDedupeIncreasingIntervals =
List.of(ONE_MINUTES_MILLIS, FIFTEEN_MINUTES_MILLIS, ONE_HOUR_MILLIS, TWO_HOURS_MILLIS, FOUR_HOURS_MILLIS);

private static final Supplier<Long> SYSTEM_NOW = () -> System.currentTimeMillis();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ private void elapseTime(Supplier<Long> timing, int iterations) {
public void testAddStream() {
final var valve = new TypeAndDedupeOperationValve(ALWAYS_ZERO);
valve.addStream(STREAM_A);
Assertions.assertEquals(1000 * 60 * 2, valve.getIncrementInterval(STREAM_A));
Assertions.assertEquals(1000 * 60 * 1, valve.getIncrementInterval(STREAM_A));
Assertions.assertFalse(valve.readyToTypeAndDedupe(STREAM_A));
Assertions.assertEquals(valve.get(STREAM_A), 0l);
}
Expand All @@ -54,46 +54,48 @@ public void testReadyToTypeAndDedupe() {
elapseTime(minuteUpdates, 1);
Assertions.assertTrue(valve.readyToTypeAndDedupe(STREAM_B));
valve.updateTimeAndIncreaseInterval(STREAM_A);
Assertions.assertEquals(1000 * 60 * 5,
Assertions.assertEquals(1000 * 60 * 15,
valve.getIncrementInterval(STREAM_A));
// method call increments time
Assertions.assertFalse(valve.readyToTypeAndDedupe(STREAM_A));
elapseTime(minuteUpdates, 5);
// More than enough time has passed now
elapseTime(minuteUpdates, 15);
Assertions.assertTrue(valve.readyToTypeAndDedupe(STREAM_A));
}

@Test
public void testIncrementInterval() {
final var valve = new TypeAndDedupeOperationValve(ALWAYS_ZERO);
valve.addStream(STREAM_A);
IntStream.rangeClosed(1, 3).forEach(i -> {
IntStream.rangeClosed(1, 4).forEach(i -> {
final var index = valve.incrementInterval(STREAM_A);
Assertions.assertEquals(i, index);
});
Assertions.assertEquals(3, valve.incrementInterval(STREAM_A));
Assertions.assertEquals(4, valve.incrementInterval(STREAM_A));
// Twice to be sure
Assertions.assertEquals(3, valve.incrementInterval(STREAM_A));
Assertions.assertEquals(4, valve.incrementInterval(STREAM_A));
}
Comment on lines 66 to 77
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jbfbell I'll be honest... I don't really understand what this test is doing. But, I added one more increment of time, so the numbers all +1 seems right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah probably should add a comment explaining - its testing that as you increment the interval, its at that expected interval, (if you're at 1 minute, when you increment you're now at 15 minutes) but with a cap so if you increment when you're already at the last index, you stay there.

But yes your change is correct


@Test
public void testUpdateTimeAndIncreaseInterval() {
final var valve = new TypeAndDedupeOperationValve(minuteUpdates);
valve.addStream(STREAM_A);
// 2 minutes
IntStream.range(0, 2).forEach(__ -> Assertions.assertFalse(valve.readyToTypeAndDedupe(STREAM_A)));
IntStream.range(0, 1).forEach(__ -> Assertions.assertFalse(valve.readyToTypeAndDedupe(STREAM_A)));
Assertions.assertTrue(valve.readyToTypeAndDedupe(STREAM_A));
valve.updateTimeAndIncreaseInterval(STREAM_A);
IntStream.range(0, 5).forEach(__ -> Assertions.assertFalse(valve.readyToTypeAndDedupe(STREAM_A)));
IntStream.range(0, 15).forEach(__ -> Assertions.assertFalse(valve.readyToTypeAndDedupe(STREAM_A)));
Assertions.assertTrue(valve.readyToTypeAndDedupe(STREAM_A));
valve.updateTimeAndIncreaseInterval(STREAM_A);
IntStream.range(0, 10).forEach(__ -> Assertions.assertFalse(valve.readyToTypeAndDedupe(STREAM_A)));
IntStream.range(0, 60).forEach(__ -> Assertions.assertFalse(valve.readyToTypeAndDedupe(STREAM_A)));
Assertions.assertTrue(valve.readyToTypeAndDedupe(STREAM_A));
valve.updateTimeAndIncreaseInterval(STREAM_A);
IntStream.range(0, 15).forEach(__ -> Assertions.assertFalse(valve.readyToTypeAndDedupe(STREAM_A)));
IntStream.range(0, 120).forEach(__ -> Assertions.assertFalse(valve.readyToTypeAndDedupe(STREAM_A)));
Assertions.assertTrue(valve.readyToTypeAndDedupe(STREAM_A));
valve.updateTimeAndIncreaseInterval(STREAM_A);
IntStream.range(0, 15).forEach(__ -> Assertions.assertFalse(valve.readyToTypeAndDedupe(STREAM_A)));
IntStream.range(0, 240).forEach(__ -> Assertions.assertFalse(valve.readyToTypeAndDedupe(STREAM_A)));
Assertions.assertTrue(valve.readyToTypeAndDedupe(STREAM_A));
valve.updateTimeAndIncreaseInterval(STREAM_A);
IntStream.range(0, 240).forEach(__ -> Assertions.assertFalse(valve.readyToTypeAndDedupe(STREAM_A)));
Assertions.assertTrue(valve.readyToTypeAndDedupe(STREAM_A));
}

Expand Down