Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add filter ratio dynamic metric #24

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 80 additions & 2 deletions spark-ui/src/reducers/SqlReducer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ function calculateSql(
icebergCommit: IcebergCommitsInfo | undefined,
): EnrichedSparkSQL {
const enrichedSql = sql as EnrichedSparkSQL;
const graph = generateGraph(enrichedSql.edges, enrichedSql.nodes);
const originalNumOfNodes = enrichedSql.nodes.length;
const typeEnrichedNodes = enrichedSql.nodes.map((node) => {
const type = calcNodeType(node.nodeName);
Expand Down Expand Up @@ -242,7 +243,7 @@ function calculateSql(
);
return {
...node,
metrics: calcNodeMetrics(node.type, node.metrics),
metrics: updateNodeMetrics(node, node.metrics, graph, onlyGraphNodes),
exchangeMetrics: exchangeMetrics,
exchangeBroadcastDuration: exchangeBroadcastDuration,
};
Expand Down Expand Up @@ -399,6 +400,8 @@ export function updateSqlNodeMetrics(

const notEffectedSqls = currentStore.sqls.filter((sql) => sql.id !== sqlId);
const runningSql = runningSqls[0];
// TODO: cache the graph
const graph = generateGraph(runningSql.edges, runningSql.nodes);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This means we are generating the graph for every metric update cycle. This is not ideal.
We should cache the graph if there is no change.
Please at least add a todo comment to cache the graph.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I thought about it, but my bad to forget the todo.
I think it should work with map with sqlId key easily.
But detecting actual changes is something different. We can do a check on node metrics before and after or compare their hashes. What do you think?

const nodes = runningSql.nodes.map((node) => {
const matchedMetricsNodes = sqlMetrics.filter(
(nodeMetrics) => nodeMetrics.id === node.nodeId,
Expand All @@ -407,7 +410,7 @@ export function updateSqlNodeMetrics(
return node;
}

const metrics = calcNodeMetrics(node.type, matchedMetricsNodes[0].metrics);
const metrics = updateNodeMetrics(node, matchedMetricsNodes[0].metrics, graph, runningSql.nodes);
const exchangeMetrics = calcExchangeMetrics(node.nodeName, metrics);

// TODO: maybe do a smarter replacement, or send only the initialized metrics
Expand Down Expand Up @@ -485,3 +488,78 @@ function calcBroadcastExchangeDuration(
}
return undefined;
}

function updateNodeMetrics(
node: EnrichedSqlNode,
metrics: EnrichedSqlMetric[],
graph: Graph,
allNodes: EnrichedSqlNode[],
): EnrichedSqlMetric[] {

const updatedOriginalMetrics = calcNodeMetrics(node.type, metrics);
const filterRatio = addFilterRatioMetric(node, updatedOriginalMetrics, graph, allNodes);

return [
...updatedOriginalMetrics,
...(filterRatio !== null
? [{ name: "Filter Ratio", value: filterRatio }]
: []),
];
}

function addFilterRatioMetric(
node: EnrichedSqlNode,
updatedMetrics: EnrichedSqlMetric[],
graph: Graph,
allNodes: EnrichedSqlNode[],
): string | null {

if (node.nodeName.includes("Filter") || node.nodeName.includes("Join")) {
const inputEdges = graph.inEdges(node.nodeId.toString());

if (!inputEdges || inputEdges.length === 0) {
return null;
}

let totalInputRows = 0;

inputEdges.forEach((edge) => {
const inputNode = allNodes.find((n) => n.nodeId.toString() === edge.v);
if (inputNode) {
const inputRowsMetric = inputNode.metrics?.find((m) =>
m.name.includes("rows")
);
if (inputRowsMetric) {
const inputRows = parseFloat(inputRowsMetric.value.replace(/,/g, ""));
if (!isNaN(inputRows)) {
totalInputRows += inputRows;
}
}
}
});

if (totalInputRows === 0) {
return null;
}

const outputRowsMetric = updatedMetrics.find((m) => m.name.includes("rows"));
if (!outputRowsMetric) {
return null;
}

const outputRows = parseFloat(outputRowsMetric.value.replace(/,/g, ""));
if (isNaN(outputRows)) {
return null;
}

const filteredRowsPercentage = new Intl.NumberFormat('default', {
style: 'percent',
minimumFractionDigits: 2,
maximumFractionDigits: 2,
}).format((totalInputRows - outputRows) / totalInputRows);

return filteredRowsPercentage;
}
return null;
}