Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Request Queue Behavior #2 #700

Merged
merged 4 commits into from
Apr 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions src/controller/helpers/request.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import {SendRequestWhen, SendPolicy} from '../tstype';
import * as Zcl from '../../zcl';

/* eslint-disable-next-line @typescript-eslint/no-explicit-any*/
class Request<Type = any> {

static defaultSendPolicy: {[key: number]: SendPolicy} = {
0x00: 'keep-payload', // Read Attributes
0x01: 'immediate', // Read Attributes Response
0x02: 'keep-command', // Write Attributes
0x03: 'keep-cmd-undiv', // Write Attributes Undivided
0x04: 'immediate', // Write Attributes Response
0x05: 'keep-command', // Write Attributes No Response
0x06: 'keep-payload', // Configure Reporting
0x07: 'immediate', // Configure Reporting Response
0x08: 'keep-payload', // Read Reporting Configuration
0x09: 'immediate', // Read Reporting Configuration Response
0x0a: 'keep-payload', // Report attributes
0x0b: 'immediate', // Default Response
0x0c: 'keep-payload', // Discover Attributes
0x0d: 'immediate', // Discover Attributes Response
0x0e: 'keep-payload', // Read Attributes Structured
0x0f: 'keep-payload', // Write Attributes Structured
0x10: 'immediate', // Write Attributes Structured response
0x11: 'keep-payload', // Discover Commands Received
0x12: 'immediate', // Discover Commands Received Response
0x13: 'keep-payload', // Discover Commands Generated
0x14: 'immediate', // Discover Commands Generated Response
0x15: 'keep-payload', // Discover Attributes Extended
0x16: 'immediate', // Discover Attributes Extended Response
};

private _func: (frame: Zcl.ZclFrame) => Promise<Type>;
frame: Zcl.ZclFrame;
expires: number;
sendPolicy: SendPolicy;
sendWhen: SendRequestWhen;
private _resolveQueue: Array<(value: Type) => void>;
private _rejectQueue: Array <(error: Error) => void>;
private _lastError: Error;
constructor (func: (frame: Zcl.ZclFrame) => Promise<Type>, frame: Zcl.ZclFrame, timeout: number,
sendWhen?: SendRequestWhen, sendPolicy?: SendPolicy, lastError?: Error,
resolve?:(value: Type) => void, reject?: (error: Error) => void) {
this._func = func;
this.frame = frame;
this.sendWhen = sendWhen ?? 'active',
this.expires = timeout + Date.now();
this.sendPolicy = sendPolicy ?? (typeof frame.getCommand !== 'function' ?
undefined : Request.defaultSendPolicy[frame.getCommand().ID]);
this._resolveQueue = resolve === undefined ?
new Array<(value: Type) => void>() : new Array<(value: Type) => void>(resolve);
this._rejectQueue = reject === undefined ?
new Array<(error: Error) => void>() : new Array<(error: Error) => void>(reject);
this._lastError = lastError ?? Error("Request rejected before first send");
}

addCallbacks(resolve: (value: Type) => void, reject: (error: Error) => void): void {
this._resolveQueue.push(resolve);
this._rejectQueue.push(reject);
}

reject(error?: Error): void {
this._rejectQueue.forEach(el => el(error ?? this._lastError));
this._rejectQueue.length = 0;
}

resolve(value: Type): void {
this._resolveQueue.forEach(el => el(value));
this._resolveQueue.length = 0;
}

async send(): Promise<Type> {
try {
return await this._func(this.frame);
} catch (error) {
this._lastError = error;
throw (error);
}
}
}

export default Request;
97 changes: 41 additions & 56 deletions src/controller/model/endpoint.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {KeyValue, SendRequestWhen} from '../tstype';
import * as Zcl from '../../zcl';
import ZclTransactionSequenceNumber from '../helpers/zclTransactionSequenceNumber';
import * as ZclFrameConverter from '../helpers/zclFrameConverter';
import Request from '../helpers/request';
import {Events as AdapterEvents} from '../../adapter';
import Group from './group';
import Device from './device';
Expand Down Expand Up @@ -70,12 +71,6 @@ interface ConfiguredReporting {
reportableChange: number,
}

interface PendingRequest {
/* eslint-disable-next-line @typescript-eslint/no-explicit-any*/
func: () => Promise<any>, resolve: (value: any) => any, reject: (error: any) => any,
sendWhen: 'active' | 'fastpoll', expires: number, lastError: Error
}

