Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Google Cloud Pub/Sub #1085

Merged
merged 70 commits into from
Jun 28, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
70 commits
Select commit Hold shift + click to select a range
a0fd43a
Initial GAX submission - generated classes only
garrettjonesgoogle Oct 30, 2015
66e176c
Updating README.md to latest form
garrettjonesgoogle Nov 2, 2015
23294dc
Updating version and installation module
garrettjonesgoogle Nov 2, 2015
19da84d
Adding GAX classes
garrettjonesgoogle Nov 3, 2015
a884d58
Adding implicit dependency, using constant instead of hardcoded string
garrettjonesgoogle Nov 4, 2015
e184b32
Initial submission of generated pubsub protobuf classes
garrettjonesgoogle Nov 4, 2015
980ba88
Adding all sections to readme, with TODOs for missing content
garrettjonesgoogle Nov 5, 2015
fce95f9
Generated Pub/Sub client classes and unit tests
garrettjonesgoogle Nov 10, 2015
edd73cc
Fixing source paths and module configuration
garrettjonesgoogle Nov 11, 2015
e26198e
Updates to address PR comments.
garrettjonesgoogle Nov 17, 2015
c291344
More updates to address PR comments
garrettjonesgoogle Nov 19, 2015
6cc6456
Third round of updates to address PR comments
garrettjonesgoogle Nov 20, 2015
48b49b7
Delete duplicate LocalPublisherImpl.java
pongad Nov 30, 2015
34f170c
merge from master
aozarov Feb 5, 2016
1671c0e
disable javadoc generation for java 8 on gcloud-java-pubsub
aozarov Feb 6, 2016
a6be66f
Removing GAX from gcloud-java
garrettjonesgoogle Feb 18, 2016
78179b4
Removing generated files from gcloud-java-pubsub
garrettjonesgoogle Feb 18, 2016
44dd3d2
Switching dependencies, regenerating code
garrettjonesgoogle Feb 18, 2016
c160a25
Using version of GAX compatible with Java 1.7
garrettjonesgoogle Feb 24, 2016
c286b88
Fixing doc problems, improving formatting
garrettjonesgoogle Feb 25, 2016
39b713a
Fixing javadoc error
garrettjonesgoogle Feb 25, 2016
4ab3977
Regenerating code, new settings classes
garrettjonesgoogle Mar 4, 2016
20c9bd3
Updating to GAX 0.0.3
garrettjonesgoogle Mar 8, 2016
f253c54
Surface updates from internal review
garrettjonesgoogle Mar 10, 2016
0b7b39a
Using resurrected ServiceApiSettings in Settings classes
garrettjonesgoogle Mar 11, 2016
7282c7a
Updates due to code gen
garrettjonesgoogle Mar 11, 2016
e2b1969
Removing newSettings() method from XApi classes
garrettjonesgoogle Mar 16, 2016
878ae88
Fixing build from last change
garrettjonesgoogle Mar 16, 2016
7252fca
Putting spi files under spi.v1
garrettjonesgoogle Mar 16, 2016
bece1d4
Bundling descriptor for Publish
garrettjonesgoogle Mar 18, 2016
3c2b7f5
Regenerating (fixing style issues)
garrettjonesgoogle Mar 22, 2016
980d39e
Update with latest surface fixes based Java export review.
shinfan Mar 29, 2016
72b1442
Update pom to fix the dependency issues.
shinfan Mar 29, 2016
ababe05
Update the grpc dependency of pubsub java.
shinfan Mar 29, 2016
7c68c8e
Update pubsub client with latest surface changes. (#885)
shinfan Apr 11, 2016
613a5cc
Rename com.google.gcloud to com.google.cloud (#907)
garrettjonesgoogle Apr 12, 2016
87ff28e
update pubsub pom
aozarov Apr 13, 2016
9b2f341
Update gcloud pubsub. (#951)
shinfan Apr 22, 2016
7c56fc2
Add custom port support to LocalPubsubHelper (#956)
shinfan Apr 25, 2016
2cd4b0c
Add PubSub API prototype and related classes (#962)
mziccard Apr 28, 2016
9ec67ad
Add javadoc and unit tests for TopicInfo (#976)
mziccard May 3, 2016
8586506
Add AsyncPage implementation and docs. Add related tests (#974)
mziccard May 4, 2016
f9a9ed0
Updating pubsub to match gax-java 0.0.12 (#981)
michaelbausor May 5, 2016
05d39cc
Add javadoc and unit tests for SubscriptionInfo (#977)
mziccard May 5, 2016
e9f247b
Update LocalPubsubHelper to be compatible with beta emulators. (#992)
shinfan May 6, 2016
697803f
Add TopicId and SubscriptionId classes (#984)
mziccard May 10, 2016
a427d89
Add base class for operation options, javadoc and tests (#996)
mziccard May 11, 2016
9ae7fda
Add options() method and project name to LocalPubsubHelper (#999)
mziccard May 11, 2016
238c7d1
Add tests and javadoc for Message and ByteArray (#1000)
mziccard May 12, 2016
cb92ee4
Add javadoc and tests for PushConfig (#1004)
mziccard May 12, 2016
916dbb2
Add base service option classes for gRPC and HTTP services (#1011)
mziccard May 16, 2016
4053013
Regenerated pubsub surface (#1009)
michaelbausor May 17, 2016
a9e3873
Implement Pub/Sub management methods, add javadoc and tests (#1015)
mziccard May 22, 2016
47c4c1c
Implement modifyAckDeadline methods, add javadoc and tests (#1022)
mziccard May 25, 2016
ce2ace0
Fix deleted topic name: _deleted_topic_ -> _deleted-topic_ (#1023)
mziccard May 26, 2016
07a45e4
Add javadoc and tests for functional Topic class (#1021)
mziccard May 28, 2016
47046e6
Implement ack and nack methods, add javadoc and tests (#1027)
mziccard Jun 1, 2016
174849f
Add javadoc and tests for functional ReceivedMessage class (#1038)
mziccard Jun 1, 2016
f4e4e26
Add AckDeadlineRenewer class for automatic ack deadline renewal (#1031)
mziccard Jun 3, 2016
54e179c
Implement Iterator pull methods, add javadoc and tests (#1041)
mziccard Jun 6, 2016
337a8ef
Fix PubSub Iterator pullAsync: add callback to PubSubRpc.pull (#1048)
mziccard Jun 14, 2016
f6f552e
Add MessageConsumerImpl class, implement pullAsync, add tests (#1043)
mziccard Jun 22, 2016
d7ac46d
Add serialization test for PubSub classes (#1072)
mziccard Jun 23, 2016
933c985
Add javadoc and tests for Subscription (#1074)
mziccard Jun 24, 2016
fec0e12
Add docs to PubSub spi layer (#1066)
mziccard Jun 24, 2016
57883ae
Add PubSub examples, update READMEs and package-info javadoc (#1075)
mziccard Jun 24, 2016
2553b89
Support setting library header in grpc services (#1078)
mziccard Jun 24, 2016
098ab65
Add integration tests to pubsub (#1080)
mziccard Jun 26, 2016
fb3e663
Update grpc-related dependencies, remove gax dependency from pubsub
mziccard Jun 27, 2016
aefce68
Fix broken link in main README
mziccard Jun 28, 2016
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ before_install:
- cp target/travis/settings.xml ~/.m2/settings.xml
install: mvn install -DskipTests=true -Dgpg.skip=true
script:
- travis_wait 30 utilities/verify.sh
- travis_wait 60 utilities/verify.sh
after_success:
- utilities/after_success.sh
env:
Expand Down
42 changes: 42 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ This client supports the following Google Cloud Platform services:
- [Google Cloud Compute] (#google-cloud-compute-alpha) (Alpha)
- [Google Cloud Datastore] (#google-cloud-datastore)
- [Google Cloud DNS] (#google-cloud-dns-alpha) (Alpha)
- [Google Cloud Pub/Sub] (#google-cloud-pubsub-alpha) (Alpha - Not working on App Engine Standard)
- [Google Cloud Resource Manager] (#google-cloud-resource-manager-alpha) (Alpha)
- [Google Cloud Storage] (#google-cloud-storage)

Expand Down Expand Up @@ -62,6 +63,8 @@ Example Applications
- [`Flexible Environment/Datastore example`](https://github.com/GoogleCloudPlatform/java-docs-samples/tree/master/managed_vms/datastore) - A simple app that uses Cloud Datastore to list the last 10 IP addresses that visited your site.
- Read about how to run the application [here](https://github.com/GoogleCloudPlatform/java-docs-samples/blob/master/managed_vms/README.md).
- [`Flexible Environment/Storage example`](https://github.com/GoogleCloudPlatform/java-docs-samples/tree/master/managed_vms/cloudstorage) - An app that uploads files to a public Cloud Storage bucket on the App Engine Flexible Environment runtime.
- [`PubSubExample`](./gcloud-java-examples/src/main/java/com/google/cloud/examples/pubsub/PubSubExample.java) - A simple command line interface providing some of Cloud Pub/Sub's functionality
- Read more about using this application on the [`PubSubExample` docs page](http://googlecloudplatform.github.io/gcloud-java/apidocs/?com/google/cloud/examples/pubsub/PubSubExample.html).
- [`ResourceManagerExample`](./gcloud-java-examples/src/main/java/com/google/cloud/examples/resourcemanager/ResourceManagerExample.java) - A simple command line interface providing some of Cloud Resource Manager's functionality
- Read more about using this application on the [`ResourceManagerExample` docs page](http://googlecloudplatform.github.io/gcloud-java/apidocs/?com/google/cloud/examples/resourcemanager/ResourceManagerExample.html).
- [`SparkDemo`](https://github.com/GoogleCloudPlatform/java-docs-samples/blob/master/managed_vms/sparkjava) - An example of using `gcloud-java-datastore` from within the SparkJava and App Engine Flexible Environment frameworks.
Expand Down Expand Up @@ -368,6 +371,44 @@ ChangeRequestInfo changeRequest = changeBuilder.build();
zone.applyChangeRequest(changeRequest);
```

Google Cloud Pub/Sub (Alpha)
----------------------

- [API Documentation][pubsub-api]
- [Official Documentation][cloud-pubsub-docs]

#### Preview

Here is a code snippet showing a simple usage example from within Compute Engine/App Engine
Flexible. Note that you must [supply credentials](#authentication) and a project ID if running this
snippet elsewhere. Complete source code can be found at
[CreateSubscriptionAndPullMessages.java](./gcloud-java-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateSubscriptionAndPullMessages.java).

```java
import com.google.cloud.pubsub.Message;
import com.google.cloud.pubsub.PubSub;
import com.google.cloud.pubsub.PubSub.MessageConsumer;
import com.google.cloud.pubsub.PubSub.MessageProcessor;
import com.google.cloud.pubsub.PubSubOptions;
import com.google.cloud.pubsub.Subscription;
import com.google.cloud.pubsub.SubscriptionInfo;

try (PubSub pubsub = PubSubOptions.defaultInstance().service()) {
Subscription subscription =
pubsub.create(SubscriptionInfo.of("test-topic", "test-subscription"));
MessageProcessor callback = new MessageProcessor() {
@Override
public void process(Message message) throws Exception {
System.out.printf("Received message \"%s\"%n", message.payloadAsString());
}
};
// Create a message consumer and pull messages (for 60 seconds)
try (MessageConsumer consumer = subscription.pullAsync(callback)) {
Thread.sleep(60_000);
}
}
```

Google Cloud Resource Manager (Alpha)
----------------------

Expand Down Expand Up @@ -513,6 +554,7 @@ Apache 2.0 - See [LICENSE] for more information.
[cloud-dns-docs]: https://cloud.google.com/dns/docs
[cloud-dns-activation]: https://console.cloud.google.com/start/api?id=dns

[pubsub-api]: http://googlecloudplatform.github.io/gcloud-java/apidocs/index.html?com/google/cloud/pubsub/package-summary.html
[cloud-pubsub]: https://cloud.google.com/pubsub/
[cloud-pubsub-docs]: https://cloud.google.com/pubsub/docs

Expand Down
41 changes: 41 additions & 0 deletions TESTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,5 +174,46 @@ Here is an example that clears the dataset created in Step 3.
RemoteBigQueryHelper.forceDelete(bigquery, dataset);
```

### Testing code that uses Pub/Sub

#### On your machine

You can test against a temporary local Pub/Sub by following these steps:

1. Start the local Pub/Sub emulator before running your tests using `LocalPubSubHelper`'s `create`
and `start` methods. This will bind a port for communication with the local Pub/Sub emulator.
```java
LocalPubSubHelper helper = LocalPubSubHelper.create();

helper.start(); // Starts the local Pub/Sub emulator in a separate process
```

2. Create and use a `PubSub` object with the options given by the `LocalPubSubHelper` instance. For
example:
```java
PubSub localPubsub = helper.options().service();
```

3. Run your tests.

4. Stop the local Pub/Sub emulator by calling the `stop()` method, like so:
```java
helper.stop();
```

#### On a remote machine

You can test against a remote Pub/Sub emulator as well. To do this, set the `PubSubOptions` project
endpoint to the hostname of the remote machine, like the example below.

```java
PubSubOptions options = PubSubOptions.builder()
.projectId("my-project-id") // must match project ID specified on remote machine
.host("<hostname of machine>:<port>")
.authCredentials(AuthCredentials.noAuth())
.build();
PubSub localPubsub= options.service();
```

[cloud-platform-storage-authentication]:https://cloud.google.com/storage/docs/authentication?hl=en#service_accounts
[create-service-account]:https://developers.google.com/identity/protocols/OAuth2ServiceAccount#creatinganaccount
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,19 @@

package com.google.cloud.bigquery;

import com.google.cloud.ServiceOptions;
import com.google.cloud.HttpServiceOptions;
import com.google.cloud.bigquery.spi.BigQueryRpc;
import com.google.cloud.bigquery.spi.BigQueryRpcFactory;
import com.google.cloud.bigquery.spi.DefaultBigQueryRpc;
import com.google.common.collect.ImmutableSet;

import java.util.Set;

public class BigQueryOptions extends ServiceOptions<BigQuery, BigQueryRpc, BigQueryOptions> {
public class BigQueryOptions extends HttpServiceOptions<BigQuery, BigQueryRpc, BigQueryOptions> {

private static final String BIGQUERY_SCOPE = "https://www.googleapis.com/auth/bigquery";
private static final Set<String> SCOPES = ImmutableSet.of(BIGQUERY_SCOPE);
private static final long serialVersionUID = -215981591481708043L;
private static final long serialVersionUID = -8592198255032667206L;

public static class DefaultBigqueryFactory implements BigQueryFactory {

Expand All @@ -51,7 +51,7 @@ public BigQueryRpc create(BigQueryOptions options) {
}

public static class Builder extends
ServiceOptions.Builder<BigQuery, BigQueryRpc, BigQueryOptions, Builder> {
HttpServiceOptions.Builder<BigQuery, BigQueryRpc, BigQueryOptions, Builder> {

private Builder() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,19 @@

package com.google.cloud.compute;

import com.google.cloud.ServiceOptions;
import com.google.cloud.HttpServiceOptions;
import com.google.cloud.compute.spi.ComputeRpc;
import com.google.cloud.compute.spi.ComputeRpcFactory;
import com.google.cloud.compute.spi.DefaultComputeRpc;
import com.google.common.collect.ImmutableSet;

import java.util.Set;

public class ComputeOptions extends ServiceOptions<Compute, ComputeRpc, ComputeOptions> {
public class ComputeOptions extends HttpServiceOptions<Compute, ComputeRpc, ComputeOptions> {

private static final String COMPUTE_SCOPE = "https://www.googleapis.com/auth/compute";
private static final Set<String> SCOPES = ImmutableSet.of(COMPUTE_SCOPE);
private static final long serialVersionUID = 6509557711917342058L;
private static final long serialVersionUID = 5074781985597996770L;

public static class DefaultComputeFactory implements ComputeFactory {

Expand All @@ -51,7 +51,7 @@ public ComputeRpc create(ComputeOptions options) {
}

public static class Builder extends
ServiceOptions.Builder<Compute, ComputeRpc, ComputeOptions, Builder> {
HttpServiceOptions.Builder<Compute, ComputeRpc, ComputeOptions, Builder> {

private Builder() {
}
Expand Down
10 changes: 10 additions & 0 deletions gcloud-java-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -98,5 +98,15 @@
<version>3.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.0.0-beta-3</version>
</dependency>
<dependency>
<groupId>com.google.api</groupId>
<artifactId>gax</artifactId>
<version>0.0.13</version>
</dependency>
</dependencies>
</project>
52 changes: 52 additions & 0 deletions gcloud-java-core/src/main/java/com/google/cloud/AsyncPage.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2016 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud;

import java.util.concurrent.Future;

/**
* Interface for asynchronously consuming Google Cloud paginated results.
*
* <p>Use {@code AsyncPage} to iterate through all values (also in next pages):
* <pre> {@code
* AsyncPage<T> page = ...; // get an AsyncPage<T> instance
* Iterator<T> iterator = page.iterateAll();
* while (iterator.hasNext()) {
* T value = iterator.next();
* // do something with value
* }}</pre>
*
* <p>Or handle pagination explicitly:
* <pre> {@code
* AsyncPage<T> page = ...; // get a AsyncPage<T> instance
* while (page != null) {
* for (T value : page.values()) {
* // do something with value
* }
* page = page.nextPageAsync().get();
* }}</pre>
*
* @param <T> the value type that the page holds
*/
public interface AsyncPage<T> extends Page<T> {

/**
* Returns a {@link Future} object for the next page. {@link Future#get()} returns {@code null} if
* the last page has been reached.
*/
Future<AsyncPage<T>> nextPageAsync();
}
83 changes: 83 additions & 0 deletions gcloud-java-core/src/main/java/com/google/cloud/AsyncPageImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright 2016 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud;

import com.google.common.base.Throwables;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.Uninterruptibles;

import java.io.Serializable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/**
* Base implementation for asynchronously consuming Google Cloud paginated results.
*
* @param <T> the value type that the page holds
*/
public class AsyncPageImpl<T> extends PageImpl<T> implements AsyncPage<T> {

private static final long serialVersionUID = -6009473188630364906L;

private final NextPageFetcher<T> asyncPageFetcher;

/**
* Interface for asynchronously fetching the next page of results from the service.
*
* @param <T> the value type that the page holds
*/
public interface NextPageFetcher<T> extends Serializable {
Future<AsyncPage<T>> nextPage();
}

private static class SyncNextPageFetcher<T> implements PageImpl.NextPageFetcher<T> {

private static final long serialVersionUID = -4124568632363525351L;

private final NextPageFetcher<T> asyncPageFetcher;

private SyncNextPageFetcher(NextPageFetcher<T> asyncPageFetcher) {
this.asyncPageFetcher = asyncPageFetcher;
}

@Override
public Page<T> nextPage() {
try {
return asyncPageFetcher != null
? Uninterruptibles.getUninterruptibly(asyncPageFetcher.nextPage()) : null;
} catch (ExecutionException ex) {
throw Throwables.propagate(ex.getCause());
}
}
}

/**
* Creates an {@code AsyncPageImpl} object.
*/
public AsyncPageImpl(NextPageFetcher<T> asyncPageFetcher, String cursor, Iterable<T> results) {
super(new SyncNextPageFetcher<T>(asyncPageFetcher), cursor, results);
this.asyncPageFetcher = asyncPageFetcher;
}

@Override
public Future<AsyncPage<T>> nextPageAsync() {
if (nextPageCursor() == null || asyncPageFetcher == null) {
return Futures.immediateCheckedFuture(null);
}
return asyncPageFetcher.nextPage();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.google.api.client.googleapis.json.GoogleJsonError;
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.gax.grpc.ApiException;
import com.google.common.base.MoreObjects;

import java.io.IOException;
Expand Down Expand Up @@ -167,6 +168,16 @@ public BaseServiceException(int code, String message, String reason, boolean ide
this.debugInfo = null;
}

public BaseServiceException(ApiException apiException, boolean idempotent) {
super(apiException.getMessage(), apiException);
this.code = apiException.getStatusCode().value();
this.reason = apiException.getStatusCode().name();
this.idempotent = idempotent;
this.retryable = apiException.isRetryable();
this.location = null;
this.debugInfo = null;
}

protected Set<Error> retryableErrors() {
return Collections.emptySet();
}
Expand Down
Loading