Skip to content

Commit

Permalink
Refactor ClusterWorkerIdGenerator (#31319)
Browse files Browse the repository at this point in the history
* Refactor ClusterWorkerIdGenerator

* Refactor ClusterWorkerIdGenerator

* Refactor ClusterWorkerIdGenerator

* Refactor ClusterWorkerIdGenerator

* Refactor ClusterWorkerIdGenerator
  • Loading branch information
terrymanu authored May 20, 2024
1 parent 8c9a165 commit 39212ee
Show file tree
Hide file tree
Showing 6 changed files with 13 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public interface WorkerIdGenerator {
/**
* Generate worker ID.
*
* @param props props
* @param props properties
* @return worker ID
*/
int generate(Properties props);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,12 @@
import org.apache.shardingsphere.mode.repository.cluster.exception.ClusterPersistRepositoryException;

import java.util.Collection;
import java.util.LinkedList;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
* Worker ID generator for cluster mode.
Expand Down Expand Up @@ -78,18 +79,11 @@ private int generateNewWorkerId() {
private Optional<Integer> generateAvailableWorkerId() {
Collection<Integer> assignedWorkerIds = computeNodeStatusService.getAssignedWorkerIds();
ShardingSpherePreconditions.checkState(assignedWorkerIds.size() <= MAX_WORKER_ID + 1, WorkerIdAssignedException::new);
Collection<Integer> availableWorkerIds = new LinkedList<>();
for (int i = 0; i < 1024; i++) {
availableWorkerIds.add(i);
}
PriorityQueue<Integer> priorityQueue = new PriorityQueue<>(availableWorkerIds);
for (Integer each : assignedWorkerIds) {
priorityQueue.remove(each);
}
Integer preselectedWorkerId = priorityQueue.poll();
Preconditions.checkState(null != preselectedWorkerId, "Preselected worker-id can not be null.");
PriorityQueue<Integer> availableWorkerIds = IntStream.range(0, 1024).boxed().filter(each -> !assignedWorkerIds.contains(each)).collect(Collectors.toCollection(PriorityQueue::new));
Integer preselectedWorkerId = availableWorkerIds.poll();
Preconditions.checkNotNull(preselectedWorkerId, "Preselected worker-id can not be null.");
try {
repository.persistExclusiveEphemeral(WorkerIdNode.getWorkerIdGeneratorPath(preselectedWorkerId.toString()), instanceId);
repository.persistExclusiveEphemeral(WorkerIdNode.getWorkerIdGeneratorPath(preselectedWorkerId), instanceId);
return Optional.of(preselectedWorkerId);
} catch (final ClusterPersistRepositoryException ignore) {
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public final class WorkerIdNode {
* @param workerId worker id
* @return worker id generator path
*/
public static String getWorkerIdGeneratorPath(final String workerId) {
return String.join("/", "", ROOT_NODE, workerId);
public static String getWorkerIdGeneratorPath(final int workerId) {
return String.join("/", "", ROOT_NODE, String.valueOf(workerId));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,6 @@ class WorkerIdNodeTest {

@Test
void assertGetWorkerIdGeneratorPath() {
assertThat(WorkerIdNode.getWorkerIdGeneratorPath("1"), is("/worker_id/1"));
assertThat(WorkerIdNode.getWorkerIdGeneratorPath(1), is("/worker_id/1"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public interface ClusterPersistRepository extends PersistRepository {
* Persist exclusive ephemeral data.
*
* @param key key of data
* @param value is persisted or not
* @param value value of data
*/
void persistExclusiveEphemeral(String key, String value);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.shardingsphere.mode.manager.standalone.workerid.generator;

import org.apache.shardingsphere.infra.instance.workerid.WorkerIdAssignedException;
import org.apache.shardingsphere.infra.instance.workerid.WorkerIdGenerator;
import org.apache.shardingsphere.test.util.PropertiesBuilder;
import org.apache.shardingsphere.test.util.PropertiesBuilder.Property;
Expand All @@ -42,6 +43,6 @@ void assertGenerateWithProperties() {

@Test
void assertGenerateWithInvalidProperties() {
assertThrows(IllegalStateException.class, () -> new StandaloneWorkerIdGenerator().generate(PropertiesBuilder.build(new Property(WorkerIdGenerator.WORKER_ID_KEY, "1024"))));
assertThrows(WorkerIdAssignedException.class, () -> new StandaloneWorkerIdGenerator().generate(PropertiesBuilder.build(new Property(WorkerIdGenerator.WORKER_ID_KEY, "1024"))));
}
}

0 comments on commit 39212ee

Please sign in to comment.