class Endpoint extends Entity {
public deviceID?: number;
public inputClusters: number[];
Expand All @@ -88,7 +83,7 @@ class Endpoint extends Entity {
private _binds: BindInternal[];
private _configuredReportings: ConfiguredReportingInternal[];
public meta: KeyValue;
private pendingRequests: PendingRequest[];
private pendingRequests: Set<Request>;
private sendInProgress: boolean;

// Getters/setters
Expand Down Expand Up @@ -155,7 +150,8 @@ class Endpoint extends Entity {
this._binds = binds;
this._configuredReportings = configuredReportings;
this.meta = meta;
this.pendingRequests = [];
this.pendingRequests = new Set<Request>;
this.sendInProgress =false;
}

/**
Expand Down Expand Up @@ -263,12 +259,12 @@ class Endpoint extends Entity {
}

public hasPendingRequests(): boolean {
return this.pendingRequests.length > 0;
return this.pendingRequests.size > 0;
}

public async sendPendingRequests(fastPolling: boolean): Promise<void> {

if (this.pendingRequests.length === 0) return;
if (this.pendingRequests.size === 0) return;

if (this.sendInProgress) {
debug.info(`Request Queue (${this.deviceIeeeAddress}/${this.ID}): sendPendingRequests already in progress`);
Expand All @@ -278,93 +274,82 @@ class Endpoint extends Entity {

// Remove expired requests first
const now = Date.now();
this.pendingRequests = this.pendingRequests.filter((request) => {
const isExpired = now > request.expires;
if (isExpired) {
for (const request of this.pendingRequests) {
if (now > request.expires) {
debug.info(`Request Queue (${this.deviceIeeeAddress}/${this.ID}): discard after timeout. ` +
`Size before: ${this.pendingRequests.length}`);
request.reject(request.lastError ?? new Error("Request timeout before first check-in"));
`Size before: ${this.pendingRequests.size}`);
request.reject();
this.pendingRequests.delete(request);
}
return !isExpired;
});
}

const postponedRequests = [];
debug.info(`Request Queue (${this.deviceIeeeAddress}/${this.ID}): send pending requests (` +
`${this.pendingRequests.length}, ${fastPolling})`);

// Elements can be added to the queue while send is in progress.
// Using a while loop ensures that newly added elements are also sent in the end
while (this.pendingRequests.length > 0) {
const request = this.pendingRequests.shift();
`${this.pendingRequests.size}, ${fastPolling})`);

for (const request of this.pendingRequests) {
if ((fastPolling && request.sendWhen == 'fastpoll') || request.sendWhen == 'active') {
try {
const result = await request.func();
const result = await request.send();
debug.info(`Request Queue (${this.deviceIeeeAddress}/${this.ID}): send success`);
request.resolve(result);
} catch (error) {
debug.error(`Request Queue (${this.deviceIeeeAddress}/${this.ID}): send failed, expires in ` +
`${(request.expires - now) / 1000} seconds`);
request.reject(error);
}
}
else {
postponedRequests.push(request);
this.pendingRequests.delete(request);
}
}
this.pendingRequests = postponedRequests;
this.sendInProgress = false;
}

private async queueRequest<Type>(func: () => Promise<Type>, sendWhen: 'active' | 'fastpoll',
lastError?: Error): Promise<Type> {
private async queueRequest<Type>(request: Request<Type>): Promise<Type> {
debug.info(`Request Queue (${this.deviceIeeeAddress}/${this.ID}): Sending when active. ` +
`Timeout ${this.getDevice().pendingRequestTimeout/1000} seconds`);
return new Promise((resolve, reject): void => {
// Remove request from queue after timeout
const expires = this.getDevice().pendingRequestTimeout + Date.now();
const request: PendingRequest = {func, resolve, reject, sendWhen, expires, lastError};
this.pendingRequests.push(request);
request.addCallbacks(resolve, reject);
this.pendingRequests.add(request);
});
}

private async sendRequest(data: Zcl.ZclFrame, options: Options): Promise<AdapterEvents.ZclDataPayload>;
private async sendRequest<Type>(
data: Zcl.ZclFrame, options: Options, func: () => Promise<Type>): Promise<Type>;
private async sendRequest<Type>(
data: Zcl.ZclFrame, options: Options, func: () => Promise<Type> = (): Promise<Type> => {
private async sendRequest(frame: Zcl.ZclFrame, options: Options): Promise<AdapterEvents.ZclDataPayload>;
private async sendRequest<Type>(frame: Zcl.ZclFrame, options: Options,
func: (frame: Zcl.ZclFrame) => Promise<Type>): Promise<Type>;
private async sendRequest<Type>(frame: Zcl.ZclFrame, options: Options,
func: (d: Zcl.ZclFrame) => Promise<Type> = (d: Zcl.ZclFrame): Promise<Type> => {
return Entity.adapter.sendZclFrameToEndpoint(
this.deviceIeeeAddress, this.deviceNetworkAddress, this.ID, data, options.timeout,
this.deviceIeeeAddress, this.deviceNetworkAddress, this.ID, d, options.timeout,
options.disableResponse, options.disableRecovery, options.srcEndpoint) as Promise<Type>;
}): Promise<Type> {
const logPrefix = `Request Queue (${this.deviceIeeeAddress}/${this.ID}): `;
const request = new Request(func, frame, this.getDevice().pendingRequestTimeout, options.sendWhen);

// If we already have something queued, we queue directly to avoid
// messing up the ordering too much.
if (options.sendWhen !== 'immediate' && (this.hasPendingRequests() || this.sendInProgress)) {
debug.info(logPrefix + `queue request (${this.pendingRequests.length} / ${this.sendInProgress})))`);
return this.queueRequest(func, options.sendWhen);
}

// send without queueing if sendWhen is 'immediate' or if this is a response
if (options.sendWhen === 'immediate' || (data.Header.frameControl.direction === Zcl.Direction.SERVER_TO_CLIENT))
{
if (options.sendWhen === 'immediate'
|| (frame.Header?.frameControl.direction === Zcl.Direction.SERVER_TO_CLIENT)) {
if (this.getDevice().defaultSendRequestWhen !=='immediate')
{
debug.info(logPrefix + `send ${data.getCommand().name} request, bypass queue on fail ` +
debug.info(logPrefix + `send ${frame.getCommand().name} request immediately ` +
`(sendWhen=${options.sendWhen})`);
}
return func();
return request.send();
}
// If we already have something queued, we queue directly to avoid
// messing up the ordering too much.
if (this.hasPendingRequests() || this.sendInProgress) {
debug.info(logPrefix + `queue request (${this.pendingRequests.size} / ${this.sendInProgress})))`);
return this.queueRequest(request);
}

try {
debug.info(logPrefix + `send request (queue empty)`);
return await func();
return await request.send();
} catch(error) {
// If we got a failed transaction, the device is likely sleeping.
// Queue for transmission later.
debug.info(logPrefix + `queue request (transaction failed)`);
return this.queueRequest(func, options.sendWhen, error);
return this.queueRequest(request);
}
}

Expand Down Expand Up @@ -820,13 +805,13 @@ class Endpoint extends Entity {
debug.info(log);

try {
await this.sendRequest(frame, options, async () => {
await this.sendRequest(frame, options, async (f) => {
// Broadcast Green Power responses
if (this.ID === 242) {
await Entity.adapter.sendZclFrameToAll(242, frame, 242);
await Entity.adapter.sendZclFrameToAll(242, f, 242);
} else {
await Entity.adapter.sendZclFrameToEndpoint(
this.deviceIeeeAddress, this.deviceNetworkAddress, this.ID, frame, options.timeout,
this.deviceIeeeAddress, this.deviceNetworkAddress, this.ID, f, options.timeout,
options.disableResponse, options.disableRecovery, options.srcEndpoint
);
}
Expand Down
19 changes: 18 additions & 1 deletion src/controller/tstype.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,23 @@
// eslint-disable-next-line
interface KeyValue {[s: string]: any};

/* Send request policies:
'bulk': Message must be sent together with other messages in the correct sequence.
No immediate delivery required.
'queue': Request shall be sent 'as-is' as soon as possible.
Multiple identical requests shall be delivered multiple times.
Not strict ordering required.
'immediate': Request shall be sent immediately and not be kept for later retries (e.g. response message).
'keep-payload': Request shall be sent as soon as possible.
If immediate delivery fails, the exact same payload is only sent once, even if there were
multiple requests.
'keep-command': Request shall be sent as soon as possible.
If immediate delivery fails, only the latest command for each command ID is kept for delivery.
'keep-cmd-undiv': Request shall be sent as soon as possible.
If immediate delivery fails, only the latest undivided set of commands is sent for each unique
set of command IDs.
*/
type SendPolicy = 'bulk' | 'queue' | 'immediate' | 'keep-payload' | 'keep-command' | 'keep-cmd-undiv';
type SendRequestWhen = 'immediate' | 'fastpoll' | 'active';
type DeviceType = 'Coordinator' | 'Router' | 'EndDevice' | 'Unknown' | 'GreenPower';

Expand All @@ -25,5 +42,5 @@ interface GreenPowerDeviceJoinedPayload {

export {
KeyValue, DatabaseEntry, EntityType, DeviceType, GreenPowerEvents, GreenPowerDeviceJoinedPayload,
SendRequestWhen
SendRequestWhen, SendPolicy
};
Loading