Commit 1a92c3f

WalBehamotl authored and committed

XMover/problematic-translogs: Large and small shards analysis/tracking

- Add max shard size tracking to size buckets
- Add small shards analysis view to analyzer command

1 parent c7fc33a commit 1a92c3f

File tree: 2 files changed (+291, -11 lines)


cratedb_toolkit/admin/xmover/analysis/shard.py

Lines changed: 288 additions & 10 deletions
@@ -20,6 +20,7 @@
     ShardInfo,
     ShardRelocationConstraints,
     ShardRelocationResponse,
+    TableStatsType,
 )
 from cratedb_toolkit.admin.xmover.util.database import CrateDBClient
 from cratedb_toolkit.admin.xmover.util.format import format_percentage, format_size
@@ -621,11 +622,11 @@ def get_shard_size_overview(self) -> Dict[str, Any]:
 
         # Define size buckets (in GB)
         size_buckets = {
-            "<1GB": {"count": 0, "total_size": 0.0, "avg_size_gb": 0.0},
-            "1GB-5GB": {"count": 0, "total_size": 0.0, "avg_size_gb": 0.0},
-            "5GB-10GB": {"count": 0, "total_size": 0.0, "avg_size_gb": 0.0},
-            "10GB-50GB": {"count": 0, "total_size": 0.0, "avg_size_gb": 0.0},
-            ">=50GB": {"count": 0, "total_size": 0.0, "avg_size_gb": 0.0},
+            "<1GB": {"count": 0, "total_size": 0.0, "avg_size_gb": 0.0, "max_size": 0.0},
+            "1GB-5GB": {"count": 0, "total_size": 0.0, "avg_size_gb": 0.0, "max_size": 0.0},
+            "5GB-10GB": {"count": 0, "total_size": 0.0, "avg_size_gb": 0.0, "max_size": 0.0},
+            "10GB-50GB": {"count": 0, "total_size": 0.0, "avg_size_gb": 0.0, "max_size": 0.0},
+            ">=50GB": {"count": 0, "total_size": 0.0, "avg_size_gb": 0.0, "max_size": 0.0},
         }
 
         if not started_shards:
@@ -652,19 +653,24 @@ def get_shard_size_overview(self) -> Dict[str, Any]:
             if size_gb >= 50:
                 size_buckets[">=50GB"]["count"] += 1
                 size_buckets[">=50GB"]["total_size"] += size_gb
+                size_buckets[">=50GB"]["max_size"] = max(size_buckets[">=50GB"]["max_size"], size_gb)
                 large_shards_count += 1
             elif size_gb >= 10:
                 size_buckets["10GB-50GB"]["count"] += 1
                 size_buckets["10GB-50GB"]["total_size"] += size_gb
+                size_buckets["10GB-50GB"]["max_size"] = max(size_buckets["10GB-50GB"]["max_size"], size_gb)
             elif size_gb >= 5:
                 size_buckets["5GB-10GB"]["count"] += 1
                 size_buckets["5GB-10GB"]["total_size"] += size_gb
+                size_buckets["5GB-10GB"]["max_size"] = max(size_buckets["5GB-10GB"]["max_size"], size_gb)
             elif size_gb >= 1:
                 size_buckets["1GB-5GB"]["count"] += 1
                 size_buckets["1GB-5GB"]["total_size"] += size_gb
+                size_buckets["1GB-5GB"]["max_size"] = max(size_buckets["1GB-5GB"]["max_size"], size_gb)
             else:
                 size_buckets["<1GB"]["count"] += 1
                 size_buckets["<1GB"]["total_size"] += size_gb
+                size_buckets["<1GB"]["max_size"] = max(size_buckets["<1GB"]["max_size"], size_gb)
                 very_small_shards += 1
 
         # Calculate the average size for each bucket
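
Editor's note: the hunk above repeats the same three bucket updates in every branch. A minimal refactoring sketch (not part of this commit) that resolves the bucket label once and then updates a single dict entry:

```python
def classify_bucket(size_gb: float) -> str:
    """Map a shard size in GB to its size-bucket label."""
    if size_gb >= 50:
        return ">=50GB"
    if size_gb >= 10:
        return "10GB-50GB"
    if size_gb >= 5:
        return "5GB-10GB"
    if size_gb >= 1:
        return "1GB-5GB"
    return "<1GB"


def update_bucket(size_buckets: dict, size_gb: float) -> None:
    """Accumulate count, total, and running maximum for the matching bucket."""
    bucket = size_buckets[classify_bucket(size_gb)]
    bucket["count"] += 1
    bucket["total_size"] += size_gb
    bucket["max_size"] = max(bucket["max_size"], size_gb)
```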
@@ -686,6 +692,123 @@ def get_shard_size_overview(self) -> Dict[str, Any]:
             "very_small_shards_percentage": very_small_percentage,
         }
 
+    def get_large_shards_details(self) -> List[Dict[str, Any]]:
+        """Get detailed information about large shards (>=50GB) including partition values"""
+        # Optimized query to fetch only large shards directly from database
+        query = """
+            SELECT
+                s.schema_name,
+                s.table_name,
+                translate(p.values::text, ':{}', '=()') as partition_values,
+                s.id as shard_id,
+                s.size / 1024^3 as size_gb,
+                s."primary" as is_primary,
+                s.node['name'] as node_name,
+                s.node['id'] as node_id
+            FROM sys.shards s
+            LEFT JOIN information_schema.table_partitions p
+                ON s.table_name = p.table_name
+                AND s.schema_name = p.table_schema
+                AND s.partition_ident = p.partition_ident
+            WHERE s.state = 'STARTED'
+            AND s.size >= 50 * 1024^3  -- 50GB in bytes
+            ORDER BY s.size DESC
+        """
+
+        result = self.client.execute_query(query)
+
+        large_shards = []
+        for row in result.get("rows", []):
+            # Get zone information from our nodes data
+            node_id = row[7]
+            zone = next((node.zone for node in self.nodes if node.id == node_id), "unknown")
+
+            large_shards.append(
+                {
+                    "schema_name": row[0] or "doc",
+                    "table_name": row[1],
+                    "partition_values": row[2],
+                    "shard_id": row[3],
+                    "size_gb": float(row[4]) if row[4] else 0.0,
+                    "is_primary": row[5] or False,
+                    "node_name": row[6],
+                    "zone": zone,
+                }
+            )
+
+        return large_shards
+
+    def get_small_shards_details(self, limit: int = 10) -> List[Dict[str, Any]]:
+        """Get detailed information about the smallest shards, grouped by table/partition"""
+        # Query to get all shards, ordered by size ascending to get the smallest
+        query = """
+            SELECT
+                s.schema_name,
+                s.table_name,
+                translate(p.values::text, ':{}', '=()') as partition_values,
+                s.id as shard_id,
+                s.size / 1024^3 as size_gb,
+                s."primary" as is_primary,
+                s.node['name'] as node_name,
+                s.node['id'] as node_id
+            FROM sys.shards s
+            LEFT JOIN information_schema.table_partitions p
+                ON s.table_name = p.table_name
+                AND s.schema_name = p.table_schema
+                AND s.partition_ident = p.partition_ident
+            WHERE s.state = 'STARTED'
+            ORDER BY s.size ASC
+        """
+
+        result = self.client.execute_query(query)
+
+        # Group by table/partition to get aggregated stats
+        table_partition_stats: TableStatsType = {}
+        for row in result.get("rows", []):
+            # Get zone information from our nodes data
+            node_id = row[7]
+
+            # FIXME: `zone` does not get used.
+            zone = next((node.zone for node in self.nodes if node.id == node_id), "unknown")  # noqa: F841
+
+            # Create table key with schema
+            schema_name = row[0] or "doc"
+            table_name = row[1]
+            table_display = table_name
+            if schema_name and schema_name != "doc":
+                table_display = f"{schema_name}.{table_name}"
+
+            # Create partition key
+            partition_key = row[2] or "N/A"
+
+            # Create combined key
+            key = (table_display, partition_key)
+
+            if key not in table_partition_stats:
+                table_partition_stats[key] = {"sizes": [], "primary_count": 0, "replica_count": 0, "total_size": 0.0}
+
+            # Aggregate stats
+            stats = table_partition_stats[key]
+            size_gb = float(row[4]) if row[4] else 0.0
+            stats["sizes"].append(size_gb)
+            stats["total_size"] += size_gb
+            if row[5]:  # is_primary
+                stats["primary_count"] += 1
+            else:
+                stats["replica_count"] += 1
+
+        # Sort by average size ascending (smallest first) and return top tables/partitions
+        sorted_stats: List[Dict[str, Any]] = []
+        for (table_name, partition_key), stats in table_partition_stats.items():
+            avg_size = sum(stats["sizes"]) / len(stats["sizes"]) if stats["sizes"] else 0
+            sorted_stats.append(
+                {"table_name": table_name, "partition_key": partition_key, "stats": stats, "avg_size": avg_size}
+            )
+
+        # Sort by average size and take the top 'limit' entries
+        sorted_stats.sort(key=lambda x: x["avg_size"])
+        return sorted_stats[:limit]
+
     def get_cluster_overview(self) -> Dict[str, Any]:
         """Get a comprehensive overview of the cluster"""
         # Get cluster watermark settings
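
Editor's note: both new accessors return plain lists of dicts, so they can be consumed without the Rich rendering shown further down. A hypothetical usage sketch follows; the `ShardAnalyzer` class name and its constructor are assumptions (the diff only shows `self.client` and `self.nodes`), not confirmed by this commit:

```python
# Hypothetical consumer of the two new accessors. Class name, import path,
# and constructor signature are assumptions for illustration only.
from cratedb_toolkit.admin.xmover.util.database import CrateDBClient

analyzer = ShardAnalyzer(CrateDBClient())  # assumed constructor

for shard in analyzer.get_large_shards_details():
    # Each entry carries schema, table, partition, node, and zone context.
    print(shard["table_name"], f"{shard['size_gb']:.1f}GB", shard["zone"])

for entry in analyzer.get_small_shards_details(limit=5):
    # Entries are grouped per table/partition, smallest average size first.
    print(entry["table_name"], entry["partition_key"], f"{entry['avg_size']:.3f}GB")
```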
@@ -1114,6 +1237,7 @@ def distribution(self, table: str = None):
         size_table.add_column("Count", justify="right", style="magenta")
         size_table.add_column("Percentage", justify="right", style="green")
         size_table.add_column("Avg Size", justify="right", style="blue")
+        size_table.add_column("Max Size", justify="right", style="red")
         size_table.add_column("Total Size", justify="right", style="yellow")
 
         total_shards = size_overview["total_shards"]
@@ -1147,6 +1271,7 @@ def distribution(self, table: str = None):
                 count_str,
                 percentage_str,
                 f"{avg_size:.2f}GB" if avg_size > 0 else "0GB",
+                f"{bucket_data['max_size']:.2f}GB" if bucket_data["max_size"] > 0 else "0GB",
                 format_size(total_size),
             )
 
@@ -1177,23 +1302,176 @@ def distribution(self, table: str = None):
         for warning in warnings:
             console.print(warning)
 
+        # Show compact table/partition breakdown of large shards if any exist
+        if size_overview["large_shards_count"] > 0:
+            console.print()
+            large_shards_details = self.analyzer.get_large_shards_details()
+
+            # Aggregate by table/partition
+            table_partition_stats: TableStatsType = {}
+            for shard in large_shards_details:
+                # Create table key with schema
+                table_display = shard["table_name"]
+                if shard["schema_name"] and shard["schema_name"] != "doc":
+                    table_display = f"{shard['schema_name']}.{shard['table_name']}"
+
+                # Create partition key
+                partition_key = shard["partition_values"] or "N/A"
+
+                # Create combined key
+                key = (table_display, partition_key)
+
+                if key not in table_partition_stats:
+                    table_partition_stats[key] = {
+                        "sizes": [],
+                        "primary_count": 0,
+                        "replica_count": 0,
+                        "total_size": 0.0,
+                    }
+
+                # Aggregate stats
+                stats = table_partition_stats[key]
+                stats["sizes"].append(shard["size_gb"])
+                stats["total_size"] += shard["size_gb"]
+                if shard["is_primary"]:
+                    stats["primary_count"] += 1
+                else:
+                    stats["replica_count"] += 1
+
+            # Create compact table
+            large_shards_table = Table(title="Large Shards Breakdown by Table/Partition (>=50GB)", box=box.ROUNDED)
+            large_shards_table.add_column("Table", style="cyan")
+            large_shards_table.add_column("Partition", style="blue")
+            large_shards_table.add_column("Shards", justify="right", style="magenta")
+            large_shards_table.add_column("P/R", justify="center", style="yellow")
+            large_shards_table.add_column("Min Size", justify="right", style="green")
+            large_shards_table.add_column("Avg Size", justify="right", style="red")
+            large_shards_table.add_column("Max Size", justify="right", style="red")
+            large_shards_table.add_column("Total Size", justify="right", style="red")
+
+            # Sort by total size descending (most problematic first)
+            sorted_stats = sorted(table_partition_stats.items(), key=lambda x: x[1]["total_size"], reverse=True)
+
+            for (table_name, partition_key), stats in sorted_stats:
+                # Format partition display
+                partition_display = partition_key
+                if partition_display != "N/A" and len(partition_display) > 25:
+                    partition_display = partition_display[:22] + "..."
+
+                # Calculate size stats
+                sizes = stats["sizes"]
+                min_size = min(sizes)
+                avg_size = sum(sizes) / len(sizes)
+                max_size = max(sizes)
+                total_size = stats["total_size"]
+                total_shards = len(sizes)
+
+                # Format primary/replica ratio
+                p_r_display = f"{stats['primary_count']}P/{stats['replica_count']}R"
+
+                large_shards_table.add_row(
+                    table_name,
+                    partition_display,
+                    str(total_shards),
+                    p_r_display,
+                    f"{min_size:.1f}GB",
+                    f"{avg_size:.1f}GB",
+                    f"{max_size:.1f}GB",
+                    f"{total_size:.1f}GB",
+                )
+
+            console.print(large_shards_table)
+
+            # Add summary stats
+            total_primary = sum(stats["primary_count"] for stats in table_partition_stats.values())
+            total_replica = sum(stats["replica_count"] for stats in table_partition_stats.values())
+            affected_table_partitions = len(table_partition_stats)
+
+            console.print()
+            console.print(
+                f"[dim]📊 Summary: {total_primary} primary, {total_replica} replica shards "
+                f"across {affected_table_partitions} table/partition(s)[/dim]"
+            )
+
+        # Show compact table/partition breakdown of smallest shards (top 10)
+        console.print()
+        small_shards_details = self.analyzer.get_small_shards_details(limit=10)
+
+        if small_shards_details:
+            # Create compact table
+            small_shards_table = Table(title="Smallest Shards Breakdown by Table/Partition (Top 10)", box=box.ROUNDED)
+            small_shards_table.add_column("Table", style="cyan")
+            small_shards_table.add_column("Partition", style="blue")
+            small_shards_table.add_column("Shards", justify="right", style="magenta")
+            small_shards_table.add_column("P/R", justify="center", style="yellow")
+            small_shards_table.add_column("Min Size", justify="right", style="green")
+            small_shards_table.add_column("Avg Size", justify="right", style="red")
+            small_shards_table.add_column("Max Size", justify="right", style="red")
+            small_shards_table.add_column("Total Size", justify="right", style="red")
+
+            for entry in small_shards_details:
+                table_name = entry["table_name"]
+                partition_key = entry["partition_key"]
+                stats = entry["stats"]
+
+                # Format partition display
+                partition_display = partition_key
+                if partition_display != "N/A" and len(partition_display) > 25:
+                    partition_display = partition_display[:22] + "..."
+
+                # Calculate size stats
+                sizes = stats["sizes"]
+                min_size = min(sizes)
+                avg_size = sum(sizes) / len(sizes)
+                max_size = max(sizes)
+                total_size = stats["total_size"]
+                total_shards = len(sizes)
+
+                # Format primary/replica ratio
+                p_r_display = f"{stats['primary_count']}P/{stats['replica_count']}R"
+
+                small_shards_table.add_row(
+                    table_name,
+                    partition_display,
+                    str(total_shards),
+                    p_r_display,
+                    f"{min_size:.1f}GB",
+                    f"{avg_size:.1f}GB",
+                    f"{max_size:.1f}GB",
+                    f"{total_size:.1f}GB",
+                )
+
+            console.print(small_shards_table)
+
+            # Add summary stats for smallest shards
+            total_small_primary = sum(entry["stats"]["primary_count"] for entry in small_shards_details)
+            total_small_replica = sum(entry["stats"]["replica_count"] for entry in small_shards_details)
+            small_table_partitions = len(small_shards_details)
+
+            console.print()
+            console.print(
+                f"[dim]📊 Summary: {total_small_primary} primary, "
+                f"{total_small_replica} replica shards across {small_table_partitions} table/partition(s) "
+                f"with smallest average sizes[/dim]"
+            )
+
         console.print()
 
         # Table-specific analysis if requested
         if table:
             console.print()
             console.print(Panel.fit(f"[bold blue]Analysis for table: {table}[/bold blue]"))
 
-            stats = self.analyzer.analyze_distribution(table)
+            distribution_stats = self.analyzer.analyze_distribution(table)
 
             table_summary = Table(title=f"Table {table} Distribution", box=box.ROUNDED)
             table_summary.add_column("Metric", style="cyan")
             table_summary.add_column("Value", style="magenta")
 
-            table_summary.add_row("Total Shards", str(stats.total_shards))
-            table_summary.add_row("Total Size", format_size(stats.total_size_gb))
-            table_summary.add_row("Zone Balance Score", f"{stats.zone_balance_score:.1f}/100")
-            table_summary.add_row("Node Balance Score", f"{stats.node_balance_score:.1f}/100")
+            table_summary.add_row("Total Shards", str(distribution_stats.total_shards))
+            table_summary.add_row("Total Size", format_size(distribution_stats.total_size_gb))
+            table_summary.add_row("Zone Balance Score", f"{distribution_stats.zone_balance_score:.1f}/100")
+            table_summary.add_row("Node Balance Score", f"{distribution_stats.node_balance_score:.1f}/100")
 
             console.print(table_summary)
 
cratedb_toolkit/admin/xmover/model.py

Lines changed: 3 additions & 1 deletion
@@ -1,5 +1,7 @@
 from dataclasses import dataclass
-from typing import Dict, Optional
+from typing import Any, Dict, Optional, Tuple
+
+TableStatsType = Dict[Tuple[str, str], Dict[str, Any]]
 
 
 @dataclass
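
Editor's note: a minimal sketch of how the new alias is meant to be read. Keys are `(table_display, partition_key)` tuples and values are per-group aggregates, matching the grouping code in shard.py above; the table and partition values below are hypothetical.

```python
from typing import Any, Dict, Tuple

TableStatsType = Dict[Tuple[str, str], Dict[str, Any]]

stats: TableStatsType = {}
key = ("doc.sensor_readings", "(day=2024-01-01)")  # hypothetical table/partition
group = stats.setdefault(key, {"sizes": [], "primary_count": 0, "replica_count": 0, "total_size": 0.0})
group["sizes"].append(0.4)  # one shard's size in GB
group["total_size"] += 0.4
group["primary_count"] += 1
```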
