Skip to content

Commit

Permalink
in progress...
Browse files Browse the repository at this point in the history
  • Loading branch information
ralfaron committed Oct 23, 2024
1 parent e774f57 commit 98c11f3
Show file tree
Hide file tree
Showing 9 changed files with 62 additions and 157 deletions.
66 changes: 25 additions & 41 deletions projects/aas-server/src/app/aas-provider/aas-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import {

import { ImageProcessing } from '../image-processing.js';
import { AASIndex } from '../aas-index/aas-index.js';
import { ScanResultType, ScanResult, ScanEndpointResult, ScanNextPageResult } from './scan-result.js';
import { ScanResultType, ScanResult, ScanEndpointResult } from './scan-result.js';
import { Logger } from '../logging/logger.js';
import { Parallel } from './parallel.js';
import { ScanEndpointData } from './worker-data.js';
Expand Down Expand Up @@ -61,7 +61,6 @@ export class AASProvider {
this.timeout = variable.SCAN_CONTAINER_TIMEOUT;
this.parallel.on('message', this.parallelOnMessage);
this.parallel.on('end', this.parallelOnEnd);
this.parallel.on('nextPage', this.parallelOnNextPage);
}

/**
Expand Down Expand Up @@ -471,25 +470,41 @@ export class AASProvider {
}
};

private notify(data: AASServerMessage): void {
this.wsServer.notify('IndexChange', {
type: 'AASServerMessage',
data: data,
});
}

private scanEndpoint = async (taskId: number, endpoint: AASEndpoint) => {
const result = await this.index.nextPage(endpoint.name, undefined);
const data: ScanEndpointData = {
type: 'ScanEndpointData',
taskId,
endpoint,
documents: result.result,
};

this.taskHandler.set(taskId, { endpointName: endpoint.name, owner: this, type: 'ScanEndpoint' });
this.parallel.execute(data);
};

private notify(data: AASServerMessage): void {
this.wsServer.notify('IndexChange', {
type: 'AASServerMessage',
data: data,
});
}
private parallelOnMessage = async (result: ScanEndpointResult) => {
try {
switch (result.type) {
case ScanResultType.Changed:
await this.onChanged(result);
break;
case ScanResultType.Added:
await this.onAdded(result);
break;
case ScanResultType.Removed:
await this.onRemoved(result);
break;
}
} catch (error) {
this.logger.error(error);
}
};

private parallelOnEnd = async (result: ScanResult) => {
const task = this.taskHandler.get(result.taskId);
Expand All @@ -514,37 +529,6 @@ export class AASProvider {
}
};

private parallelOnMessage = async (result: ScanEndpointResult) => {
try {
switch (result.type) {
case ScanResultType.Changed:
await this.onChanged(result);
break;
case ScanResultType.Added:
await this.onAdded(result);
break;
case ScanResultType.Removed:
await this.onRemoved(result);
break;
}
} catch (error) {
this.logger.error(error);
}
};

private parallelOnNextPage = async (result: ScanNextPageResult, worker: Worker) => {
const a = await this.index.nextPage(result.endpoint.name, result.cursor);
const data: ScanEndpointData = {
type: 'ScanEndpointData',
taskId: result.taskId,
endpoint: result.endpoint,
documents: a.result,
cursor: a.paging_metadata.cursor,
};

worker.postMessage(data);
};

private async onChanged(result: ScanEndpointResult): Promise<void> {
const document = result.document;
if ((await this.index.hasEndpoint(document.endpoint)) === false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,12 @@
*****************************************************************************/

import EventEmitter from 'events';
import { AASDocument } from 'aas-core';
import { PagedResult } from '../types/paged-result.js';

/** Defines an automate to scan an AAS resource for Asset Administration Shells. */
export abstract class AASResourceScan extends EventEmitter {
/**
* Gets all documents of the current container.
* @param cursor ToDo.
* */
public abstract scanAsync(cursor?: string): Promise<PagedResult<AASDocument>>;
public abstract scanAsync(): Promise<void>;
}
16 changes: 11 additions & 5 deletions projects/aas-server/src/app/aas-provider/aas-server-scan.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,34 @@ import { Logger } from '../logging/logger.js';
import { AASApiClient } from '../packages/aas-server/aas-api-client.js';
import { AASServerPackage } from '../packages/aas-server/aas-server-package.js';
import { AASResourceScan } from './aas-resource-scan.js';
import { PagedResult } from '../types/paged-result.js';

export class AASServerScan extends AASResourceScan {
private readonly logger: Logger;
private readonly server: AASApiClient;

private static set = new Set<string>();

public constructor(logger: Logger, server: AASApiClient) {
super();

this.logger = logger;
this.server = server;
}

public async scanAsync(cursor?: string): Promise<PagedResult<AASDocument>> {
public async scanAsync(): Promise<void> {
try {
await this.server.openAsync();

const documents: AASDocument[] = [];
const result = await this.server.getShellsAsync(cursor);
const result = await this.server.getShellsAsync();
const ids = new Set(result.result);
for (const id of ids) {
if (AASServerScan.set.has(id)) {
AASServerScan.set.delete(id);
} else {
AASServerScan.set.add(id);
}

try {
const aasPackage = new AASServerPackage(this.logger, this.server, id);
const document = await aasPackage.createDocumentAsync();
Expand All @@ -40,8 +48,6 @@ export class AASServerScan extends AASResourceScan {
this.emit('error', error, this.server, id);
}
}

return { result: documents, paging_metadata: { cursor: result.paging_metadata.cursor } };
} finally {
await this.server.closeAsync();
}
Expand Down
5 changes: 1 addition & 4 deletions projects/aas-server/src/app/aas-provider/directory-scan.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import { Logger } from '../logging/logger.js';
import { AasxPackage } from '../packages/file-system/aasx-package.js';
import { AasxDirectory } from '../packages/file-system/aasx-directory.js';
import { AASResourceScan } from './aas-resource-scan.js';
import { PagedResult } from '../types/paged-result.js';

export class DirectoryScan extends AASResourceScan {
public constructor(
Expand All @@ -21,7 +20,7 @@ export class DirectoryScan extends AASResourceScan {
super();
}

public async scanAsync(cursor?: string): Promise<PagedResult<AASDocument>> {
public async scanAsync(): Promise<void> {
try {
await this.source.openAsync();
const result = await this.source.getFiles(cursor);
Expand All @@ -36,8 +35,6 @@ export class DirectoryScan extends AASResourceScan {
this.emit('error', error, this.source, file);
}
}

return { result: documents, paging_metadata: { cursor: result.paging_metadata.cursor } };
} finally {
await this.source.closeAsync();
}
Expand Down
8 changes: 2 additions & 6 deletions projects/aas-server/src/app/aas-provider/opcua-server-scan.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,12 @@
*****************************************************************************/

import { AttributeIds, BrowseDescriptionLike, QualifiedName, ReferenceDescription } from 'node-opcua';
import { AASDocument, noop } from 'aas-core';
import { AASDocument } from 'aas-core';
import { Logger } from '../logging/logger.js';
import { OpcuaDataTypeDictionary } from '../packages/opcua/opcua-data-type-dictionary.js';
import { OpcuaClient } from '../packages/opcua/opcua-client.js';
import { OpcuaPackage } from '../packages/opcua/opcua-package.js';
import { AASResourceScan } from './aas-resource-scan.js';
import { PagedResult } from '../types/paged-result.js';

export class OpcuaServerScan extends AASResourceScan {
private readonly logger: Logger;
Expand All @@ -26,9 +25,8 @@ export class OpcuaServerScan extends AASResourceScan {
this.server = server;
}

public async scanAsync(cursor?: string): Promise<PagedResult<AASDocument>> {
public async scanAsync(): Promise<void> {
try {
noop(cursor);
await this.server.openAsync();
const documents: AASDocument[] = [];
const dataTypes = new OpcuaDataTypeDictionary();
Expand All @@ -44,8 +42,6 @@ export class OpcuaServerScan extends AASResourceScan {
this.emit('error', error, this.server, nodeId);
}
}

return { result: documents, paging_metadata: {} };
} finally {
await this.server.closeAsync();
}
Expand Down
8 changes: 0 additions & 8 deletions projects/aas-server/src/app/aas-provider/scan-result.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,10 @@ export interface ScanResult {
/** The result of an endpoint scan. */
export interface ScanEndpointResult extends ScanResult {
endpoint: AASEndpoint;
documents: AASDocument[];
cursor?: string;
document: AASDocument;
}

/** The result of a template scan. */
export interface ScanTemplatesResult extends ScanResult {
templates: TemplateDescriptor[];
}

/** The result for a next page request. */
export interface ScanNextPageResult extends ScanResult {
endpoint: AASEndpoint;
cursor: string;
}
4 changes: 1 addition & 3 deletions projects/aas-server/src/app/aas-provider/worker-data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
*
*****************************************************************************/

import { AASDocument, AASEndpoint } from 'aas-core';
import { AASEndpoint } from 'aas-core';

export interface WorkerData {
taskId: number;
Expand All @@ -16,8 +16,6 @@ export interface WorkerData {
export interface ScanEndpointData extends WorkerData {
type: 'ScanEndpointData';
endpoint: AASEndpoint;
documents: AASDocument[];
cursor?: string;
}

export interface ScanTemplatesData extends WorkerData {
Expand Down
46 changes: 12 additions & 34 deletions projects/aas-server/src/app/endpoint-scan.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,42 +26,26 @@ export class EndpointScan {
@inject(Variable) private readonly variable: Variable,
) {}

public async scanAsync(data: ScanEndpointData): Promise<string | undefined> {
public async scanAsync(data: ScanEndpointData): Promise<void> {
this.data = data;
const scan = this.resourceScanFactory.create(data.endpoint);
try {
scan.on('scanned', this.onDocumentScanned);
scan.on('compare', this.compare);
scan.on('removed', this.removed);
scan.on('error', this.onError);
const result = await scan.scanAsync(data.cursor);
this.computeDeleted(result.result);
const result = await scan.scanAsync(data);
// this.computeDeleted(result.result);
return result.paging_metadata.cursor;
} finally {
scan.off('scanned', this.onDocumentScanned);
scan.off('compare', this.compare);
scan.off('removed', this.removed);
scan.off('error', this.onError);
}
}

private computeDeleted(documents: AASDocument[]): void {
if (this.data.documents === undefined) {
return;
}

const current = new Map<string, AASDocument>(documents.map(item => [item.id, item]));
for (const document of this.data.documents) {
if (!current.has(document.id)) {
this.postDeleted(document);
}
}
}

private onDocumentScanned = (document: AASDocument): void => {
const reference = this.data.documents?.find(item => item.id === document.id);
if (reference) {
if (this.documentChanged(document, reference)) {
this.postChanged(document);
}
} else {
this.postAdded(document);
private compare = (reference: AASDocument, document: AASDocument): void => {
if (this.documentChanged(document, reference)) {
this.postChanged(document);
}
};

Expand All @@ -74,36 +58,30 @@ export class EndpointScan {
taskId: this.data.taskId,
type: ScanResultType.Changed,
endpoint: this.data.endpoint,
documents: this.data.documents,
cursor: this.data.cursor,
document: document,
};

const array = toUint8Array(value);
parentPort?.postMessage(array, [array.buffer]);
}

private postDeleted(document: AASDocument): void {
private removed = (document: AASDocument): void => {
const value: ScanEndpointResult = {
taskId: this.data.taskId,
type: ScanResultType.Removed,
endpoint: this.data.endpoint,
documents: this.data.documents,
cursor: this.data.cursor,
document: document,
};

const array = toUint8Array(value);
parentPort?.postMessage(array, [array.buffer]);
}
};

private postAdded(document: AASDocument): void {
const value: ScanEndpointResult = {
taskId: this.data.taskId,
type: ScanResultType.Added,
endpoint: this.data.endpoint,
documents: this.data.documents,
cursor: this.data.cursor,
document: document,
};

Expand Down
Loading

0 comments on commit 98c11f3

Please sign in to comment.