Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.fluss.config.cluster.AlterConfig;
import org.apache.fluss.config.cluster.ConfigEntry;
import org.apache.fluss.exception.LeaderNotAvailableException;
import org.apache.fluss.fs.FsPath;
import org.apache.fluss.metadata.DatabaseDescriptor;
import org.apache.fluss.metadata.DatabaseInfo;
import org.apache.fluss.metadata.DatabaseSummary;
Expand Down Expand Up @@ -291,6 +292,11 @@ public CompletableFuture<TableInfo> getTableInfo(TablePath tablePath) {
r.getTableId(),
r.getSchemaId(),
TableDescriptor.fromJsonBytes(r.getTableJson()),
// For backward compatibility, results returned by old
// clusters do not include the remote data dir
r.hasRemoteDataDir()
? new FsPath(r.getRemoteDataDir())
: null,
r.getCreatedTime(),
r.getModifiedTime()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,9 @@ void fetchOnce() throws Exception {
(bytes, throwable) -> {
if (throwable != null) {
LOG.error(
"Failed to download remote log segment file {}.",
"Failed to download remote log segment file {} for bucket {}.",
fsPathAndFileName.getFileName(),
request.segment.tableBucket(),
ExceptionUtils.stripExecutionException(throwable));
// release the semaphore for the failed request
prefetchSemaphore.release();
Expand All @@ -178,8 +179,9 @@ void fetchOnce() throws Exception {
scannerMetricGroup.remoteFetchErrorCount().inc();
} else {
LOG.info(
"Successfully downloaded remote log segment file {} to local cost {} ms.",
"Successfully downloaded remote log segment file {} for bucket {} to local cost {} ms.",
fsPathAndFileName.getFileName(),
request.segment.tableBucket(),
System.currentTimeMillis() - startTime);
File localFile =
new File(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import static org.apache.fluss.record.TestData.DATA2_TABLE_ID;
import static org.apache.fluss.record.TestData.DATA2_TABLE_INFO;
import static org.apache.fluss.record.TestData.DATA2_TABLE_PATH;
import static org.apache.fluss.record.TestData.DEFAULT_REMOTE_DATA_DIR;
import static org.apache.fluss.record.TestData.DEFAULT_SCHEMA_ID;
import static org.apache.fluss.row.BinaryString.fromString;
import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toByteBuffer;
Expand Down Expand Up @@ -169,6 +170,7 @@ void testProjection(LogFormat logFormat, byte magic) throws Exception {
.distributedBy(3)
.logFormat(logFormat)
.build(),
DEFAULT_REMOTE_DATA_DIR,
System.currentTimeMillis(),
System.currentTimeMillis());
long fetchOffset = 0L;
Expand Down Expand Up @@ -313,6 +315,7 @@ void testComplexTypeFetch() throws Exception {
.distributedBy(3)
.logFormat(LogFormat.ARROW)
.build(),
DEFAULT_REMOTE_DATA_DIR,
System.currentTimeMillis(),
System.currentTimeMillis());
long fetchOffset = 0L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ void testFetchWithSchemaChange() throws Exception {
DATA1_TABLE_INFO.getNumBuckets(),
DATA1_TABLE_INFO.getProperties(),
DATA1_TABLE_INFO.getCustomProperties(),
DATA1_TABLE_INFO.getRemoteDataDir(),
DATA1_TABLE_INFO.getComment().orElse(null),
DATA1_TABLE_INFO.getCreatedTime(),
DATA1_TABLE_INFO.getModifiedTime()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import static org.apache.fluss.record.TestData.DATA2_TABLE_ID;
import static org.apache.fluss.record.TestData.DATA2_TABLE_INFO;
import static org.apache.fluss.record.TestData.DATA2_TABLE_PATH;
import static org.apache.fluss.record.TestData.DEFAULT_REMOTE_DATA_DIR;
import static org.apache.fluss.record.TestData.DEFAULT_SCHEMA_ID;
import static org.apache.fluss.testutils.DataTestUtils.genLogFile;
import static org.apache.fluss.utils.FlussPaths.remoteLogSegmentDir;
Expand Down Expand Up @@ -218,6 +219,7 @@ void testProjection(String format) throws Exception {
.distributedBy(3)
.logFormat(logFormat)
.build(),
DEFAULT_REMOTE_DATA_DIR,
System.currentTimeMillis(),
System.currentTimeMillis());
long fetchOffset = 0L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import static org.apache.fluss.record.TestData.DATA1_TABLE_ID;
import static org.apache.fluss.record.TestData.DATA1_TABLE_INFO;
import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH;
import static org.apache.fluss.record.TestData.DEFAULT_REMOTE_DATA_DIR;
import static org.apache.fluss.testutils.DataTestUtils.indexedRow;
import static org.apache.fluss.testutils.DataTestUtils.row;
import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -96,6 +97,7 @@ class RecordAccumulatorTest {
.distributedBy(3)
.property(ConfigOptions.TABLE_LOG_ARROW_COMPRESSION_TYPE.key(), "zstd")
.build(),
DEFAULT_REMOTE_DATA_DIR,
System.currentTimeMillis(),
System.currentTimeMillis());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,44 @@ public class ConfigOptions {
"The directory used for storing the kv snapshot data files and remote log for log tiered storage "
+ " in a Fluss supported filesystem.");

public static final ConfigOption<List<String>> REMOTE_DATA_DIRS =
key("remote.data.dirs")
.stringType()
.asList()
.defaultValues()
.withDescription(
"The directories used for storing the kv snapshot data files and remote log for log tiered storage "
+ " in a Fluss supported filesystem. "
+ "This is a list of remote data directory paths. "
+ "Example: `remote.data.dirs: oss://bucket1/fluss-remote-data, oss://bucket2/fluss-remote-data`.");

public static final ConfigOption<RemoteDataDirStrategy> REMOTE_DATA_DIRS_STRATEGY =
key("remote.data.dirs.strategy")
.enumType(RemoteDataDirStrategy.class)
.defaultValue(RemoteDataDirStrategy.ROUND_ROBIN)
.withDescription(
"The strategy for selecting the remote data directory from `"
+ REMOTE_DATA_DIRS.key()
+ "`.");

public static final ConfigOption<List<Integer>> REMOTE_DATA_DIRS_WEIGHTS =
key("remote.data.dirs.weights")
.intType()
.asList()
.defaultValues()
.withDescription(
"The weights of the remote data directories. "
+ "This is a list of weights corresponding to the `"
+ REMOTE_DATA_DIRS.key()
+ "` in the same order. When `"
+ REMOTE_DATA_DIRS_STRATEGY.key()
+ "` is set to `"
+ RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN
+ "`, this must be configured, and its size must be equal to `"
+ REMOTE_DATA_DIRS.key()
+ "`; otherwise, it will be ignored."
+ "Example: `remote.data.dir.weights: 1, 2`");

public static final ConfigOption<MemorySize> REMOTE_FS_WRITE_BUFFER_SIZE =
key("remote.fs.write-buffer-size")
.memoryType()
Expand Down Expand Up @@ -2066,4 +2104,10 @@ private static class ConfigOptionsHolder {
public static ConfigOption<?> getConfigOption(String key) {
return ConfigOptionsHolder.CONFIG_OPTIONS_BY_KEY.get(key);
}

/** Remote data dir select strategy for Fluss. */
public enum RemoteDataDirStrategy {
ROUND_ROBIN,
WEIGHTED_ROUND_ROBIN
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@

import org.apache.fluss.annotation.Internal;
import org.apache.fluss.annotation.VisibleForTesting;
import org.apache.fluss.exception.IllegalConfigurationException;
import org.apache.fluss.fs.FsPath;
import org.apache.fluss.utils.FlussPaths;

import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Utilities of Fluss {@link ConfigOptions}. */
@Internal
Expand Down Expand Up @@ -77,4 +81,109 @@ static Map<String, ConfigOption<?>> extractConfigOptions(String prefix) {
}
return options;
}

public static void validateCoordinatorConfigs(Configuration conf) {
validServerConfigs(conf);

validMinValue(conf, ConfigOptions.DEFAULT_REPLICATION_FACTOR, 1);
validMinValue(conf, ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, 1);
validMinValue(conf, ConfigOptions.SERVER_IO_POOL_SIZE, 1);

// Validate remote.data.dirs
List<String> remoteDataDirs = conf.get(ConfigOptions.REMOTE_DATA_DIRS);
for (int i = 0; i < remoteDataDirs.size(); i++) {
String remoteDataDir = remoteDataDirs.get(i);
try {
new FsPath(remoteDataDir);
} catch (Exception e) {
throw new IllegalConfigurationException(
String.format(
"Invalid remote path for %s at index %d.",
ConfigOptions.REMOTE_DATA_DIRS.key(), i),
e);
}
}

// Validate remote.data.dirs.strategy
ConfigOptions.RemoteDataDirStrategy remoteDataDirStrategy =
conf.get(ConfigOptions.REMOTE_DATA_DIRS_STRATEGY);
if (remoteDataDirStrategy == ConfigOptions.RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN) {
List<Integer> weights = conf.get(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS);
if (!remoteDataDirs.isEmpty() && !weights.isEmpty()) {
if (remoteDataDirs.size() != weights.size()) {
throw new IllegalConfigurationException(
String.format(
"The size of '%s' (%d) must match the size of '%s' (%d) when using WEIGHTED_ROUND_ROBIN strategy.",
ConfigOptions.REMOTE_DATA_DIRS.key(),
remoteDataDirs.size(),
ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS.key(),
weights.size()));
}
// Validate all weights are positive
for (int i = 0; i < weights.size(); i++) {
if (weights.get(i) < 0) {
throw new IllegalConfigurationException(
String.format(
"All weights in '%s' must be no less than 0, but found %d at index %d.",
ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS.key(),
weights.get(i),
i));
}
}
}
}
}

public static void validateTabletConfigs(Configuration conf) {
validServerConfigs(conf);

Optional<Integer> serverId = conf.getOptional(ConfigOptions.TABLET_SERVER_ID);
if (!serverId.isPresent()) {
throw new IllegalConfigurationException(
String.format("Configuration %s must be set.", ConfigOptions.TABLET_SERVER_ID));
}
validMinValue(ConfigOptions.TABLET_SERVER_ID, serverId.get(), 0);

validMinValue(conf, ConfigOptions.BACKGROUND_THREADS, 1);

if (conf.get(ConfigOptions.LOG_SEGMENT_FILE_SIZE).getBytes() > Integer.MAX_VALUE) {
throw new IllegalConfigurationException(
String.format(
"Invalid configuration for %s, it must be less than or equal %d bytes.",
ConfigOptions.LOG_SEGMENT_FILE_SIZE.key(), Integer.MAX_VALUE));
}
}

/** Validate common server configs. */
private static void validServerConfigs(Configuration conf) {
if (conf.get(ConfigOptions.REMOTE_DATA_DIR) == null) {
throw new IllegalConfigurationException(
String.format("Configuration %s must be set.", ConfigOptions.REMOTE_DATA_DIR));
} else {
// Must validate that remote.data.dir is a valid FsPath
try {
FlussPaths.remoteDataDir(conf);
} catch (Exception e) {
throw new IllegalConfigurationException(
String.format(
"Invalid configuration for %s.",
ConfigOptions.REMOTE_DATA_DIR.key()),
e);
}
}
}

private static void validMinValue(
Configuration conf, ConfigOption<Integer> option, int minValue) {
validMinValue(option, conf.get(option), minValue);
}

private static void validMinValue(ConfigOption<Integer> option, int value, int minValue) {
if (value < minValue) {
throw new IllegalConfigurationException(
String.format(
"Invalid configuration for %s, it must be greater than or equal %d.",
option.key(), minValue));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,31 @@
package org.apache.fluss.metadata;

import org.apache.fluss.annotation.PublicEvolving;
import org.apache.fluss.fs.FsPath;

import java.util.Objects;

/**
* Information of a partition metadata, includes the partition's name and the partition id that
* represents the unique identifier of the partition.
* Information of a partition metadata, includes partition id (unique identifier of the partition),
* partition name, etc.
*
* @since 0.2
*/
@PublicEvolving
public class PartitionInfo {
private final long partitionId;
private final ResolvedPartitionSpec partitionSpec;
private final FsPath remoteDataDir;

public PartitionInfo(long partitionId, ResolvedPartitionSpec partitionSpec) {
this(partitionId, partitionSpec, null);
}

public PartitionInfo(
long partitionId, ResolvedPartitionSpec partitionSpec, FsPath remoteDataDir) {
this.partitionId = partitionId;
this.partitionSpec = partitionSpec;
this.remoteDataDir = remoteDataDir;
}

/** Get the partition id. The id is globally unique in the Fluss cluster. */
Expand All @@ -58,6 +66,10 @@ public PartitionSpec getPartitionSpec() {
return partitionSpec.toPartitionSpec();
}

public FsPath getRemoteDataDir() {
return remoteDataDir;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -67,16 +79,25 @@ public boolean equals(Object o) {
return false;
}
PartitionInfo that = (PartitionInfo) o;
return partitionId == that.partitionId && Objects.equals(partitionSpec, that.partitionSpec);
return partitionId == that.partitionId
&& Objects.equals(partitionSpec, that.partitionSpec)
&& Objects.equals(remoteDataDir, that.remoteDataDir);
}

@Override
public int hashCode() {
return Objects.hash(partitionId, partitionSpec);
return Objects.hash(partitionId, partitionSpec, remoteDataDir);
}

@Override
public String toString() {
return "Partition{name='" + getPartitionName() + '\'' + ", id=" + partitionId + '}';
return "Partition{name='"
+ getPartitionName()
+ '\''
+ ", id="
+ partitionId
+ ", remoteDataDir="
+ remoteDataDir
+ '}';
}
}
Loading
Loading