Skip to content

Commit

Permalink
delta compression
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia committed May 5, 2024
1 parent ab1a074 commit 15466d9
Show file tree
Hide file tree
Showing 8 changed files with 746 additions and 480 deletions.
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
"build-browser": "esbuild src/browser.ts --bundle --minify --sourcemap --outfile=dist/centrifuge.js",
"dev": "esbuild src/browser.ts --bundle --outfile=dist/centrifuge.js --servedir=dist/ --serve=2000",
"build-browser-protobuf": "esbuild src/browser.protobuf.ts --bundle --minify --sourcemap --outfile=dist/centrifuge.protobuf.js",
"dev-protobuf": "esbuild src/browser.protobuf.ts --bundle --outfile=dist/centrifuge.protobuf.js --servedir=dist/ --serve=2000",
"dev-protobuf": "esbuild src/browser.protobuf.ts --bundle --outfile=dist/centrifuge.protobuf.js --servedir=dist/ --serve=3000",
"proto": "./make-proto"
},
"devDependencies": {
Expand Down Expand Up @@ -87,4 +87,4 @@
"events": "^3.3.0",
"protobufjs": "^7.2.5"
}
}
}
12 changes: 12 additions & 0 deletions src/client.proto.json
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,10 @@
"keyType": "string",
"type": "string",
"id": 7
},
"delta": {
"type": "bool",
"id": 8
}
},
"reserved": [
Expand Down Expand Up @@ -566,6 +570,10 @@
"join_leave": {
"type": "bool",
"id": 11
},
"delta": {
"type": "string",
"id": 12
}
},
"reserved": [
Expand Down Expand Up @@ -621,6 +629,10 @@
"was_recovering": {
"type": "bool",
"id": 12
},
"delta": {
"type": "bool",
"id": 13
}
},
"reserved": [
Expand Down
192 changes: 192 additions & 0 deletions src/fossil.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
/*
Copyright 2014-2024 Dmitry Chestnykh (JavaScript port)
Copyright 2007 D. Richard Hipp (original C version)
Fossil SCM delta compression algorithm, this is only the applyDelta part extracted
from https://github.com/dchest/fossil-delta-js. The code was slightly modified
to strip unnecessary parts. The copyright on top of this file is from the original
repo on Github licensed under Simplified BSD License.
*/

// We accept plain arrays of bytes or Uint8Array.
type ByteArray = number[] | Uint8Array;


// Lookup table indexed by a 7-bit character code. It yields the numeric value
// (0..63) of that character when it is a digit of the delta's base-64 integer
// encoding, or -1 when the character is not a digit. Digits in ascending
// order are: '0'-'9' (0..9), 'A'-'Z' (10..35), '_' (36), 'a'-'z' (37..62)
// and '~' (63). The table has exactly 128 entries.
const zValue = [
-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, -1, -1,
-1, -1, -1, -1, -1, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23,
24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, -1, -1, -1, -1, 36, -1, 37,
38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56,
57, 58, 59, 60, 61, 62, -1, -1, -1, 63, -1,
];

/**
 * Sequential cursor over a byte array. Exposes helpers to pull single bytes,
 * single characters and base-64 encoded unsigned integers out of it.
 */
class Reader {
  /** Backing array that is being read. */
  public a: ByteArray;
  /** Index of the next byte to be consumed. */
  public pos: number;

  constructor(array: ByteArray) {
    this.pos = 0;
    this.a = array;
  }

  /** True while the cursor has not reached the end of the array. */
  haveBytes() {
    return this.a.length > this.pos;
  }

  /**
   * Consume and return one byte.
   * The cursor is advanced even when the read is rejected.
   * @throws RangeError when the cursor was already at (or past) the end.
   */
  getByte() {
    const index = this.pos;
    this.pos = index + 1;
    if (index >= this.a.length) {
      throw new RangeError("out of bounds");
    }
    return this.a[index];
  }

  /** Consume one byte and return it as a one-character string. */
  getChar() {
    const code = this.getByte();
    return String.fromCharCode(code);
  }

  /**
   * Consume a run of base-64 digits (see zValue) and return the unsigned
   * 32-bit integer they encode, most significant digit first.
   */
  getInt() {
    let value = 0;
    while (this.haveBytes()) {
      // Only the low 7 bits of the byte select the table entry.
      const digit = zValue[this.getByte() & 0x7f];
      if (!(digit >= 0)) {
        break;
      }
      value = (value << 6) + digit;
    }
    // Step back one position so the byte that ended the digit run is read
    // again by the caller. Note this happens unconditionally, also when the
    // loop stopped because the input was exhausted.
    this.pos -= 1;
    return value >>> 0;
  }
}

/**
 * Writer accumulates output bytes and converts them to the caller's
 * preferred byte container at the end.
 */
class Writer {
  /** Bytes written so far, one array element per byte. */
  private a: number[] = [];

  /**
   * Return the accumulated bytes in the same kind of container as
   * `sourceType`.
   *
   * @param sourceType sample value; only its kind (plain array or not) is inspected.
   * @returns the internal array itself (not a copy) when `sourceType` is a
   *          plain array, otherwise a new Uint8Array built from it.
   */
  toByteArray<T extends ByteArray>(sourceType: T): T {
    if (Array.isArray(sourceType)) {
      return this.a as T;
    }
    return new Uint8Array(this.a) as T;
  }

  /**
   * Append the bytes `a[start..end)` (same index semantics as `slice`).
   *
   * Bytes are pushed one at a time: spreading the whole chunk into a single
   * `push(...chunk)` call passes every byte as a separate argument and makes
   * the engine throw a RangeError once the chunk gets large.
   */
  putArray(a: ByteArray, start: number, end: number) {
    const chunk = a.slice(start, end);
    for (let i = 0; i < chunk.length; i++) {
      this.a.push(chunk[i]);
    }
  }
}

/**
 * Return a 32-bit checksum of the array.
 *
 * The input is summed as a sequence of big-endian 32-bit words using
 * wrapping 32-bit arithmetic; a trailing group of 1-3 bytes is treated as a
 * word whose missing low-order bytes are zero.
 *
 * @param arr bytes to checksum.
 * @returns the checksum as an unsigned 32-bit integer.
 */
function checksum(arr: ByteArray): number {
  // One accumulator per byte lane of the 32-bit word (sum0 = most significant).
  let sum0 = 0;
  let sum1 = 0;
  let sum2 = 0;
  let sum3 = 0;
  let z = 0;
  let remaining = arr.length;

  // Unrolled main loop, 16 bytes per iteration.
  // TODO measure if this unrolling is helpful.
  while (remaining >= 16) {
    sum0 = (sum0 + arr[z + 0]) | 0;
    sum1 = (sum1 + arr[z + 1]) | 0;
    sum2 = (sum2 + arr[z + 2]) | 0;
    sum3 = (sum3 + arr[z + 3]) | 0;

    sum0 = (sum0 + arr[z + 4]) | 0;
    sum1 = (sum1 + arr[z + 5]) | 0;
    sum2 = (sum2 + arr[z + 6]) | 0;
    sum3 = (sum3 + arr[z + 7]) | 0;

    sum0 = (sum0 + arr[z + 8]) | 0;
    sum1 = (sum1 + arr[z + 9]) | 0;
    sum2 = (sum2 + arr[z + 10]) | 0;
    sum3 = (sum3 + arr[z + 11]) | 0;

    sum0 = (sum0 + arr[z + 12]) | 0;
    sum1 = (sum1 + arr[z + 13]) | 0;
    sum2 = (sum2 + arr[z + 14]) | 0;
    sum3 = (sum3 + arr[z + 15]) | 0;

    z += 16;
    remaining -= 16;
  }

  // Remaining whole 4-byte words.
  while (remaining >= 4) {
    sum0 = (sum0 + arr[z + 0]) | 0;
    sum1 = (sum1 + arr[z + 1]) | 0;
    sum2 = (sum2 + arr[z + 2]) | 0;
    sum3 = (sum3 + arr[z + 3]) | 0;
    z += 4;
    remaining -= 4;
  }

  // Combine the four lanes into a single 32-bit value.
  sum3 = (((((sum3 + (sum2 << 8)) | 0) + (sum1 << 16)) | 0) + (sum0 << 24)) | 0;

  // Fold in the 0-3 trailing bytes. Written as independent guards instead of
  // a switch with fall-through so that no lint/type suppressions are needed.
  if (remaining >= 3) {
    sum3 = (sum3 + (arr[z + 2] << 8)) | 0;
  }
  if (remaining >= 2) {
    sum3 = (sum3 + (arr[z + 1] << 16)) | 0;
  }
  if (remaining >= 1) {
    sum3 = (sum3 + (arr[z + 0] << 24)) | 0;
  }

  // Reinterpret the signed 32-bit accumulator as unsigned.
  return sum3 >>> 0;
}

/**
 * Apply a delta byte array to a source byte array, returning the target byte array.
 *
 * The delta starts with the expected target size (a base-64 integer, see
 * Reader.getInt) terminated by "\n", followed by a sequence of commands, each
 * introduced by an integer `cnt` and an operator character:
 *
 *  - `cnt@ofst,` copies `cnt` bytes from `source` starting at offset `ofst`;
 *  - `cnt:` inserts the next `cnt` bytes of the delta verbatim;
 *  - `cnt;` ends the delta; `cnt` must equal the checksum of the produced
 *    output and the produced size must equal the announced target size.
 *
 * @param source the previous value the delta was computed against.
 * @param delta the delta to apply.
 * @returns the reconstructed target, in the same kind of container as `source`.
 * @throws Error when the delta is malformed, refers to bytes outside of
 *         `source`/`delta`, or fails the size/checksum verification.
 * @throws RangeError when a read runs past the end of the delta.
 */
export function applyDelta<T extends ByteArray>(
  source: T,
  delta: T
): T {
  // Number of output bytes the commands processed so far account for.
  let total = 0;
  const zDelta = new Reader(delta);
  const lenSrc = source.length;
  const lenDelta = delta.length;

  // Announced size of the target.
  const limit = zDelta.getInt();
  if (zDelta.getChar() !== "\n")
    throw new Error("size integer not terminated by '\\n'");

  const zOut = new Writer();
  while (zDelta.haveBytes()) {
    const cnt = zDelta.getInt();

    switch (zDelta.getChar()) {
      case "@": {
        // Copy `cnt` bytes from the source at offset `ofst`.
        const ofst = zDelta.getInt();
        if (zDelta.haveBytes() && zDelta.getChar() !== ",")
          throw new Error("copy command not terminated by ','");
        total += cnt;
        if (total > limit) throw new Error("copy exceeds output file size");
        if (ofst + cnt > lenSrc)
          throw new Error("copy extends past end of input");
        zOut.putArray(source, ofst, ofst + cnt);
        break;
      }

      case ":": {
        // Insert `cnt` literal bytes that follow in the delta itself.
        total += cnt;
        if (total > limit)
          throw new Error(
            "insert command gives an output larger than predicted"
          );
        // Compare against the bytes still unread, not the whole delta, so a
        // truncated literal is rejected here rather than silently shortened.
        if (cnt > lenDelta - zDelta.pos)
          throw new Error("insert count exceeds size of delta");
        zOut.putArray(zDelta.a, zDelta.pos, zDelta.pos + cnt);
        zDelta.pos += cnt;
        break;
      }

      case ";": {
        // Terminator: here `cnt` carries the checksum of the expected output.
        const out = zOut.toByteArray(source);
        if (cnt !== checksum(out))
          throw new Error("bad checksum");
        if (total !== limit)
          throw new Error("generated size does not match predicted size");
        return out;
      }

      default:
        throw new Error("unknown delta operator");
    }
  }
  // Ran out of input without seeing the ";" terminator.
  throw new Error("unterminated delta");
}
17 changes: 17 additions & 0 deletions src/json.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { applyDelta } from './fossil';

/** @internal */
export class JsonCodec {
name() {
Expand All @@ -11,4 +13,19 @@ export class JsonCodec {
decodeReplies(data: string): any[] {
return data.trim().split('\n').map(r => JSON.parse(r));
}

/**
 * Resolve the payload of a publication received over the JSON protocol.
 *
 * When `pub.delta` is truthy, `pub.data` is UTF-8 encoded and applied as a
 * delta on top of `prevValue`; the patched bytes are decoded and parsed as
 * JSON. Otherwise `pub.data` is parsed as JSON directly.
 *
 * @param pub publication; its `delta` and `data` fields are read.
 * @param prevValue value passed to applyDelta as the delta source
 *        (presumably the `newPrevValue` of the previous call; confirm at call site).
 * @returns `newData`, the parsed payload, and `newPrevValue`, the payload
 *          bytes to pass as `prevValue` next time.
 */
applyDeltaIfNeeded(pub: any, prevValue: any): { newData: any; newPrevValue: any } {
  if (!pub.delta) {
    // Full data as JSON string.
    const parsed: any = JSON.parse(pub.data);
    const rawBytes: any = new TextEncoder().encode(pub.data);
    return { newData: parsed, newPrevValue: rawBytes };
  }

  // JSON string delta.
  const deltaBytes = new TextEncoder().encode(pub.data);
  const patched: any = applyDelta(prevValue, deltaBytes);
  const text = new TextDecoder().decode(patched);
  return { newData: JSON.parse(text), newPrevValue: patched };
}
}
19 changes: 18 additions & 1 deletion src/protobuf.codec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import * as protobuf from 'protobufjs/light'
import * as protoJSON from './client.proto.json';
import { applyDelta } from './fossil';

const proto = protobuf.Root.fromJSON(protoJSON);

const Command = proto.lookupType('protocol.Command');
Expand Down Expand Up @@ -52,4 +54,19 @@ export class ProtobufCodec {
ok: false
};
}
}

/**
 * Resolve the payload of a publication received over the Protobuf protocol.
 *
 * When `pub.delta` is truthy, `pub.data` is applied as a binary delta on top
 * of `prevValue`; `newData` is then a fresh Uint8Array built from the patched
 * bytes. Otherwise `pub.data` is used as is for both returned fields.
 *
 * @param pub publication; its `delta` and `data` fields are read.
 * @param prevValue value passed to applyDelta as the delta source
 *        (presumably the `newPrevValue` of the previous call; confirm at call site).
 * @returns `newData`, the payload to expose, and `newPrevValue`, the payload
 *          to pass as `prevValue` next time.
 */
applyDeltaIfNeeded(pub: any, prevValue: any): { newData: any; newPrevValue: any } {
  if (!pub.delta) {
    // full binary data.
    const full: any = pub.data;
    return { newData: full, newPrevValue: pub.data };
  }

  // binary delta.
  const patched: any = applyDelta(prevValue, pub.data);
  return { newData: new Uint8Array(patched), newPrevValue: patched };
}
}
28 changes: 27 additions & 1 deletion src/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter<S
private _epoch: string | null;
private _resubscribeAttempts: number;
private _promiseId: number;

