Skip to content

Commit

Permalink
Allow Bulk operation arguments to be passed through from top-level ge…
Browse files Browse the repository at this point in the history
…nerics such as sf_create, sf_update, etc.
  • Loading branch information
StevenMMortimer committed Jun 5, 2019
1 parent f4846a5 commit 087b0f2
Show file tree
Hide file tree
Showing 19 changed files with 156 additions and 117 deletions.
91 changes: 54 additions & 37 deletions R/bulk-operation.R
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,21 @@
#' @template external_id_fieldname
#' @template api_type
#' @param content_type character; being one of 'CSV', 'ZIP_CSV', 'ZIP_XML', or 'ZIP_JSON' to
#' indicate the type of data being passed to the Bulk API
#' @param concurrency_mode character; either "Parallel" or "Serial" that specifies whether batches should be completed
#' sequentially or in parallel. Use "Serial" only if lock contentions persist within "Parallel" mode.
#' @param line_ending character; indicating the line ending used for CSV job data,
#' marking the end of a data row. The default is NULL and determined by the operating system using
#' "CRLF" for Windows machines and "LF" for Unix machines
#' indicate the type of data being passed to the Bulk API.
#' @param concurrency_mode character; either "Parallel" or "Serial" that specifies
#' whether batches should be completed sequentially or in parallel. Use "Serial"
#' only if lock contentions persist within "Parallel" mode. Note: this argument is
#' only used in the Bulk 1.0 API and will be ignored in calls using the Bulk 2.0 API.
#' @param line_ending character; indicating the line ending used for CSV job data,
#' marking the end of a data row. The default is NULL meaning that the line ending
#' is determined by the operating system using "CRLF" for Windows machines and
#' "LF" for Unix machines. Note: this argument is only used in the Bulk 2.0 API
#' and will be ignored in calls using the Bulk 1.0 API.
#' @param column_delimiter character; indicating the column delimiter used for CSV job data.
#' The default value is COMMA. Valid values are: "BACKQUOTE", "CARET", "COMMA", "PIPE",
#' "SEMICOLON", and "TAB".
#' "SEMICOLON", and "TAB", but this package only accepts and uses "COMMA". Also,
#' note that this argument is only used in the Bulk 2.0 API and will be ignored
#' in calls using the Bulk 1.0 API.
#' @template verbose
#' @return A \code{tbl_df} of parameters defining the created job, including id
#' @references \url{https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/}
Expand Down Expand Up @@ -46,8 +52,8 @@ sf_create_job_bulk <- function(operation = c("insert", "delete", "upsert", "upda
object_name,
external_id_fieldname = NULL,
api_type = c("Bulk 1.0", "Bulk 2.0"),
content_type=c('CSV', 'ZIP_CSV', 'ZIP_XML', 'ZIP_JSON'),
concurrency_mode=c("Parallel", "Serial"),
content_type = c('CSV', 'ZIP_CSV', 'ZIP_XML', 'ZIP_JSON'),
concurrency_mode = c("Parallel", "Serial"),
line_ending = NULL,
column_delimiter = c('COMMA', 'TAB', 'PIPE', 'SEMICOLON',
'CARET', 'BACKQUOTE'),
Expand All @@ -57,26 +63,35 @@ sf_create_job_bulk <- function(operation = c("insert", "delete", "upsert", "upda
operation <- match.arg(operation)
content_type <- match.arg(content_type)
if(api_type == "Bulk 1.0"){
job_response <- sf_create_job_bulk_v1(operation=operation,
object_name=object_name,
external_id_fieldname=external_id_fieldname,
content_type=content_type,
concurrency_mode=concurrency_mode,
verbose=verbose)
if(!missing(line_ending)){
warning("Ignoring the line_ending argument which isn't used when calling the Bulk 1.0 API", call. = FALSE)
}
if(!missing(column_delimiter)){
warning("Ignoring the column_delimiter argument which isn't used when calling the Bulk 1.0 API", call. = FALSE)
}
job_response <- sf_create_job_bulk_v1(operation = operation,
object_name = object_name,
external_id_fieldname = external_id_fieldname,
content_type = content_type,
concurrency_mode = concurrency_mode,
verbose = verbose)
} else if(api_type == "Bulk 2.0"){
if(!(operation %in% c("insert", "delete", "upsert", "update"))){
stop('Bulk 2.0 only supports the following operations: "insert", "delete", "upsert", and "update"')
}
if(!(content_type %in% c("CSV"))){
stop('Bulk 2.0 only supports the "CSV" content type.')
}
job_response <- sf_create_job_bulk_v2(operation=operation,
object_name=object_name,
external_id_fieldname=external_id_fieldname,
content_type=content_type,
line_ending=line_ending,
column_delimiter=column_delimiter,
verbose=verbose)
if(!missing(concurrency_mode)){
warning("Ignoring the concurrency_mode argument which isn't used when calling the Bulk 2.0 API", call. = FALSE)
}
job_response <- sf_create_job_bulk_v2(operation = operation,
object_name = object_name,
external_id_fieldname = external_id_fieldname,
content_type = content_type,
line_ending = line_ending,
column_delimiter = column_delimiter,
verbose = verbose)
} else {
stop("Unknown API type")
}
Expand All @@ -93,10 +108,10 @@ sf_create_job_bulk <- function(operation = c("insert", "delete", "upsert", "upda
sf_create_job_bulk_v1 <- function(operation = c("insert", "delete", "upsert", "update",
"hardDelete", "query"),
object_name,
external_id_fieldname=NULL,
content_type=c('CSV', 'ZIP_CSV', 'ZIP_XML', 'ZIP_JSON'),
concurrency_mode=c("Parallel", "Serial"),
verbose=FALSE){
external_id_fieldname = NULL,
content_type = c('CSV', 'ZIP_CSV', 'ZIP_XML', 'ZIP_JSON'),
concurrency_mode = c("Parallel", "Serial"),
verbose = FALSE){

operation <- match.arg(operation)
content_type <- match.arg(content_type)
Expand Down Expand Up @@ -150,16 +165,14 @@ sf_create_job_bulk_v1 <- function(operation = c("insert", "delete", "upsert", "u
#' @keywords internal
sf_create_job_bulk_v2 <- function(operation = c("insert", "delete", "upsert", "update"),
object_name,
external_id_fieldname=NULL,
external_id_fieldname = NULL,
content_type = 'CSV',
line_ending = NULL,
column_delimiter = c('COMMA', 'TAB', 'PIPE', 'SEMICOLON',
'CARET', 'BACKQUOTE'),
verbose=FALSE){

operation <- match.arg(operation)
content_type <- match.arg(content_type)
line_ending <- match.arg(line_ending)
column_delimiter <- match.arg(column_delimiter)
if(column_delimiter != "COMMA"){
stop("column_delimiter = 'COMMA' is currently the only supported file delimiter")
Expand Down Expand Up @@ -696,8 +709,9 @@ sf_batch_status_bulk <- function(job_id, batch_id, api_type=c("Bulk 1.0"),
#' @template batch_id
#' @template api_type
#' @template verbose
#' @return A \code{tbl_df}, formatted by Salesforce, with information containing the success or failure of certain rows in a submitted batch,
#' unless the operation was query, then it is a data.frame containing the result_id for retrieving the recordset.
#' @return A \code{tbl_df}, formatted by Salesforce, with information containing
#' the success or failure of certain rows in a submitted batch, unless the operation
#' was query, then it is a data.frame containing the result_id for retrieving the recordset.
#' @references \url{https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/}
#' @note This is a legacy function used only with Bulk 1.0.
#' @examples
Expand Down Expand Up @@ -847,6 +861,8 @@ sf_get_job_records_bulk_v2 <- function(job_id,
#' @param operation character; string defining the type of operation being performed
#' @template external_id_fieldname
#' @template api_type
#' @param ... other arguments passed on to \code{\link{sf_create_job_bulk}} such as
#' \code{content_type}, \code{concurrency_mode}, \code{line_ending} or \code{column_delimiter}.
#' @param wait_for_results logical; indicating whether to wait for the operation to complete
#' so that the batch results of individual records can be obtained
#' @param interval_seconds integer; defines the seconds between attempts to check
Expand Down Expand Up @@ -875,6 +891,7 @@ sf_bulk_operation <- function(input_data,
"update", "hardDelete"),
external_id_fieldname = NULL,
api_type = c("Bulk 1.0", "Bulk 2.0"),
...,
wait_for_results = TRUE,
interval_seconds = 3,
max_attempts = 200,
Expand All @@ -883,11 +900,11 @@ sf_bulk_operation <- function(input_data,
stopifnot(!missing(operation))
api_type <- match.arg(api_type)

job_info <- sf_create_job_bulk(operation, object_name=object_name,
external_id_fieldname=external_id_fieldname,
api_type=api_type, verbose=verbose)
job_info <- sf_create_job_bulk(operation, object_name = object_name,
external_id_fieldname = external_id_fieldname,
api_type = api_type, verbose = verbose, ...)
batches_info <- sf_create_batches_bulk(job_id = job_info$id, input_data,
api_type=api_type, verbose=verbose)
api_type = api_type, verbose = verbose)

if(wait_for_results){
status_complete <- FALSE
Expand Down Expand Up @@ -929,12 +946,12 @@ sf_bulk_operation <- function(input_data,
}
if (!status_complete) {
message("Function's Time Limit Exceeded. Aborting Job Now")
res <- sf_abort_job_bulk(job_info$id, api_type=api_type, verbose=verbose)
res <- sf_abort_job_bulk(job_info$id, api_type = api_type, verbose = verbose)
} else {
res <- sf_get_job_records_bulk(job_info$id, api_type=api_type, verbose=verbose)
# For Bulk 2.0 jobs -> INVALIDJOBSTATE: Closing already Completed Job not allowed
if(api_type == "Bulk 1.0"){
close_job_info <- sf_close_job_bulk(job_info$id, api_type=api_type, verbose=verbose)
close_job_info <- sf_close_job_bulk(job_info$id, api_type = api_type, verbose = verbose)
}
}
} else {
Expand Down
22 changes: 11 additions & 11 deletions R/create.R
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
#' @template object_name
#' @template all_or_none
#' @template api_type
#' @param ... Other arguments passed on to \code{\link{sf_bulk_operation}}.
#' @param ... other arguments passed on to \code{\link{sf_bulk_operation}}.
#' @template verbose
#' @return \code{tbl_df} of records with success indicator
#' @examples
Expand Down Expand Up @@ -180,12 +180,12 @@ sf_create_bulk_v1 <- function(input_data, object_name, all_or_none = FALSE,
...,
verbose = FALSE){
# all or none?
input_data <- sf_input_data_validation(operation="create", input_data)
resultset <- sf_bulk_operation(input_data=input_data,
object_name=object_name,
operation="insert",
input_data <- sf_input_data_validation(operation = "create", input_data)
resultset <- sf_bulk_operation(input_data = input_data,
object_name = object_name,
operation = "insert",
api_type = "Bulk 1.0",
verbose=verbose, ...)
verbose = verbose, ...)
return(resultset)
}

Expand All @@ -199,12 +199,12 @@ sf_create_bulk_v2 <- function(input_data, object_name, all_or_none = FALSE,
verbose = FALSE){
# all or none?
# The order of records in the response is not guaranteed to match the ordering of records in the original job data.
input_data <- sf_input_data_validation(operation="create", input_data)
resultset <- sf_bulk_operation(input_data=input_data,
object_name=object_name,
operation="insert",
input_data <- sf_input_data_validation(operation = "create", input_data)
resultset <- sf_bulk_operation(input_data = input_data,
object_name = object_name,
operation = "insert",
api_type = "Bulk 2.0",
verbose=verbose, ...)
verbose = verbose, ...)
return(resultset)
}

Expand Down
12 changes: 6 additions & 6 deletions R/delete.R
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
#' @template object_name
#' @template all_or_none
#' @template api_type
#' @param ... Other arguments passed on to \code{\link{sf_bulk_operation}}.
#' @param ... other arguments passed on to \code{\link{sf_bulk_operation}}.
#' @template verbose
#' @return \code{tbl_df} of records with success indicator
#' @examples
Expand Down Expand Up @@ -150,9 +150,9 @@ sf_delete_bulk_v1 <- function(ids, object_name,
...,
verbose = FALSE){
# all or none?
ids <- sf_input_data_validation(ids, operation='delete')
resultset <- sf_bulk_operation(input_data=ids, object_name=object_name,
operation="delete",
ids <- sf_input_data_validation(ids, operation = 'delete')
resultset <- sf_bulk_operation(input_data = ids, object_name = object_name,
operation = "delete",
api_type = "Bulk 1.0",
verbose=verbose, ...)
return(resultset)
Expand All @@ -163,8 +163,8 @@ sf_delete_bulk_v2 <- function(ids, object_name,
verbose = FALSE){
# all or none?
ids <- sf_input_data_validation(ids, operation='delete')
resultset <- sf_bulk_operation(input_data=ids, object_name=object_name,
operation="delete",
resultset <- sf_bulk_operation(input_data = ids, object_name = object_name,
operation = "delete",
api_type = "Bulk 2.0",
verbose=verbose, ...)
return(resultset)
Expand Down
20 changes: 10 additions & 10 deletions R/update.R
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
#' @template object_name
#' @template all_or_none
#' @template api_type
#' @param ... Other arguments passed on to \code{\link{sf_bulk_operation}}.
#' @param ... other arguments passed on to \code{\link{sf_bulk_operation}}.
#' @template verbose
#' @return \code{tbl_df} of records with success indicator
#' @examples
Expand Down Expand Up @@ -187,12 +187,12 @@ sf_update_bulk_v1 <- function(input_data, object_name, all_or_none = FALSE,
...,
verbose = FALSE){
# all or none?
input_data <- sf_input_data_validation(operation="update", input_data)
resultset <- sf_bulk_operation(input_data=input_data,
object_name=object_name,
operation="update",
input_data <- sf_input_data_validation(operation = "update", input_data)
resultset <- sf_bulk_operation(input_data = input_data,
object_name = object_name,
operation = "update",
api_type = "Bulk 1.0",
verbose=verbose, ...)
verbose = verbose, ...)
return(resultset)
}

Expand All @@ -206,10 +206,10 @@ sf_update_bulk_v2 <- function(input_data, object_name, all_or_none = FALSE,
verbose = FALSE){
# all or none?
# The order of records in the response is not guaranteed to match the ordering of records in the original job data.
input_data <- sf_input_data_validation(operation='update', input_data)
resultset <- sf_bulk_operation(input_data=input_data,
object_name=object_name,
operation="update",
input_data <- sf_input_data_validation(operation = 'update', input_data)
resultset <- sf_bulk_operation(input_data = input_data,
object_name = object_name,
operation = "update",
api_type = "Bulk 2.0",
verbose=verbose, ...)
return(resultset)
Expand Down
Loading

0 comments on commit 087b0f2

Please sign in to comment.