Skip to content

Commit

Permalink
feat(core): Remove all long-lived in-memory state, use short-TTL caching
Browse files Browse the repository at this point in the history
Relates to #988. This change resolves stale data issues when running multiple Vendure instances in
parallel, e.g. as serverless functions.
  • Loading branch information
michaelbromley committed Sep 27, 2021
1 parent 214b86b commit d428ffc
Show file tree
Hide file tree
Showing 23 changed files with 157 additions and 83 deletions.
2 changes: 1 addition & 1 deletion packages/core/src/api/common/request-context.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ export class RequestContextService {
session?: CachedSession,
): Promise<RequestContext> {
const channelToken = this.getChannelToken(req);
const channel = this.channelService.getChannelFromToken(channelToken);
const channel = await this.channelService.getChannelFromToken(channelToken);
const apiType = getApiType(info);

const hasOwnerPermission = !!requiredPermissions && requiredPermissions.includes(Permission.Owner);
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/api/resolvers/admin/zone.resolver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export class ZoneResolver {

@Query()
@Allow(Permission.ReadSettings, Permission.ReadZone)
zones(@Ctx() ctx: RequestContext): Zone[] {
zones(@Ctx() ctx: RequestContext): Promise<Zone[]> {
return this.zoneService.findAll(ctx);
}

Expand Down
1 change: 1 addition & 0 deletions packages/core/src/common/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ export * from './error/generated-graphql-admin-errors';
export * from './injector';
export * from './permission-definition';
export * from './ttl-cache';
export * from './self-refreshing-cache';
export * from './types/common-types';
export * from './types/injectable-strategy';
export * from './types/locale-types';
Expand Down
73 changes: 73 additions & 0 deletions packages/core/src/common/self-refreshing-cache.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import { Logger } from '../config/logger/vendure-logger';

/**
* @description
* A cache which automatically refreshes itself if the value is found to be stale.
*/
export interface SelfRefreshingCache<V> {
/**
* @description
* The current value of the cache. If the value is stale, the data will be refreshed and then
* the fresh value will be returned.
*/
value(): Promise<V>;

/**
* @description
* Force a refresh of the value, e.g. when it is known that the value has changed such as after
* an update operation to the source data in the database.
*/
refresh(): Promise<V>;
}

export interface SelfRefreshingCacheConfig<V> {
name: string;
ttl: number;
refreshFn: () => Promise<V>;
}

/**
* @description
* Creates a {@link SelfRefreshingCache} object, which is used to cache a single frequently-accessed value. In this type
* of cache, the function used to populate the value (`refreshFn`) is defined during the creation of the cache, and
* it is immediately used to populate the initial value.
*
* From there, when the `.value` property is accessed, it will _always_ return a value synchronously. If the
* value has expired, it will still be returned and the `refreshFn` will be triggered to update the value in the
* background.
*/
export async function createSelfRefreshingCache<V>(
config: SelfRefreshingCacheConfig<V>,
): Promise<SelfRefreshingCache<V>> {
const { ttl, name, refreshFn } = config;
const initialValue = await refreshFn();
let value = initialValue;
let expires = new Date().getTime() + ttl;
const refreshValue = (): Promise<V> => {
Logger.debug(`Refreshing the SelfRefreshingCache "${name}"`);
return refreshFn()
.then(newValue => {
value = newValue;
expires = new Date().getTime() + ttl;
return value;
})
.catch(err => {
Logger.error(
`Failed to update SelfRefreshingCache "${name}": ${err.message}`,
undefined,
err.stack,
);
return value;
});
};
return {
async value() {
const now = new Date().getTime();
if (expires < now) {
return refreshValue();
}
return value;
},
refresh: refreshValue,
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ export class FastImporterService {
) {}

async initialize() {
this.defaultChannel = this.channelService.getDefaultChannel();
this.defaultChannel = await this.channelService.getDefaultChannel();
}

async createProduct(input: CreateProductInput): Promise<ID> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ export class Importer {
): Promise<ID[]> {
const facetValueIds: ID[] = [];
const ctx = new RequestContext({
channel: this.channelService.getDefaultChannel(),
channel: await this.channelService.getDefaultChannel(),
apiType: 'admin',
isAuthorized: true,
authorizedAsOwnerOnly: false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ export class ExternalAuthenticationService {
user: savedUser,
});
}
this.channelService.assignToCurrentChannel(customer, ctx);
await this.channelService.assignToCurrentChannel(customer, ctx);
await this.connection.getRepository(ctx, Customer).save(customer);

await this.historyService.createHistoryEntryForCustomer({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ export class OrderCalculator {
options?: { recalculateShipping?: boolean },
): Promise<OrderItem[]> {
const { taxZoneStrategy } = this.configService.taxOptions;
const zones = this.zoneService.findAll(ctx);
const zones = await this.zoneService.findAll(ctx);
const activeTaxZone = await this.requestContextCache.get(ctx, 'activeTaxZone', () =>
taxZoneStrategy.determineTaxZone(ctx, zones, ctx.channel, order),
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ export class ShippingCalculator {
order: Order,
skipIds: ID[] = [],
): Promise<EligibleShippingMethod[]> {
const shippingMethods = this.shippingMethodService
.getActiveShippingMethods(ctx.channel)
.filter(method => !skipIds.includes(method.id));
const shippingMethods = (await this.shippingMethodService.getActiveShippingMethods(ctx)).filter(
method => !skipIds.includes(method.id),
);

const checkEligibilityPromises = shippingMethods.map(method =>
this.checkEligibilityByShippingMethod(ctx, order, method),
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/service/services/asset.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ export class AssetService {
focalPoint: null,
customFields,
});
this.channelService.assignToCurrentChannel(asset, ctx);
await this.channelService.assignToCurrentChannel(asset, ctx);
return this.connection.getRepository(ctx, Asset).save(asset);
}

Expand Down
34 changes: 19 additions & 15 deletions packages/core/src/service/services/channel.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import { RequestContext } from '../../api/common/request-context';
import { ErrorResultUnion, isGraphQlErrorResult } from '../../common/error/error-result';
import { ChannelNotFoundError, EntityNotFoundError, InternalServerError } from '../../common/error/errors';
import { LanguageNotAvailableError } from '../../common/error/generated-graphql-admin-errors';
import { createSelfRefreshingCache, SelfRefreshingCache } from '../../common/self-refreshing-cache';
import { ChannelAware } from '../../common/types/common-types';
import { assertFound, idsAreEqual } from '../../common/utils';
import { ConfigService } from '../../config/config.service';
Expand All @@ -32,7 +33,7 @@ import { GlobalSettingsService } from './global-settings.service';

@Injectable()
export class ChannelService {
private allChannels: Channel[] = [];
private allChannels: SelfRefreshingCache<Channel[]>;

constructor(
private connection: TransactionalConnection,
Expand All @@ -47,15 +48,20 @@ export class ChannelService {
*/
async initChannels() {
await this.ensureDefaultChannelExists();
await this.updateAllChannels();
this.allChannels = await createSelfRefreshingCache({
name: 'ChannelService.allChannels',
ttl: 10000,
refreshFn: () => this.findAll(RequestContext.empty()),
});
}

/**
* Assigns a ChannelAware entity to the default Channel as well as any channel
* specified in the RequestContext.
*/
assignToCurrentChannel<T extends ChannelAware>(entity: T, ctx: RequestContext): T {
const channelIds = unique([ctx.channelId, this.getDefaultChannel().id]);
async assignToCurrentChannel<T extends ChannelAware>(entity: T, ctx: RequestContext): Promise<T> {
const defaultChannel = await this.getDefaultChannel();
const channelIds = unique([ctx.channelId, defaultChannel.id]);
entity.channels = channelIds.map(id => ({ id })) as any;
return entity;
}
Expand Down Expand Up @@ -105,12 +111,13 @@ export class ChannelService {
/**
* Given a channel token, returns the corresponding Channel if it exists.
*/
getChannelFromToken(token: string): Channel {
if (this.allChannels.length === 1 || token === '') {
async getChannelFromToken(token: string): Promise<Channel> {
const allChannels = await this.allChannels.value();
if (allChannels.length === 1 || token === '') {
// there is only the default channel, so return it
return this.getDefaultChannel();
}
const channel = this.allChannels.find(c => c.token === token);
const channel = allChannels.find(c => c.token === token);
if (!channel) {
throw new ChannelNotFoundError(token);
}
Expand All @@ -120,8 +127,9 @@ export class ChannelService {
/**
* Returns the default Channel.
*/
getDefaultChannel(): Channel {
const defaultChannel = this.allChannels.find(channel => channel.code === DEFAULT_CHANNEL_CODE);
async getDefaultChannel(): Promise<Channel> {
const allChannels = await this.allChannels.value();
const defaultChannel = allChannels.find(channel => channel.code === DEFAULT_CHANNEL_CODE);

if (!defaultChannel) {
throw new InternalServerError(`error.default-channel-not-found`);
Expand Down Expand Up @@ -166,7 +174,7 @@ export class ChannelService {
}
const newChannel = await this.connection.getRepository(ctx, Channel).save(channel);
await this.customFieldRelationService.updateRelations(ctx, Channel, input, newChannel);
await this.updateAllChannels(ctx);
await this.allChannels.refresh();
return channel;
}

Expand Down Expand Up @@ -199,7 +207,7 @@ export class ChannelService {
}
await this.connection.getRepository(ctx, Channel).save(updatedChannel, { reload: false });
await this.customFieldRelationService.updateRelations(ctx, Channel, input, updatedChannel);
await this.updateAllChannels(ctx);
await this.allChannels.refresh();
return assertFound(this.findOne(ctx, channel.id));
}

Expand Down Expand Up @@ -262,8 +270,4 @@ export class ChannelService {
}
}
}

private async updateAllChannels(ctx?: RequestContext) {
this.allChannels = await this.findAll(ctx || RequestContext.empty());
}
}
3 changes: 0 additions & 3 deletions packages/core/src/service/services/country.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ export class CountryService {
entityType: Country,
translationType: CountryTranslation,
});
await this.zoneService.updateZonesCache(ctx);
return assertFound(this.findOne(ctx, country.id));
}

Expand All @@ -94,7 +93,6 @@ export class CountryService {
entityType: Country,
translationType: CountryTranslation,
});
await this.zoneService.updateZonesCache(ctx);
return assertFound(this.findOne(ctx, country.id));
}

Expand All @@ -112,7 +110,6 @@ export class CountryService {
message: ctx.translate('message.country-used-in-addresses', { count: addressesUsingCountry }),
};
} else {
await this.zoneService.updateZonesCache(ctx);
await this.connection.getRepository(ctx, Country).remove(country);
return {
result: DeletionResult.DELETED,
Expand Down
4 changes: 2 additions & 2 deletions packages/core/src/service/services/customer.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ export class CustomerService {
} else {
this.eventBus.publish(new AccountRegistrationEvent(ctx, customer.user));
}
this.channelService.assignToCurrentChannel(customer, ctx);
await this.channelService.assignToCurrentChannel(customer, ctx);
const createdCustomer = await this.connection.getRepository(ctx, Customer).save(customer);
await this.customFieldRelationService.updateRelations(ctx, Customer, input, createdCustomer);
await this.historyService.createHistoryEntryForCustomer({
Expand Down Expand Up @@ -564,7 +564,7 @@ export class CustomerService {
customer.channels.push(await this.connection.getEntityOrThrow(ctx, Channel, ctx.channelId));
} else {
customer = await this.connection.getRepository(ctx, Customer).save(new Customer(input));
this.channelService.assignToCurrentChannel(customer, ctx);
await this.channelService.assignToCurrentChannel(customer, ctx);
this.eventBus.publish(new CustomerEvent(ctx, customer, 'created'));
}
return this.connection.getRepository(ctx, Customer).save(customer);
Expand Down
4 changes: 2 additions & 2 deletions packages/core/src/service/services/facet-value.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ export class FacetValueService {
input,
entityType: FacetValue,
translationType: FacetValueTranslation,
beforeSave: fv => {
beforeSave: async fv => {
fv.facet = facet;
this.channelService.assignToCurrentChannel(fv, ctx);
await this.channelService.assignToCurrentChannel(fv, ctx);
},
});
await this.customFieldRelationService.updateRelations(
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/service/services/facet.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ export class FacetService {
translationType: FacetTranslation,
beforeSave: async f => {
f.code = await this.ensureUniqueCode(ctx, f.code);
this.channelService.assignToCurrentChannel(f, ctx);
await this.channelService.assignToCurrentChannel(f, ctx);
},
});
await this.customFieldRelationService.updateRelations(ctx, Facet, input, facet);
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/service/services/order.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ export class OrderService {
newOrder.customer = customer;
}
}
this.channelService.assignToCurrentChannel(newOrder, ctx);
await this.channelService.assignToCurrentChannel(newOrder, ctx);
const order = await this.connection.getRepository(ctx, Order).save(newOrder);
const transitionResult = await this.transitionToState(ctx, order.id, 'AddingItems');
if (isGraphQlErrorResult(transitionResult)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ export class PaymentMethodService {
input.checker,
);
}
this.channelService.assignToCurrentChannel(paymentMethod, ctx);
await this.channelService.assignToCurrentChannel(paymentMethod, ctx);
return this.connection.getRepository(ctx, PaymentMethod).save(paymentMethod);
}

Expand Down
9 changes: 5 additions & 4 deletions packages/core/src/service/services/product-variant.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ export class ProductVariantService {
variant.product = { id: input.productId } as any;
variant.taxCategory = { id: input.taxCategoryId } as any;
await this.assetService.updateFeaturedAsset(ctx, variant, input);
this.channelService.assignToCurrentChannel(variant, ctx);
await this.channelService.assignToCurrentChannel(variant, ctx);
},
typeOrmSubscriberData: {
channelId: ctx.channelId,
Expand All @@ -401,7 +401,7 @@ export class ProductVariantService {
);
}

const defaultChannelId = this.channelService.getDefaultChannel().id;
const defaultChannelId = (await this.channelService.getDefaultChannel()).id;
await this.createOrUpdateProductVariantPrice(ctx, createdVariant.id, input.price, ctx.channelId);
if (!idsAreEqual(ctx.channelId, defaultChannelId)) {
// When creating a ProductVariant _not_ in the default Channel, we still need to
Expand Down Expand Up @@ -585,7 +585,7 @@ export class ProductVariantService {
});
}
const { taxZoneStrategy } = this.configService.taxOptions;
const zones = this.requestCache.get(ctx, 'allZones', () => this.zoneService.findAll(ctx));
const zones = await this.requestCache.get(ctx, 'allZones', () => this.zoneService.findAll(ctx));
const activeTaxZone = await this.requestCache.get(ctx, 'activeTaxZone', () =>
taxZoneStrategy.determineTaxZone(ctx, zones, ctx.channel, order),
);
Expand Down Expand Up @@ -670,7 +670,8 @@ export class ProductVariantService {
if (!hasPermission) {
throw new ForbiddenError();
}
if (idsAreEqual(input.channelId, this.channelService.getDefaultChannel().id)) {
const defaultChannel = await this.channelService.getDefaultChannel();
if (idsAreEqual(input.channelId, defaultChannel.id)) {
throw new UserInputError('error.products-cannot-be-removed-from-default-channel');
}
const variants = await this.connection
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/service/services/product.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ export class ProductService {
entityType: Product,
translationType: ProductTranslation,
beforeSave: async p => {
this.channelService.assignToCurrentChannel(p, ctx);
await this.channelService.assignToCurrentChannel(p, ctx);
if (input.facetValueIds) {
p.facetValues = await this.facetValueService.findByIds(ctx, input.facetValueIds);
}
Expand Down
Loading

0 comments on commit d428ffc

Please sign in to comment.