Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat] [broker] PIP-188 support blue-green cluster migration [part-1] #17962

Merged
merged 2 commits into from
Oct 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1457,6 +1457,10 @@ splitTopicAndPartitionLabelInPrometheus=false
# Otherwise, aggregate it by list index.
aggregatePublisherStatsByProducerName=false

# Interval, in seconds, between checks of whether the cluster has been marked as migrated.
# When it has, the broker marks its topics as migrated. Set to 0 to disable (disabled by default).
clusterMigrationCheckDurationSeconds=0

### --- Schema storage --- ###
# The schema storage implementation used by this broker
schemaRegistryStorageClassName=org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,8 @@ void asyncOpenCursor(String name, InitialPosition initialPosition, Map<String, L

void asyncTerminate(TerminateCallback callback, Object ctx);

/**
 * Mark the managed ledger as migrated and terminate it.
 *
 * @return a future completed with the last committed position once the ledger has been terminated,
 *         or completed exceptionally if the termination fails
 */
CompletableFuture<Position> asyncMigrate();

/**
* Terminate the managed ledger and return the last committed entry.
*
Expand Down Expand Up @@ -534,6 +536,11 @@ void asyncOpenCursor(String name, InitialPosition initialPosition, Map<String, L
*/
boolean isTerminated();

/**
 * Returns whether the managed ledger was migrated.
 *
 * @return {@code true} if the managed ledger has been marked as migrated, {@code false} otherwise
 */
boolean isMigrated();

/**
* Returns managed-ledger config.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListMap;
Expand Down Expand Up @@ -241,6 +242,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {

protected static final int DEFAULT_LEDGER_DELETE_RETRIES = 3;
protected static final int DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC = 60;
private static final String MIGRATION_STATE_PROPERTY = "migrated";

public enum State {
None, // Uninitialized
Expand Down Expand Up @@ -268,6 +270,7 @@ public enum PositionBound {
private static final AtomicReferenceFieldUpdater<ManagedLedgerImpl, State> STATE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(ManagedLedgerImpl.class, State.class, "state");
protected volatile State state = null;
private volatile boolean migrated = false;

@Getter
private final OrderedScheduler scheduledExecutor;
Expand Down Expand Up @@ -343,7 +346,7 @@ public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper
// Get the next rollover time. Add a random value upto 5% to avoid rollover multiple ledgers at the same time
this.maximumRolloverTimeMs = getMaximumRolloverTimeMs(config);
this.mlOwnershipChecker = mlOwnershipChecker;
this.propertiesMap = new HashMap();
this.propertiesMap = new ConcurrentHashMap<>();
this.inactiveLedgerRollOverTimeMs = config.getInactiveLedgerRollOverTimeMs();
if (config.getManagedLedgerInterceptor() != null) {
this.managedLedgerInterceptor = config.getManagedLedgerInterceptor();
Expand All @@ -367,7 +370,6 @@ public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) {
lastConfirmedEntry = new PositionImpl(mlInfo.getTerminatedPosition());
log.info("[{}] Recovering managed ledger terminated at {}", name, lastConfirmedEntry);
}

for (LedgerInfo ls : mlInfo.getLedgerInfoList()) {
ledgers.put(ls.getLedgerId(), ls);
}
Expand All @@ -379,6 +381,7 @@ public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) {
propertiesMap.put(property.getKey(), property.getValue());
}
}
migrated = mlInfo.hasTerminatedPosition() && propertiesMap.containsKey(MIGRATION_STATE_PROPERTY);
if (managedLedgerInterceptor != null) {
managedLedgerInterceptor.onManagedLedgerPropertiesInitialize(propertiesMap);
}
Expand Down Expand Up @@ -1271,6 +1274,27 @@ private long consumedLedgerSize(long ledgerSize, long ledgerEntries, long consum
}
}

/**
 * Flags this managed ledger as migrated and terminates it.
 *
 * <p>The migration property is stored in {@code propertiesMap} before the termination is requested
 * (presumably so that it is persisted together with the terminated state -- verify against
 * {@code asyncTerminate}). If the termination fails and the property was not present beforehand,
 * it is removed again so that a later, unrelated termination is not recovered as a migration.
 *
 * @return a future completed with the last committed position once the ledger is terminated, or
 *         completed exceptionally with the {@link ManagedLedgerException} reported by the termination
 */
@Override
public CompletableFuture<Position> asyncMigrate() {
    // Remember whether the flag pre-existed so a failed attempt never erases an earlier migration marker.
    final boolean alreadyFlagged = propertiesMap.containsKey(MIGRATION_STATE_PROPERTY);
    propertiesMap.put(MIGRATION_STATE_PROPERTY, Boolean.TRUE.toString());
    CompletableFuture<Position> result = new CompletableFuture<>();
    asyncTerminate(new TerminateCallback() {

        @Override
        public void terminateComplete(Position lastCommittedPosition, Object ctx) {
            migrated = true;
            log.info("[{}] topic successfully terminated and migrated at {}", name, lastCommittedPosition);
            result.complete(lastCommittedPosition);
        }

        @Override
        public void terminateFailed(ManagedLedgerException exception, Object ctx) {
            if (!alreadyFlagged) {
                // Roll back the in-memory marker: the ledger is neither terminated nor migrated.
                propertiesMap.remove(MIGRATION_STATE_PROPERTY);
            }
            log.error("[{}] topic failed to terminate and migrate ", name, exception);
            result.completeExceptionally(exception);
        }
    }, null);
    return result;
}

@Override
public synchronized void asyncTerminate(TerminateCallback callback, Object ctx) {
if (state == State.Fenced) {
Expand Down Expand Up @@ -1363,6 +1387,11 @@ public boolean isTerminated() {
return state == State.Terminated;
}

/**
 * {@inheritDoc}
 *
 * <p>Reads the volatile {@code migrated} flag of this instance.
 */
@Override
public boolean isMigrated() {
    return this.migrated;
}

@Override
public void close() throws InterruptedException, ManagedLedgerException {
final CountDownLatch counter = new CountDownLatch(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2511,6 +2511,13 @@ The delayed message index bucket time step(in seconds) in per bucket snapshot se
)
private long brokerServiceCompactionPhaseOneLoopTimeInSeconds = 30;

// Period, in seconds, of the cluster-migration check; a value of 0 (the default) disables it.
@FieldContext(
    category = CATEGORY_SERVER,
    doc = "Interval between checks to see if cluster is migrated and marks topic migrated"
            + " if cluster is marked migrated. Disable with value 0. (Default disabled)."
)
private int clusterMigrationCheckDurationSeconds = 0;

@FieldContext(
category = CATEGORY_SCHEMA,
doc = "Enforce schema validation on following cases:\n\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
Expand All @@ -59,6 +60,7 @@
import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData;
import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationDataImpl;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.FailureDomainImpl;
import org.apache.pulsar.common.policies.data.NamespaceIsolationDataImpl;
Expand Down Expand Up @@ -229,6 +231,66 @@ public void updateCluster(
});
}

/**
 * Updates the migration state of a cluster and the URLs of the cluster it was migrated to.
 *
 * <p>The request is rejected with 400 when the cluster is being marked as migrated but no usable
 * target URL is supplied (missing body or empty {@link ClusterUrl}). That check runs before the
 * super-user and read-only-policies validations. A missing cluster is reported as 404; any other
 * failure is forwarded through {@code resumeAsyncResponseExceptionally}.
 *
 * @param asyncResponse the suspended response that is resumed with the outcome
 * @param cluster the name of the cluster to update
 * @param isMigrated whether the cluster is to be marked as migrated
 * @param clusterUrl the URLs of the target cluster; may be {@code null} when the request has no body
 */
@POST
@Path("/{cluster}/migrate")
@ApiOperation(
    value = "Update the configuration for a cluster migration.",
    notes = "This operation requires Pulsar superuser privileges.")
@ApiResponses(value = {
        @ApiResponse(code = 204, message = "Cluster has been updated."),
        @ApiResponse(code = 400, message = "Cluster url must not be empty."),
        @ApiResponse(code = 403, message = "Don't have admin permission or policies are read-only."),
        @ApiResponse(code = 404, message = "Cluster doesn't exist."),
        @ApiResponse(code = 500, message = "Internal server error.")
})
public void updateClusterMigration(
    @Suspended AsyncResponse asyncResponse,
    @ApiParam(value = "The cluster name", required = true)
    @PathParam("cluster") String cluster,
    @ApiParam(value = "Is cluster migrated", required = true)
    @QueryParam("migrated") boolean isMigrated,
    @ApiParam(
        value = "The cluster url data",
        required = true,
        examples = @Example(
            value = @ExampleProperty(
                mediaType = MediaType.APPLICATION_JSON,
                value = """
                        {
                        "serviceUrl": "http://pulsar.example.com:8080",
                        "brokerServiceUrl": "pulsar://pulsar.example.com:6651"
                        }
                        """
            )
        )
    ) ClusterUrl clusterUrl) {
    // A request without a body yields a null clusterUrl; treat it like an empty one instead of
    // failing with a NullPointerException.
    if (isMigrated && (clusterUrl == null || clusterUrl.isEmpty())) {
        asyncResponse.resume(new RestException(Status.BAD_REQUEST, "Cluster url must not be empty"));
        return;
    }
    validateSuperUserAccessAsync()
            .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
            .thenCompose(__ -> clusterResources().updateClusterAsync(cluster, old -> {
                ClusterDataImpl data = (ClusterDataImpl) old;
                data.setMigrated(isMigrated);
                data.setMigratedClusterUrl(clusterUrl);
                return data;
            }))
            .thenAccept(__ -> {
                log.info("[{}] Updated cluster {}", clientAppId(), cluster);
                asyncResponse.resume(Response.ok().build());
            }).exceptionally(ex -> {
                log.error("[{}] Failed to update cluster {}", clientAppId(), cluster, ex);
                Throwable realCause = FutureUtil.unwrapCompletionException(ex);
                if (realCause instanceof MetadataStoreException.NotFoundException) {
                    asyncResponse.resume(new RestException(Status.NOT_FOUND, "Cluster does not exist"));
                    return null;
                }
                resumeAsyncResponseExceptionally(asyncResponse, ex);
                return null;
            });
}

@POST
@Path("/{cluster}/peers")
@ApiOperation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,19 @@ protected String getSubscriptionName() {
return subscription == null ? null : subscription.getName();
}

/**
 * Applies the end-of-topic or topic-migration handling to the given consumers, using the
 * topic of this dispatcher's subscription.
 *
 * @param consumers the consumers to handle
 */
protected void checkAndApplyReachedEndOfTopicOrTopicMigration(List<Consumer> consumers) {
    // Delegate to the static variant with the subscription's topic.
    checkAndApplyReachedEndOfTopicOrTopicMigration((PersistentTopic) subscription.getTopic(), consumers);
}

/**
 * Handles the given consumers according to the state of the topic: when the topic is migrated each
 * consumer is handed the migrated cluster URL, otherwise each consumer is told that the end of the
 * topic was reached.
 *
 * @param topic the topic the consumers are attached to
 * @param consumers the consumers to handle
 */
public static void checkAndApplyReachedEndOfTopicOrTopicMigration(PersistentTopic topic, List<Consumer> consumers) {
    if (!topic.isMigrated()) {
        consumers.forEach(Consumer::reachedEndOfTopic);
        return;
    }
    if (consumers.isEmpty()) {
        // Nothing to notify, so skip the URL lookup (the original did no lookup for an empty list either).
        return;
    }
    // Resolve the target URL once for all consumers rather than once per consumer; the lookup
    // appears to be a blocking metadata read bounded by a timeout -- confirm against the topic class.
    final var migratedClusterUrl = topic.getMigratedClusterUrl();
    consumers.forEach(consumer -> consumer.topicMigrated(migratedClusterUrl));
}

@Override
public long getFilterProcessedMsgCount() {
return this.filterProcessedMsgs.longValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,14 @@
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.resourcegroup.ResourceGroup;
import org.apache.pulsar.broker.resourcegroup.ResourceGroupPublishLimiter;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.ProducerBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.ProducerFencedException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicMigratedException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException;
import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
Expand All @@ -59,6 +61,7 @@
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.EntryFilters;
import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
Expand Down Expand Up @@ -686,7 +689,10 @@ public CompletableFuture<Optional<Long>> addProducer(Producer producer,
lock.writeLock().lock();
try {
checkTopicFenced();
if (isTerminated()) {
if (isMigrated()) {
log.warn("[{}] Attempting to add producer to a migrated topic", topic);
throw new TopicMigratedException("Topic was already migrated");
} else if (isTerminated()) {
log.warn("[{}] Attempting to add producer to a terminated topic", topic);
throw new TopicTerminatedException("Topic was already terminated");
}
Expand Down Expand Up @@ -1180,6 +1186,8 @@ public boolean deletePartitionedTopicMetadataWhileInactive() {

protected abstract boolean isTerminated();

/**
 * Returns whether this topic has been migrated. When it has, {@code addProducer} rejects new
 * producers with a {@code TopicMigratedException}.
 */
protected abstract boolean isMigrated();

private static final Logger log = LoggerFactory.getLogger(AbstractTopic.class);

public InactiveTopicPolicies getInactiveTopicPolicies() {
Expand Down Expand Up @@ -1299,4 +1307,25 @@ public void updateBrokerSubscribeRate() {
topicPolicies.getSubscribeRate().updateBrokerValue(
subscribeRateInBroker(brokerService.pulsar().getConfiguration()));
}

/**
 * Looks up the migrated cluster URL through the {@link PulsarService} of this topic's broker service.
 *
 * @return the migrated cluster URL, or an empty {@link Optional} if none could be obtained
 */
public Optional<ClusterUrl> getMigratedClusterUrl() {
    final PulsarService pulsar = brokerService.getPulsar();
    return getMigratedClusterUrl(pulsar);
}

/**
 * Asynchronously reads the data of the local cluster (the one named in the broker configuration)
 * and extracts the URL of the cluster it was migrated to.
 *
 * @param pulsar the service giving access to the cluster resources and the configuration
 * @return a future holding the migrated cluster URL, or an empty {@link Optional} when the cluster
 *         data is absent, the cluster is not marked as migrated, or no URL is set
 */
public static CompletableFuture<Optional<ClusterUrl>> getMigratedClusterUrlAsync(PulsarService pulsar) {
    final String localCluster = pulsar.getConfig().getClusterName();
    return pulsar.getPulsarResources().getClusterResources().getClusterAsync(localCluster)
            .thenApply(clusterData -> clusterData
                    .filter(data -> data.isMigrated())
                    // Optional.map yields an empty Optional when the URL is null, as ofNullable did.
                    .map(data -> data.getMigratedClusterUrl()));
}

/**
 * Blocking variant of {@link #getMigratedClusterUrlAsync(PulsarService)}: waits for the lookup for
 * at most the operation timeout of the cluster resources.
 *
 * <p>Failures are logged and reported as an empty result rather than propagated. If the calling
 * thread is interrupted while waiting, its interrupt status is restored before returning.
 *
 * @param pulsar the service giving access to the cluster resources and the configuration
 * @return the migrated cluster URL, or an empty {@link Optional} if the cluster is not migrated or
 *         the lookup failed, timed out, or was interrupted
 */
public static Optional<ClusterUrl> getMigratedClusterUrl(PulsarService pulsar) {
    try {
        return getMigratedClusterUrlAsync(pulsar)
                .get(pulsar.getPulsarResources().getClusterResources().getOperationTimeoutSec(), TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        // Do not swallow the interruption: let callers further up the stack observe it.
        Thread.currentThread().interrupt();
        log.warn("Failed to get migration cluster URL", e);
    } catch (Exception e) {
        log.warn("Failed to get migration cluster URL", e);
    }
    return Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,13 @@ protected void startInactivityMonitor() {
subscriptionExpiryCheckIntervalInSeconds,
subscriptionExpiryCheckIntervalInSeconds, TimeUnit.SECONDS);
}

// check cluster migration
int interval = pulsar().getConfiguration().getClusterMigrationCheckDurationSeconds();
if (interval > 0) {
inactivityMonitor.scheduleAtFixedRate(safeRun(() -> checkClusterMigration()), interval, interval,
TimeUnit.SECONDS);
}
}

protected void startMessageExpiryMonitor() {
Expand Down Expand Up @@ -1850,6 +1857,10 @@ public void checkGC() {
forEachTopic(Topic::checkGC);
}

/**
 * Runs the cluster-migration check on every topic visited by {@code forEachTopic}.
 */
public void checkClusterMigration() {
    forEachTopic(topic -> topic.checkClusterMigration());
}

public void checkMessageExpiry() {
forEachTopic(Topic::checkMessageExpiry);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,16 @@ public TopicTerminatedException(Throwable t) {
}
}

/**
 * Broker service exception raised for operations attempted on a topic that has been migrated.
 */
public static class TopicMigratedException extends BrokerServiceException {

    /**
     * Creates the exception wrapping an underlying cause.
     *
     * @param t the cause
     */
    public TopicMigratedException(Throwable t) {
        super(t);
    }

    /**
     * Creates the exception with a descriptive message.
     *
     * @param msg the detail message
     */
    public TopicMigratedException(String msg) {
        super(msg);
    }
}

public static class ServerMetadataException extends BrokerServiceException {
public ServerMetadataException(Throwable t) {
super(t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
Expand All @@ -49,10 +50,12 @@
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.CommandTopicMigrated.ResourceType;
import org.apache.pulsar.common.api.proto.KeyLongValue;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.MessageIdData;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl;
import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.stats.Rate;
Expand Down Expand Up @@ -785,6 +788,16 @@ public void reachedEndOfTopic() {
cnx.getCommandSender().sendReachedEndOfTopic(consumerId);
}

/**
 * Notifies this consumer that its topic was migrated and then disconnects it. Does nothing when no
 * cluster URL is provided.
 *
 * @param clusterUrl the URLs of the cluster the topic was migrated to, if known
 */
public void topicMigrated(Optional<ClusterUrl> clusterUrl) {
    if (!clusterUrl.isPresent()) {
        return;
    }
    final ClusterUrl migratedUrl = clusterUrl.get();
    cnx.getCommandSender().sendTopicMigrated(ResourceType.Consumer, consumerId,
            migratedUrl.getBrokerServiceUrl(), migratedUrl.getBrokerServiceUrlTls());
    // The consumer is disconnected only after the migrated cluster url has been sent.
    disconnect();
}

/**
* Checks if consumer-blocking on unAckedMessages is allowed for below conditions:<br/>
* a. consumer must have Shared-subscription<br/>
Expand Down
Loading