Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

initialize only once #4921

Merged
merged 8 commits into from
Feb 24, 2020
2 changes: 1 addition & 1 deletion packages/core/src/Util/Reachability.native.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ export default class ReachabilityNavigator implements Reachability {
}, 2000);

return () => {
logger.warn('unsubscribing reachability');
logger.log('unsubscribing reachability');

clearInterval(id);
};
Expand Down
67 changes: 64 additions & 3 deletions packages/datastore/__tests__/DataStore.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,37 @@
import 'fake-indexeddb/auto';
import * as uuidValidate from 'uuid-validate';
import { initSchema as initSchemaType } from '../src/datastore/datastore';
import {
initSchema as initSchemaType,
DataStore as DataStoreType,
} from '../src/datastore/datastore';
import {
ModelInit,
MutableModel,
PersistentModelConstructor,
Schema,
} from '../src/types';
import StorageType from '../src/storage/storage';
import Observable from 'zen-observable-ts';

let initSchema: typeof initSchemaType;
let DataStore: typeof DataStoreType;
let Storage: typeof StorageType;

beforeEach(() => {
jest.resetModules();

({ initSchema } = require('../src/datastore/datastore'));
jest.doMock('../src/storage/storage', () => {
const mock = jest.fn().mockImplementation(() => ({
runExclusive: jest.fn(),
query: jest.fn(),
observe: jest.fn(() => Observable.of()),
}));

(<any>mock).getNamespace = () => ({ models: {} });

return { default: mock };
});
({ initSchema, DataStore } = require('../src/datastore/datastore'));
});

describe('DataStore tests', () => {
Expand Down Expand Up @@ -66,6 +84,14 @@ describe('DataStore tests', () => {
uuidValidate(model.id.replace(/^(.{4})-(.{4})-(.{8})/, '$3-$2-$1'), 1)
).toBe(true);
});

test('initSchema is executed only once', () => {
initSchema(testSchema());

expect(() => {
initSchema(testSchema());
}).toThrow('The schema has already been initialized');
});
});

describe('Immutability', () => {
Expand Down Expand Up @@ -122,6 +148,39 @@ describe('DataStore tests', () => {
expect(model1.id).toBe(model2.id);
});
});

describe('Initialization', () => {
test('start is called only once', async () => {
Storage = require('../src/storage/storage').default;

const classes = initSchema(testSchema());

const { Model } = classes;

const promises = [
DataStore.query(Model),
DataStore.query(Model),
DataStore.query(Model),
DataStore.query(Model),
];

await Promise.all(promises);

expect(Storage).toHaveBeenCalledTimes(1);
});
});

test('It is initialized when observing (no query)', async () => {
Storage = require('../src/storage/storage').default;

const classes = initSchema(testSchema());

const { Model } = classes;

DataStore.observe(Model).subscribe(jest.fn());

expect(Storage).toHaveBeenCalledTimes(1);
});
});

