Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
9854962
implementation
Sep 26, 2025
69dcee0
remove
Sep 26, 2025
e18b973
Merge branch 'main' into feat/prevent-export-result
aglinxinyuan Sep 26, 2025
99bf2a2
fix test
Sep 26, 2025
7284e33
format
Sep 26, 2025
1093e2d
Merge branch 'main' into feat/prevent-export-result
seongjinyoon Sep 26, 2025
34219c3
add comment
Sep 28, 2025
6e4a3ca
Merge branch 'main' into feat/prevent-export-result
seongjinyoon Sep 29, 2025
c6e4b4b
Merge branch 'main' into feat/prevent-export-result
seongjinyoon Sep 29, 2025
e8c0415
Merge branch 'main' into feat/prevent-export-result
seongjinyoon Sep 30, 2025
145121a
add comment
Sep 30, 2025
895f88b
add comment
Sep 30, 2025
2f3eff1
Merge branch 'main' into feat/prevent-export-result
seongjinyoon Sep 30, 2025
b926ccc
rename functions + remove dead code
Oct 1, 2025
089cdec
Merge branch 'main' into feat/prevent-export-result
seongjinyoon Oct 1, 2025
0e6c624
fix
Oct 1, 2025
d3b5dfa
fix
Oct 1, 2025
46ce163
fix
Oct 1, 2025
5cdfc55
remove RestrictedDataset
Oct 1, 2025
227f381
change location parseDatasetPath
Oct 1, 2025
3afbd41
single query to check whether each dataset is downloadable or not
Oct 1, 2025
9975628
Merge branch 'main' into feat/prevent-export-result
seongjinyoon Oct 1, 2025
91ed30c
removed tolower
Oct 1, 2025
e91bb18
reuse code
Oct 1, 2025
90bd3c2
Merge branch 'main' into feat/prevent-export-result
seongjinyoon Oct 2, 2025
4a5d5d1
Merge branch 'apache:main' into feat/prevent-export-result
seongjinyoon Oct 3, 2025
9e07a00
fix
Oct 5, 2025
3bbcfb4
Merge branch 'main' into feat/prevent-export-result
seongjinyoon Oct 5, 2025
c2a42d8
fix
Oct 6, 2025
a10c09b
Merge branch 'main' into feat/prevent-export-result
seongjinyoon Oct 6, 2025
83a3d2b
Merge branch 'main' into feat/prevent-export-result
seongjinyoon Oct 6, 2025
e5730f2
Merge branch 'main' into feat/prevent-export-result
seongjinyoon Oct 8, 2025
48d3e09
fix
Oct 10, 2025
9881f81
Merge branch 'main' into feat/prevent-export-result
seongjinyoon Oct 10, 2025
8d6b934
Merge branch 'main' into feat/prevent-export-result
bobbai00 Oct 10, 2025
124035b
fix
Oct 10, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,26 @@

package edu.uci.ics.texera.web.resource.dashboard.user.workflow

import edu.uci.ics.amber.core.storage.{DocumentFactory, VFSResourceType, VFSURIFactory}
import edu.uci.ics.amber.core.storage.{
DocumentFactory,
FileResolver,
VFSResourceType,
VFSURIFactory
}
import edu.uci.ics.amber.core.tuple.Tuple
import edu.uci.ics.amber.core.virtualidentity._
import edu.uci.ics.amber.core.workflow.{GlobalPortIdentity, PortIdentity}
import edu.uci.ics.amber.engine.architecture.logreplay.{ReplayDestination, ReplayLogRecord}
import edu.uci.ics.amber.engine.common.Utils.{maptoStatusCode, stringToAggregatedState}
import edu.uci.ics.amber.engine.common.storage.SequentialRecordStorage
import edu.uci.ics.amber.util.serde.GlobalPortIdentitySerde.SerdeOps
import edu.uci.ics.amber.util.JSONUtils.objectMapper
import edu.uci.ics.texera.auth.SessionUser
import edu.uci.ics.texera.dao.SqlServer
import edu.uci.ics.texera.dao.SqlServer.withTransaction
import edu.uci.ics.texera.dao.jooq.generated.Tables._
import edu.uci.ics.texera.dao.jooq.generated.tables.daos.WorkflowExecutionsDao
import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.WorkflowExecutions
import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{User => UserPojo, WorkflowExecutions}
import edu.uci.ics.texera.web.model.http.request.result.ResultExportRequest
import edu.uci.ics.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource._
import edu.uci.ics.texera.web.service.{ExecutionsMetadataPersistService, ResultExportService}
Expand Down Expand Up @@ -100,6 +106,131 @@ object WorkflowExecutionsResource {
}
}

