Skip to content

Commit

Permalink
Ying/update event hub sdk (#463)
Browse files Browse the repository at this point in the history
* update eventhub node sdk; fix breaking change where hub connection string cannot be directly used to create client

* package verison update
  • Loading branch information
YingXue authored Aug 12, 2021
1 parent 04b7897 commit 11b867b
Show file tree
Hide file tree
Showing 27 changed files with 797 additions and 1,034 deletions.
1,026 changes: 487 additions & 539 deletions package-lock.json

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "azure-iot-explorer",
"version": "0.14.5",
"version": "0.14.6",
"description": "This project welcomes contributions and suggestions. Most contributions require you to agree to a\r Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us\r the rights to use your contribution. For details, visit https://cla.microsoft.com.",
"main": "host/electron.js",
"build": {
Expand Down Expand Up @@ -69,7 +69,7 @@
},
"homepage": "https://github.com/Azure/azure-iot-explorer#readme",
"dependencies": {
"@azure/event-hubs": "1.0.7",
"@azure/event-hubs": "5.6.0",
"@fluentui/react": "8.20.2",
"azure-iot-common": "1.10.3",
"azure-iothub": "1.8.1",
Expand Down
1 change: 0 additions & 1 deletion public/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ export const MESSAGE_CHANNELS = {
DEVICE_SEND_MESSAGE: 'device_sendMessage',
DIRECTORY_GET_DIRECTORIES: 'directory_getDirectories',
EVENTHUB_START_MONITORING: 'eventhub_startMonitoring',
EVENTHUB_STOP_MONITORING: 'eventhub_stopMonitoring',
MODEL_REPOSITORY_GET_DEFINITION: 'model_definition',
SETTING_HIGH_CONTRAST: 'setting_highContrast',
};
Expand Down
3 changes: 1 addition & 2 deletions public/electron.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { onSettingsHighContrast } from './handlers/settingsHandler';
import { onGetInterfaceDefinition } from './handlers/modelRepositoryHandler';
import { onGetDirectories } from './handlers/directoryHandler';
import { onSendMessageToDevice } from './handlers/deviceHandler';
import { onStartMonitoring, onStopMonitoring } from './handlers/eventHubHandler';
import { onStartMonitoring } from './handlers/eventHubHandler';
import { formatError } from './utils/errorHelper';
import '../dist/server/serverElectron';

Expand All @@ -34,7 +34,6 @@ class Main {
Main.registerHandler(MESSAGE_CHANNELS.DIRECTORY_GET_DIRECTORIES, onGetDirectories);
Main.registerHandler(MESSAGE_CHANNELS.DEVICE_SEND_MESSAGE, onSendMessageToDevice);
Main.registerHandler(MESSAGE_CHANNELS.EVENTHUB_START_MONITORING, onStartMonitoring);
Main.registerHandler(MESSAGE_CHANNELS.EVENTHUB_STOP_MONITORING, onStopMonitoring);
}

private static setApplicationLock(): void {
Expand Down
3 changes: 0 additions & 3 deletions public/factories/eventHubInterfaceFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,6 @@ export const generateEventHubInterface = (): EventHubInterface => {
return {
startEventHubMonitoring: async (params: StartEventHubMonitoringParameters): Promise<Message[]> => {
return invokeInMainWorld<Message[]>(MESSAGE_CHANNELS.EVENTHUB_START_MONITORING, params);
},
stopEventHubMonitoring: async (): Promise<void> => {
return invokeInMainWorld<void>(MESSAGE_CHANNELS.EVENTHUB_STOP_MONITORING);
}
};
};
186 changes: 60 additions & 126 deletions public/handlers/eventHubHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,147 +3,81 @@
* Licensed under the MIT License
**********************************************************/
import { IpcMainInvokeEvent } from 'electron';
import { EventHubClient, EventPosition, ReceiveHandler } from '@azure/event-hubs';
import { EventHubConsumerClient, ReceivedEventData, earliestEventPosition } from '@azure/event-hubs';
import { Message, StartEventHubMonitoringParameters } from '../interfaces/eventHubInterface';

let client: EventHubClient = null;
let messages: Message[] = [];
let receivers: ReceiveHandler[] = [];
let connectionString: string = ''; // would equal `${hubConnectionString}` or `${customEventHubConnectionString}/${customEventHubName}`
let deviceId: string = '';
let moduleId: string = '';
import { convertIotHubToEventHubsConnectionString } from '../utils/eventHubHelper';

const IOTHUB_CONNECTION_DEVICE_ID = 'iothub-connection-device-id';
const IOTHUB_CONNECTION_MODULE_ID = 'iothub-connection-module-id';
let hubConnectionString = '';
let eventHubCompatibleConnectionString = '';
let deviceId = '';
let moduleId = '';
let messages: Message[] = [];

export const onStartMonitoring = async (event: IpcMainInvokeEvent, params: StartEventHubMonitoringParameters): Promise<Message[]>=> {
return eventHubProvider(params).then(result => {
return result;
});
export const onStartMonitoring = async (event: IpcMainInvokeEvent, params: StartEventHubMonitoringParameters): Promise<Message[]>=> {
await eventHubProvider(params);
const result = [...messages];
messages = [];
return result;
}

export const onStopMonitoring = async (): Promise<void> => {
try {
return stopClient();
} catch (error) {
// swallow the error as we set client to null anyways
const eventHubProvider = async (params: any) => {
let connectionString = '';
if (params.customEventHubConnectionString) {
connectionString = params.customEventHubConnectionString;
}
}
else {
if (params.hubConnectionString != hubConnectionString) {
connectionString = await convertIotHubToEventHubsConnectionString(params.hubConnectionString);
// save strings for future use
hubConnectionString = params.hubConnectionString;
eventHubCompatibleConnectionString = connectionString;

const eventHubProvider = async (params: StartEventHubMonitoringParameters) => {
if (needToCreateNewEventHubClient(params))
{
// hub has changed, reinitialize client, receivers and mesages
client = params.customEventHubConnectionString ?
await EventHubClient.createFromConnectionString(params.customEventHubConnectionString, params.customEventHubName) :
await EventHubClient.createFromIotHubConnectionString(params.hubConnectionString);

connectionString = params.customEventHubConnectionString ?
`${params.customEventHubConnectionString}/${params.customEventHubName}` :
params.hubConnectionString;
receivers = [];
messages = [];
}
else {
connectionString = eventHubCompatibleConnectionString;
}
}
updateEntityIdIfNecessary(params);

return listeningToMessages(client, params);
};

const listeningToMessages = async (eventHubClient: EventHubClient, params: StartEventHubMonitoringParameters) => {
if (params.startListeners || !receivers) {
const partitionIds = await client.getPartitionIds();
const hubInfo = await client.getHubRuntimeInformation();
const startTime = params.startTime ? Date.parse(params.startTime) : Date.now();

partitionIds && partitionIds.forEach(async (partitionId: string) => {
const receiveOptions = {
consumerGroup: params.consumerGroup,
enableReceiverRuntimeMetric: true,
eventPosition: EventPosition.fromEnqueuedTime(startTime),
name: `${hubInfo.path}_${partitionId}`,
};

const receiver = eventHubClient.receive(
partitionId,
onMessageReceived,
(err: object) => {},
receiveOptions);
receivers.push(receiver);
});
if (deviceId != params.deviceId || moduleId != params.moduleId) {
messages = []; // clear the messages when switching identites
deviceId = params.deviceId;
moduleId = params.moduleId;
}

return handleMessages();
};

const handleMessages = () => {
let results: Message[] = [];
messages.forEach(message => {
if (!results.some(result => result.systemProperties?.['x-opt-sequence-number'] === message.systemProperties?.['x-opt-sequence-number'])) {
// if user click stop/start too refrequently, it's possible duplicate receivers are created before the cleanup happens as it's async
// remove duplicate messages before proper cleanup is finished
results.push(message);
}
})
messages = []; // empty the array everytime the result is returned
return results;
}

const stopClient = async () => {
return stopReceivers().then(() => {
return client && client.close().catch(error => {
console.log(`client cleanup error: ${error}`); // swallow the error as we will cleanup anyways
});
}).finally (() => {
client = null;
receivers = [];
});
};

const stopReceivers = async () => {
return Promise.all(
receivers.map(receiver => {
if (receiver && (receiver.isReceiverOpen === undefined || receiver.isReceiverOpen)) {
return stopReceiver(receiver);
} else {
return null;
const client = new EventHubConsumerClient(params.consumerGroup, connectionString);
const subscription = client.subscribe(
{
processEvents: async (events) => {
handleMessages(events, params)
},
processError: async (err) => {
console.log(err);
}
})
},
{ startPosition: params.startTime ? { enqueuedOn: new Date(params.startTime).getTime() } : earliestEventPosition }
);
};

const stopReceiver = async (receiver: ReceiveHandler) => {
receiver.stop().catch((err: object) => {
throw new Error(`receivers cleanup error: ${err}`);
});
}

const needToCreateNewEventHubClient = (parmas: StartEventHubMonitoringParameters): boolean => {
return !client ||
parmas.hubConnectionString && parmas.hubConnectionString !== connectionString ||
parmas.customEventHubConnectionString && `${parmas.customEventHubConnectionString}/${parmas.customEventHubName}` !== connectionString;
}

const updateEntityIdIfNecessary = (parmas: StartEventHubMonitoringParameters) => {
if( !deviceId || parmas.deviceId !== deviceId) {
deviceId = parmas.deviceId;
messages = [];
}
if (parmas.moduleId !== moduleId) {
moduleId = parmas.moduleId;
messages = [];
}
}
// Wait for a few seconds to receive events before closing
setTimeout(async () => {
await subscription.close();
await client.close();
}, 3 * 1000);
};

const onMessageReceived = async (eventData: any) => {
if (eventData && eventData.annotations && eventData.annotations[IOTHUB_CONNECTION_DEVICE_ID] === deviceId) {
if (!moduleId || eventData?.annotations?.[IOTHUB_CONNECTION_MODULE_ID] === moduleId) {
const message: Message = {
body: eventData.body,
enqueuedTime: eventData.enqueuedTimeUtc.toString(),
properties: eventData.applicationProperties
};
message.systemProperties = eventData.annotations;
messages.push(message);
const handleMessages = (events: ReceivedEventData[], params: any) => {
events.forEach(event => {
if (event?.systemProperties?.[IOTHUB_CONNECTION_DEVICE_ID] === params.deviceId) {
if (!params.moduleId || event?.systemProperties?.[IOTHUB_CONNECTION_MODULE_ID] === params.moduleId) {
const message: Message = {
body: event.body,
enqueuedTime: event.enqueuedTimeUtc.toString(),
properties: event.properties
};
message.systemProperties = event.systemProperties;
messages.push(message);
}
}
}
};
});
}
2 changes: 0 additions & 2 deletions public/interfaces/eventHubInterface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ export interface StartEventHubMonitoringParameters {
moduleId: string;
consumerGroup: string;
startTime: string;
startListeners: boolean;

customEventHubName?: string;
customEventHubConnectionString?: string;
Expand All @@ -23,5 +22,4 @@ export interface Message {

export interface EventHubInterface {
startEventHubMonitoring(params: StartEventHubMonitoringParameters): Promise<Message[]>;
stopEventHubMonitoring(): Promise<void>;
}
83 changes: 83 additions & 0 deletions public/utils/eventHubHelper.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@

/***********************************************************
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License
**********************************************************/
import { AmqpError, isAmqpError, Connection, ReceiverEvents, parseConnectionString } from "rhea-promise";
import * as crypto from "crypto";

// The following helper functions are directly copied from:
// https://raw.githubusercontent.com/Azure/azure-sdk-for-js/main/sdk/eventhub/event-hubs/samples/v5/typescript/src/iothubConnectionString.ts
export const convertIotHubToEventHubsConnectionString = async (connectionString: string): Promise<string> =>
{
const { HostName, SharedAccessKeyName, SharedAccessKey } = parseConnectionString<{
HostName: string;
SharedAccessKeyName: string;
SharedAccessKey: string;
}>(connectionString);

// Verify that the required info is in the connection string.
if (!HostName || !SharedAccessKey || !SharedAccessKeyName) {
throw new Error(`Invalid IotHub connection string.`);
}

//Extract the IotHub name from the hostname.
const [iotHubName] = HostName.split(".");

if (!iotHubName) {
throw new Error(`Unable to extract the IotHub name from the connection string.`);
}

// Generate a token to authenticate to the service.
// The code for generateSasToken can be found at https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-security#security-tokens
const token = generateSasToken(`${HostName}/messages/events`, SharedAccessKey, SharedAccessKeyName, 5);

const connection = new Connection({
transport: "tls",
host: HostName,
hostname: HostName,
username: `${SharedAccessKeyName}@sas.root.${iotHubName}`,
port: 5671,
reconnect: false,
password: token
});
await connection.open();

// Create the receiver that will trigger a redirect error.
const receiver = await connection.createReceiver({ source: { address: `amqps://${HostName}/messages/events/$management` }});

return new Promise((resolve, reject) => {
receiver.on(ReceiverEvents.receiverError, (context) => {
const error = context.receiver && context.receiver.error;
if (isAmqpError(error) && (error as AmqpError).condition === "amqp:link:redirect") {
const hostname = (error as AmqpError).info?.hostname;
if (!hostname) {
reject(error);
} else {
resolve(`Endpoint=sb://${hostname}/;EntityPath=${iotHubName};SharedAccessKeyName=${SharedAccessKeyName};SharedAccessKey=${SharedAccessKey}`);
}
} else {
reject(error);
}
connection.close().catch(() => {
/* ignore error */
});
});
});
}

// This code copied from event hub's sample
// https://raw.githubusercontent.com/Azure/azure-sdk-for-js/main/sdk/eventhub/event-hubs/samples/v5/typescript/src/iothubConnectionString.ts
const generateSasToken = (resourceUri: string, signingKey: string, policyName: string, expiresInMins: number): string => {
resourceUri = encodeURIComponent(resourceUri);
const expiresInSeconds = Math.ceil(Date.now() / 1000 + expiresInMins * 60);
const toSign = resourceUri + "\n" + expiresInSeconds;

// Use the crypto module to create the hmac.
const hmac = crypto.createHmac("sha256", Buffer.from(signingKey, "base64"));
hmac.update(toSign);
const base64UriEncoded = encodeURIComponent(hmac.digest("base64"));

// Construct authorization string.
return `SharedAccessSignature sr=${resourceUri}&sig=${base64UriEncoded}&se=${expiresInSeconds}&skn=${policyName}`;
}
1 change: 0 additions & 1 deletion src/app/api/parameters/deviceParameters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ export interface MonitorEventsParameters {
deviceId: string;
moduleId: string;
consumerGroup: string;
startListeners: boolean;

customEventHubName?: string;
customEventHubConnectionString?: string;
Expand Down
Loading

0 comments on commit 11b867b

Please sign in to comment.