diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogConfigCache.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogConfigCache.java new file mode 100644 index 00000000000000..c414b853078012 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogConfigCache.java @@ -0,0 +1,146 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.binlog; + +import org.apache.doris.catalog.BinlogConfig; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Table; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +public class BinlogConfigCache { + private static final Logger LOG = LogManager.getLogger(BinlogConfigCache.class); + + private Map dbTableBinlogEnableMap; // db or table all use id + private ReentrantReadWriteLock lock; + + public BinlogConfigCache() { + dbTableBinlogEnableMap = new HashMap(); + lock = new ReentrantReadWriteLock(); + } + + public BinlogConfig getDBBinlogConfig(long dbId) { + lock.readLock().lock(); + BinlogConfig binlogConfig = dbTableBinlogEnableMap.get(dbId); + lock.readLock().unlock(); + if (binlogConfig != null) { + return binlogConfig; + } + + lock.writeLock().lock(); + try { + Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId); + if (db == null) { + LOG.warn("db not found. dbId: {}", dbId); + return null; + } + + binlogConfig = db.getBinlogConfig(); + dbTableBinlogEnableMap.put(dbId, binlogConfig); + } finally { + lock.writeLock().unlock(); + } + return binlogConfig; + } + + public boolean isEnableDB(long dbId) { + BinlogConfig dBinlogConfig = getDBBinlogConfig(dbId); + if (dBinlogConfig == null) { + return false; + } + return dBinlogConfig.isEnable(); + } + + public long getDBTtlSeconds(long dbId) { + BinlogConfig dBinlogConfig = getDBBinlogConfig(dbId); + if (dBinlogConfig == null) { + return BinlogConfig.TTL_SECONDS; + } + return dBinlogConfig.getTtlSeconds(); + } + + public BinlogConfig getTableBinlogConfig(long dbId, long tableId) { + lock.readLock().lock(); + BinlogConfig tableBinlogConfig = dbTableBinlogEnableMap.get(tableId); + lock.readLock().unlock(); + if (tableBinlogConfig != null) { + return tableBinlogConfig; + } + + lock.writeLock().lock(); + try { + Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId); + if (db == null) { + LOG.warn("db not found. dbId: {}", dbId); + return null; + } + + Table table = db.getTableOrMetaException(tableId); + if (table == null) { + LOG.warn("fail to get table. db: {}, table id: {}", db.getFullName(), tableId); + return null; + } + if (!(table instanceof OlapTable)) { + LOG.warn("table is not olap table. db: {}, table id: {}", db.getFullName(), tableId); + return null; + } + + OlapTable olapTable = (OlapTable) table; + tableBinlogConfig = olapTable.getBinlogConfig(); + dbTableBinlogEnableMap.put(tableId, tableBinlogConfig); + return tableBinlogConfig; + } catch (Exception e) { + LOG.warn("fail to get table. db: {}, table id: {}", dbId, tableId); + return null; + } finally { + lock.writeLock().unlock(); + } + } + + public boolean isEnableTable(long dbId, long tableId) { + BinlogConfig tableBinlogConfig = getTableBinlogConfig(dbId, tableId); + if (tableBinlogConfig == null) { + return false; + } + return tableBinlogConfig.isEnable(); + } + + public long getTableTtlSeconds(long dbId, long tableId) { + BinlogConfig tableBinlogConfig = getTableBinlogConfig(dbId, tableId); + if (tableBinlogConfig == null) { + return BinlogConfig.TTL_SECONDS; + } + return tableBinlogConfig.getTtlSeconds(); + } + + public void remove(long id) { + lock.writeLock().lock(); + try { + dbTableBinlogEnableMap.remove(id); + } finally { + lock.writeLock().unlock(); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogGcer.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogGcer.java index 96d41946ffb10a..6dbe47ea220ac5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogGcer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogGcer.java @@ -41,7 +41,7 @@ public class BinlogGcer extends MasterDaemon { private static final Logger LOG = LogManager.getLogger(BinlogGcer.class); - private static final long GC_DURATION_MS = 313 * 1000L; // 313s + private static final long GC_DURATION_MS = 15 * 1000L; // 15s // TODO(Drogon): use this to control gc frequency by real gc time waste sample private long lastGcTime = 0L; diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java index 11075c4fc44473..43a95ed28ea266 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java @@ -22,8 +22,10 @@ import org.apache.doris.catalog.Env; import org.apache.doris.common.Config; import org.apache.doris.common.Pair; +import org.apache.doris.persist.AlterDatabasePropertyInfo; import org.apache.doris.persist.BinlogGcInfo; import org.apache.doris.persist.DropPartitionInfo; +import org.apache.doris.persist.ModifyTablePropertyOperationLog; import org.apache.doris.persist.TableAddOrDropColumnsInfo; import org.apache.doris.thrift.TBinlog; import org.apache.doris.thrift.TBinlogType; @@ -53,43 +55,63 @@ public class BinlogManager { private ReentrantReadWriteLock lock; private Map dbBinlogMap; + private BinlogConfigCache binlogConfigCache; public BinlogManager() { lock = new ReentrantReadWriteLock(); dbBinlogMap = Maps.newHashMap(); + binlogConfigCache = new BinlogConfigCache(); } - private void addBinlog(TBinlog binlog) { - if (!Config.enable_feature_binlog) { + private void afterAddBinlog(TBinlog binlog) { + if (!binlog.isSetRemoveEnableCache()) { + return; + } + if (!binlog.isRemoveEnableCache()) { return; } - // find db BinlogConfig long dbId = binlog.getDbId(); - Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId); - if (db == null) { - LOG.warn("db not found. dbId: {}", dbId); + boolean onlyDb = true; + if (binlog.isSetTableIds()) { + for (long tableId : binlog.getTableIds()) { + binlogConfigCache.remove(tableId); + onlyDb = false; + } + } + if (onlyDb) { + binlogConfigCache.remove(dbId); + } + } + + private void addBinlog(TBinlog binlog) { + if (!Config.enable_feature_binlog) { return; } - boolean dbBinlogEnable = db.getBinlogConfig().isEnable(); DBBinlog dbBinlog; lock.writeLock().lock(); try { + long dbId = binlog.getDbId(); dbBinlog = dbBinlogMap.get(dbId); + if (dbBinlog == null) { - dbBinlog = new DBBinlog(binlog); + dbBinlog = new DBBinlog(binlogConfigCache, binlog); dbBinlogMap.put(dbId, dbBinlog); } } finally { lock.writeLock().unlock(); } - dbBinlog.addBinlog(binlog, dbBinlogEnable); + dbBinlog.addBinlog(binlog); } private void addBinlog(long dbId, List tableIds, long commitSeq, long timestamp, TBinlogType type, - String data) { + String data, boolean removeEnableCache) { + if (!Config.enable_feature_binlog) { + return; + } + TBinlog binlog = new TBinlog(); // set commitSeq, timestamp, type, dbId, data binlog.setCommitSeq(commitSeq); @@ -101,7 +123,26 @@ private void addBinlog(long dbId, List tableIds, long commitSeq, long time binlog.setTableIds(tableIds); } binlog.setTableRef(0); - addBinlog(binlog); + binlog.setRemoveEnableCache(removeEnableCache); + + // Check if all db or table binlog is disable, return + boolean dbBinlogEnable = binlogConfigCache.isEnableDB(dbId); + boolean anyEnable = dbBinlogEnable; + if (tableIds != null) { + for (long tableId : tableIds) { + boolean tableBinlogEnable = binlogConfigCache.isEnableTable(dbId, tableId); + anyEnable = anyEnable || tableBinlogEnable; + if (anyEnable) { + break; + } + } + } + + if (anyEnable) { + addBinlog(binlog); + } + + afterAddBinlog(binlog); } public void addUpsertRecord(UpsertRecord upsertRecord) { @@ -112,7 +153,7 @@ public void addUpsertRecord(UpsertRecord upsertRecord) { TBinlogType type = TBinlogType.UPSERT; String data = upsertRecord.toJson(); - addBinlog(dbId, tableIds, commitSeq, timestamp, type, data); + addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false); } public void addAddPartitionRecord(AddPartitionRecord addPartitionRecord) { @@ -124,7 +165,7 @@ public void addAddPartitionRecord(AddPartitionRecord addPartitionRecord) { TBinlogType type = TBinlogType.ADD_PARTITION; String data = addPartitionRecord.toJson(); - addBinlog(dbId, tableIds, commitSeq, timestamp, type, data); + addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false); } public void addCreateTableRecord(CreateTableRecord createTableRecord) { @@ -136,7 +177,7 @@ public void addCreateTableRecord(CreateTableRecord createTableRecord) { TBinlogType type = TBinlogType.CREATE_TABLE; String data = createTableRecord.toJson(); - addBinlog(dbId, tableIds, commitSeq, timestamp, type, data); + addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false); } public void addDropPartitionRecord(DropPartitionInfo dropPartitionInfo, long commitSeq) { @@ -147,7 +188,7 @@ public void addDropPartitionRecord(DropPartitionInfo dropPartitionInfo, long com TBinlogType type = TBinlogType.DROP_PARTITION; String data = dropPartitionInfo.toJson(); - addBinlog(dbId, tableIds, commitSeq, timestamp, type, data); + addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false); } public void addDropTableRecord(DropTableRecord record) { @@ -159,7 +200,7 @@ public void addDropTableRecord(DropTableRecord record) { TBinlogType type = TBinlogType.DROP_TABLE; String data = record.toJson(); - addBinlog(dbId, tableIds, commitSeq, timestamp, type, data); + addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false); } public void addAlterJobV2(AlterJobV2 alterJob, long commitSeq) { @@ -170,7 +211,7 @@ public void addAlterJobV2(AlterJobV2 alterJob, long commitSeq) { TBinlogType type = TBinlogType.ALTER_JOB; String data = alterJob.toJson(); - addBinlog(dbId, tableIds, commitSeq, timestamp, type, data); + addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false); } public void addModifyTableAddOrDropColumns(TableAddOrDropColumnsInfo info, long commitSeq) { @@ -181,7 +222,29 @@ public void addModifyTableAddOrDropColumns(TableAddOrDropColumnsInfo info, long TBinlogType type = TBinlogType.MODIFY_TABLE_ADD_OR_DROP_COLUMNS; String data = info.toJson(); - addBinlog(dbId, tableIds, commitSeq, timestamp, type, data); + addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false); + } + + public void addAlterDatabaseProperty(AlterDatabasePropertyInfo info, long commitSeq) { + long dbId = info.getDbId(); + List tableIds = Lists.newArrayList(); + + long timestamp = -1; + TBinlogType type = TBinlogType.ALTER_DATABASE_PROPERTY; + String data = info.toJson(); + + addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, true); + } + + public void addModifyTableProperty(ModifyTablePropertyOperationLog info, long commitSeq) { + long dbId = info.getDbId(); + List tableIds = Lists.newArrayList(); + tableIds.add(info.getTableId()); + long timestamp = -1; + TBinlogType type = TBinlogType.MODIFY_TABLE_PROPERTY; + String data = info.toJson(); + + addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, true); } // get binlog by dbId, return first binlog.version > version @@ -383,7 +446,8 @@ public long read(DataInputStream dis, long checksum) throws IOException { if (binlog.getType() == TBinlogType.DUMMY) { // collect tableDummyBinlogs and dbDummyBinlog to recover DBBinlog and TableBinlog if (binlog.getBelong() == -1) { - DBBinlog dbBinlog = DBBinlog.recoverDbBinlog(binlog, tableDummies, currentDbBinlogEnable); + DBBinlog dbBinlog = DBBinlog.recoverDbBinlog(binlogConfigCache, binlog, tableDummies, + currentDbBinlogEnable); dbBinlogMap.put(dbId, dbBinlog); } else { tableDummies.add(binlog); diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogTombstone.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogTombstone.java index 2b6e3cb8e143a4..48d5e04244cbae 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogTombstone.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogTombstone.java @@ -92,7 +92,7 @@ public long getDbId() { return dbId; } - // TODO(deadlinefen): delete this code later + // TODO(deadlinefen): deprecated this code later public List getTableIds() { if (tableIds == null) { tableIds = Collections.emptyList(); @@ -102,7 +102,7 @@ public List getTableIds() { public Map getTableCommitSeqMap() { if (tableCommitSeqMap == null) { - tableCommitSeqMap = Collections.emptyMap(); + tableCommitSeqMap = Maps.newHashMap(); } return tableCommitSeqMap; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java index 30cbfd0e15cc5e..4e134104b6d49c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java @@ -17,24 +17,15 @@ package org.apache.doris.binlog; -import org.apache.doris.catalog.Database; -import org.apache.doris.catalog.Env; -import org.apache.doris.catalog.OlapTable; -import org.apache.doris.catalog.Table; import org.apache.doris.common.Pair; import org.apache.doris.thrift.TBinlog; import org.apache.doris.thrift.TBinlogType; import org.apache.doris.thrift.TStatus; import org.apache.doris.thrift.TStatusCode; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - import java.util.TreeSet; public class BinlogUtils { - private static final Logger LOG = LogManager.getLogger(BinlogUtils.class); - public static Pair getBinlog(TreeSet binlogs, long prevCommitSeq) { TStatus status = new TStatus(TStatusCode.OK); TBinlog firstBinlog = binlogs.first(); @@ -90,33 +81,6 @@ public static TBinlog newDummyBinlog(long dbId, long tableId) { return dummy; } - public static boolean tableEnabledBinlog(long dbId, long tableId) { - Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId); - if (db == null) { - LOG.error("db not found. dbId: {}", dbId); - return false; - } - - OlapTable table; - try { - Table tbl = db.getTableOrMetaException(tableId); - if (tbl == null) { - LOG.warn("fail to get table. db: {}, table id: {}", db.getFullName(), tableId); - return false; - } - if (!(tbl instanceof OlapTable)) { - LOG.warn("table is not olap table. db: {}, table id: {}", db.getFullName(), tableId); - return false; - } - table = (OlapTable) tbl; - } catch (Exception e) { - LOG.warn("fail to get table. db: {}, table id: {}", db.getFullName(), tableId); - return false; - } - - return table.getBinlogConfig().isEnable(); - } - public static long getExpiredMs(long ttlSeconds) { long currentSeconds = System.currentTimeMillis() / 1000; if (currentSeconds < ttlSeconds) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java index 151c5e5be90d59..35134eca87d0bf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java @@ -17,8 +17,7 @@ package org.apache.doris.binlog; -import org.apache.doris.catalog.Database; -import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.BinlogConfig; import org.apache.doris.common.Pair; import org.apache.doris.thrift.TBinlog; import org.apache.doris.thrift.TBinlogType; @@ -55,9 +54,12 @@ public class DBBinlog { private List tableDummyBinlogs; - public DBBinlog(TBinlog binlog) { + private BinlogConfigCache binlogConfigCache; + + public DBBinlog(BinlogConfigCache binlogConfigCache, TBinlog binlog) { lock = new ReentrantReadWriteLock(); this.dbId = binlog.getDbId(); + this.binlogConfigCache = binlogConfigCache; // allBinlogs treeset order by commitSeq allBinlogs = Sets.newTreeSet(Comparator.comparingLong(TBinlog::getCommitSeq)); @@ -74,14 +76,16 @@ public DBBinlog(TBinlog binlog) { allBinlogs.add(dummy); } - public static DBBinlog recoverDbBinlog(TBinlog dbDummy, List tableDummies, boolean dbBinlogEnable) { - DBBinlog dbBinlog = new DBBinlog(dbDummy); + public static DBBinlog recoverDbBinlog(BinlogConfigCache binlogConfigCache, TBinlog dbDummy, + List tableDummies, boolean dbBinlogEnable) { + DBBinlog dbBinlog = new DBBinlog(binlogConfigCache, dbDummy); + long dbId = dbDummy.getDbId(); for (TBinlog tableDummy : tableDummies) { long tableId = tableDummy.getBelong(); - if (!dbBinlogEnable && !BinlogUtils.tableEnabledBinlog(dbBinlog.getDbId(), tableId)) { + if (!dbBinlogEnable && !binlogConfigCache.isEnableTable(dbId, tableId)) { continue; } - dbBinlog.tableBinlogMap.put(tableId, new TableBinlog(tableDummy, tableId)); + dbBinlog.tableBinlogMap.put(tableId, new TableBinlog(binlogConfigCache, tableDummy, dbId, tableId)); dbBinlog.tableDummyBinlogs.add(tableDummy); } @@ -111,11 +115,12 @@ public void recoverBinlog(TBinlog binlog, boolean dbBinlogEnable) { } } + // TODO(Drogon): remove TableBinlog after DropTable, think table drop && recovery private TableBinlog getTableBinlog(TBinlog binlog, long tableId, boolean dbBinlogEnable) { TableBinlog tableBinlog = tableBinlogMap.get(tableId); if (tableBinlog == null) { - if (dbBinlogEnable || BinlogUtils.tableEnabledBinlog(dbId, tableId)) { - tableBinlog = new TableBinlog(binlog, tableId); + if (dbBinlogEnable || binlogConfigCache.isEnableTable(dbId, tableId)) { + tableBinlog = new TableBinlog(binlogConfigCache, binlog, dbId, tableId); tableBinlogMap.put(tableId, tableBinlog); tableDummyBinlogs.add(tableBinlog.getDummyBinlog()); } @@ -123,21 +128,25 @@ private TableBinlog getTableBinlog(TBinlog binlog, long tableId, boolean dbBinlo return tableBinlog; } - public void addBinlog(TBinlog binlog, boolean dbBinlogEnable) { + // guard by BinlogManager, if addBinlog called, more than one(db/tables) enable binlog + public void addBinlog(TBinlog binlog) { + boolean dbBinlogEnable = binlogConfigCache.isEnableDB(dbId); List tableIds = binlog.getTableIds(); + lock.writeLock().lock(); try { + allBinlogs.add(binlog); + if (binlog.getTimestamp() > 0 && dbBinlogEnable) { timestamps.add(Pair.of(binlog.getCommitSeq(), binlog.getTimestamp())); } - allBinlogs.add(binlog); - if (tableIds == null) { return; } // HACK: for metadata fix + // we should not add binlog for create table and drop table in table binlog if (!binlog.isSetType()) { return; } @@ -205,22 +214,22 @@ public Pair getBinlogLag(long tableId, long prevCommitSeq) { public BinlogTombstone gc() { // check db - Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId); - if (db == null) { + BinlogConfig dbBinlogConfig = binlogConfigCache.getDBBinlogConfig(dbId); + if (dbBinlogConfig == null) { LOG.error("db not found. dbId: {}", dbId); return null; } - boolean dbBinlogEnable = db.getBinlogConfig().isEnable(); + boolean dbBinlogEnable = dbBinlogConfig.isEnable(); BinlogTombstone tombstone; if (dbBinlogEnable) { // db binlog is enabled, only one binlogTombstones - long ttlSeconds = db.getBinlogConfig().getTtlSeconds(); + long ttlSeconds = dbBinlogConfig.getTtlSeconds(); long expiredMs = BinlogUtils.getExpiredMs(ttlSeconds); tombstone = dbBinlogEnableGc(expiredMs); } else { - tombstone = dbBinlogDisableGc(db); + tombstone = dbBinlogDisableGc(); } return tombstone; @@ -250,7 +259,7 @@ private BinlogTombstone collectTableTombstone(List tableTombsto return dbTombstone; } - private BinlogTombstone dbBinlogDisableGc(Database db) { + private BinlogTombstone dbBinlogDisableGc() { List tombstones = Lists.newArrayList(); List tableBinlogs; @@ -262,7 +271,7 @@ private BinlogTombstone dbBinlogDisableGc(Database db) { } for (TableBinlog tableBinlog : tableBinlogs) { - BinlogTombstone tombstone = tableBinlog.gc(db); + BinlogTombstone tombstone = tableBinlog.ttlGc(); if (tombstone != null) { tombstones.add(tombstone); } @@ -348,7 +357,7 @@ private BinlogTombstone dbBinlogEnableGc(long expiredMs) { List tableTombstones = Lists.newArrayList(); for (TableBinlog tableBinlog : tableBinlogMap.values()) { // step 2.1: gc tableBinlogļ¼Œand get table tombstone - BinlogTombstone tableTombstone = tableBinlog.gc(expiredCommitSeq); + BinlogTombstone tableTombstone = tableBinlog.commitSeqGc(expiredCommitSeq); if (tableTombstone != null) { tableTombstones.add(tableTombstone); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java index 8934084e99f059..0857ae7abb1922 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java @@ -17,9 +17,7 @@ package org.apache.doris.binlog; -import org.apache.doris.catalog.Database; -import org.apache.doris.catalog.OlapTable; -import org.apache.doris.catalog.Table; +import org.apache.doris.catalog.BinlogConfig; import org.apache.doris.common.Pair; import org.apache.doris.thrift.TBinlog; import org.apache.doris.thrift.TBinlogType; @@ -37,11 +35,14 @@ public class TableBinlog { private static final Logger LOG = LogManager.getLogger(TableBinlog.class); + private long dbId; private long tableId; private ReentrantReadWriteLock lock; private TreeSet binlogs; + private BinlogConfigCache binlogConfigCache; - public TableBinlog(TBinlog binlog, long tableId) { + public TableBinlog(BinlogConfigCache binlogConfigCache, TBinlog binlog, long dbId, long tableId) { + this.dbId = dbId; this.tableId = tableId; lock = new ReentrantReadWriteLock(); binlogs = Sets.newTreeSet(Comparator.comparingLong(TBinlog::getCommitSeq)); @@ -53,6 +54,7 @@ public TableBinlog(TBinlog binlog, long tableId) { dummy = BinlogUtils.newDummyBinlog(binlog.getDbId(), tableId); } binlogs.add(dummy); + this.binlogConfigCache = binlogConfigCache; } public TBinlog getDummyBinlog() { @@ -100,7 +102,7 @@ public Pair getBinlogLag(long prevCommitSeq) { } } - private Pair getLastUpsertAndLargestCommitSeq(long expired, BinlogComparator check) { + private Pair getLastUpsertAndLargestCommitSeq(long expired, BinlogComparator checker) { if (binlogs.size() <= 1) { return null; } @@ -111,7 +113,7 @@ private Pair getLastUpsertAndLargestCommitSeq(long expired, Binlo TBinlog lastExpiredBinlog = null; while (iter.hasNext()) { TBinlog binlog = iter.next(); - if (check.isExpired(binlog, expired)) { + if (checker.isExpired(binlog, expired)) { lastExpiredBinlog = binlog; --binlog.table_ref; if (binlog.getType() == TBinlogType.UPSERT) { @@ -133,7 +135,7 @@ private Pair getLastUpsertAndLargestCommitSeq(long expired, Binlo } // this method call when db binlog enable - public BinlogTombstone gc(long expiredCommitSeq) { + public BinlogTombstone commitSeqGc(long expiredCommitSeq) { Pair tombstoneInfo; // step 1: get tombstoneUpsertBinlog and dummyBinlog @@ -163,31 +165,20 @@ public BinlogTombstone gc(long expiredCommitSeq) { } // this method call when db binlog disable - public BinlogTombstone gc(Database db) { + public BinlogTombstone ttlGc() { // step 1: get expire time - OlapTable table; - try { - Table tbl = db.getTableOrMetaException(tableId); - if (tbl == null) { - LOG.warn("fail to get table. db: {}, table id: {}", db.getFullName(), tableId); - return null; - } - if (!(tbl instanceof OlapTable)) { - LOG.warn("table is not olap table. db: {}, table id: {}", db.getFullName(), tableId); - return null; - } - table = (OlapTable) tbl; - } catch (Exception e) { - LOG.warn("fail to get table. db: {}, table id: {}", db.getFullName(), tableId); + BinlogConfig tableBinlogConfig = binlogConfigCache.getTableBinlogConfig(dbId, tableId); + if (tableBinlogConfig == null) { return null; } - long ttlSeconds = table.getBinlogConfig().getTtlSeconds(); + long ttlSeconds = tableBinlogConfig.getTtlSeconds(); long expiredMs = BinlogUtils.getExpiredMs(ttlSeconds); if (expiredMs < 0) { return null; } + LOG.info("ttl gc. dbId: {}, tableId: {}, expiredMs: {}", dbId, tableId, expiredMs); // step 2: get tombstoneUpsertBinlog and dummyBinlog Pair tombstoneInfo; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 967e129728a2cb..e956cf9f0ab6d0 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -4400,8 +4400,9 @@ public void modifyTableDynamicPartition(Database db, OlapTable table, Map properties) { Preconditions.checkArgument(table.isWriteLockHeldByCurrentThread()); table.setReplicaAllocation(properties); - // log - ModifyTablePropertyOperationLog info = new ModifyTablePropertyOperationLog(db.getId(), table.getId(), - properties); + ModifyTablePropertyOperationLog info = + new ModifyTablePropertyOperationLog(db.getId(), table.getId(), table.getName(), + properties); editLog.logModifyReplicationNum(info); LOG.debug("modify table[{}] replication num to {}", table.getName(), properties.get(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM)); @@ -4500,8 +4501,9 @@ public void modifyTableProperties(Database db, OlapTable table, Map properties = stmt.getProperties(); db.writeLockOrDdlException(); @@ -768,7 +769,7 @@ public void alterDatabaseProperty(AlterDatabasePropertyStmt stmt) throws DdlExce return; } - AlterDatabasePropertyInfo info = new AlterDatabasePropertyInfo(dbName, properties); + AlterDatabasePropertyInfo info = new AlterDatabasePropertyInfo(dbId, dbName, properties); Env.getCurrentEnv().getEditLog().logAlterDatabaseProperty(info); } finally { db.writeUnlock(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/AlterDatabasePropertyInfo.java b/fe/fe-core/src/main/java/org/apache/doris/persist/AlterDatabasePropertyInfo.java index 56858c86a667bb..5f1c2bec5b2321 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/AlterDatabasePropertyInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/AlterDatabasePropertyInfo.java @@ -29,6 +29,9 @@ import java.util.Map; public class AlterDatabasePropertyInfo implements Writable { + @SerializedName(value = "dbId") + private long dbId; + @SerializedName(value = "dbName") private String dbName; @@ -41,11 +44,16 @@ public AlterDatabasePropertyInfo() { this.properties = null; } - public AlterDatabasePropertyInfo(String dbName, Map properties) { + public AlterDatabasePropertyInfo(long dbId, String dbName, Map properties) { + this.dbId = dbId; this.dbName = dbName; this.properties = properties; } + public long getDbId() { + return dbId; + } + public String getDbName() { return dbName; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index eda39bc4f692e8..9b5a54323e11f2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -750,6 +750,7 @@ public static void loadJournal(Env env, Long logId, JournalEntity journal) { case OperationType.OP_MODIFY_REPLICATION_NUM: { ModifyTablePropertyOperationLog log = (ModifyTablePropertyOperationLog) journal.getData(); env.replayModifyTableProperty(opCode, log); + env.getBinlogManager().addModifyTableProperty(log, logId); break; } case OperationType.OP_MODIFY_DISTRIBUTION_BUCKET_NUM: { @@ -1039,6 +1040,7 @@ public static void loadJournal(Env env, Long logId, JournalEntity journal) { LOG.info("replay alter database property: {}", alterDatabasePropertyInfo); env.replayAlterDatabaseProperty(alterDatabasePropertyInfo.getDbName(), alterDatabasePropertyInfo.getProperties()); + env.getBinlogManager().addAlterDatabaseProperty(alterDatabasePropertyInfo, logId); break; } case OperationType.OP_GC_BINLOG: { @@ -1623,24 +1625,30 @@ public void logModifyDistributionType(TableInfo tableInfo) { logEdit(OperationType.OP_MODIFY_DISTRIBUTION_TYPE, tableInfo); } + private long logModifyTableProperty(short op, ModifyTablePropertyOperationLog info) { + long logId = logEdit(op, info); + Env.getCurrentEnv().getBinlogManager().addModifyTableProperty(info, logId); + return logId; + } + public void logDynamicPartition(ModifyTablePropertyOperationLog info) { - logEdit(OperationType.OP_DYNAMIC_PARTITION, info); + logModifyTableProperty(OperationType.OP_DYNAMIC_PARTITION, info); } - public void logModifyReplicationNum(ModifyTablePropertyOperationLog info) { - logEdit(OperationType.OP_MODIFY_REPLICATION_NUM, info); + public long logModifyReplicationNum(ModifyTablePropertyOperationLog info) { + return logModifyTableProperty(OperationType.OP_MODIFY_REPLICATION_NUM, info); } public void logModifyDefaultDistributionBucketNum(ModifyTableDefaultDistributionBucketNumOperationLog info) { logEdit(OperationType.OP_MODIFY_DISTRIBUTION_BUCKET_NUM, info); } - public void logModifyInMemory(ModifyTablePropertyOperationLog info) { - logEdit(OperationType.OP_MODIFY_IN_MEMORY, info); + public long logModifyInMemory(ModifyTablePropertyOperationLog info) { + return logModifyTableProperty(OperationType.OP_MODIFY_IN_MEMORY, info); } - public void logUpdateBinlogConfig(ModifyTablePropertyOperationLog info) { - logEdit(OperationType.OP_UPDATE_BINLOG_CONFIG, info); + public long logUpdateBinlogConfig(ModifyTablePropertyOperationLog info) { + return logModifyTableProperty(OperationType.OP_UPDATE_BINLOG_CONFIG, info); } public void logAlterLightSchemaChange(AlterLightSchemaChangeInfo info) { @@ -1829,7 +1837,9 @@ public void logDeleteAnalysisTask(AnalyzeDeletionLog log) { } public long logAlterDatabaseProperty(AlterDatabasePropertyInfo log) { - return logEdit(OperationType.OP_ALTER_DATABASE_PROPERTY, log); + long logId = logEdit(OperationType.OP_ALTER_DATABASE_PROPERTY, log); + Env.getCurrentEnv().getBinlogManager().addAlterDatabaseProperty(log, logId); + return logId; } public long logGcBinlog(BinlogGcInfo log) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/ModifyTablePropertyOperationLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/ModifyTablePropertyOperationLog.java index f5a0a5d59df8e2..a782db9f9c069e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/ModifyTablePropertyOperationLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/ModifyTablePropertyOperationLog.java @@ -35,13 +35,27 @@ public class ModifyTablePropertyOperationLog implements Writable { private long dbId; @SerializedName(value = "tableId") private long tableId; + @SerializedName(value = "tableName") + private String tableName; @SerializedName(value = "properties") private Map properties = new HashMap<>(); + @SerializedName(value = "sql") + private String sql; - public ModifyTablePropertyOperationLog(long dbId, long tableId, Map properties) { + public ModifyTablePropertyOperationLog(long dbId, long tableId, String tableName, Map properties) { this.dbId = dbId; this.tableId = tableId; + this.tableName = tableName; this.properties = properties; + + StringBuilder sb = new StringBuilder(); + sb.append("SET ("); + for (Map.Entry entry : properties.entrySet()) { + sb.append(entry.getKey()).append("=").append(entry.getValue()).append(","); + } + sb.deleteCharAt(sb.length() - 1); // remove last ',' + sb.append(")"); + this.sql = sb.toString(); } public long getDbId() { @@ -64,4 +78,8 @@ public void write(DataOutput out) throws IOException { public static ModifyTablePropertyOperationLog read(DataInput in) throws IOException { return GsonUtils.GSON.fromJson(Text.readString(in), ModifyTablePropertyOperationLog.class); } + + public String toJson() { + return GsonUtils.GSON.toJson(this); + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/persist/ModifyDynamicPartitionInfoTest.java b/fe/fe-core/src/test/java/org/apache/doris/persist/ModifyDynamicPartitionInfoTest.java index 87ae23470a5898..bff50dcf768e0f 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/persist/ModifyDynamicPartitionInfoTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/persist/ModifyDynamicPartitionInfoTest.java @@ -54,7 +54,7 @@ public void testNormal() throws IOException { properties.put(DynamicPartitionProperty.END, "3"); properties.put(DynamicPartitionProperty.PREFIX, "p"); properties.put(DynamicPartitionProperty.BUCKETS, "30"); - ModifyTablePropertyOperationLog modifyDynamicPartitionInfo = new ModifyTablePropertyOperationLog(100L, 200L, properties); + ModifyTablePropertyOperationLog modifyDynamicPartitionInfo = new ModifyTablePropertyOperationLog(100L, 200L, "test", properties); modifyDynamicPartitionInfo.write(out); out.flush(); out.close(); diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 06c3d96c722ee3..0ea96a15f480b8 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -965,6 +965,8 @@ enum TBinlogType { ALTER_JOB = 5, MODIFY_TABLE_ADD_OR_DROP_COLUMNS = 6, DUMMY = 7, + ALTER_DATABASE_PROPERTY = 8, + MODIFY_TABLE_PROPERTY = 9, } struct TBinlog { @@ -976,6 +978,7 @@ struct TBinlog { 6: optional string data 7: optional i64 belong // belong == -1 if type is not DUMMY 8: optional i64 table_ref // only use for gc + 9: optional bool remove_enable_cache } struct TGetBinlogResult {