
Cloud storage v2 API, writeObject missing header x-goog-request-params for client streaming call #2121

Closed
martin-traverse opened this issue Jul 17, 2023 · 9 comments
Labels
api: storage Issues related to the googleapis/java-storage API. priority: p2 Moderately-important priority. Fix may not be included in next release. type: question Request for information or clarification. Not an issue.

Comments

@martin-traverse

Environment details

  1. API: Cloud storage for Java, using the new StorageClient against the v2 gRPC API
  2. Platform: Probably all (tested Windows 11, macOS & Ubuntu)
  3. Java version: Probably all (tested with 11 and 17)
  4. version(s): Using GCP libraries BOM 26.18.0

Steps to reproduce

The issue occurs using StorageClient writeObject as a client streaming call. Unary calls and server streaming calls do not appear to have this issue.

  1. Construct a StorageClient with some sensible options and test it by listing bucket contents - all OK.
  2. Set up a writeObject client streaming call
  3. Send an initial message on the stream - the error occurs even for a single message with object spec, data and finish_write = true (a minimal sketch of such a message appears after these steps)
  4. The call fails with INVALID_ARGUMENT: An x-goog-request-params request metadata property must be provided for this request.
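
For reference, here is a minimal sketch of the single-message write described in step 3. The classes are the generated v2 protos (com.google.storage.v2 plus com.google.protobuf.ByteString); the bucket, object name and payload are placeholders.

    // Minimal single-shot write: object spec, data and finish_write all in one message.
    // The bucket uses the v2 resource-name format "projects/_/buckets/<bucket-name>".
    var singleShotRequest = WriteObjectRequest.newBuilder()
            .setWriteObjectSpec(WriteObjectSpec.newBuilder()
                    .setResource(com.google.storage.v2.Object.newBuilder()
                            .setBucket("projects/_/buckets/my-bucket")
                            .setName("my-object")))
            .setChecksummedData(ChecksummedData.newBuilder()
                    .setContent(ByteString.copyFromUtf8("hello, world")))
            .setFinishWrite(true)
            .build();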

Looking through the SDK code, I found that for unary calls this header gets set to provide the bucket name in contextWithBucketName()

Code example

    var apiCall = addMissingRequestParams(storageClient.writeObjectCallable());

    var gcsResponseStream = new GcpUnaryResponse<WriteObjectResponse>();  // my own listener
    this.gcsWriteStream = apiCall.clientStreamingCall(gcsResponseStream, callCtx);
    gcsResponseStream.getResult().whenComplete(this::writeCompleteHandler);  // my own listener

    var initialRequest = WriteObjectRequest.newBuilder()
            .setWriteObjectSpec(gcsObjectSpec)
            .build();

    gcsWriteStream.onNext(initialRequest);

The addMissingRequestParams() method is what I have done to work around the error; it looks like this:

private <TRequest, TResponse>
ClientStreamingCallable<TRequest, TResponse>
addMissingRequestParams(ClientStreamingCallable<TRequest, TResponse> callable) {

    // https://github.com/googleapis/java-storage/blob/main/google-cloud-storage/src/main/java/com/google/cloud/storage/WriteFlushStrategy.java#L89

    // GCP SDK adds in this required header
    // For some API calls / usage patterns, the header does not get added

    var callParams = String.format("bucket=%s", gcsObjectSpec.getResource().getBucket());
    var callMetadata = Map.of("x-goog-request-params", List.of(callParams));

    var defaultContext = GrpcCallContext.createDefault().withExtraHeaders(callMetadata);

    return callable.withDefaultCallContext(defaultContext);
}

With the addMissingRequestParams() method the call succeeds just fine. However, without it we get an error and the stack trace below. I'm guessing this is not the expected behavior and the storage SDK should add this header in for client streaming calls the same way it does for the others.

Stack trace

at org.finos.tracdap.plugins.gcp.storage.GcpUnaryResponse.onError(GcpUnaryResponse.java:50)
at com.google.api.gax.tracing.TracedClientStreamingCallable$TracedResponseObserver.onError(TracedClientStreamingCallable.java:167)
at com.google.api.gax.grpc.GrpcExceptionTranslatingStreamObserver.onError(GrpcExceptionTranslatingStreamObserver.java:60)
at com.google.api.gax.grpc.ApiStreamObserverDelegate.onError(ApiStreamObserverDelegate.java:55)
at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:491)
at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
at com.google.api.gax.grpc.ChannelPool$ReleasingClientCall$1.onClose(ChannelPool.java:541)
at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:567)
at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:71)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:735)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:716)
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: An x-goog-request-params request metadata property must be provided for this request.
at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:92)
at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:41)
at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:86)
at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:66)
... 21 more
Caused by: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: An x-goog-request-params request metadata property must be provided for this request.
at io.grpc.Status.asRuntimeException(Status.java:539)
... 19 more

Any additional information below

I appreciate these are new APIs and the old APIs are still the recommended ones. Still, the new StorageClient on the v2 gRPC API is mostly working for me and we really need the client streaming semantics, so we've been watching for when it became available. Hopefully this one is an easy fix!

@product-auto-label product-auto-label bot added the api: storage Issues related to the googleapis/java-storage API. label Jul 17, 2023
@BenWhitehead
Collaborator

Unfortunately what you are running into is Working as Intended.

Reason for this: when a client stream is opened, there is no guarantee of a request message from which the bucket name can be sourced. For a unary or server-streaming call, a request message must be provided before the request is initiated.

Having spent a significant amount of time integrating with the new pre-release GCS gRPC API, I strongly recommend against using StorageClient directly. In fact, this API is still in preview and does not yet receive GA-level support.

If you would like to try out a preview implementation (all pre-GA disclaimers apply), here is a code snippet showing how to construct an instance that uses the gRPC transport:

// Create an instance of options which will use the Google Cloud Storage gRPC API for all
// operations
StorageOptions options = StorageOptions.grpc().build();
// Instantiates a client in a try-with-resource to automatically cleanup underlying resources
try (Storage storage = options.getService()) {
  // The name for the new bucket
  String bucketName = args[0]; // "my-new-bucket";
  // Creates the new bucket using a request to the gRPC API
  Bucket bucket = storage.create(BucketInfo.of(bucketName));
  System.out.printf("Bucket %s created.%n", bucket.getName());
}

Particularly when it comes to the media operations on objects (reading[1][2], writing[3][4]), the generated StorageClient is very rough to use from a Java IO perspective, whereas with com.google.cloud.storage.Storage#{reader,writer} you get a java.nio channel which can be used with any normal Java IO.

One large advantage of using the above code sample today is that you can choose which transport you want at construction time, and the code my team and I have written will take care of the details regardless of whether the transport is JSON over HTTP or gRPC.

[1] https://cloud.google.com/storage/docs/streaming-uploads#storage-stream-upload-object-java
[2] https://cloud.google.com/storage/docs/samples/storage-stream-file-download#storage_stream_file_download-java
[3] https://cloud.google.com/storage/docs/streaming-downloads#client-libraries
[4] https://cloud.google.com/storage/docs/samples/storage-stream-file-upload#storage_stream_file_upload-java
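
To make the transport-choice point concrete, here is a minimal sketch (the bucket and object names are placeholders, and StorageOptions.http() is the JSON-over-HTTP counterpart of StorageOptions.grpc()):

import com.google.cloud.WriteChannel;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class TransportChoice {

  public static void main(String[] args) throws Exception {
    // Pick the transport once, at construction time; everything after this line is identical
    StorageOptions options = "grpc".equals(args[0])
        ? StorageOptions.grpc().build()
        : StorageOptions.http().build();
    try (Storage storage = options.getService()) {
      BlobInfo info = BlobInfo.newBuilder(BlobId.of("my-bucket", "my-object")).build();
      // The channel-backed writer behaves the same regardless of transport
      try (WriteChannel writer = storage.writer(info)) {
        writer.write(ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8)));
      }
    }
  }
}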

@BenWhitehead BenWhitehead added priority: p2 Moderately-important priority. Fix may not be included in next release. type: question Request for information or clarification. Not an issue. labels Jul 17, 2023
@martin-traverse
Author

Hi Ben - thanks for the quick response.

I looked at the old API for quite a while. We have one place, our data service, where we really want to use async. Using the old API would require a lot of extra plumbing and work-arounds to make that work. We were about to do this plumbing when I saw the new API appear in the SDK and we decided to wait for that, because it solves a lot of problems for us.

Specifically, we have a generic data service where we want to run async, un-cached data transfers as streams, in both directions. As far as I can tell, all the options using the old API either use blocking calls or require a physical scratch location. Have I missed something?

In the data service we're sending moderately large (few GB) datasets and doing format translation as they pass through (e.g. CSV -> Arrow -> Parquet, then Parquet -> Arrow -> JSON), so we really want to process chunk-by-chunk. Depending on the format we might not know how big the dataset is, when a chunk will end etc. We already have a framework that handles all the streaming, conversion etc. for regular gRPC, so using StorageClient is pretty easy for us. Using the old Storage API, we'll need to synthesise a lot of this behaviour using worker pools and callbacks, which can be done but it will create a lot of extra scaffolding and potential for errors.

With the streaming data pattern our data service is really lightweight. We can run it in a small container with no extra resources or attached volumes. We can completely avoid doing thread-per-request, blocking, buffering etc. For all our other components we can get away with a nice simple thread-per-request model, but our data transfer service is where we really want the streaming functionality. I can see lower down the stack in the SDK the async capabilities are there, but they are all masked from client code in the Storage API; presumably this is by design? Is there any way to get streaming / async functionality using the regular Storage API?

@BenWhitehead
Collaborator

BenWhitehead commented Jul 18, 2023

Thank you for the additional detail.

Are you having multiple things write to the same object concurrently?

When interacting with GCS almost all operations are synchronous (i.e. listing objects proceeds a page at a time, using the token from each page to fetch the next one; writing to an object has head-of-line blocking; etc.). In the case of WriteChannel, for example, the level of blocking is similar to that of BufferedOutputStream: there is a layer that buffers to some threshold and then flushes to the underlying stream. As part of this, we also ensure the bytes have made it to GCS and have been ACKed before we accept more bytes from the application. This ensures we don't grow buffers past their configured limits and provides a boundary to retry an individual chunk if the request to GCS is unsuccessful.

When using the WriteChannel returned by storage.writer, the amount of data to buffer before sending to GCS can be configured via writer.setChunkSize(int). If you wanted to limit chunks to, say, 4 MiB, writer.setChunkSize(4 * 1024 * 1024) would configure the underlying buffering to transmit at most 4 MiB of data to GCS in a single chunk.

Encoding composition is also possible with the Read and Write channels we return. I'm not familiar with Parquet or Arrow's APIs, so I'll stick to something I do have experience with: compression.

Contrived example, but it illustrates the composition pattern.

Imagine you need to read an object, compress it, and write a base64-encoded representation of the compressed bytes to a new object.

import com.google.cloud.ReadChannel;
import com.google.cloud.WriteChannel;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.Storage.BlobWriteOption;
import com.google.cloud.storage.StorageOptions;
import com.google.common.io.ByteStreams;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.util.zip.GZIPOutputStream;
import org.apache.commons.codec.binary.Base64OutputStream;

final class Compression {

  public static void main(String[] args) throws Exception {
    try (Storage storage = StorageOptions.grpc().build().getService()) {
      BlobId readFrom = BlobId.of("bucket", "object-in");
      BlobInfo writeTo = BlobInfo.newBuilder(BlobId.of("bucket", "object-out")).build();
      try (
          // 1. Open a read channel for the object to read from
          ReadChannel readChannel = storage.reader(readFrom);
          // 2. Make an InputStream from the channel so the bytes can be copied with ByteStreams below
          InputStream inputStream = Channels.newInputStream(readChannel);
          // 3. Open a write channel for the object to write to
          WriteChannel writeChannel = storage.writer(writeTo, BlobWriteOption.doesNotExist());
          // 4. Make a stream from the channel since gzip is an OutputStream
          OutputStream writeStream = Channels.newOutputStream(writeChannel);
          // 5. Wire in the base64 encoded representation
          Base64OutputStream b64OutputStream = new Base64OutputStream(writeStream);
          // 6. Create the gzip compression stream
          GZIPOutputStream gzip = new GZIPOutputStream(b64OutputStream)
      ) {
        // 7. define the acceptable chunk size that can be buffered into memory and can be written
        //    in a single request
        writeChannel.setChunkSize(4 * 1024 * 1024);
        // use a utility to copy all the bytes
        ByteStreams.copy(inputStream, gzip);
      }
    }
  }
}

All buffering in the library is currently exclusively in RAM, and can be fully controlled. The default is 16 MiB for the writer and 2 MiB for the reader (though in the case of the reader you can disable buffering altogether by setting chunkSize to 0). In the future we will offer the option to use disk for buffering instead of RAM, but that is not yet available.
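
A small sketch of those two knobs, using placeholder bucket/object names and the values mentioned above:

import com.google.cloud.ReadChannel;
import com.google.cloud.WriteChannel;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

final class BufferTuning {

  public static void main(String[] args) throws Exception {
    try (Storage storage = StorageOptions.grpc().build().getService()) {
      // Reader: chunkSize 0 disables the read-side buffer entirely
      try (ReadChannel reader = storage.reader(BlobId.of("my-bucket", "object-in"))) {
        reader.setChunkSize(0);
        // ... read from the channel ...
      }
      // Writer: buffer up to 4 MiB before each flush to GCS (default is 16 MiB)
      BlobInfo info = BlobInfo.newBuilder(BlobId.of("my-bucket", "object-out")).build();
      try (WriteChannel writer = storage.writer(info)) {
        writer.setChunkSize(4 * 1024 * 1024);
        // ... write to the channel ...
      }
    }
  }
}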

For the simple unary calls, our existing API does not provide any sort of ApiFuture today because we are presenting a single API for both transports. For the object read and write paths there isn't any async; in places where we can there is non-blocking IO, while in others there is backpressure due to the semantics of dealing with media in GCS. We try pretty much everything in our power to ensure that no library-user-visible transient errors are seen.

As for weight/impact on the running process: I've run benchmarks against the existing JSON implementation using -Xms128m -Xmx128m with 16 concurrent threads uploading and downloading multi-gibibyte files, without any memory issues or noticeable GC pressure.

@martin-traverse
Author

Hi Ben, thanks for all the extra info. In summary: the "Storage" class is a higher-level API which hides a lot of details from client code (buffering, retries, API transport etc.) and is meant to "just work"; it is only available with a blocking API (although some write operations are offloaded internally). The "StorageClient" is a lower-level API which is more like a thin binding on the raw gRPC endpoints; it is available with blocking, future or streaming APIs, as is standard for gRPC. Also, "StorageClient" and the underlying gRPC API are still in public beta. Is this essentially the difference? Most people will want the former and this should be the default; however, in our use case I think we do want the latter (reasons below).

Regarding the original issue in this ticket: is it not possible to return a specialised delegate from writeObjectCallable which uses an interceptor to set the required metadata when the first message is sent? Per my understanding the first message should always contain the object spec; otherwise the API will return an error anyway. If that's not possible, would it be worth adding a quick note in the method documentation on how to set the missing header? Just to close this ticket off.

Regarding backpressure on the gRPC interface: in regular gRPC you have ClientCallStreamObserver and ServerCallStreamObserver to manage this - we are already using ServerCallStreamObserver on our public APIs. However, it seems the GAX layer hides this away, even though there is a ClientCallStreamObserver just a couple of layers down. I'm guessing this needs to be raised as an issue in https://github.com/googleapis/sdk-platform-java rather than here?
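
For context, this is the kind of flow-control hook I mean, sketched with plain grpc-java; ReqT/RespT and the chunk-sending logic are placeholders rather than anything GCS-specific.

import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;

// Passed to an async stub's client-streaming method in place of a plain StreamObserver,
// this gets a handle on the request stream before the call starts.
final class BackpressureObserver<ReqT, RespT> implements ClientResponseObserver<ReqT, RespT> {

  private ClientCallStreamObserver<ReqT> requestStream;

  @Override
  public void beforeStart(ClientCallStreamObserver<ReqT> requestStream) {
    this.requestStream = requestStream;
    // Only push the next chunk when the transport reports it is ready, rather than
    // buffering unboundedly on the sending side
    requestStream.setOnReadyHandler(() -> {
      while (requestStream.isReady() /* && more chunks are queued */) {
        // requestStream.onNext(nextChunk());
        break; // placeholder: real code would drain queued chunks here
      }
    });
  }

  @Override
  public void onNext(RespT value) { /* handle the single response */ }

  @Override
  public void onError(Throwable t) { /* propagate the failure */ }

  @Override
  public void onCompleted() { /* response stream finished */ }
}

As far as I can tell, ClientStreamingCallable in gax has no equivalent of beforeStart, which is what prompted the question.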

@martin-traverse
Author

On why we want to use the StorageClient bindings - probably not hugely relevant to the issue, but it might be interesting! There are a few reasons.

  1. Our data service is already built on gRPC using the stream observer pattern as our API entry point. We build dynamic data pipelines and have a bunch of different pipeline stages, all of them are streaming and event driven. Using the gRPC bindings for the GCP storage layer will fit very naturally into this pattern. Delegating to a blocking implementation requires a lot more plumbing, especially for read streams. Meanwhile the gRPC APIs are almost exactly in line with our internal interfaces.

  2. We have multiple storage implementations (big 3 clouds, local FS + option for others by extension) and we need a consistent interface. If we want to avoid extra plumbing and move our interfaces onto a blocking-style API we will need to move everything, including all our codecs for format translation, the connection with our public APIs and other bits like elastic buffers etc.

  3. Using the streaming / event-based APIs encourages incremental processing. It is of course possible using single-threaded code, but to make it feed all the way down a dynamically constructed pipeline, I think it's a lot easier to have onChunk() style processing and allocate each pipeline to an event loop. We have datasets of moderate size (say < 100 GB) and we perform two format translations on each stream (input -> arrow -> output). Incremental processing lets us easily run many requests in parallel on a fairly standard container. Without it, we're into very big containers, thread context switching, limited concurrent requests etc.

  4. We want to optimise resources across multiple clients. It's quite easy for our customers to write a configuration that will spawn multiple storage instances on multiple backends, and we want to avoid ending up with dozens of different thread pools, allocators etc., which all need tuning and managing. Using the gRPC stack really helps with this (we use grpc-netty instead of grpc-netty-shaded). The AWS SDK also sits on top of Netty and exposes those low-level options.

  5. Our solution is a platform, so we already use the platform layer to insulate users and app developers from the underlying APIs. Probably we do end up replicating some error handling, but we can keep that contained fairly well. Also all our operations are idempotent, which makes error handling a lot easier.

Some of these points are certainly debatable! Still, I think the event-based pattern is probably the right choice for our data component. For every other component we've done thread-per-request and that's fine. Using event-based streaming for our data service is blazing fast and runs with very few resources. Maybe it's possible to build a blocking version that has good performance, but I think it would be a lot harder to get right and it certainly wouldn't be better than what we have, plus it would be a hell of a lot of work. So I guess the conclusion is, we really want the async APIs! And gRPC is the absolute best for us, because we're already using it.

@BenWhitehead
Collaborator

Thanks for the additional context and clarification.

A few comments:

the "Storage" class is a higher level API which hides a lot of details from client code (buffering, retries, API transport etc.) and is meant to "just work"...

This is correct

The "StorageClient" is a lower level API which is more like a thin binding on the raw gRPC end points, it is available with blocking, future or streaming APIs as is standard for gRPC.

The StorageClient is an automatically generated client that provides additional features on top of what you would get from the raw gRPC stubs. Docs, methods and proto messages are all derived from the google/storage/v2/storage.proto definition.

Also "StorageClient" and the underlying gRPC API are still in public beta.

The StorageClient is actually tagged as alpha in its version; we have not yet promoted it to beta.

The underlying GCS gRPC API is still in private preview, so it is pre-alpha and has no SLA associated with its use at this time. Until such time as the GCS gRPC API is GA, your use of it will only receive best effort support if an issue were to arise.

...set the required metadata when the first message is sent

The metadata/header value must be present when the client stream is opened, irrespective of any message queued. Opening a client stream is an independent action from sending a message on the stream; the client stream could be opened and not have any message on it for some time.

would it be worth adding in the method documentation a quick note of how to set the missing header?

When the GCS gRPC API enters its public preview phase there will be API docs stating these constraints. They are not yet published, as it is still in private preview.

Regarding backpressure on the gRPC interface.

Gax internally manages request(count) and other stream-related details. In general, gax is intended to be a higher-level API than raw gRPC stubs and to support transports other than gRPC with a unified API. Not all transports have the same concept of request(count) as gRPC, so I suspect it is not likely for the same type of feature to be exposed on the gax API.

If you did need the full gRPC level of control, the raw gRPC stubs are available; however, you would lose all the conveniences of StorageClient and shift to manual channel management, request parameters for all requests, custom retries, etc.
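
To give a sense of what that shift looks like, here is a rough sketch against the generated com.google.storage.v2.StorageGrpc stubs; the endpoint, project name and the exact x-goog-request-params value are illustrative, and retries, channel pooling, etc. would all be on you.

import com.google.auth.oauth2.GoogleCredentials;
import com.google.storage.v2.ListBucketsRequest;
import com.google.storage.v2.ListBucketsResponse;
import com.google.storage.v2.StorageGrpc;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.auth.MoreCallCredentials;
import io.grpc.stub.MetadataUtils;

final class RawStubSketch {

  public static void main(String[] args) throws Exception {
    // Manual channel management: the channel lifecycle, TLS, keep-alives, etc. are all yours
    ManagedChannel channel =
        ManagedChannelBuilder.forTarget("storage.googleapis.com:443").build();
    try {
      // Request routing headers must now be supplied by hand for every call that needs them
      Metadata headers = new Metadata();
      headers.put(
          Metadata.Key.of("x-goog-request-params", Metadata.ASCII_STRING_MARSHALLER),
          "project=projects/my-project");
      StorageGrpc.StorageBlockingStub stub =
          StorageGrpc.newBlockingStub(channel)
              .withCallCredentials(
                  MoreCallCredentials.from(GoogleCredentials.getApplicationDefault()))
              .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(headers));
      ListBucketsResponse response =
          stub.listBuckets(
              ListBucketsRequest.newBuilder().setParent("projects/my-project").build());
      response.getBucketsList().forEach(b -> System.out.println(b.getName()));
    } finally {
      channel.shutdownNow();
    }
  }
}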

@martin-traverse
Author

martin-traverse commented Jul 21, 2023

Thanks Ben, that is all clear and very helpful.

Just one more question if I can. Do you have any plans to release an async version of the higher level "Storage" API, or async / streaming overloads for key methods? Both AWS and Azure do this by providing an alternative client interface (S3Client / S3AsyncClient and BlobClient / BlobAsyncClient).

I'm sure the blocking / thread-per-request model is what most people want, but there are cases where event-based is preferable, for example middleware or platform products like ours, and especially in the storage / data space.

@BenWhitehead
Collaborator

The idea of an async centric API has been floated before, but we don't have an ETA at this time of when/how/if it will happen for the library.

@BenWhitehead
Collaborator

The Google Cloud Storage product team plans to offer full support for use of the google-cloud-storage library, but not for the generated gapic client beneath it.

Closing this issue at this time as it does not affect google-cloud-storage.
