diff --git a/aerospike-batch-updater/src/main/java/nosql/batch/update/aerospike/lock/AerospikeLockOperations.java b/aerospike-batch-updater/src/main/java/nosql/batch/update/aerospike/lock/AerospikeLockOperations.java index c171b0b..31847a1 100644 --- a/aerospike-batch-updater/src/main/java/nosql/batch/update/aerospike/lock/AerospikeLockOperations.java +++ b/aerospike-batch-updater/src/main/java/nosql/batch/update/aerospike/lock/AerospikeLockOperations.java @@ -139,9 +139,9 @@ static List processResults ( return locks; } - private AerospikeLock putLock(Value batchId, Key lockKey, boolean checkBatchId) throws TemporaryLockingException{ + AerospikeLock putLock(Value batchId, Key lockKey, boolean checkBatchId) throws TemporaryLockingException{ try { - aerospikeClient.add(putLockPolicy, lockKey, new Bin(BATCH_ID_BIN_NAME, batchId)); + aerospikeClient.put(putLockPolicy, lockKey, new Bin(BATCH_ID_BIN_NAME, batchId)); logger.trace("acquired lock key=[{}], batchId=[{}]", lockKey, batchId); return new AerospikeLock(LOCKED, lockKey); } catch (AerospikeException ae) { @@ -180,7 +180,7 @@ protected void checkExpectedValues(LOCKS batchLocks, List keysLoc expectedValuesOperations.checkExpectedValues(keysLocked, batchLocks.expectedValues()); } - private Value getBatchIdOfLock(Key lockKey){ + Value getBatchIdOfLock(Key lockKey){ Record record = aerospikeClient.get(null, lockKey); return getBatchId(record); } diff --git a/aerospike-batch-updater/src/test/java/nosql/batch/update/aerospike/lock/AerospikeLockOperationsTest.java b/aerospike-batch-updater/src/test/java/nosql/batch/update/aerospike/lock/AerospikeLockOperationsTest.java index 41c6a9d..928d5b7 100644 --- a/aerospike-batch-updater/src/test/java/nosql/batch/update/aerospike/lock/AerospikeLockOperationsTest.java +++ b/aerospike-batch-updater/src/test/java/nosql/batch/update/aerospike/lock/AerospikeLockOperationsTest.java @@ -1,23 +1,89 @@ package nosql.batch.update.aerospike.lock; +import com.aerospike.client.AerospikeClient; import com.aerospike.client.Key; +import com.aerospike.client.Value; +import nosql.batch.update.aerospike.basic.AerospikeBasicExpectedValueOperations; import nosql.batch.update.aerospike.lock.AerospikeLockOperations.LockResult; import nosql.batch.update.lock.Lock; -import nosql.batch.update.lock.PermanentLockingException; import nosql.batch.update.lock.TemporaryLockingException; import org.junit.Test; +import org.testcontainers.containers.GenericContainer; import java.net.SocketTimeoutException; import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; import static java.util.concurrent.CompletableFuture.completedFuture; +import static nosql.batch.update.aerospike.AerospikeTestUtils.AEROSPIKE_PROPERTIES; +import static nosql.batch.update.aerospike.AerospikeTestUtils.getAerospikeClient; +import static nosql.batch.update.aerospike.AerospikeTestUtils.getAerospikeContainer; +import static nosql.batch.update.aerospike.wal.AerospikeWriteAheadLogManager.generateBatchId; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; public class AerospikeLockOperationsTest { + static final GenericContainer aerospike = getAerospikeContainer(); + + static final AerospikeClient client = getAerospikeClient(aerospike); + + static final AerospikeLockOperations aerospikeLockOperations = new AerospikeLockOperations( + client, new AerospikeBasicExpectedValueOperations(client), Executors.newFixedThreadPool(2)); + + private static int keyIncremental = 0; + + @Test + public void shouldLockKey(){ + Key key = new Key(AEROSPIKE_PROPERTIES.getNamespace(), "lock", Integer.toString(keyIncremental++)); + Value batchId = generateBatchId(); + AerospikeLock lock = aerospikeLockOperations.putLock(batchId, key, true); + assertThat(lock.key).isEqualTo(key); + + Value batchIdOfLock = aerospikeLockOperations.getBatchIdOfLock(key); + assertThat(batchIdOfLock).isEqualTo(batchId); + assertThat(batchIdOfLock.toString()).isEqualTo(batchId.toString()); + } + + @Test + public void shouldFailIfLockTheSameKey(){ + Key key = new Key(AEROSPIKE_PROPERTIES.getNamespace(), "lock", Integer.toString(keyIncremental++)); + Value batchId = generateBatchId(); + AerospikeLock lock = aerospikeLockOperations.putLock(batchId, key, false); + assertThat(lock.key).isEqualTo(key); + + Value batchId2 = generateBatchId(); + assertThatThrownBy(() -> aerospikeLockOperations.putLock(batchId2, key, false)) + .hasMessageContaining("Locked by concurrent update") + .hasMessageContaining(batchId.toString()) + .hasMessageContaining(batchId2.toString()); + + Value batchIdOfLock = aerospikeLockOperations.getBatchIdOfLock(key); + assertThat(batchIdOfLock).isEqualTo(batchId); + assertThat(batchIdOfLock.toString()).isEqualTo(batchId.toString()); + } + + @Test + public void shouldFailIfLockTheSameKeyInWal(){ + Key key = new Key(AEROSPIKE_PROPERTIES.getNamespace(), "lock", Integer.toString(keyIncremental++)); + Value batchId = generateBatchId(); + AerospikeLock lock = aerospikeLockOperations.putLock(batchId, key, true); + assertThat(lock.key).isEqualTo(key); + + Value batchId2 = generateBatchId(); + assertThatThrownBy(() -> aerospikeLockOperations.putLock(batchId2, key, true)) + .hasMessageContaining("Locked by other batch update") + .hasMessageContaining(batchId.toString()) + .hasMessageContaining(batchId2.toString()); + + Value batchIdOfLock = aerospikeLockOperations.getBatchIdOfLock(key); + assertThat(batchIdOfLock).isEqualTo(batchId); + assertThat(batchIdOfLock.toString()).isEqualTo(batchId.toString()); + } + @Test public void shouldSuccess(){