Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Simplified code to generate parquet files for tests #883

Merged
merged 2 commits into from
Mar 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 55 additions & 54 deletions parquet_integration/write_parquet.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Tuple

import pyarrow as pa
import pyarrow.parquet
import os
Expand All @@ -6,7 +8,7 @@
PYARROW_PATH = "fixtures/pyarrow3"


def case_basic_nullable(size=1):
def case_basic_nullable() -> Tuple[dict, pa.Schema, str]:
int64 = [0, 1, None, 3, None, 5, 6, 7, None, 9]
float64 = [0.0, 1.0, None, 3.0, None, 5.0, 6.0, 7.0, None, 9.0]
string = ["Hello", None, "aa", "", None, "abc", None, None, "def", "aaa"]
Expand Down Expand Up @@ -38,27 +40,27 @@ def case_basic_nullable(size=1):

return (
{
"int64": int64 * size,
"float64": float64 * size,
"string": string * size,
"bool": boolean * size,
"date": int64 * size,
"uint32": int64 * size,
"string_large": string_large * size,
"decimal_9": decimal * size,
"decimal_18": decimal * size,
"decimal_26": decimal * size,
"timestamp_us": int64 * size,
"timestamp_s": int64 * size,
"emoji": emoji * size,
"timestamp_s_utc": int64 * size,
"int64": int64,
"float64": float64,
"string": string,
"bool": boolean,
"date": int64,
"uint32": int64,
"string_large": string_large,
"decimal_9": decimal,
"decimal_18": decimal,
"decimal_26": decimal,
"timestamp_us": int64,
"timestamp_s": int64,
"emoji": emoji,
"timestamp_s_utc": int64,
},
schema,
f"basic_nullable_{size*10}.parquet",
f"basic_nullable_10.parquet",
)


