-
Notifications
You must be signed in to change notification settings - Fork 1.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Microbatch: batched execution #10677
Conversation
…ack + batch_size work
Thank you for your pull request! We could not find a changelog entry for this change. For details on how to document a change, see the contributing guide. |
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #10677 +/- ##
==========================================
+ Coverage 88.90% 88.95% +0.05%
==========================================
Files 180 181 +1
Lines 22856 22959 +103
==========================================
+ Hits 20319 20423 +104
+ Misses 2537 2536 -1
Flags with carried forward coverage won't be shown. Click here to find out more.
|
e4138c5
to
3a6c739
Compare
c6b2ccc
to
71a526c
Compare
71a526c
to
a31e703
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Requesting changes and am gonna follow up by making said changes 😅
microbatch_builder = MicrobatchBuilder( | ||
model=model, | ||
is_incremental=self._is_incremental(model), | ||
event_time_start=getattr(self.config.args, "EVENT_TIME_START", None), | ||
event_time_end=getattr(self.config.args, "EVENT_TIME_END", None), | ||
) | ||
end = microbatch_builder.build_end_time() | ||
start = microbatch_builder.build_start_time(end) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When I was reading through MicrobatchBuilder
I found it odd that build_end_time
and build_start_time
weren't private functions that the __init__
would then call to default the event_time_start
/event_time_end
. With this bit of code here, I find myself still thinking so. Basically, the start/end are always going to be specific to a given MicrobatchBuilder
instance. Perhaps as a fast follow to this PR we should investigate if this can be reduced to that. The code here would then become
...
microbatch_builder = MicrobatchBuilder(
model=model,
is_incremental=self._is_incremental(model),
event_time_start=getattr(self.config.args, "EVENT_TIME_START", None),
event_time_end=getattr(self.config.args, "EVENT_TIME_END", None),
)
batches = microbatch_builder.build_batches()
...
The alternative would be to take the MicrobatchBuilder
class to be less specific to a model, which also seems like a valid approach. Right now though we seem to be somewhere in the middle with a class that is specific to a model, but methods that we're expected to call which should never change their return given the associated model.
# TODO: Remove. This is a temporary method. We're working with adapters on | ||
# a strategy to ensure we can access the `is_incremental` logic without drift |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should follow up with this on @mikealfare about the possible timeline
if ( | ||
os.environ.get("DBT_EXPERIMENTAL_MICROBATCH") | ||
and model.config.materialized == "incremental" | ||
and model.config.incremental_strategy == "microbatch" | ||
): | ||
batch_results = self._execute_microbatch_materialization( | ||
model, manifest, context, materialization_macro | ||
) | ||
else: | ||
result = MacroGenerator( | ||
materialization_macro, context, stack=context["context_macro_stack"] | ||
)() | ||
for relation in self._materialization_relations(result, model): | ||
self.adapter.cache_added(relation.incorporate(dbt_created=True)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know we talked about this work, and I absolutely understand why we're doing it this way. Something feels smelly about it though, and I can't exactly put my finger on it. My best guess is that we're doing a conditional, exiting the conditional, and then basically re-entering the conditional on lines 396-399. For instance line 399 should never be hit if we enter line 384. However because of the split conditionals, this isn't immediately apparent. I wonder if we should be calling into two separate private functions just before the try
on line 378, and only one or the other function would ever be called depending on if we're doing microbatch stuff or not.
status=RunStatus.Success, | ||
timing=[], | ||
thread_id=threading.current_thread().name, | ||
# TODO -- why isn't this getting propagated to logs? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The execution_time isn't making it to the logs? 🤔 That's odd....
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My remaining open comments can be addressed at a later date. Let's move forward 🚀
Resolves #10700
Problem
For microbatch models, it should be possible to:
Solution
Checklist
🎩 Example batch-level failure:
