From 3e774d85a0a28f361c01b7cca8932b86df929062 Mon Sep 17 00:00:00 2001 From: skarpenko Date: Wed, 10 Apr 2024 09:44:41 +0200 Subject: [PATCH] Replica.MASTER for read operations --- .../basic/AerospikeBasicBatchUpdater.java | 6 +++--- .../AerospikeBasicExpectedValueOperations.java | 13 ++++++++++++- .../aerospike/lock/AerospikeLockOperations.java | 15 +++++++++++++-- 3 files changed, 28 insertions(+), 6 deletions(-) diff --git a/aerospike-batch-updater/src/main/java/nosql/batch/update/aerospike/basic/AerospikeBasicBatchUpdater.java b/aerospike-batch-updater/src/main/java/nosql/batch/update/aerospike/basic/AerospikeBasicBatchUpdater.java index 945a619..5a830a3 100644 --- a/aerospike-batch-updater/src/main/java/nosql/batch/update/aerospike/basic/AerospikeBasicBatchUpdater.java +++ b/aerospike-batch-updater/src/main/java/nosql/batch/update/aerospike/basic/AerospikeBasicBatchUpdater.java @@ -39,11 +39,11 @@ public static AerospikeBasicUpdateOperations basicUpdateOperations( } public static AerospikeLockOperations> basicLockOperations( - IAerospikeClient reactorClient, + IAerospikeClient client, ExecutorService aerospikeExecutorService) { return new AerospikeLockOperations<>( - reactorClient, - new AerospikeBasicExpectedValueOperations(reactorClient), + client, + new AerospikeBasicExpectedValueOperations(client), aerospikeExecutorService); } diff --git a/aerospike-batch-updater/src/main/java/nosql/batch/update/aerospike/basic/AerospikeBasicExpectedValueOperations.java b/aerospike-batch-updater/src/main/java/nosql/batch/update/aerospike/basic/AerospikeBasicExpectedValueOperations.java index 469d3b2..5769df0 100644 --- a/aerospike-batch-updater/src/main/java/nosql/batch/update/aerospike/basic/AerospikeBasicExpectedValueOperations.java +++ b/aerospike-batch-updater/src/main/java/nosql/batch/update/aerospike/basic/AerospikeBasicExpectedValueOperations.java @@ -4,6 +4,8 @@ import com.aerospike.client.Bin; import com.aerospike.client.IAerospikeClient; import com.aerospike.client.Value; +import com.aerospike.client.policy.BatchPolicy; +import com.aerospike.client.policy.Replica; import nosql.batch.update.aerospike.lock.AerospikeExpectedValuesOperations; import nosql.batch.update.aerospike.lock.AerospikeLock; import nosql.batch.update.lock.Lock; @@ -16,9 +18,11 @@ public class AerospikeBasicExpectedValueOperations implements AerospikeExpectedValuesOperations> { private final IAerospikeClient client; + private final BatchPolicy checkValuesPolicy; public AerospikeBasicExpectedValueOperations(IAerospikeClient client) { this.client = client; + this.checkValuesPolicy = buildCheckValuesPolicy(client); } @Override @@ -41,7 +45,7 @@ public void checkExpectedValues(List locks, List expected expectedValuesToCheck.add(record); } - client.get(null, batchReads); + client.get(checkValuesPolicy, batchReads); for(int i = 0, n = expectedValuesToCheck.size(); i < n; i++){ checkValues(batchReads.get(i), expectedValuesToCheck.get(i)); } @@ -61,4 +65,11 @@ private void checkValues(BatchRead batchRead, Record expectedValues) throws Perm private boolean equals(Object actualValue, Value expectedValue) { return expectedValue.equals(Value.get(actualValue)); } + + private static BatchPolicy buildCheckValuesPolicy(IAerospikeClient aerospikeClient){ + BatchPolicy checkValuesPolicy = new BatchPolicy(aerospikeClient.getBatchPolicyDefault()); + checkValuesPolicy.replica = Replica.MASTER; + checkValuesPolicy.respondAllKeys = true; + return checkValuesPolicy; + } } diff --git a/aerospike-batch-updater/src/main/java/nosql/batch/update/aerospike/lock/AerospikeLockOperations.java b/aerospike-batch-updater/src/main/java/nosql/batch/update/aerospike/lock/AerospikeLockOperations.java index 0c67151..c171b0b 100644 --- a/aerospike-batch-updater/src/main/java/nosql/batch/update/aerospike/lock/AerospikeLockOperations.java +++ b/aerospike-batch-updater/src/main/java/nosql/batch/update/aerospike/lock/AerospikeLockOperations.java @@ -7,7 +7,9 @@ import com.aerospike.client.Record; import com.aerospike.client.ResultCode; import com.aerospike.client.Value; +import com.aerospike.client.policy.BatchPolicy; import com.aerospike.client.policy.RecordExistsAction; +import com.aerospike.client.policy.Replica; import com.aerospike.client.policy.WritePolicy; import nosql.batch.update.lock.LockOperations; import nosql.batch.update.lock.LockingException; @@ -36,6 +38,8 @@ public class AerospikeLockOperations, EV> private static final String BATCH_ID_BIN_NAME = "batch_id"; private final IAerospikeClient aerospikeClient; + + private final BatchPolicy readLocksPolicy; private final WritePolicy putLockPolicy; private final WritePolicy deleteLockPolicy; private final AerospikeExpectedValuesOperations expectedValuesOperations; @@ -44,13 +48,20 @@ public class AerospikeLockOperations, EV> public AerospikeLockOperations(IAerospikeClient aerospikeClient, AerospikeExpectedValuesOperations expectedValuesOperations, ExecutorService aerospikeExecutor) { - this.putLockPolicy = configurePutLockPolicy(aerospikeClient.getWritePolicyDefault()); this.aerospikeClient = aerospikeClient; + this.putLockPolicy = configurePutLockPolicy(aerospikeClient.getWritePolicyDefault()); + this.readLocksPolicy = configureGetLocksPolicy(aerospikeClient.getBatchPolicyDefault()); this.aerospikeExecutor = aerospikeExecutor; this.deleteLockPolicy = putLockPolicy; this.expectedValuesOperations = expectedValuesOperations; } + private BatchPolicy configureGetLocksPolicy(BatchPolicy batchPolicyDefault) { + BatchPolicy batchPolicy = new BatchPolicy(batchPolicyDefault); + batchPolicy.replica = Replica.MASTER; + return batchPolicy; + } + private WritePolicy configurePutLockPolicy(WritePolicy writePolicyDefault){ WritePolicy writePolicy = new WritePolicy(writePolicyDefault); writePolicy.recordExistsAction = RecordExistsAction.CREATE_ONLY; @@ -186,7 +197,7 @@ public List getLockedByBatchUpdate(LOCKS aerospikeBatchLocks, Val List keys = aerospikeBatchLocks.keysToLock(); Key[] keysArray = keys.toArray(new Key[0]); - Record[] records = aerospikeClient.get(null, keysArray); + Record[] records = aerospikeClient.get(readLocksPolicy, keysArray); List keysFiltered = new ArrayList<>(keys.size()); for(int i = 0, m = keysArray.length; i < m; i++){