Skip to content

Commit

Permalink
Observable Kind Wrapper
Browse files Browse the repository at this point in the history
  • Loading branch information
pakoito committed Sep 15, 2017
1 parent 13319e8 commit 2b9d9b1
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 130 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package kategory

import io.reactivex.Observable
import io.reactivex.ObservableEmitter

fun <A> Observable<A>.k(): ObservableKW<A> = ObservableKW(this)

fun <A> ObservableKWKind<A>.value(): Observable<A> =
this.ev().observable

@higherkind
@deriving(Functor::class, Applicative::class, AsyncContext::class)
data class ObservableKW<A>(val observable: Observable<A>) : ObservableKWKind<A> {
fun <B> map(f: (A) -> B): ObservableKW<B> =
observable.map(f).k()

fun <B> ap(fa: ObservableKWKind<(A) -> B>): ObservableKW<B> =
flatMap { a -> fa.ev().map { ff -> ff(a) } }

fun <B> flatMap(f: (A) -> ObservableKW<B>): ObservableKW<B> =
observable.flatMap { f(it).observable }.k()

fun <B> concatMap(f: (A) -> ObservableKW<B>): ObservableKW<B> =
observable.concatMap { f(it).observable }.k()

fun <B> switchMap(f: (A) -> ObservableKW<B>): ObservableKW<B> =
observable.switchMap { f(it).observable }.k()

companion object {
fun <A> pure(a: A): ObservableKW<A> =
Observable.just(a).k()

fun <A> raiseError(t: Throwable): ObservableKW<A> =
Observable.error<A>(t).k()

fun <A> runAsync(fa: Proc<A>): ObservableKW<A> =
Observable.create { emitter: ObservableEmitter<A> ->
fa { either: Either<Throwable, A> ->
either.fold({
emitter.onError(it)
}, {
emitter.onNext(it)
emitter.onComplete()
})

}
}.k()

fun monadFlat(): ObservableKWFlatMonadInstance = ObservableKWFlatMonadInstanceImplicits.instance()

fun monadConcat(): ObservableKWConcatMonadInstance = ObservableKWConcatMonadInstanceImplicits.instance()

fun monadSwitch(): ObservableKWSwitchMonadInstance = ObservableKWSwitchMonadInstanceImplicits.instance()

fun monadErrorFlat(): ObservableKWFlatMonadErrorInstance = ObservableKWFlatMonadErrorInstanceImplicits.instance()

fun monadErrorConcat(): ObservableKWConcatMonadErrorInstance = ObservableKWConcatMonadErrorInstanceImplicits.instance()

fun monadErrorSwitch(): ObservableKWSwitchMonadErrorInstance = ObservableKWSwitchMonadErrorInstanceImplicits.instance()
}
}

fun <A> ObservableKW<A>.handleErrorWith(function: (Throwable) -> ObservableKW<A>): ObservableKW<A> =
this.observable.onErrorResumeNext { t: Throwable -> function(t).observable }.k()

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,117 +1,117 @@
package kategory

