Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[event-hubs] adds IoT Hub connection string conversion with websockets sample #16589

Merged
4 commits merged into from
Aug 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/eventhub/event-hubs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
},
"skip": [
"iothubConnectionString.js",
"iothubConnectionStringWebsockets.js",
"useWithIotHub.js",
"usingAadAuth.js"
],
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT Licence.

/**
* @summary Demonstrates how to convert an IoT Hub connection string to an Event Hubs connection string that points to the built-in messaging endpoint using WebSockets.
*/

/*
* The Event Hubs connection string is then used with the EventHubConsumerClient to receive events.
*
* More information about the built-in messaging endpoint can be found at:
* https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-messages-read-builtin
*/

import * as crypto from "crypto";
import { Buffer } from "buffer";
import { AmqpError, Connection, ReceiverEvents, parseConnectionString } from "rhea-promise";
import rheaPromise from "rhea-promise";
import { EventHubConsumerClient, earliestEventPosition } from "@azure/event-hubs";
import WebSocket from "ws";

// Load the .env file if it exists
import * as dotenv from "dotenv";
dotenv.config();

/**
* Type guard for AmqpError.
* @param err - An unknown error.
*/
function isAmqpError(err: any): err is AmqpError {
return rheaPromise.isAmqpError(err);
}

const consumerGroup = process.env["CONSUMER_GROUP_NAME"] || "";

// This code is modified from https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-security#security-tokens.
function generateSasToken(
resourceUri: string,
signingKey: string,
policyName: string,
expiresInMins: number
): string {
resourceUri = encodeURIComponent(resourceUri);

const expiresInSeconds = Math.ceil(Date.now() / 1000 + expiresInMins * 60);
const toSign = resourceUri + "\n" + expiresInSeconds;

// Use the crypto module to create the hmac.
const hmac = crypto.createHmac("sha256", Buffer.from(signingKey, "base64"));
hmac.update(toSign);
const base64UriEncoded = encodeURIComponent(hmac.digest("base64"));

// Construct authorization string.
return `SharedAccessSignature sr=${resourceUri}&sig=${base64UriEncoded}&se=${expiresInSeconds}&skn=${policyName}`;
}

/**
* Converts an IotHub Connection string into an Event Hubs-compatible connection string.
* @param connectionString - An IotHub connection string in the format:
* `"HostName=<your-iot-hub>.azure-devices.net;SharedAccessKeyName=<KeyName>;SharedAccessKey=<Key>"`
* @returns An Event Hubs-compatible connection string in the format:
* `"Endpoint=sb://<hostname>;EntityPath=<your-iot-hub>;SharedAccessKeyName=<KeyName>;SharedAccessKey=<Key>"`
*/
async function convertIotHubToEventHubsConnectionString(connectionString: string): Promise<string> {
const { HostName, SharedAccessKeyName, SharedAccessKey } = parseConnectionString<{
HostName: string;
SharedAccessKeyName: string;
SharedAccessKey: string;
}>(connectionString);

// Verify that the required info is in the connection string.
if (!HostName || !SharedAccessKey || !SharedAccessKeyName) {
throw new Error(`Invalid IotHub connection string.`);
}

//Extract the IotHub name from the hostname.
const [iotHubName] = HostName.split(".");

if (!iotHubName) {
throw new Error(`Unable to extract the IotHub name from the connection string.`);
}

// Generate a token to authenticate to the service.
// The code for generateSasToken can be found at https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-security#security-tokens
const token = generateSasToken(
`${HostName}/messages/events`,
SharedAccessKey,
SharedAccessKeyName,
5 // token expires in 5 minutes
);

const connection = new Connection({
transport: "tls",
host: HostName,
hostname: HostName,
username: `${SharedAccessKeyName}@sas.root.${iotHubName}`,
port: 443,
reconnect: false,
password: token,
webSocketOptions: {
webSocket: WebSocket,
protocol: ["AMQPWSB10"],
url: `wss://${HostName}:${443}/$servicebus/websocket`
}
});
await connection.open();

// Create the receiver that will trigger a redirect error.
const receiver = await connection.createReceiver({
source: { address: `amqps://${HostName}/messages/events/$management` }
});

return new Promise((resolve, reject) => {
receiver.on(ReceiverEvents.receiverError, (context) => {
const error = context.receiver && context.receiver.error;
if (isAmqpError(error) && error.condition === "amqp:link:redirect") {
const hostname = error.info && error.info.hostname;
if (!hostname) {
reject(error);
} else {
resolve(
`Endpoint=sb://${hostname}/;EntityPath=${iotHubName};SharedAccessKeyName=${SharedAccessKeyName};SharedAccessKey=${SharedAccessKey}`
);
}
} else {
reject(error);
}
connection.close().catch(() => {
/* ignore error */
});
});
});
}

