-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
concurrency: make it work #345
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is really incredible! Some questions:
- have you done any work to quantify the
dbt run
speedup this effects? - did you validate that this properly skips dependent models on failure?
- which data warehouses have you tested this with?
Couple of comments mostly for my edification. Excited to get this merged in!
dbt/adapters/default.py
Outdated
if connections_in_use.get(name): | ||
return connections_in_use.get(name) | ||
|
||
if recache_if_missing is False: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think it's more pythonic to do if not recache if missing
here
from dbt.schema import Column | ||
|
||
|
||
lock = multiprocessing.Lock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
these will be singletons -- is that ok?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, I think that's what we want right now. does that seem right to you?
one other thing here. I thought a little bit about how we could support connections of multiple types in a single run -- if we were to do that, we'd need to change this and the cache to be unique-per-connection-type.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hah i wouldn't worry about multiple connection types for now. That sounds right to me, just wanted to confirm because we had some problems with single-imports back in the day
# we add a magic number, 2 because there are overhead connections, | ||
# one for pre- and post-run hooks and other misc operations that occur | ||
# before the run starts, and one for integration tests. | ||
max_connections = profile.get('threads', 1) + 2 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is kind of funky, what's going on here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this connection pool doesn't do any sophisticated retry logic if a new connection is available. it just has a fixed number of connections. if you try to acquire a connection but they are all already in use, you get an exception.
in addition to one thread per model, we need some overheard connections. one is 'master' which is used for pre- and post-run-hooks, getting the list of existing tables before model runs, creating the schema, etc.
the other is for testing, which is kind of dumb.
this code doesn't exactly cap the number of connections to the number of threads, but it does make sure that connections don't grow in an unbounded fashion.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
got it, thanks
error = ("Internal error executing {filepath}\n\n{error}" | ||
"\n\nThis is an error in dbt. Please try again. If " | ||
"the error persists, open an issue at " | ||
"https://github.com/fishtown-analytics/dbt").format( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👏
dbt/runner.py
Outdated
error=str(e).strip()) | ||
status = "ERROR" | ||
if type(e) == psycopg2.InternalError and \ | ||
ABORTED_TRANSACTION_STRING == e.diag.message_primary: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think we'll see this a lot less often now -- is it still worth including?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
great point... let me think on this a little bit
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think we can leave it in for now. but if overall error handling has improved, i don't think this particular error message is incredibly helpful (since each model gets its own transaction)
I haven't, it's difficult to do so with my project because we have an XS snowflake warehouse, and performance has been noticeably different with multiple simultaneous transactions. I asked @jthandy to do some benchmarking on it with a Redshift project once it's in dev.
Yeah, but I guess we should have an integration test for this. I added one for the inverse, i.e. if a model fails, other models in the same run level don't fail.
Snowflake & Postgres |
Changes:
dbt test
shows great results with this approach because there's only one run level.)