run only upgraded solutions, some solutions use feather instead of csv
jangorecki committed Dec 1, 2018
1 parent acdc4fd commit 2b4d3bd
Showing 18 changed files with 115 additions and 84 deletions.
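
The launcher now passes the input file name, format extension included, through the SRC_GRP_LOCAL environment variable, and each benchmark script strips that extension to recover the dataset name and reads the file from the data/ directory. A minimal R sketch of that convention (the dataset name below is only an example):

# launcher side: file name = data name + format extension
Sys.setenv("SRC_GRP_LOCAL"="G1_1e7_1e2.fea")
# script side: recover the data name and build the path to read from
src_grp = Sys.getenv("SRC_GRP_LOCAL")
data_name = substr(src_grp, 1, nchar(src_grp)-4)   # "G1_1e7_1e2"
path = file.path("data", src_grp)                  # "data/G1_1e7_1e2.fea"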
19 changes: 8 additions & 11 deletions dask/groupby-dask.py
@@ -11,18 +11,22 @@

exec(open("./helpers.py").read())

src_grp = os.environ['SRC_GRP_LOCAL']

ver = dk.__version__
git = dk.__git_revision__
task = "groupby"
data_name = os.path.basename(src_grp)
solution = "dask"
fun = ".groupby"
cache = "TRUE"

src_grp = os.environ['SRC_GRP_LOCAL']
data_name = src_grp[:-4]
print("loading dataset %s" % data_name)

# loading from feather dask/dask#1277
#import feather # temp fix for pandas-dev/pandas#16359
#x = pd.DataFrame(feather.read_dataframe(os.path.join("data", src_grp)))
##x = pd.read_feather(os.path.join("data", src_grp), use_threads=True)

# try parquet according to suggestions in https://github.com/dask/dask/issues/4001
# parq created with fastparquet for 1e7, 1e8, and spark for 1e9 due to failure to read 1e9 data in
# x.write.option("compression","uncompressed").parquet("G1_1e9_1e2.parq") # full path to file was used
@@ -31,18 +35,11 @@
#x = dd.read_parquet(data_name, engine="fastparquet")
# parquet timings were slower; 1e9 not possible to read due to a parquet format portability issue between spark and fastparquet

if os.path.isfile(data_name):
x = dd.read_csv(data_name, na_filter=False, dtype={'id1':'category', 'id2':'category', 'id3':'category'}).persist()
else:
x = dd.read_csv(src_grp, na_filter=False, dtype={'id1':'category', 'id2':'category', 'id3':'category'}).persist()
x = dd.read_csv(os.path.join("data", src_grp), na_filter=False, dtype={'id1':'category', 'id2':'category', 'id3':'category'}).persist()

in_rows = len(x)
print(in_rows)

if in_rows==1000000000:
print("skip attempt to groupby dask on 1000000000 due to lack of memory")
exit(0)

print("grouping...")

question = "sum v1 by id1" #1
2 changes: 2 additions & 0 deletions dask/init-dask.sh
@@ -6,3 +6,5 @@ echo 'upgrading dask...'
source ./dask/py-dask/bin/activate

python -m pip install --upgrade dask > /dev/null

python -c 'import dask as dk; open("dask/VERSION","w").write(dk.__version__); open("dask/REVISION","w").write(dk.__git_revision__);' > /dev/null
7 changes: 4 additions & 3 deletions datatable/groupby-datatable.R
@@ -5,7 +5,7 @@ cat("# groupby-datatable.R\n")
source("./helpers.R")
source("./datatable/helpers-datatable.R")

stopifnot(requireNamespace("bit64", quietly=TRUE)) # used in chk to sum numeric columns
stopifnot(sapply(c("bit64","feather"), requireNamespace, quietly=TRUE)) # bit64 used in chk to sum numeric columns, feather to load data
suppressPackageStartupMessages(library(data.table))
ver = packageVersion("data.table")
git = datatable.git()
@@ -15,9 +15,10 @@ fun = "[.data.table"
cache = TRUE

src_grp = Sys.getenv("SRC_GRP_LOCAL")
data_name = basename(src_grp)
data_name = substr(src_grp, 1, nchar(src_grp)-4)
cat(sprintf("loading dataset %s\n", data_name))
X = fread(if (file.exists(data_name)) data_name else src_grp, stringsAsFactors=TRUE) # csv can be provided in local dir for faster import

X = setDT(feather::read_feather(file.path("data", src_grp)))
print(nrow(X))

cat("grouping...\n")
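
The read_feather call above expects the .fea file to already be present under data/; a minimal R sketch of producing it from the existing csv, as a one-off preparation step assumed here (not part of this commit):

library(data.table)
library(feather)
# one-off conversion of the csv input to feather, keeping character columns as factors
x = fread("data/G1_1e7_1e2.csv", stringsAsFactors=TRUE)
write_feather(x, "data/G1_1e7_1e2.fea")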
2 changes: 1 addition & 1 deletion datatable/init-datatable.sh
@@ -3,4 +3,4 @@ set -e

# upgrade to latest devel
echo 'upgrading data.table...'
Rscript -e 'data.table::update.dev.pkg(quiet=TRUE, method="curl")'
Rscript -e 'data.table::update.dev.pkg(quiet=TRUE, method="curl"); v=read.dcf(system.file(package="data.table", "DESCRIPTION"), fields=c("Version","Revision")); invisible(mapply(function(f, v) writeLines(v, file.path("datatable", f)), toupper(colnames(v)), c(v)))'
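
The one-liner above, expanded for readability (same logic, just not inlined into a single Rscript -e call):

data.table::update.dev.pkg(quiet=TRUE, method="curl")
# record the installed version and revision so the launcher can detect upgrades
v = read.dcf(system.file(package="data.table", "DESCRIPTION"), fields=c("Version","Revision"))
invisible(mapply(
  function(f, val) writeLines(val, file.path("datatable", f)),
  toupper(colnames(v)), c(v)
))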
7 changes: 4 additions & 3 deletions dplyr/groupby-dplyr.R
@@ -5,7 +5,7 @@ cat("# groupby-dplyr.R\n")
source("./helpers.R")
source("./dplyr/helpers-dplyr.R")

stopifnot(requireNamespace("bit64", quietly=TRUE)) # used in chk to sum numeric columns
stopifnot(sapply(c("bit64","feather"), requireNamespace, quietly=TRUE)) # bit64 used in chk to sum numeric columns, feather to load data
suppressPackageStartupMessages(library(dplyr, warn.conflicts=FALSE))
ver = packageVersion("dplyr")
git = dplyr.git()
@@ -15,9 +15,10 @@ fun = "group_by"
cache = TRUE

src_grp = Sys.getenv("SRC_GRP_LOCAL")
data_name = basename(src_grp)
data_name = substr(src_grp, 1, nchar(src_grp)-4)
cat(sprintf("loading dataset %s\n", data_name))
X = data.table::fread(if (file.exists(data_name)) data_name else src_grp, data.table=FALSE, stringsAsFactors=TRUE) # csv can be provided in local dir for faster import

X = as_tibble(feather::read_feather(file.path("data", src_grp)))
print(nrow(X))

cat("grouping...\n")
2 changes: 1 addition & 1 deletion dplyr/init-dplyr.sh
@@ -3,4 +3,4 @@ set -e

# upgrade to latest devel
echo 'upgrading dplyr...'
Rscript -e 'devtools::install_github(c("tidyverse/dplyr","tidyverse/readr"), quiet=TRUE, method="curl")'
Rscript -e 'devtools::install_github("tidyverse/dplyr", quiet=TRUE, method="curl"); v=read.dcf(system.file(package="dplyr", "DESCRIPTION"), fields=c("Version","RemoteSha")); colnames(v)[colnames(v)=="RemoteSha"]="Revision"; invisible(mapply(function(f, v) writeLines(v, file.path("dplyr", f)), toupper(colnames(v)), c(v)))'
7 changes: 4 additions & 3 deletions juliadf/groupby-juliadf.jl
@@ -3,7 +3,7 @@
print("# groupby-juliadf.jl\n");

using DataFrames;
using CSV;
using CSV; #Feather;
using Statistics; # mean function
using Pkg; # to get DataFrames version

@@ -17,10 +17,11 @@ fun = "by";
cache = true;

src_grp = ENV["SRC_GRP_LOCAL"];
data_name = basename(src_grp);
data_name = SubString(src_grp, 1, length(src_grp)-4);
println(string("loading dataset ", data_name))

x = CSV.read(data_name, categorical=true);
#x = Feather.materialize(string("data/", src_grp)); # JuliaData/Feather.jl#97
x = CSV.read(string("data/", src_grp), categorical=true);
in_rows = size(x, 1);
println(in_rows);

2 changes: 2 additions & 0 deletions juliadf/init-juliadf.sh
@@ -4,3 +4,5 @@ set -e
# upgrade to latest devel
echo 'upgrading juliadf...'
julia -q -e 'using Pkg; Pkg.update();' > /dev/null

julia -q -e 'using Pkg; f=open("juliadf/VERSION","w"); write(f, string(Pkg.installed()["DataFrames"])); close(f);' > /dev/null
88 changes: 65 additions & 23 deletions launcher.R
@@ -3,29 +3,38 @@ library(data.table)
batch = Sys.getenv("BATCH", NA)
nodename = Sys.info()[["nodename"]]

upgraded.solution = function(x) {
ns = gsub(".","",x,fixed=TRUE)
f = file.path(ns, "VERSION")
version = if (!file.exists(f)) NA_character_ else toString(readLines(f, warn=FALSE))
f = file.path(ns, "REVISION")
git = if (!file.exists(f)) NA_character_ else toString(readLines(f, warn=FALSE))
if (!nzchar(git)) git = NA_character_
list(version=version, git=git)
}

log_run = function(solution, task, data, finished, batch, nodename, verbose=TRUE) {
timestamp=as.numeric(Sys.time())
lg = data.table(nodename=nodename, batch=batch, solution=solution, task=task, data=data, timestamp=timestamp, finished=finished)
lg = as.data.table(c(list(nodename=nodename, batch=batch, solution=solution), upgraded.solution(solution), list(task=task, data=data, timestamp=timestamp, finished=finished)))
file = "logs.csv"
fwrite(lg, file=file, append=file.exists(file), col.names=!file.exists(file))
if (verbose) cat(sprintf("%s %s %s %s\n", if (finished) "starting" else "finished", solution, task, data))
if (verbose) cat(sprintf("%s %s %s %s\n", if (finished) "finished:" else "starting:", solution, task, data))
}
file.ext = function(x)
file.ext = function(x) {
switch(x,
"data.table"=, "dplyr"="R",
"pandas"=, "spark"=, "pydatatable"=, "modin"=, "dask"="py",
"juliadf"="jl")
}
getenv = function(x) {
v = Sys.getenv(x, NA_character_)
if (is.na(v)) stop(sprintf("%s env var not defined.", x))
v = strsplit(v, " ", fixed=TRUE)[[1L]]
if (length(v)!=length(unique(v))) stop(sprintf("%s contains non-unique values", x))
v
}
run_tasks = getenv("RUN_TASKS")
#run_tasks = "groupby"
run_solutions = getenv("RUN_SOLUTIONS")
#run_solutions=c("data.table","dplyr","pydatatable")
run_tasks = getenv("RUN_TASKS") #run_tasks = "groupby"
run_solutions = getenv("RUN_SOLUTIONS") #run_solutions=c("data.table","dplyr","pydatatable","spark","pandas")

data = fread("data.csv")
data = data[active==TRUE, # filter on active datasets
@@ -46,45 +55,78 @@ solution = rbindlist(list(
solution = solution[run_solutions, on="solution", nomatch=0L] # filter for env var RUN_SOLUTIONS

format = rbindlist(list( # to be updated when binary files in place and benchmark scripts updated
dask = list(format="csv"),
data.table = list(format="csv"),
dplyr = list(format="csv"),
juliadf = list(format="csv"),
modin = list(format="csv"),
pandas = list(format="csv"),
pydatatable = list(format="csv"),
spark = list(format="csv")
dask = list(format="csv"), # dask/dask#1277
data.table = list(format="fea"),
dplyr = list(format="fea"),
juliadf = list(format="csv"), # JuliaData/Feather.jl#97
modin = list(format="csv"), # modin-project/modin#278
pandas = list(format="fea"),
pydatatable = list(format="csv"), # h2oai/datatable#1461
spark = list(format="csv") # https://stackoverflow.com/questions/53569580/read-feather-file-into-spark
), idcol="solution")

# what to run
dt = solution[data, on="task", allow.cartesian=TRUE]
dt[, "nodename" := nodename]
dt = format[dt, on="solution"]

# filter runs to only what is new
#TODO
if (file.exists("time.csv") && file.exists("logs.csv") && nrow(timings<-fread("time.csv")) && nrow(logs<-fread("logs.csv"))) {
timings[, .N, by=c("nodename","batch","task","solution","data","version","git")
][, "N" := NULL
][!nzchar(git), "git" := NA_character_
][] -> timings
logs[, .N, c("nodename","batch","task","solution","data","version","git")
][N==2L
][, "N" := NULL
][!nzchar(git), "git" := NA_character_
][] -> logs
past = timings[logs, .(nodename, batch, task, solution, data, timing_version=x.version, timing_git=x.git, logs_version=i.version, logs_git=i.git), on=c("nodename","batch","task","solution","data")] # there might be no timings for solutions that crashed, thus join to logs
# NA timing_version/git is when solution crashed
# NA logs_version/git is when VERSION/REVISION files were not created, TODO separate creating VERSION/REVISION from init scripts and run always
# mismatch of version/git might occur in 'logs.csv' when manually updating solution, and not via init shell scripts
# rules for running/skipping:
# 1. compare to most recent run only
recent = past[batch==max(batch, na.rm=TRUE)]
# 2. where possible compare on git revision, otherwise version
recent[, "compare" := logs_git][is.na(compare), "compare" := logs_version]
upgraded = rbindlist(sapply(unique(dt$solution), upgraded.solution, simplify=FALSE), idcol="solution")
upgraded[, "compare" := git][is.na(compare), "compare" := version]
recent[, c("timing_version","timing_git","logs_version","logs_git") := NULL] # remove unused
if (any(recent[, .N>1L, by=c("nodename","solution","task","data")]$V1))
stop("Recent timings and logs produces more rows than expected, investigate")
dt[upgraded, "compare" := i.compare, on="solution"]
dt[recent, "run_batch" := i.batch, on=c("nodename","solution","task","data","compare")]
} else {
dt[, c("compare","run_batch") := list(NA_character_, NA_integer_)]
}

# run

## solution
solutions = dt[, unique(solution)]
for (s in solutions) {
for (s in solutions) { #s = solutions[1]
### task
#s = solutions[1]
tasks = dt[.(s), unique(task), on="solution"]
for (t in tasks) {
for (t in tasks) { #t = tasks[1]
#### data
#t = tasks[1]
data = dt[.(s, t), data, on=c("solution","task")]
for (d in data) {
#d=data[1]
for (d in data) { #d=data[1]
this_run = dt[.(s, t, d), on=c("solution","task","data")]
if (nrow(this_run) != 1L) stop(sprintf("single run for %s-%s-%s has %s entries while it must have exactly one", s, t, d, nrow(this_run)))
if (nrow(this_run) != 1L)
stop(sprintf("single run for %s-%s-%s has %s entries while it must have exactly one", s, t, d, nrow(this_run)))
if (!is.na(this_run$run_batch)) {
cat(sprintf("%s %s %s %s, %s run on %s %s\n", "skip run:", s, t, d,
substr(this_run$compare, 1, 7), format(as.Date(as.POSIXct(this_run$run_batch, origin="1970-01-01")), "%Y%m%d"), this_run$run_batch))
next
}
log_run(s, t, d, finished=0, batch=batch, nodename=nodename)
# TODO SRC_GRP_LOCAL is groupby specific
Sys.setenv("SRC_GRP_LOCAL"=this_run[, paste(data, format, sep=".")])
ns = gsub(".", "", s, fixed=TRUE)
out_dir = "out"
out_file = sprintf("%s/run_%s_%s_%s_%s.out", out_dir, batch, ns, t, d)
out_file = sprintf("%s/run_%s_%s_%s.out", out_dir, ns, t, d)
ext = file.ext(s)
cmd = sprintf("./%s/%s-%s.%s > %s 2>&1", ns, t, ns, ext, out_file)
venv = if (ext=="py") sprintf("source ./%s/py-%s/bin/activate && ", ns, ns) else ""
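
Combined with the VERSION/REVISION files now written by the init scripts, the skip rule above boils down to: run a solution only when its installed version (or git revision, when available) differs from what was logged in the most recent batch. A stripped-down R sketch of that comparison, using the upgraded.solution helper defined above (the should_run wrapper itself is illustrative, not part of the commit):

library(data.table)
should_run = function(s, logs) {
  cur = upgraded.solution(s)                       # reads <solution>/VERSION and REVISION
  cur_cmp = if (!is.na(cur$git)) cur$git else cur$version
  past = logs[.(s), on="solution", nomatch=0L]     # log entries for this solution
  if (!nrow(past)) return(TRUE)                    # never run before, so run it
  recent = past[batch==max(batch, na.rm=TRUE)]     # compare to the most recent batch only
  last_cmp = recent[, ifelse(is.na(git), version, git)]
  !isTRUE(cur_cmp %in% last_cmp)                   # run only when the version/revision changed
}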
10 changes: 3 additions & 7 deletions modin/groupby-modin.py
@@ -10,22 +10,18 @@

exec(open("./helpers.py").read())

src_grp = os.environ['SRC_GRP_LOCAL']

ver = modin.__version__
git = modin.__git_revision__
task = "groupby"
data_name = os.path.basename(src_grp)
solution = "modin"
fun = ".groupby"
cache = "TRUE"

src_grp = os.environ['SRC_GRP_LOCAL']
data_name = src_grp[:-4]
print("loading dataset %s" % data_name)

if os.path.isfile(data_name):
x = pd.read_csv(data_name)
else:
x = pd.read_csv(src_grp)
x = pd.read_csv(os.path.join("data", src_grp))

print("grouping...")

2 changes: 2 additions & 0 deletions modin/init-modin.sh
@@ -6,3 +6,5 @@ echo 'upgrading modin...'
source ./modin/py-modin/bin/activate

python -m pip install --upgrade modin > /dev/null

python -c 'import modin as modin; open("modin/VERSION","w").write(modin.__version__); open("modin/REVISION","w").write(modin.__git_revision__);' > /dev/null
24 changes: 6 additions & 18 deletions pandas/groupby-pandas.py
@@ -6,37 +6,25 @@
import gc
import timeit
import pandas as pd
import datatable as dt

exec(open("./helpers.py").read())

src_grp = os.environ['SRC_GRP_LOCAL']

ver = pd.__version__
git = ""
git = "" # pd.__git_version__ since 0.24.0
task = "groupby"
data_name = os.path.basename(src_grp)
solution = "pandas"
fun = ".groupby"
cache = "TRUE"

src_grp = os.environ['SRC_GRP_LOCAL']
data_name = src_grp[:-4]
print("loading dataset %s" % data_name)
x = dt.fread(src_grp).topandas()
x['id1'] = x['id1'].astype('category')
x['id2'] = x['id2'].astype('category')
x['id3'] = x['id3'].astype('category')

#if os.path.isfile(data_name):
# x = pd.read_csv(data_name, dtype={'id1':'category', 'id2':'category', 'id3':'category'})
#else:
# x = pd.read_csv(src_grp, dtype={'id1':'category', 'id2':'category', 'id3':'category'})

import feather # temp fix for pandas-dev/pandas/issues/16359
x = pd.DataFrame(feather.read_dataframe(os.path.join("data", src_grp)))
#x = pd.read_feather(os.path.join("data", src_grp), use_threads=True)
print(len(x.index))

#if len(x.index)==1000000000:
# print("skip attempt to groupby pandas on 1000000000 due to lack of memory")
# exit(0)

print("grouping...")

question = "sum v1 by id1" #1
2 changes: 2 additions & 0 deletions pandas/init-pandas.sh
@@ -6,3 +6,5 @@ echo 'upgrading pandas...'
source ./pandas/py-pandas/bin/activate

python -m pip install --upgrade pandas > /dev/null

python -c 'import pandas as pd; open("pandas/VERSION","w").write(pd.__version__); open("pandas/REVISION","w").write("");' > /dev/null 2>&1 # from 0.24.0 also revision
10 changes: 3 additions & 7 deletions pydatatable/groupby-pydatatable.py
@@ -10,22 +10,18 @@

exec(open("./helpers.py").read())

src_grp = os.environ['SRC_GRP_LOCAL']

ver = dt.__version__
git = dt.__git_revision__
task = "groupby"
data_name = os.path.basename(src_grp)
solution = "pydatatable"
fun = "[.datatable"
cache = "TRUE"

src_grp = os.environ['SRC_GRP_LOCAL']
data_name = src_grp[:-4]
print("loading dataset %s" % data_name)

if os.path.isfile(data_name):
x = dt.fread(data_name)
else:
x = dt.fread(src_grp)
x = dt.fread(os.path.join("data", src_grp))

print(x.nrows)

2 changes: 2 additions & 0 deletions pydatatable/init-pydatatable.sh
@@ -20,3 +20,5 @@ make install > /dev/null

cd ../..
rm -rf ./tmp/datatable

python -c 'import datatable as dt; open("pydatatable/VERSION","w").write(dt.__version__); open("pydatatable/REVISION","w").write(dt.__git_revision__);' > /dev/null
1 change: 1 addition & 0 deletions run.sh
@@ -20,6 +20,7 @@ if [[ "$DO_UPGRADE" == true && "$RUN_SOLUTIONS" =~ "dask" ]]; then ./dask/init-d
if [[ "$DO_UPGRADE" == true && "$RUN_SOLUTIONS" =~ "data.table" ]]; then ./datatable/init-datatable.sh; fi;
if [[ "$DO_UPGRADE" == true && "$RUN_SOLUTIONS" =~ "dplyr" ]]; then ./dplyr/init-dplyr.sh; fi;
if [[ "$DO_UPGRADE" == true && "$RUN_SOLUTIONS" =~ "juliadf" ]]; then ./juliadf/init-juliadf.sh; fi;
if [[ "$DO_UPGRADE" == true && "$RUN_SOLUTIONS" =~ "modin" ]]; then ./modin/init-modin.sh; fi;
if [[ "$DO_UPGRADE" == true && "$RUN_SOLUTIONS" =~ "pandas" ]]; then ./pandas/init-pandas.sh; fi;
if [[ "$DO_UPGRADE" == true && "$RUN_SOLUTIONS" =~ "pydatatable" ]]; then ./pydatatable/init-pydatatable.sh; fi;
if [[ "$DO_UPGRADE" == true && "$RUN_SOLUTIONS" =~ "spark" ]]; then ./spark/init-spark.sh; fi;