Skip to content

Commit

Permalink
Add verbosity argument to req_peform_connection()
Browse files Browse the repository at this point in the history
Fixes #599
  • Loading branch information
hadley committed Jan 6, 2025
1 parent dbd27a5 commit 2dd60b4
Show file tree
Hide file tree
Showing 9 changed files with 168 additions and 28 deletions.
1 change: 1 addition & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# httr2 (development version)

* `req_perform_connection()` gains a `verbosity` argument, which is useful for understanding exactly how data is streamed back to you (#599).
* `curl_transform()` will now use `req_body_json_modify()` for JSON data (#258).
* `resp_stream_is_complete()` tells you if there is still data remaining to be streamed (#559).
* New `url_modify()`, `url_modify_query()`, and `url_modify_relative()` make it easier to modify an existing url (#464).
Expand Down
27 changes: 24 additions & 3 deletions R/req-perform-connection.R
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

#' Perform a request and return a streaming connection
#'
#' @description
Expand All @@ -16,7 +15,7 @@
#' than providing callbacks that the data is pushed to. This is useful if you
#' want to do other work in between handling inputs from the stream.
#'
#' @inheritParams req_perform_stream
#' @inheritParams req_perform
#' @param blocking When retrieving data, should the connection block and wait
#' for the desired information or immediately return what it has (possibly
#' nothing)?
Expand All @@ -33,10 +32,12 @@
#'
#' # Always close the response when you're done
#' close(resp)
req_perform_connection <- function(req, blocking = TRUE) {
req_perform_connection <- function(req, blocking = TRUE, verbosity = NULL) {
check_request(req)
check_bool(blocking)
# verbosity checked in req_verbosity_connection

req <- req_verbosity_connection(req, verbosity %||% httr2_verbosity())

Check warning on line 40 in R/req-perform-connection.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-connection.R#L40

Added line #L40 was not covered by tests
req <- auth_sign(req)
req_prep <- req_prepare(req)
handle <- req_handle(req_prep)
Expand Down Expand Up @@ -78,6 +79,26 @@ req_perform_connection <- function(req, blocking = TRUE) {
resp
}

# Like req_verbosity() but we want to print the streaming body when it's
# requested not when curl actually receives it
req_verbosity_connection <- function(req, verbosity, error_call = caller_env()) {
if (!is_integerish(verbosity, n = 1) || verbosity < 0 || verbosity > 3) {
cli::cli_abort("{.arg verbosity} must 0, 1, 2, or 3.", call = error_call)

Check warning on line 86 in R/req-perform-connection.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-connection.R#L85-L86

Added lines #L85 - L86 were not covered by tests
}

req <- switch(verbosity + 1,
req,
req_verbose(req),
req_verbose(req, body_req = TRUE),
req_verbose(req, body_req = TRUE, info = TRUE)
)
if (verbosity > 1) {
req <- req_policies(req, show_streaming_body = TRUE)

Check warning on line 96 in R/req-perform-connection.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-connection.R#L89-L96

Added lines #L89 - L96 were not covered by tests
}
req

Check warning on line 98 in R/req-perform-connection.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-connection.R#L98

Added line #L98 was not covered by tests
}


req_perform_connection1 <- function(req, handle, blocking = TRUE) {
stream <- curl::curl(req$url, handle = handle)

Expand Down
26 changes: 18 additions & 8 deletions R/resp-stream-aws.R
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,21 @@ resp_stream_aws <- function(resp, max_size = Inf) {
include_trailer = FALSE
)

if (!is.null(event_bytes)) {
parse_aws_event(event_bytes)
} else {
return(NULL)
if (is.null(event_bytes)) {
return()

Check warning on line 13 in R/resp-stream-aws.R

View check run for this annotation

Codecov / codecov/patch

R/resp-stream-aws.R#L12-L13

Added lines #L12 - L13 were not covered by tests
}

event <- parse_aws_event(event_bytes)
if (resp_stream_is_verbose(resp)) {

Check warning on line 17 in R/resp-stream-aws.R

View check run for this annotation

Codecov / codecov/patch

R/resp-stream-aws.R#L16-L17

Added lines #L16 - L17 were not covered by tests
# Emit header
for (key in names(event$headers)) {
cli::cat_line("<< ", key, ": ", event$headers[[key]])

Check warning on line 20 in R/resp-stream-aws.R

View check run for this annotation

Codecov / codecov/patch

R/resp-stream-aws.R#L19-L20

Added lines #L19 - L20 were not covered by tests
}
# Emit body
cli::cat_line("<< ", event$body)
cli::cat_line()

Check warning on line 24 in R/resp-stream-aws.R

View check run for this annotation

Codecov / codecov/patch

R/resp-stream-aws.R#L23-L24

Added lines #L23 - L24 were not covered by tests
}
event

Check warning on line 26 in R/resp-stream-aws.R

View check run for this annotation

Codecov / codecov/patch

R/resp-stream-aws.R#L26

Added line #L26 was not covered by tests
}

find_aws_event_boundary <- function(buffer) {
Expand Down Expand Up @@ -57,15 +67,15 @@ parse_aws_event <- function(bytes) {

# headers
headers <- list()
while(i <= 12 + header_length) {
while (i <= 12 + header_length) {
name_length <- as.integer(read_bytes(1))
name <- rawToChar(read_bytes(name_length))
type <- as.integer(read_bytes(1))

delayedAssign("length", parse_int(read_bytes(2)))
value <- switch(type_enum(type),
'TRUE' = TRUE,
'FALSE' = FALSE,
"TRUE" = TRUE,
"FALSE" = FALSE,
BYTE = parse_int(read_bytes(1)),
SHORT = parse_int(read_bytes(2)),
INTEGER = parse_int(read_bytes(4)),
Expand Down Expand Up @@ -95,7 +105,7 @@ parse_aws_event <- function(bytes) {
# Helpers ----------------------------------------------------------------

parse_int <- function(x) {
sum(as.integer(x) * 256 ^ rev(seq_along(x) - 1))
sum(as.integer(x) * 256^rev(seq_along(x) - 1))
}

parse_int64 <- function(x) {
Expand Down
67 changes: 52 additions & 15 deletions R/resp-stream.R
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,33 @@
#' @param resp,con A streaming [response] created by [req_perform_connection()].
#' @param kb How many kilobytes (1024 bytes) of data to read.
#' @order 1
#' @examples
#' req <- request(example_url()) |>
#' req_template("GET /stream/:n", n = 5)
#'
#' con <- req |> req_perform_connection()
#' while (!resp_stream_is_complete(con)) {
#' lines <- con |> resp_stream_lines(2)
#' cat(length(lines), " lines received\n", sep = "")
#' }
#' close(con)
#'
#' # You can also see what's happening by setting verbosity
#' con <- req |> req_perform_connection(verbosity = 2)
#' while (!resp_stream_is_complete(con)) {
#' lines <- con |> resp_stream_lines(2)
#' }
#' close(con)
resp_stream_raw <- function(resp, kb = 32) {
check_streaming_response(resp)
conn <- resp$body

readBin(conn, raw(), kb * 1024)
out <- readBin(conn, raw(), kb * 1024)
if (resp_stream_is_verbose(resp)) {
cli::cat_line("<< Streamed ", length(out), " bytes")
cli::cat_line()

Check warning on line 55 in R/resp-stream.R

View check run for this annotation

Codecov / codecov/patch

R/resp-stream.R#L52-L55

Added lines #L52 - L55 were not covered by tests
}
out

Check warning on line 57 in R/resp-stream.R

View check run for this annotation

Codecov / codecov/patch

R/resp-stream.R#L57

Added line #L57 was not covered by tests
}

#' @export
Expand All @@ -59,12 +81,18 @@ resp_stream_lines <- function(resp, lines = 1, max_size = Inf, warn = TRUE) {
line <- resp_stream_oneline(resp, max_size, warn, encoding)
if (length(line) == 0) {
# No more data, either because EOF or req_perform_connection(blocking=FALSE).
# Either way, return what we have
return(lines_read)
# Either way we're done
break

Check warning on line 85 in R/resp-stream.R

View check run for this annotation

Codecov / codecov/patch

R/resp-stream.R#L85

Added line #L85 was not covered by tests
}
lines_read <- c(lines_read, line)
lines <- lines - 1
}

if (resp_stream_is_verbose(resp)) {
cli::cat_line("<< ", lines_read)
cli::cat_line()

Check warning on line 93 in R/resp-stream.R

View check run for this annotation

Codecov / codecov/patch

R/resp-stream.R#L91-L93

Added lines #L91 - L93 were not covered by tests
}

lines_read
}

Expand All @@ -76,19 +104,25 @@ resp_stream_lines <- function(resp, lines = 1, max_size = Inf, warn = TRUE) {
#' @order 1
resp_stream_sse <- function(resp, max_size = Inf) {
event_bytes <- resp_boundary_pushback(resp, max_size, find_event_boundary, include_trailer = FALSE)
if (!is.null(event_bytes)) {
parse_event(event_bytes)
} else {
return(NULL)
if (is.null(event_bytes)) {
return()

Check warning on line 108 in R/resp-stream.R

View check run for this annotation

Codecov / codecov/patch

R/resp-stream.R#L107-L108

Added lines #L107 - L108 were not covered by tests
}

event <- parse_event(event_bytes)
if (resp_stream_is_verbose(resp)) {
for (key in names(event)) {
cli::cat_line("< ", key, ": ", event[[key]])

Check warning on line 114 in R/resp-stream.R

View check run for this annotation

Codecov / codecov/patch

R/resp-stream.R#L111-L114

Added lines #L111 - L114 were not covered by tests
}
cli::cat_line()

Check warning on line 116 in R/resp-stream.R

View check run for this annotation

Codecov / codecov/patch

R/resp-stream.R#L116

Added line #L116 was not covered by tests
}
event

Check warning on line 118 in R/resp-stream.R

View check run for this annotation

Codecov / codecov/patch

R/resp-stream.R#L118

Added line #L118 was not covered by tests
}

#' @export
#' @rdname resp_stream_raw
resp_stream_is_complete <- function(resp) {
check_response(resp)

!isIncomplete(resp$body)
length(resp$cache$push_back) == 0 && !isIncomplete(resp$body)

Check warning on line 125 in R/resp-stream.R

View check run for this annotation

Codecov / codecov/patch

R/resp-stream.R#L125

Added line #L125 was not covered by tests
}

#' @export
Expand Down Expand Up @@ -189,16 +223,16 @@ find_event_boundary <- function(buffer) {

boundary_end <- which(
(left1 == 0x0A & buffer == 0x0A) | # \n\n
(left1 == 0x0D & buffer == 0x0D) | # \r\r
(left3 == 0x0D & left2 == 0x0A & left1 == 0x0D & buffer == 0x0A) # \r\n\r\n
(left1 == 0x0D & buffer == 0x0D) | # \r\r
(left3 == 0x0D & left2 == 0x0A & left1 == 0x0D & buffer == 0x0A) # \r\n\r\n
)

if (length(boundary_end) == 0) {
return(NULL) # No event boundary found
return(NULL) # No event boundary found
}

boundary_end <- boundary_end[1] # Take the first occurrence
split_at <- boundary_end + 1 # Split at one after the boundary
boundary_end <- boundary_end[1] # Take the first occurrence
split_at <- boundary_end + 1 # Split at one after the boundary
split_at
}

Expand Down Expand Up @@ -324,7 +358,6 @@ parse_event <- function(event_data) {
check_streaming_response <- function(resp,
arg = caller_arg(resp),
call = caller_env()) {

check_response(resp, arg = arg, call = call)

if (resp_body_type(resp) != "stream") {
Expand Down Expand Up @@ -355,3 +388,7 @@ isValid <- function(con) {
error = function(cnd) FALSE
)
}

resp_stream_is_verbose <- function(resp) {
resp$request$policies$show_streaming_body %||% FALSE

Check warning on line 393 in R/resp-stream.R

View check run for this annotation

Codecov / codecov/patch

R/resp-stream.R#L393

Added line #L393 was not covered by tests
}
14 changes: 13 additions & 1 deletion man/req_perform_connection.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions man/resp_stream_raw.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 19 additions & 0 deletions tests/testthat/_snaps/resp-stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,22 @@
Error in `resp_stream_raw()`:
! `resp` has already been closed.

# verbosity = 2 streams request bodies

Code
stream_all(req, resp_stream_lines, 1)
Output
<< line 1
<< line 2
Code
stream_all(req, resp_stream_raw, 5 / 1024)
Output
<< Streamed 5 bytes
<< Streamed 5 bytes
<< Streamed 4 bytes

2 changes: 1 addition & 1 deletion tests/testthat/helper.R
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
testthat::set_state_inspector(function() {
getAllConnections()
list(connections = getAllConnections())
})
22 changes: 22 additions & 0 deletions tests/testthat/test-resp-stream.R
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,28 @@ test_that("streaming size limits enforced", {
)
})

test_that("verbosity = 2 streams request bodies", {
req <- local_app_request(function(req, res) {
res$send_chunk("line 1\n")
res$send_chunk("line 2\n")
})

stream_all <- function(req, fun, ...) {
con <- req_perform_connection(req, blocking = TRUE, verbosity = 2)
on.exit(close(con))
while (!resp_stream_is_complete(con)) {
fun(con, ...)
}
}
expect_snapshot(
{
stream_all(req, resp_stream_lines, 1)
stream_all(req, resp_stream_raw, 5 / 1024)
},
transform = function(lines) lines[!grepl("^(<-|->)", lines)]
)
})

test_that("has a working find_event_boundary", {
boundary_test <- function(x, matched, remaining) {
buffer <- charToRaw(x)
Expand Down

0 comments on commit 2dd60b4

Please sign in to comment.