From a5098818cbd4bdb7bb93c09dc0b4ffb021be5507 Mon Sep 17 00:00:00 2001 From: tison Date: Sun, 13 Mar 2022 09:45:38 +0800 Subject: [PATCH 01/15] introduce Protocol Signed-off-by: tison --- .../src/main/resources/zookeeper.jute | 2 + .../zookeeper/protocol/DefaultProtocol.java | 33 +++++++++++ .../apache/zookeeper/protocol/Protocol.java | 14 +++++ .../zookeeper/protocol/ZK33Protocol.java | 59 +++++++++++++++++++ 4 files changed, 108 insertions(+) create mode 100644 zookeeper-server/src/main/java/org/apache/zookeeper/protocol/DefaultProtocol.java create mode 100644 zookeeper-server/src/main/java/org/apache/zookeeper/protocol/Protocol.java create mode 100644 zookeeper-server/src/main/java/org/apache/zookeeper/protocol/ZK33Protocol.java diff --git a/zookeeper-jute/src/main/resources/zookeeper.jute b/zookeeper-jute/src/main/resources/zookeeper.jute index 796ea396755..d52b4afefe2 100644 --- a/zookeeper-jute/src/main/resources/zookeeper.jute +++ b/zookeeper-jute/src/main/resources/zookeeper.jute @@ -65,12 +65,14 @@ module org.apache.zookeeper.proto { int timeOut; long sessionId; buffer passwd; + boolean readOnly; } class ConnectResponse { int protocolVersion; int timeOut; long sessionId; buffer passwd; + boolean readOnly; } class SetWatches { long relativeZxid; diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/DefaultProtocol.java b/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/DefaultProtocol.java new file mode 100644 index 00000000000..bda4e02df85 --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/DefaultProtocol.java @@ -0,0 +1,33 @@ +package org.apache.zookeeper.protocol; + +import java.io.IOException; +import org.apache.jute.InputArchive; +import org.apache.jute.OutputArchive; +import org.apache.zookeeper.proto.ConnectRequest; +import org.apache.zookeeper.proto.ConnectResponse; + +public class DefaultProtocol implements Protocol { + @Override + public ConnectRequest deserializeConnectRequest(InputArchive inputArchive) throws IOException { + final ConnectRequest request = new ConnectRequest(); + request.deserialize(inputArchive, "connect"); + return request; + } + + @Override + public ConnectResponse deserializeConnectResponse(InputArchive inputArchive) throws IOException { + final ConnectResponse response = new ConnectResponse(); + response.deserialize(inputArchive, "connect"); + return response; + } + + @Override + public void serializeConnectRequest(OutputArchive outputArchive, ConnectRequest connectRequest) throws IOException { + connectRequest.serialize(outputArchive, "connect"); + } + + @Override + public void serializeConnectResponse(OutputArchive outputArchive, ConnectResponse connectResponse) throws IOException { + connectResponse.serialize(outputArchive, "connect"); + } +} diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/Protocol.java b/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/Protocol.java new file mode 100644 index 00000000000..8fcfa981f0a --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/Protocol.java @@ -0,0 +1,14 @@ +package org.apache.zookeeper.protocol; + +import java.io.IOException; +import org.apache.jute.InputArchive; +import org.apache.jute.OutputArchive; +import org.apache.zookeeper.proto.ConnectRequest; +import org.apache.zookeeper.proto.ConnectResponse; + +public interface Protocol { + ConnectRequest deserializeConnectRequest(InputArchive inputArchive) throws IOException; + ConnectResponse deserializeConnectResponse(InputArchive inputArchive) throws IOException; + void serializeConnectRequest(OutputArchive outputArchive, ConnectRequest connectRequest) throws IOException; + void serializeConnectResponse(OutputArchive outputArchive, ConnectResponse connectResponse) throws IOException; +} diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/ZK33Protocol.java b/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/ZK33Protocol.java new file mode 100644 index 00000000000..52835f3c31f --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/ZK33Protocol.java @@ -0,0 +1,59 @@ +package org.apache.zookeeper.protocol; + +import java.io.IOException; +import org.apache.jute.InputArchive; +import org.apache.jute.OutputArchive; +import org.apache.zookeeper.proto.ConnectRequest; +import org.apache.zookeeper.proto.ConnectResponse; + +/** + * ZooKeeper 3.3 and earlier doesn't handle ReadOnly field of {@link ConnectRequest} and {@link ConnectResponse}. + */ +public class ZK33Protocol implements Protocol { + @Override + public ConnectRequest deserializeConnectRequest(InputArchive inputArchive) throws IOException { + final ConnectRequest request = new ConnectRequest(); + inputArchive.startRecord("connect"); + request.setProtocolVersion(inputArchive.readInt("protocolVersion")); + request.setLastZxidSeen(inputArchive.readLong("lastZxidSeen")); + request.setTimeOut(inputArchive.readInt("timeOut")); + request.setSessionId(inputArchive.readLong("sessionId")); + request.setPasswd(inputArchive.readBuffer("passwd")); + inputArchive.endRecord("connect"); + return request; + } + + @Override + public ConnectResponse deserializeConnectResponse(InputArchive inputArchive) throws IOException { + final ConnectResponse response = new ConnectResponse(); + inputArchive.startRecord("connect"); + response.setProtocolVersion(inputArchive.readInt("protocolVersion")); + response.setTimeOut(inputArchive.readInt("timeOut")); + response.setSessionId(inputArchive.readLong("sessionId")); + response.setPasswd(inputArchive.readBuffer("passwd")); + response.setReadOnly(false); // old version doesn't have readonly concept + inputArchive.endRecord("connect"); + return response; + } + + @Override + public void serializeConnectRequest(OutputArchive outputArchive, ConnectRequest connectRequest) throws IOException { + outputArchive.startRecord(connectRequest, "connect"); + outputArchive.writeInt(connectRequest.getProtocolVersion(), "protocolVersion"); + outputArchive.writeLong(connectRequest.getLastZxidSeen(), "lastZxidSeen"); + outputArchive.writeInt(connectRequest.getTimeOut(), "timeOut"); + outputArchive.writeLong(connectRequest.getSessionId(), "sessionId"); + outputArchive.writeBuffer(connectRequest.getPasswd(), "passwd"); + outputArchive.endRecord(connectRequest, "connect"); + } + + @Override + public void serializeConnectResponse(OutputArchive outputArchive, ConnectResponse connectResponse) throws IOException { + outputArchive.startRecord(connectResponse, "connect"); + outputArchive.writeInt(connectResponse.getProtocolVersion(),"protocolVersion"); + outputArchive.writeInt(connectResponse.getTimeOut(),"timeOut"); + outputArchive.writeLong(connectResponse.getSessionId(),"sessionId"); + outputArchive.writeBuffer(connectResponse.getPasswd(),"passwd"); + outputArchive.endRecord(connectResponse, "connect"); + } +} From d21810cea52ac9dae0fa9f1a7b8167c3c6a7d0fe Mon Sep 17 00:00:00 2001 From: tison Date: Sun, 13 Mar 2022 09:46:28 +0800 Subject: [PATCH 02/15] buffer read exact length, serialize is naturally compatible Signed-off-by: tison --- .../zookeeper/protocol/DefaultProtocol.java | 11 ---------- .../apache/zookeeper/protocol/Protocol.java | 3 --- .../zookeeper/protocol/ZK33Protocol.java | 22 ------------------- 3 files changed, 36 deletions(-) diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/DefaultProtocol.java b/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/DefaultProtocol.java index bda4e02df85..2820e3e88d8 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/DefaultProtocol.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/DefaultProtocol.java @@ -2,7 +2,6 @@ import java.io.IOException; import org.apache.jute.InputArchive; -import org.apache.jute.OutputArchive; import org.apache.zookeeper.proto.ConnectRequest; import org.apache.zookeeper.proto.ConnectResponse; @@ -20,14 +19,4 @@ public ConnectResponse deserializeConnectResponse(InputArchive inputArchive) thr response.deserialize(inputArchive, "connect"); return response; } - - @Override - public void serializeConnectRequest(OutputArchive outputArchive, ConnectRequest connectRequest) throws IOException { - connectRequest.serialize(outputArchive, "connect"); - } - - @Override - public void serializeConnectResponse(OutputArchive outputArchive, ConnectResponse connectResponse) throws IOException { - connectResponse.serialize(outputArchive, "connect"); - } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/Protocol.java b/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/Protocol.java index 8fcfa981f0a..b6ce6d0d1b0 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/Protocol.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/Protocol.java @@ -2,13 +2,10 @@ import java.io.IOException; import org.apache.jute.InputArchive; -import org.apache.jute.OutputArchive; import org.apache.zookeeper.proto.ConnectRequest; import org.apache.zookeeper.proto.ConnectResponse; public interface Protocol { ConnectRequest deserializeConnectRequest(InputArchive inputArchive) throws IOException; ConnectResponse deserializeConnectResponse(InputArchive inputArchive) throws IOException; - void serializeConnectRequest(OutputArchive outputArchive, ConnectRequest connectRequest) throws IOException; - void serializeConnectResponse(OutputArchive outputArchive, ConnectResponse connectResponse) throws IOException; } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/ZK33Protocol.java b/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/ZK33Protocol.java index 52835f3c31f..0d49ae41dd3 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/ZK33Protocol.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/ZK33Protocol.java @@ -2,7 +2,6 @@ import java.io.IOException; import org.apache.jute.InputArchive; -import org.apache.jute.OutputArchive; import org.apache.zookeeper.proto.ConnectRequest; import org.apache.zookeeper.proto.ConnectResponse; @@ -35,25 +34,4 @@ public ConnectResponse deserializeConnectResponse(InputArchive inputArchive) thr inputArchive.endRecord("connect"); return response; } - - @Override - public void serializeConnectRequest(OutputArchive outputArchive, ConnectRequest connectRequest) throws IOException { - outputArchive.startRecord(connectRequest, "connect"); - outputArchive.writeInt(connectRequest.getProtocolVersion(), "protocolVersion"); - outputArchive.writeLong(connectRequest.getLastZxidSeen(), "lastZxidSeen"); - outputArchive.writeInt(connectRequest.getTimeOut(), "timeOut"); - outputArchive.writeLong(connectRequest.getSessionId(), "sessionId"); - outputArchive.writeBuffer(connectRequest.getPasswd(), "passwd"); - outputArchive.endRecord(connectRequest, "connect"); - } - - @Override - public void serializeConnectResponse(OutputArchive outputArchive, ConnectResponse connectResponse) throws IOException { - outputArchive.startRecord(connectResponse, "connect"); - outputArchive.writeInt(connectResponse.getProtocolVersion(),"protocolVersion"); - outputArchive.writeInt(connectResponse.getTimeOut(),"timeOut"); - outputArchive.writeLong(connectResponse.getSessionId(),"sessionId"); - outputArchive.writeBuffer(connectResponse.getPasswd(),"passwd"); - outputArchive.endRecord(connectResponse, "connect"); - } } From 505316c4b2c40c2941499cb3669caedd44447afa Mon Sep 17 00:00:00 2001 From: tison Date: Sun, 13 Mar 2022 09:58:35 +0800 Subject: [PATCH 03/15] introduce ProtocolManager Signed-off-by: tison --- .../zookeeper/protocol/DefaultProtocol.java | 4 +- .../zookeeper/protocol/ProtocolManager.java | 42 +++++++++++++++++++ .../zookeeper/protocol/ZK33Protocol.java | 4 +- 3 files changed, 48 insertions(+), 2 deletions(-) create mode 100644 zookeeper-server/src/main/java/org/apache/zookeeper/protocol/ProtocolManager.java diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/DefaultProtocol.java b/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/DefaultProtocol.java index 2820e3e88d8..df75ab96f42 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/DefaultProtocol.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/DefaultProtocol.java @@ -5,7 +5,9 @@ import org.apache.zookeeper.proto.ConnectRequest; import org.apache.zookeeper.proto.ConnectResponse; -public class DefaultProtocol implements Protocol { +public enum DefaultProtocol implements Protocol { + INSTANCE; + @Override public ConnectRequest deserializeConnectRequest(InputArchive inputArchive) throws IOException { final ConnectRequest request = new ConnectRequest(); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/ProtocolManager.java b/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/ProtocolManager.java new file mode 100644 index 00000000000..be64df5a518 --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/ProtocolManager.java @@ -0,0 +1,42 @@ +package org.apache.zookeeper.protocol; + +import java.io.IOException; +import org.apache.jute.InputArchive; +import org.apache.zookeeper.proto.ConnectRequest; +import org.apache.zookeeper.proto.ConnectResponse; + +public class ProtocolManager { + private volatile Protocol protocol = null; + + public ConnectRequest deserializeConnectRequest(InputArchive inputArchive) throws IOException { + if (protocol != null) { + return protocol.deserializeConnectRequest(inputArchive); + } + + try { + final ConnectRequest request = DefaultProtocol.INSTANCE.deserializeConnectRequest(inputArchive); + this.protocol = DefaultProtocol.INSTANCE; + return request; + } catch (IOException e) { + final ConnectRequest request = ZK33Protocol.INSTANCE.deserializeConnectRequest(inputArchive); + this.protocol = ZK33Protocol.INSTANCE; + return request; + } + } + + public ConnectResponse deserializeConnectResponse(InputArchive inputArchive) throws IOException { + if (protocol != null) { + return protocol.deserializeConnectResponse(inputArchive); + } + + try { + final ConnectResponse response = DefaultProtocol.INSTANCE.deserializeConnectResponse(inputArchive); + this.protocol = DefaultProtocol.INSTANCE; + return response; + } catch (IOException e) { + final ConnectResponse response = ZK33Protocol.INSTANCE.deserializeConnectResponse(inputArchive); + this.protocol = ZK33Protocol.INSTANCE; + return response; + } + } +} diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/ZK33Protocol.java b/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/ZK33Protocol.java index 0d49ae41dd3..ebdd8902013 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/ZK33Protocol.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/ZK33Protocol.java @@ -8,7 +8,9 @@ /** * ZooKeeper 3.3 and earlier doesn't handle ReadOnly field of {@link ConnectRequest} and {@link ConnectResponse}. */ -public class ZK33Protocol implements Protocol { +public enum ZK33Protocol implements Protocol { + INSTANCE; + @Override public ConnectRequest deserializeConnectRequest(InputArchive inputArchive) throws IOException { final ConnectRequest request = new ConnectRequest(); From 664be7f8e577972274940afc4b1d75974ab7413e Mon Sep 17 00:00:00 2001 From: tison Date: Sun, 13 Mar 2022 10:17:04 +0800 Subject: [PATCH 04/15] adapt protocol Signed-off-by: tison --- .../java/org/apache/zookeeper/ClientCnxn.java | 31 +++---------------- .../apache/zookeeper/ClientCnxnSocket.java | 18 ++++------- .../zookeeper/protocol/ProtocolManager.java | 4 +++ .../apache/zookeeper/server/ServerCnxn.java | 14 +++------ .../zookeeper/server/ZooKeeperServer.java | 21 ++++--------- .../java/org/apache/zookeeper/MockPacket.java | 10 ------ .../server/quorum/WatchLeakTest.java | 4 +-- .../apache/zookeeper/test/MaxCnxnsTest.java | 14 ++------- .../test/SessionInvalidationTest.java | 2 +- 9 files changed, 30 insertions(+), 88 deletions(-) diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java index 347a68f80b5..f7ead62c480 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java @@ -285,8 +285,6 @@ static class Packet { WatchRegistration watchRegistration; - public boolean readOnly; - WatchDeregistration watchDeregistration; /** Convenience ctor */ @@ -295,23 +293,12 @@ static class Packet { ReplyHeader replyHeader, Record request, Record response, - WatchRegistration watchRegistration) { - this(requestHeader, replyHeader, request, response, watchRegistration, false); - } - - Packet( - RequestHeader requestHeader, - ReplyHeader replyHeader, - Record request, - Record response, - WatchRegistration watchRegistration, - boolean readOnly) { - + WatchRegistration watchRegistration + ) { this.requestHeader = requestHeader; this.replyHeader = replyHeader; this.request = request; this.response = response; - this.readOnly = readOnly; this.watchRegistration = watchRegistration; } @@ -325,8 +312,6 @@ public void createBB() { } if (request instanceof ConnectRequest) { request.serialize(boa, "connect"); - // append "am-I-allowed-to-be-readonly" flag - boa.writeBool(readOnly, "readOnly"); } else if (request != null) { request.serialize(boa, "request"); } @@ -1008,7 +993,7 @@ void primeConnection() throws IOException { clientCnxnSocket.getRemoteSocketAddress()); isFirstConnect = false; long sessId = (seenRwServerBefore) ? sessionId : 0; - ConnectRequest conReq = new ConnectRequest(0, lastZxid, sessionTimeout, sessId, sessionPasswd); + ConnectRequest conReq = new ConnectRequest(0, lastZxid, sessionTimeout, sessId, sessionPasswd, readOnly); // We add backwards since we are pushing into the front // Only send if there's a pending watch if (!clientConfig.getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET)) { @@ -1088,7 +1073,7 @@ record = new SetWatches2(setWatchesLastZxid, dataWatchesBatch, existWatchesBatch null, null)); } - outgoingQueue.addFirst(new Packet(null, null, conReq, null, null, readOnly)); + outgoingQueue.addFirst(new Packet(null, null, conReq, null, null)); clientCnxnSocket.connectionPrimed(); LOG.debug("Session establishment request sent on {}", clientCnxnSocket.getRemoteSocketAddress()); } @@ -1404,12 +1389,6 @@ private void cleanup() { /** * Callback invoked by the ClientCnxnSocket once a connection has been * established. - * - * @param _negotiatedSessionTimeout - * @param _sessionId - * @param _sessionPasswd - * @param isRO - * @throws IOException */ void onConnected( int _negotiatedSessionTimeout, @@ -1627,7 +1606,7 @@ public void sendPacket(Record request, Record response, AsyncCallback cb, int op ReplyHeader r = new ReplyHeader(); r.setXid(xid); - Packet p = new Packet(h, r, request, response, null, false); + Packet p = new Packet(h, r, request, response, null); p.cb = cb; sendThread.sendPacket(p); } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java index 9b53107e3d7..690cd922f50 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java @@ -32,6 +32,7 @@ import org.apache.zookeeper.common.Time; import org.apache.zookeeper.common.ZKConfig; import org.apache.zookeeper.proto.ConnectResponse; +import org.apache.zookeeper.protocol.ProtocolManager; import org.apache.zookeeper.server.ByteBufferInputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,6 +49,8 @@ abstract class ClientCnxnSocket { private static final Logger LOG = LoggerFactory.getLogger(ClientCnxnSocket.class); + private final ProtocolManager protocolManager = new ProtocolManager(); + protected boolean initialized; /** @@ -137,21 +140,12 @@ void readConnectResult() throws IOException { ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer); BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis); - ConnectResponse conRsp = new ConnectResponse(); - conRsp.deserialize(bbia, "connect"); - - // read "is read-only" flag - boolean isRO = false; - try { - isRO = bbia.readBool("readOnly"); - } catch (IOException e) { - // this is ok -- just a packet from an old server which - // doesn't contain readOnly field + ConnectResponse conRsp = protocolManager.deserializeConnectResponse(bbia); + if (protocolManager.isZK33Protol()) { LOG.warn("Connected to an old server; r-o mode will be unavailable"); } - this.sessionId = conRsp.getSessionId(); - sendThread.onConnected(conRsp.getTimeOut(), this.sessionId, conRsp.getPasswd(), isRO); + sendThread.onConnected(conRsp.getTimeOut(), this.sessionId, conRsp.getPasswd(), conRsp.getReadOnly()); } abstract boolean isConnected(); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/ProtocolManager.java b/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/ProtocolManager.java index be64df5a518..9d187094c1b 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/ProtocolManager.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/ProtocolManager.java @@ -8,6 +8,10 @@ public class ProtocolManager { private volatile Protocol protocol = null; + public boolean isZK33Protol() { + return protocol != null && protocol instanceof ZK33Protocol; + } + public ConnectRequest deserializeConnectRequest(InputArchive inputArchive) throws IOException { if (protocol != null) { return protocol.deserializeConnectRequest(inputArchive); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java index b5b2645838d..32e515f13ac 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java @@ -46,6 +46,7 @@ import org.apache.zookeeper.metrics.Counter; import org.apache.zookeeper.proto.ReplyHeader; import org.apache.zookeeper.proto.RequestHeader; +import org.apache.zookeeper.protocol.ProtocolManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,16 +61,9 @@ public abstract class ServerCnxn implements Stats, Watcher { public static final Object me = new Object(); private static final Logger LOG = LoggerFactory.getLogger(ServerCnxn.class); - private Set authInfo = Collections.newSetFromMap(new ConcurrentHashMap()); - - /** - * If the client is of old version, we don't send r-o mode info to it. - * The reason is that if we would, old C client doesn't read it, which - * results in TCP RST packet, i.e. "connection reset by peer". - */ - boolean isOldClient = true; - - AtomicLong outstandingCount = new AtomicLong(); + public final ProtocolManager protocolManager = new ProtocolManager(); + private final Set authInfo = Collections.newSetFromMap(new ConcurrentHashMap<>()); + private final AtomicLong outstandingCount = new AtomicLong(); /** The ZooKeeperServer for this connection. May be null if the server * is not currently serving requests (for example if the server is not diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java index 86c13aea386..bfb8a30618e 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java @@ -1062,14 +1062,12 @@ public void finishSessionInit(ServerCnxn cnxn, boolean valid) { valid ? cnxn.getSessionTimeout() : 0, valid ? cnxn.getSessionId() : 0, // send 0 if session is no // longer valid - valid ? generatePasswd(cnxn.getSessionId()) : new byte[16]); + valid ? generatePasswd(cnxn.getSessionId()) : new byte[16], + this instanceof ReadOnlyZooKeeperServer); ByteArrayOutputStream baos = new ByteArrayOutputStream(); BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos); bos.writeInt(-1, "len"); rsp.serialize(bos, "connect"); - if (!cnxn.isOldClient) { - bos.writeBool(this instanceof ReadOnlyZooKeeperServer, "readOnly"); - } baos.close(); ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray()); bb.putInt(bb.remaining() - 4).rewind(); @@ -1370,8 +1368,7 @@ public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException, ClientCnxnLimitException { BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer)); - ConnectRequest connReq = new ConnectRequest(); - connReq.deserialize(bia, "connect"); + ConnectRequest connReq = cnxn.protocolManager.deserializeConnectRequest(bia); LOG.debug( "Session establishment request from client {} client's lastZxid is 0x{}", cnxn.getRemoteSocketAddress(), @@ -1395,21 +1392,15 @@ public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) throw new ClientCnxnLimitException(); } ServerMetrics.getMetrics().CONNECTION_TOKEN_DEFICIT.add(connThrottle.getDeficit()); - ServerMetrics.getMetrics().CONNECTION_REQUEST_COUNT.add(1); - boolean readOnly = false; - try { - readOnly = bia.readBool("readOnly"); - cnxn.isOldClient = false; - } catch (IOException e) { - // this is ok -- just a packet from an old client which - // doesn't contain readOnly field + if (cnxn.protocolManager.isZK33Protol()) { LOG.warn( "Connection request from old client {}; will be dropped if server is in r-o mode", cnxn.getRemoteSocketAddress()); } - if (!readOnly && this instanceof ReadOnlyZooKeeperServer) { + + if (!connReq.getReadOnly() && this instanceof ReadOnlyZooKeeperServer) { String msg = "Refusing session request for not-read-only client " + cnxn.getRemoteSocketAddress(); LOG.info(msg); throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.NOT_READ_ONLY_CLIENT); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/MockPacket.java b/zookeeper-server/src/test/java/org/apache/zookeeper/MockPacket.java index 9d880fd39b0..e4e6548ea23 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/MockPacket.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/MockPacket.java @@ -35,16 +35,6 @@ public MockPacket( super(requestHeader, replyHeader, request, response, watchRegistration); } - public MockPacket( - RequestHeader requestHeader, - ReplyHeader replyHeader, - Record request, - Record response, - WatchRegistration watchRegistration, - boolean readOnly) { - super(requestHeader, replyHeader, request, response, watchRegistration, readOnly); - } - public ByteBuffer createAndReturnBB() { createBB(); return this.bb; diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/WatchLeakTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/WatchLeakTest.java index 70f1844eac6..2fadcf36f96 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/WatchLeakTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/WatchLeakTest.java @@ -255,8 +255,8 @@ private ByteBuffer createConnRequest() { Random r = new Random(SESSION_ID ^ superSecret); byte[] p = new byte[16]; r.nextBytes(p); - ConnectRequest conReq = new ConnectRequest(0, 1L, 30000, SESSION_ID, p); - MockPacket packet = new MockPacket(null, null, conReq, null, null, false); + ConnectRequest conReq = new ConnectRequest(0, 1L, 30000, SESSION_ID, p, false); + MockPacket packet = new MockPacket(null, null, conReq, null, null); return packet.createAndReturnBB(); } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/MaxCnxnsTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/MaxCnxnsTest.java index f7c7a9534d7..0034fd6d04a 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/MaxCnxnsTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/MaxCnxnsTest.java @@ -51,18 +51,16 @@ public CnxnThread(int i) { } public void run() { - SocketChannel sChannel = null; - try { + try (SocketChannel sChannel = SocketChannel.open()) { /* * For future unwary socket programmers: although connect 'blocks' it * does not require an accept on the server side to return. Therefore * you can not assume that all the sockets are connected at the end of * this for loop. */ - sChannel = SocketChannel.open(); sChannel.connect(new InetSocketAddress(host, port)); // Construct a connection request - ConnectRequest conReq = new ConnectRequest(0, 0, 10000, 0, "password".getBytes()); + ConnectRequest conReq = new ConnectRequest(0, 0, 10000, 0, "password".getBytes(), false); ByteArrayOutputStream baos = new ByteArrayOutputStream(); BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos); boa.writeInt(-1, "len"); @@ -95,14 +93,6 @@ public void run() { } } catch (IOException io) { // "Connection reset by peer" - } finally { - if (sChannel != null) { - try { - sChannel.close(); - } catch (Exception e) { - // Do nothing - } - } } } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/SessionInvalidationTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/SessionInvalidationTest.java index a9229d2b16b..1e7870bd293 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/SessionInvalidationTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/SessionInvalidationTest.java @@ -53,7 +53,7 @@ public void testCreateAfterCloseShouldFail() throws Exception { // open a connection boa.writeInt(44, "len"); - ConnectRequest conReq = new ConnectRequest(0, 0, 30000, 0, new byte[16]); + ConnectRequest conReq = new ConnectRequest(0, 0, 30000, 0, new byte[16], false); conReq.serialize(boa, "connect"); // close connection From 5b7a6982f77302462aecc31c7316a3ff6c9892d8 Mon Sep 17 00:00:00 2001 From: tison Date: Sun, 13 Mar 2022 10:35:44 +0800 Subject: [PATCH 05/15] buffer can be consumed once Signed-off-by: tison --- .../org/apache/zookeeper/ClientCnxnSocket.java | 2 +- .../apache/zookeeper/protocol/ProtocolManager.java | 14 ++++++++------ 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java index 690cd922f50..40623cb793f 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java @@ -134,7 +134,7 @@ void readConnectResult() throws IOException { } buf.append("]"); if (LOG.isTraceEnabled()) { - LOG.trace("readConnectResult {} {}", incomingBuffer.remaining(), buf.toString()); + LOG.trace("readConnectResult {} {}", incomingBuffer.remaining(), buf); } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/ProtocolManager.java b/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/ProtocolManager.java index 9d187094c1b..e7312566901 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/ProtocolManager.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/ProtocolManager.java @@ -17,12 +17,13 @@ public ConnectRequest deserializeConnectRequest(InputArchive inputArchive) throw return protocol.deserializeConnectRequest(inputArchive); } + final ConnectRequest request = ZK33Protocol.INSTANCE.deserializeConnectRequest(inputArchive); try { - final ConnectRequest request = DefaultProtocol.INSTANCE.deserializeConnectRequest(inputArchive); + request.setReadOnly(inputArchive.readBool("readOnly")); this.protocol = DefaultProtocol.INSTANCE; return request; - } catch (IOException e) { - final ConnectRequest request = ZK33Protocol.INSTANCE.deserializeConnectRequest(inputArchive); + } catch (Exception e) { + request.setReadOnly(false); // old version doesn't have readonly concept this.protocol = ZK33Protocol.INSTANCE; return request; } @@ -33,12 +34,13 @@ public ConnectResponse deserializeConnectResponse(InputArchive inputArchive) thr return protocol.deserializeConnectResponse(inputArchive); } + final ConnectResponse response = ZK33Protocol.INSTANCE.deserializeConnectResponse(inputArchive); try { - final ConnectResponse response = DefaultProtocol.INSTANCE.deserializeConnectResponse(inputArchive); + response.setReadOnly(inputArchive.readBool("readOnly")); this.protocol = DefaultProtocol.INSTANCE; return response; - } catch (IOException e) { - final ConnectResponse response = ZK33Protocol.INSTANCE.deserializeConnectResponse(inputArchive); + } catch (Exception e) { + response.setReadOnly(false); // old version doesn't have readonly concept this.protocol = ZK33Protocol.INSTANCE; return response; } From df53201b0d52982c5ea266604a339d8c991791a8 Mon Sep 17 00:00:00 2001 From: tison Date: Sun, 13 Mar 2022 10:55:05 +0800 Subject: [PATCH 06/15] add doc Signed-off-by: tison --- .../zookeeper/protocol/DefaultProtocol.java | 21 ++++++++++++ .../apache/zookeeper/protocol/Protocol.java | 32 +++++++++++++++++++ .../zookeeper/protocol/ProtocolManager.java | 21 ++++++++++++ .../zookeeper/protocol/ZK33Protocol.java | 18 +++++++++++ 4 files changed, 92 insertions(+) diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/DefaultProtocol.java b/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/DefaultProtocol.java index df75ab96f42..ab79db11267 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/DefaultProtocol.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/DefaultProtocol.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.zookeeper.protocol; import java.io.IOException; @@ -5,6 +23,9 @@ import org.apache.zookeeper.proto.ConnectRequest; import org.apache.zookeeper.proto.ConnectResponse; +/** + * Default wire protocol for nightly ZooKeeper version. + */ public enum DefaultProtocol implements Protocol { INSTANCE; diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/Protocol.java b/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/Protocol.java index b6ce6d0d1b0..4b824543023 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/Protocol.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/Protocol.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.zookeeper.protocol; import java.io.IOException; @@ -5,7 +23,21 @@ import org.apache.zookeeper.proto.ConnectRequest; import org.apache.zookeeper.proto.ConnectResponse; +/** + * Basically, wire protocol should be backward and forward compatible between minor versions. + * However, there are several case that it's different due to Jute's limitations. + */ public interface Protocol { + + /** + * Deserializing {@link ConnectRequest} should be specially handled for request from client + * version before and including ZooKeeper 3.3 which doesn't understand readOnly field. + */ ConnectRequest deserializeConnectRequest(InputArchive inputArchive) throws IOException; + + /** + * Deserializing {@link ConnectResponse} should be specially handled for response from server + * version before and including ZooKeeper 3.3 which doesn't understand readOnly field. + */ ConnectResponse deserializeConnectResponse(InputArchive inputArchive) throws IOException; } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/ProtocolManager.java b/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/ProtocolManager.java index e7312566901..af8d917c155 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/ProtocolManager.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/ProtocolManager.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.zookeeper.protocol; import java.io.IOException; @@ -5,6 +23,9 @@ import org.apache.zookeeper.proto.ConnectRequest; import org.apache.zookeeper.proto.ConnectResponse; +/** + * A facade for switching behaviours between difference {@link Protocol}. + */ public class ProtocolManager { private volatile Protocol protocol = null; diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/ZK33Protocol.java b/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/ZK33Protocol.java index ebdd8902013..ab3b56fd95c 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/ZK33Protocol.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/ZK33Protocol.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.zookeeper.protocol; import java.io.IOException; From d638819060dd15a77251112b40cc0c19b3aa338c Mon Sep 17 00:00:00 2001 From: tison Date: Sun, 13 Mar 2022 12:18:32 +0800 Subject: [PATCH 07/15] use dummy object instead of mokito mock Signed-off-by: tison --- .../server/quorum/ReadOnlyZooKeeperServerTest.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServerTest.java index 165dbdca1d7..ca5ef3737d7 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServerTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServerTest.java @@ -18,16 +18,16 @@ package org.apache.zookeeper.server.quorum; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.mockito.Mockito.mock; import java.nio.ByteBuffer; -import org.apache.zookeeper.server.NIOServerCnxn; +import org.apache.zookeeper.server.MockServerCnxn; import org.apache.zookeeper.server.ServerCnxn; import org.apache.zookeeper.server.ZKDatabase; import org.apache.zookeeper.server.ZooKeeperServer; import org.apache.zookeeper.server.persistence.FileTxnSnapLog; import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.mock; /** * test ReadOnlyZooKeeperServer @@ -55,8 +55,7 @@ public void testReadOnlyZookeeperServer() { output.flip(); ServerCnxn.CloseRequestException e = assertThrows(ServerCnxn.CloseRequestException.class, () -> { - final NIOServerCnxn nioServerCnxn = mock(NIOServerCnxn.class); - readOnlyZooKeeperServer.processConnectRequest(nioServerCnxn, output); + readOnlyZooKeeperServer.processConnectRequest(new MockServerCnxn(), output); }); assertEquals(e.getReason(), ServerCnxn.DisconnectReason.NOT_READ_ONLY_CLIENT); } From 78db8942419dd8c4464be03fc9be527e6acf0780 Mon Sep 17 00:00:00 2001 From: tison Date: Sun, 13 Mar 2022 12:26:16 +0800 Subject: [PATCH 08/15] fix checkstyle Signed-off-by: tison --- .../server/quorum/ReadOnlyZooKeeperServerTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServerTest.java index ca5ef3737d7..3ee6016f086 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServerTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServerTest.java @@ -18,6 +18,9 @@ package org.apache.zookeeper.server.quorum; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.mock; import java.nio.ByteBuffer; import org.apache.zookeeper.server.MockServerCnxn; import org.apache.zookeeper.server.ServerCnxn; @@ -25,9 +28,6 @@ import org.apache.zookeeper.server.ZooKeeperServer; import org.apache.zookeeper.server.persistence.FileTxnSnapLog; import org.junit.jupiter.api.Test; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.mockito.Mockito.mock; /** * test ReadOnlyZooKeeperServer From 645596c801e30937672c3ace7be15956425e791c Mon Sep 17 00:00:00 2001 From: tison Date: Sun, 13 Mar 2022 14:06:56 +0800 Subject: [PATCH 09/15] use dummy object instead of mokito mock Signed-off-by: tison --- .../zookeeper/server/ZooKeeperServerCreationTest.java | 1 - .../org/apache/zookeeper/server/ZooKeeperServerTest.java | 7 +++---- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZooKeeperServerCreationTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZooKeeperServerCreationTest.java index 03f6113de87..ac46b4e0f29 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZooKeeperServerCreationTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZooKeeperServerCreationTest.java @@ -48,7 +48,6 @@ public void submitRequest(Request si) { zks.setZKDatabase(new ZKDatabase(fileTxnSnapLog)); zks.createSessionTracker(); - ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory(); ServerCnxn cnxn = new MockServerCnxn(); ConnectRequest connReq = new ConnectRequest(); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZooKeeperServerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZooKeeperServerTest.java index bd141206905..4d41dac1a9b 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZooKeeperServerTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZooKeeperServerTest.java @@ -162,10 +162,9 @@ public void testClientZxidAhead() { output.put((byte) 1); output.flip(); - ServerCnxn.CloseRequestException e = assertThrows(ServerCnxn.CloseRequestException.class, () -> { - final NIOServerCnxn nioServerCnxn = mock(NIOServerCnxn.class); - zooKeeperServer.processConnectRequest(nioServerCnxn, output); - }); + ServerCnxn.CloseRequestException e = assertThrows( + ServerCnxn.CloseRequestException.class, + () -> zooKeeperServer.processConnectRequest(new MockServerCnxn(), output)); assertEquals(e.getReason(), ServerCnxn.DisconnectReason.CLIENT_ZXID_AHEAD); } From ffcec099ba8d99d5cc04ca6b3ba3ad8608ba396a Mon Sep 17 00:00:00 2001 From: tison Date: Sun, 5 Jun 2022 16:32:28 +0800 Subject: [PATCH 10/15] simplify protocol manager logic Signed-off-by: tison --- .../apache/zookeeper/ClientCnxnSocket.java | 4 +- .../zookeeper/compat/ProtocolManager.java | 121 ++++++++++++++++++ .../zookeeper/protocol/DefaultProtocol.java | 45 ------- .../apache/zookeeper/protocol/Protocol.java | 43 ------- .../zookeeper/protocol/ProtocolManager.java | 69 ---------- .../zookeeper/protocol/ZK33Protocol.java | 57 --------- .../apache/zookeeper/server/ServerCnxn.java | 2 +- .../zookeeper/server/ZooKeeperServer.java | 2 +- 8 files changed, 125 insertions(+), 218 deletions(-) create mode 100644 zookeeper-server/src/main/java/org/apache/zookeeper/compat/ProtocolManager.java delete mode 100644 zookeeper-server/src/main/java/org/apache/zookeeper/protocol/DefaultProtocol.java delete mode 100644 zookeeper-server/src/main/java/org/apache/zookeeper/protocol/Protocol.java delete mode 100644 zookeeper-server/src/main/java/org/apache/zookeeper/protocol/ProtocolManager.java delete mode 100644 zookeeper-server/src/main/java/org/apache/zookeeper/protocol/ZK33Protocol.java diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java index 40623cb793f..be279e616db 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java @@ -32,7 +32,7 @@ import org.apache.zookeeper.common.Time; import org.apache.zookeeper.common.ZKConfig; import org.apache.zookeeper.proto.ConnectResponse; -import org.apache.zookeeper.protocol.ProtocolManager; +import org.apache.zookeeper.compat.ProtocolManager; import org.apache.zookeeper.server.ByteBufferInputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -141,7 +141,7 @@ void readConnectResult() throws IOException { ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer); BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis); ConnectResponse conRsp = protocolManager.deserializeConnectResponse(bbia); - if (protocolManager.isZK33Protol()) { + if (protocolManager.isReadonlyAvailable()) { LOG.warn("Connected to an old server; r-o mode will be unavailable"); } this.sessionId = conRsp.getSessionId(); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/compat/ProtocolManager.java b/zookeeper-server/src/main/java/org/apache/zookeeper/compat/ProtocolManager.java new file mode 100644 index 00000000000..5633b81c17d --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/compat/ProtocolManager.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.compat; + +import java.io.IOException; +import org.apache.jute.InputArchive; +import org.apache.zookeeper.proto.ConnectRequest; +import org.apache.zookeeper.proto.ConnectResponse; + +/** + * A manager for switching behaviours between difference wire protocol. + *

+ * Basically, wire protocol should be backward and forward compatible between minor versions. + * However, there are several cases that it's different due to Jute's limitations. + */ +public final class ProtocolManager { + private volatile Boolean isReadonlyAvailable = null; + + public boolean isReadonlyAvailable() { + return isReadonlyAvailable != null && isReadonlyAvailable; + } + + /** + * Deserializing {@link ConnectRequest} should be specially handled for request from client + * version before and including ZooKeeper 3.3 which doesn't understand readOnly field. + */ + public ConnectRequest deserializeConnectRequest(InputArchive inputArchive) throws IOException { + if (isReadonlyAvailable != null) { + if (isReadonlyAvailable) { + return deserializeConnectRequestWithReadonly(inputArchive); + } else { + return deserializeConnectRequestWithoutReadonly(inputArchive); + } + } + + final ConnectRequest request = deserializeConnectRequestWithoutReadonly(inputArchive); + try { + request.setReadOnly(inputArchive.readBool("readOnly")); + this.isReadonlyAvailable = true; + } catch (Exception e) { + request.setReadOnly(false); // old version doesn't have readonly concept + this.isReadonlyAvailable = false; + } + return request; + } + + private ConnectRequest deserializeConnectRequestWithReadonly(InputArchive inputArchive) throws IOException { + final ConnectRequest request = new ConnectRequest(); + request.deserialize(inputArchive, "connect"); + return request; + } + + private ConnectRequest deserializeConnectRequestWithoutReadonly(InputArchive inputArchive) throws IOException { + final ConnectRequest request = new ConnectRequest(); + inputArchive.startRecord("connect"); + request.setProtocolVersion(inputArchive.readInt("protocolVersion")); + request.setLastZxidSeen(inputArchive.readLong("lastZxidSeen")); + request.setTimeOut(inputArchive.readInt("timeOut")); + request.setSessionId(inputArchive.readLong("sessionId")); + request.setPasswd(inputArchive.readBuffer("passwd")); + inputArchive.endRecord("connect"); + return request; + } + + /** + * Deserializing {@link ConnectResponse} should be specially handled for response from server + * version before and including ZooKeeper 3.3 which doesn't understand readOnly field. + */ + public ConnectResponse deserializeConnectResponse(InputArchive inputArchive) throws IOException { + if (isReadonlyAvailable != null) { + if (isReadonlyAvailable) { + return deserializeConnectResponseWithReadonly(inputArchive); + } else { + return deserializeConnectResponseWithoutReadonly(inputArchive); + } + } + + final ConnectResponse response = deserializeConnectResponseWithoutReadonly(inputArchive); + try { + response.setReadOnly(inputArchive.readBool("readOnly")); + this.isReadonlyAvailable = true; + } catch (Exception e) { + response.setReadOnly(false); // old version doesn't have readonly concept + this.isReadonlyAvailable = false; + } + return response; + } + + private ConnectResponse deserializeConnectResponseWithReadonly(InputArchive inputArchive) throws IOException { + final ConnectResponse response = new ConnectResponse(); + response.deserialize(inputArchive, "connect"); + return response; + } + + private ConnectResponse deserializeConnectResponseWithoutReadonly(InputArchive inputArchive) throws IOException { + final ConnectResponse response = new ConnectResponse(); + inputArchive.startRecord("connect"); + response.setProtocolVersion(inputArchive.readInt("protocolVersion")); + response.setTimeOut(inputArchive.readInt("timeOut")); + response.setSessionId(inputArchive.readLong("sessionId")); + response.setPasswd(inputArchive.readBuffer("passwd")); + inputArchive.endRecord("connect"); + return response; + } +} diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/DefaultProtocol.java b/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/DefaultProtocol.java deleted file mode 100644 index ab79db11267..00000000000 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/DefaultProtocol.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zookeeper.protocol; - -import java.io.IOException; -import org.apache.jute.InputArchive; -import org.apache.zookeeper.proto.ConnectRequest; -import org.apache.zookeeper.proto.ConnectResponse; - -/** - * Default wire protocol for nightly ZooKeeper version. - */ -public enum DefaultProtocol implements Protocol { - INSTANCE; - - @Override - public ConnectRequest deserializeConnectRequest(InputArchive inputArchive) throws IOException { - final ConnectRequest request = new ConnectRequest(); - request.deserialize(inputArchive, "connect"); - return request; - } - - @Override - public ConnectResponse deserializeConnectResponse(InputArchive inputArchive) throws IOException { - final ConnectResponse response = new ConnectResponse(); - response.deserialize(inputArchive, "connect"); - return response; - } -} diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/Protocol.java b/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/Protocol.java deleted file mode 100644 index 4b824543023..00000000000 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/Protocol.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zookeeper.protocol; - -import java.io.IOException; -import org.apache.jute.InputArchive; -import org.apache.zookeeper.proto.ConnectRequest; -import org.apache.zookeeper.proto.ConnectResponse; - -/** - * Basically, wire protocol should be backward and forward compatible between minor versions. - * However, there are several case that it's different due to Jute's limitations. - */ -public interface Protocol { - - /** - * Deserializing {@link ConnectRequest} should be specially handled for request from client - * version before and including ZooKeeper 3.3 which doesn't understand readOnly field. - */ - ConnectRequest deserializeConnectRequest(InputArchive inputArchive) throws IOException; - - /** - * Deserializing {@link ConnectResponse} should be specially handled for response from server - * version before and including ZooKeeper 3.3 which doesn't understand readOnly field. - */ - ConnectResponse deserializeConnectResponse(InputArchive inputArchive) throws IOException; -} diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/ProtocolManager.java b/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/ProtocolManager.java deleted file mode 100644 index af8d917c155..00000000000 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/ProtocolManager.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zookeeper.protocol; - -import java.io.IOException; -import org.apache.jute.InputArchive; -import org.apache.zookeeper.proto.ConnectRequest; -import org.apache.zookeeper.proto.ConnectResponse; - -/** - * A facade for switching behaviours between difference {@link Protocol}. - */ -public class ProtocolManager { - private volatile Protocol protocol = null; - - public boolean isZK33Protol() { - return protocol != null && protocol instanceof ZK33Protocol; - } - - public ConnectRequest deserializeConnectRequest(InputArchive inputArchive) throws IOException { - if (protocol != null) { - return protocol.deserializeConnectRequest(inputArchive); - } - - final ConnectRequest request = ZK33Protocol.INSTANCE.deserializeConnectRequest(inputArchive); - try { - request.setReadOnly(inputArchive.readBool("readOnly")); - this.protocol = DefaultProtocol.INSTANCE; - return request; - } catch (Exception e) { - request.setReadOnly(false); // old version doesn't have readonly concept - this.protocol = ZK33Protocol.INSTANCE; - return request; - } - } - - public ConnectResponse deserializeConnectResponse(InputArchive inputArchive) throws IOException { - if (protocol != null) { - return protocol.deserializeConnectResponse(inputArchive); - } - - final ConnectResponse response = ZK33Protocol.INSTANCE.deserializeConnectResponse(inputArchive); - try { - response.setReadOnly(inputArchive.readBool("readOnly")); - this.protocol = DefaultProtocol.INSTANCE; - return response; - } catch (Exception e) { - response.setReadOnly(false); // old version doesn't have readonly concept - this.protocol = ZK33Protocol.INSTANCE; - return response; - } - } -} diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/ZK33Protocol.java b/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/ZK33Protocol.java deleted file mode 100644 index ab3b56fd95c..00000000000 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/protocol/ZK33Protocol.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zookeeper.protocol; - -import java.io.IOException; -import org.apache.jute.InputArchive; -import org.apache.zookeeper.proto.ConnectRequest; -import org.apache.zookeeper.proto.ConnectResponse; - -/** - * ZooKeeper 3.3 and earlier doesn't handle ReadOnly field of {@link ConnectRequest} and {@link ConnectResponse}. - */ -public enum ZK33Protocol implements Protocol { - INSTANCE; - - @Override - public ConnectRequest deserializeConnectRequest(InputArchive inputArchive) throws IOException { - final ConnectRequest request = new ConnectRequest(); - inputArchive.startRecord("connect"); - request.setProtocolVersion(inputArchive.readInt("protocolVersion")); - request.setLastZxidSeen(inputArchive.readLong("lastZxidSeen")); - request.setTimeOut(inputArchive.readInt("timeOut")); - request.setSessionId(inputArchive.readLong("sessionId")); - request.setPasswd(inputArchive.readBuffer("passwd")); - inputArchive.endRecord("connect"); - return request; - } - - @Override - public ConnectResponse deserializeConnectResponse(InputArchive inputArchive) throws IOException { - final ConnectResponse response = new ConnectResponse(); - inputArchive.startRecord("connect"); - response.setProtocolVersion(inputArchive.readInt("protocolVersion")); - response.setTimeOut(inputArchive.readInt("timeOut")); - response.setSessionId(inputArchive.readLong("sessionId")); - response.setPasswd(inputArchive.readBuffer("passwd")); - response.setReadOnly(false); // old version doesn't have readonly concept - inputArchive.endRecord("connect"); - return response; - } -} diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java index 32e515f13ac..80492e1abe5 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java @@ -46,7 +46,7 @@ import org.apache.zookeeper.metrics.Counter; import org.apache.zookeeper.proto.ReplyHeader; import org.apache.zookeeper.proto.RequestHeader; -import org.apache.zookeeper.protocol.ProtocolManager; +import org.apache.zookeeper.compat.ProtocolManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java index bfb8a30618e..0edf8bb9228 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java @@ -1394,7 +1394,7 @@ public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) ServerMetrics.getMetrics().CONNECTION_TOKEN_DEFICIT.add(connThrottle.getDeficit()); ServerMetrics.getMetrics().CONNECTION_REQUEST_COUNT.add(1); - if (cnxn.protocolManager.isZK33Protol()) { + if (cnxn.protocolManager.isReadonlyAvailable()) { LOG.warn( "Connection request from old client {}; will be dropped if server is in r-o mode", cnxn.getRemoteSocketAddress()); From 658c629f2b4676f34d6572532be79f2269eb29e3 Mon Sep 17 00:00:00 2001 From: tison Date: Sun, 5 Jun 2022 16:37:10 +0800 Subject: [PATCH 11/15] fix checkstyle Signed-off-by: tison --- .../src/main/java/org/apache/zookeeper/ClientCnxnSocket.java | 2 +- .../src/main/java/org/apache/zookeeper/server/ServerCnxn.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java index be279e616db..35af4a2f121 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java @@ -31,8 +31,8 @@ import org.apache.zookeeper.client.ZKClientConfig; import org.apache.zookeeper.common.Time; import org.apache.zookeeper.common.ZKConfig; -import org.apache.zookeeper.proto.ConnectResponse; import org.apache.zookeeper.compat.ProtocolManager; +import org.apache.zookeeper.proto.ConnectResponse; import org.apache.zookeeper.server.ByteBufferInputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java index 80492e1abe5..661c2aa2f09 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java @@ -41,12 +41,12 @@ import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs.OpCode; +import org.apache.zookeeper.compat.ProtocolManager; import org.apache.zookeeper.data.Id; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.metrics.Counter; import org.apache.zookeeper.proto.ReplyHeader; import org.apache.zookeeper.proto.RequestHeader; -import org.apache.zookeeper.compat.ProtocolManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; From 269d3ad3e6c6c819f12ce93aeed4b947320ea1ce Mon Sep 17 00:00:00 2001 From: tison Date: Sun, 5 Jun 2022 16:44:00 +0800 Subject: [PATCH 12/15] retrigger ci Signed-off-by: tison From 1b853028dae6b6bc5b737b6ca434334ce75196f0 Mon Sep 17 00:00:00 2001 From: tison Date: Tue, 7 Jun 2022 06:52:48 +0800 Subject: [PATCH 13/15] retrigger ci Signed-off-by: tison From 58c0767c6961475748a79caa8f2dbd273add8527 Mon Sep 17 00:00:00 2001 From: tison Date: Sun, 26 Jun 2022 22:08:45 +0800 Subject: [PATCH 14/15] retrigger flasky tests Signed-off-by: tison From 6bcc27aa89701b2658ff49054b17484f88065153 Mon Sep 17 00:00:00 2001 From: tison Date: Sun, 26 Jun 2022 23:36:40 +0800 Subject: [PATCH 15/15] retrigger flasky tests Signed-off-by: tison