diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 6dda1e152b9f2a..bbd5d34e2afe3f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -740,7 +740,7 @@ public Env(boolean isCheckpointCatalog) {
         this.feSessionMgr = new FESessionMgr();
         this.temporaryTableMgr = new TemporaryTableMgr();
         this.aliveSessionSet = Sets.newConcurrentHashSet();
-        this.tabletInvertedIndex = new TabletInvertedIndex();
+        this.tabletInvertedIndex = EnvFactory.getInstance().createTabletInvertedIndex();
         this.colocateTableIndex = new ColocateTableIndex();
         this.recycleBin = new CatalogRecycleBin();
         this.functionSet = new FunctionSet();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/EnvFactory.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/EnvFactory.java
index 12f39db81d57b1..f8930ee6247459 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/EnvFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/EnvFactory.java
@@ -86,6 +86,10 @@ public SystemInfoService createSystemInfoService() {
         return new SystemInfoService();
     }
 
+    public TabletInvertedIndex createTabletInvertedIndex() {
+        return new LocalTabletInvertedIndex();
+    }
+
     public Type getPartitionClass() {
         return Partition.class;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/LocalTabletInvertedIndex.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/LocalTabletInvertedIndex.java
new file mode 100644
index 00000000000000..6d47592c9e9851
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/LocalTabletInvertedIndex.java
@@ -0,0 +1,1046 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog;
+
+import org.apache.doris.catalog.Replica.ReplicaState;
+import org.apache.doris.clone.PartitionRebalancer.TabletMove;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.Pair;
+import org.apache.doris.cooldown.CooldownConf;
+import org.apache.doris.master.PartitionInfoCollector.PartitionCollectInfo;
+import org.apache.doris.task.PublishVersionTask;
+import org.apache.doris.thrift.TPartitionVersionInfo;
+import org.apache.doris.thrift.TStorageMedium;
+import org.apache.doris.thrift.TTablet;
+import org.apache.doris.thrift.TTabletInfo;
+import org.apache.doris.thrift.TTabletMetaInfo;
+import org.apache.doris.transaction.GlobalTransactionMgrIface;
+import org.apache.doris.transaction.PartitionCommitInfo;
+import org.apache.doris.transaction.TableCommitInfo;
+import org.apache.doris.transaction.TransactionState;
+import org.apache.doris.transaction.TransactionStatus;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.LinkedHashMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Ordering;
+import com.google.common.collect.SetMultimap;
+import com.google.common.collect.Sets;
+import com.google.common.collect.Table;
+import com.google.common.collect.TreeMultimap;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+public class LocalTabletInvertedIndex extends TabletInvertedIndex {
+    private static final Logger LOG = LogManager.getLogger(LocalTabletInvertedIndex.class);
+
+    // tablet id -> (backend id -> replica)
+    private Table<Long, Long, Replica> replicaMetaTable = HashBasedTable.create();
+
+    // backing replica table, for visiting backend replicas faster.
+    // backend id -> (tablet id -> replica)
+    private Table<Long, Long, Replica> backingReplicaMetaTable = HashBasedTable.create();
+
+    // partition id -> partition info.
+    // notice partition info update every Config.partition_info_update_interval_secs seconds,
+    // so it may be stale.
+ private volatile ImmutableMap partitionCollectInfoMap = ImmutableMap.of(); + + private final ExecutorService taskPool = new ThreadPoolExecutor( + Config.tablet_report_thread_pool_num, + 2 * Config.tablet_report_thread_pool_num, + // tablet report task default 60s once + 120L, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(Config.tablet_report_queue_size), + new ThreadPoolExecutor.DiscardOldestPolicy()); + + public LocalTabletInvertedIndex() { + super(); + } + + @Override + public void tabletReport(long backendId, Map backendTablets, + Map backendPartitionsVersion, + final HashMap storageMediumMap, + ListMultimap tabletSyncMap, + ListMultimap tabletDeleteFromMeta, + Set tabletFoundInMeta, + ListMultimap tabletMigrationMap, + Map partitionVersionSyncMap, + Map> transactionsToPublish, + SetMultimap transactionsToClear, + ListMultimap tabletRecoveryMap, + List tabletToUpdate, + List cooldownConfToPush, + List cooldownConfToUpdate) { + List> cooldownTablets = new ArrayList<>(); + long feTabletNum = 0; + long stamp = readLock(); + long start = System.currentTimeMillis(); + + try { + if (LOG.isDebugEnabled()) { + LOG.debug("begin to do tablet diff with backend[{}]. num: {}", backendId, backendTablets.size()); + } + + Map replicaMetaWithBackend = backingReplicaMetaTable.row(backendId); + if (replicaMetaWithBackend != null) { + feTabletNum = replicaMetaWithBackend.size(); + processTabletReportAsync(backendId, backendTablets, backendPartitionsVersion, storageMediumMap, + tabletSyncMap, tabletDeleteFromMeta, tabletFoundInMeta, tabletMigrationMap, + partitionVersionSyncMap, transactionsToPublish, transactionsToClear, tabletRecoveryMap, + tabletToUpdate, cooldownTablets, replicaMetaWithBackend); + } + } finally { + readUnlock(stamp); + } + + // Process cooldown configs outside of read lock to avoid deadlock + cooldownTablets.forEach(p -> handleCooldownConf(p.first, p.second, cooldownConfToPush, cooldownConfToUpdate)); + + logTabletReportSummary(backendId, feTabletNum, backendTablets, backendPartitionsVersion, + tabletSyncMap, tabletDeleteFromMeta, tabletFoundInMeta, tabletMigrationMap, + partitionVersionSyncMap, transactionsToPublish, transactionsToClear, + tabletToUpdate, tabletRecoveryMap, start); + } + + /** + * Process tablet report asynchronously using thread pool + */ + private void processTabletReportAsync(long backendId, Map backendTablets, + Map backendPartitionsVersion, + HashMap storageMediumMap, + ListMultimap tabletSyncMap, + ListMultimap tabletDeleteFromMeta, + Set tabletFoundInMeta, + ListMultimap tabletMigrationMap, + Map partitionVersionSyncMap, + Map> transactionsToPublish, + SetMultimap transactionsToClear, + ListMultimap tabletRecoveryMap, + List tabletToUpdate, + List> cooldownTablets, + Map replicaMetaWithBackend) { + // Calculate optimal chunk size to balance task granularity and concurrency + // For large tablet counts (40W-50W), we want smaller chunks to maximize parallelism + // Target: create at least threadPoolSize * 4 tasks for better load balancing + int totalTablets = replicaMetaWithBackend.size(); + int threadPoolSize = Config.tablet_report_thread_pool_num; + int targetTasks = threadPoolSize * 4; // Create 4x tasks as threads for better load balancing + int chunkSize = Math.max(500, totalTablets / targetTasks); + + // Cap chunk size to avoid too large tasks + // so thread pool queue will not be fulled with few large tasks + int maxChunkSize = 10000; + chunkSize = Math.min(chunkSize, maxChunkSize); + List> entries = new ArrayList<>(replicaMetaWithBackend.entrySet()); + 
List> tabletFutures = new ArrayList<>(); + int estimatedTasks = (totalTablets + chunkSize - 1) / chunkSize; + if (LOG.isDebugEnabled()) { + LOG.debug("Processing tablet report for backend[{}]: total tablets={}, chunkSize={}, estimated tasks={}", + backendId, totalTablets, chunkSize, estimatedTasks); + } + + for (int i = 0; i < entries.size(); i += chunkSize) { + final int start = i; + final int end = Math.min(i + chunkSize, entries.size()); + + CompletableFuture future = CompletableFuture.runAsync(() -> { + for (int j = start; j < end; j++) { + Map.Entry entry = entries.get(j); + processTabletEntry(backendId, backendTablets, storageMediumMap, tabletSyncMap, + tabletDeleteFromMeta, tabletFoundInMeta, tabletMigrationMap, + transactionsToPublish, transactionsToClear, tabletRecoveryMap, + tabletToUpdate, cooldownTablets, entry); + } + }, taskPool); + + tabletFutures.add(future); + } + + // Process partition versions in parallel + CompletableFuture partitionFuture = CompletableFuture.runAsync(() -> { + processPartitionVersions(backendPartitionsVersion, partitionVersionSyncMap); + }, taskPool); + + // Wait for all tasks to complete + CompletableFuture.allOf(tabletFutures.toArray(new CompletableFuture[0])).join(); + partitionFuture.join(); + } + + /** + * Process a single tablet entry from backend report + */ + private void processTabletEntry(long backendId, Map backendTablets, + HashMap storageMediumMap, + ListMultimap tabletSyncMap, + ListMultimap tabletDeleteFromMeta, + Set tabletFoundInMeta, + ListMultimap tabletMigrationMap, + Map> transactionsToPublish, + SetMultimap transactionsToClear, + ListMultimap tabletRecoveryMap, + List tabletToUpdate, + List> cooldownTablets, + Map.Entry entry) { + long tabletId = entry.getKey(); + Replica replica = entry.getValue(); + + Preconditions.checkState(tabletMetaMap.containsKey(tabletId), + "tablet " + tabletId + " not exists, backend " + backendId); + TabletMeta tabletMeta = tabletMetaMap.get(tabletId); + + if (backendTablets.containsKey(tabletId)) { + // Tablet exists in both FE and BE + TTablet backendTablet = backendTablets.get(tabletId); + TTabletInfo backendTabletInfo = backendTablet.getTabletInfos().get(0); + + tabletFoundInMeta.add(tabletId); + + processExistingTablet(backendId, tabletId, replica, tabletMeta, backendTabletInfo, + storageMediumMap, tabletSyncMap, tabletMigrationMap, transactionsToPublish, + transactionsToClear, tabletRecoveryMap, tabletToUpdate, cooldownTablets); + } else { + // Tablet exists in FE but not in BE - may need deletion + processDeletedTablet(backendId, tabletId, tabletMeta, tabletDeleteFromMeta); + } + } + + /** + * Process tablet that exists in both FE and BE + */ + private void processExistingTablet(long backendId, long tabletId, Replica replica, + TabletMeta tabletMeta, TTabletInfo backendTabletInfo, + HashMap storageMediumMap, + ListMultimap tabletSyncMap, + ListMultimap tabletMigrationMap, + Map> transactionsToPublish, + SetMultimap transactionsToClear, + ListMultimap tabletRecoveryMap, + List tabletToUpdate, + List> cooldownTablets) { + // Check and prepare tablet meta info update + TTabletMetaInfo tabletMetaInfo = prepareTabletMetaInfo(replica, tabletMeta, backendTabletInfo); + + // Check if version sync is needed + if (needSync(replica, backendTabletInfo)) { + synchronized (tabletSyncMap) { + tabletSyncMap.put(tabletMeta.getDbId(), tabletId); + } + } + + // Update replica path and schema hash + updateReplicaBasicInfo(replica, backendTabletInfo); + + // Check if replica needs recovery + if (needRecover(replica, 
tabletMeta.getOldSchemaHash(), backendTabletInfo)) { + logReplicaRecovery(replica, tabletId, backendId, backendTabletInfo); + synchronized (tabletRecoveryMap) { + tabletRecoveryMap.put(tabletMeta.getDbId(), tabletId); + } + } + + // Handle cooldown policy + if (Config.enable_storage_policy && backendTabletInfo.isSetCooldownTerm()) { + synchronized (cooldownTablets) { + cooldownTablets.add(Pair.of(tabletMeta, backendTabletInfo)); + } + replica.setCooldownMetaId(backendTabletInfo.getCooldownMetaId()); + replica.setCooldownTerm(backendTabletInfo.getCooldownTerm()); + } + + // Check storage medium migration + checkStorageMediumMigration(tabletId, tabletMeta, backendTabletInfo, + storageMediumMap, tabletMigrationMap); + + // Handle transactions + if (backendTabletInfo.isSetTransactionIds()) { + handleBackendTransactions(backendId, backendTabletInfo.getTransactionIds(), tabletId, + tabletMeta, transactionsToPublish, transactionsToClear); + } + + // Update replica version count + updateReplicaVersionCount(replica, backendTabletInfo); + + // Add tablet meta info to update list if needed + if (tabletMetaInfo != null) { + tabletMetaInfo.setTabletId(tabletId); + synchronized (tabletToUpdate) { + tabletToUpdate.add(tabletMetaInfo); + } + } + } + + /** + * Prepare tablet meta info for BE update if needed + */ + private TTabletMetaInfo prepareTabletMetaInfo(Replica replica, TabletMeta tabletMeta, + TTabletInfo backendTabletInfo) { + TTabletMetaInfo tabletMetaInfo = null; + + // Check replica id mismatch + if (backendTabletInfo.getReplicaId() != replica.getId() + && replica.getState() != ReplicaState.CLONE) { + tabletMetaInfo = new TTabletMetaInfo(); + tabletMetaInfo.setReplicaId(replica.getId()); + } + + // Check in-memory flag + PartitionCollectInfo partitionCollectInfo = + partitionCollectInfoMap.get(backendTabletInfo.getPartitionId()); + boolean isInMemory = partitionCollectInfo != null && partitionCollectInfo.isInMemory(); + if (isInMemory != backendTabletInfo.isIsInMemory()) { + if (tabletMetaInfo == null) { + tabletMetaInfo = new TTabletMetaInfo(); + } + tabletMetaInfo.setIsInMemory(isInMemory); + } + + // Check partition id mismatch + if (Config.fix_tablet_partition_id_eq_0 + && tabletMeta.getPartitionId() > 0 + && backendTabletInfo.getPartitionId() == 0) { + LOG.warn("be report tablet partition id not eq fe, in be {} but in fe {}", + backendTabletInfo, tabletMeta); + if (tabletMetaInfo == null) { + tabletMetaInfo = new TTabletMetaInfo(); + } + tabletMetaInfo.setPartitionId(tabletMeta.getPartitionId()); + } + + return tabletMetaInfo; + } + + /** + * Update replica's basic info like path hash and schema hash + */ + private void updateReplicaBasicInfo(Replica replica, TTabletInfo backendTabletInfo) { + // Update path hash + if (backendTabletInfo.isSetPathHash() + && replica.getPathHash() != backendTabletInfo.getPathHash()) { + replica.setPathHash(backendTabletInfo.getPathHash()); + } + + // Update schema hash + if (backendTabletInfo.isSetSchemaHash() + && replica.getState() == ReplicaState.NORMAL + && replica.getSchemaHash() != backendTabletInfo.getSchemaHash()) { + replica.setSchemaHash(backendTabletInfo.getSchemaHash()); + } + } + + /** + * Log replica recovery information + */ + private void logReplicaRecovery(Replica replica, long tabletId, long backendId, + TTabletInfo backendTabletInfo) { + LOG.warn("replica {} of tablet {} on backend {} need recovery. 
" + + "replica in FE: {}, report version {}, report schema hash: {}, " + + "is bad: {}, is version missing: {}", + replica.getId(), tabletId, backendId, replica, + backendTabletInfo.getVersion(), + backendTabletInfo.getSchemaHash(), + backendTabletInfo.isSetUsed() ? !backendTabletInfo.isUsed() : "false", + backendTabletInfo.isSetVersionMiss() ? backendTabletInfo.isVersionMiss() : "unset"); + } + + /** + * Check if storage medium migration is needed + */ + private void checkStorageMediumMigration(long tabletId, TabletMeta tabletMeta, + TTabletInfo backendTabletInfo, + HashMap storageMediumMap, + ListMultimap tabletMigrationMap) { + if (Config.disable_storage_medium_check) { + return; + } + + long partitionId = tabletMeta.getPartitionId(); + TStorageMedium storageMedium = storageMediumMap.get(partitionId); + + if (storageMedium != null && backendTabletInfo.isSetStorageMedium() + && isLocal(storageMedium) + && isLocal(backendTabletInfo.getStorageMedium()) + && isLocal(tabletMeta.getStorageMedium())) { + + if (storageMedium != backendTabletInfo.getStorageMedium()) { + synchronized (tabletMigrationMap) { + tabletMigrationMap.put(storageMedium, tabletId); + } + } + + if (storageMedium != tabletMeta.getStorageMedium()) { + tabletMeta.setStorageMedium(storageMedium); + } + } + } + + /** + * Update replica's version count + */ + private void updateReplicaVersionCount(Replica replica, TTabletInfo backendTabletInfo) { + if (backendTabletInfo.isSetTotalVersionCount()) { + replica.setTotalVersionCount(backendTabletInfo.getTotalVersionCount()); + replica.setVisibleVersionCount(backendTabletInfo.isSetVisibleVersionCount() + ? backendTabletInfo.getVisibleVersionCount() + : backendTabletInfo.getTotalVersionCount()); + } + } + + /** + * Process tablet that exists in FE but not reported by BE + */ + private void processDeletedTablet(long backendId, long tabletId, TabletMeta tabletMeta, + ListMultimap tabletDeleteFromMeta) { + if (LOG.isDebugEnabled()) { + LOG.debug("backend[{}] does not report tablet[{}-{}]", backendId, tabletId, tabletMeta); + } + synchronized (tabletDeleteFromMeta) { + tabletDeleteFromMeta.put(tabletMeta.getDbId(), tabletId); + } + } + + /** + * Process partition versions reported by BE + */ + private void processPartitionVersions(Map backendPartitionsVersion, + Map partitionVersionSyncMap) { + for (Map.Entry entry : backendPartitionsVersion.entrySet()) { + long partitionId = entry.getKey(); + long backendVersion = entry.getValue(); + PartitionCollectInfo partitionInfo = partitionCollectInfoMap.get(partitionId); + + if (partitionInfo != null && partitionInfo.getVisibleVersion() > backendVersion) { + partitionVersionSyncMap.put(partitionId, partitionInfo.getVisibleVersion()); + } + } + } + + /** + * Log tablet report summary + */ + private void logTabletReportSummary(long backendId, long feTabletNum, + Map backendTablets, + Map backendPartitionsVersion, + ListMultimap tabletSyncMap, + ListMultimap tabletDeleteFromMeta, + Set tabletFoundInMeta, + ListMultimap tabletMigrationMap, + Map partitionVersionSyncMap, + Map> transactionsToPublish, + SetMultimap transactionsToClear, + List tabletToUpdate, + ListMultimap tabletRecoveryMap, + long startTime) { + long endTime = System.currentTimeMillis(); + long toClearTransactionsNum = transactionsToClear.keySet().size(); + long toClearTransactionsPartitions = transactionsToClear.values().size(); + long toPublishTransactionsNum = transactionsToPublish.values().stream() + .mapToLong(m -> m.keySet().size()).sum(); + long toPublishTransactionsPartitions = 
transactionsToPublish.values().stream() + .mapToLong(m -> m.values().size()).sum(); + + LOG.info("finished to do tablet diff with backend[{}]. fe tablet num: {}, backend tablet num: {}. " + + "sync: {}, metaDel: {}, foundInMeta: {}, migration: {}, " + + "backend partition num: {}, backend need update: {}, " + + "found invalid transactions {}(partitions: {}), " + + "found republish transactions {}(partitions: {}), " + + "tabletToUpdate: {}, need recovery: {}, cost: {} ms", + backendId, feTabletNum, backendTablets.size(), tabletSyncMap.size(), + tabletDeleteFromMeta.size(), tabletFoundInMeta.size(), tabletMigrationMap.size(), + backendPartitionsVersion.size(), partitionVersionSyncMap.size(), + toClearTransactionsNum, toClearTransactionsPartitions, + toPublishTransactionsNum, toPublishTransactionsPartitions, + tabletToUpdate.size(), tabletRecoveryMap.size(), (endTime - startTime)); + } + + private void handleBackendTransactions(long backendId, List transactionIds, long tabletId, + TabletMeta tabletMeta, Map> transactionsToPublish, + SetMultimap transactionsToClear) { + GlobalTransactionMgrIface transactionMgr = Env.getCurrentGlobalTransactionMgr(); + long partitionId = tabletMeta.getPartitionId(); + for (Long transactionId : transactionIds) { + TransactionState transactionState = transactionMgr.getTransactionState(tabletMeta.getDbId(), transactionId); + if (transactionState == null || transactionState.getTransactionStatus() == TransactionStatus.ABORTED) { + synchronized (transactionsToClear) { + transactionsToClear.put(transactionId, tabletMeta.getPartitionId()); + } + if (LOG.isDebugEnabled()) { + LOG.debug("transaction id [{}] is not valid any more, clear it from backend [{}]", + transactionId, backendId); + } + } else if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) { + publishPartition(transactionState, transactionId, tabletMeta, partitionId, transactionsToPublish); + } else if (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED) { + // for some reasons, transaction pushlish succeed replica num less than quorum, + // this transaction's status can not to be VISIBLE, and this publish task of + // this replica of this tablet on this backend need retry publish success to + // make transaction VISIBLE when last publish failed. 
+ Map> publishVersionTask = transactionState.getPublishVersionTasks(); + List tasks = publishVersionTask.get(backendId); + if (tasks == null) { + continue; + } + for (PublishVersionTask task : tasks) { + if (task != null && task.isFinished()) { + List errorTablets = task.getErrorTablets(); + if (errorTablets != null) { + for (int i = 0; i < errorTablets.size(); i++) { + if (tabletId == errorTablets.get(i)) { + publishPartition(transactionState, transactionId, tabletMeta, partitionId, + transactionsToPublish); + break; + } + } + } + } + } + } + } + } + + // the transactionId may be sub transaction id or transaction id + private TPartitionVersionInfo generatePartitionVersionInfoWhenReport(TransactionState transactionState, + long transactionId, TabletMeta tabletMeta, long partitionId) { + TableCommitInfo tableCommitInfo; + if (transactionState.getSubTxnIds() == null) { + tableCommitInfo = transactionState.getTableCommitInfo(tabletMeta.getTableId()); + } else { + tableCommitInfo = transactionState.getTableCommitInfoBySubTxnId(transactionId); + } + if (tableCommitInfo != null && tableCommitInfo.getPartitionCommitInfo(partitionId) != null) { + PartitionCommitInfo partitionCommitInfo = tableCommitInfo.getPartitionCommitInfo(partitionId); + return new TPartitionVersionInfo(tabletMeta.getPartitionId(), + partitionCommitInfo.getVersion(), 0); + } + return null; + } + + private void publishPartition(TransactionState transactionState, long transactionId, TabletMeta tabletMeta, + long partitionId, Map> transactionsToPublish) { + TPartitionVersionInfo versionInfo = generatePartitionVersionInfoWhenReport(transactionState, + transactionId, tabletMeta, partitionId); + if (versionInfo != null) { + synchronized (transactionsToPublish) { + SetMultimap map = transactionsToPublish.get( + transactionState.getDbId()); + if (map == null) { + map = LinkedHashMultimap.create(); + transactionsToPublish.put(transactionState.getDbId(), map); + } + map.put(transactionId, versionInfo); + } + } + } + + private boolean needSync(Replica replicaInFe, TTabletInfo backendTabletInfo) { + if (backendTabletInfo.isSetUsed() && !backendTabletInfo.isUsed()) { + // tablet is bad, do not sync + // it will be handled in needRecovery() + return false; + } + + if (replicaInFe.getState() == ReplicaState.ALTER) { + // ignore the replica is ALTER state. its version will be taken care by load process and alter table process + return false; + } + + long versionInFe = replicaInFe.getVersion(); + + if (backendTabletInfo.getVersion() > versionInFe) { + // backend replica's version is larger or newer than replica in FE, sync it. + return true; + } else if (versionInFe == backendTabletInfo.getVersion()) { + // backend replica's version is equal to replica in FE, but replica in FE is bad, + // while backend replica is good, sync it + if (replicaInFe.isBad()) { + return true; + } + + // FE' s replica last failed version > partition's committed version + // this can be occur when be report miss version, fe will set last failed version = visible version + 1 + // then last failed version may greater than partition's committed version + // + // But here cannot got variable partition, we just check lastFailedVersion = version + 1, + // In ReportHandler.sync, we will check if last failed version > partition's committed version again. 
+ if (replicaInFe.getLastFailedVersion() == versionInFe + 1) { + return true; + } + } + + return false; + } + + private void handleCooldownConf(TabletMeta tabletMeta, TTabletInfo beTabletInfo, + List cooldownConfToPush, List cooldownConfToUpdate) { + Tablet tablet; + try { + OlapTable table = (OlapTable) Env.getCurrentInternalCatalog().getDbNullable(tabletMeta.getDbId()) + .getTable(tabletMeta.getTableId()) + .get(); + table.readLock(); + try { + tablet = table.getPartition(tabletMeta.getPartitionId()).getIndex(tabletMeta.getIndexId()) + .getTablet(beTabletInfo.tablet_id); + } finally { + table.readUnlock(); + } + } catch (RuntimeException e) { + if (!Env.getCurrentRecycleBin().isRecyclePartition(tabletMeta.getDbId(), + tabletMeta.getTableId(), tabletMeta.getPartitionId())) { + LOG.warn("failed to get tablet. tabletId={}", beTabletInfo.tablet_id); + } + return; + } + Pair cooldownConf = tablet.getCooldownConf(); + if (beTabletInfo.getCooldownTerm() > cooldownConf.second) { // should not be here + LOG.warn("report cooldownTerm({}) > cooldownTerm in TabletMeta({}), tabletId={}", + beTabletInfo.getCooldownTerm(), cooldownConf.second, beTabletInfo.tablet_id); + return; + } + + if (cooldownConf.first <= 0) { // invalid cooldownReplicaId + CooldownConf conf = new CooldownConf(tabletMeta.getDbId(), tabletMeta.getTableId(), + tabletMeta.getPartitionId(), tabletMeta.getIndexId(), beTabletInfo.tablet_id, cooldownConf.second); + cooldownConfToUpdate.add(conf); + return; + } + + // check cooldown replica is alive + Map replicaMap = replicaMetaTable.row(beTabletInfo.getTabletId()); + if (replicaMap.isEmpty()) { + return; + } + boolean replicaAlive = false; + for (Replica replica : replicaMap.values()) { + if (replica.getId() == cooldownConf.first) { + if (replica.isAlive()) { + replicaAlive = true; + } + break; + } + } + if (!replicaAlive) { + CooldownConf conf = new CooldownConf(tabletMeta.getDbId(), tabletMeta.getTableId(), + tabletMeta.getPartitionId(), tabletMeta.getIndexId(), beTabletInfo.tablet_id, cooldownConf.second); + cooldownConfToUpdate.add(conf); + return; + } + + if (beTabletInfo.getCooldownTerm() < cooldownConf.second) { + CooldownConf conf = new CooldownConf(beTabletInfo.tablet_id, cooldownConf.first, cooldownConf.second); + cooldownConfToPush.add(conf); + return; + } + } + + @Override + public List getReplicas(Long tabletId) { + long stamp = readLock(); + try { + Map replicaMap = replicaMetaTable.row(tabletId); + return replicaMap.values().stream().collect(Collectors.toList()); + } finally { + readUnlock(stamp); + } + } + + /** + * Be will set `used' to false for bad replicas and `version_miss' to true for replicas with hole + * in their version chain. In either case, those replicas need to be fixed by TabletScheduler. + */ + private boolean needRecover(Replica replicaInFe, int schemaHashInFe, TTabletInfo backendTabletInfo) { + if (replicaInFe.getState() != ReplicaState.NORMAL) { + // only normal replica need recover + // case: + // the replica's state is CLONE, which means this a newly created replica in clone process. + // and an old out-of-date replica reports here, and this report should not mark this replica as + // 'need recovery'. + // Other state such as ROLLUP/SCHEMA_CHANGE, the replica behavior is unknown, so for safety reason, + // also not mark this replica as 'need recovery'. 
+ return false; + } + + if (backendTabletInfo.isSetUsed() && !backendTabletInfo.isUsed()) { + // tablet is bad + return true; + } + + if (schemaHashInFe != backendTabletInfo.getSchemaHash() || backendTabletInfo.getVersion() == -1) { + // no data file exist on BE, maybe this is a newly created schema change tablet. no need to recovery + return false; + } + + if (backendTabletInfo.isSetVersionMiss() && backendTabletInfo.isVersionMiss()) { + // even if backend version is less than fe's version, but if version_miss is false, + // which means this may be a stale report. + // so we only return true if version_miss is true. + return true; + } + + // backend versions regressive due to bugs + if (replicaInFe.checkVersionRegressive(backendTabletInfo.getVersion())) { + return true; + } + + return false; + } + + @Override + public void deleteTablet(long tabletId) { + long stamp = writeLock(); + try { + Map replicas = replicaMetaTable.rowMap().remove(tabletId); + if (replicas != null) { + for (long backendId : replicas.keySet()) { + backingReplicaMetaTable.remove(backendId, tabletId); + } + } + tabletMetaMap.remove(tabletId); + if (LOG.isDebugEnabled()) { + LOG.debug("delete tablet: {}", tabletId); + } + } finally { + writeUnlock(stamp); + } + } + + @Override + public void addReplica(long tabletId, Replica replica) { + long stamp = writeLock(); + try { + long backendId = replica.getBackendIdWithoutException(); + Preconditions.checkState(tabletMetaMap.containsKey(tabletId), + "tablet " + tabletId + " not exists, replica " + replica.getId() + + ", backend " + backendId); + replicaMetaTable.put(tabletId, backendId, replica); + backingReplicaMetaTable.put(backendId, tabletId, replica); + if (LOG.isDebugEnabled()) { + LOG.debug("add replica {} of tablet {} in backend {}", + replica.getId(), tabletId, backendId); + } + } finally { + writeUnlock(stamp); + } + } + + @Override + public void deleteReplica(long tabletId, long backendId) { + long stamp = writeLock(); + try { + Preconditions.checkState(tabletMetaMap.containsKey(tabletId), + "tablet " + tabletId + " not exists, backend " + backendId); + if (replicaMetaTable.containsRow(tabletId)) { + Replica replica = replicaMetaTable.remove(tabletId, backendId); + + backingReplicaMetaTable.remove(backendId, tabletId); + if (LOG.isDebugEnabled()) { + LOG.debug("delete replica {} of tablet {} in backend {}", + replica.getId(), tabletId, backendId); + } + } else { + // this may happen when fe restart after tablet is empty(bug cause) + // add log instead of assertion to observe + LOG.error("tablet[{}] contains no replica in inverted index", tabletId); + } + } finally { + writeUnlock(stamp); + } + } + + @Override + public Replica getReplica(long tabletId, long backendId) { + long stamp = readLock(); + try { + Preconditions.checkState(tabletMetaMap.containsKey(tabletId), + "tablet " + tabletId + " not exists, backend " + backendId); + return replicaMetaTable.get(tabletId, backendId); + } finally { + readUnlock(stamp); + } + } + + @Override + public List getReplicasByTabletId(long tabletId) { + long stamp = readLock(); + try { + if (replicaMetaTable.containsRow(tabletId)) { + return Lists.newArrayList(replicaMetaTable.row(tabletId).values()); + } + return Lists.newArrayList(); + } finally { + readUnlock(stamp); + } + } + + @Override + public Long getTabletSizeByBackendId(long backendId) { + Long ret = 0L; + long stamp = readLock(); + try { + Map replicaMetaWithBackend = backingReplicaMetaTable.row(backendId); + if (replicaMetaWithBackend != null) { + ret += 
replicaMetaWithBackend.size(); + } + } finally { + readUnlock(stamp); + } + return ret; + } + + @Override + public List getTabletIdsByBackendId(long backendId) { + List tabletIds = Lists.newArrayList(); + long stamp = readLock(); + try { + Map replicaMetaWithBackend = backingReplicaMetaTable.row(backendId); + if (replicaMetaWithBackend != null) { + tabletIds.addAll(replicaMetaWithBackend.keySet()); + } + } finally { + readUnlock(stamp); + } + return tabletIds; + } + + @Override + public List> getTabletSizeByBackendIdAndStorageMedium(long backendId, + TStorageMedium storageMedium) { + List> tabletIdSizes = Lists.newArrayList(); + long stamp = readLock(); + try { + Map replicaMetaWithBackend = backingReplicaMetaTable.row(backendId); + if (replicaMetaWithBackend != null) { + tabletIdSizes = replicaMetaWithBackend.entrySet().stream() + .filter(entry -> tabletMetaMap.get(entry.getKey()).getStorageMedium() == storageMedium) + .map(entry -> Pair.of(entry.getKey(), entry.getValue().getDataSize())) + .collect(Collectors.toList()); + } + } finally { + readUnlock(stamp); + } + return tabletIdSizes; + } + + @Override + public int getTabletNumByBackendId(long backendId) { + long stamp = readLock(); + try { + Map replicaMetaWithBackend = backingReplicaMetaTable.row(backendId); + if (replicaMetaWithBackend != null) { + return replicaMetaWithBackend.size(); + } + } finally { + readUnlock(stamp); + } + return 0; + } + + @Override + public Map getReplicaNumByBeIdAndStorageMedium(long backendId) { + Map replicaNumMap = Maps.newHashMap(); + long hddNum = 0; + long ssdNum = 0; + long stamp = readLock(); + try { + Map replicaMetaWithBackend = backingReplicaMetaTable.row(backendId); + if (replicaMetaWithBackend != null) { + for (long tabletId : replicaMetaWithBackend.keySet()) { + if (tabletMetaMap.get(tabletId).getStorageMedium() == TStorageMedium.HDD) { + hddNum++; + } else { + ssdNum++; + } + } + } + } finally { + readUnlock(stamp); + } + replicaNumMap.put(TStorageMedium.HDD, hddNum); + replicaNumMap.put(TStorageMedium.SSD, ssdNum); + return replicaNumMap; + } + + @Override + protected void innerClear() { + replicaMetaTable.clear(); + backingReplicaMetaTable.clear(); + } + + @Override + public void setPartitionCollectInfoMap(ImmutableMap partitionCollectInfoMap) { + this.partitionCollectInfoMap = partitionCollectInfoMap; + } + + // Only build from available bes, exclude colocate tables + @Override + public Map> buildPartitionInfoBySkew( + List availableBeIds, Map> movesInProgress) { + Set dbIds = Sets.newHashSet(); + Set tableIds = Sets.newHashSet(); + Set partitionIds = Sets.newHashSet(); + // Clone ut mocked env, but CatalogRecycleBin is not mockable (it extends from Thread) + if (!FeConstants.runningUnitTest) { + Env.getCurrentRecycleBin().getRecycleIds(dbIds, tableIds, partitionIds); + } + long stamp = readLock(); + + // 1. gen > + // for each replica(all tablets): + // find beId, then replicaCount++ + Map>> partitionReplicasInfoMaps = Maps.newHashMap(); + for (TStorageMedium medium : TStorageMedium.values()) { + partitionReplicasInfoMaps.put(medium, HashBasedTable.create()); + } + try { + // Changes to the returned set will update the underlying table + // tablet id -> (backend id -> replica) + Set> cells = replicaMetaTable.cellSet(); + for (Table.Cell cell : cells) { + Long tabletId = cell.getRowKey(); + Long beId = cell.getColumnKey(); + Pair movePair = movesInProgress.get(tabletId); + TabletMove move = movePair != null ? 
movePair.first : null; + // there exists move from fromBe to toBe + if (move != null && beId == move.fromBe + && availableBeIds.contains(move.toBe)) { + + // if movePair.second == -1, it means toBe hadn't added this tablet but it will add later; + // otherwise it means toBe had added this tablet + boolean toBeHadReplica = movePair.second != -1L; + if (toBeHadReplica) { + // toBe had add this tablet, fromBe just ignore this tablet + continue; + } + + // later fromBe will delete this replica + // and toBe will add a replica + // so this replica should belong to toBe + beId = move.toBe; + } + + try { + Preconditions.checkState(availableBeIds.contains(beId), "dead be " + beId); + TabletMeta tabletMeta = tabletMetaMap.get(tabletId); + if (dbIds.contains(tabletMeta.getDbId()) || tableIds.contains(tabletMeta.getTableId()) + || partitionIds.contains(tabletMeta.getPartitionId())) { + continue; + } + Preconditions.checkNotNull(tabletMeta, "invalid tablet " + tabletId); + Preconditions.checkState( + !Env.getCurrentColocateIndex().isColocateTable(tabletMeta.getTableId()), + "table " + tabletMeta.getTableId() + " should not be the colocate table"); + + TStorageMedium medium = tabletMeta.getStorageMedium(); + Table> partitionReplicasInfo = partitionReplicasInfoMaps.get(medium); + Map countMap = partitionReplicasInfo.get( + tabletMeta.getPartitionId(), tabletMeta.getIndexId()); + if (countMap == null) { + // If one be doesn't have any replica of one partition, it should be counted too. + countMap = availableBeIds.stream().collect(Collectors.toMap(i -> i, i -> 0L)); + } + + Long count = countMap.get(beId); + countMap.put(beId, count + 1L); + partitionReplicasInfo.put(tabletMeta.getPartitionId(), tabletMeta.getIndexId(), countMap); + partitionReplicasInfoMaps.put(medium, partitionReplicasInfo); + } catch (IllegalStateException | NullPointerException e) { + // If the tablet or be has some problem, don't count in + if (LOG.isDebugEnabled()) { + LOG.debug(e.getMessage()); + } + } + } + } finally { + readUnlock(stamp); + } + + // 2. 
Populate ClusterBalanceInfo::table_info_by_skew + // for each PartitionId-MaterializedIndex: + // for each beId: record max_count, min_count(replicaCount) + // put to table_info_by_skew + Map> skewMaps = Maps.newHashMap(); + for (TStorageMedium medium : TStorageMedium.values()) { + TreeMultimap partitionInfoBySkew + = TreeMultimap.create(Ordering.natural(), Ordering.arbitrary()); + Set>> mapCells + = partitionReplicasInfoMaps.getOrDefault(medium, HashBasedTable.create()).cellSet(); + for (Table.Cell> cell : mapCells) { + Map countMap = cell.getValue(); + Preconditions.checkNotNull(countMap); + PartitionBalanceInfo pbi = new PartitionBalanceInfo(cell.getRowKey(), cell.getColumnKey()); + for (Map.Entry entry : countMap.entrySet()) { + Long beID = entry.getKey(); + Long replicaCount = entry.getValue(); + pbi.beByReplicaCount.put(replicaCount, beID); + } + // beByReplicaCount values are natural ordering + long minCount = pbi.beByReplicaCount.keySet().first(); + long maxCount = pbi.beByReplicaCount.keySet().last(); + partitionInfoBySkew.put(maxCount - minCount, pbi); + } + skewMaps.put(medium, partitionInfoBySkew); + } + return skewMaps; + } + + // just for ut + @Override + public Table getReplicaMetaTable() { + long stamp = readLock(); + try { + return HashBasedTable.create(replicaMetaTable); + } finally { + readUnlock(stamp); + } + } + + // just for ut + @Override + public Table getBackingReplicaMetaTable() { + long stamp = readLock(); + try { + return HashBasedTable.create(backingReplicaMetaTable); + } finally { + readUnlock(stamp); + } + } + + private boolean isLocal(TStorageMedium storageMedium) { + return storageMedium == TStorageMedium.HDD || storageMedium == TStorageMedium.SSD; + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java index d90a7861662269..71b99a0a86200e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java @@ -17,35 +17,19 @@ package org.apache.doris.catalog; -import org.apache.doris.catalog.Replica.ReplicaState; import org.apache.doris.clone.PartitionRebalancer.TabletMove; -import org.apache.doris.common.Config; -import org.apache.doris.common.FeConstants; import org.apache.doris.common.Pair; import org.apache.doris.cooldown.CooldownConf; import org.apache.doris.master.PartitionInfoCollector.PartitionCollectInfo; -import org.apache.doris.task.PublishVersionTask; import org.apache.doris.thrift.TPartitionVersionInfo; import org.apache.doris.thrift.TStorageMedium; import org.apache.doris.thrift.TTablet; -import org.apache.doris.thrift.TTabletInfo; import org.apache.doris.thrift.TTabletMetaInfo; -import org.apache.doris.transaction.GlobalTransactionMgrIface; -import org.apache.doris.transaction.PartitionCommitInfo; -import org.apache.doris.transaction.TableCommitInfo; -import org.apache.doris.transaction.TransactionState; -import org.apache.doris.transaction.TransactionStatus; -import com.google.common.base.Preconditions; -import com.google.common.collect.HashBasedTable; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.LinkedHashMultimap; import com.google.common.collect.ListMultimap; -import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.common.collect.Ordering; import com.google.common.collect.SetMultimap; -import com.google.common.collect.Sets; import 
com.google.common.collect.Table; import com.google.common.collect.TreeMultimap; import org.apache.logging.log4j.LogManager; @@ -56,11 +40,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.StampedLock; import java.util.stream.Collectors; @@ -70,7 +49,7 @@ * Checkpoint thread is no need to modify this inverted index, because this inverted index will not be wrote * into images, all meta data are in catalog, and the inverted index will be rebuild when FE restart. */ -public class TabletInvertedIndex { +public abstract class TabletInvertedIndex { private static final Logger LOG = LogManager.getLogger(TabletInvertedIndex.class); public static final int NOT_EXIST_VALUE = -1; @@ -81,58 +60,24 @@ public class TabletInvertedIndex { private StampedLock lock = new StampedLock(); // tablet id -> tablet meta - private Map tabletMetaMap = Maps.newHashMap(); - - /* - * we use this to save memory. - * we do not need create TabletMeta instance for each tablet, - * cause tablets in one (Partition-MaterializedIndex) has same parent info - * (dbId, tableId, partitionId, indexId, schemaHash) - * we use 'tabletMetaTable' to do the update things - * (eg. update schema hash in TabletMeta) - * partition id -> (index id -> tablet meta) - */ - private Table tabletMetaTable = HashBasedTable.create(); - - // tablet id -> (backend id -> replica) - // for cloud mode, no need to known the replica's backend, so use backend id = -1 in cloud mode. - private Table replicaMetaTable = HashBasedTable.create(); - - // backing replica table, for visiting backend replicas faster. - // backend id -> (tablet id -> replica) - private Table backingReplicaMetaTable = HashBasedTable.create(); - - // partition id -> partition info. - // notice partition info update every Config.partition_info_update_interval_secs seconds, - // so it may be stale. - // Notice only none-cloud use it for be reporting tablets. This map is empty in cloud mode. - private volatile ImmutableMap partitionCollectInfoMap = ImmutableMap.of(); - - private final ExecutorService taskPool = new ThreadPoolExecutor( - Config.tablet_report_thread_pool_num, - 2 * Config.tablet_report_thread_pool_num, - // tablet report task default 60s once - 120L, - TimeUnit.SECONDS, - new LinkedBlockingQueue<>(Config.tablet_report_queue_size), - new ThreadPoolExecutor.DiscardOldestPolicy()); + protected Map tabletMetaMap = Maps.newHashMap(); public TabletInvertedIndex() { } - private long readLock() { + protected long readLock() { return this.lock.readLock(); } - private void readUnlock(long stamp) { + protected void readUnlock(long stamp) { this.lock.unlockRead(stamp); } - private long writeLock() { + protected long writeLock() { return this.lock.writeLock(); } - private void writeUnlock(long stamp) { + protected void writeUnlock(long stamp) { this.lock.unlockWrite(stamp); } @@ -150,464 +95,7 @@ public void tabletReport(long backendId, Map backendTablets, List tabletToUpdate, List cooldownConfToPush, List cooldownConfToUpdate) { - List> cooldownTablets = new ArrayList<>(); - long feTabletNum = 0; - long stamp = readLock(); - long start = System.currentTimeMillis(); - - try { - if (LOG.isDebugEnabled()) { - LOG.debug("begin to do tablet diff with backend[{}]. 
num: {}", backendId, backendTablets.size()); - } - - Map replicaMetaWithBackend = backingReplicaMetaTable.row(backendId); - if (replicaMetaWithBackend != null) { - feTabletNum = replicaMetaWithBackend.size(); - processTabletReportAsync(backendId, backendTablets, backendPartitionsVersion, storageMediumMap, - tabletSyncMap, tabletDeleteFromMeta, tabletFoundInMeta, tabletMigrationMap, - partitionVersionSyncMap, transactionsToPublish, transactionsToClear, tabletRecoveryMap, - tabletToUpdate, cooldownTablets, replicaMetaWithBackend); - } - } finally { - readUnlock(stamp); - } - - // Process cooldown configs outside of read lock to avoid deadlock - cooldownTablets.forEach(p -> handleCooldownConf(p.first, p.second, cooldownConfToPush, cooldownConfToUpdate)); - - logTabletReportSummary(backendId, feTabletNum, backendTablets, backendPartitionsVersion, - tabletSyncMap, tabletDeleteFromMeta, tabletFoundInMeta, tabletMigrationMap, - partitionVersionSyncMap, transactionsToPublish, transactionsToClear, - tabletToUpdate, tabletRecoveryMap, start); - } - - /** - * Process tablet report asynchronously using thread pool - */ - private void processTabletReportAsync(long backendId, Map backendTablets, - Map backendPartitionsVersion, - HashMap storageMediumMap, - ListMultimap tabletSyncMap, - ListMultimap tabletDeleteFromMeta, - Set tabletFoundInMeta, - ListMultimap tabletMigrationMap, - Map partitionVersionSyncMap, - Map> transactionsToPublish, - SetMultimap transactionsToClear, - ListMultimap tabletRecoveryMap, - List tabletToUpdate, - List> cooldownTablets, - Map replicaMetaWithBackend) { - // Calculate optimal chunk size to balance task granularity and concurrency - // For large tablet counts (40W-50W), we want smaller chunks to maximize parallelism - // Target: create at least threadPoolSize * 4 tasks for better load balancing - int totalTablets = replicaMetaWithBackend.size(); - int threadPoolSize = Config.tablet_report_thread_pool_num; - int targetTasks = threadPoolSize * 4; // Create 4x tasks as threads for better load balancing - int chunkSize = Math.max(500, totalTablets / targetTasks); - - // Cap chunk size to avoid too large tasks - // so thread pool queue will not be fulled with few large tasks - int maxChunkSize = 10000; - chunkSize = Math.min(chunkSize, maxChunkSize); - List> entries = new ArrayList<>(replicaMetaWithBackend.entrySet()); - List> tabletFutures = new ArrayList<>(); - int estimatedTasks = (totalTablets + chunkSize - 1) / chunkSize; - if (LOG.isDebugEnabled()) { - LOG.debug("Processing tablet report for backend[{}]: total tablets={}, chunkSize={}, estimated tasks={}", - backendId, totalTablets, chunkSize, estimatedTasks); - } - - for (int i = 0; i < entries.size(); i += chunkSize) { - final int start = i; - final int end = Math.min(i + chunkSize, entries.size()); - - CompletableFuture future = CompletableFuture.runAsync(() -> { - for (int j = start; j < end; j++) { - Map.Entry entry = entries.get(j); - processTabletEntry(backendId, backendTablets, storageMediumMap, tabletSyncMap, - tabletDeleteFromMeta, tabletFoundInMeta, tabletMigrationMap, - transactionsToPublish, transactionsToClear, tabletRecoveryMap, - tabletToUpdate, cooldownTablets, entry); - } - }, taskPool); - - tabletFutures.add(future); - } - - // Process partition versions in parallel - CompletableFuture partitionFuture = CompletableFuture.runAsync(() -> { - processPartitionVersions(backendPartitionsVersion, partitionVersionSyncMap); - }, taskPool); - - // Wait for all tasks to complete - 
CompletableFuture.allOf(tabletFutures.toArray(new CompletableFuture[0])).join(); - partitionFuture.join(); - } - - /** - * Process a single tablet entry from backend report - */ - private void processTabletEntry(long backendId, Map backendTablets, - HashMap storageMediumMap, - ListMultimap tabletSyncMap, - ListMultimap tabletDeleteFromMeta, - Set tabletFoundInMeta, - ListMultimap tabletMigrationMap, - Map> transactionsToPublish, - SetMultimap transactionsToClear, - ListMultimap tabletRecoveryMap, - List tabletToUpdate, - List> cooldownTablets, - Map.Entry entry) { - long tabletId = entry.getKey(); - Replica replica = entry.getValue(); - - Preconditions.checkState(tabletMetaMap.containsKey(tabletId), - "tablet " + tabletId + " not exists, backend " + backendId); - TabletMeta tabletMeta = tabletMetaMap.get(tabletId); - - if (backendTablets.containsKey(tabletId)) { - // Tablet exists in both FE and BE - TTablet backendTablet = backendTablets.get(tabletId); - TTabletInfo backendTabletInfo = backendTablet.getTabletInfos().get(0); - - tabletFoundInMeta.add(tabletId); - - processExistingTablet(backendId, tabletId, replica, tabletMeta, backendTabletInfo, - storageMediumMap, tabletSyncMap, tabletMigrationMap, transactionsToPublish, - transactionsToClear, tabletRecoveryMap, tabletToUpdate, cooldownTablets); - } else { - // Tablet exists in FE but not in BE - may need deletion - processDeletedTablet(backendId, tabletId, tabletMeta, tabletDeleteFromMeta); - } - } - - /** - * Process tablet that exists in both FE and BE - */ - private void processExistingTablet(long backendId, long tabletId, Replica replica, - TabletMeta tabletMeta, TTabletInfo backendTabletInfo, - HashMap storageMediumMap, - ListMultimap tabletSyncMap, - ListMultimap tabletMigrationMap, - Map> transactionsToPublish, - SetMultimap transactionsToClear, - ListMultimap tabletRecoveryMap, - List tabletToUpdate, - List> cooldownTablets) { - // Check and prepare tablet meta info update - TTabletMetaInfo tabletMetaInfo = prepareTabletMetaInfo(replica, tabletMeta, backendTabletInfo); - - // Check if version sync is needed - if (needSync(replica, backendTabletInfo)) { - synchronized (tabletSyncMap) { - tabletSyncMap.put(tabletMeta.getDbId(), tabletId); - } - } - - // Update replica path and schema hash - updateReplicaBasicInfo(replica, backendTabletInfo); - - // Check if replica needs recovery - if (needRecover(replica, tabletMeta.getOldSchemaHash(), backendTabletInfo)) { - logReplicaRecovery(replica, tabletId, backendId, backendTabletInfo); - synchronized (tabletRecoveryMap) { - tabletRecoveryMap.put(tabletMeta.getDbId(), tabletId); - } - } - - // Handle cooldown policy - if (Config.enable_storage_policy && backendTabletInfo.isSetCooldownTerm()) { - synchronized (cooldownTablets) { - cooldownTablets.add(Pair.of(tabletMeta, backendTabletInfo)); - } - replica.setCooldownMetaId(backendTabletInfo.getCooldownMetaId()); - replica.setCooldownTerm(backendTabletInfo.getCooldownTerm()); - } - - // Check storage medium migration - checkStorageMediumMigration(tabletId, tabletMeta, backendTabletInfo, - storageMediumMap, tabletMigrationMap); - - // Handle transactions - if (backendTabletInfo.isSetTransactionIds()) { - handleBackendTransactions(backendId, backendTabletInfo.getTransactionIds(), tabletId, - tabletMeta, transactionsToPublish, transactionsToClear); - } - - // Update replica version count - updateReplicaVersionCount(replica, backendTabletInfo); - - // Add tablet meta info to update list if needed - if (tabletMetaInfo != null) { - 
tabletMetaInfo.setTabletId(tabletId); - synchronized (tabletToUpdate) { - tabletToUpdate.add(tabletMetaInfo); - } - } - } - - /** - * Prepare tablet meta info for BE update if needed - */ - private TTabletMetaInfo prepareTabletMetaInfo(Replica replica, TabletMeta tabletMeta, - TTabletInfo backendTabletInfo) { - TTabletMetaInfo tabletMetaInfo = null; - - // Check replica id mismatch - if (backendTabletInfo.getReplicaId() != replica.getId() - && replica.getState() != ReplicaState.CLONE) { - tabletMetaInfo = new TTabletMetaInfo(); - tabletMetaInfo.setReplicaId(replica.getId()); - } - - // Check in-memory flag - PartitionCollectInfo partitionCollectInfo = - partitionCollectInfoMap.get(backendTabletInfo.getPartitionId()); - boolean isInMemory = partitionCollectInfo != null && partitionCollectInfo.isInMemory(); - if (isInMemory != backendTabletInfo.isIsInMemory()) { - if (tabletMetaInfo == null) { - tabletMetaInfo = new TTabletMetaInfo(); - } - tabletMetaInfo.setIsInMemory(isInMemory); - } - - // Check partition id mismatch - if (Config.fix_tablet_partition_id_eq_0 - && tabletMeta.getPartitionId() > 0 - && backendTabletInfo.getPartitionId() == 0) { - LOG.warn("be report tablet partition id not eq fe, in be {} but in fe {}", - backendTabletInfo, tabletMeta); - if (tabletMetaInfo == null) { - tabletMetaInfo = new TTabletMetaInfo(); - } - tabletMetaInfo.setPartitionId(tabletMeta.getPartitionId()); - } - - return tabletMetaInfo; - } - - /** - * Update replica's basic info like path hash and schema hash - */ - private void updateReplicaBasicInfo(Replica replica, TTabletInfo backendTabletInfo) { - // Update path hash - if (backendTabletInfo.isSetPathHash() - && replica.getPathHash() != backendTabletInfo.getPathHash()) { - replica.setPathHash(backendTabletInfo.getPathHash()); - } - - // Update schema hash - if (backendTabletInfo.isSetSchemaHash() - && replica.getState() == ReplicaState.NORMAL - && replica.getSchemaHash() != backendTabletInfo.getSchemaHash()) { - replica.setSchemaHash(backendTabletInfo.getSchemaHash()); - } - } - - /** - * Log replica recovery information - */ - private void logReplicaRecovery(Replica replica, long tabletId, long backendId, - TTabletInfo backendTabletInfo) { - LOG.warn("replica {} of tablet {} on backend {} need recovery. " - + "replica in FE: {}, report version {}, report schema hash: {}, " - + "is bad: {}, is version missing: {}", - replica.getId(), tabletId, backendId, replica, - backendTabletInfo.getVersion(), - backendTabletInfo.getSchemaHash(), - backendTabletInfo.isSetUsed() ? !backendTabletInfo.isUsed() : "false", - backendTabletInfo.isSetVersionMiss() ? 
backendTabletInfo.isVersionMiss() : "unset"); - } - - /** - * Check if storage medium migration is needed - */ - private void checkStorageMediumMigration(long tabletId, TabletMeta tabletMeta, - TTabletInfo backendTabletInfo, - HashMap storageMediumMap, - ListMultimap tabletMigrationMap) { - if (Config.disable_storage_medium_check) { - return; - } - - long partitionId = tabletMeta.getPartitionId(); - TStorageMedium storageMedium = storageMediumMap.get(partitionId); - - if (storageMedium != null && backendTabletInfo.isSetStorageMedium() - && isLocal(storageMedium) - && isLocal(backendTabletInfo.getStorageMedium()) - && isLocal(tabletMeta.getStorageMedium())) { - - if (storageMedium != backendTabletInfo.getStorageMedium()) { - synchronized (tabletMigrationMap) { - tabletMigrationMap.put(storageMedium, tabletId); - } - } - - if (storageMedium != tabletMeta.getStorageMedium()) { - tabletMeta.setStorageMedium(storageMedium); - } - } - } - - /** - * Update replica's version count - */ - private void updateReplicaVersionCount(Replica replica, TTabletInfo backendTabletInfo) { - if (backendTabletInfo.isSetTotalVersionCount()) { - replica.setTotalVersionCount(backendTabletInfo.getTotalVersionCount()); - replica.setVisibleVersionCount(backendTabletInfo.isSetVisibleVersionCount() - ? backendTabletInfo.getVisibleVersionCount() - : backendTabletInfo.getTotalVersionCount()); - } - } - - /** - * Process tablet that exists in FE but not reported by BE - */ - private void processDeletedTablet(long backendId, long tabletId, TabletMeta tabletMeta, - ListMultimap tabletDeleteFromMeta) { - if (LOG.isDebugEnabled()) { - LOG.debug("backend[{}] does not report tablet[{}-{}]", backendId, tabletId, tabletMeta); - } - synchronized (tabletDeleteFromMeta) { - tabletDeleteFromMeta.put(tabletMeta.getDbId(), tabletId); - } - } - - /** - * Process partition versions reported by BE - */ - private void processPartitionVersions(Map backendPartitionsVersion, - Map partitionVersionSyncMap) { - for (Map.Entry entry : backendPartitionsVersion.entrySet()) { - long partitionId = entry.getKey(); - long backendVersion = entry.getValue(); - PartitionCollectInfo partitionInfo = partitionCollectInfoMap.get(partitionId); - - if (partitionInfo != null && partitionInfo.getVisibleVersion() > backendVersion) { - partitionVersionSyncMap.put(partitionId, partitionInfo.getVisibleVersion()); - } - } - } - - /** - * Log tablet report summary - */ - private void logTabletReportSummary(long backendId, long feTabletNum, - Map backendTablets, - Map backendPartitionsVersion, - ListMultimap tabletSyncMap, - ListMultimap tabletDeleteFromMeta, - Set tabletFoundInMeta, - ListMultimap tabletMigrationMap, - Map partitionVersionSyncMap, - Map> transactionsToPublish, - SetMultimap transactionsToClear, - List tabletToUpdate, - ListMultimap tabletRecoveryMap, - long startTime) { - long endTime = System.currentTimeMillis(); - long toClearTransactionsNum = transactionsToClear.keySet().size(); - long toClearTransactionsPartitions = transactionsToClear.values().size(); - long toPublishTransactionsNum = transactionsToPublish.values().stream() - .mapToLong(m -> m.keySet().size()).sum(); - long toPublishTransactionsPartitions = transactionsToPublish.values().stream() - .mapToLong(m -> m.values().size()).sum(); - - LOG.info("finished to do tablet diff with backend[{}]. fe tablet num: {}, backend tablet num: {}. 
" - + "sync: {}, metaDel: {}, foundInMeta: {}, migration: {}, " - + "backend partition num: {}, backend need update: {}, " - + "found invalid transactions {}(partitions: {}), " - + "found republish transactions {}(partitions: {}), " - + "tabletToUpdate: {}, need recovery: {}, cost: {} ms", - backendId, feTabletNum, backendTablets.size(), tabletSyncMap.size(), - tabletDeleteFromMeta.size(), tabletFoundInMeta.size(), tabletMigrationMap.size(), - backendPartitionsVersion.size(), partitionVersionSyncMap.size(), - toClearTransactionsNum, toClearTransactionsPartitions, - toPublishTransactionsNum, toPublishTransactionsPartitions, - tabletToUpdate.size(), tabletRecoveryMap.size(), (endTime - startTime)); - } - - private void handleBackendTransactions(long backendId, List transactionIds, long tabletId, - TabletMeta tabletMeta, Map> transactionsToPublish, - SetMultimap transactionsToClear) { - GlobalTransactionMgrIface transactionMgr = Env.getCurrentGlobalTransactionMgr(); - long partitionId = tabletMeta.getPartitionId(); - for (Long transactionId : transactionIds) { - TransactionState transactionState = transactionMgr.getTransactionState(tabletMeta.getDbId(), transactionId); - if (transactionState == null || transactionState.getTransactionStatus() == TransactionStatus.ABORTED) { - synchronized (transactionsToClear) { - transactionsToClear.put(transactionId, tabletMeta.getPartitionId()); - } - if (LOG.isDebugEnabled()) { - LOG.debug("transaction id [{}] is not valid any more, clear it from backend [{}]", - transactionId, backendId); - } - } else if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) { - publishPartition(transactionState, transactionId, tabletMeta, partitionId, transactionsToPublish); - } else if (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED) { - // for some reasons, transaction pushlish succeed replica num less than quorum, - // this transaction's status can not to be VISIBLE, and this publish task of - // this replica of this tablet on this backend need retry publish success to - // make transaction VISIBLE when last publish failed. 
- Map> publishVersionTask = transactionState.getPublishVersionTasks(); - List tasks = publishVersionTask.get(backendId); - if (tasks == null) { - continue; - } - for (PublishVersionTask task : tasks) { - if (task != null && task.isFinished()) { - List errorTablets = task.getErrorTablets(); - if (errorTablets != null) { - for (int i = 0; i < errorTablets.size(); i++) { - if (tabletId == errorTablets.get(i)) { - publishPartition(transactionState, transactionId, tabletMeta, partitionId, - transactionsToPublish); - break; - } - } - } - } - } - } - } - } - - // the transactionId may be sub transaction id or transaction id - private TPartitionVersionInfo generatePartitionVersionInfoWhenReport(TransactionState transactionState, - long transactionId, TabletMeta tabletMeta, long partitionId) { - TableCommitInfo tableCommitInfo; - if (transactionState.getSubTxnIds() == null) { - tableCommitInfo = transactionState.getTableCommitInfo(tabletMeta.getTableId()); - } else { - tableCommitInfo = transactionState.getTableCommitInfoBySubTxnId(transactionId); - } - if (tableCommitInfo != null && tableCommitInfo.getPartitionCommitInfo(partitionId) != null) { - PartitionCommitInfo partitionCommitInfo = tableCommitInfo.getPartitionCommitInfo(partitionId); - return new TPartitionVersionInfo(tabletMeta.getPartitionId(), - partitionCommitInfo.getVersion(), 0); - } - return null; - } - - private void publishPartition(TransactionState transactionState, long transactionId, TabletMeta tabletMeta, - long partitionId, Map> transactionsToPublish) { - TPartitionVersionInfo versionInfo = generatePartitionVersionInfoWhenReport(transactionState, - transactionId, tabletMeta, partitionId); - if (versionInfo != null) { - synchronized (transactionsToPublish) { - SetMultimap map = transactionsToPublish.get( - transactionState.getDbId()); - if (map == null) { - map = LinkedHashMultimap.create(); - transactionsToPublish.put(transactionState.getDbId(), map); - } - map.put(transactionId, versionInfo); - } - } + throw new UnsupportedOperationException("tabletReport is not supported in TabletInvertedIndex"); } public TabletMeta getTabletMeta(long tabletId) { @@ -632,157 +120,7 @@ public List getTabletMetaList(List tabletIdList) { } } - private boolean needSync(Replica replicaInFe, TTabletInfo backendTabletInfo) { - if (backendTabletInfo.isSetUsed() && !backendTabletInfo.isUsed()) { - // tablet is bad, do not sync - // it will be handled in needRecovery() - return false; - } - - if (replicaInFe.getState() == ReplicaState.ALTER) { - // ignore the replica is ALTER state. its version will be taken care by load process and alter table process - return false; - } - - long versionInFe = replicaInFe.getVersion(); - - if (backendTabletInfo.getVersion() > versionInFe) { - // backend replica's version is larger or newer than replica in FE, sync it. - return true; - } else if (versionInFe == backendTabletInfo.getVersion()) { - // backend replica's version is equal to replica in FE, but replica in FE is bad, - // while backend replica is good, sync it - if (replicaInFe.isBad()) { - return true; - } - - // FE' s replica last failed version > partition's committed version - // this can be occur when be report miss version, fe will set last failed version = visible version + 1 - // then last failed version may greater than partition's committed version - // - // But here cannot got variable partition, we just check lastFailedVersion = version + 1, - // In ReportHandler.sync, we will check if last failed version > partition's committed version again. 
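// ------------------------------------------------------------------------------------
// Illustrative sketch (editor's addition, not part of this patch): the sync decision
// described in the comments above, condensed into one helper over the Replica and
// TTabletInfo types already used here; the method name is hypothetical.
private static boolean shouldSync(Replica feReplica, TTabletInfo beTablet) {
    if (beTablet.isSetUsed() && !beTablet.isUsed()) {
        return false; // bad BE tablets are handled by the recovery path instead
    }
    if (feReplica.getState() == ReplicaState.ALTER) {
        return false; // ALTER replicas are versioned by the load / alter jobs
    }
    long feVersion = feReplica.getVersion();
    if (beTablet.getVersion() > feVersion) {
        return true;  // BE is strictly newer, take its version
    }
    // equal versions: sync if the FE copy is bad, or carries the miss-version marker
    // (last failed version == visible version + 1, re-checked later against the partition)
    return beTablet.getVersion() == feVersion
            && (feReplica.isBad() || feReplica.getLastFailedVersion() == feVersion + 1);
}
// ------------------------------------------------------------------------------------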
- if (replicaInFe.getLastFailedVersion() == versionInFe + 1) { - return true; - } - } - - return false; - } - - private void handleCooldownConf(TabletMeta tabletMeta, TTabletInfo beTabletInfo, - List cooldownConfToPush, List cooldownConfToUpdate) { - Tablet tablet; - try { - OlapTable table = (OlapTable) Env.getCurrentInternalCatalog().getDbNullable(tabletMeta.getDbId()) - .getTable(tabletMeta.getTableId()) - .get(); - table.readLock(); - try { - tablet = table.getPartition(tabletMeta.getPartitionId()).getIndex(tabletMeta.getIndexId()) - .getTablet(beTabletInfo.tablet_id); - } finally { - table.readUnlock(); - } - } catch (RuntimeException e) { - if (!Env.getCurrentRecycleBin().isRecyclePartition(tabletMeta.getDbId(), - tabletMeta.getTableId(), tabletMeta.getPartitionId())) { - LOG.warn("failed to get tablet. tabletId={}", beTabletInfo.tablet_id); - } - return; - } - Pair cooldownConf = tablet.getCooldownConf(); - if (beTabletInfo.getCooldownTerm() > cooldownConf.second) { // should not be here - LOG.warn("report cooldownTerm({}) > cooldownTerm in TabletMeta({}), tabletId={}", - beTabletInfo.getCooldownTerm(), cooldownConf.second, beTabletInfo.tablet_id); - return; - } - - if (cooldownConf.first <= 0) { // invalid cooldownReplicaId - CooldownConf conf = new CooldownConf(tabletMeta.getDbId(), tabletMeta.getTableId(), - tabletMeta.getPartitionId(), tabletMeta.getIndexId(), beTabletInfo.tablet_id, cooldownConf.second); - cooldownConfToUpdate.add(conf); - return; - } - - // check cooldown replica is alive - Map replicaMap = replicaMetaTable.row(beTabletInfo.getTabletId()); - if (replicaMap.isEmpty()) { - return; - } - boolean replicaAlive = false; - for (Replica replica : replicaMap.values()) { - if (replica.getId() == cooldownConf.first) { - if (replica.isAlive()) { - replicaAlive = true; - } - break; - } - } - if (!replicaAlive) { - CooldownConf conf = new CooldownConf(tabletMeta.getDbId(), tabletMeta.getTableId(), - tabletMeta.getPartitionId(), tabletMeta.getIndexId(), beTabletInfo.tablet_id, cooldownConf.second); - cooldownConfToUpdate.add(conf); - return; - } - - if (beTabletInfo.getCooldownTerm() < cooldownConf.second) { - CooldownConf conf = new CooldownConf(beTabletInfo.tablet_id, cooldownConf.first, cooldownConf.second); - cooldownConfToPush.add(conf); - return; - } - } - - public List getReplicas(Long tabletId) { - long stamp = readLock(); - try { - Map replicaMap = replicaMetaTable.row(tabletId); - return replicaMap.values().stream().collect(Collectors.toList()); - } finally { - readUnlock(stamp); - } - } - - /** - * Be will set `used' to false for bad replicas and `version_miss' to true for replicas with hole - * in their version chain. In either case, those replicas need to be fixed by TabletScheduler. - */ - private boolean needRecover(Replica replicaInFe, int schemaHashInFe, TTabletInfo backendTabletInfo) { - if (replicaInFe.getState() != ReplicaState.NORMAL) { - // only normal replica need recover - // case: - // the replica's state is CLONE, which means this a newly created replica in clone process. - // and an old out-of-date replica reports here, and this report should not mark this replica as - // 'need recovery'. - // Other state such as ROLLUP/SCHEMA_CHANGE, the replica behavior is unknown, so for safety reason, - // also not mark this replica as 'need recovery'. 
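// ------------------------------------------------------------------------------------
// Illustrative sketch (editor's addition, not part of this patch): the recovery rules
// spelled out in the comments above and in the checks that follow, condensed into one
// helper over the same Replica and TTabletInfo types; the method name is hypothetical.
private static boolean shouldRecover(Replica feReplica, int feSchemaHash, TTabletInfo beTablet) {
    if (feReplica.getState() != ReplicaState.NORMAL) {
        return false; // CLONE / ROLLUP / SCHEMA_CHANGE replicas are driven by their own jobs
    }
    if (beTablet.isSetUsed() && !beTablet.isUsed()) {
        return true;  // BE explicitly marked the tablet as bad
    }
    if (feSchemaHash != beTablet.getSchemaHash() || beTablet.getVersion() == -1) {
        return false; // probably an empty, newly created schema-change tablet
    }
    // a reported version hole, or a version that regressed below what the FE confirmed
    return (beTablet.isSetVersionMiss() && beTablet.isVersionMiss())
            || feReplica.checkVersionRegressive(beTablet.getVersion());
}
// ------------------------------------------------------------------------------------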
- return false; - } - - if (backendTabletInfo.isSetUsed() && !backendTabletInfo.isUsed()) { - // tablet is bad - return true; - } - - if (schemaHashInFe != backendTabletInfo.getSchemaHash() || backendTabletInfo.getVersion() == -1) { - // no data file exist on BE, maybe this is a newly created schema change tablet. no need to recovery - return false; - } - - if (backendTabletInfo.isSetVersionMiss() && backendTabletInfo.isVersionMiss()) { - // even if backend version is less than fe's version, but if version_miss is false, - // which means this may be a stale report. - // so we only return true if version_miss is true. - return true; - } - - // backend versions regressive due to bugs - if (replicaInFe.checkVersionRegressive(backendTabletInfo.getVersion())) { - return true; - } - - return false; - } + public abstract List getReplicas(Long tabletId); // always add tablet before adding replicas public void addTablet(long tabletId, TabletMeta tabletMeta) { @@ -792,13 +130,6 @@ public void addTablet(long tabletId, TabletMeta tabletMeta) { return; } tabletMetaMap.put(tabletId, tabletMeta); - if (!tabletMetaTable.contains(tabletMeta.getPartitionId(), tabletMeta.getIndexId())) { - tabletMetaTable.put(tabletMeta.getPartitionId(), tabletMeta.getIndexId(), tabletMeta); - if (LOG.isDebugEnabled()) { - LOG.debug("add tablet meta: {}", tabletId); - } - } - if (LOG.isDebugEnabled()) { LOG.debug("add tablet: {}", tabletId); } @@ -807,146 +138,30 @@ public void addTablet(long tabletId, TabletMeta tabletMeta) { } } - public void deleteTablet(long tabletId) { - long stamp = writeLock(); - try { - Map replicas = replicaMetaTable.rowMap().remove(tabletId); - if (replicas != null) { - for (long backendId : replicas.keySet()) { - backingReplicaMetaTable.remove(backendId, tabletId); - } - } - TabletMeta tabletMeta = tabletMetaMap.remove(tabletId); - if (tabletMeta != null) { - tabletMetaTable.remove(tabletMeta.getPartitionId(), tabletMeta.getIndexId()); - if (LOG.isDebugEnabled()) { - LOG.debug("delete tablet meta: {}", tabletId); - } - } + public abstract void deleteTablet(long tabletId); - if (LOG.isDebugEnabled()) { - LOG.debug("delete tablet: {}", tabletId); - } - } finally { - writeUnlock(stamp); - } - } + public abstract void addReplica(long tabletId, Replica replica); - public void addReplica(long tabletId, Replica replica) { - long stamp = writeLock(); - try { - // cloud mode, create table not need backendId, represent with -1. - long backendId = Config.isCloudMode() ? 
-1 : replica.getBackendIdWithoutException(); - Preconditions.checkState(tabletMetaMap.containsKey(tabletId), - "tablet " + tabletId + " not exists, replica " + replica.getId() - + ", backend " + backendId); - replicaMetaTable.put(tabletId, backendId, replica); - backingReplicaMetaTable.put(backendId, tabletId, replica); - if (LOG.isDebugEnabled()) { - LOG.debug("add replica {} of tablet {} in backend {}", - replica.getId(), tabletId, backendId); - } - } finally { - writeUnlock(stamp); - } - } + public abstract void deleteReplica(long tabletId, long backendId); - public void deleteReplica(long tabletId, long backendId) { - long stamp = writeLock(); - try { - Preconditions.checkState(tabletMetaMap.containsKey(tabletId), - "tablet " + tabletId + " not exists, backend " + backendId); - if (Config.isCloudMode()) { - backendId = -1; - } - if (replicaMetaTable.containsRow(tabletId)) { - Replica replica = replicaMetaTable.remove(tabletId, backendId); + public abstract Replica getReplica(long tabletId, long backendId); - backingReplicaMetaTable.remove(backendId, tabletId); - if (LOG.isDebugEnabled()) { - LOG.debug("delete replica {} of tablet {} in backend {}", - replica.getId(), tabletId, backendId); - } - } else { - // this may happen when fe restart after tablet is empty(bug cause) - // add log instead of assertion to observe - LOG.error("tablet[{}] contains no replica in inverted index", tabletId); - } - } finally { - writeUnlock(stamp); - } - } - - public Replica getReplica(long tabletId, long backendId) { - long stamp = readLock(); - try { - Preconditions.checkState(tabletMetaMap.containsKey(tabletId), - "tablet " + tabletId + " not exists, backend " + backendId); - if (Config.isCloudMode()) { - backendId = -1; - } - return replicaMetaTable.get(tabletId, backendId); - } finally { - readUnlock(stamp); - } - } - - public List getReplicasByTabletId(long tabletId) { - long stamp = readLock(); - try { - if (replicaMetaTable.containsRow(tabletId)) { - return Lists.newArrayList(replicaMetaTable.row(tabletId).values()); - } - return Lists.newArrayList(); - } finally { - readUnlock(stamp); - } - } + public abstract List getReplicasByTabletId(long tabletId); public Long getTabletSizeByBackendId(long backendId) { - Long ret = 0L; - long stamp = readLock(); - try { - Map replicaMetaWithBackend = backingReplicaMetaTable.row(backendId); - if (replicaMetaWithBackend != null) { - ret += replicaMetaWithBackend.size(); - } - } finally { - readUnlock(stamp); - } - return ret; + throw new UnsupportedOperationException( + "getTabletSizeByBackendId is not supported in TabletInvertedIndex"); } public List getTabletIdsByBackendId(long backendId) { - List tabletIds = Lists.newArrayList(); - long stamp = readLock(); - try { - Map replicaMetaWithBackend = backingReplicaMetaTable.row(backendId); - if (replicaMetaWithBackend != null) { - tabletIds.addAll(replicaMetaWithBackend.keySet()); - } - } finally { - readUnlock(stamp); - } - return tabletIds; + throw new UnsupportedOperationException( + "getTabletIdsByBackendId is not supported in TabletInvertedIndex"); } public List> getTabletSizeByBackendIdAndStorageMedium(long backendId, TStorageMedium storageMedium) { - List> tabletIdSizes = Lists.newArrayList(); - long stamp = readLock(); - try { - Map replicaMetaWithBackend = backingReplicaMetaTable.row(backendId); - if (replicaMetaWithBackend != null) { - tabletIdSizes = replicaMetaWithBackend.entrySet().stream() - .filter(entry -> tabletMetaMap.get(entry.getKey()).getStorageMedium() == storageMedium) - .map(entry -> 
Pair.of(entry.getKey(), entry.getValue().getDataSize())) - .collect(Collectors.toList()); - } - } finally { - readUnlock(stamp); - } - return tabletIdSizes; + throw new UnsupportedOperationException( + "getTabletSizeByBackendIdAndStorageMedium is not supported in TabletInvertedIndex"); } public List getTabletIdsByBackendIdAndStorageMedium(long backendId, @@ -956,168 +171,36 @@ public List getTabletIdsByBackendIdAndStorageMedium(long backendId, } public int getTabletNumByBackendId(long backendId) { - long stamp = readLock(); - try { - Map replicaMetaWithBackend = backingReplicaMetaTable.row(backendId); - if (replicaMetaWithBackend != null) { - return replicaMetaWithBackend.size(); - } - } finally { - readUnlock(stamp); - } - return 0; + throw new UnsupportedOperationException( + "getTabletNumByBackendId is not supported in TabletInvertedIndex"); } public Map getReplicaNumByBeIdAndStorageMedium(long backendId) { - Map replicaNumMap = Maps.newHashMap(); - long hddNum = 0; - long ssdNum = 0; - long stamp = readLock(); - try { - Map replicaMetaWithBackend = backingReplicaMetaTable.row(backendId); - if (replicaMetaWithBackend != null) { - for (long tabletId : replicaMetaWithBackend.keySet()) { - if (tabletMetaMap.get(tabletId).getStorageMedium() == TStorageMedium.HDD) { - hddNum++; - } else { - ssdNum++; - } - } - } - } finally { - readUnlock(stamp); - } - replicaNumMap.put(TStorageMedium.HDD, hddNum); - replicaNumMap.put(TStorageMedium.SSD, ssdNum); - return replicaNumMap; + throw new UnsupportedOperationException( + "getReplicaNumByBeIdAndStorageMedium is not supported in TabletInvertedIndex"); } + protected abstract void innerClear(); + // just for test public void clear() { long stamp = writeLock(); try { tabletMetaMap.clear(); - tabletMetaTable.clear(); - replicaMetaTable.clear(); - backingReplicaMetaTable.clear(); + innerClear(); } finally { writeUnlock(stamp); } } public void setPartitionCollectInfoMap(ImmutableMap partitionCollectInfoMap) { - this.partitionCollectInfoMap = partitionCollectInfoMap; + throw new UnsupportedOperationException( + "setPartitionCollectInfoMap is not supported in TabletInvertedIndex"); } - // Only build from available bes, exclude colocate tables public Map> buildPartitionInfoBySkew( List availableBeIds, Map> movesInProgress) { - Set dbIds = Sets.newHashSet(); - Set tableIds = Sets.newHashSet(); - Set partitionIds = Sets.newHashSet(); - // Clone ut mocked env, but CatalogRecycleBin is not mockable (it extends from Thread) - if (!FeConstants.runningUnitTest) { - Env.getCurrentRecycleBin().getRecycleIds(dbIds, tableIds, partitionIds); - } - long stamp = readLock(); - - // 1. gen > - // for each replica(all tablets): - // find beId, then replicaCount++ - Map>> partitionReplicasInfoMaps = Maps.newHashMap(); - for (TStorageMedium medium : TStorageMedium.values()) { - partitionReplicasInfoMaps.put(medium, HashBasedTable.create()); - } - try { - // Changes to the returned set will update the underlying table - // tablet id -> (backend id -> replica) - Set> cells = replicaMetaTable.cellSet(); - for (Table.Cell cell : cells) { - Long tabletId = cell.getRowKey(); - Long beId = cell.getColumnKey(); - Pair movePair = movesInProgress.get(tabletId); - TabletMove move = movePair != null ? 
movePair.first : null; - // there exists move from fromBe to toBe - if (move != null && beId == move.fromBe - && availableBeIds.contains(move.toBe)) { - - // if movePair.second == -1, it means toBe hadn't added this tablet but it will add later; - // otherwise it means toBe had added this tablet - boolean toBeHadReplica = movePair.second != -1L; - if (toBeHadReplica) { - // toBe had add this tablet, fromBe just ignore this tablet - continue; - } - - // later fromBe will delete this replica - // and toBe will add a replica - // so this replica should belong to toBe - beId = move.toBe; - } - - try { - Preconditions.checkState(availableBeIds.contains(beId), "dead be " + beId); - TabletMeta tabletMeta = tabletMetaMap.get(tabletId); - if (dbIds.contains(tabletMeta.getDbId()) || tableIds.contains(tabletMeta.getTableId()) - || partitionIds.contains(tabletMeta.getPartitionId())) { - continue; - } - Preconditions.checkNotNull(tabletMeta, "invalid tablet " + tabletId); - Preconditions.checkState( - !Env.getCurrentColocateIndex().isColocateTable(tabletMeta.getTableId()), - "table " + tabletMeta.getTableId() + " should not be the colocate table"); - - TStorageMedium medium = tabletMeta.getStorageMedium(); - Table> partitionReplicasInfo = partitionReplicasInfoMaps.get(medium); - Map countMap = partitionReplicasInfo.get( - tabletMeta.getPartitionId(), tabletMeta.getIndexId()); - if (countMap == null) { - // If one be doesn't have any replica of one partition, it should be counted too. - countMap = availableBeIds.stream().collect(Collectors.toMap(i -> i, i -> 0L)); - } - - Long count = countMap.get(beId); - countMap.put(beId, count + 1L); - partitionReplicasInfo.put(tabletMeta.getPartitionId(), tabletMeta.getIndexId(), countMap); - partitionReplicasInfoMaps.put(medium, partitionReplicasInfo); - } catch (IllegalStateException | NullPointerException e) { - // If the tablet or be has some problem, don't count in - if (LOG.isDebugEnabled()) { - LOG.debug(e.getMessage()); - } - } - } - } finally { - readUnlock(stamp); - } - - // 2. 
Populate ClusterBalanceInfo::table_info_by_skew - // for each PartitionId-MaterializedIndex: - // for each beId: record max_count, min_count(replicaCount) - // put to table_info_by_skew - Map> skewMaps = Maps.newHashMap(); - for (TStorageMedium medium : TStorageMedium.values()) { - TreeMultimap partitionInfoBySkew - = TreeMultimap.create(Ordering.natural(), Ordering.arbitrary()); - Set>> mapCells - = partitionReplicasInfoMaps.getOrDefault(medium, HashBasedTable.create()).cellSet(); - for (Table.Cell> cell : mapCells) { - Map countMap = cell.getValue(); - Preconditions.checkNotNull(countMap); - PartitionBalanceInfo pbi = new PartitionBalanceInfo(cell.getRowKey(), cell.getColumnKey()); - for (Map.Entry entry : countMap.entrySet()) { - Long beID = entry.getKey(); - Long replicaCount = entry.getValue(); - pbi.beByReplicaCount.put(replicaCount, beID); - } - // beByReplicaCount values are natural ordering - long minCount = pbi.beByReplicaCount.keySet().first(); - long maxCount = pbi.beByReplicaCount.keySet().last(); - partitionInfoBySkew.put(maxCount - minCount, pbi); - } - skewMaps.put(medium, partitionInfoBySkew); - } - return skewMaps; + throw new UnsupportedOperationException("buildPartitionInfoBySkew is not supported in TabletInvertedIndex"); } public static class PartitionBalanceInfo { @@ -1145,32 +228,12 @@ public String toString() { // just for ut public Table getReplicaMetaTable() { - long stamp = readLock(); - try { - return HashBasedTable.create(replicaMetaTable); - } finally { - readUnlock(stamp); - } + throw new UnsupportedOperationException("getReplicaMetaTable is not supported in TabletInvertedIndex"); } // just for ut public Table getBackingReplicaMetaTable() { - long stamp = readLock(); - try { - return HashBasedTable.create(backingReplicaMetaTable); - } finally { - readUnlock(stamp); - } - } - - // just for ut - public Table getTabletMetaTable() { - long stamp = readLock(); - try { - return HashBasedTable.create(tabletMetaTable); - } finally { - readUnlock(stamp); - } + throw new UnsupportedOperationException("getBackingReplicaMetaTable is not supported in TabletInvertedIndex"); } // just for ut @@ -1183,8 +246,4 @@ public Map getTabletMetaMap() { } } - private boolean isLocal(TStorageMedium storageMedium) { - return storageMedium == TStorageMedium.HDD || storageMedium == TStorageMedium.SSD; - } - } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnvFactory.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnvFactory.java index 958c4fd5e1fe6a..5b240dcd8eff6c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnvFactory.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnvFactory.java @@ -28,6 +28,7 @@ import org.apache.doris.catalog.Replica; import org.apache.doris.catalog.ReplicaAllocation; import org.apache.doris.catalog.Tablet; +import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.cloud.common.util.CloudPropertyAnalyzer; import org.apache.doris.cloud.datasource.CloudInternalCatalog; import org.apache.doris.cloud.load.CleanCopyJobScheduler; @@ -83,6 +84,11 @@ public SystemInfoService createSystemInfoService() { return new CloudSystemInfoService(); } + @Override + public TabletInvertedIndex createTabletInvertedIndex() { + return new CloudTabletInvertedIndex(); + } + @Override public Type getPartitionClass() { return CloudPartition.class; diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletInvertedIndex.java 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletInvertedIndex.java new file mode 100644 index 00000000000000..c0ebb3c134d4e3 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletInvertedIndex.java @@ -0,0 +1,133 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.cloud.catalog; + +import org.apache.doris.catalog.Replica; +import org.apache.doris.catalog.TabletInvertedIndex; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.List; +import java.util.Map; + +public class CloudTabletInvertedIndex extends TabletInvertedIndex { + private static final Logger LOG = LogManager.getLogger(CloudTabletInvertedIndex.class); + + // tablet id -> replica + // for cloud mode, no need to know the replica's backend + private Map replicaMetaMap = Maps.newHashMap(); + + public CloudTabletInvertedIndex() { + super(); + } + + @Override + public List getReplicas(Long tabletId) { + long stamp = readLock(); + try { + if (replicaMetaMap.containsKey(tabletId)) { + return Lists.newArrayList(replicaMetaMap.get(tabletId)); + } + return Lists.newArrayList(); + } finally { + readUnlock(stamp); + } + } + + @Override + public void deleteTablet(long tabletId) { + long stamp = writeLock(); + try { + replicaMetaMap.remove(tabletId); + tabletMetaMap.remove(tabletId); + if (LOG.isDebugEnabled()) { + LOG.debug("delete tablet: {}", tabletId); + } + } finally { + writeUnlock(stamp); + } + } + + @Override + public void addReplica(long tabletId, Replica replica) { + long stamp = writeLock(); + try { + Preconditions.checkState(tabletMetaMap.containsKey(tabletId), + "tablet " + tabletId + " not exists, replica " + replica.getId()); + replicaMetaMap.put(tabletId, replica); + if (LOG.isDebugEnabled()) { + LOG.debug("add replica {} of tablet {}", replica.getId(), tabletId); + } + } finally { + writeUnlock(stamp); + } + } + + @Override + public void deleteReplica(long tabletId, long backendId) { + long stamp = writeLock(); + try { + Preconditions.checkState(tabletMetaMap.containsKey(tabletId), "tablet " + tabletId + " not exists"); + if (replicaMetaMap.containsKey(tabletId)) { + Replica replica = replicaMetaMap.remove(tabletId); + if (LOG.isDebugEnabled()) { + LOG.debug("delete replica {} of tablet {}", replica.getId(), tabletId); + } + } else { + // this may happen when fe restart after tablet is empty(bug cause) + // add log instead of assertion to observe + LOG.error("tablet[{}] contains no replica in inverted index", tabletId); + } + } finally { + writeUnlock(stamp); + } + } + + @Override + public Replica getReplica(long 
tabletId, long backendId) { + long stamp = readLock(); + try { + Preconditions.checkState(tabletMetaMap.containsKey(tabletId), "tablet " + tabletId + " not exists"); + return replicaMetaMap.get(tabletId); + } finally { + readUnlock(stamp); + } + } + + @Override + public List getReplicasByTabletId(long tabletId) { + long stamp = readLock(); + try { + if (replicaMetaMap.containsKey(tabletId)) { + return Lists.newArrayList(replicaMetaMap.get(tabletId)); + } + return Lists.newArrayList(); + } finally { + readUnlock(stamp); + } + } + + @Override + protected void innerClear() { + replicaMetaMap.clear(); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/backup/BackupHandlerTest.java b/fe/fe-core/src/test/java/org/apache/doris/backup/BackupHandlerTest.java index 6940406459f448..1edd718169db47 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/backup/BackupHandlerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/backup/BackupHandlerTest.java @@ -21,6 +21,7 @@ import org.apache.doris.catalog.BrokerMgr; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.LocalTabletInvertedIndex; import org.apache.doris.catalog.MaterializedIndex; import org.apache.doris.catalog.MaterializedIndex.IndexExtState; import org.apache.doris.catalog.OlapTable; @@ -92,7 +93,7 @@ public class BackupHandlerTest { private String tmpPath = "./tmp" + System.currentTimeMillis(); - private TabletInvertedIndex invertedIndex = new TabletInvertedIndex(); + private TabletInvertedIndex invertedIndex = new LocalTabletInvertedIndex(); @Before public void setUp() throws Exception { diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java index 3a744e04c14ed8..c81231bf3d0873 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java @@ -32,7 +32,6 @@ import org.junit.Assert; import org.junit.jupiter.api.Test; -import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.UUID; @@ -60,8 +59,6 @@ public void testDuplicateCreateTable() throws Exception { + "distributed by hash(k2) buckets 1\n" + "properties('replication_num' = '1','colocate_with'='test'); "; createTable(sql); Set tabletIdSetAfterCreateFirstTable = env.getTabletInvertedIndex().getReplicaMetaTable().rowKeySet(); - Set tabletMetaSetBeforeCreateFirstTable = - new HashSet<>(env.getTabletInvertedIndex().getTabletMetaTable().values()); Set colocateTableIdBeforeCreateFirstTable = env.getColocateTableIndex().getTable2Group().keySet(); Assert.assertTrue(colocateTableIdBeforeCreateFirstTable.size() > 0); Assert.assertTrue(tabletIdSetAfterCreateFirstTable.size() > 0); @@ -71,13 +68,10 @@ public void testDuplicateCreateTable() throws Exception { Set tabletIdSetAfterDuplicateCreateTable1 = env.getTabletInvertedIndex().getReplicaMetaTable().rowKeySet(); Set tabletIdSetAfterDuplicateCreateTable2 = env.getTabletInvertedIndex().getBackingReplicaMetaTable().columnKeySet(); Set tabletIdSetAfterDuplicateCreateTable3 = env.getTabletInvertedIndex().getTabletMetaMap().keySet(); - Set tabletIdSetAfterDuplicateCreateTable4 = - new HashSet<>(env.getTabletInvertedIndex().getTabletMetaTable().values()); Assert.assertEquals(tabletIdSetAfterCreateFirstTable, tabletIdSetAfterDuplicateCreateTable1); Assert.assertEquals(tabletIdSetAfterCreateFirstTable, tabletIdSetAfterDuplicateCreateTable2); 
Assert.assertEquals(tabletIdSetAfterCreateFirstTable, tabletIdSetAfterDuplicateCreateTable3); - Assert.assertEquals(tabletMetaSetBeforeCreateFirstTable, tabletIdSetAfterDuplicateCreateTable4); // check whether table id is cleared from colocate group after duplicate create table Set colocateTableIdAfterCreateFirstTable = env.getColocateTableIndex().getTable2Group().keySet(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java index 6d20800b52bf02..a9ee9b97022787 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java @@ -53,7 +53,7 @@ public class TabletTest { @Before public void makeTablet() { - invertedIndex = new TabletInvertedIndex(); + invertedIndex = new LocalTabletInvertedIndex(); infoService = new SystemInfoService(); for (long beId = 1L; beId <= 4L; beId++) { Backend be = new Backend(beId, "127.0.0." + beId, 8030); diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/ClusterLoadStatisticsTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/ClusterLoadStatisticsTest.java index 118018f3b58269..798a5d1707b239 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/ClusterLoadStatisticsTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/ClusterLoadStatisticsTest.java @@ -20,6 +20,7 @@ import org.apache.doris.catalog.DiskInfo; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.LocalReplica; +import org.apache.doris.catalog.LocalTabletInvertedIndex; import org.apache.doris.catalog.Replica.ReplicaState; import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.catalog.TabletMeta; @@ -148,7 +149,7 @@ public void setUp() { systemInfoService.addBackend(be4); // tablet - invertedIndex = new TabletInvertedIndex(); + invertedIndex = new LocalTabletInvertedIndex(); invertedIndex.addTablet(50000, new TabletMeta(1, 2, 3, 4, 5, TStorageMedium.HDD)); invertedIndex.addReplica(50000, new LocalReplica(50001, be1.getId(), 0, ReplicaState.NORMAL)); diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/DecommissionTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/DecommissionTest.java index a04249aad947ec..af0ab7e29b287e 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/DecommissionTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/DecommissionTest.java @@ -18,6 +18,7 @@ package org.apache.doris.clone; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.LocalTabletInvertedIndex; import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.common.Config; import org.apache.doris.common.ExceptionChecker; @@ -65,7 +66,7 @@ public class DecommissionTest { private long id = 10086; private final SystemInfoService systemInfoService = new SystemInfoService(); - private final TabletInvertedIndex invertedIndex = new TabletInvertedIndex(); + private final TabletInvertedIndex invertedIndex = new LocalTabletInvertedIndex(); @BeforeClass public static void beforeClass() throws Exception { diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/DiskRebalanceTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/DiskRebalanceTest.java index 9ccd607b915496..8a0121c1543ab7 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/DiskRebalanceTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/DiskRebalanceTest.java @@ -23,6 +23,7 @@ import org.apache.doris.catalog.Env; 
import org.apache.doris.catalog.HashDistributionInfo; import org.apache.doris.catalog.KeysType; +import org.apache.doris.catalog.LocalTabletInvertedIndex; import org.apache.doris.catalog.MaterializedIndex; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Partition; @@ -74,7 +75,7 @@ public class DiskRebalanceTest { private OlapTable olapTable; private final SystemInfoService systemInfoService = new SystemInfoService(); - private final TabletInvertedIndex invertedIndex = new TabletInvertedIndex(); + private final TabletInvertedIndex invertedIndex = new LocalTabletInvertedIndex(); private Map statisticMap; private Map backendsWorkingSlots = Maps.newHashMap(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java index fc3bbb28485c54..4e91ec5be3b8b8 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java @@ -23,6 +23,7 @@ import org.apache.doris.catalog.Env; import org.apache.doris.catalog.HashDistributionInfo; import org.apache.doris.catalog.KeysType; +import org.apache.doris.catalog.LocalTabletInvertedIndex; import org.apache.doris.catalog.MaterializedIndex; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Partition; @@ -85,7 +86,7 @@ public class RebalanceTest { private OlapTable olapTable; private final SystemInfoService systemInfoService = new SystemInfoService(); - private final TabletInvertedIndex invertedIndex = new TabletInvertedIndex(); + private final TabletInvertedIndex invertedIndex = new LocalTabletInvertedIndex(); private Map statisticMap; @Before diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletReplicaTooSlowTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletReplicaTooSlowTest.java index dde4be35ef63f6..7375d8173546cb 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletReplicaTooSlowTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletReplicaTooSlowTest.java @@ -19,6 +19,7 @@ import org.apache.doris.catalog.DiskInfo; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.LocalTabletInvertedIndex; import org.apache.doris.catalog.Replica; import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.common.Config; @@ -69,7 +70,7 @@ public class TabletReplicaTooSlowTest { private long id = 10086; private final SystemInfoService systemInfoService = new SystemInfoService(); - private final TabletInvertedIndex invertedIndex = new TabletInvertedIndex(); + private final TabletInvertedIndex invertedIndex = new LocalTabletInvertedIndex(); private Table statisticMap; @BeforeClass diff --git a/fe/fe-core/src/test/java/org/apache/doris/cluster/SystemInfoServiceTest.java b/fe/fe-core/src/test/java/org/apache/doris/cluster/SystemInfoServiceTest.java index 7b7e85137a2ada..5c5b077ab83596 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/cluster/SystemInfoServiceTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/cluster/SystemInfoServiceTest.java @@ -19,6 +19,7 @@ import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.LocalTabletInvertedIndex; import org.apache.doris.catalog.Table; import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.common.AnalysisException; @@ -120,7 +121,7 @@ public void setUp() throws IOException { minTimes = 0; result = systemInfoService; - 
invertedIndex = new TabletInvertedIndex(); + invertedIndex = new LocalTabletInvertedIndex(); Env.getCurrentInvertedIndex(); minTimes = 0; result = invertedIndex; diff --git a/fe/fe-core/src/test/java/org/apache/doris/http/DorisHttpTestCase.java b/fe/fe-core/src/test/java/org/apache/doris/http/DorisHttpTestCase.java index a57f3b497e9750..f51917bfe0fd39 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/http/DorisHttpTestCase.java +++ b/fe/fe-core/src/test/java/org/apache/doris/http/DorisHttpTestCase.java @@ -28,6 +28,7 @@ import org.apache.doris.catalog.KeysType; import org.apache.doris.catalog.LocalReplica; import org.apache.doris.catalog.LocalTablet; +import org.apache.doris.catalog.LocalTabletInvertedIndex; import org.apache.doris.catalog.MaterializedIndex; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Partition; @@ -364,7 +365,7 @@ public static void afterClass() { public void setUp() { Env env = newDelegateCatalog(); SystemInfoService systemInfoService = new SystemInfoService(); - TabletInvertedIndex tabletInvertedIndex = new TabletInvertedIndex(); + TabletInvertedIndex tabletInvertedIndex = new LocalTabletInvertedIndex(); new MockUp() { @Mock SchemaChangeHandler getSchemaChangeHandler() { diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/CreateTableCommandTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/CreateTableCommandTest.java index 04e4bc043c69b4..93770ae7c58320 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/CreateTableCommandTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/CreateTableCommandTest.java @@ -30,7 +30,6 @@ import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.ReplicaAllocation; import org.apache.doris.catalog.ScalarType; -import org.apache.doris.catalog.TabletMeta; import org.apache.doris.catalog.Type; import org.apache.doris.common.ConfigBase; import org.apache.doris.common.ConfigException; @@ -54,7 +53,6 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.function.Executable; -import java.util.HashSet; import java.util.List; import java.util.Set; @@ -89,8 +87,6 @@ public void testDuplicateCreateTable() throws Exception { + "properties('replication_num' = '1','colocate_with'='test'); "; createTable(sql); Set tabletIdSetAfterCreateFirstTable = env.getTabletInvertedIndex().getReplicaMetaTable().rowKeySet(); - Set tabletMetaSetBeforeCreateFirstTable = - new HashSet<>(env.getTabletInvertedIndex().getTabletMetaTable().values()); Set colocateTableIdBeforeCreateFirstTable = env.getColocateTableIndex().getTable2Group().keySet(); Assertions.assertTrue(colocateTableIdBeforeCreateFirstTable.size() > 0); Assertions.assertTrue(tabletIdSetAfterCreateFirstTable.size() > 0); @@ -102,13 +98,10 @@ public void testDuplicateCreateTable() throws Exception { Set tabletIdSetAfterDuplicateCreateTable2 = env.getTabletInvertedIndex().getBackingReplicaMetaTable() .columnKeySet(); Set tabletIdSetAfterDuplicateCreateTable3 = env.getTabletInvertedIndex().getTabletMetaMap().keySet(); - Set tabletIdSetAfterDuplicateCreateTable4 = - new HashSet<>(env.getTabletInvertedIndex().getTabletMetaTable().values()); Assertions.assertEquals(tabletIdSetAfterCreateFirstTable, tabletIdSetAfterDuplicateCreateTable1); Assertions.assertEquals(tabletIdSetAfterCreateFirstTable, tabletIdSetAfterDuplicateCreateTable2); Assertions.assertEquals(tabletIdSetAfterCreateFirstTable, tabletIdSetAfterDuplicateCreateTable3); - 
Assertions.assertEquals(tabletMetaSetBeforeCreateFirstTable, tabletIdSetAfterDuplicateCreateTable4); // check whether table id is cleared from colocate group after duplicate create table Set colocateTableIdAfterCreateFirstTable = env.getColocateTableIndex().getTable2Group().keySet();
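A minimal sketch of how the two implementations are selected after this change, assuming the usual EnvFactory.getInstance() accessor; the comments summarize the storage layout of each subclass as added in this patch, and the snippet itself is illustrative rather than part of the diff:

    // The FE no longer constructs the index directly; the factory picks the variant per deployment.
    TabletInvertedIndex index = EnvFactory.getInstance().createTabletInvertedIndex();
    // EnvFactory (local FE)      -> LocalTabletInvertedIndex: (tablet id, backend id) -> replica,
    //                               plus a backing table keyed by backend id for per-BE lookups
    // CloudEnvFactory (cloud FE) -> CloudTabletInvertedIndex: tablet id -> single replica;
    //                               the backendId argument to getReplica()/deleteReplica() is ignored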