/**
  * Computes which operators in a workflow are restricted due to dataset access controls.
  *
  * This function:
  * 1. Parses the workflow JSON to find all operators and their dataset dependencies
  * 2. Identifies operators using non-downloadable datasets that the user doesn't own
  * 3. Uses BFS to propagate restrictions downstream through the workflow graph
  * 4. Returns a map of operator IDs to the restricted datasets they depend on
  *
  * A missing workflow row or malformed workflow JSON yields an empty map
  * (best-effort: no restrictions can be derived).
  *
  * @param wid The workflow ID
  * @param currentUser The current user making the export request
  * @return Map of operator ID -> Set of (ownerEmail, datasetName) tuples that block its export
  */
private def getNonDownloadableOperatorMap(
    wid: Int,
    currentUser: UserPojo
): Map[String, Set[(String, String)]] = {
  // Load the workflow JSON content; ignore rows with null/empty content.
  val workflowRecord = context
    .select(WORKFLOW.CONTENT)
    .from(WORKFLOW)
    .where(WORKFLOW.WID.eq(wid).and(WORKFLOW.CONTENT.isNotNull).and(WORKFLOW.CONTENT.ne("")))
    .fetchOne()

  // Parse defensively: catching Exception (not Throwable) preserves the previous
  // best-effort behavior — a malformed workflow simply produces no restrictions.
  val parsedRoot = Option(workflowRecord).flatMap { record =>
    try Some(objectMapper.readTree(record.value1()))
    catch { case _: Exception => None }
  }

  parsedRoot match {
    case None => Map.empty
    case Some(rootNode) =>
      val operatorsNode = rootNode.path("operators")
      val linksNode = rootNode.path("links")

      // operator ID -> (ownerEmail, datasetName) for datasets the current user does NOT own.
      val operatorDatasets = mutable.Map.empty[String, (String, String)]

      operatorsNode.elements().asScala.foreach { operatorNode =>
        val operatorId = operatorNode.path("operatorID").asText("")
        if (operatorId.nonEmpty) {
          val fileNameNode = operatorNode.path("operatorProperties").path("fileName")
          if (fileNameNode.isTextual) {
            FileResolver.parseDatasetOwnerAndName(fileNameNode.asText()).foreach {
              case (ownerEmail, datasetName) =>
                // Owners may always export results built from their own datasets.
                val isOwner =
                  Option(currentUser.getEmail)
                    .exists(_.equalsIgnoreCase(ownerEmail))
                if (!isOwner) {
                  operatorDatasets.update(operatorId, (ownerEmail, datasetName))
                }
            }
          }
        }
      }

      if (operatorDatasets.isEmpty) {
        Map.empty
      } else {
        // Single query: of the referenced datasets, which are flagged non-downloadable?
        val uniqueDatasets = operatorDatasets.values.toSet
        val conditions = uniqueDatasets.map {
          case (ownerEmail, datasetName) =>
            USER.EMAIL.equalIgnoreCase(ownerEmail).and(DATASET.NAME.equalIgnoreCase(datasetName))
        }

        val nonDownloadableDatasets = context
          .select(USER.EMAIL, DATASET.NAME)
          .from(DATASET)
          .join(USER)
          .on(DATASET.OWNER_UID.eq(USER.UID))
          .where(conditions.reduce((a, b) => a.or(b)))
          .and(DATASET.IS_DOWNLOADABLE.eq(false))
          .fetch()
          .asScala
          .map(record => (record.value1(), record.value2()))
          .toSet

        // Operators that directly read a non-downloadable dataset.
        val restrictedSourceMap = operatorDatasets.filter {
          case (_, dataset) =>
            nonDownloadableDatasets.contains(dataset)
        }

        // Downstream adjacency list of the workflow graph (source -> targets).
        val adjacency = mutable.Map.empty[String, mutable.ListBuffer[String]]
        linksNode.elements().asScala.foreach { linkNode =>
          val sourceId = linkNode.path("source").path("operatorID").asText("")
          val targetId = linkNode.path("target").path("operatorID").asText("")
          if (sourceId.nonEmpty && targetId.nonEmpty) {
            adjacency.getOrElseUpdate(sourceId, mutable.ListBuffer.empty[String]) += targetId
          }
        }

        // BFS: propagate each restriction to all downstream operators. A node is only
        // re-enqueued when its restriction set grows; since sets grow monotonically,
        // this terminates even if the link graph contains a cycle.
        val restrictionMap = mutable.Map.empty[String, Set[(String, String)]]
        val queue = mutable.Queue.empty[(String, Set[(String, String)])]

        restrictedSourceMap.foreach {
          case (operatorId, dataset) =>
            queue.enqueue(operatorId -> Set(dataset))
        }

        while (queue.nonEmpty) {
          val (currentOperatorId, datasetSet) = queue.dequeue()
          val existing = restrictionMap.getOrElse(currentOperatorId, Set.empty)
          val merged = existing ++ datasetSet
          if (merged != existing) {
            restrictionMap.update(currentOperatorId, merged)
            adjacency
              .get(currentOperatorId)
              .foreach(_.foreach(nextOperator => queue.enqueue(nextOperator -> merged)))
          }
        }

        restrictionMap.toMap
      }
  }
}

