Skip to content

Commit

Permalink
Replica.MASTER for read operations
Browse files Browse the repository at this point in the history
  • Loading branch information
skarpenko committed Apr 10, 2024
1 parent 771a81c commit 3e774d8
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ public static AerospikeBasicUpdateOperations basicUpdateOperations(
}

public static AerospikeLockOperations<AerospikeBasicBatchLocks, List<Record>> basicLockOperations(
IAerospikeClient reactorClient,
IAerospikeClient client,
ExecutorService aerospikeExecutorService) {
return new AerospikeLockOperations<>(
reactorClient,
new AerospikeBasicExpectedValueOperations(reactorClient),
client,
new AerospikeBasicExpectedValueOperations(client),
aerospikeExecutorService);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import com.aerospike.client.Bin;
import com.aerospike.client.IAerospikeClient;
import com.aerospike.client.Value;
import com.aerospike.client.policy.BatchPolicy;
import com.aerospike.client.policy.Replica;
import nosql.batch.update.aerospike.lock.AerospikeExpectedValuesOperations;
import nosql.batch.update.aerospike.lock.AerospikeLock;
import nosql.batch.update.lock.Lock;
Expand All @@ -16,9 +18,11 @@
public class AerospikeBasicExpectedValueOperations implements AerospikeExpectedValuesOperations<List<Record>> {

private final IAerospikeClient client;
private final BatchPolicy checkValuesPolicy;

public AerospikeBasicExpectedValueOperations(IAerospikeClient client) {
this.client = client;
this.checkValuesPolicy = buildCheckValuesPolicy(client);
}

@Override
Expand All @@ -41,7 +45,7 @@ public void checkExpectedValues(List<AerospikeLock> locks, List<Record> expected
expectedValuesToCheck.add(record);
}

client.get(null, batchReads);
client.get(checkValuesPolicy, batchReads);
for(int i = 0, n = expectedValuesToCheck.size(); i < n; i++){
checkValues(batchReads.get(i), expectedValuesToCheck.get(i));
}
Expand All @@ -61,4 +65,11 @@ private void checkValues(BatchRead batchRead, Record expectedValues) throws Perm
private boolean equals(Object actualValue, Value expectedValue) {
return expectedValue.equals(Value.get(actualValue));
}

private static BatchPolicy buildCheckValuesPolicy(IAerospikeClient aerospikeClient){
BatchPolicy checkValuesPolicy = new BatchPolicy(aerospikeClient.getBatchPolicyDefault());
checkValuesPolicy.replica = Replica.MASTER;
checkValuesPolicy.respondAllKeys = true;
return checkValuesPolicy;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
import com.aerospike.client.Record;
import com.aerospike.client.ResultCode;
import com.aerospike.client.Value;
import com.aerospike.client.policy.BatchPolicy;
import com.aerospike.client.policy.RecordExistsAction;
import com.aerospike.client.policy.Replica;
import com.aerospike.client.policy.WritePolicy;
import nosql.batch.update.lock.LockOperations;
import nosql.batch.update.lock.LockingException;
Expand Down Expand Up @@ -36,6 +38,8 @@ public class AerospikeLockOperations<LOCKS extends AerospikeBatchLocks<EV>, EV>
private static final String BATCH_ID_BIN_NAME = "batch_id";

private final IAerospikeClient aerospikeClient;

private final BatchPolicy readLocksPolicy;
private final WritePolicy putLockPolicy;
private final WritePolicy deleteLockPolicy;
private final AerospikeExpectedValuesOperations<EV> expectedValuesOperations;
Expand All @@ -44,13 +48,20 @@ public class AerospikeLockOperations<LOCKS extends AerospikeBatchLocks<EV>, EV>
public AerospikeLockOperations(IAerospikeClient aerospikeClient,
AerospikeExpectedValuesOperations<EV> expectedValuesOperations,
ExecutorService aerospikeExecutor) {
this.putLockPolicy = configurePutLockPolicy(aerospikeClient.getWritePolicyDefault());
this.aerospikeClient = aerospikeClient;
this.putLockPolicy = configurePutLockPolicy(aerospikeClient.getWritePolicyDefault());
this.readLocksPolicy = configureGetLocksPolicy(aerospikeClient.getBatchPolicyDefault());
this.aerospikeExecutor = aerospikeExecutor;
this.deleteLockPolicy = putLockPolicy;
this.expectedValuesOperations = expectedValuesOperations;
}

private BatchPolicy configureGetLocksPolicy(BatchPolicy batchPolicyDefault) {
BatchPolicy batchPolicy = new BatchPolicy(batchPolicyDefault);
batchPolicy.replica = Replica.MASTER;
return batchPolicy;
}

private WritePolicy configurePutLockPolicy(WritePolicy writePolicyDefault){
WritePolicy writePolicy = new WritePolicy(writePolicyDefault);
writePolicy.recordExistsAction = RecordExistsAction.CREATE_ONLY;
Expand Down Expand Up @@ -186,7 +197,7 @@ public List<AerospikeLock> getLockedByBatchUpdate(LOCKS aerospikeBatchLocks, Val
List<Key> keys = aerospikeBatchLocks.keysToLock();

Key[] keysArray = keys.toArray(new Key[0]);
Record[] records = aerospikeClient.get(null, keysArray);
Record[] records = aerospikeClient.get(readLocksPolicy, keysArray);

List<AerospikeLock> keysFiltered = new ArrayList<>(keys.size());
for(int i = 0, m = keysArray.length; i < m; i++){
Expand Down

0 comments on commit 3e774d8

Please sign in to comment.