Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make nightly clippy happy #2186

Merged
merged 1 commit into from
Apr 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 25 additions & 20 deletions ballista/rust/scheduler/src/state/persistent_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,28 +124,33 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
.get_from_prefix(&get_stage_prefix(&self.namespace))
.await?;

let mut tmp_stages: HashMap<StageKey, Arc<dyn ExecutionPlan>> = HashMap::new();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes sense to me (make the snapshot while not holding the lock) but I am not familiar enough with the ballista codebase to know if it would cause problems. Perhaps @mingmwang @yahoNanJing or @thinkharderdev could review it too?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so. This should only happen when the scheduler initializes and needs to load the persistent state into memory

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

{
for (key, entry) in entries {
let (job_id, stage_id) = extract_stage_id_from_stage_key(&key).unwrap();
let session_id = self
.get_session_from_job(&job_id)
.expect("session id does not exist for job");
let session_ctx = self
.session_context_registry
.lookup_session(&session_id)
.await
.expect("SessionContext does not exist in SessionContextRegistry.");
let value = U::try_decode(&entry)?;
let runtime = session_ctx.runtime_env();
let plan = value.try_into_physical_plan(
session_ctx.deref(),
runtime.deref(),
self.codec.physical_extension_codec(),
)?;

tmp_stages.insert((job_id, stage_id), plan);
}
}
let mut stages = self.stages.write();
for (key, entry) in entries {
let (job_id, stage_id) = extract_stage_id_from_stage_key(&key).unwrap();
let session_id = self
.get_session_from_job(&job_id)
.expect("session id does not exist for job");
let session_ctx = self
.session_context_registry
.lookup_session(&session_id)
.await
.expect("SessionContext does not exist in SessionContextRegistry.");
let value = U::try_decode(&entry)?;
let runtime = session_ctx.runtime_env();
let plan = value.try_into_physical_plan(
session_ctx.deref(),
runtime.deref(),
self.codec.physical_extension_codec(),
)?;

stages.insert((job_id, stage_id), plan);
for tmp_stage in tmp_stages {
stages.insert(tmp_stage.0, tmp_stage.1);
}

Ok(())
}

Expand Down
4 changes: 2 additions & 2 deletions datafusion/physical-expr/src/expressions/lead_lag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,11 @@ fn shift_with_default_value(
create_empty_array(value, array.data_type(), array.len())
} else {
let slice_offset = (-offset).clamp(0, value_len) as usize;
let length = array.len() - offset.abs() as usize;
let length = array.len() - offset.unsigned_abs() as usize;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looking at this code I wonder if we need to check for offset < 0 -- however, this PR seems like a definite improvement so 👍

let slice = array.slice(slice_offset, length);

// Generate array with remaining `null` items
let nulls = offset.abs() as usize;
let nulls = offset.unsigned_abs() as usize;
let default_values = create_empty_array(value, slice.data_type(), nulls)?;
// Concatenate both arrays, add nulls after if shift > 0 else before
if offset > 0 {
Expand Down