Skip to content

Commit 32f5687

Browse files
aokolnychyirdblue
andcommitted
[SPARK-23889][SQL] DataSourceV2: required sorting and clustering for writes
Lead-authored-by: Anton Okolnychyi <aokolnychyi@apple.com> Co-authored-by: Ryan Blue <blue@apache.org>
1 parent 26c0493 commit 32f5687

File tree

26 files changed

+1513
-59
lines changed

26 files changed

+1513
-59
lines changed
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.connector.distributions;
19+
20+
import org.apache.spark.annotation.Experimental;
21+
import org.apache.spark.sql.connector.expressions.Expression;
22+
23+
/**
24+
* A distribution where tuples that share the same values for clustering expressions are co-located
25+
* in the same partition.
26+
*/
27+
@Experimental
28+
public interface ClusteredDistribution extends Distribution {
29+
/**
30+
* Returns clustering expressions.
31+
*/
32+
Expression[] clustering();
33+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.connector.distributions;
19+
20+
import org.apache.spark.annotation.Experimental;
21+
22+
/**
23+
* An interface that defines how data is distributed across partitions.
24+
*/
25+
@Experimental
26+
public interface Distribution {}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.connector.distributions;
19+
20+
import org.apache.spark.annotation.Experimental;
21+
import org.apache.spark.sql.connector.expressions.Expression;
22+
import org.apache.spark.sql.connector.expressions.SortOrder;
23+
24+
/**
25+
* Helper methods to create distributions to pass into Spark.
26+
*/
27+
@Experimental
28+
public class Distributions {
29+
private Distributions() {
30+
}
31+
32+
/**
33+
* Creates a distribution where no promises are made about co-location of data.
34+
*/
35+
public static UnspecifiedDistribution unspecified() {
36+
return LogicalDistributions.unspecified();
37+
}
38+
39+
/**
40+
* Creates a distribution where tuples that share the same values for clustering expressions are
41+
* co-located in the same partition.
42+
*/
43+
public static ClusteredDistribution clustered(Expression[] clustering) {
44+
return LogicalDistributions.clustered(clustering);
45+
}
46+
47+
/**
48+
* Creates a distribution where tuples have been ordered across partitions according
49+
* to ordering expressions, but not necessarily within a given partition.
50+
*/
51+
public static OrderedDistribution ordered(SortOrder[] ordering) {
52+
return LogicalDistributions.ordered(ordering);
53+
}
54+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.connector.distributions;
19+
20+
import org.apache.spark.annotation.Experimental;
21+
import org.apache.spark.sql.connector.expressions.SortOrder;
22+
23+
/**
24+
* A distribution where tuples have been ordered across partitions according
25+
* to ordering expressions, but not necessarily within a given partition.
26+
*/
27+
@Experimental
28+
public interface OrderedDistribution extends Distribution {
29+
/**
30+
* Returns ordering expressions.
31+
*/
32+
SortOrder[] ordering();
33+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.connector.distributions;
19+
20+
import org.apache.spark.annotation.Experimental;
21+
22+
/**
23+
* A distribution where no promises are made about co-location of data.
24+
*/
25+
@Experimental
26+
public interface UnspecifiedDistribution extends Distribution {}

sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Expressions.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,4 +164,15 @@ public static Transform hours(String column) {
164164
return LogicalExpressions.hours(Expressions.column(column));
165165
}
166166

167+
/**
168+
* Create a sort expression.
169+
*
170+
* @param expr an expression to produce values to sort
171+
* @param direction direction of the sort
172+
* @param nullOrder null order of the sort
173+
* @return a SortOrder
174+
*/
175+
public static SortOrder sort(Expression expr, SortDirection direction, NullOrdering nullOrder) {
176+
return LogicalExpressions.sort(expr, direction, nullOrder);
177+
}
167178
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.connector.expressions;
19+
20+
import org.apache.spark.annotation.Experimental;
21+
22+
/**
23+
* A null order used in sorting expressions.
24+
*/
25+
@Experimental
26+
public enum NullOrdering {
27+
NULLS_FIRST, NULLS_LAST;
28+
29+
@Override
30+
public String toString() {
31+
switch (this) {
32+
case NULLS_FIRST:
33+
return "NULLS FIRST";
34+
case NULLS_LAST:
35+
return "NULLS LAST";
36+
default:
37+
throw new IllegalArgumentException("Unexpected null order: " + this);
38+
}
39+
}
40+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.connector.expressions;
19+
20+
import org.apache.spark.annotation.Experimental;
21+
22+
/**
23+
* A sort direction used in sorting expressions.
24+
*/
25+
@Experimental
26+
public enum SortDirection {
27+
ASCENDING, DESCENDING;
28+
29+
@Override
30+
public String toString() {
31+
switch (this) {
32+
case ASCENDING:
33+
return "ASC";
34+
case DESCENDING:
35+
return "DESC";
36+
default:
37+
throw new IllegalArgumentException("Unexpected sort direction: " + this);
38+
}
39+
}
40+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.connector.expressions;
19+
20+
import org.apache.spark.annotation.Experimental;
21+
22+
/**
23+
* Represents a sort order in the public expression API.
24+
*/
25+
@Experimental
26+
public interface SortOrder extends Expression {
27+
/**
28+
* Returns the sort expression.
29+
*/
30+
Expression expression();
31+
32+
/**
33+
* Returns the sort direction.
34+
*/
35+
SortDirection direction();
36+
37+
/**
38+
* Returns the null ordering.
39+
*/
40+
NullOrdering nullOrdering();
41+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.connector.write;
19+
20+
import org.apache.spark.annotation.Experimental;
21+
import org.apache.spark.sql.connector.distributions.Distribution;
22+
import org.apache.spark.sql.connector.distributions.UnspecifiedDistribution;
23+
import org.apache.spark.sql.connector.expressions.SortOrder;
24+
25+
/**
26+
* A write that requires a specific distribution and ordering of data.
27+
*/
28+
@Experimental
29+
public interface RequiresDistributionAndOrdering extends Write {
30+
/**
31+
* Returns the distribution required by this write.
32+
* <p>
33+
* Spark will distribute incoming records to satisfy the required distribution before
34+
* passing those records to the data source table on write.
35+
* <p>
36+
* Implementations may return {@link UnspecifiedDistribution} if they don't require any specific
37+
* distribution of data on write.
38+
*
39+
* @return the required distribution
40+
*/
41+
Distribution requiredDistribution();
42+
43+
/**
44+
* Returns the ordering required by this write.
45+
* <p>
46+
* Spark will order incoming records within partitions to satisfy the required ordering
47+
* before passing those records to the data source table on write.
48+
* <p>
49+
* Implementations may return an empty array if they don't require any specific ordering of data
50+
* on write.
51+
*
52+
* @return the required ordering
53+
*/
54+
SortOrder[] requiredOrdering();
55+
}

0 commit comments

Comments
 (0)