Skip to content

Commit

Permalink
[event-hubs] adds IoT Hub connection string conversion with websocket…
Browse files Browse the repository at this point in the history
…s sample (#16589)

As requested by #7060 (comment), this PR adds a sample for converting an IoT Hub connection string to an Event Hubs-compatible connection string using web sockets.

The main difference is that websocket options need to be specified, and the port used to connect changed from 5671 to 443.

/cc @wiegvlieg - let me know if this helps or if you have more questions about how this sample works.
  • Loading branch information
chradek authored Aug 24, 2021
1 parent 5944d3a commit b497c95
Show file tree
Hide file tree
Showing 6 changed files with 527 additions and 16 deletions.
1 change: 1 addition & 0 deletions sdk/eventhub/event-hubs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
},
"skip": [
"iothubConnectionString.js",
"iothubConnectionStringWebsockets.js",
"useWithIotHub.js",
"usingAadAuth.js"
],
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT Licence.

/**
* @summary Demonstrates how to convert an IoT Hub connection string to an Event Hubs connection string that points to the built-in messaging endpoint using WebSockets.
*/

/*
* The Event Hubs connection string is then used with the EventHubConsumerClient to receive events.
*
* More information about the built-in messaging endpoint can be found at:
* https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-messages-read-builtin
*/

import * as crypto from "crypto";
import { Buffer } from "buffer";
import { AmqpError, Connection, ReceiverEvents, parseConnectionString } from "rhea-promise";
import rheaPromise from "rhea-promise";
import { EventHubConsumerClient, earliestEventPosition } from "@azure/event-hubs";
import WebSocket from "ws";

// Load the .env file if it exists
import * as dotenv from "dotenv";
dotenv.config();

/**
* Type guard for AmqpError.
* @param err - An unknown error.
*/
function isAmqpError(err: any): err is AmqpError {
return rheaPromise.isAmqpError(err);
}

const consumerGroup = process.env["CONSUMER_GROUP_NAME"] || "";

// This code is modified from https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-security#security-tokens.
function generateSasToken(
resourceUri: string,
signingKey: string,
policyName: string,
expiresInMins: number
): string {
resourceUri = encodeURIComponent(resourceUri);

const expiresInSeconds = Math.ceil(Date.now() / 1000 + expiresInMins * 60);
const toSign = resourceUri + "\n" + expiresInSeconds;

// Use the crypto module to create the hmac.
const hmac = crypto.createHmac("sha256", Buffer.from(signingKey, "base64"));
hmac.update(toSign);
const base64UriEncoded = encodeURIComponent(hmac.digest("base64"));

// Construct authorization string.
return `SharedAccessSignature sr=${resourceUri}&sig=${base64UriEncoded}&se=${expiresInSeconds}&skn=${policyName}`;
}

/**
* Converts an IotHub Connection string into an Event Hubs-compatible connection string.
* @param connectionString - An IotHub connection string in the format:
* `"HostName=<your-iot-hub>.azure-devices.net;SharedAccessKeyName=<KeyName>;SharedAccessKey=<Key>"`
* @returns An Event Hubs-compatible connection string in the format:
* `"Endpoint=sb://<hostname>;EntityPath=<your-iot-hub>;SharedAccessKeyName=<KeyName>;SharedAccessKey=<Key>"`
*/
async function convertIotHubToEventHubsConnectionString(connectionString: string): Promise<string> {
const { HostName, SharedAccessKeyName, SharedAccessKey } = parseConnectionString<{
HostName: string;
SharedAccessKeyName: string;
SharedAccessKey: string;
}>(connectionString);

// Verify that the required info is in the connection string.
if (!HostName || !SharedAccessKey || !SharedAccessKeyName) {
throw new Error(`Invalid IotHub connection string.`);
}

//Extract the IotHub name from the hostname.
const [iotHubName] = HostName.split(".");

if (!iotHubName) {
throw new Error(`Unable to extract the IotHub name from the connection string.`);
}

// Generate a token to authenticate to the service.
// The code for generateSasToken can be found at https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-security#security-tokens
const token = generateSasToken(
`${HostName}/messages/events`,
SharedAccessKey,
SharedAccessKeyName,
5 // token expires in 5 minutes
);

const connection = new Connection({
transport: "tls",
host: HostName,
hostname: HostName,
username: `${SharedAccessKeyName}@sas.root.${iotHubName}`,
port: 443,
reconnect: false,
password: token,
webSocketOptions: {
webSocket: WebSocket,
protocol: ["AMQPWSB10"],
url: `wss://${HostName}:${443}/$servicebus/websocket`
}
});
await connection.open();

// Create the receiver that will trigger a redirect error.
const receiver = await connection.createReceiver({
source: { address: `amqps://${HostName}/messages/events/$management` }
});

return new Promise((resolve, reject) => {
receiver.on(ReceiverEvents.receiverError, (context) => {
const error = context.receiver && context.receiver.error;
if (isAmqpError(error) && error.condition === "amqp:link:redirect") {
const hostname = error.info && error.info.hostname;
if (!hostname) {
reject(error);
} else {
resolve(
`Endpoint=sb://${hostname}/;EntityPath=${iotHubName};SharedAccessKeyName=${SharedAccessKeyName};SharedAccessKey=${SharedAccessKey}`
);
}
} else {
reject(error);
}
connection.close().catch(() => {
/* ignore error */
});
});
});
}

