Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add async apis for csv import #18647

Merged
merged 21 commits into from
Nov 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.openmetadata.service.util.AsyncService;
import org.openmetadata.service.util.BulkAssetsOperationResponse;
import org.openmetadata.service.util.CSVExportResponse;
import org.openmetadata.service.util.CSVImportResponse;
import org.openmetadata.service.util.EntityUtil;
import org.openmetadata.service.util.EntityUtil.Fields;
import org.openmetadata.service.util.RestUtil;
Expand Down Expand Up @@ -465,6 +466,28 @@ public Response bulkRemoveFromAssetsAsync(
return Response.ok().entity(response).type(MediaType.APPLICATION_JSON).build();
}

public Response importCsvInternalAsync(
SecurityContext securityContext, String name, String csv, boolean dryRun) {
OperationContext operationContext =
new OperationContext(entityType, MetadataOperation.EDIT_ALL);
authorizer.authorize(securityContext, operationContext, getResourceContextByName(name));
String jobId = UUID.randomUUID().toString();
ExecutorService executorService = AsyncService.getInstance().getExecutorService();
executorService.submit(
() -> {
try {
CsvImportResult result = importCsvInternal(securityContext, name, csv, dryRun);
WebsocketNotificationHandler.sendCsvImportCompleteNotification(
jobId, securityContext, result);
} catch (Exception e) {
WebsocketNotificationHandler.sendCsvImportFailedNotification(
jobId, securityContext, e.getMessage());
}
});
CSVImportResponse response = new CSVImportResponse(jobId, "Import initiated successfully.");
return Response.ok().entity(response).type(MediaType.APPLICATION_JSON).build();
}

