Skip to content

Commit

Permalink
fix(ignore): Cleanup request queue (Koenkk#826)
Browse files Browse the repository at this point in the history
* Cleanup: variable rename

* Cleanup: move requestQueue to separate file, rename variables

* Adapt data types in tests
  • Loading branch information
slugzero authored Dec 10, 2023
1 parent 435ed7a commit db1545b
Show file tree
Hide file tree
Showing 4 changed files with 243 additions and 188 deletions.
40 changes: 20 additions & 20 deletions src/controller/helpers/request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,57 +30,57 @@ class Request<Type = any> {
0x16: 'immediate', // Discover Attributes Extended Response
};

private _func: (frame: Zcl.ZclFrame) => Promise<Type>;
private func: (frame: Zcl.ZclFrame) => Promise<Type>;
frame: Zcl.ZclFrame;
expires: number;
sendPolicy: SendPolicy;
sendWhen: SendRequestWhen;
private _resolveQueue: Array<(value: Type) => void>;
private _rejectQueue: Array <(error: Error) => void>;
private _lastError: Error;
private resolveQueue: Array<(value: Type) => void>;
private rejectQueue: Array <(error: Error) => void>;
private lastError: Error;
constructor (func: (frame: Zcl.ZclFrame) => Promise<Type>, frame: Zcl.ZclFrame, timeout: number,
sendWhen?: SendRequestWhen, sendPolicy?: SendPolicy, lastError?: Error,
resolve?:(value: Type) => void, reject?: (error: Error) => void) {
this._func = func;
this.func = func;
this.frame = frame;
this.sendWhen = sendWhen ?? 'active',
this.expires = timeout + Date.now();
this.sendPolicy = sendPolicy ?? (typeof frame.getCommand !== 'function' ?
undefined : Request.defaultSendPolicy[frame.getCommand().ID]);
this._resolveQueue = resolve === undefined ?
this.resolveQueue = resolve === undefined ?
new Array<(value: Type) => void>() : new Array<(value: Type) => void>(resolve);
this._rejectQueue = reject === undefined ?
this.rejectQueue = reject === undefined ?
new Array<(error: Error) => void>() : new Array<(error: Error) => void>(reject);
this._lastError = lastError ?? Error("Request rejected before first send");
this.lastError = lastError ?? Error("Request rejected before first send");
}

moveCallbacks(from: Request <Type>) : void {
this._resolveQueue = this._resolveQueue.concat(from._resolveQueue);
this._rejectQueue = this._rejectQueue.concat(from._rejectQueue);
from._resolveQueue.length = 0;
from._rejectQueue.length = 0;
this.resolveQueue = this.resolveQueue.concat(from.resolveQueue);
this.rejectQueue = this.rejectQueue.concat(from.rejectQueue);
from.resolveQueue.length = 0;
from.rejectQueue.length = 0;
}

addCallbacks(resolve: (value: Type) => void, reject: (error: Error) => void): void {
this._resolveQueue.push(resolve);
this._rejectQueue.push(reject);
this.resolveQueue.push(resolve);
this.rejectQueue.push(reject);
}

reject(error?: Error): void {
this._rejectQueue.forEach(el => el(error ?? this._lastError));
this._rejectQueue.length = 0;
this.rejectQueue.forEach(el => el(error ?? this.lastError));
this.rejectQueue.length = 0;
}

resolve(value: Type): void {
this._resolveQueue.forEach(el => el(value));
this._resolveQueue.length = 0;
this.resolveQueue.forEach(el => el(value));
this.resolveQueue.length = 0;
}

async send(): Promise<Type> {
try {
return await this._func(this.frame);
return await this.func(this.frame);
} catch (error) {
this._lastError = error;
this.lastError = error;
throw (error);
}
}
Expand Down
130 changes: 130 additions & 0 deletions src/controller/helpers/requestQueue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import * as Zcl from '../../zcl';
import {Endpoint} from '../model';
import Request from './request';
import Debug from "debug";

const debug = {
info: Debug('zigbee-herdsman:helpers:requestQueue'),
error: Debug('zigbee-herdsman:helpers:requestQeue'),
};

type Mutable<T> = { -readonly [P in keyof T ]: T[P] };


class RequestQueue extends Set<Request> {

private sendInProgress: boolean;
private ID: number;
private deviceIeeeAddress: string;

constructor (endpoint: Endpoint)
{
super();
this.sendInProgress = false;
this.ID = endpoint.ID;
this.deviceIeeeAddress = endpoint.deviceIeeeAddress;
}


public async send(fastPolling: boolean): Promise<void> {
if (this.size === 0) return;

if (!fastPolling && this.sendInProgress) {
debug.info(`Request Queue (${this.deviceIeeeAddress}/${this.ID}): sendPendingRequests already in progress`);
return;
}
this.sendInProgress = true;

// Remove expired requests first
const now = Date.now();
for (const request of this) {
if (now > request.expires) {
debug.info(`Request Queue (${this.deviceIeeeAddress}/${this.ID}): discard after timeout. ` +
`Size before: ${this.size}`);
request.reject();
this.delete(request);
}
}

debug.info(`Request Queue (${this.deviceIeeeAddress}/${this.ID}): send pending requests (` +
`${this.size}, ${fastPolling})`);

for (const request of this) {
if (fastPolling || (request.sendWhen !== 'fastpoll' && request.sendPolicy !== 'bulk')) {
try {
const result = await request.send();
debug.info(`Request Queue (${this.deviceIeeeAddress}/${this.ID}): send success`);
request.resolve(result);
this.delete(request);
} catch (error) {
debug.info(`Request Queue (${this.deviceIeeeAddress}/${this.ID}): send failed, expires in ` +
`${(request.expires - now) / 1000} seconds`);
}
}
}
this.sendInProgress = false;
}

public async queue<Type>(request: Request<Type>): Promise<Type> {
debug.info(`Request Queue (${this.deviceIeeeAddress}/${this.ID}): Sending when active. ` +
`Expires: ${request.expires}`);
return new Promise((resolve, reject): void => {
request.addCallbacks(resolve, reject);
this.add(request);
});
}

public filter(newRequest: Request): void {

if(this.size === 0 || !(typeof newRequest.frame.getCommand === 'function')) {
return;
}
const clusterID = newRequest.frame.Cluster.ID;
const payload = newRequest.frame.Payload;
const commandID = newRequest.frame.getCommand().ID;

debug.info(`Request Queue (${this.deviceIeeeAddress}/${this.ID}): ZCL ${newRequest.frame.getCommand().name} ` +
`command, filter requests. Before: ${this.size}`);

for (const request of this) {
if( request?.frame?.Cluster?.ID === undefined || typeof request.frame.getCommand !== 'function') {
continue;
}
if (['bulk', 'queue', 'immediate'].includes(request.sendPolicy)) {
continue;
}
/* istanbul ignore else */
if(request.frame.Cluster.ID === clusterID && request.frame.getCommand().ID === commandID) {
/* istanbul ignore else */
if (newRequest.sendPolicy === 'keep-payload'
&& JSON.stringify(request.frame.Payload) === JSON.stringify(payload)) {
debug.info(`Request Queue (${this.deviceIeeeAddress}/${this.ID}): Merge duplicate request`);
this.delete(request);
newRequest.moveCallbacks(request);
}
else if ((newRequest.sendPolicy === 'keep-command' || newRequest.sendPolicy === 'keep-cmd-undiv') &&
Array.isArray(request.frame.Payload)) {
const filteredPayload = request.frame.Payload.filter((oldEl: {attrId: number}) =>
!payload.find((newEl: {attrId: number}) => oldEl.attrId === newEl.attrId));
if (filteredPayload.length == 0) {
debug.info(`Request Queue (${this.deviceIeeeAddress}/${this.ID}): Remove & reject request`);
if( JSON.stringify(request.frame.Payload) === JSON.stringify(payload)) {
newRequest.moveCallbacks(request);
} else {
request.reject();
}
this.delete(request);
} else if (newRequest.sendPolicy !== 'keep-cmd-undiv') {
// remove all duplicate attributes if we shall not write undivided
(request.frame as Mutable<Zcl.ZclFrame>).Payload = filteredPayload;
debug.info(`Request Queue (${this.deviceIeeeAddress}/${this.ID}): `
+ `Remove commands from request`);
}
}
}
}
debug.info(`Request Queue (${this.deviceIeeeAddress}/${this.ID}): After: ${this.size}`);
}
}

export default RequestQueue;
119 changes: 10 additions & 109 deletions src/controller/model/endpoint.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import * as Zcl from '../../zcl';
import ZclTransactionSequenceNumber from '../helpers/zclTransactionSequenceNumber';
import * as ZclFrameConverter from '../helpers/zclFrameConverter';
import Request from '../helpers/request';
import RequestQueue from '../helpers/requestQueue';
import {Events as AdapterEvents} from '../../adapter';
import Group from './group';
import Device from './device';
Expand All @@ -15,8 +16,6 @@ const debug = {
error: Debug('zigbee-herdsman:controller:endpoint'),
};

type Mutable<T> = { -readonly [P in keyof T ]: T[P] };

export interface ConfigureReportingItem {
attribute: string | number | {ID: number; type: number};
minimumReportInterval: number;
Expand Down Expand Up @@ -87,8 +86,7 @@ class Endpoint extends Entity {
private _binds: BindInternal[];
private _configuredReportings: ConfiguredReportingInternal[];
public meta: KeyValue;
private pendingRequests: Set<Request>;
private sendInProgress: boolean;
private pendingRequests: RequestQueue;

// Getters/setters
get binds(): Bind[] {
Expand Down Expand Up @@ -154,8 +152,7 @@ class Endpoint extends Entity {
this._binds = binds;
this._configuredReportings = configuredReportings;
this.meta = meta;
this.pendingRequests = new Set<Request>;
this.sendInProgress =false;
this.pendingRequests = new RequestQueue(this);
}

/**
Expand Down Expand Up @@ -267,103 +264,7 @@ class Endpoint extends Entity {
}

public async sendPendingRequests(fastPolling: boolean): Promise<void> {

if (this.pendingRequests.size === 0) return;

if (!fastPolling && this.sendInProgress) {
debug.info(`Request Queue (${this.deviceIeeeAddress}/${this.ID}): sendPendingRequests already in progress`);
return;
}
this.sendInProgress = true;

// Remove expired requests first
const now = Date.now();
for (const request of this.pendingRequests) {
if (now > request.expires) {
debug.info(`Request Queue (${this.deviceIeeeAddress}/${this.ID}): discard after timeout. ` +
`Size before: ${this.pendingRequests.size}`);
request.reject();
this.pendingRequests.delete(request);
}
}

debug.info(`Request Queue (${this.deviceIeeeAddress}/${this.ID}): send pending requests (` +
`${this.pendingRequests.size}, ${fastPolling})`);

for (const request of this.pendingRequests) {
if (fastPolling || (request.sendWhen !== 'fastpoll' && request.sendPolicy !== 'bulk')) {
try {
const result = await request.send();
debug.info(`Request Queue (${this.deviceIeeeAddress}/${this.ID}): send success`);
request.resolve(result);
this.pendingRequests.delete(request);
} catch (error) {
debug.info(`Request Queue (${this.deviceIeeeAddress}/${this.ID}): send failed, expires in ` +
`${(request.expires - now) / 1000} seconds`);
}
}
}
this.sendInProgress = false;
}

private async queueRequest<Type>(request: Request<Type>): Promise<Type> {
debug.info(`Request Queue (${this.deviceIeeeAddress}/${this.ID}): Sending when active. ` +
`Timeout ${this.getDevice().pendingRequestTimeout/1000} seconds`);
return new Promise((resolve, reject): void => {
request.addCallbacks(resolve, reject);
this.pendingRequests.add(request);
});
}
private filterRequests(newRequest: Request): void {

if(this.pendingRequests.size === 0 || !(typeof newRequest.frame.getCommand === 'function')) {
return;
}
const clusterID = newRequest.frame.Cluster.ID;
const payload = newRequest.frame.Payload;
const commandID = newRequest.frame.getCommand().ID;

debug.info(`Request Queue (${this.deviceIeeeAddress}/${this.ID}): ZCL ${newRequest.frame.getCommand().name} ` +
`command, filter requests. Before: ${this.pendingRequests.size}`);

for (const request of this.pendingRequests) {
if( request?.frame?.Cluster?.ID === undefined || typeof request.frame.getCommand !== 'function') {
continue;
}
if (['bulk', 'queue', 'immediate'].includes(request.sendPolicy)) {
continue;
}
/* istanbul ignore else */
if(request.frame.Cluster.ID === clusterID && request.frame.getCommand().ID === commandID) {
/* istanbul ignore else */
if (newRequest.sendPolicy === 'keep-payload'
&& JSON.stringify(request.frame.Payload) === JSON.stringify(payload)) {
debug.info(`Request Queue (${this.deviceIeeeAddress}/${this.ID}): Merge duplicate request`);
this.pendingRequests.delete(request);
newRequest.moveCallbacks(request);
}
else if ((newRequest.sendPolicy === 'keep-command' || newRequest.sendPolicy === 'keep-cmd-undiv') &&
Array.isArray(request.frame.Payload)) {
const filteredPayload = request.frame.Payload.filter((oldEl: {attrId: number}) =>
!payload.find((newEl: {attrId: number}) => oldEl.attrId === newEl.attrId));
if (filteredPayload.length == 0) {
debug.info(`Request Queue (${this.deviceIeeeAddress}/${this.ID}): Remove & reject request`);
if( JSON.stringify(request.frame.Payload) === JSON.stringify(payload)) {
newRequest.moveCallbacks(request);
} else {
request.reject();
}
this.pendingRequests.delete(request);
} else if (newRequest.sendPolicy !== 'keep-cmd-undiv') {
// remove all duplicate attributes if we shall not write undivided
(request.frame as Mutable<Zcl.ZclFrame>).Payload = filteredPayload;
debug.info(`Request Queue (${this.deviceIeeeAddress}/${this.ID}): `
+ `Remove commands from request`);
}
}
}
}
debug.info(`Request Queue (${this.deviceIeeeAddress}/${this.ID}): After: ${this.pendingRequests.size}`);
return this.pendingRequests.send(fastPolling);
}

private async sendRequest(frame: Zcl.ZclFrame, options: Options): Promise<AdapterEvents.ZclDataPayload>;
Expand All @@ -381,7 +282,7 @@ class Endpoint extends Entity {

if (request.sendPolicy !== 'bulk') {
// Check if such a request is already in the queue and remove the old one(s) if necessary
this.filterRequests(request);
this.pendingRequests.filter(request);
}

// send without queueing if sendWhen or sendPolicy is 'immediate' or if the device has no timeout set
Expand All @@ -396,8 +297,8 @@ class Endpoint extends Entity {
}
// If this is a bulk message, we queue directly.
if (request.sendPolicy === 'bulk') {
debug.info(logPrefix + `queue request (${this.pendingRequests.size} / ${this.sendInProgress})))`);
return this.queueRequest(request);
debug.info(logPrefix + `queue request (${this.pendingRequests.size})))`);
return this.pendingRequests.queue(request);
}

try {
Expand All @@ -407,7 +308,7 @@ class Endpoint extends Entity {
// If we got a failed transaction, the device is likely sleeping.
// Queue for transmission later.
debug.info(logPrefix + `queue request (transaction failed)`);
return this.queueRequest(request);
return this.pendingRequests.queue(request);
}
}

Expand Down Expand Up @@ -511,7 +412,7 @@ class Endpoint extends Entity {
payload.push({attrId: Number(nameOrID), status: value.status});
} else {
throw new Error(`Unknown attribute '${nameOrID}', specify either an existing attribute or a number`);
}
}
} else {
throw new Error(`Missing attribute 'status'`);
}
Expand All @@ -534,7 +435,7 @@ class Endpoint extends Entity {
throw error;
}
}

public async read(
clusterKey: number | string, attributes: (string | number)[], options?: Options
): Promise<KeyValue> {
Expand Down
Loading

0 comments on commit db1545b

Please sign in to comment.