Skip to content

Commit 2a39712

Browse files
Core: Adds SortRewriteStrategy (#2609)
A rewrite strategy for data files which aims to reorder data within data files to optimally lay them out in relation to a column. For example, if the Sort strategy is used on a set of files which is ordered by column x and originally has files File A (x: 0 - 50), File B (x: 10 - 40) and File C (x: 30 - 60), this Strategy will attempt to rewrite those files into File A' (x: 0 - 20), File B' (x: 21 - 40), File C' (x: 41 - 60).
1 parent c13d4e3 commit 2a39712

File tree

4 files changed

+281
-1
lines changed

4 files changed

+281
-1
lines changed

api/src/main/java/org/apache/iceberg/SortOrder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,7 @@ public SortOrder build() {
286286
}
287287
}
288288

289-
static void checkCompatibility(SortOrder sortOrder, Schema schema) {
289+
public static void checkCompatibility(SortOrder sortOrder, Schema schema) {
290290
for (SortField field : sortOrder.fields) {
291291
Type sourceType = schema.findType(field.sourceId());
292292
ValidationException.check(
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iceberg.actions;
21+
22+
import java.util.Map;
23+
import java.util.Set;
24+
import org.apache.iceberg.FileScanTask;
25+
import org.apache.iceberg.SortOrder;
26+
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
27+
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
28+
import org.apache.iceberg.util.PropertyUtil;
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
31+
32+
/**
33+
* A rewrite strategy for data files which aims to reorder data within data files to optimally lay them out
34+
* in relation to a column. For example, if the Sort strategy is used on a set of files which is ordered
35+
* by column x and originally has files File A (x: 0 - 50), File B (x: 10 - 40) and File C (x: 30 - 60),
36+
* this Strategy will attempt to rewrite those files into File A' (x: 0-20), File B' (x: 21 - 40),
37+
* File C' (x: 41 - 60).
38+
* <p>
39+
* Currently there is no file overlap detection and we will rewrite all files if {@link SortStrategy#REWRITE_ALL}
40+
* is true (default: false). If this property is disabled any files that would be chosen by
41+
* {@link BinPackStrategy} will be rewrite candidates.
42+
* <p>
43+
* In the future other algorithms for determining files to rewrite will be provided.
44+
*/
45+
public abstract class SortStrategy extends BinPackStrategy {
46+
private static final Logger LOG = LoggerFactory.getLogger(SortStrategy.class);
47+
48+
/**
49+
* Rewrites all files, regardless of their size. Defaults to false, rewriting only mis-sized
50+
* files;
51+
*/
52+
public static final String REWRITE_ALL = "rewrite-all";
53+
public static final boolean REWRITE_ALL_DEFAULT = false;
54+
55+
56+
private static final Set<String> validOptions = ImmutableSet.of(
57+
REWRITE_ALL
58+
);
59+
60+
private boolean rewriteAll;
61+
private SortOrder sortOrder;
62+
63+
/**
64+
* Sets the sort order to be used in this strategy when rewriting files
65+
* @param order the order to use
66+
* @return this for method chaining
67+
*/
68+
public SortStrategy sortOrder(SortOrder order) {
69+
this.sortOrder = order;
70+
return this;
71+
}
72+
73+
protected SortOrder sortOrder() {
74+
return sortOrder;
75+
}
76+
77+
@Override
78+
public String name() {
79+
return "SORT";
80+
}
81+
82+
@Override
83+
public Set<String> validOptions() {
84+
return ImmutableSet.<String>builder()
85+
.addAll(super.validOptions())
86+
.addAll(validOptions)
87+
.build();
88+
}
89+
90+
@Override
91+
public RewriteStrategy options(Map<String, String> options) {
92+
super.options(options); // Also checks validity of BinPack options
93+
94+
rewriteAll = PropertyUtil.propertyAsBoolean(options,
95+
REWRITE_ALL,
96+
REWRITE_ALL_DEFAULT);
97+
98+
if (sortOrder == null) {
99+
sortOrder = table().sortOrder();
100+
}
101+
102+
validateOptions();
103+
return this;
104+
}
105+
106+
@Override
107+
public Iterable<FileScanTask> selectFilesToRewrite(Iterable<FileScanTask> dataFiles) {
108+
if (rewriteAll) {
109+
LOG.info("Sort Strategy for table {} set to rewrite all data files", table().name());
110+
return dataFiles;
111+
} else {
112+
return super.selectFilesToRewrite(dataFiles);
113+
}
114+
}
115+
116+
protected void validateOptions() {
117+
Preconditions.checkArgument(!sortOrder.isUnsorted(),
118+
"Can't use %s when there is no sort order, either define table %s's sort order or set sort" +
119+
"order in the action",
120+
name(), table().name());
121+
122+
SortOrder.checkCompatibility(sortOrder, table().schema());
123+
}
124+
}

core/src/test/java/org/apache/iceberg/MockFileScanTask.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
package org.apache.iceberg;
2121

22+
import org.mockito.Mockito;
23+
2224
public class MockFileScanTask extends BaseFileScanTask {
2325

2426
private final long length;
@@ -28,6 +30,18 @@ public MockFileScanTask(long length) {
2830
this.length = length;
2931
}
3032

33+
/**
 * Creates a mock task wrapping the given data file. The task's length is taken from the file's
 * size in bytes; all other arguments passed to the parent constructor are null.
 *
 * @param file the data file this task wraps
 */
public MockFileScanTask(DataFile file) {
  super(file, null, null, null, null);
  this.length = file.fileSizeInBytes();
}
37+
38+
public static MockFileScanTask mockTask(long length, int sortOrderId) {
39+
DataFile mockFile = Mockito.mock(DataFile.class);
40+
Mockito.when(mockFile.fileSizeInBytes()).thenReturn(length);
41+
Mockito.when(mockFile.sortOrderId()).thenReturn(sortOrderId);
42+
return new MockFileScanTask(mockFile);
43+
}
44+
3145
@Override
3246
public long length() {
3347
return length;
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iceberg.actions;
21+
22+
23+
import java.util.Collections;
24+
import java.util.List;
25+
import java.util.Set;
26+
import java.util.stream.IntStream;
27+
import org.apache.iceberg.AssertHelpers;
28+
import org.apache.iceberg.DataFile;
29+
import org.apache.iceberg.FileScanTask;
30+
import org.apache.iceberg.MockFileScanTask;
31+
import org.apache.iceberg.Schema;
32+
import org.apache.iceberg.SortOrder;
33+
import org.apache.iceberg.Table;
34+
import org.apache.iceberg.TableTestBase;
35+
import org.apache.iceberg.exceptions.ValidationException;
36+
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
37+
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
38+
import org.apache.iceberg.types.Types;
39+
import org.junit.Assert;
40+
import org.junit.Test;
41+
import org.junit.runner.RunWith;
42+
import org.junit.runners.Parameterized;
43+
44+
@RunWith(Parameterized.class)
45+
public class TestSortStrategy extends TableTestBase {
46+
47+
@Parameterized.Parameters(name = "formatVersion = {0}")
48+
public static Object[] parameters() {
49+
return new Object[] {2}; // We don't actually use the format version since everything is mock
50+
}
51+
52+
@Override
53+
public void setupTable() throws Exception {
54+
super.setupTable();
55+
table.replaceSortOrder().asc("data").commit();
56+
}
57+
58+
private static final long MB = 1024 * 1024;
59+
60+
public TestSortStrategy(int formatVersion) {
61+
super(formatVersion);
62+
}
63+
64+
class TestSortStrategyImpl extends SortStrategy {
65+
66+
@Override
67+
public Table table() {
68+
return table;
69+
}
70+
71+
@Override
72+
public Set<DataFile> rewriteFiles(List<FileScanTask> filesToRewrite) {
73+
throw new UnsupportedOperationException();
74+
}
75+
}
76+
77+
private SortStrategy defaultSort() {
78+
return (SortStrategy) new TestSortStrategyImpl().options(Collections.emptyMap());
79+
}
80+
81+
private List<FileScanTask> tasksForSortOrder(int sortOrderId, int... fileSizesMB) {
82+
ImmutableList.Builder<FileScanTask> files = ImmutableList.builder();
83+
IntStream.of(fileSizesMB).forEach(length -> files.add(MockFileScanTask.mockTask(length * MB, sortOrderId)));
84+
return files.build();
85+
}
86+
87+
@Test
88+
public void testInvalidSortOrder() {
89+
AssertHelpers.assertThrows("Should not allow an unsorted Sort order", IllegalArgumentException.class,
90+
() -> defaultSort().sortOrder(SortOrder.unsorted()).options(Collections.emptyMap()));
91+
92+
AssertHelpers.assertThrows("Should not allow a Sort order with bad columns", ValidationException.class,
93+
() -> {
94+
Schema badSchema = new Schema(
95+
ImmutableList.of(Types.NestedField.required(0, "nonexistant", Types.IntegerType.get())));
96+
97+
defaultSort()
98+
.sortOrder(SortOrder.builderFor(badSchema).asc("nonexistant").build())
99+
.options(Collections.emptyMap());
100+
});
101+
}
102+
103+
@Test
104+
public void testSelectAll() {
105+
List<FileScanTask> invalid = ImmutableList.<FileScanTask>builder()
106+
.addAll(tasksForSortOrder(-1, 500, 500, 500, 500))
107+
.addAll(tasksForSortOrder(table.sortOrder().orderId(), 10, 10, 2000, 10))
108+
.build();
109+
110+
List<FileScanTask> expected = ImmutableList.<FileScanTask>builder()
111+
.addAll(invalid)
112+
.addAll(tasksForSortOrder(table.sortOrder().orderId(), 500, 490, 520))
113+
.build();
114+
115+
RewriteStrategy strategy = defaultSort().options(ImmutableMap.of(SortStrategy.REWRITE_ALL, "true"));
116+
List<FileScanTask> actual = ImmutableList.copyOf(strategy.selectFilesToRewrite(expected));
117+
118+
Assert.assertEquals("Should mark all files for rewrite",
119+
expected, actual);
120+
}
121+
122+
@Test
123+
public void testUseSizeOptions() {
124+
List<FileScanTask> expected = ImmutableList.<FileScanTask>builder()
125+
.addAll(tasksForSortOrder(table.sortOrder().orderId(), 498, 551))
126+
.build();
127+
128+
List<FileScanTask> fileScanTasks = ImmutableList.<FileScanTask>builder()
129+
.addAll(expected)
130+
.addAll(tasksForSortOrder(table.sortOrder().orderId(), 500, 500))
131+
.build();
132+
133+
RewriteStrategy strategy = defaultSort().options(ImmutableMap.of(
134+
SortStrategy.MAX_FILE_SIZE_BYTES, Long.toString(550 * MB),
135+
SortStrategy.MIN_FILE_SIZE_BYTES, Long.toString(499 * MB)));
136+
137+
List<FileScanTask> actual = ImmutableList.copyOf(strategy.selectFilesToRewrite(fileScanTasks));
138+
139+
Assert.assertEquals("Should mark files for rewrite with adjusted min and max size",
140+
expected, actual);
141+
}
142+
}

0 commit comments

Comments
 (0)