Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support runtime fields on transforms #5377

Merged
merged 1 commit into from
Mar 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions src/Nest/XPack/Transform/TransformSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ public interface ITransformSource
/// </summary>
[DataMember(Name = "query")]
QueryContainer Query { get; set; }

/// <summary>
/// Specifies runtime fields which exist only as part of the query.
/// </summary>
[DataMember(Name = "runtime_mappings")]
IRuntimeFields RuntimeFields { get; set; }
}

/// <inheritdoc />
Expand All @@ -34,13 +40,17 @@ public class TransformSource

/// <inheritdoc />
public QueryContainer Query { get; set; }

/// <inheritdoc />
public IRuntimeFields RuntimeFields { get; set; }
}

/// <inheritdoc cref="ITransformSource" />
public class TransformSourceDescriptor<T> : DescriptorBase<TransformSourceDescriptor<T>, ITransformSource>, ITransformSource where T : class
{
Indices ITransformSource.Index { get; set; }
QueryContainer ITransformSource.Query { get; set; }
IRuntimeFields ITransformSource.RuntimeFields { get; set; }

/// <inheritdoc cref="ITransformSource.Index" />
public TransformSourceDescriptor<T> Index(Indices indices) => Assign(indices, (a, v) => a.Index = v);
Expand All @@ -52,5 +62,12 @@ public class TransformSourceDescriptor<T> : DescriptorBase<TransformSourceDescri
public TransformSourceDescriptor<T> Query(Func<QueryContainerDescriptor<T>, QueryContainer> selector) =>
Assign(selector, (a, v) => a.Query = v?.Invoke(new QueryContainerDescriptor<T>()));

/// <inheritdoc cref="ITransformSource.RuntimeFields" />
public TransformSourceDescriptor<T> RuntimeFields(Func<RuntimeFieldsDescriptor<T>, IPromise<IRuntimeFields>> runtimeFieldsSelector) =>
Assign(runtimeFieldsSelector, (a, v) => a.RuntimeFields = v?.Invoke(new RuntimeFieldsDescriptor<T>())?.Value);

/// <inheritdoc cref="ITransformSource.RuntimeFields" />
public TransformSourceDescriptor<T> RuntimeFields<TSource>(Func<RuntimeFieldsDescriptor<TSource>, IPromise<IRuntimeFields>> runtimeFieldsSelector) where TSource : class =>
Assign(runtimeFieldsSelector, (a, v) => a.RuntimeFields = v?.Invoke(new RuntimeFieldsDescriptor<TSource>())?.Value);
}
}
161 changes: 161 additions & 0 deletions tests/Tests/XPack/Transform/TransformApiWithSettingsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -311,4 +311,165 @@ protected override LazyResponses ClientUsage() =>
.MaxPageSearchSize(200)
.DocsPerSecond(200));
}

