diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/airflow/AirflowRESTClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/airflow/AirflowRESTClient.java index fbca8c5c8a47..9bb5703945ab 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/airflow/AirflowRESTClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/airflow/AirflowRESTClient.java @@ -28,6 +28,7 @@ import javax.net.ssl.SSLContext; import javax.ws.rs.core.Response; import lombok.extern.slf4j.Slf4j; +import org.apache.http.client.utils.URIBuilder; import org.json.JSONObject; import org.openmetadata.schema.ServiceEntityInterface; import org.openmetadata.schema.api.configuration.pipelineServiceClient.PipelineServiceClientConfiguration; @@ -57,7 +58,7 @@ public class AirflowRESTClient extends PipelineServiceClient { protected final String password; protected final HttpClient client; protected final URL serviceURL; - private static final String API_ENDPOINT = "api/v1/openmetadata"; + private static final List API_ENDPOINT_SEGMENTS = List.of("api", "v1", "openmetadata"); private static final String DAG_ID = "dag_id"; public AirflowRESTClient(PipelineServiceClientConfiguration config) throws KeyStoreException { @@ -119,8 +120,7 @@ public PipelineServiceClientResponse deployPipeline( IngestionPipeline ingestionPipeline, ServiceEntityInterface service) { HttpResponse response; try { - String deployEndpoint = "%s/%s/deploy"; - String deployUrl = String.format(deployEndpoint, serviceURL, API_ENDPOINT); + String deployUrl = buildURI("deploy").build().toString(); String pipelinePayload = JsonUtils.pojoToJson(ingestionPipeline); response = post(deployUrl, pipelinePayload); if (response.statusCode() == 200) { @@ -147,10 +147,9 @@ public PipelineServiceClientResponse deletePipeline(IngestionPipeline ingestionP String pipelineName = ingestionPipeline.getName(); HttpResponse response; try { - String deleteEndpoint = "%s/%s/delete?dag_id=%s"; - response = - deleteRequestAuthenticatedForJsonContent( - deleteEndpoint, serviceURL, API_ENDPOINT, pipelineName); + URIBuilder uri = buildURI("delete"); + uri.addParameter(DAG_ID, pipelineName); + response = deleteRequestAuthenticatedForJsonContent(uri.build().toString()); if (response.statusCode() == 200) { return new PipelineServiceClientResponse().withCode(200).withPlatform(this.getPlatform()); } @@ -171,8 +170,7 @@ public PipelineServiceClientResponse runPipeline( String pipelineName = ingestionPipeline.getName(); HttpResponse response; try { - String triggerEndPoint = "%s/%s/trigger"; - String triggerUrl = String.format(triggerEndPoint, serviceURL, API_ENDPOINT); + String triggerUrl = buildURI("trigger").build().toString(); JSONObject requestPayload = new JSONObject(); requestPayload.put(DAG_ID, pipelineName); response = post(triggerUrl, requestPayload.toString()); @@ -196,14 +194,12 @@ public PipelineServiceClientResponse runPipeline( public PipelineServiceClientResponse toggleIngestion(IngestionPipeline ingestionPipeline) { HttpResponse response; try { - String toggleEndPoint; String toggleUrl; JSONObject requestPayload = new JSONObject(); requestPayload.put(DAG_ID, ingestionPipeline.getName()); // If the pipeline is currently enabled, disable it if (ingestionPipeline.getEnabled().equals(Boolean.TRUE)) { - toggleEndPoint = "%s/%s/disable"; - toggleUrl = String.format(toggleEndPoint, serviceURL, API_ENDPOINT); + toggleUrl = buildURI("disable").build().toString(); response = post(toggleUrl, requestPayload.toString()); if (response.statusCode() == 200) { ingestionPipeline.setEnabled(false); @@ -220,8 +216,7 @@ public PipelineServiceClientResponse toggleIngestion(IngestionPipeline ingestion } // otherwise, enable it back } else { - toggleEndPoint = "%s/%s/enable"; - toggleUrl = String.format(toggleEndPoint, serviceURL, API_ENDPOINT); + toggleUrl = buildURI("enable").build().toString(); response = post(toggleUrl, requestPayload.toString()); if (response.statusCode() == 200) { ingestionPipeline.setEnabled(true); @@ -250,10 +245,10 @@ public PipelineServiceClientResponse toggleIngestion(IngestionPipeline ingestion public List getQueuedPipelineStatusInternal(IngestionPipeline ingestionPipeline) { HttpResponse response; try { - String statusEndPoint = "%s/%s/status?dag_id=%s&only_queued=true"; - response = - getRequestAuthenticatedForJsonContent( - statusEndPoint, serviceURL, API_ENDPOINT, ingestionPipeline.getName()); + URIBuilder uri = buildURI("status"); + uri.addParameter(DAG_ID, ingestionPipeline.getName()); + uri.addParameter("only_queued", "true"); + response = getRequestAuthenticatedForJsonContent(uri.build().toString()); if (response.statusCode() == 200) { return JsonUtils.readObjects(response.body(), PipelineStatus.class); } @@ -276,8 +271,8 @@ public List getQueuedPipelineStatusInternal(IngestionPipeline in public PipelineServiceClientResponse getServiceStatusInternal() { HttpResponse response; try { - response = - getRequestAuthenticatedForJsonContent("%s/%s/health-auth", serviceURL, API_ENDPOINT); + String healthUrl = buildURI("health-auth").build().toString(); + response = getRequestAuthenticatedForJsonContent(healthUrl); // We can reach the APIs and get the status back from Airflow if (response.statusCode() == 200) { @@ -321,8 +316,7 @@ public PipelineServiceClientResponse getServiceStatusInternal() { public PipelineServiceClientResponse runAutomationsWorkflow(Workflow workflow) { HttpResponse response; try { - String automationsEndpoint = "%s/%s/run_automation"; - String automationsUrl = String.format(automationsEndpoint, serviceURL, API_ENDPOINT); + String automationsUrl = buildURI("run_automation").build().toString(); String workflowPayload = JsonUtils.pojoToJson(workflow); response = post(automationsUrl, workflowPayload); if (response.statusCode() == 200) { @@ -362,9 +356,7 @@ private PipelineServiceClientResponse sendPost(String endpoint, Object request) HttpResponse response; String workflowPayload = JsonUtils.pojoToJson(request); try { - String automationsEndpoint = "%s/%s/%s"; - String automationsUrl = - String.format(automationsEndpoint, serviceURL, API_ENDPOINT, endpoint); + String automationsUrl = buildURI(endpoint).build().toString(); response = post(automationsUrl, workflowPayload); if (response.statusCode() == 200) { return new PipelineServiceClientResponse() @@ -387,8 +379,7 @@ private PipelineServiceClientResponse sendPost(String endpoint, Object request) public PipelineServiceClientResponse killIngestion(IngestionPipeline ingestionPipeline) { HttpResponse response; try { - String killEndPoint = "%s/%s/kill"; - String killUrl = String.format(killEndPoint, serviceURL, API_ENDPOINT); + String killUrl = buildURI("kill").build().toString(); JSONObject requestPayload = new JSONObject(); requestPayload.put(DAG_ID, ingestionPipeline.getName()); response = post(killUrl, requestPayload.toString()); @@ -410,7 +401,7 @@ public PipelineServiceClientResponse killIngestion(IngestionPipeline ingestionPi public Map requestGetHostIp() { HttpResponse response; try { - response = getRequestAuthenticatedForJsonContent("%s/%s/ip", serviceURL, API_ENDPOINT); + response = getRequestAuthenticatedForJsonContent(buildURI("ip").build().toString()); if (response.statusCode() == 200) { return JsonUtils.readValue(response.body(), new TypeReference<>() {}); } @@ -428,15 +419,15 @@ public Map getLastIngestionLogs( HttpResponse response; String taskId = TYPE_TO_TASK.get(ingestionPipeline.getPipelineType().toString()); // Init empty after query param - String afterParam = ""; + + URIBuilder uri = buildURI("last_dag_logs"); if (after != null) { - afterParam = String.format("&after=%s", after); + uri.addParameter("after", after); } + uri.addParameter(DAG_ID, ingestionPipeline.getName()); + uri.addParameter("task_id", taskId); try { - response = - getRequestAuthenticatedForJsonContent( - "%s/%s/last_dag_logs?dag_id=%s&task_id=%s%s", - serviceURL, API_ENDPOINT, ingestionPipeline.getName(), taskId, afterParam); + response = getRequestAuthenticatedForJsonContent(uri.build().toString()); if (response.statusCode() == 200) { return JsonUtils.readValue(response.body(), new TypeReference<>() {}); } @@ -448,25 +439,30 @@ public Map getLastIngestionLogs( String.format("Failed to get last ingestion logs due to %s", response.body())); } - private HttpResponse getRequestAuthenticatedForJsonContent( - String stringUrlFormat, Object... stringReplacement) + private URIBuilder buildURI(String path) { + try { + List pathInternal = new ArrayList<>(API_ENDPOINT_SEGMENTS); + pathInternal.add(path); + return new URIBuilder(String.valueOf(serviceURL)).setPathSegments(pathInternal); + } catch (Exception e) { + throw PipelineServiceClientException.byMessage( + String.format("Failed to built request URI for path [%s].", path), e.getMessage()); + } + } + + private HttpResponse getRequestAuthenticatedForJsonContent(String url) throws IOException, InterruptedException { - HttpRequest request = - authenticatedRequestBuilder(stringUrlFormat, stringReplacement).GET().build(); + HttpRequest request = authenticatedRequestBuilder(url).GET().build(); return client.send(request, HttpResponse.BodyHandlers.ofString()); } - private HttpResponse deleteRequestAuthenticatedForJsonContent( - String stringUrlFormat, Object... stringReplacement) + private HttpResponse deleteRequestAuthenticatedForJsonContent(String url) throws IOException, InterruptedException { - HttpRequest request = - authenticatedRequestBuilder(stringUrlFormat, stringReplacement).DELETE().build(); + HttpRequest request = authenticatedRequestBuilder(url).DELETE().build(); return client.send(request, HttpResponse.BodyHandlers.ofString()); } - private HttpRequest.Builder authenticatedRequestBuilder( - String stringUrlFormat, Object... stringReplacement) { - String url = String.format(stringUrlFormat, stringReplacement); + private HttpRequest.Builder authenticatedRequestBuilder(String url) { return HttpRequest.newBuilder(URI.create(url)) .header(CONTENT_HEADER, CONTENT_TYPE) .header(AUTH_HEADER, getBasicAuthenticationHeader(username, password)); diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/resources/glossary/GlossaryTermResourceTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/resources/glossary/GlossaryTermResourceTest.java index 0f05dcfb0b37..c72a66052757 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/resources/glossary/GlossaryTermResourceTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/resources/glossary/GlossaryTermResourceTest.java @@ -508,7 +508,8 @@ void createGlossaryTerm_LanguageTest(TestInfo test) throws IOException { for (String name : getAllHelloWorldTranslations()) { CreateGlossaryTerm create = createRequest(name); GlossaryTerm createdEntity = createEntity(create, ADMIN_AUTH_HEADERS); - GlossaryTerm glossaryGet = getEntityByName(createdEntity.getFullyQualifiedName(), ADMIN_AUTH_HEADERS); + GlossaryTerm glossaryGet = + getEntityByName(createdEntity.getFullyQualifiedName(), ADMIN_AUTH_HEADERS); assertEquals(name, glossaryGet.getName()); } }