fix store, and add infinite loader (horovod#2954)
* fix store, and infinite loader

Signed-off-by: Peng Zhang <pengz@uber.com>

* fix store, and infinite loader

Signed-off-by: Peng Zhang <pengz@uber.com>

* add test for validation

Signed-off-by: Peng Zhang <pengz@uber.com>
irasit authored Jun 9, 2021
1 parent 3008df3 commit ffcc655
Showing 10 changed files with 303 additions and 114 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -18,4 +18,6 @@ examples/**/checkpoint*

horovod/tensorflow/mpi_lib.so
horovod/torch/test_cuda/
lightning_logs


23 changes: 17 additions & 6 deletions examples/spark/pytorch/pytorch_lightning_spark_mnist.py
@@ -107,14 +107,18 @@ def forward(self, x):
def configure_optimizers(self):
return optim.SGD(self.parameters(), lr=0.01, momentum=0.5)

def training_step(self, batch, batch_nb):
def training_step(self, batch, batch_idx):
if batch_idx == 0:
print(f"training data batch size: {batch['label'].shape}")
x, y = batch['features'], batch['label']
y_hat = self(x)
loss = F.nll_loss(y_hat, y.long())
self.log('train_loss', loss)
return loss

def validation_step(self, batch, batch_nb):
def validation_step(self, batch, batch_idx):
if batch_idx == 0:
print(f"validation data batch size: {batch['label'].shape}")
x, y = batch['features'], batch['label']
y_hat = self(x)
loss = F.nll_loss(y_hat, y.long())
@@ -139,6 +143,7 @@ class MyDummyCallback(Callback):
def __init__(self):
self.epcoh_end_counter = 0
self.train_epcoh_end_counter = 0
self.validation_epoch_end_counter = 0

def on_init_start(self, trainer):
print('Starting to init trainer!')
@@ -154,11 +159,17 @@ def on_train_epoch_end(self, trainer, model, unused=None):
print('A train epoch ended.')
self.train_epcoh_end_counter += 1

def on_validation_epoch_end(self, trainer, model, unused=None):
print('A val epoch ended.')
self.validation_epoch_end_counter += 1

def on_train_end(self, trainer, model):
print("Training ends:"
f"self.epcoh_end_counter={self.epcoh_end_counter}, "
f"self.train_epcoh_end_counter={self.train_epcoh_end_counter}")
assert self.train_epcoh_end_counter == epochs
f"epcoh_end_counter={self.epcoh_end_counter}, "
f"train_epcoh_end_counter={self.train_epcoh_end_counter}, "
f"validation_epoch_end_counter={self.validation_epoch_end_counter} \n")
assert self.train_epcoh_end_counter <= epochs
assert self.epcoh_end_counter == self.train_epcoh_end_counter + self.validation_epoch_end_counter

callbacks = [MyDummyCallback()]

@@ -179,9 +190,9 @@ def on_train_end(self, trainer, model):
input_shapes=[[-1, 1, 28, 28]],
feature_cols=['features'],
label_cols=['label'],
validation=0.1,
batch_size=args.batch_size,
epochs=args.epochs,
validation=0.1,
verbose=1,
callbacks=callbacks)

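Note: the estimator call above now passes validation=0.1, which asks for a random ~10% of rows to be held out for validation, and the callback counts both training and validation epoch ends. A hedged illustration of what a fractional split looks like in plain PySpark (this is not the estimator's internal splitting code):

# Illustrative only: holding out roughly 10% of rows, the effect a
# `validation=0.1` argument asks the estimator for.
from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[2]').getOrCreate()
df = spark.range(1000)  # stand-in for the MNIST DataFrame
train_part, val_part = df.randomSplit([0.9, 0.1], seed=42)
print(train_part.count(), val_part.count())  # roughly 900 / 100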
35 changes: 23 additions & 12 deletions examples/spark/pytorch/pytorch_lightning_spark_mnist_legacy.py
@@ -1,14 +1,19 @@
import argparse
import os
import subprocess
import sys
from distutils.version import LooseVersion

import numpy as np
from distutils.version import LooseVersion

import pyspark
import pyspark.sql.types as T
from pyspark import SparkConf
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import OneHotEncoderEstimator
if LooseVersion(pyspark.__version__) < LooseVersion('3.0.0'):
from pyspark.ml.feature import OneHotEncoderEstimator as OneHotEncoder
else:
from pyspark.ml.feature import OneHotEncoder
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf

@@ -18,15 +23,16 @@

import horovod.spark.lightning as hvd
from horovod.spark.lightning.estimator import MIN_PL_VERSION
from horovod.spark.common.backend import SparkBackend
from horovod.spark.common.store import Store

parser = argparse.ArgumentParser(description='Keras Spark MNIST Example',
parser = argparse.ArgumentParser(description='PyTorch Spark MNIST Example',
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--master',
help='spark master to connect to')
parser.add_argument('--num-proc', type=int,
help='number of worker processes for training, default: `spark.default.parallelism`')
parser.add_argument('--batch-size', type=int, default=128,
parser.add_argument('--batch-size', type=int, default=64,
help='input batch size for training')
parser.add_argument('--epochs', type=int, default=12,
help='number of epochs to train')
@@ -44,7 +50,7 @@ def train_model(args):
return

# Initialize SparkSession
conf = SparkConf().setAppName('keras_spark_mnist').set('spark.sql.shuffle.partitions', '16')
conf = SparkConf().setAppName('pytorch_spark_mnist').set('spark.sql.shuffle.partitions', '16')
if args.master:
conf.setMaster(args.master)
elif args.num_proc:
@@ -66,9 +72,9 @@ def train_model(args):
.load(libsvm_path)

# One-hot encode labels into SparseVectors
encoder = OneHotEncoderEstimator(inputCols=['label'],
outputCols=['label_vec'],
dropLast=False)
encoder = OneHotEncoder(inputCols=['label'],
outputCols=['label_vec'],
dropLast=False)
model = encoder.fit(df)
train_df = model.transform(df)

@@ -85,29 +91,33 @@ def __init__(self):
self.fc1 = nn.Linear(320, 50)
self.fc2 = nn.Linear(50, 10)

def forward(self, x):
x = x.float()
def forward(self, features):
x = features.float()
x = F.relu(F.max_pool2d(self.conv1(x), 2))
x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
x = x.view(-1, 320)
x = F.relu(self.fc1(x))
x = F.dropout(x, training=self.training)
x = self.fc2(x)
return F.log_softmax(x)
return F.log_softmax(x, -1)

model = Net()
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.5)
loss = nn.NLLLoss()

# Train a Horovod Spark Estimator on the DataFrame
torch_estimator = hvd.TorchEstimator(num_proc=args.num_proc,
backend = SparkBackend(num_proc=args.num_proc,
stdout=sys.stdout, stderr=sys.stderr,
prefix_output_with_timestamp=True)
torch_estimator = hvd.TorchEstimator(backend=backend,
store=store,
model=model,
optimizer=optimizer,
loss=lambda input, target: loss(input, target.long()),
input_shapes=[[-1, 1, 28, 28]],
feature_cols=['features'],
label_cols=['label'],
validation=0.1,
batch_size=args.batch_size,
epochs=args.epochs,
verbose=1)
@@ -116,6 +126,7 @@ def forward(self, x):

# Evaluate the model on the held-out test DataFrame
pred_df = torch_model.transform(test_df)

argmax = udf(lambda v: float(np.argmax(v)), returnType=T.DoubleType())
pred_df = pred_df.withColumn('label_pred', argmax(pred_df.label_prob))
evaluator = MulticlassClassificationEvaluator(predictionCol='label_pred', labelCol='label', metricName='accuracy')
7 changes: 3 additions & 4 deletions examples/spark/pytorch/pytorch_spark_mnist.py
@@ -75,7 +75,6 @@
# Train/test split
train_df, test_df = train_df.randomSplit([0.9, 0.1])


# Define the PyTorch model without any Horovod-specific parameters
class Net(nn.Module):
def __init__(self):
@@ -86,8 +85,8 @@ def __init__(self):
self.fc1 = nn.Linear(320, 50)
self.fc2 = nn.Linear(50, 10)

def forward(self, x):
x = x.float()
def forward(self, features):
x = features.float()
x = F.relu(F.max_pool2d(self.conv1(x), 2))
x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
x = x.view(-1, 320)
@@ -96,7 +95,6 @@ def forward(self, x):
x = self.fc2(x)
return F.log_softmax(x)


model = Net()
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.5)
loss = nn.NLLLoss()
@@ -115,6 +113,7 @@ def forward(self, x):
label_cols=['label'],
batch_size=args.batch_size,
epochs=args.epochs,
validation=0.1,
verbose=1)

torch_model = torch_estimator.fit(train_df).setOutputCols(['label_prob'])
2 changes: 1 addition & 1 deletion horovod/spark/common/util.py
@@ -447,7 +447,7 @@ def get_simple_meta_from_parquet(store, label_columns, feature_columns, sample_w

train_data_meta_path = store.get_data_metadata_path(train_data_path)
val_data_meta_path = store.get_data_metadata_path(validation_data_path)
fs = store.get_filesystem()
fs = store.fs

schema_cols = feature_columns + label_columns
if sample_weight_col:
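The one-line change above replaces store.get_filesystem() with the store.fs property. A minimal sketch of the cached-handle idea, assuming a hypothetical store class (illustrative names, not Horovod's actual Store API):

import pyarrow.fs as pa_fs

class SketchStore:
    """Illustrative stand-in for a store that caches its filesystem handle."""
    def __init__(self, prefix_path):
        self.prefix_path = prefix_path
        self._fs = None

    @property
    def fs(self):
        # Build the filesystem once and return the same object afterwards,
        # instead of constructing a new handle on every lookup.
        if self._fs is None:
            self._fs = pa_fs.LocalFileSystem()
        return self._fs

store = SketchStore('/tmp/sketch-store')
assert store.fs is store.fs  # same cached handle on repeated access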
66 changes: 61 additions & 5 deletions horovod/spark/data_loaders/pytorch_data_loaders.py
@@ -18,20 +18,26 @@


class PytorchDataLoader(BaseDataLoader):
def __init__(self, reader, batch_size, shuffling_queue_capacity):
def __init__(self, reader, batch_size, shuffling_queue_capacity, name="",
limit_step_per_epoch=-1, verbose=False):
self.reader = reader
self.batch_size = batch_size
self.shuffling_queue_capacity = shuffling_queue_capacity
print(f"Initializing petastorm dataloader with batch_size {batch_size}"
f" and shuffling_queue_capacity {shuffling_queue_capacity}")
self.limit_step_per_epoch = limit_step_per_epoch
self.name = name
self.verbose = verbose

print(f"[{self.name}]: Initializing petastorm dataloader with batch_size={batch_size}"
f"shuffling_queue_capacity={shuffling_queue_capacity}, "
f"limit_step_per_epoch={limit_step_per_epoch}")

def __len__(self):
return len(self.reader)
return self.limit_step_per_epoch if self.limit_step_per_epoch != -1 else len(self.reader)

def _iterate(self):
# Reset the reader if needed.
if self.reader.last_row_consumed:
print(f"Resetting Petastorm reader for {self.reader.dataset.paths}")
self._print_verbose(f"[{self.name}]: Resetting Petastorm reader for {self.reader.dataset.paths}")
self.reader.reset()

# Re-create the data loader for each iteration. This is needed because there may be
@@ -44,10 +50,60 @@ def _iterate(self):
shuffling_queue_capacity=self.shuffling_queue_capacity,
)

num_steps = 0

self._print_verbose(f"[{self.name}]: Start to generate batch data. limit_step_per_epoch={self.limit_step_per_epoch}")

for batch in data_loader:
if num_steps == self.limit_step_per_epoch:
self._print_verbose(f"[{self.name}]: Reach limit_step_per_epoch. Stop at step {num_steps}.")
break

num_steps += 1
yield batch

def _print_verbose(self, *args, **kwargs):
if self.verbose:
print(*args, **kwargs)


class PytorchAsyncDataLoader(AsyncDataLoaderMixin, PytorchDataLoader):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)


class PytorchInfiniteDataLoader(PytorchDataLoader):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

if self.reader.num_epochs is not None:
raise ValueError("Need to set num_epochs as None in reader.")

self.data_loader = BatchedDataLoader(
self.reader,
batch_size=self.batch_size,
shuffling_queue_capacity=self.shuffling_queue_capacity)
self.iterater = iter(self.data_loader)

def _iterate(self):
num_steps = 0
self._print_verbose(f"[{self.name}]: Start to generate batch data. limit_step_per_epoch={self.limit_step_per_epoch}")

while True:
if num_steps == self.limit_step_per_epoch:
self._print_verbose(f"[{self.name}]: Reach limit_step_per_epoch. Stop at step {num_steps}.")
break
num_steps += 1

yield next(self.iterater)


class PytorchInfiniteAsyncDataLoader(AsyncDataLoaderMixin, PytorchInfiniteDataLoader):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)


class PetastormBatchedDataLoader(BatchedDataLoader):
def __init__(self, name="", limit_step_per_epoch=-1, verbose=False, *args, **kwargs):
print(f"[{name}]Petastorm BatchedDataLoader will ignore limit_step_per_epoch and verbose.")
super().__init__(*args, **kwargs)
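
The new PytorchInfiniteDataLoader keeps one iterator over a reader opened with num_epochs=None and ends each epoch after limit_step_per_epoch batches. A self-contained sketch of that pattern, with a plain generator standing in for the petastorm reader (illustrative only, not the class above):

import itertools

def infinite_reader():
    # Stands in for a petastorm reader created with num_epochs=None:
    # it never raises StopIteration.
    for step in itertools.count():
        yield {'features': [float(step)] * 4, 'label': [step % 10] * 4}

def one_epoch(iterator, limit_step_per_epoch):
    # Yield exactly limit_step_per_epoch batches, then stop; the shared
    # iterator keeps its position for the next epoch.
    for _ in range(limit_step_per_epoch):
        yield next(iterator)

reader = infinite_reader()
for epoch in range(3):
    consumed = sum(1 for _ in one_epoch(reader, limit_step_per_epoch=5))
    print(f'epoch {epoch}: {consumed} batches')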
2 changes: 1 addition & 1 deletion horovod/spark/lightning/estimator.py
@@ -172,7 +172,7 @@ class TorchEstimator(HorovodEstimator, TorchEstimatorParamsWritable,
either 1/True or 0/False, or validation split (float) giving percent of data
to be randomly selected for validation.
validation_steps_per_epoch: (Optional) Number of validation steps to perform each epoch.
verbose: (Optional)Verbosity level [0, 2] (default: 1).
verbose: (Optional) Verbosity level, 0 for silent (default: 1).
"""

input_shapes = Param(Params._dummy(), 'input_shapes', 'input layer shapes')
4 changes: 2 additions & 2 deletions horovod/spark/lightning/legacy.py
@@ -33,8 +33,8 @@ def __init__(self):
super().__init__()
self._model = model

def forward(self, **kwargs):
return self._model(**kwargs)
def forward(self, *args, **kwargs):
return self._model(*args, **kwargs)

def configure_optimizers(self):
# Optimizer object needs to be re-instantiated. Internally, it uses memory addresses of
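Changing the wrapper to forward(*args, **kwargs) lets it delegate both positional and keyword calls to the wrapped model. A short sketch of the delegation, using an illustrative nn.Linear as the inner model:

import torch
import torch.nn as nn

class Wrapper(nn.Module):
    def __init__(self, model):
        super().__init__()
        self._model = model

    def forward(self, *args, **kwargs):
        # Pass everything through unchanged, so callers can invoke the
        # wrapped model positionally or by keyword.
        return self._model(*args, **kwargs)

wrapped = Wrapper(nn.Linear(4, 2))
x = torch.randn(3, 4)
print(wrapped(x).shape)        # positional call works
print(wrapped(input=x).shape)  # keyword call works too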
