Skip to content

Commit

Permalink
fix: include .venv in .gitignore
Browse files (browse the repository at this point in the history)
  • Loading branch information
gautham acharya authored and rtyler committed Mar 9, 2024
1 parent 2786e92 commit 1e19cf3
Show file tree
Hide file tree
Showing 10 changed files with 91 additions and 399 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ tlaplus/*.toolbox/*/[0-9]*-[0-9]*-[0-9]*-[0-9]*-[0-9]*-[0-9]*/
/.idea
.vscode
.env
.venv
**/.DS_Store
**/.python-version
.coverage
Expand All @@ -29,4 +30,4 @@ Cargo.lock

justfile
site
__pycache__
__pycache__
102 changes: 18 additions & 84 deletions crates/core/src/operations/optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,6 @@ pub struct Metrics {
pub preserve_insertion_order: bool,
}

/// Information to be committed by the optimizer.
#[derive(Debug)]
pub struct CommitContext {
actions: Vec<Action>,
app_metadata: Option<HashMap<String, serde_json::Value>>,
snapshot: Option<DeltaTableState>,
operation: OptimizeInput,
}

/// Statistics on files for a particular operation
/// Operation can be remove or add
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
Expand Down Expand Up @@ -184,8 +175,6 @@ pub struct OptimizeBuilder<'a> {
/// Optimize type
optimize_type: OptimizeType,
min_commit_interval: Option<Duration>,
/// Indicates whether the writes should be committed
commit_writes: bool,
}

impl<'a> OptimizeBuilder<'a> {
Expand All @@ -203,7 +192,6 @@ impl<'a> OptimizeBuilder<'a> {
max_spill_size: 20 * 1024 * 1024 * 2014, // 20 GB.
optimize_type: OptimizeType::Compact,
min_commit_interval: None,
commit_writes: true, // commit by default
}
}

Expand Down Expand Up @@ -263,36 +251,10 @@ impl<'a> OptimizeBuilder<'a> {
self.min_commit_interval = Some(min_commit_interval);
self
}

/// Commit Writes
pub fn with_commit_writes(mut self, commit_writes: bool) -> Self {
self.commit_writes = commit_writes;
self
}
}

impl<'a> OptimizeBuilder<'a> {
/// Commit writes after processing
pub async fn commit_writes(self, commit_info: CommitContext) -> DeltaResult<DeltaTable> {
commit(
self.log_store.as_ref(),
&commit_info.actions,
commit_info.operation.clone().into(),
commit_info.snapshot.as_ref(),
commit_info.app_metadata.clone(),
)
.await?;

let mut table = DeltaTable::new_with_state(self.log_store, self.snapshot);

table.update().await?;

Ok(table)
}
}

impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> {
type Output = DeltaResult<(DeltaTable, Metrics, Option<CommitContext>)>;
type Output = DeltaResult<(DeltaTable, Metrics)>;
type IntoFuture = BoxFuture<'a, Self::Output>;

fn into_future(self) -> Self::IntoFuture {
Expand All @@ -314,22 +276,19 @@ impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> {
this.target_size.to_owned(),
writer_properties,
)?;
let (metrics, commit_info) = plan
let metrics = plan
.execute(
this.log_store.clone(),
&this.snapshot,
this.max_concurrent_tasks,
this.max_spill_size,
this.min_commit_interval,
this.app_metadata,
this.commit_writes,
)
.await?;

let mut table = DeltaTable::new_with_state(this.log_store, this.snapshot);
table.update().await?;

Ok((table, metrics, commit_info))
Ok((table, metrics))
})
}
}
Expand Down Expand Up @@ -645,7 +604,6 @@ impl MergePlan {
}

/// Perform the operations outlined in the plan.
#[allow(clippy::too_many_arguments)]
pub async fn execute(
mut self,
log_store: LogStoreRef,
Expand All @@ -655,9 +613,9 @@ impl MergePlan {
max_spill_size: usize,
min_commit_interval: Option<Duration>,
app_metadata: Option<HashMap<String, serde_json::Value>>,
commit_writes: bool,
) -> Result<(Metrics, Option<CommitContext>), DeltaTableError> {
) -> Result<Metrics, DeltaTableError> {
let operations = std::mem::take(&mut self.operations);

let stream = match operations {
OptimizeOperations::Compact(bins) => futures::stream::iter(bins)
.flat_map(|(_, (partition, bins))| {
Expand Down Expand Up @@ -740,9 +698,6 @@ impl MergePlan {
// or when we reach the commit interval.
let mut actions = vec![];

// Store total actions if we are not committing
let mut total_actions: Vec<Action> = vec![];

// Each time we commit, we'll reset buffered_metrics to orig_metrics.
let orig_metrics = std::mem::take(&mut self.metrics);
let mut buffered_metrics = orig_metrics.clone();
Expand All @@ -766,11 +721,7 @@ impl MergePlan {
None => false,
Some(i) => now.duration_since(last_commit) > i,
};

// if commit_writes is false, we are not committing anyway, and will
// instead return a commit_info object, so no need to consider the
// min_commit_interval
if !actions.is_empty() && (!commit_writes || mature || end) {
if !actions.is_empty() && (mature || end) {
let actions = std::mem::take(&mut actions);
last_commit = now;

Expand All @@ -789,41 +740,24 @@ impl MergePlan {
}

table.update().await?;

if commit_writes {
debug!("committing {} actions", actions.len());
//// TODO: Check for remove actions on optimized partitions. If a
//// optimized partition was updated then abort the commit. Requires (#593).
commit(
table.log_store.as_ref(),
&actions,
self.task_parameters.input_parameters.clone().into(),
Some(table.snapshot()?),
Some(app_metadata.clone()),
)
.await?;
} else {
// Save the actions buffer so the client can commit later
total_actions.extend(actions)
}
debug!("committing {} actions", actions.len());
//// TODO: Check for remove actions on optimized partitions. If a
//// optimized partition was updated then abort the commit. Requires (#593).
commit(
table.log_store.as_ref(),
&actions,
self.task_parameters.input_parameters.clone().into(),
Some(table.snapshot()?),
Some(app_metadata.clone()),
)
.await?;
}

if end {
break;
}
}

let commit_context = if !commit_writes {
Some(CommitContext {
actions: total_actions,
app_metadata,
snapshot: Some(snapshot.clone()), // using original table snapshot as all commits are added at once
operation: self.task_parameters.input_parameters.clone(),
})
} else {
None
};

total_metrics.preserve_insertion_order = true;
if total_metrics.num_files_added == 0 {
total_metrics.files_added.min = 0;
Expand All @@ -832,7 +766,7 @@ impl MergePlan {
total_metrics.files_removed.min = 0;
}

Ok((total_metrics, commit_context))
Ok(total_metrics)
}
}

Expand Down
Loading

0 comments on commit 1e19cf3

Please sign in to comment.