Skip to content

Commit 5efc8d1

Browse files
Timeout reading delta tables after incremental updates with null values
1 parent 36664c3 commit 5efc8d1

22 files changed

+307
-65
lines changed

presto-delta/src/main/java/com/facebook/presto/delta/DeltaClient.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -91,12 +91,14 @@ public Optional<DeltaTable> getTable(
9191
Table deltaTable = loadDeltaTable(location.toString(), deltaEngine.get());
9292
Snapshot snapshot = getSnapshot(deltaTable, deltaEngine.get(), schemaTableName, snapshotId,
9393
snapshotAsOfTimestampMillis);
94+
Optional<Long> snapshotVersion = Optional.of(snapshot.getVersion(deltaEngine.get()));
95+
List<DeltaColumn> schema = getSchema(config, schemaTableName, deltaEngine.get(), snapshot);
9496
return Optional.of(new DeltaTable(
9597
schemaTableName.getSchemaName(),
9698
schemaTableName.getTableName(),
9799
tableLocation,
98-
Optional.of(snapshot.getVersion(deltaEngine.get())), // lock the snapshot version
99-
getSchema(config, schemaTableName, deltaEngine.get(), snapshot)));
100+
snapshotVersion, // lock the snapshot version
101+
schema));
100102
}
101103