public String exportCsvInternal(SecurityContext securityContext, String name) throws IOException {
OperationContext operationContext =
new OperationContext(entityType, MetadataOperation.VIEW_ALL);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,41 @@ public CsvImportResult importCsv(
return importCsvInternal(securityContext, name, csv, dryRun);
}

@PUT
@Path("/name/{name}/importAsync")
@Consumes(MediaType.TEXT_PLAIN)
@Produces(MediaType.APPLICATION_JSON)
@Valid
@Operation(
operationId = "importDatabaseAsync",
summary = "Import database schemas from CSV asynchronously",
description =
"Import database schemas from CSV to update database schemas asynchronously (no creation allowed).",
responses = {
@ApiResponse(
responseCode = "200",
description = "Import initiated successfully",
content =
@Content(
mediaType = "application/json",
schema = @Schema(implementation = CsvImportResult.class)))
})
public Response importCsvAsync(
@Context SecurityContext securityContext,
@Parameter(description = "Name of the Database", schema = @Schema(type = "string"))
@PathParam("name")
String name,
@Parameter(
description =
"Dry-run when true is used for validating the CSV without really importing it. (default=true)",
schema = @Schema(type = "boolean"))
@DefaultValue("true")
@QueryParam("dryRun")
boolean dryRun,
String csv) {
return importCsvInternalAsync(securityContext, name, csv, dryRun);
}

@PUT
@Path("/{id}/vote")
@Operation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,40 @@ public CsvImportResult importCsv(
return importCsvInternal(securityContext, name, csv, dryRun);
}

@PUT
@Path("/name/{name}/importAsync")
@Consumes(MediaType.TEXT_PLAIN)
@Produces(MediaType.APPLICATION_JSON)
@Valid
@Operation(
operationId = "importDatabaseSchemaAsync",
summary =
"Import tables from CSV to update database schema asynchronously (no creation allowed)",
responses = {
@ApiResponse(
responseCode = "200",
description = "Import initiated successfully",
content =
@Content(
mediaType = "application/json",
schema = @Schema(implementation = CsvImportResult.class)))
})
public Response importCsvAsync(
@Context SecurityContext securityContext,
@Parameter(description = "Name of the Database schema", schema = @Schema(type = "string"))
@PathParam("name")
String name,
@Parameter(
description =
"Dry-run when true is used for validating the CSV without really importing it. (default=true)",
schema = @Schema(type = "boolean"))
@DefaultValue("true")
@QueryParam("dryRun")
boolean dryRun,
String csv) {
return importCsvInternalAsync(securityContext, name, csv, dryRun);
}

@PUT
@Path("/{id}/vote")
@Operation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,38 @@ public CsvImportResult importCsv(
return importCsvInternal(securityContext, name, csv, dryRun);
}

@PUT
@Path("/name/{name}/importAsync")
@Consumes(MediaType.TEXT_PLAIN)
@Valid
@Operation(
operationId = "importTableAsync",
summary = "Import columns from CSV to update table asynchronously (no creation allowed)",
responses = {
@ApiResponse(
responseCode = "200",
description = "Import result",
content =
@Content(
mediaType = "application/json",
schema = @Schema(implementation = CsvImportResult.class)))
})
public Response importCsvAsync(
@Context SecurityContext securityContext,
@Parameter(description = "Name of the table", schema = @Schema(type = "string"))
@PathParam("name")
String name,
@Parameter(
description =
"Dry-run when true is used for validating the CSV without really importing it. (default=true)",
schema = @Schema(type = "boolean"))
@DefaultValue("true")
@QueryParam("dryRun")
boolean dryRun,
String csv) {
return importCsvInternalAsync(securityContext, name, csv, dryRun);
}

@DELETE
@Path("/{id}")
@Operation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,36 @@ public CsvImportResult importCsv(
return importCsvInternal(securityContext, name, csv, dryRun);
}

@PUT
@Path("/name/{name}/importAsync")
@Consumes(MediaType.TEXT_PLAIN)
@Produces(MediaType.APPLICATION_JSON)
@Valid
@Operation(
operationId = "importGlossaryAsync",
summary = "Import glossary in CSV format asynchronously",
responses = {
@ApiResponse(
responseCode = "200",
description = "Import initiated successfully",
content =
@Content(
mediaType = "application/json",
schema = @Schema(implementation = CsvImportResult.class)))
})
public Response importCsvAsync(
@Context SecurityContext securityContext,
@Parameter(description = "Name of the glossary", schema = @Schema(type = "string"))
@PathParam("name")
String name,
@RequestBody(description = "CSV data to import", required = true) String csv,
@Parameter(description = "Dry run the import", schema = @Schema(type = "boolean"))
@QueryParam("dryRun")
@DefaultValue("true")
boolean dryRun) {
return importCsvInternalAsync(securityContext, name, csv, dryRun);
}

private Glossary getGlossary(CreateGlossary create, String user) {
return getGlossary(repository, create, user);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,40 @@ public CsvImportResult importCsv(
return importCsvInternal(securityContext, name, csv, dryRun);
}

@PUT
@Path("/name/{name}/importAsync")
@Consumes(MediaType.TEXT_PLAIN)
@Produces(MediaType.APPLICATION_JSON)
@Valid
@Operation(
operationId = "importDatabaseServiceAsync",
summary =
"Import service from CSV to update database service asynchronously (no creation allowed)",
responses = {
@ApiResponse(
responseCode = "200",
description = "Import initiated successfully",
content =
@Content(
mediaType = "application/json",
schema = @Schema(implementation = CsvImportResult.class)))
})
public Response importCsvAsync(
@Context SecurityContext securityContext,
@Parameter(description = "Name of the Database Service", schema = @Schema(type = "string"))
@PathParam("name")
String name,
@Parameter(
description =
"Dry-run when true is used for validating the CSV without really importing it. (default=true)",
schema = @Schema(type = "boolean"))
@DefaultValue("true")
@QueryParam("dryRun")
boolean dryRun,
String csv) {
return importCsvInternalAsync(securityContext, name, csv, dryRun);
}

@DELETE
@Path("/{id}")
@Operation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,37 @@ public CsvImportResult importCsv(
return importCsvInternal(securityContext, name, csv, dryRun);
}

@PUT
@Path("/name/{name}/importAsync")
@Consumes(MediaType.TEXT_PLAIN)
@Produces(MediaType.APPLICATION_JSON)
@Valid
@Operation(
operationId = "importTeamsAsync",
summary = "Import from CSV to create, and update teams asynchronously.",
responses = {
@ApiResponse(
responseCode = "200",
description = "Import initiated successfully",
content =
@Content(
mediaType = "application/json",
schema = @Schema(implementation = CsvImportResult.class)))
})
public Response importCsvAsync(
@Context SecurityContext securityContext,
@PathParam("name") String name,
@Parameter(
description =
"Dry-run when true is used for validating the CSV without really importing it. (default=true)",
schema = @Schema(type = "boolean"))
@DefaultValue("true")
@QueryParam("dryRun")
boolean dryRun,
String csv) {
return importCsvInternalAsync(securityContext, name, csv, dryRun);
}

private Team getTeam(CreateTeam ct, String user) {
if (ct.getTeamType().equals(TeamType.ORGANIZATION)) {
throw new IllegalArgumentException(CREATE_ORGANIZATION);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1482,6 +1482,41 @@ public CsvImportResult importCsv(
return importCsvInternal(securityContext, team, csv, dryRun);
}

@PUT
@Path("/importAsync")
@Consumes(MediaType.TEXT_PLAIN)
@Valid
@Operation(
operationId = "importTeamsAsync",
summary = "Import from CSV to create, and update teams asynchronously.",
responses = {
@ApiResponse(
responseCode = "200",
description = "Import result",
content =
@Content(
mediaType = "application/json",
schema = @Schema(implementation = CsvImportResult.class)))
})
public Response importCsvAsync(
@Context SecurityContext securityContext,
@Parameter(
description = "Name of the team to under which the users are imported to",
required = true,
schema = @Schema(type = "string"))
@QueryParam("team")
String team,
@Parameter(
description =
"Dry-run when true is used for validating the CSV without really importing it. (default=true)",
schema = @Schema(type = "boolean"))
@DefaultValue("true")
@QueryParam("dryRun")
boolean dryRun,
String csv) {
return importCsvInternalAsync(securityContext, team, csv, dryRun);
}

public void validateEmailAlreadyExists(String email) {
if (repository.checkEmailAlreadyExists(email)) {
throw new CustomExceptionMessage(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public class WebSocketManager {
public static final String MENTION_CHANNEL = "mentionChannel";
public static final String ANNOUNCEMENT_CHANNEL = "announcementChannel";
public static final String CSV_EXPORT_CHANNEL = "csvExportChannel";
public static final String CSV_IMPORT_CHANNEL = "csvImportChannel";

public static final String BULK_ASSETS_CHANNEL = "bulkAssetsChannel";

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package org.openmetadata.service.util;

import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.openmetadata.schema.type.csv.CsvImportResult;

@NoArgsConstructor
public class CSVImportMessage {
@Getter @Setter private String jobId;
@Getter @Setter private String status;
@Getter @Setter private CsvImportResult result;
@Getter @Setter private String error;

public CSVImportMessage(String jobId, String status, CsvImportResult result, String error) {
this.jobId = jobId;
this.status = status;
this.result = result;
this.error = error;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.openmetadata.service.util;

import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

@NoArgsConstructor
public class CSVImportResponse {
@Getter @Setter private String jobId;
@Getter @Setter private String message;

public CSVImportResponse(String jobId, String message) {
this.jobId = jobId;
this.message = message;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.openmetadata.schema.type.Post;
import org.openmetadata.schema.type.Relationship;
import org.openmetadata.schema.type.api.BulkOperationResult;
import org.openmetadata.schema.type.csv.CsvImportResult;
import org.openmetadata.service.Entity;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.resources.feeds.MessageParser;
Expand Down Expand Up @@ -194,4 +195,22 @@ private static UUID getUserIdFromSecurityContext(SecurityContext securityContext
User user = Entity.getCollectionDAO().userDAO().findEntityByName(username);
return user.getId();
}

public static void sendCsvImportCompleteNotification(
String jobId, SecurityContext securityContext, CsvImportResult result) {
CSVImportMessage message = new CSVImportMessage(jobId, "COMPLETED", result, null);
String jsonMessage = JsonUtils.pojoToJson(message);
UUID userId = getUserIdFromSecurityContext(securityContext);
WebSocketManager.getInstance()
.sendToOne(userId, WebSocketManager.CSV_IMPORT_CHANNEL, jsonMessage);
}

public static void sendCsvImportFailedNotification(
String jobId, SecurityContext securityContext, String errorMessage) {
CSVExportMessage message = new CSVExportMessage(jobId, "FAILED", null, errorMessage);
String jsonMessage = JsonUtils.pojoToJson(message);
UUID userId = getUserIdFromSecurityContext(securityContext);
WebSocketManager.getInstance()
.sendToOne(userId, WebSocketManager.CSV_IMPORT_CHANNEL, jsonMessage);
}
}
Loading
Loading