@@ -1847,6 +1847,7 @@ Version doGranuleRollback(Reference<GranuleMetadata> metadata,
1847
1847
int toPop = 0 ;
1848
1848
// keep bytes in delta files pending here, then add back already durable delta files at end
1849
1849
metadata->bytesInNewDeltaFiles = 0 ;
1850
+ metadata->newDeltaFileCount = 0 ;
1850
1851
1851
1852
for (auto & f : inFlightFiles) {
1852
1853
if (f.snapshot ) {
@@ -1863,6 +1864,7 @@ Version doGranuleRollback(Reference<GranuleMetadata> metadata,
1863
1864
} else {
1864
1865
metadata->pendingSnapshotVersion = f.version ;
1865
1866
metadata->bytesInNewDeltaFiles = 0 ;
1867
+ metadata->newDeltaFileCount = 0 ;
1866
1868
}
1867
1869
} else {
1868
1870
if (f.version > rollbackVersion) {
@@ -1879,6 +1881,7 @@ Version doGranuleRollback(Reference<GranuleMetadata> metadata,
1879
1881
ASSERT (f.version > cfRollbackVersion);
1880
1882
cfRollbackVersion = f.version ;
1881
1883
metadata->bytesInNewDeltaFiles += f.bytes ;
1884
+ metadata->newDeltaFileCount ++;
1882
1885
}
1883
1886
}
1884
1887
}
@@ -1911,6 +1914,7 @@ Version doGranuleRollback(Reference<GranuleMetadata> metadata,
1911
1914
i >= 0 && metadata->files .deltaFiles [i].version > metadata->pendingSnapshotVersion ;
1912
1915
i--) {
1913
1916
metadata->bytesInNewDeltaFiles += metadata->files .deltaFiles [i].logicalSize ;
1917
+ metadata->newDeltaFileCount ++;
1914
1918
}
1915
1919
1916
1920
// Track that this rollback happened, since we have to re-read mutations up to the rollback
@@ -2196,6 +2200,7 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
2196
2200
for (int i = files.deltaFiles .size () - 1 ; i >= 0 ; i--) {
2197
2201
if (files.deltaFiles [i].version > snapshotVersion) {
2198
2202
metadata->bytesInNewDeltaFiles += files.deltaFiles [i].logicalSize ;
2203
+ metadata->newDeltaFileCount ++;
2199
2204
}
2200
2205
}
2201
2206
}
@@ -2803,6 +2808,7 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
2803
2808
ASSERT (metadata->bufferedDeltaVersion <= lastDeltaVersion);
2804
2809
metadata->bufferedDeltaVersion = lastDeltaVersion; // In case flush was forced at non-mutation version
2805
2810
metadata->bytesInNewDeltaFiles += metadata->bufferedDeltaBytes ;
2811
+ metadata->newDeltaFileCount ++;
2806
2812
2807
2813
bwData->stats .mutationBytesBuffered -= metadata->bufferedDeltaBytes ;
2808
2814
@@ -2838,11 +2844,11 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
2838
2844
// making a bunch of extra delta files at some point, even if we don't consider it for a split
2839
2845
// yet
2840
2846
2841
- // If we have enough delta files, try to re-snapshot
2842
- // FIXME: have max file count in addition to bytes count
2847
+ // If we have enough delta file data, try to re-snapshot
2843
2848
if (snapshotEligible &&
2844
2849
(metadata->doEarlyReSnapshot () ||
2845
- metadata->bytesInNewDeltaFiles >= metadata->writeAmpTarget .getBytesBeforeCompact ())) {
2850
+ metadata->bytesInNewDeltaFiles >= metadata->writeAmpTarget .getBytesBeforeCompact () ||
2851
+ metadata->newDeltaFileCount >= 20 )) {
2846
2852
if (BW_DEBUG && !inFlightFiles.empty ()) {
2847
2853
fmt::print (" Granule [{0} - {1}) ready to re-snapshot at {2} after {3} > {4} bytes, "
2848
2854
" waiting for outstanding {5} files to finish\n " ,
@@ -2854,6 +2860,9 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
2854
2860
inFlightFiles.size ());
2855
2861
}
2856
2862
2863
+ CODE_PROBE (metadata->doEarlyReSnapshot (), " granule snapshotting early" );
2864
+ CODE_PROBE (metadata->newDeltaFileCount >= 20 , " granule snapshotting due to many small delta files" );
2865
+
2857
2866
// cancel previous candidate checker
2858
2867
checkMergeCandidate.cancel ();
2859
2868
@@ -2892,6 +2901,7 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
2892
2901
2893
2902
// reset metadata
2894
2903
metadata->bytesInNewDeltaFiles = 0 ;
2904
+ metadata->newDeltaFileCount = 0 ;
2895
2905
metadata->resetReadStats ();
2896
2906
2897
2907
// If we have more than one snapshot file and that file is unblocked (committedVersion >=
0 commit comments