Support for Spark SQL #16

Open · wants to merge 5 commits into base: master
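The patch adds a spark-sql preparation path to runner/prepare_benchmark.py alongside the existing Impala, Shark, Redshift, and Hive paths: new --spark, --spark-host, and --spark-identity-file options, a prepare_spark_dataset() step that distcps the benchmark data into HDFS and creates the external tables through spark-sql, and general-purpose --skip-uploads, --dryrun, and --datapath switches. A rough sketch of how the new options fit together is below; every value is a placeholder, and --aws-key-id, --aws-key, and --file-format are assumed to be the names of the pre-existing options behind opts.aws_key_id, opts.aws_key, and opts.file_format, which this diff does not show.

import subprocess

# Hypothetical invocation of the new Spark path; every value below is a placeholder.
subprocess.check_call([
    "./runner/prepare_benchmark.py",
    "--spark",                                              # new: enable the Spark SQL path
    "--spark-host", "ec2-1-2-3-4.compute-1.amazonaws.com",  # new: Spark master hostname
    "--spark-identity-file", "/path/to/key.pem",            # new: SSH key for the master
    "--aws-key-id", "AKIA...",                              # assumed pre-existing option
    "--aws-key", "...",                                     # assumed pre-existing option
    "--file-format", "sequence",                            # assumed pre-existing option
    "--datapath", "s3n://big-data-benchmark/pavlo/",        # new option, shown with its default value
    "--dryrun",                                             # new: print remote commands instead of running them
])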
143 changes: 138 additions & 5 deletions runner/prepare_benchmark.py
File mode changed: 100644 → 100755 (script made executable)
@@ -1,3 +1,5 @@
#!/usr/bin/env python
#
# Copyright 2013 The Regents of The University California
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -44,6 +46,8 @@ def parse_args():

parser.add_option("-m", "--impala", action="store_true", default=False,
help="Whether to include Impala")
parser.add_option("--spark", action="store_true", default=False,
help="Whether to include spark")
parser.add_option("-s", "--shark", action="store_true", default=False,
help="Whether to include Shark")
parser.add_option("-r", "--redshift", action="store_true", default=False,
@@ -57,6 +61,8 @@ def parse_args():

parser.add_option("-a", "--impala-host",
help="Hostname of Impala state store node")
parser.add_option("--spark-host",
help="Hostname of spark master node")
parser.add_option("-b", "--shark-host",
help="Hostname of Shark master node")
parser.add_option("-c", "--redshift-host",
@@ -68,6 +74,8 @@ def parse_args():

parser.add_option("-x", "--impala-identity-file",
help="SSH private key file to use for logging into Impala node")
parser.add_option("--spark-identity-file",
help="SSH private key file to use for logging into spark node")
parser.add_option("-y", "--shark-identity-file",
help="SSH private key file to use for logging into Shark node")
parser.add_option("--hive-identity-file",
@@ -93,9 +101,16 @@ def parse_args():
parser.add_option("--skip-s3-import", action="store_true", default=False,
help="Assumes s3 data is already loaded")

parser.add_option("--skip-uploads", action="store_true", default=False, help="skip uploading files to the cluster (hive config & python UDF. Turn this on if you want to get the files into the right place yourself.")

parser.add_option("--dryrun", action="store_true", default=False, help="print commands locally instead of executing over ssh")

parser.add_option("--datapath", default = "s3n://big-data-benchmark/pavlo/",
help="prefix path for input data")

(opts, args) = parser.parse_args()

if not (opts.impala or opts.shark or opts.redshift or opts.hive or opts.hive_tez or opts.hive_cdh):
if not (opts.impala or opts.shark or opts.spark or opts.redshift or opts.hive or opts.hive_tez or opts.hive_cdh):
parser.print_help()
sys.exit(1)

@@ -112,6 +127,14 @@ def parse_args():
print >> stderr, "Impala requires identity file, hostname, and AWS creds"
sys.exit(1)

if opts.spark and (opts.spark_identity_file is None or
opts.spark_host is None or
opts.aws_key_id is None or
opts.aws_key is None):
print >> stderr, \
"Spark requires identity file, spark hostname, and AWS credentials"
sys.exit(1)

if opts.shark and (opts.shark_identity_file is None or
opts.shark_host is None or
opts.aws_key_id is None or
@@ -134,20 +157,21 @@ def parse_args():

# Run a command on a host through ssh, throwing an exception if ssh fails
def ssh(host, username, identity_file, command):
print("ssh: %s" % command)
subprocess.check_call(
"ssh -t -o StrictHostKeyChecking=no -i %s %s@%s '%s'" %
"ssh -t -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no -i %s %s@%s '%s'" %
(identity_file, username, host, command), shell=True)

# Copy a file to a given host through scp, throwing an exception if scp fails
def scp_to(host, identity_file, username, local_file, remote_file):
subprocess.check_call(
"scp -q -o StrictHostKeyChecking=no -i %s '%s' '%s@%s:%s'" %
"scp -q -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no -i %s '%s' '%s@%s:%s'" %
(identity_file, local_file, username, host, remote_file), shell=True)

# Copy a file to a given host through scp, throwing an exception if scp fails
def scp_from(host, identity_file, username, remote_file, local_file):
subprocess.check_call(
"scp -q -o StrictHostKeyChecking=no -i %s '%s@%s:%s' '%s'" %
"scp -q -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no -i %s '%s@%s:%s' '%s'" %
(identity_file, username, host, remote_file, local_file), shell=True)
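# For reference, an illustrative sketch (not part of the diff) of the command line the
# updated ssh() helper now emits; host, user, key, and command values are placeholders.
# Routing known-hosts entries to /dev/null alongside StrictHostKeyChecking=no presumably
# avoids stale host-key failures when EC2 instances are recycled under the same address.
host, user, key, cmd = "ec2-1-2-3-4.compute-1.amazonaws.com", "root", "/path/to/key.pem", "hdfs dfs -ls /"
print("ssh -t -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no -i %s %s@%s '%s'"
      % (key, user, host, cmd))
# -> ssh -t -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no \
#    -i /path/to/key.pem root@ec2-1-2-3-4.compute-1.amazonaws.com 'hdfs dfs -ls /'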

# Insert AWS credentials into a given XML file on the given remote host
@@ -246,7 +270,9 @@ def ssh_shark(command):
git clone https://github.com/ahirreddy/shark.git -b branch-0.8;
cp shark-back/conf/shark-env.sh shark/conf/shark-env.sh;
cd shark;
sbt/sbt assembly;
wget -O sbt/sbt-launch.jar https://repo.typesafe.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/0.13.9/sbt-launch.jar
rm project/build.properties
HIVE_HOME="hack" sbt/sbt assembly;
/root/spark-ec2/copy-dir --delete /root/shark;
""")

@@ -276,6 +302,111 @@ def ssh_shark(command):

print "=== FINISHED CREATING BENCHMARK DATA ==="

def prepare_spark_dataset(opts):
def ssh_spark(command):
command = "source /root/.bash_profile; %s" % command
if not opts.dryrun:
ssh(opts.spark_host, "root", opts.spark_identity_file, command)
else:
print command

if not opts.skip_s3_import:
print "=== IMPORTING BENCHMARK DATA FROM S3 ==="
try:
ssh_spark("hdfs dfs -mkdir -p /user/spark/benchmark")
except Exception:
pass # Folder may already exist

add_aws_credentials(opts.spark_host, "root", opts.spark_identity_file,
"/root/mapreduce/conf/core-site.xml", opts.aws_key_id, opts.aws_key)

ssh_spark("/root/mapreduce/bin/start-mapred.sh")

ssh_spark(
"hadoop distcp " \
"%s/%s/%s/rankings/ " \
"/user/spark/benchmark/rankings/" % (opts.datapath,
opts.file_format, opts.data_prefix))
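# Illustrative expansion (not part of the diff): with the default --datapath, the distcp
# call above comes out roughly as below; "sequence" and "tiny" are placeholder values for
# the pre-existing --file-format and --data-prefix options, which this hunk does not show.
datapath = "s3n://big-data-benchmark/pavlo/"   # default --datapath added by this patch
file_format, data_prefix = "sequence", "tiny"  # placeholders
print("hadoop distcp "
      "%s/%s/%s/rankings/ "
      "/user/spark/benchmark/rankings/" % (datapath, file_format, data_prefix))
# -> hadoop distcp s3n://big-data-benchmark/pavlo//sequence/tiny/rankings/ /user/spark/benchmark/rankings/
#    (the default prefix already ends in "/", hence the doubled slash)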

ssh_spark(
"hadoop distcp " \
"%s/%s/%s/uservisits/ " \
"/user/spark/benchmark/uservisits/" % (
opts.datapath, opts.file_format, opts.data_prefix))

ssh_spark(
"hadoop distcp " \
"%s/%s/%s/crawl/ " \
"/user/spark/benchmark/crawl/" % (opts.datapath,
opts.file_format, opts.data_prefix))

# Scratch table used for JVM warmup
ssh_spark(
"hadoop distcp /user/spark/benchmark/rankings " \
"/user/spark/benchmark/scratch"
)

print "=== CREATING HIVE TABLES FOR BENCHMARK ==="
hive_site = '''
<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://NAMENODE:9000</value>
</property>
<property>
<name>fs.defaultFS</name>
<value>hdfs://NAMENODE:9000</value>
</property>
<property>
<name>mapred.job.tracker</name>
<value>NONE</value>
</property>
<property>
<name>mapreduce.framework.name</name>
<value>NONE</value>
</property>
</configuration>
'''.replace("NAMENODE", opts.spark_host).replace('\n', '')

if not opts.skip_uploads:
ssh_spark('echo "%s" > /usr/local/hadoop/etc/hadoop/hive-site.xml' % hive_site)
scp_to(opts.spark_host, opts.spark_identity_file, "root", "udf/url_count.py",
"/root/url_count.py")
ssh_spark("/root/spark-ec2/copy-dir /root/url_count.py")

format_translation = {
"text": "TEXTFILE",
"text-deflate": "TEXTFILE",
"sequence": "SEQUENCEFILE",
}

file_format = format_translation[opts.file_format.lower()]
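# Note (illustrative, not part of the diff): the translation table only covers the three
# formats listed above, so any other --file-format value fails fast with a KeyError, e.g.
# format_translation["parquet"] raises KeyError: 'parquet'.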
ssh_spark(
"spark-sql -e \"DROP TABLE IF EXISTS rankings; " \
"CREATE EXTERNAL TABLE rankings (pageURL STRING, pageRank INT, " \
"avgDuration INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY \\\",\\\" " \
"STORED AS {format} LOCATION \\\"/user/spark/benchmark/rankings\\\";\"".format(format=file_format))

ssh_spark(
"spark-sql -e \"DROP TABLE IF EXISTS scratch; " \
"CREATE EXTERNAL TABLE scratch (pageURL STRING, pageRank INT, " \
"avgDuration INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY \\\",\\\" " \
"STORED AS {format} LOCATION \\\"/user/spark/benchmark/scratch\\\";\"".format(format=file_format))

ssh_spark(
"spark-sql -e \"DROP TABLE IF EXISTS uservisits; " \
"CREATE EXTERNAL TABLE uservisits (sourceIP STRING,destURL STRING," \
"visitDate STRING,adRevenue DOUBLE,userAgent STRING,countryCode STRING," \
"languageCode STRING,searchWord STRING,duration INT ) " \
"ROW FORMAT DELIMITED FIELDS TERMINATED BY \\\",\\\" " \
"STORED AS {format} LOCATION \\\"/user/spark/benchmark/uservisits\\\";\"".format(format=file_format))

ssh_spark("spark-sql -e \"DROP TABLE IF EXISTS documents; " \
"CREATE EXTERNAL TABLE documents (line STRING) STORED AS {format} " \
"LOCATION \\\"/user/spark/benchmark/crawl\\\";\"".format(format=file_format))

print "=== FINISHED CREATING BENCHMARK DATA ==="

def prepare_impala_dataset(opts):
def ssh_impala(command):
ssh(opts.impala_host, "ubuntu", opts.impala_identity_file, command)
@@ -592,6 +723,8 @@ def main():
prepare_impala_dataset(opts)
if opts.shark:
prepare_shark_dataset(opts)
if opts.spark:
prepare_spark_dataset(opts)
if opts.redshift:
prepare_redshift_dataset(opts)
if opts.hive: