From 6be835dbf68b68deeacf30c2d7c2225815ca55d1 Mon Sep 17 00:00:00 2001 From: Bagaev Dmitry Date: Thu, 16 Jun 2022 15:14:42 +0300 Subject: [PATCH 1/6] feat: add `error_if*` operators --- docs/make.jl | 5 +- docs/src/operators/all.md | 2 + docs/src/operators/errors/about.md | 2 + docs/src/operators/errors/error_if.md | 9 ++ docs/src/operators/errors/error_if_not.md | 9 ++ src/Rocket.jl | 2 + src/operators/error_if.jl | 114 ++++++++++++++++++ src/operators/error_if_not.jl | 37 ++++++ test/operators/test_operator_error_if.jl | 119 +++++++++++++++++++ test/operators/test_operator_error_if_not.jl | 114 ++++++++++++++++++ test/runtests.jl | 2 + 11 files changed, 414 insertions(+), 1 deletion(-) create mode 100644 docs/src/operators/errors/error_if.md create mode 100644 docs/src/operators/errors/error_if_not.md create mode 100644 src/operators/error_if.jl create mode 100644 src/operators/error_if_not.jl create mode 100644 test/operators/test_operator_error_if.jl create mode 100644 test/operators/test_operator_error_if_not.jl diff --git a/docs/make.jl b/docs/make.jl index d2dc112ed..6735874fc 100644 --- a/docs/make.jl +++ b/docs/make.jl @@ -2,6 +2,7 @@ using Documenter, Rocket makedocs( modules = [ Rocket ], + strict = [ :doctest, :eval_block, :example_block, :meta_block, :parse_error, :setup_block ], clean = true, sitename = "Rocket.jl", pages = [ @@ -101,7 +102,9 @@ makedocs( "Error handling" => [ "About error handling operators" => "operators/errors/about.md", "catch_error" => "operators/errors/catch_error.md", - "rerun" => "operators/errors/rerun.md" + "rerun" => "operators/errors/rerun.md", + "error_if" => "operators/errors/error_if.md", + "`error_if_not`" => "operators/errors/error_if_not.md", ], "Join" => [ "with_latest" => "operators/join/with_latest.md" diff --git a/docs/src/operators/all.md b/docs/src/operators/all.md index a5be2c1de..a05d3c6f3 100644 --- a/docs/src/operators/all.md +++ b/docs/src/operators/all.md @@ -71,6 +71,8 @@ There are operators for different purposes, and they may be categorized as: crea - [catch_error](@ref operator_catch_error) - [rerun](@ref operator_rerun) +- [error_if](@ref operator_error_if) +- [`error_if_not`](@ref operator_error_if_not) ## Join operator diff --git a/docs/src/operators/errors/about.md b/docs/src/operators/errors/about.md index 0bd2da9a9..7eee5cc11 100644 --- a/docs/src/operators/errors/about.md +++ b/docs/src/operators/errors/about.md @@ -4,6 +4,8 @@ There are operators for different purposes, and they may be categorized as: crea - [catch_error](@ref operator_catch_error) - [rerun](@ref operator_rerun) +- [error_if](@ref operator_error_if) +- [`error_if_not`](@ref operator_error_if_not) # See also diff --git a/docs/src/operators/errors/error_if.md b/docs/src/operators/errors/error_if.md new file mode 100644 index 000000000..eb9fb37b2 --- /dev/null +++ b/docs/src/operators/errors/error_if.md @@ -0,0 +1,9 @@ +# [Error If Operator](@id operator_error_if) + +```@docs +error_if +``` + +## See also + +[Operators](@ref what_are_operators) diff --git a/docs/src/operators/errors/error_if_not.md b/docs/src/operators/errors/error_if_not.md new file mode 100644 index 000000000..bfbd81800 --- /dev/null +++ b/docs/src/operators/errors/error_if_not.md @@ -0,0 +1,9 @@ +# [Error If Not Operator](@id operator_error_if_not) + +```@docs +error_if_not +``` + +## See also + +[Operators](@ref what_are_operators) diff --git a/src/Rocket.jl b/src/Rocket.jl index f01cd38f5..1943d474a 100644 --- a/src/Rocket.jl +++ b/src/Rocket.jl @@ -113,6 +113,8 @@ include("operators/safe.jl") include("operators/noop.jl") include("operators/default_if_empty.jl") include("operators/error_if_empty.jl") +include("operators/error_if.jl") +include("operators/error_if_not.jl") include("operators/debounce_time.jl") include("operators/skip_next.jl") include("operators/skip_error.jl") diff --git a/src/operators/error_if.jl b/src/operators/error_if.jl new file mode 100644 index 000000000..6ec35c665 --- /dev/null +++ b/src/operators/error_if.jl @@ -0,0 +1,114 @@ +export error_if + +import Base: show + +""" + error_if(checkFn, errorFn) + +Creates an `error_if` operator, which performs a check for every emission on the source Observable with `checkFn`. +If `checkFn` returns `true`, the operator sends an `error` event and unsubscribes from the observable. + +# Arguments +- `checkFn`: check function with `(data) -> Bool` signature +- `errorFn`: error object generating function with `(data) -> Any` signature, optional + +# Producing + +Stream of type `<: Subscribable{L}` where `L` refers to type of source stream + +# Examples +```jldoctest +using Rocket + +source = from([1, 2, 3]) |> error_if((data) -> data > 2, (data) -> "CustomError") + +subscription = subscribe!(source, lambda( + on_next = (d) -> println("Next: ", d), + on_error = (e) -> println("Error: ", e), + on_complete = () -> println("Completed") +)); + +# output +Next: 1 +Next: 2 +Error: CustomError +``` + +See also: [`error_if_not`](@ref), [`error_if_empty`](@ref), [`default_if_empty`](@ref), [`lambda`](@ref) +""" +error_if(checkFn::F, errorFn::E = nothing) where { F, E } = ErrorIfOperator{F, E}(checkFn, errorFn) + +struct ErrorIfOperator{F, E} <: InferableOperator + checkFn :: F + errorFn :: E +end + +function on_call!(::Type{L}, ::Type{L}, operator::ErrorIfOperator{F, E}, source) where { L, F, E } + return proxy(L, source, ErrorIfProxy{F, E}(operator.checkFn, operator.errorFn)) +end + +operator_right(::ErrorIfOperator, ::Type{L}) where L = L + +struct ErrorIfProxy{F, E} <: ActorSourceProxy + checkFn :: F + errorFn :: E +end + +actor_proxy!(::Type{L}, proxy::ErrorIfProxy{F, E}, actor::A) where { L, A, F, E } = ErrorIfActor{L, A, F, E}(proxy.checkFn, proxy.errorFn, actor, false, voidTeardown) +source_proxy!(::Type{L}, proxy::ErrorIfProxy, source::S) where { L, S } = ErrorIfSource{L, S}(source) + +mutable struct ErrorIfActor{L, A, F, E} <: Actor{L} + checkFn :: F + errorFn :: E + actor :: A + completed::Bool + subscription +end + +error_msg(actor::ErrorIfActor, data) = error_msg(actor, actor.errorFn, data) + +error_msg(::ErrorIfActor, ::Nothing, data) = "`error_if` operator check failed for data $(data)" +error_msg(::ErrorIfActor, callback, data) = callback(data) + +function on_next!(actor::ErrorIfActor{L}, data::L) where L + if !actor.completed + check = actor.checkFn(data) + if check + error!(actor, error_msg(actor, data)) + else + next!(actor.actor, data) + end + end +end + +function on_error!(actor::ErrorIfActor, err) + if !actor.completed + actor.completed = true + unsubscribe!(actor.subscription) + error!(actor.actor, err) + end +end + +function on_complete!(actor::ErrorIfActor) + if !actor.completed + actor.completed = true + complete!(actor.actor) + end +end + +@subscribable struct ErrorIfSource{L, S} <: Subscribable{L} + source :: S +end + +function on_subscribe!(source::ErrorIfSource, actor::ErrorIfActor) + subscription = subscribe!(source.source, actor) + if !actor.completed + actor.subscription = subscription + end + return subscription +end + +Base.show(io::IO, ::ErrorIfOperator) = print(io, "ErrorIfOperator()") +Base.show(io::IO, ::ErrorIfProxy) = print(io, "ErrorIfProxy()") +Base.show(io::IO, ::ErrorIfActor{L}) where L = print(io, "ErrorIfActor($L)") +Base.show(io::IO, ::ErrorIfSource{L}) where L = print(io, "ErrorIfSource($L)") diff --git a/src/operators/error_if_not.jl b/src/operators/error_if_not.jl new file mode 100644 index 000000000..aa948b4b7 --- /dev/null +++ b/src/operators/error_if_not.jl @@ -0,0 +1,37 @@ +export error_if_not + +""" + error_if_not(checkFn, errorFn) + +Creates an `error_if_not` operator, which performs a check for every emission on the source Observable with `checkFn`. +If `checkFn` returns `false`, the operator sends an `error` event and unsubscribes from the observable. + +Note: `error_if_not` is an alias for `error_if` operator with inverted `checkFn`. + +# Arguments +- `checkFn`: check function with `(data) -> Bool` signature +- `errorFn`: error object generating function with `(data) -> Any` signature, optional + +# Producing + +Stream of type `<: Subscribable{L}` where `L` refers to type of source stream + +# Examples +```jldoctest +using Rocket + +source = from([1, 2, 3]) |> error_if_not((data) -> data < 2, (data) -> "CustomError") + +subscription = subscribe!(source, lambda( + on_next = (d) -> println("Next: ", d), + on_error = (e) -> println("Error: ", e), + on_complete = () -> println("Completed") +)); + +Next: 1 +Error: CustomError +``` + +See also: [`error_if`](@ref), [`error_if_empty`](@ref), [`default_if_empty`](@ref), [`logger`](@ref) +""" +error_if_not(checkFn::F, errorFn::E = nothing) where { F, E } = error_if((d) -> !checkFn(d), errorFn) \ No newline at end of file diff --git a/test/operators/test_operator_error_if.jl b/test/operators/test_operator_error_if.jl new file mode 100644 index 000000000..3ce8d10fd --- /dev/null +++ b/test/operators/test_operator_error_if.jl @@ -0,0 +1,119 @@ +module RocketErrorIfOperatorTest + +using Test +using Rocket + +include("../test_helpers.jl") + +@testset "operator: error_if()" begin + + println("Testing: operator error_if()") + + run_proxyshowcheck("ErrorIf", error_if((d) -> false)) + + run_testset([ + ( + source = from(1:42) |> error_if((d) -> d > 2), + values = @ts([ 1, 2, e ]), + source_type = Int + ), + ( + source = from(1:42) |> error_if((d) -> d > 2, (d) -> "Error $d"), + values = @ts([ 1, 2, e("Error 3") ]), + source_type = Int + ), + ( + source = from(1:42) |> error_if((d) -> d > 5, (d) -> "Error $d"), + values = @ts([ 1, 2, 3, 4, 5, e("Error 6") ]), + source_type = Int + ), + ( + source = from(1:42) |> error_if((d) -> d > 2) |> catch_error((err, obs) -> of(3)), + values = @ts([ 1, 2, 3, c ]), + source_type = Int + ), + ( + source = completed(Int) |> error_if((d) -> d > 2), + values = @ts(c), + source_type = Int + ), + ( + source = faulted(String, "e") |> error_if((d) -> d > 2), + values = @ts(e("e")), + source_type = String + ), + ( + source = never() |> error_if((d) -> d > 2), + values = @ts() + ) + ]) + + @testset "`error_if` should unsubscribe on check fail" begin + source = Subject(Int) + + events = [] + + subscription = subscribe!(source |> error_if((d) -> d > 2), lambda( + on_next = (d) -> push!(events, d), + on_error = (e) -> push!(events, "Error"), + on_complete = () -> push!(events, "Completed") + )) + + @test events == [ ] + + next!(source, 1) + + @test events == [ 1 ] + + next!(source, 2) + + @test events == [ 1, 2 ] + + next!(source, 3) + + @test events == [ 1, 2, "Error" ] + + next!(source, 4) + + @test events == [ 1, 2, "Error" ] + + unsubscribe!(subscription) + + @test events == [ 1, 2, "Error" ] + end + + @testset "`error_if` should not check anything after unsubscription" begin + source = Subject(Int) + + events = [] + + subscription = subscribe!(source |> error_if((d) -> d > 2), lambda( + on_next = (d) -> push!(events, d), + on_error = (e) -> push!(events, "Error"), + on_complete = () -> push!(events, "Completed") + )) + + @test events == [ ] + + next!(source, 1) + + @test events == [ 1 ] + + next!(source, 2) + + @test events == [ 1, 2 ] + + unsubscribe!(subscription) + + next!(source, 3) + + @test events == [ 1, 2 ] + + next!(source, 4) + + @test events == [ 1, 2 ] + end + +end + +end diff --git a/test/operators/test_operator_error_if_not.jl b/test/operators/test_operator_error_if_not.jl new file mode 100644 index 000000000..86b06807b --- /dev/null +++ b/test/operators/test_operator_error_if_not.jl @@ -0,0 +1,114 @@ +module RocketErrorIfNotperatorTest + +using Test +using Rocket + +include("../test_helpers.jl") + +@testset "operator: error_if_not()" begin + + println("Testing: operator error_if_not()") + + run_proxyshowcheck("ErrorIf", error_if_not((d) -> true)) + + run_testset([ + ( + source = from(1:42) |> error_if_not((d) -> d <= 2), + values = @ts([ 1, 2, e ]), + source_type = Int + ), + ( + source = from(1:42) |> error_if_not((d) -> d <= 2, (d) -> "Error $d"), + values = @ts([ 1, 2, e("Error 3") ]), + source_type = Int + ), + ( + source = from(1:42) |> error_if_not((d) -> d <= 2) |> catch_error((err, obs) -> of(3)), + values = @ts([ 1, 2, 3, c ]), + source_type = Int + ), + ( + source = completed(Int) |> error_if_not((d) -> d <= 2), + values = @ts(c), + source_type = Int + ), + ( + source = faulted(String, "e") |> error_if_not((d) -> d <= 2), + values = @ts(e("e")), + source_type = String + ), + ( + source = never() |> error_if_not((d) -> d <= 2), + values = @ts() + ) + ]) + + @testset "`error_if` should unsubscribe on check fail" begin + source = Subject(Int) + + events = [] + + subscription = subscribe!(source |> error_if_not((d) -> d <= 2), lambda( + on_next = (d) -> push!(events, d), + on_error = (e) -> push!(events, "Error"), + on_complete = () -> push!(events, "Completed") + )) + + @test events == [ ] + + next!(source, 1) + + @test events == [ 1 ] + + next!(source, 2) + + @test events == [ 1, 2 ] + + next!(source, 3) + + @test events == [ 1, 2, "Error" ] + + next!(source, 4) + + @test events == [ 1, 2, "Error" ] + + unsubscribe!(subscription) + + @test events == [ 1, 2, "Error" ] + end + + @testset "`error_if` should not check anything after unsubscription" begin + source = Subject(Int) + + events = [] + + subscription = subscribe!(source |> error_if_not((d) -> d <= 2), lambda( + on_next = (d) -> push!(events, d), + on_error = (e) -> push!(events, "Error"), + on_complete = () -> push!(events, "Completed") + )) + + @test events == [ ] + + next!(source, 1) + + @test events == [ 1 ] + + next!(source, 2) + + @test events == [ 1, 2 ] + + unsubscribe!(subscription) + + next!(source, 3) + + @test events == [ 1, 2 ] + + next!(source, 4) + + @test events == [ 1, 2 ] + end + +end + +end diff --git a/test/runtests.jl b/test/runtests.jl index 3484acd37..2b136bda6 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -87,6 +87,8 @@ doctest(Rocket) include("./operators/test_operator_ref_count.jl") include("./operators/test_operator_default_if_empty.jl") include("./operators/test_operator_error_if_empty.jl") + include("./operators/test_operator_error_if.jl") + include("./operators/test_operator_error_if_not.jl") include("./operators/test_operator_skip_next.jl") include("./operators/test_operator_skip_error.jl") include("./operators/test_operator_skip_complete.jl") From 60e44a2166709f3608555663f454d01bc42d3103 Mon Sep 17 00:00:00 2001 From: Bagaev Dmitry Date: Thu, 16 Jun 2022 15:23:10 +0300 Subject: [PATCH 2/6] Update error_if_not.md --- docs/src/operators/errors/error_if_not.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/src/operators/errors/error_if_not.md b/docs/src/operators/errors/error_if_not.md index bfbd81800..12f046c33 100644 --- a/docs/src/operators/errors/error_if_not.md +++ b/docs/src/operators/errors/error_if_not.md @@ -1,5 +1,7 @@ # [Error If Not Operator](@id operator_error_if_not) +Note: `error_if_not` is an alias for `error_if` operator with inverted `checkFn`. + ```@docs error_if_not ``` From d76eacaa6d0d1cd6ffa65e3cf939ca998628b92d Mon Sep 17 00:00:00 2001 From: Bagaev Dmitry Date: Thu, 16 Jun 2022 15:30:07 +0300 Subject: [PATCH 3/6] doc: fix doctest --- src/operators/error_if_not.jl | 1 + 1 file changed, 1 insertion(+) diff --git a/src/operators/error_if_not.jl b/src/operators/error_if_not.jl index aa948b4b7..4ed60d7d1 100644 --- a/src/operators/error_if_not.jl +++ b/src/operators/error_if_not.jl @@ -28,6 +28,7 @@ subscription = subscribe!(source, lambda( on_complete = () -> println("Completed") )); +# output Next: 1 Error: CustomError ``` From 25f6f8dcf8135f25215ff3f0d50d3469fc82efff Mon Sep 17 00:00:00 2001 From: Bagaev Dmitry Date: Thu, 16 Jun 2022 15:48:24 +0300 Subject: [PATCH 4/6] Update jldocs output --- src/operators/error_if.jl | 3 ++- src/operators/error_if_not.jl | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/operators/error_if.jl b/src/operators/error_if.jl index 6ec35c665..6db5b6505 100644 --- a/src/operators/error_if.jl +++ b/src/operators/error_if.jl @@ -26,7 +26,8 @@ subscription = subscribe!(source, lambda( on_next = (d) -> println("Next: ", d), on_error = (e) -> println("Error: ", e), on_complete = () -> println("Completed") -)); +)) +; # output Next: 1 diff --git a/src/operators/error_if_not.jl b/src/operators/error_if_not.jl index 4ed60d7d1..499b04130 100644 --- a/src/operators/error_if_not.jl +++ b/src/operators/error_if_not.jl @@ -26,7 +26,8 @@ subscription = subscribe!(source, lambda( on_next = (d) -> println("Next: ", d), on_error = (e) -> println("Error: ", e), on_complete = () -> println("Completed") -)); +)) +; # output Next: 1 From 546e2c2f20ff82968a97766f1cffebfe7c79b55a Mon Sep 17 00:00:00 2001 From: Bagaev Dmitry Date: Thu, 16 Jun 2022 16:01:48 +0300 Subject: [PATCH 5/6] Fix strict argument for old versions of Julia --- docs/make.jl | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/docs/make.jl b/docs/make.jl index 6735874fc..61e92fe83 100644 --- a/docs/make.jl +++ b/docs/make.jl @@ -1,8 +1,14 @@ using Documenter, Rocket +const is_strict = @static if VERSION >= v"1.6" + [ :doctest, :eval_block, :example_block, :meta_block, :parse_error, :setup_block ] +else + false +end + makedocs( modules = [ Rocket ], - strict = [ :doctest, :eval_block, :example_block, :meta_block, :parse_error, :setup_block ], + strict = is_strict, clean = true, sitename = "Rocket.jl", pages = [ From c7f034c7759ff06f1dbc9854aa54d45fafcf5ae0 Mon Sep 17 00:00:00 2001 From: Bagaev Dmitry Date: Thu, 16 Jun 2022 16:04:19 +0300 Subject: [PATCH 6/6] Bump documenter version --- docs/Project.toml | 2 +- docs/make.jl | 8 +------- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/docs/Project.toml b/docs/Project.toml index 79a06bb55..4e303130b 100644 --- a/docs/Project.toml +++ b/docs/Project.toml @@ -3,4 +3,4 @@ Documenter = "e30172f5-a6a5-5a46-863b-614d45cd2de4" Rocket = "df971d30-c9d6-4b37-b8ff-e965b2cb3a40" [compat] -Documenter = "0.25" +Documenter = "0.27" diff --git a/docs/make.jl b/docs/make.jl index 61e92fe83..6735874fc 100644 --- a/docs/make.jl +++ b/docs/make.jl @@ -1,14 +1,8 @@ using Documenter, Rocket -const is_strict = @static if VERSION >= v"1.6" - [ :doctest, :eval_block, :example_block, :meta_block, :parse_error, :setup_block ] -else - false -end - makedocs( modules = [ Rocket ], - strict = is_strict, + strict = [ :doctest, :eval_block, :example_block, :meta_block, :parse_error, :setup_block ], clean = true, sitename = "Rocket.jl", pages = [