Pipe broken exception on write to GCS #103

Closed
KMax opened this issue Jun 13, 2018 · 10 comments

KMax commented Jun 13, 2018

I'm importing data into Apache Accumulo, which runs on top of Google Cloud Storage (as an HDFS replacement). I use GCS connector 1.8.1-hadoop2, and Accumulo runs in Google Cloud VMs.

I see the following exceptions in the logs quite frequently (the first on GoogleHadoopOutputStream.write, the second on GoogleHadoopOutputStream.close):

java.io.IOException: java.io.IOException: Pipe broken
		at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel.waitForCompletionAndThrowIfUploadFailed(AbstractGoogleAsyncWriteChannel.java:432)
		at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel.write(AbstractGoogleAsyncWriteChannel.java:256)
		at java.nio.channels.Channels.writeFullyImpl(Channels.java:78)
		at java.nio.channels.Channels.writeFully(Channels.java:101)
		at java.nio.channels.Channels.access$000(Channels.java:61)
		at java.nio.channels.Channels$1.write(Channels.java:174)
		at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
		at java.io.BufferedOutputStream.write(BufferedOutputStream.java:95)
		at com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.write(GoogleHadoopOutputStream.java:96)
		at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:50)
		at java.io.DataOutputStream.write(DataOutputStream.java:88)
		at java.io.DataOutputStream.writeByte(DataOutputStream.java:153)
		at org.apache.accumulo.tserver.logger.LogFileKey.write(LogFileKey.java:89)
		at org.apache.accumulo.tserver.log.DfsLogger.write(DfsLogger.java:616)
		at org.apache.accumulo.tserver.log.DfsLogger.logFileData(DfsLogger.java:633)
		at org.apache.accumulo.tserver.log.DfsLogger.logManyTablets(DfsLogger.java:673)
		at org.apache.accumulo.tserver.log.TabletServerLogger$7.write(TabletServerLogger.java:533)
		at org.apache.accumulo.tserver.log.TabletServerLogger.write(TabletServerLogger.java:420)
		at org.apache.accumulo.tserver.log.TabletServerLogger.write(TabletServerLogger.java:371)
		at org.apache.accumulo.tserver.log.TabletServerLogger.logManyTablets(TabletServerLogger.java:523)
		at org.apache.accumulo.tserver.TabletServer$ThriftClientHandler.flush(TabletServer.java:1030)
		at org.apache.accumulo.tserver.TabletServer$ThriftClientHandler.closeUpdate(TabletServer.java:1118)
		at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
		at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
		at java.lang.reflect.Method.invoke(Method.java:498)
		at org.apache.accumulo.core.trace.wrappers.RpcServerInvocationHandler.invoke(RpcServerInvocationHandler.java:46)
		at org.apache.accumulo.server.rpc.RpcWrapper$1.invoke(RpcWrapper.java:83)
		at com.sun.proxy.$Proxy17.closeUpdate(Unknown Source)
		at org.apache.accumulo.core.tabletserver.thrift.TabletClientService$Processor$closeUpdate.getResult(TabletClientService.java:2501)
		at org.apache.accumulo.core.tabletserver.thrift.TabletClientService$Processor$closeUpdate.getResult(TabletClientService.java:2485)
		at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
		at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
		at org.apache.accumulo.server.rpc.TimedProcessor.process(TimedProcessor.java:65)
		at org.apache.thrift.server.AbstractNonblockingServer$FrameBuffer.invoke(AbstractNonblockingServer.java:518)
		at org.apache.accumulo.server.rpc.CustomNonBlockingServer$CustomFrameBuffer.invoke(CustomNonBlockingServer.java:113)
		at org.apache.thrift.server.Invocation.run(Invocation.java:18)
		at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
		at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
		at org.apache.accumulo.fate.util.LoggingRunnable.run(LoggingRunnable.java:35)
		at java.lang.Thread.run(Thread.java:748)
	Caused by: java.io.IOException: Pipe broken
		at java.io.PipedInputStream.read(PipedInputStream.java:321)
		at java.io.PipedInputStream.read(PipedInputStream.java:377)
		at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.util.ByteStreams.read(ByteStreams.java:181)
		at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.media.MediaHttpUploader.setContentAndHeadersOnCurrentRequest(MediaHttpUploader.java:629)
		at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.media.MediaHttpUploader.resumableUpload(MediaHttpUploader.java:409)
		at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.media.MediaHttpUploader.upload(MediaHttpUploader.java:336)
		at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:427)
		at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
		at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
		at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel$UploadOperation.call(AbstractGoogleAsyncWriteChannel.java:358)
		at java.util.concurrent.FutureTask.run(FutureTask.java:266)
		at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
		at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
		... 1 more
java.io.IOException: java.io.IOException: Pipe broken
		at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel.waitForCompletionAndThrowIfUploadFailed(AbstractGoogleAsyncWriteChannel.java:432)
		at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel.write(AbstractGoogleAsyncWriteChannel.java:256)
		at java.nio.channels.Channels.writeFullyImpl(Channels.java:78)
		at java.nio.channels.Channels.writeFully(Channels.java:101)
		at java.nio.channels.Channels.access$000(Channels.java:61)
		at java.nio.channels.Channels$1.write(Channels.java:174)
		at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
		at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
		at java.io.FilterOutputStream.close(FilterOutputStream.java:158)
		at com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.close(GoogleHadoopOutputStream.java:126)
		at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
		at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
		at org.apache.accumulo.tserver.log.DfsLogger.close(DfsLogger.java:592)
		at org.apache.accumulo.tserver.log.TabletServerLogger.close(TabletServerLogger.java:338)
		at org.apache.accumulo.tserver.log.TabletServerLogger.access$1000(TabletServerLogger.java:70)
		at org.apache.accumulo.tserver.log.TabletServerLogger$3.withWriteLock(TabletServerLogger.java:455)
		at org.apache.accumulo.tserver.log.TabletServerLogger.testLockAndRun(TabletServerLogger.java:137)
		at org.apache.accumulo.tserver.log.TabletServerLogger.write(TabletServerLogger.java:446)
		at org.apache.accumulo.tserver.log.TabletServerLogger.write(TabletServerLogger.java:371)
		at org.apache.accumulo.tserver.log.TabletServerLogger.logManyTablets(TabletServerLogger.java:523)
		at org.apache.accumulo.tserver.TabletServer$ThriftClientHandler.flush(TabletServer.java:1030)
		at org.apache.accumulo.tserver.TabletServer$ThriftClientHandler.closeUpdate(TabletServer.java:1118)
		at sun.reflect.GeneratedMethodAccessor14.invoke(Unknown Source)
		at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
		at java.lang.reflect.Method.invoke(Method.java:498)
		at org.apache.accumulo.core.trace.wrappers.RpcServerInvocationHandler.invoke(RpcServerInvocationHandler.java:46)
		at org.apache.accumulo.server.rpc.RpcWrapper$1.invoke(RpcWrapper.java:83)
		at com.sun.proxy.$Proxy17.closeUpdate(Unknown Source)
		at org.apache.accumulo.core.tabletserver.thrift.TabletClientService$Processor$closeUpdate.getResult(TabletClientService.java:2501)
		at org.apache.accumulo.core.tabletserver.thrift.TabletClientService$Processor$closeUpdate.getResult(TabletClientService.java:2485)
		at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
		at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
		at org.apache.accumulo.server.rpc.TimedProcessor.process(TimedProcessor.java:65)
		at org.apache.thrift.server.AbstractNonblockingServer$FrameBuffer.invoke(AbstractNonblockingServer.java:518)
		at org.apache.accumulo.server.rpc.CustomNonBlockingServer$CustomFrameBuffer.invoke(CustomNonBlockingServer.java:113)
		at org.apache.thrift.server.Invocation.run(Invocation.java:18)
		at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
		at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
		at org.apache.accumulo.fate.util.LoggingRunnable.run(LoggingRunnable.java:35)
		at java.lang.Thread.run(Thread.java:748)
		Suppressed: java.io.IOException: java.io.IOException: Pipe broken
			at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel.waitForCompletionAndThrowIfUploadFailed(AbstractGoogleAsyncWriteChannel.java:432)
			at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel.close(AbstractGoogleAsyncWriteChannel.java:287)
			at java.nio.channels.Channels$1.close(Channels.java:178)
			at java.io.FilterOutputStream.close(FilterOutputStream.java:159)
			... 31 more
		Caused by: java.io.IOException: Pipe broken
			at java.io.PipedInputStream.read(PipedInputStream.java:321)
			at java.io.PipedInputStream.read(PipedInputStream.java:377)
			at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.util.ByteStreams.read(ByteStreams.java:181)
			at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.media.MediaHttpUploader.setContentAndHeadersOnCurrentRequest(MediaHttpUploader.java:629)
			at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.media.MediaHttpUploader.resumableUpload(MediaHttpUploader.java:409)
			at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.media.MediaHttpUploader.upload(MediaHttpUploader.java:336)
			at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:427)
			at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
			at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
			at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel$UploadOperation.call(AbstractGoogleAsyncWriteChannel.java:358)
			at java.util.concurrent.FutureTask.run(FutureTask.java:266)
			at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
			at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
			... 1 more
		[CIRCULAR REFERENCE:java.io.IOException: Pipe broken]

Accumulo logs this exception at the ERROR level.

What could be the root cause? How can I get more details about the exception (debug logs, etc.)? Thank you!

medb commented Jun 13, 2018

We recently made improvements to the handling of network errors: 1115700

Yesterday we released them on the 1.6 branch: https://github.com/GoogleCloudPlatform/bigdata-interop/releases/tag/v1.6.7

Could you try GCS connector 1.6.7 and see if it solves the issue?

KMax commented Jun 13, 2018

Hi @medb! Unfortunately, I still see the same exceptions, but I found a related exception in another log:

accumulo-tserver_1  | Jun 13, 2018 7:31:38 PM com.google.api.client.http.HttpRequest execute
accumulo-tserver_1  | WARNING: exception thrown while executing request
accumulo-tserver_1  | java.io.IOException: insufficient data written
accumulo-tserver_1  | 	at sun.net.www.protocol.http.HttpURLConnection$StreamingOutputStream.close(HttpURLConnection.java:3558)
accumulo-tserver_1  | 	at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:81)
accumulo-tserver_1  | 	at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:972)
accumulo-tserver_1  | 	at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequestWithoutGZip(MediaHttpUploader.java:545)
accumulo-tserver_1  | 	at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequest(MediaHttpUploader.java:562)
accumulo-tserver_1  | 	at com.google.api.client.googleapis.media.MediaHttpUploader.resumableUpload(MediaHttpUploader.java:419)
accumulo-tserver_1  | 	at com.google.api.client.googleapis.media.MediaHttpUploader.upload(MediaHttpUploader.java:336)
accumulo-tserver_1  | 	at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:427)
accumulo-tserver_1  | 	at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
accumulo-tserver_1  | 	at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
accumulo-tserver_1  | 	at com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel$UploadOperation.call(AbstractGoogleAsyncWriteChannel.java:358)
accumulo-tserver_1  | 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
accumulo-tserver_1  | 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
accumulo-tserver_1  | 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
accumulo-tserver_1  | 	at java.lang.Thread.run(Thread.java:748)

KMax commented Jun 13, 2018

Looks like googleapis/google-cloud-java#1018 is related. I'm not sure yet how to deal with it, though.

medb commented Jun 13, 2018

This could be related to Accumulo's write pattern.

If it writes a lot of small files to GCS at a high QPS rate, that could cause GCS to drop connections, which manifests as a "java.io.IOException: Pipe broken" exception in the GCS connector.
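
For illustration, the "Pipe broken" message originates in the JDK's PipedInputStream: the connector hands bytes from the writing thread to a background upload thread through a pipe (visible in the stack traces above), and the JDK throws "Pipe broken" when a reader is blocked on an empty pipe whose writing thread has died. Below is a minimal, JDK-only sketch of that behavior (not connector code; the class name is made up, and the timing follows JDK 8 semantics):

    import java.io.IOException;
    import java.io.PipedInputStream;
    import java.io.PipedOutputStream;

    // Toy reproduction of "Pipe broken": a reader blocked on an empty
    // pipe whose last writer thread has died (without closing the pipe)
    // eventually fails, analogous to the connector's upload thread losing
    // its writer side.
    public class PipeBrokenDemo {
      public static void main(String[] args) throws Exception {
        PipedOutputStream out = new PipedOutputStream();
        PipedInputStream in = new PipedInputStream(out);

        Thread writer = new Thread(() -> {
          try {
            out.write(42);     // registers this thread as the pipe's writer
            Thread.sleep(500); // stay alive briefly, then exit WITHOUT closing
          } catch (Exception ignored) {
          }
        });
        writer.start();

        System.out.println(in.read()); // 42: buffered data is still readable
        try {
          in.read(); // blocks on the empty pipe; writer dies => "Pipe broken"
        } catch (IOException e) {
          System.out.println(e); // java.io.IOException: Pipe broken
        }
      }
    }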

Do you know what kind of objects/files Accumulo writes to GCS, of what size, and how often?

KMax commented Jun 13, 2018

In my current setup, it writes 1 GB files every minute and deletes some files every few minutes.

I'm not sure about the write pattern yet; I'll need to figure it out.

BTW, I tried setting fs.gs.http.connect-timeout and fs.gs.http.read-timeout to 0, but it didn't help.
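
For reference, a sketch of how those overrides can be applied programmatically through Hadoop's Configuration API; in a real deployment they would normally go into core-site.xml. The bucket name is made up, and for these connector properties a value of 0 is, to my understanding, treated as "no timeout":

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Sketch: programmatic equivalent of setting the GCS connector HTTP
    // timeouts in core-site.xml. Values are in milliseconds; 0 is assumed
    // to mean "no timeout".
    public class GcsTimeoutConfig {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setInt("fs.gs.http.connect-timeout", 0); // HTTP connect timeout
        conf.setInt("fs.gs.http.read-timeout", 0);    // HTTP socket read timeout

        // Any FileSystem created from this Configuration picks up the overrides.
        FileSystem fs = FileSystem.get(new Path("gs://my-bucket/").toUri(), conf);
        System.out.println(fs.getUri());
      }
    }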

medb commented Jun 15, 2018

The GCS connector does not support Apache Accumulo, because the connector is geared toward high-throughput use cases (reading/writing big objects continuously), which is not Apache Accumulo's use case.

Also, Apache Accumulo has a relevant thread where GCS support was discussed, and it ended with the conclusion that GCS is not supported as backing storage for Apache Accumulo: apache/accumulo#428

medb closed this as completed Jun 15, 2018

KMax commented Jun 22, 2018

Although the current issue and apache/accumulo#428 are both related to GCS, they are different, because they occur in different situations.

> it ended with the conclusion that GCS is not supported

I don't think Accumulo should support particular file systems (GCS, Azure, etc.). I believe it should be the other way around: the custom implementations need to make sure they properly support the HDFS interface.

medb commented Jun 22, 2018

The problem is that GCS, Azure Blob Store, and AWS S3 are not file systems but object stores, and Apache Accumulo was written with HDFS capabilities in mind, which object stores cannot fully support.

The GCS connector tries to mimic HDFS semantics, but because of object-store limitations it cannot do so fully.

We need to look into the Accumulo use case to determine whether it is possible to make it work with GCS, but because Accumulo is not currently supported by the GCS connector, it's not an immediate action item for us.

@ctubbsii

@medb The Apache Accumulo issue you referenced did not conclude that GCS wouldn't, or couldn't, be supported. That issue was closed because the user's question, asking for an explanation of the behavior they were seeing, had been answered.

The supported solution is to configure a LogCloser on Accumulo's class path that handles closing logs on GCS. I don't know enough about GCS to say for sure, but it may be sufficient to trivially fork Accumulo's built-in HadoopLogCloser and do nothing instead of throwing the IllegalStateException when the FileSystem is GCS (essentially, make no attempt at lease recovery, just as in the local file system case).
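
A rough sketch of what such a fork might look like (a hypothetical class; the signatures below follow the Accumulo 1.x LogCloser interface and VolumeManager API, which should be checked against the Accumulo version in use):

    import java.io.IOException;

    import org.apache.accumulo.core.conf.AccumuloConfiguration;
    import org.apache.accumulo.server.fs.VolumeManager;
    import org.apache.accumulo.server.master.recovery.LogCloser;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    // Hypothetical fork of Accumulo's built-in HadoopLogCloser that treats
    // GCS like the local file system: objects are immutable once the write
    // completes, so there is no lease to recover and nothing to do.
    public class GcsLogCloser implements LogCloser {

      @Override
      public long close(AccumuloConfiguration conf, VolumeManager fs, Path source)
          throws IOException {
        FileSystem ns = fs.getVolumeByPath(source).getFileSystem();
        if (ns instanceof DistributedFileSystem) {
          // Real HDFS: recover the lease, as HadoopLogCloser does
          // (simplified here; the built-in version retries on failure).
          ((DistributedFileSystem) ns).recoverLease(source);
        }
        // GCS and local FS: no lease recovery needed; report success
        // instead of throwing IllegalStateException.
        return 0;
      }
    }

Accumulo would then be pointed at the class, e.g. via the master.walog.closer.implementation property (property name as of the 1.x line; verify for your version).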

I do not think that the issue has anything to do with Accumulo's write pattern... as suggested here... at least, not if it's the same issue as the one you referenced. It's likely a simple matter of implementing an appropriate LogCloser.

medb commented Jun 25, 2018

Yes, it is currently not supported; that's why I created an FR (#104) to add support for Accumulo.

For posterity, it currently looks like there are two issues:

  1. GCS drops connections due to the expensive seek implementation in the GCS connector
  2. Accumulo's HadoopLogCloser implementation is not compatible with the GCS connector

@ctubbsii, thank you for clarifying the LogCloser issue and suggesting solutions.