//#region Test helpers
Expand All @@ -144,6 +203,7 @@ function testSchema(): Schema {
models: {
Model: {
name: 'Model',
pluralName: 'Models',
syncable: true,
fields: {
id: {
Expand All @@ -162,6 +222,7 @@ function testSchema(): Schema {
},
LocalModel: {
name: 'LocalModel',
pluralName: 'LocalModels',
syncable: false,
fields: {
id: {
Expand All @@ -179,7 +240,7 @@ function testSchema(): Schema {
},
},
},
version: 1,
version: '1',
};
}

Expand Down
52 changes: 36 additions & 16 deletions packages/datastore/src/datastore/datastore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,6 @@ const observe: {
modelConstructor?: PersistentModelConstructor<T>,
idOrCriteria?: string | ProducerModelPredicate<T>
) => {
start();
let predicate: ModelPredicate<T>;

if (idOrCriteria !== undefined && modelConstructor === undefined) {
Expand Down Expand Up @@ -504,9 +503,24 @@ const observe: {
);
}

return storage
.observe(modelConstructor, predicate)
.filter(({ model }) => namespaceResolver(model) === USER);
return new Observable<SubscriptionMessage<any>>(observer => {
let handle: ZenObservable.Subscription;

(async () => {
await start();

handle = storage
.observe(modelConstructor, predicate)
.filter(({ model }) => namespaceResolver(model) === USER)
.subscribe(observer);
})();

return () => {
if (handle) {
handle.unsubscribe();
}
};
});
};

const query: {
Expand Down Expand Up @@ -683,21 +697,29 @@ async function checkSchemaVersion(
if (storedValue !== version) {
await s.clear(false);
}
} else {
await s.save(
modelInstanceCreator(Setting, {
key: SETTING_SCHEMA_VERSION,
value: JSON.stringify(version),
})
);
}

await s.save(
modelInstanceCreator(Setting, {
key: SETTING_SCHEMA_VERSION,
value: JSON.stringify(version),
})
);
});
}

let syncSubscription: ZenObservable.Subscription;

let initResolve: Function;
let initialized: Promise<void>;
async function start(): Promise<void> {
if (storage !== undefined) {
if (initialized === undefined) {
initialized = new Promise(res => {
initResolve = res;
});
} else {
await initialized;

return;
}

Expand All @@ -710,10 +732,6 @@ async function start(): Promise<void> {

await checkSchemaVersion(storage, schema.version);

if (sync !== undefined) {
return;
}

const { aws_appsync_graphqlEndpoint } = amplifyConfig;

if (aws_appsync_graphqlEndpoint) {
Expand All @@ -738,6 +756,8 @@ async function start(): Promise<void> {
},
});
}

initResolve();
}

async function clear() {
Expand Down
10 changes: 8 additions & 2 deletions packages/datastore/src/sync/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ export class SyncEngine {
new Reachability().networkMonitor().subscribe(async ({ online }) => {
this.online = online;
if (online) {
//#region GraphQL Subscriptions
const [
ctlSubsObservable,
dataSubsObservable,
Expand All @@ -151,6 +152,9 @@ export class SyncEngine {
}

logger.log('Realtime ready');
//#endregion

//#region Base & Sync queries
const currentTimeStamp = new Date().getTime();

const modelLastSync: Map<
Expand Down Expand Up @@ -181,8 +185,9 @@ export class SyncEngine {
observer.error(err);
return;
}
//#endregion

// process mutations
//#region process mutations
subscriptions.push(
this.mutationsProcessor
.start()
Expand All @@ -201,8 +206,9 @@ export class SyncEngine {
}
)
);
//#endregion

// TODO: extract to funciton
// TODO: extract to function
subscriptions.push(
dataSubsObservable.subscribe(
([_transformerMutationType, modelDefinition, item]) => {
Expand Down
14 changes: 8 additions & 6 deletions packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider {
if (type === MESSAGE_TYPES.GQL_CONNECTION_KEEP_ALIVE) {
clearTimeout(this.keepAliveTimeoutId);
this.keepAliveTimeoutId = setTimeout(
this._timeoutDisconnect.bind(this),
this._errorDisconnect.bind(this, 'Timeout disconnect'),
this.keepAliveTimeout
);
return;
Expand Down Expand Up @@ -492,16 +492,15 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider {
}
}

private _timeoutDisconnect() {
private _errorDisconnect(msg: string) {
this.subscriptionObserverMap.forEach(({ observer }) => {
if (!observer.closed) {
observer.error({
errors: [{ ...new GraphQLError(`Timeout disconnect`) }],
errors: [{ ...new GraphQLError(msg) }],
});
observer.complete();
}
});
this.subscriptionObserverMap = new Map();
this.subscriptionObserverMap.clear();
if (this.awsRealTimeSocket) {
this.awsRealTimeSocket.close();
}
Expand Down Expand Up @@ -661,7 +660,10 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider {
this.awsRealTimeSocket.onmessage = this._handleIncomingSubscriptionMessage.bind(
this
);
this.awsRealTimeSocket.onerror = logger.debug;
this.awsRealTimeSocket.onerror = err => {
logger.debug(err);
this._errorDisconnect('Connection closed');
};
res('Cool, connected to AWS AppSyncRealTime');
return;
}
Expand Down