From 6ce02ae181d9a5a4a514051eaf92a893b69763dd Mon Sep 17 00:00:00 2001 From: Sachin Sundar P S Date: Wed, 27 Oct 2021 02:57:42 -0700 Subject: [PATCH 1/3] Revert commit for e638c17. The change caused a regression for leases without owners. Added unit test. --- .../leases/dynamodb/DynamoDBLeaseTaker.java | 24 ++++++-- .../dynamodb/DynamoDBLeaseTakerTest.java | 59 ++++++++++++++++++- 2 files changed, 75 insertions(+), 8 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java index 06b2527b6..9c2a3d03f 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java @@ -543,16 +543,30 @@ private List chooseLeasesToSteal(Map leaseCounts, int ne * @return map of workerIdentifier to lease count */ private Map computeLeaseCounts(List expiredLeases) { + Map leaseCounts = new HashMap<>(); // The set will give much faster lookup than the original list, an // important optimization when the list is large Set expiredLeasesSet = new HashSet<>(expiredLeases); - Map leaseCounts = allLeases.values().stream() - .filter(lease -> !expiredLeasesSet.contains(lease)) - .collect(groupingBy(Lease::leaseOwner, summingInt(lease -> 1))); + // Compute the number of leases per worker by looking through allLeases and ignoring leases that have expired. + for (Lease lease : allLeases.values()) { + if (!expiredLeasesSet.contains(lease)) { + String leaseOwner = lease.leaseOwner(); + Integer oldCount = leaseCounts.get(leaseOwner); + if (oldCount == null) { + leaseCounts.put(leaseOwner, 1); + } else { + leaseCounts.put(leaseOwner, oldCount + 1); + } + } + } - // If I have no leases, I won't be represented in leaseCounts. Let's fix that. - leaseCounts.putIfAbsent(workerIdentifier, 0); + // If I have no leases, I wasn't represented in leaseCounts. Let's fix that. + Integer myCount = leaseCounts.get(workerIdentifier); + if (myCount == null) { + myCount = 0; + leaseCounts.put(workerIdentifier, myCount); + } return leaseCounts; } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerTest.java index 92dc28a2d..016ebea6b 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerTest.java @@ -14,8 +14,14 @@ */ package software.amazon.kinesis.leases.dynamodb; +import com.google.common.collect.ImmutableMap; + import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; import junit.framework.Assert; @@ -24,13 +30,39 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; +import software.amazon.kinesis.leases.Lease; +import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseTaker; +import software.amazon.kinesis.metrics.MetricsFactory; +import software.amazon.kinesis.metrics.NullMetricsScope; +import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; -/** - * - */ +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) public class DynamoDBLeaseTakerTest { + private static final String WORKER_IDENTIFIER = "foo"; + private static final long LEASE_DURATION_MILLIS = 1000L; + + private DynamoDBLeaseTaker dynamoDBLeaseTaker; + + @Mock + private LeaseRefresher leaseRefresher; + @Mock + private MetricsFactory metricsFactory; + @Mock + private Callable timeProvider; + + @Before + public void setup() { + this.dynamoDBLeaseTaker = new DynamoDBLeaseTaker(leaseRefresher, WORKER_IDENTIFIER, LEASE_DURATION_MILLIS, metricsFactory); + } + /** * @throws java.lang.Exception */ @@ -73,4 +105,25 @@ public final void testStringJoin() { Assert.assertEquals("foo, bar", DynamoDBLeaseTaker.stringJoin(strings, ", ")); } + + @Test + public void test_computeLeaseCounts() throws Exception { + Lease lease = new Lease(); + lease.checkpoint(new ExtendedSequenceNumber("checkpoint")); + lease.ownerSwitchesSinceCheckpoint(0L); + lease.leaseCounter(0L); + lease.leaseOwner(null); + lease.parentShardIds(Collections.singleton("parentShardId")); + lease.childShardIds(new HashSet<>()); + lease.leaseKey("1"); + final List leases = Collections.singletonList(lease); + + when(leaseRefresher.listLeases()).thenReturn(leases); + when(metricsFactory.createMetrics()).thenReturn(new NullMetricsScope()); + when(timeProvider.call()).thenReturn(1000L); + + Map actualOutput = dynamoDBLeaseTaker.takeLeases(timeProvider); + + assertEquals(ImmutableMap.of(), actualOutput); + } } From e66493da28f8d1c14138567f3637658b2cd09f8a Mon Sep 17 00:00:00 2001 From: Sachin Sundar P S Date: Wed, 27 Oct 2021 15:47:14 -0700 Subject: [PATCH 2/3] Added tests for expired leases. --- .../leases/dynamodb/DynamoDBLeaseTaker.java | 11 +++--- .../dynamodb/DynamoDBLeaseTakerTest.java | 35 ++++++++++++++++--- 2 files changed, 37 insertions(+), 9 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java index 9c2a3d03f..e7087e62e 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java @@ -14,6 +14,8 @@ */ package software.amazon.kinesis.leases.dynamodb; +import com.google.common.annotations.VisibleForTesting; + import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -40,9 +42,6 @@ import software.amazon.kinesis.metrics.MetricsScope; import software.amazon.kinesis.metrics.MetricsUtil; -import static java.util.stream.Collectors.groupingBy; -import static java.util.stream.Collectors.summingInt; - /** * An implementation of {@link LeaseTaker} that uses DynamoDB via {@link LeaseRefresher}. */ @@ -63,13 +62,14 @@ public class DynamoDBLeaseTaker implements LeaseTaker { private final long leaseDurationNanos; private final MetricsFactory metricsFactory; - private final Map allLeases = new HashMap<>(); // TODO: Remove these defaults and use the defaults in the config private int maxLeasesForWorker = Integer.MAX_VALUE; private int maxLeasesToStealAtOneTime = 1; private long lastScanTimeNanos = 0L; + final Map allLeases = new HashMap<>(); + public DynamoDBLeaseTaker(LeaseRefresher leaseRefresher, String workerIdentifier, long leaseDurationMillis, final MetricsFactory metricsFactory) { this.leaseRefresher = leaseRefresher; @@ -542,7 +542,8 @@ private List chooseLeasesToSteal(Map leaseCounts, int ne * @param expiredLeases list of leases that are currently expired * @return map of workerIdentifier to lease count */ - private Map computeLeaseCounts(List expiredLeases) { + @VisibleForTesting + Map computeLeaseCounts(List expiredLeases) { Map leaseCounts = new HashMap<>(); // The set will give much faster lookup than the original list, an // important optimization when the list is large diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerTest.java index 016ebea6b..0dd9e489a 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerTest.java @@ -14,10 +14,12 @@ */ package software.amazon.kinesis.leases.dynamodb; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -105,10 +107,34 @@ public final void testStringJoin() { Assert.assertEquals("foo, bar", DynamoDBLeaseTaker.stringJoin(strings, ", ")); } + @Test + public void test_computeLeaseCounts_noExpiredLease() throws Exception { + final Lease lease = new Lease(); + lease.checkpoint(new ExtendedSequenceNumber("checkpoint")); + lease.ownerSwitchesSinceCheckpoint(0L); + lease.leaseCounter(0L); + lease.leaseOwner(null); + lease.parentShardIds(Collections.singleton("parentShardId")); + lease.childShardIds(new HashSet<>()); + lease.leaseKey("1"); + final List leases = Collections.singletonList(lease); + dynamoDBLeaseTaker.allLeases.put("1", lease); + + when(leaseRefresher.listLeases()).thenReturn(leases); + when(metricsFactory.createMetrics()).thenReturn(new NullMetricsScope()); + when(timeProvider.call()).thenReturn(1000L); + + final Map actualOutput = dynamoDBLeaseTaker.computeLeaseCounts(ImmutableList.of()); + + final Map expectedOutput = new HashMap<>(); + expectedOutput.put(null, 1); + expectedOutput.put("foo", 0); + assertEquals(expectedOutput, actualOutput); + } @Test - public void test_computeLeaseCounts() throws Exception { - Lease lease = new Lease(); + public void test_computeLeaseCounts_withExpiredLease() throws Exception { + final Lease lease = new Lease(); lease.checkpoint(new ExtendedSequenceNumber("checkpoint")); lease.ownerSwitchesSinceCheckpoint(0L); lease.leaseCounter(0L); @@ -117,13 +143,14 @@ public void test_computeLeaseCounts() throws Exception { lease.childShardIds(new HashSet<>()); lease.leaseKey("1"); final List leases = Collections.singletonList(lease); + dynamoDBLeaseTaker.allLeases.put("1", lease); when(leaseRefresher.listLeases()).thenReturn(leases); when(metricsFactory.createMetrics()).thenReturn(new NullMetricsScope()); when(timeProvider.call()).thenReturn(1000L); - Map actualOutput = dynamoDBLeaseTaker.takeLeases(timeProvider); + final Map actualOutput = dynamoDBLeaseTaker.computeLeaseCounts(leases); - assertEquals(ImmutableMap.of(), actualOutput); + assertEquals(ImmutableMap.of("foo", 0), actualOutput); } } From 72017417da01a82558905987f7a600ec36e98bf2 Mon Sep 17 00:00:00 2001 From: Sachin Sundar P S Date: Wed, 27 Oct 2021 16:39:12 -0700 Subject: [PATCH 3/3] Added more leases in tests. --- .log.swp | Bin 0 -> 32768 bytes .../dynamodb/DynamoDBLeaseTakerTest.java | 58 +++++++++++------- 2 files changed, 36 insertions(+), 22 deletions(-) create mode 100644 .log.swp diff --git a/.log.swp b/.log.swp new file mode 100644 index 0000000000000000000000000000000000000000..bdb60bb3666b44a9fe4b8aecb4b884dace9bfec5 GIT binary patch literal 32768 zcmeI)37FL683*v7c%ocVso)VVIcJ7lKm-jpJ zWfC&E=Y)}?&11twhP;{^#shEHMBB|a_L|$vFr3P0D(OUH9_{W`)Vpsy9_K7V<_2oW}1{xW-{0^3CK;$UlMq5Zi4*{ttL7 zcvtd|;fKQelm8vw8eU5N5nSV6MZO6hM*3OgACfO2{~LKN`CrMKq8;ID#s}~aw%?EZ zeYlq2CjSdOg7oK;zX$IQUqZeS-UI#?`3AU_?*Mc-E&sdZ$B?gwcR>Cz@^>ixZ1Qz* zt)B4F@_&MSHTfFyRpcAUSCW55{sMWE z{rrMze4mGF{Ej4FLFs#uKSw^C{8_k;-=~s4L!Kmmn)07X{uErxev ze;lsmf0g_(%6|j-qi}7%+sPlH{9Ei_zkYrL*Y??o{9*EA$(NIlBL6k{8RQR50Jk?en0to^84Ugzgx%`!w*LPKJbA0^>Z(IYw~;GO_9DM z`LEz|i7NjpCcm3}F!>^Q3#1=MeivNZPmKIdxXvG{$rr+pM*j22?|^rMKSX{zT*t>+ z^4s8DkbWEat#Cblw8K1I$FE!99g%)C`OWZ7@H5G8f_H{rO}+r$4gMJUjqtAU_sDO6 zYx$bvJVMKVJzV44oBTSsmhUw3YvCH7ndI}~y8ri*Ujx_q!y59d;aa{=$*-dAw>`)Y zgzn#!lzs&H6>u$IlKgT?e>wSOaBV-!$>+g)pnht}=fc~<4?;)R@?Q$q`A=u^OW<1m zlgWPxH<5o0`Ni=3#Vh&Fwd5DU+adi*@(bbGK0hG;1-v`bx57ZA`!|Qu46EG=bO zD*b+oj9r^#Ss=?8S-zBGiFZ7?RhF@`jF9CGIsV@yOIVgxvRokZwMMdR+ehkNmM*d^ zmO1`HSw_inm@JFroaJK)fcG1Nw?8#ndAscO_hCiZ$T7pmPBJ6o9V_cZ{>7~k4c{=~ zWMh$J)J{dJqSeuelP-6sMza>~u8837ib%qWI+hbT!M3u|Y^>6bMH8i!(QLf5CKa=? zVJ~foS(;8-rk%>9!n4A7qjmeE> zD=aq>OGF)ME?T!kJqogqq&5uU+QWeZ#LM-P03HQ)^}%$oiS7Clxa<~9oGv+*78F! z)RxjoE2;Tbp(yk#%~ry3%Aj#$MvfUV$sF%^8_Xo4uC%M9X{BOmX_^&gm+>bL4;^bp zV{R^*NYr#Sr9NWmOpRANu9cP06>{rg+F>hfPR`khxM@2(Gm}cUQgOK#s)1-0QfoMi z!^e~qR+@!?7FO+1NK5^e6xaBbe5(MWEmc@4Z?Lf9A-{H<{6H0Qt)v$|X;sm9+=|y} z+ZkKJRJ5}_r*p11BK>DerLAm;R9?QL6)Nt()(WI3UP=6sIX}$#jmldo&CCmES_b!; zoslX8)$5K3xO_Y0_e^xiqHxb}&k{c?ts=9F)>=pgBS!{$xjj#Yggt6aL#>3B@0Z`^ z=;k*ft&KXviX6~VJ3B$2l9;z*X9vjtxkts7ym$L5GpnLmbJA|V=tmQPxG*)tqyY=1v?wu#;)ShcfNV8z$A3vU|@9cy-OCEu=gZetJI z-EQ))TSdEkE;GG?edlejfhJdIe`Q4twY@!TdHMaV)B3d5^u6JRkd!HboscOy&_}*uSKCc?<{sp?yKD+pCN)68(BFaNT+(3MMx2H*o zEIHGVOeQC+q+?E&NxOIIA**ZTA1SIUh4ZGFGE_~r6SiAprpwJ5nXdRH`kKvl&QGqw z{(;WHdl);}Ie$u!&Q^q@nW&sjg!4y5{c?$aj4aHf(B-||NZO7!`wPotRYpzA?A5zh z51Hg8a>tm`M{xb$Zx_M!|N8wSxc=|=kKp>h z-!8td*Z=kYh;MrRU(f$%`qleYXMgvML3IDXdE}MkGvHm3elGdB@DO}C`8n_~d;|H} zaJ~QR5Znl+=Z9y(_5S$IUNm*BSDuaGhVxA+Lt({r5MJ=V<#& z$X&SJf3}L;fn&AtE_s&n-%NfcT=(xF+$g8<&A{bWjQpzuc^Zz@MmO>l<$oM`60YSR zN1lM|{mGNbr@(Prncs&<>b$hTX1bZuad{% z+I}{Y$H>1RpA6UeO-lq)>o*G5_;n)x8C>HRA^#~{`_~BaGvM03envhCuKPEY{B%lx z33(Y@+t2Ofr@^t>SW12>T+6?Ld?H-;|84S9DE(IQ32?bBD*tMZiqQCvhim)~CqJ3; zKbpK0j%qgsl8=LH`#OpICvY8KE%KA#8s8f7v2YCA#)af#;99@)$w$LA{)@>^glqYq zCm#jJYU54vA5;2|$VbApeKtlQw0=&2Ykb<0kD&Y`OkM)l`aPQbN0fhm@?+uJ{)dwHg=_pyCO-zQ^%o`Y1K0kaCGQQ#uwtA|-iy*- zN?r^{SdIDQMR2X(d&qmjvD$c+JVO3Dd6>4pi97_?`r8Wwh1PEmxQ-uf$-C3`i^#jt z_J@;qrSzwfcOg%ccZOrNF^l|YxSl^#6jbB$L%6p8!^vC2HGW0phmwyV{{i_V@>b*y`61*NlD8zkiM$2*GVm@+RcRksnC@6Y>MdE6DeUYyUcjd_TDM?`z2Sh3ojc zgnS>`{tEKOaBY8YlJ8CaF*%z3E^m7NzZaEnU+k#HzY$#PuMN2Y*Z-edO#aoE{ueZU zW5~aR>;6@e?|^Ik&LRH-j&5ySM*cbZUF6%zpCI2xzLxwm@(;E!<)cgQ~`zli+rBg#3@>E6M*rUQ51~d@K2DzeE0O@@?b~kvB&}()cbT?@YcFuJf}2$>L;eR+{?*8TGai8J z`QsJj_ro>*caYx)*Z4m|zLSCV%jzk+-q`Q_vj$S)&L zlFuWbMLw7Odh$!jmylmVzLNZxGZdyLdyKz%) zZqxIpMe_H}vW%6*vm<2*UO@zc2m}!bA`nC%h(Hj5AOb-If(Qf=2qF+f;D0m%Wy8je z8#`{2Jkl{)oih8$BeI3HvG6%L%%<$Tpsi%PX^3-kE7N{{OR)mdw7p8H06nW z-h+j6^4xXr`G5I`0s4Jf4x1LUGTuYnjei2h8D_Tt literal 0 HcmV?d00001 diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerTest.java index 0dd9e489a..193970f6f 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerTest.java @@ -18,12 +18,15 @@ import com.google.common.collect.ImmutableMap; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; +import java.util.function.Function; +import java.util.stream.Collectors; import junit.framework.Assert; @@ -109,16 +112,14 @@ public final void testStringJoin() { @Test public void test_computeLeaseCounts_noExpiredLease() throws Exception { - final Lease lease = new Lease(); - lease.checkpoint(new ExtendedSequenceNumber("checkpoint")); - lease.ownerSwitchesSinceCheckpoint(0L); - lease.leaseCounter(0L); - lease.leaseOwner(null); - lease.parentShardIds(Collections.singleton("parentShardId")); - lease.childShardIds(new HashSet<>()); - lease.leaseKey("1"); - final List leases = Collections.singletonList(lease); - dynamoDBLeaseTaker.allLeases.put("1", lease); + final List leases = new ImmutableList.Builder() + .add(createLease(null, "1")) + .add(createLease("foo", "2")) + .add(createLease("bar", "3")) + .add(createLease("baz", "4")) + .build(); + dynamoDBLeaseTaker.allLeases.putAll( + leases.stream().collect(Collectors.toMap(Lease::leaseKey, Function.identity()))); when(leaseRefresher.listLeases()).thenReturn(leases); when(metricsFactory.createMetrics()).thenReturn(new NullMetricsScope()); @@ -128,22 +129,21 @@ public void test_computeLeaseCounts_noExpiredLease() throws Exception { final Map expectedOutput = new HashMap<>(); expectedOutput.put(null, 1); - expectedOutput.put("foo", 0); + expectedOutput.put("foo", 1); + expectedOutput.put("bar", 1); + expectedOutput.put("baz", 1); assertEquals(expectedOutput, actualOutput); } @Test public void test_computeLeaseCounts_withExpiredLease() throws Exception { - final Lease lease = new Lease(); - lease.checkpoint(new ExtendedSequenceNumber("checkpoint")); - lease.ownerSwitchesSinceCheckpoint(0L); - lease.leaseCounter(0L); - lease.leaseOwner(null); - lease.parentShardIds(Collections.singleton("parentShardId")); - lease.childShardIds(new HashSet<>()); - lease.leaseKey("1"); - final List leases = Collections.singletonList(lease); - dynamoDBLeaseTaker.allLeases.put("1", lease); + final List leases = new ImmutableList.Builder() + .add(createLease("foo", "2")) + .add(createLease("bar", "3")) + .add(createLease("baz", "4")) + .build(); + dynamoDBLeaseTaker.allLeases.putAll( + leases.stream().collect(Collectors.toMap(Lease::leaseKey, Function.identity()))); when(leaseRefresher.listLeases()).thenReturn(leases); when(metricsFactory.createMetrics()).thenReturn(new NullMetricsScope()); @@ -151,6 +151,20 @@ public void test_computeLeaseCounts_withExpiredLease() throws Exception { final Map actualOutput = dynamoDBLeaseTaker.computeLeaseCounts(leases); - assertEquals(ImmutableMap.of("foo", 0), actualOutput); + final Map expectedOutput = new HashMap<>(); + expectedOutput.put("foo", 0); + assertEquals(expectedOutput, actualOutput); + } + + private Lease createLease(String leaseOwner, String leaseKey) { + final Lease lease = new Lease(); + lease.checkpoint(new ExtendedSequenceNumber("checkpoint")); + lease.ownerSwitchesSinceCheckpoint(0L); + lease.leaseCounter(0L); + lease.leaseOwner(leaseOwner); + lease.parentShardIds(Collections.singleton("parentShardId")); + lease.childShardIds(new HashSet<>()); + lease.leaseKey(leaseKey); + return lease; } }