Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#13190 - Improve URL handling for Airflow REST Client #14403

Merged
merged 5 commits into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import javax.net.ssl.SSLContext;
import javax.ws.rs.core.Response;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.client.utils.URIBuilder;
import org.json.JSONObject;
import org.openmetadata.schema.ServiceEntityInterface;
import org.openmetadata.schema.api.configuration.pipelineServiceClient.PipelineServiceClientConfiguration;
Expand Down Expand Up @@ -57,7 +58,7 @@ public class AirflowRESTClient extends PipelineServiceClient {
protected final String password;
protected final HttpClient client;
protected final URL serviceURL;
private static final String API_ENDPOINT = "api/v1/openmetadata";
private static final List<String> API_ENDPOINT_SEGMENTS = List.of("api", "v1", "openmetadata");
private static final String DAG_ID = "dag_id";

public AirflowRESTClient(PipelineServiceClientConfiguration config) throws KeyStoreException {
Expand Down Expand Up @@ -119,8 +120,7 @@ public PipelineServiceClientResponse deployPipeline(
IngestionPipeline ingestionPipeline, ServiceEntityInterface service) {
HttpResponse<String> response;
try {
String deployEndpoint = "%s/%s/deploy";
String deployUrl = String.format(deployEndpoint, serviceURL, API_ENDPOINT);
String deployUrl = buildURI("deploy").build().toString();
String pipelinePayload = JsonUtils.pojoToJson(ingestionPipeline);
response = post(deployUrl, pipelinePayload);
if (response.statusCode() == 200) {
Expand All @@ -147,10 +147,9 @@ public PipelineServiceClientResponse deletePipeline(IngestionPipeline ingestionP
String pipelineName = ingestionPipeline.getName();
HttpResponse<String> response;
try {
String deleteEndpoint = "%s/%s/delete?dag_id=%s";
response =
deleteRequestAuthenticatedForJsonContent(
deleteEndpoint, serviceURL, API_ENDPOINT, pipelineName);
URIBuilder uri = buildURI("delete");
uri.addParameter(DAG_ID, pipelineName);
response = deleteRequestAuthenticatedForJsonContent(uri.build().toString());
if (response.statusCode() == 200) {
return new PipelineServiceClientResponse().withCode(200).withPlatform(this.getPlatform());
}
Expand All @@ -171,8 +170,7 @@ public PipelineServiceClientResponse runPipeline(
String pipelineName = ingestionPipeline.getName();
HttpResponse<String> response;
try {
String triggerEndPoint = "%s/%s/trigger";
String triggerUrl = String.format(triggerEndPoint, serviceURL, API_ENDPOINT);
String triggerUrl = buildURI("trigger").build().toString();
JSONObject requestPayload = new JSONObject();
requestPayload.put(DAG_ID, pipelineName);
response = post(triggerUrl, requestPayload.toString());
Expand All @@ -196,14 +194,12 @@ public PipelineServiceClientResponse runPipeline(
public PipelineServiceClientResponse toggleIngestion(IngestionPipeline ingestionPipeline) {
HttpResponse<String> response;
try {
String toggleEndPoint;
String toggleUrl;
JSONObject requestPayload = new JSONObject();
requestPayload.put(DAG_ID, ingestionPipeline.getName());
// If the pipeline is currently enabled, disable it
if (ingestionPipeline.getEnabled().equals(Boolean.TRUE)) {
toggleEndPoint = "%s/%s/disable";
toggleUrl = String.format(toggleEndPoint, serviceURL, API_ENDPOINT);
toggleUrl = buildURI("disable").build().toString();
response = post(toggleUrl, requestPayload.toString());
if (response.statusCode() == 200) {
ingestionPipeline.setEnabled(false);
Expand All @@ -220,8 +216,7 @@ public PipelineServiceClientResponse toggleIngestion(IngestionPipeline ingestion
}
// otherwise, enable it back
} else {
toggleEndPoint = "%s/%s/enable";
toggleUrl = String.format(toggleEndPoint, serviceURL, API_ENDPOINT);
toggleUrl = buildURI("enable").build().toString();
response = post(toggleUrl, requestPayload.toString());
if (response.statusCode() == 200) {
ingestionPipeline.setEnabled(true);
Expand Down Expand Up @@ -250,10 +245,10 @@ public PipelineServiceClientResponse toggleIngestion(IngestionPipeline ingestion
public List<PipelineStatus> getQueuedPipelineStatusInternal(IngestionPipeline ingestionPipeline) {
HttpResponse<String> response;
try {
String statusEndPoint = "%s/%s/status?dag_id=%s&only_queued=true";
response =
getRequestAuthenticatedForJsonContent(
statusEndPoint, serviceURL, API_ENDPOINT, ingestionPipeline.getName());
URIBuilder uri = buildURI("status");
uri.addParameter(DAG_ID, ingestionPipeline.getName());
uri.addParameter("only_queued", "true");
response = getRequestAuthenticatedForJsonContent(uri.build().toString());
if (response.statusCode() == 200) {
return JsonUtils.readObjects(response.body(), PipelineStatus.class);
}
Expand All @@ -276,8 +271,8 @@ public List<PipelineStatus> getQueuedPipelineStatusInternal(IngestionPipeline in
public PipelineServiceClientResponse getServiceStatusInternal() {
HttpResponse<String> response;
try {
response =
getRequestAuthenticatedForJsonContent("%s/%s/health-auth", serviceURL, API_ENDPOINT);
String healthUrl = buildURI("health-auth").build().toString();
response = getRequestAuthenticatedForJsonContent(healthUrl);

// We can reach the APIs and get the status back from Airflow
if (response.statusCode() == 200) {
Expand Down Expand Up @@ -321,8 +316,7 @@ public PipelineServiceClientResponse getServiceStatusInternal() {
public PipelineServiceClientResponse runAutomationsWorkflow(Workflow workflow) {
HttpResponse<String> response;
try {
String automationsEndpoint = "%s/%s/run_automation";
String automationsUrl = String.format(automationsEndpoint, serviceURL, API_ENDPOINT);
String automationsUrl = buildURI("run_automation").build().toString();
String workflowPayload = JsonUtils.pojoToJson(workflow);
response = post(automationsUrl, workflowPayload);
if (response.statusCode() == 200) {
Expand Down Expand Up @@ -362,9 +356,7 @@ private PipelineServiceClientResponse sendPost(String endpoint, Object request)
HttpResponse<String> response;
String workflowPayload = JsonUtils.pojoToJson(request);
try {
String automationsEndpoint = "%s/%s/%s";
String automationsUrl =
String.format(automationsEndpoint, serviceURL, API_ENDPOINT, endpoint);
String automationsUrl = buildURI(endpoint).build().toString();
response = post(automationsUrl, workflowPayload);
if (response.statusCode() == 200) {
return new PipelineServiceClientResponse()
Expand All @@ -387,8 +379,7 @@ private PipelineServiceClientResponse sendPost(String endpoint, Object request)
public PipelineServiceClientResponse killIngestion(IngestionPipeline ingestionPipeline) {
HttpResponse<String> response;
try {
String killEndPoint = "%s/%s/kill";
String killUrl = String.format(killEndPoint, serviceURL, API_ENDPOINT);
String killUrl = buildURI("kill").build().toString();
JSONObject requestPayload = new JSONObject();
requestPayload.put(DAG_ID, ingestionPipeline.getName());
response = post(killUrl, requestPayload.toString());
Expand All @@ -410,7 +401,7 @@ public PipelineServiceClientResponse killIngestion(IngestionPipeline ingestionPi
public Map<String, String> requestGetHostIp() {
HttpResponse<String> response;
try {
response = getRequestAuthenticatedForJsonContent("%s/%s/ip", serviceURL, API_ENDPOINT);
response = getRequestAuthenticatedForJsonContent(buildURI("ip").build().toString());
if (response.statusCode() == 200) {
return JsonUtils.readValue(response.body(), new TypeReference<>() {});
}
Expand All @@ -428,15 +419,15 @@ public Map<String, String> getLastIngestionLogs(
HttpResponse<String> response;
String taskId = TYPE_TO_TASK.get(ingestionPipeline.getPipelineType().toString());
// Init empty after query param
String afterParam = "";

URIBuilder uri = buildURI("last_dag_logs");
if (after != null) {
afterParam = String.format("&after=%s", after);
uri.addParameter("after", after);
}
uri.addParameter(DAG_ID, ingestionPipeline.getName());
uri.addParameter("task_id", taskId);
try {
response =
getRequestAuthenticatedForJsonContent(
"%s/%s/last_dag_logs?dag_id=%s&task_id=%s%s",
serviceURL, API_ENDPOINT, ingestionPipeline.getName(), taskId, afterParam);
response = getRequestAuthenticatedForJsonContent(uri.build().toString());
if (response.statusCode() == 200) {
return JsonUtils.readValue(response.body(), new TypeReference<>() {});
}
Expand All @@ -448,25 +439,30 @@ public Map<String, String> getLastIngestionLogs(
String.format("Failed to get last ingestion logs due to %s", response.body()));
}

private HttpResponse<String> getRequestAuthenticatedForJsonContent(
String stringUrlFormat, Object... stringReplacement)
private URIBuilder buildURI(String path) {
try {
List<String> pathInternal = new ArrayList<>(API_ENDPOINT_SEGMENTS);
pathInternal.add(path);
return new URIBuilder(String.valueOf(serviceURL)).setPathSegments(pathInternal);
} catch (Exception e) {
throw PipelineServiceClientException.byMessage(
String.format("Failed to built request URI for path [%s].", path), e.getMessage());
}
}

private HttpResponse<String> getRequestAuthenticatedForJsonContent(String url)
throws IOException, InterruptedException {
HttpRequest request =
authenticatedRequestBuilder(stringUrlFormat, stringReplacement).GET().build();
HttpRequest request = authenticatedRequestBuilder(url).GET().build();
return client.send(request, HttpResponse.BodyHandlers.ofString());
}

private HttpResponse<String> deleteRequestAuthenticatedForJsonContent(
String stringUrlFormat, Object... stringReplacement)
private HttpResponse<String> deleteRequestAuthenticatedForJsonContent(String url)
throws IOException, InterruptedException {
HttpRequest request =
authenticatedRequestBuilder(stringUrlFormat, stringReplacement).DELETE().build();
HttpRequest request = authenticatedRequestBuilder(url).DELETE().build();
return client.send(request, HttpResponse.BodyHandlers.ofString());
}

private HttpRequest.Builder authenticatedRequestBuilder(
String stringUrlFormat, Object... stringReplacement) {
String url = String.format(stringUrlFormat, stringReplacement);
private HttpRequest.Builder authenticatedRequestBuilder(String url) {
return HttpRequest.newBuilder(URI.create(url))
.header(CONTENT_HEADER, CONTENT_TYPE)
.header(AUTH_HEADER, getBasicAuthenticationHeader(username, password));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,8 @@ void createGlossaryTerm_LanguageTest(TestInfo test) throws IOException {
for (String name : getAllHelloWorldTranslations()) {
CreateGlossaryTerm create = createRequest(name);
GlossaryTerm createdEntity = createEntity(create, ADMIN_AUTH_HEADERS);
GlossaryTerm glossaryGet = getEntityByName(createdEntity.getFullyQualifiedName(), ADMIN_AUTH_HEADERS);
GlossaryTerm glossaryGet =
getEntityByName(createdEntity.getFullyQualifiedName(), ADMIN_AUTH_HEADERS);
assertEquals(name, glossaryGet.getName());
}
}
Expand Down
Loading