Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Require Debug for TableProvider, TableProviderFactory and PartitionStream #12557

Merged
merged 7 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datafusion-cli/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ pub fn display_all_functions() -> Result<()> {
}

/// PARQUET_META table function
#[derive(Debug)]
struct ParquetMetadataTable {
schema: SchemaRef,
batch: RecordBatch,
Expand Down
1 change: 1 addition & 0 deletions datafusion-examples/examples/simple_udtf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ async fn main() -> Result<()> {
/// Usage: `read_csv(filename, [limit])`
///
/// [`read_csv`]: https://duckdb.org/docs/data/csv/overview.html
#[derive(Debug)]
struct LocalCsvTable {
schema: SchemaRef,
limit: Option<usize>,
Expand Down
8 changes: 5 additions & 3 deletions datafusion/catalog/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

use std::any::Any;
use std::borrow::Cow;
use std::fmt::Debug;
use std::sync::Arc;

use crate::session::Session;
Expand All @@ -31,7 +32,7 @@ use datafusion_physical_plan::ExecutionPlan;

/// Source table
#[async_trait]
pub trait TableProvider: Sync + Send {
pub trait TableProvider: Debug + Sync + Send {
/// Returns the table provider as [`Any`](std::any::Any) so that it can be
/// downcast to a specific implementation.
fn as_any(&self) -> &dyn Any;
Expand Down Expand Up @@ -193,6 +194,7 @@ pub trait TableProvider: Sync + Send {
/// # use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
/// # use datafusion_physical_plan::ExecutionPlan;
/// // Define a struct that implements the TableProvider trait
/// #[derive(Debug)]
/// struct TestDataSource {}
///
/// #[async_trait]
Expand All @@ -212,7 +214,7 @@ pub trait TableProvider: Sync + Send {
/// // This example only supports a between expr with a single column named "c1".
/// Expr::Between(between_expr) => {
/// between_expr.expr
/// .try_into_col()
/// .try_as_col()
/// .map(|column| {
/// if column.name == "c1" {
/// TableProviderFilterPushDown::Exact
Expand Down Expand Up @@ -283,7 +285,7 @@ pub trait TableProvider: Sync + Send {
/// For example, this can be used to create a table "on the fly"
/// from a directory of files only when that name is referenced.
#[async_trait]
pub trait TableProviderFactory: Sync + Send {
pub trait TableProviderFactory: Debug + Sync + Send {
/// Create a TableProvider with the given url
async fn create(
&self,
Expand Down
22 changes: 18 additions & 4 deletions datafusion/core/src/catalog_common/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@
//!
//! [Information Schema]: https://en.wikipedia.org/wiki/Information_schema

use async_trait::async_trait;
use datafusion_common::DataFusionError;
use std::{any::Any, sync::Arc};

use arrow::{
array::{StringBuilder, UInt64Builder},
datatypes::{DataType, Field, Schema, SchemaRef},
record_batch::RecordBatch,
};
use async_trait::async_trait;
use datafusion_common::DataFusionError;
use std::fmt::{Debug, Formatter};
use std::{any::Any, sync::Arc};

use crate::catalog::{CatalogProviderList, SchemaProvider, TableProvider};
use crate::datasource::streaming::StreamingTable;
Expand Down Expand Up @@ -75,6 +75,15 @@ struct InformationSchemaConfig {
catalog_list: Arc<dyn CatalogProviderList>,
}

impl Debug for InformationSchemaConfig {
    /// Writes an opaque representation of the config: only the type name is
    /// emitted, followed by the `..` marker for omitted fields.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // TODO: printing the catalog list here would be useful, but that
        // requires CatalogProviderList to implement Debug first.
        let mut builder = f.debug_struct("InformationSchemaConfig");
        builder.finish_non_exhaustive()
    }
}

impl InformationSchemaConfig {
/// Construct the `information_schema.tables` virtual table
async fn make_tables(
Expand Down Expand Up @@ -246,6 +255,7 @@ impl SchemaProvider for InformationSchemaProvider {
}
}

#[derive(Debug)]
struct InformationSchemaTables {
schema: SchemaRef,
config: InformationSchemaConfig,
Expand Down Expand Up @@ -337,6 +347,7 @@ impl InformationSchemaTablesBuilder {
}
}

#[derive(Debug)]
struct InformationSchemaViews {
schema: SchemaRef,
config: InformationSchemaConfig,
Expand Down Expand Up @@ -424,6 +435,7 @@ impl InformationSchemaViewBuilder {
}
}

#[derive(Debug)]
struct InformationSchemaColumns {
schema: SchemaRef,
config: InformationSchemaConfig,
Expand Down Expand Up @@ -640,6 +652,7 @@ impl InformationSchemaColumnsBuilder {
}
}

#[derive(Debug)]
struct InformationSchemata {
schema: SchemaRef,
config: InformationSchemaConfig,
Expand Down Expand Up @@ -741,6 +754,7 @@ impl PartitionStream for InformationSchemata {
}
}

#[derive(Debug)]
struct InformationSchemaDfSettings {
schema: SchemaRef,
config: InformationSchemaConfig,
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1873,6 +1873,7 @@ impl DataFrame {
}
}

#[derive(Debug)]
struct DataFrameTableProvider {
plan: LogicalPlan,
}
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/cte_worktable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use crate::datasource::{TableProvider, TableType};
/// The temporary working table where the previous iteration of a recursive query is stored
/// Naming is based on PostgreSQL's implementation.
/// See here for more details: www.postgresql.org/docs/11/queries-with.html#id-1.5.6.12.5.4
#[derive(Debug)]
pub struct CteWorkTable {
/// The name of the CTE work table
// WIP, see https://github.com/apache/datafusion/issues/462
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/empty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use crate::physical_plan::{empty::EmptyExec, ExecutionPlan};

/// An empty plan that is useful for testing and generating plans
/// without mapping them to actual data.
#[derive(Debug)]
pub struct EmptyTable {
schema: SchemaRef,
partitions: usize,
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,7 @@ impl ListingOptions {
/// # Ok(())
/// # }
/// ```
#[derive(Debug)]
pub struct ListingTable {
table_paths: Vec<ListingTableUrl>,
/// File fields only
Expand Down
2 changes: 2 additions & 0 deletions datafusion/core/src/datasource/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ impl StreamConfig {
///
/// [Hadoop]: https://hadoop.apache.org/
/// [`ListingTable`]: crate::datasource::listing::ListingTable
#[derive(Debug)]
pub struct StreamTable(Arc<StreamConfig>);

impl StreamTable {
Expand Down Expand Up @@ -370,6 +371,7 @@ impl TableProvider for StreamTable {
}
}

/// Private newtype around a shared (`Arc`) [`StreamConfig`].
///
/// [`PartitionStream`] is implemented for this type.
#[derive(Debug)]
struct StreamRead(Arc<StreamConfig>);

impl PartitionStream for StreamRead {
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/streaming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use datafusion_expr::{Expr, TableType};
use log::debug;

/// A [`TableProvider`] that streams a set of [`PartitionStream`]
#[derive(Debug)]
pub struct StreamingTable {
schema: SchemaRef,
partitions: Vec<Arc<dyn PartitionStream>>,
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use datafusion_optimizer::Analyzer;
use crate::datasource::{TableProvider, TableType};

/// An implementation of `TableProvider` that uses another logical plan.
#[derive(Debug)]
pub struct ViewTable {
/// LogicalPlan of the view
logical_plan: LogicalPlan,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1831,6 +1831,7 @@ mod tests {

#[test]
fn test_streaming_table_after_projection() -> Result<()> {
#[derive(Debug)]
struct DummyStreamPartition {
schema: SchemaRef,
}
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ pub fn csv_exec_sorted(
}

// construct a stream partition for test purposes
#[derive(Debug)]
pub(crate) struct TestStreamPartition {
pub schema: SchemaRef,
}
Expand Down
2 changes: 2 additions & 0 deletions datafusion/core/src/test_util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ pub fn populate_csv_partitions(
}

/// TableFactory for tests
#[derive(Default, Debug)]
pub struct TestTableFactory {}

#[async_trait]
Expand All @@ -191,6 +192,7 @@ impl TableProviderFactory for TestTableFactory {
}

/// TableProvider for testing purposes
#[derive(Debug)]
pub struct TestTableProvider {
/// URL of table files or folder
pub url: String,
Expand Down
2 changes: 2 additions & 0 deletions datafusion/core/tests/custom_sources_cases/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ macro_rules! TEST_CUSTOM_RECORD_BATCH {

//--- Custom source dataframe tests ---//

#[derive(Debug)]
struct CustomTableProvider;

#[derive(Debug, Clone)]
struct CustomExecutionPlan {
projection: Option<Vec<usize>>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ impl ExecutionPlan for CustomPlan {
}
}

#[derive(Clone)]
#[derive(Clone, Debug)]
struct CustomProvider {
zero_batch: RecordBatch,
one_batch: RecordBatch,
Expand Down
2 changes: 2 additions & 0 deletions datafusion/core/tests/memory_limit/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -777,6 +777,7 @@ fn batches_byte_size(batches: &[RecordBatch]) -> usize {
batches.iter().map(|b| b.get_array_memory_size()).sum()
}

#[derive(Debug)]
struct DummyStreamPartition {
schema: SchemaRef,
batches: Vec<RecordBatch>,
Expand All @@ -798,6 +799,7 @@ impl PartitionStream for DummyStreamPartition {
}

/// Wrapper over a TableProvider that can provide ordering information
#[derive(Debug)]
struct SortedTableProvider {
schema: SchemaRef,
batches: Vec<Vec<RecordBatch>>,
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/tests/physical_optimizer/limit_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};
use datafusion_physical_plan::{get_plan_string, ExecutionPlan, ExecutionPlanProperties};
use std::sync::Arc;

#[derive(Debug)]
struct DummyStreamPartition {
schema: SchemaRef,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ async fn test_deregister_udtf() -> Result<()> {
Ok(())
}

#[derive(Debug)]
struct SimpleCsvTable {
schema: SchemaRef,
exprs: Vec<Expr>,
Expand Down
3 changes: 2 additions & 1 deletion datafusion/physical-plan/src/streaming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
//! Generic plans for deferred execution: [`StreamingTableExec`] and [`PartitionStream`]

use std::any::Any;
use std::fmt::Debug;
use std::sync::Arc;

use super::{DisplayAs, DisplayFormatType, ExecutionMode, PlanProperties};
Expand All @@ -42,7 +43,7 @@ use log::debug;
/// Combined with [`StreamingTableExec`], you can use this trait to implement
/// [`ExecutionPlan`] for a custom source with less boiler plate than
/// implementing `ExecutionPlan` directly for many use cases.
pub trait PartitionStream: Send + Sync {
pub trait PartitionStream: Debug + Send + Sync {
/// Returns the schema of this partition
fn schema(&self) -> &SchemaRef;

Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-plan/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ pub fn mem_exec(partitions: usize) -> MemoryExec {
}

// construct a stream partition for test purposes
#[derive(Debug)]
pub struct TestPartitionStream {
pub schema: SchemaRef,
pub batches: Vec<RecordBatch>,
Expand Down
1 change: 1 addition & 0 deletions datafusion/sqllogictest/src/test_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ pub async fn register_partition_table(test_ctx: &mut TestContext) {

// registers a LOCAL TEMPORARY table.
pub async fn register_temp_table(ctx: &SessionContext) {
#[derive(Debug)]
struct TestTable(TableType);

#[async_trait]
Expand Down
Loading