export async function main() {
console.log(`Running iothubConnectionString sample`);

const eventHubsConnectionString = await convertIotHubToEventHubsConnectionString(
"HostName=<your-iot-hub>.azure-devices.net;SharedAccessKeyName=<KeyName>;SharedAccessKey=<Key>"
);

const consumerClient = new EventHubConsumerClient(consumerGroup, eventHubsConnectionString);

const subscription = consumerClient.subscribe(
{
// The callback where you add your code to process incoming events
processEvents: async (events, context) => {
for (const event of events) {
console.log(
`Received event: '${event.body}' from partition: '${context.partitionId}' and consumer group: '${context.consumerGroup}'`
);
}
},
processError: async (err, context) => {
console.log(`Error on partition "${context.partitionId}" : ${err}`);
}
},
{ startPosition: earliestEventPosition }
);

// Wait for a bit before cleaning up the sample
setTimeout(async () => {
await subscription.close();
await consumerClient.close();
console.log(`Exiting iothubConnectionString sample`);
}, 30 * 1000);
}

main().catch((error) => {
console.error("Error running sample:", error);
});
18 changes: 10 additions & 8 deletions sdk/eventhub/event-hubs/samples/v5/javascript/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,15 @@ urlFragment: event-hubs-javascript

These sample programs show how to use the JavaScript client libraries for Azure Event Hubs in some common scenarios.

| **File Name** | **Description** |
| --------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------- |
| [sendEvents.js][sendevents] | Demonstrates how to send events to an Event Hub. |
| [receiveEvents.js][receiveevents] | Demonstrates how to use the EventHubConsumerClient to process events from all partitions of a consumer group in an Event Hub. |
| [usingAadAuth.js][usingaadauth] | Demonstrates how to instantiate EventHubsClient using AAD token credentials obtained from using service principal secrets. |
| [iothubConnectionString.js][iothubconnectionstring] | Demonstrates how to convert an IoT Hub connection string to an Event Hubs connection string that points to the built-in messaging endpoint. |
| [useWithIotHub.js][usewithiothub] | Demonstrates how to use the EventHubConsumerClient to receive messages from an IoT Hub. |
| [websockets.js][websockets] | Demonstrates how to connect to Azure Event Hubs over websockets to work over an HTTP proxy. |
| **File Name** | **Description** |
| ----------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| [sendEvents.js][sendevents] | Demonstrates how to send events to an Event Hub. |
| [receiveEvents.js][receiveevents] | Demonstrates how to use the EventHubConsumerClient to process events from all partitions of a consumer group in an Event Hub. |
| [usingAadAuth.js][usingaadauth] | Demonstrates how to instantiate EventHubsClient using AAD token credentials obtained from using service principal secrets. |
| [iothubConnectionString.js][iothubconnectionstring] | Demonstrates how to convert an IoT Hub connection string to an Event Hubs connection string that points to the built-in messaging endpoint. |
| [iothubConnectionStringWebsockets.js][iothubconnectionstringwebsockets] | Demonstrates how to convert an IoT Hub connection string to an Event Hubs connection string that points to the built-in messaging endpoint using WbeSockets. |
| [useWithIotHub.js][usewithiothub] | Demonstrates how to use the EventHubConsumerClient to receive messages from an IoT Hub. |
| [websockets.js][websockets] | Demonstrates how to connect to Azure Event Hubs over websockets to work over an HTTP proxy. |

## Prerequisites

Expand Down Expand Up @@ -65,6 +66,7 @@ Take a look at our [API Documentation][apiref] for more information about the AP
[receiveevents]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/eventhub/event-hubs/samples/v5/javascript/receiveEvents.js
[usingaadauth]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/eventhub/event-hubs/samples/v5/javascript/usingAadAuth.js
[iothubconnectionstring]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/eventhub/event-hubs/samples/v5/javascript/iothubConnectionString.js
[iothubconnectionstringwebsockets]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/eventhub/event-hubs/samples/v5/javascript/iothubConnectionStringWebsockets.js
[usewithiothub]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/eventhub/event-hubs/samples/v5/javascript/useWithIotHub.js
[websockets]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/eventhub/event-hubs/samples/v5/javascript/websockets.js
[apiref]: https://docs.microsoft.com/javascript/api/@azure/event-hubs
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT Licence.

/**
* @summary Demonstrates how to convert an IoT Hub connection string to an Event Hubs connection string that points to the built-in messaging endpoint using WebSockets.
*/

/*
* The Event Hubs connection string is then used with the EventHubConsumerClient to receive events.
*
* More information about the built-in messaging endpoint can be found at:
* https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-messages-read-builtin
*/

const crypto = require("crypto");
const { Buffer } = require("buffer");
const { Connection, ReceiverEvents, parseConnectionString } = require("rhea-promise");
const rheaPromise = require("rhea-promise");
const { EventHubConsumerClient, earliestEventPosition } = require("@azure/event-hubs");
const WebSocket = require("ws");

