Skip to content

Commit

Permalink
Add missing pub-sub samples (#503)
Browse files Browse the repository at this point in the history
* Add missing pub-sub samples

* Fix nits
  • Loading branch information
Ace Nassri authored Oct 24, 2017
1 parent 24e116e commit 8532458
Show file tree
Hide file tree
Showing 6 changed files with 259 additions and 30 deletions.
58 changes: 38 additions & 20 deletions pubsub/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,25 +39,31 @@ View the [documentation][topics_0_docs] or the [source code][topics_0_code].
__Usage:__ `node topics.js --help`

```
topics.js <command>
Commands:
list Lists all topics in the current project.
create <topicName> Creates a new topic.
delete <topicName> Deletes a topic.
publish <topicName> <message> Publishes a message to a topic.
publish-ordered <topicName> <message> Publishes an ordered message to a topic.
get-policy <topicName> Gets the IAM policy for a topic.
set-policy <topicName> Sets the IAM policy for a topic.
test-permissions <topicName> Tests the permissions for a topic.
topics.js list Lists all topics in the current project.
topics.js create <topicName> Creates a new topic.
topics.js delete <topicName> Deletes a topic.
topics.js publish <topicName> <message> Publishes a message to a topic.
topics.js publish-batch <topicName> <message> Publishes messages to a topic using custom batching settings.
topics.js publish-ordered <topicName> <message> Publishes an ordered message to a topic.
topics.js get-policy <topicName> Gets the IAM policy for a topic.
topics.js set-policy <topicName> Sets the IAM policy for a topic.
topics.js test-permissions <topicName> Tests the permissions for a topic.
Options:
--help Show help [boolean]
--version Show version number [boolean]
--help Show help [boolean]
Examples:
node topics.js list
node topics.js create my-topic
node topics.js delete my-topic
node topics.js publish my-topic "Hello, world!"
node topics.js publish my-topic '{"data":"Hello, world!"}'
node topics.js publish-ordered my-topic "Hello, world!"
node topics.js publish-batch my-topic "Hello, world!" -w 1000
node topics.js get-policy greetings
node topics.js set-policy greetings
node topics.js test-permissions greetings
Expand All @@ -75,27 +81,39 @@ View the [documentation][subscriptions_1_docs] or the [source code][subscription
__Usage:__ `node subscriptions.js --help`

```
subscriptions.js <command>
Commands:
list [topicName] Lists all subscriptions in the current project, optionally filtering by a
topic.
create <topicName> <subscriptionName> Creates a new subscription.
create-push <topicName> <subscriptionName> Creates a new push subscription.
delete <subscriptionName> Deletes a subscription.
get <subscriptionName> Gets the metadata for a subscription.
listen <subscriptionName> Listens to messages for a subscription.
get-policy <subscriptionName> Gets the IAM policy for a subscription.
set-policy <subscriptionName> Sets the IAM policy for a subscription.
test-permissions <subscriptionName> Tests the permissions for a subscription.
subscriptions.js list [topicName] Lists all subscriptions in the current project,
optionally filtering by a topic.
subscriptions.js create <topicName> <subscriptionName> Creates a new subscription.
subscriptions.js create-flow <topicName> <subscriptionName> Creates a new subscription with flow-control limits,
which don't persist between subscriptions.
subscriptions.js create-push <topicName> <subscriptionName> Creates a new push subscription.
subscriptions.js modify-config <topicName> Modifies the configuration of an existing push
<subscriptionName> subscription.
subscriptions.js delete <subscriptionName> Deletes a subscription.
subscriptions.js get <subscriptionName> Gets the metadata for a subscription.
subscriptions.js listen-messages <subscriptionName> Listens to messages for a subscription.
subscriptions.js listen-errors <subscriptionName> Listens to messages and errors for a subscription.
subscriptions.js get-policy <subscriptionName> Gets the IAM policy for a subscription.
subscriptions.js set-policy <subscriptionName> Sets the IAM policy for a subscription.
subscriptions.js test-permissions <subscriptionName> Tests the permissions for a subscription.
Options:
--help Show help [boolean]
--version Show version number [boolean]
--help Show help [boolean]
Examples:
node subscriptions.js list
node subscriptions.js list my-topic
node subscriptions.js create my-topic worker-1
node subscriptions.js create-flow my-topic worker-1 -m 5
node subscriptions.js create-push my-topic worker-1
node subscriptions.js modify-config my-topic worker-1
node subscriptions.js get worker-1
node subscriptions.js listen-messages my-subscription
node subscriptions.js listen-errors my-subscription
node subscriptions.js delete worker-1
node subscriptions.js pull worker-1
node subscriptions.js get-policy worker-1
Expand Down
8 changes: 4 additions & 4 deletions pubsub/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@
"test": "samples test run --cmd ava -- -T 30s --verbose system-test/*.test.js"
},
"dependencies": {
"@google-cloud/pubsub": "0.14.0",
"yargs": "8.0.2"
"@google-cloud/pubsub": "0.14.5",
"yargs": "10.0.3"
},
"devDependencies": {
"@google-cloud/nodejs-repo-tools": "1.4.17",
"@google-cloud/nodejs-repo-tools": "2.0.11",
"ava": "0.22.0",
"proxyquire": "1.8.0",
"sinon": "3.2.1"
"sinon": "4.0.1"
},
"cloud-repo-tools": {
"requiresKeyFile": true,
Expand Down
125 changes: 124 additions & 1 deletion pubsub/subscriptions.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,32 @@ function createSubscription (topicName, subscriptionName) {
}
// [END pubsub_create_subscription]

// [START pubsub_subscriber_flow_settings]
function createFlowControlledSubscription (topicName, subscriptionName, maxInProgress, maxBytes) {
// Instantiates a client
const pubsub = PubSub();

// References an existing topic, e.g. "my-topic"
const topic = pubsub.topic(topicName);

// Creates a new subscription, e.g. "my-new-subscription"
// Note that flow control configurations are not persistent
return topic.createSubscription(subscriptionName, {
flowControl: {
maxBytes: maxBytes,
maxMessages: maxInProgress
}
})
.then((results) => {
const subscription = results[0];

console.log(`Subscription ${subscription.name} created with a maximum of ${maxInProgress} unprocessed messages.`);

return subscription;
});
}
// [END pubsub_subscriber_flow_settings]

// [START pubsub_create_push_subscription]
function createPushSubscription (topicName, subscriptionName) {
// Instantiates a client
Expand Down Expand Up @@ -112,6 +138,28 @@ function createPushSubscription (topicName, subscriptionName) {
}
// [END pubsub_create_push_subscription]

// [START pubsub_modify_push_config]
function modifyPushConfig (topicName, subscriptionName, pushEndpoint) {
// Instantiates a client
const pubsub = PubSub();

// References an existing topic and subscription, e.g. "my-topic" > "my-subscription"
const topic = pubsub.topic(topicName);
const subscription = topic.subscription(subscriptionName);

const options = {
// Set to an HTTPS endpoint of your choice. If necessary, register
// (authorize) the domain on which the server is hosted.
pushEndpoint: `https://${pubsub.projectId}.appspot.com/push`
};

return subscription.modifyPushConfig(options)
.then((results) => {
console.log(`Modified push config for subscription ${subscription.name}.`);
});
}
// [END pubsub_modify_push_config]

// [START pubsub_delete_subscription]
function deleteSubscription (subscriptionName) {
// Instantiates a client
Expand Down Expand Up @@ -246,6 +294,42 @@ function listenForOrderedMessages (subscriptionName, timeout) {
}
// [END pubsub_listen_ordered_messages]

// [START pubsub_listen_errors]
function listenForErrors (subscriptionName, timeout) {
// Instantiates a client
const pubsub = PubSub();

// References an existing subscription, e.g. "my-subscription"
const subscription = pubsub.subscription(subscriptionName);

// Create an event handler to handle messages
const messageHandler = function (message) {
// Do something with the message
console.log(`Message: ${message}`);

// "Ack" (acknowledge receipt of) the message
message.ack();
};

// Create an event handler to handle errors
const errorHandler = function (error) {
// Do something with the error
console.error(`ERROR: ${error}`);
};

// Listen for new messages/errors until timeout is hit
return new Promise((resolve) => {
subscription.on(`message`, messageHandler);
subscription.on(`error`, errorHandler);
setTimeout(() => {
subscription.removeListener(`message`, messageHandler);
subscription.removeListener(`error`, errorHandler);
resolve();
}, timeout * 1000);
});
}
// [END pubsub_listen_errors]

// [START pubsub_get_subscription_policy]
function getSubscriptionPolicy (subscriptionName) {
// Instantiates a client
Expand Down Expand Up @@ -349,12 +433,35 @@ const cli = require(`yargs`)
{},
(opts) => createSubscription(opts.topicName, opts.subscriptionName)
)
.command(
`create-flow <topicName> <subscriptionName>`,
`Creates a new subscription with flow-control limits, which don't persist between subscriptions.`,
{
maxInProgress: {
alias: 'm',
type: 'number',
default: 0
},
maxBytes: {
alias: 'b',
type: 'number',
default: 0
}
},
(opts) => createFlowControlledSubscription(opts.topicName, opts.subscriptionName, opts.maxInProgress, opts.maxBytes)
)
.command(
`create-push <topicName> <subscriptionName>`,
`Creates a new push subscription.`,
{},
(opts) => createPushSubscription(opts.topicName, opts.subscriptionName)
)
.command(
`modify-config <topicName> <subscriptionName>`,
`Modifies the configuration of an existing push subscription.`,
{},
(opts) => modifyPushConfig(opts.topicName, opts.subscriptionName)
)
.command(
`delete <subscriptionName>`,
`Deletes a subscription.`,
Expand All @@ -368,7 +475,7 @@ const cli = require(`yargs`)
(opts) => getSubscription(opts.subscriptionName)
)
.command(
`listen <subscriptionName>`,
`listen-messages <subscriptionName>`,
`Listens to messages for a subscription.`,
{
timeout: {
Expand All @@ -379,6 +486,18 @@ const cli = require(`yargs`)
},
(opts) => listenForMessages(opts.subscriptionName, opts.timeout)
)
.command(
`listen-errors <subscriptionName>`,
`Listens to messages and errors for a subscription.`,
{
timeout: {
alias: 't',
type: 'number',
default: 10
}
},
(opts) => listenForErrors(opts.subscriptionName, opts.timeout)
)
.command(
`get-policy <subscriptionName>`,
`Gets the IAM policy for a subscription.`,
Expand All @@ -400,8 +519,12 @@ const cli = require(`yargs`)
.example(`node $0 list`)
.example(`node $0 list my-topic`)
.example(`node $0 create my-topic worker-1`)
.example(`node $0 create-flow my-topic worker-1 -m 5`)
.example(`node $0 create-push my-topic worker-1`)
.example(`node $0 modify-config my-topic worker-1`)
.example(`node $0 get worker-1`)
.example(`node $0 listen-messages my-subscription`)
.example(`node $0 listen-errors my-subscription`)
.example(`node $0 delete worker-1`)
.example(`node $0 pull worker-1`)
.example(`node $0 get-policy worker-1`)
Expand Down
28 changes: 25 additions & 3 deletions pubsub/system-test/subscriptions.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@ const topicNameTwo = `nodejs-docs-samples-test-${uuid.v4()}`;
const subscriptionNameOne = `nodejs-docs-samples-test-sub-${uuid.v4()}`;
const subscriptionNameTwo = `nodejs-docs-samples-test-sub-${uuid.v4()}`;
const subscriptionNameThree = `nodejs-docs-samples-test-sub-${uuid.v4()}`;
const subscriptionNameFour = `nodejs-docs-samples-test-sub-${uuid.v4()}`;
const projectId = process.env.GCLOUD_PROJECT;
const fullTopicNameOne = `projects/${projectId}/topics/${topicNameOne}`;
const fullSubscriptionNameOne = `projects/${projectId}/subscriptions/${subscriptionNameOne}`;
const fullSubscriptionNameTwo = `projects/${projectId}/subscriptions/${subscriptionNameTwo}`;
const fullSubscriptionNameFour = `projects/${projectId}/subscriptions/${subscriptionNameFour}`;
const cmd = `node subscriptions.js`;

test.before(tools.checkCredentials);
Expand Down Expand Up @@ -81,6 +83,12 @@ test.serial(`should create a push subscription`, async (t) => {
}).start();
});

test.serial(`should modify the config of an existing push subscription`, async (t) => {
t.plan(1);
const output = await tools.runAsync(`${cmd} modify-config ${topicNameTwo} ${subscriptionNameTwo}`, cwd);
t.is(output, `Modified push config for subscription ${fullSubscriptionNameTwo}.`);
});

test.serial(`should get metadata for a subscription`, async (t) => {
const output = await tools.runAsync(`${cmd} get ${subscriptionNameOne}`, cwd);
const expected = `Subscription: ${fullSubscriptionNameOne}` +
Expand Down Expand Up @@ -111,9 +119,8 @@ test.serial(`should list subscriptions for a topic`, async (t) => {
});

test.serial(`should listen for messages`, async (t) => {
const expected = `Hello, world!`;
const messageIds = await pubsub.topic(topicNameOne).publisher().publish(Buffer.from(expected));
const output = await tools.runAsync(`${cmd} listen ${subscriptionNameOne}`, cwd);
const messageIds = await pubsub.topic(topicNameOne).publisher().publish(Buffer.from(`Hello, world!`));
const output = await tools.runAsync(`${cmd} listen-messages ${subscriptionNameOne}`, cwd);
t.true(output.includes(`Received message ${messageIds[0]}:`));
});

Expand Down Expand Up @@ -148,6 +155,11 @@ test.serial(`should listen for ordered messages`, async (t) => {
});
});

test.serial(`should listen for error messages`, async (t) => {
const output = await tools.runAsyncWithIO(`${cmd} listen-errors nonexistent-subscription -t 3`, cwd);
t.true(output.stderr.includes(`Resource not found`));
});

test.serial(`should set the IAM policy for a subscription`, async (t) => {
await tools.runAsync(`${cmd} set-policy ${subscriptionNameOne}`, cwd);
const results = await pubsub.subscription(subscriptionNameOne).iam.getPolicy();
Expand Down Expand Up @@ -185,3 +197,13 @@ test.serial(`should delete a subscription`, async (t) => {
assert(subscriptions.every((s) => s.name !== fullSubscriptionNameOne));
}).start();
});

test.serial(`should create a subscription with flow control`, async (t) => {
t.plan(1);
const output = await tools.runAsync(`${cmd} create-flow ${topicNameTwo} ${subscriptionNameFour} -m 5 -b 1024`, cwd);
t.is(output, `Subscription ${fullSubscriptionNameFour} created with a maximum of 5 unprocessed messages.`);
await tools.tryTest(async (assert) => {
const [subscriptions] = await pubsub.topic(topicNameTwo).getSubscriptions();
assert(subscriptions.some((s) => s.name === fullSubscriptionNameFour));
}).start();
});
16 changes: 16 additions & 0 deletions pubsub/system-test/topics.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const topicNameOne = `nodejs-docs-samples-test-${uuid.v4()}`;
const topicNameTwo = `nodejs-docs-samples-test-${uuid.v4()}`;
const subscriptionNameOne = `nodejs-docs-samples-test-${uuid.v4()}`;
const subscriptionNameTwo = `nodejs-docs-samples-test-${uuid.v4()}`;
const subscriptionNameThree = `nodejs-docs-samples-test-${uuid.v4()}`;
const projectId = process.env.GCLOUD_PROJECT;
const fullTopicNameOne = `projects/${projectId}/topics/${topicNameOne}`;
const expectedMessage = { data: `Hello, world!` };
Expand All @@ -48,6 +49,9 @@ test.after.always(async () => {
try {
await pubsub.subscription(subscriptionNameTwo).delete();
} catch (err) {} // ignore error
try {
await pubsub.subscription(subscriptionNameThree).delete();
} catch (err) {} // ignore error
try {
await pubsub.topic(topicNameTwo).delete();
} catch (err) {} // ignore error
Expand Down Expand Up @@ -131,6 +135,18 @@ test.serial(`should publish ordered messages`, async (t) => {
await topics.publishOrderedMessage(topicNameTwo, expectedMessage.data);
});

test.serial(`should publish with specific batch settings`, async (t) => {
t.plan(2);
const expectedWait = 1000;
const [subscription] = await pubsub.topic(topicNameOne).createSubscription(subscriptionNameThree);
const startTime = Date.now();
await tools.runAsync(`${cmd} publish-batch ${topicNameOne} "${expectedMessage.data}" -w ${expectedWait}`, cwd);
const receivedMessage = await _pullOneMessage(subscription);
const publishTime = Date.parse(receivedMessage.publishTime);
t.is(receivedMessage.data.toString(), expectedMessage.data);
t.true(publishTime - startTime > expectedWait);
});

test.serial(`should set the IAM policy for a topic`, async (t) => {
await tools.runAsync(`${cmd} set-policy ${topicNameOne}`, cwd);
const results = await pubsub.topic(topicNameOne).iam.getPolicy();
Expand Down
Loading

0 comments on commit 8532458

Please sign in to comment.