private _delta: string;
private _delta_negotiated: boolean;
private _token: string;
private _data: any | null;
private _getData: null | ((ctx: SubscriptionDataContext) => Promise<any>);
Expand All @@ -35,6 +36,7 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter<S
private _joinLeave: boolean;
// @ts-ignore – this is used by a client in centrifuge.ts.
private _inflight: boolean;
private _prevValue: any;

/** Subscription constructor should not be used directly, create subscriptions using Client method. */
constructor(centrifuge: Centrifuge, channel: string, options?: Partial<SubscriptionOptions>) {
Expand All @@ -60,6 +62,9 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter<S
this._promiseId = 0;
this._inflight = false;
this._refreshTimeout = null;
this._delta = '';
this._delta_negotiated = false;
this._prevValue = null;
this._setOptions(options);
// @ts-ignore – we are hiding some symbols from public API autocompletion.
if (this._centrifuge._debugEnabled) {
Expand Down Expand Up @@ -222,6 +227,11 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter<S
this._offset = result.offset || 0;
this._epoch = result.epoch || '';
}
if (result.delta) {
this._delta_negotiated = true;
} else {
this._delta_negotiated = false;
}

this._setState(SubscriptionState.Subscribed);
// @ts-ignore – we are hiding some methods from public API autocompletion.
Expand Down Expand Up @@ -377,6 +387,10 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter<S
}
}

