Skip to content

Commit

Permalink
[Enhancement](binlog) Add binlog enable diable check in BinlogManager (
Browse files Browse the repository at this point in the history
…#22173)

Signed-off-by: Jack Drogon <jack.xsuperman@gmail.com>
  • Loading branch information
JackDrogon authored Jul 27, 2023
1 parent a87d34b commit 816fd50
Show file tree
Hide file tree
Showing 14 changed files with 339 additions and 122 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.binlog;

import org.apache.doris.catalog.BinlogConfig;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class BinlogConfigCache {
private static final Logger LOG = LogManager.getLogger(BinlogConfigCache.class);

private Map<Long, BinlogConfig> dbTableBinlogEnableMap; // db or table all use id
private ReentrantReadWriteLock lock;

public BinlogConfigCache() {
dbTableBinlogEnableMap = new HashMap<Long, BinlogConfig>();
lock = new ReentrantReadWriteLock();
}

public BinlogConfig getDBBinlogConfig(long dbId) {
lock.readLock().lock();
BinlogConfig binlogConfig = dbTableBinlogEnableMap.get(dbId);
lock.readLock().unlock();
if (binlogConfig != null) {
return binlogConfig;
}

lock.writeLock().lock();
try {
Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
if (db == null) {
LOG.warn("db not found. dbId: {}", dbId);
return null;
}

binlogConfig = db.getBinlogConfig();
dbTableBinlogEnableMap.put(dbId, binlogConfig);
} finally {
lock.writeLock().unlock();
}
return binlogConfig;
}

public boolean isEnableDB(long dbId) {
BinlogConfig dBinlogConfig = getDBBinlogConfig(dbId);
if (dBinlogConfig == null) {
return false;
}
return dBinlogConfig.isEnable();
}

public long getDBTtlSeconds(long dbId) {
BinlogConfig dBinlogConfig = getDBBinlogConfig(dbId);
if (dBinlogConfig == null) {
return BinlogConfig.TTL_SECONDS;
}
return dBinlogConfig.getTtlSeconds();
}

public BinlogConfig getTableBinlogConfig(long dbId, long tableId) {
lock.readLock().lock();
BinlogConfig tableBinlogConfig = dbTableBinlogEnableMap.get(tableId);
lock.readLock().unlock();
if (tableBinlogConfig != null) {
return tableBinlogConfig;
}

lock.writeLock().lock();
try {
Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
if (db == null) {
LOG.warn("db not found. dbId: {}", dbId);
return null;
}

Table table = db.getTableOrMetaException(tableId);
if (table == null) {
LOG.warn("fail to get table. db: {}, table id: {}", db.getFullName(), tableId);
return null;
}
if (!(table instanceof OlapTable)) {
LOG.warn("table is not olap table. db: {}, table id: {}", db.getFullName(), tableId);
return null;
}

OlapTable olapTable = (OlapTable) table;
tableBinlogConfig = olapTable.getBinlogConfig();
dbTableBinlogEnableMap.put(tableId, tableBinlogConfig);
return tableBinlogConfig;
} catch (Exception e) {
LOG.warn("fail to get table. db: {}, table id: {}", dbId, tableId);
return null;
} finally {
lock.writeLock().unlock();
}
}

public boolean isEnableTable(long dbId, long tableId) {
BinlogConfig tableBinlogConfig = getTableBinlogConfig(dbId, tableId);
if (tableBinlogConfig == null) {
return false;
}
return tableBinlogConfig.isEnable();
}

public long getTableTtlSeconds(long dbId, long tableId) {
BinlogConfig tableBinlogConfig = getTableBinlogConfig(dbId, tableId);
if (tableBinlogConfig == null) {
return BinlogConfig.TTL_SECONDS;
}
return tableBinlogConfig.getTtlSeconds();
}

public void remove(long id) {
lock.writeLock().lock();
try {
dbTableBinlogEnableMap.remove(id);
} finally {
lock.writeLock().unlock();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@

public class BinlogGcer extends MasterDaemon {
private static final Logger LOG = LogManager.getLogger(BinlogGcer.class);
private static final long GC_DURATION_MS = 313 * 1000L; // 313s
private static final long GC_DURATION_MS = 15 * 1000L; // 15s

// TODO(Drogon): use this to control gc frequency by real gc time waste sample
private long lastGcTime = 0L;
Expand Down
102 changes: 83 additions & 19 deletions fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
import org.apache.doris.persist.AlterDatabasePropertyInfo;
import org.apache.doris.persist.BinlogGcInfo;
import org.apache.doris.persist.DropPartitionInfo;
import org.apache.doris.persist.ModifyTablePropertyOperationLog;
import org.apache.doris.persist.TableAddOrDropColumnsInfo;
import org.apache.doris.thrift.TBinlog;
import org.apache.doris.thrift.TBinlogType;
Expand Down Expand Up @@ -53,43 +55,63 @@ public class BinlogManager {

private ReentrantReadWriteLock lock;
private Map<Long, DBBinlog> dbBinlogMap;
private BinlogConfigCache binlogConfigCache;

public BinlogManager() {
lock = new ReentrantReadWriteLock();
dbBinlogMap = Maps.newHashMap();
binlogConfigCache = new BinlogConfigCache();
}

private void addBinlog(TBinlog binlog) {
if (!Config.enable_feature_binlog) {
private void afterAddBinlog(TBinlog binlog) {
if (!binlog.isSetRemoveEnableCache()) {
return;
}
if (!binlog.isRemoveEnableCache()) {
return;
}

// find db BinlogConfig
long dbId = binlog.getDbId();
Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
if (db == null) {
LOG.warn("db not found. dbId: {}", dbId);
boolean onlyDb = true;
if (binlog.isSetTableIds()) {
for (long tableId : binlog.getTableIds()) {
binlogConfigCache.remove(tableId);
onlyDb = false;
}
}
if (onlyDb) {
binlogConfigCache.remove(dbId);
}
}

private void addBinlog(TBinlog binlog) {
if (!Config.enable_feature_binlog) {
return;
}
boolean dbBinlogEnable = db.getBinlogConfig().isEnable();

DBBinlog dbBinlog;
lock.writeLock().lock();
try {
long dbId = binlog.getDbId();
dbBinlog = dbBinlogMap.get(dbId);

if (dbBinlog == null) {
dbBinlog = new DBBinlog(binlog);
dbBinlog = new DBBinlog(binlogConfigCache, binlog);
dbBinlogMap.put(dbId, dbBinlog);
}
} finally {
lock.writeLock().unlock();
}

dbBinlog.addBinlog(binlog, dbBinlogEnable);
dbBinlog.addBinlog(binlog);
}

private void addBinlog(long dbId, List<Long> tableIds, long commitSeq, long timestamp, TBinlogType type,
String data) {
String data, boolean removeEnableCache) {
if (!Config.enable_feature_binlog) {
return;
}

TBinlog binlog = new TBinlog();
// set commitSeq, timestamp, type, dbId, data
binlog.setCommitSeq(commitSeq);
Expand All @@ -101,7 +123,26 @@ private void addBinlog(long dbId, List<Long> tableIds, long commitSeq, long time
binlog.setTableIds(tableIds);
}
binlog.setTableRef(0);
addBinlog(binlog);
binlog.setRemoveEnableCache(removeEnableCache);

// Check if all db or table binlog is disable, return
boolean dbBinlogEnable = binlogConfigCache.isEnableDB(dbId);
boolean anyEnable = dbBinlogEnable;
if (tableIds != null) {
for (long tableId : tableIds) {
boolean tableBinlogEnable = binlogConfigCache.isEnableTable(dbId, tableId);
anyEnable = anyEnable || tableBinlogEnable;
if (anyEnable) {
break;
}
}
}

if (anyEnable) {
addBinlog(binlog);
}

afterAddBinlog(binlog);
}

public void addUpsertRecord(UpsertRecord upsertRecord) {
Expand All @@ -112,7 +153,7 @@ public void addUpsertRecord(UpsertRecord upsertRecord) {
TBinlogType type = TBinlogType.UPSERT;
String data = upsertRecord.toJson();

addBinlog(dbId, tableIds, commitSeq, timestamp, type, data);
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false);
}

public void addAddPartitionRecord(AddPartitionRecord addPartitionRecord) {
Expand All @@ -124,7 +165,7 @@ public void addAddPartitionRecord(AddPartitionRecord addPartitionRecord) {
TBinlogType type = TBinlogType.ADD_PARTITION;
String data = addPartitionRecord.toJson();

addBinlog(dbId, tableIds, commitSeq, timestamp, type, data);
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false);
}

public void addCreateTableRecord(CreateTableRecord createTableRecord) {
Expand All @@ -136,7 +177,7 @@ public void addCreateTableRecord(CreateTableRecord createTableRecord) {
TBinlogType type = TBinlogType.CREATE_TABLE;
String data = createTableRecord.toJson();

addBinlog(dbId, tableIds, commitSeq, timestamp, type, data);
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false);
}

public void addDropPartitionRecord(DropPartitionInfo dropPartitionInfo, long commitSeq) {
Expand All @@ -147,7 +188,7 @@ public void addDropPartitionRecord(DropPartitionInfo dropPartitionInfo, long com
TBinlogType type = TBinlogType.DROP_PARTITION;
String data = dropPartitionInfo.toJson();

addBinlog(dbId, tableIds, commitSeq, timestamp, type, data);
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false);
}

public void addDropTableRecord(DropTableRecord record) {
Expand All @@ -159,7 +200,7 @@ public void addDropTableRecord(DropTableRecord record) {
TBinlogType type = TBinlogType.DROP_TABLE;
String data = record.toJson();

addBinlog(dbId, tableIds, commitSeq, timestamp, type, data);
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false);
}

public void addAlterJobV2(AlterJobV2 alterJob, long commitSeq) {
Expand All @@ -170,7 +211,7 @@ public void addAlterJobV2(AlterJobV2 alterJob, long commitSeq) {
TBinlogType type = TBinlogType.ALTER_JOB;
String data = alterJob.toJson();

addBinlog(dbId, tableIds, commitSeq, timestamp, type, data);
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false);
}

public void addModifyTableAddOrDropColumns(TableAddOrDropColumnsInfo info, long commitSeq) {
Expand All @@ -181,7 +222,29 @@ public void addModifyTableAddOrDropColumns(TableAddOrDropColumnsInfo info, long
TBinlogType type = TBinlogType.MODIFY_TABLE_ADD_OR_DROP_COLUMNS;
String data = info.toJson();

addBinlog(dbId, tableIds, commitSeq, timestamp, type, data);
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false);
}

public void addAlterDatabaseProperty(AlterDatabasePropertyInfo info, long commitSeq) {
long dbId = info.getDbId();
List<Long> tableIds = Lists.newArrayList();

long timestamp = -1;
TBinlogType type = TBinlogType.ALTER_DATABASE_PROPERTY;
String data = info.toJson();

addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, true);
}

public void addModifyTableProperty(ModifyTablePropertyOperationLog info, long commitSeq) {
long dbId = info.getDbId();
List<Long> tableIds = Lists.newArrayList();
tableIds.add(info.getTableId());
long timestamp = -1;
TBinlogType type = TBinlogType.MODIFY_TABLE_PROPERTY;
String data = info.toJson();

addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, true);
}

// get binlog by dbId, return first binlog.version > version
Expand Down Expand Up @@ -383,7 +446,8 @@ public long read(DataInputStream dis, long checksum) throws IOException {
if (binlog.getType() == TBinlogType.DUMMY) {
// collect tableDummyBinlogs and dbDummyBinlog to recover DBBinlog and TableBinlog
if (binlog.getBelong() == -1) {
DBBinlog dbBinlog = DBBinlog.recoverDbBinlog(binlog, tableDummies, currentDbBinlogEnable);
DBBinlog dbBinlog = DBBinlog.recoverDbBinlog(binlogConfigCache, binlog, tableDummies,
currentDbBinlogEnable);
dbBinlogMap.put(dbId, dbBinlog);
} else {
tableDummies.add(binlog);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public long getDbId() {
return dbId;
}

// TODO(deadlinefen): delete this code later
// TODO(deadlinefen): deprecated this code later
public List<Long> getTableIds() {
if (tableIds == null) {
tableIds = Collections.emptyList();
Expand All @@ -102,7 +102,7 @@ public List<Long> getTableIds() {

public Map<Long, Long> getTableCommitSeqMap() {
if (tableCommitSeqMap == null) {
tableCommitSeqMap = Collections.emptyMap();
tableCommitSeqMap = Maps.newHashMap();
}
return tableCommitSeqMap;
}
Expand Down
Loading

0 comments on commit 816fd50

Please sign in to comment.