Skip to content

Commit

Permalink
Merge pull request #26 from biaslab/dev-error-if-operator
Browse files Browse the repository at this point in the history
feat: add error_if_* operators
  • Loading branch information
bvdmitri authored Jun 16, 2022
2 parents fe134e9 + c7f034c commit 07cf554
Show file tree
Hide file tree
Showing 12 changed files with 420 additions and 2 deletions.
2 changes: 1 addition & 1 deletion docs/Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ Documenter = "e30172f5-a6a5-5a46-863b-614d45cd2de4"
Rocket = "df971d30-c9d6-4b37-b8ff-e965b2cb3a40"

[compat]
Documenter = "0.25"
Documenter = "0.27"
5 changes: 4 additions & 1 deletion docs/make.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ using Documenter, Rocket

makedocs(
modules = [ Rocket ],
strict = [ :doctest, :eval_block, :example_block, :meta_block, :parse_error, :setup_block ],
clean = true,
sitename = "Rocket.jl",
pages = [
Expand Down Expand Up @@ -101,7 +102,9 @@ makedocs(
"Error handling" => [
"About error handling operators" => "operators/errors/about.md",
"catch_error" => "operators/errors/catch_error.md",
"rerun" => "operators/errors/rerun.md"
"rerun" => "operators/errors/rerun.md",
"error_if" => "operators/errors/error_if.md",
"`error_if_not`" => "operators/errors/error_if_not.md",
],
"Join" => [
"with_latest" => "operators/join/with_latest.md"
Expand Down
2 changes: 2 additions & 0 deletions docs/src/operators/all.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ There are operators for different purposes, and they may be categorized as: crea

- [catch_error](@ref operator_catch_error)
- [rerun](@ref operator_rerun)
- [error_if](@ref operator_error_if)
- [`error_if_not`](@ref operator_error_if_not)

## Join operator

Expand Down
2 changes: 2 additions & 0 deletions docs/src/operators/errors/about.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ There are operators for different purposes, and they may be categorized as: crea

- [catch_error](@ref operator_catch_error)
- [rerun](@ref operator_rerun)
- [error_if](@ref operator_error_if)
- [`error_if_not`](@ref operator_error_if_not)

# See also

Expand Down
9 changes: 9 additions & 0 deletions docs/src/operators/errors/error_if.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# [Error If Operator](@id operator_error_if)

```@docs
error_if
```

## See also

[Operators](@ref what_are_operators)
11 changes: 11 additions & 0 deletions docs/src/operators/errors/error_if_not.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# [Error If Not Operator](@id operator_error_if_not)

Note: `error_if_not` is an alias for `error_if` operator with inverted `checkFn`.

```@docs
error_if_not
```

## See also

[Operators](@ref what_are_operators)
2 changes: 2 additions & 0 deletions src/Rocket.jl
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ include("operators/safe.jl")
include("operators/noop.jl")
include("operators/default_if_empty.jl")
include("operators/error_if_empty.jl")
include("operators/error_if.jl")
include("operators/error_if_not.jl")
include("operators/debounce_time.jl")
include("operators/skip_next.jl")
include("operators/skip_error.jl")
Expand Down
115 changes: 115 additions & 0 deletions src/operators/error_if.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
export error_if

import Base: show

"""
error_if(checkFn, errorFn)
Creates an `error_if` operator, which performs a check for every emission on the source Observable with `checkFn`.
If `checkFn` returns `true`, the operator sends an `error` event and unsubscribes from the observable.
# Arguments
- `checkFn`: check function with `(data) -> Bool` signature
- `errorFn`: error object generating function with `(data) -> Any` signature, optional
# Producing
Stream of type `<: Subscribable{L}` where `L` refers to type of source stream
# Examples
```jldoctest
using Rocket
source = from([1, 2, 3]) |> error_if((data) -> data > 2, (data) -> "CustomError")
subscription = subscribe!(source, lambda(
on_next = (d) -> println("Next: ", d),
on_error = (e) -> println("Error: ", e),
on_complete = () -> println("Completed")
))
;
# output
Next: 1
Next: 2
Error: CustomError
```
See also: [`error_if_not`](@ref), [`error_if_empty`](@ref), [`default_if_empty`](@ref), [`lambda`](@ref)
"""
error_if(checkFn::F, errorFn::E = nothing) where { F, E } = ErrorIfOperator{F, E}(checkFn, errorFn)

struct ErrorIfOperator{F, E} <: InferableOperator
checkFn :: F
errorFn :: E
end

function on_call!(::Type{L}, ::Type{L}, operator::ErrorIfOperator{F, E}, source) where { L, F, E }
return proxy(L, source, ErrorIfProxy{F, E}(operator.checkFn, operator.errorFn))
end

operator_right(::ErrorIfOperator, ::Type{L}) where L = L

struct ErrorIfProxy{F, E} <: ActorSourceProxy
checkFn :: F
errorFn :: E
end

actor_proxy!(::Type{L}, proxy::ErrorIfProxy{F, E}, actor::A) where { L, A, F, E } = ErrorIfActor{L, A, F, E}(proxy.checkFn, proxy.errorFn, actor, false, voidTeardown)
source_proxy!(::Type{L}, proxy::ErrorIfProxy, source::S) where { L, S } = ErrorIfSource{L, S}(source)

mutable struct ErrorIfActor{L, A, F, E} <: Actor{L}
checkFn :: F
errorFn :: E
actor :: A
completed::Bool
subscription
end

error_msg(actor::ErrorIfActor, data) = error_msg(actor, actor.errorFn, data)

error_msg(::ErrorIfActor, ::Nothing, data) = "`error_if` operator check failed for data $(data)"
error_msg(::ErrorIfActor, callback, data) = callback(data)

function on_next!(actor::ErrorIfActor{L}, data::L) where L
if !actor.completed
check = actor.checkFn(data)
if check
error!(actor, error_msg(actor, data))
else
next!(actor.actor, data)
end
end
end

function on_error!(actor::ErrorIfActor, err)
if !actor.completed
actor.completed = true
unsubscribe!(actor.subscription)
error!(actor.actor, err)
end
end

function on_complete!(actor::ErrorIfActor)
if !actor.completed
actor.completed = true
complete!(actor.actor)
end
end

@subscribable struct ErrorIfSource{L, S} <: Subscribable{L}
source :: S
end

function on_subscribe!(source::ErrorIfSource, actor::ErrorIfActor)
subscription = subscribe!(source.source, actor)
if !actor.completed
actor.subscription = subscription
end
return subscription
end

Base.show(io::IO, ::ErrorIfOperator) = print(io, "ErrorIfOperator()")
Base.show(io::IO, ::ErrorIfProxy) = print(io, "ErrorIfProxy()")
Base.show(io::IO, ::ErrorIfActor{L}) where L = print(io, "ErrorIfActor($L)")
Base.show(io::IO, ::ErrorIfSource{L}) where L = print(io, "ErrorIfSource($L)")
39 changes: 39 additions & 0 deletions src/operators/error_if_not.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
export error_if_not

"""
error_if_not(checkFn, errorFn)
Creates an `error_if_not` operator, which performs a check for every emission on the source Observable with `checkFn`.
If `checkFn` returns `false`, the operator sends an `error` event and unsubscribes from the observable.
Note: `error_if_not` is an alias for `error_if` operator with inverted `checkFn`.
# Arguments
- `checkFn`: check function with `(data) -> Bool` signature
- `errorFn`: error object generating function with `(data) -> Any` signature, optional
# Producing
Stream of type `<: Subscribable{L}` where `L` refers to type of source stream
# Examples
```jldoctest
using Rocket
source = from([1, 2, 3]) |> error_if_not((data) -> data < 2, (data) -> "CustomError")
subscription = subscribe!(source, lambda(
on_next = (d) -> println("Next: ", d),
on_error = (e) -> println("Error: ", e),
on_complete = () -> println("Completed")
))
;
# output
Next: 1
Error: CustomError
```
See also: [`error_if`](@ref), [`error_if_empty`](@ref), [`default_if_empty`](@ref), [`logger`](@ref)
"""
error_if_not(checkFn::F, errorFn::E = nothing) where { F, E } = error_if((d) -> !checkFn(d), errorFn)
119 changes: 119 additions & 0 deletions test/operators/test_operator_error_if.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
module RocketErrorIfOperatorTest

using Test
using Rocket

include("../test_helpers.jl")

@testset "operator: error_if()" begin

println("Testing: operator error_if()")

run_proxyshowcheck("ErrorIf", error_if((d) -> false))

run_testset([
(
source = from(1:42) |> error_if((d) -> d > 2),
values = @ts([ 1, 2, e ]),
source_type = Int
),
(
source = from(1:42) |> error_if((d) -> d > 2, (d) -> "Error $d"),
values = @ts([ 1, 2, e("Error 3") ]),
source_type = Int
),
(
source = from(1:42) |> error_if((d) -> d > 5, (d) -> "Error $d"),
values = @ts([ 1, 2, 3, 4, 5, e("Error 6") ]),
source_type = Int
),
(
source = from(1:42) |> error_if((d) -> d > 2) |> catch_error((err, obs) -> of(3)),
values = @ts([ 1, 2, 3, c ]),
source_type = Int
),
(
source = completed(Int) |> error_if((d) -> d > 2),
values = @ts(c),
source_type = Int
),
(
source = faulted(String, "e") |> error_if((d) -> d > 2),
values = @ts(e("e")),
source_type = String
),
(
source = never() |> error_if((d) -> d > 2),
values = @ts()
)
])

@testset "`error_if` should unsubscribe on check fail" begin
source = Subject(Int)

events = []

subscription = subscribe!(source |> error_if((d) -> d > 2), lambda(
on_next = (d) -> push!(events, d),
on_error = (e) -> push!(events, "Error"),
on_complete = () -> push!(events, "Completed")
))

@test events == [ ]

next!(source, 1)

@test events == [ 1 ]

next!(source, 2)

@test events == [ 1, 2 ]

next!(source, 3)

@test events == [ 1, 2, "Error" ]

next!(source, 4)

@test events == [ 1, 2, "Error" ]

unsubscribe!(subscription)

@test events == [ 1, 2, "Error" ]
end

@testset "`error_if` should not check anything after unsubscription" begin
source = Subject(Int)

events = []

subscription = subscribe!(source |> error_if((d) -> d > 2), lambda(
on_next = (d) -> push!(events, d),
on_error = (e) -> push!(events, "Error"),
on_complete = () -> push!(events, "Completed")
))

@test events == [ ]

next!(source, 1)

@test events == [ 1 ]

next!(source, 2)

@test events == [ 1, 2 ]

unsubscribe!(subscription)

next!(source, 3)

@test events == [ 1, 2 ]

next!(source, 4)

@test events == [ 1, 2 ]
end

end

end
Loading

0 comments on commit 07cf554

Please sign in to comment.