Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ deploy/kubernetes/docker/hadoopconfig/*
*.dll
*.so
*.dylib
local
vendor
VERSION
testbin/*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.common.util;

import java.lang.reflect.Constructor;
import java.util.stream.Stream;

import org.apache.commons.lang3.tuple.Pair;

import org.apache.uniffle.common.exception.RssException;

public class ClassUtils {

@SuppressWarnings("unchecked")
public static <T> T instantiate(Class<T> clazz, Pair<Class<T>, Object>... typeAndVals)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We seems have many similar places like the some strategies class. Could we unify them?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

throws RssException {
try {
if (typeAndVals == null || typeAndVals.length == 0) {
return clazz.getConstructor().newInstance();
}
Class<T>[] types = Stream.of(typeAndVals).map(x -> x.getLeft()).toArray(Class[]::new);
Constructor<T> constructor = clazz.getConstructor(types);
return constructor.newInstance(
Stream.of(typeAndVals).map(x -> x.getRight()).toArray(Object[]::new)
);
} catch (Exception e) {
throw new RssException(e);
}
}
}
1 change: 1 addition & 0 deletions docs/server_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ This document will introduce how to deploy Uniffle shuffle servers.
| rss.server.max.concurrency.of.single.partition.writer | 1 | The max concurrency of single partition writer, the data partition file number is equal to this value. Default value is 1. This config could improve the writing speed, especially for huge partition. |
| rss.metrics.reporter.class | - | The class of metrics reporter. |
|rss.server.multistorage.manager.selector.class | org.apache.uniffle.server.storage.multi.DefaultStorageManagerSelector | The manager selector strategy for `MEMORY_LOCALFILE_HDFS`. Default value is `DefaultStorageManagerSelector`, and another `HugePartitionSensitiveStorageManagerSelector` will flush only huge partition's data to cold storage. |
|rss.server.localstorage.storage.choosing.policy.class|org.apache.uniffle.server.storage.local.HashStorageChoosingPolicy|For localstorage, the storage choosing policy is for per-partition. Default value is the hash-based disk selector.|
Comment on lines 93 to +94
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these two configurations seems related. Do you think it's possible to merge these configurations in next release, so users don't have to be confused by too many configurations.


### Advanced Configurations
|Property Name|Default| Description |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,13 @@ public class ShuffleServerConf extends RssBaseConf {
.defaultValue(0L)
.withDescription("For multistorage, fail times exceed the number, will switch storage");

public static final ConfigOption<String> LOCAL_STORAGE_CHOOSING_POLICY = ConfigOptions
.key("rss.server.localstorage.storage.choosing.policy.class")
.stringType()
.defaultValue("org.apache.uniffle.server.storage.local.HashStorageChoosingPolicy")
.withDescription("For localstorage, the storage choosing policy is for per-partition. "
+ "Default value is the hash-based disk selector.");

public static final ConfigOption<List<String>> TAGS = ConfigOptions
.key("rss.server.tags")
.stringType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.uniffle.common.storage.StorageInfo;
import org.apache.uniffle.common.storage.StorageMedia;
import org.apache.uniffle.common.storage.StorageStatus;
import org.apache.uniffle.common.util.ClassUtils;
import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.common.util.ThreadUtils;
Expand All @@ -62,6 +63,7 @@
import org.apache.uniffle.server.event.AppPurgeEvent;
import org.apache.uniffle.server.event.PurgeEvent;
import org.apache.uniffle.server.event.ShufflePurgeEvent;
import org.apache.uniffle.server.storage.local.StorageChoosingPolicy;
import org.apache.uniffle.storage.common.LocalStorage;
import org.apache.uniffle.storage.common.Storage;
import org.apache.uniffle.storage.common.StorageMediaProvider;
Expand All @@ -71,6 +73,7 @@
import org.apache.uniffle.storage.util.ShuffleStorageUtils;
import org.apache.uniffle.storage.util.StorageType;

import static org.apache.uniffle.server.ShuffleServerConf.LOCAL_STORAGE_CHOOSING_POLICY;
import static org.apache.uniffle.server.ShuffleServerConf.LOCAL_STORAGE_INITIALIZE_MAX_FAIL_NUMBER;

public class LocalStorageManager extends SingleStorageManager {
Expand All @@ -84,6 +87,8 @@ public class LocalStorageManager extends SingleStorageManager {
private final Map<String, LocalStorage> partitionsOfStorage;
private final List<StorageMediaProvider> typeProviders = Lists.newArrayList();

private final StorageChoosingPolicy<LocalStorage> storageChoosingPolicy;

@VisibleForTesting
LocalStorageManager(ShuffleServerConf conf) {
super(conf);
Expand Down Expand Up @@ -155,6 +160,18 @@ public class LocalStorageManager extends SingleStorageManager {
StringUtils.join(localStorages.stream().map(LocalStorage::getBasePath).collect(Collectors.toList()))
);
this.checker = new LocalStorageChecker(conf, localStorages);
this.storageChoosingPolicy = initStorageChoosingPolicy(conf);
}

private StorageChoosingPolicy<LocalStorage> initStorageChoosingPolicy(ShuffleServerConf conf) {
try {
String className = conf.get(LOCAL_STORAGE_CHOOSING_POLICY);
Class<StorageChoosingPolicy<LocalStorage>> clz =
(Class<StorageChoosingPolicy<LocalStorage>>) Class.forName(className);
return ClassUtils.instantiate(clz);
} catch (Exception e) {
throw new RssException(e);
}
}

private StorageMedia getStorageTypeForBasePath(String basePath) {
Expand Down Expand Up @@ -184,19 +201,8 @@ public Storage selectStorage(ShuffleDataFlushEvent event) {
return storage;
}
}

List<LocalStorage> candidates = localStorages
.stream()
.filter(x -> x.canWrite() && !x.isCorrupted())
.collect(Collectors.toList());
final LocalStorage selectedStorage = candidates.get(
ShuffleStorageUtils.getStorageIndex(
candidates.size(),
appId,
shuffleId,
partitionId
)
);
final LocalStorage selectedStorage =
storageChoosingPolicy.choose(event, localStorages.stream().toArray(LocalStorage[]::new));
return partitionsOfStorage.compute(
UnionKey.buildKey(appId, shuffleId, partitionId),
(key, localStorage) -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.server.storage.local;

import java.math.BigDecimal;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.uniffle.server.ShuffleDataFlushEvent;
import org.apache.uniffle.storage.common.LocalStorage;

public class AvailableSpaceStorageChoosingPolicy implements StorageChoosingPolicy<LocalStorage> {

@Override
public LocalStorage choose(ShuffleDataFlushEvent event, LocalStorage... storages) {
final List<LocalStorage> candidates = Arrays.stream(storages)
.filter(x -> x.canWrite() && !x.isCorrupted())
.collect(Collectors.toList());

if (candidates.size() == 0) {
return null;
}
Comment on lines +32 to +38
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like this method should be moved to interface's default method, such as getDefaultCandidates?


candidates.sort((s1, s2) -> {
BigDecimal s1UsedRatio = BigDecimal.valueOf(s1.getDiskSize()).divide(BigDecimal.valueOf(s1.getCapacity()));
BigDecimal s2UsedRatio = BigDecimal.valueOf(s2.getDiskSize()).divide(BigDecimal.valueOf(s2.getCapacity()));
return s1UsedRatio.compareTo(s2UsedRatio);
});

return candidates.get(0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

max rather than sort then get first?

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.server.storage.local;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.uniffle.server.ShuffleDataFlushEvent;
import org.apache.uniffle.storage.common.LocalStorage;
import org.apache.uniffle.storage.util.ShuffleStorageUtils;

public class HashStorageChoosingPolicy implements StorageChoosingPolicy<LocalStorage> {

@Override
public LocalStorage choose(ShuffleDataFlushEvent event, LocalStorage... storages) {
List<LocalStorage> candidates = Arrays.stream(storages)
.filter(x -> x.canWrite() && !x.isCorrupted())
.collect(Collectors.toList());

if (candidates.size() == 0) {
return null;
}

String appId = event.getAppId();
int shuffleId = event.getShuffleId();
int partitionId = event.getStartPartition();
final LocalStorage selectedStorage = candidates.get(
ShuffleStorageUtils.getStorageIndex(
candidates.size(),
appId,
shuffleId,
partitionId
)
);
return selectedStorage;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.server.storage.local;

import org.apache.uniffle.server.ShuffleDataFlushEvent;
import org.apache.uniffle.storage.common.Storage;

public interface StorageChoosingPolicy<T extends Storage> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the difference between policy and strategy? Could we have a unify style?


T choose(ShuffleDataFlushEvent event, T... candidates);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we have better name? What's the difference between choose and select? Should we have a unify style?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we use array as parameter? Collection seems better than array according to the book <>.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For strategy, we should add some Java docs to explain the function of this interface and tell other developer how to extend this interface.


}
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,49 @@ public void testStorageSelectionWhenReachingHighWatermark() {
assertNotEquals(storage1, storage2);
}


@Test
public void testPluggableStorageSelection() {
String[] storagePaths = {"/tmp/rss-data1", "/tmp/rss-data2", "/tmp/rss-data3"};

ShuffleServerConf conf = new ShuffleServerConf();
conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(storagePaths));
conf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L);
conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, org.apache.uniffle.storage.util.StorageType.LOCALFILE.name());
conf.setString(
ShuffleServerConf.LOCAL_STORAGE_CHOOSING_POLICY,
"org.apache.uniffle.server.storage.local.AvailableSpaceStorageChoosingPolicy"
);
LocalStorageManager localStorageManager = new LocalStorageManager(conf);

List<LocalStorage> storages = localStorageManager.getStorages();
assertNotNull(storages);

LocalStorage s1 = storages.get(0);
LocalStorage s2 = storages.get(1);
LocalStorage s3 = storages.get(2);


// case1
s1.setDiskSize(300);
s2.setDiskSize(200);
s3.setDiskSize(500);
String appId = "testPluggableStorageSelection";
ShuffleDataFlushEvent event1 = toDataFlushEvent(appId, 1, 1);
Storage storage = localStorageManager.selectStorage(event1);
assertEquals(s2, storage);

// case2
s2.markCorrupted();
storage = localStorageManager.selectStorage(event1);
assertEquals(s1, storage);

// case3
s1.markCorrupted();
storage = localStorageManager.selectStorage(event1);
assertEquals(s3, storage);
}

@Test
public void testStorageSelection() {
String[] storagePaths = {"/tmp/rss-data1", "/tmp/rss-data2", "/tmp/rss-data3"};
Expand Down
Loading