Add Repo.stream #96

Open
ruslandoga opened this issue Jul 28, 2023 · 10 comments

Comments

@ruslandoga
Contributor

No description provided.

@hkrutzer
Contributor

There is currently no way to stream, even though Repo.stream exists, right?

@ruslandoga
Contributor Author

ruslandoga commented Aug 18, 2023

Yes, because Repo.stream requires a transaction and we don't define Repo.transaction since ClickHouse doesn't support transactions.

import Ecto.Query

Repo.checkout(fn ->
  "numbers"
  |> select([n], n.number)
  |> limit(100)
  |> Repo.stream(prefix: "system")
  |> Stream.each(fn rows -> IO.inspect(rows) end)
  |> Stream.run()
end)

results in

** (RuntimeError) cannot reduce stream outside of transaction
    (ecto_sql 3.10.1) lib/ecto/adapters/sql.ex:927: Ecto.Adapters.SQL.reduce/6
    (elixir 1.15.3) lib/stream.ex:1828: Enumerable.Stream.do_each/4
    (elixir 1.15.3) lib/stream.ex:943: Stream.do_transform/5
    (elixir 1.15.3) lib/stream.ex:1828: Enumerable.Stream.do_each/4
    (elixir 1.15.3) lib/stream.ex:690: Stream.run/1
    iex:5: (file)
    iex:5: (file)

Ideally, I'd want to make it work with Repo.checkout.

@hkrutzer
Contributor

hkrutzer commented Nov 6, 2023

That would require a change in ecto_sql, right?

@ruslandoga
Contributor Author

ruslandoga commented Nov 6, 2023

I'm not sure. I haven't looked into it yet :) But we can always work around Ecto.Adapters.SQL and implement these functions in our own way.

@hkrutzer
Contributor

Do you have any recommendations for streaming right now? I need it for a data export which currently grows to a few hundred MBs while it's going on. I could do what you describe in plausible/ch#82 (comment) but I don't think there is a way to retrieve a Ch pid from the connection pool in the Ecto APIs.

@ruslandoga
Contributor Author

👋 @hkrutzer

You can start Ch on demand. That's what I've been doing so far in plausible/analytics#3507 with

config = Plausible.SomeClickhouseRepo.config()
# this connection dies when the export process finishes
{:ok, ch_conn} = Ch.start_link(Keyword.put(config, :pool_size, 1))

@hkrutzer
Contributor

Thanks @ruslandoga! That looks useful. Since you are also using streaming but haven't implemented it in ecto_ch, I take it this is not an easy feature to add?

@ruslandoga
Contributor Author

ruslandoga commented Nov 20, 2023

I guess I'll need to implement it in ecto_ch sooner or later :) Maybe when the export/import PR in Plausible reaches the "MVP" stage and I start refactoring it.

As for whether it'd be easy or not, I can't say for certain; I just haven't looked into it much...

@hkrutzer
Contributor

Hi @ruslandoga, is the above still the best way to stream responses?

@ruslandoga
Contributor Author

👋 @hkrutzer

Yes, except Ch.stream now emits %Ch.Result{} structs. And Ch.stream is what I ended up using in Plausible CSV exports: https://github.com/plausible/analytics/blob/af94f013100ce39ff0171f4bd21ac6c736bb2fe5/lib/plausible/exports.ex#L577-L608
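
For readers skimming the thread, the workaround discussed here (start a dedicated Ch connection on demand, then stream over it) could look roughly like the sketch below. It is not taken from ecto_ch or from the Plausible code linked above. The module name ExportSketch and both of its functions are hypothetical, and the sketch assumes that Ch.stream/3 has to be consumed inside DBConnection.run/3 and that an unbounded timeout is acceptable for a long export.

```elixir
defmodule ExportSketch do
  # Hypothetical helper: reuse a repo's config for a one-off connection,
  # forcing a pool of one as in the snippet earlier in the thread.
  def on_demand_opts(repo_config) do
    Keyword.put(repo_config, :pool_size, 1)
  end

  # Streams `sql` over a dedicated Ch connection and hands every emitted
  # element (a %Ch.Result{} per the comment above) to `fun`.
  # Assumption: the stream must be reduced inside DBConnection.run/3.
  def stream_each(repo_config, sql, params, fun) when is_function(fun, 1) do
    # The connection is linked to the caller, so it dies with the export process.
    {:ok, conn} = Ch.start_link(on_demand_opts(repo_config))

    DBConnection.run(
      conn,
      fn conn ->
        conn
        |> Ch.stream(sql, params)
        |> Stream.each(fun)
        |> Stream.run()
      end,
      # Assumption: exports may run longer than the default timeout.
      timeout: :infinity
    )
  end
end
```

A call might then look like ExportSketch.stream_each(Plausible.SomeClickhouseRepo.config(), "SELECT number FROM system.numbers LIMIT 100", %{}, &IO.inspect/1), where the repo name is the same placeholder used earlier in the thread.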

ruslandoga changed the title from "add Repo.stream" to "Add Repo.stream" on Oct 15, 2024