Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SYSTEMDS-2951] Multi-GPU Support for End-to-End ML Pipelines #2050

Open
wants to merge 25 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
d02782b
fix: multiple GPUs cache error
WDRshadow Jul 13, 2024
eb77efc
fix: mismatch of ParForProgramBlock._numThreads
WDRshadow Jul 13, 2024
6a21cd8
update: mismatch of ParForProgramBlock._numThreads
WDRshadow Jul 14, 2024
e8f79ac
Add Example-ResNet.dml
Jul 15, 2024
99d5c01
update: MultiGPUTest (draft)
WDRshadow Jul 16, 2024
c56144a
Commit MultiGPUTest
Jul 17, 2024
25bde5d
Merge branch 'main' of https://github.com/WDRshadow/systemds
Jul 17, 2024
9ea0f02
Merge branch 'main' into SYSTEMDS-2951-dev
Jul 17, 2024
9ed077c
update: delete unnecessary codes
WDRshadow Jul 17, 2024
9b67303
update: delete unnecessary codes 2
WDRshadow Jul 17, 2024
18ffb44
update: MultiGPUTest
WDRshadow Jul 17, 2024
46cfbc1
update: initialize SingleGPUTest available gpu
WDRshadow Jul 18, 2024
62a0465
update: Tests for multi-GPU completed
WDRshadow Jul 18, 2024
ea9fce8
update: add _numTasks check for test
WDRshadow Jul 18, 2024
0c6a950
update: delete _numThreads check.
WDRshadow Jul 18, 2024
28c7cac
new GPUTest
Jul 19, 2024
a35c8bd
Update: SingleGPUTest
Jul 19, 2024
d6b2d44
update: delete unnecessary codes 3
WDRshadow Jul 19, 2024
729200d
Update: Test batch size from 100K to 500K, print the all the time.
Jul 19, 2024
0f01ff5
update: delete unnecessary codes 4
WDRshadow Jul 20, 2024
cc136d8
update: modified the test instances
WDRshadow Jul 20, 2024
7cf12a6
update: modified the test instances
Jul 19, 2024
f44fda6
Merge remote-tracking branch 'origin/SYSTEMDS-2951-dev-batch' into SY…
WDRshadow Jul 20, 2024
9001604
Update: GPUTest.dml add time counter
Jul 21, 2024
cf08084
Update: write and read the model, train and predict perspectively
KexingLi22 Jul 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,10 @@
private void executeLocalParFor( ExecutionContext ec, IntObject from, IntObject to, IntObject incr )
throws InterruptedException
{
if (DMLScript.USE_ACCELERATOR) {
_numThreads = Math.min(_numThreads, ec.getNumGPUContexts());

Check warning on line 742 in src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java#L742

Added line #L742 was not covered by tests
}

LOG.trace("Local Par For (multi-threaded) with degree of parallelism : " + _numThreads);
/* Step 1) init parallel workers, task queue and threads
* start threads (from now on waiting for tasks)
Expand Down Expand Up @@ -808,6 +812,12 @@
LineageCacheConfig.setReuseLineageTraces(false); //disable lineage trace reuse
for( Thread thread : threads )
thread.join();

if (DMLScript.USE_ACCELERATOR) {
for(LocalParWorker worker : workers) {
LOG.trace("The worker of GPU " + worker.getExecutionContext().getGPUContext(0).toString() + " has executed " + worker.getExecutedTasks() + " tasks.");

Check warning on line 818 in src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java#L818

Added line #L818 was not covered by tests
}
}

if( _monitor )
StatisticMonitor.putPFStat(_ID, Stat.PARFOR_WAIT_EXEC_T, time.stop());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import java.io.File;
import java.io.IOException;
import java.lang.ref.SoftReference;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.lang3.mutable.MutableBoolean;
Expand Down Expand Up @@ -214,7 +214,7 @@
//for lazily evaluated RDDs, and (2) as abstraction for environments that do not necessarily have spark libraries available
private RDDObject _rddHandle = null; //RDD handle
private BroadcastObject<T> _bcHandle = null; //Broadcast handle
protected HashMap<GPUContext, GPUObject> _gpuObjects = null; //Per GPUContext object allocated on GPU
protected ConcurrentHashMap<GPUContext, GPUObject> _gpuObjects = null; //Per GPUContext object allocated on GPU

private LineageItem _lineage = null;

Expand All @@ -229,7 +229,7 @@
_uniqueID = _seq.getNextID();
_cacheStatus = CacheStatus.EMPTY;
_numReadThreads = 0;
_gpuObjects = DMLScript.USE_ACCELERATOR ? new HashMap<>() : null;
_gpuObjects = DMLScript.USE_ACCELERATOR ? new ConcurrentHashMap<>() : null;
WDRshadow marked this conversation as resolved.
Show resolved Hide resolved
}

/**
Expand Down Expand Up @@ -472,7 +472,7 @@

public synchronized void setGPUObject(GPUContext gCtx, GPUObject gObj) {
if( _gpuObjects == null )
_gpuObjects = new HashMap<>();
_gpuObjects = new ConcurrentHashMap<>();

Check warning on line 475 in src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java#L475

Added line #L475 was not covered by tests
GPUObject old = _gpuObjects.put(gCtx, gObj);
if (old != null)
throw new DMLRuntimeException("GPU : Inconsistent internal state - this CacheableData already has a GPUObject assigned to the current GPUContext (" + gCtx + ")");
Expand Down
29 changes: 29 additions & 0 deletions src/test/config/SystemDS-SingleGPU-config.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
<!--
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
-->

<root>
<!-- The number of theads for the spark instance artificially selected-->
<sysds.local.spark.number.threads>2</sysds.local.spark.number.threads>
<!-- The timeout of the federated tests to initialize the federated matrixes -->
<sysds.federated.initialization.timeout>2</sysds.federated.initialization.timeout>
<!-- The timeout of each instruction sent to federated workers -->
<sysds.federated.timeout>128</sysds.federated.timeout>
<!-- sets the GPUs to use per process, -1 for all GPUs, a specific GPU number (5), a range (eg: 0-2) or a comma separated list (eg: 0,2,4)-->
<sysds.gpu.availableGPUs>0</sysds.gpu.availableGPUs>
</root>
165 changes: 165 additions & 0 deletions src/test/java/org/apache/sysds/test/gpu/multigpu/GPUTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.sysds.test.gpu.multigpu;

import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.spi.LoggingEvent;
import org.apache.sysds.runtime.controlprogram.ParForProgramBlock;
import org.apache.sysds.test.AutomatedTestBase;
import org.apache.sysds.test.TestConfiguration;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public abstract class GPUTest extends AutomatedTestBase {
protected static final String TEST_DIR = "gpu/";
protected static final String TEST_CLASS_DIR = TEST_DIR + MultiGPUTest.class.getSimpleName() + "/";
protected static final String SINGLE_GPU_TEST = "SingleGPUTest";
protected static final String MULTI_GPUS_TEST = "MultiGPUsTest";
protected static final String TEST_NAME = "InferenceScript";
protected static final String TRAIN_SCRIPT = "TrainScript";
protected static final String DATA_SET = DATASET_DIR + "MNIST/mnist_test.csv";
protected static final String SINGLE_TEST_CONFIG = CONFIG_DIR + "SystemDS-SingleGPU-config.xml";
protected static final String MULTI_TEST_CONFIG = CONFIG_DIR + "SystemDS-config.xml";

@Override
public void setUp() {
TEST_GPU = true;
VERBOSE_STATS = true;
addTestConfiguration(SINGLE_GPU_TEST,
new TestConfiguration(TEST_CLASS_DIR, SINGLE_GPU_TEST, new String[] { "R" }));
addTestConfiguration(MULTI_GPUS_TEST,
new TestConfiguration(TEST_CLASS_DIR, MULTI_GPUS_TEST, new String[] { "R" }));
}

/**
* Run the test with multiple GPUs
*
* @param multiGPUs whether to run the test with multiple GPUs
*/
protected void runMultiGPUsTest(boolean multiGPUs, int numTestImages) {
getAndLoadTestConfiguration(multiGPUs ? MULTI_GPUS_TEST : SINGLE_GPU_TEST);

String HOME = SCRIPT_DIR + TEST_DIR;
fullDMLScriptName = HOME + TEST_NAME + ".dml";
programArgs = new String[] { "-args", DATA_SET, output("R"), Integer.toString(numTestImages), "-config",
multiGPUs ? MULTI_TEST_CONFIG : SINGLE_TEST_CONFIG };
fullRScriptName = HOME + TEST_NAME + ".R";

rCmd = null;
InMemoryAppender appender = configureLog4j();

runTest(true, false, null, -1);

List<String> logs = appender.getLogMessages();
int numRealThread = 0;
for (String log : logs) {
if (log.contains("has executed") && extractNumTasks(log) > 0) {
numRealThread ++;
}
}
if (multiGPUs) {
assertTrue(numRealThread > 1);
} else {
assertEquals(1, numRealThread);
}

appender.clearLogMessages();
}

/**
* Run the training script
*/
protected void runTrainingScript(boolean multiGPUs, int numTestImages) {
getAndLoadTestConfiguration(multiGPUs ? MULTI_GPUS_TEST : SINGLE_GPU_TEST);

String HOME = SCRIPT_DIR + TEST_DIR;
fullDMLScriptName = HOME + TRAIN_SCRIPT + ".dml";
programArgs = new String[] { "-args", DATA_SET, output("R"), Integer.toString(numTestImages), "-config",
multiGPUs ? MULTI_TEST_CONFIG : SINGLE_TEST_CONFIG };
fullRScriptName = HOME + TEST_NAME + ".R";

rCmd = null;
InMemoryAppender appender = configureLog4j();

runTest(true, false, null, -1);
}

protected static InMemoryAppender configureLog4j() {
Logger rootLogger = Logger.getRootLogger();
rootLogger.setLevel(Level.ERROR);

Logger logger = Logger.getLogger(ParForProgramBlock.class.getName());
logger.setLevel(Level.TRACE);

InMemoryAppender inMemoryAppender = new InMemoryAppender();
inMemoryAppender.setThreshold(Level.TRACE);
logger.addAppender(inMemoryAppender);

return inMemoryAppender;
}

protected static int extractNumTasks(String logMessage) {
String regex = "has executed (\\d+) tasks";
Pattern pattern = Pattern.compile(regex);
Matcher matcher = pattern.matcher(logMessage);
if (matcher.find()) {
return Integer.parseInt(matcher.group(1));
}
throw new IllegalArgumentException("No _numTasks value found in log message");
}

protected static class InMemoryAppender extends AppenderSkeleton {

protected final List<String> logMessages = new ArrayList<>();

@Override
protected void append(LoggingEvent event) {
if (event.getLevel().isGreaterOrEqual(Level.TRACE)) {
logMessages.add(event.getRenderedMessage());
}
}

@Override
public void close() {
// No resources to release
}

@Override
public boolean requiresLayout() {
return false;
}

public List<String> getLogMessages() {
return new ArrayList<>(logMessages);
}

public void clearLogMessages() {
logMessages.clear();
}
}
}
82 changes: 82 additions & 0 deletions src/test/java/org/apache/sysds/test/gpu/multigpu/MultiGPUTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.sysds.test.gpu.multigpu;

