Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize JobInstance #1861

Merged
merged 1 commit into from
Mar 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -19,33 +19,42 @@

import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import org.apache.shardingsphere.elasticjob.infra.env.IpUtils;

import java.lang.management.ManagementFactory;

/**
* Job instance.
*/
@RequiredArgsConstructor
@Getter
@Setter
@EqualsAndHashCode(of = "jobInstanceId")
public final class JobInstance {

private static final String DELIMITER = "@-@";

private final String jobInstanceId;
private String jobInstanceId;

private String labels;

private String serverIp;

public JobInstance() {
jobInstanceId = IpUtils.getIp() + DELIMITER + ManagementFactory.getRuntimeMXBean().getName().split("@")[0];
this(IpUtils.getIp() + DELIMITER + ManagementFactory.getRuntimeMXBean().getName().split("@")[0]);
}

public JobInstance(final String jobInstanceId) {
this(jobInstanceId, null);
}

public JobInstance(final String jobInstanceId, final String labels) {
this(jobInstanceId, labels, IpUtils.getIp());
}

/**
* Get server IP address.
*
* @return server IP address
*/
public String getIp() {
return jobInstanceId.substring(0, jobInstanceId.indexOf(DELIMITER));
public JobInstance(final String jobInstanceId, final String labels, final String serverIp) {
this.jobInstanceId = jobInstanceId;
this.labels = labels;
this.serverIp = serverIp;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.shardingsphere.elasticjob.infra.handler.sharding;

import org.apache.shardingsphere.elasticjob.infra.env.IpUtils;
import org.apache.shardingsphere.elasticjob.infra.yaml.YamlEngine;
import org.junit.Test;

import static org.hamcrest.CoreMatchers.is;
Expand All @@ -32,6 +33,14 @@ public void assertGetJobInstanceId() {

@Test
public void assertGetIp() {
assertThat(new JobInstance().getIp(), is(IpUtils.getIp()));
assertThat(new JobInstance().getServerIp(), is(IpUtils.getIp()));
}

@Test
public void assertYamlConvert() {
JobInstance actual = YamlEngine.unmarshal(YamlEngine.marshal(new JobInstance("id", "labels")), JobInstance.class);
assertThat(actual.getJobInstanceId(), is("id"));
assertThat(actual.getServerIp(), is(IpUtils.getIp()));
assertThat(actual.getLabels(), is("labels"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ private boolean isActiveElection(final String path, final String data) {

private boolean isPassiveElection(final String path, final Type eventType) {
JobInstance jobInstance = JobRegistry.getInstance().getJobInstance(jobName);
return !Objects.isNull(jobInstance) && isLeaderCrashed(path, eventType) && serverService.isAvailableServer(jobInstance.getIp());
return !Objects.isNull(jobInstance) && isLeaderCrashed(path, eventType) && serverService.isAvailableServer(jobInstance.getServerIp());
}

private boolean isLeaderCrashed(final String path, final Type eventType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public boolean isLeaderUntilBlock() {
while (!hasLeader() && serverService.hasAvailableServers()) {
log.info("Leader is electing, waiting for {} ms", 100);
BlockUtils.waitingShortTime();
if (!JobRegistry.getInstance().isShutdown(jobName) && serverService.isAvailableServer(JobRegistry.getInstance().getJobInstance(jobName).getIp())) {
if (!JobRegistry.getInstance().isShutdown(jobName) && serverService.isAvailableServer(JobRegistry.getInstance().getJobInstance(jobName).getServerIp())) {
electLeader();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,16 @@ public String getLocalInstancePath() {
* @return local instance value
*/
public String getLocalInstanceValue() {
return YamlEngine.marshal(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
return YamlEngine.marshal(JobRegistry.getInstance().getJobInstance(jobName));
}

String getInstancePath(final String instanceId) {

/**
* Get instance path.
*
* @param instanceId instance id
* @return instance path
*/
public String getInstancePath(final String instanceId) {
return String.format(INSTANCES, instanceId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.shardingsphere.elasticjob.lite.internal.instance;

import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
import org.apache.shardingsphere.elasticjob.infra.yaml.YamlEngine;
import org.apache.shardingsphere.elasticjob.lite.internal.server.ServerService;
import org.apache.shardingsphere.elasticjob.lite.internal.storage.JobNodeStorage;
import org.apache.shardingsphere.elasticjob.lite.internal.trigger.TriggerNode;
Expand Down Expand Up @@ -50,7 +51,7 @@ public InstanceService(final CoordinatorRegistryCenter regCenter, final String j
* Persist job online status.
*/
public void persistOnline() {
jobNodeStorage.fillEphemeralJobNode(instanceNode.getLocalInstancePath(), "");
jobNodeStorage.fillEphemeralJobNode(instanceNode.getLocalInstancePath(), instanceNode.getLocalInstanceValue());
}

/**
Expand All @@ -68,8 +69,8 @@ public void removeInstance() {
public List<JobInstance> getAvailableJobInstances() {
List<JobInstance> result = new LinkedList<>();
for (String each : jobNodeStorage.getJobNodeChildrenKeys(InstanceNode.ROOT)) {
JobInstance jobInstance = new JobInstance(each);
if (serverService.isEnableServer(jobInstance.getIp())) {
JobInstance jobInstance = YamlEngine.unmarshal(jobNodeStorage.getJobNodeData(instanceNode.getInstancePath(each)), JobInstance.class);
if (serverService.isEnableServer(jobInstance.getServerIp())) {
result.add(new JobInstance(each));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public void stateChanged(final CuratorFramework client, final ConnectionState ne
if (ConnectionState.SUSPENDED == newState || ConnectionState.LOST == newState) {
jobScheduleController.pauseJob();
} else if (ConnectionState.RECONNECTED == newState) {
serverService.persistOnline(serverService.isEnableServer(JobRegistry.getInstance().getJobInstance(jobName).getIp()));
serverService.persistOnline(serverService.isEnableServer(JobRegistry.getInstance().getJobInstance(jobName).getServerIp()));
instanceService.persistOnline();
executionService.clearRunningInfo(shardingService.getLocalShardingItems());
jobScheduleController.resumeJob();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public boolean isLocalServerPath(final String path) {
if (Objects.isNull(jobInstance)) {
return false;
}
return path.equals(jobNodePath.getFullPath(String.format(SERVERS, jobInstance.getIp())));
return path.equals(jobNodePath.getFullPath(String.format(SERVERS, jobInstance.getServerIp())));
}

String getServerNode(final String ip) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public ServerService(final CoordinatorRegistryCenter regCenter, final String job
*/
public void persistOnline(final boolean enabled) {
if (!JobRegistry.getInstance().isShutdown(jobName)) {
jobNodeStorage.fillJobNode(serverNode.getServerNode(JobRegistry.getInstance().getJobInstance(jobName).getIp()), enabled ? ServerStatus.ENABLED.name() : ServerStatus.DISABLED.name());
jobNodeStorage.fillJobNode(serverNode.getServerNode(JobRegistry.getInstance().getJobInstance(jobName).getServerIp()), enabled ? ServerStatus.ENABLED.name() : ServerStatus.DISABLED.name());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobShardingStrategy;
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobShardingStrategyFactory;
import org.apache.shardingsphere.elasticjob.infra.yaml.YamlEngine;
import org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationService;
import org.apache.shardingsphere.elasticjob.lite.internal.election.LeaderService;
import org.apache.shardingsphere.elasticjob.lite.internal.instance.InstanceNode;
Expand Down Expand Up @@ -59,6 +60,8 @@ public final class ShardingService {

private final InstanceService instanceService;

private final InstanceNode instanceNode;

private final ServerService serverService;

private final ExecutionService executionService;
Expand All @@ -71,6 +74,7 @@ public ShardingService(final CoordinatorRegistryCenter regCenter, final String j
leaderService = new LeaderService(regCenter, jobName);
configService = new ConfigurationService(regCenter, jobName);
instanceService = new InstanceService(regCenter, jobName);
instanceNode = new InstanceNode(jobName);
serverService = new ServerService(regCenter, jobName);
executionService = new ExecutionService(regCenter, jobName);
jobNodePath = new JobNodePath(jobName);
Expand Down Expand Up @@ -157,8 +161,8 @@ private void resetShardingInfo(final int shardingTotalCount) {
* @return sharding items
*/
public List<Integer> getShardingItems(final String jobInstanceId) {
JobInstance jobInstance = new JobInstance(jobInstanceId);
if (!serverService.isAvailableServer(jobInstance.getIp())) {
JobInstance jobInstance = YamlEngine.unmarshal(jobNodeStorage.getJobNodeData(instanceNode.getInstancePath(jobInstanceId)), JobInstance.class);
if (!serverService.isAvailableServer(jobInstance.getServerIp())) {
return Collections.emptyList();
}
List<Integer> result = new LinkedList<>();
Expand All @@ -177,7 +181,7 @@ public List<Integer> getShardingItems(final String jobInstanceId) {
* @return sharding items from localhost job server
*/
public List<Integer> getLocalShardingItems() {
if (JobRegistry.getInstance().isShutdown(jobName) || !serverService.isAvailableServer(JobRegistry.getInstance().getJobInstance(jobName).getIp())) {
if (JobRegistry.getInstance().isShutdown(jobName) || !serverService.isAvailableServer(JobRegistry.getInstance().getJobInstance(jobName).getServerIp())) {
return Collections.emptyList();
}
return getShardingItems(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public DisabledJobIntegrateTest(final TestType type) {

protected final void assertDisabledRegCenterInfo() {
assertThat(JobRegistry.getInstance().getCurrentShardingTotalCount(getJobName()), is(3));
assertThat(JobRegistry.getInstance().getJobInstance(getJobName()).getIp(), is(IpUtils.getIp()));
assertThat(JobRegistry.getInstance().getJobInstance(getJobName()).getServerIp(), is(IpUtils.getIp()));
JobConfiguration jobConfig = YamlEngine.unmarshal(getREGISTRY_CENTER().get("/" + getJobName() + "/config"), JobConfigurationPOJO.class).toJobConfiguration();
assertThat(jobConfig.getShardingTotalCount(), is(3));
if (getJobBootstrap() instanceof ScheduleJobBootstrap) {
Expand All @@ -49,7 +49,7 @@ protected final void assertDisabledRegCenterInfo() {
assertNull(jobConfig.getCron());
}
assertThat(jobConfig.getShardingItemParameters(), is("0=A,1=B,2=C"));
assertThat(getREGISTRY_CENTER().get("/" + getJobName() + "/servers/" + JobRegistry.getInstance().getJobInstance(getJobName()).getIp()), is(ServerStatus.DISABLED.name()));
assertThat(getREGISTRY_CENTER().get("/" + getJobName() + "/servers/" + JobRegistry.getInstance().getJobInstance(getJobName()).getServerIp()), is(ServerStatus.DISABLED.name()));
while (null != getREGISTRY_CENTER().get("/" + getJobName() + "/leader/election/instance")) {
BlockUtils.waitingShortTime();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public void assertJobRunning() {
}

private void setJobEnable() {
getREGISTRY_CENTER().persist("/" + getJobName() + "/servers/" + JobRegistry.getInstance().getJobInstance(getJobName()).getIp(), ServerStatus.ENABLED.name());
getREGISTRY_CENTER().persist("/" + getJobName() + "/servers/" + JobRegistry.getInstance().getJobInstance(getJobName()).getServerIp(), ServerStatus.ENABLED.name());
}

private void assertEnabledRegCenterInfo() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ protected EnabledJobIntegrateTest(final TestType type, final ElasticJob elasticJ
@Before
public final void assertEnabledRegCenterInfo() {
assertThat(JobRegistry.getInstance().getCurrentShardingTotalCount(getJobName()), is(3));
assertThat(JobRegistry.getInstance().getJobInstance(getJobName()).getIp(), is(IpUtils.getIp()));
assertThat(JobRegistry.getInstance().getJobInstance(getJobName()).getServerIp(), is(IpUtils.getIp()));
JobConfiguration jobConfig = YamlEngine.unmarshal(getREGISTRY_CENTER().get("/" + getJobName() + "/config"), JobConfigurationPOJO.class).toJobConfiguration();
assertThat(jobConfig.getShardingTotalCount(), is(3));
if (getJobBootstrap() instanceof ScheduleJobBootstrap) {
Expand All @@ -51,7 +51,7 @@ public final void assertEnabledRegCenterInfo() {
assertNull(jobConfig.getCron());
}
assertThat(jobConfig.getShardingItemParameters(), is("0=A,1=B,2=C"));
assertThat(getREGISTRY_CENTER().get("/" + getJobName() + "/servers/" + JobRegistry.getInstance().getJobInstance(getJobName()).getIp()), is(ServerStatus.ENABLED.name()));
assertThat(getREGISTRY_CENTER().get("/" + getJobName() + "/servers/" + JobRegistry.getInstance().getJobInstance(getJobName()).getServerIp()), is(ServerStatus.ENABLED.name()));
assertThat(getREGISTRY_CENTER().get("/" + getJobName() + "/leader/election/instance"), is(JobRegistry.getInstance().getJobInstance(getJobName()).getJobInstanceId()));
assertTrue(getREGISTRY_CENTER().isExisted("/" + getJobName() + "/instances/" + JobRegistry.getInstance().getJobInstance(getJobName()).getJobInstanceId()));
getREGISTRY_CENTER().remove("/" + getJobName() + "/leader/election");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import org.apache.shardingsphere.elasticjob.lite.internal.server.ServerService;
import org.apache.shardingsphere.elasticjob.lite.internal.server.ServerStatus;
import org.apache.shardingsphere.elasticjob.lite.internal.storage.JobNodeStorage;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.lite.util.ReflectionUtils;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -59,7 +59,7 @@ public final class ElectionListenerManagerTest {

@Before
public void setUp() {
JobRegistry.getInstance().addJobInstance("test_job", new JobInstance("127.0.0.1@-@0"));
JobRegistry.getInstance().addJobInstance("test_job", new JobInstance("127.0.0.1@-@0", null, "127.0.0.1"));
ReflectionUtils.setSuperclassFieldValue(electionListenerManager, "jobNodeStorage", jobNodeStorage);
ReflectionUtils.setFieldValue(electionListenerManager, "leaderService", leaderService);
ReflectionUtils.setFieldValue(electionListenerManager, "serverService", serverService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobScheduleController;
import org.apache.shardingsphere.elasticjob.lite.internal.server.ServerService;
import org.apache.shardingsphere.elasticjob.lite.internal.storage.JobNodeStorage;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.lite.util.ReflectionUtils;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -58,7 +58,7 @@ public final class LeaderServiceTest {

@Before
public void setUp() {
JobRegistry.getInstance().addJobInstance("test_job", new JobInstance("127.0.0.1@-@0"));
JobRegistry.getInstance().addJobInstance("test_job", new JobInstance("127.0.0.1@-@0", null, "127.0.0.1"));
leaderService = new LeaderService(null, "test_job");
ReflectionUtils.setFieldValue(leaderService, "jobNodeStorage", jobNodeStorage);
ReflectionUtils.setFieldValue(leaderService, "serverService", serverService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public final class InstanceServiceTest {

@Before
public void setUp() {
JobRegistry.getInstance().addJobInstance("test_job", new JobInstance("127.0.0.1@-@0"));
JobRegistry.getInstance().addJobInstance("test_job", new JobInstance("127.0.0.1@-@0", null, "127.0.0.1"));
instanceService = new InstanceService(null, "test_job");
InstanceNode instanceNode = new InstanceNode("test_job");
ReflectionUtils.setFieldValue(instanceService, "instanceNode", instanceNode);
Expand All @@ -61,9 +61,9 @@ public void setUp() {
@Test
public void assertPersistOnline() {
instanceService.persistOnline();
verify(jobNodeStorage).fillEphemeralJobNode("instances/127.0.0.1@-@0", "");
verify(jobNodeStorage).fillEphemeralJobNode("instances/127.0.0.1@-@0", "jobInstanceId: 127.0.0.1@-@0\nserverIp: 127.0.0.1\n");
}

@Test
public void assertRemoveInstance() {
instanceService.removeInstance();
Expand All @@ -73,6 +73,8 @@ public void assertRemoveInstance() {
@Test
public void assertGetAvailableJobInstances() {
when(jobNodeStorage.getJobNodeChildrenKeys(InstanceNode.ROOT)).thenReturn(Arrays.asList("127.0.0.1@-@0", "127.0.0.2@-@0"));
when(jobNodeStorage.getJobNodeData("instances/127.0.0.1@-@0")).thenReturn("jobInstanceId: 127.0.0.1@-@0\nlabels: labels\nserverIp: 127.0.0.1\n");
when(jobNodeStorage.getJobNodeData("instances/127.0.0.2@-@0")).thenReturn("jobInstanceId: 127.0.0.2@-@0\nlabels: labels\nserverIp: 127.0.0.2\n");
when(serverService.isEnableServer("127.0.0.1")).thenReturn(true);
assertThat(instanceService.getAvailableJobInstances(), is(Collections.singletonList(new JobInstance("127.0.0.1@-@0"))));
}
Expand All @@ -82,7 +84,7 @@ public void assertIsLocalJobInstanceExisted() {
when(jobNodeStorage.isJobNodeExisted("instances/127.0.0.1@-@0")).thenReturn(true);
assertTrue(instanceService.isLocalJobInstanceExisted());
}

@Test
public void assertTriggerAllInstances() {
when(jobNodeStorage.getJobNodeChildrenKeys(InstanceNode.ROOT)).thenReturn(Arrays.asList("127.0.0.1@-@0", "127.0.0.2@-@0"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public final class RegistryCenterConnectionStateListenerTest {

@Before
public void setUp() {
JobRegistry.getInstance().addJobInstance("test_job", new JobInstance("127.0.0.1@-@0"));
JobRegistry.getInstance().addJobInstance("test_job", new JobInstance("127.0.0.1@-@0", null, "127.0.0.1"));
regCenterConnectionStateListener = new RegistryCenterConnectionStateListener(null, "test_job");
ReflectionUtils.setFieldValue(regCenterConnectionStateListener, "serverService", serverService);
ReflectionUtils.setFieldValue(regCenterConnectionStateListener, "instanceService", instanceService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public final class ServerNodeTest {

@BeforeClass
public static void setUp() {
JobRegistry.getInstance().addJobInstance("test_job", new JobInstance("127.0.0.1@-@0"));
JobRegistry.getInstance().addJobInstance("test_job", new JobInstance("127.0.0.1@-@0", null, "127.0.0.1"));
}

@Test
Expand Down
Loading