From 738a7302c96a01b856b5ff30a14650d247d45831 Mon Sep 17 00:00:00 2001 From: Jason Dobry Date: Fri, 22 Jan 2016 12:27:01 -0800 Subject: [PATCH] Added Pub/Sub samples. --- pubsub/README.md | 25 +++++ pubsub/iam.js | 165 ++++++++++++++++++++++++++++++ pubsub/package.json | 17 ++++ pubsub/subscription.js | 225 +++++++++++++++++++++++++++++++++++++++++ 4 files changed, 432 insertions(+) create mode 100644 pubsub/README.md create mode 100644 pubsub/iam.js create mode 100644 pubsub/package.json create mode 100644 pubsub/subscription.js diff --git a/pubsub/README.md b/pubsub/README.md new file mode 100644 index 00000000000..193d9bbcfa6 --- /dev/null +++ b/pubsub/README.md @@ -0,0 +1,25 @@ +## Pub/Sub Samples + +These samples require two environment variables to be set: + +- `GOOGLE_APPLICATION_CREDENTIALS` - Path to a service account file. You can +download one from your Google project's "permissions" page. +- `TEST_PROJECT_ID` - Id of your Google project. + +## Run a sample + +Install dependencies: + + npm install + +To print available commands: + + npm run + +Execute a sample: + + npm run + +Example: + + npm run subscription diff --git a/pubsub/iam.js b/pubsub/iam.js new file mode 100644 index 00000000000..eba8a1536cb --- /dev/null +++ b/pubsub/iam.js @@ -0,0 +1,165 @@ +// Copyright 2016, Google, Inc. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +'use strict'; + +var async = require('async'); +var utils = require('./subscription'); +var createTopic = utils.createTopic; +var subscribe = utils.subscribe; + +// [START get_topic_policy] +function getTopicPolicy(topic, callback) { + // Retrieve the IAM policy for the provided topic + topic.iam.getPolicy(callback); +} +// [END get_topic_policy] + +// [START get_subscription_policy] +function getSubscriptionPolicy(subscription, callback) { + // Retrieve the IAM policy for the provided subscription + subscription.iam.getPolicy(callback); +} +// [END get_subscription_policy] + +// [START set_topic_policy] +function setTopicPolicy(topic, callback) { + // Policy update + var myPolicy = { + bindings: [ + { + role: 'roles/pubsub.subscriber', + members: ['serviceAccount:myotherproject@appspot.gserviceaccount.com'] + } + ] + }; + + // Retrieve the IAM policy for the provided topic + topic.iam.setPolicy(myPolicy, callback); +} +// [END set_topic_policy] + +// [START set_subscription_policy] +function setSubscriptionPolicy(subscription, callback) { + // Policy update + var myPolicy = { + bindings: [ + { + role: 'roles/pubsub.subscriber', + members: ['serviceAccount:myotherproject@appspot.gserviceaccount.com'] + } + ] + }; + + // Retrieve the IAM policy for the provided subscription + subscription.iam.setPolicy(myPolicy, callback); +} +// [END set_subscription_policy] + +// [START test_topic_permissions] +function testTopicPermissions(topic, callback) { + var tests = [ + 'pubsub.topics.attachSubscription', + 'pubsub.topics.publish', + 'pubsub.topics.update' + ]; + + // Retrieve the IAM policy for the provided topic + topic.iam.testPermissions(tests, callback); +} +// [END test_topic_permissions] + +// [START test_subscription_permissions] +function testSubscriptionPermissions(subscription, callback) { + var tests = [ + 'pubsub.subscriptions.consume', + 'pubsub.subscriptions.update' + ]; + + // Retrieve the IAM policy for the provided subscription + subscription.iam.testPermissions(tests, callback); +} +// [END test_subscription_permissions] + +exports.getTopicPolicy = getTopicPolicy; +exports.getSubscriptionPolicy = getSubscriptionPolicy; +exports.setTopicPolicy = setTopicPolicy; +exports.setSubscriptionPolicy = setSubscriptionPolicy; +exports.testTopicPermissions = testTopicPermissions; +exports.testSubscriptionPermissions = testSubscriptionPermissions; + +if (module === require.main) { + var _subscription; + var _topic; + async.waterfall([ + function (cb) { + console.log('create topic...'); + createTopic(cb); + }, + function (topic, apiResponse, cb) { + _topic = topic; + console.log('created topic'); + console.log('get topic IAM policy...'); + getTopicPolicy(topic, cb); + }, + function (policy, apiResponse, cb) { + console.log('got topic policy', policy); + // IAM server is down? + // console.log('updating topic policy...'); + // setTopicPolicy(_topic, cb); + // }, + // function (policy, apiResponse, cb) { + // console.log('updated topic policy', policy); + console.log('testing topic permissions...'); + testTopicPermissions(_topic, cb); + }, + function (permissions, apiResponse, cb) { + console.log('tested topic permissions', permissions); + console.log('create subscription...'); + subscribe(cb); + }, + function (subscription, apiResponse, cb) { + _subscription = subscription; + console.log('created subscription'); + console.log('get subscription IAM policy...'); + getSubscriptionPolicy(subscription, cb); + }, + function (policy, apiResponse, cb) { + console.log('got subscription policy', policy); + // IAM server is down? + // console.log('updating subscription policy...'); + // setSubscriptionPolicy(_subscription, cb); + // }, + // function (policy, apiResponse, cb) { + // console.log('updated subscription policy', policy); + console.log('testing subscription permissions...'); + testSubscriptionPermissions(_subscription, cb); + }, + function (permissions, apiResponse, cb) { + console.log('tested subscription permissions', permissions); + console.log('deleting subscription...'); + _subscription.delete(cb); + }, + function (apiResponse, cb) { + console.log('deleted subscription'); + console.log('deleting topic...'); + _topic.delete(cb); + } + ], function (err) { + if (err) { + console.error(err); + } else { + console.log('deleted topic'); + } + }); +} diff --git a/pubsub/package.json b/pubsub/package.json new file mode 100644 index 00000000000..564b9a5d29f --- /dev/null +++ b/pubsub/package.json @@ -0,0 +1,17 @@ +{ + "name": "nodejs-docs-samples-pubsub", + "description": "Node.js samples for Google Cloud Pub/Sub.", + "version": "0.0.1", + "private": true, + "license": "Apache Version 2.0", + "engines": { + "node": ">=0.10.x" + }, + "scripts": { + "iam": "node iam.js", + "subscription": "node subscription.js" + }, + "dependencies": { + "gcloud": "^0.27.0" + } +} diff --git a/pubsub/subscription.js b/pubsub/subscription.js new file mode 100644 index 00000000000..eb793330027 --- /dev/null +++ b/pubsub/subscription.js @@ -0,0 +1,225 @@ +// Copyright 2016, Google, Inc. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +'use strict'; + +var async = require('async'); + +// [START auth] +// You must set these environment variables to run this sample +var projectId = process.env.TEST_PROJECT_ID; +var keyFilename = process.env.GOOGLE_APPLICATION_CREDENTIALS; + +// If you don't set the environment variables, then you can modify this file +// to set the values +projectId = projectId || ''; +keyFilename = keyFilename || '/path/to/keyfile.json'; + +// [START require] +// Provide projectId and authentication to gcloud +var gcloud = require('gcloud')({ + projectId: projectId, + keyFilename: keyFilename +}); + +// Get a reference to the pubsub component +var pubsub = gcloud.pubsub(); +// [END auth] + +// [START create_topic] +function createTopic(callback) { + var topicName = 'messageCenter'; + + var topic = pubsub.topic(topicName); + + // Get the topic if it exists. Create it if it does not exist. + topic.get({ + autoCreate: true + }, callback); +} +// [END create_topic] + +// [START publish] +function publish(callback) { + var topicName = 'messageCenter'; + + // Grab a reference to an existing topic + var topic = pubsub.topic(topicName); + + // Publish a message to the topic + topic.publish({ + data: 'Hello, world!' + }, callback); +} +// [END publish] + +// [START list_topics] +function getAllTopics(callback) { + // Grab paginated topics + pubsub.getTopics(function (err, topics, nextQuery) { + // Quit on error + if (err) { + return callback(err); + } + // There is another page of topics + if (nextQuery) { + // Grab the remaining pages of topics recursively + return getAllTopics(function (err, _topics) { + if (_topics) { + topics = topics.concat(_topics); + } + callback(err, topics); + }); + } + // Last page of topics + return callback(err, topics); + }); +} +// [END list_topics] + +// [START get_all_subscriptions] +function getAllSubscriptions(callback) { + // Grab paginated subscriptions + pubsub.getSubscriptions(function (err, subscriptions, nextQuery) { + // Quit on error + if (err) { + return callback(err); + } + // There is another page of subscriptions + if (nextQuery) { + // Grab the remaining pages of subscriptions recursively + return getAllSubscriptions(function (err, _subscriptions) { + if (_subscriptions) { + subscriptions = subscriptions.concat(_subscriptions); + } + callback(err, subscriptions); + }); + } + // Last page of subscriptions + return callback(err, subscriptions); + }); +} +// [END get_all_subscriptions] + +// [START create_subscription] +function subscribe(callback) { + var topicName = 'messageCenter'; + var subscriptionName = 'newMessages'; + + var options = { + reuseExisting: true + }; + pubsub.subscribe(topicName, subscriptionName, options, callback); +} +// [END create_subscription] + +// [START pull_messages] +function pullMessages(callback) { + // Create a topic + createTopic(function (err) { + if (err) { + return callback(err); + } + // Create a subscription to the topic + subscribe(function (err, subscription) { + if (err) { + return callback(err); + } + var options = { + // Limit the amount of messages pulled. + maxResults: 100, + // If set, the system will respond immediately. Otherwise, wait until + // new messages are available. Returns if timeout is reached. + returnImmediately: false + }; + // Pull any messages on the subscription + subscription.pull(options, function (err, messages) { + if (err) { + return callback(err); + } + + // Do something with messages here? + + // Acknowledge messages + subscription.ack(messages.map(function (message) { + return message.ackId; + }), function (err) { + if (err) { + return callback(err); + } + callback(null, messages); + }); + }); + }); + }); +} +// [END pull_messages] + +exports.createTopic = createTopic; +exports.publish = publish; +exports.getAllTopics = getAllTopics; +exports.getAllSubscriptions = getAllSubscriptions; +exports.pullMessages = pullMessages; +exports.subscribe = subscribe; +exports.pubsub = pubsub; + +if (module === require.main) { + var _subscription; + var _topic; + async.waterfall([ + function (cb) { + console.log('create topic...'); + createTopic(cb); + }, + function (topic, apiResponse, cb) { + _topic = topic; + console.log('created topic'); + console.log('create subscription...'); + subscribe(cb); + }, + function (subscription, apiResponse, cb) { + _subscription = subscription; + console.log('created subscription'); + console.log('list all subscriptions...'); + getAllSubscriptions(cb); + }, + function (subscriptions, cb) { + console.log('got all subscriptions'); + console.log('publishing a message...'); + publish(cb); + }, + function (messageIds, apiResponse, cb) { + console.log('published message'); + console.log('pulling messages...'); + pullMessages(cb); + }, + function (messages, cb) { + console.log('got messages', messages.map(function (message) { + return message.data; + })); + console.log('deleting subscription...'); + _subscription.delete(cb); + }, + function (apiResponse, cb) { + console.log('deleted subscription'); + console.log('deleting topic...'); + _topic.delete(cb); + } + ], function (err) { + if (err) { + console.error(err); + } else { + console.log('deleted topic'); + } + }); +}