def case_basic_required(size=1):
def case_basic_required() -> Tuple[dict, pa.Schema, str]:
int64 = [-256, -1, 0, 1, 2, 3, 4, 5, 6, 7]
uint32 = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
float64 = [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
Expand Down Expand Up @@ -87,22 +89,22 @@ def case_basic_required(size=1):

return (
{
"int64": int64 * size,
"float64": float64 * size,
"string": string * size,
"bool": boolean * size,
"date": int64 * size,
"uint32": uint32 * size,
"decimal_9": decimal * size,
"decimal_18": decimal * size,
"decimal_26": decimal * size,
"int64": int64,
"float64": float64,
"string": string,
"bool": boolean,
"date": int64,
"uint32": uint32,
"decimal_9": decimal,
"decimal_18": decimal,
"decimal_26": decimal,
},
schema,
f"basic_required_{size*10}.parquet",
f"basic_required_10.parquet",
)


def case_nested(size):
def case_nested() -> Tuple[dict, pa.Schema, str]:
items_nullable = [[0, 1], None, [2, None, 3], [4, 5, 6], [], [7, 8, 9], None, [10]]
items_required = [[0, 1], None, [2, 0, 3], [4, 5, 6], [], [7, 8, 9], None, [10]]
all_required = [[0, 1], [], [2, 0, 3], [4, 5, 6], [], [7, 8, 9], [], [10]]
Expand Down Expand Up @@ -178,23 +180,23 @@ def case_nested(size):
schema = pa.schema(fields)
return (
{
"list_int64": items_nullable * size,
"list_int64_required": items_required * size,
"list_int64_required_required": all_required * size,
"list_int16": i16 * size,
"list_bool": boolean * size,
"list_utf8": string * size,
"list_large_binary": string * size,
"list_nested_i64": items_nested * size,
"list_nested_inner_required_i64": items_required_nested * size,
"list_nested_inner_required_required_i64": items_required_nested_2 * size,
"list_int64": items_nullable,
"list_int64_required": items_required,
"list_int64_required_required": all_required,
"list_int16": i16,
"list_bool": boolean,
"list_utf8": string,
"list_large_binary": string,
"list_nested_i64": items_nested,
"list_nested_inner_required_i64": items_required_nested,
"list_nested_inner_required_required_i64": items_required_nested_2,
},
schema,
f"nested_nullable_{size*10}.parquet",
f"nested_nullable_10.parquet",
)


def case_struct(size):
def case_struct() -> Tuple[dict, pa.Schema, str]:
string = ["Hello", None, "aa", "", None, "abc", None, None, "def", "aaa"]
boolean = [True, None, False, False, None, True, None, None, True, True]
struct_fields = [
Expand All @@ -220,31 +222,30 @@ def case_struct(size):
)

struct = pa.StructArray.from_arrays(
[pa.array(string * size), pa.array(boolean * size)],
[pa.array(string), pa.array(boolean)],
fields=struct_fields,
)
return (
{
"struct": struct,
"struct_struct": pa.StructArray.from_arrays(
[struct, pa.array(boolean * size)],
[struct, pa.array(boolean)],
names=["f1", "f2"],
),
},
schema,
f"struct_nullable_{size*10}.parquet",
f"struct_nullable_10.parquet",
)


def write_pyarrow(
case,
size: int,
page_version: int,
use_dictionary: bool,
multiple_pages: bool,
compression: str,
):
data, schema, path = case(size)
data, schema, path = case

base_path = f"{PYARROW_PATH}/v{page_version}"
if use_dictionary:
Expand Down Expand Up @@ -279,20 +280,20 @@ def write_pyarrow(
for version in [1, 2]:
for use_dict in [True, False]:
for compression in ["lz4", None, "snappy"]:
write_pyarrow(case, 1, version, use_dict, False, compression)
write_pyarrow(case(), version, use_dict, False, compression)


def case_benches(size):
    """Build a nullable-columns benchmark case with ``size`` rows.

    Starts from the 10-row basic nullable case, truncates every column
    to its first 8 values, and tiles those values up to ``size`` rows.

    Parameters
    ----------
    size : int
        Number of rows to generate; must be a multiple of 8 so the
        8-value pattern tiles exactly.

    Returns
    -------
    tuple
        ``(data, schema, path)`` — column-name -> value-list mapping,
        the pyarrow schema, and the target parquet file name.
    """
    assert size % 8 == 0
    # Drop the stale pre-refactor call that was left in by the diff:
    # case_basic_nullable takes no arguments after the simplification.
    data, schema, _ = case_basic_nullable()
    for k in data:
        # Tile the first 8 values of each column to the requested size.
        data[k] = data[k][:8] * (size // 8)
    return data, schema, f"benches_{size}.parquet"


def case_benches_required(size):
    """Build a required (non-nullable) benchmark case with ``size`` rows.

    Starts from the 10-row basic required case, truncates every column
    to its first 8 values, and tiles those values up to ``size`` rows.

    Parameters
    ----------
    size : int
        Number of rows to generate; must be a multiple of 8 so the
        8-value pattern tiles exactly.

    Returns
    -------
    tuple
        ``(data, schema, path)`` — column-name -> value-list mapping,
        the pyarrow schema, and the target parquet file name.
    """
    assert size % 8 == 0
    # Drop the stale pre-refactor call that was left in by the diff:
    # case_basic_required takes no arguments after the simplification.
    data, schema, _ = case_basic_required()
    for k in data:
        # Tile the first 8 values of each column to the requested size.
        data[k] = data[k][:8] * (size // 8)
    return data, schema, f"benches_required_{size}.parquet"
Expand All @@ -301,14 +302,14 @@ def case_benches_required(size):
# for read benchmarks
#
# Writes each benchmark case at sizes 2**10 .. 2**20 (step: x4) in the
# page/dictionary/compression combinations the read benchmarks exercise.
# NOTE: the diff's stale pre-refactor calls (write_pyarrow(case, size, ...))
# are dropped; write_pyarrow now receives an already-built case tuple.
for i in range(10, 22, 2):
    # two pages (dict)
    write_pyarrow(case_benches(2 ** i), 1, True, False, None)
    # single page
    write_pyarrow(case_benches(2 ** i), 1, False, False, None)
    # single page required
    write_pyarrow(case_benches_required(2 ** i), 1, False, False, None)
    # multiple pages
    write_pyarrow(case_benches(2 ** i), 1, False, True, None)
    # multiple compressed pages
    write_pyarrow(case_benches(2 ** i), 1, False, True, "snappy")
    # single compressed page
    write_pyarrow(case_benches(2 ** i), 1, False, False, "snappy")
Loading