Skip to content

Commit

Permalink
Adding new changes on spark R
Browse files Browse the repository at this point in the history
  • Loading branch information
GerardSoleCa committed Feb 28, 2017
1 parent 28b8876 commit 68737ed
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 8 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,6 @@
**/metastore_db/*
**/derby.log
**/*.log


**/.Rhistory
17 changes: 14 additions & 3 deletions week_3/main.R
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
source("plain_R/top10airports.R")
source("spark_R/top10airports.R")
source("plain_R/plain.R")
source("spark_R/spark.R")

read <- function(filepath) {
return (as.data.frame(read.csv(filepath, header=FALSE, sep = ";")))
Expand All @@ -15,4 +15,15 @@ main <- function(){

one_week <- read("./datasets/traffic1day.exp2")
plain_1_week = system.time(plainR(one_week))
}
}

#' Time the Spark implementation on the one-hour traffic dataset.
#'
#' Opens a Spark connection with `startContext()`, reads the one-hour
#' dataset with `read()`, times a single `sparkR()` run with
#' `system.time()`, and always closes the connection again.
#'
#' @return The `system.time()` result for the Spark run, invisibly.
main2 <- function() {
  sc <- startContext()
  # Register the disconnect immediately so the connection is released even
  # when reading the dataset or the Spark job itself fails.
  on.exit(stopContext(sc), add = TRUE)

  # Spark R
  one_hour <- read("./datasets/traffic1hour.exp2")
  # NOTE(review): this call requires sparkR to accept (sc, data) -- confirm
  # its signature matches.
  spark_1_hour <- system.time(sparkR(sc, one_hour))

  invisible(spark_1_hour)
}

main2()
32 changes: 27 additions & 5 deletions week_3/spark_R/spark.R
Original file line number Diff line number Diff line change
@@ -1,11 +1,33 @@
library(dplyr)
library(nycflights13)
library(ggplot2)
library(sparklyr)

#' Open a connection to a local Spark instance.
#'
#' @return The connection object produced by
#'   `spark_connect(master = "local")`.
startContext <- function() {
  spark_connect(master = "local")
}
#startContext <- function(){
# return (spark_connect(master="local"))
#}

#' Count departures and arrivals per airport with Spark.
#'
#' Every flight row contributes one departure to the airport in column `V1`
#' and one arrival to the airport in column `V2`; the two sets are stacked
#' and summed per airport.
#'
#' @param sc Optional Spark connection. When `NULL` (the default) a local
#'   connection is opened for the duration of the call and closed on exit.
#' @param data Optional local data frame to copy into Spark. When `NULL`
#'   (the default) the one-hour dataset is read from disk.
#'   NOTE(review): assumes `data` has the same header-less layout as the CSV
#'   (columns `V1`, `V2`, ...) -- confirm against the caller.
#' @return A local data frame with one row per airport and the columns
#'   `airport`, `departure`, `arrival` and `total`, ordered by descending
#'   `total`.
sparkR <- function(sc = NULL, data = NULL) {
  # Only manage the connection's lifetime when this function opened it; a
  # connection supplied by the caller stays open for the caller to close.
  if (is.null(sc)) {
    sc <- spark_connect(master = "local")
    on.exit(spark_disconnect(sc), add = TRUE)
  }

  flights_r <- if (is.null(data)) {
    spark_read_csv(
      sc,
      name = "flights_sc",
      path = "./datasets/traffic1hour.exp2",
      header = FALSE,
      delimiter = ";"
    )
  } else {
    copy_to(sc, data, name = "flights_sc", overwrite = TRUE)
  }

  # One row per flight for each side of the trip, flagged so that a plain
  # sum yields per-airport departure, arrival and total counts.
  origin <- flights_r %>%
    select(airport = V1) %>%
    mutate(departure = 1, arrival = 0, total = 1)
  destination <- flights_r %>%
    select(airport = V2) %>%
    mutate(departure = 0, arrival = 1, total = 1)

  # collect() forces execution before the connection may be closed on exit;
  # without it only a lazy query would be returned.
  union_all(origin, destination) %>%
    group_by(airport) %>%
    summarise(
      departure = sum(departure, na.rm = TRUE),
      arrival = sum(arrival, na.rm = TRUE),
      total = sum(total, na.rm = TRUE)
    ) %>%
    ungroup() %>%
    arrange(desc(total)) %>%
    collect()
}

stopContext <- function(sc){
spark_disconnect(sc)
Expand Down

0 comments on commit 68737ed

Please sign in to comment.