[SkipVersion("<7.12.0", "Settings introduced in 7.12.0")]
public class TransformApiWithRuntimeFieldsTests
: ApiIntegrationTestBase<WritableCluster, PreviewTransformResponse<ProjectTransform>, IPreviewTransformRequest, PreviewTransformDescriptor<Project>, PreviewTransformRequest>
{
private const string RuntimeFieldName = "search_runtime_field";
private const string RuntimeFieldScript = "if (doc['type'].size() != 0) {emit(doc['type'].value.toUpperCase())}";

public TransformApiWithRuntimeFieldsTests(WritableCluster cluster, EndpointUsage usage) : base(cluster, usage) { }

protected override LazyResponses ClientUsage() => Calls(
(client, f) => client.Transform.Preview<Project, ProjectTransform>(f),
(client, f) => client.Transform.PreviewAsync<Project, ProjectTransform>(f),
(client, r) => client.Transform.Preview<ProjectTransform>(r),
(client, r) => client.Transform.PreviewAsync<ProjectTransform>(r));

protected override HttpMethod HttpMethod => HttpMethod.POST;
protected override string UrlPath => "_transform/_preview";
protected override bool ExpectIsValid => true;
protected override int ExpectStatusCode => 200;
protected override bool SupportsDeserialization => false;

protected override object ExpectJson =>
new
{
description = CallIsolatedValue,
frequency = "1s",
source = new {
index = new[] { "project" },
query = new { match_all = new { } },
runtime_mappings = new
{
search_runtime_field = new
{
script = new
{
lang = "painless",
source = RuntimeFieldScript
},
type = "keyword"
}
}
},
dest = new { index = $"transform-{CallIsolatedValue}" },
pivot = new
{
aggregations = new
{
averageCommits = new
{
avg = new
{
field = "numberOfCommits"
}
},
sumIntoMaster = new
{
scripted_metric = new
{
combine_script = new
{
source = "long sum = 0; for (s in state.masterCommits) { sum += s } return sum"
},
init_script = new
{
source = "state.masterCommits = []"
},
map_script = new
{
source = "state.masterCommits.add(doc['branches.keyword'].contains('master')? 1 : 0)"
},
reduce_script = new
{
source = "long sum = 0; for (s in states) { sum += s } return sum"
}
}
}
},
group_by = new { type = new { terms = new { field = "search_runtime_field" } } }
},
sync = new
{
time = new
{
field = "lastActivity"
}
}
};

protected override PreviewTransformRequest Initializer => new()
{
Description = CallIsolatedValue,
Frequency = "1s",
Source = new TransformSource { Index = Index<Project>(), Query = new MatchAllQuery(),
RuntimeFields = new RuntimeFields
{
{ RuntimeFieldName, new RuntimeField
{
Type = FieldType.Keyword,
Script = new PainlessScript(RuntimeFieldScript)
}
}
}
},
Destination = new TransformDestination { Index = $"transform-{CallIsolatedValue}" },
Pivot = new TransformPivot
{
Aggregations =
new AverageAggregation("averageCommits", Field<Project>(f => f.NumberOfCommits)) &&
new ScriptedMetricAggregation("sumIntoMaster")
{
InitScript = new InlineScript("state.masterCommits = []"),
MapScript = new InlineScript("state.masterCommits.add(doc['branches.keyword'].contains('master')? 1 : 0)"),
CombineScript = new InlineScript("long sum = 0; for (s in state.masterCommits) { sum += s } return sum"),
ReduceScript = new InlineScript("long sum = 0; for (s in states) { sum += s } return sum")
},
GroupBy = new Dictionary<string, ISingleGroupSource>
{
{
"type",
new TermsGroupSource { Field = "search_runtime_field"}
}
}

},
Sync = new TransformSyncContainer(new TransformTimeSync { Field = Field<Project>(f => f.LastActivity) })
};

protected override Func<PreviewTransformDescriptor<Project>, IPreviewTransformRequest> Fluent => f => f
.Description(CallIsolatedValue)
.Frequency(new Time(1, TimeUnit.Second))
.Source(s => s
.Index<Project>()
.Query(q => q.MatchAll())
.RuntimeFields(rtf => rtf.RuntimeField(RuntimeFieldName, FieldType.Keyword, r => r.Script(RuntimeFieldScript)))
)
.Destination(de => de
.Index($"transform-{CallIsolatedValue}")
)
.Pivot(p => p
.Aggregations(a => a
.Average("averageCommits", avg => avg
.Field(fld => fld.NumberOfCommits)
)
.ScriptedMetric("sumIntoMaster", sm => sm
.InitScript("state.masterCommits = []")
.MapScript("state.masterCommits.add(doc['branches.keyword'].contains('master')? 1 : 0)")
.CombineScript("long sum = 0; for (s in state.masterCommits) { sum += s } return sum")
.ReduceScript("long sum = 0; for (s in states) { sum += s } return sum")
)
)
.GroupBy(g => g
.Terms("type", t => t.Field("search_runtime_field"))
)
)
.Sync(sy => sy
.Time(t => t
.Field(fld => fld.LastActivity)
)
);
}
}