Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add GroupNode and parallelize TableFunctionProcessorNode #14941

Draft
wants to merge 91 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
91 commits
Select commit Hold shift + click to select a range
ccb57c8
refactor
Cpaulyz Nov 19, 2024
ea028ef
refactor
Cpaulyz Nov 19, 2024
90d34e1
save
Cpaulyz Nov 19, 2024
0d971bc
save
Cpaulyz Nov 19, 2024
4490c55
rollback scalar function definition
Cpaulyz Nov 19, 2024
e1dc67e
add license
Cpaulyz Nov 19, 2024
9983999
add license
Cpaulyz Nov 19, 2024
93ad64b
fix it
Cpaulyz Nov 19, 2024
6a68d33
Merge branch 'table_udsf' into udsf
Cpaulyz Nov 19, 2024
499ae65
save
Cpaulyz Nov 19, 2024
007a2d8
spotless
Cpaulyz Nov 19, 2024
b8a4fb6
save
Cpaulyz Nov 19, 2024
f55f48d
fix builtin
Cpaulyz Nov 19, 2024
7321535
save
Cpaulyz Nov 19, 2024
87dc7ea
fix it
Cpaulyz Nov 19, 2024
fa3c683
fix it
Cpaulyz Nov 19, 2024
c215e63
fix it
Cpaulyz Nov 20, 2024
bc0873d
fix it
Cpaulyz Nov 20, 2024
656dc4c
fix it
Cpaulyz Nov 20, 2024
dbe07ca
fix review
Cpaulyz Nov 26, 2024
f30dda4
split drop function plan
Cpaulyz Nov 26, 2024
54b24c2
fix clear throw NPE
Cpaulyz Nov 26, 2024
fbe6fde
Merge branch 'table_udsf' into udsf
Cpaulyz Nov 27, 2024
0ed1d85
merge master
Cpaulyz Nov 27, 2024
b1912aa
revert useless change
Cpaulyz Nov 27, 2024
1e2ce48
Save
Cpaulyz Nov 27, 2024
e8ea11c
add IT
Cpaulyz Nov 27, 2024
b8d24a2
add license
Cpaulyz Nov 27, 2024
ebcd1b9
rename beforeStart and add getLocalDate
Cpaulyz Nov 28, 2024
5797329
Merge branch 'master' into udsf
Cpaulyz Nov 28, 2024
30be7e5
modify getdatatype
Cpaulyz Nov 28, 2024
69c6ec3
fix review
Cpaulyz Dec 4, 2024
324cac7
spotless
Cpaulyz Dec 4, 2024
736b709
add ut and fix it
Cpaulyz Dec 4, 2024
7fb82cf
save
Cpaulyz Dec 4, 2024
8f067a5
update pom and add date IT
Cpaulyz Dec 4, 2024
60fd763
add license
Cpaulyz Dec 4, 2024
bfb32a1
fix
Cpaulyz Dec 4, 2024
89adefc
fix cpp client
Cpaulyz Dec 4, 2024
2bb60b2
save
Cpaulyz Dec 5, 2024
7ce6d2b
save
Cpaulyz Dec 9, 2024
502775e
merge master
Cpaulyz Dec 9, 2024
b7141d1
fix it
Cpaulyz Dec 9, 2024
3f4e8e0
add aggregate IT
Cpaulyz Dec 10, 2024
864abf1
remove useless code
Cpaulyz Dec 10, 2024
db2b84b
exp
Cpaulyz Dec 11, 2024
0b33fb8
add removable
Cpaulyz Dec 12, 2024
1e7e8cb
delete useless:
Cpaulyz Dec 12, 2024
1374d01
done
Cpaulyz Dec 12, 2024
eb05523
resolve conflict
Cpaulyz Dec 13, 2024
852d728
save
Cpaulyz Dec 17, 2024
a9a696f
save
Cpaulyz Dec 17, 2024
806e5ea
analyze arguments
Cpaulyz Dec 17, 2024
5e6907b
save
Cpaulyz Dec 25, 2024
6e7f105
merge master
Cpaulyz Dec 25, 2024
2257a35
save
Cpaulyz Dec 29, 2024
d837a8e
save
Cpaulyz Jan 4, 2025
ba6de95
save
Cpaulyz Jan 7, 2025
6aa4546
merge master
Cpaulyz Jan 7, 2025
34c96ad
save
Cpaulyz Jan 8, 2025
60a9470
add PartitionRecognizer UT
Cpaulyz Jan 15, 2025
cb66448
add leaf operator
Cpaulyz Jan 16, 2025
87fc892
save
Cpaulyz Jan 20, 2025
602a726
merge master
Cpaulyz Feb 3, 2025
b9dd299
adjust api
Cpaulyz Feb 5, 2025
b27a282
add analyzer test
Cpaulyz Feb 7, 2025
dd881da
merge master
Cpaulyz Feb 7, 2025
53709c9
remove useless code
Cpaulyz Feb 7, 2025
82522ed
Add IT, OptimizedRule, example and register
Cpaulyz Feb 8, 2025
726c991
Add license
Cpaulyz Feb 8, 2025
03464e3
add serderialize method, fix UT and IT
Cpaulyz Feb 9, 2025
249691c
done
Cpaulyz Feb 22, 2025
0d43c04
update example
Cpaulyz Feb 22, 2025
d592a8b
merge
Cpaulyz Feb 22, 2025
1902b24
adjust package structure
Cpaulyz Feb 22, 2025
ec07201
Merge branch 'udtf' into udtf-optimize
Cpaulyz Feb 22, 2025
a8a5486
refactor
Cpaulyz Feb 22, 2025
4ba4807
fix pass through
Cpaulyz Feb 22, 2025
a8a6510
fix pass through
Cpaulyz Feb 22, 2025
22238fc
resolve conflict
Cpaulyz Feb 22, 2025
7723e8d
Merge branch 'udtf' into udtf-optimize
Cpaulyz Feb 22, 2025
ef93c66
fix UT
Cpaulyz Feb 22, 2025
de19f73
fix IT
Cpaulyz Feb 23, 2025
6f3d73b
Merge branch 'udtf' into udtf-optimize
Cpaulyz Feb 23, 2025
bc073a0
save
Cpaulyz Feb 23, 2025
fff00ed
remove prune when empty clause
Cpaulyz Feb 23, 2025
88f4815
resolve todo
Cpaulyz Feb 23, 2025
d734b70
save
Cpaulyz Feb 23, 2025
eeb91ec
save
Cpaulyz Feb 23, 2025
f1775e9
change collect node
Cpaulyz Feb 24, 2025
d2f3471
save
Cpaulyz Feb 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added a
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.udf.table;

import org.apache.iotdb.udf.api.exception.UDFException;
import org.apache.iotdb.udf.api.relational.TableFunction;
import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis;
import org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider;
import org.apache.iotdb.udf.api.relational.table.argument.Argument;
import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema;
import org.apache.iotdb.udf.api.relational.table.argument.ScalarArgument;
import org.apache.iotdb.udf.api.relational.table.argument.TableArgument;
import org.apache.iotdb.udf.api.relational.table.processor.TableFunctionDataProcessor;
import org.apache.iotdb.udf.api.relational.table.specification.ParameterSpecification;
import org.apache.iotdb.udf.api.relational.table.specification.ScalarParameterSpecification;
import org.apache.iotdb.udf.api.relational.table.specification.TableParameterSpecification;
import org.apache.iotdb.udf.api.type.Type;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
* This is an internal example of the TableFunction implementation. This function is declared as row
* semantic without pass through columns.
*
* <p>CREATE DATABASE test;
*
* <p>USE test;
*
* <p>CREATE TABLE t1(device_id STRING TAG, s1 TEXT FIELD, s2 INT32 FIELD);
*
* <p>INSERT INTO t1(time, device_id, s1, s2) VALUES (1, 'd1', 'a', 1), (2, 'd1', null, 2), (3,
* 'd1', 'c', null);
*
* <p>CREATE FUNCTION exclude_column AS 'org.apache.iotdb.udf.table.ExcludeColumnExample';
*
* <p>SHOW FUNCTIONS;
*
* <p>SELECT * FROM TABLE(exclude_column(TABLE(t1), 's2'));
*/
public class ExcludeColumnExample implements TableFunction {
private final String TBL_PARAM = "DATA";
private final String COL_PARAM = "EXCLUDE";

@Override
public List<ParameterSpecification> getArgumentsSpecifications() {
return Arrays.asList(
TableParameterSpecification.builder().name(TBL_PARAM).rowSemantics().build(),
ScalarParameterSpecification.builder().name(COL_PARAM).type(Type.STRING).build());
}

@Override
public TableFunctionAnalysis analyze(Map<String, Argument> arguments) throws UDFException {
TableArgument tableArgument = (TableArgument) arguments.get(TBL_PARAM);
String excludeColumn = (String) ((ScalarArgument) arguments.get(COL_PARAM)).getValue();
List<Integer> requiredColumns = new ArrayList<>();
DescribedSchema.Builder schemaBuilder = DescribedSchema.builder();
for (int i = 0; i < tableArgument.getFieldNames().size(); i++) {
Optional<String> fieldName = tableArgument.getFieldNames().get(i);
if (!fieldName.isPresent() || !fieldName.get().equalsIgnoreCase(excludeColumn)) {
requiredColumns.add(i);
schemaBuilder.addField(fieldName, tableArgument.getFieldTypes().get(i));
}
}
return TableFunctionAnalysis.builder()
.properColumnSchema(schemaBuilder.build())
.requiredColumns(TBL_PARAM, requiredColumns)
.build();
}

@Override
public TableFunctionProcessorProvider getProcessorProvider(Map<String, Argument> arguments) {
return new TableFunctionProcessorProvider() {
@Override
public TableFunctionDataProcessor getDataProcessor() {
return (input, properColumnBuilders, passThroughIndexBuilder) -> {
for (int i = 0; i < input.size(); i++) {
if (input.isNull(i)) {
properColumnBuilders.get(i).appendNull();
} else {
properColumnBuilders.get(i).writeObject(input.getObject(i));
}
}
};
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.udf.table;

import org.apache.iotdb.udf.api.exception.UDFException;
import org.apache.iotdb.udf.api.relational.TableFunction;
import org.apache.iotdb.udf.api.relational.access.Record;
import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis;
import org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider;
import org.apache.iotdb.udf.api.relational.table.argument.Argument;
import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema;
import org.apache.iotdb.udf.api.relational.table.argument.ScalarArgument;
import org.apache.iotdb.udf.api.relational.table.argument.TableArgument;
import org.apache.iotdb.udf.api.relational.table.processor.TableFunctionDataProcessor;
import org.apache.iotdb.udf.api.relational.table.specification.ParameterSpecification;
import org.apache.iotdb.udf.api.relational.table.specification.ScalarParameterSpecification;
import org.apache.iotdb.udf.api.relational.table.specification.TableParameterSpecification;
import org.apache.iotdb.udf.api.type.Type;

import org.apache.tsfile.block.column.ColumnBuilder;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class HOPTableFunction implements TableFunction {

private static final String DATA_PARAMETER_NAME = "DATA";
private static final String TIMECOL_PARAMETER_NAME = "TIMECOL";
private static final String SLIDE_PARAMETER_NAME = "SLIDE";
private static final String SIZE_PARAMETER_NAME = "SIZE";
private static final String START_PARAMETER_NAME = "START";

@Override
public List<ParameterSpecification> getArgumentsSpecifications() {
return Arrays.asList(
TableParameterSpecification.builder()
.name(DATA_PARAMETER_NAME)
.passThroughColumns()
.keepWhenEmpty()
.build(),
ScalarParameterSpecification.builder()
.name(TIMECOL_PARAMETER_NAME)
.type(Type.STRING)
.build(),
ScalarParameterSpecification.builder().name(SLIDE_PARAMETER_NAME).type(Type.INT64).build(),
ScalarParameterSpecification.builder().name(SIZE_PARAMETER_NAME).type(Type.INT64).build(),
ScalarParameterSpecification.builder()
.name(START_PARAMETER_NAME)
.type(Type.TIMESTAMP)
.defaultValue(Long.MIN_VALUE)
.build());
}

private int findTimeColumnIndex(TableArgument tableArgument, String expectedFieldName) {
int requiredIndex = -1;
for (int i = 0; i < tableArgument.getFieldTypes().size(); i++) {
Optional<String> fieldName = tableArgument.getFieldNames().get(i);
if (fieldName.isPresent() && expectedFieldName.equalsIgnoreCase(fieldName.get())) {
requiredIndex = i;
break;
}
}
return requiredIndex;
}

@Override
public TableFunctionAnalysis analyze(Map<String, Argument> arguments) {
TableArgument tableArgument = (TableArgument) arguments.get(DATA_PARAMETER_NAME);
String expectedFieldName =
(String) ((ScalarArgument) arguments.get(TIMECOL_PARAMETER_NAME)).getValue();
int requiredIndex = findTimeColumnIndex(tableArgument, expectedFieldName);
if (requiredIndex == -1) {
throw new UDFException("The required field is not found in the input table");
}
DescribedSchema properColumnSchema =
new DescribedSchema.Builder()
.addField("window_start", Type.TIMESTAMP)
.addField("window_end", Type.TIMESTAMP)
.build();

// outputColumnSchema
return TableFunctionAnalysis.builder()
.properColumnSchema(properColumnSchema)
.requiredColumns(DATA_PARAMETER_NAME, Collections.singletonList(requiredIndex))
.build();
}

@Override
public TableFunctionProcessorProvider getProcessorProvider(Map<String, Argument> arguments) {
return new TableFunctionProcessorProvider() {
@Override
public TableFunctionDataProcessor getDataProcessor() {
return new HOPDataProcessor(
(Long) ((ScalarArgument) arguments.get(START_PARAMETER_NAME)).getValue(),
(Long) ((ScalarArgument) arguments.get(SLIDE_PARAMETER_NAME)).getValue(),
(Long) ((ScalarArgument) arguments.get(SIZE_PARAMETER_NAME)).getValue());
}
};
}

private static class HOPDataProcessor implements TableFunctionDataProcessor {

private final long slide;
private final long size;
private long curTime;
private long curIndex = 0;

public HOPDataProcessor(long startTime, long slide, long size) {
this.slide = slide;
this.size = size;
this.curTime = startTime;
}

@Override
public void process(
Record input,
List<ColumnBuilder> properColumnBuilders,
ColumnBuilder passThroughIndexBuilder) {
long timeValue = input.getLong(0);
if (curTime == Long.MIN_VALUE) {
curTime = timeValue;
}
if (curTime + size <= timeValue) {
// jump to appropriate window
long move = (timeValue - curTime - size) / slide + 1;
curTime += move * slide;
}
long slideTime = curTime;
while (slideTime <= timeValue && slideTime + size > timeValue) {
properColumnBuilders.get(0).writeLong(slideTime);
properColumnBuilders.get(1).writeLong(slideTime + size);
passThroughIndexBuilder.writeLong(curIndex);
slideTime += slide;
}
curIndex++;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.udf.table;

import org.apache.iotdb.udf.api.exception.UDFArgumentNotValidException;
import org.apache.iotdb.udf.api.exception.UDFException;
import org.apache.iotdb.udf.api.relational.TableFunction;
import org.apache.iotdb.udf.api.relational.access.Record;
import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis;
import org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider;
import org.apache.iotdb.udf.api.relational.table.argument.Argument;
import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema;
import org.apache.iotdb.udf.api.relational.table.argument.ScalarArgument;
import org.apache.iotdb.udf.api.relational.table.processor.TableFunctionDataProcessor;
import org.apache.iotdb.udf.api.relational.table.specification.ParameterSpecification;
import org.apache.iotdb.udf.api.relational.table.specification.ScalarParameterSpecification;
import org.apache.iotdb.udf.api.relational.table.specification.TableParameterSpecification;
import org.apache.iotdb.udf.api.type.Type;

import org.apache.tsfile.block.column.ColumnBuilder;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/**
* This is an internal example of the TableFunction implementation. This function is declared as set
* semantic with pass through columns.
*
* <p>CREATE DATABASE test;
*
* <p>USE test;
*
* <p>CREATE TABLE t1(device_id STRING TAG, s1 TEXT FIELD, s2 INT32 FIELD);
*
* <p>INSERT INTO t1(time, device_id, s1, s2) VALUES (1, 'd1', 'a', 1), (2, 'd1', null, 2), (3,
* 'd1', 'c', null);
*
* <p>CREATE FUNCTION repeat AS 'org.apache.iotdb.udf.table.RepeatExample';
*
* <p>SHOW FUNCTIONS;
*
* <p>SELECT * FROM TABLE(repeat(TABLE(t1), 2));
*/
public class RepeatExample implements TableFunction {
private final String TBL_PARAM = "DATA";
private final String N_PARAM = "N";

@Override
public List<ParameterSpecification> getArgumentsSpecifications() {
return Arrays.asList(
TableParameterSpecification.builder().name(TBL_PARAM).passThroughColumns().build(),
ScalarParameterSpecification.builder().name(N_PARAM).type(Type.INT32).build());
}

@Override
public TableFunctionAnalysis analyze(Map<String, Argument> arguments) throws UDFException {

ScalarArgument count = (ScalarArgument) arguments.get("N");
if (count == null) {
throw new UDFArgumentNotValidException("count argument for function repeat() is missing");
} else if ((int) count.getValue() <= 0) {
throw new UDFArgumentNotValidException(
"count argument for function repeat() must be positive");
}
return TableFunctionAnalysis.builder()
.properColumnSchema(DescribedSchema.builder().addField("repeat_index", Type.INT32).build())
.requiredColumns(
TBL_PARAM,
Collections.singletonList(0)) // per spec, function must require at least one column
.build();
}

@Override
public TableFunctionProcessorProvider getProcessorProvider(Map<String, Argument> arguments) {
ScalarArgument count = (ScalarArgument) arguments.get("N");
return new TableFunctionProcessorProvider() {
@Override
public TableFunctionDataProcessor getDataProcessor() {
return new TableFunctionDataProcessor() {
private final int n = (int) count.getValue();
private long recordIndex = 0;

@Override
public void process(
Record input,
List<ColumnBuilder> properColumnBuilders,
ColumnBuilder passThroughIndexBuilder) {
properColumnBuilders.get(0).writeInt(0);
passThroughIndexBuilder.writeLong(recordIndex++);
}

@Override
public void finish(
List<ColumnBuilder> columnBuilders, ColumnBuilder passThroughIndexBuilder) {
for (int i = 1; i < n; i++) {
for (int j = 0; j < recordIndex; j++) {
columnBuilders.get(0).writeInt(i);
passThroughIndexBuilder.writeLong(j);
}
}
}
};
}
};
}
}
Loading
Loading