Skip to content

Commit 53ff8be

Browse files
Adds SortRewriteStrategy
A rewrite strategy for data files which aims to reorder data with data files to optimally lay them out in relation to a column. For example, if the Sort strategy is used on a set of files which is ordered by column x and original has files File A (x: 0 - 50), File B ( x: 10 - 40) and File C ( x: 30 - 60), this Strategy will attempt to rewrite those files into File A' (x: 0-20), File B' (x: 21 - 40), File C' (x: 41 - 60). Currently the there is no clustering detection and we will rewrite all files if {@link SortStrategy#REWRITE_ALL} is true (default). If this property is disabled any files with the incorrect sort-order as well as any files that would be chosen by {@link BinPackStrategy} will be rewrite candidates. In the future other algorithms for determining files to rewrite will be provided.
1 parent c2725b1 commit 53ff8be

File tree

4 files changed

+334
-1
lines changed

4 files changed

+334
-1
lines changed

api/src/main/java/org/apache/iceberg/SortOrder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,7 @@ public SortOrder build() {
286286
}
287287
}
288288

289-
static void checkCompatibility(SortOrder sortOrder, Schema schema) {
289+
public static void checkCompatibility(SortOrder sortOrder, Schema schema) {
290290
for (SortField field : sortOrder.fields) {
291291
Type sourceType = schema.findType(field.sourceId());
292292
ValidationException.check(
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iceberg.actions;
21+
22+
import java.util.Map;
23+
import java.util.Map.Entry;
24+
import java.util.Optional;
25+
import java.util.Set;
26+
import org.apache.iceberg.FileScanTask;
27+
import org.apache.iceberg.SortOrder;
28+
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
29+
import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
30+
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
31+
import org.apache.iceberg.util.PropertyUtil;
32+
import org.slf4j.Logger;
33+
import org.slf4j.LoggerFactory;
34+
35+
/**
36+
* A rewrite strategy for data files which aims to reorder data with data files to optimally lay them out
37+
* in relation to a column. For example, if the Sort strategy is used on a set of files which is ordered
38+
* by column x and original has files File A (x: 0 - 50), File B ( x: 10 - 40) and File C ( x: 30 - 60),
39+
* this Strategy will attempt to rewrite those files into File A' (x: 0-20), File B' (x: 21 - 40),
40+
* File C' (x: 41 - 60).
41+
* <p>
42+
* Currently the there is no clustering detection and we will rewrite all files if {@link SortStrategy#REWRITE_ALL}
43+
* is true (default). If this property is disabled any files with the incorrect sort-order as well as any files
44+
* that would be chosen by {@link BinPackStrategy} will be rewrite candidates.
45+
* <p>
46+
* In the future other algorithms for determining files to rewrite will be provided.
47+
*/
48+
abstract class SortStrategy extends BinPackStrategy {
49+
private static final Logger LOG = LoggerFactory.getLogger(SortStrategy.class);
50+
51+
/**
52+
* Rewrites all files, regardless of their size. Defaults to false, rewriting only wrong sort-order and mis-sized
53+
* files;
54+
*/
55+
public static final String REWRITE_ALL = "no-size-filter";
56+
public static final boolean REWRITE_ALL_DEFAULT = false;
57+
58+
private static final Set<String> validOptions = ImmutableSet.of(
59+
REWRITE_ALL
60+
);
61+
62+
private boolean rewriteAll;
63+
private SortOrder sortOrder;
64+
private int sortOrderId = -1;
65+
66+
/**
67+
* Sets the sort order to be used in this strategy when rewriting files
68+
* @param order the order to use
69+
* @return this for method chaining
70+
*/
71+
public SortStrategy sortOrder(SortOrder order) {
72+
this.sortOrder = order;
73+
74+
// See if this order matches any of our known orders
75+
Optional<Entry<Integer, SortOrder>> knownOrder = table().sortOrders().entrySet().stream()
76+
.filter(entry -> entry.getValue().sameOrder(order))
77+
.findFirst();
78+
knownOrder.ifPresent(entry -> sortOrderId = entry.getKey());
79+
80+
return this;
81+
}
82+
83+
@Override
84+
public String name() {
85+
return "SORT";
86+
}
87+
88+
@Override
89+
public Set<String> validOptions() {
90+
return ImmutableSet.<String>builder()
91+
.addAll(super.validOptions())
92+
.addAll(validOptions)
93+
.build();
94+
}
95+
96+
@Override
97+
public RewriteStrategy options(Map<String, String> options) {
98+
super.options(options); // Also checks validity of BinPack options
99+
100+
rewriteAll = PropertyUtil.propertyAsBoolean(options,
101+
REWRITE_ALL,
102+
REWRITE_ALL_DEFAULT);
103+
104+
if (sortOrder == null) {
105+
sortOrder = table().sortOrder();
106+
sortOrderId = sortOrder.orderId();
107+
}
108+
109+
validateOptions();
110+
return this;
111+
}
112+
113+
@Override
114+
public Iterable<FileScanTask> selectFilesToRewrite(Iterable<FileScanTask> dataFiles) {
115+
if (rewriteAll) {
116+
LOG.info("Sort Strategy for table {} set to rewrite all data files", table().name());
117+
return dataFiles;
118+
} else {
119+
FluentIterable filesWithCorrectOrder =
120+
FluentIterable.from(dataFiles).filter(file -> file.file().sortOrderId() == sortOrderId);
121+
122+
FluentIterable filesWithIncorrectOrder =
123+
FluentIterable.from(dataFiles).filter(file -> file.file().sortOrderId() != sortOrderId);
124+
return filesWithIncorrectOrder.append(super.selectFilesToRewrite(filesWithCorrectOrder));
125+
}
126+
}
127+
128+
private void validateOptions() {
129+
Preconditions.checkArgument(!sortOrder.isUnsorted(),
130+
"Can't use %s when there is no sort order, either define table %s's sort order or set sort" +
131+
"order in the action",
132+
name(), table().name());
133+
134+
SortOrder.checkCompatibility(sortOrder, table().schema());
135+
}
136+
}

core/src/test/java/org/apache/iceberg/MockFileScanTask.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
package org.apache.iceberg;
2121

22+
import org.mockito.Mockito;
23+
2224
public class MockFileScanTask extends BaseFileScanTask {
2325

2426
private final long length;
@@ -28,6 +30,18 @@ public MockFileScanTask(long length) {
2830
this.length = length;
2931
}
3032

33+
public MockFileScanTask(DataFile file) {
34+
super(file, null, null, null, null);
35+
this.length = file.fileSizeInBytes();
36+
}
37+
38+
public static MockFileScanTask mockTask(long length, int sortOrderId) {
39+
DataFile mockFile = Mockito.mock(DataFile.class);
40+
Mockito.when(mockFile.fileSizeInBytes()).thenReturn(length);
41+
Mockito.when(mockFile.sortOrderId()).thenReturn(sortOrderId);
42+
return new MockFileScanTask(mockFile);
43+
}
44+
3145
@Override
3246
public long length() {
3347
return length;
Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iceberg.actions;
21+
22+
23+
import java.util.Collections;
24+
import java.util.List;
25+
import java.util.stream.IntStream;
26+
import org.apache.iceberg.AssertHelpers;
27+
import org.apache.iceberg.DataFile;
28+
import org.apache.iceberg.FileScanTask;
29+
import org.apache.iceberg.MockFileScanTask;
30+
import org.apache.iceberg.Schema;
31+
import org.apache.iceberg.SortOrder;
32+
import org.apache.iceberg.Table;
33+
import org.apache.iceberg.TableTestBase;
34+
import org.apache.iceberg.exceptions.ValidationException;
35+
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
36+
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
37+
import org.apache.iceberg.types.Types;
38+
import org.junit.Assert;
39+
import org.junit.Test;
40+
import org.junit.runner.RunWith;
41+
import org.junit.runners.Parameterized;
42+
43+
@RunWith(Parameterized.class)
44+
public class TestSortStrategy extends TableTestBase {
45+
46+
@Parameterized.Parameters(name = "formatVersion = {0}")
47+
public static Object[] parameters() {
48+
return new Object[] {2}; // We don't actually use the format version since everything is mock
49+
}
50+
51+
@Override
52+
public void setupTable() throws Exception {
53+
super.setupTable();
54+
table.replaceSortOrder().asc("data").commit();
55+
}
56+
57+
private static final long MB = 1024 * 1024;
58+
59+
public TestSortStrategy(int formatVersion) {
60+
super(formatVersion);
61+
}
62+
63+
class TestSortStrategyImpl extends SortStrategy {
64+
65+
@Override
66+
public Table table() {
67+
return table;
68+
}
69+
70+
@Override
71+
public List<DataFile> rewriteFiles(List<FileScanTask> filesToRewrite) {
72+
throw new UnsupportedOperationException();
73+
}
74+
}
75+
76+
private SortStrategy defaultSort() {
77+
return (SortStrategy) new TestSortStrategyImpl().options(Collections.emptyMap());
78+
}
79+
80+
private List<FileScanTask> tasksForSortOrder(int sortOrderId, int... fileSizesMB) {
81+
ImmutableList.Builder<FileScanTask> files = ImmutableList.builder();
82+
IntStream.of(fileSizesMB).forEach(length -> files.add(MockFileScanTask.mockTask(length * MB, sortOrderId)));
83+
return files.build();
84+
}
85+
86+
@Test
87+
public void testInvalidSortOrder() {
88+
AssertHelpers.assertThrows("Should not allow an unsorted Sort order", IllegalArgumentException.class,
89+
() -> defaultSort().sortOrder(SortOrder.unsorted()).options(Collections.emptyMap()));
90+
91+
AssertHelpers.assertThrows("Should not allow a Sort order with bad columns", ValidationException.class,
92+
() -> {
93+
Schema badSchema = new Schema(
94+
ImmutableList.of(Types.NestedField.required(0, "nonexistant", Types.IntegerType.get())));
95+
96+
defaultSort()
97+
.sortOrder(SortOrder.builderFor(badSchema).asc("nonexistant").build())
98+
.options(Collections.emptyMap());
99+
});
100+
}
101+
102+
@Test
103+
public void testSelectWrongSortOrder() {
104+
List<FileScanTask> expected = tasksForSortOrder(-1, 500, 500, 500, 500);
105+
RewriteStrategy strategy = defaultSort().options(Collections.emptyMap());
106+
List<FileScanTask> actual = ImmutableList.copyOf(strategy.selectFilesToRewrite(expected));
107+
108+
Assert.assertEquals("Should mark all files for rewrite if they have the wrong sort order",
109+
expected, actual);
110+
}
111+
112+
@Test
113+
public void testSelectCorrectSortOrder() {
114+
List<FileScanTask> fileScanTasks = tasksForSortOrder(table.sortOrder().orderId(), 500, 500, 500, 500);
115+
RewriteStrategy strategy = defaultSort().options(Collections.emptyMap());
116+
List<FileScanTask> actual = ImmutableList.copyOf(strategy.selectFilesToRewrite(fileScanTasks));
117+
118+
Assert.assertEquals("Should mark no files for rewrite if they have a good size and the right sort order",
119+
Collections.emptyList(), actual);
120+
}
121+
122+
@Test
123+
public void testSelectMixedOrderMixedSize() {
124+
List<FileScanTask> expected = ImmutableList.<FileScanTask>builder()
125+
.addAll(tasksForSortOrder(-1, 500, 500, 500, 500))
126+
.addAll(tasksForSortOrder(table.sortOrder().orderId(), 10, 10, 2000, 10))
127+
.build();
128+
129+
List<FileScanTask> fileScanTasks = ImmutableList.<FileScanTask>builder()
130+
.addAll(expected)
131+
.addAll(tasksForSortOrder(table.sortOrder().orderId(), 500, 490, 520))
132+
.build();
133+
134+
RewriteStrategy strategy = defaultSort().options(Collections.emptyMap());
135+
List<FileScanTask> actual = ImmutableList.copyOf(strategy.selectFilesToRewrite(fileScanTasks));
136+
137+
Assert.assertEquals("Should mark files for rewrite with invalid size and bad sort order",
138+
expected, actual);
139+
}
140+
141+
@Test
142+
public void testSelectAll() {
143+
List<FileScanTask> invalid = ImmutableList.<FileScanTask>builder()
144+
.addAll(tasksForSortOrder(-1, 500, 500, 500, 500))
145+
.addAll(tasksForSortOrder(table.sortOrder().orderId(), 10, 10, 2000, 10))
146+
.build();
147+
148+
List<FileScanTask> expected = ImmutableList.<FileScanTask>builder()
149+
.addAll(invalid)
150+
.addAll(tasksForSortOrder(table.sortOrder().orderId(), 500, 490, 520))
151+
.build();
152+
153+
RewriteStrategy strategy = defaultSort().options(ImmutableMap.of(SortStrategy.REWRITE_ALL, "true"));
154+
List<FileScanTask> actual = ImmutableList.copyOf(strategy.selectFilesToRewrite(expected));
155+
156+
Assert.assertEquals("Should mark all files for rewrite",
157+
expected, actual);
158+
}
159+
160+
@Test
161+
public void testUseSizeOptions() {
162+
List<FileScanTask> expected = ImmutableList.<FileScanTask>builder()
163+
.addAll(tasksForSortOrder(table.sortOrder().orderId(), 498, 551))
164+
.build();
165+
166+
List<FileScanTask> fileScanTasks = ImmutableList.<FileScanTask>builder()
167+
.addAll(expected)
168+
.addAll(tasksForSortOrder(table.sortOrder().orderId(), 500, 500))
169+
.build();
170+
171+
RewriteStrategy strategy = defaultSort().options(ImmutableMap.of(
172+
SortStrategy.MAX_FILE_SIZE_BYTES, Long.toString(550 * MB),
173+
SortStrategy.MIN_FILE_SIZE_BYTES, Long.toString(499 * MB)));
174+
175+
List<FileScanTask> actual = ImmutableList.copyOf(strategy.selectFilesToRewrite(fileScanTasks));
176+
177+
Assert.assertEquals("Should mark files for rewrite with adjusted min and max size",
178+
expected, actual);
179+
}
180+
181+
182+
}
183+

0 commit comments

Comments
 (0)