2020
2121import com .github .benmanes .caffeine .cache .Caffeine ;
2222import com .github .benmanes .caffeine .cache .LoadingCache ;
23+ import java .io .Closeable ;
2324import java .io .IOException ;
25+ import java .nio .ByteBuffer ;
2426import java .util .List ;
2527import java .util .Map ;
2628import java .util .Set ;
4042import org .apache .iceberg .metrics .ScanMetricsUtil ;
4143import org .apache .iceberg .relocated .com .google .common .collect .Iterables ;
4244import org .apache .iceberg .relocated .com .google .common .collect .Lists ;
45+ import org .apache .iceberg .relocated .com .google .common .collect .Maps ;
4346import org .apache .iceberg .relocated .com .google .common .collect .Sets ;
4447import org .apache .iceberg .types .Types ;
4548import org .apache .iceberg .util .ContentFileUtil ;
49+ import org .apache .iceberg .util .Pair ;
4650import org .apache .iceberg .util .ParallelIterable ;
4751
4852class ManifestGroup {
4953 private static final Types .StructType EMPTY_STRUCT = Types .StructType .of ();
5054
5155 private final FileIO io ;
52- private final Set <ManifestFile > dataManifests ;
56+ private Set <ManifestFile > dataManifests ;
5357 private final DeleteFileIndex .Builder deleteIndexBuilder ;
5458 private Predicate <ManifestEntry <DataFile >> manifestEntryPredicate ;
5559 private Map <Integer , PartitionSpec > specsById ;
@@ -64,6 +68,7 @@ class ManifestGroup {
6468 private Set <Integer > columnsToKeepStats ;
6569 private ExecutorService executorService ;
6670 private ScanMetrics scanMetrics ;
71+ private DeleteFileIndex deleteFiles ;
6772
6873 ManifestGroup (FileIO io , Iterable <ManifestFile > manifests ) {
6974 this (
@@ -162,6 +167,34 @@ ManifestGroup planWith(ExecutorService newExecutorService) {
162167 return this ;
163168 }
164169
170+ ManifestGroup planByPartition () {
171+ Map <Pair <Integer , StructLike >, Integer > partitionRefCount = Maps .newHashMap ();
172+ Map <ManifestFile , Set <Pair <Integer , StructLike >>> distinctPartitionsInManifest =
173+ Maps .newHashMap ();
174+ for (ManifestFile file : dataManifests ) {
175+ Set <Pair <Integer , StructLike >> visited = Sets .newHashSet ();
176+ try (ManifestReader <DataFile > reader = ManifestFiles .read (file , io )) {
177+ for (DataFile dataFile : reader ) {
178+ Pair <Integer , StructLike > partition = Pair .of (dataFile .specId (), dataFile .partition ());
179+ if (visited .add (partition )) {
180+ partitionRefCount .put (partition , partitionRefCount .getOrDefault (partition , 0 ) + 1 );
181+ }
182+ }
183+ } catch (IOException e ) {
184+ throw new RuntimeException (e );
185+ }
186+ distinctPartitionsInManifest .put (file , visited );
187+ }
188+
189+ Set <ManifestFile > newDataFiles = Sets .newHashSet ();
190+ for (ManifestFile file : dataManifests ) {
191+ newDataFiles .add (
192+ new CloseableManifest (file , partitionRefCount , distinctPartitionsInManifest ));
193+ }
194+ this .dataManifests = newDataFiles ;
195+ return this ;
196+ }
197+
165198 /**
166199 * Returns an iterable of scan tasks. It is safe to add entries of this iterable to a collection
167200 * as {@link DataFile} in each {@link FileScanTask} is defensively copied.
@@ -172,6 +205,144 @@ public CloseableIterable<FileScanTask> planFiles() {
172205 return plan (ManifestGroup ::createFileScanTasks );
173206 }
174207
208+ private class CloseableManifest implements ManifestFile , Closeable {
209+ private final ManifestFile delegate ;
210+ private final Map <Pair <Integer , StructLike >, Integer > partitionRefCount ;
211+ private final Map <ManifestFile , Set <Pair <Integer , StructLike >>> distinctPartitionsInManifest ;
212+
213+ private CloseableManifest (
214+ ManifestFile delegate ,
215+ Map <Pair <Integer , StructLike >, Integer > partitionRefCount ,
216+ Map <ManifestFile , Set <Pair <Integer , StructLike >>> distinctPartitionsInManifest ) {
217+ this .delegate = delegate ;
218+ this .partitionRefCount = partitionRefCount ;
219+ this .distinctPartitionsInManifest = distinctPartitionsInManifest ;
220+ }
221+
222+ @ Override
223+ public void close () {
224+ synchronized (partitionRefCount ) {
225+ Set <Pair <Integer , StructLike >> pairs = distinctPartitionsInManifest .get (delegate );
226+ for (Pair <Integer , StructLike > partition : pairs ) {
227+ partitionRefCount .put (partition , partitionRefCount .get (partition ) - 1 );
228+ if (partitionRefCount .get (partition ) == 0 ) {
229+ deleteFiles .removeIndex (partition .first (), partition .second ());
230+ }
231+ }
232+ }
233+ }
234+
235+ @ Override
236+ public boolean hasAddedFiles () {
237+ return delegate .hasAddedFiles ();
238+ }
239+
240+ @ Override
241+ public boolean hasExistingFiles () {
242+ return delegate .hasExistingFiles ();
243+ }
244+
245+ @ Override
246+ public boolean hasDeletedFiles () {
247+ return delegate .hasDeletedFiles ();
248+ }
249+
250+ @ Override
251+ public ByteBuffer keyMetadata () {
252+ return delegate .keyMetadata ();
253+ }
254+
255+ @ Override
256+ public Long firstRowId () {
257+ return delegate .firstRowId ();
258+ }
259+
260+ @ Override
261+ public String path () {
262+ return delegate .path ();
263+ }
264+
265+ @ Override
266+ public long length () {
267+ return delegate .length ();
268+ }
269+
270+ @ Override
271+ public int partitionSpecId () {
272+ return delegate .partitionSpecId ();
273+ }
274+
275+ @ Override
276+ public ManifestContent content () {
277+ return delegate .content ();
278+ }
279+
280+ @ Override
281+ public long sequenceNumber () {
282+ return delegate .sequenceNumber ();
283+ }
284+
285+ @ Override
286+ public long minSequenceNumber () {
287+ return delegate .minSequenceNumber ();
288+ }
289+
290+ @ Override
291+ public Long snapshotId () {
292+ return delegate .snapshotId ();
293+ }
294+
295+ @ Override
296+ public Integer addedFilesCount () {
297+ return delegate .addedFilesCount ();
298+ }
299+
300+ @ Override
301+ public Long addedRowsCount () {
302+ return delegate .addedRowsCount ();
303+ }
304+
305+ @ Override
306+ public Integer existingFilesCount () {
307+ return delegate .existingFilesCount ();
308+ }
309+
310+ @ Override
311+ public Long existingRowsCount () {
312+ return delegate .existingRowsCount ();
313+ }
314+
315+ @ Override
316+ public Integer deletedFilesCount () {
317+ return delegate .deletedFilesCount ();
318+ }
319+
320+ @ Override
321+ public Long deletedRowsCount () {
322+ return delegate .deletedRowsCount ();
323+ }
324+
325+ @ Override
326+ public List <PartitionFieldSummary > partitions () {
327+ return delegate .partitions ();
328+ }
329+
330+ @ Override
331+ public ManifestFile copy () {
332+ return delegate .copy ();
333+ }
334+
335+ @ Override
336+ public int hashCode () {
337+ return delegate .hashCode ();
338+ }
339+
340+ @ Override
341+ public boolean equals (Object obj ) {
342+ return delegate .equals (obj );
343+ }
344+ }
345+
175346 public <T extends ScanTask > CloseableIterable <T > plan (CreateTasksFunction <T > createTasksFunc ) {
176347 LoadingCache <Integer , ResidualEvaluator > residualCache =
177348 Caffeine .newBuilder ()
@@ -182,7 +353,7 @@ public <T extends ScanTask> CloseableIterable<T> plan(CreateTasksFunction<T> cre
182353 return ResidualEvaluator .of (spec , filter , caseSensitive );
183354 });
184355
185- DeleteFileIndex deleteFiles = deleteIndexBuilder .scanMetrics (scanMetrics ).build ();
356+ deleteFiles = deleteIndexBuilder .scanMetrics (scanMetrics ).build ();
186357
187358 boolean dropStats = ManifestReader .dropStats (columns );
188359 if (deleteFiles .hasEqualityDeletes ()) {
@@ -352,6 +523,9 @@ public void close() throws IOException {
352523 if (iterable != null ) {
353524 iterable .close ();
354525 }
526+ if (manifest instanceof CloseableManifest ) {
527+ ((CloseableManifest ) manifest ).close ();
528+ }
355529 }
356530 });
357531 }
0 commit comments