Description
Describe the bug
Hi, I've recently started using DataFusion and ran into an issue while trying to copy some query results into a local cache implemented with a MemTable. Here is the code:
// execute the query and get the DataFrame:
let df = self
    .ctx
    .execute_logical_plan(plan.clone())
    .await
    .map_err(anyhow::Error::msg)?;
// print total number of rows
info!("Number of DB results {}", df.clone().count().await?);
// insert into the local table
let plan = cached_table
    .provider
    .insert_into(
        &self.ctx.state(),
        df.clone().create_physical_plan().await?,
        InsertOp::Append,
    )
    .await?;
let task_ctx = self.ctx.task_ctx();
let mut stream = plan.execute(0, task_ctx)?;
// print number of rows inserted
while let Some(batch) = stream.try_next().await? {
    let rows = batch.column_by_name("count");
    info!(
        "Inserted {:?} rows into cache table {}",
        rows,
        id
    );
}
There are 731 result rows, but every time I run this, only 87 rows are inserted into the MemTable cache. I've confirmed that 87 is the actual number of rows inserted, because later code that scans this cache finds only 87 rows. The count of 87 is stable across runs with my 731 source rows, but it shifts slightly with the total row count: for example, my teammate had 751 rows in his backing DB and consistently saw only 90 rows inserted instead of 87.
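Working the two data points through (simple arithmetic on the numbers above, nothing more) suggests the inserted share is roughly a constant fraction rather than a fixed cap:

$$
\frac{87}{731} \approx 0.119, \qquad \frac{90}{751} \approx 0.120
$$

In both cases about 12% of the rows (roughly one row in eight or nine) reach the cache.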
Any idea why only a small subset of the rows is being inserted? For what it's worth, there is only one partition, according to plan.output_partitioning().partition_count().
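One explanation worth ruling out (a hypothesis, not something confirmed in this report): the partition count quoted may belong to the insert plan returned by insert_into (the snippet reuses the name plan for it), which can report a single partition even when its input from df.clone().create_physical_plan() has several. Assuming the sink node returned by insert_into pulls only partition 0 of its input and relies on the physical optimizer to merge partitions beforehand, calling insert_into directly would skip that merge and drop every other partition. The standard-library toy model below illustrates the effect. Batch, make_batches, round_robin, drain_partition_zero and drain_all are hypothetical names, not DataFusion APIs, and the batch size and partition count are arbitrary choices, not picked to reproduce 87.

```rust
/// Toy stand-in for a record batch: only the row count matters here.
#[derive(Clone, Debug)]
struct Batch {
    rows: usize,
}

/// Split `total_rows` into batches of at most `batch_size` rows,
/// loosely mimicking a source scan. The batch size is an assumption.
fn make_batches(total_rows: usize, batch_size: usize) -> Vec<Batch> {
    assert!(batch_size > 0, "batch_size must be positive");
    let mut batches = Vec::new();
    let mut remaining = total_rows;
    while remaining > 0 {
        let rows = remaining.min(batch_size);
        batches.push(Batch { rows });
        remaining -= rows;
    }
    batches
}

/// Hand out whole batches round-robin across `n` partitions,
/// loosely modelling a round-robin repartition step in a plan.
fn round_robin(batches: &[Batch], n: usize) -> Vec<Vec<Batch>> {
    let mut partitions: Vec<Vec<Batch>> = vec![Vec::new(); n];
    for (i, batch) in batches.iter().enumerate() {
        partitions[i % n].push(batch.clone());
    }
    partitions
}

/// Rows a sink would see if it drained only partition 0 of its input
/// (the hypothesised behaviour when no merge step is present).
fn drain_partition_zero(partitions: &[Vec<Batch>]) -> usize {
    partitions
        .first()
        .map_or(0, |p| p.iter().map(|b| b.rows).sum::<usize>())
}

/// Rows a sink would see if all partitions were merged first
/// (the role a coalescing step plays).
fn drain_all(partitions: &[Vec<Batch>]) -> usize {
    partitions.iter().flatten().map(|b| b.rows).sum()
}
```

With 731 rows in 64-row batches spread over 8 partitions, drain_partition_zero returns 128 while drain_all returns all 731. If the real input plan does report more than one partition (checking its output_partitioning().partition_count() rather than the insert plan's), wrapping that input in CoalescePartitionsExec before passing it to insert_into would be a reasonable first thing to try.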
To Reproduce
No response
Expected behavior
We expect all 731 rows (or however many rows the DataFrame contains according to df.count()) to be inserted into the cache table.
Additional context
No response