diff --git a/runner/prepare_benchmark.py b/runner/prepare_benchmark.py
old mode 100644
new mode 100755
index b71bb75..96be013
--- a/runner/prepare_benchmark.py
+++ b/runner/prepare_benchmark.py
@@ -1,3 +1,5 @@
+#!/usr/bin/env python
+#
 # Copyright 2013 The Regents of The University California
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
@@ -44,6 +46,8 @@ def parse_args():
   parser.add_option("-m", "--impala", action="store_true", default=False,
       help="Whether to include Impala")
+  parser.add_option("--spark", action="store_true", default=False,
+      help="Whether to include Spark")
   parser.add_option("-s", "--shark", action="store_true", default=False,
       help="Whether to include Shark")
   parser.add_option("-r", "--redshift", action="store_true", default=False,
@@ -57,6 +61,8 @@ def parse_args():
   parser.add_option("-a", "--impala-host",
       help="Hostname of Impala state store node")
+  parser.add_option("--spark-host",
+      help="Hostname of Spark master node")
   parser.add_option("-b", "--shark-host",
       help="Hostname of Shark master node")
   parser.add_option("-c", "--redshift-host",
@@ -68,6 +74,8 @@ def parse_args():
   parser.add_option("-x", "--impala-identity-file",
       help="SSH private key file to use for logging into Impala node")
+  parser.add_option("--spark-identity-file",
+      help="SSH private key file to use for logging into Spark node")
   parser.add_option("-y", "--shark-identity-file",
       help="SSH private key file to use for logging into Shark node")
   parser.add_option("--hive-identity-file",
@@ -93,9 +101,16 @@ def parse_args():
   parser.add_option("--skip-s3-import", action="store_true", default=False,
       help="Assumes s3 data is already loaded")
+  parser.add_option("--skip-uploads", action="store_true", default=False, help="Skip uploading files to the cluster (Hive config & Python UDF). Turn this on if you want to get the files into the right place yourself.")
+
+  parser.add_option("--dryrun", action="store_true", default=False, help="Print commands locally instead of executing them over SSH")
+
+  parser.add_option("--datapath", default="s3n://big-data-benchmark/pavlo/",
+      help="Prefix path for input data")
+
   (opts, args) = parser.parse_args()
 
-  if not (opts.impala or opts.shark or opts.redshift or opts.hive or opts.hive_tez or opts.hive_cdh):
+  if not (opts.impala or opts.shark or opts.spark or opts.redshift or opts.hive or opts.hive_tez or opts.hive_cdh):
     parser.print_help()
     sys.exit(1)
 
@@ -112,6 +127,14 @@ def parse_args():
     print >> stderr, "Impala requires identity file, hostname, and AWS creds"
     sys.exit(1)
 
+  if opts.spark and (opts.spark_identity_file is None or
+                     opts.spark_host is None or
+                     opts.aws_key_id is None or
+                     opts.aws_key is None):
+    print >> stderr, \
+        "Spark requires identity file, Spark hostname, and AWS credentials"
+    sys.exit(1)
+
   if opts.shark and (opts.shark_identity_file is None or
                      opts.shark_host is None or
                      opts.aws_key_id is None or
@@ -134,20 +157,21 @@ def parse_args():
 
 # Run a command on a host through ssh, throwing an exception if ssh fails
 def ssh(host, username, identity_file, command):
+  print("ssh: %s" % command)
   subprocess.check_call(
-      "ssh -t -o StrictHostKeyChecking=no -i %s %s@%s '%s'" %
+      "ssh -t -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no -i %s %s@%s '%s'" %
       (identity_file, username, host, command), shell=True)
 
 # Copy a file to a given host through scp, throwing an exception if scp fails
 def scp_to(host, identity_file, username, local_file, remote_file):
   subprocess.check_call(
-      "scp -q -o StrictHostKeyChecking=no -i %s '%s' '%s@%s:%s'" %
+      "scp -q -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no -i %s '%s' '%s@%s:%s'" %
       (identity_file, local_file, username, host, remote_file), shell=True)
 
 # Copy a file to a given host through scp, throwing an exception if scp fails
 def scp_from(host, identity_file, username, remote_file, local_file):
   subprocess.check_call(
-      "scp -q -o StrictHostKeyChecking=no -i %s '%s@%s:%s' '%s'" %
+      "scp -q -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no -i %s '%s@%s:%s' '%s'" %
       (identity_file, username, host, remote_file, local_file), shell=True)
 
 # Insert AWS credentials into a given XML file on the given remote host
@@ -246,7 +270,9 @@ def ssh_shark(command):
       git clone https://github.com/ahirreddy/shark.git -b branch-0.8;
       cp shark-back/conf/shark-env.sh shark/conf/shark-env.sh;
       cd shark;
-      sbt/sbt assembly;
+      wget -O sbt/sbt-launch.jar https://repo.typesafe.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/0.13.9/sbt-launch.jar
+      rm project/build.properties
+      HIVE_HOME="hack" sbt/sbt assembly;
       /root/spark-ec2/copy-dir --delete /root/shark;
     """)
 
@@ -276,6 +302,111 @@ def ssh_shark(command):
 
   print "=== FINISHED CREATING BENCHMARK DATA ==="
 
+def prepare_spark_dataset(opts):
+  def ssh_spark(command):
+    command = "source /root/.bash_profile; %s" % command
+    if not opts.dryrun:
+      ssh(opts.spark_host, "root", opts.spark_identity_file, command)
+    else:
+      print command
+
+  if not opts.skip_s3_import:
+    print "=== IMPORTING BENCHMARK DATA FROM S3 ==="
+    try:
+      ssh_spark("hdfs dfs -mkdir -p /user/spark/benchmark")
+    except Exception:
+      pass  # Folder may already exist
+
+    add_aws_credentials(opts.spark_host, "root", opts.spark_identity_file,
+        "/root/mapreduce/conf/core-site.xml", opts.aws_key_id, opts.aws_key)
+
+    ssh_spark("/root/mapreduce/bin/start-mapred.sh")
+    ssh_spark(
+        "hadoop distcp " \
+        "%s/%s/%s/rankings/ " \
+        "/user/spark/benchmark/rankings/" % (opts.datapath,
+            opts.file_format, opts.data_prefix))
+
+    ssh_spark(
+        "hadoop distcp " \
+        "%s/%s/%s/uservisits/ " \
+        "/user/spark/benchmark/uservisits/" % (
+            opts.datapath, opts.file_format, opts.data_prefix))
+
+    ssh_spark(
+        "hadoop distcp " \
+        "%s/%s/%s/crawl/ " \
+        "/user/spark/benchmark/crawl/" % (opts.datapath,
+            opts.file_format, opts.data_prefix))
+
+    # Scratch table used for JVM warmup
+    ssh_spark(
+        "hadoop distcp /user/spark/benchmark/rankings " \
+        "/user/spark/benchmark/scratch"
+    )
+
+  print "=== CREATING HIVE TABLES FOR BENCHMARK ==="
+  hive_site = '''
+  <configuration>
+    <property>
+      <name>fs.default.name</name>
+      <value>hdfs://NAMENODE:9000</value>
+    </property>
+    <property>
+      <name>fs.defaultFS</name>
+      <value>hdfs://NAMENODE:9000</value>
+    </property>
+    <property>
+      <name>mapred.job.tracker</name>
+      <value>NONE</value>
+    </property>
+    <property>
+      <name>mapreduce.framework.name</name>
+      <value>NONE</value>
+    </property>
+  </configuration>
+  '''.replace("NAMENODE", opts.spark_host).replace('\n', '')
+
+  if not opts.skip_uploads:
+    ssh_spark('echo "%s" > /usr/local/hadoop/etc/hadoop/hive-site.xml' % hive_site)
+    scp_to(opts.spark_host, opts.spark_identity_file, "root", "udf/url_count.py",
+        "/root/url_count.py")
+    ssh_spark("/root/spark-ec2/copy-dir /root/url_count.py")
+
+  format_translation = {
+    "text": "TEXTFILE",
+    "text-deflate": "TEXTFILE",
+    "sequence": "SEQUENCEFILE",
+  }
+
+  file_format = format_translation[opts.file_format.lower()]
+  ssh_spark(
+      "spark-sql -e \"DROP TABLE IF EXISTS rankings; " \
+      "CREATE EXTERNAL TABLE rankings (pageURL STRING, pageRank INT, " \
+      "avgDuration INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY \\\",\\\" " \
+      "STORED AS {format} LOCATION \\\"/user/spark/benchmark/rankings\\\";\"".format(format=file_format))
+
+  ssh_spark(
+      "spark-sql -e \"DROP TABLE IF EXISTS scratch; " \
+      "CREATE EXTERNAL TABLE scratch (pageURL STRING, pageRank INT, " \
+      "avgDuration INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY \\\",\\\" " \
+      "STORED AS {format} LOCATION \\\"/user/spark/benchmark/scratch\\\";\"".format(format=file_format))
+
+  ssh_spark(
+      "spark-sql -e \"DROP TABLE IF EXISTS uservisits; " \
+      "CREATE EXTERNAL TABLE uservisits (sourceIP STRING,destURL STRING," \
+      "visitDate STRING,adRevenue DOUBLE,userAgent STRING,countryCode STRING," \
+      "languageCode STRING,searchWord STRING,duration INT ) " \
+      "ROW FORMAT DELIMITED FIELDS TERMINATED BY \\\",\\\" " \
+      "STORED AS {format} LOCATION \\\"/user/spark/benchmark/uservisits\\\";\"".format(format=file_format))
+
+  ssh_spark("spark-sql -e \"DROP TABLE IF EXISTS documents; " \
+      "CREATE EXTERNAL TABLE documents (line STRING) STORED AS {format} " \
+      "LOCATION \\\"/user/spark/benchmark/crawl\\\";\"".format(format=file_format))
+
+  print "=== FINISHED CREATING BENCHMARK DATA ==="
+
 def prepare_impala_dataset(opts):
   def ssh_impala(command):
     ssh(opts.impala_host, "ubuntu", opts.impala_identity_file, command)
@@ -592,6 +723,8 @@ def main():
     prepare_impala_dataset(opts)
   if opts.shark:
     prepare_shark_dataset(opts)
+  if opts.spark:
+    prepare_spark_dataset(opts)
   if opts.redshift:
     prepare_redshift_dataset(opts)
   if opts.hive:
diff --git a/runner/run_query.py b/runner/run_query.py
index 8c7407b..37e93d8 100644
--- a/runner/run_query.py
+++ b/runner/run_query.py
@@ -171,6 +171,8 @@ def parse_args():
   parser.add_option("-m", "--impala", action="store_true", default=False,
       help="Whether to include Impala")
+  parser.add_option("--spark", action="store_true", default=False,
+      help="Whether to include Spark")
   parser.add_option("-s", "--shark", action="store_true", default=False,
       help="Whether to include Shark")
   parser.add_option("-r", "--redshift", action="store_true", default=False,
"--redshift", action="store_true", default=False, @@ -182,6 +184,11 @@ def parse_args(): parser.add_option("--hive-cdh", action="store_true", default=False, help="Hive on CDH cluster") + parser.add_option("--spark-no-cache", action="store_true", + default=False, help="Disable caching in Spark") + parser.add_option("--spark-hadoop", action="store_true", + default=False, help="Run spark on hadoop, i.e. don't use the spark standalone scheduler, slave management, etc.") + parser.add_option("-g", "--shark-no-cache", action="store_true", default=False, help="Disable caching in Shark") parser.add_option("--impala-use-hive", action="store_true", @@ -193,6 +200,8 @@ def parse_args(): parser.add_option("-a", "--impala-hosts", help="Hostnames of Impala nodes (comma seperated)") + parser.add_option("--spark-host", + help="Hostname of spark master node") parser.add_option("-b", "--shark-host", help="Hostname of Shark master node") parser.add_option("-c", "--redshift-host", @@ -204,6 +213,8 @@ def parse_args(): parser.add_option("-x", "--impala-identity-file", help="SSH private key file to use for logging into Impala node") + parser.add_option("--spark-identity-file", + help="SSH private key file to use for logging into spark node") parser.add_option("-y", "--shark-identity-file", help="SSH private key file to use for logging into Shark node") parser.add_option("--hive-identity-file", @@ -225,7 +236,7 @@ def parse_args(): (opts, args) = parser.parse_args() - if not (opts.impala or opts.shark or opts.redshift or opts.hive or opts.hive_cdh): + if not (opts.impala or opts.spark or opts.shark or opts.redshift or opts.hive or opts.hive_cdh): parser.print_help() sys.exit(1) @@ -234,6 +245,12 @@ def parse_args(): print >> stderr, "Impala requires identity file and hostname" sys.exit(1) + if opts.spark and (opts.spark_identity_file is None or + opts.spark_host is None): + print >> stderr, \ + "Spark requires identity file and hostname" + sys.exit(1) + if opts.shark and (opts.shark_identity_file is None or opts.shark_host is None): print >> stderr, \ @@ -266,21 +283,186 @@ def parse_args(): # Run a command on a host through ssh, throwing an exception if ssh fails def ssh(host, username, identity_file, command): return subprocess.check_call( - "ssh -t -o StrictHostKeyChecking=no -i %s %s@%s '%s'" % + "ssh -t -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no -i %s %s@%s '%s'" % (identity_file, username, host, command), shell=True) # Copy a file to a given host through scp, throwing an exception if scp fails def scp_to(host, identity_file, username, local_file, remote_file): subprocess.check_call( - "scp -q -o StrictHostKeyChecking=no -i %s '%s' '%s@%s:%s'" % + "scp -q -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no -i %s '%s' '%s@%s:%s'" % (identity_file, local_file, username, host, remote_file), shell=True) # Copy a file to a given host through scp, throwing an exception if scp fails def scp_from(host, identity_file, username, remote_file, local_file): subprocess.check_call( - "scp -q -o StrictHostKeyChecking=no -i %s '%s@%s:%s' '%s'" % + "scp -q -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no -i %s '%s@%s:%s' '%s'" % (identity_file, username, host, remote_file, local_file), shell=True) +def run_spark_benchmark(opts): + def ssh_spark(command): + command = "source /root/.bash_profile; %s" % command + ssh(opts.spark_host, "root", opts.spark_identity_file, command) + + def clear_buffer_cache_spark(host): + print >> stderr, "Clearing", host + ssh(host, "root", 
+        "sudo bash -c \"sync && echo 3 > /proc/sys/vm/drop_caches\"")
+
+  local_clean_query = CLEAN_QUERY
+  local_query_map = QUERY_MAP
+
+  prefix = str(time.time()).split(".")[0]
+  query_file_name = "%s_workload.sh" % prefix
+  slaves_file_name = "%s_slaves" % prefix
+  local_query_file = os.path.join(LOCAL_TMP_DIR, query_file_name)
+  local_slaves_file = os.path.join(LOCAL_TMP_DIR, slaves_file_name)
+  query_file = open(local_query_file, 'w')
+
+  remote_result_file = "/mnt/%s_results" % prefix
+  remote_tmp_file = "/mnt/%s_out" % prefix
+  remote_query_file = "/mnt/%s" % query_file_name
+
+  runner = "/root/spark/bin/spark-sql"
+  if opts.spark_hadoop:
+    runner = "spark-sql"
+
+  if not opts.spark_hadoop:
+    # Spark standalone
+    print "Getting Slave List"
+    scp_from(opts.spark_host, opts.spark_identity_file, "root",
+        "/root/spark-ec2/slaves", local_slaves_file)
+    slaves = map(str.strip, open(local_slaves_file).readlines())
+
+    # Quick fix for instances set up without EBS storage:
+    map(lambda x: ssh(x, "root", opts.spark_identity_file, "mkdir -p /mnt"),
+        slaves + [opts.spark_host])
+
+    print "Restarting Spark standalone scheduler..."
+    ssh_spark("/root/spark/sbin/stop-all.sh")
+    ensure_spark_stopped_on_slaves(slaves)
+    time.sleep(30)
+    ssh_spark("/root/spark/sbin/stop-all.sh")
+    ssh_spark("/root/spark/sbin/start-all.sh")
+    time.sleep(10)
+
+  # Two modes here: Spark Mem and Spark Disk. If using Spark Disk, clear the
+  # buffer cache in between each query. If using Spark Mem, use cached tables.
+
+  query_list = "set mapred.reduce.tasks = %s;" % opts.reduce_tasks
+
+  # Throw-away query for JVM warmup
+  query_list += "SELECT COUNT(*) FROM scratch;"
+
+  # Create cached queries for Spark Mem
+  if not opts.spark_no_cache:
+    local_clean_query = make_output_cached(CLEAN_QUERY)
+
+    def convert_to_cached(query):
+      return (make_output_cached(make_input_cached(query[0])), )
+
+    local_query_map = {k: convert_to_cached(v) for k, v in QUERY_MAP.items()}
+
+    # Set up cached tables
+    if '4' in opts.query_num:
+      # Query 4 uses entirely different tables
+      query_list += """
+        DROP TABLE IF EXISTS documents_cached;
+        CREATE TABLE documents_cached AS SELECT * FROM documents;
+      """
+    else:
+      query_list += """
+        DROP TABLE IF EXISTS uservisits_cached;
+        DROP TABLE IF EXISTS rankings_cached;
+        CREATE TABLE uservisits_cached AS SELECT * FROM uservisits;
+        CREATE TABLE rankings_cached AS SELECT * FROM rankings;
+      """
+
+  # Warm up for Query 1
+  if '1' in opts.query_num:
+    query_list += "DROP TABLE IF EXISTS warmup;"
+    query_list += "CREATE TABLE warmup AS SELECT pageURL, pageRank FROM scratch WHERE pageRank > 1000;"
+
+  if '4' not in opts.query_num:
+    query_list += local_clean_query
+  query_list += local_query_map[opts.query_num][0]
+
+  query_list = re.sub("\s\s+", " ", query_list.replace('\n', ' '))
+
+  print "\nQuery:"
+  print query_list.replace(';', ";\n")
+
+  if opts.clear_buffer_cache:
+    query_file.write("/root/spark/sbin/slaves.sh \"sudo bash -c \\\"sync && echo 3 > /proc/sys/vm/drop_caches\\\"\"\n")
+    #query_file.write("python /root/shark/bin/dev/clear-buffer-cache.py\n")
+
+  query_file.write(
+    "%s -e '%s' > %s 2>&1\n" % (runner, query_list, remote_tmp_file))
+
+  # query_file.write(
+  #   "cat %s | tr -d '\\000' | grep Time | grep -v INFO |grep -v MapReduce >> %s\n" % (
+  #     remote_tmp_file, remote_result_file))
+  # For Spark 1.6:
+  query_file.write(
+    "cat %s | tr -d '\\000' | grep 'Time taken' >> %s\n" % (
+      remote_tmp_file, remote_result_file))
+
+  query_file.close()
+
+  print "Copying files to Spark"
+  scp_to(opts.spark_host, opts.spark_identity_file, "root", local_query_file,
+      remote_query_file)
+  ssh_spark("chmod 775 %s" % remote_query_file)
+
+  # Run benchmark
+  print "Running remote benchmark..."
+
+  # Collect results
+  results = []
+  contents = []
+
+  for i in range(opts.num_trials):
+    print "Stopping Executors on Slaves....."
+    if not opts.spark_hadoop:
+      ensure_spark_stopped_on_slaves(slaves)
+
+    print "Query %s : Trial %i" % (opts.query_num, i+1)
+    print remote_query_file
+    ssh_spark("%s" % remote_query_file)
+    local_results_file = os.path.join(LOCAL_TMP_DIR, "%s_results" % prefix)
+    scp_from(opts.spark_host, opts.spark_identity_file, "root",
+        "/mnt/%s_results" % prefix, local_results_file)
+    content = open(local_results_file).readlines()
+    # all_times = map(lambda x: float(x.split(": ")[1].split(" ")[0]), content)
+    # for spark 1.6:
+    all_times = map(lambda x: float(x.split(": ")[2].split(" ")[0]), content)
+
+    if '4' in opts.query_num:
+      query_times = all_times[-4:]
+      part_a = query_times[1]
+      part_b = query_times[3]
+      print "Parts: %s, %s" % (part_a, part_b)
+      result = float(part_a) + float(part_b)
+    else:
+      result = all_times[-1]  # Only want time of last query
+
+    print "Result: ", result
+    print "Raw Times: ", content
+
+    results.append(result)
+    contents.append(content)
+
+    # Clean-up
+    #ssh_spark("rm /mnt/%s*" % prefix)
+    print "Clean Up...."
+    ssh_spark("rm /mnt/%s_results" % prefix)
+    os.remove(local_results_file)
+
+  if not opts.spark_hadoop:
+    os.remove(local_slaves_file)
+  os.remove(local_query_file)
+
+  return results, contents
+
 def run_shark_benchmark(opts):
   def ssh_shark(command):
     command = "source /root/.bash_profile; %s" % command
@@ -308,7 +490,7 @@ def ssh_shark(command):
 
   print "Restarting standalone scheduler..."
   ssh_shark("/root/spark/bin/stop-all.sh")
-  ensure_spark_stopped_on_slaves(slaves)
+  ensure_shark_stopped_on_slaves(slaves)
   time.sleep(30)
   ssh_shark("/root/spark/bin/stop-all.sh")
   ssh_shark("/root/spark/bin/start-all.sh")
@@ -367,7 +549,7 @@ def convert_to_cached(query):
     "%s -e '%s' > %s 2>&1\n" % (runner, query_list, remote_tmp_file))
 
   query_file.write(
-    "cat %s | grep Time | grep -v INFO |grep -v MapReduce >> %s\n" % (
+    "cat %s | tr -d '\\000' | grep Time | grep -v INFO |grep -v MapReduce >> %s\n" % (
       remote_tmp_file, remote_result_file))
 
   query_file.close()
@@ -386,7 +568,7 @@ def convert_to_cached(query):
 
   for i in range(opts.num_trials):
     print "Stopping Executors on Slaves....."
-    ensure_spark_stopped_on_slaves(slaves)
+    ensure_shark_stopped_on_slaves(slaves)
     print "Query %s : Trial %i" % (opts.query_num, i+1)
     ssh_shark("%s" % remote_query_file)
     local_results_file = os.path.join(LOCAL_TMP_DIR, "%s_results" % prefix)
@@ -596,7 +778,7 @@ def clear_buffer_cache_hive(host):
     "%s -e '%s' > %s 2>&1\n" % (runner, query_list, remote_tmp_file))
 
   query_file.write(
-    "cat %s | grep Time | grep -v INFO |grep -v MapReduce >> %s\n" % (
+    "cat %s | tr -d '\\000' | grep Time | grep -v INFO |grep -v MapReduce >> %s\n" % (
      remote_tmp_file, remote_result_file))
 
   query_file.close()
@@ -697,7 +879,7 @@ def clear_buffer_cache_hive(host):
     "%s -e '%s' > %s 2>&1\n" % (runner, query_list, remote_tmp_file))
 
   query_file.write(
-    "cat %s | grep Time | grep -v INFO |grep -v MapReduce >> %s\n" % (
+    "cat %s | tr -d '\\000' | grep Time | grep -v INFO |grep -v MapReduce >> %s\n" % (
      remote_tmp_file, remote_result_file))
 
   query_file.close()
@@ -768,6 +950,20 @@ def ssh_ret_code(host, user, id_file, cmd):
     return e.returncode
 
 def ensure_spark_stopped_on_slaves(slaves):
+  stop = False
+  while not stop:
+    cmd = "jps | grep ExecutorBackend"
+    ret_vals = map(lambda s: ssh_ret_code(s, "root", opts.spark_identity_file, cmd), slaves)
+    print ret_vals
+    if 0 in ret_vals:
+      print "Spark is still running on some slaves... sleeping"
+      cmd = "jps | grep ExecutorBackend | cut -d \" \" -f 1 | xargs -rn1 kill -9"
+      map(lambda s: ssh_ret_code(s, "root", opts.spark_identity_file, cmd), slaves)
+      time.sleep(2)
+    else:
+      stop = True
+
+def ensure_shark_stopped_on_slaves(slaves):
   stop = False
   while not stop:
     cmd = "jps | grep ExecutorBackend"
@@ -789,6 +985,8 @@ def main():
 
   if opts.impala:
     results, contents = run_impala_benchmark(opts)
+  if opts.spark:
+    results, contents = run_spark_benchmark(opts)
   if opts.shark:
     results, contents = run_shark_benchmark(opts)
   if opts.redshift:
@@ -803,6 +1001,10 @@ def main():
       fname = "impala_disk"
     else:
      fname = "impala_mem"
+  elif opts.spark and opts.spark_no_cache:
+    fname = "spark_disk"
+  elif opts.spark:
+    fname = "spark_mem"
   elif opts.shark and opts.shark_no_cache:
     fname = "shark_disk"
   elif opts.shark:
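
Note (reviewer addition, not part of the patch): the new --dryrun flag is only consulted inside prepare_spark_dataset()'s ssh_spark() wrapper. The sketch below shows that dry-run gating pattern in isolation, assuming nothing beyond a subprocess-based ssh call like the scripts' own ssh() helper; the function name run_remote, the example hostname, and the key path are illustrative placeholders rather than names from the patch.

#!/usr/bin/env python
# Minimal standalone sketch of the dry-run ssh wrapper pattern used by
# prepare_spark_dataset(). Assumption: an `ssh` binary on PATH, invoked via
# subprocess the way the scripts' ssh() helper does. `run_remote` and the
# example host/key below are hypothetical.
import subprocess

def run_remote(host, user, identity_file, command, dryrun=False):
  # Same ssh options the patch adds: ignore known_hosts and host-key prompts,
  # which freshly launched EC2 nodes would otherwise trigger.
  cmd = ("ssh -t -o UserKnownHostsFile=/dev/null "
         "-o StrictHostKeyChecking=no -i %s %s@%s '%s'"
         % (identity_file, user, host, command))
  print("ssh: %s" % command)
  if dryrun:
    # --dryrun behaviour: show the fully expanded command, execute nothing.
    print(cmd)
    return 0
  return subprocess.check_call(cmd, shell=True)

if __name__ == "__main__":
  # Dry-run example; with dryrun=False this would actually open an ssh session.
  run_remote("spark-master.example.com", "root", "/path/to/key.pem",
             "hdfs dfs -mkdir -p /user/spark/benchmark", dryrun=True)

Printing the fully expanded command keeps a dry run copy-pasteable, which pairs naturally with --skip-uploads when staging a cluster by hand.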