Skip to content

Commit

Permalink
chore: 任务调度代码优化
Browse files Browse the repository at this point in the history
  • Loading branch information
Charles7c committed Jul 24, 2024
1 parent eec95a3 commit 797221b
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,9 @@
import io.netty.handler.timeout.WriteTimeoutHandler;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.client.config.RequestConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.http.codec.json.Jackson2JsonDecoder;
import org.springframework.http.codec.json.Jackson2JsonEncoder;
Expand All @@ -50,9 +48,9 @@
* @author Charles7c
* @since 2024/6/25 18:03
*/
@Slf4j
@Configuration
@RequiredArgsConstructor
@Slf4j
public class HttpExchangeConfiguration {

private final ObjectMapper objectMapper;
Expand Down Expand Up @@ -81,28 +79,29 @@ public JobBatchApi jobBatchApi() {
@Bean
public HttpServiceProxyFactory httpServiceProxyFactory() {
HttpClient httpClient = HttpClient.create()
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 30000)
.doOnConnected(conn -> {
conn.addHandlerLast(new ReadTimeoutHandler(10));
conn.addHandlerLast(new WriteTimeoutHandler(10));
}).wiretap(true);
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 30000)
.doOnConnected(conn -> {
conn.addHandlerLast(new ReadTimeoutHandler(10));
conn.addHandlerLast(new WriteTimeoutHandler(10));
})
.wiretap(true);

WebClient webClient = WebClient.builder()
.codecs(config -> config.defaultCodecs().jackson2JsonEncoder(new Jackson2JsonEncoder(objectMapper)))
.codecs(config -> config.defaultCodecs().jackson2JsonDecoder(new Jackson2JsonDecoder(objectMapper)))
.clientConnector(new ReactorClientHttpConnector(httpClient))
.filter(logRequest())
.filter(logResponse())
.filter((request, next) -> {
// 设置请求头
ClientRequest filtered = ClientRequest.from(request)
.header(JobConstants.NAMESPACE_ID_HEADER, namespace)
.header(JobConstants.AUTH_TOKEN_HEADER, jobClient().getToken())
.build();
return next.exchange(filtered);
})
.baseUrl(baseUrl)
.build();
.codecs(config -> config.defaultCodecs().jackson2JsonEncoder(new Jackson2JsonEncoder(objectMapper)))
.codecs(config -> config.defaultCodecs().jackson2JsonDecoder(new Jackson2JsonDecoder(objectMapper)))
.clientConnector(new ReactorClientHttpConnector(httpClient))
.filter(logRequest())
.filter(logResponse())
.filter((request, next) -> {
// 设置请求头
ClientRequest filtered = ClientRequest.from(request)
.header(JobConstants.NAMESPACE_ID_HEADER, namespace)
.header(JobConstants.AUTH_TOKEN_HEADER, jobClient().getToken())
.build();
return next.exchange(filtered);
})
.baseUrl(baseUrl)
.build();
return HttpServiceProxyFactory.builderFor(WebClientAdapter.create(webClient)).build();
}

Expand All @@ -115,21 +114,21 @@ public JobClient jobClient() {
* 打印请求日志
*/
private ExchangeFilterFunction logRequest() {
return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {
log.info("---> {} {}", clientRequest.method(), clientRequest.url());
return Mono.just(clientRequest);
return ExchangeFilterFunction.ofRequestProcessor(request -> {
log.info("---> {} {}", request.method(), request.url());
return Mono.just(request);
});
}

/**
* 打印响应日志
*/
private ExchangeFilterFunction logResponse() {
return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> clientResponse.bodyToMono(String.class).flatMap(body -> {
log.info("<--- {}", clientResponse.statusCode());
log.info("Content-Type:{}", clientResponse.headers().contentType().orElse(MediaType.APPLICATION_JSON));
log.info("body: {}", body);
return Mono.just(ClientResponse.from(clientResponse).body(body).build());
}));
return ExchangeFilterFunction.ofResponseProcessor(response -> response.bodyToMono(String.class)
.flatMap(body -> {
log.info("<--- {}", response.statusCode());
log.info(body);
return Mono.just(ClientResponse.from(response).body(body).build());
}));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
import top.continew.admin.job.api.JobClient;
import top.continew.admin.job.model.JobInstanceLogPageResult;
import top.continew.admin.job.model.query.JobInstanceLogQuery;
import top.continew.admin.job.model.query.JobLogQuery;
import top.continew.admin.job.model.query.JobInstanceQuery;
import top.continew.admin.job.model.resp.JobLogResp;
import top.continew.admin.job.model.query.JobLogQuery;
import top.continew.admin.job.model.resp.JobInstanceResp;
import top.continew.admin.job.model.resp.JobLogResp;
import top.continew.admin.job.service.JobLogService;
import top.continew.starter.extension.crud.model.resp.PageResp;

Expand All @@ -49,8 +49,8 @@ public class JobLogServiceImpl implements JobLogService {
@Override
public PageResp<JobLogResp> page(JobLogQuery query) {
return jobClient.requestPage(() -> jobBatchApi.page(query.getJobId(), query.getJobName(), query
.getGroupName(), query.getTaskBatchStatus().getValue(), query.getDatetimeRange(), query.getPage(), query
.getSize()));
.getGroupName(), query.getTaskBatchStatus() != null ? query.getTaskBatchStatus().getValue() : null, query
.getDatetimeRange(), query.getPage(), query.getSize()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ public class JobServiceImpl implements JobService {

@Override
public PageResp<JobResp> page(JobQuery query) {
return jobClient.requestPage(() -> jobApi.page(query.getGroupName(), query.getJobName(), query.getJobStatus()
.getValue(), query.getPage(), query.getSize()));
return jobClient.requestPage(() -> jobApi.page(query.getGroupName(), query.getJobName(), query
.getJobStatus() != null ? query.getJobStatus().getValue() : null, query.getPage(), query.getSize()));
}

@Override
Expand Down

0 comments on commit 797221b

Please sign in to comment.