Skip to content

Commit

Permalink
Move SessionConfig to datafusion_execution
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Mar 13, 2023
1 parent c6061cf commit e2034d9
Show file tree
Hide file tree
Showing 3 changed files with 365 additions and 338 deletions.
338 changes: 0 additions & 338 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1161,344 +1161,6 @@ impl QueryPlanner for DefaultQueryPlanner {
}
}

/// Map that holds opaque objects indexed by their type.
///
/// Data is wrapped into an [`Arc`] to enable [`Clone`] while still being [object safe].
///
/// [object safe]: https://doc.rust-lang.org/reference/items/traits.html#object-safety
type AnyMap =
HashMap<TypeId, Arc<dyn Any + Send + Sync + 'static>, BuildHasherDefault<IdHasher>>;

/// Hasher for [`AnyMap`].
///
/// With [`TypeId`]s as keys, there's no need to hash them. They are already hashes themselves, coming from the compiler.
/// The [`IdHasher`] just holds the [`u64`] of the [`TypeId`], and then returns it, instead of doing any bit fiddling.
#[derive(Default)]
struct IdHasher(u64);

impl Hasher for IdHasher {
fn write(&mut self, _: &[u8]) {
unreachable!("TypeId calls write_u64");
}

#[inline]
fn write_u64(&mut self, id: u64) {
self.0 = id;
}

#[inline]
fn finish(&self) -> u64 {
self.0
}
}

/// Configuration options for session context
#[derive(Clone)]
pub struct SessionConfig {
/// Configuration options
options: ConfigOptions,
/// Opaque extensions.
extensions: AnyMap,
}

impl Default for SessionConfig {
fn default() -> Self {
Self {
options: ConfigOptions::new(),
// Assume no extensions by default.
extensions: HashMap::with_capacity_and_hasher(
0,
BuildHasherDefault::default(),
),
}
}
}

