diff --git a/.scalafmt.conf b/.scalafmt.conf new file mode 100644 index 0000000000..47bb6d961e --- /dev/null +++ b/.scalafmt.conf @@ -0,0 +1,70 @@ +runner.dialect = scala212 + +# Version is required to make sure IntelliJ picks the right version +version = 3.4.3 +preset = default + +# Max column +maxColumn = 100 + +# This parameter simply says the .stripMargin method was not redefined by the user to assign +# special meaning to indentation preceding the | character. Hence, that indentation can be modified. +assumeStandardLibraryStripMargin = true +align.stripMargin = true + +# Align settings +align.preset = none +align.closeParenSite = false +align.openParenCallSite = false +danglingParentheses.defnSite = false +danglingParentheses.callSite = false +danglingParentheses.ctrlSite = true +danglingParentheses.tupleSite = false +align.openParenCallSite = false +align.openParenDefnSite = false +align.openParenTupleSite = false + +# Newlines +newlines.alwaysBeforeElseAfterCurlyIf = false +newlines.beforeCurlyLambdaParams = multiline # Newline before lambda params +newlines.afterCurlyLambdaParams = squash # No newline after lambda params +newlines.inInterpolation = "avoid" +newlines.avoidInResultType = true +optIn.annotationNewlines = true + +# Scaladoc +docstrings.style = Asterisk # Javadoc style +docstrings.removeEmpty = true +docstrings.oneline = fold +docstrings.forceBlankLineBefore = true + +# Indentation +indent.extendSite = 2 # This makes sure extend is not indented as the ctor parameters + +# Rewrites +rewrite.rules = [AvoidInfix, Imports, RedundantBraces, SortModifiers] + +# Imports +rewrite.imports.sort = scalastyle +rewrite.imports.groups = [ + ["com.alibaba.fluss\\..*"], + ["com.alibaba.fluss.shade\\..*"], + [".*"], + ["javax\\..*"], + ["java\\..*"], + ["scala\\..*"] +] +rewrite.imports.contiguousGroups = no +importSelectors = singleline # Imports in a single line, like IntelliJ + +# Remove redundant braces in string interpolation. 
+rewrite.redundantBraces.stringInterpolation = true +rewrite.redundantBraces.defnBodies = false +rewrite.redundantBraces.generalExpressions = false +rewrite.redundantBraces.ifElseExpressions = false +rewrite.redundantBraces.methodBodies = false +rewrite.redundantBraces.includeUnitMethods = false +rewrite.redundantBraces.maxBreaks = 1 + +# Remove trailing commas +rewrite.trailingCommas.style = "never" diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java index 3e7484f19d..6b53dc48df 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java @@ -919,7 +919,7 @@ void testInvalidColumnProjection() throws Exception { assertThatThrownBy(() -> createLogScanner(table, new int[] {1, 2, 3, 4, 5})) .isInstanceOf(IllegalArgumentException.class) .hasMessage( - "Projected field index 2 is out of bound for schema ROW<`a` INT, `b` STRING>"); + "Projected field index 2 is out of bound for schema ROW<`a` INT '...', `b` STRING '...'>"); } @ParameterizedTest diff --git a/fluss-common/src/main/java/org/apache/fluss/cluster/Cluster.java b/fluss-common/src/main/java/org/apache/fluss/cluster/Cluster.java index c5528c8f2c..eee883fb25 100644 --- a/fluss-common/src/main/java/org/apache/fluss/cluster/Cluster.java +++ b/fluss-common/src/main/java/org/apache/fluss/cluster/Cluster.java @@ -147,9 +147,7 @@ public ServerNode getCoordinatorServer() { return coordinatorServer; } - /** - * @return The known set of alive tablet servers. - */ + /** @return The known set of alive tablet servers. */ public Map getAliveTabletServers() { return aliveTabletServersById; } diff --git a/fluss-common/src/main/java/org/apache/fluss/memory/MemorySegmentPool.java b/fluss-common/src/main/java/org/apache/fluss/memory/MemorySegmentPool.java index c36e44c533..c3bf2d158b 100644 --- a/fluss-common/src/main/java/org/apache/fluss/memory/MemorySegmentPool.java +++ b/fluss-common/src/main/java/org/apache/fluss/memory/MemorySegmentPool.java @@ -66,14 +66,10 @@ public interface MemorySegmentPool { */ void returnAll(List memory); - /** - * @return Free page number. - */ + /** @return Free page number. */ int freePages(); - /** - * @return the available memory size in bytes. - */ + /** @return the available memory size in bytes. 
*/ long availableMemory(); void close(); diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java b/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java index 2de8419365..d6d946724a 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java +++ b/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java @@ -72,7 +72,9 @@ private Schema(List columns, @Nullable PrimaryKey primaryKey) { .map( column -> new DataField( - column.getName(), column.getDataType())) + column.getName(), + column.getDataType(), + column.getComment().orElse(null))) .collect(Collectors.toList())); } diff --git a/fluss-common/src/main/java/org/apache/fluss/predicate/Predicate.java b/fluss-common/src/main/java/org/apache/fluss/predicate/Predicate.java index e08c8347b5..2d67f7f32c 100644 --- a/fluss-common/src/main/java/org/apache/fluss/predicate/Predicate.java +++ b/fluss-common/src/main/java/org/apache/fluss/predicate/Predicate.java @@ -47,9 +47,7 @@ public interface Predicate extends Serializable { */ boolean test(long rowCount, InternalRow minValues, InternalRow maxValues, Long[] nullCounts); - /** - * @return the negation predicate of this predicate if possible. - */ + /** @return the negation predicate of this predicate if possible. */ Optional negate(); T visit(PredicateVisitor visitor); diff --git a/fluss-common/src/main/java/org/apache/fluss/record/DefaultValueRecordBatch.java b/fluss-common/src/main/java/org/apache/fluss/record/DefaultValueRecordBatch.java index b8aefc75b6..1490d97a9f 100644 --- a/fluss-common/src/main/java/org/apache/fluss/record/DefaultValueRecordBatch.java +++ b/fluss-common/src/main/java/org/apache/fluss/record/DefaultValueRecordBatch.java @@ -257,9 +257,7 @@ public void append(short schemaId, BinaryRow row) throws IOException { currentRecordNumber++; } - /** - * @param valueBytes consisted of schema id and the row encoded in the value bytes - */ + /** @param valueBytes consisted of schema id and the row encoded in the value bytes */ public void append(byte[] valueBytes) throws IOException { if (isClosed) { throw new IllegalStateException( diff --git a/fluss-common/src/main/java/org/apache/fluss/record/send/SendWritableOutput.java b/fluss-common/src/main/java/org/apache/fluss/record/send/SendWritableOutput.java index c799fe4ec5..f38822c935 100644 --- a/fluss-common/src/main/java/org/apache/fluss/record/send/SendWritableOutput.java +++ b/fluss-common/src/main/java/org/apache/fluss/record/send/SendWritableOutput.java @@ -34,9 +34,7 @@ public class SendWritableOutput extends ByteBufWritableOutput { /** The current reader index of the underlying {@link #buf} for building next {@link Send}. */ private int currentReaderIndex = 0; - /** - * @param buf The ByteBuf that has capacity of data size excluding zero-copy. - */ + /** @param buf The ByteBuf that has capacity of data size excluding zero-copy. */ public SendWritableOutput(ByteBuf buf) { super(buf); this.sends = new ArrayDeque<>(1); diff --git a/fluss-common/src/main/java/org/apache/fluss/remote/RemoteLogSegment.java b/fluss-common/src/main/java/org/apache/fluss/remote/RemoteLogSegment.java index 39480e4d10..3022a23b26 100644 --- a/fluss-common/src/main/java/org/apache/fluss/remote/RemoteLogSegment.java +++ b/fluss-common/src/main/java/org/apache/fluss/remote/RemoteLogSegment.java @@ -93,16 +93,12 @@ public UUID remoteLogSegmentId() { return remoteLogSegmentId; } - /** - * @return Remote log start offset of this segment (inclusive). 
- */ + /** @return Remote log start offset of this segment (inclusive). */ public long remoteLogStartOffset() { return remoteLogStartOffset; } - /** - * @return Remote log end offset of this segment (inclusive). - */ + /** @return Remote log end offset of this segment (inclusive). */ public long remoteLogEndOffset() { return remoteLogEndOffset; } diff --git a/fluss-common/src/main/java/org/apache/fluss/row/TimestampNtz.java b/fluss-common/src/main/java/org/apache/fluss/row/TimestampNtz.java index cc2e4586dc..289f2880d9 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/TimestampNtz.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/TimestampNtz.java @@ -44,6 +44,10 @@ public class TimestampNtz implements Comparable, Serializable { // the number of milliseconds in a day. private static final long MILLIS_PER_DAY = 86400000; // = 24 * 60 * 60 * 1000 + public static final long MICROS_PER_MILLIS = 1000L; + + public static final long NANOS_PER_MICROS = 1000L; + // this field holds the integral second and the milli-of-second. private final long millisecond; @@ -70,6 +74,12 @@ public int getNanoOfMillisecond() { return nanoOfMillisecond; } + /** Converts this {@link TimestampNtz} object to micros. */ + public long toMicros() { + long micros = Math.multiplyExact(millisecond, MICROS_PER_MILLIS); + return micros + nanoOfMillisecond / NANOS_PER_MICROS; + } + /** * Creates an instance of {@link TimestampNtz} from milliseconds. * diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/CatalogExceptionUtils.java b/fluss-common/src/main/java/org/apache/fluss/utils/CatalogExceptionUtils.java similarity index 98% rename from fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/CatalogExceptionUtils.java rename to fluss-common/src/main/java/org/apache/fluss/utils/CatalogExceptionUtils.java index 2e4a89a13a..aa47a393dc 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/CatalogExceptionUtils.java +++ b/fluss-common/src/main/java/org/apache/fluss/utils/CatalogExceptionUtils.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.fluss.flink.utils; +package org.apache.fluss.utils; import org.apache.fluss.exception.DatabaseAlreadyExistException; import org.apache.fluss.exception.DatabaseNotEmptyException; diff --git a/fluss-common/src/main/java/org/apache/fluss/utils/DateTimeUtils.java b/fluss-common/src/main/java/org/apache/fluss/utils/DateTimeUtils.java index 9106993eae..21d108d442 100644 --- a/fluss-common/src/main/java/org/apache/fluss/utils/DateTimeUtils.java +++ b/fluss-common/src/main/java/org/apache/fluss/utils/DateTimeUtils.java @@ -56,6 +56,13 @@ public class DateTimeUtils { /** The number of milliseconds in an hour. */ private static final long MILLIS_PER_HOUR = 3600000L; // = 60 * 60 * 1000 + /** + * The number of milliseconds in a day. + * + *
<p>
This is the modulo 'mask' used when converting TIMESTAMP values to DATE and TIME values. + */ + public static final long MILLIS_PER_DAY = 86400000L; // = 24 * 60 * 60 * 1000 + private static final DateTimeFormatter DEFAULT_TIMESTAMP_FORMATTER = new DateTimeFormatterBuilder() .appendPattern("yyyy-[MM][M]-[dd][d]") @@ -256,6 +263,19 @@ public static Integer parseTime(String v) { + milli; } + /** + * Converts the Java type used for UDF parameters of SQL DATE type ({@link java.sql.Date}) to + * internal representation (int). + */ + public static int toInternal(java.sql.Date date) { + long ts = date.getTime() + TimeZone.getDefault().getOffset(date.getTime()); + return (int) (ts / MILLIS_PER_DAY); + } + + public static int toInternal(LocalDate date) { + return ymdToUnixDate(date.getYear(), date.getMonthValue(), date.getDayOfMonth()); + } + private static boolean isInteger(String s) { boolean isInt = s.length() > 0; for (int i = 0; i < s.length(); i++) { diff --git a/fluss-common/src/main/java/org/apache/fluss/utils/InternalRowUtils.java b/fluss-common/src/main/java/org/apache/fluss/utils/InternalRowUtils.java new file mode 100644 index 0000000000..3c7f314ffa --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/utils/InternalRowUtils.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.utils; + +import org.apache.fluss.row.BinaryString; +import org.apache.fluss.row.Decimal; +import org.apache.fluss.row.GenericRow; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.row.indexed.IndexedRow; +import org.apache.fluss.types.DataType; +import org.apache.fluss.types.RowType; + +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.util.List; + +/** Utils for {@link InternalRow} structures. */ +public class InternalRowUtils { + + public static InternalRow copyInternalRow(InternalRow row, RowType rowType) { + if (row instanceof IndexedRow) { + return ((IndexedRow) row).copy(); + } else { + GenericRow ret = new GenericRow(row.getFieldCount()); + + InternalRow.FieldGetter[] fieldGetters = createFieldGetters(rowType.getChildren()); + for (int i = 0; i < row.getFieldCount(); ++i) { + ret.setField(i, fieldGetters[i].getFieldOrNull(row)); + } + + return ret; + } + } + + public static Object copy(Object o, DataType type) { + if (o instanceof BinaryString) { + return ((BinaryString) o).copy(); + } else if (o instanceof InternalRow) { + return copyInternalRow((InternalRow) o, (RowType) type); + } else if (o instanceof Decimal) { + return ((Decimal) o).copy(); + } + return o; + } + + public static long castToIntegral(Decimal dec) { + BigDecimal bd = dec.toBigDecimal(); + // rounding down. This is consistent with float=>int, + // and consistent with SQLServer, Spark. 
+ bd = bd.setScale(0, RoundingMode.DOWN); + return bd.longValue(); + } + + public static InternalRow.FieldGetter[] createFieldGetters(List fieldTypes) { + InternalRow.FieldGetter[] fieldGetters = new InternalRow.FieldGetter[fieldTypes.size()]; + for (int i = 0; i < fieldTypes.size(); i++) { + fieldGetters[i] = createNullCheckingFieldGetter(fieldTypes.get(i), i); + } + return fieldGetters; + } + + public static InternalRow.FieldGetter createNullCheckingFieldGetter( + DataType dataType, int index) { + InternalRow.FieldGetter getter = InternalRow.createFieldGetter(dataType, index); + if (dataType.isNullable()) { + return getter; + } else { + return row -> { + if (row.isNullAt(index)) { + return null; + } + return getter.getFieldOrNull(row); + }; + } + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/utils/types/Either.java b/fluss-common/src/main/java/org/apache/fluss/utils/types/Either.java index 39f1c5260c..801401d33d 100644 --- a/fluss-common/src/main/java/org/apache/fluss/utils/types/Either.java +++ b/fluss-common/src/main/java/org/apache/fluss/utils/types/Either.java @@ -60,16 +60,12 @@ public static Either right(R value) { */ public abstract R right() throws IllegalStateException; - /** - * @return true if this is a Left value, false if this is a Right value - */ + /** @return true if this is a Left value, false if this is a Right value */ public final boolean isLeft() { return getClass() == Left.class; } - /** - * @return true if this is a Right value, false if this is a Left value - */ + /** @return true if this is a Right value, false if this is a Left value */ public final boolean isRight() { return getClass() == Right.class; } diff --git a/fluss-common/src/test/java/org/apache/fluss/security/auth/AuthenticationFactoryTest.java b/fluss-common/src/test/java/org/apache/fluss/security/auth/AuthenticationFactoryTest.java index c71edeca0d..7bc19d6dfc 100644 --- a/fluss-common/src/test/java/org/apache/fluss/security/auth/AuthenticationFactoryTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/security/auth/AuthenticationFactoryTest.java @@ -80,8 +80,7 @@ void testIdentifierCaseInsensitive() { .isInstanceOf(TestIdentifierClientAuthenticator.class); assertThat( AuthenticationFactory.loadServerAuthenticatorSuppliers(configuration) - .values() - .stream() + .values().stream() .findAny() .get() .get()) @@ -94,8 +93,7 @@ void testIdentifierCaseInsensitive() { .isInstanceOf(TestIdentifierClientAuthenticator.class); assertThat( AuthenticationFactory.loadServerAuthenticatorSuppliers(configuration) - .values() - .stream() + .values().stream() .findAny() .get() .get()) diff --git a/fluss-filesystems/fluss-fs-gs/src/test/java/org/apache/fluss/fs/gs/GSFileSystemBehaviorITCase.java b/fluss-filesystems/fluss-fs-gs/src/test/java/org/apache/fluss/fs/gs/GSFileSystemBehaviorITCase.java index b1902c2bbb..58298c1fdb 100644 --- a/fluss-filesystems/fluss-fs-gs/src/test/java/org/apache/fluss/fs/gs/GSFileSystemBehaviorITCase.java +++ b/fluss-filesystems/fluss-fs-gs/src/test/java/org/apache/fluss/fs/gs/GSFileSystemBehaviorITCase.java @@ -93,7 +93,8 @@ private static void applyInMemoryStorage(FileSystem fileSystem) throws IOExcepti GoogleCloudStorageFileSystem googleCloudStorageFileSystem = new GoogleCloudStorageFileSystem( googleCloudStorageOptions -> inMemoryGoogleCloudStorage, - GoogleCloudStorageFileSystemOptions.DEFAULT.toBuilder() + GoogleCloudStorageFileSystemOptions.DEFAULT + .toBuilder() .setCloudStorageOptions(inMemoryGoogleCloudStorage.getOptions()) .build()); diff --git 
a/fluss-filesystems/fluss-fs-s3/src/main/java/com/amazonaws/services/s3/model/transform/XmlResponsesSaxParser.java b/fluss-filesystems/fluss-fs-s3/src/main/java/com/amazonaws/services/s3/model/transform/XmlResponsesSaxParser.java index 6decac572d..9502cf04c4 100644 --- a/fluss-filesystems/fluss-fs-s3/src/main/java/com/amazonaws/services/s3/model/transform/XmlResponsesSaxParser.java +++ b/fluss-filesystems/fluss-fs-s3/src/main/java/com/amazonaws/services/s3/model/transform/XmlResponsesSaxParser.java @@ -924,16 +924,12 @@ public static class ListAllMyBucketsHandler extends AbstractHandler { private Bucket currentBucket = null; - /** - * @return the buckets listed in the document. - */ + /** @return the buckets listed in the document. */ public List getBuckets() { return buckets; } - /** - * @return the owner of the buckets. - */ + /** @return the owner of the buckets. */ public Owner getOwner() { return bucketsOwner; } @@ -990,9 +986,7 @@ public static class AccessControlListHandler extends AbstractHandler { private Grantee currentGrantee = null; private Permission currentPermission = null; - /** - * @return an object representing the ACL document. - */ + /** @return an object representing the ACL document. */ public AccessControlList getAccessControlList() { return accessControlList; } @@ -1073,9 +1067,7 @@ public static class BucketLoggingConfigurationHandler extends AbstractHandler { private final BucketLoggingConfiguration bucketLoggingConfiguration = new BucketLoggingConfiguration(); - /** - * @return an object representing the bucket's LoggingStatus document. - */ + /** @return an object representing the bucket's LoggingStatus document. */ public BucketLoggingConfiguration getBucketLoggingConfiguration() { return bucketLoggingConfiguration; } @@ -1105,9 +1097,7 @@ public static class BucketLocationHandler extends AbstractHandler { private String location = null; - /** - * @return the bucket's location. - */ + /** @return the bucket's location. */ public String getLocation() { return location; } diff --git a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/flink/api/connector/sink2/Sink.java b/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/flink/api/connector/sink2/Sink.java index 8cab1cb533..5dfc628acc 100644 --- a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/flink/api/connector/sink2/Sink.java +++ b/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/flink/api/connector/sink2/Sink.java @@ -53,9 +53,7 @@ interface InitContext { */ MailboxExecutor getMailboxExecutor(); - /** - * @return The metric group this writer belongs to. - */ + /** @return The metric group this writer belongs to. 
*/ SinkWriterMetricGroup metricGroup(); } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java index 7dd6511581..a71408bdc1 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java @@ -26,7 +26,6 @@ import org.apache.fluss.exception.InvalidTableException; import org.apache.fluss.flink.lake.LakeCatalog; import org.apache.fluss.flink.procedure.ProcedureManager; -import org.apache.fluss.flink.utils.CatalogExceptionUtils; import org.apache.fluss.flink.utils.DataLakeUtils; import org.apache.fluss.flink.utils.FlinkConversions; import org.apache.fluss.metadata.DatabaseDescriptor; @@ -35,6 +34,7 @@ import org.apache.fluss.metadata.TableDescriptor; import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.utils.CatalogExceptionUtils; import org.apache.fluss.utils.ExceptionUtils; import org.apache.fluss.utils.IOUtils; @@ -82,13 +82,13 @@ import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.fluss.config.ConfigOptions.BOOTSTRAP_SERVERS; -import static org.apache.fluss.flink.utils.CatalogExceptionUtils.isPartitionAlreadyExists; -import static org.apache.fluss.flink.utils.CatalogExceptionUtils.isPartitionInvalid; -import static org.apache.fluss.flink.utils.CatalogExceptionUtils.isPartitionNotExist; -import static org.apache.fluss.flink.utils.CatalogExceptionUtils.isTableInvalid; -import static org.apache.fluss.flink.utils.CatalogExceptionUtils.isTableNotExist; -import static org.apache.fluss.flink.utils.CatalogExceptionUtils.isTableNotPartitioned; import static org.apache.fluss.flink.utils.FlinkConversions.toFlussDatabase; +import static org.apache.fluss.utils.CatalogExceptionUtils.isPartitionAlreadyExists; +import static org.apache.fluss.utils.CatalogExceptionUtils.isPartitionInvalid; +import static org.apache.fluss.utils.CatalogExceptionUtils.isPartitionNotExist; +import static org.apache.fluss.utils.CatalogExceptionUtils.isTableInvalid; +import static org.apache.fluss.utils.CatalogExceptionUtils.isTableNotExist; +import static org.apache.fluss.utils.CatalogExceptionUtils.isTableNotPartitioned; /** * A Flink Catalog for fluss. diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/utils/IcebergConversionsTest.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/utils/IcebergConversionsTest.java index 9d18e9d5c9..2972ce3df6 100644 --- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/utils/IcebergConversionsTest.java +++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/utils/IcebergConversionsTest.java @@ -44,8 +44,7 @@ import static org.assertj.core.api.Assertions.assertThat; /** UT for {@link IcebergConversions}. 
*/ -class IcebergConversionsTest { - ; +class IcebergConversionsTest {; @Test void testToPartition(@TempDir File tempWarehouseDir) { diff --git a/fluss-protogen/fluss-protogen-tests/src/test/java/org/apache/fluss/protogen/tests/BytesTest.java b/fluss-protogen/fluss-protogen-tests/src/test/java/org/apache/fluss/protogen/tests/BytesTest.java index b2ccc11872..393ee3c80d 100644 --- a/fluss-protogen/fluss-protogen-tests/src/test/java/org/apache/fluss/protogen/tests/BytesTest.java +++ b/fluss-protogen/fluss-protogen-tests/src/test/java/org/apache/fluss/protogen/tests/BytesTest.java @@ -62,7 +62,8 @@ public void testBytes() throws Exception { // test binary equals to protobuf Bytes.B pbb = - Bytes.B.newBuilder() + Bytes.B + .newBuilder() .setPayload(ByteString.copyFrom(new byte[] {1, 2, 3, 4, 5})) .build(); @@ -254,7 +255,8 @@ public void testRepeatedBytes() throws Exception { assertThat(lpb.getExtraItemAt(1)).isEqualTo(new byte[] {4, 5, 6, 7}); Bytes.B pbb = - Bytes.B.newBuilder() + Bytes.B + .newBuilder() .addExtraItems(ByteString.copyFrom(new byte[] {1, 2, 3})) .addExtraItems(ByteString.copyFrom(new byte[] {4, 5, 6, 7})) .build(); diff --git a/fluss-protogen/fluss-protogen-tests/src/test/java/org/apache/fluss/protogen/tests/StringsTest.java b/fluss-protogen/fluss-protogen-tests/src/test/java/org/apache/fluss/protogen/tests/StringsTest.java index b93590e1a1..d8b7abcc27 100644 --- a/fluss-protogen/fluss-protogen-tests/src/test/java/org/apache/fluss/protogen/tests/StringsTest.java +++ b/fluss-protogen/fluss-protogen-tests/src/test/java/org/apache/fluss/protogen/tests/StringsTest.java @@ -58,7 +58,8 @@ public void testStrings() throws Exception { assertThat(lps.getNameAt(2)).isEqualTo("c"); Strings.S pbs = - Strings.S.newBuilder() + Strings.S + .newBuilder() .setId("id") .addNames("a") .addNames("b") diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/NettyUtils.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/NettyUtils.java index b6418e6dc1..40ab37711f 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/NettyUtils.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/NettyUtils.java @@ -36,9 +36,7 @@ /** Utils of netty. */ public class NettyUtils { - /** - * @return an EventLoopGroup suitable for the current platform - */ + /** @return an EventLoopGroup suitable for the current platform */ public static EventLoopGroup newEventLoopGroup(int nThreads, String threadNamePrefix) { if (Epoll.isAvailable()) { // Regular Epoll based event loop diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/LogSegments.java b/fluss-server/src/main/java/org/apache/fluss/server/log/LogSegments.java index 1a0c9a187e..fa1c9a384e 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogSegments.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogSegments.java @@ -175,30 +175,22 @@ public Optional floorSegment(long offset) { return floorEntry(offset).map(Map.Entry::getValue); } - /** - * @return the entry associated with the greatest offset, if it exists. - */ + /** @return the entry associated with the greatest offset, if it exists. */ public Optional> lastEntry() { return Optional.ofNullable(segments.lastEntry()); } - /** - * @return the log segment with the greatest offset, if it exists. - */ + /** @return the log segment with the greatest offset, if it exists. 
*/ public Optional lastSegment() { return Optional.ofNullable(segments.lastEntry()).map(Map.Entry::getValue); } - /** - * @return the entry associated with the greatest offset, if it exists. - */ + /** @return the entry associated with the greatest offset, if it exists. */ public Optional> firstEntry() { return Optional.ofNullable(segments.firstEntry()); } - /** - * @return the log segment with the greatest offset, if it exists. - */ + /** @return the log segment with the greatest offset, if it exists. */ public Optional firstSegment() { return Optional.ofNullable(segments.firstEntry()).map(Map.Entry::getValue); } diff --git a/fluss-spark/fluss-spark-3.3/pom.xml b/fluss-spark/fluss-spark-3.3/pom.xml new file mode 100644 index 0000000000..04e69c7a98 --- /dev/null +++ b/fluss-spark/fluss-spark-3.3/pom.xml @@ -0,0 +1,85 @@ + + + + + 4.0.0 + + org.apache.fluss + fluss-spark + 0.8-SNAPSHOT + + + fluss-spark-3.3 + + Fluss : Engine Spark : 3.3 + + + 3.3.3 + + + + + org.apache.spark + spark-sql_${scala.binary.version} + ${spark.version} + provided + + + + org.apache.spark + spark-core_${scala.binary.version} + ${spark.version} + provided + + + + org.apache.fluss + fluss-spark-common + ${project.version} + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + shade-fluss + package + + shade + + + + + org.apache.fluss:fluss-spark-common + org.apache.fluss:fluss-client + + + + + + + + + \ No newline at end of file diff --git a/fluss-spark/fluss-spark-common/pom.xml b/fluss-spark/fluss-spark-common/pom.xml new file mode 100644 index 0000000000..7b8fb8a8a2 --- /dev/null +++ b/fluss-spark/fluss-spark-common/pom.xml @@ -0,0 +1,121 @@ + + + + + 4.0.0 + + org.apache.fluss + fluss-spark + 0.8-SNAPSHOT + + + fluss-spark-common + Fluss : Engine Spark : Common + + + 3.5.0 + + + + + org.apache.fluss + fluss-client + ${project.version} + + + + + org.apache.spark + spark-sql_${scala.binary.version} + ${spark.version} + + + + org.apache.spark + spark-core_${scala.binary.version} + ${spark.version} + + + + + + + org.apache.spark + spark-sql_${scala.binary.version} + ${spark.version} + tests + test + + + + org.apache.spark + spark-catalyst_${scala.binary.version} + ${spark.version} + tests + test + + + + org.apache.spark + spark-core_${scala.binary.version} + ${spark.version} + tests + test + + + + org.apache.spark + spark-hive_${scala.binary.version} + ${spark.version} + test + + + + org.apache.spark + spark-avro_${scala.binary.version} + ${spark.version} + test + + + + org.apache.curator + curator-test + ${curator.version} + test + + + + org.apache.fluss + fluss-common + ${project.version} + test + test-jar + + + org.apache.fluss + fluss-common + 0.8-SNAPSHOT + compile + + + + + \ No newline at end of file diff --git a/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/SparkConnectorOptions.java b/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/SparkConnectorOptions.java new file mode 100644 index 0000000000..2944b292ee --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/SparkConnectorOptions.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark; + +import org.apache.fluss.config.ConfigOption; +import org.apache.fluss.config.FlussConfigUtils; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.fluss.config.ConfigBuilder.key; + +/** Options for the Spark connector. */ +public class SparkConnectorOptions { + + public static final String MOCK_SYSTEM_TIME = "_mockSystemTime"; + + public static final ConfigOption<Duration> MAX_TRIGGER_DELAY = + key("max.trigger.delay") + .durationType() + .defaultValue(Duration.ofMinutes(15)) + .withDescription( + "The maximum delay between two adjacent batches, which is used to create MinRowsReadLimit together with min.offset.per.trigger. Note: KV tables do not support it."); + + public static final ConfigOption<Long> MAX_OFFSET_PER_TRIGGER = + key("max.offset.per.trigger") + .longType() + .noDefaultValue() + .withDescription("The maximum number of rows returned in a single batch."); + + public static final ConfigOption<Long> MIN_OFFSET_PER_TRIGGER = + key("min.offset.per.trigger") + .longType() + .noDefaultValue() + .withDescription( + "The minimum number of rows returned in a single batch, which is used to create MinRowsReadLimit together with max.trigger.delay."); + + public static final ConfigOption<Integer> BUCKET_NUMBER = + key("bucket.num") + .intType() + .noDefaultValue() + .withDescription("The number of buckets of a Fluss table."); + + public static final ConfigOption<String> BUCKET_KEY = + key("bucket.key") + .stringType() + .noDefaultValue() + .withDescription( + "Specifies the distribution policy of the Fluss table. " + + "Data will be distributed to each bucket according to the hash value of bucket-key. " + + "If you specify multiple fields, the delimiter is ','. " + + "If the table has a primary key, you can't specify a bucket key currently. " + + "The bucket keys will always be the primary key. " + + "If the table has no primary key, you can specify a bucket key, and when the bucket key is not specified, " + + "the data will be distributed to each bucket randomly."); + + public static final ConfigOption<String> BOOTSTRAP_SERVERS = + key("bootstrap.servers") + .stringType() + .noDefaultValue() + .withDescription( + "A list of host/port pairs to use for establishing the initial connection to the Fluss cluster. 
" + "The list should be in the form host1:port1,host2:port2,...."); + + public static final ConfigOption<String> PRIMARY_KEY = + key("primary.key") + .stringType() + .noDefaultValue() + .withDescription("The primary key of the Fluss table, such as key1,key2,..."); + + public static final ConfigOption<String> TMP_DIRS = + key("io.tmp.dirs") + .stringType() + .defaultValue(System.getProperty("java.io.tmpdir")) + .withDeprecatedKeys("taskmanager.tmp.dirs") + .withDescription( + "Directories for temporary files, separated by \",\", \"|\", or the system's java.io.File.pathSeparator."); + + // -------------------------------------------------------------------------------------------- + // Lookup specific options + // -------------------------------------------------------------------------------------------- + + public static final ConfigOption<Boolean> LOOKUP_ASYNC = + key("lookup.async") + .booleanType() + .defaultValue(true) + .withDescription("Whether to set async lookup. Default is true."); + + // -------------------------------------------------------------------------------------------- + // Scan specific options + // -------------------------------------------------------------------------------------------- + + public static final ConfigOption<ScanStartupMode> SCAN_STARTUP_MODE = + key("scan.startup.mode") + .enumType(ScanStartupMode.class) + .defaultValue(ScanStartupMode.FULL) + .withDescription( + "Optional startup mode for the Fluss source. Default is 'full'."); + + public static final ConfigOption<String> SCAN_STARTUP_TIMESTAMP = + key("scan.startup.timestamp") + .stringType() + .noDefaultValue() + .withDescription( + "Optional timestamp for the Fluss source in case the startup mode is 'timestamp'. " + + "The format is 'timestamp' or 'yyyy-MM-dd HH:mm:ss'. " + + "Like '1678883047356' or '2023-12-09 23:09:12'."); + + public static final ConfigOption<Duration> SCAN_PARTITION_DISCOVERY_INTERVAL = + key("scan.partition.discovery.interval") + .durationType() + .defaultValue(Duration.ofSeconds(10)) + .withDescription( + "The interval in milliseconds for the Fluss source to discover " + + "new partitions for partitioned tables while scanning." + + " A non-positive value disables the partition discovery."); + + // -------------------------------------------------------------------------------------------- + // table storage specific options + // -------------------------------------------------------------------------------------------- + + public static final List<ConfigOption<?>> TABLE_OPTIONS = + new ArrayList<>(FlussConfigUtils.TABLE_OPTIONS.values()); + + // -------------------------------------------------------------------------------------------- + // client specific options + // -------------------------------------------------------------------------------------------- + + public static final List<ConfigOption<?>> CLIENT_OPTIONS = + new ArrayList<>(FlussConfigUtils.CLIENT_OPTIONS.values()); + + // ------------------------------------------------------------------------------------------ + + /** Startup mode for the Fluss scanner, see {@link #SCAN_STARTUP_MODE}. */ + public enum ScanStartupMode { + FULL( + "full", + "Performs a full snapshot on the table upon first startup, " + + "and continues to read the latest changelog with an exactly-once guarantee. " + + "If the table to read is a log table, the full snapshot means " + + "reading from the earliest log offset. 
If the table to read is a primary key table, " + + "the full snapshot means reading a latest snapshot which " + + "materializes all changes on the table."), + EARLIEST("earliest", ("Start reading logs from the earliest offset.")), + LATEST("latest", ("Start reading logs from the latest offset.")), + TIMESTAMP("timestamp", ("Start reading logs from user-supplied timestamp.")); + + private final String value; + private final String description; + + ScanStartupMode(String value, String description) { + this.value = value; + this.description = description; + } + + @Override + public String toString() { + return value; + } + } +} diff --git a/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/SparkInternalRow.java b/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/SparkInternalRow.java new file mode 100644 index 0000000000..b0c028bd23 --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/SparkInternalRow.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark; + +import org.apache.fluss.row.BinaryString; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.row.TimestampLtz; +import org.apache.fluss.row.TimestampNtz; +import org.apache.fluss.types.BigIntType; +import org.apache.fluss.types.DataType; +import org.apache.fluss.types.DataTypeChecks; +import org.apache.fluss.types.RowType; + +import org.apache.spark.sql.catalyst.util.ArrayData; +import org.apache.spark.sql.catalyst.util.MapData; +import org.apache.spark.sql.types.Decimal; +import org.apache.spark.unsafe.types.CalendarInterval; +import org.apache.spark.unsafe.types.UTF8String; + +import static org.apache.fluss.utils.InternalRowUtils.copyInternalRow; + +/** Spark {@link org.apache.spark.sql.catalyst.InternalRow} to wrap {@link InternalRow}. 
*/ +public class SparkInternalRow extends org.apache.spark.sql.catalyst.InternalRow { + + private final RowType rowType; + + private InternalRow row; + + public SparkInternalRow(RowType rowType) { + this.rowType = rowType; + } + + public SparkInternalRow replace(InternalRow row) { + this.row = row; + return this; + } + + @Override + public int numFields() { + return row.getFieldCount(); + } + + @Override + public void setNullAt(int i) { + throw new UnsupportedOperationException(); + } + + @Override + public void update(int i, Object value) { + throw new UnsupportedOperationException(); + } + + @Override + public org.apache.spark.sql.catalyst.InternalRow copy() { + return new SparkInternalRow(rowType).replace(copyInternalRow(row, rowType)); + } + + @Override + public boolean isNullAt(int ordinal) { + return row.isNullAt(ordinal); + } + + @Override + public boolean getBoolean(int ordinal) { + return row.getBoolean(ordinal); + } + + @Override + public byte getByte(int ordinal) { + return row.getByte(ordinal); + } + + @Override + public short getShort(int ordinal) { + return row.getShort(ordinal); + } + + @Override + public int getInt(int ordinal) { + return row.getInt(ordinal); + } + + @Override + public long getLong(int ordinal) { + if (rowType.getTypeAt(ordinal) instanceof BigIntType) { + return row.getLong(ordinal); + } + + return getTimestampMicros(ordinal); + } + + private long getTimestampMicros(int ordinal) { + DataType type = rowType.getTypeAt(ordinal); + if (type instanceof org.apache.fluss.types.TimestampType) { + return fromFluss(row.getTimestampNtz(ordinal, DataTypeChecks.getPrecision(type))); + } else { + return fromFluss(row.getTimestampLtz(ordinal, DataTypeChecks.getPrecision(type))); + } + } + + @Override + public float getFloat(int ordinal) { + return row.getFloat(ordinal); + } + + @Override + public double getDouble(int ordinal) { + return row.getDouble(ordinal); + } + + @Override + public Decimal getDecimal(int ordinal, int precision, int scale) { + org.apache.fluss.row.Decimal decimal = row.getDecimal(ordinal, precision, scale); + return fromFluss(decimal); + } + + @Override + public UTF8String getUTF8String(int ordinal) { + return fromFluss(row.getString(ordinal)); + } + + @Override + public byte[] getBinary(int ordinal) { + return row.getBinary(ordinal, -1); + } + + @Override + public CalendarInterval getInterval(int ordinal) { + throw new UnsupportedOperationException(); + } + + @Override + public org.apache.spark.sql.catalyst.InternalRow getStruct(int ordinal, int numFields) { + // todo: support struct + return null; + } + + @Override + public ArrayData getArray(int ordinal) { + // TODO, fluss support array type + // https://github.com/alibaba/fluss/issues/168 + return null; + } + + @Override + public MapData getMap(int ordinal) { + // TODO, fluss support map type + https: // github.com/alibaba/fluss/issues/169 + return null; + } + + @Override + public Object get(int ordinal, org.apache.spark.sql.types.DataType dataType) { + return SpecializedGettersReader.read(this, ordinal, dataType); + } + + public static Object fromFluss(Object o, DataType type) { + if (o == null) { + return null; + } + switch (type.getTypeRoot()) { + case TIMESTAMP_WITHOUT_TIME_ZONE: + return fromFluss((TimestampNtz) o); + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + return fromFluss((TimestampLtz) o); + case CHAR: + case STRING: + return fromFluss((BinaryString) o); + case DECIMAL: + return fromFluss((org.apache.fluss.row.Decimal) o); + case ROW: + return fromFluss((InternalRow) o, (RowType) type); + 
default: + return o; + } + } + + public static UTF8String fromFluss(BinaryString string) { + return UTF8String.fromBytes(string.toBytes()); + } + + public static Decimal fromFluss(org.apache.fluss.row.Decimal decimal) { + return Decimal.apply(decimal.toBigDecimal()); + } + + public static org.apache.spark.sql.catalyst.InternalRow fromFluss( + InternalRow row, RowType rowType) { + return new SparkInternalRow(rowType).replace(row); + } + + public static long fromFluss(TimestampNtz timestamp) { + return timestamp.toMicros(); + } + + public static long fromFluss(TimestampLtz timestamp) { + return timestamp.toEpochMicros(); + } +} diff --git a/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/SparkRow.java b/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/SparkRow.java new file mode 100644 index 0000000000..d93f2b3ada --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/SparkRow.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark; + +import org.apache.fluss.row.BinaryString; +import org.apache.fluss.row.Decimal; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.row.TimestampLtz; +import org.apache.fluss.row.TimestampNtz; +import org.apache.fluss.types.DateType; +import org.apache.fluss.types.RowType; +import org.apache.fluss.utils.DateTimeUtils; + +import org.apache.spark.sql.Row; + +import java.io.Serializable; +import java.sql.Date; +import java.sql.Timestamp; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; + +/** A {@link InternalRow} wraps spark {@link Row}. 
*/ +public class SparkRow implements InternalRow, Serializable { + + private final RowType type; + private final Row row; + + public SparkRow(RowType type, Row row) { + this.type = type; + this.row = row; + } + + @Override + public int getFieldCount() { + return type.getFieldCount(); + } + + @Override + public boolean isNullAt(int i) { + return row.isNullAt(i); + } + + @Override + public boolean getBoolean(int i) { + return row.getBoolean(i); + } + + @Override + public byte getByte(int i) { + return row.getByte(i); + } + + @Override + public short getShort(int i) { + return row.getShort(i); + } + + @Override + public int getInt(int i) { + if (type.getTypeAt(i) instanceof DateType) { + return toFlussDate(row.get(i)); + } + return row.getInt(i); + } + + @Override + public long getLong(int i) { + return row.getLong(i); + } + + @Override + public float getFloat(int i) { + return row.getFloat(i); + } + + @Override + public double getDouble(int i) { + return row.getDouble(i); + } + + @Override + public BinaryString getChar(int pos, int length) { + return BinaryString.fromString(row.getString(pos).substring(0, length)); + } + + @Override + public BinaryString getString(int i) { + return BinaryString.fromString(row.getString(i)); + } + + @Override + public Decimal getDecimal(int i, int precision, int scale) { + return Decimal.fromBigDecimal(row.getDecimal(i), precision, scale); + } + + @Override + public TimestampNtz getTimestampNtz(int pos, int precision) { + Object object = row.get(pos); + if (object instanceof java.sql.Timestamp) { + java.sql.Timestamp ts = (java.sql.Timestamp) object; + return TimestampNtz.fromLocalDateTime(ts.toLocalDateTime()); + } else if (object instanceof LocalDateTime) { + return TimestampNtz.fromLocalDateTime((LocalDateTime) object); + } else { + throw new UnsupportedOperationException( + "Unsupported type for TimestampNtz: " + object.getClass().getName()); + } + } + + @Override + public TimestampLtz getTimestampLtz(int pos, int precision) { + Object object = row.get(pos); + if (object instanceof java.time.Instant) { + Instant instant = (Instant) object; + return TimestampLtz.fromInstant(instant); + } else if (object instanceof Timestamp) { + return TimestampLtz.fromEpochMillis( + ((Timestamp) object).getTime(), ((Timestamp) object).getNanos()); + } else { + throw new UnsupportedOperationException( + "Unsupported type for TimestampLtz: " + object.getClass().getName()); + } + } + + @Override + public byte[] getBinary(int pos, int length) { + return new byte[0]; + } + + @Override + public byte[] getBytes(int pos) { + return new byte[0]; + } + + private static int toFlussDate(Object object) { + if (object instanceof Date) { + return DateTimeUtils.toInternal((Date) object); + } else { + return DateTimeUtils.toInternal((LocalDate) object); + } + } +} diff --git a/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/SpecializedGettersReader.java b/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/SpecializedGettersReader.java new file mode 100644 index 0000000000..1053faf132 --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/SpecializedGettersReader.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark; + +import org.apache.spark.sql.catalyst.expressions.SpecializedGetters; +import org.apache.spark.sql.types.BinaryType; +import org.apache.spark.sql.types.BooleanType; +import org.apache.spark.sql.types.ByteType; +import org.apache.spark.sql.types.CalendarIntervalType; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.DateType; +import org.apache.spark.sql.types.DecimalType; +import org.apache.spark.sql.types.DoubleType; +import org.apache.spark.sql.types.FloatType; +import org.apache.spark.sql.types.IntegerType; +import org.apache.spark.sql.types.LongType; +import org.apache.spark.sql.types.NullType; +import org.apache.spark.sql.types.ShortType; +import org.apache.spark.sql.types.StringType; +import org.apache.spark.sql.types.TimestampType; + +/** + * Reader of Spark {@link SpecializedGetters}. Copied from Spark to avoid conflict between Spark2 + * and Spark3 . + */ +public final class SpecializedGettersReader { + + private SpecializedGettersReader() {} + + public static Object read(SpecializedGetters obj, int ordinal, DataType dataType) { + if (obj.isNullAt(ordinal) || dataType instanceof NullType) { + return null; + } + if (dataType instanceof BooleanType) { + return obj.getBoolean(ordinal); + } + if (dataType instanceof ByteType) { + return obj.getByte(ordinal); + } + if (dataType instanceof ShortType) { + return obj.getShort(ordinal); + } + if (dataType instanceof IntegerType) { + return obj.getInt(ordinal); + } + if (dataType instanceof LongType) { + return obj.getLong(ordinal); + } + if (dataType instanceof FloatType) { + return obj.getFloat(ordinal); + } + if (dataType instanceof DoubleType) { + return obj.getDouble(ordinal); + } + if (dataType instanceof StringType) { + return obj.getUTF8String(ordinal); + } + if (dataType instanceof DecimalType) { + DecimalType dt = (DecimalType) dataType; + return obj.getDecimal(ordinal, dt.precision(), dt.scale()); + } + if (dataType instanceof DateType) { + return obj.getInt(ordinal); + } + if (dataType instanceof TimestampType) { + return obj.getLong(ordinal); + } + if (dataType instanceof CalendarIntervalType) { + return obj.getInterval(ordinal); + } + if (dataType instanceof BinaryType) { + return obj.getBinary(ordinal); + } + + throw new UnsupportedOperationException("Unsupported data type " + dataType.simpleString()); + } +} diff --git a/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/TableBucketInfo.java b/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/TableBucketInfo.java new file mode 100644 index 0000000000..026d61ce3e --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/TableBucketInfo.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark; + +import org.apache.fluss.metadata.TableBucket; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.Objects; + +/** Table bucket info. */ +public class TableBucketInfo implements Serializable { + private final TableBucket tableBucket; + private final String partitionName; + private Long snapshotId; + + public TableBucketInfo() { + this(null, null, null); + } + + public TableBucketInfo( + TableBucket tableBucket, @Nullable String partitionName, @Nullable Long snapshotId) { + this.tableBucket = tableBucket; + this.partitionName = partitionName; + this.snapshotId = snapshotId; + } + + public TableBucketInfo(TableBucket tableBucket) { + this(tableBucket, null, null); + } + + public TableBucketInfo(TableBucket tableBucket, String partitionName) { + this(tableBucket, partitionName, null); + } + + public boolean isBatch() { + return snapshotId != null; + } + + public TableBucket getTableBucket() { + return tableBucket; + } + + public String getPartitionName() { + return partitionName; + } + + public Long getSnapshotId() { + return snapshotId; + } + + public void setSnapshotId(Long snapshotId) { + this.snapshotId = snapshotId; + } + + @Override + public String toString() { + return "TableBucketInfo{" + + "tableBucket=" + + tableBucket + + ", partitionName='" + + partitionName + + '\'' + + ", snapshotId=" + + snapshotId + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TableBucketInfo that = (TableBucketInfo) o; + return Objects.equals(tableBucket, that.tableBucket) + && Objects.equals(partitionName, that.partitionName) + && Objects.equals(snapshotId, that.snapshotId); + } + + @Override + public int hashCode() { + return Objects.hash(tableBucket, partitionName, snapshotId); + } +} diff --git a/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/catalog/SparkCatalog.java b/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/catalog/SparkCatalog.java new file mode 100644 index 0000000000..58677ed687 --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/catalog/SparkCatalog.java @@ -0,0 +1,454 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark.catalog; + +import org.apache.fluss.client.Connection; +import org.apache.fluss.client.ConnectionFactory; +import org.apache.fluss.client.admin.Admin; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.exception.PartitionNotExistException; +import org.apache.fluss.exception.TableNotExistException; +import org.apache.fluss.exception.TableNotPartitionedException; +import org.apache.fluss.metadata.DatabaseDescriptor; +import org.apache.fluss.metadata.PartitionInfo; +import org.apache.fluss.metadata.PartitionSpec; +import org.apache.fluss.metadata.TableDescriptor; +import org.apache.fluss.metadata.TableInfo; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.spark.SparkTable; +import org.apache.fluss.spark.exception.CatalogException; +import org.apache.fluss.utils.CatalogExceptionUtils; +import org.apache.fluss.utils.ExceptionUtils; +import org.apache.fluss.utils.IOUtils; + +import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException; +import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException; +import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException; +import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException; +import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException; +import org.apache.spark.sql.connector.catalog.FunctionCatalog; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.NamespaceChange; +import org.apache.spark.sql.connector.catalog.SupportsNamespaces; +import org.apache.spark.sql.connector.catalog.Table; +import org.apache.spark.sql.connector.catalog.TableCatalog; +import org.apache.spark.sql.connector.catalog.TableChange; +import org.apache.spark.sql.connector.catalog.functions.UnboundFunction; +import org.apache.spark.sql.connector.expressions.Transform; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; + +import java.io.Closeable; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.fluss.spark.utils.SparkConversions.toFlussClientConfig; +import static org.apache.fluss.spark.utils.SparkConversions.toFlussTable; +import static org.apache.fluss.utils.CatalogExceptionUtils.isPartitionAlreadyExists; +import static org.apache.fluss.utils.CatalogExceptionUtils.isPartitionInvalid; +import static org.apache.fluss.utils.CatalogExceptionUtils.isPartitionNotExist; +import static org.apache.fluss.utils.CatalogExceptionUtils.isTableNotExist; +import static org.apache.fluss.utils.CatalogExceptionUtils.isTableNotPartitioned; +import static org.apache.fluss.utils.Preconditions.checkArgument; + +/** A Spark Catalog for Fluss. 
*/ +public class SparkCatalog implements SupportsNamespaces, FunctionCatalog, TableCatalog, Closeable { + + private static final String[] DEFAULT_NAMESPACE = new String[] {"fluss"}; + + private String catalogName; + private Connection connection; + private Admin admin; + private Configuration flussConfigs; + + @Override + public void initialize(String name, CaseInsensitiveStringMap options) { + this.catalogName = name; + this.flussConfigs = toFlussClientConfig(options); + connection = ConnectionFactory.createConnection(flussConfigs); + admin = connection.getAdmin(); + } + + @Override + public String[] defaultNamespace() { + return DEFAULT_NAMESPACE; + } + + @Override + public String name() { + return this.catalogName; + } + + @Override + public boolean namespaceExists(String[] namespace) { + isValidateNamespace(namespace); + try { + return admin.databaseExists(namespace[0]).get(); + } catch (Exception e) { + throw new CatalogException( + String.format("Failed to check if database %s exists in %s", namespace, name()), + e); + } + } + + @Override + public String[][] listNamespaces() { + try { + List databases = admin.listDatabases().get(); + String[][] namespaces = new String[databases.size()][]; + + for (int i = 0; i < databases.size(); ++i) { + namespaces[i] = new String[] {databases.get(i)}; + } + + return namespaces; + } catch (Exception e) { + throw new CatalogException( + String.format("Failed to list all databases in %s", name()), e); + } + } + + @Override + public String[][] listNamespaces(String[] namespace) throws NoSuchNamespaceException { + if (namespace.length == 0) { + return listNamespaces(); + } else { + isValidateNamespace(namespace); + if (namespaceExists(namespace)) { + return new String[0][]; + } + throw new NoSuchNamespaceException(namespace); + } + } + + @Override + public Map loadNamespaceMetadata(String[] namespace) + throws NoSuchNamespaceException { + isValidateNamespace(namespace); + if (namespaceExists(namespace)) { + return Collections.emptyMap(); + } + throw new NoSuchNamespaceException(namespace); + } + + @Override + public void createNamespace(String[] namespace, Map metadata) + throws NamespaceAlreadyExistsException { + isValidateNamespace(namespace); + try { + admin.createDatabase( + namespace[0], + DatabaseDescriptor.builder().customProperties(metadata).build(), + false) + .get(); + } catch (Exception e) { + Throwable t = ExceptionUtils.stripExecutionException(e); + if (CatalogExceptionUtils.isDatabaseAlreadyExist(t)) { + throw new NamespaceAlreadyExistsException(namespace); + } else { + throw new CatalogException( + String.format("Failed to create database %s in %s", namespace, name()), t); + } + } + } + + @Override + public void alterNamespace(String[] namespace, NamespaceChange... 
changes) + throws NoSuchNamespaceException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean dropNamespace(String[] namespace, boolean cascade) + throws NoSuchNamespaceException, NonEmptyNamespaceException { + isValidateNamespace(namespace); + try { + admin.dropDatabase(namespace[0], false, cascade).get(); + return true; + } catch (Exception e) { + Throwable t = ExceptionUtils.stripExecutionException(e); + if (CatalogExceptionUtils.isDatabaseNotExist(t)) { + throw new NoSuchNamespaceException(namespace); + } else if (CatalogExceptionUtils.isDatabaseNotEmpty(t)) { + throw new NonEmptyNamespaceException(namespace); + } else { + throw new CatalogException( + String.format("Failed to drop database %s in %s", namespace, name()), t); + } + } + } + + @Override + public Table loadTable(Identifier ident, String version) throws NoSuchTableException { + throw new UnsupportedOperationException(); + } + + @Override + public Table loadTable(Identifier ident, long timestamp) throws NoSuchTableException { + throw new UnsupportedOperationException(); + } + + @Override + public void invalidateTable(Identifier ident) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean tableExists(Identifier ident) { + try { + return admin.tableExists(toTablePath(ident)).get(); + } catch (Exception e) { + throw new CatalogException( + String.format("Failed to check if table %s exists in %s", ident, name()), + ExceptionUtils.stripExecutionException(e)); + } + } + + @Override + public boolean purgeTable(Identifier ident) throws UnsupportedOperationException { + throw new UnsupportedOperationException(); + } + + @Override + public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException { + isValidateNamespace(namespace); + try { + List tables = admin.listTables(namespace[0]).get(); + Identifier[] identifiers = new Identifier[tables.size()]; + for (int i = 0; i < tables.size(); i++) { + identifiers[i] = Identifier.of(namespace, tables.get(i)); + } + return identifiers; + } catch (Exception e) { + Throwable t = ExceptionUtils.stripExecutionException(e); + if (CatalogExceptionUtils.isDatabaseNotExist(t)) { + throw new NoSuchNamespaceException(namespace); + } + throw new CatalogException( + String.format( + "Failed to list all tables in database %s in %s", namespace, name()), + t); + } + } + + @Override + public Table loadTable(Identifier ident) throws NoSuchTableException { + try { + TableInfo tableInfo = admin.getTableInfo(toTablePath(ident)).get(); + return new SparkTable(this, flussConfigs, tableInfo); + } catch (Exception e) { + Throwable t = ExceptionUtils.stripExecutionException(e); + if (isTableNotExist(t)) { + throw new NoSuchTableException(ident); + } else { + throw new CatalogException( + String.format("Failed to get table %s in %s", ident, name()), t); + } + } + } + + @Override + public Table createTable( + Identifier ident, + StructType schema, + Transform[] partitions, + Map properties) + throws TableAlreadyExistsException, NoSuchNamespaceException { + try { + TableDescriptor tableDescriptor = toFlussTable(schema, partitions, properties); + TablePath tablePath = toTablePath(ident); + admin.createTable(tablePath, tableDescriptor, false).get(); + return loadTable(ident); + } catch (Exception e) { + Throwable t = ExceptionUtils.stripExecutionException(e); + if (CatalogExceptionUtils.isDatabaseNotExist(t)) { + throw new NoSuchNamespaceException(ident.namespace()); + } else if (CatalogExceptionUtils.isTableAlreadyExist(t)) { + throw new 
TableAlreadyExistsException(ident); + } else { + throw new CatalogException( + String.format("Failed to create table %s in %s", ident, name()), t); + } + } + } + + @Override + public Table alterTable(Identifier ident, TableChange... changes) throws NoSuchTableException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean dropTable(Identifier ident) { + try { + admin.dropTable(toTablePath(ident), false).get(); + return true; + } catch (Exception e) { + Throwable t = ExceptionUtils.stripExecutionException(e); + throw new CatalogException( + String.format("Failed to drop table %s in %s", ident, name()), t); + } + } + + @Override + public void renameTable(Identifier oldIdent, Identifier newIdent) + throws NoSuchTableException, TableAlreadyExistsException { + throw new UnsupportedOperationException(); + } + + public void createPartitions( + TablePath tablePath, PartitionSpec partitionSpec, Map properties) + throws PartitionsAlreadyExistException, UnsupportedOperationException { + + try { + admin.createPartition(tablePath, partitionSpec, true).get(); + } catch (Exception e) { + Throwable t = ExceptionUtils.stripExecutionException(e); + if (isTableNotExist(t)) { + throw new TableNotExistException("Table does not exist: " + tablePath); + } else if (isTableNotPartitioned(t)) { + throw new TableNotPartitionedException("Table is not partitioned: " + tablePath); + } else if (isPartitionInvalid(t)) { + List partitionKeys = null; + try { + TableInfo tableInfo = admin.getTableInfo(tablePath).get(); + partitionKeys = tableInfo.getPartitionKeys(); + } catch (Exception ee) { + // ignore. + } + if (partitionKeys == null) { + // throw general exception if getting partition keys failed. + throw new CatalogException( + String.format( + "PartitionSpec %s does not match partition keys of table %s in catalog %s.", + partitionSpec, tablePath, catalogName), + e); + } + + } else if (isPartitionAlreadyExists(t)) { + throw new PartitionsAlreadyExistException( + "Partition already exists: " + partitionSpec); + } else { + throw new CatalogException( + String.format( + "Failed to create partition with partition spec %s of table %s in %s", + partitionSpec, tablePath, catalogName), + t); + } + } + } + + public boolean dropPartition(TablePath tablePath, PartitionSpec partitionSpec) { + try { + admin.dropPartition(tablePath, partitionSpec, true).get(); + } catch (Exception e) { + Throwable t = ExceptionUtils.stripExecutionException(e); + if (isPartitionNotExist(t)) { + throw new PartitionNotExistException("Partition does not exist: " + partitionSpec); + } else { + throw new CatalogException( + String.format( + "Failed to drop partition with partition spec %s of table %s in %s", + partitionSpec, tablePath, catalogName), + t); + } + } + return true; + } + + public List> listPartitions(TablePath tablePath) + throws TableNotExistException, TableNotPartitionedException, CatalogException { + + try { + List partitionInfos = admin.listPartitionInfos(tablePath).get(); + return partitionInfos.stream() + .map(partitionInfo -> partitionInfo.getPartitionSpec().getSpecMap()) + .collect(Collectors.toList()); + } catch (Exception e) { + Throwable t = ExceptionUtils.stripExecutionException(e); + if (isTableNotExist(t)) { + throw new TableNotExistException("Table does not exist: " + tablePath, e); + } else if (isTableNotPartitioned(t)) { + throw new TableNotPartitionedException("Table is not partitioned: " + tablePath); + } else { + throw new CatalogException( + String.format( + "Failed to list partitions of table 
%s in %s", + tablePath, catalogName), + t); + } + } + } + + public boolean partitionExists(TablePath tablePath, PartitionSpec partitionSpec) + throws TableNotExistException, TableNotPartitionedException, CatalogException { + + try { + List partitionInfos = admin.listPartitionInfos(tablePath).get(); + return partitionInfos.stream() + .anyMatch( + info -> + info.getPartitionSpec() + .getSpecMap() + .equals(partitionSpec.getSpecMap())); + } catch (Exception e) { + Throwable t = ExceptionUtils.stripExecutionException(e); + if (isTableNotExist(t)) { + throw new TableNotExistException("Table does not exist: " + tablePath, e); + } else if (isTableNotPartitioned(t)) { + throw new TableNotPartitionedException("Table is not partitioned: " + tablePath); + } else { + throw new CatalogException( + String.format( + "Failed to list partitions of table %s in %s", + tablePath, catalogName), + t); + } + } + } + + @Override + public Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceException { + throw new UnsupportedOperationException(); + } + + @Override + public UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionException { + throw new UnsupportedOperationException(); + } + + @Override + public void close() { + IOUtils.closeQuietly(admin, "fluss-admin"); + IOUtils.closeQuietly(connection, "fluss-connection"); + } + + private void isValidateNamespace(String[] namespace) { + checkArgument( + namespace.length == 1, "Namespace %s is not valid", Arrays.toString(namespace)); + } + + private TablePath toTablePath(Identifier ident) { + isValidateNamespace(ident.namespace()); + return TablePath.of(ident.namespace()[0], ident.name()); + } +} diff --git a/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/exception/CatalogException.java b/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/exception/CatalogException.java new file mode 100644 index 0000000000..4b5d15c80d --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/exception/CatalogException.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark.exception; + +/** The exception which was throw when spark catalog process failed. 
*/ +public class CatalogException extends RuntimeException { + + public CatalogException(String message) { + super(message); + } + + public CatalogException(Throwable cause) { + super(cause); + } + + public CatalogException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/exception/SparkRuntimeException.java b/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/exception/SparkRuntimeException.java new file mode 100644 index 0000000000..78f0e716a8 --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/exception/SparkRuntimeException.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark.exception; + +/** Base class of all spark-specific unchecked exceptions. */ +public class SparkRuntimeException extends RuntimeException { + private static final long serialVersionUID = 193141189399279147L; + + /** + * Creates a new Exception with the given message and null as the cause. + * + * @param message The exception message + */ + public SparkRuntimeException(String message) { + super(message); + } + + /** + * Creates a new exception with a null message and the given cause. + * + * @param cause The exception that caused this exception + */ + public SparkRuntimeException(Throwable cause) { + super(cause); + } + + /** + * Creates a new exception with the given message and cause. + * + * @param message The exception message + * @param cause The exception that caused this exception + */ + public SparkRuntimeException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/utils/SparkConversions.java b/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/utils/SparkConversions.java new file mode 100644 index 0000000000..8d290a250a --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/utils/SparkConversions.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark.utils; + +import org.apache.fluss.config.ConfigOption; +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.config.FlussConfigUtils; +import org.apache.fluss.metadata.Schema; +import org.apache.fluss.metadata.TableDescriptor; +import org.apache.fluss.metadata.ValidationException; +import org.apache.fluss.spark.SparkConnectorOptions; + +import org.apache.spark.sql.connector.expressions.Transform; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; + +import java.io.File; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** Utils for conversion between Spark and Fluss. */ +public class SparkConversions { + private static final DateTimeFormatter DATE_TIME_FORMATTER = + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + + /** Convert Spark's table to Fluss's table. */ + public static TableDescriptor toFlussTable( + StructType sparkSchema, Transform[] partitions, Map properties) { + // schema + Schema.Builder schemBuilder = Schema.newBuilder(); + + if (properties.containsKey(SparkConnectorOptions.PRIMARY_KEY.key())) { + List primaryKey = + Arrays.stream( + properties + .get(SparkConnectorOptions.PRIMARY_KEY.key()) + .split(",")) + .map(String::trim) + .collect(Collectors.toList()); + schemBuilder.primaryKey(primaryKey); + } + + Schema schema = + schemBuilder + .fromColumns( + Arrays.stream(sparkSchema.fields()) + .map( + field -> + new Schema.Column( + field.name(), + SparkTypeUtils.toFlussType( + field.dataType()) + .copy(field.nullable()), + field.getComment() + .getOrElse(() -> null))) + .collect(Collectors.toList())) + .build(); + + // partition keys + List partitionKeys = + Arrays.stream(partitions) + .map(partition -> partition.references()[0].describe()) + .collect(Collectors.toList()); + + // bucket keys + List bucketKey; + if (properties.containsKey(SparkConnectorOptions.BUCKET_KEY.key())) { + bucketKey = + Arrays.stream(properties.get(SparkConnectorOptions.BUCKET_KEY.key()).split(",")) + .map(String::trim) + .collect(Collectors.toList()); + } else { + // use primary keys - partition keys + bucketKey = + schema.getPrimaryKey() + .map( + pk -> { + List bucketKeys = + new ArrayList<>(pk.getColumnNames()); + bucketKeys.removeAll(partitionKeys); + return bucketKeys; + }) + .orElse(Collections.emptyList()); + } + Integer bucketNum = null; + if (properties.containsKey(SparkConnectorOptions.BUCKET_NUMBER.key())) { + bucketNum = Integer.parseInt(properties.get(SparkConnectorOptions.BUCKET_NUMBER.key())); + } + + // process properties + Map flussTableProperties = + convertSparkOptionsToFlussTableProperties(properties); + + // comment + String comment = properties.get("comment"); + + // TODO: process watermark + return TableDescriptor.builder() + .schema(schema) + .partitionedBy(partitionKeys) + .distributedBy(bucketNum, bucketKey) + .comment(comment) + .properties(flussTableProperties) + .customProperties(properties) + .build(); + } + + private static Map convertSparkOptionsToFlussTableProperties( + Map options) { + Map properties = new HashMap<>(); + for (ConfigOption option : 
SparkConnectorOptions.TABLE_OPTIONS) { + if (options.containsKey(option.key())) { + properties.put(option.key(), options.get(option.key())); + } + } + return properties; + } + + public static Configuration toFlussClientConfig(CaseInsensitiveStringMap options) { + Configuration flussConfig = new Configuration(); + flussConfig.setString( + SparkConnectorOptions.BOOTSTRAP_SERVERS.key(), + options.get(SparkConnectorOptions.BOOTSTRAP_SERVERS.key())); + // forward all client configs + for (ConfigOption option : FlussConfigUtils.CLIENT_OPTIONS.values()) { + if (options.get(option.key()) != null) { + flussConfig.setString(option.key(), options.get(option.key())); + } + } + + String tmpDir = flussConfig.get(SparkConnectorOptions.TMP_DIRS); + // pass io tmp dir to fluss client. + flussConfig.setString( + ConfigOptions.CLIENT_SCANNER_IO_TMP_DIR, + new File(tmpDir, "/fluss").getAbsolutePath()); + return flussConfig; + } + + public static Configuration toFlussConfig(CaseInsensitiveStringMap options) { + Configuration flussConfig = new Configuration(); + flussConfig.setString( + SparkConnectorOptions.BOOTSTRAP_SERVERS.key(), + options.get(SparkConnectorOptions.BOOTSTRAP_SERVERS.key())); + // forward all configs + options.forEach(flussConfig::setString); + + String tmpDir = flussConfig.get(SparkConnectorOptions.TMP_DIRS); + // pass io tmp dir to fluss client. + flussConfig.setString( + ConfigOptions.CLIENT_SCANNER_IO_TMP_DIR, + new File(tmpDir, "/fluss").getAbsolutePath()); + return flussConfig; + } + + /** + * Parses timestamp String to Long. + * + *

<p>The timestamp string can be given in either of the following formats:
+     *
+     * <pre>
+     *     scan.startup.timestamp = 1678883047356
+     *     scan.startup.timestamp = 2023-12-09 23:09:12
+     * </pre>
+ * + * @return timestamp as long value + */ + public static long parseTimestamp(String timestampStr, String optionKey, ZoneId timeZone) { + if (timestampStr.matches("\\d+")) { + return Long.parseLong(timestampStr); + } + + try { + return LocalDateTime.parse(timestampStr, DATE_TIME_FORMATTER) + .atZone(timeZone) + .toInstant() + .toEpochMilli(); + } catch (Exception e) { + throw new ValidationException( + String.format( + "Invalid properties '%s' should follow the format " + + "'yyyy-MM-dd HH:mm:ss' or 'timestamp', but is '%s'. " + + "You can config like: '2023-12-09 23:09:12' or '1678883047356'.", + optionKey, timestampStr), + e); + } + } +} diff --git a/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/utils/SparkTypeUtils.java b/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/utils/SparkTypeUtils.java new file mode 100644 index 0000000000..a1a6498317 --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/utils/SparkTypeUtils.java @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark.utils; + +import org.apache.fluss.types.BigIntType; +import org.apache.fluss.types.BinaryType; +import org.apache.fluss.types.BooleanType; +import org.apache.fluss.types.BytesType; +import org.apache.fluss.types.CharType; +import org.apache.fluss.types.DataField; +import org.apache.fluss.types.DataTypeDefaultVisitor; +import org.apache.fluss.types.DateType; +import org.apache.fluss.types.DecimalType; +import org.apache.fluss.types.DoubleType; +import org.apache.fluss.types.FloatType; +import org.apache.fluss.types.IntType; +import org.apache.fluss.types.LocalZonedTimestampType; +import org.apache.fluss.types.RowType; +import org.apache.fluss.types.SmallIntType; +import org.apache.fluss.types.StringType; +import org.apache.fluss.types.TimeType; +import org.apache.fluss.types.TimestampType; +import org.apache.fluss.types.TinyIntType; + +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.LongType; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.types.UserDefinedType; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +/** Utils for spark {@link DataType}. 
*/ +public class SparkTypeUtils { + + private SparkTypeUtils() {} + + public static StructType fromFlussRowType(RowType type) { + return (StructType) fromFlussType(type); + } + + public static DataType fromFlussType(org.apache.fluss.types.DataType type) { + return type.accept(FlussToSparkTypeVisitor.INSTANCE); + } + + public static org.apache.fluss.types.DataType toFlussType(DataType dataType) { + return SparkToFlussTypeVisitor.visit(dataType); + } + + private static class FlussToSparkTypeVisitor extends DataTypeDefaultVisitor { + + private static final FlussToSparkTypeVisitor INSTANCE = new FlussToSparkTypeVisitor(); + + @Override + public DataType visit(CharType charType) { + return new org.apache.spark.sql.types.CharType(charType.getLength()); + } + + @Override + public DataType visit(StringType stringType) { + return DataTypes.StringType; + } + + @Override + public DataType visit(BooleanType booleanType) { + return DataTypes.BooleanType; + } + + @Override + public DataType visit(BinaryType binaryType) { + return DataTypes.BinaryType; + } + + @Override + public DataType visit(DecimalType decimalType) { + return DataTypes.createDecimalType(decimalType.getPrecision(), decimalType.getScale()); + } + + @Override + public DataType visit(TinyIntType tinyIntType) { + return DataTypes.ByteType; + } + + @Override + public DataType visit(SmallIntType smallIntType) { + return DataTypes.ShortType; + } + + @Override + public DataType visit(IntType intType) { + return DataTypes.IntegerType; + } + + @Override + public DataType visit(FloatType floatType) { + return DataTypes.FloatType; + } + + @Override + public DataType visit(DoubleType doubleType) { + return DataTypes.DoubleType; + } + + @Override + public DataType visit(DateType dateType) { + return DataTypes.DateType; + } + + @Override + public DataType visit(TimestampType timestampType) { + return DataTypes.TimestampNTZType; + } + + @Override + public DataType visit(LocalZonedTimestampType localZonedTimestampType) { + return DataTypes.TimestampType; + } + + /** + * For simplicity, as a temporary solution, we directly convert the non-null attribute to + * nullable on the Spark side. 
+ */ + @Override + public DataType visit(RowType rowType) { + List fields = new ArrayList<>(rowType.getFieldCount()); + for (DataField field : rowType.getFields()) { + StructField structField = + DataTypes.createStructField( + field.getName(), field.getType().accept(this), true); + structField = + field.getDescription().map(structField::withComment).orElse(structField); + + fields.add(structField); + } + return DataTypes.createStructType(fields); + } + + @Override + public DataType visit(BigIntType bigIntType) { + return DataTypes.LongType; + } + + @Override + public DataType visit(TimeType timeType) { + return DataTypes.IntegerType; + } + + @Override + public DataType visit(BytesType bytesType) { + return DataTypes.BinaryType; + } + + @Override + protected DataType defaultMethod(org.apache.fluss.types.DataType dataType) { + throw new UnsupportedOperationException("Unsupported type: " + dataType); + } + } + + private static class SparkToFlussTypeVisitor { + + static org.apache.fluss.types.DataType visit(DataType type) { + return visit(type, new SparkToFlussTypeVisitor()); + } + + static org.apache.fluss.types.DataType visit( + DataType type, SparkToFlussTypeVisitor visitor) { + if (type instanceof StructType) { + StructField[] fields = ((StructType) type).fields(); + List fieldResults = new ArrayList<>(fields.length); + + for (StructField field : fields) { + fieldResults.add(visit(field.dataType(), visitor)); + } + + return visitor.struct((StructType) type, fieldResults); + + } else if (type instanceof UserDefinedType) { + throw new UnsupportedOperationException("User-defined types are not supported"); + + } else { + return visitor.atomic(type); + } + } + + public org.apache.fluss.types.DataType struct( + StructType struct, List fieldResults) { + StructField[] fields = struct.fields(); + List newFields = new ArrayList<>(fields.length); + for (int i = 0; i < fields.length; i += 1) { + StructField field = fields[i]; + org.apache.fluss.types.DataType fieldType = + fieldResults.get(i).copy(field.nullable()); + String comment = field.getComment().getOrElse(() -> null); + newFields.add(new DataField(field.name(), fieldType, comment)); + } + + return new RowType(newFields); + } + + public org.apache.fluss.types.DataType atomic(DataType atomic) { + if (atomic instanceof org.apache.spark.sql.types.BooleanType) { + return new BooleanType(); + } else if (atomic instanceof org.apache.spark.sql.types.ByteType) { + return new TinyIntType(); + } else if (atomic instanceof org.apache.spark.sql.types.ShortType) { + return new SmallIntType(); + } else if (atomic instanceof org.apache.spark.sql.types.IntegerType) { + return new IntType(); + } else if (atomic instanceof LongType) { + return new BigIntType(); + } else if (atomic instanceof org.apache.spark.sql.types.FloatType) { + return new FloatType(); + } else if (atomic instanceof org.apache.spark.sql.types.DoubleType) { + return new DoubleType(); + } else if (atomic instanceof org.apache.spark.sql.types.StringType) { + return new StringType(); + } else if (atomic instanceof org.apache.spark.sql.types.CharType) { + return new CharType(((org.apache.spark.sql.types.CharType) atomic).length()); + } else if (atomic instanceof org.apache.spark.sql.types.DateType) { + return new DateType(); + } else if (atomic instanceof org.apache.spark.sql.types.TimestampType) { + // spark only support 6 digits of precision + return new LocalZonedTimestampType(6); + } else if (atomic instanceof org.apache.spark.sql.types.TimestampNTZType) { + // spark only support 6 digits of 
precision + return new TimestampType(6); + } else if (atomic instanceof org.apache.spark.sql.types.DecimalType) { + return new DecimalType( + ((org.apache.spark.sql.types.DecimalType) atomic).precision(), + ((org.apache.spark.sql.types.DecimalType) atomic).scale()); + } else if (atomic instanceof org.apache.spark.sql.types.BinaryType) { + return new BytesType(); + } + + throw new UnsupportedOperationException( + "Not a supported type: " + atomic.catalogString()); + } + } + + public static RowType project(RowType inputType, int[] mapping) { + List fields = inputType.getFields(); + return new RowType( + Arrays.stream(mapping).mapToObj(fields::get).collect(Collectors.toList())); + } + + public static RowType project(RowType inputType, List names) { + List fields = inputType.getFields(); + List fieldNames = + fields.stream().map(DataField::getName).collect(Collectors.toList()); + return new RowType( + names.stream() + .map(k -> fields.get(fieldNames.indexOf(k))) + .collect(Collectors.toList())); + } +} diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/FlussInputPartition.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/FlussInputPartition.scala new file mode 100644 index 0000000000..dc80fc5605 --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/FlussInputPartition.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark + +import org.apache.fluss.metadata.TableBucket +import org.apache.spark.sql.connector.read.InputPartition + +/** A [[InputPartition]] for reading Fluss data in a batch based streaming/batch query. 
*/ +trait FlussInputPartition extends InputPartition { + def split: FlussOffsetRange +} + +case class SimpleFlussInputPartition(split: FlussOffsetRange) extends FlussInputPartition +object FlussInputPartition { + def apply(split: FlussOffsetRange): FlussInputPartition = { + SimpleFlussInputPartition(split) + } +} + +case class FlussOffsetRange(tableBucketInfo: TableBucketInfo, fromOffset: Long, untilOffset: Long) { + def tableBucket: TableBucket = tableBucketInfo.getTableBucket + + def partitionName: String = tableBucketInfo.getPartitionName + def snapshotId: Long = tableBucketInfo.getSnapshotId + + /** ignore snapshot offset */ + def size: Long = untilOffset - fromOffset +} diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala new file mode 100644 index 0000000000..e5cc0ac5e1 --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark + +import org.apache.fluss.config.Configuration +import org.apache.fluss.metadata.{PartitionSpec, TableInfo} +import org.apache.fluss.spark.catalog.SparkCatalog +import org.apache.fluss.spark.utils.SparkTypeUtils +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{BoundReference, Cast, GenericInternalRow, Literal} +import org.apache.spark.sql.connector.catalog.{SupportsPartitionManagement, SupportsRead, SupportsWrite, Table, TableCapability, TableCatalog} +import org.apache.spark.sql.connector.read.ScanBuilder +import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} +import org.apache.spark.sql.types.{StringType, StructType} +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +import java.{util => ju} + +import scala.collection.JavaConverters._ + +case class SparkTable(catalog: SparkCatalog, flussConfig: Configuration, table: TableInfo) + extends Table + with SupportsRead + with SupportsWrite + with SupportsPartitionManagement { + + override def name(): String = { + table.getTablePath.toString + } + + override def schema(): StructType = { + SparkTypeUtils.fromFlussRowType(table.getRowType) + } + + override def capabilities(): ju.Set[TableCapability] = ??? + + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = ??? + + override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = ??? 
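+
+  // SupportsPartitionManagement: the partition schema below is the Fluss row type projected onto
+  // the table's partition keys, and partition create/drop/list operations delegate to the Fluss
+  // admin client via SparkCatalog.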
+ + override def partitionSchema(): StructType = { + SparkTypeUtils.fromFlussRowType( + SparkTypeUtils.project(table.getRowType, table.getPartitionKeys)) + } + + override def createPartition(ident: InternalRow, properties: ju.Map[String, String]): Unit = { + catalog.createPartitions( + table.getTablePath, + convertToFlussPartitionSpec(ident, partitionSchema()), + properties) + } + + override def dropPartition(ident: InternalRow): Boolean = { + catalog.dropPartition(table.getTablePath, convertToFlussPartitionSpec(ident, partitionSchema())) + } + + override def replacePartitionMetadata( + ident: InternalRow, + properties: ju.Map[String, String]): Unit = { + throw new UnsupportedOperationException("Replace partition is not supported") + + } + + override def loadPartitionMetadata(ident: InternalRow): ju.Map[String, String] = { + throw new UnsupportedOperationException("Load partition is not supported") + + } + + override def listPartitionIdentifiers( + names: Array[String], + ident: InternalRow): Array[InternalRow] = { + val partitionSpec = convertToFlussPartitionSpec(names, ident) + if (ident.numFields == partitionSchema().length) { + if (catalog.partitionExists(table.getTablePath, partitionSpec)) { + Array(ident) + } else { + Array.empty + } + } else { + val indexes = names.map(partitionSchema().fieldIndex) + val dataTypes = names.map(partitionSchema()(_).dataType) + val currentRow = new GenericInternalRow(new Array[Any](names.length)) + catalog + .listPartitions(table.getTablePath) + .asScala + .map(part => convertToPartIdent(part, partitionSchema())) + .filter { + partition => + for (i <- names.indices) { + currentRow.values(i) = partition.get(indexes(i), dataTypes(i)) + } + currentRow == ident + } + }.toArray + } + + private def convertToFlussPartitionSpec( + ident: InternalRow, + partitionSchema: StructType): PartitionSpec = { + val partitionSpec: ju.Map[String, String] = new ju.HashMap() + partitionSchema.zipWithIndex.foreach { + case (field, index) => + val value = Cast(BoundReference(index, field.dataType, nullable = false), StringType) + .eval(ident) + .toString + partitionSpec.put(field.name, value) + } + new PartitionSpec(partitionSpec) + } + private def convertToFlussPartitionSpec( + names: Array[String], + ident: InternalRow): PartitionSpec = { + val partitionSpec = names.zipWithIndex + .map { + case (name, index) => + val value = Cast( + BoundReference(index, partitionSchema()(name).dataType, nullable = false), + StringType).eval(ident).toString + (name, value) + } + .toMap + .asJava + new PartitionSpec(partitionSpec) + } + + def convertToPartIdent( + partitionSpec: ju.Map[String, String], + partitionSchema: StructType): InternalRow = { + InternalRow.fromSeq(partitionSchema.map { + field => Cast(Literal(partitionSpec.asScala(field.name)), field.dataType, None).eval() + }) + } +} diff --git a/fluss-spark/fluss-spark-common/src/test/java/org/apache/fluss/spark/SparkInternalRowTest.java b/fluss-spark/fluss-spark-common/src/test/java/org/apache/fluss/spark/SparkInternalRowTest.java new file mode 100644 index 0000000000..438fdcf15c --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/test/java/org/apache/fluss/spark/SparkInternalRowTest.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark; + +import org.apache.fluss.row.Decimal; +import org.apache.fluss.row.GenericRow; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.row.TimestampLtz; +import org.apache.fluss.row.TimestampNtz; +import org.apache.fluss.spark.utils.SparkTypeUtils; +import org.apache.fluss.types.DataTypes; +import org.apache.fluss.types.RowType; +import org.apache.fluss.utils.DateTimeUtils; + +import org.apache.spark.sql.catalyst.CatalystTypeConverters; +import org.apache.spark.sql.catalyst.util.CharVarcharUtils; +import org.junit.jupiter.api.Test; + +import java.math.BigDecimal; +import java.nio.charset.StandardCharsets; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.TimeZone; +import java.util.stream.Collectors; + +import scala.Function1; +import scala.collection.JavaConverters; + +import static org.apache.fluss.row.BinaryString.fromString; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link SparkInternalRow}. */ +public class SparkInternalRowTest { + public static final RowType ALL_TYPES = + RowType.builder(true) // posX and posY have field id 0 and 1, here we start from 2 + .field("id", DataTypes.INT().copy(false)) + .field("name", DataTypes.STRING()) /* optional by default */ + .field("char", DataTypes.CHAR(10)) + .field("varchar", DataTypes.STRING()) + .field("salary", DataTypes.DOUBLE().copy(false)) + .field("boolean", DataTypes.BOOLEAN().copy(false)) + .field("tinyint", DataTypes.TINYINT()) + .field("smallint", DataTypes.SMALLINT()) + .field("bigint", DataTypes.BIGINT()) + .field("bytes", DataTypes.BYTES()) + .field("timestamp", DataTypes.TIMESTAMP_LTZ()) + .field("timestamp_ntz", DataTypes.TIMESTAMP()) + .field("date", DataTypes.DATE()) + .field("decimal", DataTypes.DECIMAL(2, 2)) + .field("decimal2", DataTypes.DECIMAL(38, 2)) + .field("decimal3", DataTypes.DECIMAL(10, 1)) + .build(); + + @Test + public void test() { + TimeZone tz = TimeZone.getDefault(); + TimeZone.setDefault(TimeZone.getTimeZone("Asia/Shanghai")); + InternalRow rowData = + GenericRow.of( + 1, + fromString("hzy"), + fromString("alibaba"), + fromString("fluss"), + 22.2, + true, + (byte) 22, + (short) 356, + 23567222L, + "varbinary_v".getBytes(StandardCharsets.UTF_8), + TimestampLtz.fromInstant( + LocalDateTime.parse("2007-12-03T10:15:30") + .toInstant(ZoneOffset.UTC)), + TimestampNtz.fromLocalDateTime(LocalDateTime.parse("2007-12-03T10:15:30")), + DateTimeUtils.toInternal(LocalDate.parse("2022-05-02")), + Decimal.fromBigDecimal(BigDecimal.valueOf(0.21), 2, 2), + Decimal.fromBigDecimal(BigDecimal.valueOf(65782123123.01), 38, 2), + Decimal.fromBigDecimal(BigDecimal.valueOf(62123123.5), 10, 1)); + + // CatalystTypeConverters does not support char and varchar, we need to replace char and + // varchar with string + Function1 sparkConverter = + CatalystTypeConverters.createToScalaConverter( + 
CharVarcharUtils.replaceCharVarcharWithString( + SparkTypeUtils.fromFlussType(ALL_TYPES))); + org.apache.spark.sql.Row sparkRow = + (org.apache.spark.sql.Row) + sparkConverter.apply(new SparkInternalRow(ALL_TYPES).replace(rowData)); + + String expected = + "1," + + "hzy," + + "alibaba," + + "fluss," + + "22.2," + + "true," + + "22," + + "356," + + "23567222," + + "[B@," + + "2007-12-03 18:15:30.0," + + "2007-12-03T10:15:30," + + "2022-05-02," + + "0.21," + + "65782123123.01," + + "62123123.5"; + assertThat(sparkRowToString(sparkRow)).isEqualTo(expected); + + SparkRow sparkRowData = new SparkRow(ALL_TYPES, sparkRow); + sparkRow = + (org.apache.spark.sql.Row) + sparkConverter.apply(new SparkInternalRow(ALL_TYPES).replace(sparkRowData)); + assertThat(sparkRowToString(sparkRow)).isEqualTo(expected); + TimeZone.setDefault(tz); + } + + private String sparkRowToString(org.apache.spark.sql.Row row) { + return JavaConverters.seqAsJavaList(row.toSeq()).stream() + .map( + x -> + (x instanceof scala.collection.Seq) + ? JavaConverters.seqAsJavaList( + (scala.collection.Seq) x) + : x) + .map(Object::toString) + // Since the toString result of Spark's binary col is unstable, replace it + .map(x -> x.startsWith("[B@") ? "[B@" : x) + .collect(Collectors.joining(",")); + } +} diff --git a/fluss-spark/fluss-spark-common/src/test/java/org/apache/fluss/spark/catalog/SparkCatalogITCase.java b/fluss-spark/fluss-spark-common/src/test/java/org/apache/fluss/spark/catalog/SparkCatalogITCase.java new file mode 100644 index 0000000000..3cb480f497 --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/test/java/org/apache/fluss/spark/catalog/SparkCatalogITCase.java @@ -0,0 +1,463 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.spark.catalog; + +import org.apache.fluss.client.Connection; +import org.apache.fluss.client.ConnectionFactory; +import org.apache.fluss.client.admin.Admin; +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.metadata.Schema; +import org.apache.fluss.metadata.TableInfo; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.server.testutils.FlussClusterExtension; +import org.apache.fluss.types.BigIntType; +import org.apache.fluss.types.BooleanType; +import org.apache.fluss.types.BytesType; +import org.apache.fluss.types.CharType; +import org.apache.fluss.types.DateType; +import org.apache.fluss.types.DecimalType; +import org.apache.fluss.types.DoubleType; +import org.apache.fluss.types.FloatType; +import org.apache.fluss.types.IntType; +import org.apache.fluss.types.LocalZonedTimestampType; +import org.apache.fluss.types.SmallIntType; +import org.apache.fluss.types.StringType; +import org.apache.fluss.types.TimestampType; +import org.apache.fluss.types.TinyIntType; + +import org.apache.spark.SparkConf; +import org.apache.spark.sql.AnalysisException; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException; +import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; +import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** IT case for {@link SparkCatalog}. */ +public class SparkCatalogITCase { + + private static final Logger LOG = LoggerFactory.getLogger(SparkCatalogITCase.class); + + private static final String DB = "my_db"; + private static final String TABLE = "my_table"; + private static final String PARTITION_TABLE = "test_partitioned_table"; + + @RegisterExtension + public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION = + FlussClusterExtension.builder().setNumOfTabletServers(1).build(); + + private static SparkSession spark; + private static Admin admin; + + @BeforeAll + public static void beforeAll() { + Configuration flussConf = FLUSS_CLUSTER_EXTENSION.getClientConfig(); + Map configs = getSparkConfigs(flussConf); + SparkConf sparkConf = + new SparkConf().setAppName("bss-spark-unit-tests").setMaster("local[*]"); + configs.forEach(sparkConf::set); + spark = SparkSession.builder().config(sparkConf).getOrCreate(); + spark.sparkContext().setLogLevel("WARN"); + + Connection connection = ConnectionFactory.createConnection(flussConf); + admin = connection.getAdmin(); + } + + @AfterAll + public static void afterAll() { + try { + spark.close(); + admin.close(); + } catch (Exception e) { + // ignore + } + } + + @AfterEach + public void afterEach() { + sql("DROP TABLE IF EXISTS fluss_catalog." + DB + "." + TABLE); + sql("DROP TABLE IF EXISTS fluss_catalog." + DB + "." + PARTITION_TABLE); + sql("DROP DATABASE IF EXISTS fluss_catalog." 
+ DB); + } + + @Test + public void createDatabaseTest() { + sql("CREATE DATABASE fluss_catalog." + DB); + assertThatThrownBy(() -> sql("CREATE DATABASE fluss_catalog." + DB)) + .isInstanceOf(NamespaceAlreadyExistsException.class) + .hasMessageContaining( + "[SCHEMA_ALREADY_EXISTS] Cannot create schema `my_db` because it already exists."); + sql("CREATE DATABASE IF NOT EXISTS fluss_catalog." + DB); + List databases = + sql("SHOW DATABASES IN fluss_catalog").collectAsList().stream() + .map(row -> row.getString(0)) + .collect(Collectors.toList()); + assertThat(databases.size()).isEqualTo(2); + assertThat("fluss").isIn(databases); + assertThat(DB).isIn(databases); + } + + @Test + public void dropDatabaseTest() { + sql("CREATE DATABASE fluss_catalog." + DB); + sql("DROP DATABASE fluss_catalog." + DB); + assertThatThrownBy(() -> sql("DROP DATABASE fluss_catalog." + DB)) + .isInstanceOf(NoSuchNamespaceException.class) + .hasMessageContaining("[SCHEMA_NOT_FOUND] The schema `my_db` cannot be found."); + sql("DROP DATABASE IF EXISTS fluss_catalog." + DB); + List databases = + sql("SHOW DATABASES IN fluss_catalog").collectAsList().stream() + .map(row -> row.getString(0)) + .collect(Collectors.toList()); + assertThat(databases.size()).isEqualTo(1); + assertThat(databases.get(0)).isEqualTo("fluss"); + } + + @Test + public void dropDatabaseWithCascadeTest() { + sql("CREATE DATABASE fluss_catalog." + DB); + sql("CREATE TABLE fluss_catalog." + DB + "." + TABLE + " (id INT, name STRING)"); + assertThatThrownBy(() -> sql("DROP DATABASE fluss_catalog." + DB)) + .isInstanceOf(AnalysisException.class) + .hasMessageContaining( + "[SCHEMA_NOT_EMPTY] Cannot drop a schema `my_db` because it contains objects."); + List tables = + sql("SHOW TABLES IN fluss_catalog." + DB).collectAsList().stream() + .map(row -> row.getString(1)) + .collect(Collectors.toList()); + assertThat(tables.size()).isEqualTo(1); + assertThat(tables.get(0)).isEqualTo(TABLE); + List databases = + sql("SHOW DATABASES IN fluss_catalog").collectAsList().stream() + .map(row -> row.getString(0)) + .collect(Collectors.toList()); + assertThat(databases.size()).isEqualTo(2); + + sql("DROP DATABASE fluss_catalog." + DB + " CASCADE"); + databases = + sql("SHOW DATABASES IN fluss_catalog").collectAsList().stream() + .map(row -> row.getString(0)) + .collect(Collectors.toList()); + assertThat(databases.size()).isEqualTo(1); + assertThat(databases.get(0)).isEqualTo("fluss"); + } + + @Test + public void createTableTest() { + assertThatThrownBy( + () -> + sql( + "CREATE TABLE fluss_catalog." + + DB + + "." + + TABLE + + " (id INT, name STRING)")) + .isInstanceOf(NoSuchNamespaceException.class) + .hasMessageContaining("[SCHEMA_NOT_FOUND] The schema `my_db` cannot be found."); + sql("CREATE DATABASE fluss_catalog." + DB); + sql("CREATE TABLE fluss_catalog." + DB + "." + TABLE + " (id INT, name STRING)"); + assertThatThrownBy( + () -> + sql( + "CREATE TABLE fluss_catalog." + + DB + + "." + + TABLE + + " (id INT, name STRING)")) + .isInstanceOf(TableAlreadyExistsException.class) + .hasMessageContaining( + "[TABLE_OR_VIEW_ALREADY_EXISTS] Cannot create table or view `my_db`.`my_table` because it already exists."); + sql( + "CREATE TABLE IF NOT EXISTS fluss_catalog." + + DB + + "." + + TABLE + + " (id INT, name STRING)"); + List tables = + sql("SHOW TABLES IN fluss_catalog." 
+ DB).collectAsList().stream() + .map(row -> row.getString(1)) + .collect(Collectors.toList()); + assertThat(tables.size()).isEqualTo(1); + assertThat(tables.get(0)).isEqualTo(TABLE); + } + + @Test + public void dropTableTest() { + sql("CREATE DATABASE fluss_catalog." + DB); + sql("CREATE TABLE fluss_catalog." + DB + "." + TABLE + " (id INT, name STRING)"); + List tables = + sql("SHOW TABLES IN fluss_catalog." + DB).collectAsList().stream() + .map(row -> row.getString(1)) + .collect(Collectors.toList()); + assertThat(tables.size()).isEqualTo(1); + assertThat(tables.get(0)).isEqualTo(TABLE); + sql("DROP TABLE fluss_catalog." + DB + "." + TABLE); + assertThatThrownBy(() -> sql("DROP TABLE fluss_catalog." + DB + "." + TABLE)) + .isInstanceOf(AnalysisException.class) + .hasMessageContaining( + "[TABLE_OR_VIEW_NOT_FOUND] The table or view `fluss_catalog`.`my_db`.`my_table` cannot be found."); + sql("DROP TABLE IF EXISTS fluss_catalog." + DB + "." + TABLE); + tables = + sql("SHOW TABLES IN fluss_catalog." + DB).collectAsList().stream() + .map(row -> row.getString(1)) + .collect(Collectors.toList()); + assertThat(tables).isEmpty(); + } + + @Test + public void fieldTypeTest() { + sql("CREATE DATABASE fluss_catalog." + DB); + sql( + "CREATE TABLE fluss_catalog." + + DB + + "." + + TABLE + + " (" + + " int_field INT," + + " short_field SHORT," + + " byte_field BYTE," + + " string_field STRING," + + " boolean_field BOOLEAN," + + " long_field LONG," + + " float_field FLOAT," + + " double_field DOUBLE," + + " char_field CHAR(3)," + + " binary_field BINARY," + + " date_field DATE," + + " timestamp_field TIMESTAMP," + + " timestamp_ntz_field TIMESTAMP_NTZ," + + " decimal_field DECIMAL(10, 5)" + + ")"); + List tables = + sql("SHOW TABLES IN fluss_catalog." + DB).collectAsList().stream() + .map(row -> row.getString(1)) + .collect(Collectors.toList()); + assertThat(tables.size()).isEqualTo(1); + assertThat(tables.get(0)).isEqualTo(TABLE); + + // check spark datatype + sql("DESCRIBE TABLE fluss_catalog." + DB + "." 
+ TABLE); + + // check fluss datatype + TableInfo tableInfo = admin.getTableInfo(TablePath.of(DB, TABLE)).join(); + assertThat(tableInfo.hasPrimaryKey()).isFalse(); + assertThat(tableInfo.getPartitionKeys()).isEmpty(); + List columns = tableInfo.getSchema().getColumns(); + assertThat(columns.size()).isEqualTo(14); + + assertThat(columns.get(0).getName()).isEqualTo("int_field"); + assertThat(columns.get(0).getDataType()).isInstanceOf(IntType.class); + assertThat(columns.get(1).getName()).isEqualTo("short_field"); + assertThat(columns.get(1).getDataType()).isInstanceOf(SmallIntType.class); + assertThat(columns.get(2).getName()).isEqualTo("byte_field"); + assertThat(columns.get(2).getDataType()).isInstanceOf(TinyIntType.class); + assertThat(columns.get(3).getName()).isEqualTo("string_field"); + assertThat(columns.get(3).getDataType()).isInstanceOf(StringType.class); + assertThat(columns.get(4).getName()).isEqualTo("boolean_field"); + assertThat(columns.get(4).getDataType()).isInstanceOf(BooleanType.class); + assertThat(columns.get(5).getName()).isEqualTo("long_field"); + assertThat(columns.get(5).getDataType()).isInstanceOf(BigIntType.class); + assertThat(columns.get(6).getName()).isEqualTo("float_field"); + assertThat(columns.get(6).getDataType()).isInstanceOf(FloatType.class); + assertThat(columns.get(7).getName()).isEqualTo("double_field"); + assertThat(columns.get(7).getDataType()).isInstanceOf(DoubleType.class); + assertThat(columns.get(8).getName()).isEqualTo("char_field"); + assertThat(columns.get(8).getDataType()).isInstanceOf(CharType.class); + assertThat(((CharType) columns.get(8).getDataType()).getLength()).isEqualTo(3); + assertThat(columns.get(9).getName()).isEqualTo("binary_field"); + assertThat(columns.get(9).getDataType()).isInstanceOf(BytesType.class); + assertThat(columns.get(10).getName()).isEqualTo("date_field"); + assertThat(columns.get(10).getDataType()).isInstanceOf(DateType.class); + assertThat(columns.get(11).getName()).isEqualTo("timestamp_field"); + assertThat(columns.get(11).getDataType()).isInstanceOf(LocalZonedTimestampType.class); + assertThat(((LocalZonedTimestampType) columns.get(11).getDataType()).getPrecision()) + .isEqualTo(6); + assertThat(columns.get(12).getName()).isEqualTo("timestamp_ntz_field"); + assertThat(columns.get(12).getDataType()).isInstanceOf(TimestampType.class); + assertThat(((TimestampType) columns.get(12).getDataType()).getPrecision()).isEqualTo(6); + assertThat(columns.get(13).getName()).isEqualTo("decimal_field"); + assertThat(columns.get(13).getDataType()).isInstanceOf(DecimalType.class); + assertThat(((DecimalType) columns.get(13).getDataType()).getPrecision()).isEqualTo(10); + assertThat(((DecimalType) columns.get(13).getDataType()).getScale()).isEqualTo(5); + } + + @Test + public void primaryKeyAndPartitionKeyTest() { + sql("CREATE DATABASE fluss_catalog." + DB); + sql( + "CREATE TABLE fluss_catalog." + + DB + + "." 
+                        + TABLE
+                        + " ("
+                        + " int_field INT,"
+                        + " short_field SHORT,"
+                        + " byte_field BYTE,"
+                        + " string_field STRING,"
+                        + " boolean_field BOOLEAN,"
+                        + " long_field LONG,"
+                        + " float_field FLOAT,"
+                        + " double_field DOUBLE,"
+                        + " char_field CHAR(3),"
+                        + " binary_field BINARY,"
+                        + " date_field DATE,"
+                        + " timestamp_field TIMESTAMP,"
+                        + " timestamp_ntz_field TIMESTAMP_NTZ,"
+                        + " decimal_field DECIMAL(10, 2)"
+                        + ") PARTITIONED BY (string_field) OPTIONS ("
+                        + " 'primary.key' = 'int_field, string_field',"
+                        + " 'table.auto-partition.enabled' = 'true',"
+                        + " 'table.auto-partition.time-unit' = 'HOUR'"
+                        + ")");
+        List<String> tables =
+                sql("SHOW TABLES IN fluss_catalog." + DB).collectAsList().stream()
+                        .map(row -> row.getString(1))
+                        .collect(Collectors.toList());
+        assertThat(tables.size()).isEqualTo(1);
+        assertThat(tables.get(0)).isEqualTo(TABLE);
+
+        sql("DESCRIBE TABLE fluss_catalog." + DB + "." + TABLE);
+
+        TableInfo tableInfo = admin.getTableInfo(TablePath.of(DB, TABLE)).join();
+        // check primary key
+        assertThat(tableInfo.hasPrimaryKey()).isTrue();
+        List<String> primaryKey = tableInfo.getSchema().getPrimaryKey().get().getColumnNames();
+        assertThat(primaryKey.size()).isEqualTo(2);
+        assertThat(primaryKey.get(0)).isEqualTo("int_field");
+        assertThat(primaryKey.get(1)).isEqualTo("string_field");
+        // check partition key
+        List<String> partitionKeys = tableInfo.getPartitionKeys();
+        assertThat(partitionKeys.size()).isEqualTo(1);
+        assertThat(partitionKeys.get(0)).isEqualTo("string_field");
+        List<Schema.Column> columns = tableInfo.getSchema().getColumns();
+        assertThat(columns.size()).isEqualTo(14);
+    }
+
+    @Test
+    public void testPartitionedTable() {
+        sql("CREATE DATABASE fluss_catalog." + DB);
+        String fullName = "fluss_catalog." + DB + "." + PARTITION_TABLE;
+        sql(String.format("create table %s (a int, b string) partitioned by (b)", fullName));
+        List<String> tables =
+                sql("SHOW TABLES IN fluss_catalog." + DB).collectAsList().stream()
+                        .map(row -> row.getString(1))
+                        .collect(Collectors.toList());
+        assertThat(tables.size()).isEqualTo(1);
+        assertThat(tables.get(0)).isEqualTo(PARTITION_TABLE);
+
+        TableInfo tableInfo = admin.getTableInfo(TablePath.of(DB, PARTITION_TABLE)).join();
+
+        // 1. check partition key
+        List<String> partitionKeys = tableInfo.getPartitionKeys();
+        assertThat(partitionKeys.size()).isEqualTo(1);
+        assertThat(partitionKeys.get(0)).isEqualTo("b");
+        List<Schema.Column> columns = tableInfo.getSchema().getColumns();
+        assertThat(columns.size()).isEqualTo(2);
+
+        // 2. add partitions.
+        sql("alter table " + fullName + " add if not exists partition (b = 1)").collectAsList();
+        sql("alter table " + fullName + " add if not exists partition (b = 2)").collectAsList();
+        sql("alter table " + fullName + " add if not exists partition (b = 3)").collectAsList();
+        List<String> expectedShowPartitionsResult = Arrays.asList("[b=1]", "[b=2]", "[b=3]");
+        List<String> result =
+                sql("show partitions " + fullName).collectAsList().stream()
+                        .map(Row::toString)
+                        .collect(Collectors.toList());
+        assertThat(result).isEqualTo(expectedShowPartitionsResult);
+
+        // 3. drop partitions.
+        sql("alter table " + fullName + " drop if exists partition (b = 1)").collectAsList();
+
+        expectedShowPartitionsResult = Arrays.asList("[b=2]", "[b=3]");
+        result =
+                sql("show partitions " + fullName).collectAsList().stream()
+                        .map(Row::toString)
+                        .collect(Collectors.toList());
+        assertThat(result).isEqualTo(expectedShowPartitionsResult);
+    }
+
+    @Test
+    public void commentTest() {
+        sql("CREATE DATABASE fluss_catalog." + DB);
+        sql(
+                "CREATE TABLE fluss_catalog."
+                        + DB
+                        + "."
+                        + TABLE
+                        + " ("
+                        + " id INT COMMENT 'id comment test',"
+                        + " first_name STRING COMMENT 'first name comment test',"
+                        + " last_name STRING"
+                        + ") COMMENT 'table comment test'");
+
+        List<String> tables =
+                sql("SHOW TABLES IN fluss_catalog." + DB).collectAsList().stream()
+                        .map(row -> row.getString(1))
+                        .collect(Collectors.toList());
+        assertThat(tables.size()).isEqualTo(1);
+        assertThat(tables.get(0)).isEqualTo(TABLE);
+
+        List<String> comments =
+                sql("DESCRIBE TABLE fluss_catalog." + DB + "." + TABLE).collectAsList().stream()
+                        .map(row -> row.getString(2))
+                        .collect(Collectors.toList());
+        assertThat(comments.size()).isEqualTo(3);
+        assertThat(comments.get(0)).isEqualTo("id comment test");
+        assertThat(comments.get(1)).isEqualTo("first name comment test");
+        assertThat(comments.get(2)).isNull();
+    }
+
+    private static Map<String, String> getSparkConfigs(Configuration flussConf) {
+        Map<String, String> configs = new HashMap<>();
+        configs.put("spark.sql.catalog.fluss_catalog", SparkCatalog.class.getName());
+        configs.put(
+                "spark.sql.catalog.fluss_catalog.bootstrap.servers",
+                String.join(",", flussConf.get(ConfigOptions.BOOTSTRAP_SERVERS)));
+        return configs;
+    }
+
+    public static Dataset<Row> sql(String sqlText) {
+        Dataset<Row> ds = spark.sql(sqlText);
+        if (ds.columns().length == 0) {
+            LOG.info("+----------------+");
+            LOG.info("| Empty Result |");
+            LOG.info("+----------------+");
+        } else {
+            ds.show(20, 50);
+        }
+        return ds;
+    }
+}
diff --git a/fluss-spark/fluss-spark-common/src/test/java/org/apache/fluss/spark/utils/SparkConversionsTest.java b/fluss-spark/fluss-spark-common/src/test/java/org/apache/fluss/spark/utils/SparkConversionsTest.java
new file mode 100644
index 0000000000..16efcf4749
--- /dev/null
+++ b/fluss-spark/fluss-spark-common/src/test/java/org/apache/fluss/spark/utils/SparkConversionsTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.utils;
+
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.spark.SparkConnectorOptions;
+
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link SparkConversions}.
*/ +public class SparkConversionsTest { + + @Test + void testTableConversion() { + StructField[] sparkColumns = + new StructField[] { + new StructField( + "order_id", + org.apache.spark.sql.types.DataTypes.LongType, + false, + Metadata.empty()), + new StructField( + "order_name", + org.apache.spark.sql.types.DataTypes.StringType, + true, + Metadata.empty()) + }; + + // test convert spark table to fluss table + StructType structType = new StructType(sparkColumns); + Transform[] transforms = new Transform[0]; + Map properties = new HashMap<>(); + properties.put(SparkConnectorOptions.PRIMARY_KEY.key(), "order_id"); + properties.put("comment", "test comment"); + properties.put("k1", "v1"); + properties.put("k2", "v2"); + TableDescriptor flussTable = + SparkConversions.toFlussTable(structType, transforms, properties); + + String expectFlussTableString = + "TableDescriptor{schema=(" + + "order_id BIGINT NOT NULL," + + "order_name STRING," + + "CONSTRAINT PK_order_id PRIMARY KEY (order_id)" + + "), comment='test comment', partitionKeys=[], " + + "tableDistribution={bucketKeys=[order_id] bucketCount=null}, " + + "properties={}, " + + "customProperties={comment=test comment, primary.key=order_id, k1=v1, k2=v2}" + + "}"; + assertThat(flussTable.toString()).isEqualTo(expectFlussTableString); + } +} diff --git a/fluss-spark/fluss-spark-common/src/test/java/org/apache/fluss/spark/utils/SparkTypeUtilsTest.java b/fluss-spark/fluss-spark-common/src/test/java/org/apache/fluss/spark/utils/SparkTypeUtilsTest.java new file mode 100644 index 0000000000..c91dd4e162 --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/test/java/org/apache/fluss/spark/utils/SparkTypeUtilsTest.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark.utils; + +import org.apache.fluss.metadata.Schema; +import org.apache.fluss.types.DataTypes; + +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link SparkTypeUtils}. 
*/ +public class SparkTypeUtilsTest { + + @Test + void testTypeConversion() { + // fluss columns + List flussColumns = + Arrays.asList( + new Schema.Column("a", DataTypes.BOOLEAN().copy(false), null), + new Schema.Column("b", DataTypes.TINYINT().copy(false), null), + new Schema.Column("c", DataTypes.SMALLINT(), "comment1"), + new Schema.Column("d", DataTypes.INT(), "comment2"), + new Schema.Column("e", DataTypes.BIGINT(), null), + new Schema.Column("f", DataTypes.FLOAT(), null), + new Schema.Column("g", DataTypes.DOUBLE(), null), + new Schema.Column("h", DataTypes.CHAR(1), null), + new Schema.Column("i", DataTypes.STRING(), null), + new Schema.Column("j", DataTypes.DECIMAL(10, 2), null), + new Schema.Column("k", DataTypes.BYTES(), null), + new Schema.Column("l", DataTypes.DATE(), null), + new Schema.Column("m", DataTypes.TIMESTAMP_LTZ(6), null), + new Schema.Column("n", DataTypes.TIMESTAMP(6), null)); + + // spark columns + List sparkColumns = + Arrays.asList( + new StructField( + "a", + org.apache.spark.sql.types.DataTypes.BooleanType, + false, + Metadata.empty()), + new StructField( + "b", + org.apache.spark.sql.types.DataTypes.ByteType, + false, + Metadata.empty()), + new StructField( + "c", + org.apache.spark.sql.types.DataTypes.ShortType, + true, + Metadata.empty()) + .withComment("comment1"), + new StructField( + "d", + org.apache.spark.sql.types.DataTypes.IntegerType, + true, + Metadata.empty()) + .withComment("comment2"), + new StructField( + "e", + org.apache.spark.sql.types.DataTypes.LongType, + true, + Metadata.empty()), + new StructField( + "f", + org.apache.spark.sql.types.DataTypes.FloatType, + true, + Metadata.empty()), + new StructField( + "g", + org.apache.spark.sql.types.DataTypes.DoubleType, + true, + Metadata.empty()), + new StructField( + "h", + new org.apache.spark.sql.types.CharType(1), + true, + Metadata.empty()), + new StructField( + "i", + org.apache.spark.sql.types.DataTypes.StringType, + true, + Metadata.empty()), + new StructField( + "j", + org.apache.spark.sql.types.DataTypes.createDecimalType(10, 2), + true, + Metadata.empty()), + new StructField( + "k", + org.apache.spark.sql.types.DataTypes.BinaryType, + true, + Metadata.empty()), + new StructField( + "l", + org.apache.spark.sql.types.DataTypes.DateType, + true, + Metadata.empty()), + new StructField( + "m", + org.apache.spark.sql.types.DataTypes.TimestampType, + true, + Metadata.empty()), + new StructField( + "n", + org.apache.spark.sql.types.DataTypes.TimestampNTZType, + true, + Metadata.empty())); + + // test from fluss columns to spark columns + List actualSparkColumns = new ArrayList<>(); + for (Schema.Column flussColumn : flussColumns) { + StructField field = + new StructField( + flussColumn.getName(), + SparkTypeUtils.fromFlussType(flussColumn.getDataType()), + flussColumn.getDataType().isNullable(), + Metadata.empty()); + if (flussColumn.getComment().isPresent()) { + field.withComment(flussColumn.getComment().get()); + } + actualSparkColumns.add(field); + } + assertThat(actualSparkColumns.toString()).isEqualTo(sparkColumns.toString()); + + // test from spark columns to fluss columns + List actualFlussColumns = new ArrayList<>(); + for (StructField sparkColumn : sparkColumns) { + actualFlussColumns.add( + new Schema.Column( + sparkColumn.name(), + SparkTypeUtils.toFlussType(sparkColumn.dataType()) + .copy(sparkColumn.nullable()), + sparkColumn.getComment().getOrElse(() -> null))); + } + assertThat(actualFlussColumns).isEqualTo(flussColumns); + + // test TIME type + 
assertThat(SparkTypeUtils.fromFlussType(DataTypes.TIME())) + .isEqualTo(org.apache.spark.sql.types.DataTypes.IntegerType); + } +} diff --git a/fluss-spark/pom.xml b/fluss-spark/pom.xml new file mode 100644 index 0000000000..05924298b9 --- /dev/null +++ b/fluss-spark/pom.xml @@ -0,0 +1,381 @@ + + + + + 4.0.0 + + org.apache.fluss + fluss + 0.8-SNAPSHOT + + + fluss-spark + + Fluss : Engine Spark : + + pom + + + fluss-spark-common + fluss-spark-3.3 + + + + UTF-8 + 2.12.15 + 2.12 + ${scala212.version} + 3.2.2 + 2.1.0 + 128m + + -XX:+IgnoreUnrecognizedVMOptions + --add-opens=java.base/java.lang=ALL-UNNAMED + --add-opens=java.base/java.lang.invoke=ALL-UNNAMED + --add-opens=java.base/java.lang.reflect=ALL-UNNAMED + --add-opens=java.base/java.io=ALL-UNNAMED + --add-opens=java.base/java.net=ALL-UNNAMED + --add-opens=java.base/java.nio=ALL-UNNAMED + --add-opens=java.base/java.util=ALL-UNNAMED + --add-opens=java.base/java.util.concurrent=ALL-UNNAMED + --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED + --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED + --add-opens=java.base/sun.nio.ch=ALL-UNNAMED + --add-opens=java.base/sun.nio.cs=ALL-UNNAMED + --add-opens=java.base/sun.security.action=ALL-UNNAMED + --add-opens=java.base/sun.util.calendar=ALL-UNNAMED + -Djdk.reflect.useDirectMethodHandle=false + -Dio.netty.tryReflectionSetAccessible=true + + + + + + + org.apache.spark + spark-sql_${scala.binary.version} + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + org.apache.logging.log4j + log4j-slf4j2-impl + + + org.apache.orc + orc-core + + + org.apache.orc + orc-mapreduce + + + org.apache.parquet + parquet-column + + + + + + org.apache.spark + spark-core_${scala.binary.version} + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + + org.apache.logging.log4j + log4j-slf4j2-impl + + + org.apache.orc + orc-core + + + org.apache.orc + orc-mapreduce + + + org.apache.parquet + parquet-column + + + com.google.protobuf + protobuf-java + + + + + + + + org.apache.spark + spark-sql_${scala.binary.version} + tests + test + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + org.apache.logging.log4j + log4j-slf4j2-impl + + + org.apache.orc + orc-core + + + org.apache.parquet + parquet-column + + + org.apache.parquet + parquet-hadoop + + + + + + org.apache.spark + spark-catalyst_${scala.binary.version} + tests + test + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + org.apache.orc + orc-core + + + org.apache.parquet + parquet-column + + + org.apache.parquet + parquet-hadoop + + + + + + org.apache.spark + spark-core_${scala.binary.version} + tests + test + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + org.apache.logging.log4j + log4j-slf4j2-impl + + + com.google.protobuf + protobuf-java + + + org.apache.orc + orc-core + + + + + + org.apache.spark + spark-hive_${scala.binary.version} + test + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + com.google.protobuf + protobuf-java + + + org.apache.orc + orc-core + + + + + + + + org.scala-lang + scala-library + ${scala.version} + + + + org.scala-lang + scala-reflect + ${scala.version} + + + + org.scala-lang + scala-compiler + ${scala.version} + + + + + + + org.scalatest + scalatest_${scala.binary.version} + 3.1.0 + test + + + + + org.apache.fluss + fluss-server + ${project.version} + test + + + + org.apache.fluss + fluss-server + ${project.version} + test + test-jar + + + + org.apache.fluss + fluss-test-utils + ${project.version} + test + + + + + + + + + + + + net.alchim31.maven + scala-maven-plugin + 
${scala-maven-plugin.version} + + + + scala-compile-first + process-resources + + add-source + compile + + + + + + scala-test-compile + process-test-resources + + testCompile + + + + + ${scala.version} + false + + -nobootcp + -target:jvm-1.8 + + + + + + + org.scalatest + scalatest-maven-plugin + ${scalatest-maven-plugin.version} + + ${project.build.directory}/surefire-reports + . + -ea -Xmx4g -Xss4m -XX:MaxMetaspaceSize=2g -XX:ReservedCodeCacheSize=${CodeCacheSize} ${extraJavaTestArgs} -Dio.netty.tryReflectionSetAccessible=true + PaimonTestSuite.txt + + once + true + + + + test + + test + + + + + + + \ No newline at end of file diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml index aaa08c07db..f53f724ab9 100644 --- a/fluss-test-coverage/pom.xml +++ b/fluss-test-coverage/pom.xml @@ -416,6 +416,7 @@ org.apache.fluss.flink.tiering.FlussLakeTieringEntrypoint + com.alibaba.fluss.spark.* diff --git a/fluss-test-utils/src/main/java/org/apache/fluss/testutils/common/FlussAssertions.java b/fluss-test-utils/src/main/java/org/apache/fluss/testutils/common/FlussAssertions.java index f9de9002dd..27a17125da 100644 --- a/fluss-test-utils/src/main/java/org/apache/fluss/testutils/common/FlussAssertions.java +++ b/fluss-test-utils/src/main/java/org/apache/fluss/testutils/common/FlussAssertions.java @@ -35,9 +35,7 @@ public class FlussAssertions { private FlussAssertions() {} - /** - * @see #chainOfCauses(Throwable) - */ + /** @see #chainOfCauses(Throwable) */ @SuppressWarnings({"rawtypes", "unused"}) public static final InstanceOfAssertFactory> STREAM_THROWABLE = new InstanceOfAssertFactory<>(Stream.class, Assertions::assertThat); diff --git a/pom.xml b/pom.xml index 9ae2a7527a..9220986418 100644 --- a/pom.xml +++ b/pom.xml @@ -77,6 +77,7 @@ fluss-lake fluss-kafka tools/ci/fluss-ci-tools + fluss-spark @@ -1005,7 +1006,7 @@ ${skip.on.java8} - 1.15.0 + 1.7 @@ -1018,6 +1019,14 @@ + + + + + ${spotless.scalafmt.version} + ${maven.multiModuleProjectDirectory}/.scalafmt.conf + + diff --git a/tools/ci/fluss-ci-tools/src/main/java/org/apache/fluss/tools/ci/licensecheck/NoticeFileChecker.java b/tools/ci/fluss-ci-tools/src/main/java/org/apache/fluss/tools/ci/licensecheck/NoticeFileChecker.java index 051a50718e..a896dbb592 100644 --- a/tools/ci/fluss-ci-tools/src/main/java/org/apache/fluss/tools/ci/licensecheck/NoticeFileChecker.java +++ b/tools/ci/fluss-ci-tools/src/main/java/org/apache/fluss/tools/ci/licensecheck/NoticeFileChecker.java @@ -304,8 +304,7 @@ static Map> checkNoticeFile( // find all dependencies missing from NOTICE file Collection expectedDependencies = - modulesWithShadedDependencies - .getOrDefault(moduleName, Collections.emptySet()) + modulesWithShadedDependencies.getOrDefault(moduleName, Collections.emptySet()) .stream() .filter( dependency -> diff --git a/tools/ci/fluss-ci-tools/src/main/java/org/apache/fluss/tools/ci/utils/deploy/DeployParser.java b/tools/ci/fluss-ci-tools/src/main/java/org/apache/fluss/tools/ci/utils/deploy/DeployParser.java index db013715ec..bdf0fc1645 100644 --- a/tools/ci/fluss-ci-tools/src/main/java/org/apache/fluss/tools/ci/utils/deploy/DeployParser.java +++ b/tools/ci/fluss-ci-tools/src/main/java/org/apache/fluss/tools/ci/utils/deploy/DeployParser.java @@ -58,8 +58,7 @@ public static Set parseDeployOutput(File buildResult) throws IOException static Set parseDeployOutput(Stream lines) { return ParserUtils.parsePluginOutput( lines, DEPLOY_MODULE_PATTERN, DeployParser::parseDeployBlock) - .entrySet() - .stream() + .entrySet().stream() 
.filter(Map.Entry::getValue) .map(Map.Entry::getKey) .collect(Collectors.toSet());
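Usage note: the Spark catalog integration test in this patch registers the connector by setting spark.sql.catalog.fluss_catalog to the SparkCatalog class and spark.sql.catalog.fluss_catalog.bootstrap.servers to the Fluss cluster address (see getSparkConfigs). Below is a minimal sketch of that wiring outside the test harness. It is illustrative only: the class name FlussSparkQuickstart, the local master, the bootstrap address localhost:9123, and the assumed fully-qualified catalog class org.apache.fluss.spark.SparkCatalog are placeholders, not part of this change; the DDL mirrors statements exercised by the tests above.

import org.apache.spark.sql.SparkSession;

public class FlussSparkQuickstart {

    public static void main(String[] args) {
        // Register the Fluss catalog the same way getSparkConfigs() does in the IT case.
        // "localhost:9123" is a placeholder bootstrap address for a running Fluss cluster.
        SparkSession spark =
                SparkSession.builder()
                        .master("local[2]")
                        .config(
                                "spark.sql.catalog.fluss_catalog",
                                "org.apache.fluss.spark.SparkCatalog")
                        .config(
                                "spark.sql.catalog.fluss_catalog.bootstrap.servers",
                                "localhost:9123")
                        .getOrCreate();

        // DDL mirrors the statements exercised in the catalog tests above.
        spark.sql("CREATE DATABASE IF NOT EXISTS fluss_catalog.my_db");
        spark.sql(
                "CREATE TABLE fluss_catalog.my_db.my_table (id INT, name STRING) "
                        + "OPTIONS ('primary.key' = 'id')");
        spark.sql("SHOW TABLES IN fluss_catalog.my_db").show();

        spark.stop();
    }
}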