Skip to content

Commit

Permalink
Add optional eventStream
Browse files Browse the repository at this point in the history
  • Loading branch information
Hexagon committed Apr 25, 2024
1 parent f9c9a00 commit 44e40cd
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 6 deletions.
10 changes: 9 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,15 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to
[Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
## [1.0.1] - 2024-04-25

### Added

- Add optional constructor parameter `eventStream` defaulting to false. Add
`.on()` and `.off()` for subscribing to pup events through WebSocketStreams if
eventStream is enabled.

## [1.0.0] - 2024-04-23

### Added

Expand Down
5 changes: 3 additions & 2 deletions deno.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@pup/api-client",
"version": "1.0.0",
"version": "1.0.1",
"exports": {
".": "./mod.ts"
},
Expand All @@ -12,6 +12,7 @@
"example"
],
"imports": {
"@pup/api-definitions": "jsr:@pup/api-definitions@^1.0.0"
"@pup/api-definitions": "jsr:@pup/api-definitions@^1.0.0",
"@pup/common": "jsr:@pup/common@^1.0.2"
}
}
90 changes: 89 additions & 1 deletion src/pup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
* @license MIT
*/

import { EventEmitter, type EventHandler } from "@pup/common/eventemitter";

import { type ApiResponse, RestClient } from "./rest.ts";

import type {
Expand All @@ -15,14 +17,37 @@ import type {
} from "@pup/api-definitions";

export class PupRestClient extends RestClient {
constructor(baseUrl: string, jwtSecret: string) {
private wsStream?: WebSocketStream;
private events: EventEmitter = new EventEmitter();

/**
* Constructs a new PupRestClient instance.
*
* @param baseUrl - The base URL of the Pup API.
* @param jwtSecret - The JWT secret used for authentication.
* @param eventStream - Optional flag to enable real-time updates via WebSocket (defaults to false).
*/
constructor(
baseUrl: string,
jwtSecret: string,
eventStream: boolean = false,
) {
super(baseUrl, jwtSecret);
if (eventStream) {
this.setupEventStream();
}
}

/**
* Retrieves a list of processes from the Pup API.
*/
async getProcesses(): Promise<ApiResponse<ApiProcessData[]>> {
return await this.get("/processes");
}

/**
* Fetches the current application state of the Pup system, including memory usage, cpu usage, version numbers etc.
*/
async getState(): Promise<ApiResponse<ApiApplicationState>> {
return await this.get("/state");
}
Expand All @@ -47,16 +72,34 @@ export class PupRestClient extends RestClient {
return await this.post(`/processes/${processId}/unblock`);
}

/**
* Transmits telemetry for a client process to Pup.
* - Should only be used by the Telemetry-component.
*
* @param telemetryData - The ApiTelemetryData object to send.
* @returns A Promise that resolves to an ApiResponse (no data expected).
*/
async sendTelemetry(
telemetryData: ApiTelemetryData,
): Promise<ApiResponse<void>> {
return await this.post("/telemetry", telemetryData);
}

/**
* Terminates Pup completely
*/
async terminate(): Promise<ApiResponse<void>> {
return await this.post("/terminate");
}

/**
* Sends a structured log message to the Pup API.
*
* @param severity - The severity level of the log message (e.g., "debug", "info", "warn", "error").
* @param plugin - The name of the plugin or subsystem that generated the log message.
* @param message - The text content of the log message.
* @returns A Promise that resolves to an ApiResponse (likely indicating success or failure).
*/
async sendLog(
severity: string,
plugin: string,
Expand All @@ -65,6 +108,16 @@ export class PupRestClient extends RestClient {
return await this.post("/log", { severity, plugin, message });
}

/**
* Retrieves logs from Pup with optional filtering.
*
* @param processId - Optional ID of the process to filter logs by.
* @param startTimeStamp - Optional timestamp for filtering logs by start time.
* @param endTimeStamp - Optional timestamp for filtering logs by end time.
* @param severity - Optional severity level to filter logs by.
* @param nRows - Optional maximum number of log entries to return.
* @returns A Promise that resolves to an ApiResponse containing an array of ApiLogItem objects.
*/
async getLogs(
processId?: string,
startTimeStamp?: number,
Expand All @@ -87,4 +140,39 @@ export class PupRestClient extends RestClient {

return await this.get(`/logs?${queryParams.toString()}`); // Send query parameters
}

private async setupEventStream() {
const wsUrl = `${this.baseUrl.replace("http", "ws")}/wss`;
this.wsStream = new WebSocketStream(wsUrl, {
headers: {
"Authorization": `Bearer ${this.token}`,
},
});
const { readable } = await this.wsStream.opened;
const reader = readable.getReader();
while (true) {
const { value, done } = await reader.read();
if (done) {
break;
}
try {
const v = JSON.parse(value.toString());
this.events.emit(v.t, v.d);
} catch (_e) { /* Ignore */ }
}
return;
}

on(event: string, fn: EventHandler<unknown>) {
this.events.on(event, fn);
}

off(event: string, fn: EventHandler<unknown>) {
this.events.off(event, fn);
}

close() {
this.events.close();
this.wsStream?.close();
}
}
4 changes: 2 additions & 2 deletions src/rest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ class RestAPIError extends Error {
}

export class RestClient {
private baseUrl: string; // Declare the types
private token: string;
public baseUrl: string; // Declare the types
public token: string;

constructor(baseUrl: string, token: string) {
this.baseUrl = baseUrl;
Expand Down

0 comments on commit 44e40cd

Please sign in to comment.