Incorrect order of row groups when reading #551

Closed
hig-dev opened this issue Apr 27, 2020 · 2 comments
hig-dev commented Apr 27, 2020

My goal is to read the dataset back in the order in which I generated the rows. However, if the row group size is set to a value lower than the total size of the dataset, the rows come back in the wrong order when reading, despite setting shuffle_row_groups=False.

Please look at this demonstration of the problem. I would expect the exception not to be raised.

import pathlib
import numpy as np
from petastorm import make_reader
from petastorm.codecs import ScalarCodec

from petastorm.etl.dataset_metadata import materialize_dataset
from petastorm.unischema import Unischema, UnischemaField, dict_to_spark_row
from pyspark.sql import SparkSession
from pyspark.sql.types import LongType

output_directory = pathlib.Path('./_generated_demo_data')
output_url = output_directory.resolve().as_uri()

session_builder = SparkSession \
    .builder \
    .appName('Demo')

spark = session_builder.getOrCreate()
sc = spark.sparkContext

schema = Unischema('DemoSchema', [
        UnischemaField('timestamp', np.uint64, (), ScalarCodec(LongType()), False),
    ])

# Generate petastorm with timestamps in order
with materialize_dataset(spark, output_url, schema, row_group_size_mb=1):
    generator = enumerate(range(1000000))
    rows_dict_generator = map(lambda x: {'timestamp': x[0]}, generator)
    rows_spark_generator = map(lambda x: dict_to_spark_row(schema, x), rows_dict_generator)
    rows_rdd = sc.parallelize(rows_spark_generator)

    spark.createDataFrame(rows_rdd, schema.as_spark_schema()) \
        .coalesce(1) \
        .write \
        .mode('overwrite') \
        .parquet(output_url)

# Read generated petastorm and check timestamps ordering
last_timestamp = -float("inf")
with make_reader(output_url,
                 schema_fields=['timestamp'],
                 shuffle_row_groups=False) as reader:
    for row in reader:
        # Ensure timestamps are read back in the order they were generated
        if row.timestamp < last_timestamp:
            raise Exception('Timestamps in petastorm are not in order!')

        last_timestamp = row.timestamp

@selitvin (Collaborator)

This is most likely the result of a race between multiple reader threads. Try passing make_reader(..., workers_count=1); that should make the reading order deterministic. Unfortunately, you will get a lower throughput rate.

This problem could be properly mitigated by adding a reordering queue to the petastorm implementation, but we do not have one right now.
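For reference, the reordering queue idea mentioned above could be sketched roughly as follows. This is not petastorm code, just a minimal stdlib illustration of the concept: worker threads finish row groups out of order, and a small heap-backed buffer holds results back until all earlier sequence numbers have arrived. The `ReorderBuffer` name and API are hypothetical.

```python
import heapq


class ReorderBuffer:
    """Buffers out-of-order (sequence_number, item) pairs and releases
    items strictly in sequence order. Sketch only; this illustrates the
    reordering-queue mitigation, not an actual petastorm API."""

    def __init__(self):
        self._heap = []   # min-heap of (sequence_number, item)
        self._next = 0    # next sequence number allowed to be released

    def push(self, seq, item):
        # A worker hands in a finished row group with its sequence number.
        heapq.heappush(self._heap, (seq, item))

    def pop_ready(self):
        # Release every item whose turn has come, in order; anything
        # after a gap stays buffered until the gap is filled.
        out = []
        while self._heap and self._heap[0][0] == self._next:
            _, item = heapq.heappop(self._heap)
            out.append(item)
            self._next += 1
        return out


# Workers finish row groups out of order:
buf = ReorderBuffer()
buf.push(2, 'rg2')
buf.push(0, 'rg0')
print(buf.pop_ready())  # ['rg0']  -- rg1 is missing, so rg2 is held back
buf.push(1, 'rg1')
print(buf.pop_ready())  # ['rg1', 'rg2']
```

With such a buffer in the read path, multiple workers could still decode row groups in parallel while the consumer sees them in the original order, at the cost of some memory for the held-back groups.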

@hig-dev (Author) commented May 6, 2020

Thanks for the tip. The workaround of setting workers_count=1 in make_reader did work.
I will let you decide whether you want to close this issue.
