Skip to content

Commit

Permalink
fix: Add queue for EZSP adapter (#771)
Browse files Browse the repository at this point in the history
Using the "concurrent" option for the queue.
Restoring the queue to send to the endpoint.
  • Loading branch information
kirovilya authored Oct 12, 2023
1 parent a93af4a commit 04b5ebd
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 35 deletions.
57 changes: 33 additions & 24 deletions src/adapter/ezsp/adapter/ezspAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import {Driver, EmberIncomingMessage} from '../driver';
import {EmberZDOCmd, EmberApsOption, uint16_t, EmberEUI64, EmberStatus, EmberKeyData} from '../driver/types';
import {ZclFrame, FrameType, Direction, Foundation} from '../../../zcl';
import * as Events from '../../events';
import {Waitress, Wait, RealpathSync} from '../../../utils';
import {Queue, Waitress, Wait, RealpathSync} from '../../../utils';
import * as Models from "../../../models";
import SerialPortUtils from '../../serialPortUtils';
import SocketPortUtils from '../../socketPortUtils';
Expand All @@ -40,6 +40,8 @@ class EZSPAdapter extends Adapter {
private waitress: Waitress<Events.ZclDataPayload, WaitressMatcher>;
private interpanLock: boolean;
private backupMan: EZSPAdapterBackup;
private queue: Queue;


public constructor(networkOptions: NetworkOptions,
serialPortOptions: SerialPortOptions, backupPath: string, adapterOptions: AdapterOptions) {
Expand All @@ -49,6 +51,11 @@ class EZSPAdapter extends Adapter {
this.waitressValidator, this.waitressTimeoutFormatter
);
this.interpanLock = false;

const concurrent = adapterOptions && adapterOptions.concurrent ? adapterOptions.concurrent : 8;
debug(`Adapter concurrent: ${concurrent}`);
this.queue = new Queue(concurrent);

this.driver = new Driver();
this.driver.on('deviceJoined', this.handleDeviceJoin.bind(this));
this.driver.on('deviceLeft', this.handleDeviceLeft.bind(this));
Expand Down Expand Up @@ -197,7 +204,7 @@ class EZSPAdapter extends Adapter {
}

public async getCoordinator(): Promise<Coordinator> {
return this.driver.queue.execute<Coordinator>(async () => {
return this.queue.execute<Coordinator>(async () => {
this.checkInterpanLock();
const networkAddress = 0x0000;
const message = await this.driver.zdoRequest(
Expand Down Expand Up @@ -231,7 +238,7 @@ class EZSPAdapter extends Adapter {
}

public async permitJoin(seconds: number, networkAddress: number): Promise<void> {
return this.driver.queue.execute<void>(async () => {
return this.queue.execute<void>(async () => {
this.checkInterpanLock();
if (seconds) {
this.driver.preJoining();
Expand Down Expand Up @@ -267,7 +274,7 @@ class EZSPAdapter extends Adapter {
}

public async lqi(networkAddress: number): Promise<LQI> {
return this.driver.queue.execute<LQI>(async (): Promise<LQI> => {
return this.queue.execute<LQI>(async (): Promise<LQI> => {
this.checkInterpanLock();
const neighbors: LQINeighbor[] = [];

Expand Down Expand Up @@ -313,7 +320,7 @@ class EZSPAdapter extends Adapter {
}

public async routingTable(networkAddress: number): Promise<RoutingTable> {
return this.driver.queue.execute<RoutingTable>(async (): Promise<RoutingTable> => {
return this.queue.execute<RoutingTable>(async (): Promise<RoutingTable> => {
this.checkInterpanLock();
const table: RoutingTableEntry[] = [];

Expand Down Expand Up @@ -356,7 +363,7 @@ class EZSPAdapter extends Adapter {
}

public async nodeDescriptor(networkAddress: number): Promise<NodeDescriptor> {
return this.driver.queue.execute<NodeDescriptor>(async () => {
return this.queue.execute<NodeDescriptor>(async () => {
this.checkInterpanLock();
try {
debug(`Requesting 'Node Descriptor' for '${networkAddress}'`);
Expand All @@ -383,7 +390,7 @@ class EZSPAdapter extends Adapter {

public async activeEndpoints(networkAddress: number): Promise<ActiveEndpoints> {
debug(`Requesting 'Active endpoints' for '${networkAddress}'`);
return this.driver.queue.execute<ActiveEndpoints>(async () => {
return this.queue.execute<ActiveEndpoints>(async () => {
const endpoints = await this.driver.zdoRequest(
networkAddress, EmberZDOCmd.Active_EP_req, EmberZDOCmd.Active_EP_rsp,
{dstaddr: networkAddress}
Expand All @@ -394,7 +401,7 @@ class EZSPAdapter extends Adapter {

public async simpleDescriptor(networkAddress: number, endpointID: number): Promise<SimpleDescriptor> {
debug(`Requesting 'Simple Descriptor' for '${networkAddress}' endpoint ${endpointID}`);
return this.driver.queue.execute<SimpleDescriptor>(async () => {
return this.queue.execute<SimpleDescriptor>(async () => {
this.checkInterpanLock();
const descriptor = await this.driver.zdoRequest(
networkAddress, EmberZDOCmd.Simple_Desc_req, EmberZDOCmd.Simple_Desc_rsp,
Expand All @@ -414,11 +421,13 @@ class EZSPAdapter extends Adapter {
ieeeAddr: string, networkAddress: number, endpoint: number, zclFrame: ZclFrame, timeout: number,
disableResponse: boolean, disableRecovery: boolean, sourceEndpoint?: number,
): Promise<Events.ZclDataPayload> {
this.checkInterpanLock();
return this.sendZclFrameToEndpointInternal(
ieeeAddr, networkAddress, endpoint, sourceEndpoint || 1, zclFrame, timeout, disableResponse,
disableRecovery, 0, 0, false, false, false, null
);
return this.queue.execute<Events.ZclDataPayload>(async () => {
this.checkInterpanLock();
return this.sendZclFrameToEndpointInternal(
ieeeAddr, networkAddress, endpoint, sourceEndpoint || 1, zclFrame, timeout, disableResponse,
disableRecovery, 0, 0, false, false, false, null
);
}, networkAddress);
}

private async sendZclFrameToEndpointInternal(
Expand All @@ -431,7 +440,7 @@ class EZSPAdapter extends Adapter {
ieeeAddr = `0x${this.driver.ieee.toString()}`;
}
debug('sendZclFrameToEndpointInternal %s:%i/%i (%i,%i,%i)',
ieeeAddr, networkAddress, endpoint, responseAttempt, dataRequestAttempt, this.driver.queue.count());
ieeeAddr, networkAddress, endpoint, responseAttempt, dataRequestAttempt, this.queue.count());
let response = null;
const command = zclFrame.getCommand();
if (command.hasOwnProperty('response') && disableResponse === false) {
Expand Down Expand Up @@ -483,7 +492,7 @@ class EZSPAdapter extends Adapter {
}

public async sendZclFrameToGroup(groupID: number, zclFrame: ZclFrame): Promise<void> {
return this.driver.queue.execute<void>(async () => {
return this.queue.execute<void>(async () => {
this.checkInterpanLock();
const frame = this.driver.makeApsFrame(zclFrame.Cluster.ID, false);
frame.profileId = 0x0104;
Expand All @@ -501,7 +510,7 @@ class EZSPAdapter extends Adapter {
}

public async sendZclFrameToAll(endpoint: number, zclFrame: ZclFrame, sourceEndpoint: number): Promise<void> {
return this.driver.queue.execute<void>(async () => {
return this.queue.execute<void>(async () => {
this.checkInterpanLock();
const frame = this.driver.makeApsFrame(zclFrame.Cluster.ID, false);
frame.profileId = sourceEndpoint === 242 && endpoint === 242 ? 0xA1E0 : 0x0104;
Expand All @@ -524,7 +533,7 @@ class EZSPAdapter extends Adapter {
clusterID: number, destinationAddressOrGroup: string | number, type: 'endpoint' | 'group',
destinationEndpoint?: number
): Promise<void> {
return this.driver.queue.execute<void>(async () => {
return this.queue.execute<void>(async () => {
this.checkInterpanLock();
const ieee = new EmberEUI64(sourceIeeeAddress);
let destAddr;
Expand Down Expand Up @@ -555,7 +564,7 @@ class EZSPAdapter extends Adapter {
clusterID: number, destinationAddressOrGroup: string | number, type: 'endpoint' | 'group',
destinationEndpoint: number
): Promise<void> {
return this.driver.queue.execute<void>(async () => {
return this.queue.execute<void>(async () => {
this.checkInterpanLock();
const ieee = new EmberEUI64(sourceIeeeAddress);
let destAddr;
Expand All @@ -582,7 +591,7 @@ class EZSPAdapter extends Adapter {
}

public removeDevice(networkAddress: number, ieeeAddr: string): Promise<void> {
return this.driver.queue.execute<void>(async () => {
return this.queue.execute<void>(async () => {
this.checkInterpanLock();
const ieee = new EmberEUI64(ieeeAddr);
this.driver.setNode(networkAddress, ieee);
Expand Down Expand Up @@ -610,7 +619,7 @@ class EZSPAdapter extends Adapter {
}

public async restoreChannelInterPAN(): Promise<void> {
return this.driver.queue.execute<void>(async () => {
return this.queue.execute<void>(async () => {
const channel = (await this.getNetworkParameters()).channel;
await this.driver.setChannel(channel);
// Give adapter some time to restore, otherwise stuff crashes
Expand All @@ -626,7 +635,7 @@ class EZSPAdapter extends Adapter {
}

public async sendZclFrameInterPANToIeeeAddr(zclFrame: ZclFrame, ieeeAddr: string): Promise<void> {
return this.driver.queue.execute<void>(async () => {
return this.queue.execute<void>(async () => {
debug(`sendZclFrameInterPANToIeeeAddr to ${ieeeAddr}`);
try {
const frame = this.driver.makeEmberIeeeRawFrame();
Expand All @@ -647,7 +656,7 @@ class EZSPAdapter extends Adapter {
}

public async sendZclFrameInterPANBroadcast(zclFrame: ZclFrame, timeout: number): Promise<Events.ZclDataPayload> {
return this.driver.queue.execute<Events.ZclDataPayload>(async () => {
return this.queue.execute<Events.ZclDataPayload>(async () => {
debug(`sendZclFrameInterPANBroadcast`);
const command = zclFrame.getCommand();
if (!command.hasOwnProperty('response')) {
Expand Down Expand Up @@ -681,13 +690,13 @@ class EZSPAdapter extends Adapter {

public async setTransmitPower(value: number): Promise<void> {
debug(`setTransmitPower to ${value}`);
return this.driver.queue.execute<void>(async () => {
return this.queue.execute<void>(async () => {
await this.driver.setRadioPower(value);
});
}

public async setChannelInterPAN(channel: number): Promise<void> {
return this.driver.queue.execute<void>(async () => {
return this.queue.execute<void>(async () => {
this.interpanLock = true;
await this.driver.setChannel(channel);
});
Expand Down
5 changes: 2 additions & 3 deletions src/adapter/ezsp/driver/driver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import {
EmberKeyType
} from './types/named';
import {Multicast} from './multicast';
import {Queue, Waitress, Wait} from '../../../utils';
import {Waitress, Wait} from '../../../utils';
import Debug from "debug";
import equals from 'fast-deep-equal/es6';
import {ParamsDesc} from './commands';
Expand Down Expand Up @@ -87,15 +87,14 @@ export class Driver extends EventEmitter {
public ieee: EmberEUI64;
private multicast: Multicast;
private waitress: Waitress<EmberFrame, EmberWaitressMatcher>;
public queue: Queue;
private transactionID = 1;
private port: string;
/* eslint-disable-next-line @typescript-eslint/no-explicit-any*/
private serialOpt: Record<string, any>;

constructor() {
super();
this.queue = new Queue(8);

this.waitress = new Waitress<EmberFrame, EmberWaitressMatcher>(
this.waitressValidator, this.waitressTimeoutFormatter);
}
Expand Down
14 changes: 6 additions & 8 deletions src/adapter/ezsp/driver/multicast.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,13 @@ export class Multicast {

/* eslint-disable-next-line @typescript-eslint/no-explicit-any*/
async startup(enpoints: Array<any>): Promise<void> {
return this.driver.queue.execute<void>(async () => {
await this._initialize();
for (const ep of enpoints) {
if (!ep.id) continue;
for (const group_id of ep.member_of) {
await this.subscribe(group_id, ep.id);
}
await this._initialize();
for (const ep of enpoints) {
if (!ep.id) continue;
for (const group_id of ep.member_of) {
await this.subscribe(group_id, ep.id);
}
});
}
}

public async subscribe(group_id: number, endpoint: number): Promise<EmberStatus> {
Expand Down

0 comments on commit 04b5ebd

Please sign in to comment.