From b5e865afb6a6ab10a14bb6fea7ba384426819e51 Mon Sep 17 00:00:00 2001 From: chradek <51000525+chradek@users.noreply.github.com> Date: Mon, 23 Aug 2021 19:14:54 -0700 Subject: [PATCH] [event-hubs] adds IoT Hub connection string conversion with websockets sample (#16589) As requested by https://github.com/Azure/azure-sdk-for-js/pull/7060#issuecomment-887480513, this PR adds a sample for converting an IoT Hub connection string to an Event Hubs-compatible connection string using web sockets. The main difference is that websocket options need to be specified, and the port used to connect changed from 5671 to 443. /cc @wiegvlieg - let me know if this helps or if you have more questions about how this sample works. --- sdk/eventhub/event-hubs/package.json | 1 + .../iothubConnectionStringWebsockets.ts | 171 ++++++++++++++++++ .../samples/v5/javascript/README.md | 18 +- .../iothubConnectionStringWebsockets.js | 164 +++++++++++++++++ .../samples/v5/typescript/README.md | 18 +- .../src/iothubConnectionStringWebsockets.ts | 171 ++++++++++++++++++ 6 files changed, 527 insertions(+), 16 deletions(-) create mode 100644 sdk/eventhub/event-hubs/samples-dev/iothubConnectionStringWebsockets.ts create mode 100644 sdk/eventhub/event-hubs/samples/v5/javascript/iothubConnectionStringWebsockets.js create mode 100644 sdk/eventhub/event-hubs/samples/v5/typescript/src/iothubConnectionStringWebsockets.ts diff --git a/sdk/eventhub/event-hubs/package.json b/sdk/eventhub/event-hubs/package.json index 38fe56bbe635..8a35f1fbb536 100644 --- a/sdk/eventhub/event-hubs/package.json +++ b/sdk/eventhub/event-hubs/package.json @@ -95,6 +95,7 @@ }, "skip": [ "iothubConnectionString.js", + "iothubConnectionStringWebsockets.js", "useWithIotHub.js", "usingAadAuth.js" ], diff --git a/sdk/eventhub/event-hubs/samples-dev/iothubConnectionStringWebsockets.ts b/sdk/eventhub/event-hubs/samples-dev/iothubConnectionStringWebsockets.ts new file mode 100644 index 000000000000..2af2dd8ac011 --- /dev/null +++ b/sdk/eventhub/event-hubs/samples-dev/iothubConnectionStringWebsockets.ts @@ -0,0 +1,171 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT Licence. + +/** + * @summary Demonstrates how to convert an IoT Hub connection string to an Event Hubs connection string that points to the built-in messaging endpoint using WebSockets. + */ + +/* + * The Event Hubs connection string is then used with the EventHubConsumerClient to receive events. + * + * More information about the built-in messaging endpoint can be found at: + * https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-messages-read-builtin + */ + +import * as crypto from "crypto"; +import { Buffer } from "buffer"; +import { AmqpError, Connection, ReceiverEvents, parseConnectionString } from "rhea-promise"; +import rheaPromise from "rhea-promise"; +import { EventHubConsumerClient, earliestEventPosition } from "@azure/event-hubs"; +import WebSocket from "ws"; + +// Load the .env file if it exists +import * as dotenv from "dotenv"; +dotenv.config(); + +/** + * Type guard for AmqpError. + * @param err - An unknown error. + */ +function isAmqpError(err: any): err is AmqpError { + return rheaPromise.isAmqpError(err); +} + +const consumerGroup = process.env["CONSUMER_GROUP_NAME"] || ""; + +// This code is modified from https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-security#security-tokens. +function generateSasToken( + resourceUri: string, + signingKey: string, + policyName: string, + expiresInMins: number +): string { + resourceUri = encodeURIComponent(resourceUri); + + const expiresInSeconds = Math.ceil(Date.now() / 1000 + expiresInMins * 60); + const toSign = resourceUri + "\n" + expiresInSeconds; + + // Use the crypto module to create the hmac. + const hmac = crypto.createHmac("sha256", Buffer.from(signingKey, "base64")); + hmac.update(toSign); + const base64UriEncoded = encodeURIComponent(hmac.digest("base64")); + + // Construct authorization string. + return `SharedAccessSignature sr=${resourceUri}&sig=${base64UriEncoded}&se=${expiresInSeconds}&skn=${policyName}`; +} + +/** + * Converts an IotHub Connection string into an Event Hubs-compatible connection string. + * @param connectionString - An IotHub connection string in the format: + * `"HostName=.azure-devices.net;SharedAccessKeyName=;SharedAccessKey="` + * @returns An Event Hubs-compatible connection string in the format: + * `"Endpoint=sb://;EntityPath=;SharedAccessKeyName=;SharedAccessKey="` + */ +async function convertIotHubToEventHubsConnectionString(connectionString: string): Promise { + const { HostName, SharedAccessKeyName, SharedAccessKey } = parseConnectionString<{ + HostName: string; + SharedAccessKeyName: string; + SharedAccessKey: string; + }>(connectionString); + + // Verify that the required info is in the connection string. + if (!HostName || !SharedAccessKey || !SharedAccessKeyName) { + throw new Error(`Invalid IotHub connection string.`); + } + + //Extract the IotHub name from the hostname. + const [iotHubName] = HostName.split("."); + + if (!iotHubName) { + throw new Error(`Unable to extract the IotHub name from the connection string.`); + } + + // Generate a token to authenticate to the service. + // The code for generateSasToken can be found at https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-security#security-tokens + const token = generateSasToken( + `${HostName}/messages/events`, + SharedAccessKey, + SharedAccessKeyName, + 5 // token expires in 5 minutes + ); + + const connection = new Connection({ + transport: "tls", + host: HostName, + hostname: HostName, + username: `${SharedAccessKeyName}@sas.root.${iotHubName}`, + port: 443, + reconnect: false, + password: token, + webSocketOptions: { + webSocket: WebSocket, + protocol: ["AMQPWSB10"], + url: `wss://${HostName}:${443}/$servicebus/websocket` + } + }); + await connection.open(); + + // Create the receiver that will trigger a redirect error. + const receiver = await connection.createReceiver({ + source: { address: `amqps://${HostName}/messages/events/$management` } + }); + + return new Promise((resolve, reject) => { + receiver.on(ReceiverEvents.receiverError, (context) => { + const error = context.receiver && context.receiver.error; + if (isAmqpError(error) && error.condition === "amqp:link:redirect") { + const hostname = error.info && error.info.hostname; + if (!hostname) { + reject(error); + } else { + resolve( + `Endpoint=sb://${hostname}/;EntityPath=${iotHubName};SharedAccessKeyName=${SharedAccessKeyName};SharedAccessKey=${SharedAccessKey}` + ); + } + } else { + reject(error); + } + connection.close().catch(() => { + /* ignore error */ + }); + }); + }); +} + +export async function main() { + console.log(`Running iothubConnectionString sample`); + + const eventHubsConnectionString = await convertIotHubToEventHubsConnectionString( + "HostName=.azure-devices.net;SharedAccessKeyName=;SharedAccessKey=" + ); + + const consumerClient = new EventHubConsumerClient(consumerGroup, eventHubsConnectionString); + + const subscription = consumerClient.subscribe( + { + // The callback where you add your code to process incoming events + processEvents: async (events, context) => { + for (const event of events) { + console.log( + `Received event: '${event.body}' from partition: '${context.partitionId}' and consumer group: '${context.consumerGroup}'` + ); + } + }, + processError: async (err, context) => { + console.log(`Error on partition "${context.partitionId}" : ${err}`); + } + }, + { startPosition: earliestEventPosition } + ); + + // Wait for a bit before cleaning up the sample + setTimeout(async () => { + await subscription.close(); + await consumerClient.close(); + console.log(`Exiting iothubConnectionString sample`); + }, 30 * 1000); +} + +main().catch((error) => { + console.error("Error running sample:", error); +}); diff --git a/sdk/eventhub/event-hubs/samples/v5/javascript/README.md b/sdk/eventhub/event-hubs/samples/v5/javascript/README.md index 3a168874e742..e9722f4cc1a2 100644 --- a/sdk/eventhub/event-hubs/samples/v5/javascript/README.md +++ b/sdk/eventhub/event-hubs/samples/v5/javascript/README.md @@ -12,14 +12,15 @@ urlFragment: event-hubs-javascript These sample programs show how to use the JavaScript client libraries for Azure Event Hubs in some common scenarios. -| **File Name** | **Description** | -| --------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------- | -| [sendEvents.js][sendevents] | Demonstrates how to send events to an Event Hub. | -| [receiveEvents.js][receiveevents] | Demonstrates how to use the EventHubConsumerClient to process events from all partitions of a consumer group in an Event Hub. | -| [usingAadAuth.js][usingaadauth] | Demonstrates how to instantiate EventHubsClient using AAD token credentials obtained from using service principal secrets. | -| [iothubConnectionString.js][iothubconnectionstring] | Demonstrates how to convert an IoT Hub connection string to an Event Hubs connection string that points to the built-in messaging endpoint. | -| [useWithIotHub.js][usewithiothub] | Demonstrates how to use the EventHubConsumerClient to receive messages from an IoT Hub. | -| [websockets.js][websockets] | Demonstrates how to connect to Azure Event Hubs over websockets to work over an HTTP proxy. | +| **File Name** | **Description** | +| ----------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------ | +| [sendEvents.js][sendevents] | Demonstrates how to send events to an Event Hub. | +| [receiveEvents.js][receiveevents] | Demonstrates how to use the EventHubConsumerClient to process events from all partitions of a consumer group in an Event Hub. | +| [usingAadAuth.js][usingaadauth] | Demonstrates how to instantiate EventHubsClient using AAD token credentials obtained from using service principal secrets. | +| [iothubConnectionString.js][iothubconnectionstring] | Demonstrates how to convert an IoT Hub connection string to an Event Hubs connection string that points to the built-in messaging endpoint. | +| [iothubConnectionStringWebsockets.js][iothubconnectionstringwebsockets] | Demonstrates how to convert an IoT Hub connection string to an Event Hubs connection string that points to the built-in messaging endpoint using WbeSockets. | +| [useWithIotHub.js][usewithiothub] | Demonstrates how to use the EventHubConsumerClient to receive messages from an IoT Hub. | +| [websockets.js][websockets] | Demonstrates how to connect to Azure Event Hubs over websockets to work over an HTTP proxy. | ## Prerequisites @@ -65,6 +66,7 @@ Take a look at our [API Documentation][apiref] for more information about the AP [receiveevents]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/eventhub/event-hubs/samples/v5/javascript/receiveEvents.js [usingaadauth]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/eventhub/event-hubs/samples/v5/javascript/usingAadAuth.js [iothubconnectionstring]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/eventhub/event-hubs/samples/v5/javascript/iothubConnectionString.js +[iothubconnectionstringwebsockets]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/eventhub/event-hubs/samples/v5/javascript/iothubConnectionStringWebsockets.js [usewithiothub]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/eventhub/event-hubs/samples/v5/javascript/useWithIotHub.js [websockets]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/eventhub/event-hubs/samples/v5/javascript/websockets.js [apiref]: https://docs.microsoft.com/javascript/api/@azure/event-hubs diff --git a/sdk/eventhub/event-hubs/samples/v5/javascript/iothubConnectionStringWebsockets.js b/sdk/eventhub/event-hubs/samples/v5/javascript/iothubConnectionStringWebsockets.js new file mode 100644 index 000000000000..062b99f4c1af --- /dev/null +++ b/sdk/eventhub/event-hubs/samples/v5/javascript/iothubConnectionStringWebsockets.js @@ -0,0 +1,164 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT Licence. + +/** + * @summary Demonstrates how to convert an IoT Hub connection string to an Event Hubs connection string that points to the built-in messaging endpoint using WebSockets. + */ + +/* + * The Event Hubs connection string is then used with the EventHubConsumerClient to receive events. + * + * More information about the built-in messaging endpoint can be found at: + * https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-messages-read-builtin + */ + +const crypto = require("crypto"); +const { Buffer } = require("buffer"); +const { Connection, ReceiverEvents, parseConnectionString } = require("rhea-promise"); +const rheaPromise = require("rhea-promise"); +const { EventHubConsumerClient, earliestEventPosition } = require("@azure/event-hubs"); +const WebSocket = require("ws"); + +// Load the .env file if it exists +const dotenv = require("dotenv"); +dotenv.config(); + +/** + * Type guard for AmqpError. + * @param err - An unknown error. + */ +function isAmqpError(err) { + return rheaPromise.isAmqpError(err); +} + +const consumerGroup = process.env["CONSUMER_GROUP_NAME"] || ""; + +// This code is modified from https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-security#security-tokens. +function generateSasToken(resourceUri, signingKey, policyName, expiresInMins) { + resourceUri = encodeURIComponent(resourceUri); + + const expiresInSeconds = Math.ceil(Date.now() / 1000 + expiresInMins * 60); + const toSign = resourceUri + "\n" + expiresInSeconds; + + // Use the crypto module to create the hmac. + const hmac = crypto.createHmac("sha256", Buffer.from(signingKey, "base64")); + hmac.update(toSign); + const base64UriEncoded = encodeURIComponent(hmac.digest("base64")); + + // Construct authorization string. + return `SharedAccessSignature sr=${resourceUri}&sig=${base64UriEncoded}&se=${expiresInSeconds}&skn=${policyName}`; +} + +/** + * Converts an IotHub Connection string into an Event Hubs-compatible connection string. + * @param connectionString - An IotHub connection string in the format: + * `"HostName=.azure-devices.net;SharedAccessKeyName=;SharedAccessKey="` + * @returns An Event Hubs-compatible connection string in the format: + * `"Endpoint=sb://;EntityPath=;SharedAccessKeyName=;SharedAccessKey="` + */ +async function convertIotHubToEventHubsConnectionString(connectionString) { + const { HostName, SharedAccessKeyName, SharedAccessKey } = parseConnectionString( + connectionString + ); + + // Verify that the required info is in the connection string. + if (!HostName || !SharedAccessKey || !SharedAccessKeyName) { + throw new Error(`Invalid IotHub connection string.`); + } + + //Extract the IotHub name from the hostname. + const [iotHubName] = HostName.split("."); + + if (!iotHubName) { + throw new Error(`Unable to extract the IotHub name from the connection string.`); + } + + // Generate a token to authenticate to the service. + // The code for generateSasToken can be found at https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-security#security-tokens + const token = generateSasToken( + `${HostName}/messages/events`, + SharedAccessKey, + SharedAccessKeyName, + 5 // token expires in 5 minutes + ); + + const connection = new Connection({ + transport: "tls", + host: HostName, + hostname: HostName, + username: `${SharedAccessKeyName}@sas.root.${iotHubName}`, + port: 443, + reconnect: false, + password: token, + webSocketOptions: { + webSocket: WebSocket, + protocol: ["AMQPWSB10"], + url: `wss://${HostName}:${443}/$servicebus/websocket` + } + }); + await connection.open(); + + // Create the receiver that will trigger a redirect error. + const receiver = await connection.createReceiver({ + source: { address: `amqps://${HostName}/messages/events/$management` } + }); + + return new Promise((resolve, reject) => { + receiver.on(ReceiverEvents.receiverError, (context) => { + const error = context.receiver && context.receiver.error; + if (isAmqpError(error) && error.condition === "amqp:link:redirect") { + const hostname = error.info && error.info.hostname; + if (!hostname) { + reject(error); + } else { + resolve( + `Endpoint=sb://${hostname}/;EntityPath=${iotHubName};SharedAccessKeyName=${SharedAccessKeyName};SharedAccessKey=${SharedAccessKey}` + ); + } + } else { + reject(error); + } + connection.close().catch(() => { + /* ignore error */ + }); + }); + }); +} + +async function main() { + console.log(`Running iothubConnectionString sample`); + + const eventHubsConnectionString = await convertIotHubToEventHubsConnectionString( + "HostName=.azure-devices.net;SharedAccessKeyName=;SharedAccessKey=" + ); + + const consumerClient = new EventHubConsumerClient(consumerGroup, eventHubsConnectionString); + + const subscription = consumerClient.subscribe( + { + // The callback where you add your code to process incoming events + processEvents: async (events, context) => { + for (const event of events) { + console.log( + `Received event: '${event.body}' from partition: '${context.partitionId}' and consumer group: '${context.consumerGroup}'` + ); + } + }, + processError: async (err, context) => { + console.log(`Error on partition "${context.partitionId}" : ${err}`); + } + }, + { startPosition: earliestEventPosition } + ); + + // Wait for a bit before cleaning up the sample + setTimeout(async () => { + await subscription.close(); + await consumerClient.close(); + console.log(`Exiting iothubConnectionString sample`); + }, 30 * 1000); +} + +main().catch((error) => { + console.error("Error running sample:", error); +}); diff --git a/sdk/eventhub/event-hubs/samples/v5/typescript/README.md b/sdk/eventhub/event-hubs/samples/v5/typescript/README.md index 37d3c5a10373..e48b6cf4f5b5 100644 --- a/sdk/eventhub/event-hubs/samples/v5/typescript/README.md +++ b/sdk/eventhub/event-hubs/samples/v5/typescript/README.md @@ -12,14 +12,15 @@ urlFragment: event-hubs-typescript These sample programs show how to use the TypeScript client libraries for Azure Event Hubs in some common scenarios. -| **File Name** | **Description** | -| --------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------- | -| [sendEvents.ts][sendevents] | Demonstrates how to send events to an Event Hub. | -| [receiveEvents.ts][receiveevents] | Demonstrates how to use the EventHubConsumerClient to process events from all partitions of a consumer group in an Event Hub. | -| [usingAadAuth.ts][usingaadauth] | Demonstrates how to instantiate EventHubsClient using AAD token credentials obtained from using service principal secrets. | -| [iothubConnectionString.ts][iothubconnectionstring] | Demonstrates how to convert an IoT Hub connection string to an Event Hubs connection string that points to the built-in messaging endpoint. | -| [useWithIotHub.ts][usewithiothub] | Demonstrates how to use the EventHubConsumerClient to receive messages from an IoT Hub. | -| [websockets.ts][websockets] | Demonstrates how to connect to Azure Event Hubs over websockets to work over an HTTP proxy. | +| **File Name** | **Description** | +| ----------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------ | +| [sendEvents.ts][sendevents] | Demonstrates how to send events to an Event Hub. | +| [receiveEvents.ts][receiveevents] | Demonstrates how to use the EventHubConsumerClient to process events from all partitions of a consumer group in an Event Hub. | +| [usingAadAuth.ts][usingaadauth] | Demonstrates how to instantiate EventHubsClient using AAD token credentials obtained from using service principal secrets. | +| [iothubConnectionString.ts][iothubconnectionstring] | Demonstrates how to convert an IoT Hub connection string to an Event Hubs connection string that points to the built-in messaging endpoint. | +| [iothubConnectionStringWebsockets.ts][iothubconnectionstringwebsockets] | Demonstrates how to convert an IoT Hub connection string to an Event Hubs connection string that points to the built-in messaging endpoint using WbeSockets. | +| [useWithIotHub.ts][usewithiothub] | Demonstrates how to use the EventHubConsumerClient to receive messages from an IoT Hub. | +| [websockets.ts][websockets] | Demonstrates how to connect to Azure Event Hubs over websockets to work over an HTTP proxy. | ## Prerequisites @@ -77,6 +78,7 @@ Take a look at our [API Documentation][apiref] for more information about the AP [receiveevents]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/eventhub/event-hubs/samples/v5/typescript/src/receiveEvents.ts [usingaadauth]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/eventhub/event-hubs/samples/v5/typescript/src/usingAadAuth.ts [iothubconnectionstring]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/eventhub/event-hubs/samples/v5/typescript/src/iothubConnectionString.ts +[iothubconnectionstringwebsockets]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/eventhub/event-hubs/samples/v5/typescript/src/iothubConnectionStringWebsockets.ts [usewithiothub]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/eventhub/event-hubs/samples/v5/typescript/src/useWithIotHub.ts [websockets]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/eventhub/event-hubs/samples/v5/typescript/src/websockets.ts [apiref]: https://docs.microsoft.com/javascript/api/@azure/event-hubs diff --git a/sdk/eventhub/event-hubs/samples/v5/typescript/src/iothubConnectionStringWebsockets.ts b/sdk/eventhub/event-hubs/samples/v5/typescript/src/iothubConnectionStringWebsockets.ts new file mode 100644 index 000000000000..2af2dd8ac011 --- /dev/null +++ b/sdk/eventhub/event-hubs/samples/v5/typescript/src/iothubConnectionStringWebsockets.ts @@ -0,0 +1,171 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT Licence. + +/** + * @summary Demonstrates how to convert an IoT Hub connection string to an Event Hubs connection string that points to the built-in messaging endpoint using WebSockets. + */ + +/* + * The Event Hubs connection string is then used with the EventHubConsumerClient to receive events. + * + * More information about the built-in messaging endpoint can be found at: + * https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-messages-read-builtin + */ + +import * as crypto from "crypto"; +import { Buffer } from "buffer"; +import { AmqpError, Connection, ReceiverEvents, parseConnectionString } from "rhea-promise"; +import rheaPromise from "rhea-promise"; +import { EventHubConsumerClient, earliestEventPosition } from "@azure/event-hubs"; +import WebSocket from "ws"; + +// Load the .env file if it exists +import * as dotenv from "dotenv"; +dotenv.config(); + +/** + * Type guard for AmqpError. + * @param err - An unknown error. + */ +function isAmqpError(err: any): err is AmqpError { + return rheaPromise.isAmqpError(err); +} + +const consumerGroup = process.env["CONSUMER_GROUP_NAME"] || ""; + +// This code is modified from https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-security#security-tokens. +function generateSasToken( + resourceUri: string, + signingKey: string, + policyName: string, + expiresInMins: number +): string { + resourceUri = encodeURIComponent(resourceUri); + + const expiresInSeconds = Math.ceil(Date.now() / 1000 + expiresInMins * 60); + const toSign = resourceUri + "\n" + expiresInSeconds; + + // Use the crypto module to create the hmac. + const hmac = crypto.createHmac("sha256", Buffer.from(signingKey, "base64")); + hmac.update(toSign); + const base64UriEncoded = encodeURIComponent(hmac.digest("base64")); + + // Construct authorization string. + return `SharedAccessSignature sr=${resourceUri}&sig=${base64UriEncoded}&se=${expiresInSeconds}&skn=${policyName}`; +} + +/** + * Converts an IotHub Connection string into an Event Hubs-compatible connection string. + * @param connectionString - An IotHub connection string in the format: + * `"HostName=.azure-devices.net;SharedAccessKeyName=;SharedAccessKey="` + * @returns An Event Hubs-compatible connection string in the format: + * `"Endpoint=sb://;EntityPath=;SharedAccessKeyName=;SharedAccessKey="` + */ +async function convertIotHubToEventHubsConnectionString(connectionString: string): Promise { + const { HostName, SharedAccessKeyName, SharedAccessKey } = parseConnectionString<{ + HostName: string; + SharedAccessKeyName: string; + SharedAccessKey: string; + }>(connectionString); + + // Verify that the required info is in the connection string. + if (!HostName || !SharedAccessKey || !SharedAccessKeyName) { + throw new Error(`Invalid IotHub connection string.`); + } + + //Extract the IotHub name from the hostname. + const [iotHubName] = HostName.split("."); + + if (!iotHubName) { + throw new Error(`Unable to extract the IotHub name from the connection string.`); + } + + // Generate a token to authenticate to the service. + // The code for generateSasToken can be found at https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-security#security-tokens + const token = generateSasToken( + `${HostName}/messages/events`, + SharedAccessKey, + SharedAccessKeyName, + 5 // token expires in 5 minutes + ); + + const connection = new Connection({ + transport: "tls", + host: HostName, + hostname: HostName, + username: `${SharedAccessKeyName}@sas.root.${iotHubName}`, + port: 443, + reconnect: false, + password: token, + webSocketOptions: { + webSocket: WebSocket, + protocol: ["AMQPWSB10"], + url: `wss://${HostName}:${443}/$servicebus/websocket` + } + }); + await connection.open(); + + // Create the receiver that will trigger a redirect error. + const receiver = await connection.createReceiver({ + source: { address: `amqps://${HostName}/messages/events/$management` } + }); + + return new Promise((resolve, reject) => { + receiver.on(ReceiverEvents.receiverError, (context) => { + const error = context.receiver && context.receiver.error; + if (isAmqpError(error) && error.condition === "amqp:link:redirect") { + const hostname = error.info && error.info.hostname; + if (!hostname) { + reject(error); + } else { + resolve( + `Endpoint=sb://${hostname}/;EntityPath=${iotHubName};SharedAccessKeyName=${SharedAccessKeyName};SharedAccessKey=${SharedAccessKey}` + ); + } + } else { + reject(error); + } + connection.close().catch(() => { + /* ignore error */ + }); + }); + }); +} + +export async function main() { + console.log(`Running iothubConnectionString sample`); + + const eventHubsConnectionString = await convertIotHubToEventHubsConnectionString( + "HostName=.azure-devices.net;SharedAccessKeyName=;SharedAccessKey=" + ); + + const consumerClient = new EventHubConsumerClient(consumerGroup, eventHubsConnectionString); + + const subscription = consumerClient.subscribe( + { + // The callback where you add your code to process incoming events + processEvents: async (events, context) => { + for (const event of events) { + console.log( + `Received event: '${event.body}' from partition: '${context.partitionId}' and consumer group: '${context.consumerGroup}'` + ); + } + }, + processError: async (err, context) => { + console.log(`Error on partition "${context.partitionId}" : ${err}`); + } + }, + { startPosition: earliestEventPosition } + ); + + // Wait for a bit before cleaning up the sample + setTimeout(async () => { + await subscription.close(); + await consumerClient.close(); + console.log(`Exiting iothubConnectionString sample`); + }, 30 * 1000); +} + +main().catch((error) => { + console.error("Error running sample:", error); +});