impl SessionConfig {
/// Create an execution config with default setting
pub fn new() -> Self {
Default::default()
}

/// Create an execution config with config options read from the environment
pub fn from_env() -> Result<Self> {
Ok(ConfigOptions::from_env()?.into())
}

/// Set a configuration option
pub fn set(mut self, key: &str, value: ScalarValue) -> Self {
self.options.set(key, &value.to_string()).unwrap();
self
}

/// Set a boolean configuration option
pub fn set_bool(self, key: &str, value: bool) -> Self {
self.set(key, ScalarValue::Boolean(Some(value)))
}

/// Set a generic `u64` configuration option
pub fn set_u64(self, key: &str, value: u64) -> Self {
self.set(key, ScalarValue::UInt64(Some(value)))
}

/// Set a generic `usize` configuration option
pub fn set_usize(self, key: &str, value: usize) -> Self {
let value: u64 = value.try_into().expect("convert usize to u64");
self.set(key, ScalarValue::UInt64(Some(value)))
}

/// Set a generic `str` configuration option
pub fn set_str(self, key: &str, value: &str) -> Self {
self.set(key, ScalarValue::Utf8(Some(value.to_string())))
}

/// Customize batch size
pub fn with_batch_size(mut self, n: usize) -> Self {
// batch size must be greater than zero
assert!(n > 0);
self.options.execution.batch_size = n;
self
}

/// Customize [`target_partitions`]
///
/// [`target_partitions`]: crate::config::ExecutionOptions::target_partitions
pub fn with_target_partitions(mut self, n: usize) -> Self {
// partition count must be greater than zero
assert!(n > 0);
self.options.execution.target_partitions = n;
self
}

/// Get [`target_partitions`]
///
/// [`target_partitions`]: crate::config::ExecutionOptions::target_partitions
pub fn target_partitions(&self) -> usize {
self.options.execution.target_partitions
}

/// Is the information schema enabled?
pub fn information_schema(&self) -> bool {
self.options.catalog.information_schema
}

/// Should the context create the default catalog and schema?
pub fn create_default_catalog_and_schema(&self) -> bool {
self.options.catalog.create_default_catalog_and_schema
}

/// Are joins repartitioned during execution?
pub fn repartition_joins(&self) -> bool {
self.options.optimizer.repartition_joins
}

/// Are aggregates repartitioned during execution?
pub fn repartition_aggregations(&self) -> bool {
self.options.optimizer.repartition_aggregations
}

/// Are window functions repartitioned during execution?
pub fn repartition_window_functions(&self) -> bool {
self.options.optimizer.repartition_windows
}

/// Do we execute sorts in a per-partition fashion and merge afterwards,
/// or do we coalesce partitions first and sort globally?
pub fn repartition_sorts(&self) -> bool {
self.options.optimizer.repartition_sorts
}

/// Are statistics collected during execution?
pub fn collect_statistics(&self) -> bool {
self.options.execution.collect_statistics
}

/// Selects a name for the default catalog and schema
pub fn with_default_catalog_and_schema(
mut self,
catalog: impl Into<String>,
schema: impl Into<String>,
) -> Self {
self.options.catalog.default_catalog = catalog.into();
self.options.catalog.default_schema = schema.into();
self
}

/// Controls whether the default catalog and schema will be automatically created
pub fn with_create_default_catalog_and_schema(mut self, create: bool) -> Self {
self.options.catalog.create_default_catalog_and_schema = create;
self
}

/// Enables or disables the inclusion of `information_schema` virtual tables
pub fn with_information_schema(mut self, enabled: bool) -> Self {
self.options.catalog.information_schema = enabled;
self
}

/// Enables or disables the use of repartitioning for joins to improve parallelism
pub fn with_repartition_joins(mut self, enabled: bool) -> Self {
self.options.optimizer.repartition_joins = enabled;
self
}

/// Enables or disables the use of repartitioning for aggregations to improve parallelism
pub fn with_repartition_aggregations(mut self, enabled: bool) -> Self {
self.options.optimizer.repartition_aggregations = enabled;
self
}

/// Sets minimum file range size for repartitioning scans
pub fn with_repartition_file_min_size(mut self, size: usize) -> Self {
self.options.optimizer.repartition_file_min_size = size;
self
}

/// Enables or disables the use of repartitioning for file scans
pub fn with_repartition_file_scans(mut self, enabled: bool) -> Self {
self.options.optimizer.repartition_file_scans = enabled;
self
}

/// Enables or disables the use of repartitioning for window functions to improve parallelism
pub fn with_repartition_windows(mut self, enabled: bool) -> Self {
self.options.optimizer.repartition_windows = enabled;
self
}

/// Enables or disables the use of per-partition sorting to improve parallelism
pub fn with_repartition_sorts(mut self, enabled: bool) -> Self {
self.options.optimizer.repartition_sorts = enabled;
self
}

/// Enables or disables the use of pruning predicate for parquet readers to skip row groups
pub fn with_parquet_pruning(mut self, enabled: bool) -> Self {
self.options.execution.parquet.pruning = enabled;
self
}

/// Returns true if pruning predicate should be used to skip parquet row groups
pub fn parquet_pruning(&self) -> bool {
self.options.execution.parquet.pruning
}

/// Enables or disables the collection of statistics after listing files
pub fn with_collect_statistics(mut self, enabled: bool) -> Self {
self.options.execution.collect_statistics = enabled;
self
}

/// Get the currently configured batch size
pub fn batch_size(&self) -> usize {
self.options.execution.batch_size
}

/// Convert configuration options to name-value pairs with values
/// converted to strings.
///
/// Note that this method will eventually be deprecated and
/// replaced by [`config_options`].
///
/// [`config_options`]: Self::config_options
pub fn to_props(&self) -> HashMap<String, String> {
let mut map = HashMap::new();
// copy configs from config_options
for entry in self.options.entries() {
map.insert(entry.key, entry.value.unwrap_or_default());
}

map
}

/// Return a handle to the configuration options.
pub fn config_options(&self) -> &ConfigOptions {
&self.options
}

/// Return a mutable handle to the configuration options.
pub fn config_options_mut(&mut self) -> &mut ConfigOptions {
&mut self.options
}

/// Add extensions.
///
/// Extensions can be used to attach extra data to the session config -- e.g. tracing information or caches.
/// Extensions are opaque and the types are unknown to DataFusion itself, which makes them extremely flexible. [^1]
///
/// Extensions are stored within an [`Arc`] so they do NOT require [`Clone`]. The are immutable. If you need to
/// modify their state over their lifetime -- e.g. for caches -- you need to establish some for of interior mutability.
///
/// Extensions are indexed by their type `T`. If multiple values of the same type are provided, only the last one
/// will be kept.
///
/// You may use [`get_extension`](Self::get_extension) to retrieve extensions.
///
/// # Example
/// ```
/// use std::sync::Arc;
/// use datafusion::execution::context::SessionConfig;
///
/// // application-specific extension types
/// struct Ext1(u8);
/// struct Ext2(u8);
/// struct Ext3(u8);
///
/// let ext1a = Arc::new(Ext1(10));
/// let ext1b = Arc::new(Ext1(11));
/// let ext2 = Arc::new(Ext2(2));
///
/// let cfg = SessionConfig::default()
/// // will only remember the last Ext1
/// .with_extension(Arc::clone(&ext1a))
/// .with_extension(Arc::clone(&ext1b))
/// .with_extension(Arc::clone(&ext2));
///
/// let ext1_received = cfg.get_extension::<Ext1>().unwrap();
/// assert!(!Arc::ptr_eq(&ext1_received, &ext1a));
/// assert!(Arc::ptr_eq(&ext1_received, &ext1b));
///
/// let ext2_received = cfg.get_extension::<Ext2>().unwrap();
/// assert!(Arc::ptr_eq(&ext2_received, &ext2));
///
/// assert!(cfg.get_extension::<Ext3>().is_none());
/// ```
///
/// [^1]: Compare that to [`ConfigOptions`] which only supports [`ScalarValue`] payloads.
pub fn with_extension<T>(mut self, ext: Arc<T>) -> Self
where
T: Send + Sync + 'static,
{
let ext = ext as Arc<dyn Any + Send + Sync + 'static>;
let id = TypeId::of::<T>();
self.extensions.insert(id, ext);
self
}

/// Get extension, if any for the specified type `T` exists.
///
/// See [`with_extension`](Self::with_extension) on how to add attach extensions.
pub fn get_extension<T>(&self) -> Option<Arc<T>>
where
T: Send + Sync + 'static,
{
let id = TypeId::of::<T>();
self.extensions
.get(&id)
.cloned()
.map(|ext| Arc::downcast(ext).expect("TypeId unique"))
}
}

impl From<ConfigOptions> for SessionConfig {
fn from(options: ConfigOptions) -> Self {
Self {
options,
..Default::default()
}
}
}

/// Execution context for registering data sources and executing queries
#[derive(Clone)]
pub struct SessionState {
Expand Down
Loading

0 comments on commit e2034d9

Please sign in to comment.