From 11da6577bd9cbbf45b30754703d0836c756bd0ee Mon Sep 17 00:00:00 2001 From: Jack Dingilian Date: Tue, 30 May 2023 15:35:47 -0400 Subject: [PATCH] Fix locking race between duplicate PartitionRecords w matching uid --- .../changestreams/dao/MetadataTableDao.java | 10 +++- .../dao/MetadataTableDaoTest.java | 53 ++++++++++++++++++- 2 files changed, 61 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableDao.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableDao.java index 295184e1cc1e..d4938e6bef26 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableDao.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableDao.java @@ -702,7 +702,15 @@ public boolean lockAndRecordPartition(PartitionRecord partitionRecord) { tableId, convertPartitionToStreamPartitionRowKey(partitionRecord.getPartition())) .condition(matchAnyString) .otherwise(mutation); - return !dataClient.checkAndMutateRow(rowMutation); + + boolean lockAcquired = !dataClient.checkAndMutateRow(rowMutation); + if (lockAcquired) { + return true; + } else { + // If the lock is already held we need to check if it was acquired by a duplicate + // work item with the same uuid since we last checked doHoldLock above. + return doHoldLock(partitionRecord.getPartition(), partitionRecord.getUuid()); + } } /** diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableDaoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableDaoTest.java index a98814d4f480..c82e3eac2999 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableDaoTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableDaoTest.java @@ -25,6 +25,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Mockito.when; import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient; import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings; @@ -57,6 +58,8 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.mockito.Mockito; +import org.mockito.stubbing.Answer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -141,7 +144,7 @@ public void testNewPartitionConversionWithWithIllegalUtf8() } @Test - public void testLockPartitionRace() throws InterruptedException { + public void testLockPartitionRaceUniqueIds() throws InterruptedException { ByteStringRange partition = ByteStringRange.create("", ""); ByteString rowKey = metadataTableDao.convertPartitionToStreamPartitionRowKey(partition); // Class to try to lock the partition in a separate thread. @@ -223,6 +226,54 @@ public void run() { dataClient.mutateRow(rowMutation); } + @Test + public void testLockPartitionRaceDuplicateIds() throws InterruptedException { + ByteStringRange partition = ByteStringRange.create("", ""); + String uid = "a"; + MetadataTableDao spy = Mockito.spy(metadataTableDao); + // First call we sleep for ten seconds to ensure the duplicate acquires the + // lock before we return. + when(spy.doHoldLock(partition, uid)) + .then( + (Answer) + invocation -> { + Thread.sleep(10000); + return false; + }) + .thenCallRealMethod() + .thenCallRealMethod(); + + class LockPartition implements Runnable { + final PartitionRecord partitionRecord = + new PartitionRecord( + partition, + Collections.emptyList(), + uid, + Instant.now(), + Collections.emptyList(), + Instant.now().plus(Duration.standardMinutes(10))); + boolean locked = false; + + @Override + public void run() { + locked = spy.lockAndRecordPartition(partitionRecord); + } + } + + LockPartition dup1 = new LockPartition(); + Thread dup1Thread = new Thread(dup1); + LockPartition dup2 = new LockPartition(); + Thread dup2Thread = new Thread(dup2); + + dup1Thread.start(); + dup2Thread.start(); + dup1Thread.join(); + dup2Thread.join(); + + assertTrue(dup2.locked); + assertTrue(dup1.locked); + } + @Test public void testReadStreamPartitionsWithWatermark() throws InvalidProtocolBufferException { ByteStringRange lockedPartition = ByteStringRange.create("", "a");