Commit aee4159

feat: Use validated local storage for data uploads (#1612)

1 parent ffa597c · commit aee4159

14 files changed, +466 -381 lines

Diff for: bigframes/core/array_value.py
+9 -2

@@ -97,6 +97,7 @@ def from_table(
         at_time: Optional[datetime.datetime] = None,
         primary_key: Sequence[str] = (),
         offsets_col: Optional[str] = None,
+        n_rows: Optional[int] = None,
     ):
         if offsets_col and primary_key:
             raise ValueError("must set at most one of 'offests', 'primary_key'")
@@ -126,7 +127,11 @@ def from_table(
             )
         )
         source_def = nodes.BigqueryDataSource(
-            table=table_def, at_time=at_time, sql_predicate=predicate, ordering=ordering
+            table=table_def,
+            at_time=at_time,
+            sql_predicate=predicate,
+            ordering=ordering,
+            n_rows=n_rows,
         )
         node = nodes.ReadTableNode(
             source=source_def,
@@ -176,7 +181,9 @@ def as_cached(
         Replace the node with an equivalent one that references a table where the value has been materialized to.
         """
         table = nodes.GbqTable.from_table(cache_table)
-        source = nodes.BigqueryDataSource(table, ordering=ordering)
+        source = nodes.BigqueryDataSource(
+            table, ordering=ordering, n_rows=cache_table.num_rows
+        )
         # Assumption: GBQ cached table uses field name as bq column name
         scan_list = nodes.ScanList(
             tuple(
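
Note on the new keyword: passing n_rows threads the BigQuery-reported row count into nodes.BigqueryDataSource, so ReadTableNode.row_count (see the nodes.py diff below) can be answered from local metadata instead of returning None. A minimal, hedged sketch of the caller side, mirroring the blocks.py change that follows; session, table_ref, and schema are placeholder names, not taken from this commit:

    import bigframes.core as core

    dest_table = session.bqclient.get_table(table_ref)  # google.cloud.bigquery.Table
    value = core.ArrayValue.from_table(
        dest_table,
        schema=schema,
        session=session,
        n_rows=dest_table.num_rows,  # forward the known row count to the data source
    )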

Diff for: bigframes/core/blocks.py
+3 -1

@@ -2713,11 +2713,13 @@ def _get_rows_as_json_values(self) -> Block:
             )
         )
 
+        dest_table = self.session.bqclient.get_table(destination)
         expr = core.ArrayValue.from_table(
-            self.session.bqclient.get_table(destination),
+            dest_table,
             schema=new_schema,
             session=self.session,
             offsets_col=ordering_column_name,
+            n_rows=dest_table.num_rows,
         ).drop_columns([ordering_column_name])
         block = Block(
             expr,

Diff for: bigframes/core/local_data.py
+178 -27

@@ -18,13 +18,17 @@
 
 import dataclasses
 import functools
-from typing import cast, Union
+import io
+import itertools
+import json
+from typing import Any, Callable, cast, Generator, Iterable, Literal, Optional, Union
 import uuid
 
 import geopandas  # type: ignore
 import numpy as np
 import pandas
 import pyarrow as pa
+import pyarrow.parquet  # type: ignore
 
 import bigframes.core.schema as schemata
 import bigframes.dtypes
@@ -42,7 +46,9 @@ def from_arrow(cls, table: pa.Table) -> LocalTableMetadata:
 
 _MANAGED_STORAGE_TYPES_OVERRIDES: dict[bigframes.dtypes.Dtype, pa.DataType] = {
     # wkt to be precise
-    bigframes.dtypes.GEO_DTYPE: pa.string()
+    bigframes.dtypes.GEO_DTYPE: pa.string(),
+    # Just json as string
+    bigframes.dtypes.JSON_DTYPE: pa.string(),
 }
 
 
@@ -90,6 +96,50 @@ def from_pyarrow(self, table: pa.Table) -> ManagedArrowTable:
             schemata.ArraySchema(tuple(fields)),
         )
 
+    def to_parquet(
+        self,
+        dst: Union[str, io.IOBase],
+        *,
+        offsets_col: Optional[str] = None,
+        geo_format: Literal["wkb", "wkt"] = "wkt",
+        duration_type: Literal["int", "duration"] = "duration",
+        json_type: Literal["string"] = "string",
+    ):
+        pa_table = self.data
+        if offsets_col is not None:
+            pa_table = pa_table.append_column(
+                offsets_col, pa.array(range(pa_table.num_rows), type=pa.int64())
+            )
+        if geo_format != "wkt":
+            raise NotImplementedError(f"geo format {geo_format} not yet implemented")
+        if duration_type != "duration":
+            raise NotImplementedError(
+                f"duration as {duration_type} not yet implemented"
+            )
+        assert json_type == "string"
+        pyarrow.parquet.write_table(pa_table, where=dst)
+
+    def itertuples(
+        self,
+        *,
+        geo_format: Literal["wkb", "wkt"] = "wkt",
+        duration_type: Literal["int", "timedelta"] = "timedelta",
+        json_type: Literal["string", "object"] = "string",
+    ) -> Iterable[tuple]:
+        """
+        Yield each row as an unlabeled tuple.
+
+        Row-wise iteration of columnar data is slow, avoid if possible.
+        """
+        for row_dict in _iter_table(
+            self.data,
+            self.schema,
+            geo_format=geo_format,
+            duration_type=duration_type,
+            json_type=json_type,
+        ):
+            yield tuple(row_dict.values())
+
     def validate(self):
         # TODO: Content-based validation for some datatypes (eg json, wkt, list) where logical domain is smaller than pyarrow type
         for bf_field, arrow_field in zip(self.schema.items, self.data.schema):
@@ -101,11 +151,78 @@ def validate(self):
                 )
 
 
-def _get_managed_storage_type(dtype: bigframes.dtypes.Dtype) -> pa.DataType:
-    if dtype in _MANAGED_STORAGE_TYPES_OVERRIDES.keys():
-        return _MANAGED_STORAGE_TYPES_OVERRIDES[dtype]
-    else:
-        return bigframes.dtypes.bigframes_dtype_to_arrow_dtype(dtype)
+# Sequential iterator, but could split into batches and leverage parallelism for speed
+def _iter_table(
+    table: pa.Table,
+    schema: schemata.ArraySchema,
+    *,
+    geo_format: Literal["wkb", "wkt"] = "wkt",
+    duration_type: Literal["int", "timedelta"] = "timedelta",
+    json_type: Literal["string", "object"] = "string",
+) -> Generator[dict[str, Any], None, None]:
+    """For when you feel like iterating row-wise over a column store. Don't expect speed."""
+
+    if geo_format != "wkt":
+        raise NotImplementedError(f"geo format {geo_format} not yet implemented")
+
+    @functools.singledispatch
+    def iter_array(
+        array: pa.Array, dtype: bigframes.dtypes.Dtype
+    ) -> Generator[Any, None, None]:
+        values = array.to_pylist()
+        if dtype == bigframes.dtypes.JSON_DTYPE:
+            if json_type == "object":
+                yield from map(lambda x: json.loads(x) if x is not None else x, values)
+            else:
+                yield from values
+        elif dtype == bigframes.dtypes.TIMEDELTA_DTYPE:
+            if duration_type == "int":
+                yield from map(
+                    lambda x: ((x.days * 3600 * 24) + x.seconds) * 1_000_000
+                    + x.microseconds
+                    if x is not None
+                    else x,
+                    values,
+                )
+            else:
+                yield from values
+        else:
+            yield from values
+
+    @iter_array.register
+    def _(
+        array: pa.ListArray, dtype: bigframes.dtypes.Dtype
+    ) -> Generator[Any, None, None]:
+        value_generator = iter_array(
+            array.flatten(), bigframes.dtypes.get_array_inner_type(dtype)
+        )
+        for (start, end) in itertools.pairwise(array.offsets):
+            arr_size = end.as_py() - start.as_py()
+            yield list(itertools.islice(value_generator, arr_size))
+
+    @iter_array.register
+    def _(
+        array: pa.StructArray, dtype: bigframes.dtypes.Dtype
+    ) -> Generator[Any, None, None]:
+        # yield from each subarray
+        sub_generators: dict[str, Generator[Any, None, None]] = {}
+        for field_name, dtype in bigframes.dtypes.get_struct_fields(dtype).items():
+            sub_generators[field_name] = iter_array(array.field(field_name), dtype)
+
+        keys = list(sub_generators.keys())
+        for row_values in zip(*sub_generators.values()):
+            yield {key: value for key, value in zip(keys, row_values)}
+
+    for batch in table.to_batches():
+        sub_generators: dict[str, Generator[Any, None, None]] = {}
+        for field in schema.items:
+            sub_generators[field.column] = iter_array(
+                batch.column(field.column), field.dtype
+            )
+
+        keys = list(sub_generators.keys())
+        for row_values in zip(*sub_generators.values()):
+            yield {key: value for key, value in zip(keys, row_values)}
 
 
 def _adapt_pandas_series(
@@ -117,32 +234,63 @@ def _adapt_pandas_series(
         return pa.array(series, type=pa.string()), bigframes.dtypes.GEO_DTYPE
     try:
         return _adapt_arrow_array(pa.array(series))
-    except Exception as e:
+    except pa.ArrowInvalid as e:
        if series.dtype == np.dtype("O"):
            try:
-                series = series.astype(bigframes.dtypes.GEO_DTYPE)
+                return _adapt_pandas_series(series.astype(bigframes.dtypes.GEO_DTYPE))
            except TypeError:
+                # Prefer original error
                pass
        raise e
 
 
 def _adapt_arrow_array(
     array: Union[pa.ChunkedArray, pa.Array]
 ) -> tuple[Union[pa.ChunkedArray, pa.Array], bigframes.dtypes.Dtype]:
-    target_type = _arrow_type_replacements(array.type)
+    target_type = _logical_type_replacements(array.type)
     if target_type != array.type:
         # TODO: Maybe warn if lossy conversion?
         array = array.cast(target_type)
     bf_type = bigframes.dtypes.arrow_dtype_to_bigframes_dtype(target_type)
+
     storage_type = _get_managed_storage_type(bf_type)
     if storage_type != array.type:
-        raise TypeError(
-            f"Expected {bf_type} to use arrow {storage_type}, instead got {array.type}"
-        )
+        array = array.cast(storage_type)
     return array, bf_type
 
 
-def _arrow_type_replacements(type: pa.DataType) -> pa.DataType:
+def _get_managed_storage_type(dtype: bigframes.dtypes.Dtype) -> pa.DataType:
+    if dtype in _MANAGED_STORAGE_TYPES_OVERRIDES.keys():
+        return _MANAGED_STORAGE_TYPES_OVERRIDES[dtype]
+    return _physical_type_replacements(
+        bigframes.dtypes.bigframes_dtype_to_arrow_dtype(dtype)
+    )
+
+
+def _recursive_map_types(
+    f: Callable[[pa.DataType], pa.DataType]
+) -> Callable[[pa.DataType], pa.DataType]:
+    @functools.wraps(f)
+    def recursive_f(type: pa.DataType) -> pa.DataType:
+        if pa.types.is_list(type):
+            new_field_t = recursive_f(type.value_type)
+            if new_field_t != type.value_type:
+                return pa.list_(new_field_t)
+            return type
+        if pa.types.is_struct(type):
+            struct_type = cast(pa.StructType, type)
+            new_fields: list[pa.Field] = []
+            for i in range(struct_type.num_fields):
+                field = struct_type.field(i)
+                new_fields.append(field.with_type(recursive_f(field.type)))
+            return pa.struct(new_fields)
+        return f(type)
+
+    return recursive_f
+
+
+@_recursive_map_types
+def _logical_type_replacements(type: pa.DataType) -> pa.DataType:
     if pa.types.is_timestamp(type):
         # This is potentially lossy, but BigFrames doesn't support ns
         new_tz = "UTC" if (type.tz is not None) else None
@@ -160,21 +308,24 @@ def _arrow_type_replacements(type: pa.DataType) -> pa.DataType:
     if pa.types.is_large_string(type):
         # simple string type can handle the largest strings needed
         return pa.string()
+    if pa.types.is_dictionary(type):
+        return _logical_type_replacements(type.value_type)
     if pa.types.is_null(type):
         # null as a type not allowed, default type is float64 for bigframes
         return pa.float64()
-    if pa.types.is_list(type):
-        new_field_t = _arrow_type_replacements(type.value_type)
-        if new_field_t != type.value_type:
-            return pa.list_(new_field_t)
-        return type
-    if pa.types.is_struct(type):
-        struct_type = cast(pa.StructType, type)
-        new_fields: list[pa.Field] = []
-        for i in range(struct_type.num_fields):
-            field = struct_type.field(i)
-            field.with_type(_arrow_type_replacements(field.type))
-            new_fields.append(field.with_type(_arrow_type_replacements(field.type)))
-        return pa.struct(new_fields)
     else:
         return type
+
+
+_ARROW_MANAGED_STORAGE_OVERRIDES = {
+    bigframes.dtypes._BIGFRAMES_TO_ARROW[bf_dtype]: arrow_type
+    for bf_dtype, arrow_type in _MANAGED_STORAGE_TYPES_OVERRIDES.items()
+    if bf_dtype in bigframes.dtypes._BIGFRAMES_TO_ARROW
+}
+
+
+@_recursive_map_types
+def _physical_type_replacements(dtype: pa.DataType) -> pa.DataType:
+    if dtype in _ARROW_MANAGED_STORAGE_OVERRIDES:
+        return _ARROW_MANAGED_STORAGE_OVERRIDES[dtype]
+    return dtype
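
The helpers added above cover local staging end to end: to_parquet writes the managed pyarrow data (optionally appending an int64 offsets column), and itertuples / _iter_table give a slow but dtype-aware row iterator. A hedged usage sketch, assuming ManagedArrowTable.from_pyarrow is callable as shown in the hunk context above; the file path and column names are illustrative, not from the commit:

    import pyarrow as pa

    from bigframes.core.local_data import ManagedArrowTable

    arrow_table = pa.table(
        {
            "id": pa.array([1, 2, 3], type=pa.int64()),
            "tags": pa.array([["a"], [], ["b", "c"]], type=pa.list_(pa.string())),
        }
    )
    mat = ManagedArrowTable.from_pyarrow(arrow_table)
    mat.validate()  # storage types must match the managed overrides

    # Row-wise iteration is the slow path (per the docstring); list columns
    # come back as Python lists.
    for row in mat.itertuples():
        print(row)  # e.g. (1, ['a'])

    # Stage the data as parquet with an extra int64 offsets column appended.
    mat.to_parquet("/tmp/local_upload.parquet", offsets_col="bf_offsets")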

Diff for: bigframes/core/nodes.py
+2 -3

@@ -654,7 +654,6 @@ class GbqTable:
     dataset_id: str = dataclasses.field()
     table_id: str = dataclasses.field()
     physical_schema: Tuple[bq.SchemaField, ...] = dataclasses.field()
-    n_rows: int = dataclasses.field()
     is_physically_stored: bool = dataclasses.field()
     cluster_cols: typing.Optional[Tuple[str, ...]]
 
@@ -670,7 +669,6 @@ def from_table(table: bq.Table, columns: Sequence[str] = ()) -> GbqTable:
             dataset_id=table.dataset_id,
             table_id=table.table_id,
             physical_schema=schema,
-            n_rows=table.num_rows,
             is_physically_stored=(table.table_type in ["TABLE", "MATERIALIZED_VIEW"]),
             cluster_cols=None
             if table.clustering_fields is None
@@ -696,6 +694,7 @@ class BigqueryDataSource:
     # Added for backwards compatibility, not validated
     sql_predicate: typing.Optional[str] = None
     ordering: typing.Optional[orderings.RowOrdering] = None
+    n_rows: Optional[int] = None
 
 
 ## Put ordering in here or just add order_by node above?
@@ -773,7 +772,7 @@ def variables_introduced(self) -> int:
     @property
     def row_count(self) -> typing.Optional[int]:
         if self.source.sql_predicate is None and self.source.table.is_physically_stored:
-            return self.source.table.n_rows
+            return self.source.n_rows
         return None
 
     @property

Diff for: bigframes/core/schema.py
+6 -2

@@ -67,9 +67,13 @@ def dtypes(self) -> typing.Tuple[bigframes.dtypes.Dtype, ...]:
     def _mapping(self) -> typing.Dict[ColumnIdentifierType, bigframes.dtypes.Dtype]:
         return {item.column: item.dtype for item in self.items}
 
-    def to_bigquery(self) -> typing.Tuple[google.cloud.bigquery.SchemaField, ...]:
+    def to_bigquery(
+        self, overrides: dict[bigframes.dtypes.Dtype, str] = {}
+    ) -> typing.Tuple[google.cloud.bigquery.SchemaField, ...]:
         return tuple(
-            bigframes.dtypes.convert_to_schema_field(item.column, item.dtype)
+            bigframes.dtypes.convert_to_schema_field(
+                item.column, item.dtype, overrides=overrides
+            )
             for item in self.items
         )
 
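The new overrides hook lets a caller swap the BigQuery field type emitted for a particular BigFrames dtype when building a load schema; this matters for the local-upload path, where some logical types (for example JSON) are staged in parquet as plain strings. A hedged sketch only; the exact override value expected by convert_to_schema_field is assumed, not shown in this commit:

    import bigframes.dtypes

    # schema is an ArraySchema instance; the override value is assumed to be the
    # BigQuery field type name to emit for JSON columns (illustrative only).
    bq_fields = schema.to_bigquery(overrides={bigframes.dtypes.JSON_DTYPE: "JSON"})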