// Load the .env file if it exists
const dotenv = require("dotenv");
dotenv.config();

/**
* Type guard for AmqpError.
* @param err - An unknown error.
*/
function isAmqpError(err) {
return rheaPromise.isAmqpError(err);
}

const consumerGroup = process.env["CONSUMER_GROUP_NAME"] || "";

// This code is modified from https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-security#security-tokens.
function generateSasToken(resourceUri, signingKey, policyName, expiresInMins) {
resourceUri = encodeURIComponent(resourceUri);

const expiresInSeconds = Math.ceil(Date.now() / 1000 + expiresInMins * 60);
const toSign = resourceUri + "\n" + expiresInSeconds;

// Use the crypto module to create the hmac.
const hmac = crypto.createHmac("sha256", Buffer.from(signingKey, "base64"));
hmac.update(toSign);
const base64UriEncoded = encodeURIComponent(hmac.digest("base64"));

// Construct authorization string.
return `SharedAccessSignature sr=${resourceUri}&sig=${base64UriEncoded}&se=${expiresInSeconds}&skn=${policyName}`;
}

/**
* Converts an IotHub Connection string into an Event Hubs-compatible connection string.
* @param connectionString - An IotHub connection string in the format:
* `"HostName=<your-iot-hub>.azure-devices.net;SharedAccessKeyName=<KeyName>;SharedAccessKey=<Key>"`
* @returns An Event Hubs-compatible connection string in the format:
* `"Endpoint=sb://<hostname>;EntityPath=<your-iot-hub>;SharedAccessKeyName=<KeyName>;SharedAccessKey=<Key>"`
*/
async function convertIotHubToEventHubsConnectionString(connectionString) {
const { HostName, SharedAccessKeyName, SharedAccessKey } = parseConnectionString(
connectionString
);

// Verify that the required info is in the connection string.
if (!HostName || !SharedAccessKey || !SharedAccessKeyName) {
throw new Error(`Invalid IotHub connection string.`);
}

//Extract the IotHub name from the hostname.
const [iotHubName] = HostName.split(".");

if (!iotHubName) {
throw new Error(`Unable to extract the IotHub name from the connection string.`);
}

// Generate a token to authenticate to the service.
// The code for generateSasToken can be found at https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-security#security-tokens
const token = generateSasToken(
`${HostName}/messages/events`,
SharedAccessKey,
SharedAccessKeyName,
5 // token expires in 5 minutes
);

const connection = new Connection({
transport: "tls",
host: HostName,
hostname: HostName,
username: `${SharedAccessKeyName}@sas.root.${iotHubName}`,
port: 443,
reconnect: false,
password: token,
webSocketOptions: {
webSocket: WebSocket,
protocol: ["AMQPWSB10"],
url: `wss://${HostName}:${443}/$servicebus/websocket`
}
});
await connection.open();

// Create the receiver that will trigger a redirect error.
const receiver = await connection.createReceiver({
source: { address: `amqps://${HostName}/messages/events/$management` }
});

return new Promise((resolve, reject) => {
receiver.on(ReceiverEvents.receiverError, (context) => {
const error = context.receiver && context.receiver.error;
if (isAmqpError(error) && error.condition === "amqp:link:redirect") {
const hostname = error.info && error.info.hostname;
if (!hostname) {
reject(error);
} else {
resolve(
`Endpoint=sb://${hostname}/;EntityPath=${iotHubName};SharedAccessKeyName=${SharedAccessKeyName};SharedAccessKey=${SharedAccessKey}`
);
}
} else {
reject(error);
}
connection.close().catch(() => {
/* ignore error */
});
});
});
}

async function main() {
console.log(`Running iothubConnectionString sample`);

const eventHubsConnectionString = await convertIotHubToEventHubsConnectionString(
"HostName=<your-iot-hub>.azure-devices.net;SharedAccessKeyName=<KeyName>;SharedAccessKey=<Key>"
);

const consumerClient = new EventHubConsumerClient(consumerGroup, eventHubsConnectionString);

const subscription = consumerClient.subscribe(
{
// The callback where you add your code to process incoming events
processEvents: async (events, context) => {
for (const event of events) {
console.log(
`Received event: '${event.body}' from partition: '${context.partitionId}' and consumer group: '${context.consumerGroup}'`
);
}
},
processError: async (err, context) => {
console.log(`Error on partition "${context.partitionId}" : ${err}`);
}
},
{ startPosition: earliestEventPosition }
);

// Wait for a bit before cleaning up the sample
setTimeout(async () => {
await subscription.close();
await consumerClient.close();
console.log(`Exiting iothubConnectionString sample`);
}, 30 * 1000);
}

main().catch((error) => {
console.error("Error running sample:", error);
});
Loading