
Issue using PySpark writeStream to write to Pub/Sub Lite with an attributes field #261

@anguillanneuf

Description

Despite matching the data types exactly as described in the README, I'm unable to use writeStream to write to Pub/Sub Lite when the attributes field is included.

(Screenshot, 2021-09-07: the StreamingQueryException reproduced in the stack trace below.)
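For reference, this is the write schema I'm trying to match, as I read it from the connector README; the StructType below is my own reconstruction (an assumption), not code shipped with the connector:

    from pyspark.sql.types import (
        ArrayType, BinaryType, MapType, StringType, StructField, StructType, TimestampType,
    )

    # My reading of the README: each attribute key maps to an array of binary values.
    readme_write_schema = StructType([
        StructField("key", BinaryType(), True),
        StructField("event_timestamp", TimestampType(), True),
        StructField("data", BinaryType(), True),
        StructField("attributes", MapType(StringType(), ArrayType(BinaryType(), True), True), True),
    ])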

Steps to reproduce

  1. Run this command.
    gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite.py \
      --region=$DATAPROC_REGION \
      --cluster=$CLUSTER_NAME \
      --jars=https://search.maven.org/remotecontent?filepath=com/google/cloud/pubsublite-spark-sql-streaming/0.3.1/pubsublite-spark-sql-streaming-0.3.1-with-dependencies.jar \
      --driver-log-levels=root=INFO \
      --properties=spark.master=yarn
    

Code example

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import array, create_map, lit, udf
    from pyspark.sql.types import ArrayType, BinaryType, MapType, StringType

    # TODO(developer):
    # project_number = 11223344556677
    # location = "us-central1-a"
    # topic_id = "your-topic-id"

    spark = SparkSession.builder.appName("poc").master("yarn").getOrCreate()

    # RateStreamSource is a streaming source that generates consecutive
    # numbers with timestamp that can be useful for testing and PoCs.
    sdf = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

    # root
    # |-- timestamp: timestamp (nullable = true)
    # |-- value: long (nullable = true)

    # Tag each value "even" or "odd" by its last digit; int() is needed
    # because str(z)[-1] is a string and "5" % 2 raises a TypeError.
    divisible_by_two_udf = udf(lambda z: "even" if int(str(z)[-1]) % 2 == 0 else "odd")

    # Render the long value as a decimal string; it is cast to binary below.
    long_to_bytestring_udf = udf(lambda n: str(n))

    sdf = (
        sdf.withColumn("key", (sdf.value % 5).cast(StringType()).cast(BinaryType()))
        .withColumn("event_timestamp", sdf.timestamp)
        .withColumn("data", long_to_bytestring_udf("value").cast(BinaryType()))
        .withColumn(
            "attributes",
            create_map(
                lit("prop1"), array(divisible_by_two_udf("value").cast(BinaryType()))
            ).cast(MapType(StringType(), ArrayType(BinaryType()), True)),
        )
        .drop("value", "timestamp")
    )

    sdf.printSchema()
    # root
    # |-- key: binary (nullable = true)
    # |-- event_timestamp: timestamp (nullable = true)
    # |-- data: binary (nullable = true)
    # |-- attributes: map (nullable = false)
    # |    |-- key: string
    # |    |-- value: array (valueContainsNull = true)
    # |    |    |-- element: binary (containsNull = true)

    query = (
        sdf.writeStream.format("pubsublite")
        .option(
            "pubsublite.topic",
            f"projects/{project_number}/locations/{location}/topics/{topic_id}",
        )
        .option("checkpointLocation", "/tmp/app")
        .outputMode("append")
        .trigger(processingTime="1 second")
        .start()
    )

    query.awaitTermination(60)
    query.stop()

Stack trace

    pyspark.sql.utils.StreamingQueryException: 'Column attributes in input schema to write to Pub/Sub Lite
    has a wrong DataType. Actual: MapType(StringType,ArrayType(BinaryType,true),true),
    expected: MapType(StringType,ArrayType(BinaryType,true),true).
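Note that the "Actual" and "expected" types in the message print identically, so whatever the check compares must not appear in the string form. My guess (unconfirmed) is the column-level nullable flag on the StructField, which printSchema shows as nullable = false for attributes. A minimal sketch for inspecting that, using only standard PySpark APIs:

    from pyspark.sql.types import ArrayType, BinaryType, MapType, StringType

    expected = MapType(StringType(), ArrayType(BinaryType(), True), True)
    field = sdf.schema["attributes"]

    print(field.dataType == expected)  # DataType equality; True given the message above
    print(field.nullable)              # False in this schema; the connector may expect True
    print(sdf.schema.json())           # full dump, including every nullability flag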

Metadata

Labels

api: pubsublite (issues related to the googleapis/java-pubsublite-spark API)
type: feature request ('Nice-to-have' improvement, new feature, or different behavior or design)
