Skip to content

Commit

Permalink
[Enhance](multi-catalog) Use MetaIdMappingsLog
Browse files Browse the repository at this point in the history
  to replace InitCatalogLog/InitDatabaseLog.
  • Loading branch information
wangxiangyu committed Feb 21, 2024
1 parent 4a99382 commit 7f2470c
Showing 1 changed file with 0 additions and 149 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,11 @@

package org.apache.doris.catalog.constraint;

import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.es.EsExternalCatalog;
import org.apache.doris.datasource.es.EsExternalDatabase;
import org.apache.doris.datasource.es.EsExternalTable;
import org.apache.doris.journal.JournalEntity;
import org.apache.doris.nereids.util.PlanPatternMatchSupported;
import org.apache.doris.nereids.util.RelationUtil;
Expand All @@ -37,7 +30,6 @@
import org.apache.doris.persist.OperationType;
import org.apache.doris.utframe.TestWithFeService;

import com.google.common.collect.ImmutableList;
import mockit.Mock;
import mockit.MockUp;
import org.apache.hadoop.util.Lists;
Expand All @@ -51,7 +43,6 @@
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
Expand Down Expand Up @@ -213,144 +204,4 @@ void constraintWithTablePersistTest() throws Exception {
dropConstraint("alter table t2 drop constraint pk");
dropConstraint("alter table t1 drop constraint uk");
}

@Test
void externalTableTest() throws Exception {
ExternalTable externalTable = new ExternalTable();
try {
externalTable.addPrimaryKeyConstraint("pk", ImmutableList.of("col"), false);
} catch (Exception ignore) {
// ignore
}
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
DataOutput output = new DataOutputStream(outputStream);
externalTable.write(output);
InputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());
DataInput input = new DataInputStream(inputStream);
TableIf loadTable = ExternalTable.read(input);
Assertions.assertEquals(1, loadTable.getConstraintsMap().size());
}

@Test
void addConstraintLogPersistForExternalTableTest() throws Exception {
Config.edit_log_type = "local";
createCatalog("create catalog es properties('type' = 'es', 'elasticsearch.hosts' = 'http://192.168.0.1',"
+ " 'elasticsearch.username' = 'user1');");

Env.getCurrentEnv().changeCatalog(connectContext, "es");
EsExternalCatalog esCatalog = (EsExternalCatalog) getCatalog("es");
EsExternalDatabase db = new EsExternalDatabase(esCatalog, 10002, "es_db1");
EsExternalTable tbl = new EsExternalTable(10003, "es_tbl1", "es_db1", esCatalog);
ImmutableList<Column> schema = ImmutableList.of(new Column("k1", PrimitiveType.INT));
tbl.setNewFullSchema(schema);
db.addTableForTest(tbl);
esCatalog.addDatabaseForTest(db);
Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(esCatalog).addSchemaForTest(db.getFullName(), tbl.getName(), schema);
new MockUp<RelationUtil>() {
@Mock
public TableIf getTable(List<String> qualifierName, Env env) {
return tbl;
}
};

new MockUp<ExternalTable>() {
@Mock
public DatabaseIf getDatabase() {
return db;
}
};

new MockUp<TableIdentifier>() {
@Mock
public TableIf toTableIf() {
return tbl;
}
};

addConstraint("alter table es.es_db1.es_tbl1 add constraint pk primary key (k1)");
addConstraint("alter table es.es_db1.es_tbl1 add constraint uk unique (k1)");
TableIf tableIf = RelationUtil.getTable(
RelationUtil.getQualifierName(connectContext, Lists.newArrayList("test", "t1")),
connectContext.getEnv());
Map<String, Constraint> constraintMap = tableIf.getConstraintsMap();
tableIf.getConstraintsMapUnsafe().clear();
Assertions.assertTrue(tableIf.getConstraintsMap().isEmpty());
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
DataOutput output = new DataOutputStream(outputStream);
for (Constraint value : new ArrayList<>(constraintMap.values())) {
JournalEntity journalEntity = new JournalEntity();
journalEntity.setData(new AlterConstraintLog(value, tableIf));
journalEntity.setOpCode(OperationType.OP_ADD_CONSTRAINT);
journalEntity.write(output);
}
InputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());
DataInput input = new DataInputStream(inputStream);
for (int i = 0; i < constraintMap.values().size(); i++) {
JournalEntity journalEntity = new JournalEntity();
journalEntity.readFields(input);
EditLog.loadJournal(Env.getCurrentEnv(), 0L, journalEntity);
}
Assertions.assertEquals(tableIf.getConstraintsMap(), constraintMap);
Env.getCurrentEnv().changeCatalog(connectContext, "internal");
}

@Test
void dropConstraintLogPersistForExternalTest() throws Exception {
Config.edit_log_type = "local";
createCatalog("create catalog es2 properties('type' = 'es', 'elasticsearch.hosts' = 'http://192.168.0.1',"
+ " 'elasticsearch.username' = 'user1');");

Env.getCurrentEnv().changeCatalog(connectContext, "es2");
EsExternalCatalog esCatalog = (EsExternalCatalog) getCatalog("es2");
EsExternalDatabase db = new EsExternalDatabase(esCatalog, 10002, "es_db1");
EsExternalTable tbl = new EsExternalTable(10003, "es_tbl1", "es_db1", esCatalog);
ImmutableList<Column> schema = ImmutableList.of(new Column("k1", PrimitiveType.INT));
tbl.setNewFullSchema(schema);
db.addTableForTest(tbl);
esCatalog.addDatabaseForTest(db);
Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(esCatalog).addSchemaForTest(db.getFullName(), tbl.getName(), schema);
new MockUp<RelationUtil>() {
@Mock
public TableIf getTable(List<String> qualifierName, Env env) {
return tbl;
}
};

new MockUp<ExternalTable>() {
@Mock
public DatabaseIf getDatabase() {
return db;
}
};

new MockUp<TableIdentifier>() {
@Mock
public TableIf toTableIf() {
return tbl;
}
};
addConstraint("alter table es.es_db1.es_tbl1 add constraint pk primary key (k1)");
addConstraint("alter table es.es_db1.es_tbl1 add constraint uk unique (k1)");
TableIf tableIf = RelationUtil.getTable(
RelationUtil.getQualifierName(connectContext, Lists.newArrayList("test", "t1")),
connectContext.getEnv());
Map<String, Constraint> constraintMap = tableIf.getConstraintsMap();
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
DataOutput output = new DataOutputStream(outputStream);
for (Constraint value : constraintMap.values()) {
JournalEntity journalEntity = new JournalEntity();
journalEntity.setData(new AlterConstraintLog(value, tableIf));
journalEntity.setOpCode(OperationType.OP_DROP_CONSTRAINT);
journalEntity.write(output);
}
InputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());
DataInput input = new DataInputStream(inputStream);
for (int i = 0; i < constraintMap.values().size(); i++) {
JournalEntity journalEntity = new JournalEntity();
journalEntity.readFields(input);
EditLog.loadJournal(Env.getCurrentEnv(), 0L, journalEntity);
}
Assertions.assertTrue(tableIf.getConstraintsMap().isEmpty());
Env.getCurrentEnv().changeCatalog(connectContext, "internal");
}
}

0 comments on commit 7f2470c

Please sign in to comment.