Skip to content

Commit

Permalink
refactor for have all distributed messages of the new module in one b…
Browse files Browse the repository at this point in the history
…inary message
  • Loading branch information
tglman committed May 24, 2019
1 parent 48db5c0 commit 7be53b8
Show file tree
Hide file tree
Showing 27 changed files with 364 additions and 872 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,6 @@ public class OChannelBinaryProtocol {
public static final byte SUBSCRIBE_PUSH = 100;
public static final byte UNSUBSCRIBE_PUSH = 101;
public static final byte EXPERIMENTAL = 102;
public static final byte DISTRIBUTED_SUBMIT_REQUEST = 103;
public static final byte DISTRIBUTED_SUBMIT_RESPONSE = 104;
public static final byte DISTRIBUTED_OPERATION_REQUEST = 105;
public static final byte DISTRIBUTED_OPERATION_RESPONSE = 106;
public static final byte DISTRIBUTED_PROPAGATE_REQUEST = 107;
public static final byte DISTRIBUTED_ACK_RESPONSE = 108;
public static final byte DISTRIBUTED_CONFIRM_REQUEST = 109;

// REMOTE SB-TREE COLLECTIONS
public static final byte REQUEST_CREATE_SBTREE_BONSAI = 110;
Expand All @@ -117,15 +110,12 @@ public class OChannelBinaryProtocol {
public static final byte REQUEST_SBTREE_BONSAI_GET_ENTRIES_MAJOR = 113;
public static final byte REQUEST_RIDBAG_GET_SIZE = 114;

public static final byte DISTRIBUTED_STRUCTURAL_SUBMIT_REQUEST = 115;
public static final byte DISTRIBUTED_STRUCTURAL_SUBMIT_RESPONSE = 116;
public static final byte DISTRIBUTED_STRUCTURAL_OPERATION_REQUEST = 117;
public static final byte DISTRIBUTED_STRUCTURAL_OPERATION_RESPONSE = 118;

// TASK
public static final byte DISTRIBUTED_REQUEST = 120;
public static final byte DISTRIBUTED_RESPONSE = 121;
public static final byte DISTRIBUTED_CONNECT = 122;
public static final byte DISTRIBUTED_REQUEST = 120;
public static final byte DISTRIBUTED_RESPONSE = 121;
public static final byte DISTRIBUTED_CONNECT = 122;
public static final byte COORDINATED_DISTRIBUTED_MESSAGE = 123;

// INCOMING
public static final byte RESPONSE_STATUS_OK = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,11 @@ public void onAfterActivate() {
}

public void checkPort() {

// Use the inbound port in case it's not provided
if (this.nodeConfiguration.getTcpPort() == null) {
OServerNetworkListener protocol = server.getListenerByProtocol(ONetworkProtocolBinary.class);
this.nodeConfiguration.setTcpPort(protocol.getInboundAddr().getPort());
}

}

private ONodeInternalConfiguration generateInternalConfiguration() {
Expand Down Expand Up @@ -498,7 +496,6 @@ public synchronized void internalCreateDatabase(OSessionOperationId operationId,
//TODO:INIT CONFIG
super.create(database, null, null, ODatabaseType.valueOf(type), null);
getStructuralConfiguration().getSharedConfiguration().addDatabase(database);
getStructuralDistributedContext().getSubmitContext().receive(operationId);
this.databasesStatus.put(database, ODistributedStatus.ONLINE);
checkCoordinator(database);
//TODO: double check this notify, it may unblock as well checkReadyForHandleRequests that is not what is expected
Expand Down Expand Up @@ -541,10 +538,6 @@ public synchronized void checkDatabaseReady(String database) {
}
}

public OCoordinateMessagesFactory getCoordinateMessagesFactory() {
return networkManager.getCoordinateMessagesFactory();
}

@Override
public void close() {
if (networkManager != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.orientechnologies.orient.distributed.impl;

import com.orientechnologies.common.log.OLogManager;
import com.orientechnologies.orient.core.db.config.ONodeIdentity;
import com.orientechnologies.orient.distributed.OrientDBDistributed;
import com.orientechnologies.orient.distributed.impl.coordinator.ODistributedCoordinator;
import com.orientechnologies.orient.distributed.impl.coordinator.ODistributedExecutor;
Expand All @@ -26,94 +27,84 @@ private void checkDatabaseReady(String database) {
}

@Override
public void executeOperationRequest(OOperationRequest request) {
public void executeOperationRequest(ONodeIdentity sender, OOperationRequest request) {
checkDatabaseReady(request.getDatabase());
ODistributedContext distributedContext = distributed.getDistributedContext(request.getDatabase());
ODistributedExecutor executor = distributedContext.getExecutor();
ODistributedMember member = executor.getMember(request.getSenderNode());
ODistributedMember member = executor.getMember(sender);
executor.receive(member, request.getId(), request.getRequest());
}

@Override
public void executeOperationResponse(OOperationResponse response) {
public void executeOperationResponse(ONodeIdentity sender, OOperationResponse response) {
checkDatabaseReady(response.getDatabase());
ODistributedContext distributedContext = distributed.getDistributedContext(response.getDatabase());
ODistributedCoordinator coordinator = distributedContext.getCoordinator();
if (coordinator == null) {
OLogManager.instance().error(this, "Received coordinator response on a node that is not a coordinator ignoring it", null);
} else {
ODistributedMember member = coordinator.getMember(response.getSenderNode());
ODistributedMember member = coordinator.getMember(sender);
coordinator.receive(member, response.getId(), response.getResponse());
}
}

@Override
public void executeSubmitResponse(ONetworkSubmitResponse response) {
public void executeSubmitResponse(ONodeIdentity sender, ONetworkSubmitResponse response) {
checkDatabaseReady(response.getDatabase());
ODistributedContext distributedContext = distributed.getDistributedContext(response.getDatabase());
OSubmitContext context = distributedContext.getSubmitContext();
context.receive(response.getOperationId(), response.getResponse());
}

@Override
public void executeSubmitRequest(ONetworkSubmitRequest request) {
public void executeSubmitRequest(ONodeIdentity sender, ONetworkSubmitRequest request) {
checkDatabaseReady(request.getDatabase());
ODistributedContext distributedContext = distributed.getDistributedContext(request.getDatabase());
ODistributedCoordinator coordinator = distributedContext.getCoordinator();
if (coordinator == null) {
OLogManager.instance().error(this, "Received submit request on a node that is not a coordinator ignoring it", null);
} else {
ODistributedMember member = coordinator.getMember(request.getSenderNode());
ODistributedMember member = coordinator.getMember(sender);
coordinator.submit(member, request.getOperationId(), request.getRequest());
}
}

@Override
public void executeStructuralOperationRequest(OStructuralOperationRequest request) {
//TODO: To remove
}

@Override
public void executeStructuralOperationResponse(OStructuralOperationResponse response) {
//TODO: To remove
}

@Override
public void executeStructuralSubmitRequest(ONetworkStructuralSubmitRequest request) {
@Override
public void executeStructuralSubmitRequest(ONodeIdentity sender, ONetworkStructuralSubmitRequest request) {
OStructuralDistributedContext distributedContext = distributed.getStructuralDistributedContext();
distributedContext.execute(request.getSenderNode(), request.getOperationId(), request.getRequest());
distributedContext.execute(sender, request.getOperationId(), request.getRequest());
}

@Override
public void executeStructuralSubmitResponse(ONetworkStructuralSubmitResponse response) {
public void executeStructuralSubmitResponse(ONodeIdentity sender, ONetworkStructuralSubmitResponse response) {
OStructuralDistributedContext distributedContext = distributed.getStructuralDistributedContext();
OStructuralSubmitContext context = distributedContext.getSubmitContext();
context.receive(response.getOperationId(), response.getResponse());
}

@Override
public void executePropagate(ONetworkPropagate propagate) {
public void executePropagate(ONodeIdentity sender, ONetworkPropagate propagate) {
OStructuralDistributedContext distributedContext = distributed.getStructuralDistributedContext();
OStructuralFollower slave = distributedContext.getFollower();
OStructuralDistributedMember member = slave.getMember(propagate.getSenderNode());
OStructuralDistributedMember member = slave.getMember(sender);
slave.log(member, propagate.getId(), propagate.getOperation());
}

@Override
public void executeConfirm(ONetworkConfirm confirm) {
public void executeConfirm(ONodeIdentity sender, ONetworkConfirm confirm) {
OStructuralDistributedContext distributedContext = distributed.getStructuralDistributedContext();
OStructuralFollower slave = distributedContext.getFollower();
slave.confirm(confirm.getId());
}

@Override
public void executeAck(ONetworkAck ack) {
public void executeAck(ONodeIdentity sender, ONetworkAck ack) {
OStructuralDistributedContext distributedContext = distributed.getStructuralDistributedContext();
OStructuralLeader master = distributedContext.getLeader();
if (master == null) {
OLogManager.instance().error(this, "Received coordinator response on a node that is not a coordinator ignoring it", null);
} else {
master.receiveAck(ack.getSenderNode(), ack.getLogId());
master.receiveAck(sender, ack.getLogId());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
import com.orientechnologies.orient.distributed.impl.coordinator.OCoordinateMessagesFactory;
import com.orientechnologies.orient.distributed.impl.coordinator.ODistributedChannel;
import com.orientechnologies.orient.distributed.impl.coordinator.OLogId;
import com.orientechnologies.orient.distributed.impl.coordinator.network.*;
import com.orientechnologies.orient.distributed.impl.coordinator.network.ODistributedExecutable;
import com.orientechnologies.orient.distributed.impl.network.binary.OBinaryDistributedMessage;
import com.orientechnologies.orient.distributed.impl.network.binary.ODistributedChannelBinaryProtocol;
import com.orientechnologies.orient.enterprise.channel.binary.OChannelBinary;
import com.orientechnologies.orient.server.OClientConnection;
import com.orientechnologies.orient.server.distributed.ORemoteServerAvailabilityCheck;
Expand All @@ -20,9 +22,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import static com.orientechnologies.orient.enterprise.channel.binary.OChannelBinaryProtocol.*;
import static com.orientechnologies.orient.enterprise.channel.binary.OChannelBinaryProtocol.DISTRIBUTED_STRUCTURAL_OPERATION_RESPONSE;

public class ODistributedNetworkManager implements ODiscoveryListener {

private final ConcurrentMap<ONodeIdentity, ODistributedChannelBinaryProtocol> remoteServers = new ConcurrentHashMap<>();
Expand All @@ -31,17 +30,13 @@ public class ODistributedNetworkManager implements ODiscoveryListener {
private final ONodeInternalConfiguration internalConfiguration;
private OUDPMulticastNodeManager discoveryManager;
private OCoordinatedExecutorMessageHandler requestHandler;
private OCoordinateMessagesFactory coordinateMessagesFactory;

public ODistributedNetworkManager(OrientDBDistributed orientDB, ONodeConfiguration config,
ONodeInternalConfiguration internalConfiguration) {
this.orientDB = orientDB;
this.config = config;
this.internalConfiguration = internalConfiguration;
this.requestHandler = new OCoordinatedExecutorMessageHandler(orientDB);

//This now si simple but should be replaced by a factory depending to the protocol version
coordinateMessagesFactory = new OCoordinateMessagesFactory();
}

public ODistributedChannelBinaryProtocol getRemoteServer(final ONodeIdentity rNodeName) {
Expand Down Expand Up @@ -139,7 +134,7 @@ public Set<ONodeIdentity> getRemoteServers() {

public void coordinatedRequest(OClientConnection connection, int requestType, int clientTxId, OChannelBinary channel)
throws IOException {
OBinaryRequest<OBinaryResponse> request = newDistributedRequest(requestType);
OBinaryRequest<OBinaryResponse> request = new OBinaryDistributedMessage();
try {
request.read(channel, 0, null);
} catch (IOException e) {
Expand All @@ -150,35 +145,4 @@ public void coordinatedRequest(OClientConnection connection, int requestType, in
executable.executeDistributed(requestHandler);
}

private OBinaryRequest<OBinaryResponse> newDistributedRequest(int requestType) {
switch (requestType) {
case DISTRIBUTED_SUBMIT_REQUEST:
return new ONetworkSubmitRequest(coordinateMessagesFactory);
case DISTRIBUTED_SUBMIT_RESPONSE:
return new ONetworkSubmitResponse(coordinateMessagesFactory);
case DISTRIBUTED_OPERATION_REQUEST:
return new OOperationRequest(coordinateMessagesFactory);
case DISTRIBUTED_OPERATION_RESPONSE:
return new OOperationResponse(coordinateMessagesFactory);
case DISTRIBUTED_STRUCTURAL_SUBMIT_REQUEST:
return new ONetworkStructuralSubmitRequest(coordinateMessagesFactory);
case DISTRIBUTED_STRUCTURAL_SUBMIT_RESPONSE:
return new ONetworkStructuralSubmitResponse(coordinateMessagesFactory);
case DISTRIBUTED_STRUCTURAL_OPERATION_REQUEST:
return new OStructuralOperationRequest(coordinateMessagesFactory);
case DISTRIBUTED_STRUCTURAL_OPERATION_RESPONSE:
return new OStructuralOperationResponse(coordinateMessagesFactory);
case DISTRIBUTED_PROPAGATE_REQUEST:
return new ONetworkPropagate(coordinateMessagesFactory);
case DISTRIBUTED_ACK_RESPONSE:
return new ONetworkAck();
case DISTRIBUTED_CONFIRM_REQUEST:
return new ONetworkConfirm();
}
return null;
}

public OCoordinateMessagesFactory getCoordinateMessagesFactory() {
return coordinateMessagesFactory;
}
}
Loading

0 comments on commit 7be53b8

Please sign in to comment.