102104
private Snapshot getSnapshot(

presto-delta/src/main/java/com/facebook/presto/delta/DeltaExpressionUtils.java

+84-43
Original file line numberDiff line numberDiff line change
@@ -127,44 +127,62 @@ private static class AllFilesIterator
127127
implements CloseableIterator<Row>
128128
{
129129
private final CloseableIterator<FilteredColumnarBatch> inputIterator;
130-
private Row nextItem;
130+
private Row nextRow;
131131
private boolean rowsRemaining;
132132
private CloseableIterator<Row> row;
133133

134134
public AllFilesIterator(CloseableIterator<FilteredColumnarBatch> inputIterator)
135135
{
136136
this.inputIterator = inputIterator;
137137
}
138+
138139
@Override
139140
public boolean hasNext()
140141
{
141-
if (nextItem != null) {
142+
if (this.nextRow != null) {
142143
return true;
143144
}
144145

145-
if (!rowsRemaining) {
146-
if (!inputIterator.hasNext()) {
147-
return false;
146+
if (!this.rowsRemaining) {
147+
while (this.inputIterator.hasNext()) {
148+
// get new batch
149+
FilteredColumnarBatch nextBatch = this.inputIterator.next();
150+
this.row = nextBatch.getRows();
151+
this.rowsRemaining = false;
152+
if (this.row.hasNext()) {
153+
// if the batch has rows to return, we break out of the loop
154+
this.nextRow = this.row.next();
155+
this.rowsRemaining = true;
156+
break;
157+
}
158+
else {
159+
// if the batch is empty we close the row iterator and we test
160+
// the next batch
161+
try {
162+
this.row.close();
163+
}
164+
catch (IOException e) {
165+
throw new GenericInternalException("Could not close row batch", e);
166+
}
167+
}
148168
}
149-
FilteredColumnarBatch nextFile = inputIterator.next();
150-
row = nextFile.getRows();
151-
}
152-
Row nextRow;
153-
rowsRemaining = false;
154-
if (row.hasNext()) {
155-
nextRow = row.next();
156-
nextItem = nextRow;
157-
rowsRemaining = true;
158169
}
159-
if (!rowsRemaining) {
160-
try {
161-
row.close();
170+
else {
171+
rowsRemaining = false;
172+
if (row.hasNext()) {
173+
nextRow = row.next();
174+
rowsRemaining = true;
162175
}
163-
catch (IOException e) {
164-
throw new GenericInternalException("Could not close row batch", e);
176+
if (!rowsRemaining) {
177+
try {
178+
row.close();
179+
}
180+
catch (IOException e) {
181+
throw new GenericInternalException("Could not close row batch", e);
182+
}
165183
}
166184
}
167-
return nextItem != null;
185+
return this.nextRow != null;
168186
}
169187

170188
@Override
@@ -173,8 +191,8 @@ public Row next()
173191
if (!hasNext()) {
174192
throw new NoSuchElementException("There are no more files");
175193
}
176-
Row toReturn = nextItem;
177-
nextItem = null;
194+
Row toReturn = nextRow;
195+
nextRow = null;
178196
return toReturn;
179197
}
180198

@@ -244,30 +262,53 @@ public boolean hasNext()
244262
return true;
245263
}
246264

247-
if (!rowsRemaining) {
248-
if (!inputIterator.hasNext()) {
249-
return false;
265+
if (rowsRemaining) {
266+
Row nextRow;
267+
rowsRemaining = false;
268+
while (row.hasNext()) {
269+
nextRow = row.next();
270+
if (evaluatePartitionPredicate(partitionPredicate, partitionColumns, typeManager,
271+
nextRow)) {
272+
nextItem = nextRow;
273+
rowsRemaining = true;
274+
break;
275+
}
250276
}
251-
FilteredColumnarBatch nextFile = inputIterator.next();
252-
row = nextFile.getRows();
253-
}
254-
Row nextRow;
255-
rowsRemaining = false;
256-
while (row.hasNext()) {
257-
nextRow = row.next();
258-
if (evaluatePartitionPredicate(partitionPredicate, partitionColumns, typeManager,
259-
nextRow)) {
260-
nextItem = nextRow;
261-
rowsRemaining = true;
262-
break;
277+
if (!rowsRemaining) {
278+
try {
279+
row.close();
280+
}
281+
catch (IOException e) {
282+
throw new GenericInternalException("Cloud not close row batch", e);
283+
}
263284
}
264285
}
265-
if (!rowsRemaining) {
266-
try {
267-
row.close();
268-
}
269-
catch (IOException e) {
270-
throw new GenericInternalException("Cloud not close row batch", e);
286+
else {
287+
while (inputIterator.hasNext()) {
288+
FilteredColumnarBatch nextFile = inputIterator.next();
289+
row = nextFile.getRows();
290+
Row nextRow;
291+
rowsRemaining = false;
292+
while (row.hasNext()) {
293+
nextRow = row.next();
294+
if (evaluatePartitionPredicate(partitionPredicate, partitionColumns, typeManager,
295+
nextRow)) {
296+
nextItem = nextRow;
297+
rowsRemaining = true;
298+
break;
299+
}
300+
}
301+
if (rowsRemaining) {
302+
break;
303+
}
304+
else {
305+
try {
306+
row.close();
307+
}
308+
catch (IOException e) {
309+
throw new GenericInternalException("Cloud not close row batch", e);
310+
}
311+
}
271312
}
272313
}
273314
return nextItem != null;

presto-delta/src/test/java/com/facebook/presto/delta/AbstractDeltaDistributedQueryTestBase.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ private static DistributedQueryRunner createDeltaQueryRunner(Map<String, String>
188188
* @param deltaTableName Name of the delta table which is on the classpath.
189189
* @param hiveTableName Name of the Hive table that the Delta table is to be registered as in HMS
190190
*/
191-
private static void registerDeltaTableInHMS(QueryRunner queryRunner, String deltaTableName, String hiveTableName)
191+
protected static void registerDeltaTableInHMS(QueryRunner queryRunner, String deltaTableName, String hiveTableName)
192192
{
193193
queryRunner.execute(format(
194194
"CREATE TABLE %s.\"%s\".\"%s\" (dummyColumn INT) WITH (external_location = '%s')",
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.delta;
15+
16+
import com.facebook.presto.Session;
17+
import com.facebook.presto.common.type.TimeZoneKey;
18+
import com.facebook.presto.hive.HivePlugin;
19+
import com.facebook.presto.testing.MaterializedResult;
20+
import com.facebook.presto.testing.MaterializedRow;
21+
import com.facebook.presto.testing.QueryRunner;
22+
import com.facebook.presto.tests.DistributedQueryRunner;
23+
import com.facebook.presto.tpch.TpchPlugin;
24+
import com.google.common.collect.ImmutableMap;
25+
import org.testng.annotations.Test;
26+
27+
import java.nio.file.FileSystems;
28+
import java.nio.file.Path;
29+
import java.util.Map;
30+
31+
import static com.facebook.presto.testing.TestingSession.testSessionBuilder;
32+
import static com.facebook.presto.testing.assertions.Assert.assertEquals;
33+
import static java.lang.String.format;
34+
import static java.util.Locale.US;
35+
import static org.testng.Assert.assertNull;
36+
import static org.testng.Assert.assertTrue;
37+
38+
public class IncrementalUpdateQueriesTest
39+
extends AbstractDeltaDistributedQueryTestBase
40+
{
41+
private final String version = "delta_v3";
42+
private final String controlTableName = "deltatbl-partition-prune";
43+
private final String targetTableName = controlTableName + "-incremental";
44+
45+
@Override
46+
protected QueryRunner createQueryRunner()
47+
throws Exception
48+
{
49+
QueryRunner queryRunner = createDeltaQueryRunner(ImmutableMap.of(
50+
"experimental.pushdown-subfields-enabled", "true",
51+
"experimental.pushdown-dereference-enabled", "true"));
52+
registerDeltaTableInHMS(queryRunner,
53+
version + FileSystems.getDefault().getSeparator() + targetTableName,
54+
version + FileSystems.getDefault().getSeparator() + targetTableName);
55+
return queryRunner;
56+
}
57+
private static DistributedQueryRunner createDeltaQueryRunner(Map<String, String> extraProperties)
58+
throws Exception
59+
{
60+
Session session = testSessionBuilder()
61+
.setCatalog(DELTA_CATALOG)
62+
.setSchema(DELTA_SCHEMA.toLowerCase(US))
63+
.setTimeZoneKey(TimeZoneKey.getTimeZoneKey("Europe/Madrid"))
64+
.build();
65+
66+
DistributedQueryRunner queryRunner = DistributedQueryRunner.builder(session)
67+
.setExtraProperties(extraProperties)
68+
.build();
69+
70+
// Install the TPCH plugin for test data (not in Delta format)
71+
queryRunner.installPlugin(new TpchPlugin());
72+
queryRunner.createCatalog("tpch", "tpch");
73+
74+
Path dataDirectory = queryRunner.getCoordinator().getDataDirectory().resolve("delta_metadata");
75+
Path catalogDirectory = dataDirectory.getParent().resolve("catalog");
76+
77+
// Install a Delta connector catalog
78+
queryRunner.installPlugin(new DeltaPlugin());
79+
Map<String, String> deltaProperties = ImmutableMap.<String, String>builder()
80+
.put("hive.metastore", "file")
81+
.put("hive.metastore.catalog.dir", catalogDirectory.toFile().toURI().toString())
82+
.put("delta.case-sensitive-partitions-enabled", "true")
83+
.build();
84+
queryRunner.createCatalog(DELTA_CATALOG, "delta", deltaProperties);
85+
86+
// Install a Hive connector catalog that uses the same metastore as Delta
87+
// This catalog will be used to create tables in metastore as the Delta connector doesn't
88+
// support creating tables yet.
89+
queryRunner.installPlugin(new HivePlugin("hive"));
90+
Map<String, String> hiveProperties = ImmutableMap.<String, String>builder()
91+
.put("hive.metastore", "file")
92+
.put("hive.metastore.catalog.dir", catalogDirectory.toFile().toURI().toString())
93+
.put("hive.allow-drop-table", "true")
94+
.put("hive.security", "legacy")
95+
.build();
96+
queryRunner.createCatalog(HIVE_CATALOG, "hive", hiveProperties);
97+
queryRunner.execute(format("CREATE SCHEMA %s.%s", HIVE_CATALOG, DELTA_SCHEMA));
98+
99+
return queryRunner;
100+
}
101+
102+
private void checkQueryOutputOnIncrementalWithNullRows(String controlTableQuery, String testTableQuery)
103+
{
104+
MaterializedResult expectedResult = getQueryRunner().execute(controlTableQuery);
105+
MaterializedResult testResult = getQueryRunner().execute(testTableQuery);
106+
assertTrue(testResult.getRowCount() > expectedResult.getRowCount());
107+
// check that the non-null elements are equal in both tables
108+
for (int i = 0; i < expectedResult.getRowCount(); i++) {
109+
assertEquals(expectedResult.getMaterializedRows().get(i),
110+
testResult.getMaterializedRows().get(i));
111+
}
112+
// check that the remaining elements in the test table are null
113+
for (int i = expectedResult.getRowCount(); i < testResult.getRowCount(); i++) {
114+
MaterializedRow row = testResult.getMaterializedRows().get(i);
115+
for (Object field : row.getFields()) {
116+
assertNull(field);
117+
}
118+
}
119+
}
120+
121+
@Test
122+
public void readTableAllColumnsAfterIncrementalUpdateTest()
123+
{
124+
String testTableQuery =
125+
format("SELECT * FROM \"%s\".\"%s\" order by date, city asc", PATH_SCHEMA, goldenTablePathWithPrefix(version,
126+
targetTableName));
127+
String controlTableQuery =
128+
format("SELECT * FROM \"%s\".\"%s\" order by date, city asc", PATH_SCHEMA, goldenTablePathWithPrefix(version,
129+
controlTableName));
130+
checkQueryOutputOnIncrementalWithNullRows(controlTableQuery, testTableQuery);
131+
}
132+
133+
@Test
134+
public void readTableAllColumnsAfterIncrementalUpdateFilteringNullsTest()
135+
{
136+
String testTableQuery =
137+
format("SELECT * FROM \"%s\".\"%s\" where name is not null order by date, city asc",
138+
PATH_SCHEMA, goldenTablePathWithPrefix(version,
139+
targetTableName));
140+
String controlTableQuery =
141+
format("SELECT * FROM \"%s\".\"%s\" where name is not null order by date, city asc",
142+
PATH_SCHEMA, goldenTablePathWithPrefix(version,
143+
controlTableName));
144+
MaterializedResult expectedResult = getQueryRunner().execute(controlTableQuery);
145+
MaterializedResult testResult = getQueryRunner().execute(testTableQuery);
146+
assertEquals(testResult.getMaterializedRows(), expectedResult.getMaterializedRows());
147+
}
148+
149+
@Test
150+
public void readTableNonePartitionedColumnAfterIncrementalUpdateTest()
151+
{
152+
String testTableQuery =
153+
format("SELECT name, cnt FROM \"%s\".\"%s\" order by name, cnt asc", PATH_SCHEMA,
154+
goldenTablePathWithPrefix(version, targetTableName));
155+
String controlTableQuery =
156+
format("SELECT name, cnt FROM \"%s\".\"%s\" order by name, cnt asc", PATH_SCHEMA,
157+
goldenTablePathWithPrefix(version, controlTableName));
158+
checkQueryOutputOnIncrementalWithNullRows(controlTableQuery, testTableQuery);
159+
}
160+
161+
@Test
162+
public void readTableNonePartitionedColumnAfterIncrementalUpdateFilteringNullsTest()
163+
{
164+
String testTableQuery =
165+
format("SELECT name, cnt FROM \"%s\".\"%s\" where name is not null and " +
166+
"cnt is not null order by name, cnt asc", PATH_SCHEMA,
167+
goldenTablePathWithPrefix(version, targetTableName));
168+
String controlTableQuery =
169+
format("SELECT name, cnt FROM \"%s\".\"%s\" where name is not null and " +
170+
"cnt is not null order by name, cnt asc", PATH_SCHEMA,
171+
goldenTablePathWithPrefix(version, controlTableName));
172+
MaterializedResult expectedResult = getQueryRunner().execute(controlTableQuery);
173+
MaterializedResult testResult = getQueryRunner().execute(testTableQuery);
174+
assertEquals(testResult.getMaterializedRows(), expectedResult.getMaterializedRows());
175+
}
176+
177+
@Test
178+
public void readTablePartitionedColumnAfterIncrementalUpdateTest()
179+
{
180+
String testTableQuery =
181+
format("SELECT date, city FROM \"%s\".\"%s\" order by date, city asc", PATH_SCHEMA, goldenTablePathWithPrefix(version,
182+
targetTableName));
183+
String controlTableQuery =
184+
format("SELECT date, city FROM \"%s\".\"%s\" order by date, city asc", PATH_SCHEMA, goldenTablePathWithPrefix(version,
185+
controlTableName));
186+
checkQueryOutputOnIncrementalWithNullRows(controlTableQuery, testTableQuery);
187+
}
188+
189+
@Test
190+
public void readTablePartitionedColumnFilteringNullValuesAfterIncrementalUpdateTest()
191+
{
192+
String testTableQuery =
193+
format("SELECT date FROM \"%s\".\"%s\" where date is not null order by date asc",
194+
PATH_SCHEMA, goldenTablePathWithPrefix(version,
195+
targetTableName));
196+
String controlTableQuery =
197+
format("SELECT date FROM \"%s\".\"%s\" where date is not null order by date asc",
198+
PATH_SCHEMA, goldenTablePathWithPrefix(version,
199+
controlTableName));
200+
MaterializedResult expectedResult = getQueryRunner().execute(controlTableQuery);
201+
MaterializedResult testResult = getQueryRunner().execute(testTableQuery);
202+
assertEquals(testResult.getMaterializedRows(), expectedResult.getMaterializedRows());
203+
}
204+
}

0 commit comments

Comments
 (0)