From 39dc814f7e5baaa443c83237fc0a2a28040fda1e Mon Sep 17 00:00:00 2001 From: Steve Gordon Date: Mon, 1 Mar 2021 15:52:32 +0000 Subject: [PATCH] Support runtime fields on transforms --- src/Nest/XPack/Transform/TransformSource.cs | 17 ++ .../TransformApiWithSettingsTests.cs | 161 ++++++++++++++++++ 2 files changed, 178 insertions(+) diff --git a/src/Nest/XPack/Transform/TransformSource.cs b/src/Nest/XPack/Transform/TransformSource.cs index 5b458709841..30c013cc4b7 100644 --- a/src/Nest/XPack/Transform/TransformSource.cs +++ b/src/Nest/XPack/Transform/TransformSource.cs @@ -23,6 +23,12 @@ public interface ITransformSource /// [DataMember(Name = "query")] QueryContainer Query { get; set; } + + /// + /// Specifies runtime fields which exist only as part of the query. + /// + [DataMember(Name = "runtime_mappings")] + IRuntimeFields RuntimeFields { get; set; } } /// @@ -34,6 +40,9 @@ public class TransformSource /// public QueryContainer Query { get; set; } + + /// + public IRuntimeFields RuntimeFields { get; set; } } /// @@ -41,6 +50,7 @@ public class TransformSourceDescriptor : DescriptorBase public TransformSourceDescriptor Index(Indices indices) => Assign(indices, (a, v) => a.Index = v); @@ -52,5 +62,12 @@ public class TransformSourceDescriptor : DescriptorBase Query(Func, QueryContainer> selector) => Assign(selector, (a, v) => a.Query = v?.Invoke(new QueryContainerDescriptor())); + /// + public TransformSourceDescriptor RuntimeFields(Func, IPromise> runtimeFieldsSelector) => + Assign(runtimeFieldsSelector, (a, v) => a.RuntimeFields = v?.Invoke(new RuntimeFieldsDescriptor())?.Value); + + /// + public TransformSourceDescriptor RuntimeFields(Func, IPromise> runtimeFieldsSelector) where TSource : class => + Assign(runtimeFieldsSelector, (a, v) => a.RuntimeFields = v?.Invoke(new RuntimeFieldsDescriptor())?.Value); } } diff --git a/tests/Tests/XPack/Transform/TransformApiWithSettingsTests.cs b/tests/Tests/XPack/Transform/TransformApiWithSettingsTests.cs index 2784b407c50..0a42d47d77f 100644 --- a/tests/Tests/XPack/Transform/TransformApiWithSettingsTests.cs +++ b/tests/Tests/XPack/Transform/TransformApiWithSettingsTests.cs @@ -311,4 +311,165 @@ protected override LazyResponses ClientUsage() => .MaxPageSearchSize(200) .DocsPerSecond(200)); } + + [SkipVersion("<7.12.0", "Settings introduced in 7.12.0")] + public class TransformApiWithRuntimeFieldsTests + : ApiIntegrationTestBase, IPreviewTransformRequest, PreviewTransformDescriptor, PreviewTransformRequest> + { + private const string RuntimeFieldName = "search_runtime_field"; + private const string RuntimeFieldScript = "if (doc['type'].size() != 0) {emit(doc['type'].value.toUpperCase())}"; + + public TransformApiWithRuntimeFieldsTests(WritableCluster cluster, EndpointUsage usage) : base(cluster, usage) { } + + protected override LazyResponses ClientUsage() => Calls( + (client, f) => client.Transform.Preview(f), + (client, f) => client.Transform.PreviewAsync(f), + (client, r) => client.Transform.Preview(r), + (client, r) => client.Transform.PreviewAsync(r)); + + protected override HttpMethod HttpMethod => HttpMethod.POST; + protected override string UrlPath => "_transform/_preview"; + protected override bool ExpectIsValid => true; + protected override int ExpectStatusCode => 200; + protected override bool SupportsDeserialization => false; + + protected override object ExpectJson => + new + { + description = CallIsolatedValue, + frequency = "1s", + source = new { + index = new[] { "project" }, + query = new { match_all = new { } }, + runtime_mappings = new + { + search_runtime_field = new + { + script = new + { + lang = "painless", + source = RuntimeFieldScript + }, + type = "keyword" + } + } + }, + dest = new { index = $"transform-{CallIsolatedValue}" }, + pivot = new + { + aggregations = new + { + averageCommits = new + { + avg = new + { + field = "numberOfCommits" + } + }, + sumIntoMaster = new + { + scripted_metric = new + { + combine_script = new + { + source = "long sum = 0; for (s in state.masterCommits) { sum += s } return sum" + }, + init_script = new + { + source = "state.masterCommits = []" + }, + map_script = new + { + source = "state.masterCommits.add(doc['branches.keyword'].contains('master')? 1 : 0)" + }, + reduce_script = new + { + source = "long sum = 0; for (s in states) { sum += s } return sum" + } + } + } + }, + group_by = new { type = new { terms = new { field = "search_runtime_field" } } } + }, + sync = new + { + time = new + { + field = "lastActivity" + } + } + }; + + protected override PreviewTransformRequest Initializer => new() + { + Description = CallIsolatedValue, + Frequency = "1s", + Source = new TransformSource { Index = Index(), Query = new MatchAllQuery(), + RuntimeFields = new RuntimeFields + { + { RuntimeFieldName, new RuntimeField + { + Type = FieldType.Keyword, + Script = new PainlessScript(RuntimeFieldScript) + } + } + } + }, + Destination = new TransformDestination { Index = $"transform-{CallIsolatedValue}" }, + Pivot = new TransformPivot + { + Aggregations = + new AverageAggregation("averageCommits", Field(f => f.NumberOfCommits)) && + new ScriptedMetricAggregation("sumIntoMaster") + { + InitScript = new InlineScript("state.masterCommits = []"), + MapScript = new InlineScript("state.masterCommits.add(doc['branches.keyword'].contains('master')? 1 : 0)"), + CombineScript = new InlineScript("long sum = 0; for (s in state.masterCommits) { sum += s } return sum"), + ReduceScript = new InlineScript("long sum = 0; for (s in states) { sum += s } return sum") + }, + GroupBy = new Dictionary + { + { + "type", + new TermsGroupSource { Field = "search_runtime_field"} + } + } + + }, + Sync = new TransformSyncContainer(new TransformTimeSync { Field = Field(f => f.LastActivity) }) + }; + + protected override Func, IPreviewTransformRequest> Fluent => f => f + .Description(CallIsolatedValue) + .Frequency(new Time(1, TimeUnit.Second)) + .Source(s => s + .Index() + .Query(q => q.MatchAll()) + .RuntimeFields(rtf => rtf.RuntimeField(RuntimeFieldName, FieldType.Keyword, r => r.Script(RuntimeFieldScript))) + ) + .Destination(de => de + .Index($"transform-{CallIsolatedValue}") + ) + .Pivot(p => p + .Aggregations(a => a + .Average("averageCommits", avg => avg + .Field(fld => fld.NumberOfCommits) + ) + .ScriptedMetric("sumIntoMaster", sm => sm + .InitScript("state.masterCommits = []") + .MapScript("state.masterCommits.add(doc['branches.keyword'].contains('master')? 1 : 0)") + .CombineScript("long sum = 0; for (s in state.masterCommits) { sum += s } return sum") + .ReduceScript("long sum = 0; for (s in states) { sum += s } return sum") + ) + ) + .GroupBy(g => g + .Terms("type", t => t.Field("search_runtime_field")) + ) + ) + .Sync(sy => sy + .Time(t => t + .Field(fld => fld.LastActivity) + ) + ); + } }