From 44085266cd372c0eb7fd4ba83a5b6346cfe427d6 Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Mon, 30 Oct 2023 13:55:40 +0800 Subject: [PATCH 1/2] [Fix](MySqlLoad) Fix meaningless thread creation every time checkpoint mysql load (#26031) Add a unified thread name setting method (cherry picked from commit eb2cbae6e3b43c94e44791f3ae5e4d69e8202dba) --- .../java/org/apache/doris/catalog/Env.java | 2 + .../doris/common/CustomThreadFactory.java | 46 ++ .../apache/doris/load/loadv2/LoadManager.java | 5 + .../doris/load/loadv2/MysqlLoadManager.java | 17 +- .../doris/load/loadv2/TokenManager.java | 18 +- .../org/apache/doris/mtmv/MTMVJobManager.java | 13 +- .../apache/doris/mtmv/MTMVTaskManager.java | 6 +- .../scheduler/disruptor/TaskDisruptor.java | 134 +++++ .../scheduler/manager/TimerJobManager.java | 536 ++++++++++++++++++ 9 files changed, 759 insertions(+), 18 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/common/CustomThreadFactory.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index ea4410f60c6f58..9e17b2ee7ca37c 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -1507,6 +1507,8 @@ private void startMasterOnlyDaemonThreads() { // start threads that should running on all FE private void startNonMasterDaemonThreads() { + // start load manager thread + loadManager.start(); tabletStatMgr.start(); // load and export job label cleaner thread labelCleaner.start(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/CustomThreadFactory.java b/fe/fe-core/src/main/java/org/apache/doris/common/CustomThreadFactory.java new file mode 100644 index 00000000000000..153131ec25107b --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/CustomThreadFactory.java @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +public class CustomThreadFactory implements ThreadFactory { + private final AtomicInteger poolNumber = new AtomicInteger(1); + private final ThreadGroup group; + private final AtomicInteger threadNumber = new AtomicInteger(1); + private final String namePrefix; + + public CustomThreadFactory(String name) { + SecurityManager s = System.getSecurityManager(); + group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); + namePrefix = name + "-" + poolNumber.getAndIncrement() + "-thread-"; + } + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); + if (t.isDaemon()) { + t.setDaemon(false); + } + if (t.getPriority() != Thread.NORM_PRIORITY) { + t.setPriority(Thread.NORM_PRIORITY); + } + return t; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index 73d4d1a57ad771..3f05e2c5a9887a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -104,6 +104,11 @@ public LoadManager(LoadJobScheduler loadJobScheduler) { this.mysqlLoadManager = new MysqlLoadManager(tokenManager); } + public void start() { + tokenManager.start(); + mysqlLoadManager.start(); + } + /** * This method will be invoked by the broker load(v2) now. */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java index ae1871e0d9ca28..42583506c56df3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java @@ -26,6 +26,7 @@ import org.apache.doris.catalog.Env; import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.Config; +import org.apache.doris.common.CustomThreadFactory; import org.apache.doris.common.LoadException; import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.common.UserException; @@ -72,7 +73,7 @@ public class MysqlLoadManager { private static final Logger LOG = LogManager.getLogger(MysqlLoadManager.class); - private final ThreadPoolExecutor mysqlLoadPool; + private ThreadPoolExecutor mysqlLoadPool; private final TokenManager tokenManager; private static class MySqlLoadContext { @@ -137,14 +138,20 @@ public boolean isExpired() { } private final Map loadContextMap = new ConcurrentHashMap<>(); - private final EvictingQueue failedRecords; - private ScheduledExecutorService periodScheduler = Executors.newScheduledThreadPool(1); + private EvictingQueue failedRecords; + private ScheduledExecutorService periodScheduler; public MysqlLoadManager(TokenManager tokenManager) { + this.tokenManager = tokenManager; + } + + public void start() { + this.periodScheduler = Executors.newScheduledThreadPool(1, + new CustomThreadFactory("mysql-load-fail-record-cleaner")); int poolSize = Config.mysql_load_thread_pool; // MySqlLoad pool can accept 4 + 4 * 5 = 24 requests by default. - this.mysqlLoadPool = ThreadPoolManager.newDaemonFixedThreadPool(poolSize, poolSize * 5, "Mysql Load", true); - this.tokenManager = tokenManager; + this.mysqlLoadPool = ThreadPoolManager.newDaemonFixedThreadPool(poolSize, poolSize * 5, + "Mysql Load", true); this.failedRecords = EvictingQueue.create(Config.mysql_load_in_memory_record); this.periodScheduler.scheduleAtFixedRate(this::cleanFailedRecords, 1, 24, TimeUnit.HOURS); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/TokenManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/TokenManager.java index f4cf451821298b..80f6c3f9b50fda 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/TokenManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/TokenManager.java @@ -20,6 +20,7 @@ import org.apache.doris.catalog.Env; import org.apache.doris.common.ClientPool; import org.apache.doris.common.Config; +import org.apache.doris.common.CustomThreadFactory; import org.apache.doris.common.FeConstants; import org.apache.doris.common.UserException; import org.apache.doris.thrift.FrontendService; @@ -41,18 +42,21 @@ public class TokenManager { private static final Logger LOG = LogManager.getLogger(TokenManager.class); - private final int thriftTimeoutMs = 300 * 1000; - private final EvictingQueue tokenQueue; - private final ScheduledExecutorService tokenGenerator; + private int thriftTimeoutMs = 300 * 1000; + private EvictingQueue tokenQueue; + private ScheduledExecutorService tokenGenerator; public TokenManager() { + } + + public void start() { this.tokenQueue = EvictingQueue.create(Config.token_queue_size); // init one token to avoid async issue. this.tokenQueue.offer(generateNewToken()); - this.tokenGenerator = Executors.newScheduledThreadPool(1); - this.tokenGenerator.scheduleAtFixedRate(() -> { - tokenQueue.offer(generateNewToken()); - }, 0, Config.token_generate_period_hour, TimeUnit.HOURS); + this.tokenGenerator = Executors.newScheduledThreadPool(1, + new CustomThreadFactory("token-generator")); + this.tokenGenerator.scheduleAtFixedRate(() -> tokenQueue.offer(generateNewToken()), 0, + Config.token_generate_period_hour, TimeUnit.HOURS); } private String generateNewToken() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java index 7295f40b6029e2..b58f26b863a278 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java @@ -21,6 +21,7 @@ import org.apache.doris.catalog.Env; import org.apache.doris.catalog.MaterializedView; import org.apache.doris.catalog.TableIf.TableType; +import org.apache.doris.common.CustomThreadFactory; import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.io.Text; @@ -66,9 +67,11 @@ public class MTMVJobManager { private final MTMVTaskManager taskManager; - private ScheduledExecutorService periodScheduler = Executors.newScheduledThreadPool(1); + private ScheduledExecutorService periodScheduler = Executors.newScheduledThreadPool(1, + new CustomThreadFactory("mtmv-job-period-scheduler")); - private ScheduledExecutorService cleanerScheduler = Executors.newScheduledThreadPool(1); + private ScheduledExecutorService cleanerScheduler = Executors.newScheduledThreadPool(1, + new CustomThreadFactory("mtmv-job-cleaner-scheduler")); private final ReentrantReadWriteLock rwLock; @@ -86,13 +89,15 @@ public void start() { // check the scheduler before using it // since it may be shutdown when master change to follower without process shutdown. if (periodScheduler.isShutdown()) { - periodScheduler = Executors.newScheduledThreadPool(1); + periodScheduler = Executors.newScheduledThreadPool(1, + new CustomThreadFactory("mtmv-job-period-scheduler")); } registerJobs(); if (cleanerScheduler.isShutdown()) { - cleanerScheduler = Executors.newScheduledThreadPool(1); + cleanerScheduler = Executors.newScheduledThreadPool(1, + new CustomThreadFactory("mtmv-job-cleaner-scheduler")); } cleanerScheduler.scheduleAtFixedRate(() -> { if (!Env.getCurrentEnv().isMaster()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java index d6e370480bba73..138ede9e075f8d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java @@ -20,6 +20,7 @@ import org.apache.doris.catalog.Env; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; +import org.apache.doris.common.CustomThreadFactory; import org.apache.doris.mtmv.MTMVUtils.TaskState; import org.apache.doris.mtmv.metadata.MTMVJob; import org.apache.doris.mtmv.metadata.MTMVTask; @@ -65,13 +66,14 @@ public class MTMVTaskManager { // keep track of all the completed tasks private final Deque historyTasks = Queues.newLinkedBlockingDeque(); - private ScheduledExecutorService taskScheduler = Executors.newScheduledThreadPool(1); + private ScheduledExecutorService taskScheduler = Executors.newScheduledThreadPool(1, + new CustomThreadFactory("mtmv-task-scheduler")); private final AtomicInteger failedTaskCount = new AtomicInteger(0); public void startTaskScheduler() { if (taskScheduler.isShutdown()) { - taskScheduler = Executors.newScheduledThreadPool(1); + taskScheduler = Executors.newScheduledThreadPool(1, new CustomThreadFactory("mtmv-task-scheduler")); } taskScheduler.scheduleAtFixedRate(() -> { checkRunningTask(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java new file mode 100644 index 00000000000000..a8d98831f21201 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java @@ -0,0 +1,134 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.scheduler.disruptor; + +import org.apache.doris.common.Config; +import org.apache.doris.common.CustomThreadFactory; +import org.apache.doris.scheduler.constants.TaskType; +import org.apache.doris.scheduler.manager.TimerJobManager; +import org.apache.doris.scheduler.manager.TransientTaskManager; + +import com.lmax.disruptor.BlockingWaitStrategy; +import com.lmax.disruptor.EventTranslatorThreeArg; +import com.lmax.disruptor.TimeoutException; +import com.lmax.disruptor.WorkHandler; +import com.lmax.disruptor.dsl.Disruptor; +import com.lmax.disruptor.dsl.ProducerType; +import lombok.extern.slf4j.Slf4j; + +import java.io.Closeable; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +/** + * This class represents a disruptor for processing event tasks consumed by a Disruptor. + * + *

The work handler retrieves the associated event job and executes it if it is running. + * If the event job is not running, the work handler logs an error message. If the event job execution fails, + * the work handler logs an error message and pauses the event job. + * + *

The work handler also handles system events by scheduling batch scheduler tasks. + */ +@Slf4j +public class TaskDisruptor implements Closeable { + + private final Disruptor disruptor; + private static final int DEFAULT_RING_BUFFER_SIZE = Config.async_task_queen_size; + + private static int consumerThreadCount = Config.async_task_consumer_thread_num; + + /** + * The default timeout for {@link #close()} in seconds. + */ + private static final int DEFAULT_CLOSE_WAIT_TIME_SECONDS = 5; + + /** + * Whether this disruptor has been closed. + * if true, then we can't publish any more events. + */ + private boolean isClosed = false; + + /** + * The default {@link EventTranslatorThreeArg} to use for {@link #tryPublish(Long, Long)}. + * This is used to avoid creating a new object for each publish. + */ + private static final EventTranslatorThreeArg TRANSLATOR + = (event, sequence, jobId, taskId, taskType) -> { + event.setId(jobId); + event.setTaskId(taskId); + event.setTaskType(taskType); + }; + + public TaskDisruptor(TimerJobManager timerJobManager, TransientTaskManager transientTaskManager) { + ThreadFactory producerThreadFactory = new CustomThreadFactory("task-disruptor-producer"); + disruptor = new Disruptor<>(TaskEvent.FACTORY, DEFAULT_RING_BUFFER_SIZE, producerThreadFactory, + ProducerType.SINGLE, new BlockingWaitStrategy()); + WorkHandler[] workers = new TaskHandler[consumerThreadCount]; + for (int i = 0; i < consumerThreadCount; i++) { + workers[i] = new TaskHandler(timerJobManager, transientTaskManager); + } + disruptor.handleEventsWithWorkerPool(workers); + disruptor.start(); + } + + /** + * Publishes a job to the disruptor. + * + * @param jobId job id + */ + public void tryPublish(Long jobId, Long taskId) { + if (isClosed) { + log.info("tryPublish failed, disruptor is closed, jobId: {}", jobId); + return; + } + try { + disruptor.publishEvent(TRANSLATOR, jobId, taskId, TaskType.TimerJobTask); + } catch (Exception e) { + log.error("tryPublish failed, jobId: {}", jobId, e); + } + } + + /** + * Publishes a task to the disruptor. + * + * @param taskId task id + */ + public void tryPublishTask(Long taskId) { + if (isClosed) { + log.info("tryPublish failed, disruptor is closed, taskId: {}", taskId); + return; + } + try { + disruptor.publishEvent(TRANSLATOR, taskId, 0L, TaskType.TransientTask); + } catch (Exception e) { + log.error("tryPublish failed, taskId: {}", taskId, e); + } + } + + + @Override + public void close() { + try { + isClosed = true; + // we can wait for 5 seconds, so that backlog can be committed + disruptor.shutdown(DEFAULT_CLOSE_WAIT_TIME_SECONDS, TimeUnit.SECONDS); + } catch (TimeoutException e) { + log.warn("close disruptor failed", e); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java new file mode 100644 index 00000000000000..c7a728cf049c55 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java @@ -0,0 +1,536 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.scheduler.manager; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.Config; +import org.apache.doris.common.CustomThreadFactory; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.PatternMatcher; +import org.apache.doris.common.io.Writable; +import org.apache.doris.common.util.LogBuilder; +import org.apache.doris.common.util.LogKey; +import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.scheduler.constants.JobCategory; +import org.apache.doris.scheduler.constants.JobStatus; +import org.apache.doris.scheduler.disruptor.TaskDisruptor; +import org.apache.doris.scheduler.job.Job; +import org.apache.doris.scheduler.job.TimerJobTask; + +import io.netty.util.HashedWheelTimer; +import io.netty.util.Timeout; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; + +import java.io.Closeable; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +@Slf4j +public class TimerJobManager implements Closeable, Writable { + + private final ConcurrentHashMap jobMap = new ConcurrentHashMap<>(128); + private long lastBatchSchedulerTimestamp; + private static final long BATCH_SCHEDULER_INTERVAL_SECONDS = 600; + + /** + * batch scheduler interval ms time + */ + private static final long BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS = BATCH_SCHEDULER_INTERVAL_SECONDS * 1000L; + + private boolean isClosed = false; + + /** + * key: jobid + * value: timeout list for one job + * it's used to cancel task, if task has started, it can't be canceled + */ + private final ConcurrentHashMap> jobTimeoutMap = new ConcurrentHashMap<>(128); + + /** + * scheduler tasks, it's used to scheduler job + */ + private HashedWheelTimer dorisTimer; + + /** + * Producer and Consumer model + * disruptor is used to handle task + * disruptor will start a thread pool to handle task + */ + @Setter + private TaskDisruptor disruptor; + + public TimerJobManager() { + this.lastBatchSchedulerTimestamp = System.currentTimeMillis(); + } + + public void start() { + dorisTimer = new HashedWheelTimer(new CustomThreadFactory("hashed-wheel-timer"), + 1, TimeUnit.SECONDS, 660); + dorisTimer.start(); + Long currentTimeMs = System.currentTimeMillis(); + jobMap.forEach((jobId, job) -> { + Long nextExecuteTimeMs = findFistExecuteTime(currentTimeMs, job.getStartTimeMs(), + job.getIntervalMs(), job.isCycleJob()); + job.setNextExecuteTimeMs(nextExecuteTimeMs); + }); + batchSchedulerTasks(); + cycleSystemSchedulerTasks(); + } + + public Long registerJob(Job job) throws DdlException { + job.checkJobParam(); + checkIsJobNameUsed(job.getDbName(), job.getJobName(), job.getJobCategory()); + jobMap.putIfAbsent(job.getJobId(), job); + initAndSchedulerJob(job); + Env.getCurrentEnv().getEditLog().logCreateJob(job); + return job.getJobId(); + } + + public void replayCreateJob(Job job) { + if (jobMap.containsKey(job.getJobId())) { + return; + } + jobMap.putIfAbsent(job.getJobId(), job); + initAndSchedulerJob(job); + log.info(new LogBuilder(LogKey.SCHEDULER_JOB, job.getJobId()) + .add("msg", "replay create scheduler job").build()); + } + + /** + * Replay update load job. + **/ + public void replayUpdateJob(Job job) { + jobMap.put(job.getJobId(), job); + if (JobStatus.RUNNING.equals(job.getJobStatus())) { + initAndSchedulerJob(job); + } + log.info(new LogBuilder(LogKey.SCHEDULER_JOB, job.getJobId()) + .add("msg", "replay update scheduler job").build()); + } + + public void replayDeleteJob(Job job) { + if (null == jobMap.get(job.getJobId())) { + return; + } + jobMap.remove(job.getJobId()); + log.info(new LogBuilder(LogKey.SCHEDULER_JOB, job.getJobId()) + .add("msg", "replay delete scheduler job").build()); + Env.getCurrentEnv().getJobTaskManager().deleteJobTasks(job.getJobId()); + } + + private void checkIsJobNameUsed(String dbName, String jobName, JobCategory jobCategory) throws DdlException { + Optional optionalJob = jobMap.values().stream().filter(job -> job.getJobCategory().equals(jobCategory)) + .filter(job -> job.getDbName().equals(dbName)) + .filter(job -> job.getJobName().equals(jobName)).findFirst(); + if (optionalJob.isPresent()) { + throw new DdlException("Name " + jobName + " already used in db " + dbName); + } + } + + private void initAndSchedulerJob(Job job) { + if (!job.getJobStatus().equals(JobStatus.RUNNING)) { + return; + } + + Long currentTimeMs = System.currentTimeMillis(); + Long nextExecuteTimeMs = findFistExecuteTime(currentTimeMs, job.getStartTimeMs(), + job.getIntervalMs(), job.isCycleJob()); + job.setNextExecuteTimeMs(nextExecuteTimeMs); + if (job.getNextExecuteTimeMs() < lastBatchSchedulerTimestamp) { + List executeTimestamp = findTasksBetweenTime(job, + lastBatchSchedulerTimestamp, + job.getNextExecuteTimeMs()); + if (!executeTimestamp.isEmpty()) { + for (Long timestamp : executeTimestamp) { + putOneTask(job.getJobId(), timestamp); + } + } + } + } + + private Long findFistExecuteTime(Long currentTimeMs, Long startTimeMs, Long intervalMs, boolean isCycleJob) { + // if job not delay, first execute time is start time + if (startTimeMs != 0L && startTimeMs > currentTimeMs) { + return startTimeMs; + } + // if job already delay, first execute time is current time + if (startTimeMs != 0L && startTimeMs < currentTimeMs) { + return currentTimeMs; + } + // if it's cycle job and not set start tine, first execute time is current time + interval + if (isCycleJob && startTimeMs == 0L) { + return currentTimeMs + intervalMs; + } + // if it's not cycle job and already delay, first execute time is current time + return currentTimeMs; + } + + public void unregisterJob(Long jobId) { + jobMap.remove(jobId); + } + + public void pauseJob(Long jobId) { + Job job = jobMap.get(jobId); + if (jobMap.get(jobId) == null) { + log.warn("pauseJob failed, jobId: {} not exist", jobId); + } + if (jobMap.get(jobId).getJobStatus().equals(JobStatus.PAUSED)) { + log.warn("pauseJob failed, jobId: {} is already paused", jobId); + } + pauseJob(job); + } + + public void stopJob(String dbName, String jobName, JobCategory jobCategory) throws DdlException { + Optional optionalJob = findJob(dbName, jobName, jobCategory); + + if (!optionalJob.isPresent()) { + throw new DdlException("Job " + jobName + " not exist in db " + dbName); + } + Job job = optionalJob.get(); + if (job.getJobStatus().equals(JobStatus.STOPPED)) { + throw new DdlException("Job " + jobName + " is already stopped"); + } + stopJob(optionalJob.get()); + Env.getCurrentEnv().getEditLog().logDeleteJob(optionalJob.get()); + } + + private void stopJob(Job job) { + if (JobStatus.RUNNING.equals(job.getJobStatus())) { + cancelJobAllTask(job.getJobId()); + } + job.setJobStatus(JobStatus.STOPPED); + jobMap.get(job.getJobId()).stop(); + Env.getCurrentEnv().getEditLog().logDeleteJob(job); + Env.getCurrentEnv().getJobTaskManager().deleteJobTasks(job.getJobId()); + } + + + public void resumeJob(String dbName, String jobName, JobCategory jobCategory) throws DdlException { + Optional optionalJob = findJob(dbName, jobName, jobCategory); + if (!optionalJob.isPresent()) { + throw new DdlException("Job " + jobName + " not exist in db " + dbName); + } + Job job = optionalJob.get(); + if (!job.getJobStatus().equals(JobStatus.PAUSED)) { + throw new DdlException("Job " + jobName + " is not paused"); + } + resumeJob(job); + } + + private void resumeJob(Job job) { + cancelJobAllTask(job.getJobId()); + job.setJobStatus(JobStatus.RUNNING); + jobMap.get(job.getJobId()).resume(); + initAndSchedulerJob(job); + Env.getCurrentEnv().getEditLog().logUpdateJob(job); + } + + public void pauseJob(String dbName, String jobName, JobCategory jobCategory) throws DdlException { + Optional optionalJob = findJob(dbName, jobName, jobCategory); + if (!optionalJob.isPresent()) { + throw new DdlException("Job " + jobName + " not exist in db " + dbName); + } + Job job = optionalJob.get(); + if (!job.getJobStatus().equals(JobStatus.RUNNING)) { + throw new DdlException("Job " + jobName + " is not running"); + } + pauseJob(job); + } + + private void pauseJob(Job job) { + cancelJobAllTask(job.getJobId()); + job.setJobStatus(JobStatus.PAUSED); + jobMap.get(job.getJobId()).pause(); + Env.getCurrentEnv().getEditLog().logUpdateJob(job); + } + + public void finishJob(long jobId) { + Job job = jobMap.get(jobId); + if (jobMap.get(jobId) == null) { + log.warn("update job status failed, jobId: {} not exist", jobId); + } + if (jobMap.get(jobId).getJobStatus().equals(JobStatus.FINISHED)) { + return; + } + job.setLatestCompleteExecuteTimeMs(System.currentTimeMillis()); + cancelJobAllTask(job.getJobId()); + job.setJobStatus(JobStatus.FINISHED); + Env.getCurrentEnv().getEditLog().logUpdateJob(job); + } + + private Optional findJob(String dbName, String jobName, JobCategory jobCategory) { + return jobMap.values().stream().filter(job -> checkJobMatch(job, dbName, jobName, jobCategory)).findFirst(); + } + + private boolean checkJobMatch(Job job, String dbName, String jobName, JobCategory jobCategory) { + return job.getDbName().equals(dbName) && job.getJobName().equals(jobName) + && job.getJobCategory().equals(jobCategory); + } + + + public void resumeJob(Long jobId) { + if (jobMap.get(jobId) == null) { + log.warn("resumeJob failed, jobId: {} not exist", jobId); + return; + } + Job job = jobMap.get(jobId); + resumeJob(job); + } + + public void stopJob(Long jobId) { + Job job = jobMap.get(jobId); + if (null == job) { + log.warn("stopJob failed, jobId: {} not exist", jobId); + return; + } + if (job.getJobStatus().equals(JobStatus.STOPPED)) { + log.warn("stopJob failed, jobId: {} is already stopped", jobId); + return; + } + stopJob(job); + } + + public Job getJob(Long jobId) { + return jobMap.get(jobId); + } + + public Map getAllJob() { + return jobMap; + } + + public void batchSchedulerTasks() { + executeJobIdsWithinLastTenMinutesWindow(); + } + + private List findTasksBetweenTime(Job job, Long endTimeEndWindow, Long nextExecuteTime) { + + List jobExecuteTimes = new ArrayList<>(); + if (!job.isCycleJob() && (nextExecuteTime < endTimeEndWindow)) { + jobExecuteTimes.add(nextExecuteTime); + return jobExecuteTimes; + } + if (job.isCycleJob() && (nextExecuteTime > endTimeEndWindow)) { + return new ArrayList<>(); + } + while (endTimeEndWindow >= nextExecuteTime) { + if (job.isTaskTimeExceeded()) { + break; + } + jobExecuteTimes.add(nextExecuteTime); + nextExecuteTime = job.getExecuteTimestampAndGeneratorNext(); + } + return jobExecuteTimes; + } + + /** + * We will get the task in the next time window, and then hand it over to the time wheel for timing trigger + */ + private void executeJobIdsWithinLastTenMinutesWindow() { + // if the task executes for more than 10 minutes, it will be delay, so, + // set lastBatchSchedulerTimestamp to current time + if (lastBatchSchedulerTimestamp < System.currentTimeMillis()) { + this.lastBatchSchedulerTimestamp = System.currentTimeMillis(); + } + this.lastBatchSchedulerTimestamp += BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS; + if (jobMap.isEmpty()) { + return; + } + jobMap.forEach((k, v) -> { + if (v.isRunning() && (v.getNextExecuteTimeMs() + + v.getIntervalMs() < lastBatchSchedulerTimestamp)) { + List executeTimes = findTasksBetweenTime( + v, lastBatchSchedulerTimestamp, + v.getNextExecuteTimeMs()); + if (!executeTimes.isEmpty()) { + for (Long executeTime : executeTimes) { + putOneTask(v.getJobId(), executeTime); + } + } + } + }); + } + + /** + * We will cycle system scheduler tasks every 10 minutes. + * Jobs will be re-registered after the task is completed + */ + private void cycleSystemSchedulerTasks() { + log.info("re-register system scheduler tasks" + TimeUtils.longToTimeString(System.currentTimeMillis())); + dorisTimer.newTimeout(timeout -> { + batchSchedulerTasks(); + clearFinishJob(); + cycleSystemSchedulerTasks(); + }, BATCH_SCHEDULER_INTERVAL_SECONDS, TimeUnit.SECONDS); + + } + + /** + * put one task to time wheel,it's well be trigger after delay milliseconds + * if the scheduler is closed, the task will not be put into the time wheel + * if delay is less than 0, the task will be trigger immediately + * + * @param jobId job id, we will use it to find the job + * @param startExecuteTime the task will be trigger in this time, unit is millisecond,and we will convert it to + * delay seconds, we just can be second precision + */ + public void putOneTask(Long jobId, Long startExecuteTime) { + if (isClosed) { + log.info("putOneTask failed, scheduler is closed, jobId: {}", jobId); + return; + } + long taskId = System.nanoTime(); + TimerJobTask task = new TimerJobTask(jobId, taskId, startExecuteTime, disruptor); + long delay = getDelaySecond(task.getStartTimestamp()); + Timeout timeout = dorisTimer.newTimeout(task, delay, TimeUnit.SECONDS); + if (timeout == null) { + log.error("putOneTask failed, jobId: {}", task.getJobId()); + return; + } + if (jobTimeoutMap.containsKey(task.getJobId())) { + jobTimeoutMap.get(task.getJobId()).put(task.getTaskId(), timeout); + JobTaskManager.addPrepareTaskStartTime(jobId, taskId, startExecuteTime); + return; + } + Map timeoutMap = new ConcurrentHashMap<>(); + timeoutMap.put(task.getTaskId(), timeout); + jobTimeoutMap.put(task.getJobId(), timeoutMap); + JobTaskManager.addPrepareTaskStartTime(jobId, taskId, startExecuteTime); + } + + // cancel all task for one job + // if task has started, it can't be canceled + public void cancelJobAllTask(Long jobId) { + if (!jobTimeoutMap.containsKey(jobId)) { + return; + } + + jobTimeoutMap.get(jobId).values().forEach(timeout -> { + if (!timeout.isExpired() || timeout.isCancelled()) { + timeout.cancel(); + } + }); + JobTaskManager.clearPrepareTaskByJobId(jobId); + } + + // get delay time, if startTimestamp is less than now, return 0 + private long getDelaySecond(long startTimestamp) { + long delay = 0; + long now = System.currentTimeMillis(); + if (startTimestamp > now) { + delay = startTimestamp - now; + } else { + //if execute time is less than now, return 0,immediately execute + log.warn("startTimestamp is less than now, startTimestamp: {}, now: {}", startTimestamp, now); + return delay; + } + return delay / 1000; + } + + @Override + public void close() throws IOException { + isClosed = true; + dorisTimer.stop(); + disruptor.close(); + } + + /** + * sort by job id + * + * @param dbFullName database name + * @param category job category + * @param matcher job name matcher + */ + public List queryJob(String dbFullName, String jobName, JobCategory category, PatternMatcher matcher) { + List jobs = new ArrayList<>(); + jobMap.values().forEach(job -> { + if (matchJob(job, dbFullName, jobName, category, matcher)) { + jobs.add(job); + } + }); + return jobs; + } + + private boolean matchJob(Job job, String dbFullName, String jobName, JobCategory category, PatternMatcher matcher) { + if (StringUtils.isNotBlank(dbFullName) && !job.getDbName().equalsIgnoreCase(dbFullName)) { + return false; + } + if (StringUtils.isNotBlank(jobName) && !job.getJobName().equalsIgnoreCase(jobName)) { + return false; + } + if (category != null && !job.getJobCategory().equals(category)) { + return false; + } + return null == matcher || matcher.match(job.getJobName()); + } + + public void putOneJobToQueen(Long jobId) { + long taskId = System.nanoTime(); + JobTaskManager.addPrepareTaskStartTime(jobId, taskId, System.currentTimeMillis()); + disruptor.tryPublish(jobId, taskId); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(jobMap.size()); + for (Job job : jobMap.values()) { + job.write(out); + } + } + + /** + * read job from data input, and init job + * + * @param in data input + * @throws IOException io exception when read data input error + */ + public void readFields(DataInput in) throws IOException { + int size = in.readInt(); + for (int i = 0; i < size; i++) { + Job job = Job.readFields(in); + jobMap.putIfAbsent(job.getJobId(), job); + } + } + + /** + * clear finish job,if job finish time is more than @Config.finish_job_max_saved_second, we will delete it + * this method will be called every 10 minutes, therefore, the actual maximum + * deletion time is Config.finish_job_max_saved_second + 10 min. + * we could to delete job in time, but it's not make sense.start + */ + private void clearFinishJob() { + Long now = System.currentTimeMillis(); + jobMap.values().forEach(job -> { + if (job.isFinished() && now - job.getLatestCompleteExecuteTimeMs() > Config.finish_job_max_saved_second) { + jobMap.remove(job.getJobId()); + Env.getCurrentEnv().getEditLog().logDeleteJob(job); + Env.getCurrentEnv().getJobTaskManager().deleteJobTasks(job.getJobId()); + log.debug("delete finish job:{}", job.getJobId()); + } + }); + + } +} From 65846af6afab125b3d91a5adc5de35a29ab14503 Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Tue, 31 Oct 2023 10:20:06 +0800 Subject: [PATCH 2/2] remove unless code --- .../scheduler/disruptor/TaskDisruptor.java | 134 ----- .../scheduler/manager/TimerJobManager.java | 536 ------------------ 2 files changed, 670 deletions(-) delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java deleted file mode 100644 index a8d98831f21201..00000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java +++ /dev/null @@ -1,134 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.scheduler.disruptor; - -import org.apache.doris.common.Config; -import org.apache.doris.common.CustomThreadFactory; -import org.apache.doris.scheduler.constants.TaskType; -import org.apache.doris.scheduler.manager.TimerJobManager; -import org.apache.doris.scheduler.manager.TransientTaskManager; - -import com.lmax.disruptor.BlockingWaitStrategy; -import com.lmax.disruptor.EventTranslatorThreeArg; -import com.lmax.disruptor.TimeoutException; -import com.lmax.disruptor.WorkHandler; -import com.lmax.disruptor.dsl.Disruptor; -import com.lmax.disruptor.dsl.ProducerType; -import lombok.extern.slf4j.Slf4j; - -import java.io.Closeable; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; - -/** - * This class represents a disruptor for processing event tasks consumed by a Disruptor. - * - *

The work handler retrieves the associated event job and executes it if it is running. - * If the event job is not running, the work handler logs an error message. If the event job execution fails, - * the work handler logs an error message and pauses the event job. - * - *

The work handler also handles system events by scheduling batch scheduler tasks. - */ -@Slf4j -public class TaskDisruptor implements Closeable { - - private final Disruptor disruptor; - private static final int DEFAULT_RING_BUFFER_SIZE = Config.async_task_queen_size; - - private static int consumerThreadCount = Config.async_task_consumer_thread_num; - - /** - * The default timeout for {@link #close()} in seconds. - */ - private static final int DEFAULT_CLOSE_WAIT_TIME_SECONDS = 5; - - /** - * Whether this disruptor has been closed. - * if true, then we can't publish any more events. - */ - private boolean isClosed = false; - - /** - * The default {@link EventTranslatorThreeArg} to use for {@link #tryPublish(Long, Long)}. - * This is used to avoid creating a new object for each publish. - */ - private static final EventTranslatorThreeArg TRANSLATOR - = (event, sequence, jobId, taskId, taskType) -> { - event.setId(jobId); - event.setTaskId(taskId); - event.setTaskType(taskType); - }; - - public TaskDisruptor(TimerJobManager timerJobManager, TransientTaskManager transientTaskManager) { - ThreadFactory producerThreadFactory = new CustomThreadFactory("task-disruptor-producer"); - disruptor = new Disruptor<>(TaskEvent.FACTORY, DEFAULT_RING_BUFFER_SIZE, producerThreadFactory, - ProducerType.SINGLE, new BlockingWaitStrategy()); - WorkHandler[] workers = new TaskHandler[consumerThreadCount]; - for (int i = 0; i < consumerThreadCount; i++) { - workers[i] = new TaskHandler(timerJobManager, transientTaskManager); - } - disruptor.handleEventsWithWorkerPool(workers); - disruptor.start(); - } - - /** - * Publishes a job to the disruptor. - * - * @param jobId job id - */ - public void tryPublish(Long jobId, Long taskId) { - if (isClosed) { - log.info("tryPublish failed, disruptor is closed, jobId: {}", jobId); - return; - } - try { - disruptor.publishEvent(TRANSLATOR, jobId, taskId, TaskType.TimerJobTask); - } catch (Exception e) { - log.error("tryPublish failed, jobId: {}", jobId, e); - } - } - - /** - * Publishes a task to the disruptor. - * - * @param taskId task id - */ - public void tryPublishTask(Long taskId) { - if (isClosed) { - log.info("tryPublish failed, disruptor is closed, taskId: {}", taskId); - return; - } - try { - disruptor.publishEvent(TRANSLATOR, taskId, 0L, TaskType.TransientTask); - } catch (Exception e) { - log.error("tryPublish failed, taskId: {}", taskId, e); - } - } - - - @Override - public void close() { - try { - isClosed = true; - // we can wait for 5 seconds, so that backlog can be committed - disruptor.shutdown(DEFAULT_CLOSE_WAIT_TIME_SECONDS, TimeUnit.SECONDS); - } catch (TimeoutException e) { - log.warn("close disruptor failed", e); - } - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java deleted file mode 100644 index c7a728cf049c55..00000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java +++ /dev/null @@ -1,536 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.scheduler.manager; - -import org.apache.doris.catalog.Env; -import org.apache.doris.common.Config; -import org.apache.doris.common.CustomThreadFactory; -import org.apache.doris.common.DdlException; -import org.apache.doris.common.PatternMatcher; -import org.apache.doris.common.io.Writable; -import org.apache.doris.common.util.LogBuilder; -import org.apache.doris.common.util.LogKey; -import org.apache.doris.common.util.TimeUtils; -import org.apache.doris.scheduler.constants.JobCategory; -import org.apache.doris.scheduler.constants.JobStatus; -import org.apache.doris.scheduler.disruptor.TaskDisruptor; -import org.apache.doris.scheduler.job.Job; -import org.apache.doris.scheduler.job.TimerJobTask; - -import io.netty.util.HashedWheelTimer; -import io.netty.util.Timeout; -import lombok.Setter; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; - -import java.io.Closeable; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; - -@Slf4j -public class TimerJobManager implements Closeable, Writable { - - private final ConcurrentHashMap jobMap = new ConcurrentHashMap<>(128); - private long lastBatchSchedulerTimestamp; - private static final long BATCH_SCHEDULER_INTERVAL_SECONDS = 600; - - /** - * batch scheduler interval ms time - */ - private static final long BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS = BATCH_SCHEDULER_INTERVAL_SECONDS * 1000L; - - private boolean isClosed = false; - - /** - * key: jobid - * value: timeout list for one job - * it's used to cancel task, if task has started, it can't be canceled - */ - private final ConcurrentHashMap> jobTimeoutMap = new ConcurrentHashMap<>(128); - - /** - * scheduler tasks, it's used to scheduler job - */ - private HashedWheelTimer dorisTimer; - - /** - * Producer and Consumer model - * disruptor is used to handle task - * disruptor will start a thread pool to handle task - */ - @Setter - private TaskDisruptor disruptor; - - public TimerJobManager() { - this.lastBatchSchedulerTimestamp = System.currentTimeMillis(); - } - - public void start() { - dorisTimer = new HashedWheelTimer(new CustomThreadFactory("hashed-wheel-timer"), - 1, TimeUnit.SECONDS, 660); - dorisTimer.start(); - Long currentTimeMs = System.currentTimeMillis(); - jobMap.forEach((jobId, job) -> { - Long nextExecuteTimeMs = findFistExecuteTime(currentTimeMs, job.getStartTimeMs(), - job.getIntervalMs(), job.isCycleJob()); - job.setNextExecuteTimeMs(nextExecuteTimeMs); - }); - batchSchedulerTasks(); - cycleSystemSchedulerTasks(); - } - - public Long registerJob(Job job) throws DdlException { - job.checkJobParam(); - checkIsJobNameUsed(job.getDbName(), job.getJobName(), job.getJobCategory()); - jobMap.putIfAbsent(job.getJobId(), job); - initAndSchedulerJob(job); - Env.getCurrentEnv().getEditLog().logCreateJob(job); - return job.getJobId(); - } - - public void replayCreateJob(Job job) { - if (jobMap.containsKey(job.getJobId())) { - return; - } - jobMap.putIfAbsent(job.getJobId(), job); - initAndSchedulerJob(job); - log.info(new LogBuilder(LogKey.SCHEDULER_JOB, job.getJobId()) - .add("msg", "replay create scheduler job").build()); - } - - /** - * Replay update load job. - **/ - public void replayUpdateJob(Job job) { - jobMap.put(job.getJobId(), job); - if (JobStatus.RUNNING.equals(job.getJobStatus())) { - initAndSchedulerJob(job); - } - log.info(new LogBuilder(LogKey.SCHEDULER_JOB, job.getJobId()) - .add("msg", "replay update scheduler job").build()); - } - - public void replayDeleteJob(Job job) { - if (null == jobMap.get(job.getJobId())) { - return; - } - jobMap.remove(job.getJobId()); - log.info(new LogBuilder(LogKey.SCHEDULER_JOB, job.getJobId()) - .add("msg", "replay delete scheduler job").build()); - Env.getCurrentEnv().getJobTaskManager().deleteJobTasks(job.getJobId()); - } - - private void checkIsJobNameUsed(String dbName, String jobName, JobCategory jobCategory) throws DdlException { - Optional optionalJob = jobMap.values().stream().filter(job -> job.getJobCategory().equals(jobCategory)) - .filter(job -> job.getDbName().equals(dbName)) - .filter(job -> job.getJobName().equals(jobName)).findFirst(); - if (optionalJob.isPresent()) { - throw new DdlException("Name " + jobName + " already used in db " + dbName); - } - } - - private void initAndSchedulerJob(Job job) { - if (!job.getJobStatus().equals(JobStatus.RUNNING)) { - return; - } - - Long currentTimeMs = System.currentTimeMillis(); - Long nextExecuteTimeMs = findFistExecuteTime(currentTimeMs, job.getStartTimeMs(), - job.getIntervalMs(), job.isCycleJob()); - job.setNextExecuteTimeMs(nextExecuteTimeMs); - if (job.getNextExecuteTimeMs() < lastBatchSchedulerTimestamp) { - List executeTimestamp = findTasksBetweenTime(job, - lastBatchSchedulerTimestamp, - job.getNextExecuteTimeMs()); - if (!executeTimestamp.isEmpty()) { - for (Long timestamp : executeTimestamp) { - putOneTask(job.getJobId(), timestamp); - } - } - } - } - - private Long findFistExecuteTime(Long currentTimeMs, Long startTimeMs, Long intervalMs, boolean isCycleJob) { - // if job not delay, first execute time is start time - if (startTimeMs != 0L && startTimeMs > currentTimeMs) { - return startTimeMs; - } - // if job already delay, first execute time is current time - if (startTimeMs != 0L && startTimeMs < currentTimeMs) { - return currentTimeMs; - } - // if it's cycle job and not set start tine, first execute time is current time + interval - if (isCycleJob && startTimeMs == 0L) { - return currentTimeMs + intervalMs; - } - // if it's not cycle job and already delay, first execute time is current time - return currentTimeMs; - } - - public void unregisterJob(Long jobId) { - jobMap.remove(jobId); - } - - public void pauseJob(Long jobId) { - Job job = jobMap.get(jobId); - if (jobMap.get(jobId) == null) { - log.warn("pauseJob failed, jobId: {} not exist", jobId); - } - if (jobMap.get(jobId).getJobStatus().equals(JobStatus.PAUSED)) { - log.warn("pauseJob failed, jobId: {} is already paused", jobId); - } - pauseJob(job); - } - - public void stopJob(String dbName, String jobName, JobCategory jobCategory) throws DdlException { - Optional optionalJob = findJob(dbName, jobName, jobCategory); - - if (!optionalJob.isPresent()) { - throw new DdlException("Job " + jobName + " not exist in db " + dbName); - } - Job job = optionalJob.get(); - if (job.getJobStatus().equals(JobStatus.STOPPED)) { - throw new DdlException("Job " + jobName + " is already stopped"); - } - stopJob(optionalJob.get()); - Env.getCurrentEnv().getEditLog().logDeleteJob(optionalJob.get()); - } - - private void stopJob(Job job) { - if (JobStatus.RUNNING.equals(job.getJobStatus())) { - cancelJobAllTask(job.getJobId()); - } - job.setJobStatus(JobStatus.STOPPED); - jobMap.get(job.getJobId()).stop(); - Env.getCurrentEnv().getEditLog().logDeleteJob(job); - Env.getCurrentEnv().getJobTaskManager().deleteJobTasks(job.getJobId()); - } - - - public void resumeJob(String dbName, String jobName, JobCategory jobCategory) throws DdlException { - Optional optionalJob = findJob(dbName, jobName, jobCategory); - if (!optionalJob.isPresent()) { - throw new DdlException("Job " + jobName + " not exist in db " + dbName); - } - Job job = optionalJob.get(); - if (!job.getJobStatus().equals(JobStatus.PAUSED)) { - throw new DdlException("Job " + jobName + " is not paused"); - } - resumeJob(job); - } - - private void resumeJob(Job job) { - cancelJobAllTask(job.getJobId()); - job.setJobStatus(JobStatus.RUNNING); - jobMap.get(job.getJobId()).resume(); - initAndSchedulerJob(job); - Env.getCurrentEnv().getEditLog().logUpdateJob(job); - } - - public void pauseJob(String dbName, String jobName, JobCategory jobCategory) throws DdlException { - Optional optionalJob = findJob(dbName, jobName, jobCategory); - if (!optionalJob.isPresent()) { - throw new DdlException("Job " + jobName + " not exist in db " + dbName); - } - Job job = optionalJob.get(); - if (!job.getJobStatus().equals(JobStatus.RUNNING)) { - throw new DdlException("Job " + jobName + " is not running"); - } - pauseJob(job); - } - - private void pauseJob(Job job) { - cancelJobAllTask(job.getJobId()); - job.setJobStatus(JobStatus.PAUSED); - jobMap.get(job.getJobId()).pause(); - Env.getCurrentEnv().getEditLog().logUpdateJob(job); - } - - public void finishJob(long jobId) { - Job job = jobMap.get(jobId); - if (jobMap.get(jobId) == null) { - log.warn("update job status failed, jobId: {} not exist", jobId); - } - if (jobMap.get(jobId).getJobStatus().equals(JobStatus.FINISHED)) { - return; - } - job.setLatestCompleteExecuteTimeMs(System.currentTimeMillis()); - cancelJobAllTask(job.getJobId()); - job.setJobStatus(JobStatus.FINISHED); - Env.getCurrentEnv().getEditLog().logUpdateJob(job); - } - - private Optional findJob(String dbName, String jobName, JobCategory jobCategory) { - return jobMap.values().stream().filter(job -> checkJobMatch(job, dbName, jobName, jobCategory)).findFirst(); - } - - private boolean checkJobMatch(Job job, String dbName, String jobName, JobCategory jobCategory) { - return job.getDbName().equals(dbName) && job.getJobName().equals(jobName) - && job.getJobCategory().equals(jobCategory); - } - - - public void resumeJob(Long jobId) { - if (jobMap.get(jobId) == null) { - log.warn("resumeJob failed, jobId: {} not exist", jobId); - return; - } - Job job = jobMap.get(jobId); - resumeJob(job); - } - - public void stopJob(Long jobId) { - Job job = jobMap.get(jobId); - if (null == job) { - log.warn("stopJob failed, jobId: {} not exist", jobId); - return; - } - if (job.getJobStatus().equals(JobStatus.STOPPED)) { - log.warn("stopJob failed, jobId: {} is already stopped", jobId); - return; - } - stopJob(job); - } - - public Job getJob(Long jobId) { - return jobMap.get(jobId); - } - - public Map getAllJob() { - return jobMap; - } - - public void batchSchedulerTasks() { - executeJobIdsWithinLastTenMinutesWindow(); - } - - private List findTasksBetweenTime(Job job, Long endTimeEndWindow, Long nextExecuteTime) { - - List jobExecuteTimes = new ArrayList<>(); - if (!job.isCycleJob() && (nextExecuteTime < endTimeEndWindow)) { - jobExecuteTimes.add(nextExecuteTime); - return jobExecuteTimes; - } - if (job.isCycleJob() && (nextExecuteTime > endTimeEndWindow)) { - return new ArrayList<>(); - } - while (endTimeEndWindow >= nextExecuteTime) { - if (job.isTaskTimeExceeded()) { - break; - } - jobExecuteTimes.add(nextExecuteTime); - nextExecuteTime = job.getExecuteTimestampAndGeneratorNext(); - } - return jobExecuteTimes; - } - - /** - * We will get the task in the next time window, and then hand it over to the time wheel for timing trigger - */ - private void executeJobIdsWithinLastTenMinutesWindow() { - // if the task executes for more than 10 minutes, it will be delay, so, - // set lastBatchSchedulerTimestamp to current time - if (lastBatchSchedulerTimestamp < System.currentTimeMillis()) { - this.lastBatchSchedulerTimestamp = System.currentTimeMillis(); - } - this.lastBatchSchedulerTimestamp += BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS; - if (jobMap.isEmpty()) { - return; - } - jobMap.forEach((k, v) -> { - if (v.isRunning() && (v.getNextExecuteTimeMs() - + v.getIntervalMs() < lastBatchSchedulerTimestamp)) { - List executeTimes = findTasksBetweenTime( - v, lastBatchSchedulerTimestamp, - v.getNextExecuteTimeMs()); - if (!executeTimes.isEmpty()) { - for (Long executeTime : executeTimes) { - putOneTask(v.getJobId(), executeTime); - } - } - } - }); - } - - /** - * We will cycle system scheduler tasks every 10 minutes. - * Jobs will be re-registered after the task is completed - */ - private void cycleSystemSchedulerTasks() { - log.info("re-register system scheduler tasks" + TimeUtils.longToTimeString(System.currentTimeMillis())); - dorisTimer.newTimeout(timeout -> { - batchSchedulerTasks(); - clearFinishJob(); - cycleSystemSchedulerTasks(); - }, BATCH_SCHEDULER_INTERVAL_SECONDS, TimeUnit.SECONDS); - - } - - /** - * put one task to time wheel,it's well be trigger after delay milliseconds - * if the scheduler is closed, the task will not be put into the time wheel - * if delay is less than 0, the task will be trigger immediately - * - * @param jobId job id, we will use it to find the job - * @param startExecuteTime the task will be trigger in this time, unit is millisecond,and we will convert it to - * delay seconds, we just can be second precision - */ - public void putOneTask(Long jobId, Long startExecuteTime) { - if (isClosed) { - log.info("putOneTask failed, scheduler is closed, jobId: {}", jobId); - return; - } - long taskId = System.nanoTime(); - TimerJobTask task = new TimerJobTask(jobId, taskId, startExecuteTime, disruptor); - long delay = getDelaySecond(task.getStartTimestamp()); - Timeout timeout = dorisTimer.newTimeout(task, delay, TimeUnit.SECONDS); - if (timeout == null) { - log.error("putOneTask failed, jobId: {}", task.getJobId()); - return; - } - if (jobTimeoutMap.containsKey(task.getJobId())) { - jobTimeoutMap.get(task.getJobId()).put(task.getTaskId(), timeout); - JobTaskManager.addPrepareTaskStartTime(jobId, taskId, startExecuteTime); - return; - } - Map timeoutMap = new ConcurrentHashMap<>(); - timeoutMap.put(task.getTaskId(), timeout); - jobTimeoutMap.put(task.getJobId(), timeoutMap); - JobTaskManager.addPrepareTaskStartTime(jobId, taskId, startExecuteTime); - } - - // cancel all task for one job - // if task has started, it can't be canceled - public void cancelJobAllTask(Long jobId) { - if (!jobTimeoutMap.containsKey(jobId)) { - return; - } - - jobTimeoutMap.get(jobId).values().forEach(timeout -> { - if (!timeout.isExpired() || timeout.isCancelled()) { - timeout.cancel(); - } - }); - JobTaskManager.clearPrepareTaskByJobId(jobId); - } - - // get delay time, if startTimestamp is less than now, return 0 - private long getDelaySecond(long startTimestamp) { - long delay = 0; - long now = System.currentTimeMillis(); - if (startTimestamp > now) { - delay = startTimestamp - now; - } else { - //if execute time is less than now, return 0,immediately execute - log.warn("startTimestamp is less than now, startTimestamp: {}, now: {}", startTimestamp, now); - return delay; - } - return delay / 1000; - } - - @Override - public void close() throws IOException { - isClosed = true; - dorisTimer.stop(); - disruptor.close(); - } - - /** - * sort by job id - * - * @param dbFullName database name - * @param category job category - * @param matcher job name matcher - */ - public List queryJob(String dbFullName, String jobName, JobCategory category, PatternMatcher matcher) { - List jobs = new ArrayList<>(); - jobMap.values().forEach(job -> { - if (matchJob(job, dbFullName, jobName, category, matcher)) { - jobs.add(job); - } - }); - return jobs; - } - - private boolean matchJob(Job job, String dbFullName, String jobName, JobCategory category, PatternMatcher matcher) { - if (StringUtils.isNotBlank(dbFullName) && !job.getDbName().equalsIgnoreCase(dbFullName)) { - return false; - } - if (StringUtils.isNotBlank(jobName) && !job.getJobName().equalsIgnoreCase(jobName)) { - return false; - } - if (category != null && !job.getJobCategory().equals(category)) { - return false; - } - return null == matcher || matcher.match(job.getJobName()); - } - - public void putOneJobToQueen(Long jobId) { - long taskId = System.nanoTime(); - JobTaskManager.addPrepareTaskStartTime(jobId, taskId, System.currentTimeMillis()); - disruptor.tryPublish(jobId, taskId); - } - - @Override - public void write(DataOutput out) throws IOException { - out.writeInt(jobMap.size()); - for (Job job : jobMap.values()) { - job.write(out); - } - } - - /** - * read job from data input, and init job - * - * @param in data input - * @throws IOException io exception when read data input error - */ - public void readFields(DataInput in) throws IOException { - int size = in.readInt(); - for (int i = 0; i < size; i++) { - Job job = Job.readFields(in); - jobMap.putIfAbsent(job.getJobId(), job); - } - } - - /** - * clear finish job,if job finish time is more than @Config.finish_job_max_saved_second, we will delete it - * this method will be called every 10 minutes, therefore, the actual maximum - * deletion time is Config.finish_job_max_saved_second + 10 min. - * we could to delete job in time, but it's not make sense.start - */ - private void clearFinishJob() { - Long now = System.currentTimeMillis(); - jobMap.values().forEach(job -> { - if (job.isFinished() && now - job.getLatestCompleteExecuteTimeMs() > Config.finish_job_max_saved_second) { - jobMap.remove(job.getJobId()); - Env.getCurrentEnv().getEditLog().logDeleteJob(job); - Env.getCurrentEnv().getJobTaskManager().deleteJobTasks(job.getJobId()); - log.debug("delete finish job:{}", job.getJobId()); - } - }); - - } -}