Skip to content

Commit

Permalink
Support runtime fields on transforms
Browse files Browse the repository at this point in the history
  • Loading branch information
stevejgordon committed Mar 2, 2021
1 parent 353b26b commit 39dc814
Show file tree
Hide file tree
Showing 2 changed files with 178 additions and 0 deletions.
17 changes: 17 additions & 0 deletions src/Nest/XPack/Transform/TransformSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ public interface ITransformSource
/// </summary>
[DataMember(Name = "query")]
QueryContainer Query { get; set; }

/// <summary>
/// Specifies runtime fields which exist only as part of the query.
/// </summary>
[DataMember(Name = "runtime_mappings")]
IRuntimeFields RuntimeFields { get; set; }
}

/// <inheritdoc />
Expand All @@ -34,13 +40,17 @@ public class TransformSource

/// <inheritdoc />
public QueryContainer Query { get; set; }

/// <inheritdoc />
public IRuntimeFields RuntimeFields { get; set; }
}

/// <inheritdoc cref="ITransformSource" />
public class TransformSourceDescriptor<T> : DescriptorBase<TransformSourceDescriptor<T>, ITransformSource>, ITransformSource where T : class
{
Indices ITransformSource.Index { get; set; }
QueryContainer ITransformSource.Query { get; set; }
IRuntimeFields ITransformSource.RuntimeFields { get; set; }

/// <inheritdoc cref="ITransformSource.Index" />
public TransformSourceDescriptor<T> Index(Indices indices) => Assign(indices, (a, v) => a.Index = v);
Expand All @@ -52,5 +62,12 @@ public class TransformSourceDescriptor<T> : DescriptorBase<TransformSourceDescri
public TransformSourceDescriptor<T> Query(Func<QueryContainerDescriptor<T>, QueryContainer> selector) =>
Assign(selector, (a, v) => a.Query = v?.Invoke(new QueryContainerDescriptor<T>()));

/// <inheritdoc cref="ITransformSource.RuntimeFields" />
public TransformSourceDescriptor<T> RuntimeFields(Func<RuntimeFieldsDescriptor<T>, IPromise<IRuntimeFields>> runtimeFieldsSelector) =>
Assign(runtimeFieldsSelector, (a, v) => a.RuntimeFields = v?.Invoke(new RuntimeFieldsDescriptor<T>())?.Value);

/// <inheritdoc cref="ITransformSource.RuntimeFields" />
public TransformSourceDescriptor<T> RuntimeFields<TSource>(Func<RuntimeFieldsDescriptor<TSource>, IPromise<IRuntimeFields>> runtimeFieldsSelector) where TSource : class =>
Assign(runtimeFieldsSelector, (a, v) => a.RuntimeFields = v?.Invoke(new RuntimeFieldsDescriptor<TSource>())?.Value);
}
}
161 changes: 161 additions & 0 deletions tests/Tests/XPack/Transform/TransformApiWithSettingsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -311,4 +311,165 @@ protected override LazyResponses ClientUsage() =>
.MaxPageSearchSize(200)
.DocsPerSecond(200));
}

