Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Recipe for tee #186

Open
digitalbuddha opened this issue Jan 27, 2016 · 12 comments
Open

Recipe for tee #186

digitalbuddha opened this issue Jan 27, 2016 · 12 comments
Milestone

Comments

@digitalbuddha
Copy link

https://twitter.com/jessewilson/status/692326663061966848

Original issue:

I'm struggling with reading from a BufferedSource twice. What I'm trying to do is take an okhttp response body and stream it to both a parser and a file. First I created 2 Bufferinstances from same BufferedSource next I wrap a Buffer in an InputStreamReader and gracefully (or so I thought) pass it to gson. What I get is MalformedJsonException

If I however simply do body.source().readUtf8Line() I get the full raw json.

full example:

        Request request = new Request.Builder()
                .url("https://www.reddit.com/r/aww/new/.json")
                .build();
        response =new OkHttpClient().newCall(request).execute();

                        ResponseBody body = response.body();
                        BufferedSource sourceforFile = body.source();
                        sourceforFile.require(8014);
                        Buffer buffer = sourceforFile.buffer();
                        Buffer sourceForParser = buffer.clone();

                        InputStreamReader inputStreamReader = new InputStreamReader(sourceForParser.inputStream());
                        result= new GsonBuilder()
                                .registerTypeAdapterFactory(new GsonAdaptersModel()).create()
                                .fromJson(inputStreamReader, RedditData.class);

                        File file = new File(getContext().getCacheDir(), "file");
                        BufferedSink sink = Okio.buffer(Okio.sink(file));
                        sink.writeAll(sourceforFile.buffer());
                        diskValue = Okio.buffer(Okio.source(file)).readString(Charset.defaultCharset());

It seems like Buffers are perfectly suited for taking a single source and streaming in multiple direction, would love to finally bring it all together. Thank you for your help.

@digitalbuddha digitalbuddha changed the title Reading all data from buffer. Streaming BufferedSource to parser and disk Jan 27, 2016
@digitalbuddha
Copy link
Author

I rearranged to instead use the original BufferedSource to create the InputStream and the parsing step works, but the copied Buffer still only gives me a partial data saved to disk. Do I need to synchronize or wait for the originalSource to be exhausted prior to using a cloned Buffer?

ResponseBody body = response.body();
BufferedSource originalSource = body.source();
originalSource.require(8000);
Buffer copiedSource = originalSource.buffer().clone();

InputStreamReader inputStreamReader = new InputStreamReader(originalSource.inputStream());
result= new GsonBuilder()
        .fromJson(inputStreamReader, RedditData.class);

File file = new File(getContext().getCacheDir(), "file");
BufferedSink sink = Okio.buffer(Okio.sink(file));
sink.writeAll(copiedSource);
sink.close();
diskValue = Okio.buffer(Okio.source(file)).readString(Charset.defaultCharset());

@JakeWharton
Copy link
Collaborator

You can use Request.peekBody(Long.MAX_VALUE) to get a copy of the RequestBody, but this has a HUGE downside: it buffers the entire response in memory.

What you want is to write the stream to another sink while the application pulls bytes off the wire. OkHttp's cache behaves like this, and you should copy what it does. I can grab a pointer in the source if you can't find it when I get home.

@digitalbuddha
Copy link
Author

Yeah definitely don't want to copy. A pointer would be appreciated.

@JakeWharton
Copy link
Collaborator

https://github.com/square/okhttp/blob/master/okhttp/src/main/java/okhttp3/internal/http/HttpEngine.java#L756-L803

cacheBody would be Okio.buffer(Okio.source(targetFile)) and everything else should be straightforward to modify.

@digitalbuddha
Copy link
Author

thanks! I won't tell you how many times I read Cache.java today trying to figure it out

@digitalbuddha
Copy link
Author

After 2 tweets and 2 issue comments I finally got it. Thanks @JakeWharton & @swankjesse in case anyone else needs a working example here's the relevant parts.
https://gist.github.com/digitalbuddha/3c5bb15fa12a553c85ec

@JakeWharton
Copy link
Collaborator

By the way you can use ResponseBody.charStream() to get a Reader to pass to Gson that will use the appropriate charset defined in the Content-Type header automatically.

@JakeWharton JakeWharton changed the title Streaming BufferedSource to parser and disk Consider primitive to copy data to a second sink when source is read Jan 27, 2016
@JakeWharton JakeWharton reopened this Jan 27, 2016
@digitalbuddha
Copy link
Author

Reopening in preparation of pr.

Going to add @nonnull
Source filter(final Source source, final BufferedSink cacheBody) to Okio.java. Any suggestions for method name, feels like a source filter to me

@swankjesse
Copy link
Collaborator

@digitalbuddha
Copy link
Author

Wasn't sure if that was a placeholder term first time you suggested it.
Thanks.
On Jan 27, 2016 10:42 AM, "Jesse Wilson" notifications@github.com wrote:

Tee ? https://en.wikipedia.org/wiki/Tee_%28command%29


Reply to this email directly or view it on GitHub
#186 (comment).

@swankjesse swankjesse changed the title Consider primitive to copy data to a second sink when source is read Recipe for tee May 2, 2016
@swankjesse swankjesse added this to the 1.9 milestone May 2, 2016
@swankjesse swankjesse modified the milestones: 1.11, 1.10 Oct 12, 2016
@swankjesse swankjesse removed this from the 1.11 milestone May 1, 2017
@devcsrj
Copy link

devcsrj commented Jan 5, 2018

bump

Hi! Is a recipe now documented somewhere for this (i.e.: single producer, multiple consumer)?

@swankjesse
Copy link
Collaborator

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

4 participants