diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala index 24433b76cd9..7192f7f3fd6 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala @@ -19,7 +19,12 @@ package edu.uci.ics.texera.web.resource.dashboard.user.workflow -import edu.uci.ics.amber.core.storage.{DocumentFactory, VFSResourceType, VFSURIFactory} +import edu.uci.ics.amber.core.storage.{ + DocumentFactory, + FileResolver, + VFSResourceType, + VFSURIFactory +} import edu.uci.ics.amber.core.tuple.Tuple import edu.uci.ics.amber.core.virtualidentity._ import edu.uci.ics.amber.core.workflow.{GlobalPortIdentity, PortIdentity} @@ -27,12 +32,13 @@ import edu.uci.ics.amber.engine.architecture.logreplay.{ReplayDestination, Repla import edu.uci.ics.amber.engine.common.Utils.{maptoStatusCode, stringToAggregatedState} import edu.uci.ics.amber.engine.common.storage.SequentialRecordStorage import edu.uci.ics.amber.util.serde.GlobalPortIdentitySerde.SerdeOps +import edu.uci.ics.amber.util.JSONUtils.objectMapper import edu.uci.ics.texera.auth.SessionUser import edu.uci.ics.texera.dao.SqlServer import edu.uci.ics.texera.dao.SqlServer.withTransaction import edu.uci.ics.texera.dao.jooq.generated.Tables._ import edu.uci.ics.texera.dao.jooq.generated.tables.daos.WorkflowExecutionsDao -import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.WorkflowExecutions +import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{User => UserPojo, WorkflowExecutions} import edu.uci.ics.texera.web.model.http.request.result.ResultExportRequest import edu.uci.ics.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource._ import 
/**
  * Computes which operators in a workflow are restricted due to dataset access controls.
  *
  * Steps:
  *  1. Load and parse the workflow JSON and collect, per operator, the dataset referenced by
  *     its `fileName` property when that dataset is not owned by `currentUser`.
  *  2. Ask the database which of those datasets are flagged as non-downloadable.
  *  3. Propagate the restriction breadth-first along the workflow links, so every operator
  *     downstream of a restricted source is restricted as well.
  *
  * Dataset identity (owner email + dataset name) is compared case-insensitively end to end:
  * the database lookup uses `equalIgnoreCase`, so the rows it returns may differ in casing
  * from what the workflow JSON spells out. Both sides are therefore canonicalised before the
  * membership test; otherwise a differently-cased `fileName` would escape the restriction.
  *
  * A missing, empty or unparsable workflow yields an empty map (no restriction reported).
  *
  * @param wid The workflow ID
  * @param currentUser The current user making the export request
  * @return Map of operator ID -> Set of (ownerEmail, datasetName) tuples that block its export.
  *         The tuples keep the spelling used in the workflow JSON.
  */
private def getNonDownloadableOperatorMap(
    wid: Int,
    currentUser: UserPojo
): Map[String, Set[(String, String)]] = {
  type DatasetKey = (String, String)

  // Canonical (lower-cased) form used only for comparisons, never for output.
  def canonical(dataset: DatasetKey): DatasetKey =
    (
      dataset._1.toLowerCase(java.util.Locale.ROOT),
      dataset._2.toLowerCase(java.util.Locale.ROOT)
    )

  val currentUserEmail = Option(currentUser.getEmail)
  def ownedByCurrentUser(ownerEmail: String): Boolean =
    currentUserEmail.exists(_.equalsIgnoreCase(ownerEmail))

  // Load workflow content; fetchOne() yields null when no row matches.
  val workflowContent: Option[String] = Option(
    context
      .select(WORKFLOW.CONTENT)
      .from(WORKFLOW)
      .where(WORKFLOW.WID.eq(wid).and(WORKFLOW.CONTENT.isNotNull).and(WORKFLOW.CONTENT.ne("")))
      .fetchOne()
  ).map(_.value1())

  // Best-effort parse: malformed JSON is treated the same as "no workflow".
  val rootNode = workflowContent.flatMap { content =>
    try Option(objectMapper.readTree(content))
    catch { case scala.util.control.NonFatal(_) => None }
  }

  rootNode match {
    case None => Map.empty
    case Some(root) =>
      // operatorId -> dataset it reads directly (only datasets the user does not own).
      // When an operator ID appears more than once, the last occurrence wins.
      val operatorDatasets: Map[String, DatasetKey] =
        root
          .path("operators")
          .elements()
          .asScala
          .toList
          .flatMap { operatorNode =>
            val operatorId = operatorNode.path("operatorID").asText("")
            val fileNameNode = operatorNode.path("operatorProperties").path("fileName")
            if (operatorId.isEmpty || !fileNameNode.isTextual) {
              None
            } else {
              FileResolver
                .parseDatasetOwnerAndName(fileNameNode.asText())
                .filterNot { case (ownerEmail, _) => ownedByCurrentUser(ownerEmail) }
                .map(dataset => operatorId -> dataset)
            }
          }
          .toMap

      if (operatorDatasets.isEmpty) {
        Map.empty
      } else {
        // One OR-ed condition per distinct dataset; the set is non-empty here, so reduce is safe.
        val uniqueDatasets = operatorDatasets.values.toSet
        val conditions = uniqueDatasets.map {
          case (ownerEmail, datasetName) =>
            USER.EMAIL.equalIgnoreCase(ownerEmail).and(DATASET.NAME.equalIgnoreCase(datasetName))
        }

        val nonDownloadableDatasets: Set[DatasetKey] = context
          .select(USER.EMAIL, DATASET.NAME)
          .from(DATASET)
          .join(USER)
          .on(DATASET.OWNER_UID.eq(USER.UID))
          .where(conditions.reduce((a, b) => a.or(b)))
          .and(DATASET.IS_DOWNLOADABLE.eq(false))
          .fetch()
          .asScala
          .map(record => canonical((record.value1(), record.value2())))
          .toSet

        // Keep only operators whose dataset is non-downloadable (case-insensitive match).
        val restrictedSourceMap = operatorDatasets.filter {
          case (_, dataset) => nonDownloadableDatasets.contains(canonical(dataset))
        }

        // Build dependency graph: source operator -> downstream operators.
        val adjacency = mutable.Map.empty[String, mutable.ListBuffer[String]]
        root.path("links").elements().asScala.foreach { linkNode =>
          val sourceId = linkNode.path("source").path("operatorID").asText("")
          val targetId = linkNode.path("target").path("operatorID").asText("")
          if (sourceId.nonEmpty && targetId.nonEmpty) {
            adjacency.getOrElseUpdate(sourceId, mutable.ListBuffer.empty[String]) += targetId
          }
        }

        // BFS to propagate restrictions. An operator is re-expanded only when its set of
        // blocking datasets actually grows, which also guarantees termination on cycles.
        val restrictionMap = mutable.Map.empty[String, Set[DatasetKey]]
        val queue = mutable.Queue.empty[(String, Set[DatasetKey])]
        restrictedSourceMap.foreach {
          case (operatorId, dataset) => queue.enqueue(operatorId -> Set(dataset))
        }

        while (queue.nonEmpty) {
          val (currentOperatorId, datasetSet) = queue.dequeue()
          val existing = restrictionMap.getOrElse(currentOperatorId, Set.empty[DatasetKey])
          val merged = existing ++ datasetSet
          if (merged != existing) {
            restrictionMap.update(currentOperatorId, merged)
            adjacency
              .getOrElse(currentOperatorId, mutable.ListBuffer.empty[String])
              .foreach(nextOperator => queue.enqueue(nextOperator -> merged))
          }
        }

        restrictionMap.toMap
      }
  }
}
/**
 * Retrieves workflow result downloadability information from the backend.
 * Returns a map of operator IDs to arrays of dataset labels that block their export.
 *
 * The response type is spelled out on both the return type and the HTTP call so that
 * consumers receive a typed `{ [operatorId]: string[] }` object instead of an untyped body.
 *
 * @param workflowId The workflow ID to check
 * @returns Observable of downloadability information
 */
public getWorkflowResultDownloadability(workflowId: number): Observable<WorkflowResultDownloadabilityResponse> {
  const urlPath = `${WORKFLOW_EXECUTIONS_API_BASE_URL}/${workflowId}/${DOWNLOADABILITY_BASE_URL}`;
  return this.http.get<WorkflowResultDownloadabilityResponse>(urlPath);
}
If destination = "local", the server returns a BLOB (file). * Otherwise, it returns JSON with a status message. diff --git a/core/gui/src/app/workspace/component/result-exportation/result-exportation.component.html b/core/gui/src/app/workspace/component/result-exportation/result-exportation.component.html index 756e5f54112..1624a53a04d 100644 --- a/core/gui/src/app/workspace/component/result-exportation/result-exportation.component.html +++ b/core/gui/src/app/workspace/component/result-exportation/result-exportation.component.html @@ -18,126 +18,144 @@ -->
-
-
- - - - Export Type - - - - - - - - - - - - - - - Filename - - - - - - - - - - Destination - - - - - - - - - -
-
- - - -
-
{{dataset.dataset.did?.toString()}}
+ *ngIf="hasPartialNonDownloadable && blockingDatasetLabels.length > 0"> + +
+ +
+
+ + + + Export Type + + + + + + + + + + + + + + + Filename + + + + + + + + + + Destination + + + + + + + + + +
+
+
+ + + +
+
{{dataset.dataset.did?.toString()}}
- {{ dataset.dataset.name }} + {{ dataset.dataset.name }} - -
-
-
- + +
+
+
+ + +
-
- + + +
+ +
+
diff --git a/core/gui/src/app/workspace/component/result-exportation/result-exportation.component.ts b/core/gui/src/app/workspace/component/result-exportation/result-exportation.component.ts index c053b1545f4..4783bd93f23 100644 --- a/core/gui/src/app/workspace/component/result-exportation/result-exportation.component.ts +++ b/core/gui/src/app/workspace/component/result-exportation/result-exportation.component.ts @@ -19,7 +19,10 @@ import { UntilDestroy, untilDestroyed } from "@ngneat/until-destroy"; import { Component, inject, Input, OnInit } from "@angular/core"; -import { WorkflowResultExportService } from "../../service/workflow-result-export/workflow-result-export.service"; +import { + WorkflowResultExportService, + WorkflowResultDownloadability, +} from "../../service/workflow-result-export/workflow-result-export.service"; import { DashboardDataset } from "../../../dashboard/type/dashboard-dataset.interface"; import { DatasetService } from "../../../dashboard/service/user/dataset/dataset.service"; import { NZ_MODAL_DATA, NzModalRef, NzModalService } from "ng-zorro-antd/modal"; @@ -52,10 +55,65 @@ export class ResultExportationComponent implements OnInit { containsBinaryData: boolean = false; inputDatasetName = ""; selectedComputingUnit: DashboardWorkflowComputingUnit | null = null; + downloadability?: WorkflowResultDownloadability; userAccessibleDatasets: DashboardDataset[] = []; filteredUserAccessibleDatasets: DashboardDataset[] = []; + /** + * Gets the operator IDs to check for restrictions based on the source trigger. 
/**
 * Resolves the operator IDs whose restrictions matter for this dialog.
 * Opened from the top menu: every operator in the graph.
 * Opened from the context menu: only the currently highlighted operators.
 */
private getOperatorIdsToCheck(): readonly string[] {
  if (this.sourceTriggered !== "menu") {
    return this.workflowActionService.getJointGraphWrapper().getCurrentHighlightedOperatorIDs();
  }
  const allOperators = this.workflowActionService.getTexeraGraph().getAllOperators();
  return allOperators.map(op => op.operatorID);
}

/**
 * Operator IDs in scope that may be exported; empty until the analysis has been loaded.
 */
get exportableOperatorIds(): string[] {
  return this.downloadability ? this.downloadability.getExportableOperatorIds(this.getOperatorIdsToCheck()) : [];
}

/**
 * Operator IDs in scope that are blocked from export; empty until the analysis has been loaded.
 */
get blockedOperatorIds(): string[] {
  return this.downloadability ? this.downloadability.getBlockedOperatorIds(this.getOperatorIdsToCheck()) : [];
}

/**
 * True when at least one operator is in scope and none of them is exportable.
 */
get isExportRestricted(): boolean {
  const selectedIds = this.getOperatorIdsToCheck();
  const nothingExportable = this.exportableOperatorIds.length === 0;
  return nothingExportable && selectedIds.length > 0;
}

/**
 * True when the scope mixes exportable and blocked operators.
 */
get hasPartialNonDownloadable(): boolean {
  if (this.exportableOperatorIds.length === 0) {
    return false;
  }
  return this.blockedOperatorIds.length > 0;
}

/**
 * Labels of the datasets that block export for the operators in scope.
 */
get blockingDatasetLabels(): string[] {
  return this.downloadability ? this.downloadability.getBlockingDatasets(this.getOperatorIdsToCheck()) : [];
}
this.workflowResultExportService + .computeRestrictionAnalysis() + .pipe(untilDestroyed(this)) + .subscribe(downloadability => { + this.downloadability = downloadability; + this.updateOutputType(); + }); this.computingUnitStatusService .getSelectedComputingUnit() @@ -85,19 +150,12 @@ export class ResultExportationComponent implements OnInit { } updateOutputType(): void { - // Determine if the caller of this component is menu or context menu - // if its menu then we need to export all operators else we need to export only highlighted operators - - let operatorIds: readonly string[]; - if (this.sourceTriggered === "menu") { - operatorIds = this.workflowActionService - .getTexeraGraph() - .getAllOperators() - .map(op => op.operatorID); - } else { - operatorIds = this.workflowActionService.getJointGraphWrapper().getCurrentHighlightedOperatorIDs(); + if (!this.downloadability) { + return; } + const operatorIds = this.getOperatorIdsToCheck(); + if (operatorIds.length === 0) { // No operators highlighted this.isTableOutput = false; @@ -106,13 +164,19 @@ export class ResultExportationComponent implements OnInit { return; } - // Assume they're all table or visualization - // until we find an operator that isn't + if (this.isExportRestricted) { + this.isTableOutput = false; + this.isVisualizationOutput = false; + this.containsBinaryData = false; + return; + } + + // Assume they're all table or visualization until we find an operator that isn't let allTable = true; let allVisualization = true; let anyBinaryData = false; - for (const operatorId of operatorIds) { + for (const operatorId of this.exportableOperatorIds) { const outputTypes = this.workflowResultService.determineOutputTypes(operatorId); if (!outputTypes.hasAnyResult) { continue; @@ -181,4 +245,14 @@ export class ResultExportationComponent implements OnInit { } }); } + + /** + * Getter that returns a comma-separated string of blocking dataset labels. 
+ * Used in the template to display which datasets are preventing export. + * + * @returns String like "Dataset1 (user1@example.com), Dataset2 (user2@example.com)" + */ + get blockingDatasetSummary(): string { + return this.blockingDatasetLabels.join(", "); + } } diff --git a/core/gui/src/app/workspace/service/workflow-result-export/workflow-result-export.service.spec.ts b/core/gui/src/app/workspace/service/workflow-result-export/workflow-result-export.service.spec.ts index 7e45c34de2a..7dea1f097eb 100644 --- a/core/gui/src/app/workspace/service/workflow-result-export/workflow-result-export.service.spec.ts +++ b/core/gui/src/app/workspace/service/workflow-result-export/workflow-result-export.service.spec.ts @@ -35,6 +35,7 @@ import { PaginatedResultEvent } from "../../types/workflow-websocket.interface"; import { ExecutionState } from "../../types/execute-workflow.interface"; import * as JSZip from "jszip"; import { DownloadService } from "src/app/dashboard/service/user/download/download.service"; +import { DatasetService } from "../../../dashboard/service/user/dataset/dataset.service"; import { commonTestProviders } from "../../../common/testing/test-utils"; describe("WorkflowResultExportService", () => { @@ -45,6 +46,7 @@ describe("WorkflowResultExportService", () => { let executeWorkflowServiceSpy: jasmine.SpyObj; let workflowResultServiceSpy: jasmine.SpyObj; let downloadServiceSpy: jasmine.SpyObj; + let datasetServiceSpy: jasmine.SpyObj; let jointGraphWrapperSpy: jasmine.SpyObj; let texeraGraphSpy: jasmine.SpyObj; @@ -60,8 +62,24 @@ describe("WorkflowResultExportService", () => { jointGraphWrapperSpy.getJointOperatorHighlightStream.and.returnValue(of()); jointGraphWrapperSpy.getJointOperatorUnhighlightStream.and.returnValue(of()); - texeraGraphSpy = jasmine.createSpyObj("TexeraGraph", ["getAllOperators"]); + texeraGraphSpy = jasmine.createSpyObj("TexeraGraph", [ + "getAllOperators", + "getOperatorAddStream", + "getOperatorDeleteStream", + 
"getOperatorPropertyChangeStream", + "getLinkAddStream", + "getLinkDeleteStream", + "getDisabledOperatorsChangedStream", + "getAllLinks", + ]); texeraGraphSpy.getAllOperators.and.returnValue([]); + texeraGraphSpy.getOperatorAddStream.and.returnValue(of()); + texeraGraphSpy.getOperatorDeleteStream.and.returnValue(of()); + texeraGraphSpy.getOperatorPropertyChangeStream.and.returnValue(of()); + texeraGraphSpy.getLinkAddStream.and.returnValue(of()); + texeraGraphSpy.getLinkDeleteStream.and.returnValue(of()); + texeraGraphSpy.getDisabledOperatorsChangedStream.and.returnValue(of()); + texeraGraphSpy.getAllLinks.and.returnValue([]); const wsSpy = jasmine.createSpyObj("WorkflowWebsocketService", ["subscribeToEvent", "send"]); wsSpy.subscribeToEvent.and.returnValue(of()); // Return an empty observable @@ -87,6 +105,9 @@ describe("WorkflowResultExportService", () => { const downloadSpy = jasmine.createSpyObj("DownloadService", ["downloadOperatorsResult"]); downloadSpy.downloadOperatorsResult.and.returnValue(of(new Blob())); + const datasetSpy = jasmine.createSpyObj("DatasetService", ["retrieveAccessibleDatasets"]); + datasetSpy.retrieveAccessibleDatasets.and.returnValue(of([])); + TestBed.configureTestingModule({ imports: [HttpClientTestingModule], providers: [ @@ -97,6 +118,7 @@ describe("WorkflowResultExportService", () => { { provide: ExecuteWorkflowService, useValue: ewSpy }, { provide: WorkflowResultService, useValue: wrSpy }, { provide: DownloadService, useValue: downloadSpy }, + { provide: DatasetService, useValue: datasetSpy }, ...commonTestProviders, ], }); @@ -109,6 +131,7 @@ describe("WorkflowResultExportService", () => { executeWorkflowServiceSpy = TestBed.inject(ExecuteWorkflowService) as jasmine.SpyObj; workflowResultServiceSpy = TestBed.inject(WorkflowResultService) as jasmine.SpyObj; downloadServiceSpy = TestBed.inject(DownloadService) as jasmine.SpyObj; + datasetServiceSpy = TestBed.inject(DatasetService) as jasmine.SpyObj; }); it("should be created", () => 
/**
 * Result of workflow result downloadability analysis.
 * Contains information about which operators are restricted from exporting
 * due to non-downloadable dataset dependencies.
 */
export class WorkflowResultDownloadability {
  /**
   * Map of operator IDs to sets of blocking dataset labels.
   * Key: Operator ID
   * Value: Set of human-readable dataset labels (e.g., "dataset_name (owner@email.com)")
   * that are blocking this operator from being exported
   *
   * An operator appears in this map if it directly uses or depends on (through data flow)
   * one or more datasets that the current user is not allowed to download.
   */
  restrictedOperatorMap: Map<string, Set<string>>;

  constructor(restrictedOperatorMap: Map<string, Set<string>>) {
    this.restrictedOperatorMap = restrictedOperatorMap;
  }

  /**
   * Filters operator IDs to return only those that are not restricted by dataset access controls.
   *
   * @param operatorIds Array of operator IDs to filter
   * @returns Array of operator IDs that can be exported, in input order
   */
  getExportableOperatorIds(operatorIds: readonly string[]): string[] {
    return operatorIds.filter(operatorId => !this.restrictedOperatorMap.has(operatorId));
  }

  /**
   * Filters operator IDs to return only those that are restricted by dataset access controls.
   *
   * @param operatorIds Array of operator IDs to filter
   * @returns Array of operator IDs that are blocked from export, in input order
   */
  getBlockedOperatorIds(operatorIds: readonly string[]): string[] {
    return operatorIds.filter(operatorId => this.restrictedOperatorMap.has(operatorId));
  }

  /**
   * Gets the list of dataset labels that are blocking export for the given operators.
   * Used to display user-friendly error messages about which datasets are causing restrictions.
   * Labels shared by several operators are reported once, in first-seen order.
   *
   * @param operatorIds Array of operator IDs to check
   * @returns Array of dataset labels (e.g., "Dataset1 (user@example.com)")
   */
  getBlockingDatasets(operatorIds: readonly string[]): string[] {
    const labels = new Set<string>();
    operatorIds.forEach(operatorId => {
      // Operators without an entry are unrestricted and contribute nothing.
      const datasets = this.restrictedOperatorMap.get(operatorId);
      datasets?.forEach(label => labels.add(label));
    });
    return Array.from(labels);
  }
}
/**
 * Computes restriction analysis by calling the backend API.
 *
 * The backend analyzes the workflow to identify operators that are restricted from export
 * due to non-downloadable dataset dependencies. The restriction propagates through the
 * workflow graph via data flow.
 *
 * When the workflow has no ID yet, or when the request fails, an empty analysis (no
 * restricted operators) is emitted instead of an error.
 *
 * @returns Observable that emits the restriction analysis result
 */
public computeRestrictionAnalysis(): Observable<WorkflowResultDownloadability> {
  const emptyAnalysis = (): WorkflowResultDownloadability =>
    new WorkflowResultDownloadability(new Map<string, Set<string>>());

  const workflowId = this.workflowActionService.getWorkflow().wid;
  if (!workflowId) {
    return of(emptyAnalysis());
  }

  return this.downloadService.getWorkflowResultDownloadability(workflowId).pipe(
    map((backendResponse: Record<string, string[]>) => {
      // Convert backend format ({ operatorId: string[] }) to Map<string, Set<string>>
      const restrictedOperatorMap = new Map<string, Set<string>>();
      Object.entries(backendResponse).forEach(([operatorId, datasetLabels]) => {
        restrictedOperatorMap.set(operatorId, new Set<string>(datasetLabels));
      });
      return new WorkflowResultDownloadability(restrictedOperatorMap);
    }),
    // Deliberate fallback: a failed lookup is reported as "nothing restricted".
    catchError(() => of(emptyAnalysis()))
  );
}
/**
 * Refreshes the two flags that drive the export buttons.
 *
 * - hasResultToExportOnHighlightedOperators: context-menu export button
 * - hasResultToExportOnAllOperators: top-menu export button
 *
 * A flag is set only when the workflow is not executing and at least one operator in the
 * respective scope has a paginated result or a result snapshot.
 */
private updateExportAvailabilityFlags(): void {
  // An operator counts as exportable when it has any result or a current snapshot.
  const operatorHasResult = (operatorId: string): boolean =>
    this.workflowResultService.hasAnyResult(operatorId) ||
    this.workflowResultService.getResultService(operatorId)?.getCurrentResultSnapshot() !== undefined;

  const executionIdle = isNotInExecution(this.executeWorkflowService.getExecutionState().state);

  const highlightedHasResult = this.workflowActionService
    .getJointGraphWrapper()
    .getCurrentHighlightedOperatorIDs()
    .some(operatorHasResult);
  this.hasResultToExportOnHighlightedOperators = executionIdle && highlightedHasResult;

  const everyOperatorId = this.workflowActionService
    .getTexeraGraph()
    .getAllOperators()
    .map(operator => operator.operatorID);
  const anyOperatorHasResult = executionIdle && everyOperatorId.some(operatorHasResult);

  this.hasResultToExportOnAllOperators.next(anyOperatorHasResult);
}
Handles response and shows appropriate notifications + * + * Shows error messages if all operators are blocked, warning messages if some are blocked. + * + * @param downloadability Downloadability analysis result containing restriction information + */ + private performExport( + exportType: string, + workflowName: string, + datasetIds: number[], + rowIndex: number, + columnIndex: number, + filename: string, + exportAll: boolean, + destination: "dataset" | "local", + unit: DashboardWorkflowComputingUnit | null, + downloadability: WorkflowResultDownloadability + ): void { + // Validates configuration and computing unit availability if (!this.config.env.exportExecutionResultEnabled) { return; } @@ -120,7 +265,7 @@ export class WorkflowResultExportService { return; } - // gather operator IDs + // Determines operator scope const operatorIds = exportAll ? this.workflowActionService .getTexeraGraph() @@ -128,17 +273,35 @@ export class WorkflowResultExportService { .map(operator => operator.operatorID) : [...this.workflowActionService.getJointGraphWrapper().getCurrentHighlightedOperatorIDs()]; - const operatorArray = operatorIds.map(operatorId => { - return { - id: operatorId, - outputType: this.workflowResultService.determineOutputExtension(operatorId, exportType), - }; - }); - if (operatorIds.length === 0) { return; } + // Applies restriction filtering with user feedback + const exportableOperatorIds = downloadability.getExportableOperatorIds(operatorIds); + + if (exportableOperatorIds.length === 0) { + const datasets = downloadability.getBlockingDatasets(operatorIds); + const suffix = datasets.length > 0 ? `: ${datasets.join(", ")}` : ""; + this.notificationService.error( + `Cannot export result: selection depends on dataset(s) that are not downloadable${suffix}` + ); + return; + } + + if (exportableOperatorIds.length < operatorIds.length) { + const datasets = downloadability.getBlockingDatasets(operatorIds); + const suffix = datasets.length > 0 ? 
/**
  * Parses a dataset file path and extracts its components.
  * Expected format: /ownerEmail/datasetName/versionName/fileRelativePath
  *
  * Input that cannot be interpreted as a path at all (null, or a string rejected by
  * `Paths.get`, e.g. one containing a NUL character) is reported as `None` rather than
  * surfacing as an exception, so that "None otherwise" holds for every input.
  *
  * @param fileName The file path to parse
  * @return Some((ownerEmail, datasetName, versionName, fileRelativePathSegments)) when the path
  *         has at least four segments, None otherwise
  */
private def parseDatasetFilePath(
    fileName: String
): Option[(String, String, String, Array[String])] = {
  val pathSegments: Option[Array[String]] =
    try {
      Option(fileName).map { name =>
        val filePath = Paths.get(name)
        (0 until filePath.getNameCount).map(filePath.getName(_).toString).toArray
      }
    } catch {
      case _: java.nio.file.InvalidPathException => None
    }

  // owner, dataset, version and at least one segment of the relative file path are required
  pathSegments.collect {
    case segments if segments.length >= 4 =>
      (segments(0), segments(1), segments(2), segments.drop(3))
  }
}
* @@ -88,14 +113,13 @@ object FileResolver { * @throws FileNotFoundException if the dataset file does not exist or cannot be created */ private def datasetResolveFunc(fileName: String): URI = { - val filePath = Paths.get(fileName) - val pathSegments = (0 until filePath.getNameCount).map(filePath.getName(_).toString).toArray + val (ownerEmail, datasetName, versionName, fileRelativePathSegments) = + parseDatasetFilePath(fileName).getOrElse( + throw new FileNotFoundException(s"Dataset file $fileName not found.") + ) - // extract info from the user-given fileName - val ownerEmail = pathSegments(0) - val datasetName = pathSegments(1) - val versionName = pathSegments(2) - val fileRelativePath = Paths.get(pathSegments.drop(3).head, pathSegments.drop(3).tail: _*) + val fileRelativePath = + Paths.get(fileRelativePathSegments.head, fileRelativePathSegments.tail: _*) // fetch the dataset and version from DB to get dataset ID and version hash val (dataset, datasetVersion) = @@ -168,4 +192,20 @@ object FileResolver { case _: Exception => false // Invalid URI format } } + + /** + * Parses a dataset file path to extract owner email and dataset name. + * Expected format: /ownerEmail/datasetName/versionName/fileRelativePath + * + * @param path The file path from operator properties + * @return Some((ownerEmail, datasetName)) if path is valid, None otherwise + */ + def parseDatasetOwnerAndName(path: String): Option[(String, String)] = { + if (path == null) { + return None + } + parseDatasetFilePath(path).map { + case (ownerEmail, datasetName, _, _) => (ownerEmail, datasetName) + } + } }