object ObservableWMonadInstanceImplicits {
object ObservableKWMonadInstanceImplicits {
@JvmStatic
fun instance(): ObservableWFlatMonadInstance = ObservableWFlatMonadInstanceImplicits.instance()
fun instance(): ObservableKWFlatMonadInstance = ObservableKWFlatMonadInstanceImplicits.instance()
}

object ObservableWMonadErrorInstanceImplicits {
object ObservableKWMonadErrorInstanceImplicits {
@JvmStatic
fun instance(): ObservableWFlatMonadErrorInstance = ObservableWFlatMonadErrorInstanceImplicits.instance()
fun instance(): ObservableKWFlatMonadErrorInstance = ObservableKWFlatMonadErrorInstanceImplicits.instance()
}

interface ObservableWFlatMonadInstance :
ObservableWApplicativeInstance,
Monad<ObservableWHK> {
override fun <A, B> ap(fa: ObservableWKind<A>, ff: ObservableWKind<(A) -> B>): ObservableW<B> =
interface ObservableKWFlatMonadInstance :
ObservableKWApplicativeInstance,
Monad<ObservableKWHK> {
override fun <A, B> ap(fa: ObservableKWKind<A>, ff: ObservableKWKind<(A) -> B>): ObservableKW<B> =
fa.ev().ap(ff)

override fun <A, B> flatMap(fa: ObservableWKind<A>, f: (A) -> ObservableWKind<B>): ObservableWKind<B> =
override fun <A, B> flatMap(fa: ObservableKWKind<A>, f: (A) -> ObservableKWKind<B>): ObservableKWKind<B> =
fa.ev().flatMap { f(it).ev() }

override fun <A, B> tailRecM(a: A, f: (A) -> ObservableWKind<Either<A, B>>): ObservableWKind<B> =
override fun <A, B> tailRecM(a: A, f: (A) -> ObservableKWKind<Either<A, B>>): ObservableKWKind<B> =
f(a).ev().flatMap {
it.fold({ tailRecM(a, f).ev() }, { pure(it).ev() })
}
}

object ObservableWFlatMonadInstanceImplicits {
object ObservableKWFlatMonadInstanceImplicits {
@JvmStatic
fun instance(): ObservableWFlatMonadInstance = object : ObservableWFlatMonadInstance {}
fun instance(): ObservableKWFlatMonadInstance = object : ObservableKWFlatMonadInstance {}
}

interface ObservableWFlatMonadErrorInstance :
ObservableWFlatMonadInstance,
MonadError<ObservableWHK, Throwable> {
override fun <A> raiseError(e: Throwable): ObservableW<A> =
ObservableW.raiseError(e)
interface ObservableKWFlatMonadErrorInstance :
ObservableKWFlatMonadInstance,
MonadError<ObservableKWHK, Throwable> {
override fun <A> raiseError(e: Throwable): ObservableKW<A> =
ObservableKW.raiseError(e)

override fun <A> handleErrorWith(fa: ObservableWKind<A>, f: (Throwable) -> ObservableWKind<A>): ObservableW<A> =
override fun <A> handleErrorWith(fa: ObservableKWKind<A>, f: (Throwable) -> ObservableKWKind<A>): ObservableKW<A> =
fa.ev().handleErrorWith { f(it).ev() }
}

object ObservableWFlatMonadErrorInstanceImplicits {
object ObservableKWFlatMonadErrorInstanceImplicits {
@JvmStatic
fun instance(): ObservableWFlatMonadErrorInstance = object : ObservableWFlatMonadErrorInstance {}
fun instance(): ObservableKWFlatMonadErrorInstance = object : ObservableKWFlatMonadErrorInstance {}
}

interface ObservableWConcatMonadInstance :
ObservableWApplicativeInstance,
Monad<ObservableWHK> {
override fun <A, B> ap(fa: ObservableWKind<A>, ff: ObservableWKind<(A) -> B>): ObservableW<B> =
interface ObservableKWConcatMonadInstance :
ObservableKWApplicativeInstance,
Monad<ObservableKWHK> {
override fun <A, B> ap(fa: ObservableKWKind<A>, ff: ObservableKWKind<(A) -> B>): ObservableKW<B> =
fa.ev().ap(ff)

override fun <A, B> flatMap(fa: ObservableWKind<A>, f: (A) -> ObservableWKind<B>): ObservableW<B> =
override fun <A, B> flatMap(fa: ObservableKWKind<A>, f: (A) -> ObservableKWKind<B>): ObservableKW<B> =
fa.ev().concatMap { f(it).ev() }

override fun <A, B> tailRecM(a: A, f: (A) -> ObservableWKind<Either<A, B>>): ObservableW<B> =
override fun <A, B> tailRecM(a: A, f: (A) -> ObservableKWKind<Either<A, B>>): ObservableKW<B> =
f(a).ev().concatMap {
it.fold({ tailRecM(a, f).ev() }, { pure(it).ev() })
}
}

object ObservableWConcatMonadInstanceImplicits {
object ObservableKWConcatMonadInstanceImplicits {
@JvmStatic
fun instance(): ObservableWConcatMonadInstance = object : ObservableWConcatMonadInstance {}
fun instance(): ObservableKWConcatMonadInstance = object : ObservableKWConcatMonadInstance {}
}

interface ObservableWConcatMonadErrorInstance :
ObservableWConcatMonadInstance,
MonadError<ObservableWHK, Throwable> {
override fun <A> raiseError(e: Throwable): ObservableW<A> =
ObservableW.raiseError(e)
interface ObservableKWConcatMonadErrorInstance :
ObservableKWConcatMonadInstance,
MonadError<ObservableKWHK, Throwable> {
override fun <A> raiseError(e: Throwable): ObservableKW<A> =
ObservableKW.raiseError(e)

override fun <A> handleErrorWith(fa: ObservableWKind<A>, f: (Throwable) -> ObservableWKind<A>): ObservableW<A> =
override fun <A> handleErrorWith(fa: ObservableKWKind<A>, f: (Throwable) -> ObservableKWKind<A>): ObservableKW<A> =
fa.ev().handleErrorWith { f(it).ev() }
}

object ObservableWConcatMonadErrorInstanceImplicits {
object ObservableKWConcatMonadErrorInstanceImplicits {
@JvmStatic
fun instance(): ObservableWConcatMonadErrorInstance = object : ObservableWConcatMonadErrorInstance {}
fun instance(): ObservableKWConcatMonadErrorInstance = object : ObservableKWConcatMonadErrorInstance {}
}

interface ObservableWSwitchMonadInstance :
ObservableWApplicativeInstance,
Monad<ObservableWHK> {
override fun <A, B> ap(fa: ObservableWKind<A>, ff: ObservableWKind<(A) -> B>): ObservableW<B> =
interface ObservableKWSwitchMonadInstance :
ObservableKWApplicativeInstance,
Monad<ObservableKWHK> {
override fun <A, B> ap(fa: ObservableKWKind<A>, ff: ObservableKWKind<(A) -> B>): ObservableKW<B> =
fa.ev().ap(ff)

override fun <A, B> flatMap(fa: ObservableWKind<A>, f: (A) -> ObservableWKind<B>): ObservableW<B> =
override fun <A, B> flatMap(fa: ObservableKWKind<A>, f: (A) -> ObservableKWKind<B>): ObservableKW<B> =
fa.ev().switchMap { f(it).ev() }

override fun <A, B> tailRecM(a: A, f: (A) -> ObservableWKind<Either<A, B>>): ObservableW<B> =
override fun <A, B> tailRecM(a: A, f: (A) -> ObservableKWKind<Either<A, B>>): ObservableKW<B> =
f(a).ev().switchMap {
it.fold({ tailRecM(a, f).ev() }, { pure(it).ev() })
}
}

object ObservableWSwitchMonadInstanceImplicits {
object ObservableKWSwitchMonadInstanceImplicits {
@JvmStatic
fun instance(): ObservableWSwitchMonadInstance = object : ObservableWSwitchMonadInstance {}
fun instance(): ObservableKWSwitchMonadInstance = object : ObservableKWSwitchMonadInstance {}
}

interface ObservableWSwitchMonadErrorInstance :
ObservableWSwitchMonadInstance,
MonadError<ObservableWHK, Throwable> {
interface ObservableKWSwitchMonadErrorInstance :
ObservableKWSwitchMonadInstance,
MonadError<ObservableKWHK, Throwable> {

override fun <A> raiseError(e: Throwable): ObservableW<A> =
ObservableW.raiseError(e)
override fun <A> raiseError(e: Throwable): ObservableKW<A> =
ObservableKW.raiseError(e)

override fun <A> handleErrorWith(fa: ObservableWKind<A>, f: (Throwable) -> ObservableWKind<A>): ObservableW<A> =
override fun <A> handleErrorWith(fa: ObservableKWKind<A>, f: (Throwable) -> ObservableKWKind<A>): ObservableKW<A> =
fa.ev().handleErrorWith { f(it).ev() }
}

object ObservableWSwitchMonadErrorInstanceImplicits {
object ObservableKWSwitchMonadErrorInstanceImplicits {
@JvmStatic
fun instance(): ObservableWSwitchMonadErrorInstance = object : ObservableWSwitchMonadErrorInstance {}
fun instance(): ObservableKWSwitchMonadErrorInstance = object : ObservableKWSwitchMonadErrorInstance {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import org.junit.runner.RunWith
import java.util.concurrent.TimeUnit

@RunWith(KTestJUnitRunner::class)
class ObservableWTest : UnitSpec() {
class ObservableKWTest : UnitSpec() {

fun <T> EQ(): Eq<ObservableWKind<T>> = object : Eq<ObservableWKind<T>> {
override fun eqv(a: ObservableWKind<T>, b: ObservableWKind<T>): Boolean =
fun <T> EQ(): Eq<ObservableKWKind<T>> = object : Eq<ObservableKWKind<T>> {
override fun eqv(a: ObservableKWKind<T>, b: ObservableKWKind<T>): Boolean =
try {
a.value().blockingFirst() == b.value().blockingFirst()
} catch (throwable: Throwable) {
Expand All @@ -38,19 +38,19 @@ class ObservableWTest : UnitSpec() {
init {

"instances can be resolved implicitly" {
functor<ObservableWHK>() shouldNotBe null
applicative<ObservableWHK>() shouldNotBe null
monad<ObservableWHK>() shouldNotBe null
monadError<ObservableWHK, Unit>() shouldNotBe null
asyncContext<ObservableWHK>() shouldNotBe null
functor<ObservableKWHK>() shouldNotBe null
applicative<ObservableKWHK>() shouldNotBe null
monad<ObservableKWHK>() shouldNotBe null
monadError<ObservableKWHK, Unit>() shouldNotBe null
asyncContext<ObservableKWHK>() shouldNotBe null
}

testLaws(AsyncLaws.laws(ObservableW.asyncContext(), ObservableW.monadErrorFlat(), EQ(), EQ()))
testLaws(AsyncLaws.laws(ObservableW.asyncContext(), ObservableW.monadErrorConcat(), EQ(), EQ()))
testLaws(AsyncLaws.laws(ObservableW.asyncContext(), ObservableW.monadErrorSwitch(), EQ(), EQ()))
testLaws(AsyncLaws.laws(ObservableKW.asyncContext(), ObservableKW.monadErrorFlat(), EQ(), EQ()))
testLaws(AsyncLaws.laws(ObservableKW.asyncContext(), ObservableKW.monadErrorConcat(), EQ(), EQ()))
testLaws(AsyncLaws.laws(ObservableKW.asyncContext(), ObservableKW.monadErrorSwitch(), EQ(), EQ()))

"Multi-thread Observables finish correctly" {
val value: Observable<Long> = ObservableW.monadErrorFlat().bindingE {
val value: Observable<Long> = ObservableKW.monadErrorFlat().bindingE {
val a = Observable.timer(2, TimeUnit.SECONDS).k().bind()
yields(a)
}.value()
Expand All @@ -63,7 +63,7 @@ class ObservableWTest : UnitSpec() {
"Multi-thread Observables should run on their required threads" {
val originalThread: Thread = Thread.currentThread()
var nextThread: Thread? = null
val value: Observable<Long> = ObservableW.monadErrorFlat().bindingE {
val value: Observable<Long> = ObservableKW.monadErrorFlat().bindingE {
val a = Observable.timer(2, TimeUnit.SECONDS).k().bind()
nextThread = Thread.currentThread()
val b = Observable.just(a).observeOn(Schedulers.io()).k().bind()
Expand All @@ -77,7 +77,7 @@ class ObservableWTest : UnitSpec() {
}

"Observable cancellation forces binding to cancel without completing too" {
val value: Observable<Long> = ObservableW.monadErrorFlat().bindingE {
val value: Observable<Long> = ObservableKW.monadErrorFlat().bindingE {
val a = Observable.timer(3, TimeUnit.SECONDS).k().bind()
yields(a)
}.value()
Expand Down

0 comments on commit 2b9d9b1

Please sign in to comment.