Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(pick-2.0.2.1)[Fix](MySqlLoad)Fixed the MySqlLoad will create a new thread every time checkpoint is made #26031 #26136

Merged
merged 2 commits into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -1507,6 +1507,8 @@ private void startMasterOnlyDaemonThreads() {

// start threads that should running on all FE
private void startNonMasterDaemonThreads() {
// start load manager thread
loadManager.start();
tabletStatMgr.start();
// load and export job label cleaner thread
labelCleaner.start();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.common;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class CustomThreadFactory implements ThreadFactory {
private final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;

public CustomThreadFactory(String name) {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
namePrefix = name + "-" + poolNumber.getAndIncrement() + "-thread-";
}

@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
if (t.isDaemon()) {
t.setDaemon(false);
}
if (t.getPriority() != Thread.NORM_PRIORITY) {
t.setPriority(Thread.NORM_PRIORITY);
}
return t;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ public LoadManager(LoadJobScheduler loadJobScheduler) {
this.mysqlLoadManager = new MysqlLoadManager(tokenManager);
}

public void start() {
tokenManager.start();
mysqlLoadManager.start();
}

/**
* This method will be invoked by the broker load(v2) now.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.doris.catalog.Env;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.Config;
import org.apache.doris.common.CustomThreadFactory;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.UserException;
Expand Down Expand Up @@ -72,7 +73,7 @@
public class MysqlLoadManager {
private static final Logger LOG = LogManager.getLogger(MysqlLoadManager.class);

private final ThreadPoolExecutor mysqlLoadPool;
private ThreadPoolExecutor mysqlLoadPool;
private final TokenManager tokenManager;

private static class MySqlLoadContext {
Expand Down Expand Up @@ -137,14 +138,20 @@ public boolean isExpired() {
}

private final Map<String, MySqlLoadContext> loadContextMap = new ConcurrentHashMap<>();
private final EvictingQueue<MySqlLoadFailRecord> failedRecords;
private ScheduledExecutorService periodScheduler = Executors.newScheduledThreadPool(1);
private EvictingQueue<MySqlLoadFailRecord> failedRecords;
private ScheduledExecutorService periodScheduler;

public MysqlLoadManager(TokenManager tokenManager) {
this.tokenManager = tokenManager;
}

public void start() {
this.periodScheduler = Executors.newScheduledThreadPool(1,
new CustomThreadFactory("mysql-load-fail-record-cleaner"));
int poolSize = Config.mysql_load_thread_pool;
// MySqlLoad pool can accept 4 + 4 * 5 = 24 requests by default.
this.mysqlLoadPool = ThreadPoolManager.newDaemonFixedThreadPool(poolSize, poolSize * 5, "Mysql Load", true);
this.tokenManager = tokenManager;
this.mysqlLoadPool = ThreadPoolManager.newDaemonFixedThreadPool(poolSize, poolSize * 5,
"Mysql Load", true);
this.failedRecords = EvictingQueue.create(Config.mysql_load_in_memory_record);
this.periodScheduler.scheduleAtFixedRate(this::cleanFailedRecords, 1, 24, TimeUnit.HOURS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.doris.catalog.Env;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Config;
import org.apache.doris.common.CustomThreadFactory;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.UserException;
import org.apache.doris.thrift.FrontendService;
Expand All @@ -41,18 +42,21 @@
public class TokenManager {
private static final Logger LOG = LogManager.getLogger(TokenManager.class);

private final int thriftTimeoutMs = 300 * 1000;
private final EvictingQueue<String> tokenQueue;
private final ScheduledExecutorService tokenGenerator;
private int thriftTimeoutMs = 300 * 1000;
private EvictingQueue<String> tokenQueue;
private ScheduledExecutorService tokenGenerator;

public TokenManager() {
}

public void start() {
this.tokenQueue = EvictingQueue.create(Config.token_queue_size);
// init one token to avoid async issue.
this.tokenQueue.offer(generateNewToken());
this.tokenGenerator = Executors.newScheduledThreadPool(1);
this.tokenGenerator.scheduleAtFixedRate(() -> {
tokenQueue.offer(generateNewToken());
}, 0, Config.token_generate_period_hour, TimeUnit.HOURS);
this.tokenGenerator = Executors.newScheduledThreadPool(1,
new CustomThreadFactory("token-generator"));
this.tokenGenerator.scheduleAtFixedRate(() -> tokenQueue.offer(generateNewToken()), 0,
Config.token_generate_period_hour, TimeUnit.HOURS);
}

private String generateNewToken() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MaterializedView;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.common.CustomThreadFactory;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.io.Text;
Expand Down Expand Up @@ -66,9 +67,11 @@ public class MTMVJobManager {

private final MTMVTaskManager taskManager;

private ScheduledExecutorService periodScheduler = Executors.newScheduledThreadPool(1);
private ScheduledExecutorService periodScheduler = Executors.newScheduledThreadPool(1,
new CustomThreadFactory("mtmv-job-period-scheduler"));

private ScheduledExecutorService cleanerScheduler = Executors.newScheduledThreadPool(1);
private ScheduledExecutorService cleanerScheduler = Executors.newScheduledThreadPool(1,
new CustomThreadFactory("mtmv-job-cleaner-scheduler"));

private final ReentrantReadWriteLock rwLock;

Expand All @@ -86,13 +89,15 @@ public void start() {
// check the scheduler before using it
// since it may be shutdown when master change to follower without process shutdown.
if (periodScheduler.isShutdown()) {
periodScheduler = Executors.newScheduledThreadPool(1);
periodScheduler = Executors.newScheduledThreadPool(1,
new CustomThreadFactory("mtmv-job-period-scheduler"));
}

registerJobs();

if (cleanerScheduler.isShutdown()) {
cleanerScheduler = Executors.newScheduledThreadPool(1);
cleanerScheduler = Executors.newScheduledThreadPool(1,
new CustomThreadFactory("mtmv-job-cleaner-scheduler"));
}
cleanerScheduler.scheduleAtFixedRate(() -> {
if (!Env.getCurrentEnv().isMaster()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.CustomThreadFactory;
import org.apache.doris.mtmv.MTMVUtils.TaskState;
import org.apache.doris.mtmv.metadata.MTMVJob;
import org.apache.doris.mtmv.metadata.MTMVTask;
Expand Down Expand Up @@ -65,13 +66,14 @@ public class MTMVTaskManager {
// keep track of all the completed tasks
private final Deque<MTMVTask> historyTasks = Queues.newLinkedBlockingDeque();

private ScheduledExecutorService taskScheduler = Executors.newScheduledThreadPool(1);
private ScheduledExecutorService taskScheduler = Executors.newScheduledThreadPool(1,
new CustomThreadFactory("mtmv-task-scheduler"));

private final AtomicInteger failedTaskCount = new AtomicInteger(0);

public void startTaskScheduler() {
if (taskScheduler.isShutdown()) {
taskScheduler = Executors.newScheduledThreadPool(1);
taskScheduler = Executors.newScheduledThreadPool(1, new CustomThreadFactory("mtmv-task-scheduler"));
}
taskScheduler.scheduleAtFixedRate(() -> {
checkRunningTask();
Expand Down
Loading