[SkipVersion("<7.12.0", "Settings introduced in 7.12.0")]
public class TransformApiWithRuntimeFieldsTests
: ApiIntegrationTestBase<WritableCluster, PreviewTransformResponse<ProjectTransform>, IPreviewTransformRequest, PreviewTransformDescriptor<Project>, PreviewTransformRequest>
{
private const string RuntimeFieldName = "search_runtime_field";
private const string RuntimeFieldScript = "if (doc['type'].size() != 0) {emit(doc['type'].value.toUpperCase())}";

public TransformApiWithRuntimeFieldsTests(WritableCluster cluster, EndpointUsage usage) : base(cluster, usage) { }

protected override LazyResponses ClientUsage() => Calls(
(client, f) => client.Transform.Preview<Project, ProjectTransform>(f),
(client, f) => client.Transform.PreviewAsync<Project, ProjectTransform>(f),
(client, r) => client.Transform.Preview<ProjectTransform>(r),
(client, r) => client.Transform.PreviewAsync<ProjectTransform>(r));

protected override HttpMethod HttpMethod => HttpMethod.POST;
protected override string UrlPath => "_transform/_preview";
protected override bool ExpectIsValid => true;
protected override int ExpectStatusCode => 200;
protected override bool SupportsDeserialization => false;

protected override object ExpectJson =>
new
{
description = CallIsolatedValue,
frequency = "1s",
source = new {
index = new[] { "project" },
query = new { match_all = new { } },
runtime_mappings = new
{
search_runtime_field = new
{
script = new
{
lang = "painless",
source = RuntimeFieldScript
},
type = "keyword"
}
}
},
dest = new { index = $"transform-{CallIsolatedValue}" },
pivot = new
{
aggregations = new
{
averageCommits = new
{
avg = new
{
field = "numberOfCommits"
}
},
sumIntoMaster = new
{
scripted_metric = new
{
combine_script = new
{
source = "long sum = 0; for (s in state.masterCommits) { sum += s } return sum"
},
init_script = new
{
source = "state.masterCommits = []"
},
map_script = new
{
source = "state.masterCommits.add(doc['branches.keyword'].contains('master')? 1 : 0)"
},
reduce_script = new
{
source = "long sum = 0; for (s in states) { sum += s } return sum"
}
}
}
},
group_by = new { type = new { terms = new { field = "search_runtime_field" } } }
},
sync = new
{
time = new
{
field = "lastActivity"
}
}
};

protected override PreviewTransformRequest Initializer => new()
{
Description = CallIsolatedValue,
Frequency = "1s",
Source = new TransformSource { Index = Index<Project>(), Query = new MatchAllQuery(),
RuntimeFields = new RuntimeFields
{
{ RuntimeFieldName, new RuntimeField
{
Type = FieldType.Keyword,
Script = new PainlessScript(RuntimeFieldScript)
}
}
}
},
Destination = new TransformDestination { Index = $"transform-{CallIsolatedValue}" },
Pivot = new TransformPivot
{
Aggregations =
new AverageAggregation("averageCommits", Field<Project>(f => f.NumberOfCommits)) &&
new ScriptedMetricAggregation("sumIntoMaster")
{
InitScript = new InlineScript("state.masterCommits = []"),
MapScript = new InlineScript("state.masterCommits.add(doc['branches.keyword'].contains('master')? 1 : 0)"),
CombineScript = new InlineScript("long sum = 0; for (s in state.masterCommits) { sum += s } return sum"),
ReduceScript = new InlineScript("long sum = 0; for (s in states) { sum += s } return sum")
},
GroupBy = new Dictionary<string, ISingleGroupSource>
{
{
"type",
new TermsGroupSource { Field = "search_runtime_field"}
}
}

},
Sync = new TransformSyncContainer(new TransformTimeSync { Field = Field<Project>(f => f.LastActivity) })
};

protected override Func<PreviewTransformDescriptor<Project>, IPreviewTransformRequest> Fluent => f => f
.Description(CallIsolatedValue)
.Frequency(new Time(1, TimeUnit.Second))
.Source(s => s
.Index<Project>()
.Query(q => q.MatchAll())
.RuntimeFields(rtf => rtf.RuntimeField(RuntimeFieldName, FieldType.Keyword, r => r.Script(RuntimeFieldScript)))
)
.Destination(de => de
.Index($"transform-{CallIsolatedValue}")
)
.Pivot(p => p
.Aggregations(a => a
.Average("averageCommits", avg => avg
.Field(fld => fld.NumberOfCommits)
)
.ScriptedMetric("sumIntoMaster", sm => sm
.InitScript("state.masterCommits = []")
.MapScript("state.masterCommits.add(doc['branches.keyword'].contains('master')? 1 : 0)")
.CombineScript("long sum = 0; for (s in state.masterCommits) { sum += s } return sum")
.ReduceScript("long sum = 0; for (s in states) { sum += s } return sum")
)
)
.GroupBy(g => g
.Terms("type", t => t.Field("search_runtime_field"))
)
)
.Sync(sy => sy
.Time(t => t
.Field(fld => fld.LastActivity)
)
);
}
}

0 comments on commit 39dc814

Please sign in to comment.