From 4ccac35b01bea9c191f04d74ecde0c3c51cf1e2b Mon Sep 17 00:00:00 2001 From: Alberto Ricart Date: Tue, 19 Sep 2023 16:28:18 -0500 Subject: [PATCH] [FEAT] stream configurations now have `compression` option - this is supported on servers 2.10.x or better. --- jetstream/jsapi_types.ts | 16 ++++++++++++++ jetstream/jsmstream_api.ts | 6 +++++ jetstream/tests/jsm_test.ts | 44 ++++++++++++++++++++++++++++++++++++- nats-base-client/semver.ts | 2 ++ 4 files changed, 67 insertions(+), 1 deletion(-) diff --git a/jetstream/jsapi_types.ts b/jetstream/jsapi_types.ts index 7096e3d3..e147ff0e 100644 --- a/jetstream/jsapi_types.ts +++ b/jetstream/jsapi_types.ts @@ -240,6 +240,11 @@ export interface StreamUpdateConfig { * This feature only supported on 2.10.x and better. */ subject_transform?: SubjectTransformConfig; + /** + * Sets the compression level of the stream. This feature is only supported in + * servers 2.10.x and better. + */ + compression?: StoreCompression; } export interface Republish { @@ -410,6 +415,17 @@ export enum ReplayPolicy { Original = "original", } +export enum StoreCompression { + /** + * No compression + */ + None = "none", + /** + * S2 compression + */ + S2 = "s2", +} + /** * Options for StreamAPI info requests */ diff --git a/jetstream/jsmstream_api.ts b/jetstream/jsmstream_api.ts index 2c2812fa..4371ec4d 100644 --- a/jetstream/jsmstream_api.ts +++ b/jetstream/jsmstream_api.ts @@ -237,6 +237,12 @@ export class StreamAPIImpl extends BaseApiClient implements StreamAPI { throw new Error(`stream 'subject_transform' requires server ${min}`); } } + if (cfg.compression) { + const { min, ok } = nci.features.get(Feature.JS_STREAM_COMPRESSION); + if (!ok) { + throw new Error(`stream 'compression' requires server ${min}`); + } + } function validateStreamSource( context: string, diff --git a/jetstream/tests/jsm_test.ts b/jetstream/tests/jsm_test.ts index cbce3a8c..e875f0bf 100644 --- a/jetstream/tests/jsm_test.ts +++ b/jetstream/tests/jsm_test.ts @@ -73,7 +73,7 @@ import { JetStreamManagerImpl } from "../jsm.ts"; import { Feature } from "../../nats-base-client/semver.ts"; import { convertStreamSourceDomain } from "../jsmstream_api.ts"; import { ConsumerAPIImpl } from "../jsmconsumer_api.ts"; -import { ConsumerApiAction } from "../jsapi_types.ts"; +import { ConsumerApiAction, StoreCompression } from "../jsapi_types.ts"; const StreamNameRequired = "stream name required"; const ConsumerNameRequired = "durable name required"; @@ -2544,3 +2544,45 @@ Deno.test("jsm - source transforms rejected on old servers", async () => { await cleanup(ns, nc); }); + +Deno.test("jsm - stream compression not supported", async () => { + const { ns, nc } = await setup(jetstreamServerConf()); + const nci = nc as NatsConnectionImpl; + nci.features.update("2.9.0"); + nci.info!.version = "2.9.0"; + + const jsm = await nc.jetstreamManager(); + + await assertRejects( + async () => { + await jsm.streams.add({ + name: "n", + subjects: ["foo"], + storage: StorageType.File, + compression: StoreCompression.S2, + }); + }, + Error, + "stream 'compression' requires server 2.10.0", + ); + + await cleanup(ns, nc); +}); + +Deno.test("jsm - stream compression", async () => { + const { ns, nc } = await setup(jetstreamServerConf()); + + const jsm = await nc.jetstreamManager(); + let si = await jsm.streams.add({ + name: "n", + subjects: ["foo"], + storage: StorageType.File, + compression: StoreCompression.S2, + }); + assertEquals(si.config.compression, StoreCompression.S2); + + si = await jsm.streams.update("n", { compression: StoreCompression.None }); + assertEquals(si.config.compression, StoreCompression.None); + + await cleanup(ns, nc); +}); diff --git a/nats-base-client/semver.ts b/nats-base-client/semver.ts index 4e8641d5..412c3dd5 100644 --- a/nats-base-client/semver.ts +++ b/nats-base-client/semver.ts @@ -50,6 +50,7 @@ export enum Feature { JS_STREAM_FIRST_SEQ = "js_stream_first_seq", JS_STREAM_SUBJECT_TRANSFORM = "js_stream_subject_transform", JS_STREAM_SOURCE_SUBJECT_TRANSFORM = "js_stream_source_subject_transform", + JS_STREAM_COMPRESSION = "js_stream_compression", } type FeatureVersion = { @@ -105,6 +106,7 @@ export class Features { this.set(Feature.JS_STREAM_FIRST_SEQ, "2.10.0"); this.set(Feature.JS_STREAM_SUBJECT_TRANSFORM, "2.10.0"); this.set(Feature.JS_STREAM_SOURCE_SUBJECT_TRANSFORM, "2.10.0"); + this.set(Feature.JS_STREAM_COMPRESSION, "2.10.0"); this.disabled.forEach((f) => { this.features.delete(f);