Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
daf652b
[connector] support spark catalog
chunxiaozheng Dec 20, 2024
be8528e
[connector] Support spark catalog and introduce some basic classes to…
Alibaba-HZY Mar 10, 2025
12f9d26
Merge branch 'refs/heads/main' into spark-3.3-catalog
Alibaba-HZY Mar 10, 2025
089c172
Merge branch 'refs/heads/main' into spark-3.3-catalog
Alibaba-HZY Mar 10, 2025
4bcf19b
[license] Update License Copyright year
Alibaba-HZY Mar 10, 2025
4033244
[connector-spark] some fix
Alibaba-HZY Mar 10, 2025
f10f1f0
[connector-spark] support partition manager
Alibaba-HZY Mar 11, 2025
983663a
[connector-spark] fix it test
Alibaba-HZY Mar 12, 2025
2190ed9
Merge branch 'refs/heads/main' into spark-catalog
Alibaba-HZY Mar 16, 2025
e121544
[spark] Rename module "fluss-connector-spark" to "fluss-spark"
Alibaba-HZY Mar 16, 2025
623b75c
[spark]fix
Alibaba-HZY Mar 22, 2025
64d671a
Merge branch 'refs/heads/main' into spark-catalog
Alibaba-HZY Apr 23, 2025
d266cf4
Merge branch 'refs/heads/main' into spark-catalog
Alibaba-HZY Apr 23, 2025
b561684
[connector] Support spark catalog and introduce some basic classes to…
Alibaba-HZY Apr 23, 2025
2e3a906
[connector] Support spark catalog and introduce some basic classes to…
Alibaba-HZY Sep 3, 2025
4a83b9b
[connector] Support spark catalog and introduce some basic classes to…
Alibaba-HZY Sep 3, 2025
ea205d2
[connector] Support spark catalog and introduce some basic classes to…
Alibaba-HZY Sep 3, 2025
c7bea0d
[connector] Support spark catalog and introduce some basic classes to…
Alibaba-HZY Sep 3, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions .scalafmt.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
runner.dialect = scala212

# Version is required to make sure IntelliJ picks the right version
version = 3.4.3
preset = default

# Max column
maxColumn = 100

# This parameter simply says the .stripMargin method was not redefined by the user to assign
# special meaning to indentation preceding the | character. Hence, that indentation can be modified.
assumeStandardLibraryStripMargin = true
align.stripMargin = true

# Align settings
align.preset = none
align.closeParenSite = false
align.openParenCallSite = false
danglingParentheses.defnSite = false
danglingParentheses.callSite = false
danglingParentheses.ctrlSite = true
danglingParentheses.tupleSite = false
align.openParenCallSite = false
align.openParenDefnSite = false
align.openParenTupleSite = false

# Newlines
newlines.alwaysBeforeElseAfterCurlyIf = false
newlines.beforeCurlyLambdaParams = multiline # Newline before lambda params
newlines.afterCurlyLambdaParams = squash # No newline after lambda params
newlines.inInterpolation = "avoid"
newlines.avoidInResultType = true
optIn.annotationNewlines = true

# Scaladoc
docstrings.style = Asterisk # Javadoc style
docstrings.removeEmpty = true
docstrings.oneline = fold
docstrings.forceBlankLineBefore = true

# Indentation
indent.extendSite = 2 # This makes sure extend is not indented as the ctor parameters

# Rewrites
rewrite.rules = [AvoidInfix, Imports, RedundantBraces, SortModifiers]

# Imports
rewrite.imports.sort = scalastyle
rewrite.imports.groups = [
["com.alibaba.fluss\\..*"],
["com.alibaba.fluss.shade\\..*"],
[".*"],
["javax\\..*"],
["java\\..*"],
["scala\\..*"]
]
rewrite.imports.contiguousGroups = no
importSelectors = singleline # Imports in a single line, like IntelliJ

# Remove redundant braces in string interpolation.
rewrite.redundantBraces.stringInterpolation = true
rewrite.redundantBraces.defnBodies = false
rewrite.redundantBraces.generalExpressions = false
rewrite.redundantBraces.ifElseExpressions = false
rewrite.redundantBraces.methodBodies = false
rewrite.redundantBraces.includeUnitMethods = false
rewrite.redundantBraces.maxBreaks = 1

# Remove trailing commas
rewrite.trailingCommas.style = "never"
Original file line number Diff line number Diff line change
Expand Up @@ -919,7 +919,7 @@ void testInvalidColumnProjection() throws Exception {
assertThatThrownBy(() -> createLogScanner(table, new int[] {1, 2, 3, 4, 5}))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage(
"Projected field index 2 is out of bound for schema ROW<`a` INT, `b` STRING>");
"Projected field index 2 is out of bound for schema ROW<`a` INT '...', `b` STRING '...'>");
}

@ParameterizedTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,7 @@ public ServerNode getCoordinatorServer() {
return coordinatorServer;
}

/**
* @return The known set of alive tablet servers.
*/
/** @return The known set of alive tablet servers. */
public Map<Integer, ServerNode> getAliveTabletServers() {
return aliveTabletServersById;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,10 @@ public interface MemorySegmentPool {
*/
void returnAll(List<MemorySegment> memory);

/**
* @return Free page number.
*/
/** @return Free page number. */
int freePages();

/**
* @return the available memory size in bytes.
*/
/** @return the available memory size in bytes. */
long availableMemory();

void close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ private Schema(List<Column> columns, @Nullable PrimaryKey primaryKey) {
.map(
column ->
new DataField(
column.getName(), column.getDataType()))
column.getName(),
column.getDataType(),
column.getComment().orElse(null)))
.collect(Collectors.toList()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,7 @@ public interface Predicate extends Serializable {
*/
boolean test(long rowCount, InternalRow minValues, InternalRow maxValues, Long[] nullCounts);

/**
* @return the negation predicate of this predicate if possible.
*/
/** @return the negation predicate of this predicate if possible. */
Optional<Predicate> negate();

<T> T visit(PredicateVisitor<T> visitor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,9 +257,7 @@ public void append(short schemaId, BinaryRow row) throws IOException {
currentRecordNumber++;
}

/**
* @param valueBytes consisted of schema id and the row encoded in the value bytes
*/
/** @param valueBytes consisted of schema id and the row encoded in the value bytes */
public void append(byte[] valueBytes) throws IOException {
if (isClosed) {
throw new IllegalStateException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,7 @@ public class SendWritableOutput extends ByteBufWritableOutput {
/** The current reader index of the underlying {@link #buf} for building next {@link Send}. */
private int currentReaderIndex = 0;

/**
* @param buf The ByteBuf that has capacity of data size excluding zero-copy.
*/
/** @param buf The ByteBuf that has capacity of data size excluding zero-copy. */
public SendWritableOutput(ByteBuf buf) {
super(buf);
this.sends = new ArrayDeque<>(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,16 +93,12 @@ public UUID remoteLogSegmentId() {
return remoteLogSegmentId;
}

/**
* @return Remote log start offset of this segment (inclusive).
*/
/** @return Remote log start offset of this segment (inclusive). */
public long remoteLogStartOffset() {
return remoteLogStartOffset;
}

/**
* @return Remote log end offset of this segment (inclusive).
*/
/** @return Remote log end offset of this segment (inclusive). */
public long remoteLogEndOffset() {
return remoteLogEndOffset;
}
Expand Down
10 changes: 10 additions & 0 deletions fluss-common/src/main/java/org/apache/fluss/row/TimestampNtz.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ public class TimestampNtz implements Comparable<TimestampNtz>, Serializable {
// the number of milliseconds in a day.
private static final long MILLIS_PER_DAY = 86400000; // = 24 * 60 * 60 * 1000

public static final long MICROS_PER_MILLIS = 1000L;

public static final long NANOS_PER_MICROS = 1000L;

// this field holds the integral second and the milli-of-second.
private final long millisecond;

Expand All @@ -70,6 +74,12 @@ public int getNanoOfMillisecond() {
return nanoOfMillisecond;
}

/** Converts this {@link TimestampNtz} object to micros. */
public long toMicros() {
long micros = Math.multiplyExact(millisecond, MICROS_PER_MILLIS);
return micros + nanoOfMillisecond / NANOS_PER_MICROS;
}

/**
* Creates an instance of {@link TimestampNtz} from milliseconds.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.fluss.flink.utils;
package org.apache.fluss.utils;

import org.apache.fluss.exception.DatabaseAlreadyExistException;
import org.apache.fluss.exception.DatabaseNotEmptyException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ public class DateTimeUtils {
/** The number of milliseconds in an hour. */
private static final long MILLIS_PER_HOUR = 3600000L; // = 60 * 60 * 1000

/**
* The number of milliseconds in a day.
*
* <p>This is the modulo 'mask' used when converting TIMESTAMP values to DATE and TIME values.
*/
public static final long MILLIS_PER_DAY = 86400000L; // = 24 * 60 * 60 * 1000

private static final DateTimeFormatter DEFAULT_TIMESTAMP_FORMATTER =
new DateTimeFormatterBuilder()
.appendPattern("yyyy-[MM][M]-[dd][d]")
Expand Down Expand Up @@ -256,6 +263,19 @@ public static Integer parseTime(String v) {
+ milli;
}

/**
* Converts the Java type used for UDF parameters of SQL DATE type ({@link java.sql.Date}) to
* internal representation (int).
*/
public static int toInternal(java.sql.Date date) {
long ts = date.getTime() + TimeZone.getDefault().getOffset(date.getTime());
return (int) (ts / MILLIS_PER_DAY);
}

public static int toInternal(LocalDate date) {
return ymdToUnixDate(date.getYear(), date.getMonthValue(), date.getDayOfMonth());
}

private static boolean isInteger(String s) {
boolean isInt = s.length() > 0;
for (int i = 0; i < s.length(); i++) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.utils;

import org.apache.fluss.row.BinaryString;
import org.apache.fluss.row.Decimal;
import org.apache.fluss.row.GenericRow;
import org.apache.fluss.row.InternalRow;
import org.apache.fluss.row.indexed.IndexedRow;
import org.apache.fluss.types.DataType;
import org.apache.fluss.types.RowType;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.List;

/** Utils for {@link InternalRow} structures. */
public class InternalRowUtils {

public static InternalRow copyInternalRow(InternalRow row, RowType rowType) {
if (row instanceof IndexedRow) {
return ((IndexedRow) row).copy();
} else {
GenericRow ret = new GenericRow(row.getFieldCount());

InternalRow.FieldGetter[] fieldGetters = createFieldGetters(rowType.getChildren());
for (int i = 0; i < row.getFieldCount(); ++i) {
ret.setField(i, fieldGetters[i].getFieldOrNull(row));
}

return ret;
}
}

public static Object copy(Object o, DataType type) {
if (o instanceof BinaryString) {
return ((BinaryString) o).copy();
} else if (o instanceof InternalRow) {
return copyInternalRow((InternalRow) o, (RowType) type);
} else if (o instanceof Decimal) {
return ((Decimal) o).copy();
}
return o;
}

public static long castToIntegral(Decimal dec) {
BigDecimal bd = dec.toBigDecimal();
// rounding down. This is consistent with float=>int,
// and consistent with SQLServer, Spark.
bd = bd.setScale(0, RoundingMode.DOWN);
return bd.longValue();
}

public static InternalRow.FieldGetter[] createFieldGetters(List<DataType> fieldTypes) {
InternalRow.FieldGetter[] fieldGetters = new InternalRow.FieldGetter[fieldTypes.size()];
for (int i = 0; i < fieldTypes.size(); i++) {
fieldGetters[i] = createNullCheckingFieldGetter(fieldTypes.get(i), i);
}
return fieldGetters;
}

public static InternalRow.FieldGetter createNullCheckingFieldGetter(
DataType dataType, int index) {
InternalRow.FieldGetter getter = InternalRow.createFieldGetter(dataType, index);
if (dataType.isNullable()) {
return getter;
} else {
return row -> {
if (row.isNullAt(index)) {
return null;
}
return getter.getFieldOrNull(row);
};
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,12 @@ public static <L, R> Either<L, R> right(R value) {
*/
public abstract R right() throws IllegalStateException;

/**
* @return true if this is a Left value, false if this is a Right value
*/
/** @return true if this is a Left value, false if this is a Right value */
public final boolean isLeft() {
return getClass() == Left.class;
}

/**
* @return true if this is a Right value, false if this is a Left value
*/
/** @return true if this is a Right value, false if this is a Left value */
public final boolean isRight() {
return getClass() == Right.class;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,7 @@ void testIdentifierCaseInsensitive() {
.isInstanceOf(TestIdentifierClientAuthenticator.class);
assertThat(
AuthenticationFactory.loadServerAuthenticatorSuppliers(configuration)
.values()
.stream()
.values().stream()
.findAny()
.get()
.get())
Expand All @@ -94,8 +93,7 @@ void testIdentifierCaseInsensitive() {
.isInstanceOf(TestIdentifierClientAuthenticator.class);
assertThat(
AuthenticationFactory.loadServerAuthenticatorSuppliers(configuration)
.values()
.stream()
.values().stream()
.findAny()
.get()
.get())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ private static void applyInMemoryStorage(FileSystem fileSystem) throws IOExcepti
GoogleCloudStorageFileSystem googleCloudStorageFileSystem =
new GoogleCloudStorageFileSystem(
googleCloudStorageOptions -> inMemoryGoogleCloudStorage,
GoogleCloudStorageFileSystemOptions.DEFAULT.toBuilder()
GoogleCloudStorageFileSystemOptions.DEFAULT
.toBuilder()
.setCloudStorageOptions(inMemoryGoogleCloudStorage.getOptions())
.build());

Expand Down
Loading
Loading