diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/JobMetrics.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/JobMetrics.java index 7c52013a6d8..a9f8c80548f 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/JobMetrics.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/JobMetrics.java @@ -149,7 +149,7 @@ public String toJsonString() { ObjectMapper objectMapper = new ObjectMapper(); objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false); try { - return objectMapper.writeValueAsString(this.metrics); + return objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(this.metrics); } catch (JsonProcessingException e) { ObjectNode objectNode = objectMapper.createObjectNode(); objectNode.put("err", "serialize JobMetrics err"); diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java index 920e0162e95..eb87196d6e8 100644 --- a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java +++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java @@ -53,6 +53,10 @@ public class ClientCommandArgs extends AbstractCommandArgs { description = "Cancel job by JobId") private String cancelJobId; + @Parameter(names = {"-m", "--metrics"}, + description = "Get job metrics by JobId") + private String metricsJobId; + @Parameter(names = {"-l", "--list"}, description = "list job status") private boolean listJob = false; @@ -89,6 +93,10 @@ public String getCancelJobId() { return cancelJobId; } + public String getMetricsJobId() { + return metricsJobId; + } + public boolean isListJob(){ return listJob; } diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java index c511b0514f4..08a98289d2f 100644 --- a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java +++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java @@ -77,6 +77,9 @@ public void execute() throws CommandExecuteException { System.out.println(jobState); } else if (null != clientCommandArgs.getCancelJobId()) { engineClient.cancelJob(Long.parseLong(clientCommandArgs.getCancelJobId())); + } else if (null != clientCommandArgs.getMetricsJobId()) { + String jobMetrics = engineClient.getJobMetrics(Long.parseLong(clientCommandArgs.getMetricsJobId())); + System.out.println(jobMetrics); } else { Path configFile = FileUtils.getConfigPath(clientCommandArgs); checkConfigExist(configFile); @@ -87,6 +90,8 @@ public void execute() throws CommandExecuteException { ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); clientJobProxy.waitForJobComplete(); + long jobId = clientJobProxy.getJobId(); + System.out.println(engineClient.getJobMetrics(jobId)); } } catch (ExecutionException | InterruptedException e) { throw new CommandExecuteException("SeaTunnel job executed failed", e);