Skip to content

Commit

Permalink
[Enhance](multi-catalog) Use MetaIdMappingsLog
Browse files Browse the repository at this point in the history
  to replace InitCatalogLog/InitDatabaseLog.
  • Loading branch information
wangxiangyu committed Feb 20, 2024
1 parent 78c83d2 commit b59d7d4
Show file tree
Hide file tree
Showing 10 changed files with 85 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ protected void initForMaster() {

MetaIdMappingsLog log = new MetaIdMappingsLog();
log.setCatalogId(id);
log.setFromInitCtl(true);
log.setType(MetaIdMappingsLog.TYPE_FROM_INIT_CATALOG);
log.setLastUpdateTime(System.currentTimeMillis());
for (String dbName : allDatabases) {
if (!dbName.equals(InfoSchemaDb.DATABASE_NAME)) {
Expand Down Expand Up @@ -296,12 +296,17 @@ public void initForAllNodes(long lastUpdateTime) {
ExternalMetaIdMgr metaIdMgr = Env.getCurrentEnv().getExternalMetaIdMgr();
ExternalMetaIdMgr.CtlMetaIdMgr ctlMetaIdMgr = metaIdMgr.getCtlMetaIdMgr(id);
if (ctlMetaIdMgr != null) {
// use a temp map container
Map<Long, ExternalDatabase<? extends ExternalTable>> tmpIdToDb = Maps.newConcurrentMap();
Map<String, Long> tmpDbNameToId = Maps.newConcurrentMap();
Map<String, ExternalMetaIdMgr.DbMetaIdMgr> dbNameToMgr = ctlMetaIdMgr.getDbNameToMgr();
for (String dbName : dbNameToMgr.keySet()) {
ExternalDatabase<? extends ExternalTable> db = getDbForInit(dbName, dbNameToMgr.get(dbName).dbId, type);
idToDb.put(db.getId(), db);
dbNameToId.put(ClusterNamespace.getNameFromFullName(db.getFullName()), db.getId());
tmpIdToDb.put(db.getId(), db);
tmpDbNameToId.put(ClusterNamespace.getNameFromFullName(db.getFullName()), db.getId());
}
this.idToDb = tmpIdToDb;
this.dbNameToId = tmpDbNameToId;
}
this.lastUpdateTime = lastUpdateTime;
this.initialized = true;
Expand Down Expand Up @@ -345,7 +350,7 @@ public String getName() {

@Override
public String getType() {
return logType.name().toLowerCase(Locale.ROOT);
return type.name().toLowerCase(Locale.ROOT);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ public abstract class ExternalDatabase<T extends ExternalTable> implements Datab

protected boolean invalidCacheInInit = true;

protected Map<Long, T> idToTbl = Maps.newConcurrentMap();
protected Map<String, Long> tableNameToId = Maps.newConcurrentMap();
protected volatile Map<Long, T> idToTbl = Maps.newConcurrentMap();
protected volatile Map<String, Long> tableNameToId = Maps.newConcurrentMap();

/**
* Create external database.
Expand Down Expand Up @@ -127,7 +127,7 @@ protected void initForMaster() {

MetaIdMappingsLog log = new MetaIdMappingsLog();
log.setCatalogId(extCatalog.getId());
log.setFromInitDb(true);
log.setType(MetaIdMappingsLog.TYPE_FROM_INIT_DATABASE);
log.setLastUpdateTime(System.currentTimeMillis());
if (CollectionUtils.isNotEmpty(tableNames)) {
for (String tableName : tableNames) {
Expand All @@ -146,12 +146,17 @@ public void initForAllNodes(long lastUpdateTime) {
ExternalMetaIdMgr metaIdMgr = Env.getCurrentEnv().getExternalMetaIdMgr();
ExternalMetaIdMgr.DbMetaIdMgr dbMetaIdMgr = metaIdMgr.getDbMetaIdMgr(extCatalog.getId(), name);
if (dbMetaIdMgr != null) {
// use a temp map container
Map<Long, T> tmpIdToTbl = Maps.newConcurrentMap();
Map<String, Long> tmpTableNameToId = Maps.newConcurrentMap();
Map<String, ExternalMetaIdMgr.TblMetaIdMgr> tblNameToMgr = dbMetaIdMgr.getTblNameToMgr();
for (String tableName : tblNameToMgr.keySet()) {
T table = getExternalTable(tableName, tblNameToMgr.get(tableName).getTblId(), extCatalog);
idToTbl.put(table.getId(), table);
tableNameToId.put(tableName, table.getId());
tmpIdToTbl.put(table.getId(), table);
tmpTableNameToId.put(tableName, table.getId());
}
this.idToTbl = tmpIdToTbl;
this.tableNameToId = tmpTableNameToId;
}
this.lastUpdateTime = lastUpdateTime;
this.initialized = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,28 +112,59 @@ private PartitionMetaIdMgr getPartitionMetaIdMgr(long catalogId, String dbName,
public void replayMetaIdMappingsLog(@NotNull MetaIdMappingsLog log) {
Preconditions.checkNotNull(log);
long catalogId = log.getCatalogId();
CtlMetaIdMgr ctlMetaIdMgr = idToCtlMgr.computeIfAbsent(catalogId, CtlMetaIdMgr::new);
for (MetaIdMappingsLog.MetaIdMapping mapping : log.getMetaIdMappings()) {
handleMetaIdMapping(mapping, ctlMetaIdMgr);
}

CatalogIf<?> catalogIf = Env.getCurrentEnv().getCatalogMgr().getCatalog(log.getCatalogId());
CatalogIf<?> catalogIf = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId);
if (catalogIf == null) {
return;
}

if (log.isFromInitCtl()) {
((ExternalCatalog) catalogIf).initForAllNodes(log.getLastUpdateTime());
} else if (log.isFromInitDb()) {
@SuppressWarnings("rawtypes")
ExternalDatabase db = (ExternalDatabase) catalogIf.getDbNullable(log.getDbId());
if (db != null) {
db.initForAllNodes(log.getLastUpdateTime());
}
} else if (log.isFromHmsEvent()) {
MetastoreEventsProcessor metastoreEventsProcessor = Env.getCurrentEnv().getMetastoreEventsProcessor();
metastoreEventsProcessor.updateMasterLastSyncedEventId(
(HMSExternalCatalog) catalogIf, log.getLastSyncedEventId());
CtlMetaIdMgr ctlMetaIdMgr = idToCtlMgr.computeIfAbsent(catalogId, CtlMetaIdMgr::new);
CtlMetaIdMgr tmpCtlMetaIdMgr;
switch (log.getType()) {
case MetaIdMappingsLog.TYPE_FROM_INIT_CATALOG:
// use a new CtlMetaIdMgr to handle these logs, and replace the old one
tmpCtlMetaIdMgr = new CtlMetaIdMgr(catalogId);
for (MetaIdMappingsLog.MetaIdMapping mapping : log.getMetaIdMappings()) {
handleMetaIdMapping(mapping, tmpCtlMetaIdMgr);
}
idToCtlMgr.put(catalogId, tmpCtlMetaIdMgr);
// do the extra init operations
((ExternalCatalog) catalogIf).initForAllNodes(log.getLastUpdateTime());
break;

case MetaIdMappingsLog.TYPE_FROM_INIT_DATABASE:
// use a new CtlMetaIdMgr to handle these logs, and replace the old one
tmpCtlMetaIdMgr = new CtlMetaIdMgr(catalogId);
for (MetaIdMappingsLog.MetaIdMapping mapping : log.getMetaIdMappings()) {
handleMetaIdMapping(mapping, tmpCtlMetaIdMgr);
}
for (String dbName : ctlMetaIdMgr.dbNameToMgr.keySet()) {
// put the dbMetaIdMgr of ctlMetaIdMgr to tmpCtlMetaIdMgr which not contains
if (!tmpCtlMetaIdMgr.dbNameToMgr.containsKey(dbName)) {
tmpCtlMetaIdMgr.dbNameToMgr.put(dbName, ctlMetaIdMgr.dbNameToMgr.get(dbName));
}
}
idToCtlMgr.put(catalogId, tmpCtlMetaIdMgr);
// do the extra init operations
@SuppressWarnings("rawtypes")
ExternalDatabase db = (ExternalDatabase) catalogIf.getDbNullable(log.getDbId());
if (db != null) {
db.initForAllNodes(log.getLastUpdateTime());
}
break;

case MetaIdMappingsLog.TYPE_FROM_HMS_EVENT:
// handle these logs serialized
for (MetaIdMappingsLog.MetaIdMapping mapping : log.getMetaIdMappings()) {
handleMetaIdMapping(mapping, ctlMetaIdMgr);
}

MetastoreEventsProcessor metastoreEventsProcessor = Env.getCurrentEnv().getMetastoreEventsProcessor();
metastoreEventsProcessor.updateMasterLastSyncedEventId(
(HMSExternalCatalog) catalogIf, log.getLastSyncedEventId());
break;

default:
// do nothing
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,20 +45,22 @@ public class MetaIdMappingsLog implements Writable {
public static final short META_OBJECT_TYPE_TABLE = 2;
public static final short META_OBJECT_TYPE_PARTITION = 3;

public static final short TYPE_UNKNOWN = 0;
// log is generated by hms event
public static final short TYPE_FROM_HMS_EVENT = 1;
// log is generated by init catalog
public static final short TYPE_FROM_INIT_CATALOG = 2;
// log is generated by init database
public static final short TYPE_FROM_INIT_DATABASE = 3;

@SerializedName(value = "ctlId")
private long catalogId = -1L;

@SerializedName(value = "dbId")
private long dbId = -1L;

@SerializedName(value = "fromEvent")
private boolean fromHmsEvent = false;

@SerializedName(value = "fromInitCtl")
private boolean fromInitCtl = false;

@SerializedName(value = "fromInitDb")
private boolean fromInitDb = false;
@SerializedName(value = "type")
private short type = TYPE_UNKNOWN;

@SerializedName(value = "lastUpdateTime")
private long lastUpdateTime = -1L;
Expand All @@ -85,7 +87,7 @@ public boolean equals(Object obj) {
return false;
}
return Objects.equals(this.catalogId, ((MetaIdMappingsLog) obj).catalogId)
&& Objects.equals(this.fromHmsEvent, ((MetaIdMappingsLog) obj).fromHmsEvent)
&& Objects.equals(this.type, ((MetaIdMappingsLog) obj).type)
&& Objects.equals(this.lastSyncedEventId, ((MetaIdMappingsLog) obj).lastSyncedEventId)
&& Objects.equals(this.metaIdMappings, ((MetaIdMappingsLog) obj).metaIdMappings);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.doris.common.DdlException;
import org.apache.doris.datasource.CatalogProperty;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.InitCatalogLog;
import org.apache.doris.datasource.SessionContext;

import com.google.common.collect.ImmutableList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.ExternalDatabase;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.InitCatalogLog;
import org.apache.doris.datasource.SessionContext;
import org.apache.doris.datasource.jdbc.client.JdbcClientConfig;
import org.apache.doris.datasource.property.PropertyConverter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ List<MetastoreEvent> getMetastoreEvents(List<NotificationEvent> events, HMSExter
private void logMetaIdMappings(long catalogId, long lastSyncedEventId, List<MetastoreEvent> mergedEvents) {
MetaIdMappingsLog log = new MetaIdMappingsLog();
log.setCatalogId(catalogId);
log.setFromHmsEvent(true);
log.setType(MetaIdMappingsLog.TYPE_FROM_HMS_EVENT);
log.setLastSyncedEventId(lastSyncedEventId);
for (MetastoreEvent event : mergedEvents) {
log.addMetaIdMappings(event.transferToMetaIdMappings());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.doris.common.DdlException;
import org.apache.doris.datasource.CatalogProperty;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.InitCatalogLog;
import org.apache.doris.datasource.SessionContext;
import org.apache.doris.datasource.credentials.CloudCredential;
import org.apache.doris.datasource.property.constants.MCProperties;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public void testReplayMetaIdMappingsLog() {
ExternalMetaIdMgr mgr = new ExternalMetaIdMgr();
MetaIdMappingsLog log1 = new MetaIdMappingsLog();
log1.setCatalogId(1L);
log1.setFromHmsEvent(false);
log1.setType(MetaIdMappingsLog.TYPE_FROM_HMS_EVENT);
log1.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping(
MetaIdMappingsLog.OPERATION_TYPE_ADD,
MetaIdMappingsLog.META_OBJECT_TYPE_DATABASE,
Expand All @@ -37,7 +37,7 @@ public void testReplayMetaIdMappingsLog() {

MetaIdMappingsLog log2 = new MetaIdMappingsLog();
log2.setCatalogId(1L);
log2.setFromHmsEvent(false);
log2.setType(MetaIdMappingsLog.TYPE_FROM_HMS_EVENT);
log2.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping(
MetaIdMappingsLog.OPERATION_TYPE_DELETE,
MetaIdMappingsLog.META_OBJECT_TYPE_DATABASE,
Expand All @@ -47,7 +47,7 @@ public void testReplayMetaIdMappingsLog() {

MetaIdMappingsLog log3 = new MetaIdMappingsLog();
log3.setCatalogId(1L);
log3.setFromHmsEvent(false);
log3.setType(MetaIdMappingsLog.TYPE_FROM_HMS_EVENT);
log3.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping(
MetaIdMappingsLog.OPERATION_TYPE_ADD,
MetaIdMappingsLog.META_OBJECT_TYPE_TABLE,
Expand All @@ -58,7 +58,7 @@ public void testReplayMetaIdMappingsLog() {

MetaIdMappingsLog log4 = new MetaIdMappingsLog();
log4.setCatalogId(1L);
log4.setFromHmsEvent(false);
log4.setType(MetaIdMappingsLog.TYPE_FROM_HMS_EVENT);
log4.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping(
MetaIdMappingsLog.OPERATION_TYPE_DELETE,
MetaIdMappingsLog.META_OBJECT_TYPE_TABLE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public void testSerialization() throws Exception {
MetaIdMappingsLog log1 = new MetaIdMappingsLog();
Path path = Files.createFile(Paths.get("./metaIdMappingsLogTest.txt"));
try (DataOutputStream dos = new DataOutputStream(Files.newOutputStream(path))) {
log1.setFromHmsEvent(true);
log1.setType(MetaIdMappingsLog.TYPE_FROM_HMS_EVENT);
log1.setLastSyncedEventId(-1L);
log1.setCatalogId(1L);
log1.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping(
Expand Down

0 comments on commit b59d7d4

Please sign in to comment.