import org.junit.AfterClass;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;

import java.util.ArrayList;
import java.util.List;

@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class MultiGPUTest extends GPUTest {

private static List<Double> executionTimes = new ArrayList<>();

@Test
public void test01_gpuTest_10k() {
runMultiGPUsTest(true, 10000);
}

@Test
public void test01_gpuTest_20k() {
runMultiGPUsTest(true, 20000);
}

@Test
public void test01_gpuTest_50k() {
runMultiGPUsTest(true, 50000);
}

@Test
public void test01_gpuTest_100k() {
runMultiGPUsTest(true, 100000);
}

@Test
public void test01_gpuTest_200k() {
runMultiGPUsTest(true, 200000);
}

@Test
public void test01_gpuTest_500k() {
runMultiGPUsTest(true, 500000);
}

@Override
protected void runMultiGPUsTest(boolean multiGPUs, int numTestImages) {
long startTime = System.nanoTime();
super.runMultiGPUsTest(multiGPUs, numTestImages);
long endTime = System.nanoTime();
double executionTime = (endTime - startTime) / 1e9;
executionTimes.add(executionTime);
}

@AfterClass
public static void printExecutionTimes() {
System.out.println("Execution times for each test:");
for (int i = 0; i < executionTimes.size(); i++) {
System.out.printf("Test %d: %.3f sec\n", i + 1, executionTimes.get(i));
}
}
}

Loading
Loading