Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 38 additions & 3 deletions bindings/python/example/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,19 @@ async def main():
except Exception as e:
print(f"Failed to get table info: {e}")

# Demo: List offsets
print("\n--- Testing list_offsets() ---")
try:
# Query latest offsets using OffsetType constant (recommended for type safety)
offsets = await admin.list_offsets(
table_path,
bucket_ids=[0],
offset_type=fluss.OffsetType.LATEST
)
print(f"Latest offsets for table (before writes): {offsets}")
except Exception as e:
print(f"Failed to list offsets: {e}")

# Get the table instance
table = await conn.get_table(table_path)
print(f"Got table: {table}")
Expand All @@ -88,7 +101,7 @@ async def main():
print(f"Created append writer: {append_writer}")

try:
# Test 1: Write PyArrow Table
# Demo: Write PyArrow Table
print("\n--- Testing PyArrow Table write ---")
pa_table = pa.Table.from_arrays(
[
Expand All @@ -103,7 +116,7 @@ async def main():
append_writer.write_arrow(pa_table)
print("Successfully wrote PyArrow Table")

# Test 2: Write PyArrow RecordBatch
# Demo: Write PyArrow RecordBatch
print("\n--- Testing PyArrow RecordBatch write ---")
pa_record_batch = pa.RecordBatch.from_arrays(
[
Expand All @@ -118,7 +131,7 @@ async def main():
append_writer.write_arrow_batch(pa_record_batch)
print("Successfully wrote PyArrow RecordBatch")

# Test 3: Write Pandas DataFrame
# Demo: Write Pandas DataFrame
print("\n--- Testing Pandas DataFrame write ---")
df = pd.DataFrame(
{
Expand All @@ -137,6 +150,19 @@ async def main():
append_writer.flush()
print("Successfully flushed data")

# Demo: Check offsets after writes
print("\n--- Checking offsets after writes ---")
try:
# Query with string constant (alternative API - both strings and constants are supported)
offsets = await admin.list_offsets(
table_path,
bucket_ids=[0],
offset_type="latest" # Can also use "earliest" or "timestamp"
)
print(f"Latest offsets after writing 7 records: {offsets}")
except Exception as e:
print(f"Failed to list offsets: {e}")

except Exception as e:
print(f"Error during writing: {e}")

Expand Down Expand Up @@ -178,6 +204,15 @@ async def main():
except Exception as e:
print(f"Error during scanning: {e}")

# Demo: Drop table
print("\n--- Testing drop_table() ---")
try:
# Drop the table (ignore_if_not_exists=True means no error if already dropped)
await admin.drop_table(table_path, ignore_if_not_exists=True)
print(f"Successfully dropped table: {table_path}")
except Exception as e:
print(f"Failed to drop table: {e}")

# Close connection
conn.close()
print("\nConnection closed")
Expand Down
104 changes: 101 additions & 3 deletions bindings/python/src/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl FlussAdmin {
) -> PyResult<Bound<'py, PyAny>> {
let ignore = ignore_if_exists.unwrap_or(false);

let core_table_path = table_path.to_core().clone();
let core_table_path = table_path.to_core();
let core_descriptor = table_descriptor.to_core().clone();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here it's needed, since it's a reference

let admin = self.__admin.clone();

Expand All @@ -58,7 +58,7 @@ impl FlussAdmin {
py: Python<'py>,
table_path: &TablePath,
) -> PyResult<Bound<'py, PyAny>> {
let core_table_path = table_path.to_core().clone();
let core_table_path = table_path.to_core();
let admin = self.__admin.clone();

future_into_py(py, async move {
Expand All @@ -80,7 +80,7 @@ impl FlussAdmin {
py: Python<'py>,
table_path: &TablePath,
) -> PyResult<Bound<'py, PyAny>> {
let core_table_path = table_path.to_core().clone();
let core_table_path = table_path.to_core();
let admin = self.__admin.clone();

future_into_py(py, async move {
Expand All @@ -96,6 +96,104 @@ impl FlussAdmin {
})
}

/// Drop a table
#[pyo3(signature = (table_path, ignore_if_not_exists=false))]
pub fn drop_table<'py>(
    &self,
    py: Python<'py>,
    table_path: &TablePath,
    ignore_if_not_exists: bool,
) -> PyResult<Bound<'py, PyAny>> {
    // Capture owned copies so the async block can be 'static.
    let path = table_path.to_core();
    let client = self.__admin.clone();

    future_into_py(py, async move {
        if let Err(e) = client.drop_table(&path, ignore_if_not_exists).await {
            return Err(FlussError::new_err(format!("Failed to drop table: {e}")));
        }
        // Resolve the awaitable with Python `None` on success.
        Python::attach(|py| Ok(py.None()))
    })
}

/// List offsets for buckets.
///
/// Args:
///     table_path: Path to the table
///     bucket_ids: List of bucket IDs to query
///     offset_type: Type of offset to retrieve:
///         - "earliest" or OffsetType.EARLIEST: Start of the log
///         - "latest" or OffsetType.LATEST: End of the log
///         - "timestamp" or OffsetType.TIMESTAMP: Offset at given timestamp (requires timestamp arg)
///     timestamp: Required when offset_type is "timestamp", ignored otherwise
///
/// Returns:
///     dict[int, int]: Mapping of bucket_id -> offset
///
/// Example:
///     >>> offsets = await admin.list_offsets(table_path, [0, 1], "latest")
///     >>> print(offsets)  # {0: 100, 1: 150}
#[pyo3(signature = (table_path, bucket_ids, offset_type, timestamp=None))]
pub fn list_offsets<'py>(
    &self,
    py: Python<'py>,
    table_path: &TablePath,
    bucket_ids: Vec<i32>,
    offset_type: &str,
    timestamp: Option<i64>,
) -> PyResult<Bound<'py, PyAny>> {
    use fcore::rpc::message::OffsetSpec;

    // Reject negative bucket IDs up front so the caller gets a precise
    // error instead of an opaque server-side failure later.
    for &bucket_id in &bucket_ids {
        if bucket_id < 0 {
            return Err(FlussError::new_err(format!(
                "Invalid bucket_id: {bucket_id}. Bucket IDs must be non-negative"
            )));
        }
    }

    let core_table_path = table_path.to_core();
    let admin = self.__admin.clone();

    // Parse offset_type before entering the async block so invalid input
    // fails fast (case-insensitive, no intermediate allocation).
    let offset_spec = match offset_type {
        s if s.eq_ignore_ascii_case("earliest") => OffsetSpec::Earliest,
        s if s.eq_ignore_ascii_case("latest") => OffsetSpec::Latest,
        s if s.eq_ignore_ascii_case("timestamp") => {
            // `timestamp` is only meaningful — and required — for this variant.
            let ts = timestamp.ok_or_else(|| {
                FlussError::new_err(
                    "timestamp must be provided when offset_type='timestamp'".to_string(),
                )
            })?;
            OffsetSpec::Timestamp(ts)
        }
        _ => {
            return Err(FlussError::new_err(format!(
                "Invalid offset_type: '{offset_type}'. Must be 'earliest', 'latest', or 'timestamp'"
            )));
        }
    };

    future_into_py(py, async move {
        let offsets = admin
            .list_offsets(&core_table_path, &bucket_ids, offset_spec)
            .await
            .map_err(|e| FlussError::new_err(format!("Failed to list offsets: {e}")))?;

        // Convert the (bucket_id, offset) pairs into a Python dict.
        Python::attach(|py| {
            let dict = pyo3::types::PyDict::new(py);
            for (bucket_id, offset) in offsets {
                dict.set_item(bucket_id, offset)?;
            }
            Ok(dict.unbind())
        })
    })
}

/// Human-readable representation returned to Python's `repr()`.
fn __repr__(&self) -> String {
    String::from("FlussAdmin()")
}
Expand Down
18 changes: 18 additions & 0 deletions bindings/python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,23 @@ static TOKIO_RUNTIME: LazyLock<Runtime> = LazyLock::new(|| {
.expect("Failed to create Tokio runtime")
});

/// Offset type constants for list_offsets()
///
/// Exposed to Python as class attributes (e.g. `fluss.OffsetType.LATEST`);
/// each value is the plain string that `list_offsets()` also accepts directly,
/// so the constants exist purely for discoverability and typo safety.
#[pyclass]
#[derive(Clone)]
pub struct OffsetType;

#[pymethods]
impl OffsetType {
    /// Start of the log ("earliest").
    #[classattr]
    const EARLIEST: &'static str = "earliest";

    /// End of the log ("latest").
    #[classattr]
    const LATEST: &'static str = "latest";

    /// Offset at a given timestamp ("timestamp"); requires the `timestamp`
    /// argument when passed to `list_offsets()`.
    #[classattr]
    const TIMESTAMP: &'static str = "timestamp";
}

#[pymodule]
fn _fluss(m: &Bound<'_, PyModule>) -> PyResult<()> {
// Register all classes
Expand All @@ -59,6 +76,7 @@ fn _fluss(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<LogScanner>()?;
m.add_class::<LakeSnapshot>()?;
m.add_class::<TableBucket>()?;
m.add_class::<OffsetType>()?;

// Register exception types
m.add_class::<FlussError>()?;
Expand Down