export async function main() {
console.log(`Running iothubConnectionString sample`);

const eventHubsConnectionString = await convertIotHubToEventHubsConnectionString(
"HostName=<your-iot-hub>.azure-devices.net;SharedAccessKeyName=<KeyName>;SharedAccessKey=<Key>"
);

const consumerClient = new EventHubConsumerClient(consumerGroup, eventHubsConnectionString);

const subscription = consumerClient.subscribe(
{
// The callback where you add your code to process incoming events
processEvents: async (events, context) => {
for (const event of events) {
console.log(
`Received event: '${event.body}' from partition: '${context.partitionId}' and consumer group: '${context.consumerGroup}'`
);
}
},
processError: async (err, context) => {
console.log(`Error on partition "${context.partitionId}" : ${err}`);
}
},
{ startPosition: earliestEventPosition }
);

// Wait for a bit before cleaning up the sample
setTimeout(async () => {
await subscription.close();
await consumerClient.close();
console.log(`Exiting iothubConnectionString sample`);
}, 30 * 1000);
}

main().catch((error) => {
console.error("Error running sample:", error);
});
18 changes: 10 additions & 8 deletions sdk/eventhub/event-hubs/samples/v5/javascript/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,15 @@ urlFragment: event-hubs-javascript

These sample programs show how to use the JavaScript client libraries for Azure Event Hubs in some common scenarios.

| **File Name** | **Description** |
| --------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------- |
| [sendEvents.js][sendevents] | Demonstrates how to send events to an Event Hub. |
| [receiveEvents.js][receiveevents] | Demonstrates how to use the EventHubConsumerClient to process events from all partitions of a consumer group in an Event Hub. |
| [usingAadAuth.js][usingaadauth] | Demonstrates how to instantiate EventHubsClient using AAD token credentials obtained from using service principal secrets. |
| [iothubConnectionString.js][iothubconnectionstring] | Demonstrates how to convert an IoT Hub connection string to an Event Hubs connection string that points to the built-in messaging endpoint. |
| [useWithIotHub.js][usewithiothub] | Demonstrates how to use the EventHubConsumerClient to receive messages from an IoT Hub. |
| [websockets.js][websockets] | Demonstrates how to connect to Azure Event Hubs over websockets to work over an HTTP proxy. |
| **File Name** | **Description** |
| ----------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| [sendEvents.js][sendevents] | Demonstrates how to send events to an Event Hub. |
| [receiveEvents.js][receiveevents] | Demonstrates how to use the EventHubConsumerClient to process events from all partitions of a consumer group in an Event Hub. |
| [usingAadAuth.js][usingaadauth] | Demonstrates how to instantiate EventHubsClient using AAD token credentials obtained from using service principal secrets. |
| [iothubConnectionString.js][iothubconnectionstring] | Demonstrates how to convert an IoT Hub connection string to an Event Hubs connection string that points to the built-in messaging endpoint. |
| [iothubConnectionStringWebsockets.js][iothubconnectionstringwebsockets] | Demonstrates how to convert an IoT Hub connection string to an Event Hubs connection string that points to the built-in messaging endpoint using WbeSockets. |
| [useWithIotHub.js][usewithiothub] | Demonstrates how to use the EventHubConsumerClient to receive messages from an IoT Hub. |
| [websockets.js][websockets] | Demonstrates how to connect to Azure Event Hubs over websockets to work over an HTTP proxy. |

## Prerequisites

Expand Down Expand Up @@ -65,6 +66,7 @@ Take a look at our [API Documentation][apiref] for more information about the AP
[receiveevents]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/eventhub/event-hubs/samples/v5/javascript/receiveEvents.js
[usingaadauth]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/eventhub/event-hubs/samples/v5/javascript/usingAadAuth.js
[iothubconnectionstring]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/eventhub/event-hubs/samples/v5/javascript/iothubConnectionString.js
[iothubconnectionstringwebsockets]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/eventhub/event-hubs/samples/v5/javascript/iothubConnectionStringWebsockets.js
[usewithiothub]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/eventhub/event-hubs/samples/v5/javascript/useWithIotHub.js
[websockets]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/eventhub/event-hubs/samples/v5/javascript/websockets.js
[apiref]: https://docs.microsoft.com/javascript/api/@azure/event-hubs
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT Licence.

/**
* @summary Demonstrates how to convert an IoT Hub connection string to an Event Hubs connection string that points to the built-in messaging endpoint using WebSockets.
*/

/*
* The Event Hubs connection string is then used with the EventHubConsumerClient to receive events.
*
* More information about the built-in messaging endpoint can be found at:
* https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-messages-read-builtin
*/

