From b3ea4c9950d3c83f0d6384a9326d7d306a9803ba Mon Sep 17 00:00:00 2001
From: Andrei Nekrasov <andreinek7@gmail.com>
Date: Mon, 25 Nov 2024 22:52:29 +0200
Subject: [PATCH 1/5] Add a dynamic filter ratio metric to filter/join nodes

---
 spark-ui/src/reducers/SqlReducer.ts | 76 ++++++++++++++++++++++++++++-
 1 file changed, 74 insertions(+), 2 deletions(-)

diff --git a/spark-ui/src/reducers/SqlReducer.ts b/spark-ui/src/reducers/SqlReducer.ts
index 5fb040f..b5f1102 100644
--- a/spark-ui/src/reducers/SqlReducer.ts
+++ b/spark-ui/src/reducers/SqlReducer.ts
@@ -180,6 +180,7 @@ function calculateSql(
   icebergCommit: IcebergCommitsInfo | undefined,
 ): EnrichedSparkSQL {
   const enrichedSql = sql as EnrichedSparkSQL;
+  const graph = generateGraph(enrichedSql.edges, enrichedSql.nodes);
   const originalNumOfNodes = enrichedSql.nodes.length;
   const typeEnrichedNodes = enrichedSql.nodes.map((node) => {
     const type = calcNodeType(node.nodeName);
@@ -236,13 +237,14 @@ function calculateSql(
 
   const metricEnrichedNodes: EnrichedSqlNode[] = onlyGraphNodes.map((node) => {
     const exchangeMetrics = calcExchangeMetrics(node.nodeName, node.metrics);
+    const metrics_orig = calcNodeMetrics(node.type, node.metrics)
     const exchangeBroadcastDuration = calcBroadcastExchangeDuration(
       node.nodeName,
       node.metrics,
     );
     return {
       ...node,
-      metrics: calcNodeMetrics(node.type, node.metrics),
+      metrics: updateNodeMetrics(node, metrics_orig, graph, onlyGraphNodes),
       exchangeMetrics: exchangeMetrics,
       exchangeBroadcastDuration: exchangeBroadcastDuration,
     };
@@ -399,6 +401,7 @@ export function updateSqlNodeMetrics(
 
   const notEffectedSqls = currentStore.sqls.filter((sql) => sql.id !== sqlId);
   const runningSql = runningSqls[0];
+  const graph = generateGraph(runningSql.edges, runningSql.nodes);
   const nodes = runningSql.nodes.map((node) => {
     const matchedMetricsNodes = sqlMetrics.filter(
       (nodeMetrics) => nodeMetrics.id === node.nodeId,
@@ -407,7 +410,8 @@ export function updateSqlNodeMetrics(
       return node;
     }
 
-    const metrics = calcNodeMetrics(node.type, matchedMetricsNodes[0].metrics);
+    const metrics_orig = calcNodeMetrics(node.type, matchedMetricsNodes[0].metrics);
+    const metrics = updateNodeMetrics(node, metrics_orig, graph, runningSql.nodes);
     const exchangeMetrics = calcExchangeMetrics(node.nodeName, metrics);
 
     // TODO: maybe do a smarter replacement, or send only the initialized metrics
@@ -444,9 +448,11 @@ export function updateSqlNodeMetrics(
   const notEffectedSqlsBefore = currentStore.sqls.filter(
     (sql) => sql.id < sqlId,
   );
+
   const notEffectedSqlsAfter = currentStore.sqls.filter(
     (sql) => sql.id > sqlId,
   );
+  
   return {
     ...currentStore,
     sqls: [...notEffectedSqlsBefore, updatedSql, ...notEffectedSqlsAfter],
@@ -485,3 +491,69 @@ function calcBroadcastExchangeDuration(
   }
   return undefined;
 }
+
+function updateNodeMetrics(
+  node: EnrichedSqlNode,
+  updatedMetrics: EnrichedSqlMetric[],
+  graph: Graph,
+  allNodes: EnrichedSqlNode[],
+): EnrichedSqlMetric[] {
+  // Add custom logic for filter_ratio
+  // Implemented for the happy path: Range followed by Filter
+  // 1. Missing or ambiguous predecessor - not computable
+  // 2. Multiple Predecessors - aggregate row counts of all predecessors
+  // 3. Filter after non row based nodes - ?
+  // 4. Filters with dependencies on broadcast variables
+  if (node.nodeName.includes("Filter") || node.nodeName.includes("Join")) {
+    const inputEdges = graph.inEdges(node.nodeId.toString());
+
+    if (!inputEdges || inputEdges.length === 0) {
+      return updatedMetrics;
+    }
+    
+    // 2. Multiple Predecessors - aggregate row counts of all predecessors
+    let totalInputRows = 0;
+    let validPredecessors = 0;
+
+    inputEdges.forEach((edge) => {
+      const inputNode = allNodes.find((n) => n.nodeId.toString() === edge.v);
+      if (inputNode) {
+        const inputRowsMetric = inputNode.metrics?.find((m) =>
+          m.name.includes("rows")
+        );
+        if (inputRowsMetric) {
+          const inputRows = parseFloat(inputRowsMetric.value.replace(/,/g, ""));
+          if (!isNaN(inputRows)) {
+            totalInputRows += inputRows;
+            validPredecessors++;
+          }
+        }
+      }
+    });
+
+    if (validPredecessors === 0) {
+      return updatedMetrics;
+    }
+
+     const outputRowsMetric = updatedMetrics.find((m) => m.name === "rows");
+     if (!outputRowsMetric) {
+       return updatedMetrics;
+     }
+
+    const outputRows = parseFloat(outputRowsMetric.value.replace(/,/g, ""));
+    if (isNaN(outputRows)) {
+      return updatedMetrics;
+    }
+
+    const filterRatio = ((totalInputRows - outputRows) / totalInputRows) * 100;
+    if(filterRatio <= 100){
+      updatedMetrics.push({
+        name: "filter_ratio",
+        value: filterRatio.toFixed(2).concat("%"),
+      });
+    }
+  }
+
+  return updatedMetrics;
+
+}

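Note on the arithmetic above: the new metric is simply the share of predecessor rows that a Filter or Join node drops, computed from Spark's row-count metrics, which arrive as comma-separated strings. A minimal self-contained sketch of the same calculation; parseRowCount and filterRatioPercent are illustrative names, not part of the patch:

function parseRowCount(value: string): number | null {
  // Spark metric values are strings with thousands separators, e.g. "1,234,567".
  const parsed = parseFloat(value.replace(/,/g, ""));
  return isNaN(parsed) ? null : parsed;
}

function filterRatioPercent(
  inputRowValues: string[], // row-count metric values of all predecessor nodes
  outputRowValue: string,   // row-count metric value of the Filter/Join node itself
): string | null {
  const inputs = inputRowValues
    .map(parseRowCount)
    .filter((rows): rows is number => rows !== null);
  if (inputs.length === 0) {
    return null; // no usable predecessor metrics - not computable
  }
  const totalInputRows = inputs.reduce((sum, rows) => sum + rows, 0);
  const outputRows = parseRowCount(outputRowValue);
  if (outputRows === null || totalInputRows === 0) {
    return null;
  }
  const ratio = ((totalInputRows - outputRows) / totalInputRows) * 100;
  // Mirrors the guard above: ratios over 100% are dropped, while a Join that
  // produces more rows than it reads yields a negative ratio and is kept.
  return ratio <= 100 ? ratio.toFixed(2).concat("%") : null;
}

// Example: predecessors with 1,000 and 2,000 rows, 300 rows surviving -> "90.00%"
// filterRatioPercent(["1,000", "2,000"], "300")
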
From 2a70cbd4ea376b4da0e8eab6054f3ebc103489b3 Mon Sep 17 00:00:00 2001
From: Andrei Nekrasov <andreinek7@gmail.com>
Date: Mon, 25 Nov 2024 23:06:33 +0200
Subject: [PATCH 2/5] Add a dynamic filter ratio metric to filter/join nodes

---
 spark-ui/src/reducers/SqlReducer.ts | 32 +++++++++++++++--------------
 1 file changed, 17 insertions(+), 15 deletions(-)

diff --git a/spark-ui/src/reducers/SqlReducer.ts b/spark-ui/src/reducers/SqlReducer.ts
index b5f1102..5fd9642 100644
--- a/spark-ui/src/reducers/SqlReducer.ts
+++ b/spark-ui/src/reducers/SqlReducer.ts
@@ -237,14 +237,13 @@ function calculateSql(
 
   const metricEnrichedNodes: EnrichedSqlNode[] = onlyGraphNodes.map((node) => {
     const exchangeMetrics = calcExchangeMetrics(node.nodeName, node.metrics);
-    const metrics_orig = calcNodeMetrics(node.type, node.metrics)
     const exchangeBroadcastDuration = calcBroadcastExchangeDuration(
       node.nodeName,
       node.metrics,
     );
     return {
       ...node,
-      metrics: updateNodeMetrics(node, metrics_orig, graph, onlyGraphNodes),
+      metrics: updateNodeMetrics(node, node.metrics, graph, onlyGraphNodes),
       exchangeMetrics: exchangeMetrics,
       exchangeBroadcastDuration: exchangeBroadcastDuration,
     };
@@ -410,8 +409,7 @@ export function updateSqlNodeMetrics(
       return node;
     }
 
-    const metrics_orig = calcNodeMetrics(node.type, matchedMetricsNodes[0].metrics);
-    const metrics = updateNodeMetrics(node, metrics_orig, graph, runningSql.nodes);
+    const metrics = updateNodeMetrics(node, matchedMetricsNodes[0].metrics, graph, runningSql.nodes);
     const exchangeMetrics = calcExchangeMetrics(node.nodeName, metrics);
 
     // TODO: maybe do a smarter replacement, or send only the initialized metrics
@@ -448,11 +446,9 @@ export function updateSqlNodeMetrics(
   const notEffectedSqlsBefore = currentStore.sqls.filter(
     (sql) => sql.id < sqlId,
   );
-
   const notEffectedSqlsAfter = currentStore.sqls.filter(
     (sql) => sql.id > sqlId,
   );
-  
   return {
     ...currentStore,
     sqls: [...notEffectedSqlsBefore, updatedSql, ...notEffectedSqlsAfter],
@@ -493,17 +489,24 @@ function calcBroadcastExchangeDuration(
 }
 
 function updateNodeMetrics(
+  node: EnrichedSqlNode,
+  metrics: EnrichedSqlMetric[],
+  graph: Graph,
+  allNodes: EnrichedSqlNode[],
+): EnrichedSqlMetric[] {
+
+  const updatedMetrics = calcNodeMetrics(node.type, metrics);
+  addFilterRatioMetric(node, updatedMetrics, graph, allNodes);
+  return updatedMetrics;
+}
+
+function addFilterRatioMetric(
   node: EnrichedSqlNode,
   updatedMetrics: EnrichedSqlMetric[],
   graph: Graph,
   allNodes: EnrichedSqlNode[],
 ): EnrichedSqlMetric[] {
-  // Add custom logic for filter_ratio
-  // Implemented for the happy path: Range followed by Filter
-  // 1. Missing or ambiguous predecessor - not computable
-  // 2. Multiple Predecessors - aggregate row counts of all predecessors
-  // 3. Filter after non row based nodes - ?
-  // 4. Filters with dependencies on broadcast variables
+
   if (node.nodeName.includes("Filter") || node.nodeName.includes("Join")) {
     const inputEdges = graph.inEdges(node.nodeId.toString());
 
@@ -511,7 +514,6 @@ function updateNodeMetrics(
       return updatedMetrics;
     }
     
-    // 2. Multiple Predecessors - aggregate row counts of all predecessors
     let totalInputRows = 0;
     let validPredecessors = 0;
 
@@ -535,7 +537,7 @@ function updateNodeMetrics(
       return updatedMetrics;
     }
 
-     const outputRowsMetric = updatedMetrics.find((m) => m.name === "rows");
+     const outputRowsMetric = updatedMetrics.find((m) => m.name.includes("rows"));
      if (!outputRowsMetric) {
        return updatedMetrics;
      }
@@ -555,5 +557,5 @@ function updateNodeMetrics(
   }
 
   return updatedMetrics;
-
 }
+

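The predecessor lookup in addFilterRatioMetric assumes the object returned by generateGraph exposes a graphlib-style API: inEdges(nodeId) returns { v, w } edge objects (v being the source node id, stored as a string), or undefined when the node is not in the graph. A small sketch of that lookup in isolation, under the assumption that graphlib is the underlying library; predecessorIds is an illustrative helper, not part of the patch:

import { Graph } from "graphlib";

// Resolve the ids of all predecessor nodes of a plan node.
// Node ids are numbers on EnrichedSqlNode but strings inside the graph,
// hence the toString() calls in the patch above.
function predecessorIds(graph: Graph, nodeId: number): string[] {
  const inEdges = graph.inEdges(nodeId.toString());
  if (!inEdges) {
    return []; // node is not part of the graph
  }
  return inEdges.map((edge) => edge.v); // edge.v is the source end of each incoming edge
}
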
From ceafaadb5b8bbdcc9297e4b620d83a07521550d2 Mon Sep 17 00:00:00 2001
From: Andrei Nekrasov <andreinek7@gmail.com>
Date: Mon, 25 Nov 2024 23:10:48 +0200
Subject: [PATCH 3/5] Add a dynamic filter ratio metric to filter/join nodes

---
 spark-ui/src/reducers/SqlReducer.ts | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/spark-ui/src/reducers/SqlReducer.ts b/spark-ui/src/reducers/SqlReducer.ts
index 5fd9642..e04f15c 100644
--- a/spark-ui/src/reducers/SqlReducer.ts
+++ b/spark-ui/src/reducers/SqlReducer.ts
@@ -537,10 +537,10 @@ function addFilterRatioMetric(
       return updatedMetrics;
     }
 
-     const outputRowsMetric = updatedMetrics.find((m) => m.name.includes("rows"));
-     if (!outputRowsMetric) {
-       return updatedMetrics;
-     }
+    const outputRowsMetric = updatedMetrics.find((m) => m.name.includes("rows"));
+    if (!outputRowsMetric) {
+      return updatedMetrics;
+    }
 
     const outputRows = parseFloat(outputRowsMetric.value.replace(/,/g, ""));
     if (isNaN(outputRows)) {

From a2dafcf7d31217af48df8c43a72e8253158d5a4c Mon Sep 17 00:00:00 2001
From: Andrei Nekrasov <andreinek7@gmail.com>
Date: Fri, 29 Nov 2024 18:04:46 +0200
Subject: [PATCH 4/5] Updates after review

---
 spark-ui/src/reducers/SqlReducer.ts | 43 +++++++++++++++--------------
 1 file changed, 23 insertions(+), 20 deletions(-)

diff --git a/spark-ui/src/reducers/SqlReducer.ts b/spark-ui/src/reducers/SqlReducer.ts
index e04f15c..6b4afb0 100644
--- a/spark-ui/src/reducers/SqlReducer.ts
+++ b/spark-ui/src/reducers/SqlReducer.ts
@@ -495,9 +495,15 @@ function updateNodeMetrics(
   allNodes: EnrichedSqlNode[],
 ): EnrichedSqlMetric[] {
 
-  const updatedMetrics = calcNodeMetrics(node.type, metrics);
-  addFilterRatioMetric(node, updatedMetrics, graph, allNodes);
-  return updatedMetrics;
+  const updatedOriginalMetrics = calcNodeMetrics(node.type, metrics);
+  const filterRatio = addFilterRatioMetric(node, updatedOriginalMetrics, graph, allNodes);
+
+  return [
+    ...updatedOriginalMetrics,
+    ...(filterRatio !== null
+      ? [{ name: "Filter Ratio", value: filterRatio }]
+      : []),
+  ];
 }
 
 function addFilterRatioMetric(
@@ -505,17 +511,16 @@ function addFilterRatioMetric(
   updatedMetrics: EnrichedSqlMetric[],
   graph: Graph,
   allNodes: EnrichedSqlNode[],
-): EnrichedSqlMetric[] {
+): string | null {
 
   if (node.nodeName.includes("Filter") || node.nodeName.includes("Join")) {
     const inputEdges = graph.inEdges(node.nodeId.toString());
 
     if (!inputEdges || inputEdges.length === 0) {
-      return updatedMetrics;
+      return null;
     }
     
     let totalInputRows = 0;
-    let validPredecessors = 0;
 
     inputEdges.forEach((edge) => {
       const inputNode = allNodes.find((n) => n.nodeId.toString() === edge.v);
@@ -527,35 +532,33 @@ function addFilterRatioMetric(
           const inputRows = parseFloat(inputRowsMetric.value.replace(/,/g, ""));
           if (!isNaN(inputRows)) {
             totalInputRows += inputRows;
-            validPredecessors++;
           }
         }
       }
     });
 
-    if (validPredecessors === 0) {
-      return updatedMetrics;
+    if (totalInputRows === 0) {
+      return null;
     }
 
     const outputRowsMetric = updatedMetrics.find((m) => m.name.includes("rows"));
     if (!outputRowsMetric) {
-      return updatedMetrics;
+      return null;
     }
 
     const outputRows = parseFloat(outputRowsMetric.value.replace(/,/g, ""));
     if (isNaN(outputRows)) {
-      return updatedMetrics;
+      return null;
     }
 
-    const filterRatio = ((totalInputRows - outputRows) / totalInputRows) * 100;
-    if(filterRatio <= 100){
-      updatedMetrics.push({
-        name: "filter_ratio",
-        value: filterRatio.toFixed(2).concat("%"),
-      });
-    }
+    const filteredRowsPercentage = new Intl.NumberFormat('default', {
+      style: 'percent',
+      minimumFractionDigits: 2,
+      maximumFractionDigits: 2,
+    }).format((totalInputRows - outputRows) / totalInputRows);
+    
+    return filteredRowsPercentage;
   }
-
-  return updatedMetrics;
+  return null;
 }
 

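One detail worth noting in the Intl.NumberFormat change: style: 'percent' multiplies the value by 100 itself, so the raw fraction is passed in directly, whereas the removed toFixed(2) code scaled by 100 manually. A quick sketch comparing the two (output shown for an en-US locale):

const formatPercent = new Intl.NumberFormat("default", {
  style: "percent",            // scales by 100 and appends a locale-aware percent sign
  minimumFractionDigits: 2,
  maximumFractionDigits: 2,
});

const totalInputRows = 3000;
const outputRows = 300;
const fraction = (totalInputRows - outputRows) / totalInputRows; // 0.9

formatPercent.format(fraction);           // "90.00%" - formatting introduced by this patch
(fraction * 100).toFixed(2).concat("%");  // "90.00%" - formatting removed by this patch
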
From be8548a18ba20f64f6ffaa94aef2ac3c3ee88dea Mon Sep 17 00:00:00 2001
From: Andrei Nekrasov <andreinek7@gmail.com>
Date: Fri, 29 Nov 2024 18:20:29 +0200
Subject: [PATCH 5/5] Add TODO comment

---
 spark-ui/src/reducers/SqlReducer.ts | 1 +
 1 file changed, 1 insertion(+)

diff --git a/spark-ui/src/reducers/SqlReducer.ts b/spark-ui/src/reducers/SqlReducer.ts
index 6b4afb0..9e571d5 100644
--- a/spark-ui/src/reducers/SqlReducer.ts
+++ b/spark-ui/src/reducers/SqlReducer.ts
@@ -400,6 +400,7 @@ export function updateSqlNodeMetrics(
 
   const notEffectedSqls = currentStore.sqls.filter((sql) => sql.id !== sqlId);
   const runningSql = runningSqls[0];
+  // TODO: cache the graph
   const graph = generateGraph(runningSql.edges, runningSql.nodes);
   const nodes = runningSql.nodes.map((node) => {
     const matchedMetricsNodes = sqlMetrics.filter(
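
On the TODO above: generateGraph is rebuilt on every metrics update for the same SQL. One possible approach, not part of this series, is a memo keyed by SQL id, assuming a plan's nodes and edges do not change between updates (with adaptive execution they can, in which case the cache would need invalidation). A sketch under that assumption, reusing the existing Graph, EnrichedSparkSQL and generateGraph names from SqlReducer.ts:

// Hypothetical per-SQL graph cache; getOrBuildGraph is an illustrative name.
const graphCache = new Map<EnrichedSparkSQL["id"], Graph>();

function getOrBuildGraph(sql: EnrichedSparkSQL): Graph {
  const cached = graphCache.get(sql.id);
  if (cached !== undefined) {
    return cached;
  }
  const graph = generateGraph(sql.edges, sql.nodes);
  graphCache.set(sql.id, graph);
  return graph;
}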