def insertOperatorPortResultUri(
eid: ExecutionIdentity,
globalPortId: GlobalPortIdentity,
Expand Down Expand Up @@ -660,6 +791,37 @@ class WorkflowExecutionsResource {
executionsDao.update(execution)
}

/**
  * Returns which operators are restricted from export due to dataset access controls.
  * This endpoint allows the frontend to check restrictions before attempting export.
  *
  * @param wid The workflow ID to check
  * @param user The authenticated user
  * @return JSON map of operator ID -> array of "datasetName (ownerEmail)" labels blocking export
  */
@GET
@Path("/{wid}/result/downloadability")
@Produces(Array(MediaType.APPLICATION_JSON))
@RolesAllowed(Array("REGULAR", "ADMIN"))
def getWorkflowResultDownloadability(
    @PathParam("wid") wid: Integer,
    @Auth user: SessionUser
): Response = {
  validateUserCanAccessWorkflow(user.getUser.getUid, wid)

  val restrictions = getNonDownloadableOperatorMap(wid, user.user)

  // Flatten to a frontend-friendly shape: operatorId -> Array["datasetName (ownerEmail)"].
  val payload = restrictions.map {
    case (operatorId, datasets) =>
      val labels = datasets.map {
        case (ownerEmail, datasetName) => s"$datasetName ($ownerEmail)"
      }
      operatorId -> labels.toArray
  }.asJava

  Response.ok(payload).build()
}

@POST
@Path("/result/export")
@RolesAllowed(Array("REGULAR", "ADMIN"))
Expand All @@ -675,6 +837,35 @@ class WorkflowExecutionsResource {
.entity(Map("error" -> "No operator selected").asJava)
.build()

// Get ALL non-downloadable in workflow
val datasetRestrictions = getNonDownloadableOperatorMap(request.workflowId, user.user)
// Filter to only user's selection
val restrictedOperators = request.operators.filter(op => datasetRestrictions.contains(op.id))
// Check if any selected operator is restricted
if (restrictedOperators.nonEmpty) {
val restrictedDatasets = restrictedOperators.flatMap { op =>
datasetRestrictions(op.id).map {
case (ownerEmail, datasetName) =>
Map(
"operatorId" -> op.id,
"ownerEmail" -> ownerEmail,
"datasetName" -> datasetName
).asJava
}
}

return Response
.status(Response.Status.FORBIDDEN)
.`type`(MediaType.APPLICATION_JSON)
.entity(
Map(
"error" -> "Export blocked due to dataset restrictions",
"restrictedDatasets" -> restrictedDatasets.asJava
).asJava
)
.build()
}

try {
request.destination match {
case "local" =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import { DashboardWorkflowComputingUnit } from "../../../../workspace/types/work
var contentDisposition = require("content-disposition");

export const EXPORT_BASE_URL = "result/export";
export const DOWNLOADABILITY_BASE_URL = "result/downloadability";

interface DownloadableItem {
blob: Blob;
Expand All @@ -44,6 +45,10 @@ export interface ExportWorkflowJsonResponse {
message: string;
}

/**
 * Response shape of the workflow result downloadability endpoint.
 * Maps each restricted operator ID to the list of dataset labels
 * (formatted by the backend as "datasetName (ownerEmail)") that block its export.
 * Operators with no restrictions are absent from the map.
 */
export interface WorkflowResultDownloadabilityResponse {
  [operatorId: string]: string[]; // operatorId -> array of dataset labels blocking export
}

@Injectable({
providedIn: "root",
})
Expand Down Expand Up @@ -115,6 +120,18 @@ export class DownloadService {
);
}

/**
 * Fetches workflow result downloadability information from the backend.
 * The response maps each restricted operator ID to the dataset labels that block its export;
 * an empty map means every operator's results may be exported.
 *
 * @param workflowId The workflow ID to check
 * @returns Observable emitting the downloadability map
 */
public getWorkflowResultDownloadability(workflowId: number): Observable<WorkflowResultDownloadabilityResponse> {
  return this.http.get<WorkflowResultDownloadabilityResponse>(
    `${WORKFLOW_EXECUTIONS_API_BASE_URL}/${workflowId}/${DOWNLOADABILITY_BASE_URL}`
  );
}

/**
* Export the workflow result. If destination = "local", the server returns a BLOB (file).
* Otherwise, it returns JSON with a status message.
Expand Down
Loading
Loading