
perf: Improve repr performance #918

Merged
9 commits merged on Aug 30, 2024
28 changes: 3 additions & 25 deletions bigframes/core/__init__.py
@@ -70,23 +70,7 @@ def from_pyarrow(cls, arrow_table: pa.Table, session: Session):
iobytes.getvalue(),
data_schema=schema,
session=session,
n_rows=arrow_table.num_rows,
)
return cls(node)

@classmethod
def from_cached(
Collaborator:

I assume this was dead code?

Contributor Author:

Yes, probably left over from a recent refactor.

cls,
original: ArrayValue,
table: google.cloud.bigquery.Table,
ordering: orderings.TotalOrdering,
):
node = nodes.CachedTableNode(
original_node=original.node,
project_id=table.reference.project,
dataset_id=table.reference.dataset_id,
table_id=table.reference.table_id,
physical_schema=tuple(table.schema),
ordering=ordering,
)
return cls(node)

@@ -110,10 +94,7 @@ def from_table(
bigframes.exceptions.PreviewWarning,
)
node = nodes.ReadTableNode(
project_id=table.reference.project,
dataset_id=table.reference.dataset_id,
table_id=table.reference.table_id,
physical_schema=tuple(table.schema),
table=nodes.GbqTable.from_table(table),
total_order_cols=(offsets_col,) if offsets_col else tuple(primary_key),
order_col_is_sequential=(offsets_col is not None),
columns=schema,
@@ -154,10 +135,7 @@ def as_cached(
"""
node = nodes.CachedTableNode(
original_node=self.node,
project_id=cache_table.reference.project,
dataset_id=cache_table.reference.dataset_id,
table_id=cache_table.reference.table_id,
physical_schema=tuple(cache_table.schema),
table=nodes.GbqTable.from_table(cache_table),
ordering=ordering,
)
return ArrayValue(node)
26 changes: 10 additions & 16 deletions bigframes/core/blocks.py
@@ -200,6 +200,7 @@ def index(self) -> BlockIndexProperties:
@functools.cached_property
def shape(self) -> typing.Tuple[int, int]:
"""Returns dimensions as (length, width) tuple."""

row_count_expr = self.expr.row_count()

# Support in-memory engines for hermetic unit tests.
@@ -210,8 +211,7 @@ def shape(self) -> typing.Tuple[int, int]:
except Exception:
pass

iter, _ = self.session._execute(row_count_expr, ordered=False)
row_count = next(iter)[0]
row_count = self.session._executor.get_row_count(self.expr)
return (row_count, len(self.value_columns))
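
Note: get_row_count is new executor API that this diff only calls; the executor change itself is not shown here. Presumably it serves the count from tree metadata when a leaf node advertises its size (see the row_count property added in nodes.py below), falling back to a COUNT query otherwise. A minimal sketch with hypothetical names, not the shipped implementation:

def get_row_count(executor, array_value) -> int:
    # Fast path: leaf nodes (local data, unfiltered or cached tables) may
    # already know their size from metadata, so no BigQuery job is needed.
    static_count = getattr(array_value.node, "row_count", None)
    if static_count is not None:
        return static_count
    # Slow path: run a COUNT query, mirroring the code removed above.
    iterator, _ = executor.execute(array_value.row_count(), ordered=False)
    return next(iter(iterator))[0]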

@property
@@ -560,7 +560,7 @@ def to_pandas(
def try_peek(
self, n: int = 20, force: bool = False
) -> typing.Optional[pd.DataFrame]:
if force or tree_properties.peekable(self.expr.node):
if force or tree_properties.can_fast_peek(self.expr.node):
iterator, _ = self.session._peek(self.expr, n)
df = self._to_dataframe(iterator)
self._copy_index_to_pandas(df)
@@ -1587,19 +1587,13 @@ def retrieve_repr_request_results(

Returns a tuple of the dataframe and the overall number of rows of the query.
"""
# TODO(swast): Select a subset of columns if max_columns is less than the
# number of columns in the schema.
count = self.shape[0]
if count > max_results:
head_block = self.slice(0, max_results)
else:
head_block = self
computed_df, query_job = head_block.to_pandas()
formatted_df = computed_df.set_axis(self.column_labels, axis=1)
# we reset the axis and substitute the bf index name(s) for the default
if len(self.index.names) > 0:
formatted_df.index.names = self.index.names # type: ignore
return formatted_df, count, query_job

results, query_job = self.session._executor.head(self.expr, max_results)
count = self.session._executor.get_row_count(self.expr)

computed_df = self._to_dataframe(results)
self._copy_index_to_pandas(computed_df)
return computed_df, count, query_job

def promote_offsets(self, label: Label = None) -> typing.Tuple[Block, str]:
result_id = guid.generate_guid()
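The user-visible effect of the retrieve_repr_request_results change is on DataFrame repr: rendering now issues one small head query plus a metadata-backed row count instead of computing shape and then downloading a slice. A usage sketch; the public dataset is illustrative only:

import bigframes.pandas as bpd

# repr needs only the first page of rows plus a row count, so it no longer
# pays for a full shape computation before fetching the preview.
df = bpd.read_gbq("bigquery-public-data.usa_names.usa_1910_2013")
print(repr(df))
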
12 changes: 8 additions & 4 deletions bigframes/core/compile/compiler.py
@@ -103,14 +103,16 @@ def compile_readlocal(self, node: nodes.ReadLocalNode, ordered: bool = True):

@_compile_node.register
def compile_cached_table(self, node: nodes.CachedTableNode, ordered: bool = True):
full_table_name = f"{node.project_id}.{node.dataset_id}.{node.table_id}"
full_table_name = (
f"{node.table.project_id}.{node.table.dataset_id}.{node.table.table_id}"
)
used_columns = (
*node.schema.names,
*node.hidden_columns,
)
# Physical schema might include unused columns, unsupported datatypes like JSON
physical_schema = ibis.backends.bigquery.BigQuerySchema.to_ibis(
list(i for i in node.physical_schema if i.name in used_columns)
list(i for i in node.table.physical_schema if i.name in used_columns)
)
ibis_table = ibis.table(physical_schema, full_table_name)
if ordered:
@@ -156,14 +158,16 @@ def compile_readtable(self, node: nodes.ReadTableNode, ordered: bool = True):
def read_table_as_unordered_ibis(
self, node: nodes.ReadTableNode
) -> ibis.expr.types.Table:
full_table_name = f"{node.project_id}.{node.dataset_id}.{node.table_id}"
full_table_name = (
f"{node.table.project_id}.{node.table.dataset_id}.{node.table.table_id}"
)
used_columns = (
*node.schema.names,
*[i for i in node.total_order_cols if i not in node.schema.names],
)
# Physical schema might include unused columns, unsupported datatypes like JSON
physical_schema = ibis.backends.bigquery.BigQuerySchema.to_ibis(
list(i for i in node.physical_schema if i.name in used_columns)
list(i for i in node.table.physical_schema if i.name in used_columns)
)
if node.at_time is not None or node.sql_predicate is not None:
import bigframes.session._io.bigquery
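Both compile paths above prune the physical schema to the columns actually referenced before handing it to ibis, which keeps unused columns and unsupported types like JSON out of the compiled query. A standalone sketch of the pattern with simplified types (not the bigframes API):

from typing import List, NamedTuple, Sequence, Tuple

class Field(NamedTuple):
    name: str
    dtype: str

def prune_physical_schema(
    physical: Sequence[Field], used_columns: Tuple[str, ...]
) -> List[Field]:
    # Keep only the physical columns the logical schema (plus any hidden
    # ordering columns) references.
    used = set(used_columns)
    return [f for f in physical if f.name in used]

physical = [Field("a", "INT64"), Field("b", "JSON"), Field("c", "STRING")]
print(prune_physical_schema(physical, ("a", "c")))  # the JSON column is dropped
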
120 changes: 81 additions & 39 deletions bigframes/core/nodes.py
@@ -312,18 +312,36 @@ def transform_children(

# Input Nodes
@dataclass(frozen=True)
class ReadLocalNode(BigFrameNode):
class LeafNode(BigFrameNode):
@property
def roots(self) -> typing.Set[BigFrameNode]:
return {self}

@property
def supports_fast_head(self) -> bool:
return False

def transform_children(
self, t: Callable[[BigFrameNode], BigFrameNode]
) -> BigFrameNode:
return self

@property
def row_count(self) -> typing.Optional[int]:
"""How many rows are in the data source. None means unknown."""
return None
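
A note on the try_peek change in blocks.py above: tree_properties.peekable becomes can_fast_peek, which presumably consults flags like the supports_fast_head property introduced here. A sketch of how such a tree property could be derived; the helper is hypothetical and assumes BigFrameNode exposes child_nodes:

def can_fast_head(node: BigFrameNode) -> bool:
    # A plan supports a cheap head only if every data source beneath it
    # does; non-leaf nodes defer to their children.
    if isinstance(node, LeafNode):
        return node.supports_fast_head
    return all(can_fast_head(child) for child in node.child_nodes)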


@dataclass(frozen=True)
class ReadLocalNode(LeafNode):
feather_bytes: bytes
data_schema: schemata.ArraySchema
n_rows: int
session: typing.Optional[bigframes.session.Session] = None

def __hash__(self):
return self._node_hash

@property
def roots(self) -> typing.Set[BigFrameNode]:
return {self}

@functools.cached_property
def schema(self) -> schemata.ArraySchema:
return self.data_schema
@@ -333,6 +351,10 @@ def variables_introduced(self) -> int:
"""Defines the number of variables generated by the current node. Used to estimate query planning complexity."""
return len(self.schema.items) + 1

@property
def supports_fast_head(self) -> bool:
return True

@property
def order_ambiguous(self) -> bool:
return False
@@ -341,20 +363,38 @@ def order_ambiguous(self) -> bool:
def explicitly_ordered(self) -> bool:
return True

def transform_children(
self, t: Callable[[BigFrameNode], BigFrameNode]
) -> BigFrameNode:
return self
@property
def row_count(self) -> typing.Optional[int]:
return self.n_rows


## Put ordering in here or just add order_by node above?
@dataclass(frozen=True)
class ReadTableNode(BigFrameNode):
class GbqTable:
Collaborator:

This is so we can get something hashable? So we can make sure we only have the fields we care about? A docstring with the purpose would be helpful here.

project_id: str = field()
dataset_id: str = field()
table_id: str = field()

physical_schema: Tuple[bq.SchemaField, ...] = field()
n_rows: int = field()
cluster_cols: typing.Optional[Tuple[str, ...]]

@staticmethod
def from_table(table: bq.Table) -> GbqTable:
return GbqTable(
project_id=table.project,
dataset_id=table.dataset_id,
table_id=table.table_id,
physical_schema=tuple(table.schema),
n_rows=table.num_rows,
cluster_cols=None
if table.clustering_fields is None
else tuple(table.clustering_fields),
)
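
On the question above: the @dataclass(frozen=True) decorator applies to GbqTable, and frozen dataclasses synthesize __eq__ and __hash__ from their fields, so storing the schema and clustering columns as tuples rather than lists is what keeps the node hashable and usable as a cache key. A standalone sketch of the pattern with simplified fields (not the real class):

from dataclasses import dataclass
from typing import Optional, Tuple

@dataclass(frozen=True)
class TableRef:
    project_id: str
    dataset_id: str
    table_id: str
    physical_schema: Tuple[str, ...]  # tuples, not lists: lists are unhashable
    n_rows: int
    cluster_cols: Optional[Tuple[str, ...]]

t1 = TableRef("p", "d", "t", ("a", "b"), 100, None)
t2 = TableRef("p", "d", "t", ("a", "b"), 100, None)
assert t1 == t2 and hash(t1) == hash(t2)  # value semantics enable caching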


## Put ordering in here or just add order_by node above?
@dataclass(frozen=True)
class ReadTableNode(LeafNode):
table: GbqTable
# Subset of physical schema columns, with chosen BQ types
columns: schemata.ArraySchema = field()

@@ -370,10 +410,10 @@

def __post_init__(self):
# enforce invariants
physical_names = set(map(lambda i: i.name, self.physical_schema))
physical_names = set(map(lambda i: i.name, self.table.physical_schema))
if not set(self.columns.names).issubset(physical_names):
raise ValueError(
f"Requested schema {self.columns} cannot be derived from table schemal {self.physical_schema}"
f"Requested schema {self.columns} cannot be derived from table schemal {self.table.physical_schema}"
)
if self.order_col_is_sequential and len(self.total_order_cols) != 1:
raise ValueError("Sequential primary key must have only one component")
@@ -385,10 +425,6 @@ def session(self):
def __hash__(self):
return self._node_hash

@property
def roots(self) -> typing.Set[BigFrameNode]:
return {self}

@property
def schema(self) -> schemata.ArraySchema:
return self.columns
@@ -398,6 +434,13 @@ def relation_ops_created(self) -> int:
# Assume worst case, where readgbq actually has baked in analytic operation to generate index
return 3

@property
def supports_fast_head(self) -> bool:
# Fast head is only supported when row offsets are available.
# In the future, ORDER BY+LIMIT optimizations may allow fast head when
# clustered and/or partitioned on ordering key
return self.order_col_is_sequential

@property
def order_ambiguous(self) -> bool:
return len(self.total_order_cols) == 0
@@ -410,37 +453,34 @@ def explicitly_ordered(self) -> bool:
def variables_introduced(self) -> int:
return len(self.schema.items) + 1

def transform_children(
self, t: Callable[[BigFrameNode], BigFrameNode]
) -> BigFrameNode:
return self
@property
def row_count(self) -> typing.Optional[int]:
if self.sql_predicate is None:
return self.table.n_rows
return None


# This node shouldn't be used in the "original" expression tree, only used as replacement for original during planning
@dataclass(frozen=True)
class CachedTableNode(BigFrameNode):
class CachedTableNode(LeafNode):
# The original BFET subtree that was cached
# note: this isn't a "child" node.
original_node: BigFrameNode = field()
# reference to cached materialization of original_node
project_id: str = field()
dataset_id: str = field()
table_id: str = field()
physical_schema: Tuple[bq.SchemaField, ...] = field()

table: GbqTable
ordering: typing.Optional[orderings.RowOrdering] = field()

def __post_init__(self):
# enforce invariants
physical_names = set(map(lambda i: i.name, self.physical_schema))
physical_names = set(map(lambda i: i.name, self.table.physical_schema))
logical_names = self.original_node.schema.names
if not set(logical_names).issubset(physical_names):
raise ValueError(
f"Requested schema {logical_names} cannot be derived from table schema {self.physical_schema}"
f"Requested schema {logical_names} cannot be derived from table schema {self.table.physical_schema}"
)
if not set(self.hidden_columns).issubset(physical_names):
raise ValueError(
f"Requested hidden columns {self.hidden_columns} cannot be derived from table schema {self.physical_schema}"
f"Requested hidden columns {self.hidden_columns} cannot be derived from table schema {self.table.physical_schema}"
)

@property
@@ -450,10 +490,6 @@ def session(self):
def __hash__(self):
return self._node_hash

@property
def roots(self) -> typing.Set[BigFrameNode]:
return {self}

@property
def schema(self) -> schemata.ArraySchema:
return self.original_node.schema
Expand All @@ -473,6 +509,13 @@ def hidden_columns(self) -> typing.Tuple[str, ...]:
if col not in self.schema.names
)

@property
def supports_fast_head(self) -> bool:
# Fast head is only supported when row offsets are available.
# In the future, ORDER BY+LIMIT optimizations may allow fast head when
# clustered and/or partitioned on ordering key
return (self.ordering is None) or self.ordering.is_sequential

@property
def order_ambiguous(self) -> bool:
return not isinstance(self.ordering, orderings.TotalOrdering)
@@ -483,10 +526,9 @@ def explicitly_ordered(self) -> bool:
self.ordering.all_ordering_columns
) > 0

def transform_children(
self, t: Callable[[BigFrameNode], BigFrameNode]
) -> BigFrameNode:
return self
@property
def row_count(self) -> typing.Optional[int]:
return self.table.n_rows


# Unary nodes