
[Bug]: IcebergIO - Write performance issues #32746

Open
3 of 17 tasks
DanielMorales9 opened this issue Oct 10, 2024 · 23 comments
Labels
bug dataflow IcebergIO IcebergIO: can only be used through ManagedIO io java P2

Comments

@DanielMorales9

DanielMorales9 commented Oct 10, 2024

What happened?

I am trying to stream data from PubSub (with a throughput of 10-50 RPS) into an Iceberg Table (not partitioned) using the IcebergIO connector and Hive Metastore.

However, after some time, I see warning logs in the console like below:

...
Operation ongoing in bundle for at least 40m00s without completing Current user step name: Managed.ManagedTransform/ManagedSchemaTransformProvider.ManagedSchemaTransform/IcebergWriteSchemaTransformProvider.IcebergWriteSchemaTransform/IcebergIO.WriteRows/Write Rows 
Operation ongoing in bundle for at least 50m00s without completing Current user step name: Managed.ManagedTransform/ManagedSchemaTransformProvider.ManagedSchemaTransform/IcebergWriteSchemaTransformProvider.IcebergWriteSchemaTransform/IcebergIO.WriteRows/Write Rows 
Operation ongoing in bundle for at least 01h00m00s without completing Current user step name: Managed.ManagedTransform/ManagedSchemaTransformProvider.ManagedSchemaTransform/IcebergWriteSchemaTransformProvider.IcebergWriteSchemaTransform/IcebergIO.WriteRows/Write Rows
Operation ongoing in bundle for at least 01h10m00s without completing Current user step name: Managed.ManagedTransform/ManagedSchemaTransformProvider.ManagedSchemaTransform/IcebergWriteSchemaTransformProvider.IcebergWriteSchemaTransform/IcebergIO.WriteRows/Write Rows
...

From the thread dump (see threadump.txt) it looks like a significant number of DataflowWorkUnits threads (287) are in the TIMED_WAITING state, specifically waiting for a Hive client from the pool.

java.base@11.0.20/java.lang.Object.wait(Native Method)
app//org.apache.iceberg.ClientPoolImpl.get(ClientPoolImpl.java:133)

However, I find it surprising that all these threads attempt to create a new writer each time, which results in the concurrent table reloads seen above. Is this behavior expected? I suspect the performance issues stem from the RecordWriterManager class not being thread-safe. Specifically, a race condition appears to occur in the following code snippet because of its check-then-act logic:

private RecordWriter fetchWriterForPartition(PartitionKey partitionKey) {
  RecordWriter recordWriter = writers.getIfPresent(partitionKey);
  if (recordWriter == null || recordWriter.bytesWritten() > maxFileSize) {
    // calling invalidate for a non-existent key is a safe operation
    writers.invalidate(partitionKey);
    recordWriter = createWriter(partitionKey);
    writers.put(partitionKey, recordWriter);
  }
  return recordWriter;
}
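
For illustration only -- this is not the actual Beam code, just a minimal sketch of how serializing that check-then-act would avoid the race I'm suspecting:

// Hypothetical sketch, not the Beam implementation: guard the check-then-act on
// the writers cache so two threads cannot replace the same partition's writer
// concurrently.
private final Object writerLock = new Object();

private RecordWriter fetchWriterForPartition(PartitionKey partitionKey) {
  synchronized (writerLock) {
    RecordWriter recordWriter = writers.getIfPresent(partitionKey);
    if (recordWriter == null || recordWriter.bytesWritten() > maxFileSize) {
      writers.invalidate(partitionKey);
      recordWriter = createWriter(partitionKey);
      writers.put(partitionKey, recordWriter);
    }
    return recordWriter;
  }
}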

Indeed, when I stop the job I am prompted with the following error message -- evidence that too many writers were created:

java.lang.IllegalArgumentException: Expected all data writers to be closed, but found 8 data writer(s) still open
	at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:191)
	at org.apache.beam.sdk.io.iceberg.RecordWriterManager.close(RecordWriterManager.java:279)
	at org.apache.beam.sdk.io.iceberg.WriteUngroupedRowsToFiles$WriteUngroupedRowsToFilesDoFn.finishBundle(WriteUngroupedRowsToFiles.java:241)

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@DanielMorales9 DanielMorales9 changed the title [Bug]: IceberIO - Write performance issues [Bug]: IcebergIO - Write performance issues Oct 10, 2024
@liferoad liferoad added IcebergIO IcebergIO: can only be used through ManagedIO and removed awaiting triage labels Oct 11, 2024
@ahmedabu98
Contributor

Hmmm, for streaming mode there should be one RecordWriterManager per bundle so I don't think there should be multiple threads trying to access one instance.

But it is weird. If you're writing to one destination with no partitions, there should be exactly one writer per bundle at any given time. Perhaps old writers are not closing properly? Can you check if you have any logs for "Encountered an error when closing data writer..." (line 118)?

@ahmedabu98
Contributor

ahmedabu98 commented Oct 11, 2024

P.S. are you running at Beam HEAD? We recently merged a change that adds a static table cache (#32686), so it should be loaded only once per RecordWriterManager

@ahmedabu98
Contributor

Currently running a pipeline to try to repro your error:

    Map<String, Object> config =
            ImmutableMap.<String, Object>builder()
                    .put("table", table)
                    .put("catalog_name", "test")
                    .put("catalog_properties",
                            ImmutableMap.<String, String>builder()
                                    .put("warehouse", warehouse)
                                    .put("gcp_project", "apache-beam-testing")
                                    .put("gcp_location", "us-central1")
                                    .put("catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog")
                                    .build())
                    .put("triggering_frequency_seconds", 5)
                    .build();

    Schema taxiSchema =
            Schema.builder()
                    .addStringField("ride_id")
                    .addInt32Field("point_idx")
                    .addDoubleField("latitude")
                    .addDoubleField("longitude")
                    .addStringField("timestamp")
                    .addDoubleField("meter_reading")
                    .addDoubleField("meter_increment")
                    .addStringField("ride_status")
                    .addInt32Field("passenger_count")
                    .build();

    Pipeline q = Pipeline.create(options);
    q
            .apply(PubsubIO.readStrings().fromTopic("projects/pubsub-public-data/topics/taxirides-realtime"))
            .apply(JsonToRow.withSchema(taxiSchema))
            .apply(Managed.write(Managed.ICEBERG).withConfig(config));
    q.run();

Will let it run for some time. The throughput is sitting at ~2k rows per second.

@DanielMorales9
Author

DanielMorales9 commented Oct 11, 2024

Hmmm, for streaming mode there should be one RecordWriterManager per bundle so I don't think there should be multiple threads trying to access one instance.

Strange, because I can see threads writing multiple files with only a few rows each.
Screenshot 2024-10-11 at 11 29 57

Can you check if you have any logs for "Encountered an error when closing data writer..." (line 118)?

I do not have any errors when closing writers.

P.S. are you running at Beam HEAD? We recently merged a change that adds a static table cache (#32686), so it should be loaded only once per RecordWriterManager

I am running version 2.59.0.

@ahmedabu98
Contributor

There have been quite a few improvements recently (see here). They should be available in 2.60.0 -- all except the two most recent PRs (#32686, #32688).

Can you try running the same pipeline against HEAD or the latest snapshot?
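
If you're on Maven, pulling the snapshot should roughly amount to bumping the version and adding the ASF snapshots repository -- a sketch below (the property name and repository URL are from memory, so double-check them):

<properties>
    <!-- assumed property name; point whatever Beam version property you use at the snapshot -->
    <beam.version>2.60.0-SNAPSHOT</beam.version>
</properties>

<repositories>
    <repository>
        <id>apache-snapshots</id>
        <!-- URL from memory; verify against the ASF snapshots repository -->
        <url>https://repository.apache.org/content/repositories/snapshots/</url>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
</repositories>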

@DanielMorales9
Author

I noticed, however, that I have not provided the triggering frequency. I will check if it changes something and let you know.

I also found that the writer hangs indefinitely while trying to update the manifest file, without timing out. Not sure why that is 🤔

Operation ongoing in bundle for at least 02h10m00s without completing
Current user step name: Managed.ManagedTransform/ManagedSchemaTransformProvider.ManagedSchemaTransform/IcebergWriteSchemaTransformProvider.IcebergWriteSchemaTransform/IcebergIO.WriteRows/Write Rows to Destinations/AppendFilesToTables/Append metadata updates to tables
Time spent in this step(millis): 1728598024092
Processing times in each step(millis)
Step name: Managed.ManagedTransform/ManagedSchemaTransformProvider.ManagedSchemaTransform/IcebergWriteSchemaTransformProvider.IcebergWriteSchemaTransform/MapElements/Map
Time spent in this step: IntSummaryStatistics{count=90, sum=124, min=0, average=1.377778, max=53}
Step name: Managed.ManagedTransform/ManagedSchemaTransformProvider.ManagedSchemaTransform/IcebergWriteSchemaTransformProvider.IcebergWriteSchemaTransform/IcebergIO.WriteRows/Write Rows to Destinations/AppendFilesToTables/Append metadata updates to tables
Time spent in this step: IntSummaryStatistics{count=90, sum=17162211, min=34058, average=190691.233333, max=996066}
Step name: Managed.ManagedTransform/ManagedSchemaTransformProvider.ManagedSchemaTransform/IcebergWriteSchemaTransformProvider.IcebergWriteSchemaTransform/IcebergIO.WriteRows/Write Rows to Destinations/AppendFilesToTables/Group metadata updates by table/ReadStream
Time spent in this step: IntSummaryStatistics{count=942, sum=1470, min=0, average=1.560510, max=234}
  at java.base@11.0.20/java.net.SocketInputStream.socketRead0(Native Method)
  at java.base@11.0.20/java.net.SocketInputStream.socketRead(SocketInputStream.java:115)
  at java.base@11.0.20/java.net.SocketInputStream.read(SocketInputStream.java:168)
  at java.base@11.0.20/java.net.SocketInputStream.read(SocketInputStream.java:140)
  at java.base@11.0.20/sun.security.ssl.SSLSocketInputRecord.read(SSLSocketInputRecord.java:484)
  at java.base@11.0.20/sun.security.ssl.SSLSocketInputRecord.readHeader(SSLSocketInputRecord.java:478)
  at java.base@11.0.20/sun.security.ssl.SSLSocketInputRecord.bytesInCompletePacket(SSLSocketInputRecord.java:70)
  at java.base@11.0.20/sun.security.ssl.SSLSocketImpl.readApplicationRecord(SSLSocketImpl.java:1459)
  at java.base@11.0.20/sun.security.ssl.SSLSocketImpl$AppInputStream.read(SSLSocketImpl.java:1070)
  at java.base@11.0.20/java.io.BufferedInputStream.fill(BufferedInputStream.java:252)
  at java.base@11.0.20/java.io.BufferedInputStream.read1(BufferedInputStream.java:292)
  at java.base@11.0.20/java.io.BufferedInputStream.read(BufferedInputStream.java:351)
  at java.base@11.0.20/sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:789)
  at java.base@11.0.20/sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:724)
  at java.base@11.0.20/sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1615)
  at java.base@11.0.20/sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1520)
  at java.base@11.0.20/java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:527)
  at java.base@11.0.20/sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:334)
  at app//com.google.api.client.http.javanet.NetHttpResponse.<init>(NetHttpResponse.java:36)
  at app//com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:152)
  at app//com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:84)
  at app//com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1012)
  at app//com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:525)
  at app//com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:466)
  at app//com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeMedia(AbstractGoogleClientRequest.java:490)
  at app//com.google.api.services.storage.Storage$Objects$Get.executeMedia(Storage.java:6522)
  at app//com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.openStream(GoogleCloudStorageReadChannel.java:933)
  at app//com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.openContentChannel(GoogleCloudStorageReadChannel.java:724)
  at app//com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.performLazySeek(GoogleCloudStorageReadChannel.java:715)
  at app//com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.read(GoogleCloudStorageReadChannel.java:307)
  at app//com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream.read(GoogleHadoopFSInputStream.java:142)
  at java.base@11.0.20/java.io.DataInputStream.read(DataInputStream.java:149)
  at app//org.apache.iceberg.hadoop.HadoopStreams$HadoopSeekableInputStream.read(HadoopStreams.java:123)
  at app//org.apache.iceberg.avro.AvroIO$AvroInputStreamAdapter.read(AvroIO.java:117)
  at app//org.apache.avro.file.DataFileReader.openReader(DataFileReader.java:65)
  at app//org.apache.iceberg.avro.AvroIterable.newFileReader(AvroIterable.java:100)
  at app//org.apache.iceberg.avro.AvroIterable.iterator(AvroIterable.java:76)
  at app//org.apache.iceberg.io.CloseableIterable$7$1.<init>(CloseableIterable.java:188)
  at app//org.apache.iceberg.io.CloseableIterable$7.iterator(CloseableIterable.java:187)
  at app//org.apache.iceberg.ManifestFiles.copyManifestInternal(ManifestFiles.java:312)
  at app//org.apache.iceberg.ManifestFiles.copyAppendManifest(ManifestFiles.java:264)
  at app//org.apache.iceberg.MergingSnapshotProducer.copyManifest(MergingSnapshotProducer.java:288)
  at app//org.apache.iceberg.MergingSnapshotProducer.add(MergingSnapshotProducer.java:279)
  at app//org.apache.iceberg.MergeAppend.appendManifest(MergeAppend.java:68)
  at app//org.apache.beam.sdk.io.iceberg.AppendFilesToTables$AppendFilesToTablesDoFn.processElement(AppendFilesToTables.java:93)
  at app//org.apache.beam.sdk.io.iceberg.AppendFilesToTables$AppendFilesToTablesDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)

@DanielMorales9
Author

There have been quite a few improvements recently (see here). They should be available in 2.60.0 -- all except the two most recent PRs (#32686, #32688).

Can you try running the same pipeline against HEAD or the latest snapshot?

Sure thing, will do that!

@ahmedabu98
Contributor

I noticed, however, that I have not provided the triggering frequency. I will check if it changes something and let you know.

Yep, the triggering frequency was one of the recently added features. Streaming writes before 2.60.0 may work in some cases, but it's rather unpredictable.

I also found that the writer hangs indefinitely while trying to update the manifest file, without timing out. Not sure why that is 🤔

This is also one of the improvements -- we drastically reduced the number of manifest files we write. From my experience, the more manifest files there are, the longer it takes to update the next one.

Let me know how the new pipeline goes!

@DanielMorales9
Author

DanielMorales9 commented Oct 11, 2024

I used 2.60.0-SNAPSHOT and a triggering frequency of 60s, but after some time I see the errors again:

  at java.base@11.0.20/jdk.internal.misc.Unsafe.park(Native Method)
  at java.base@11.0.20/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
  at java.base@11.0.20/java.util.concurrent.FutureTask.awaitDone(FutureTask.java:447)
  at java.base@11.0.20/java.util.concurrent.FutureTask.get(FutureTask.java:190)
  at app//com.google.cloud.hadoop.util.BaseAbstractGoogleAsyncWriteChannel.waitForCompletionAndThrowIfUploadFailed(BaseAbstractGoogleAsyncWriteChannel.java:247)
  at app//com.google.cloud.hadoop.util.BaseAbstractGoogleAsyncWriteChannel.close(BaseAbstractGoogleAsyncWriteChannel.java:168)
  at java.base@11.0.20/java.nio.channels.Channels$1.close(Channels.java:177)
  at java.base@11.0.20/java.io.FilterOutputStream.close(FilterOutputStream.java:188)
  at app//com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.close(GoogleHadoopOutputStream.java:119)
  at app//org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
  at app//org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:101)
  at app//org.apache.iceberg.hadoop.HadoopStreams$HadoopPositionOutputStream.close(HadoopStreams.java:188)
  at java.base@11.0.20/java.io.FilterOutputStream.close(FilterOutputStream.java:188)
  at java.base@11.0.20/java.io.FilterOutputStream.close(FilterOutputStream.java:188)
  at app//org.apache.avro.file.DataFileWriter.close(DataFileWriter.java:461)
  at app//org.apache.iceberg.avro.AvroFileAppender.close(AvroFileAppender.java:94)
  at app//org.apache.iceberg.ManifestWriter.close(ManifestWriter.java:213)
  at app//org.apache.iceberg.ManifestFiles.copyManifestInternal(ManifestFiles.java:337)
  at app//org.apache.iceberg.ManifestFiles.copyAppendManifest(ManifestFiles.java:264)
  at app//org.apache.iceberg.MergingSnapshotProducer.copyManifest(MergingSnapshotProducer.java:288)
  at app//org.apache.iceberg.MergingSnapshotProducer.add(MergingSnapshotProducer.java:279)
  at app//org.apache.iceberg.MergeAppend.appendManifest(MergeAppend.java:68)
  at app//org.apache.beam.sdk.io.iceberg.AppendFilesToTables$AppendFilesToTablesDoFn.processElement(AppendFilesToTables.java:104)
  at app//org.apache.beam.sdk.io.iceberg.AppendFilesToTables$AppendFilesToTablesDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)

However, it looks like it eventually succeeds and one snapshot is produced:
Screenshot 2024-10-11 at 12 58 50

@DanielMorales9
Author

Not sure if it can be of any help, but I am using the following gcs-connector version.

<dependency>
    <groupId>com.google.cloud.bigdataoss</groupId>
    <artifactId>gcs-connector</artifactId>
    <version>hadoop3-2.2.11</version>
</dependency>

@ahmedabu98
Contributor

We're using version hadoop2-2.2.16 (I'm not sure whether there are any performance differences, but it may be worth trying it)
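
i.e. swapping the version in the dependency you posted -- something like this (or the hadoop3-2.2.16 equivalent if you stay on Hadoop 3):

<dependency>
    <groupId>com.google.cloud.bigdataoss</groupId>
    <artifactId>gcs-connector</artifactId>
    <version>hadoop2-2.2.16</version>
</dependency>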

I canceled my previous repro attempt because it was healthy for 3+ hours, and I'm attempting another run now with higher throughput and a 60s triggering frequency.

Any chance you can provide a repro?

@ahmedabu98
Contributor

Ahh I'm seeing the error now

java.lang.IllegalArgumentException: Expected all data writers to be closed, but found 1 data writer(s) still open
	at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:191)
	at org.apache.beam.sdk.io.iceberg.RecordWriterManager.close(RecordWriterManager.java:295)
	at org.apache.beam.sdk.io.iceberg.WriteGroupedRowsToFiles$WriteGroupedRowsToFilesDoFn.processElement(WriteGroupedRowsToFiles.java:109)

And also seeing the same number of these errors:

Full stacktrace
java.io.UncheckedIOException: Failed to create Parquet file
	at org.apache.iceberg.parquet.ParquetWriter.ensureWriterInitialized(ParquetWriter.java:121)
	at org.apache.iceberg.parquet.ParquetWriter.flushRowGroup(ParquetWriter.java:210)
	at org.apache.iceberg.parquet.ParquetWriter.close(ParquetWriter.java:254)
	at org.apache.iceberg.io.DataWriter.close(DataWriter.java:82)
	at org.apache.beam.sdk.io.iceberg.RecordWriter.close(RecordWriter.java:112)
	at org.apache.beam.sdk.io.iceberg.RecordWriterManager$DestinationState.lambda$new$0(RecordWriterManager.java:114)
	at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.processPendingNotifications(LocalCache.java:1850)
	at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.runUnlockedCleanup(LocalCache.java:3503)
	at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.postWriteCleanup(LocalCache.java:3479)
	at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.remove(LocalCache.java:3108)
	at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.remove(LocalCache.java:4305)
	at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalManualCache.invalidate(LocalCache.java:4950)
	at org.apache.beam.sdk.io.iceberg.RecordWriterManager$DestinationState.fetchWriterForPartition(RecordWriterManager.java:156)
	at org.apache.beam.sdk.io.iceberg.RecordWriterManager$DestinationState.write(RecordWriterManager.java:141)
	at org.apache.beam.sdk.io.iceberg.RecordWriterManager.write(RecordWriterManager.java:270)
	at org.apache.beam.sdk.io.iceberg.WriteGroupedRowsToFiles$WriteGroupedRowsToFilesDoFn.processElement(WriteGroupedRowsToFiles.java:107)
	at org.apache.beam.sdk.io.iceberg.WriteGroupedRowsToFiles$WriteGroupedRowsToFilesDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:212)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
	at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:54)
	at org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:276)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:86)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$OnTimerArgumentProvider.outputWindowedValue(SimpleDoFnRunner.java:890)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$OnTimerArgumentProvider.outputWithTimestamp(SimpleDoFnRunner.java:878)
	at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.outputWithTimestamp(DoFnOutputReceivers.java:98)
	at org.apache.beam.sdk.transforms.GroupIntoBatches$GroupIntoBatchesDoFn.flushBatch(GroupIntoBatches.java:660)
	at org.apache.beam.sdk.transforms.GroupIntoBatches$GroupIntoBatchesDoFn.onBufferingTimer(GroupIntoBatches.java:601)
	at org.apache.beam.sdk.transforms.GroupIntoBatches$GroupIntoBatchesDoFn$OnTimerInvoker$tsendOfBuffering$dHMtZW5kT2ZCdWZmZXJpbmc.invokeOnTimer(Unknown Source)
	at org.apache.beam.sdk.transforms.reflect.ByteBuddyDoFnInvokerFactory$DoFnInvokerBase.invokeOnTimer(ByteBuddyDoFnInvokerFactory.java:242)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.onTimer(SimpleDoFnRunner.java:206)
	at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processUserTimer(SimpleParDoFn.java:366)
	at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.access$600(SimpleParDoFn.java:79)
	at org.apache.beam.runners.dataflow.worker.SimpleParDoFn$TimerType$1.processTimer(SimpleParDoFn.java:454)
	at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:483)
	at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:358)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.finish(ParDoOperation.java:52)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:94)
	at org.apache.beam.runners.dataflow.worker.streaming.ComputationWorkExecutor.executeWork(ComputationWorkExecutor.java:78)
	at org.apache.beam.runners.dataflow.worker.windmill.work.processing.StreamingWorkScheduler.executeWork(StreamingWorkScheduler.java:382)
	at org.apache.beam.runners.dataflow.worker.windmill.work.processing.StreamingWorkScheduler.processWork(StreamingWorkScheduler.java:255)
	at org.apache.beam.runners.dataflow.worker.windmill.work.processing.StreamingWorkScheduler.lambda$scheduleWork$2(StreamingWorkScheduler.java:214)
	at org.apache.beam.runners.dataflow.worker.streaming.ExecutableWork.run(ExecutableWork.java:38)
	at org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor.lambda$executeMonitorHeld$0(BoundedQueueExecutor.java:234)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.IOException: Failed to get result: java.io.IOException: Failed to get result: java.lang.OutOfMemoryError: GC overhead limit exceeded
	at com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.getFromFuture(GoogleCloudStorageFileSystem.java:911)
	at com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.create(GoogleCloudStorageFileSystem.java:293)
	at com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.createChannel(GoogleHadoopOutputStream.java:80)
	at com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.<init>(GoogleHadoopOutputStream.java:71)
	at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.lambda$create$3(GoogleHadoopFileSystemBase.java:650)
	at com.google.cloud.hadoop.fs.gcs.GhfsStorageStatistics.trackDuration(GhfsStorageStatistics.java:77)
	at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.create(GoogleHadoopFileSystemBase.java:624)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1073)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1054)
	at org.apache.parquet.hadoop.util.HadoopOutputFile.createOrOverwrite(HadoopOutputFile.java:81)
	at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:345)
	at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:323)
	at org.apache.iceberg.parquet.ParquetWriter.ensureWriterInitialized(ParquetWriter.java:111)
	... 48 more
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Failed to get result: java.lang.OutOfMemoryError: GC overhead limit exceeded
	at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
	at com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.getFromFuture(GoogleCloudStorageFileSystem.java:905)
	... 60 more
Caused by: java.io.IOException: Failed to get result: java.lang.OutOfMemoryError: GC overhead limit exceeded
	at com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.getFromFuture(GoogleCloudStorageFileSystem.java:911)
	at com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.getFileInfoInternal(GoogleCloudStorageFileSystem.java:1115)
	at com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.lambda$create$1(GoogleCloudStorageFileSystem.java:287)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	... 3 more
Caused by: java.util.concurrent.ExecutionException: java.lang.OutOfMemoryError: GC overhead limit exceeded
	at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
	at com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.getFromFuture(GoogleCloudStorageFileSystem.java:905)
	... 6 more
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded

@ahmedabu98
Contributor

ahmedabu98 commented Oct 11, 2024

Fortunately, when this error happens the bundle is retried, so there is no data loss. The pipeline also adapts well by autoscaling to more workers:
image

Seeing the following root causes:

  • java.lang.OutOfMemoryError: GC overhead limit exceeded
  • java.lang.OutOfMemoryError: Java heap space

My guess is that a higher triggering frequency means the worker has to hold more records in memory when writing to files. We could also be doing something inefficient with memory -- will investigate further.

@DanielMorales9
Author

I updated the gcs-connector to hadoop3-2.2.16, and I can see high-latency warnings on all types of operations on the manifest files.

Screenshot 2024-10-11 at 14 41 03

Also, the request count on GCS is quite high compared to the 20 RPS on Pub/Sub.

Screenshot 2024-10-11 at 14 44 26

Data files are written, but the consumer is slow to commit a snapshot and a new manifest list.

@ahmedabu98
Contributor

The high-latency warnings can be a little noisy -- they were added in 2.2.16 (ref)

Also, the request count on GCS is quite high compared to 20RPS on Pubsub.

Agreed that's weird for just 20 rows/sec. Is this a project-based metric? Are you sure nothing else is writing to GCS at the same time?

@DanielMorales9
Author

DanielMorales9 commented Oct 11, 2024

Is this a project-based metric? Are you sure nothing else is writing to GCS at the same time?

No, it's at the bucket level. I know that most of the traffic is coming from this job because of the spike.

I can see from the metadata that the job creates too many small files: an average of 5 records per file.

Screenshot 2024-10-11 at 16 21 06

@ahmedabu98
Contributor

ahmedabu98 commented Oct 11, 2024

I believe my pipeline was suffering from too much parallelism -- the work was split over too many concurrent threads within a worker, each one creating its own writer and eating up the worker's memory. High parallelism would also explain the many small files. There's an activeIcebergWriters metric that roughly tracks how many open writers there are. Here's an example of way too many writers for only 1 worker:

image

I tried the following routes and was able to get around it (in order of most to least effective):

  • Adding .apply(Redistribute.<Row>arbitrarily().withNumBuckets(<N>)) before the write step, reducing the parallelism to N (see the sketch after this list)
  • Use the --numberOfWorkerHarnessThreads=N pipeline option, which sets an upper bound on the number of threads per worker
  • Use a machine type with more memory (--workerMachineType=<type>). The default for Streaming Engine is n1-standard-2, so you may go for n1-standard-4 or n1-standard-8. This will handle the larger parallelism, but you may still end up with small files
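
For the Redistribute option, here's a rough sketch of where it slots into the repro pipeline I posted above (the bucket count of 4 is just an illustrative value):

// Sketch only: redistribute into a fixed number of buckets right before the
// Iceberg write. Fewer buckets => fewer concurrent writers => fewer, larger files.
q
    .apply(PubsubIO.readStrings().fromTopic("projects/pubsub-public-data/topics/taxirides-realtime"))
    .apply(JsonToRow.withSchema(taxiSchema))
    // illustrative bucket count; tune N to the level of parallelism you want
    .apply(Redistribute.<Row>arbitrarily().withNumBuckets(4))
    .apply(Managed.write(Managed.ICEBERG).withConfig(config));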

P.S. we also have metrics that show the distribution (max, mean, min) of data file byte size (committedDataFileByteSize) and record count (committedDataFileRecordCount). These are good ways to measure the effectiveness of each option

@DanielMorales9
Author

I dropped the table, but it's the same situation: still 5 records per file. Not sure how to control the number of files per bundle.
Do you respect the target file size as per spec?

@ahmedabu98
Contributor

ahmedabu98 commented Oct 11, 2024

not sure how to control the number of files per bundle

It's always one data file per bundle. The idea is to control the number of (concurrent) bundles

Do you respect the target file size as per spec?

We have a fixed 512MB max file size (ref)

@DanielMorales9
Author

DanielMorales9 commented Oct 11, 2024

Yes, I'm suffering from the same parallelism problem:
Screenshot 2024-10-11 at 16 54 27

  • Adding .apply(Redistribute.<Row>arbitrarily().withNumBuckets(<N>)) before the write step, reducing the parallelism to N

Is it similar to the Spark repartition? Does it shuffle data? How will it work with autoscaling enabled?

  • Adding .apply(Redistribute.<Row>arbitrarily().withNumBuckets(<N>)) before the write step, reducing the parallelism to N
  • Use the --numberOfWorkerHarnessThreads=N pipeline option, which sets an upper bound on the number of threads per worker

Right now I have autoscaling disabled, and I will try setting N=2 and machineType=n1-standard-4.
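
For reference, these are roughly the options I mean (flag names are the ones from this thread; the sketch below just shows how I'd wire them up, assuming the Dataflow runner dependency is on the classpath):

// Rough sketch of the worker-sizing options mentioned above; the values are the
// ones I plan to try.
String[] args = new String[] {
    "--runner=DataflowRunner",
    "--autoscalingAlgorithm=NONE",       // autoscaling disabled
    "--numberOfWorkerHarnessThreads=2",  // N=2
    "--workerMachineType=n1-standard-4"
};
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();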

@ahmedabu98
Contributor

Hmmm, I'm seeing an old metric that we dropped (manifestFilesWritten). Can you try using beam version 2.61.0-SNAPSHOT?

Is it similar to the Spark repartition? Does it shuffle data?

Yes they're similar. The idea is to redistribute data across workers.

How will it work with autoscaling enabled?

This is hard to predict. In general autoscaling reacts to your backlog and throughput, and it may autoscale to more than the number of keys in your Redistribute.

Right now I have autoscaling disabled, and I will try setting N=2 and machineType=n1-standard-4

That's a good first step! Let me know how it goes -- honestly you may end up only needing the Redistribute

@DanielMorales9
Author

DanielMorales9 commented Oct 11, 2024

Yep, it works! 🥳
I can see that only two threads are writing now, and the commit-interval distribution is stable.

Screenshot 2024-10-11 at 18 25 20

However, I'm still uncertain about how Redistribute behaves when autoscaling is enabled in Dataflow. 🤔
I might need to run a load test 📈

My concern is that with a fixed number of buckets defined using withNumBuckets, autoscaling may cause inefficiencies. When autoscaling kicks in, if the number of workers exceeds the number of buckets (numBuckets < numWorkers), many workers could remain idle, leading to underutilization. This creates a scenario where the pipeline isn't truly elastic, as it can't dynamically scale with fluctuations in data volume.

At the same time, it's not feasible to skip redistribution entirely: as the earlier attempts show, without it the job becomes non-performant and, in some cases, gets stuck indefinitely.

In contrast, I would expect IcebergIO to support dynamic redistribution, as Iceberg does with Spark, where partitions are automatically adjusted based on target file sizes or other heuristics (e.g., the default 512MB). Such dynamic repartitioning ensures that as data volume grows or shrinks, the system can adjust on the fly to maintain efficiency.

@ahmedabu98
Contributor

Yep, it works! 🥳

Great stuff! Glad to see it getting off the ground :)

I would expect IcebergIO to support dynamic redistribution

Yep that's very valid. I was hoping #32612 would take care of this, but looks like we're not quite there yet.

In the meantime, for your concern about idle workers, you can always set an upper bound on autoscaling with the --maxNumWorkers=N pipeline option
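
For example (hypothetical values -- the idea is just to keep maxNumWorkers close to the bucket count you picked for Redistribute):

// Hypothetical values: bound autoscaling so workers don't drastically outnumber
// the Redistribute buckets.
String[] args = new String[] {
    "--autoscalingAlgorithm=THROUGHPUT_BASED",
    "--maxNumWorkers=2"  // e.g. match the N used in withNumBuckets(N)
};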
