diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/src/main/java/org/apache/doris/catalog/Catalog.java index b68c0d9e4bc985..75df268aff0f69 100755 --- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java @@ -1803,10 +1803,15 @@ public long loadTransactionState(DataInputStream dis, long checksum) throws IOEx public long loadRecycleBin(DataInputStream dis, long checksum) throws IOException { if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_10) { - Catalog.getCurrentRecycleBin().readFields(dis); + recycleBin.readFields(dis); if (!isCheckpointThread()) { // add tablet in Recycle bin to TabletInvertedIndex - Catalog.getCurrentRecycleBin().addTabletToInvertedIndex(); + recycleBin.addTabletToInvertedIndex(); + } + // create DatabaseTransactionMgr for db in recycle bin. + // these dbs do not exist in `idToDb` of the catalog. + for (Long dbId : recycleBin.getAllDbIds()) { + globalTransactionMgr.addDatabaseTransactionMgr(dbId); } } return checksum; diff --git a/fe/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java b/fe/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java index fae59a0ffb5f21..230790855df9ea 100644 --- a/fe/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java +++ b/fe/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java @@ -36,6 +36,7 @@ import org.apache.doris.thrift.TStorageMedium; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Range; import com.google.common.collect.Sets; @@ -172,6 +173,9 @@ private synchronized void eraseDatabaseWithSameName(String dbName) { iterator.remove(); idToRecycleTime.remove(entry.getKey()); + // remove database transaction manager + Catalog.getCurrentCatalog().getGlobalTransactionMgr().removeDatabaseTransactionMgr(db.getId()); + LOG.info("erase database[{}] name: {}", db.getId(), dbName); } } @@ -887,4 +891,9 @@ public void readFields(DataInput in) throws IOException { } } } + + // currently only used when loading image. So no synchronized protected. + public List getAllDbIds() { + return Lists.newArrayList(idToDatabase.keySet()); + } } diff --git a/fe/src/main/java/org/apache/doris/qe/AuditEventProcessor.java b/fe/src/main/java/org/apache/doris/qe/AuditEventProcessor.java index bc621e796896de..a648dc988b130e 100644 --- a/fe/src/main/java/org/apache/doris/qe/AuditEventProcessor.java +++ b/fe/src/main/java/org/apache/doris/qe/AuditEventProcessor.java @@ -56,6 +56,7 @@ public AuditEventProcessor(PluginMgr pluginMgr) { public void start() { workerThread = new Thread(new Worker(), "AuditEventProcessor"); + workerThread.setDaemon(true); workerThread.start(); } diff --git a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java index 5f8fd92fd59f46..456c9277b98fb8 100644 --- a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java +++ b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java @@ -79,11 +79,15 @@ public DatabaseTransactionMgr getDatabaseTransactionMgr(long dbId) throws Analys } public void addDatabaseTransactionMgr(Long dbId) { - dbIdToDatabaseTransactionMgrs.putIfAbsent(dbId, new DatabaseTransactionMgr(dbId, catalog, idGenerator)); + if (dbIdToDatabaseTransactionMgrs.putIfAbsent(dbId, new DatabaseTransactionMgr(dbId, catalog, idGenerator)) == null) { + LOG.debug("add database transaction manager for db {}", dbId); + } } public void removeDatabaseTransactionMgr(Long dbId) { - dbIdToDatabaseTransactionMgrs.remove(dbId); + if (dbIdToDatabaseTransactionMgrs.remove(dbId) != null) { + LOG.debug("remove database transaction manager for db {}", dbId); + } } public long beginTransaction(long dbId, List tableIdList, String label, TxnCoordinator coordinator, LoadJobSourceType sourceType, @@ -355,19 +359,19 @@ public void write(DataOutput out) throws IOException { } public void readFields(DataInput in) throws IOException { - try { - int numTransactions = in.readInt(); - for (int i = 0; i < numTransactions; ++i) { - TransactionState transactionState = new TransactionState(); - transactionState.readFields(in); + int numTransactions = in.readInt(); + for (int i = 0; i < numTransactions; ++i) { + TransactionState transactionState = new TransactionState(); + transactionState.readFields(in); + try { DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(transactionState.getDbId()); dbTransactionMgr.unprotectUpsertTransactionState(transactionState, true); + } catch (AnalysisException e) { + LOG.warn("failed to get db transaction manager for txn: {}", transactionState); + throw new IOException("Read transaction states failed", e); } - idGenerator.readFields(in); - } catch (AnalysisException e) { - throw new IOException("Read transaction states failed", e); } - + idGenerator.readFields(in); } public TransactionState getTransactionStateByCallbackIdAndStatus(long dbId, long callbackId, Set status) {