From f3fb298d82a60a1e1b0723ef84852bcdc76384ba Mon Sep 17 00:00:00 2001 From: Michael Bromley Date: Thu, 7 Oct 2021 21:04:29 +0200 Subject: [PATCH] feat(elasticsearch-plugin): Support search index job batching Relates to #1137 --- packages/dev-server/dev-config.ts | 12 ++++--- .../src/elasticsearch-resolver.ts | 36 +++++++++++++++---- packages/elasticsearch-plugin/src/options.ts | 3 ++ packages/elasticsearch-plugin/src/plugin.ts | 9 +++++ 4 files changed, 49 insertions(+), 11 deletions(-) diff --git a/packages/dev-server/dev-config.ts b/packages/dev-server/dev-config.ts index 621436be70..95e44552e9 100644 --- a/packages/dev-server/dev-config.ts +++ b/packages/dev-server/dev-config.ts @@ -9,6 +9,7 @@ import { LogLevel, VendureConfig, } from '@vendure/core'; +import { ElasticsearchPlugin } from '@vendure/elasticsearch-plugin'; import { defaultEmailHandlers, EmailPlugin } from '@vendure/email-plugin'; import { BullMQJobQueuePlugin } from '@vendure/job-queue-plugin/package/bullmq'; import path from 'path'; @@ -64,14 +65,15 @@ export const devConfig: VendureConfig = { route: 'assets', assetUploadDir: path.join(__dirname, 'assets'), }), - DefaultSearchPlugin.init({ bufferUpdates: true }), + // DefaultSearchPlugin.init({ bufferUpdates: true }), BullMQJobQueuePlugin.init({}), // DefaultJobQueuePlugin, // JobQueueTestPlugin.init({ queueCount: 10 }), - // ElasticsearchPlugin.init({ - // host: 'http://localhost', - // port: 9200, - // }), + ElasticsearchPlugin.init({ + host: 'http://localhost', + port: 9200, + bufferUpdates: true, + }), EmailPlugin.init({ devMode: true, route: 'mailbox', diff --git a/packages/elasticsearch-plugin/src/elasticsearch-resolver.ts b/packages/elasticsearch-plugin/src/elasticsearch-resolver.ts index 43bed6c26f..6acf4b672a 100644 --- a/packages/elasticsearch-plugin/src/elasticsearch-resolver.ts +++ b/packages/elasticsearch-plugin/src/elasticsearch-resolver.ts @@ -6,14 +6,21 @@ import { SearchResponse, } from '@vendure/common/lib/generated-types'; import { Omit } from '@vendure/common/lib/omit'; -import { Allow, Collection, Ctx, FacetValue, RequestContext, SearchResolver } from '@vendure/core'; +import { + Allow, + Collection, + Ctx, + FacetValue, + RequestContext, + SearchJobBufferService, + SearchResolver, +} from '@vendure/core'; import { ElasticsearchService } from './elasticsearch.service'; import { ElasticSearchInput, SearchPriceData } from './types'; @Resolver('SearchResponse') -export class ShopElasticSearchResolver - implements Omit { +export class ShopElasticSearchResolver implements Pick { constructor(private elasticsearchService: ElasticsearchService) {} @Query() @@ -38,8 +45,11 @@ export class ShopElasticSearchResolver } @Resolver('SearchResponse') -export class AdminElasticSearchResolver implements Omit { - constructor(private elasticsearchService: ElasticsearchService) {} +export class AdminElasticSearchResolver implements Pick { + constructor( + private elasticsearchService: ElasticsearchService, + private searchJobBufferService: SearchJobBufferService, + ) {} @Query() @Allow(Permission.ReadCatalog, Permission.ReadProduct) @@ -56,7 +66,21 @@ export class AdminElasticSearchResolver implements Omit { - return (this.elasticsearchService.reindex(ctx) as unknown) as GraphQLJob; + return this.elasticsearchService.reindex(ctx) as unknown as GraphQLJob; + } + + @Query() + @Allow(Permission.UpdateCatalog, Permission.UpdateProduct) + async pendingSearchIndexUpdates(...args: any[]): Promise { + return this.searchJobBufferService.getPendingSearchUpdates(); + } + + @Mutation() + @Allow(Permission.UpdateCatalog, Permission.UpdateProduct) + async runPendingSearchIndexUpdates(...args: any[]): Promise { + // Intentionally not awaiting this method call + this.searchJobBufferService.runPendingSearchUpdates(); + return { success: true }; } } diff --git a/packages/elasticsearch-plugin/src/options.ts b/packages/elasticsearch-plugin/src/options.ts index df1ab4f705..e5462a6d5f 100644 --- a/packages/elasticsearch-plugin/src/options.ts +++ b/packages/elasticsearch-plugin/src/options.ts @@ -205,6 +205,8 @@ export interface ElasticsearchOptions { customProductVariantMappings?: { [fieldName: string]: CustomMapping<[ProductVariant, LanguageCode]>; }; + // TODO: docs + bufferUpdates?: boolean; } /** @@ -418,6 +420,7 @@ export const defaultOptions: ElasticsearchRuntimeOptions = { }, customProductMappings: {}, customProductVariantMappings: {}, + bufferUpdates: false, }; export function mergeWithDefaults(userOptions: ElasticsearchOptions): ElasticsearchRuntimeOptions { diff --git a/packages/elasticsearch-plugin/src/plugin.ts b/packages/elasticsearch-plugin/src/plugin.ts index 356a05a54a..b1851f2bd6 100644 --- a/packages/elasticsearch-plugin/src/plugin.ts +++ b/packages/elasticsearch-plugin/src/plugin.ts @@ -2,6 +2,7 @@ import { NodeOptions } from '@elastic/elasticsearch'; import { OnApplicationBootstrap } from '@nestjs/common'; import { AssetEvent, + BUFFER_SEARCH_INDEX_UPDATES, CollectionModificationEvent, EventBus, HealthCheckRegistryService, @@ -13,6 +14,7 @@ import { ProductEvent, ProductVariantChannelEvent, ProductVariantEvent, + SearchJobBufferService, TaxRateModificationEvent, Type, VendurePlugin, @@ -201,7 +203,12 @@ import { ElasticsearchOptions, ElasticsearchRuntimeOptions, mergeWithDefaults } ElasticsearchService, ElasticsearchHealthIndicator, ElasticsearchIndexerController, + SearchJobBufferService, { provide: ELASTIC_SEARCH_OPTIONS, useFactory: () => ElasticsearchPlugin.options }, + { + provide: BUFFER_SEARCH_INDEX_UPDATES, + useFactory: () => ElasticsearchPlugin.options.bufferUpdates === true, + }, ], adminApiExtensions: { resolvers: [AdminElasticSearchResolver, EntityElasticSearchResolver] }, shopApiExtensions: { @@ -314,6 +321,7 @@ export class ElasticsearchPlugin implements OnApplicationBootstrap { } }); + // TODO: Remove this buffering logic because because we have dedicated buffering based on #1137 const collectionModification$ = this.eventBus.ofType(CollectionModificationEvent); const closingNotifier$ = collectionModification$.pipe(debounceTime(50)); collectionModification$ @@ -335,6 +343,7 @@ export class ElasticsearchPlugin implements OnApplicationBootstrap { // The delay prevents a "TransactionNotStartedError" (in SQLite/sqljs) by allowing any existing // transactions to complete before a new job is added to the queue (assuming the SQL-based // JobQueueStrategy). + // TODO: should be able to remove owing to f0fd6625 .pipe(delay(1)) .subscribe(event => { const defaultTaxZone = event.ctx.channel.defaultTaxZone;