Skip to content

Commit

Permalink
working fix; still needs cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
svidgen committed Jun 1, 2022
1 parent bd01398 commit 9c239b4
Show file tree
Hide file tree
Showing 2 changed files with 161 additions and 50 deletions.
163 changes: 115 additions & 48 deletions packages/datastore/__tests__/DataStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import {
PersistentModel,
PersistentModelConstructor,
} from '../src/types';
import { Comment, Model, Post, Metadata, testSchema } from './helpers';
import { Comment, Model, Post, Metadata, testSchema, pause } from './helpers';

let initSchema: typeof initSchemaType;
let DataStore: typeof DataStoreType;
Expand Down Expand Up @@ -273,6 +273,8 @@ describe('DataStore observeQuery, with fake-indexeddb and fake sync', () => {
Comment: PersistentModelConstructor<Comment>;
Post: PersistentModelConstructor<Post>;
});

await DataStore.start();
await DataStore.clear();

// Fully faking or mocking the sync engine would be pretty significant.
Expand All @@ -294,6 +296,25 @@ describe('DataStore observeQuery, with fake-indexeddb and fake sync', () => {
(DataStore as any).syncPageSize = 1000;
});

afterEach(async () => {
//
// ~~~~ NAUGHTINESS WARNING! ~~~~
//
// ( cover your eyes )
//
// this prevents pollution between tests, especially those that test observe
// behavior. it would seem that, due to the order in which DataStore processes
// observers internally, we need to inject a small async pause to let DataStore
// "settle" before clearing it and starting the next test -- IF NOT, we get
// spooky contamination between tests.
//
await pause(10);

// and out of an abundance of caution:
await DataStore.start();
await DataStore.clear();
});

