Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Projection Pushdown Prototype #48

Open
wants to merge 1 commit into
base: delta_core_with_flink_feature_branch
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
.getLogicalType()
).getFieldNames();

System.out.println("Scott > DeltaDynamicTableFactory > createDynamicTableSource :: columns " + String.join(",", columns));

return new DeltaDynamicTableSource(
hadoopConf,
options, // PR FlinkSql_PR_8 change to queryOptions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@
*/
package io.delta.flink.internal.table;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import io.delta.flink.internal.table.DeltaFlinkJobSpecificOptions.TableMode;
import io.delta.flink.source.DeltaSource;
Expand All @@ -28,21 +31,26 @@
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceProvider;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.hadoop.conf.Configuration;

/**
* Implementation of {@link ScanTableSource} interface for Table/SQL support for Delta Source
* connector.
*/
public class DeltaDynamicTableSource implements ScanTableSource {
public class DeltaDynamicTableSource implements ScanTableSource, SupportsProjectionPushDown {

private final Configuration hadoopConf;

private final ReadableConfig tableOptions;

private final List<String> columns;

/** ProjectionPushDown */
private List<String> columnsPruned = new ArrayList<>();

/**
* Constructor for creating Source of Flink dynamic table to Delta table.
*
Expand Down Expand Up @@ -88,7 +96,12 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon
));
}

sourceBuilder.columnNames(columns);
if (!columnsPruned.isEmpty()) {
System.out.println("Scott > DeltaDynamicTableSource > getScanRuntimeProvider :: columnsPruned " + String.join(",", columnsPruned));
sourceBuilder.columnNames(columnsPruned);
} else {
System.out.println("Scott > DeltaDYnamicTableSource > getScanRuntimeProvider :: columnsPruned is empty -- SKIPPING ADDING COLUMN NAMES TO SOURCE BUILDER");
}

return SourceProvider.of(sourceBuilder.build());
}
Expand All @@ -103,4 +116,31 @@ public String asSummaryString() {
return "DeltaSource";
}

///////////////////////////////////////////
// Projection Push Down / Column Pruning //
///////////////////////////////////////////


@Override
public boolean supportsNestedProjection() {
    // Only top-level column pruning is supported for now; projecting into
    // nested (struct) fields is not handled by this prototype.
    return false;
}

@Override
public void applyProjection(int[][] projectedFields, DataType producedDataType) {
    // Resolve the planner-provided field index paths against the full table
    // column list and remember the pruned selection; getScanRuntimeProvider()
    // later hands it to the Delta source builder.
    //
    // Because supportsNestedProjection() returns false, each path in
    // projectedFields is expected to hold a single top-level column index.
    columnsPruned = new ArrayList<>();
    for (int[] fieldPath : projectedFields) {
        for (int columnIndex : fieldPath) {
            columnsPruned.add(columns.get(columnIndex));
        }
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import io.delta.flink.source.internal.builder.BoundedDeltaSourceBuilder;
import io.delta.flink.source.internal.builder.DeltaBulkFormat;
Expand Down Expand Up @@ -177,6 +178,8 @@ public DeltaSource<RowData> build() {
.partitionColumns(sourceSchema.getPartitionColumns())
.build();

System.out.println("Scott > RowDataBoundedDeltaSourceBuilder > build :: sourceSchema.columnNames " + String.join("", sourceSchema.getColumnNames()));

return new DeltaSource<>(
tablePath,
format,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public Reader<RowData> createReader(
org.apache.flink.configuration.Configuration configuration,
DeltaSourceSplit deltaSourceSplit) throws IOException {
// System.out.println("Scott > RowDataFormat :: createReader, split " + deltaSourceSplit.path());

// TODO: get the columns to be read and push them down!
return new DeltaCoreRowDataReader(deltaSourceSplit.deltaScanTaskCore);
// return this.decoratedInputFormat.createReader(configuration, deltaSourceSplit);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package io.delta.flink.table.it;

import java.util.ArrayList;
import java.util.List;

import io.delta.flink.utils.extensions.InMemoryCatalogExtension;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.junit.Test;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.RegisterExtension;

import static io.delta.flink.utils.DeltaTestUtils.buildCluster;

/*
To create the test table:
from pyspark.sql.functions import col
path = "/Users/scott.sandre/connectors/standalone/src/test/resources/delta/partitioned-table-small"
for i in range(15):
low = i * 10
high = low + 10
spark.range(low, high).withColumn("part_a", col("id") % 3).withColumn("part_b", col("id") % 5).write.format("delta").partitionBy("part_a", "part_b").mode("append").save(path)
sql(f"select * from delta.`{path}`").show()
sql(f"select COUNT(*) from delta.`{path}`").show()
*/

public class SimpleDeltaCoreSQLSuite {

@RegisterExtension
private final InMemoryCatalogExtension catalogExtension = new InMemoryCatalogExtension();

public void setupDeltaCatalog(TableEnvironment tableEnv) {
catalogExtension.setupDeltaCatalog(tableEnv);
}

private static final int PARALLELISM = 2;

private final MiniClusterWithClientResource miniClusterResource = buildCluster(PARALLELISM);

public TableEnvironment tableEnv;

// Why isn't this working?
@BeforeEach
public void setUp() {
System.out.println("Scott > SimpleDeltaCoreSQLSuite > setUp");
try {
miniClusterResource.before();
tableEnv = StreamTableEnvironment.create(getTestBatchEnv());
} catch (Exception e) {
throw new RuntimeException("Weren't able to setup the test dependencies", e);
}
}

@AfterEach
public void afterEach() {
miniClusterResource.after();
}

@Test
public void test_column_binding() throws Exception {
/*
from pyspark.sql.functions import col
path = "/Users/scott.sandre/connectors/standalone/src/test/resources/delta/test-table-001"
for i in range(5):
low = i * 20
high = low + 20
spark.range(low, high).withColumn("col1", col("id") % 2).withColumn("col2", col("id") % 2 + 10).withColumn("col3", col("id") % 2 + 20).write.format("delta").mode("append").save(path)

sql(f"select * from delta.`{path}`").show()
sql(f"select COUNT(*) from delta.`{path}`").show()
*/
if (tableEnv == null) {
System.out.println("Scott > tableEnv was null");
miniClusterResource.before();
tableEnv = StreamTableEnvironment.create(getTestBatchEnv());
setupDeltaCatalog(tableEnv);
}

String sourceTablePath = "../standalone/src/test/resources/delta/test-table-001";

String sourceTableSql = String.format(
"CREATE TABLE sourceTable2 ("
+ " id BIGINT,"
+ " col1 BIGINT,"
+ " col2 BIGINT,"
+ " col3 BIGINT"
+ ") WITH ("
+ " 'connector' = 'delta',"
+ " 'table-path' = '%s'"
+ ")",
sourceTablePath);
System.out.println("Scott > test_column_binding > sourceTableSql2 " + sourceTableSql);
tableEnv.executeSql(sourceTableSql);

String selectSql = "SELECT col3, col2, col1 FROM sourceTable2 /*+ OPTIONS('mode' = 'batch') */";
System.out.println("Scott > test_column_binding > selectSql " + selectSql);

TableResult resultTable = tableEnv.executeSql(selectSql);

try (CloseableIterator<Row> collect = resultTable.collect()) {
while (collect.hasNext()) {
Row row = collect.next();
Object c0 = row.getField(0);
Object c1 = row.getField(1);
Object c2 = row.getField(2);
System.out.println(String.format("%s, %s, %s", c0, c1, c2));
}
}
}

// @Test
// public void test_table_partition_push_down() throws Exception {
// if (tableEnv == null) {
// System.out.println("Scott > tableEnv was null");
// miniClusterResource.before();
// tableEnv = StreamTableEnvironment.create(getTestBatchEnv());
// setupDeltaCatalog(tableEnv);
// }
//
// String sourceTablePath = "../standalone/src/test/resources/delta/partitioned-table-small";
//
// String sourceTableSql = String.format(
// "CREATE TABLE sourceTable ("
// + " id BIGINT,"
// + " part_a BIGINT,"
// + " part_b BIGINT"
// + ") PARTITIONED BY (part_a, part_b) "
// + "WITH ("
// + " 'connector' = 'delta',"
// + " 'table-path' = '%s'"
// + ")",
// sourceTablePath);
// System.out.println("Scott > test_table_partition_push_down > sourceTableSql " + sourceTableSql);
// tableEnv.executeSql(sourceTableSql);
//
// String selectSql = "SELECT * FROM sourceTable /*+ OPTIONS('mode' = 'batch') */ WHERE (part_a = 0 AND part_b = 0) OR (part_a = 1 AND part_b = 1)";
// System.out.println("Scott > test_table_partition_push_down > selectSql " + selectSql);
//
// TableResult resultTable = tableEnv.executeSql(selectSql);
//
// try (CloseableIterator<Row> collect = resultTable.collect()) {
// while (collect.hasNext()) {
// Row row = collect.next();
// Object c0 = row.getField(0);
// Object c1 = row.getField(1);
// Object c2 = row.getField(2);
// System.out.println(String.format("%s, %s, %s", c0, c1, c2));
// }
// }
// }

/** Copied from FlinkSqlTestITCase.java */
private StreamExecutionEnvironment getTestBatchEnv() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
return env;
}
}
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{"commitInfo":{"timestamp":1679418576036,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"16","numOutputRows":"20","numOutputBytes":"19735"},"engineInfo":"Apache-Spark/3.3.1 Delta-Lake/2.2.0","txnId":"4d0ed8fb-6434-496e-a9d8-d0affa2e5aec"}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"ddd33c36-d176-44cb-8aa8-d91580248214","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"col1\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"col2\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"col3\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1679418574369}}
{"add":{"path":"part-00000-cc1233cc-17cd-43cf-b8d7-4af5d9c9d4fe-c000.snappy.parquet","partitionValues":{},"size":1225,"modificationTime":1679418576000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":0,\"col1\":0,\"col2\":10,\"col3\":20},\"maxValues\":{\"id\":0,\"col1\":0,\"col2\":10,\"col3\":20},\"nullCount\":{\"id\":0,\"col1\":0,\"col2\":0,\"col3\":0}}"}}
{"add":{"path":"part-00001-c2ef1a3e-9332-4dc2-b076-c28413fe324a-c000.snappy.parquet","partitionValues":{},"size":1226,"modificationTime":1679418575000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":1,\"col1\":1,\"col2\":11,\"col3\":21},\"maxValues\":{\"id\":1,\"col1\":1,\"col2\":11,\"col3\":21},\"nullCount\":{\"id\":0,\"col1\":0,\"col2\":0,\"col3\":0}}"}}
{"add":{"path":"part-00002-a90f7e22-a640-4a32-8f17-3ae7c47b93ea-c000.snappy.parquet","partitionValues":{},"size":1225,"modificationTime":1679418575000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":2,\"col1\":0,\"col2\":10,\"col3\":20},\"maxValues\":{\"id\":2,\"col1\":0,\"col2\":10,\"col3\":20},\"nullCount\":{\"id\":0,\"col1\":0,\"col2\":0,\"col3\":0}}"}}
{"add":{"path":"part-00003-c9095915-3523-43d6-9786-65cbb3eead24-c000.snappy.parquet","partitionValues":{},"size":1258,"modificationTime":1679418576000,"dataChange":true,"stats":"{\"numRecords\":2,\"minValues\":{\"id\":3,\"col1\":0,\"col2\":10,\"col3\":20},\"maxValues\":{\"id\":4,\"col1\":1,\"col2\":11,\"col3\":21},\"nullCount\":{\"id\":0,\"col1\":0,\"col2\":0,\"col3\":0}}"}}
{"add":{"path":"part-00004-ce4a9dc4-0fe4-4511-b679-217ab63e049a-c000.snappy.parquet","partitionValues":{},"size":1226,"modificationTime":1679418575000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":5,\"col1\":1,\"col2\":11,\"col3\":21},\"maxValues\":{\"id\":5,\"col1\":1,\"col2\":11,\"col3\":21},\"nullCount\":{\"id\":0,\"col1\":0,\"col2\":0,\"col3\":0}}"}}
{"add":{"path":"part-00005-1aa80402-80bb-461d-a982-0d53c0e7f08a-c000.snappy.parquet","partitionValues":{},"size":1225,"modificationTime":1679418576000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":6,\"col1\":0,\"col2\":10,\"col3\":20},\"maxValues\":{\"id\":6,\"col1\":0,\"col2\":10,\"col3\":20},\"nullCount\":{\"id\":0,\"col1\":0,\"col2\":0,\"col3\":0}}"}}
{"add":{"path":"part-00006-a47a342e-3d49-4c9b-81cc-9a0f08807cb0-c000.snappy.parquet","partitionValues":{},"size":1226,"modificationTime":1679418575000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":7,\"col1\":1,\"col2\":11,\"col3\":21},\"maxValues\":{\"id\":7,\"col1\":1,\"col2\":11,\"col3\":21},\"nullCount\":{\"id\":0,\"col1\":0,\"col2\":0,\"col3\":0}}"}}
{"add":{"path":"part-00007-c7a139ab-b2f8-4fb4-90a6-9cae678e280e-c000.snappy.parquet","partitionValues":{},"size":1257,"modificationTime":1679418575000,"dataChange":true,"stats":"{\"numRecords\":2,\"minValues\":{\"id\":8,\"col1\":0,\"col2\":10,\"col3\":20},\"maxValues\":{\"id\":9,\"col1\":1,\"col2\":11,\"col3\":21},\"nullCount\":{\"id\":0,\"col1\":0,\"col2\":0,\"col3\":0}}"}}
{"add":{"path":"part-00008-1219a0b8-2d9c-4efa-a992-f62396b771a2-c000.snappy.parquet","partitionValues":{},"size":1224,"modificationTime":1679418575000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":10,\"col1\":0,\"col2\":10,\"col3\":20},\"maxValues\":{\"id\":10,\"col1\":0,\"col2\":10,\"col3\":20},\"nullCount\":{\"id\":0,\"col1\":0,\"col2\":0,\"col3\":0}}"}}
{"add":{"path":"part-00009-8e8b26e4-65ee-4fd8-a958-7c1a57b0250f-c000.snappy.parquet","partitionValues":{},"size":1226,"modificationTime":1679418575000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":11,\"col1\":1,\"col2\":11,\"col3\":21},\"maxValues\":{\"id\":11,\"col1\":1,\"col2\":11,\"col3\":21},\"nullCount\":{\"id\":0,\"col1\":0,\"col2\":0,\"col3\":0}}"}}
{"add":{"path":"part-00010-5cd00d0c-f1c8-4ac8-8209-50fb9ec67512-c000.snappy.parquet","partitionValues":{},"size":1225,"modificationTime":1679418575000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":12,\"col1\":0,\"col2\":10,\"col3\":20},\"maxValues\":{\"id\":12,\"col1\":0,\"col2\":10,\"col3\":20},\"nullCount\":{\"id\":0,\"col1\":0,\"col2\":0,\"col3\":0}}"}}
{"add":{"path":"part-00011-dcd212cd-431d-457b-86a6-2a910e708de9-c000.snappy.parquet","partitionValues":{},"size":1258,"modificationTime":1679418576000,"dataChange":true,"stats":"{\"numRecords\":2,\"minValues\":{\"id\":13,\"col1\":0,\"col2\":10,\"col3\":20},\"maxValues\":{\"id\":14,\"col1\":1,\"col2\":11,\"col3\":21},\"nullCount\":{\"id\":0,\"col1\":0,\"col2\":0,\"col3\":0}}"}}
{"add":{"path":"part-00012-b749d93f-bf6d-44b9-b7f8-03730e7d71b2-c000.snappy.parquet","partitionValues":{},"size":1226,"modificationTime":1679418575000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":15,\"col1\":1,\"col2\":11,\"col3\":21},\"maxValues\":{\"id\":15,\"col1\":1,\"col2\":11,\"col3\":21},\"nullCount\":{\"id\":0,\"col1\":0,\"col2\":0,\"col3\":0}}"}}
{"add":{"path":"part-00013-4e975153-85b3-4ee0-90df-ad31068c8312-c000.snappy.parquet","partitionValues":{},"size":1225,"modificationTime":1679418575000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":16,\"col1\":0,\"col2\":10,\"col3\":20},\"maxValues\":{\"id\":16,\"col1\":0,\"col2\":10,\"col3\":20},\"nullCount\":{\"id\":0,\"col1\":0,\"col2\":0,\"col3\":0}}"}}
{"add":{"path":"part-00014-5acf69bd-0208-48c4-a3f1-884d1d35ad75-c000.snappy.parquet","partitionValues":{},"size":1226,"modificationTime":1679418575000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":17,\"col1\":1,\"col2\":11,\"col3\":21},\"maxValues\":{\"id\":17,\"col1\":1,\"col2\":11,\"col3\":21},\"nullCount\":{\"id\":0,\"col1\":0,\"col2\":0,\"col3\":0}}"}}
{"add":{"path":"part-00015-3c9c7e6e-2c09-4aa1-beea-0b4c743743bd-c000.snappy.parquet","partitionValues":{},"size":1257,"modificationTime":1679418576000,"dataChange":true,"stats":"{\"numRecords\":2,\"minValues\":{\"id\":18,\"col1\":0,\"col2\":10,\"col3\":20},\"maxValues\":{\"id\":19,\"col1\":1,\"col2\":11,\"col3\":21},\"nullCount\":{\"id\":0,\"col1\":0,\"col2\":0,\"col3\":0}}"}}
Loading