Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ private void initialization() throws Exception {
jettyServer = new JettyServer(shuffleServerConf);
registerMetrics();

storageManager = StorageManagerFactory.getInstance().createStorageManager(id, shuffleServerConf);
storageManager = StorageManagerFactory.getInstance().createStorageManager(shuffleServerConf);
storageManager.start();


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public static StorageManagerFactory getInstance() {
return LazyHolder.INSTANCE;
}

public StorageManager createStorageManager(String serverId, ShuffleServerConf conf) {
public StorageManager createStorageManager(ShuffleServerConf conf) {
StorageType type = StorageType.valueOf(conf.get(ShuffleServerConf.RSS_STORAGE_TYPE));
if (StorageType.LOCALFILE.equals(type) || StorageType.MEMORY_LOCALFILE.equals(type)) {
return new LocalStorageManager(conf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public void hadoopConfTest() {
shuffleServerConf.setString("rss.server.hadoop.dfs.replication", "2");
shuffleServerConf.setString("rss.server.hadoop.a.b", "value");
StorageManager storageManager =
StorageManagerFactory.getInstance().createStorageManager("shuffleServerId", shuffleServerConf);
StorageManagerFactory.getInstance().createStorageManager(shuffleServerConf);
ShuffleFlushManager manager =
new ShuffleFlushManager(shuffleServerConf, "shuffleServerId", null, storageManager);
assertEquals("2", manager.getHadoopConf().get("dfs.replication"));
Expand All @@ -97,7 +97,7 @@ public void hadoopConfTest() {
public void writeTest() throws Exception {
String appId = "writeTest_appId";
StorageManager storageManager =
StorageManagerFactory.getInstance().createStorageManager("shuffleServerId", shuffleServerConf);
StorageManagerFactory.getInstance().createStorageManager(shuffleServerConf);
storageManager.registerRemoteStorage(appId, remoteStorage);
storageManager.registerRemoteStorage(appId, remoteStorage);
String storageHost = "localhost";
Expand Down Expand Up @@ -145,7 +145,7 @@ public void writeTest() throws Exception {
public void complexWriteTest() throws Exception {
shuffleServerConf.setString("rss.server.flush.handler.expired", "3");
StorageManager storageManager =
StorageManagerFactory.getInstance().createStorageManager("shuffleServerId", shuffleServerConf);
StorageManagerFactory.getInstance().createStorageManager(shuffleServerConf);
String appId = "complexWriteTest_appId";
storageManager.registerRemoteStorage(appId, remoteStorage);
List<ShufflePartitionedBlock> expectedBlocks = Lists.newArrayList();
Expand Down Expand Up @@ -183,7 +183,7 @@ public void complexWriteTest() throws Exception {
@Test
public void clearTest() throws Exception {
StorageManager storageManager =
StorageManagerFactory.getInstance().createStorageManager("shuffleServerId", shuffleServerConf);
StorageManagerFactory.getInstance().createStorageManager(shuffleServerConf);
String appId1 = "complexWriteTest_appId1";
String appId2 = "complexWriteTest_appId2";
storageManager.registerRemoteStorage(appId1, remoteStorage);
Expand Down Expand Up @@ -240,7 +240,7 @@ public void clearLocalTest(@TempDir File tempDir) throws Exception {
serverConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.LOCALFILE.name());
serverConf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L * 1024L * 1024L);
StorageManager storageManager =
StorageManagerFactory.getInstance().createStorageManager("shuffleServerId", serverConf);
StorageManagerFactory.getInstance().createStorageManager(serverConf);
ShuffleFlushManager manager =
new ShuffleFlushManager(serverConf, "shuffleServerId", null, storageManager);
ShuffleDataFlushEvent event1 =
Expand Down Expand Up @@ -385,7 +385,7 @@ public void processPendingEventsTest(@TempDir File tempDir) {
shuffleServerConf.set(ShuffleServerConf.DISK_CAPACITY, 100L);
shuffleServerConf.set(ShuffleServerConf.PENDING_EVENT_TIMEOUT_SEC, 5L);
StorageManager storageManager =
StorageManagerFactory.getInstance().createStorageManager("shuffleServerId", shuffleServerConf);
StorageManagerFactory.getInstance().createStorageManager(shuffleServerConf);
ShuffleFlushManager manager =
new ShuffleFlushManager(shuffleServerConf, "shuffleServerId", null, storageManager);
ShuffleDataFlushEvent event = new ShuffleDataFlushEvent(1, "1", 1, 1,1, 100, null, null, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ public void cacheShuffleDataWithPreAllocationTest() {
@Test
public void bufferSizeTest() throws Exception {
ShuffleServer mockShuffleServer = mock(ShuffleServer.class);
StorageManager storageManager = StorageManagerFactory.getInstance().createStorageManager("serverId", conf);
StorageManager storageManager = StorageManagerFactory.getInstance().createStorageManager(conf);
ShuffleFlushManager shuffleFlushManager = new ShuffleFlushManager(conf, "serverId", mockShuffleServer, storageManager);
shuffleBufferManager = new ShuffleBufferManager(conf, shuffleFlushManager);

Expand Down