if (this._delta) {
req.delta = this._delta;
}

const cmd = { subscribe: req };

this._inflight = true;
Expand Down Expand Up @@ -448,6 +462,12 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter<S
}

private _handlePublication(pub: any) {
if (this._delta && this._delta_negotiated) {
// @ts-ignore – we are hiding some methods from public API autocompletion.
const {newData, newPrevValue} = this._centrifuge._codec.applyDeltaIfNeeded(pub, this._prevValue)
pub.data = newData;
this._prevValue = newPrevValue;
}
// @ts-ignore – we are hiding some methods from public API autocompletion.
const ctx = this._centrifuge._getPublicationContext(this.channel, pub);
this.emit('publication', ctx);
Expand Down Expand Up @@ -568,6 +588,12 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter<S
if (options.joinLeave === true) {
this._joinLeave = true;
}
if (options.delta) {
if (options.delta !== 'fossil') {
throw new Error('unsupported delta format');
}
this._delta = options.delta;
}
}

private _getOffset() {
Expand Down
2 changes: 2 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,8 @@ export interface SubscriptionOptions {
recoverable: boolean;
/** ask server to send join/leave messages. */
joinLeave: boolean;
/** delta format to be used */
delta: 'fossil';
}

/** Stream position describes position of publication inside a stream. */
Expand Down
Loading

0 comments on commit 15466d9

Please sign in to comment.