From c11ef44025e1aa0fa16f42f7ff3582ff7948a221 Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Fri, 15 Dec 2023 12:40:19 +0100 Subject: [PATCH 1/3] #13190 - Improve URL handling for Airflow REST Client --- .../pipeline/airflow/AirflowRESTClient.java | 79 +++++++++++-------- 1 file changed, 44 insertions(+), 35 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/airflow/AirflowRESTClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/airflow/AirflowRESTClient.java index a977a32a194b..805fe88cd15d 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/airflow/AirflowRESTClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/airflow/AirflowRESTClient.java @@ -16,6 +16,7 @@ import com.fasterxml.jackson.core.type.TypeReference; import java.io.IOException; import java.net.URI; +import java.net.URISyntaxException; import java.net.URL; import java.net.http.HttpClient; import java.net.http.HttpRequest; @@ -23,11 +24,15 @@ import java.security.KeyStoreException; import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.stream.Stream; import javax.net.ssl.SSLContext; import javax.ws.rs.core.Response; import lombok.extern.slf4j.Slf4j; +import org.apache.http.client.utils.URIBuilder; import org.json.JSONObject; import org.openmetadata.schema.ServiceEntityInterface; import org.openmetadata.schema.api.configuration.pipelineServiceClient.PipelineServiceClientConfiguration; @@ -57,7 +62,7 @@ public class AirflowRESTClient extends PipelineServiceClient { protected final String password; protected final HttpClient client; protected final URL serviceURL; - private static final String API_ENDPOINT = "api/v1/openmetadata"; + private static final List API_ENDPOINT_SEGMENTS = List.of("api", "v1", "openmetadata"); private static final String DAG_ID = "dag_id"; public AirflowRESTClient(PipelineServiceClientConfiguration config) throws KeyStoreException { @@ -115,8 +120,7 @@ public PipelineServiceClientResponse deployPipeline( IngestionPipeline ingestionPipeline, ServiceEntityInterface service) { HttpResponse response; try { - String deployEndpoint = "%s/%s/deploy"; - String deployUrl = String.format(deployEndpoint, serviceURL, API_ENDPOINT); + String deployUrl = buildURI("deploy").build().toString(); String pipelinePayload = JsonUtils.pojoToJson(ingestionPipeline); response = post(deployUrl, pipelinePayload); if (response.statusCode() == 200) { @@ -140,8 +144,9 @@ public PipelineServiceClientResponse deletePipeline(IngestionPipeline ingestionP String pipelineName = ingestionPipeline.getName(); HttpResponse response; try { - String deleteEndpoint = "%s/%s/delete?dag_id=%s"; - response = deleteRequestAuthenticatedForJsonContent(deleteEndpoint, serviceURL, API_ENDPOINT, pipelineName); + URIBuilder uri = buildURI("delete"); + uri.addParameter(DAG_ID, pipelineName); + response = deleteRequestAuthenticatedForJsonContent(uri.build().toString()); if (response.statusCode() == 200) { return new PipelineServiceClientResponse().withCode(200).withPlatform(this.getPlatform()); } @@ -160,8 +165,7 @@ public PipelineServiceClientResponse runPipeline( String pipelineName = ingestionPipeline.getName(); HttpResponse response; try { - String triggerEndPoint = "%s/%s/trigger"; - String triggerUrl = String.format(triggerEndPoint, serviceURL, API_ENDPOINT); + String triggerUrl = buildURI("trigger").build().toString(); JSONObject requestPayload = new JSONObject(); requestPayload.put(DAG_ID, pipelineName); response = post(triggerUrl, requestPayload.toString()); @@ -183,14 +187,12 @@ public PipelineServiceClientResponse runPipeline( public PipelineServiceClientResponse toggleIngestion(IngestionPipeline ingestionPipeline) { HttpResponse response; try { - String toggleEndPoint; String toggleUrl; JSONObject requestPayload = new JSONObject(); requestPayload.put(DAG_ID, ingestionPipeline.getName()); // If the pipeline is currently enabled, disable it if (ingestionPipeline.getEnabled().equals(Boolean.TRUE)) { - toggleEndPoint = "%s/%s/disable"; - toggleUrl = String.format(toggleEndPoint, serviceURL, API_ENDPOINT); + toggleUrl = buildURI("disable").build().toString(); response = post(toggleUrl, requestPayload.toString()); if (response.statusCode() == 200) { ingestionPipeline.setEnabled(false); @@ -207,8 +209,7 @@ public PipelineServiceClientResponse toggleIngestion(IngestionPipeline ingestion } // otherwise, enable it back } else { - toggleEndPoint = "%s/%s/enable"; - toggleUrl = String.format(toggleEndPoint, serviceURL, API_ENDPOINT); + toggleUrl = buildURI("enable").build().toString(); response = post(toggleUrl, requestPayload.toString()); if (response.statusCode() == 200) { ingestionPipeline.setEnabled(true); @@ -237,9 +238,11 @@ public PipelineServiceClientResponse toggleIngestion(IngestionPipeline ingestion public List getQueuedPipelineStatusInternal(IngestionPipeline ingestionPipeline) { HttpResponse response; try { - String statusEndPoint = "%s/%s/status?dag_id=%s&only_queued=true"; + URIBuilder uri = buildURI("status"); + uri.addParameter(DAG_ID, ingestionPipeline.getName()); + uri.addParameter("only_queued", "true"); response = - getRequestAuthenticatedForJsonContent(statusEndPoint, serviceURL, API_ENDPOINT, ingestionPipeline.getName()); + getRequestAuthenticatedForJsonContent(uri.build().toString()); if (response.statusCode() == 200) { return JsonUtils.readObjects(response.body(), PipelineStatus.class); } @@ -261,7 +264,8 @@ public List getQueuedPipelineStatusInternal(IngestionPipeline in public PipelineServiceClientResponse getServiceStatusInternal() { HttpResponse response; try { - response = getRequestAuthenticatedForJsonContent("%s/%s/health-auth", serviceURL, API_ENDPOINT); + String healthUrl = buildURI("health-auth").build().toString(); + response = getRequestAuthenticatedForJsonContent(healthUrl); // We can reach the APIs and get the status back from Airflow if (response.statusCode() == 200) { @@ -298,8 +302,7 @@ public PipelineServiceClientResponse getServiceStatusInternal() { public PipelineServiceClientResponse runAutomationsWorkflow(Workflow workflow) { HttpResponse response; try { - String automationsEndpoint = "%s/%s/run_automation"; - String automationsUrl = String.format(automationsEndpoint, serviceURL, API_ENDPOINT); + String automationsUrl = buildURI("run_automation").build().toString(); String workflowPayload = JsonUtils.pojoToJson(workflow); response = post(automationsUrl, workflowPayload); if (response.statusCode() == 200) { @@ -333,8 +336,7 @@ private PipelineServiceClientResponse sendPost(String endpoint, Object request) HttpResponse response; String workflowPayload = JsonUtils.pojoToJson(request); try { - String automationsEndpoint = "%s/%s/%s"; - String automationsUrl = String.format(automationsEndpoint, serviceURL, API_ENDPOINT, endpoint); + String automationsUrl = buildURI(endpoint).build().toString(); response = post(automationsUrl, workflowPayload); if (response.statusCode() == 200) { return new PipelineServiceClientResponse() @@ -355,8 +357,7 @@ private PipelineServiceClientResponse sendPost(String endpoint, Object request) public PipelineServiceClientResponse killIngestion(IngestionPipeline ingestionPipeline) { HttpResponse response; try { - String killEndPoint = "%s/%s/kill"; - String killUrl = String.format(killEndPoint, serviceURL, API_ENDPOINT); + String killUrl = buildURI("kill").build().toString(); JSONObject requestPayload = new JSONObject(); requestPayload.put(DAG_ID, ingestionPipeline.getName()); response = post(killUrl, requestPayload.toString()); @@ -377,7 +378,7 @@ public PipelineServiceClientResponse killIngestion(IngestionPipeline ingestionPi public Map requestGetHostIp() { HttpResponse response; try { - response = getRequestAuthenticatedForJsonContent("%s/%s/ip", serviceURL, API_ENDPOINT); + response = getRequestAuthenticatedForJsonContent(buildURI("ip").build().toString()); if (response.statusCode() == 200) { return JsonUtils.readValue(response.body(), new TypeReference<>() {}); } @@ -393,15 +394,16 @@ public Map getLastIngestionLogs(IngestionPipeline ingestionPipel HttpResponse response; String taskId = TYPE_TO_TASK.get(ingestionPipeline.getPipelineType().toString()); // Init empty after query param - String afterParam = ""; + + URIBuilder uri = buildURI("last_dag_logs"); if (after != null) { - afterParam = String.format("&after=%s", after); + uri.addParameter("after", after); } + uri.addParameter(DAG_ID, ingestionPipeline.getName()); + uri.addParameter("task_id", taskId); try { response = - getRequestAuthenticatedForJsonContent( - "%s/%s/last_dag_logs?dag_id=%s&task_id=%s%s", - serviceURL, API_ENDPOINT, ingestionPipeline.getName(), taskId, afterParam); + getRequestAuthenticatedForJsonContent(uri.build().toString()); if (response.statusCode() == 200) { return JsonUtils.readValue(response.body(), new TypeReference<>() {}); } @@ -412,20 +414,27 @@ public Map getLastIngestionLogs(IngestionPipeline ingestionPipel String.format("Failed to get last ingestion logs due to %s", response.body())); } - private HttpResponse getRequestAuthenticatedForJsonContent( - String stringUrlFormat, Object... stringReplacement) throws IOException, InterruptedException { - HttpRequest request = authenticatedRequestBuilder(stringUrlFormat, stringReplacement).GET().build(); + private URIBuilder buildURI(String path) { + try { + List pathInternal = new ArrayList<>(API_ENDPOINT_SEGMENTS); + pathInternal.add(path); + return new URIBuilder(String.valueOf(serviceURL)).setPathSegments(pathInternal); + } catch (Exception e) { + throw PipelineServiceClientException.byMessage(String.format("Failed to built request URI for path [%s].", path), e.getMessage()); + } + } + + private HttpResponse getRequestAuthenticatedForJsonContent(String url) throws IOException, InterruptedException { + HttpRequest request = authenticatedRequestBuilder(url).GET().build(); return client.send(request, HttpResponse.BodyHandlers.ofString()); } - private HttpResponse deleteRequestAuthenticatedForJsonContent( - String stringUrlFormat, Object... stringReplacement) throws IOException, InterruptedException { - HttpRequest request = authenticatedRequestBuilder(stringUrlFormat, stringReplacement).DELETE().build(); + private HttpResponse deleteRequestAuthenticatedForJsonContent(String url) throws IOException, InterruptedException { + HttpRequest request = authenticatedRequestBuilder(url).DELETE().build(); return client.send(request, HttpResponse.BodyHandlers.ofString()); } - private HttpRequest.Builder authenticatedRequestBuilder(String stringUrlFormat, Object... stringReplacement) { - String url = String.format(stringUrlFormat, stringReplacement); + private HttpRequest.Builder authenticatedRequestBuilder(String url) { return HttpRequest.newBuilder(URI.create(url)) .header(CONTENT_HEADER, CONTENT_TYPE) .header(AUTH_HEADER, getBasicAuthenticationHeader(username, password)); From 466c4bc1c561859e47ee723707660bc38c0934ee Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Fri, 15 Dec 2023 13:14:03 +0100 Subject: [PATCH 2/3] Fmt --- .../pipeline/airflow/AirflowRESTClient.java | 21 ++++++++----------- 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/airflow/AirflowRESTClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/airflow/AirflowRESTClient.java index 805fe88cd15d..c4431472f650 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/airflow/AirflowRESTClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/airflow/AirflowRESTClient.java @@ -16,7 +16,6 @@ import com.fasterxml.jackson.core.type.TypeReference; import java.io.IOException; import java.net.URI; -import java.net.URISyntaxException; import java.net.URL; import java.net.http.HttpClient; import java.net.http.HttpRequest; @@ -24,11 +23,8 @@ import java.security.KeyStoreException; import java.time.Duration; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.Map; -import java.util.Objects; -import java.util.stream.Stream; import javax.net.ssl.SSLContext; import javax.ws.rs.core.Response; import lombok.extern.slf4j.Slf4j; @@ -165,7 +161,7 @@ public PipelineServiceClientResponse runPipeline( String pipelineName = ingestionPipeline.getName(); HttpResponse response; try { - String triggerUrl = buildURI("trigger").build().toString(); + String triggerUrl = buildURI("trigger").build().toString(); JSONObject requestPayload = new JSONObject(); requestPayload.put(DAG_ID, pipelineName); response = post(triggerUrl, requestPayload.toString()); @@ -241,8 +237,7 @@ public List getQueuedPipelineStatusInternal(IngestionPipeline in URIBuilder uri = buildURI("status"); uri.addParameter(DAG_ID, ingestionPipeline.getName()); uri.addParameter("only_queued", "true"); - response = - getRequestAuthenticatedForJsonContent(uri.build().toString()); + response = getRequestAuthenticatedForJsonContent(uri.build().toString()); if (response.statusCode() == 200) { return JsonUtils.readObjects(response.body(), PipelineStatus.class); } @@ -402,8 +397,7 @@ public Map getLastIngestionLogs(IngestionPipeline ingestionPipel uri.addParameter(DAG_ID, ingestionPipeline.getName()); uri.addParameter("task_id", taskId); try { - response = - getRequestAuthenticatedForJsonContent(uri.build().toString()); + response = getRequestAuthenticatedForJsonContent(uri.build().toString()); if (response.statusCode() == 200) { return JsonUtils.readValue(response.body(), new TypeReference<>() {}); } @@ -420,16 +414,19 @@ private URIBuilder buildURI(String path) { pathInternal.add(path); return new URIBuilder(String.valueOf(serviceURL)).setPathSegments(pathInternal); } catch (Exception e) { - throw PipelineServiceClientException.byMessage(String.format("Failed to built request URI for path [%s].", path), e.getMessage()); + throw PipelineServiceClientException.byMessage( + String.format("Failed to built request URI for path [%s].", path), e.getMessage()); } } - private HttpResponse getRequestAuthenticatedForJsonContent(String url) throws IOException, InterruptedException { + private HttpResponse getRequestAuthenticatedForJsonContent(String url) + throws IOException, InterruptedException { HttpRequest request = authenticatedRequestBuilder(url).GET().build(); return client.send(request, HttpResponse.BodyHandlers.ofString()); } - private HttpResponse deleteRequestAuthenticatedForJsonContent(String url) throws IOException, InterruptedException { + private HttpResponse deleteRequestAuthenticatedForJsonContent(String url) + throws IOException, InterruptedException { HttpRequest request = authenticatedRequestBuilder(url).DELETE().build(); return client.send(request, HttpResponse.BodyHandlers.ofString()); } From 578ddd9f8f8d2014880560003121b85e1cbeb9a0 Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Tue, 19 Dec 2023 07:08:29 +0100 Subject: [PATCH 3/3] Format --- .../pipeline/airflow/AirflowRESTClient.java | 77 +++++++++++++------ .../glossary/GlossaryTermResourceTest.java | 3 +- 2 files changed, 56 insertions(+), 24 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/airflow/AirflowRESTClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/airflow/AirflowRESTClient.java index c4431472f650..9bb5703945ab 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/airflow/AirflowRESTClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/airflow/AirflowRESTClient.java @@ -77,7 +77,8 @@ public AirflowRESTClient(PipelineServiceClientConfiguration config) throws KeySt HttpClient.newBuilder() .version(HttpClient.Version.HTTP_1_1) .connectTimeout( - Duration.ofSeconds((Integer) config.getParameters().getAdditionalProperties().get(TIMEOUT_KEY))); + Duration.ofSeconds( + (Integer) config.getParameters().getAdditionalProperties().get(TIMEOUT_KEY))); if (sslContext == null) { this.client = clientBuilder.build(); @@ -89,8 +90,10 @@ public AirflowRESTClient(PipelineServiceClientConfiguration config) throws KeySt private static SSLContext createAirflowSSLContext(PipelineServiceClientConfiguration config) throws KeyStoreException { - String truststorePath = (String) config.getParameters().getAdditionalProperties().get(TRUSTSTORE_PATH_KEY); - String truststorePassword = (String) config.getParameters().getAdditionalProperties().get(TRUSTSTORE_PASSWORD_KEY); + String truststorePath = + (String) config.getParameters().getAdditionalProperties().get(TRUSTSTORE_PATH_KEY); + String truststorePassword = + (String) config.getParameters().getAdditionalProperties().get(TRUSTSTORE_PASSWORD_KEY); return SSLUtil.createSSLContext(truststorePath, truststorePassword, PLATFORM); } @@ -107,7 +110,8 @@ public final HttpResponse post(String endpoint, String payload, boolean return client.send(requestBuilder.build(), HttpResponse.BodyHandlers.ofString()); } - public final HttpResponse post(String endpoint, String payload) throws IOException, InterruptedException { + public final HttpResponse post(String endpoint, String payload) + throws IOException, InterruptedException { return post(endpoint, payload, true); } @@ -127,12 +131,15 @@ public PipelineServiceClientResponse deployPipeline( .withPlatform(this.getPlatform()); } } catch (Exception e) { - throw IngestionPipelineDeploymentException.byMessage(ingestionPipeline.getName(), e.getMessage()); + throw IngestionPipelineDeploymentException.byMessage( + ingestionPipeline.getName(), e.getMessage()); } throw new PipelineServiceClientException( String.format( "%s Failed to deploy Ingestion Pipeline due to airflow API returned %s and response %s", - ingestionPipeline.getName(), Response.Status.fromStatusCode(response.statusCode()), response.body())); + ingestionPipeline.getName(), + Response.Status.fromStatusCode(response.statusCode()), + response.body())); } @Override @@ -147,11 +154,13 @@ public PipelineServiceClientResponse deletePipeline(IngestionPipeline ingestionP return new PipelineServiceClientResponse().withCode(200).withPlatform(this.getPlatform()); } } catch (Exception e) { - LOG.error(String.format("Failed to delete Airflow Pipeline %s from Airflow DAGS", pipelineName)); + LOG.error( + String.format("Failed to delete Airflow Pipeline %s from Airflow DAGS", pipelineName)); } return new PipelineServiceClientResponse() .withCode(500) - .withReason(String.format("Failed to delete Airflow Pipeline %s from Airflow DAGS", pipelineName)) + .withReason( + String.format("Failed to delete Airflow Pipeline %s from Airflow DAGS", pipelineName)) .withPlatform(this.getPlatform()); } @@ -176,7 +185,9 @@ public PipelineServiceClientResponse runPipeline( } throw IngestionPipelineDeploymentException.byMessage( - pipelineName, "Failed to trigger IngestionPipeline", Response.Status.fromStatusCode(response.statusCode())); + pipelineName, + "Failed to trigger IngestionPipeline", + Response.Status.fromStatusCode(response.statusCode())); } @Override @@ -247,7 +258,8 @@ public List getQueuedPipelineStatusInternal(IngestionPipeline in // Return an empty list. We'll just show the stored status from the Ingestion Pipeline LOG.error( String.format( - "Got status code [%s] trying to get queued statuses: [%s]", response.statusCode(), response.body())); + "Got status code [%s] trying to get queued statuses: [%s]", + response.statusCode(), response.body())); return new ArrayList<>(); } @@ -270,26 +282,33 @@ public PipelineServiceClientResponse getServiceStatusInternal() { if (Boolean.TRUE.equals(validServerClientVersions(ingestionVersion))) { return buildHealthyStatus(ingestionVersion); } else { - return buildUnhealthyStatus(buildVersionMismatchErrorMessage(ingestionVersion, SERVER_VERSION)); + return buildUnhealthyStatus( + buildVersionMismatchErrorMessage(ingestionVersion, SERVER_VERSION)); } } // Auth error when accessing the APIs if (response.statusCode() == 401 || response.statusCode() == 403) { return buildUnhealthyStatus( - String.format("Authentication failed for user [%s] trying to access the Airflow APIs.", this.username)); + String.format( + "Authentication failed for user [%s] trying to access the Airflow APIs.", + this.username)); } // APIs URL not found if (response.statusCode() == 404) { - return buildUnhealthyStatus("Airflow APIs not found. Please follow the installation guide."); + return buildUnhealthyStatus( + "Airflow APIs not found. Please follow the installation guide."); } return buildUnhealthyStatus( - String.format("Unexpected status response: code [%s] - [%s]", response.statusCode(), response.body())); + String.format( + "Unexpected status response: code [%s] - [%s]", + response.statusCode(), response.body())); } catch (Exception e) { - return buildUnhealthyStatus(String.format("Failed to get REST status due to [%s].", e.getMessage())); + return buildUnhealthyStatus( + String.format("Failed to get REST status due to [%s].", e.getMessage())); } } @@ -312,7 +331,9 @@ public PipelineServiceClientResponse runAutomationsWorkflow(Workflow workflow) { throw new PipelineServiceClientException( String.format( "%s Failed to trigger workflow due to airflow API returned %s and response %s", - workflow.getName(), Response.Status.fromStatusCode(response.statusCode()), response.body())); + workflow.getName(), + Response.Status.fromStatusCode(response.statusCode()), + response.body())); } @Override @@ -321,8 +342,12 @@ public PipelineServiceClientResponse runApplicationFlow(App application) { } @Override - public PipelineServiceClientResponse validateAppRegistration(AppMarketPlaceDefinition appMarketPlaceDefinition) { - return new PipelineServiceClientResponse().withCode(200).withReason("Success").withPlatform(this.getPlatform()); + public PipelineServiceClientResponse validateAppRegistration( + AppMarketPlaceDefinition appMarketPlaceDefinition) { + return new PipelineServiceClientResponse() + .withCode(200) + .withReason("Success") + .withPlatform(this.getPlatform()); // TODO: Currently only internal apps are available, external apps will need this validation // return sendPost(APP_VALIDATE, appMarketPlaceDefinition); } @@ -345,7 +370,9 @@ private PipelineServiceClientResponse sendPost(String endpoint, Object request) throw new PipelineServiceClientException( String.format( "%s Failed to trigger flow due to airflow API returned %s and response %s", - workflowPayload, Response.Status.fromStatusCode(response.statusCode()), response.body())); + workflowPayload, + Response.Status.fromStatusCode(response.statusCode()), + response.body())); } @Override @@ -363,7 +390,8 @@ public PipelineServiceClientResponse killIngestion(IngestionPipeline ingestionPi .withPlatform(this.getPlatform()); } } catch (Exception e) { - throw PipelineServiceClientException.byMessage("Failed to kill running workflows", e.getMessage()); + throw PipelineServiceClientException.byMessage( + "Failed to kill running workflows", e.getMessage()); } throw new PipelineServiceClientException( String.format("Failed to kill running workflows due to %s", response.body())); @@ -378,14 +406,16 @@ public Map requestGetHostIp() { return JsonUtils.readValue(response.body(), new TypeReference<>() {}); } } catch (Exception e) { - throw PipelineServiceClientException.byMessage("Failed to get Pipeline Service host IP.", e.getMessage()); + throw PipelineServiceClientException.byMessage( + "Failed to get Pipeline Service host IP.", e.getMessage()); } throw new PipelineServiceClientException( String.format("Failed to get Pipeline Service host IP due to %s", response.body())); } @Override - public Map getLastIngestionLogs(IngestionPipeline ingestionPipeline, String after) { + public Map getLastIngestionLogs( + IngestionPipeline ingestionPipeline, String after) { HttpResponse response; String taskId = TYPE_TO_TASK.get(ingestionPipeline.getPipelineType().toString()); // Init empty after query param @@ -402,7 +432,8 @@ public Map getLastIngestionLogs(IngestionPipeline ingestionPipel return JsonUtils.readValue(response.body(), new TypeReference<>() {}); } } catch (Exception e) { - throw PipelineServiceClientException.byMessage("Failed to get last ingestion logs.", e.getMessage()); + throw PipelineServiceClientException.byMessage( + "Failed to get last ingestion logs.", e.getMessage()); } throw new PipelineServiceClientException( String.format("Failed to get last ingestion logs due to %s", response.body())); diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/resources/glossary/GlossaryTermResourceTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/resources/glossary/GlossaryTermResourceTest.java index 0f05dcfb0b37..c72a66052757 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/resources/glossary/GlossaryTermResourceTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/resources/glossary/GlossaryTermResourceTest.java @@ -508,7 +508,8 @@ void createGlossaryTerm_LanguageTest(TestInfo test) throws IOException { for (String name : getAllHelloWorldTranslations()) { CreateGlossaryTerm create = createRequest(name); GlossaryTerm createdEntity = createEntity(create, ADMIN_AUTH_HEADERS); - GlossaryTerm glossaryGet = getEntityByName(createdEntity.getFullyQualifiedName(), ADMIN_AUTH_HEADERS); + GlossaryTerm glossaryGet = + getEntityByName(createdEntity.getFullyQualifiedName(), ADMIN_AUTH_HEADERS); assertEquals(name, glossaryGet.getName()); } }