Skip to content

Commit

Permalink
support download full log
Browse files Browse the repository at this point in the history
  • Loading branch information
mayang.ysj authored and mayang.ysj committed Sep 4, 2024
1 parent fd7199b commit b8e7c15
Show file tree
Hide file tree
Showing 14 changed files with 487 additions and 297 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package com.oceanbase.odc.common.unit;

import javax.validation.constraints.NotNull;

/**
* {@link BinarySizeUnit}
*
Expand Down Expand Up @@ -49,6 +51,15 @@ public enum BinarySizeUnit {
}
}

public static long convertTo(long originalSize, @NotNull BinarySizeUnit originalUnit,
@NotNull BinarySizeUnit targetUnit) {
if (originalUnit.name().equals(targetUnit.name())) {
return originalSize;
}
long factor = (long) Math.pow(2, targetUnit.byteOffset - originalUnit.byteOffset);
return originalSize * factor;
}

public BinarySize of(long sizeDigit) {
return new BinarySize(sizeDigit, this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ public SuccessResponse<String> getLog(@PathVariable Long id, @RequestParam OdcTa
@ApiOperation(value = "downloadLog", notes = "下载任务完整日志")
@RequestMapping(value = "/{id:[\\d]+}/tasks/log/download", method = RequestMethod.GET)
public ResponseEntity<InputStreamResource> downloadLog(@PathVariable Long id) throws IOException {
List<BinaryDataResult> results = flowTaskInstanceService.downloadLog(id);
List<BinaryDataResult> results = flowTaskInstanceService.downloadLog(id, false);
PreConditions.validExists(ResourceType.ODC_FILE, "id", id, () -> CollectionUtils.isNotEmpty(results));
return WebResponseUtils.getFileAttachmentResponseEntity(
new InputStreamResource(results.get(0).getInputStream()), TaskLogFilenameGenerator.generate(id));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Set;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.InputStreamResource;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort.Direction;
import org.springframework.data.web.PageableDefault;
Expand All @@ -30,13 +31,16 @@
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import com.oceanbase.odc.core.shared.PreConditions;
import com.oceanbase.odc.core.shared.constant.ResourceType;
import com.oceanbase.odc.core.shared.constant.TaskStatus;
import com.oceanbase.odc.core.shared.exception.UnsupportedException;
import com.oceanbase.odc.service.common.response.ListResponse;
import com.oceanbase.odc.service.common.response.PaginatedResponse;
import com.oceanbase.odc.service.common.response.Responses;
import com.oceanbase.odc.service.common.response.SuccessResponse;
import com.oceanbase.odc.service.dlm.model.RateLimitConfiguration;
import com.oceanbase.odc.service.flow.model.BinaryDataResult;
import com.oceanbase.odc.service.schedule.ScheduleService;
import com.oceanbase.odc.service.schedule.model.CreateScheduleReq;
import com.oceanbase.odc.service.schedule.model.OperationType;
Expand All @@ -55,6 +59,7 @@
import com.oceanbase.odc.service.schedule.model.UpdateScheduleReq;
import com.oceanbase.odc.service.task.model.OdcTaskLogLevel;

import cn.hutool.core.collection.CollUtil;
import io.swagger.annotations.ApiOperation;

/**
Expand Down Expand Up @@ -118,9 +123,18 @@ public SuccessResponse<Boolean> rollbackTask(@PathVariable Long scheduleId, @Pat
method = RequestMethod.GET)
public SuccessResponse<String> getTaskLog(@PathVariable Long scheduleId, @PathVariable Long taskId,
@RequestParam OdcTaskLogLevel logType) {
return Responses.success(scheduleService.getLog(scheduleId, taskId, logType));
return Responses.success(scheduleService.getLog(scheduleId, logType, false));
}

@ApiOperation(value = "DownloadScheduleTaskLog", notes = "下载计划任务全量日志")
@RequestMapping(value = "/schedules/{scheduleId:[\\d]+}/tasks/{taskId:[\\d]+}/log/download",
method = RequestMethod.GET)
public SuccessResponse<InputStreamResource> downloadScheduleTaskLog(@PathVariable Long scheduleId,
@PathVariable Long taskId) {
List<BinaryDataResult> results = scheduleService.downloadLog(scheduleId, false);
PreConditions.validExists(ResourceType.ODC_FILE, "id", taskId, () -> CollUtil.isNotEmpty(results));
return Responses.single(new InputStreamResource(results.get(0).getInputStream()));
}

@RequestMapping(value = "/schedules/{scheduleId:[\\d]+}/tasks/{taskId:[\\d]+}", method = RequestMethod.GET)
public SuccessResponse<ScheduleTaskDetailResp> detailScheduleTask(@PathVariable Long scheduleId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Set;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.InputStreamResource;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort.Direction;
import org.springframework.data.web.PageableDefault;
Expand All @@ -30,11 +31,14 @@
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import com.oceanbase.odc.core.shared.PreConditions;
import com.oceanbase.odc.core.shared.constant.ResourceType;
import com.oceanbase.odc.service.common.response.ListResponse;
import com.oceanbase.odc.service.common.response.PaginatedResponse;
import com.oceanbase.odc.service.common.response.Responses;
import com.oceanbase.odc.service.common.response.SuccessResponse;
import com.oceanbase.odc.service.dlm.DlmLimiterService;
import com.oceanbase.odc.service.flow.model.BinaryDataResult;
import com.oceanbase.odc.service.schedule.ScheduleService;
import com.oceanbase.odc.service.schedule.model.QueryScheduleParams;
import com.oceanbase.odc.service.schedule.model.ScheduleDetailRespHist;
Expand All @@ -44,6 +48,7 @@
import com.oceanbase.odc.service.schedule.model.ScheduleType;
import com.oceanbase.odc.service.task.model.OdcTaskLogLevel;

import cn.hutool.core.collection.CollUtil;
import io.swagger.annotations.ApiOperation;

/**
Expand Down Expand Up @@ -133,7 +138,17 @@ public SuccessResponse<Boolean> rollbackTask(@PathVariable Long scheduleId, @Pat
@RequestMapping(value = "/schedules/{scheduleId:[\\d]+}/tasks/{taskId:[\\d]+}/log", method = RequestMethod.GET)
public SuccessResponse<String> getScheduleTaskLog(@PathVariable Long scheduleId, @PathVariable Long taskId,
@RequestParam OdcTaskLogLevel logType) {
return Responses.single(scheduleService.getLog(scheduleId, taskId, logType));
return Responses.single(scheduleService.getLog(scheduleId, logType, false));
}

@ApiOperation(value = "DownloadScheduleTaskLog", notes = "下载计划任务全量日志")
@RequestMapping(value = "/schedules/{scheduleId:[\\d]+}/tasks/{taskId:[\\d]+}/log/download",
method = RequestMethod.GET)
public SuccessResponse<InputStreamResource> downloadScheduleTaskLog(@PathVariable Long scheduleId,
@PathVariable Long taskId) {
List<BinaryDataResult> results = scheduleService.downloadLog(scheduleId, false);
PreConditions.validExists(ResourceType.ODC_FILE, "id", taskId, () -> CollUtil.isNotEmpty(results));
return Responses.single(new InputStreamResource(results.get(0).getInputStream()));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (c) 2023 OceanBase.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.oceanbase.odc.service.config;

import java.io.File;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

import cn.hutool.core.io.FileUtil;
import lombok.Getter;

@ConfigurationProperties(prefix = "odc.log")
@Component
@Getter
public class LoggerProperty {

// This is the directory path for storing temporary log files from scheduled tasks pods, defaulting
// to the classpath if not explicitly set.
private String tempScheduleTaskLogDir =
FileUtil.normalize(System.getProperty("user.dir") + File.separator + "log/running-job-temp-logs");

private String directory = "./log";

private Long maxLimitedCount = 10000L;

// unit:B
private Long maxSizeCount = 1024L * 1024;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright (c) 2023 OceanBase.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.oceanbase.odc.service.flow;

import java.io.File;
import java.util.Optional;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.fasterxml.jackson.core.type.TypeReference;
import com.oceanbase.odc.common.json.JsonUtils;
import com.oceanbase.odc.core.shared.constant.ErrorCodes;
import com.oceanbase.odc.core.shared.constant.TaskType;
import com.oceanbase.odc.core.shared.exception.NotFoundException;
import com.oceanbase.odc.metadb.task.TaskEntity;
import com.oceanbase.odc.service.common.response.SuccessResponse;
import com.oceanbase.odc.service.config.LoggerProperty;
import com.oceanbase.odc.service.dispatch.DispatchResponse;
import com.oceanbase.odc.service.dispatch.RequestDispatcher;
import com.oceanbase.odc.service.dispatch.TaskDispatchChecker;
import com.oceanbase.odc.service.task.TaskService;
import com.oceanbase.odc.service.task.executor.logger.LogUtils;
import com.oceanbase.odc.service.task.model.ExecutorInfo;
import com.oceanbase.odc.service.task.model.OdcTaskLogLevel;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

/**
* @author mayang
*/
@Service
@Slf4j
public class FlowTaskInstanceLoggerService {

private final FlowTaskInstanceService flowTaskInstanceService;
private final RequestDispatcher requestDispatcher;
private final TaskDispatchChecker dispatchChecker;
private final TaskService taskService;
@Autowired
private LoggerProperty loggerProperty;

public FlowTaskInstanceLoggerService(FlowTaskInstanceService flowTaskInstanceService,
RequestDispatcher requestDispatcher,
TaskDispatchChecker dispatchChecker,
TaskService taskService) {
this.flowTaskInstanceService = flowTaskInstanceService;
this.requestDispatcher = requestDispatcher;
this.dispatchChecker = dispatchChecker;
this.taskService = taskService;
}

@SneakyThrows
public String getLog(OdcTaskLogLevel level, Long flowInstanceId, boolean skipAuth) {
Optional<TaskEntity> taskEntityOptional =
flowTaskInstanceService.getLogDownloadableTaskEntity(flowInstanceId, skipAuth);
if (!taskEntityOptional.isPresent()) {
log.warn("get log failed, flowInstanceId: {}, skipAuth: {}", flowInstanceId, skipAuth);
return "get log failed";
}
TaskEntity taskEntity = taskEntityOptional.get();
if (!dispatchChecker.isTaskEntityOnThisMachine(taskEntity)) {
ExecutorInfo executorInfo = JsonUtils.fromJson(taskEntity.getExecutor(), ExecutorInfo.class);
DispatchResponse response = requestDispatcher.forward(executorInfo.getHost(), executorInfo.getPort());
return response.getContentByType(new TypeReference<SuccessResponse<String>>() {}).getData();
}
return getLog(taskEntity.getCreatorId(), taskEntity.getId() + "", taskEntity.getTaskType(), level);
}

@SneakyThrows
public File downloadLog(Long flowInstanceId, boolean skipAuth) {
Optional<TaskEntity> taskEntityOptional =
flowTaskInstanceService.getLogDownloadableTaskEntity(flowInstanceId, skipAuth);
if (!taskEntityOptional.isPresent()) {
throw new NotFoundException(ErrorCodes.NotFound, new Object[] {flowInstanceId},
ErrorCodes.TaskLogNotFound.getLocalizedMessage(new Object[] {"Id", flowInstanceId}));
}
TaskEntity taskEntity = taskEntityOptional.get();
if (!dispatchChecker.isTaskEntityOnThisMachine(taskEntity)) {
ExecutorInfo executorInfo = JsonUtils.fromJson(taskEntity.getExecutor(), ExecutorInfo.class);
DispatchResponse response = requestDispatcher.forward(executorInfo.getHost(), executorInfo.getPort());
return response.getContentByType(new TypeReference<SuccessResponse<File>>() {}).getData();
}
return getLogFile(taskEntity.getCreatorId(), taskEntity.getId() + "", taskEntity.getTaskType(),
OdcTaskLogLevel.ALL);
}

public File getLogFile(Long userId, String flowInstanceId, TaskType type, OdcTaskLogLevel logLevel) {
try {
return taskService.getLogFile(userId, flowInstanceId, type, logLevel);
} catch (NotFoundException ex) {
log.warn(ErrorCodes.TaskLogNotFound.getLocalizedMessage(new Object[] {"Id", flowInstanceId}));
return null;
}
}

private String getLog(Long userId, String jobId, TaskType type, OdcTaskLogLevel logLevel) {
return LogUtils.getLatestLogContent(getLogFile(userId, jobId, type, logLevel),
loggerProperty.getMaxLimitedCount(),
loggerProperty.getMaxSizeCount());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,11 @@
import com.oceanbase.odc.core.shared.exception.InternalServerError;
import com.oceanbase.odc.core.shared.exception.NotFoundException;
import com.oceanbase.odc.core.shared.exception.UnsupportedException;
import com.oceanbase.odc.metadb.collaboration.EnvironmentRepository;
import com.oceanbase.odc.metadb.flow.FlowInstanceRepository;
import com.oceanbase.odc.metadb.task.TaskEntity;
import com.oceanbase.odc.metadb.task.TaskRepository;
import com.oceanbase.odc.plugin.task.api.datatransfer.model.DataTransferConfig;
import com.oceanbase.odc.plugin.task.api.datatransfer.model.DataTransferTaskResult;
import com.oceanbase.odc.service.collaboration.environment.EnvironmentService;
import com.oceanbase.odc.service.common.FileManager;
import com.oceanbase.odc.service.common.model.FileBucket;
import com.oceanbase.odc.service.common.response.ListResponse;
Expand Down Expand Up @@ -107,7 +105,6 @@
import com.oceanbase.odc.service.permission.table.model.ApplyTableResult;
import com.oceanbase.odc.service.schedule.flowtask.AlterScheduleResult;
import com.oceanbase.odc.service.session.model.SqlExecuteResult;
import com.oceanbase.odc.service.task.TaskLoggerService;
import com.oceanbase.odc.service.task.TaskService;
import com.oceanbase.odc.service.task.config.TaskFrameworkEnabledProperties;
import com.oceanbase.odc.service.task.model.ExecutorInfo;
Expand Down Expand Up @@ -159,11 +156,7 @@ public class FlowTaskInstanceService {
@Autowired
private TaskFrameworkEnabledProperties taskFrameworkProperties;
@Autowired
private TaskLoggerService taskLoggerService;
@Autowired
private EnvironmentRepository environmentRepository;
@Autowired
private EnvironmentService environmentService;
private FlowTaskInstanceLoggerService flowTaskInstanceLoggerService;

@Value("${odc.task.async.result-preview-max-size-bytes:5242880}")
private long resultPreviewMaxSizeBytes;
Expand Down Expand Up @@ -193,42 +186,12 @@ public FlowInstanceDetailResp executeTask(@NotNull Long id) throws IOException {
}

public String getLog(@NotNull Long flowInstanceId, OdcTaskLogLevel level, boolean skipAuth) throws IOException {
Optional<TaskEntity> taskEntityOptional = getLogDownloadableTaskEntity(flowInstanceId, skipAuth);
if (!taskEntityOptional.isPresent()) {
return null;
}
TaskEntity taskEntity = taskEntityOptional.get();
if (taskFrameworkProperties.isEnabled() && taskEntity.getJobId() != null) {
// TODO: get the latest part of log when task framework is enabled @krihy
return taskLoggerService.getLogByTaskFramework(level, taskEntity.getJobId());
}
if (!dispatchChecker.isTaskEntityOnThisMachine(taskEntity)) {
// The task is not executing on current machine, need to forward the request
ExecutorInfo executorInfo = JsonUtils.fromJson(taskEntity.getExecutor(), ExecutorInfo.class);
DispatchResponse response = requestDispatcher.forward(executorInfo.getHost(), executorInfo.getPort());
return response.getContentByType(new TypeReference<SuccessResponse<String>>() {}).getData();
}
return taskService.getLog(taskEntity.getCreatorId(), taskEntity.getId() + "", taskEntity.getTaskType(), level);
return flowTaskInstanceLoggerService.getLog(level, flowInstanceId, skipAuth);
}

public List<BinaryDataResult> downloadLog(@NotNull Long flowInstanceId) throws IOException {
Optional<TaskEntity> taskEntityOptional = getLogDownloadableTaskEntity(flowInstanceId, false);
if (!taskEntityOptional.isPresent() || taskEntityOptional.get().getResultJson() == null) {
return Collections.emptyList();
}
TaskEntity taskEntity = taskEntityOptional.get();
// TODO: download log file when task framework is enabled @krihy
if (!dispatchChecker.isTaskEntityOnThisMachine(taskEntity)) {
// The task is not executing on current machine, need to forward the request
return dispatchRequest(taskEntity);
}
try {
File logFile = taskService.getLogFile(taskEntity.getCreatorId(), taskEntity.getId() + "",
taskEntity.getTaskType(), OdcTaskLogLevel.ALL);
return Collections.singletonList(new FileBasedDataResult(logFile));
} catch (NotFoundException ex) {
return Collections.emptyList();
}
public List<BinaryDataResult> downloadLog(@NotNull Long flowInstanceId, boolean skipAuth) throws IOException {
File logFile = flowTaskInstanceLoggerService.downloadLog(flowInstanceId, skipAuth);
return Collections.singletonList(new FileBasedDataResult(logFile));
}

public List<? extends FlowTaskResult> getResult(@NotNull Long id, boolean skipAuth) throws IOException {
Expand Down Expand Up @@ -754,7 +717,7 @@ private Optional<TaskEntity> getDownloadableTaskEntity(@NonNull Long flowInstanc
}, false);
}

private Optional<TaskEntity> getLogDownloadableTaskEntity(@NotNull Long flowInstanceId, boolean skipAuth) {
public Optional<TaskEntity> getLogDownloadableTaskEntity(@NotNull Long flowInstanceId, boolean skipAuth) {
return getTaskEntity(flowInstanceId,
instance -> (instance.getStatus().isFinalStatus() || instance.getStatus() == FlowNodeStatus.EXECUTING)
&& instance.getTaskType() != TaskType.SQL_CHECK && instance.getTaskType() != TaskType.PRE_CHECK
Expand Down
Loading

0 comments on commit b8e7c15

Please sign in to comment.