Skip to content

Commit

Permalink
Merge pull request #64 from ITSLeeds/furrr
Browse files Browse the repository at this point in the history
Furrr
  • Loading branch information
mem48 authored Sep 11, 2024
2 parents 1d82b11 + d4808e3 commit e54493d
Show file tree
Hide file tree
Showing 27 changed files with 175 additions and 127 deletions.
5 changes: 0 additions & 5 deletions DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,17 @@ Imports:
collapse,
data.table,
dodgr,
doSNOW,
dplyr,
digest,
foreach,
future,
furrr,
future.apply,
geodist,
httr,
iotools,
stringr,
sf,
parallel,
lubridate,
purrr (>= 1.0),
pbapply,
readr (>= 2.0),
RcppSimdJson,
tidyr,
Expand Down
28 changes: 18 additions & 10 deletions R/atoc_export.R
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ makeCalendar <- function(schedule, ncores = 1) {
CHECKROWS_NAME_VECTOR <- c(WEEKDAY_NAME_VECTOR, "duration", "start_date", "end_date")


res.calendar.days <- res.calendar[,..CHECKROWS_NAME_VECTOR]
res.calendar.days <- res.calendar[,CHECKROWS_NAME_VECTOR]
res.calendar.days <- data.table::transpose(res.calendar.days)
#transpose on the same size runs in around 3s, but causes named dataframe with mixed datatypes to be coerced to unnamed vector of integer.

Expand All @@ -444,7 +444,7 @@ makeCalendar <- function(schedule, ncores = 1) {
.f = checkrows,
.progress = TRUE)
future::plan(future::sequential)
keep <- unlist(keep)


# cl <- parallel::makeCluster(ncores)
# parallel::clusterEvalQ(cl, {
Expand All @@ -458,6 +458,7 @@ makeCalendar <- function(schedule, ncores = 1) {
} else {
keep <- purrr::map(res.calendar.days, checkrows, .progress = TRUE)
}
keep <- unlist(keep)

res.calendar <- res.calendar[keep, ]

Expand Down Expand Up @@ -564,15 +565,22 @@ duplicate.stop_times_alt <- function(calendar, stop_times, ncores = 1) {
}

if (ncores == 1) {
stop_times.dup <- pbapply::pblapply(stop_times_split, duplicate.stop_times.int)
#stop_times.dup <- pbapply::pblapply(stop_times_split, duplicate.stop_times.int)
stop_times.dup <- purrr::map(stop_times_split, duplicate.stop_times.int, .progress = TRUE)
} else {
cl <- parallel::makeCluster(ncores)
stop_times.dup <- pbapply::pblapply(stop_times_split,
duplicate.stop_times.int,
cl = cl
)
parallel::stopCluster(cl)
rm(cl)
# cl <- parallel::makeCluster(ncores)
# stop_times.dup <- pbapply::pblapply(stop_times_split,
# duplicate.stop_times.int,
# cl = cl
# )
# parallel::stopCluster(cl)
# rm(cl)

future::plan(future::multisession, workers = ncores)
res <- furrr::future_map(.x = stop_times_split,
.f = duplicate.stop_times.int,
.progress = TRUE)
future::plan(future::sequential)
}

stop_times.dup <- dplyr::bind_rows(stop_times.dup)
Expand Down
2 changes: 1 addition & 1 deletion R/atoc_import.R
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ strip_whitespace <- function(df) {
#'
#' @param file Path to .mca file
#' @param silent logical, should messages be displayed
#' @param ncores number of cores to use when paralell processing
#' @param ncores number of cores to use when parallel processing
#' @param full_import import all data, default FALSE
#' @param working_timetable use rail industry scheduling times instead of public times
#' @export
Expand Down
2 changes: 1 addition & 1 deletion R/atoc_nr.R
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#' Convert ATOC CIF files from Network Rail to GTFS
#'
#' @param path_in Character, path to Network Rail ATOC file e.g."C:/input/toc-full.CIF.gz"
#' @param silent Logical, should progress messages be surpressed (default TRUE)
#' @param silent Logical, should progress messages be suppressed (default TRUE)
#' @param ncores Numeric, When parallel processing how many cores to use
#' (default 1)
#' @param locations where to get tiploc locations (see details)
Expand Down
9 changes: 6 additions & 3 deletions R/atoc_shapes.R
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ ATOC_shapes <- function(gtfs) {
}
}

dp.list <- pbapply::pblapply(dp.list, path_to_sf, verts = verts)
#dp.list <- pbapply::pblapply(dp.list, path_to_sf, verts = verts)
dp.list <- purrr::map(dp.list, path_to_sf, verts = verts, .progress = TRUE)
dp.list <- unname(dp.list)
pairs$geometry <- sf::st_sfc(dp.list, crs = 4326)
rm(dp.list, verts)
Expand All @@ -110,7 +111,8 @@ ATOC_shapes <- function(gtfs) {
}

message(paste0(Sys.time()," Invert routes"))
pairs_opp$geometry <- pbapply::pblapply(pairs_opp$geometry, invert_linestring)
#pairs_opp$geometry <- pbapply::pblapply(pairs_opp$geometry, invert_linestring)
pairs_opp$geometry <- purrr::map(pairs_opp$geometry, invert_linestring, .progress = TRUE)
pairs_opp$geometry <- sf::st_as_sfc(pairs_opp$geometry, crs = 4326)
pairs_opp <- sf::st_as_sf(pairs_opp)
pairs_opp <- pairs_opp[, names(pairs)]
Expand All @@ -135,7 +137,8 @@ ATOC_shapes <- function(gtfs) {

message(paste0(Sys.time()," final formatting"))
rm(graph, pairs)
shape_res <- pbapply::pblapply(st_split, match_lines)
#shape_res <- pbapply::pblapply(st_split, match_lines)
shape_res <- purrr::map(st_split, match_lines, .progress = TRUE)

str5 <- lapply(shape_res, `[[`, 2)
shapes <- lapply(shape_res, `[[`, 1)
Expand Down
2 changes: 1 addition & 1 deletion R/extdata.R
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ check_data <- function(default_tag = "v0.1.2"){
#'
#' The ATOC data has inaccurate locations for many tiplocs, this is an improved dataset
#'
#' "naptan_missing" Bus Stop Locations missing from NapTAN
#' "naptan_missing" Bus Stop Locations missing from NaPTAN
#'
#' A database of bus stops that are missing from the NAPTAN but are known to
#' have been used. For some reason the official NAPTAN file is missing a small
Expand Down
2 changes: 1 addition & 1 deletion R/get_cal.R
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
#' @return data frame
#' @details TransXchange records bank holidays by name (e.g. Christmas Day),
#' some UK bank holidays move around, so this function downloads the official
#' bank holiday calendar. The offical feed only covers a short period of time
#' bank holiday calendar. The official feed only covers a short period of time
#' so this may not be suitable for converting files from the past / future.
#' @export
#'
Expand Down
2 changes: 1 addition & 1 deletion R/get_naptan.R
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#' @param naptan_extra data frame of missing stops default uses `naptan_missing`
#' @return data frame of stop locations
#' @details TransXchange does not store the location of bus stops, so this
#' functions downloads them from the offical DfT source. NaPTAN has some
#' functions downloads them from the official DfT source. NaPTAN has some
#' missing bus stops which are added by UK2GTFS. See `naptan_missing`
#'
#'
Expand Down
9 changes: 8 additions & 1 deletion R/globals.R
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@ utils::globalVariables(c(
'agency_lang','agency_id', 'Freq',
'UID','hash','vehicle_type','running_board','service_number',
'operator_code','route_id',
'speed_after','distance','school_terms','distance_after','historic_bank_holidays'
'speed_after','distance','school_terms','distance_after','historic_bank_holidays',
'runs_monday','runs_tuesday','runs_wednesday','runs_thursday','runs_friday',
'runs_saturday','runs_sunday', 'total_sunday',
'runs_Mon','runs_Tue','runs_Wed','runs_Thu','runs_Fri',
'runs_Sat','runs_Sun',
'tot_Mon','tot_Tue','tot_Wed','tot_Thu','tot_Fri',
'tot_Sat','tot_Sun',
'service_id','zone_id','time_bands','stop_name'
))

2 changes: 1 addition & 1 deletion R/gtfs_cleaning.R
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ gtfs_fast_trips <- function(gtfs, maxspeed = 83, routes = TRUE) {
}

#' Find fast stops
#' @description A varient of gtfs_fast_trips that can detect stops that may be in the wrong location
#' @description A variant of gtfs_fast_trips that can detect stops that may be in the wrong location
#' @param gtfs list of gtfs tables
#' @param maxspeed the maximum allowed speed in metres per second default 83 m/s
#' (about 185 mph the max speed of trains on HS1 line)
Expand Down
26 changes: 17 additions & 9 deletions R/gtfs_interpolate_times.R
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,24 @@ gtfs_interpolate_times <- function(gtfs, ncores = 1){
stop_times <- dplyr::group_split(stop_times)

if(ncores == 1){
stop_times <- pbapply::pblapply(stop_times, stops_interpolate)
#stop_times <- pbapply::pblapply(stop_times, stops_interpolate)
stop_times <- purrr::map(stop_times, stops_interpolate, .progress = TRUE)
} else {
cl <- parallel::makeCluster(ncores)
parallel::clusterEvalQ(cl, {loadNamespace("UK2GTFS")})
stop_times <- pbapply::pblapply(stop_times,
stops_interpolate,
cl = cl
)
parallel::stopCluster(cl)
rm(cl)
# cl <- parallel::makeCluster(ncores)
# parallel::clusterEvalQ(cl, {loadNamespace("UK2GTFS")})
# stop_times <- pbapply::pblapply(stop_times,
# stops_interpolate,
# cl = cl
# )
# parallel::stopCluster(cl)
# rm(cl)

future::plan(future::multisession, workers = ncores)
keep <- furrr::future_map(.x = stop_times,
.f = stops_interpolate,
.progress = TRUE)
future::plan(future::sequential)

}

stop_times <- data.table::rbindlist(stop_times)
Expand Down
Loading

0 comments on commit e54493d

Please sign in to comment.