const crypto = require("crypto");
const { Buffer } = require("buffer");
const { Connection, ReceiverEvents, parseConnectionString } = require("rhea-promise");
const rheaPromise = require("rhea-promise");
const { EventHubConsumerClient, earliestEventPosition } = require("@azure/event-hubs");
const WebSocket = require("ws");

// Load the .env file if it exists
const dotenv = require("dotenv");
dotenv.config();

/**
* Type guard for AmqpError.
* @param err - An unknown error.
*/
function isAmqpError(err) {
return rheaPromise.isAmqpError(err);
}

const consumerGroup = process.env["CONSUMER_GROUP_NAME"] || "";

// This code is modified from https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-security#security-tokens.
function generateSasToken(resourceUri, signingKey, policyName, expiresInMins) {
resourceUri = encodeURIComponent(resourceUri);

const expiresInSeconds = Math.ceil(Date.now() / 1000 + expiresInMins * 60);
const toSign = resourceUri + "\n" + expiresInSeconds;

// Use the crypto module to create the hmac.
const hmac = crypto.createHmac("sha256", Buffer.from(signingKey, "base64"));
hmac.update(toSign);
const base64UriEncoded = encodeURIComponent(hmac.digest("base64"));

// Construct authorization string.
return `SharedAccessSignature sr=${resourceUri}&sig=${base64UriEncoded}&se=${expiresInSeconds}&skn=${policyName}`;
}

/**
* Converts an IotHub Connection string into an Event Hubs-compatible connection string.
* @param connectionString - An IotHub connection string in the format:
* `"HostName=<your-iot-hub>.azure-devices.net;SharedAccessKeyName=<KeyName>;SharedAccessKey=<Key>"`
* @returns An Event Hubs-compatible connection string in the format:
* `"Endpoint=sb://<hostname>;EntityPath=<your-iot-hub>;SharedAccessKeyName=<KeyName>;SharedAccessKey=<Key>"`
*/
async function convertIotHubToEventHubsConnectionString(connectionString) {
const { HostName, SharedAccessKeyName, SharedAccessKey } = parseConnectionString(
connectionString
);

// Verify that the required info is in the connection string.
if (!HostName || !SharedAccessKey || !SharedAccessKeyName) {
throw new Error(`Invalid IotHub connection string.`);
}

//Extract the IotHub name from the hostname.
const [iotHubName] = HostName.split(".");

if (!iotHubName) {
throw new Error(`Unable to extract the IotHub name from the connection string.`);
}

// Generate a token to authenticate to the service.
// The code for generateSasToken can be found at https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-security#security-tokens
const token = generateSasToken(
`${HostName}/messages/events`,
SharedAccessKey,
SharedAccessKeyName,
5 // token expires in 5 minutes
);

const connection = new Connection({
transport: "tls",
host: HostName,
hostname: HostName,
username: `${SharedAccessKeyName}@sas.root.${iotHubName}`,
port: 443,
reconnect: false,
password: token,
webSocketOptions: {
webSocket: WebSocket,
protocol: ["AMQPWSB10"],
url: `wss://${HostName}:${443}/$servicebus/websocket`
}
});
await connection.open();

// Create the receiver that will trigger a redirect error.
const receiver = await connection.createReceiver({
source: { address: `amqps://${HostName}/messages/events/$management` }
});

return new Promise((resolve, reject) => {
receiver.on(ReceiverEvents.receiverError, (context) => {
const error = context.receiver && context.receiver.error;
if (isAmqpError(error) && error.condition === "amqp:link:redirect") {
const hostname = error.info && error.info.hostname;
if (!hostname) {
reject(error);
} else {
resolve(
`Endpoint=sb://${hostname}/;EntityPath=${iotHubName};SharedAccessKeyName=${SharedAccessKeyName};SharedAccessKey=${SharedAccessKey}`
);
}
} else {
reject(error);
}
connection.close().catch(() => {
/* ignore error */
});
});
});
}

async function main() {
console.log(`Running iothubConnectionString sample`);

const eventHubsConnectionString = await convertIotHubToEventHubsConnectionString(
"HostName=<your-iot-hub>.azure-devices.net;SharedAccessKeyName=<KeyName>;SharedAccessKey=<Key>"
);

const consumerClient = new EventHubConsumerClient(consumerGroup, eventHubsConnectionString);

const subscription = consumerClient.subscribe(
{
// The callback where you add your code to process incoming events
processEvents: async (events, context) => {
for (const event of events) {
console.log(
`Received event: '${event.body}' from partition: '${context.partitionId}' and consumer group: '${context.consumerGroup}'`
);
}
},
processError: async (err, context) => {
console.log(`Error on partition "${context.partitionId}" : ${err}`);
}
},
{ startPosition: earliestEventPosition }
);

// Wait for a bit before cleaning up the sample
setTimeout(async () => {
await subscription.close();
await consumerClient.close();
console.log(`Exiting iothubConnectionString sample`);
}, 30 * 1000);
}

main().catch((error) => {
console.error("Error running sample:", error);
});
Loading

0 comments on commit b497c95

Please sign in to comment.