Skip to content

Commit b0945d1

Browse files
authored
[feat] [broker] PIP-188 support blue-green cluster migration [part-1] (#17962)
* [feat][PIP-188] support blue-green cluster migration [part-1] Add blue-green cluster migration Fix dependency * cleanup
1 parent 5b452d1 commit b0945d1

File tree

41 files changed

+886
-19
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+886
-19
lines changed

conf/broker.conf

+4
Original file line numberDiff line numberDiff line change
@@ -1461,6 +1461,10 @@ splitTopicAndPartitionLabelInPrometheus=false
14611461
# Otherwise, aggregate it by list index.
14621462
aggregatePublisherStatsByProducerName=false
14631463

1464+
# Interval, in seconds, between checks of whether this cluster has been marked as migrated;
1465+
# when it has, the broker marks its topics as migrated. Set to 0 to disable (default: disabled).
1466+
clusterMigrationCheckDurationSeconds=0
1467+
14641468
### --- Schema storage --- ###
14651469
# The schema storage implementation used by this broker
14661470
schemaRegistryStorageClassName=org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java

+7
Original file line numberDiff line numberDiff line change
@@ -443,6 +443,8 @@ void asyncOpenCursor(String name, InitialPosition initialPosition, Map<String, L
443443

444444
void asyncTerminate(TerminateCallback callback, Object ctx);
445445

446+
/**
 * Marks the managed ledger as migrated and terminates it asynchronously.
 *
 * @return a future completed with the last committed position once the ledger has been terminated,
 *         or completed exceptionally if termination fails
 */
CompletableFuture<Position> asyncMigrate();
447+
446448
/**
447449
* Terminate the managed ledger and return the last committed entry.
448450
*
@@ -534,6 +536,11 @@ void asyncOpenCursor(String name, InitialPosition initialPosition, Map<String, L
534536
*/
535537
boolean isTerminated();
536538

539+
/**
540+
* Returns whether the managed ledger was migrated.
541+
*/
542+
boolean isMigrated();
543+
537544
/**
538545
* Returns managed-ledger config.
539546
*/

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java

+31-2
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import java.util.UUID;
5151
import java.util.concurrent.CompletableFuture;
5252
import java.util.concurrent.CompletionException;
53+
import java.util.concurrent.ConcurrentHashMap;
5354
import java.util.concurrent.ConcurrentLinkedDeque;
5455
import java.util.concurrent.ConcurrentLinkedQueue;
5556
import java.util.concurrent.ConcurrentSkipListMap;
@@ -241,6 +242,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
241242

242243
protected static final int DEFAULT_LEDGER_DELETE_RETRIES = 3;
243244
protected static final int DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC = 60;
245+
private static final String MIGRATION_STATE_PROPERTY = "migrated";
244246

245247
public enum State {
246248
None, // Uninitialized
@@ -268,6 +270,7 @@ public enum PositionBound {
268270
private static final AtomicReferenceFieldUpdater<ManagedLedgerImpl, State> STATE_UPDATER =
269271
AtomicReferenceFieldUpdater.newUpdater(ManagedLedgerImpl.class, State.class, "state");
270272
protected volatile State state = null;
273+
private volatile boolean migrated = false;
271274

272275
@Getter
273276
private final OrderedScheduler scheduledExecutor;
@@ -343,7 +346,7 @@ public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper
343346
// Get the next rollover time. Add a random value upto 5% to avoid rollover multiple ledgers at the same time
344347
this.maximumRolloverTimeMs = getMaximumRolloverTimeMs(config);
345348
this.mlOwnershipChecker = mlOwnershipChecker;
346-
this.propertiesMap = new HashMap();
349+
this.propertiesMap = new ConcurrentHashMap<>();
347350
this.inactiveLedgerRollOverTimeMs = config.getInactiveLedgerRollOverTimeMs();
348351
if (config.getManagedLedgerInterceptor() != null) {
349352
this.managedLedgerInterceptor = config.getManagedLedgerInterceptor();
@@ -367,7 +370,6 @@ public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) {
367370
lastConfirmedEntry = new PositionImpl(mlInfo.getTerminatedPosition());
368371
log.info("[{}] Recovering managed ledger terminated at {}", name, lastConfirmedEntry);
369372
}
370-
371373
for (LedgerInfo ls : mlInfo.getLedgerInfoList()) {
372374
ledgers.put(ls.getLedgerId(), ls);
373375
}
@@ -379,6 +381,7 @@ public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) {
379381
propertiesMap.put(property.getKey(), property.getValue());
380382
}
381383
}
384+
migrated = mlInfo.hasTerminatedPosition() && propertiesMap.containsKey(MIGRATION_STATE_PROPERTY);
382385
if (managedLedgerInterceptor != null) {
383386
managedLedgerInterceptor.onManagedLedgerPropertiesInitialize(propertiesMap);
384387
}
@@ -1271,6 +1274,27 @@ private long consumedLedgerSize(long ledgerSize, long ledgerEntries, long consum
12711274
}
12721275
}
12731276

1277+
public CompletableFuture<Position> asyncMigrate() {
1278+
propertiesMap.put(MIGRATION_STATE_PROPERTY, Boolean.TRUE.toString());
1279+
CompletableFuture<Position> result = new CompletableFuture<>();
1280+
asyncTerminate(new TerminateCallback() {
1281+
1282+
@Override
1283+
public void terminateComplete(Position lastCommittedPosition, Object ctx) {
1284+
migrated = true;
1285+
log.info("[{}] topic successfully terminated and migrated at {}", name, lastCommittedPosition);
1286+
result.complete(lastCommittedPosition);
1287+
}
1288+
1289+
@Override
1290+
public void terminateFailed(ManagedLedgerException exception, Object ctx) {
1291+
log.info("[{}] topic failed to terminate and migrate ", name, exception);
1292+
result.completeExceptionally(exception);
1293+
}
1294+
}, null);
1295+
return result;
1296+
}
1297+
12741298
@Override
12751299
public synchronized void asyncTerminate(TerminateCallback callback, Object ctx) {
12761300
if (state == State.Fenced) {
@@ -1363,6 +1387,11 @@ public boolean isTerminated() {
13631387
return state == State.Terminated;
13641388
}
13651389

1390+
@Override
1391+
public boolean isMigrated() {
1392+
return migrated;
1393+
}
1394+
13661395
@Override
13671396
public void close() throws InterruptedException, ManagedLedgerException {
13681397
final CountDownLatch counter = new CountDownLatch(1);

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java

+7
Original file line numberDiff line numberDiff line change
@@ -2514,6 +2514,13 @@ The delayed message index bucket time step(in seconds) in per bucket snapshot se
25142514
)
25152515
private long brokerServiceCompactionPhaseOneLoopTimeInSeconds = 30;
25162516

2517+
@FieldContext(
2518+
category = CATEGORY_SERVER,
2519+
doc = "Interval between checks to see if cluster is migrated and marks topic migrated "
2520+
+ " if cluster is marked migrated. Disable with value 0. (Default disabled)."
2521+
)
2522+
private int clusterMigrationCheckDurationSeconds = 0;
2523+
25172524
@FieldContext(
25182525
category = CATEGORY_SCHEMA,
25192526
doc = "Enforce schema validation on following cases:\n\n"

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java

+62
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import javax.ws.rs.PUT;
4242
import javax.ws.rs.Path;
4343
import javax.ws.rs.PathParam;
44+
import javax.ws.rs.QueryParam;
4445
import javax.ws.rs.WebApplicationException;
4546
import javax.ws.rs.container.AsyncResponse;
4647
import javax.ws.rs.container.Suspended;
@@ -59,6 +60,7 @@
5960
import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData;
6061
import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationDataImpl;
6162
import org.apache.pulsar.common.policies.data.ClusterData;
63+
import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl;
6264
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
6365
import org.apache.pulsar.common.policies.data.FailureDomainImpl;
6466
import org.apache.pulsar.common.policies.data.NamespaceIsolationDataImpl;
@@ -229,6 +231,66 @@ public void updateCluster(
229231
});
230232
}
231233

234+
@POST
235+
@Path("/{cluster}/migrate")
236+
@ApiOperation(
237+
value = "Update the configuration for a cluster migration.",
238+
notes = "This operation requires Pulsar superuser privileges.")
239+
@ApiResponses(value = {
240+
@ApiResponse(code = 204, message = "Cluster has been updated."),
241+
@ApiResponse(code = 400, message = "Cluster url must not be empty."),
242+
@ApiResponse(code = 403, message = "Don't have admin permission or policies are read-only."),
243+
@ApiResponse(code = 404, message = "Cluster doesn't exist."),
244+
@ApiResponse(code = 500, message = "Internal server error.")
245+
})
246+
public void updateClusterMigration(
247+
@Suspended AsyncResponse asyncResponse,
248+
@ApiParam(value = "The cluster name", required = true)
249+
@PathParam("cluster") String cluster,
250+
@ApiParam(value = "Is cluster migrated", required = true)
251+
@QueryParam("migrated") boolean isMigrated,
252+
@ApiParam(
253+
value = "The cluster url data",
254+
required = true,
255+
examples = @Example(
256+
value = @ExampleProperty(
257+
mediaType = MediaType.APPLICATION_JSON,
258+
value = """
259+
{
260+
"serviceUrl": "http://pulsar.example.com:8080",
261+
"brokerServiceUrl": "pulsar://pulsar.example.com:6651"
262+
}
263+
"""
264+
)
265+
)
266+
) ClusterUrl clusterUrl) {
267+
if (isMigrated && clusterUrl.isEmpty()) {
268+
asyncResponse.resume(new RestException(Status.BAD_REQUEST, "Cluster url must not be empty"));
269+
return;
270+
}
271+
validateSuperUserAccessAsync()
272+
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
273+
.thenCompose(__ -> clusterResources().updateClusterAsync(cluster, old -> {
274+
ClusterDataImpl data = (ClusterDataImpl) old;
275+
data.setMigrated(isMigrated);
276+
data.setMigratedClusterUrl(clusterUrl);
277+
return data;
278+
}))
279+
.thenAccept(__ -> {
280+
log.info("[{}] Updated cluster {}", clientAppId(), cluster);
281+
asyncResponse.resume(Response.ok().build());
282+
}).exceptionally(ex -> {
283+
log.error("[{}] Failed to update cluster {}", clientAppId(), cluster, ex);
284+
Throwable realCause = FutureUtil.unwrapCompletionException(ex);
285+
if (realCause instanceof MetadataStoreException.NotFoundException) {
286+
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Cluster does not exist"));
287+
return null;
288+
}
289+
resumeAsyncResponseExceptionally(asyncResponse, ex);
290+
return null;
291+
});
292+
}
293+
232294
@POST
233295
@Path("/{cluster}/peers")
234296
@ApiOperation(

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java

+13
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,19 @@ protected String getSubscriptionName() {
338338
return subscription == null ? null : subscription.getName();
339339
}
340340

341+
protected void checkAndApplyReachedEndOfTopicOrTopicMigration(List<Consumer> consumers) {
342+
PersistentTopic topic = (PersistentTopic) subscription.getTopic();
343+
checkAndApplyReachedEndOfTopicOrTopicMigration(topic, consumers);
344+
}
345+
346+
public static void checkAndApplyReachedEndOfTopicOrTopicMigration(PersistentTopic topic, List<Consumer> consumers) {
347+
if (topic.isMigrated()) {
348+
consumers.forEach(c -> c.topicMigrated(topic.getMigratedClusterUrl()));
349+
} else {
350+
consumers.forEach(Consumer::reachedEndOfTopic);
351+
}
352+
}
353+
341354
@Override
342355
public long getFilterProcessedMsgCount() {
343356
return this.filterProcessedMsgs.longValue();

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java

+30-1
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,14 @@
4444
import org.apache.commons.collections4.CollectionUtils;
4545
import org.apache.commons.collections4.MapUtils;
4646
import org.apache.commons.lang3.tuple.Pair;
47+
import org.apache.pulsar.broker.PulsarService;
4748
import org.apache.pulsar.broker.ServiceConfiguration;
4849
import org.apache.pulsar.broker.resourcegroup.ResourceGroup;
4950
import org.apache.pulsar.broker.resourcegroup.ResourceGroupPublishLimiter;
5051
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
5152
import org.apache.pulsar.broker.service.BrokerServiceException.ProducerBusyException;
5253
import org.apache.pulsar.broker.service.BrokerServiceException.ProducerFencedException;
54+
import org.apache.pulsar.broker.service.BrokerServiceException.TopicMigratedException;
5355
import org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException;
5456
import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
5557
import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
@@ -59,6 +61,7 @@
5961
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
6062
import org.apache.pulsar.common.naming.TopicName;
6163
import org.apache.pulsar.common.policies.data.BacklogQuota;
64+
import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl;
6265
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
6366
import org.apache.pulsar.common.policies.data.EntryFilters;
6467
import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
@@ -686,7 +689,10 @@ public CompletableFuture<Optional<Long>> addProducer(Producer producer,
686689
lock.writeLock().lock();
687690
try {
688691
checkTopicFenced();
689-
if (isTerminated()) {
692+
if (isMigrated()) {
693+
log.warn("[{}] Attempting to add producer to a migrated topic", topic);
694+
throw new TopicMigratedException("Topic was already migrated");
695+
} else if (isTerminated()) {
690696
log.warn("[{}] Attempting to add producer to a terminated topic", topic);
691697
throw new TopicTerminatedException("Topic was already terminated");
692698
}
@@ -1180,6 +1186,8 @@ public boolean deletePartitionedTopicMetadataWhileInactive() {
11801186

11811187
protected abstract boolean isTerminated();
11821188

1189+
protected abstract boolean isMigrated();
1190+
11831191
private static final Logger log = LoggerFactory.getLogger(AbstractTopic.class);
11841192

11851193
public InactiveTopicPolicies getInactiveTopicPolicies() {
@@ -1299,4 +1307,25 @@ public void updateBrokerSubscribeRate() {
12991307
topicPolicies.getSubscribeRate().updateBrokerValue(
13001308
subscribeRateInBroker(brokerService.pulsar().getConfiguration()));
13011309
}
1310+
1311+
public Optional<ClusterUrl> getMigratedClusterUrl() {
1312+
return getMigratedClusterUrl(brokerService.getPulsar());
1313+
}
1314+
1315+
public static CompletableFuture<Optional<ClusterUrl>> getMigratedClusterUrlAsync(PulsarService pulsar) {
1316+
return pulsar.getPulsarResources().getClusterResources().getClusterAsync(pulsar.getConfig().getClusterName())
1317+
.thenApply(clusterData -> (clusterData.isPresent() && clusterData.get().isMigrated())
1318+
? Optional.ofNullable(clusterData.get().getMigratedClusterUrl())
1319+
: Optional.empty());
1320+
}
1321+
1322+
public static Optional<ClusterUrl> getMigratedClusterUrl(PulsarService pulsar) {
1323+
try {
1324+
return getMigratedClusterUrlAsync(pulsar)
1325+
.get(pulsar.getPulsarResources().getClusterResources().getOperationTimeoutSec(), TimeUnit.SECONDS);
1326+
} catch (Exception e) {
1327+
log.warn("Failed to get migration cluster URL", e);
1328+
}
1329+
return Optional.empty();
1330+
}
13021331
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java

+11
Original file line numberDiff line numberDiff line change
@@ -582,6 +582,13 @@ protected void startInactivityMonitor() {
582582
subscriptionExpiryCheckIntervalInSeconds,
583583
subscriptionExpiryCheckIntervalInSeconds, TimeUnit.SECONDS);
584584
}
585+
586+
// check cluster migration
587+
int interval = pulsar().getConfiguration().getClusterMigrationCheckDurationSeconds();
588+
if (interval > 0) {
589+
inactivityMonitor.scheduleAtFixedRate(safeRun(() -> checkClusterMigration()), interval, interval,
590+
TimeUnit.SECONDS);
591+
}
585592
}
586593

587594
protected void startMessageExpiryMonitor() {
@@ -1851,6 +1858,10 @@ public void checkGC() {
18511858
forEachTopic(Topic::checkGC);
18521859
}
18531860

1861+
public void checkClusterMigration() {
1862+
forEachTopic(Topic::checkClusterMigration);
1863+
}
1864+
18541865
public void checkMessageExpiry() {
18551866
forEachTopic(Topic::checkMessageExpiry);
18561867
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java

+10
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,16 @@ public TopicTerminatedException(Throwable t) {
100100
}
101101
}
102102

103+
public static class TopicMigratedException extends BrokerServiceException {
104+
public TopicMigratedException(String msg) {
105+
super(msg);
106+
}
107+
108+
public TopicMigratedException(Throwable t) {
109+
super(t);
110+
}
111+
}
112+
103113
public static class ServerMetadataException extends BrokerServiceException {
104114
public ServerMetadataException(Throwable t) {
105115
super(t);

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java

+13
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.List;
3232
import java.util.Map;
3333
import java.util.Objects;
34+
import java.util.Optional;
3435
import java.util.concurrent.CompletableFuture;
3536
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
3637
import java.util.concurrent.atomic.LongAdder;
@@ -49,10 +50,12 @@
4950
import org.apache.pulsar.common.api.proto.CommandAck;
5051
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
5152
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
53+
import org.apache.pulsar.common.api.proto.CommandTopicMigrated.ResourceType;
5254
import org.apache.pulsar.common.api.proto.KeyLongValue;
5355
import org.apache.pulsar.common.api.proto.KeySharedMeta;
5456
import org.apache.pulsar.common.api.proto.MessageIdData;
5557
import org.apache.pulsar.common.naming.TopicName;
58+
import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl;
5659
import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
5760
import org.apache.pulsar.common.protocol.Commands;
5861
import org.apache.pulsar.common.stats.Rate;
@@ -785,6 +788,16 @@ public void reachedEndOfTopic() {
785788
cnx.getCommandSender().sendReachedEndOfTopic(consumerId);
786789
}
787790

791+
public void topicMigrated(Optional<ClusterUrl> clusterUrl) {
792+
if (clusterUrl.isPresent()) {
793+
ClusterUrl url = clusterUrl.get();
794+
cnx.getCommandSender().sendTopicMigrated(ResourceType.Consumer, consumerId, url.getBrokerServiceUrl(),
795+
url.getBrokerServiceUrlTls());
796+
// disconnect consumer after sending migrated cluster url
797+
disconnect();
798+
}
799+
}
800+
788801
/**
789802
* Checks if consumer-blocking on unAckedMessages is allowed for below conditions:<br/>
790803
* a. consumer must have Shared-subscription<br/>

0 commit comments

Comments
 (0)