Skip to content

Commit 715139d

Browse files
committed
resolved comments
1 parent 74db560 commit 715139d

File tree

3 files changed

+8
-9
lines changed

3 files changed

+8
-9
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java

+3-4
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MetadataState.Stable;
4040
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MetadataState.Unstable;
4141
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData.state;
42-
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData.versionId;
4342
import static org.apache.pulsar.metadata.api.extended.SessionEvent.SessionLost;
4443
import static org.apache.pulsar.metadata.api.extended.SessionEvent.SessionReestablished;
4544
import com.google.common.annotations.VisibleForTesting;
@@ -104,7 +103,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
104103
NamespaceName.SYSTEM_NAMESPACE,
105104
"loadbalancer-service-unit-state").toString();
106105
private static final long MAX_IN_FLIGHT_STATE_WAITING_TIME_IN_MILLIS = 30 * 1000; // 30sec
107-
private static final long VERSION_ID_INIT = 1; // initial versionId
106+
public static final long VERSION_ID_INIT = 1; // initial versionId
108107
private static final long OWNERSHIP_MONITOR_DELAY_TIME_IN_SECS = 60;
109108
public static final long MAX_CLEAN_UP_DELAY_TIME_IN_SECS = 3 * 60; // 3 mins
110109
private static final long MIN_CLEAN_UP_DELAY_TIME_IN_SECS = 0; // 0 secs to clean immediately
@@ -454,11 +453,11 @@ public CompletableFuture<Optional<String>> getOwnerAsync(String serviceUnit) {
454453

455454
private long getNextVersionId(String serviceUnit) {
456455
var data = tableview.get(serviceUnit);
457-
return versionId(data) + 1;
456+
return getNextVersionId(data);
458457
}
459458

460459
private long getNextVersionId(ServiceUnitStateData data) {
461-
return versionId(data) + 1;
460+
return data == null ? VERSION_ID_INIT : data.versionId() + 1;
462461
}
463462

464463
public CompletableFuture<String> publishAssignEventAsync(String serviceUnit, String broker) {

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateData.java

-4
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,4 @@ public ServiceUnitStateData(ServiceUnitState state, String broker, boolean force
5151
public static ServiceUnitState state(ServiceUnitStateData data) {
5252
return data == null ? ServiceUnitState.Init : data.state();
5353
}
54-
55-
public static long versionId(ServiceUnitStateData data) {
56-
return data == null ? 0 : data.versionId();
57-
}
5854
}

pulsar-broker/src/test/java/org/apache/pulsar/compaction/ServiceUnitStateCompactionTest.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Splitting;
2727
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.isValidTransition;
2828
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData.state;
29-
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData.versionId;
3029
import static org.testng.Assert.assertEquals;
3130
import static org.testng.Assert.assertNotNull;
3231
import static org.testng.Assert.assertNull;
@@ -55,6 +54,7 @@
5554
import org.apache.commons.lang3.tuple.Pair;
5655
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
5756
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState;
57+
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
5858
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateCompactionStrategy;
5959
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
6060
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -953,4 +953,8 @@ public void testReadUnCompacted()
953953
assertNull(none);
954954
}
955955
}
956+
957+
public static long versionId(ServiceUnitStateData data) {
958+
return data == null ? ServiceUnitStateChannelImpl.VERSION_ID_INIT - 1 : data.versionId();
959+
}
956960
}

0 commit comments

Comments
 (0)