diff --git a/pubsub/README.md b/pubsub/README.md new file mode 100644 index 0000000000..193d9bbcfa --- /dev/null +++ b/pubsub/README.md @@ -0,0 +1,25 @@ +## Pub/Sub Samples + +These samples require two environment variables to be set: + +- `GOOGLE_APPLICATION_CREDENTIALS` - Path to a service account file. You can +download one from your Google project's "permissions" page. +- `TEST_PROJECT_ID` - Id of your Google project. + +## Run a sample + +Install dependencies: + + npm install + +To print available commands: + + npm run + +Execute a sample: + + npm run + +Example: + + npm run subscription diff --git a/pubsub/iam.js b/pubsub/iam.js new file mode 100644 index 0000000000..2ab999f3a1 --- /dev/null +++ b/pubsub/iam.js @@ -0,0 +1,183 @@ +// Copyright 2016, Google, Inc. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +'use strict'; + +var async = require('async'); +var utils = require('./subscription'); +var createTopic = utils.createTopic; +var subscribe = utils.subscribe; + +// [START get_topic_policy] +function getTopicPolicy(topic, callback) { + // Retrieve the IAM policy for the provided topic + topic.iam.getPolicy(callback); +} +// [END get_topic_policy] + +// [START get_subscription_policy] +function getSubscriptionPolicy(subscription, callback) { + // Retrieve the IAM policy for the provided subscription + subscription.iam.getPolicy(callback); +} +// [END get_subscription_policy] + +// [START set_topic_policy] +function setTopicPolicy(topic, callback) { + // Policy update + var myPolicy = { + bindings: [ + { + role: 'roles/pubsub.subscriber', + members: ['serviceAccount:myotherproject@appspot.gserviceaccount.com'] + } + ] + }; + + // Retrieve the IAM policy for the provided topic + topic.iam.setPolicy(myPolicy, callback); +} +// [END set_topic_policy] + +// [START set_subscription_policy] +function setSubscriptionPolicy(subscription, callback) { + // Policy update + var myPolicy = { + bindings: [ + { + role: 'roles/pubsub.subscriber', + members: ['serviceAccount:myotherproject@appspot.gserviceaccount.com'] + } + ] + }; + + // Retrieve the IAM policy for the provided subscription + subscription.iam.setPolicy(myPolicy, callback); +} +// [END set_subscription_policy] + +// [START test_topic_permissions] +function testTopicPermissions(topic, callback) { + var tests = [ + 'pubsub.topics.attachSubscription', + 'pubsub.topics.publish', + 'pubsub.topics.update' + ]; + + // Retrieve the IAM policy for the provided topic + topic.iam.testPermissions(tests, callback); +} +// [END test_topic_permissions] + +// [START test_subscription_permissions] +function testSubscriptionPermissions(subscription, callback) { + var tests = [ + 'pubsub.subscriptions.consume', + 'pubsub.subscriptions.update' + ]; + + // Retrieve the IAM policy for the provided subscription + subscription.iam.testPermissions(tests, callback); +} +// [END test_subscription_permissions] + +exports.getTopicPolicy = getTopicPolicy; +exports.getSubscriptionPolicy = getSubscriptionPolicy; +exports.setTopicPolicy = setTopicPolicy; +exports.setSubscriptionPolicy = setSubscriptionPolicy; +exports.testTopicPermissions = testTopicPermissions; +exports.testSubscriptionPermissions = testSubscriptionPermissions; +exports.runSample = runSample; + +function runSample(callback) { + var _subscription; + var _topic; + // Gather responses + var responses = []; + async.waterfall([ + function (cb) { + console.log('create topic...'); + createTopic(cb); + }, + function (topic, apiResponse, cb) { + _topic = topic; + responses.push([topic, apiResponse]); + console.log('created topic'); + console.log('get topic IAM policy...'); + getTopicPolicy(topic, cb); + }, + function (policy, apiResponse, cb) { + responses.push([policy, apiResponse]); + console.log('got topic policy', policy); + // IAM server is down? + // console.log('updating topic policy...'); + // setTopicPolicy(_topic, cb); + // }, + // function (policy, apiResponse, cb) { + // responses.push([policy, apiResponse]); + // console.log('updated topic policy', policy); + console.log('testing topic permissions...'); + testTopicPermissions(_topic, cb); + }, + function (permissions, apiResponse, cb) { + responses.push([permissions, apiResponse]); + console.log('tested topic permissions', permissions); + console.log('create subscription...'); + subscribe(cb); + }, + function (subscription, apiResponse, cb) { + _subscription = subscription; + responses.push([subscription, apiResponse]); + console.log('created subscription'); + console.log('get subscription IAM policy...'); + getSubscriptionPolicy(subscription, cb); + }, + function (policy, apiResponse, cb) { + responses.push([policy, apiResponse]); + console.log('got subscription policy', policy); + // IAM server is down? + // console.log('updating subscription policy...'); + // setSubscriptionPolicy(_subscription, cb); + // }, + // function (policy, apiResponse, cb) { + // responses.push([policy, apiResponse]); + // console.log('updated subscription policy', policy); + console.log('testing subscription permissions...'); + testSubscriptionPermissions(_subscription, cb); + }, + function (permissions, apiResponse, cb) { + responses.push([permissions, apiResponse]); + console.log('tested subscription permissions', permissions); + console.log('deleting subscription...'); + _subscription.delete(cb); + }, + function (apiResponse, cb) { + console.log('deleted subscription'); + console.log('deleting topic...'); + _topic.delete(cb); + } + ], function (err) { + if (err) { + console.error(err); + } else { + console.log('deleted topic'); + } + if (typeof callback === 'function') { + callback(err, responses); + } + }); +} + +if (module === require.main) { + runSample(); +} diff --git a/pubsub/package.json b/pubsub/package.json new file mode 100644 index 0000000000..564b9a5d29 --- /dev/null +++ b/pubsub/package.json @@ -0,0 +1,17 @@ +{ + "name": "nodejs-docs-samples-pubsub", + "description": "Node.js samples for Google Cloud Pub/Sub.", + "version": "0.0.1", + "private": true, + "license": "Apache Version 2.0", + "engines": { + "node": ">=0.10.x" + }, + "scripts": { + "iam": "node iam.js", + "subscription": "node subscription.js" + }, + "dependencies": { + "gcloud": "^0.27.0" + } +} diff --git a/pubsub/subscription.js b/pubsub/subscription.js new file mode 100644 index 0000000000..83e77853d3 --- /dev/null +++ b/pubsub/subscription.js @@ -0,0 +1,246 @@ +// Copyright 2016, Google, Inc. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +'use strict'; + +var async = require('async'); + +// [START auth] +// You must set these environment variables to run this sample +var projectId = process.env.TEST_PROJECT_ID; +var keyFilename = process.env.GOOGLE_APPLICATION_CREDENTIALS; + +// If you don't set the environment variables, then you can modify this file +// to set the values +projectId = projectId || ''; +keyFilename = keyFilename || '/path/to/keyfile.json'; + +// [START require] +// Provide projectId and authentication to gcloud +var gcloud = require('gcloud')({ + projectId: projectId, + keyFilename: keyFilename +}); + +// Get a reference to the pubsub component +var pubsub = gcloud.pubsub(); +// [END auth] + +// [START create_topic] +function createTopic(callback) { + var topicName = 'messageCenter'; + + var topic = pubsub.topic(topicName); + + // Get the topic if it exists. Create it if it does not exist. + topic.get({ + autoCreate: true + }, callback); +} +// [END create_topic] + +// [START publish] +function publish(callback) { + var topicName = 'messageCenter'; + + // Grab a reference to an existing topic + var topic = pubsub.topic(topicName); + + // Publish a message to the topic + topic.publish({ + data: 'Hello, world!' + }, callback); +} +// [END publish] + +// [START list_topics] +function getAllTopics(callback) { + // Grab paginated topics + pubsub.getTopics(function (err, topics, nextQuery) { + // Quit on error + if (err) { + return callback(err); + } + // There is another page of topics + if (nextQuery) { + // Grab the remaining pages of topics recursively + return getAllTopics(function (err, _topics) { + if (_topics) { + topics = topics.concat(_topics); + } + callback(err, topics); + }); + } + // Last page of topics + return callback(err, topics); + }); +} +// [END list_topics] + +// [START get_all_subscriptions] +function getAllSubscriptions(callback) { + // Grab paginated subscriptions + pubsub.getSubscriptions(function (err, subscriptions, nextQuery) { + // Quit on error + if (err) { + return callback(err); + } + // There is another page of subscriptions + if (nextQuery) { + // Grab the remaining pages of subscriptions recursively + return getAllSubscriptions(function (err, _subscriptions) { + if (_subscriptions) { + subscriptions = subscriptions.concat(_subscriptions); + } + callback(err, subscriptions); + }); + } + // Last page of subscriptions + return callback(err, subscriptions); + }); +} +// [END get_all_subscriptions] + +// [START create_subscription] +function subscribe(callback) { + var topicName = 'messageCenter'; + var subscriptionName = 'newMessages'; + + var options = { + reuseExisting: true + }; + pubsub.subscribe(topicName, subscriptionName, options, callback); +} +// [END create_subscription] + +// [START pull_messages] +function pullMessages(callback) { + // Create a topic + createTopic(function (err) { + if (err) { + return callback(err); + } + // Create a subscription to the topic + subscribe(function (err, subscription) { + if (err) { + return callback(err); + } + var options = { + // Limit the amount of messages pulled. + maxResults: 100, + // If set, the system will respond immediately. Otherwise, wait until + // new messages are available. Returns if timeout is reached. + returnImmediately: false + }; + // Pull any messages on the subscription + subscription.pull(options, function (err, messages) { + if (err) { + return callback(err); + } + + // Do something with messages here? + + // Acknowledge messages + subscription.ack(messages.map(function (message) { + return message.ackId; + }), function (err) { + if (err) { + return callback(err); + } + callback(null, messages); + }); + }); + }); + }); +} +// [END pull_messages] + +exports.createTopic = createTopic; +exports.publish = publish; +exports.getAllTopics = getAllTopics; +exports.getAllSubscriptions = getAllSubscriptions; +exports.pullMessages = pullMessages; +exports.subscribe = subscribe; +exports.pubsub = pubsub; +exports.runSample = runSample; + +function runSample(callback) { + var _subscription; + var _topic; + // Gather responses + var responses = []; + async.waterfall([ + function (cb) { + console.log('create topic...'); + createTopic(cb); + }, + function (topic, apiResponse, cb) { + _topic = topic; + responses.push([topic, apiResponse]); + console.log('created topic'); + console.log('create subscription...'); + subscribe(cb); + }, + function (subscription, apiResponse, cb) { + _subscription = subscription; + responses.push([subscription, apiResponse]); + console.log('created subscription'); + console.log('list all topics...'); + getAllTopics(cb); + }, + function (topics, cb) { + responses.push([topics]); + console.log('got all topics'); + console.log('list all subscriptions...'); + getAllSubscriptions(cb); + }, + function (subscriptions, cb) { + responses.push([subscriptions]); + console.log('got all subscriptions'); + console.log('publishing a message...'); + publish(cb); + }, + function (messageIds, apiResponse, cb) { + responses.push([messageIds, apiResponse]); + console.log('published message'); + console.log('pulling messages...'); + pullMessages(cb); + }, + function (messages, cb) { + responses.push([messages]); + console.log('got messages', messages.map(function (message) { + return message.data; + })); + console.log('deleting subscription...'); + _subscription.delete(cb); + }, + function (apiResponse, cb) { + console.log('deleted subscription'); + console.log('deleting topic...'); + _topic.delete(cb); + } + ], function (err) { + if (err) { + console.error(err); + } else { + console.log('deleted topic'); + } + if (typeof callback === 'function') { + callback(err, responses); + } + }); +} + +if (module === require.main) { + runSample(); +} diff --git a/test/pubsub/iam.test.js b/test/pubsub/iam.test.js new file mode 100644 index 0000000000..89467d2615 --- /dev/null +++ b/test/pubsub/iam.test.js @@ -0,0 +1,67 @@ +// Copyright 2016, Google, Inc. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +'use strict'; + +var assert = require('assert'); +var projectId = process.env.TEST_PROJECT_ID; + +var iamSample = require('../../pubsub/iam'); + +describe('pubsub/iam', function () { + it('should run the sample', function (done) { + this.timeout(30000); + iamSample.runSample(function (err, responses) { + try { + assert.ok(err === null); + // topic + var expectedTopic = 'projects/' + projectId + '/topics/messageCenter'; + assert.equal(responses[0][0].name, expectedTopic); + assert.ok(responses[0][0].iam); + // apiResponse + assert.ok(responses[0][1]); + // policy + assert.deepEqual(responses[1][0], { etag: 'ACAB' }); + // apiResponse + assert.ok(responses[1][1]); + // permissions + assert.deepEqual(responses[2][0], { + 'pubsub.topics.attachSubscription': true, + 'pubsub.topics.publish': true, + 'pubsub.topics.update': true + }) + // apiResponse + assert.ok(responses[2][1]); + // subscription + assert.ok(responses[3][0].on); + assert.ok(responses[3][0].iam); + // apiResponse + assert.ok(responses[3][1]); + // policy + assert.deepEqual(responses[4][0], { etag: 'ACAB' }); + // apiResponse + assert.ok(responses[4][1]); + // permissions + assert.deepEqual(responses[5][0], { + 'pubsub.subscriptions.consume': true, + 'pubsub.subscriptions.update': true + }) + // apiResponse + assert.ok(responses[5][1]); + done(); + } catch (err) { + done(err); + } + }); + }); +}); diff --git a/test/pubsub/subscription.test.js b/test/pubsub/subscription.test.js new file mode 100644 index 0000000000..311490c096 --- /dev/null +++ b/test/pubsub/subscription.test.js @@ -0,0 +1,56 @@ +// Copyright 2016, Google, Inc. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +'use strict'; + +var assert = require('assert'); +var projectId = process.env.TEST_PROJECT_ID; + +var subscriptionSample = require('../../pubsub/subscription'); + +describe('pubsub/subscription', function () { + it('should run the sample', function (done) { + this.timeout(30000); + subscriptionSample.runSample(function (err, responses) { + try { + assert.ok(err === null); + // topic + var expectedTopic = 'projects/' + projectId + '/topics/messageCenter'; + assert.equal(responses[0][0].name, expectedTopic); + assert.ok(responses[0][0].iam); + // apiResponse + assert.ok(responses[0][1]); + // subscription + assert.ok(responses[1][0].on); + assert.ok(responses[1][0].iam); + // apiResponse + assert.ok(responses[1][1]); + // topics + assert.equal(responses[2][0][0].name, expectedTopic); + assert.ok(responses[2][0][0].iam); + // subscriptions + assert.ok(responses[3][0][0].on); + assert.ok(responses[3][0][0].iam); + // messageIds + assert.ok(typeof responses[4][0][0] === 'string'); + // apiResponse + assert.ok(responses[4][1]); + // messages + assert.equal(responses[5][0][0].data, 'Hello, world!'); + done(); + } catch (err) { + done(err); + } + }); + }); +});