Skip to content

Commit

Permalink
Properly checkpoint writes via the webhook lambda
Browse files Browse the repository at this point in the history
Missed a spot in my haste last week, oops!
  • Loading branch information
rtyler committed Apr 1, 2024
1 parent 137209d commit cc9cdc2
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 6 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ members = [
resolver = "2"

[workspace.package]
version = "0.10.1"
version = "0.10.2"
edition = "2021"
keywords = ["deltalake", "parquet", "lambda", "delta"]
homepage = "https://github.com/buoyant-data/oxbow"
Expand Down
7 changes: 3 additions & 4 deletions lambdas/webhook/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
[package]
name = "webhook"
version = "0.1.0"
version.workspace = true
edition.workspace = true
repository.workspace = true
homepage.workspace = true

[dependencies]
http = "1.1.0"
Expand All @@ -16,6 +18,3 @@ serde_json = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
arrow-json = "50.0.0"
arrow-schema = "50.0.0"

15 changes: 14 additions & 1 deletion lambdas/webhook/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,20 @@ async fn append_values(mut table: DeltaTable, jsonl: &str) -> Result<DeltaTable,
writer.write(batch).await?;
}

writer.flush_and_commit(&mut table).await?;
let version = writer.flush_and_commit(&mut table).await?;
info!("Successfully flushed v{version} to Delta table");
// Reload the table to make sure we have the latest version to checkpoint
let _ = table.load().await;
if table.version() == version {
match deltalake::checkpoints::create_checkpoint(&table).await {
Ok(_) => info!("Successfully created checkpoint"),
Err(e) => {
error!("Failed to create checkpoint for {table:?}: {e:?}")
}
}
} else {
error!("The table was reloaded to create a checkpoint but a new version already exists!");
}

Ok(table)
}
Expand Down

0 comments on commit cc9cdc2

Please sign in to comment.