Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix locking race between duplicate PartitionRecords w matching uid #27055

Merged
merged 1 commit into from
Jun 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -702,7 +702,15 @@ public boolean lockAndRecordPartition(PartitionRecord partitionRecord) {
tableId, convertPartitionToStreamPartitionRowKey(partitionRecord.getPartition()))
.condition(matchAnyString)
.otherwise(mutation);
return !dataClient.checkAndMutateRow(rowMutation);

boolean lockAcquired = !dataClient.checkAndMutateRow(rowMutation);
if (lockAcquired) {
return true;
} else {
// If the lock is already held we need to check if it was acquired by a duplicate
// work item with the same uuid since we last checked doHoldLock above.
return doHoldLock(partitionRecord.getPartition(), partitionRecord.getUuid());
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.when;

import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient;
import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings;
Expand Down Expand Up @@ -57,6 +58,8 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -141,7 +144,7 @@ public void testNewPartitionConversionWithWithIllegalUtf8()
}

@Test
public void testLockPartitionRace() throws InterruptedException {
public void testLockPartitionRaceUniqueIds() throws InterruptedException {
ByteStringRange partition = ByteStringRange.create("", "");
ByteString rowKey = metadataTableDao.convertPartitionToStreamPartitionRowKey(partition);
// Class to try to lock the partition in a separate thread.
Expand Down Expand Up @@ -223,6 +226,54 @@ public void run() {
dataClient.mutateRow(rowMutation);
}

@Test
public void testLockPartitionRaceDuplicateIds() throws InterruptedException {
ByteStringRange partition = ByteStringRange.create("", "");
String uid = "a";
MetadataTableDao spy = Mockito.spy(metadataTableDao);
// First call we sleep for ten seconds to ensure the duplicate acquires the
// lock before we return.
when(spy.doHoldLock(partition, uid))
.then(
(Answer<Boolean>)
invocation -> {
Thread.sleep(10000);
return false;
})
.thenCallRealMethod()
.thenCallRealMethod();

class LockPartition implements Runnable {
final PartitionRecord partitionRecord =
new PartitionRecord(
partition,
Collections.emptyList(),
uid,
Instant.now(),
Collections.emptyList(),
Instant.now().plus(Duration.standardMinutes(10)));
boolean locked = false;

@Override
public void run() {
locked = spy.lockAndRecordPartition(partitionRecord);
}
}

LockPartition dup1 = new LockPartition();
Thread dup1Thread = new Thread(dup1);
LockPartition dup2 = new LockPartition();
Thread dup2Thread = new Thread(dup2);

dup1Thread.start();
dup2Thread.start();
dup1Thread.join();
dup2Thread.join();

assertTrue(dup2.locked);
assertTrue(dup1.locked);
}

@Test
public void testReadStreamPartitionsWithWatermark() throws InvalidProtocolBufferException {
ByteStringRange lockedPartition = ByteStringRange.create("", "a");
Expand Down