Skip to content

Commit

Permalink
feat(elasticsearch-plugin): Support search index job batching
Browse files Browse the repository at this point in the history
Relates to #1137
  • Loading branch information
michaelbromley committed Oct 7, 2021
1 parent becf132 commit f3fb298
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 11 deletions.
12 changes: 7 additions & 5 deletions packages/dev-server/dev-config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
LogLevel,
VendureConfig,
} from '@vendure/core';
import { ElasticsearchPlugin } from '@vendure/elasticsearch-plugin';
import { defaultEmailHandlers, EmailPlugin } from '@vendure/email-plugin';
import { BullMQJobQueuePlugin } from '@vendure/job-queue-plugin/package/bullmq';
import path from 'path';
Expand Down Expand Up @@ -64,14 +65,15 @@ export const devConfig: VendureConfig = {
route: 'assets',
assetUploadDir: path.join(__dirname, 'assets'),
}),
DefaultSearchPlugin.init({ bufferUpdates: true }),
// DefaultSearchPlugin.init({ bufferUpdates: true }),
BullMQJobQueuePlugin.init({}),
// DefaultJobQueuePlugin,
// JobQueueTestPlugin.init({ queueCount: 10 }),
// ElasticsearchPlugin.init({
// host: 'http://localhost',
// port: 9200,
// }),
ElasticsearchPlugin.init({
host: 'http://localhost',
port: 9200,
bufferUpdates: true,
}),
EmailPlugin.init({
devMode: true,
route: 'mailbox',
Expand Down
36 changes: 30 additions & 6 deletions packages/elasticsearch-plugin/src/elasticsearch-resolver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,21 @@ import {
SearchResponse,
} from '@vendure/common/lib/generated-types';
import { Omit } from '@vendure/common/lib/omit';
import { Allow, Collection, Ctx, FacetValue, RequestContext, SearchResolver } from '@vendure/core';
import {
Allow,
Collection,
Ctx,
FacetValue,
RequestContext,
SearchJobBufferService,
SearchResolver,
} from '@vendure/core';

import { ElasticsearchService } from './elasticsearch.service';
import { ElasticSearchInput, SearchPriceData } from './types';

@Resolver('SearchResponse')
export class ShopElasticSearchResolver
implements Omit<SearchResolver, 'facetValues' | 'collections' | 'reindex'> {
export class ShopElasticSearchResolver implements Pick<SearchResolver, 'search'> {
constructor(private elasticsearchService: ElasticsearchService) {}

@Query()
Expand All @@ -38,8 +45,11 @@ export class ShopElasticSearchResolver
}

@Resolver('SearchResponse')
export class AdminElasticSearchResolver implements Omit<SearchResolver, 'facetValues' | 'collections'> {
constructor(private elasticsearchService: ElasticsearchService) {}
export class AdminElasticSearchResolver implements Pick<SearchResolver, 'search' | 'reindex'> {
constructor(
private elasticsearchService: ElasticsearchService,
private searchJobBufferService: SearchJobBufferService,
) {}

@Query()
@Allow(Permission.ReadCatalog, Permission.ReadProduct)
Expand All @@ -56,7 +66,21 @@ export class AdminElasticSearchResolver implements Omit<SearchResolver, 'facetVa
@Mutation()
@Allow(Permission.UpdateCatalog, Permission.UpdateProduct)
async reindex(@Ctx() ctx: RequestContext): Promise<GraphQLJob> {
return (this.elasticsearchService.reindex(ctx) as unknown) as GraphQLJob;
return this.elasticsearchService.reindex(ctx) as unknown as GraphQLJob;
}

@Query()
@Allow(Permission.UpdateCatalog, Permission.UpdateProduct)
async pendingSearchIndexUpdates(...args: any[]): Promise<any> {
return this.searchJobBufferService.getPendingSearchUpdates();
}

@Mutation()
@Allow(Permission.UpdateCatalog, Permission.UpdateProduct)
async runPendingSearchIndexUpdates(...args: any[]): Promise<any> {
// Intentionally not awaiting this method call
this.searchJobBufferService.runPendingSearchUpdates();
return { success: true };
}
}

Expand Down
3 changes: 3 additions & 0 deletions packages/elasticsearch-plugin/src/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,8 @@ export interface ElasticsearchOptions {
customProductVariantMappings?: {
[fieldName: string]: CustomMapping<[ProductVariant, LanguageCode]>;
};
// TODO: docs
bufferUpdates?: boolean;
}

/**
Expand Down Expand Up @@ -418,6 +420,7 @@ export const defaultOptions: ElasticsearchRuntimeOptions = {
},
customProductMappings: {},
customProductVariantMappings: {},
bufferUpdates: false,
};

export function mergeWithDefaults(userOptions: ElasticsearchOptions): ElasticsearchRuntimeOptions {
Expand Down
9 changes: 9 additions & 0 deletions packages/elasticsearch-plugin/src/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { NodeOptions } from '@elastic/elasticsearch';
import { OnApplicationBootstrap } from '@nestjs/common';
import {
AssetEvent,
BUFFER_SEARCH_INDEX_UPDATES,
CollectionModificationEvent,
EventBus,
HealthCheckRegistryService,
Expand All @@ -13,6 +14,7 @@ import {
ProductEvent,
ProductVariantChannelEvent,
ProductVariantEvent,
SearchJobBufferService,
TaxRateModificationEvent,
Type,
VendurePlugin,
Expand Down Expand Up @@ -201,7 +203,12 @@ import { ElasticsearchOptions, ElasticsearchRuntimeOptions, mergeWithDefaults }
ElasticsearchService,
ElasticsearchHealthIndicator,
ElasticsearchIndexerController,
SearchJobBufferService,
{ provide: ELASTIC_SEARCH_OPTIONS, useFactory: () => ElasticsearchPlugin.options },
{
provide: BUFFER_SEARCH_INDEX_UPDATES,
useFactory: () => ElasticsearchPlugin.options.bufferUpdates === true,
},
],
adminApiExtensions: { resolvers: [AdminElasticSearchResolver, EntityElasticSearchResolver] },
shopApiExtensions: {
Expand Down Expand Up @@ -314,6 +321,7 @@ export class ElasticsearchPlugin implements OnApplicationBootstrap {
}
});

// TODO: Remove this buffering logic because because we have dedicated buffering based on #1137
const collectionModification$ = this.eventBus.ofType(CollectionModificationEvent);
const closingNotifier$ = collectionModification$.pipe(debounceTime(50));
collectionModification$
Expand All @@ -335,6 +343,7 @@ export class ElasticsearchPlugin implements OnApplicationBootstrap {
// The delay prevents a "TransactionNotStartedError" (in SQLite/sqljs) by allowing any existing
// transactions to complete before a new job is added to the queue (assuming the SQL-based
// JobQueueStrategy).
// TODO: should be able to remove owing to f0fd6625
.pipe(delay(1))
.subscribe(event => {
const defaultTaxZone = event.ctx.channel.defaultTaxZone;
Expand Down

0 comments on commit f3fb298

Please sign in to comment.