Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Lwt_retry library #1032

Merged
merged 11 commits into from
Nov 4, 2024
11 changes: 11 additions & 0 deletions dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,17 @@
(source (github ocsigen/lwt))
(documentation "https://ocsigen.org/lwt")

(package
(name lwt_retry)
(synopsis "Utilities for retrying Lwt computations")
(authors "Shon Feder")
(maintainers
"Raphaël Proust <code@bnwr.net>"
"Shon Feder <shon.feder@gmail.com>")
(depends
(ocaml (>= 4.08))
(lwt (>= 5.3.0))))

(package
(name lwt_ppx)
(synopsis "PPX syntax for Lwt, providing something similar to async/await from JavaScript")
Expand Down
31 changes: 31 additions & 0 deletions lwt_retry.opam
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# This file is generated by dune, edit dune-project instead
opam-version: "2.0"
synopsis: "Utilities for retrying Lwt computations"
maintainer: [
"Raphaël Proust <code@bnwr.net>" "Shon Feder <shon.feder@gmail.com>"
]
authors: ["Shon Feder"]
license: "MIT"
homepage: "https://github.com/ocsigen/lwt"
doc: "https://ocsigen.org/lwt"
bug-reports: "https://github.com/ocsigen/lwt/issues"
depends: [
"dune" {>= "2.0"}
"ocaml" {>= "4.08"}
"lwt" {>= "5.3.0"}
]
build: [
["dune" "subst"] {pinned}
[
"dune"
"build"
"-p"
name
"-j"
jobs
"@install"
"@runtest" {with-test}
"@doc" {with-doc}
]
]
dev-repo: "git+https://github.com/ocsigen/lwt.git"
19 changes: 19 additions & 0 deletions src/retry/dune
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
(* -*- tuareg -*- *)

let preprocess =
match Sys.getenv "BISECT_ENABLE" with
| "yes" -> "(preprocess (pps bisect_ppx))"
| _ -> ""
| exception _ -> ""

let () = Jbuild_plugin.V1.send @@ {|

(library
(public_name lwt_retry)
(synopsis "A utility for retrying Lwt computations")
(wrapped false)
(libraries lwt lwt.unix)
|} ^ preprocess ^ {|
(flags (:standard -w +A)))

|}
67 changes: 67 additions & 0 deletions src/retry/lwt_retry.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
(* This file is part of Lwt, released under the MIT license. See LICENSE.md for
details, or visit https://github.com/ocsigen/lwt/blob/master/LICENSE.md. *)



open Lwt.Syntax

let default_sleep_duration n' =
let base_sleep_time = 2.0 in
let n = Int.to_float n' in
n *. base_sleep_time *. Float.pow 2.0 n

type ('retry, 'fatal) error =
[ `Retry of 'retry
| `Fatal of 'fatal
]

let pp_opaque fmt _ = Format.fprintf fmt "<opaque>"

let pp_error ?(retry = pp_opaque) ?(fatal = pp_opaque) fmt err =
match err with
| `Retry r -> Format.fprintf fmt "`Retry %a" retry r
| `Fatal f -> Format.fprintf fmt "`Fatal %a" fatal f

let equal_error ~retry ~fatal a b =
match a, b with
| `Retry a', `Retry b' -> retry a' b'
| `Fatal a', `Fatal b' -> fatal a' b'
| _ -> false

type ('ok, 'retry, 'fatal) attempt = ('ok, ('retry, 'fatal) error * int) result

let on_error
(f : unit -> ('ok, ('retry, 'fatal) error) result Lwt.t)
: ('ok, 'retry, 'fatal) attempt Lwt_stream.t
=
let i = ref 0 in
let stop = ref false in
Lwt_stream.from begin fun () ->
incr i;
if !stop then
Lwt.return None
else
let+ result = f () in
match result with
| Error (`Retry _ as retry) -> Some (Error (retry, !i))
| Error (`Fatal _ as fatal) -> stop := true; Some (Error (fatal, !i))
| Ok _ as ok -> stop := true; Some ok
end

let with_sleep ?(duration=default_sleep_duration) (attempts : _ attempt Lwt_stream.t) : _ attempt Lwt_stream.t =
attempts
|> Lwt_stream.map_s begin function
| Ok _ as ok -> Lwt.return ok
| Error (_, n) as err ->
let* () = Lwt_unix.sleep @@ duration n in
Lwt.return err
end

let n_times n attempts =
if n < 0 then invalid_arg "Lwt_retry.n_times: n must be non-negative";
(* The first attempt is a try, and re-tries start counting from n + 1 *)
let retries = n + 1 in
let+ attempts = Lwt_stream.nget retries attempts in
match List.rev attempts with
| last :: _ -> last
| _ -> failwith "Lwt_retry.n_times: impossible"
157 changes: 157 additions & 0 deletions src/retry/lwt_retry.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
(* This file is part of Lwt, released under the MIT license. See LICENSE.md for
details, or visit https://github.com/ocsigen/lwt/blob/master/LICENSE.md. *)



(** Utilities for retrying Lwt computations

These utilities are useful for dealing with failure-prone computations that
are expected to succeed after some number of repeated attempts. E.g.,

{[
let flaky_computation () = match try_to_get_resource () with
| Flaky_error msg -> Error (`Retry msg)
| Fatal_error err -> Error (`Fatal err)
| Success result -> Ok result

let error_tolerant_computation () =
Lwt_retry.(flaky_computation
|> on_error (* Retry when [`Retry]able results are produced. *)
|> with_sleep (* Add a delay between attempts, with an exponential backoff. *)
|> n_times 10 (* Try up to 10 times, so long as errors are retryable. *)
)
]}

This library provides a few combinators, but retry attempts are produced on
demand in an {!type:Lwt_stream.t}, and they can be consumed and traversed
using the {!module:Lwt_stream} functions directly. *)

type ('retry, 'fatal) error =
[ `Retry of 'retry
| `Fatal of 'fatal
]
(** The type of errors that a retryable computation can produce.

- [`Retry r] when [r] represents an error that can be retried.
- [`Fatal f] when [f] represents an error that cannot be retried. *)

type ('ok, 'retry, 'fatal) attempt = ('ok, ('retry, 'fatal) error * int) result
(** A [('ok, 'retry, 'fatal) attempt] is the [result] of a retryable computation,
with its the erroneous results enumerated.

- [Ok v] is a successfully computed value [v]
- [Error (err, n)] is the {!type:error} [err] produced on the [n]th
attempt

The enumeration of attempts is 1-based, because making 0 attempts means
making no attempts all, making 1 attempt means {i trying} once, and (when
[i>0]) making [n] attempts means trying once and then {i retrying} up to
[n-1] times. *)

val pp_error :
?retry:(Format.formatter -> 'retry -> unit) ->
?fatal:(Format.formatter -> 'fatal -> unit) ->
Format.formatter -> ('retry, 'fatal) error -> unit
(** [pp_error ~retry ~fatal] is a pretty printer for {!type:error}s that formats
fatal and retryable errors according to the provided printers.

If a printers is not provided, values of the type are represented as
["<opaque>"]. *)

val equal_error :
retry:('retry -> 'retry -> bool) ->
fatal:('fatal -> 'fatal -> bool) ->
('retry, 'fatal) error ->
('retry, 'fatal) error ->
bool

val on_error :
(unit -> ('ok, ('retry, 'fatal) error) result Lwt.t) ->
('ok, 'retry, 'fatal) attempt Lwt_stream.t
(** [Lwt_retry.on_error f] is a stream of attempts to compute [f], with attempts
made on demand. Attempts will be added to the stream when results are
requested until the computation either succeeds or produces a fatal error.

Examples

{[
# let success () = Lwt.return_ok ();;
val success : unit -> (unit, 'a) result Lwt.t = <fun>
# Lwt_retry.(success |> on_error) |> Lwt_stream.to_list;;
- : (unit, 'a, 'b) Lwt_retry.attempt list = [Ok ()]

# let fatal_failure () = Lwt.return_error (`Fatal ());;
val fatal_failure : unit -> ('a, [> `Fatal of unit ]) result Lwt.t = <fun>
# Lwt_retry.(fatal_failure |> on_error) |> Lwt_stream.to_list;;
- : ('a, 'b, unit) Lwt_retry.attempt list = [Error (`Fatal (), 1)]

# let retryable_error () = Lwt.return_error (`Retry ());;
val retryable_error : unit -> ('a, [> `Retry of unit ]) result Lwt.t = <fun>
# Lwt_retry.(retryable_error |> on_error) |> Lwt_stream.nget 3;;
- : ('a, unit, 'b) Lwt_retry.attempt list =
[Error (`Retry (), 1); Error (`Retry (), 2); Error (`Retry (), 3)]
]}*)

val with_sleep :
?duration:(int -> float) ->
('ok, 'retry, 'fatal) attempt Lwt_stream.t ->
('ok, 'retry, 'fatal) attempt Lwt_stream.t
(** [with_sleep ~duration attempts] is the stream of [attempts] with a sleep of
[duration n] seconds added before computing each [n]th retryable attempt.

@param duration the optional sleep duration calculation, defaulting to
{!val:default_sleep_duration}.

Examples

{[
# let f () = Lwt.return_error (`Retry ());;
# let attempts_with_sleeps = Lwt_retry.(f |> on_error |> with_sleep);;

# Lwt_stream.get attempts_with_sleeps;;
(* computed immediately *)
Some (Error (`Retry (), 1))

# Lwt_stream.get attempts_with_sleeps;;
(* computed after 3 seconds *)
Some (Error (`Retry (), 2))

# Lwt_stream.get attempts_with_sleeps;;
(* computed after 9 seconds *)
Some (Error (`Retry (), 3))

(* a stream with a constant 1s sleep between attempts *)
# let attempts_with_constant_sleeps =
Lwt_retry.(f |> on_error |> with_sleep ~duration:(fun _ -> 1.0));;
]} *)

val default_sleep_duration : int -> float
(** [default_sleep_duration n] is an exponential backoff computed as [n] * 2 *
(2 ^ [n]), which gives the sequence [ [0.; 4.; 16.; 48.; 128.; 320.; 768.;
1792.; ...] ]. *)

val n_times :
int ->
('ok, 'retry, 'fatal) attempt Lwt_stream.t ->
('ok, 'retry, 'fatal) attempt Lwt.t
(** [n_times n attempts] is [Ok v] if one of the [attempts] succeeds within [n]
retries (or [n+1] attempts), [Error (`Fatal f, n+1)] if any of the attempts
results in the fatal error, or [Error (`Retry r, n+1)] if all [n] retries are
exhausted and the [n+1]th attempt results in a retry error.

In particular [n_times 0 attempts] will *try* 1 attempt but *re-try* 0, so
it is guaranteed to produce some result.
shonfeder marked this conversation as resolved.
Show resolved Hide resolved

[n_times] forces up to [n] elements of the on-demand stream of attempts.

Examples

{[
# let f () =
let i = ref 0 in
fun () -> Lwt.return_error (if !i < 3 then (incr i; `Retry ()) else `Fatal "error!");;
# Lwt_retry.(f () |> on_error |> n_times 0);;
Error (`Retry (), 1)
# Lwt_retry.(f () |> on_error |> n_times 4);;
Error (`Fatal "error!", 3)
]} *)
4 changes: 4 additions & 0 deletions test/retry/dune
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
(test
(name main)
(package lwt_retry)
(libraries lwttester lwt_retry))
Loading
Loading