Skip to content

Commit

Permalink
Revert renaming some variable name of MasterService
Browse files Browse the repository at this point in the history
Signed-off-by: Tianli Feng <ftianli@amazon.com>
  • Loading branch information
Tianli Feng committed Jul 14, 2022
1 parent c4ed72f commit 6030c53
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -911,7 +911,7 @@ assert getLocalNode().equals(applierState.nodes().getMasterNode())

final boolean activePublication = currentPublication.map(CoordinatorPublication::isActiveForCurrentLeader).orElse(false);
if (becomingClusterManager && activePublication == false) {
// cluster state update task to become cluster-manager is submitted to ClusterManagerService,
// cluster state update task to become cluster-manager is submitted to MasterService,
// but publication has not started yet
assert followersChecker.getKnownFollowers().isEmpty() : followersChecker.getKnownFollowers();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,7 @@ void stabilise(long stabilisationDurationMillis) {
final ClusterNode leader = getAnyLeader();
final long leaderTerm = leader.coordinator.getCurrentTerm();

final int pendingTaskCount = leader.clusterManagerService.getFakeMasterServicePendingTaskCount();
final int pendingTaskCount = leader.masterService.getFakeMasterServicePendingTaskCount();
runFor((pendingTaskCount + 1) * DEFAULT_CLUSTER_STATE_UPDATE_DELAY, "draining task queue");

final Matcher<Long> isEqualToLeaderVersion = equalTo(leader.coordinator.getLastAcceptedState().getVersion());
Expand Down Expand Up @@ -1025,7 +1025,7 @@ class ClusterNode {
private final DiscoveryNode localNode;
final MockPersistedState persistedState;
final Settings nodeSettings;
private AckedFakeThreadPoolMasterService clusterManagerService;
private AckedFakeThreadPoolMasterService masterService;
private DisruptableClusterApplierService clusterApplierService;
private ClusterService clusterService;
TransportService transportService;
Expand Down Expand Up @@ -1105,7 +1105,7 @@ protected Optional<DisruptableMockTransport> getDisruptableMockTransport(Transpo
null,
emptySet()
);
clusterManagerService = new AckedFakeThreadPoolMasterService(
masterService = new AckedFakeThreadPoolMasterService(
localNode.getId(),
"test",
threadPool,
Expand All @@ -1119,7 +1119,7 @@ protected Optional<DisruptableMockTransport> getDisruptableMockTransport(Transpo
deterministicTaskQueue,
threadPool
);
clusterService = new ClusterService(settings, clusterSettings, clusterManagerService, clusterApplierService);
clusterService = new ClusterService(settings, clusterSettings, masterService, clusterApplierService);
clusterService.setNodeConnectionsService(
new NodeConnectionsService(clusterService.getSettings(), threadPool, transportService)
);
Expand All @@ -1134,7 +1134,7 @@ protected Optional<DisruptableMockTransport> getDisruptableMockTransport(Transpo
transportService,
writableRegistry(),
allocationService,
clusterManagerService,
masterService,
this::getPersistedState,
Cluster.this::provideSeedHosts,
clusterApplierService,
Expand All @@ -1144,7 +1144,7 @@ protected Optional<DisruptableMockTransport> getDisruptableMockTransport(Transpo
getElectionStrategy(),
nodeHealthService
);
clusterManagerService.setClusterStatePublisher(coordinator);
masterService.setClusterStatePublisher(coordinator);
final GatewayService gatewayService = new GatewayService(
settings,
allocationService,
Expand Down Expand Up @@ -1331,11 +1331,11 @@ AckCollector submitUpdateTask(
onNode(() -> {
logger.trace("[{}] submitUpdateTask: enqueueing [{}]", localNode.getId(), source);
final long submittedTerm = coordinator.getCurrentTerm();
clusterManagerService.submitStateUpdateTask(source, new ClusterStateUpdateTask() {
masterService.submitStateUpdateTask(source, new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
assertThat(currentState.term(), greaterThanOrEqualTo(submittedTerm));
clusterManagerService.nextAckCollector = ackCollector;
masterService.nextAckCollector = ackCollector;
return clusterStateUpdate.apply(currentState);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,21 +84,21 @@ public void testFakeClusterManagerService() {
doAnswer(invocationOnMock -> runnableTasks.add((Runnable) invocationOnMock.getArguments()[0])).when(executorService).execute(any());
when(mockThreadPool.generic()).thenReturn(executorService);

FakeThreadPoolMasterService clusterManagerService = new FakeThreadPoolMasterService(
FakeThreadPoolMasterService masterService = new FakeThreadPoolMasterService(
"test_node",
"test",
mockThreadPool,
runnableTasks::add
);
clusterManagerService.setClusterStateSupplier(lastClusterStateRef::get);
clusterManagerService.setClusterStatePublisher((event, publishListener, ackListener) -> {
masterService.setClusterStateSupplier(lastClusterStateRef::get);
masterService.setClusterStatePublisher((event, publishListener, ackListener) -> {
lastClusterStateRef.set(event.state());
publishingCallback.set(publishListener);
});
clusterManagerService.start();
masterService.start();

AtomicBoolean firstTaskCompleted = new AtomicBoolean();
clusterManagerService.submitStateUpdateTask("test1", new ClusterStateUpdateTask() {
masterService.submitStateUpdateTask("test1", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
return ClusterState.builder(currentState)
Expand Down Expand Up @@ -138,7 +138,7 @@ public void onFailure(String source, Exception e) {
assertThat(runnableTasks.size(), equalTo(0));

AtomicBoolean secondTaskCompleted = new AtomicBoolean();
clusterManagerService.submitStateUpdateTask("test2", new ClusterStateUpdateTask() {
masterService.submitStateUpdateTask("test2", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
return ClusterState.builder(currentState)
Expand Down

0 comments on commit 6030c53

Please sign in to comment.