Skip to content

Commit

Permalink
Merge pull request #402 from aerospike/stage
Browse files Browse the repository at this point in the history
Java Client 9.0.4 for JDK21
  • Loading branch information
BrianNichols authored Feb 5, 2025
2 parents 2d70771 + 5f3a506 commit 88fac26
Show file tree
Hide file tree
Showing 13 changed files with 70 additions and 81 deletions.
2 changes: 1 addition & 1 deletion benchmarks/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>com.aerospike</groupId>
<artifactId>aerospike-parent</artifactId>
<version>9.0.3</version>
<version>9.0.4</version>
</parent>
<artifactId>aerospike-benchmarks</artifactId>
<packaging>jar</packaging>
Expand Down
2 changes: 1 addition & 1 deletion client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>com.aerospike</groupId>
<artifactId>aerospike-parent</artifactId>
<version>9.0.3</version>
<version>9.0.4</version>
</parent>
<artifactId>aerospike-client${crypto.type}-jdk21</artifactId>
<packaging>jar</packaging>
Expand Down
2 changes: 2 additions & 0 deletions client/src/com/aerospike/client/async/AsyncTxnMonitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ public static void execute(EventLoop eventLoop, Cluster cluster, WritePolicy pol
}

Txn txn = policy.txn;
txn.verifyCommand();

Key cmdKey = command.key;

