Skip to content

Review Design of Observable.using #1466

@davidmoten

Description

@davidmoten

This is the signature ofObservable.using currently:

static <T, Resource extends Subscription> Observable<T> using(Func0<Resource> resourceFactory, Func1<Resource, ? extends Observable<? extends T>> observableFactory)

What use case is there for the observableFactory to have access to the Subscription methods of Resource which are there for resource disposal?

If the source observable needs access to the resource disposal method then why would the using method be used at all?

I noticed that this method exists with this signature in the Rx.Net API so @headinthebox may be able to enlighten me.

I'd prefer the existence of a less coupled method (either as an addition or a replacement) like:

static <S, T> Observable<T> using(final Func0<S> resourceFactory,
            final Func1<? super S, ? extends Observable<T>> observableFactory,
            final Action1<? super S> dispose) 

The existing method is also non-intuitive to use I think and ideally involves the trick of using Subscriptions.create(Action0) as demonstrated to me once by @zsxwing.

Here's an example with a resource that is an InputStream from a file that we want using to close once we are finished.

Using the current api:

package com.github.davidmoten.rx;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;

import org.junit.Test;

import rx.Observable;
import rx.Subscription;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.subscriptions.Subscriptions;

public class UsingTest {

    @Test
    public void testUsingCurrentStyle() throws IOException {
        final File file = new File("target/temp");
        file.createNewFile();

        // do stuff with file inputstream then close inputstream
        Func0<Resource> resourceFactory = () -> {
            return new Resource(file);
        };

        // dummy observable that doesn't actually use the input stream
        Func1<Resource, Observable<String>> observableFactory = resource -> Observable.just("boo");

        Observable<String> ob = Observable.using(resourceFactory, observableFactory);
    }

    private static class Resource implements Subscription {
        private final InputStream is;
        private final Subscription sub;

        Resource(File file) {
            try {
                this.is = new FileInputStream(file);
            } catch (FileNotFoundException e) {
                throw new RuntimeException(e);
            }
            this.sub = Subscriptions.create(() -> {
                try {
                    is.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            });
        }

        InputStream getInputStream() {
            return is;
        }

        @Override
        public void unsubscribe() {
            sub.unsubscribe();
        }

        @Override
        public boolean isUnsubscribed() {
            return sub.isUnsubscribed();
        }
    }

}

With the proposed change/addition:

package com.github.davidmoten.rx;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;

import org.junit.Test;

import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func0;
import rx.functions.Func1;

public class UsingNewTest {

    @Test
    public void testUsingNewStyle() throws IOException {

        File file = new File("target/temp");
        file.createNewFile();

        // do stuff with file inputstream then close inputstream
        Func0<InputStream> resourceFactory = () -> {
            try {
                return new FileInputStream("target");
            } catch (FileNotFoundException e) {
                throw new RuntimeException(e);
            }
        };
        Func1<InputStream, Observable<String>> observableFactory = is -> Observable.just("boo");
        Action1<InputStream> dispose = is -> {
            try {
                is.close();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        };
        //this method does not exist yet
        Observable<String> ob = Observable.using(resourceFactory, observableFactory, dispose);
    }
}

What do people think of such an addition/replacement? ( @benjchristensen , @abersnaze , @akarnokd )

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions