Skip to content

Commit

Permalink
ZOOKEEPER-4492: Merge readOnly field into ConnectRequest and Response
Browse files Browse the repository at this point in the history
According to [this comment in ZOOKEEPER-102](https://issues.apache.org/jira/browse/ZOOKEEPER-102?focusedCommentId=16977000&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16977000) I introduce a `Protocol` abstraction and going to moving all wire protocol concept into `cnxn` and this scope, so that client and server's business logics handle only deserialized/real record.

cc eolivelli maoling Randgalt

This supersedes apache#1832.

Author: tison <wander4096@gmail.com>

Reviewers: Enrico Olivelli <eolivelli@apache.org>, Mate Szalay-Beko <symat@apache.org>

Closes apache#1837 from tisonkun/protocol
  • Loading branch information
tisonkun authored and anurag-harness committed Nov 2, 2022
1 parent bb34dc1 commit b154a55
Show file tree
Hide file tree
Showing 13 changed files with 155 additions and 97 deletions.
2 changes: 2 additions & 0 deletions zookeeper-jute/src/main/resources/zookeeper.jute
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,14 @@ module org.apache.zookeeper.proto {
int timeOut;
long sessionId;
buffer passwd;
boolean readOnly;
}
class ConnectResponse {
int protocolVersion;
int timeOut;
long sessionId;
buffer passwd;
boolean readOnly;
}
class SetWatches {
long relativeZxid;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,8 +285,6 @@ static class Packet {

WatchRegistration watchRegistration;

public boolean readOnly;

WatchDeregistration watchDeregistration;

/** Convenience ctor */
Expand All @@ -295,23 +293,12 @@ static class Packet {
ReplyHeader replyHeader,
Record request,
Record response,
WatchRegistration watchRegistration) {
this(requestHeader, replyHeader, request, response, watchRegistration, false);
}

Packet(
RequestHeader requestHeader,
ReplyHeader replyHeader,
Record request,
Record response,
WatchRegistration watchRegistration,
boolean readOnly) {

WatchRegistration watchRegistration
) {
this.requestHeader = requestHeader;
this.replyHeader = replyHeader;
this.request = request;
this.response = response;
this.readOnly = readOnly;
this.watchRegistration = watchRegistration;
}

Expand All @@ -325,8 +312,6 @@ public void createBB() {
}
if (request instanceof ConnectRequest) {
request.serialize(boa, "connect");
// append "am-I-allowed-to-be-readonly" flag
boa.writeBool(readOnly, "readOnly");
} else if (request != null) {
request.serialize(boa, "request");
}
Expand Down Expand Up @@ -1008,7 +993,7 @@ void primeConnection() throws IOException {
clientCnxnSocket.getRemoteSocketAddress());
isFirstConnect = false;
long sessId = (seenRwServerBefore) ? sessionId : 0;
ConnectRequest conReq = new ConnectRequest(0, lastZxid, sessionTimeout, sessId, sessionPasswd);
ConnectRequest conReq = new ConnectRequest(0, lastZxid, sessionTimeout, sessId, sessionPasswd, readOnly);
// We add backwards since we are pushing into the front
// Only send if there's a pending watch
if (!clientConfig.getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET)) {
Expand Down Expand Up @@ -1088,7 +1073,7 @@ record = new SetWatches2(setWatchesLastZxid, dataWatchesBatch, existWatchesBatch
null,
null));
}
outgoingQueue.addFirst(new Packet(null, null, conReq, null, null, readOnly));
outgoingQueue.addFirst(new Packet(null, null, conReq, null, null));
clientCnxnSocket.connectionPrimed();
LOG.debug("Session establishment request sent on {}", clientCnxnSocket.getRemoteSocketAddress());
}
Expand Down Expand Up @@ -1406,12 +1391,6 @@ private void cleanup() {
/**
* Callback invoked by the ClientCnxnSocket once a connection has been
* established.
*
* @param _negotiatedSessionTimeout
* @param _sessionId
* @param _sessionPasswd
* @param isRO
* @throws IOException
*/
void onConnected(
int _negotiatedSessionTimeout,
Expand Down Expand Up @@ -1629,7 +1608,7 @@ public void sendPacket(Record request, Record response, AsyncCallback cb, int op
ReplyHeader r = new ReplyHeader();
r.setXid(xid);

Packet p = new Packet(h, r, request, response, null, false);
Packet p = new Packet(h, r, request, response, null);
p.cb = cb;
sendThread.sendPacket(p);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.zookeeper.client.ZKClientConfig;
import org.apache.zookeeper.common.Time;
import org.apache.zookeeper.common.ZKConfig;
import org.apache.zookeeper.compat.ProtocolManager;
import org.apache.zookeeper.proto.ConnectResponse;
import org.apache.zookeeper.server.ByteBufferInputStream;
import org.slf4j.Logger;
Expand All @@ -48,6 +49,8 @@ abstract class ClientCnxnSocket {

private static final Logger LOG = LoggerFactory.getLogger(ClientCnxnSocket.class);

private final ProtocolManager protocolManager = new ProtocolManager();

protected boolean initialized;

/**
Expand Down Expand Up @@ -131,27 +134,18 @@ void readConnectResult() throws IOException {
}
buf.append("]");
if (LOG.isTraceEnabled()) {
LOG.trace("readConnectResult {} {}", incomingBuffer.remaining(), buf.toString());
LOG.trace("readConnectResult {} {}", incomingBuffer.remaining(), buf);
}
}

ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ConnectResponse conRsp = new ConnectResponse();
conRsp.deserialize(bbia, "connect");

// read "is read-only" flag
boolean isRO = false;
try {
isRO = bbia.readBool("readOnly");
} catch (IOException e) {
// this is ok -- just a packet from an old server which
// doesn't contain readOnly field
ConnectResponse conRsp = protocolManager.deserializeConnectResponse(bbia);
if (protocolManager.isReadonlyAvailable()) {
LOG.warn("Connected to an old server; r-o mode will be unavailable");
}

this.sessionId = conRsp.getSessionId();
sendThread.onConnected(conRsp.getTimeOut(), this.sessionId, conRsp.getPasswd(), isRO);
sendThread.onConnected(conRsp.getTimeOut(), this.sessionId, conRsp.getPasswd(), conRsp.getReadOnly());
}

abstract boolean isConnected();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.zookeeper.compat;

import java.io.IOException;
import org.apache.jute.InputArchive;
import org.apache.zookeeper.proto.ConnectRequest;
import org.apache.zookeeper.proto.ConnectResponse;

/**
* A manager for switching behaviours between difference wire protocol.
* <p>
* Basically, wire protocol should be backward and forward compatible between minor versions.
* However, there are several cases that it's different due to Jute's limitations.
*/
public final class ProtocolManager {
private volatile Boolean isReadonlyAvailable = null;

public boolean isReadonlyAvailable() {
return isReadonlyAvailable != null && isReadonlyAvailable;
}

/**
* Deserializing {@link ConnectRequest} should be specially handled for request from client
* version before and including ZooKeeper 3.3 which doesn't understand readOnly field.
*/
public ConnectRequest deserializeConnectRequest(InputArchive inputArchive) throws IOException {
if (isReadonlyAvailable != null) {
if (isReadonlyAvailable) {
return deserializeConnectRequestWithReadonly(inputArchive);
} else {
return deserializeConnectRequestWithoutReadonly(inputArchive);
}
}

final ConnectRequest request = deserializeConnectRequestWithoutReadonly(inputArchive);
try {
request.setReadOnly(inputArchive.readBool("readOnly"));
this.isReadonlyAvailable = true;
} catch (Exception e) {
request.setReadOnly(false); // old version doesn't have readonly concept
this.isReadonlyAvailable = false;
}
return request;
}

private ConnectRequest deserializeConnectRequestWithReadonly(InputArchive inputArchive) throws IOException {
final ConnectRequest request = new ConnectRequest();
request.deserialize(inputArchive, "connect");
return request;
}

private ConnectRequest deserializeConnectRequestWithoutReadonly(InputArchive inputArchive) throws IOException {
final ConnectRequest request = new ConnectRequest();
inputArchive.startRecord("connect");
request.setProtocolVersion(inputArchive.readInt("protocolVersion"));
request.setLastZxidSeen(inputArchive.readLong("lastZxidSeen"));
request.setTimeOut(inputArchive.readInt("timeOut"));
request.setSessionId(inputArchive.readLong("sessionId"));
request.setPasswd(inputArchive.readBuffer("passwd"));
inputArchive.endRecord("connect");
return request;
}

/**
* Deserializing {@link ConnectResponse} should be specially handled for response from server
* version before and including ZooKeeper 3.3 which doesn't understand readOnly field.
*/
public ConnectResponse deserializeConnectResponse(InputArchive inputArchive) throws IOException {
if (isReadonlyAvailable != null) {
if (isReadonlyAvailable) {
return deserializeConnectResponseWithReadonly(inputArchive);
} else {
return deserializeConnectResponseWithoutReadonly(inputArchive);
}
}

final ConnectResponse response = deserializeConnectResponseWithoutReadonly(inputArchive);
try {
response.setReadOnly(inputArchive.readBool("readOnly"));
this.isReadonlyAvailable = true;
} catch (Exception e) {
response.setReadOnly(false); // old version doesn't have readonly concept
this.isReadonlyAvailable = false;
}
return response;
}

private ConnectResponse deserializeConnectResponseWithReadonly(InputArchive inputArchive) throws IOException {
final ConnectResponse response = new ConnectResponse();
response.deserialize(inputArchive, "connect");
return response;
}

private ConnectResponse deserializeConnectResponseWithoutReadonly(InputArchive inputArchive) throws IOException {
final ConnectResponse response = new ConnectResponse();
inputArchive.startRecord("connect");
response.setProtocolVersion(inputArchive.readInt("protocolVersion"));
response.setTimeOut(inputArchive.readInt("timeOut"));
response.setSessionId(inputArchive.readLong("sessionId"));
response.setPasswd(inputArchive.readBuffer("passwd"));
inputArchive.endRecord("connect");
return response;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.compat.ProtocolManager;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.metrics.Counter;
Expand All @@ -60,16 +61,9 @@ public abstract class ServerCnxn implements Stats, Watcher {
public static final Object me = new Object();
private static final Logger LOG = LoggerFactory.getLogger(ServerCnxn.class);

private Set<Id> authInfo = Collections.newSetFromMap(new ConcurrentHashMap<Id, Boolean>());

/**
* If the client is of old version, we don't send r-o mode info to it.
* The reason is that if we would, old C client doesn't read it, which
* results in TCP RST packet, i.e. "connection reset by peer".
*/
boolean isOldClient = true;

AtomicLong outstandingCount = new AtomicLong();
public final ProtocolManager protocolManager = new ProtocolManager();
private final Set<Id> authInfo = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final AtomicLong outstandingCount = new AtomicLong();

/** The ZooKeeperServer for this connection. May be null if the server
* is not currently serving requests (for example if the server is not
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1073,14 +1073,12 @@ public void finishSessionInit(ServerCnxn cnxn, boolean valid) {
valid ? cnxn.getSessionTimeout() : 0,
valid ? cnxn.getSessionId() : 0, // send 0 if session is no
// longer valid
valid ? generatePasswd(cnxn.getSessionId()) : new byte[16]);
valid ? generatePasswd(cnxn.getSessionId()) : new byte[16],
this instanceof ReadOnlyZooKeeperServer);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
bos.writeInt(-1, "len");
rsp.serialize(bos, "connect");
if (!cnxn.isOldClient) {
bos.writeBool(this instanceof ReadOnlyZooKeeperServer, "readOnly");
}
baos.close();
ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
bb.putInt(bb.remaining() - 4).rewind();
Expand Down Expand Up @@ -1381,8 +1379,7 @@ public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer)
throws IOException, ClientCnxnLimitException {

BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
ConnectRequest connReq = new ConnectRequest();
connReq.deserialize(bia, "connect");
ConnectRequest connReq = cnxn.protocolManager.deserializeConnectRequest(bia);
LOG.debug(
"Session establishment request from client {} client's lastZxid is 0x{}",
cnxn.getRemoteSocketAddress(),
Expand All @@ -1406,21 +1403,15 @@ public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer)
throw new ClientCnxnLimitException();
}
ServerMetrics.getMetrics().CONNECTION_TOKEN_DEFICIT.add(connThrottle.getDeficit());

ServerMetrics.getMetrics().CONNECTION_REQUEST_COUNT.add(1);

boolean readOnly = false;
try {
readOnly = bia.readBool("readOnly");
cnxn.isOldClient = false;
} catch (IOException e) {
// this is ok -- just a packet from an old client which
// doesn't contain readOnly field
if (cnxn.protocolManager.isReadonlyAvailable()) {
LOG.warn(
"Connection request from old client {}; will be dropped if server is in r-o mode",
cnxn.getRemoteSocketAddress());
}
if (!readOnly && this instanceof ReadOnlyZooKeeperServer) {

if (!connReq.getReadOnly() && this instanceof ReadOnlyZooKeeperServer) {
String msg = "Refusing session request for not-read-only client " + cnxn.getRemoteSocketAddress();
LOG.info(msg);
throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.NOT_READ_ONLY_CLIENT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,6 @@ public MockPacket(
super(requestHeader, replyHeader, request, response, watchRegistration);
}

public MockPacket(
RequestHeader requestHeader,
ReplyHeader replyHeader,
Record request,
Record response,
WatchRegistration watchRegistration,
boolean readOnly) {
super(requestHeader, replyHeader, request, response, watchRegistration, readOnly);
}

public ByteBuffer createAndReturnBB() {
createBB();
return this.bb;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ public void submitRequest(Request si) {
zks.setZKDatabase(new ZKDatabase(fileTxnSnapLog));
zks.createSessionTracker();

ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
ServerCnxn cnxn = new MockServerCnxn();

ConnectRequest connReq = new ConnectRequest();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,9 @@ public void testClientZxidAhead() {
output.put((byte) 1);
output.flip();

ServerCnxn.CloseRequestException e = assertThrows(ServerCnxn.CloseRequestException.class, () -> {
final NIOServerCnxn nioServerCnxn = mock(NIOServerCnxn.class);
zooKeeperServer.processConnectRequest(nioServerCnxn, output);
});
ServerCnxn.CloseRequestException e = assertThrows(
ServerCnxn.CloseRequestException.class,
() -> zooKeeperServer.processConnectRequest(new MockServerCnxn(), output));
assertEquals(e.getReason(), ServerCnxn.DisconnectReason.CLIENT_ZXID_AHEAD);
}

Expand Down
Loading

0 comments on commit b154a55

Please sign in to comment.