Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

docs: rust usage documentation #3089

Merged
merged 1 commit into from
Jan 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions docs/src/rust/check_constraints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,19 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// --8<-- [start:add_constraint]
let table = deltalake::open_table("../rust/tests/data/simple_table").await?;
let ops = DeltaOps(table);
ops.with_constraint("id_gt_0", "id > 0").await?;
ops.add_constraint().with_constraint("id_gt_0", "id > 0").await?;
// --8<-- [end:add_constraint]

// --8<-- [start:add_data]
let table = deltalake::open_table("../rust/tests/data/simple_table").await?;
let schema = table.get_state().arrow_schema()?;
let mut table = deltalake::open_table("../rust/tests/data/simple_table").await?;
let schema = table.snapshot()?.arrow_schema()?;
let invalid_values: Vec<Arc<dyn Array>> = vec![
Arc::new(Int32Array::from(vec![-10]))
];
let batch = RecordBatch::try_new(schema, invalid_values)?;
table.write(vec![batch]).await?;
let mut writer = RecordBatchWriter::for_table(&table)?;
writer.write(batch).await?;
writer.flush_and_commit(&mut table).await?;
// --8<-- [end:add_data]

Ok(())
Expand Down
30 changes: 26 additions & 4 deletions docs/src/rust/read_cdf.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,37 @@
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {

let table = deltalake::open_table("../rust/tests/data/cdf-table").await?;
let table = deltalake::open_table("tmp/some-table").await?;
let ctx = SessionContext::new();
let ops = DeltaOps(table);
let cdf = ops.load_cdf()
let cdf = ops
.load_cdf()
.with_starting_version(0)
.with_ending_version(4)
.build()
.await?;

arrow_cast::pretty::print_batches(&cdf)?;
let batches = collect_batches(
cdf.properties().output_partitioning().partition_count(),
&cdf,
ctx,
).await?;
arrow_cast::pretty::print_batches(&batches)?;


Ok(())
}
}

async fn collect_batches(
num_partitions: usize,
stream: &impl ExecutionPlan,
ctx: SessionContext,
) -> Result<Vec<RecordBatch>, Box<dyn std::error::Error>> {
let mut batches = vec![];
for p in 0..num_partitions {
let data: Vec<RecordBatch> =
collect_sendable_stream(stream.execute(p, ctx.task_ctx())?).await?;
batches.extend_from_slice(&data);
}
Ok(batches)
}
72 changes: 58 additions & 14 deletions docs/usage/appending-overwriting-delta-lake-table.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,30 @@ Suppose you have a Delta table with the following contents:

Append two additional rows of data to the table:

```python
from deltalake import write_deltalake, DeltaTable

df = pd.DataFrame({"num": [8, 9], "letter": ["dd", "ee"]})
write_deltalake("tmp/some-table", df, mode="append")
```
=== "Python"

```python
from deltalake import write_deltalake, DeltaTable

df = pd.DataFrame({"num": [8, 9], "letter": ["dd", "ee"]})
write_deltalake("tmp/some-table", df, mode="append")
```

=== "Rust"
```rust
let table = open_table("tmp/some-table").await?;
DeltaOps(table).write(RecordBatch::try_new(
Arc::new(Schema::new(vec![
Field::new("num", DataType::Int32, false),
Field::new("letter", DataType::Utf8, false),
])),
vec![
Arc::new(Int32Array::from(vec![8, 9])),
Arc::new(StringArray::from(vec![
"dd", "ee"
])),
])).with_save_mode(SaveMode::Append).await?;
```

Here are the updated contents of the Delta table:

Expand All @@ -44,12 +62,27 @@ Now let's see how to perform an overwrite transaction.
## Delta Lake overwrite transactions

Now let's see how to overwrite the exisitng Delta table.

```python
df = pd.DataFrame({"num": [11, 22], "letter": ["aa", "bb"]})
write_deltalake("tmp/some-table", df, mode="overwrite")
```

=== "Python"
```python
df = pd.DataFrame({"num": [11, 22], "letter": ["aa", "bb"]})
write_deltalake("tmp/some-table", df, mode="overwrite")
```

=== "Rust"
```rust
let table = open_table("tmp/some-table").await?;
DeltaOps(table).write(RecordBatch::try_new(
Arc::new(Schema::new(vec![
Field::new("num", DataType::Int32, false),
Field::new("letter", DataType::Utf8, false),
])),
vec![
Arc::new(Int32Array::from(vec![1, 2, 3])),
Arc::new(StringArray::from(vec![
"a", "b", "c",
])),
])).with_save_mode(SaveMode::Overwrite).await?;
```
Here are the contents of the Delta table after the overwrite operation:

```
Expand All @@ -63,9 +96,20 @@ Here are the contents of the Delta table after the overwrite operation:

Overwriting just performs a logical delete. It doesn't physically remove the previous data from storage. Time travel back to the previous version to confirm that the old version of the table is still accessable.

```python
dt = DeltaTable("tmp/some-table", version=1)
=== "Python"

```python
dt = DeltaTable("tmp/some-table", version=1)
```

=== "Rust"
```rust
let mut table = open_table("tmp/some-table").await?;
table.load_version(1).await?;
```


```
+-------+----------+
| num | letter |
|-------+----------|
Expand Down
47 changes: 47 additions & 0 deletions docs/usage/create-delta-lake-table.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,53 @@ You can easily write a DataFrame to a Delta table.
df.write_delta("tmp/some-table")
```

=== "Rust"

```rust
let delta_ops = DeltaOps::try_from_uri("tmp/some-table").await?;
let mut table = delta_ops
.create()
.with_table_name("some-table")
.with_save_mode(SaveMode::Overwrite)
.with_columns(
StructType::new(vec![
StructField::new(
"num".to_string(),
DataType::Primitive(PrimitiveType::Integer),
true,
),
StructField::new(
"letter".to_string(),
DataType::Primitive(PrimitiveType::String),
true,
),
])
.fields()
.cloned(),
)
.await?;

let mut record_batch_writer =
deltalake::writer::RecordBatchWriter::for_table(&mut table)?;
record_batch_writer
.write(
RecordBatch::try_new(
Arc::new(Schema::new(vec![
Field::new("num", DataType::Int32, true),
Field::new("letter", Utf8, true),
])),
vec![
Arc::new(Int32Array::from(vec![1, 2, 3])),
Arc::new(StringArray::from(vec![
"a", "b", "c",
])),
],
)?,
)
.await?;
record_batch_writer.flush_and_commit(&mut table).await?;
```

Here are the contents of the Delta table in storage:

```
Expand Down
28 changes: 23 additions & 5 deletions docs/usage/deleting-rows-from-delta-lake-table.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,29 @@ Suppose you have the following Delta table with four rows:

Here's how to delete all the rows where the `num` is greater than 2:

```python
dt = DeltaTable("tmp/my-table")
dt.delete("num > 2")
```

=== "Python"

```python
dt = DeltaTable("tmp/my-table")
dt.delete("num > 2")
```

=== "Rust"
```rust
let table = deltalake::open_table("./data/simple_table").await?;
let (table, delete_metrics) = DeltaOps(table)
.delete()
.with_predicate(col("num").gt(lit(2)))
.await?;
```
`with_predicate` expects an argument that can be translated to a Datafusion `Expression`. This can be either using the Dataframe API, or using a `SQL where` clause:
```rust
let table = deltalake::open_table("./data/simple_table").await?;
let (table, delete_metrics) = DeltaOps(table)
.delete()
.with_predicate("num > 2")
.await?;
```
Here are the contents of the Delta table after the delete operation has been performed:

```
Expand Down
Loading
Loading