test('publishes preexisting local data immediately', async done => {
try {
for (let i = 0; i < 5; i++) {
Expand Down Expand Up @@ -388,88 +409,134 @@ describe('DataStore observeQuery, with fake-indexeddb and fake sync', () => {
// The test currently times out, because the mutation is filtered before
// observeQuery can get ahold of it, so the last expected subscription
// message never even arrives.
test.skip('can remove newly-unmatched items out of the snapshot on subsequent saves', async done => {
test('can remove newly-unmatched items out of the snapshot on subsequent saves', async done => {
try {
const expecteds = [0, 5, 4];

// watch for post snapshots.
// the first "real" snapshot should include all five posts with "include"
// in the title. after the update to change ONE of those posts to "omit" instead,
// we should see a snapshot of 4 posts with the updated post removed.
const expecteds = [0, 4, 3];
const sub = DataStore.observeQuery(Post, p =>
p.title('contains', 'include')
).subscribe(({ items }) => {
).subscribe(async ({ items }) => {
const expected = expecteds.shift() || 0;
expect(items.length).toBe(expected);

for (const item of items) {
expect(item.title).toMatch('include');
}

if (expecteds.length === 0) {
if (expecteds.length === 1) {
// after the second snapshot arrives, changes a single post from
// "the post # - include"
// to
// "edited post - omit"

// because this is intended to trigger a new, post-sync'd snapshot,
// we'll start with a little sanity check:
expect(
((DataStore as any).sync as any).getModelSyncedStatus({})
).toBe(true);

await pause(1);
const itemToEdit = (
await DataStore.query(Post, p => p.title('contains', 'include'))
).pop();
await DataStore.save(
Post.copyOf(itemToEdit, draft => {
draft.title = 'second edited post - omit';
})
);
} else if (expecteds.length === 0) {
sub.unsubscribe();
done();
}
});

setTimeout(async () => {
// creates posts like:
//
// "the post 0 - include"
// "the post 1 - omit"
// "the post 2 - include"
// "the post 3 - omit"
//
// etc.
//
for (let i = 0; i < 10; i++) {
await DataStore.save(
new Post({
title: `the post ${i} - ${Boolean(i % 2) ? 'include' : 'omit'}`,
})
);
}
setTimeout(async () => {
const itemToEdit = (await DataStore.query(Post)).pop();
await DataStore.save(
Post.copyOf(itemToEdit, draft => {
draft.title = 'edited post - omit';
})
);
}, 100);
}, 100);
} catch (error) {
done(error);
}
});

test('publishes preexisting local data AND follows up with subsequent saves', async done => {
try {
const expecteds = [5, 15];

for (let i = 0; i < 5; i++) {
// changes a single post from
// "the post # - include"
// to
// "edited post - omit"
await pause(1);
((DataStore as any).sync as any).getModelSyncedStatus = (model: any) =>
true;

// the first edit simulates a quick-turnaround update that gets
// applied while the first snapshot is still being generated
const itemToEdit = (
await DataStore.query(Post, p => p.title('contains', 'include'))
).pop();
await DataStore.save(
new Post({
title: `the post ${i}`,
Post.copyOf(itemToEdit, draft => {
draft.title = 'first edited post - omit';
})
);
}

const sub = DataStore.observeQuery(Post).subscribe(
({ items, isSynced }) => {
const expected = expecteds.shift() || 0;
expect(items.length).toBe(expected);

for (let i = 0; i < expected; i++) {
expect(items[i].title).toEqual(`the post ${i}`);
}
}, 1);
} catch (error) {
done(error);
}
});

if (expecteds.length === 0) {
sub.unsubscribe();
done();
}
}
);
test('publishes preexisting local data AND follows up with subsequent saves', done => {
(async () => {
try {
const expecteds = [5, 15];

setTimeout(async () => {
for (let i = 5; i < 15; i++) {
for (let i = 0; i < 5; i++) {
await DataStore.save(
new Post({
title: `the post ${i}`,
})
);
}
}, 100);
} catch (error) {
done(error);
}

const sub = DataStore.observeQuery(Post).subscribe(
({ items, isSynced }) => {
const expected = expecteds.shift() || 0;
expect(items.length).toBe(expected);

for (let i = 0; i < expected; i++) {
expect(items[i].title).toEqual(`the post ${i}`);
}

if (expecteds.length === 0) {
sub.unsubscribe();
done();
}
}
);

setTimeout(async () => {
for (let i = 5; i < 15; i++) {
await DataStore.save(
new Post({
title: `the post ${i}`,
})
);
}
}, 100);
} catch (error) {
done(error);
}
})();
});
});

Expand Down
48 changes: 46 additions & 2 deletions packages/datastore/src/datastore/datastore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ import {
registerNonModelClass,
sortCompareFunction,
DeferredCallbackResolver,
validatePredicate,
} from '../util';
import { has } from 'immer/dist/internal';

setAutoFreeze(true);
enablePatches();
Expand Down Expand Up @@ -1188,6 +1190,7 @@ class DataStore {
const itemsChanged = new Map<string, T>();
let deletedItemIds: string[] = [];
let handle: ZenObservable.Subscription;
let predicate: ModelPredicate<T>;

const generateAndEmitSnapshot = (): void => {
const snapshot = generateSnapshot();
Expand All @@ -1205,6 +1208,28 @@ class DataStore {
const { sort } = options || {};
const sortOptions = sort ? { sort } : undefined;

const modelDefinition = getModelDefinition(model);
if (isQueryOne(criteria)) {
predicate = ModelPredicateCreator.createForId<T>(
modelDefinition,
criteria
);
} else {
if (isPredicatesAll(criteria)) {
// Predicates.ALL means "all records", so no predicate (undefined)
predicate = undefined;
} else {
predicate = ModelPredicateCreator.createFromExisting(
modelDefinition,
criteria
);
}
}

const { predicates, type: predicateGroupType } =
ModelPredicateCreator.getPredicates(predicate, false) || {};
const hasPredicate = !!predicates;

(async () => {
try {
// first, query and return any locally-available records
Expand All @@ -1214,10 +1239,29 @@ class DataStore {

// observe the model and send a stream of updates (debounced)
handle = this.observe(
model,
model
// @ts-ignore TODO: fix this TSlint error
criteria
// criteria
).subscribe(({ element, model, opType }) => {
// We need to filter HERE instead of in `observe()` to ensure we see updates for
// items that ~become~ filtered-out as the result of a save. We need to remove
// those items from the existing snapshot.
if (
hasPredicate &&
!validatePredicate(element, predicateGroupType, predicates)
) {
if (
opType === 'UPDATE' &&
(items.has(element.id) || itemsChanged.has(element.id))
) {
// we need to track this as a "deleted item" to calculate a correct page `limit`.
deletedItemIds.push(element.id);
} else {
// ignore updates for irrelevant/filtered items.
return;
}
}

// Flag items which have been recently deleted
// NOTE: Merging of separate operations to the same model instance is handled upstream
// in the `mergePage` method within src/sync/merger.ts. The final state of a model instance
Expand Down

0 comments on commit 9c239b4

Please sign in to comment.