
Commit 2c24f50

fix ut
morningman committed May 12, 2022
1 parent 2bb1e36 commit 2c24f50
Showing 7 changed files with 63 additions and 45 deletions.
@@ -39,8 +39,8 @@ public final class FeMetaVersion {
// add row policy
public static final int VERSION_109 = 109;
// For routine load user info
public static final int VERSION_109 = 110;
// note: when increment meta version, should assign the latest version to VERSION_CURRENT
public static final int VERSION_110 = 110;
// NOTE: when increment meta version, should assign the latest version to VERSION_CURRENT
public static final int VERSION_CURRENT = VERSION_110;

// all logs meta version should >= the minimum version, so that we could remove many if clause, for example
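
For reference, the NOTE above captures the FE meta-version convention: each feature gets its own constant, and VERSION_CURRENT is always moved to the newest one. A hypothetical illustration of the next bump (not part of this commit):

    // hypothetical future feature (illustration only, not in this commit)
    public static final int VERSION_111 = 111;
    // NOTE: when incrementing the meta version, point VERSION_CURRENT at the latest constant
    public static final int VERSION_CURRENT = VERSION_111;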
@@ -1633,7 +1633,7 @@ public void readFields(DataInput in) throws IOException {
throw new IOException("error happens when parsing create routine load stmt: " + origStmt.originStmt, e);
}

if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_109) {
if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_110) {
if (in.readBoolean()) {
userIdentity = UserIdentity.read(in);
userIdentity.setIsAnalyzed();
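
The guard above is the usual pattern for reading an optional, version-gated field from an older metadata image; only the reader needs the version check, since a new FE may still be replaying images written by an older one. The matching write side is not shown in this diff; a minimal sketch, assuming UserIdentity follows the FE's standard Writable pattern (a boolean presence flag followed by the serialized identity):

    // Sketch only -- assumes the usual Writable pattern; the real write path is not part of this commit.
    out.writeBoolean(userIdentity != null);
    if (userIdentity != null) {
        userIdentity.write(out);
    }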
@@ -24,6 +24,7 @@
import org.apache.doris.analysis.PartitionNames;
import org.apache.doris.analysis.RoutineLoadDataSourceProperties;
import org.apache.doris.analysis.Separator;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.OlapTable;
@@ -47,12 +48,7 @@
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import mockit.Expectations;
import mockit.Injectable;
import mockit.Mock;
import mockit.MockUp;
import mockit.Mocked;
import mockit.Verifications;

import org.apache.kafka.common.PartitionInfo;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -65,6 +61,12 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
import mockit.Expectations;
import mockit.Injectable;
import mockit.Mock;
import mockit.MockUp;
import mockit.Mocked;
import mockit.Verifications;

public class KafkaRoutineLoadJobTest {
private static final Logger LOG = LogManager.getLogger(KafkaRoutineLoadJobTest.class);
@@ -129,25 +131,25 @@ public void testRoutineLoadTaskConcurrentNum(@Injectable PartitionInfo partition
// 2 partitions, 1 be
RoutineLoadJob routineLoadJob =
new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", clusterName1, 1L,
1L, "127.0.0.1:9020", "topic1");
1L, "127.0.0.1:9020", "topic1", UserIdentity.ADMIN);
Deencapsulation.setField(routineLoadJob, "currentKafkaPartitions", partitionList1);
Assert.assertEquals(2, routineLoadJob.calculateCurrentConcurrentTaskNum());

// 3 partitions, 4 be
routineLoadJob = new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", clusterName2, 1L,
1L, "127.0.0.1:9020", "topic1");
1L, "127.0.0.1:9020", "topic1", UserIdentity.ADMIN);
Deencapsulation.setField(routineLoadJob, "currentKafkaPartitions", partitionList2);
Assert.assertEquals(3, routineLoadJob.calculateCurrentConcurrentTaskNum());

// 4 partitions, 4 be
routineLoadJob = new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", clusterName2, 1L,
1L, "127.0.0.1:9020", "topic1");
1L, "127.0.0.1:9020", "topic1", UserIdentity.ADMIN);
Deencapsulation.setField(routineLoadJob, "currentKafkaPartitions", partitionList3);
Assert.assertEquals(4, routineLoadJob.calculateCurrentConcurrentTaskNum());

// 7 partitions, 4 be
routineLoadJob = new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", clusterName2, 1L,
1L, "127.0.0.1:9020", "topic1");
1L, "127.0.0.1:9020", "topic1", UserIdentity.ADMIN);
Deencapsulation.setField(routineLoadJob, "currentKafkaPartitions", partitionList4);
Assert.assertEquals(6, routineLoadJob.calculateCurrentConcurrentTaskNum());
}
@@ -162,7 +164,7 @@ public void testDivideRoutineLoadJob(@Injectable RoutineLoadManager routineLoadM

RoutineLoadJob routineLoadJob =
new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", "default", 1L,
1L, "127.0.0.1:9020", "topic1");
1L, "127.0.0.1:9020", "topic1", UserIdentity.ADMIN);

new Expectations(catalog) {
{
@@ -207,7 +209,7 @@ public void testProcessTimeOutTasks(@Injectable GlobalTransactionMgr globalTrans

RoutineLoadJob routineLoadJob =
new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", "default", 1L,
1L, "127.0.0.1:9020", "topic1");
1L, "127.0.0.1:9020", "topic1", UserIdentity.ADMIN);
long maxBatchIntervalS = 10;
Deencapsulation.setField(routineLoadJob, "maxBatchIntervalS", maxBatchIntervalS);
new Expectations() {
@@ -19,6 +19,7 @@

import org.apache.doris.analysis.CreateRoutineLoadStmt;
import org.apache.doris.analysis.SqlParser;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Table;
@@ -34,18 +35,19 @@
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java_cup.runtime.Symbol;
import mockit.Expectations;
import mockit.Injectable;
import mockit.Mock;
import mockit.MockUp;
import mockit.Mocked;

import org.apache.kafka.common.PartitionInfo;
import org.junit.Assert;
import org.junit.Test;

import java.util.List;
import java.util.Map;
import java_cup.runtime.Symbol;
import mockit.Expectations;
import mockit.Injectable;
import mockit.Mock;
import mockit.MockUp;
import mockit.Mocked;

public class RoutineLoadJobTest {

@@ -316,7 +318,7 @@ public void testGetBeIdToConcurrentTaskNum(@Injectable RoutineLoadTaskInfo routi
@Test
public void testGetShowCreateInfo() throws UserException {
KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(111L, "test_load", "test", 1,
11, "localhost:9092", "test_topic");
11, "localhost:9092", "test_topic", UserIdentity.ADMIN);
Deencapsulation.setField(routineLoadJob, "maxErrorNum", 10);
Deencapsulation.setField(routineLoadJob, "maxBatchRows", 10);
Deencapsulation.setField(routineLoadJob, "maxBatchRows", 10);
@@ -25,6 +25,7 @@
import org.apache.doris.analysis.Separator;
import org.apache.doris.analysis.StopRoutineLoadStmt;
import org.apache.doris.analysis.TableName;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.common.AnalysisException;
@@ -43,16 +44,13 @@
import org.apache.doris.persist.RoutineLoadOperation;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.system.BeSelectionPolicy;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TResourceInfo;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import mockit.Expectations;
import mockit.Injectable;
import mockit.Mock;
import mockit.MockUp;
import mockit.Mocked;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.Assert;
@@ -62,6 +60,11 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
import mockit.Expectations;
import mockit.Injectable;
import mockit.Mock;
import mockit.MockUp;
import mockit.Mocked;

public class RoutineLoadManagerTest {

@@ -98,7 +101,7 @@ public void testAddJobByStmt(@Injectable PaloAuth paloAuth,
createRoutineLoadStmt.setOrigStmt(new OriginStatement("dummy", 0));

KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, jobName, "default_cluster", 1L, 1L,
serverAddress, topicName);
serverAddress, topicName, UserIdentity.ADMIN);

new MockUp<KafkaRoutineLoadJob>() {
@Mock
@@ -197,15 +200,15 @@ public void testCreateWithSameName(@Mocked ConnectContext connectContext) {
String topicName = "topic1";
String serverAddress = "http://127.0.0.1:8080";
KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, jobName, "default_cluster", 1L, 1L,
serverAddress, topicName);
serverAddress, topicName, UserIdentity.ADMIN);

RoutineLoadManager routineLoadManager = new RoutineLoadManager();

Map<Long, Map<String, List<RoutineLoadJob>>> dbToNameToRoutineLoadJob = Maps.newConcurrentMap();
Map<String, List<RoutineLoadJob>> nameToRoutineLoadJob = Maps.newConcurrentMap();
List<RoutineLoadJob> routineLoadJobList = Lists.newArrayList();
KafkaRoutineLoadJob kafkaRoutineLoadJobWithSameName = new KafkaRoutineLoadJob(1L, jobName, "default_cluster",
1L, 1L, serverAddress, topicName);
1L, 1L, serverAddress, topicName, UserIdentity.ADMIN);
routineLoadJobList.add(kafkaRoutineLoadJobWithSameName);
nameToRoutineLoadJob.put(jobName, routineLoadJobList);
dbToNameToRoutineLoadJob.put(1L, nameToRoutineLoadJob);
@@ -227,7 +230,7 @@ public void testCreateWithSameNameOfStoppedJob(@Mocked ConnectContext connectCon
String topicName = "topic1";
String serverAddress = "http://127.0.0.1:8080";
KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, jobName, "default_cluster", 1L, 1L,
serverAddress, topicName);
serverAddress, topicName, UserIdentity.ADMIN);

RoutineLoadManager routineLoadManager = new RoutineLoadManager();

@@ -243,7 +246,7 @@ public void testCreateWithSameNameOfStoppedJob(@Mocked ConnectContext connectCon
Map<String, List<RoutineLoadJob>> nameToRoutineLoadJob = Maps.newConcurrentMap();
List<RoutineLoadJob> routineLoadJobList = Lists.newArrayList();
KafkaRoutineLoadJob kafkaRoutineLoadJobWithSameName = new KafkaRoutineLoadJob(1L, jobName, "default_cluster",
1L, 1L, serverAddress, topicName);
1L, 1L, serverAddress, topicName, UserIdentity.ADMIN);
Deencapsulation.setField(kafkaRoutineLoadJobWithSameName, "state", RoutineLoadJob.JobState.STOPPED);
routineLoadJobList.add(kafkaRoutineLoadJobWithSameName);
nameToRoutineLoadJob.put(jobName, routineLoadJobList);
@@ -737,23 +740,26 @@ public void testStopRoutineLoadJob(@Injectable StopRoutineLoadStmt stopRoutineLo

@Test
public void testCheckBeToTask(@Mocked Catalog catalog,
@Mocked SystemInfoService systemInfoService) throws LoadException {
@Mocked SystemInfoService systemInfoService) throws LoadException, DdlException {
List<Long> beIdsInCluster = Lists.newArrayList();
beIdsInCluster.add(1L);
Map<Long, Integer> beIdToMaxConcurrentTasks = Maps.newHashMap();
beIdToMaxConcurrentTasks.put(1L, 10);
new Expectations() {
{
systemInfoService.getClusterBackendIds("default", true);
systemInfoService.selectBackendIdsByPolicy((BeSelectionPolicy) any, anyInt);
minTimes = 0;
result = beIdsInCluster;
}
};

RoutineLoadManager routineLoadManager = new RoutineLoadManager();
KafkaRoutineLoadJob job = new KafkaRoutineLoadJob(1L, "testjob", SystemInfoService.DEFAULT_CLUSTER,
10000, 10001, "192.168.1.1:9090", "testtopic", UserIdentity.ADMIN);
routineLoadManager.addRoutineLoadJob(job, "testdb");
Config.max_routine_load_task_num_per_be = 10;
Deencapsulation.setField(routineLoadManager, "beIdToMaxConcurrentTasks", beIdToMaxConcurrentTasks);
Assert.assertEquals(1L, routineLoadManager.getAvailableBeForTask(1L, "default"));
Assert.assertEquals(1L, routineLoadManager.getAvailableBeForTask(1L, 1L, "default"));
}

@Test
@@ -17,6 +17,7 @@

package org.apache.doris.load.routineload;

import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.OlapTable;
@@ -33,15 +34,17 @@

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import mockit.Expectations;
import mockit.Injectable;
import mockit.Mocked;

import org.junit.Assert;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import mockit.Expectations;
import mockit.Injectable;
import mockit.Mocked;


public class RoutineLoadSchedulerTest {

@@ -73,7 +76,7 @@ public void testNormalRunOneCycle(@Mocked Catalog catalog,
Deencapsulation.setField(catalog, "routineLoadTaskScheduler", routineLoadTaskScheduler);

KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, "test", clusterName, 1L, 1L,
"xxx", "test");
"xxx", "test", UserIdentity.ADMIN);
Deencapsulation.setField(kafkaRoutineLoadJob,"state", RoutineLoadJob.JobState.NEED_SCHEDULE);
List<RoutineLoadJob> routineLoadJobList = new ArrayList<>();
routineLoadJobList.add(kafkaRoutineLoadJob);
@@ -136,7 +139,7 @@ public void functionTest(@Mocked Catalog catalog,
};

KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, "test", "default_cluster", 1L, 1L,
"10.74.167.16:8092", "test");
"10.74.167.16:8092", "test", UserIdentity.ADMIN);
RoutineLoadManager routineLoadManager = new RoutineLoadManager();
routineLoadManager.addRoutineLoadJob(kafkaRoutineLoadJob, "db");

@@ -167,7 +170,7 @@ public void functionTest(@Mocked Catalog catalog,
executorService.submit(routineLoadTaskScheduler);

KafkaRoutineLoadJob kafkaRoutineLoadJob1 = new KafkaRoutineLoadJob(1L, "test_custom_partition",
"default_cluster", 1L, 1L, "xxx", "test_1");
"default_cluster", 1L, 1L, "xxx", "test_1", UserIdentity.ADMIN);
List<Integer> customKafkaPartitions = new ArrayList<>();
customKafkaPartitions.add(2);
Deencapsulation.setField(kafkaRoutineLoadJob1, "customKafkaPartitions", customKafkaPartitions);
@@ -17,6 +17,7 @@

package org.apache.doris.transaction;

import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.CatalogTestUtil;
import org.apache.doris.catalog.FakeCatalog;
Expand Down Expand Up @@ -55,8 +56,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import mockit.Injectable;
import mockit.Mocked;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.Assert;
import org.junit.Before;
@@ -67,6 +67,9 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import mockit.Injectable;
import mockit.Mocked;


public class GlobalTransactionMgrTest {

@@ -313,7 +316,7 @@ public void testCommitRoutineLoadTransaction(@Injectable TabletCommitInfo tablet
transTablets.add(tabletCommitInfo3);

KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(1L, "test", "default_cluster", 1L, 1L, "host:port",
"topic");
"topic", UserIdentity.ADMIN);
List<RoutineLoadTaskInfo> routineLoadTaskInfoList = Deencapsulation.getField(routineLoadJob, "routineLoadTaskInfoList");
Map<Integer, Long> partitionIdToOffset = Maps.newHashMap();
partitionIdToOffset.put(1, 0L);
@@ -382,7 +385,7 @@ public void testCommitRoutineLoadTransactionWithErrorMax(@Injectable TabletCommi
transTablets.add(tabletCommitInfo2);
transTablets.add(tabletCommitInfo3);

KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(1L, "test", "default_cluster", 1L, 1L, "host:port", "topic");
KafkaRoutineLoadJob routineLoadJob =
new KafkaRoutineLoadJob(1L, "test", "default_cluster", 1L, 1L, "host:port", "topic",
UserIdentity.ADMIN);
List<RoutineLoadTaskInfo> routineLoadTaskInfoList = Deencapsulation.getField(routineLoadJob, "routineLoadTaskInfoList");
Map<Integer, Long> partitionIdToOffset = Maps.newHashMap();
partitionIdToOffset.put(1, 0L);
