/*
 * Patch context: spark-ui/src/reducers/SqlReducer.ts (index 5fb040f..9e571d5 100644).
 *
 * The hunks below are the call-site changes that accompany the functions
 * defined in this section. They are fragments of calculateSql and
 * updateSqlNodeMetrics (whose full bodies are not part of this section) and
 * are kept verbatim for reference.
 *
 * @@ -180,6 +180,7 @@ function calculateSql(
 *    icebergCommit: IcebergCommitsInfo | undefined,
 *  ): EnrichedSparkSQL {
 *    const enrichedSql = sql as EnrichedSparkSQL;
 * +  const graph = generateGraph(enrichedSql.edges, enrichedSql.nodes);
 *    const originalNumOfNodes = enrichedSql.nodes.length;
 *    const typeEnrichedNodes = enrichedSql.nodes.map((node) => {
 *      const type = calcNodeType(node.nodeName);
 *
 * @@ -242,7 +243,7 @@ function calculateSql(
 *      );
 *      return {
 *        ...node,
 * -      metrics: calcNodeMetrics(node.type, node.metrics),
 * +      metrics: updateNodeMetrics(node, node.metrics, graph, onlyGraphNodes),
 *        exchangeMetrics: exchangeMetrics,
 *        exchangeBroadcastDuration: exchangeBroadcastDuration,
 *      };
 *
 * @@ -399,6 +400,8 @@ export function updateSqlNodeMetrics(
 *    const notEffectedSqls = currentStore.sqls.filter((sql) => sql.id !== sqlId);
 *    const runningSql = runningSqls[0];
 * +  // TODO: cache the graph
 * +  const graph = generateGraph(runningSql.edges, runningSql.nodes);
 *    const nodes = runningSql.nodes.map((node) => {
 *      const matchedMetricsNodes = sqlMetrics.filter(
 *        (nodeMetrics) => nodeMetrics.id === node.nodeId,
 *
 * @@ -407,7 +410,7 @@ export function updateSqlNodeMetrics(
 *        return node;
 *      }
 * -    const metrics = calcNodeMetrics(node.type, matchedMetricsNodes[0].metrics);
 * +    const metrics = updateNodeMetrics(node, matchedMetricsNodes[0].metrics, graph, runningSql.nodes);
 *      const exchangeMetrics = calcExchangeMetrics(node.nodeName, metrics);
 *      // TODO: maybe do a smarter replacement, or send only the initialized metrics
 *
 * @@ -485,3 +488,78 @@ function calcBroadcastExchangeDuration(
 *    }
 *    return undefined;
 *  }
 *
 * NOTE(review): in updateSqlNodeMetrics the `allNodes` argument is
 * `runningSql.nodes`, i.e. the nodes being mapped over, so input row counts
 * appear to come from the previously stored metrics while the output row count
 * comes from the fresh update — confirm this lag is acceptable.
 */

/**
 * Percent formatter for the "Filter Ratio" metric (always two fraction digits).
 *
 * Built once at module load: the options never change, so there is no reason
 * to construct a new Intl.NumberFormat for every node on every refresh.
 * Passing `undefined` as the locale selects the runtime's default locale.
 */
const FILTER_RATIO_FORMATTER = new Intl.NumberFormat(undefined, {
  style: "percent",
  minimumFractionDigits: 2,
  maximumFractionDigits: 2,
});

/**
 * Computes the metrics to store on a node.
 *
 * The raw metrics are first passed through `calcNodeMetrics` (defined
 * elsewhere in this file); a "Filter Ratio" entry is appended when
 * `addFilterRatioMetric` can derive one for the node.
 *
 * @param node - The node whose metrics are being computed.
 * @param metrics - The node's raw metrics.
 * @param graph - Plan graph, used to look up the node's incoming edges.
 * @param allNodes - Nodes in which the edge sources are looked up by id.
 * @returns A new array holding the processed metrics, optionally followed by
 *   the "Filter Ratio" metric.
 */
function updateNodeMetrics(
  node: EnrichedSqlNode,
  metrics: EnrichedSqlMetric[],
  graph: Graph,
  allNodes: EnrichedSqlNode[],
): EnrichedSqlMetric[] {
  const updatedOriginalMetrics = calcNodeMetrics(node.type, metrics);
  const filterRatio = addFilterRatioMetric(
    node,
    updatedOriginalMetrics,
    graph,
    allNodes,
  );

  return [
    ...updatedOriginalMetrics,
    ...(filterRatio !== null
      ? [{ name: "Filter Ratio", value: filterRatio }]
      : []),
  ];
}

/**
 * Reads a row count from the first metric whose name contains "rows".
 *
 * Thousands separators (",") are stripped before parsing.
 *
 * @param metrics - Metrics to search; may be undefined.
 * @returns The parsed count, or null when there is no such metric or its
 *   value does not parse as a number.
 */
function findRowCount(metrics: EnrichedSqlMetric[] | undefined): number | null {
  const rowsMetric = metrics?.find((m) => m.name.includes("rows"));
  if (!rowsMetric) {
    return null;
  }
  const rows = parseFloat(rowsMetric.value.replace(/,/g, ""));
  return Number.isNaN(rows) ? null : rows;
}

/**
 * Derives the share of input rows that a Filter/Join node did not emit.
 *
 * Only nodes whose name contains "Filter" or "Join" are considered. Input rows
 * are summed over the source nodes of the node's incoming edges; sources that
 * are missing from `allNodes` or have no parsable row metric contribute 0.
 * The result is `(input - output) / input`, so it is negative when the node
 * emits more rows than it received (possible for a Join).
 *
 * @param node - The node to compute the ratio for.
 * @param updatedMetrics - The node's own metrics, used for its output rows.
 * @param graph - Plan graph, queried via `inEdges` with the node id as string.
 * @param allNodes - Nodes in which edge sources (`edge.v`) are looked up.
 * @returns The ratio formatted as a percentage, or null when the node is not a
 *   Filter/Join, has no incoming edges, has zero total input rows, or has no
 *   parsable output row metric.
 */
function addFilterRatioMetric(
  node: EnrichedSqlNode,
  updatedMetrics: EnrichedSqlMetric[],
  graph: Graph,
  allNodes: EnrichedSqlNode[],
): string | null {
  const isFilterOrJoin =
    node.nodeName.includes("Filter") || node.nodeName.includes("Join");
  if (!isFilterOrJoin) {
    return null;
  }

  const inputEdges = graph.inEdges(node.nodeId.toString());
  if (!inputEdges || inputEdges.length === 0) {
    return null;
  }

  let totalInputRows = 0;
  for (const edge of inputEdges) {
    const inputNode = allNodes.find((n) => n.nodeId.toString() === edge.v);
    totalInputRows += findRowCount(inputNode?.metrics) ?? 0;
  }

  // Zero input rows would mean a division by zero below.
  if (totalInputRows === 0) {
    return null;
  }

  const outputRows = findRowCount(updatedMetrics);
  if (outputRows === null) {
    return null;
  }

  return FILTER_RATIO_FORMATTER.format(
    (totalInputRows - outputRows) / totalInputRows,
  );
}