Skip to content

Commit

Permalink
Add CDF example. remove full path to make examples more concise. Some…
Browse files Browse the repository at this point in the history
… formatting

Signed-off-by: Abdullahsab3 <abdallah-sab3@hotmail.com>
  • Loading branch information
Abdullahsab3 committed Jan 1, 2025
1 parent 8c2ac43 commit 48e581c
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 52 deletions.
30 changes: 26 additions & 4 deletions docs/src/rust/read_cdf.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,37 @@
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {

let table = deltalake::open_table("../rust/tests/data/cdf-table").await?;
let table = deltalake::open_table("tmp/some-table").await?;
let ctx = SessionContext::new();
let ops = DeltaOps(table);
let cdf = ops.load_cdf()
let cdf = ops
.load_cdf()
.with_starting_version(0)
.with_ending_version(4)
.build()
.await?;

arrow_cast::pretty::print_batches(&cdf)?;
let batches = collect_batches(
cdf.properties().output_partitioning().partition_count(),
&cdf,
ctx,
).await?;
arrow_cast::pretty::print_batches(&batches)?;


Ok(())
}
}

/// Gather the record batches produced by every partition of `stream`.
///
/// Partitions `0..num_partitions` are executed one after another and their
/// output is concatenated, in partition order, into a single vector.
async fn collect_batches(
    num_partitions: usize,
    stream: &impl ExecutionPlan,
    ctx: SessionContext,
) -> Result<Vec<RecordBatch>, Box<dyn std::error::Error>> {
    let mut collected = Vec::new();
    for partition in 0..num_partitions {
        let partition_stream = stream.execute(partition, ctx.task_ctx())?;
        let data: Vec<RecordBatch> = collect_sendable_stream(partition_stream).await?;
        collected.extend(data);
    }
    Ok(collected)
}
24 changes: 12 additions & 12 deletions docs/usage/appending-overwriting-delta-lake-table.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@ Append two additional rows of data to the table:

=== "Rust"
```rust
let table = open_table("./data/simple_table").await?;
let table = open_table("tmp/some-table").await?;
DeltaOps(table).write(RecordBatch::try_new(
Arc::new(Schema::new(vec![
arrow::datatypes::Field::new("num", arrow::datatypes::DataType::Int32, false),
arrow::datatypes::Field::new("letter", arrow::datatypes::DataType::Utf8, false),
Field::new("num", DataType::Int32, false),
Field::new("letter", DataType::Utf8, false),
])),
vec![
Arc::new(datafusion::arrow::array::Int32Array::from(vec![8, 9])),
Arc::new(datafusion::arrow::array::StringArray::from(vec![
Arc::new(Int32Array::from(vec![8, 9])),
Arc::new(StringArray::from(vec![
"dd", "ee"
])),
])).with_save_mode(SaveMode::Append).await?;
Expand Down Expand Up @@ -70,15 +70,15 @@ Now let's see how to overwrite the existing Delta table.

=== "Rust"
```rust
let table = open_table("./data/simple_table").await?;
DeltaOps(table).write( RecordBatch::try_new(
let table = open_table("tmp/some-table").await?;
DeltaOps(table).write(RecordBatch::try_new(
Arc::new(Schema::new(vec![
arrow::datatypes::Field::new("num", arrow::datatypes::DataType::Int32, false),
arrow::datatypes::Field::new("letter", arrow::datatypes::DataType::Utf8, false),
Field::new("num", DataType::Int32, false),
Field::new("letter", DataType::Utf8, false),
])),
vec![
Arc::new(datafusion::arrow::array::Int32Array::from(vec![1, 2, 3])),
Arc::new(datafusion::arrow::array::StringArray::from(vec![
Arc::new(Int32Array::from(vec![1, 2, 3])),
Arc::new(StringArray::from(vec![
"a", "b", "c",
])),
])).with_save_mode(SaveMode::Overwrite).await?;
Expand All @@ -104,7 +104,7 @@ Overwriting just performs a logical delete. It doesn't physically remove the pr

=== "Rust"
```rust
let mut table = open_table("./data/simple_table").await?;
let mut table = open_table("tmp/some-table").await?;
table.load_version(1).await?;
```

Expand Down
9 changes: 5 additions & 4 deletions docs/usage/create-delta-lake-table.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,19 @@ You can easily write a DataFrame to a Delta table.
.write(
RecordBatch::try_new(
Arc::new(Schema::new(vec![
arrow::datatypes::Field::new("num", arrow::datatypes::DataType::Int32, false),
arrow::datatypes::Field::new("letter", arrow::datatypes::DataType::Utf8, false),
Field::new("num", DataType::Int32, true),
Field::new("letter", DataType::Utf8, true),
])),
vec![
Arc::new(datafusion::arrow::array::Int32Array::from(vec![1, 2, 3])),
Arc::new(datafusion::arrow::array::StringArray::from(vec![
Arc::new(Int32Array::from(vec![1, 2, 3])),
Arc::new(StringArray::from(vec![
"a", "b", "c",
])),
],
)?,
)
.await?;
record_batch_writer.flush_and_commit(&mut table).await?;
```

Here are the contents of the Delta table in storage:
Expand Down
40 changes: 20 additions & 20 deletions docs/usage/loading-table.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,24 +90,24 @@ the `DeltaTable.is_deltatable()` method.
=== "Rust"

```rust
let table_path = "<path/to/valid/table>";
let builder = deltalake::DeltaTableBuilder::from_uri(table_path);
builder.build()?.verify_deltatable_existence().await?;
// true

let invalid_table_path = "<path/to/nonexistent/table>";
let builder = deltalake::DeltaTableBuilder::from_uri(invalid_table_path);
builder.build()?.verify_deltatable_existence().await?;
// false

let bucket_table_path = "<path/to/valid/table/in/bucket>";
let storage_options = HashMap::from_iter(vec![
("AWS_ACCESS_KEY_ID".to_string(), "THE_AWS_ACCESS_KEY_ID".to_string()),
("AWS_SECRET_ACCESS_KEY".to_string(), "THE_AWS_SECRET_ACCESS_KEY".to_string()),
]);
let builder = deltalake::DeltaTableBuilder::from_uri(bucket_table_path).with_storage_options(storage_options);
builder.build()?.verify_deltatable_existence().await?;
// true
let table_path = "<path/to/valid/table>";
let builder = deltalake::DeltaTableBuilder::from_uri(table_path);
builder.build()?.verify_deltatable_existence().await?;
// true

let invalid_table_path = "<path/to/nonexistent/table>";
let builder = deltalake::DeltaTableBuilder::from_uri(invalid_table_path);
builder.build()?.verify_deltatable_existence().await?;
// false

let bucket_table_path = "<path/to/valid/table/in/bucket>";
let storage_options = HashMap::from_iter(vec![
("AWS_ACCESS_KEY_ID".to_string(), "THE_AWS_ACCESS_KEY_ID".to_string()),
("AWS_SECRET_ACCESS_KEY".to_string(), "THE_AWS_SECRET_ACCESS_KEY".to_string()),
]);
let builder = deltalake::DeltaTableBuilder::from_uri(bucket_table_path).with_storage_options(storage_options);
builder.build()?.verify_deltatable_existence().await?;
// true
```


Expand Down Expand Up @@ -158,8 +158,8 @@ wish to load:
```
=== "Rust"
```rust
let mut table = open_table("./data/simple_table").await?;
table.load_version(1).await?;
let mut table = open_table("./data/simple_table").await?;
table.load_version(1).await?;
```


Expand Down
24 changes: 12 additions & 12 deletions docs/usage/merging-tables.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,16 +105,16 @@ For example, let’s update the value of a column in the target table based on a
.await?;

let schema = Arc::new(Schema::new(vec![
arrow::datatypes::Field::new("x", arrow::datatypes::DataType::Int32, true),
arrow::datatypes::Field::new("y", arrow::datatypes::DataType::Int32, true),
Field::new("x", DataType::Int32, true),
Field::new("y", DataType::Int32, true),
]));
let mut record_batch_writer = deltalake::writer::RecordBatchWriter::for_table(&mut table)?;
record_batch_writer
.write(RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(datafusion::arrow::array::Int32Array::from(vec![1, 2, 3])),
Arc::new(datafusion::arrow::array::Int32Array::from(vec![4, 5, 6])),
Arc::new(Int32Array::from(vec![1, 2, 3])),
Arc::new(Int32Array::from(vec![4, 5, 6])),
],
)?)
.await?;
Expand All @@ -125,8 +125,8 @@ For example, let’s update the value of a column in the target table based on a
let source_data = ctx.read_batch(RecordBatch::try_new(
schema,
vec![
Arc::new(datafusion::arrow::array::Int32Array::from(vec![2, 3])),
Arc::new(datafusion::arrow::array::Int32Array::from(vec![5, 6])),
Arc::new(Int32Array::from(vec![2, 3])),
Arc::new(Int32Array::from(vec![5, 6])),
],
)?)?;

Expand Down Expand Up @@ -200,16 +200,16 @@ For example, let’s say we start with the same target table:
.await?;

let schema = Arc::new(Schema::new(vec![
arrow::datatypes::Field::new("x", arrow::datatypes::DataType::Int32, true),
arrow::datatypes::Field::new("y", arrow::datatypes::DataType::Int32, true),
Field::new("x", DataType::Int32, true),
Field::new("y", DataType::Int32, true),
]));
let mut record_batch_writer = deltalake::writer::RecordBatchWriter::for_table(&mut table)?;
record_batch_writer
.write(RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(datafusion::arrow::array::Int32Array::from(vec![1, 2, 3])),
Arc::new(datafusion::arrow::array::Int32Array::from(vec![4, 5, 6])),
Arc::new(Int32Array::from(vec![1, 2, 3])),
Arc::new(Int32Array::from(vec![4, 5, 6])),
],
)?)
.await?;
Expand All @@ -234,8 +234,8 @@ And we want to merge only new records from our source data, without duplication:
let source_data = ctx.read_batch(RecordBatch::try_new(
schema,
vec![
Arc::new(datafusion::arrow::array::Int32Array::from(vec![2, 3])),
Arc::new(datafusion::arrow::array::Int32Array::from(vec![5, 6])),
Arc::new(Int32Array::from(vec![2, 3])),
Arc::new(Int32Array::from(vec![5, 6])),
],
)?)?;
```
Expand Down
2 changes: 2 additions & 0 deletions docs/usage/read-cdf.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

Reading the CDF data from a table with change data is easy.

The `delta.enableChangeDataFeed` configuration needs to be set to `true` when creating the delta table.

## Reading CDF Log

{{ code_example('read_cdf', None, []) }}
Expand Down

0 comments on commit 48e581c

Please sign in to comment.