Skip to content

Commit

Permalink
Add async apis for csv import (#18647)
Browse files Browse the repository at this point in the history
  • Loading branch information
sonika-shah authored and Sachin-chaurasiya committed Nov 25, 2024
1 parent 7f65ea0 commit ab30416
Show file tree
Hide file tree
Showing 26 changed files with 1,107 additions and 235 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.openmetadata.service.security.policyevaluator.ResourceContextInterface;
import org.openmetadata.service.util.AsyncService;
import org.openmetadata.service.util.CSVExportResponse;
import org.openmetadata.service.util.CSVImportResponse;
import org.openmetadata.service.util.EntityUtil;
import org.openmetadata.service.util.EntityUtil.Fields;
import org.openmetadata.service.util.RestUtil;
Expand Down Expand Up @@ -404,6 +405,78 @@ public Response exportCsvInternalAsync(SecurityContext securityContext, String n
return Response.accepted().entity(response).type(MediaType.APPLICATION_JSON).build();
}

public Response bulkAddToAssetsAsync(
SecurityContext securityContext, UUID entityId, BulkAssetsRequestInterface request) {
OperationContext operationContext =
new OperationContext(entityType, MetadataOperation.EDIT_ALL);
authorizer.authorize(securityContext, operationContext, getResourceContextById(entityId));
String jobId = UUID.randomUUID().toString();
ExecutorService executorService = AsyncService.getInstance().getExecutorService();
executorService.submit(
() -> {
try {
BulkOperationResult result =
repository.bulkAddAndValidateTagsToAssets(entityId, request);
WebsocketNotificationHandler.bulkAssetsOperationCompleteNotification(
jobId, securityContext, result);
} catch (Exception e) {
WebsocketNotificationHandler.bulkAssetsOperationFailedNotification(
jobId, securityContext, e.getMessage());
}
});
BulkAssetsOperationResponse response =
new BulkAssetsOperationResponse(
jobId, "Bulk Add tags to Asset operation initiated successfully.");
return Response.ok().entity(response).type(MediaType.APPLICATION_JSON).build();
}

public Response bulkRemoveFromAssetsAsync(
SecurityContext securityContext, UUID entityId, BulkAssetsRequestInterface request) {
OperationContext operationContext =
new OperationContext(entityType, MetadataOperation.EDIT_ALL);
authorizer.authorize(securityContext, operationContext, getResourceContextById(entityId));
String jobId = UUID.randomUUID().toString();
ExecutorService executorService = AsyncService.getInstance().getExecutorService();
executorService.submit(
() -> {
try {
BulkOperationResult result =
repository.bulkRemoveAndValidateTagsToAssets(entityId, request);
WebsocketNotificationHandler.bulkAssetsOperationCompleteNotification(
jobId, securityContext, result);
} catch (Exception e) {
WebsocketNotificationHandler.bulkAssetsOperationFailedNotification(
jobId, securityContext, e.getMessage());
}
});
BulkAssetsOperationResponse response =
new BulkAssetsOperationResponse(
jobId, "Bulk Remove tags to Asset operation initiated successfully.");
return Response.ok().entity(response).type(MediaType.APPLICATION_JSON).build();
}

public Response importCsvInternalAsync(
SecurityContext securityContext, String name, String csv, boolean dryRun) {
OperationContext operationContext =
new OperationContext(entityType, MetadataOperation.EDIT_ALL);
authorizer.authorize(securityContext, operationContext, getResourceContextByName(name));
String jobId = UUID.randomUUID().toString();
ExecutorService executorService = AsyncService.getInstance().getExecutorService();
executorService.submit(
() -> {
try {
CsvImportResult result = importCsvInternal(securityContext, name, csv, dryRun);
WebsocketNotificationHandler.sendCsvImportCompleteNotification(
jobId, securityContext, result);
} catch (Exception e) {
WebsocketNotificationHandler.sendCsvImportFailedNotification(
jobId, securityContext, e.getMessage());
}
});
CSVImportResponse response = new CSVImportResponse(jobId, "Import initiated successfully.");
return Response.ok().entity(response).type(MediaType.APPLICATION_JSON).build();
}

public String exportCsvInternal(SecurityContext securityContext, String name) throws IOException {
OperationContext operationContext =
new OperationContext(entityType, MetadataOperation.VIEW_ALL);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,41 @@ public CsvImportResult importCsv(
return importCsvInternal(securityContext, name, csv, dryRun);
}

@PUT
@Path("/name/{name}/importAsync")
@Consumes(MediaType.TEXT_PLAIN)
@Produces(MediaType.APPLICATION_JSON)
@Valid
@Operation(
operationId = "importDatabaseAsync",
summary = "Import database schemas from CSV asynchronously",
description =
"Import database schemas from CSV to update database schemas asynchronously (no creation allowed).",
responses = {
@ApiResponse(
responseCode = "200",
description = "Import initiated successfully",
content =
@Content(
mediaType = "application/json",
schema = @Schema(implementation = CsvImportResult.class)))
})
public Response importCsvAsync(
@Context SecurityContext securityContext,
@Parameter(description = "Name of the Database", schema = @Schema(type = "string"))
@PathParam("name")
String name,
@Parameter(
description =
"Dry-run when true is used for validating the CSV without really importing it. (default=true)",
schema = @Schema(type = "boolean"))
@DefaultValue("true")
@QueryParam("dryRun")
boolean dryRun,
String csv) {
return importCsvInternalAsync(securityContext, name, csv, dryRun);
}

@PUT
@Path("/{id}/vote")
@Operation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,40 @@ public CsvImportResult importCsv(
return importCsvInternal(securityContext, name, csv, dryRun);
}

@PUT
@Path("/name/{name}/importAsync")
@Consumes(MediaType.TEXT_PLAIN)
@Produces(MediaType.APPLICATION_JSON)
@Valid
@Operation(
operationId = "importDatabaseSchemaAsync",
summary =
"Import tables from CSV to update database schema asynchronously (no creation allowed)",
responses = {
@ApiResponse(
responseCode = "200",
description = "Import initiated successfully",
content =
@Content(
mediaType = "application/json",
schema = @Schema(implementation = CsvImportResult.class)))
})
public Response importCsvAsync(
@Context SecurityContext securityContext,
@Parameter(description = "Name of the Database schema", schema = @Schema(type = "string"))
@PathParam("name")
String name,
@Parameter(
description =
"Dry-run when true is used for validating the CSV without really importing it. (default=true)",
schema = @Schema(type = "boolean"))
@DefaultValue("true")
@QueryParam("dryRun")
boolean dryRun,
String csv) {
return importCsvInternalAsync(securityContext, name, csv, dryRun);
}

@PUT
@Path("/{id}/vote")
@Operation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,38 @@ public CsvImportResult importCsv(
return importCsvInternal(securityContext, name, csv, dryRun);
}

@PUT
@Path("/name/{name}/importAsync")
@Consumes(MediaType.TEXT_PLAIN)
@Valid
@Operation(
operationId = "importTableAsync",
summary = "Import columns from CSV to update table asynchronously (no creation allowed)",
responses = {
@ApiResponse(
responseCode = "200",
description = "Import result",
content =
@Content(
mediaType = "application/json",
schema = @Schema(implementation = CsvImportResult.class)))
})
public Response importCsvAsync(
@Context SecurityContext securityContext,
@Parameter(description = "Name of the table", schema = @Schema(type = "string"))
@PathParam("name")
String name,
@Parameter(
description =
"Dry-run when true is used for validating the CSV without really importing it. (default=true)",
schema = @Schema(type = "boolean"))
@DefaultValue("true")
@QueryParam("dryRun")
boolean dryRun,
String csv) {
return importCsvInternalAsync(securityContext, name, csv, dryRun);
}

@DELETE
@Path("/{id}")
@Operation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,36 @@ public CsvImportResult importCsv(
return importCsvInternal(securityContext, name, csv, dryRun);
}

@PUT
@Path("/name/{name}/importAsync")
@Consumes(MediaType.TEXT_PLAIN)
@Produces(MediaType.APPLICATION_JSON)
@Valid
@Operation(
operationId = "importGlossaryAsync",
summary = "Import glossary in CSV format asynchronously",
responses = {
@ApiResponse(
responseCode = "200",
description = "Import initiated successfully",
content =
@Content(
mediaType = "application/json",
schema = @Schema(implementation = CsvImportResult.class)))
})
public Response importCsvAsync(
@Context SecurityContext securityContext,
@Parameter(description = "Name of the glossary", schema = @Schema(type = "string"))
@PathParam("name")
String name,
@RequestBody(description = "CSV data to import", required = true) String csv,
@Parameter(description = "Dry run the import", schema = @Schema(type = "boolean"))
@QueryParam("dryRun")
@DefaultValue("true")
boolean dryRun) {
return importCsvInternalAsync(securityContext, name, csv, dryRun);
}

private Glossary getGlossary(CreateGlossary create, String user) {
return getGlossary(repository, create, user);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,40 @@ public CsvImportResult importCsv(
return importCsvInternal(securityContext, name, csv, dryRun);
}

@PUT
@Path("/name/{name}/importAsync")
@Consumes(MediaType.TEXT_PLAIN)
@Produces(MediaType.APPLICATION_JSON)
@Valid
@Operation(
operationId = "importDatabaseServiceAsync",
summary =
"Import service from CSV to update database service asynchronously (no creation allowed)",
responses = {
@ApiResponse(
responseCode = "200",
description = "Import initiated successfully",
content =
@Content(
mediaType = "application/json",
schema = @Schema(implementation = CsvImportResult.class)))
})
public Response importCsvAsync(
@Context SecurityContext securityContext,
@Parameter(description = "Name of the Database Service", schema = @Schema(type = "string"))
@PathParam("name")
String name,
@Parameter(
description =
"Dry-run when true is used for validating the CSV without really importing it. (default=true)",
schema = @Schema(type = "boolean"))
@DefaultValue("true")
@QueryParam("dryRun")
boolean dryRun,
String csv) {
return importCsvInternalAsync(securityContext, name, csv, dryRun);
}

@DELETE
@Path("/{id}")
@Operation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,37 @@ public CsvImportResult importCsv(
return importCsvInternal(securityContext, name, csv, dryRun);
}

@PUT
@Path("/name/{name}/importAsync")
@Consumes(MediaType.TEXT_PLAIN)
@Produces(MediaType.APPLICATION_JSON)
@Valid
@Operation(
operationId = "importTeamsAsync",
summary = "Import from CSV to create, and update teams asynchronously.",
responses = {
@ApiResponse(
responseCode = "200",
description = "Import initiated successfully",
content =
@Content(
mediaType = "application/json",
schema = @Schema(implementation = CsvImportResult.class)))
})
public Response importCsvAsync(
@Context SecurityContext securityContext,
@PathParam("name") String name,
@Parameter(
description =
"Dry-run when true is used for validating the CSV without really importing it. (default=true)",
schema = @Schema(type = "boolean"))
@DefaultValue("true")
@QueryParam("dryRun")
boolean dryRun,
String csv) {
return importCsvInternalAsync(securityContext, name, csv, dryRun);
}

private Team getTeam(CreateTeam ct, String user) {
if (ct.getTeamType().equals(TeamType.ORGANIZATION)) {
throw new IllegalArgumentException(CREATE_ORGANIZATION);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1482,6 +1482,41 @@ public CsvImportResult importCsv(
return importCsvInternal(securityContext, team, csv, dryRun);
}

@PUT
@Path("/importAsync")
@Consumes(MediaType.TEXT_PLAIN)
@Valid
@Operation(
operationId = "importTeamsAsync",
summary = "Import from CSV to create, and update teams asynchronously.",
responses = {
@ApiResponse(
responseCode = "200",
description = "Import result",
content =
@Content(
mediaType = "application/json",
schema = @Schema(implementation = CsvImportResult.class)))
})
public Response importCsvAsync(
@Context SecurityContext securityContext,
@Parameter(
description = "Name of the team to under which the users are imported to",
required = true,
schema = @Schema(type = "string"))
@QueryParam("team")
String team,
@Parameter(
description =
"Dry-run when true is used for validating the CSV without really importing it. (default=true)",
schema = @Schema(type = "boolean"))
@DefaultValue("true")
@QueryParam("dryRun")
boolean dryRun,
String csv) {
return importCsvInternalAsync(securityContext, team, csv, dryRun);
}

public void validateEmailAlreadyExists(String email) {
if (repository.checkEmailAlreadyExists(email)) {
throw new CustomExceptionMessage(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public class WebSocketManager {
public static final String MENTION_CHANNEL = "mentionChannel";
public static final String ANNOUNCEMENT_CHANNEL = "announcementChannel";
public static final String CSV_EXPORT_CHANNEL = "csvExportChannel";
public static final String CSV_IMPORT_CHANNEL = "csvImportChannel";

@Getter
private final Map<UUID, Map<String, SocketIoSocket>> activityFeedEndpoints =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package org.openmetadata.service.util;

import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.openmetadata.schema.type.csv.CsvImportResult;

@NoArgsConstructor
public class CSVImportMessage {
@Getter @Setter private String jobId;
@Getter @Setter private String status;
@Getter @Setter private CsvImportResult result;
@Getter @Setter private String error;

public CSVImportMessage(String jobId, String status, CsvImportResult result, String error) {
this.jobId = jobId;
this.status = status;
this.result = result;
this.error = error;
}
}
Loading

0 comments on commit ab30416

Please sign in to comment.