if (txn.getWrites().contains(cmdKey)) {
Expand Down
52 changes: 13 additions & 39 deletions client/src/com/aerospike/client/cluster/Cluster.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2012-2024 Aerospike, Inc.
* Copyright 2012-2025 Aerospike, Inc.
*
* Portions may be licensed to Aerospike, Inc. under one or more contributor
* license agreements WHICH ARE COMPATIBLE WITH THE APACHE LICENSE, VERSION 2.0.
Expand All @@ -20,6 +20,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
Expand Down Expand Up @@ -67,10 +68,6 @@ public class Cluster implements Runnable, Closeable {
// Initial host nodes specified by user.
private volatile Host[] seeds;

// All host aliases for all nodes in cluster.
// Only accessed within cluster tend thread.
protected final HashMap<Host,Node> aliases;

// Map of active nodes in cluster.
// Only accessed within cluster tend thread.
protected final HashMap<String,Node> nodesMap;
Expand Down Expand Up @@ -314,7 +311,6 @@ else if (policy.user != null && policy.user.length() > 0) {
rackIds = new int[] {policy.rackId};
}

aliases = new HashMap<Host,Node>();
nodesMap = new HashMap<String,Node>();
nodes = new Node[0];
partitionMap = new HashMap<String,Partitions>();
Expand Down Expand Up @@ -560,8 +556,8 @@ private final void tend(boolean failIfNotConnected, boolean isInit) {
findNodesToRemove(peers);

// Remove nodes in a batch.
if (peers.removeList.size() > 0) {
removeNodes(peers.removeList);
if (peers.removeNodes.size() > 0) {
removeNodes(peers.removeNodes);
}
}

Expand Down Expand Up @@ -751,21 +747,21 @@ protected Node createNode(NodeValidator nv) {
}

private final void findNodesToRemove(Peers peers) {
int refreshCount = peers.refreshCount;
ArrayList<Node> removeList = peers.removeList;
int refreshCount = peers.refreshCount;
HashSet<Node> removeNodes = peers.removeNodes;

for (Node node : nodes) {
if (! node.isActive()) {
// Inactive nodes must be removed.
removeList.add(node);
removeNodes.add(node);
continue;
}

if (refreshCount == 0 && node.failures >= 5) {
// All node info requests failed and this node had 5 consecutive failures.
// Remove node. If no nodes are left, seeds will be tried in next cluster
// tend iteration.
removeList.add(node);
removeNodes.add(node);
continue;
}

Expand All @@ -777,12 +773,12 @@ private final void findNodesToRemove(Peers peers) {
if (! findNodeInPartitionMap(node)) {
// Node doesn't have any partitions mapped to it.
// There is no point in keeping it in the cluster.
removeList.add(node);
removeNodes.add(node);
}
}
else {
// Node not responding. Remove it.
removeList.add(node);
removeNodes.add(node);
}
}
}
Expand Down Expand Up @@ -857,15 +853,9 @@ private final void addNode(Node node) {
}

nodesMap.put(node.getName(), node);

// Add node's aliases to global alias set.
// Aliases are only used in tend thread, so synchronization is not necessary.
for (Host alias : node.aliases) {
aliases.put(alias, node);
}
}

private final void removeNodes(List<Node> nodesToRemove) {
private final void removeNodes(HashSet<Node> nodesToRemove) {
// There is no need to delete nodes from partitionWriteMap because the nodes
// have already been set to inactive. Further connection requests will result
// in an exception and a different node will be tried.
Expand All @@ -875,13 +865,6 @@ private final void removeNodes(List<Node> nodesToRemove) {
// Remove node from map.
nodesMap.remove(node.getName());

// Remove node's aliases from cluster alias set.
// Aliases are only used in tend thread, so synchronization is not necessary.
for (Host alias : node.aliases) {
// Log.debug("Remove alias " + alias);
aliases.remove(alias);
}

if (metricsEnabled) {
// Flush node metrics before removal.
try {
Expand All @@ -901,7 +884,7 @@ private final void removeNodes(List<Node> nodesToRemove) {
/**
* Remove nodes using copy on write semantics.
*/
private final void removeNodesCopy(List<Node> nodesToRemove) {
private final void removeNodesCopy(HashSet<Node> nodesToRemove) {
// Create temporary nodes array.
// Since nodes are only marked for deletion using node references in the nodes array,
// and the tend thread is the only thread modifying nodes, we are guaranteed that nodes
Expand All @@ -911,7 +894,7 @@ private final void removeNodesCopy(List<Node> nodesToRemove) {

// Add nodes that are not in remove list.
for (Node node : nodes) {
if (findNode(node, nodesToRemove)) {
if (nodesToRemove.contains(node)) {
if (Log.infoEnabled()) {
Log.info("Remove node " + node);
}
Expand All @@ -937,15 +920,6 @@ private final void removeNodesCopy(List<Node> nodesToRemove) {
nodes = nodeArray;
}

private final static boolean findNode(Node search, List<Node> nodeList) {
for (Node node : nodeList) {
if (node.equals(search)) {
return true;
}
}
return false;
}

public final boolean isConnected() {
// Must copy array reference for copy on write semantics to work.
Node[] nodeArray = nodes;
Expand Down
51 changes: 38 additions & 13 deletions client/src/com/aerospike/client/cluster/Node.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2012-2024 Aerospike, Inc.
* Copyright 2012-2025 Aerospike, Inc.
*
* Portions may be licensed to Aerospike, Inc. under one or more contributor
* license agreements WHICH ARE COMPATIBLE WITH THE APACHE LICENSE, VERSION 2.0.
Expand All @@ -18,12 +18,12 @@

import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -67,8 +67,8 @@ public class Node implements Closeable {

protected final Cluster cluster;
private final String name;
private final Host host;
protected final List<Host> aliases;
private String hostname; // Optional hostname.
private final Host host; // Host with IP address name.
protected final InetSocketAddress address;
private final Pool[] connectionPools;
private final AsyncPool[] asyncConnectionPools;
Expand Down Expand Up @@ -104,7 +104,6 @@ public class Node implements Closeable {
public Node(Cluster cluster, NodeValidator nv) {
this.cluster = cluster;
this.name = nv.name;
this.aliases = nv.aliases;
this.host = nv.primaryHost;
this.address = nv.primaryAddress;
this.tendConnection = nv.primaryConn;
Expand Down Expand Up @@ -455,10 +454,13 @@ protected final void refreshPeers(Peers peers) {
// Create new node.
Node node = cluster.createNode(nv);
peers.nodes.put(nv.name, node);
nodeValidated = true;
nodeValidated = true;

if (peer.replaceNode != null) {
peers.removeList.add(peer.replaceNode);
if (Log.infoEnabled()) {
Log.info("Replace node: " + peer.replaceNode);
}
peers.removeNodes.add(peer.replaceNode);
}
break;
}
Expand Down Expand Up @@ -499,14 +501,37 @@ private static boolean findPeerNode(Cluster cluster, Peers peers, Peer peer) {
node.referenceCount++;
return true;
}

// Match peer hosts with the node host.
for (Host h : peer.hosts) {
if (h.equals(node.host)) {
// Main node host is also the same as one of the peer hosts.
// Peer should not be added.
node.referenceCount++;
return true;
if (h.port == node.host.port) {
// Check for IP address (node.host.name is an IP address) or hostname if it exists.
if (h.name.equals(node.host.name) || (node.hostname != null && h.name.equals(node.hostname))) {
// Main node host is also the same as one of the peer hosts.
// Peer should not be added.
node.referenceCount++;
return true;
}

// Peer name might be a hostname. Get peer IP addresses and check with node IP address.
try {
InetAddress[] addresses = InetAddress.getAllByName(h.name);

for (InetAddress address : addresses) {
if (address.equals(node.address.getAddress()) ||
address.isLoopbackAddress()) {
// Set peer hostname for faster future lookups.
node.hostname = h.name;
node.referenceCount++;
return true;
}
}
}
catch (Throwable t) {
// Peer name is invalid. replaceNode may be set, but that node will
// not be replaced because NodeValidator will reject it.
Log.error("Invalid peer received by cluster tend: " + h.name);
}
}
}
peer.replaceNode = node;
Expand Down
17 changes: 1 addition & 16 deletions client/src/com/aerospike/client/cluster/NodeValidator.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2012-2023 Aerospike, Inc.
* Copyright 2012-2025 Aerospike, Inc.
*
* Portions may be licensed to Aerospike, Inc. under one or more contributor
* license agreements WHICH ARE COMPATIBLE WITH THE APACHE LICENSE, VERSION 2.0.
Expand Down Expand Up @@ -34,7 +34,6 @@
public final class NodeValidator {
Node fallback;
String name;
List<Host> aliases;
Host primaryHost;
InetSocketAddress primaryAddress;
Connection primaryConn;
Expand All @@ -49,7 +48,6 @@ public final class NodeValidator {
*/
public Node seedNode(Cluster cluster, Host host, Peers peers) throws Throwable {
name = null;
aliases = null;
primaryHost = null;
primaryAddress = null;
primaryConn = null;
Expand All @@ -64,11 +62,6 @@ public Node seedNode(Cluster cluster, Host host, Peers peers) throws Throwable {
try {
validateAddress(cluster, address, host.tlsName, host.port, true);

// Only add address alias when not set by load balancer detection logic.
if (this.aliases == null) {
setAliases(address, host.tlsName, host.port);
}

Node node = new Node(cluster, this);

if (validatePeers(peers, node)) {
Expand Down Expand Up @@ -144,7 +137,6 @@ public void validateNode(Cluster cluster, Host host) throws Throwable {
for (InetAddress address : addresses) {
try {
validateAddress(cluster, address, host.tlsName, host.port, false);
setAliases(address, host.tlsName, host.port);
return;
}
catch (Throwable e) {
Expand Down Expand Up @@ -399,7 +391,6 @@ private void setAddress(Cluster cluster, HashMap<String,String> map, String addr
}

// Authenticated connection. Set real host.
setAliases(address, tlsName, h.port);
this.primaryHost = new Host(address.getHostAddress(), tlsName, h.port);
this.primaryAddress = socketAddress;
this.primaryConn.close();
Expand Down Expand Up @@ -428,12 +419,6 @@ private void setAddress(Cluster cluster, HashMap<String,String> map, String addr
}
}

private void setAliases(InetAddress address, String tlsName, int port) {
// Add capacity for current address plus IPV6 address and hostname.
this.aliases = new ArrayList<Host>(3);
this.aliases.add(new Host(address.getHostAddress(), tlsName, port));
}

private static final class SwitchClear {
private InetAddress clearAddress;
private InetSocketAddress clearSocketAddress;
Expand Down
6 changes: 3 additions & 3 deletions client/src/com/aerospike/client/cluster/Peers.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2012-2024 Aerospike, Inc.
* Copyright 2012-2025 Aerospike, Inc.
*
* Portions may be licensed to Aerospike, Inc. under one or more contributor
* license agreements WHICH ARE COMPATIBLE WITH THE APACHE LICENSE, VERSION 2.0.
Expand All @@ -26,15 +26,15 @@
public final class Peers {
public final ArrayList<Peer> peers;
public final HashMap<String,Node> nodes;
public final ArrayList<Node> removeList;
public final HashSet<Node> removeNodes;
private final HashSet<Host> invalidHosts;
public int refreshCount;
public boolean genChanged;

public Peers(int peerCapacity) {
peers = new ArrayList<Peer>(peerCapacity);
nodes = new HashMap<String,Node>(16);
removeList = new ArrayList<Node>();
removeNodes = new HashSet<Node>(8);
invalidHosts = new HashSet<Host>(8);
}

Expand Down
2 changes: 1 addition & 1 deletion client/src/com/aerospike/client/command/Command.java
Original file line number Diff line number Diff line change
Expand Up @@ -2314,7 +2314,7 @@ private final void writeHeaderWrite(WritePolicy policy, int writeAttr, int field
dataBuffer[9] = (byte)readAttr;
dataBuffer[10] = (byte)writeAttr;
dataBuffer[11] = (byte)infoAttr;
dataBuffer[12] = (byte)txnAttr;;
dataBuffer[12] = (byte)txnAttr;
dataBuffer[13] = 0; // clear the result code
Buffer.intToBytes(generation, dataBuffer, 14);
Buffer.intToBytes(policy.expiration, dataBuffer, 18);
Expand Down
2 changes: 1 addition & 1 deletion client/src/com/aerospike/client/command/TxnMonitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public final class TxnMonitor {

public static void addKey(Cluster cluster, WritePolicy policy, Key cmdKey) {
Txn txn = policy.txn;
txn.verifyCommand();

if (txn.getWrites().contains(cmdKey)) {
// Transaction monitor already contains this key.
Expand All @@ -66,7 +67,6 @@ public static void addKeys(Cluster cluster, BatchPolicy policy, List<BatchRecord
}

public static Operation[] getTranOps(Txn txn, Key cmdKey) {
txn.verifyCommand();
txn.setNamespace(cmdKey.namespace);

if (txn.monitorExists()) {
Expand Down
Loading

0 comments on commit 88fac26

Please sign in to comment.