Skip to content

Commit

Permalink
Merge pull request #596 from nats-io/os-link-updates
Browse files Browse the repository at this point in the history
os changes to update and links
  • Loading branch information
aricart authored Sep 20, 2023
2 parents 203290c + 02eac68 commit 7707f9c
Show file tree
Hide file tree
Showing 4 changed files with 255 additions and 67 deletions.
20 changes: 0 additions & 20 deletions jetstream/jsclient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ class ViewsImpl implements Views {
name: string,
opts: Partial<ObjectStoreOptions> = {},
): Promise<ObjectStore> {
jetstreamPreview(this.js.nc);
if (typeof crypto?.subtle?.digest !== "function") {
return Promise.reject(
new Error(
Expand Down Expand Up @@ -1027,22 +1026,3 @@ function autoAckJsMsg(data: JsMsg | null) {
data.ack();
}
}

const jetstreamPreview = (() => {
let once = false;
return (nci: NatsConnectionImpl) => {
if (!once) {
once = true;
const { lang } = nci?.protocol?.transport;
if (lang) {
console.log(
`\u001B[33m >> jetstream's materialized views object store functionality in ${lang} is beta functionality \u001B[0m`,
);
} else {
console.log(
`\u001B[33m >> jetstream's materialized views object store functionality is beta functionality \u001B[0m`,
);
}
}
};
})();
62 changes: 35 additions & 27 deletions jetstream/objectstore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ class ObjectInfoImpl implements ObjectInfo {
get metadata(): Record<string, string> {
return this.info.metadata || {};
}
isLink() {
return this.info.options?.link !== undefined;
}
}

function toServerObjectStoreMeta(
Expand Down Expand Up @@ -515,10 +518,12 @@ export class ObjectStoreImpl implements ObjectStore {
if (ln === "") {
throw new Error("link is a bucket");
}
const os = await ObjectStoreImpl.create(
this.js,
info.options.link.bucket,
);
const os = info.options.link.bucket !== this.name
? await ObjectStoreImpl.create(
this.js,
info.options.link.bucket,
)
: this;
return os.get(ln);
}

Expand Down Expand Up @@ -604,32 +609,31 @@ export class ObjectStoreImpl implements ObjectStore {
}

async link(name: string, info: ObjectInfo): Promise<ObjectInfo> {
if (info.deleted) {
return Promise.reject(new Error("object is deleted"));
}
const { name: n, error } = this._checkNotEmpty(name);
if (error) {
return Promise.reject(error);
}

// same object store
if (this.name === info.bucket) {
const copy = Object.assign({}, meta(info)) as ObjectStoreMeta;
copy.name = n;
try {
await this.update(info.name, copy);
const ii = await this.info(n);
return ii!;
} catch (err) {
return Promise.reject(err);
}
if (info.deleted) {
return Promise.reject(new Error("src object is deleted"));
}
if ((info as ObjectInfoImpl).isLink()) {
return Promise.reject(new Error("src object is a link"));
}
const dest = await this.rawInfo(name);
if (dest !== null && !dest.deleted) {
return Promise.reject(
new Error("an object already exists with that name"),
);
}

const link = { bucket: info.bucket, name: info.name };
const mm = {
name: n,
options: { link: link },
} as ObjectStoreMeta;
return this._put(mm, null);
await this.js.publish(this._metaSubject(name), JSON.stringify(mm));
const i = await this.info(name);
return Promise.resolve(i!);
}

async delete(name: string): Promise<PurgeResponse> {
Expand Down Expand Up @@ -667,9 +671,6 @@ export class ObjectStoreImpl implements ObjectStore {
new Error("cannot update meta for a deleted object"),
);
}
// FIXME: Go's implementation doesn't seem correct - it possibly adds a new meta entry
// effectively making the object available under 2 names, but it doesn't remove the
// older one.
meta.name = meta.name ?? info.name;
const { name: n, error } = this._checkNotEmpty(meta.name);
if (error) {
Expand All @@ -684,11 +685,18 @@ export class ObjectStoreImpl implements ObjectStore {
}
}
meta.name = n;

const ii = Object.assign({}, info, toServerObjectStoreMeta(meta!));
const jc = JSONCodec();

return this.js.publish(this._metaSubject(ii.name), jc.encode(ii));
// if the name changed, delete the old meta
const ack = await this.js.publish(
this._metaSubject(ii.name),
JSON.stringify(ii),
);
if (name !== meta.name) {
await this.jsm.streams.purge(this.stream, {
filter: this._metaSubject(name),
});
}
return Promise.resolve(ack);
}

async watch(opts: Partial<
Expand Down
38 changes: 35 additions & 3 deletions jetstream/tests/objectstore_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,7 @@ Deno.test("objectstore - watch history", async () => {
await cleanup(ns, nc);
});

Deno.test("objectstore - self link", async () => {
Deno.test("objectstore - same store link", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}, true));
if (await notCompatible(ns, nc, "2.6.3")) {
return;
Expand All @@ -560,14 +560,43 @@ Deno.test("objectstore - self link", async () => {
readableStreamFrom(sc.encode("a")),
);
const oi = await os.link("ref", src);
assertEquals(oi.options?.link, undefined);
assertEquals(oi.nuid, src.nuid);
assertEquals(oi.options?.link?.bucket, src.bucket);
assertEquals(oi.options?.link?.name, "a");

const a = await os.list();
assertEquals(a.length, 2);
assertEquals(a[0].name, "a");
assertEquals(a[1].name, "ref");

const data = await os.getBlob("ref");
assertEquals(new TextDecoder().decode(data!), "a");

await cleanup(ns, nc);
});

Deno.test("objectstore - link of link rejected", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}, true));
if (await notCompatible(ns, nc, "2.6.3")) {
return;
}
const js = nc.jetstream();
const os = await js.views.os("test");

const sc = StringCodec();
const src = await os.put(
{ name: "a" },
readableStreamFrom(sc.encode("a")),
);
const link = await os.link("ref", src);

await assertRejects(
async () => {
await os.link("ref2", link);
},
Error,
"src object is a link",
);

await cleanup(ns, nc);
});

Expand All @@ -591,6 +620,9 @@ Deno.test("objectstore - external link", async () => {
assertEquals(io.options?.link?.bucket, "test");
assertEquals(io.options?.link?.name, "a");

const data = await os2.getBlob("ref");
assertEquals(new TextDecoder().decode(data!), "a");

await cleanup(ns, nc);
});

Expand Down
Loading

0 comments on commit 7707f9c

Please sign in to comment.