Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,11 @@ namespace Akka.Streams
public System.Collections.Generic.IEnumerable<Akka.Streams.Attributes.IAttribute> AttributeList { get; }
public Akka.Streams.Attributes And(Akka.Streams.Attributes other) { }
public Akka.Streams.Attributes And(Akka.Streams.Attributes.IAttribute other) { }
[System.ObsoleteAttribute("Use GetAttribute<TAttr>() instead")]
public bool Contains<TAttr>(TAttr attribute)
where TAttr : Akka.Streams.Attributes.IAttribute { }
public bool Contains<TAttr>()
where TAttr : Akka.Streams.Attributes.IAttribute { }
public static Akka.Streams.Attributes CreateAsyncBoundary() { }
public static Akka.Streams.Attributes CreateInputBuffer(int initial, int max) { }
public static Akka.Streams.Attributes CreateLogLevels(Akka.Event.LogLevel onElement = 0, Akka.Event.LogLevel onFinish = 0, Akka.Event.LogLevel onError = 3) { }
Expand Down Expand Up @@ -1400,7 +1403,7 @@ namespace Akka.Streams.Dsl
where TOut : TInjected { }
public static Akka.Streams.Dsl.Flow<TIn, TOut, TMat> Limit<TIn, TOut, TMat>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, long max) { }
public static Akka.Streams.Dsl.Flow<TIn, TOut, TMat> LimitWeighted<TIn, TOut, TMat>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, long max, System.Func<TOut, long> costFunc) { }
public static Akka.Streams.Dsl.Flow<TIn, TOut, TMat> Log<TIn, TOut, TMat>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, string name, System.Func<TOut, object> extract = null, Akka.Event.ILoggingAdapter log = null) { }
public static Akka.Streams.Dsl.Flow<TIn, TOut, TMat> Log<TIn, TOut, TMat>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, string name, System.Func<TOut, object> extract = null, Akka.Event.ILoggingAdapter log = null, Akka.Event.LogLevel logLevel = 0) { }
public static Akka.Streams.Dsl.Flow<TIn, TOut2, TMat> Merge<TIn, TOut1, TOut2, TMat>(this Akka.Streams.Dsl.Flow<TIn, TOut1, TMat> flow, Akka.Streams.IGraph<Akka.Streams.SourceShape<TOut2>, TMat> other, bool eagerComplete = False)
where TOut1 : TOut2 { }
public static Akka.Streams.Dsl.Flow<TIn, TOut2, TMat> MergeMany<TIn, TOut1, TOut2, TMat>(this Akka.Streams.Dsl.Flow<TIn, TOut1, TMat> flow, int breadth, System.Func<TOut1, Akka.Streams.IGraph<Akka.Streams.SourceShape<TOut2>, TMat>> flatten) { }
Expand Down Expand Up @@ -2101,7 +2104,7 @@ namespace Akka.Streams.Dsl
where TOut : TInjected { }
public static Akka.Streams.Dsl.Source<T, TMat> Limit<T, TMat>(this Akka.Streams.Dsl.Source<T, TMat> flow, long max) { }
public static Akka.Streams.Dsl.Source<T, TMat> LimitWeighted<T, TMat>(this Akka.Streams.Dsl.Source<T, TMat> flow, long max, System.Func<T, long> costFunc) { }
public static Akka.Streams.Dsl.Source<TOut, TMat> Log<TOut, TMat>(this Akka.Streams.Dsl.Source<TOut, TMat> flow, string name, System.Func<TOut, object> extract = null, Akka.Event.ILoggingAdapter log = null) { }
public static Akka.Streams.Dsl.Source<TOut, TMat> Log<TOut, TMat>(this Akka.Streams.Dsl.Source<TOut, TMat> flow, string name, System.Func<TOut, object> extract = null, Akka.Event.ILoggingAdapter log = null, Akka.Event.LogLevel logLevel = 0) { }
public static Akka.Streams.Dsl.Source<TOut2, TMat> Merge<TOut1, TOut2, TMat>(this Akka.Streams.Dsl.Source<TOut1, TMat> flow, Akka.Streams.IGraph<Akka.Streams.SourceShape<TOut2>, TMat> other, bool eagerComplete = False)
where TOut1 : TOut2 { }
public static Akka.Streams.Dsl.Source<TOut2, TMat> MergeMany<TOut1, TOut2, TMat>(this Akka.Streams.Dsl.Source<TOut1, TMat> flow, int breadth, System.Func<TOut1, Akka.Streams.IGraph<Akka.Streams.SourceShape<TOut2>, TMat>> flatten) { }
Expand Down Expand Up @@ -4357,7 +4360,7 @@ namespace Akka.Streams.Implementation.Fusing
[Akka.Annotations.InternalApiAttribute()]
public sealed class Log<T> : Akka.Streams.Implementation.Fusing.SimpleLinearGraphStage<T>
{
public Log(string name, System.Func<T, object> extract, Akka.Event.ILoggingAdapter adapter) { }
public Log(string name, System.Func<T, object> extract, Akka.Event.ILoggingAdapter adapter, Akka.Event.LogLevel defaultLogLevel) { }
protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { }
public override string ToString() { }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,11 @@ namespace Akka.Streams
public System.Collections.Generic.IEnumerable<Akka.Streams.Attributes.IAttribute> AttributeList { get; }
public Akka.Streams.Attributes And(Akka.Streams.Attributes other) { }
public Akka.Streams.Attributes And(Akka.Streams.Attributes.IAttribute other) { }
[System.ObsoleteAttribute("Use GetAttribute<TAttr>() instead")]
public bool Contains<TAttr>(TAttr attribute)
where TAttr : Akka.Streams.Attributes.IAttribute { }
public bool Contains<TAttr>()
where TAttr : Akka.Streams.Attributes.IAttribute { }
public static Akka.Streams.Attributes CreateAsyncBoundary() { }
public static Akka.Streams.Attributes CreateInputBuffer(int initial, int max) { }
public static Akka.Streams.Attributes CreateLogLevels(Akka.Event.LogLevel onElement = 0, Akka.Event.LogLevel onFinish = 0, Akka.Event.LogLevel onError = 3) { }
Expand Down Expand Up @@ -1399,7 +1402,7 @@ namespace Akka.Streams.Dsl
where TOut : TInjected { }
public static Akka.Streams.Dsl.Flow<TIn, TOut, TMat> Limit<TIn, TOut, TMat>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, long max) { }
public static Akka.Streams.Dsl.Flow<TIn, TOut, TMat> LimitWeighted<TIn, TOut, TMat>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, long max, System.Func<TOut, long> costFunc) { }
public static Akka.Streams.Dsl.Flow<TIn, TOut, TMat> Log<TIn, TOut, TMat>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, string name, System.Func<TOut, object> extract = null, Akka.Event.ILoggingAdapter log = null) { }
public static Akka.Streams.Dsl.Flow<TIn, TOut, TMat> Log<TIn, TOut, TMat>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, string name, System.Func<TOut, object> extract = null, Akka.Event.ILoggingAdapter log = null, Akka.Event.LogLevel logLevel = 0) { }
public static Akka.Streams.Dsl.Flow<TIn, TOut2, TMat> Merge<TIn, TOut1, TOut2, TMat>(this Akka.Streams.Dsl.Flow<TIn, TOut1, TMat> flow, Akka.Streams.IGraph<Akka.Streams.SourceShape<TOut2>, TMat> other, bool eagerComplete = False)
where TOut1 : TOut2 { }
public static Akka.Streams.Dsl.Flow<TIn, TOut2, TMat> MergeMany<TIn, TOut1, TOut2, TMat>(this Akka.Streams.Dsl.Flow<TIn, TOut1, TMat> flow, int breadth, System.Func<TOut1, Akka.Streams.IGraph<Akka.Streams.SourceShape<TOut2>, TMat>> flatten) { }
Expand Down Expand Up @@ -2100,7 +2103,7 @@ namespace Akka.Streams.Dsl
where TOut : TInjected { }
public static Akka.Streams.Dsl.Source<T, TMat> Limit<T, TMat>(this Akka.Streams.Dsl.Source<T, TMat> flow, long max) { }
public static Akka.Streams.Dsl.Source<T, TMat> LimitWeighted<T, TMat>(this Akka.Streams.Dsl.Source<T, TMat> flow, long max, System.Func<T, long> costFunc) { }
public static Akka.Streams.Dsl.Source<TOut, TMat> Log<TOut, TMat>(this Akka.Streams.Dsl.Source<TOut, TMat> flow, string name, System.Func<TOut, object> extract = null, Akka.Event.ILoggingAdapter log = null) { }
public static Akka.Streams.Dsl.Source<TOut, TMat> Log<TOut, TMat>(this Akka.Streams.Dsl.Source<TOut, TMat> flow, string name, System.Func<TOut, object> extract = null, Akka.Event.ILoggingAdapter log = null, Akka.Event.LogLevel logLevel = 0) { }
public static Akka.Streams.Dsl.Source<TOut2, TMat> Merge<TOut1, TOut2, TMat>(this Akka.Streams.Dsl.Source<TOut1, TMat> flow, Akka.Streams.IGraph<Akka.Streams.SourceShape<TOut2>, TMat> other, bool eagerComplete = False)
where TOut1 : TOut2 { }
public static Akka.Streams.Dsl.Source<TOut2, TMat> MergeMany<TOut1, TOut2, TMat>(this Akka.Streams.Dsl.Source<TOut1, TMat> flow, int breadth, System.Func<TOut1, Akka.Streams.IGraph<Akka.Streams.SourceShape<TOut2>, TMat>> flatten) { }
Expand Down Expand Up @@ -4331,7 +4334,7 @@ namespace Akka.Streams.Implementation.Fusing
[Akka.Annotations.InternalApiAttribute()]
public sealed class Log<T> : Akka.Streams.Implementation.Fusing.SimpleLinearGraphStage<T>
{
public Log(string name, System.Func<T, object> extract, Akka.Event.ILoggingAdapter adapter) { }
public Log(string name, System.Func<T, object> extract, Akka.Event.ILoggingAdapter adapter, Akka.Event.LogLevel defaultLogLevel) { }
protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { }
public override string ToString() { }
}
Expand Down
9 changes: 9 additions & 0 deletions src/core/Akka.Streams.Tests/Dsl/AttributesSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

using System;
using System.Threading.Tasks;
using Akka.Event;
using Akka.Streams.Dsl;
using Akka.Streams.Implementation;
using Akka.Streams.TestKit;
Expand Down Expand Up @@ -44,6 +45,14 @@ public async Task Attributes_must_be_overridable_on_a_module_basis()
complete.GetAttribute<Attributes.Name>().Value.Should().Contain("new-name");
}

[Fact]
public void Attributes_Contains_should_not_return_true_if_doesnt_exist()
{
var attributes = Attributes.CreateName("new-name");
attributes.Contains<Attributes.LogLevels>().Should().BeFalse();
attributes.Contains<Attributes.Name>().Should().BeTrue();
}

[Fact]
public async Task Attributes_must_keep_the_outermost_attribute_as_the_least_specific()
{
Expand Down
10 changes: 10 additions & 0 deletions src/core/Akka.Streams.Tests/Dsl/FlowLogSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,16 @@ public void A_Log_on_source_must_allow_configuring_log_levels_via_Attributes()
error.Message.ToString().Should().Be("[flow-6e] Upstream failed, cause: Akka.Streams.TestKit.TestException test");
}

[Fact]
public void A_Log_on_source_must_allow_configuring_log_levels_via_Method_argument()
{
Source.Single(42)
.Log("flow-6", logLevel: LogLevel.WarningLevel)
.RunWith(Sink.Ignore<int>(), Materializer);

LogProbe.ExpectMsg<Warning>().Message.ToString().Should().Be("[flow-6] Element: 42");
}

[Fact]
public void A_Log_on_Source_must_follow_supervision_strategy_when_Exception_thrown()
{
Expand Down
5 changes: 4 additions & 1 deletion src/core/Akka.Streams/Attributes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -430,8 +430,11 @@ public string GetNameOrDefault(string defaultIfNotFound = "unknown-operation")
/// <typeparam name="TAttr">TBD</typeparam>
/// <param name="attribute">TBD</param>
/// <returns>TBD</returns>
public bool Contains<TAttr>(TAttr attribute) where TAttr : IAttribute => _attributes.Contains(attribute);
[Obsolete("Use GetAttribute<TAttr>() instead")]
public bool Contains<TAttr>(TAttr attribute) where TAttr : IAttribute => _attributes.Any(a => a is TAttr);

public bool Contains<TAttr>() where TAttr : IAttribute => _attributes.Any(a => a is TAttr);

/// <summary>
/// Specifies the name of the operation.
/// If the name is null or empty the name is ignored, i.e. <see cref="None"/> is returned.
Expand Down
22 changes: 11 additions & 11 deletions src/core/Akka.Streams/Dsl/FlowOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2106,17 +2106,17 @@ public static Flow<TIn, TOut, TMat> InitialDelay<TIn, TOut, TMat>(this Flow<TIn,
/// </para>
/// Cancels when downstream cancels
/// </summary>
/// <typeparam name="TIn">TBD</typeparam>
/// <typeparam name="TOut">TBD</typeparam>
/// <typeparam name="TMat">TBD</typeparam>
/// <param name="flow">TBD</param>
/// <param name="name">TBD</param>
/// <param name="extract">TBD</param>
/// <param name="log">TBD</param>
/// <returns>TBD</returns>
public static Flow<TIn, TOut, TMat> Log<TIn, TOut, TMat>(this Flow<TIn, TOut, TMat> flow, string name, Func<TOut, object> extract = null, ILoggingAdapter log = null)
{
return (Flow<TIn, TOut, TMat>)InternalFlowOperations.Log(flow, name, extract, log);
/// <typeparam name="TIn">The input type</typeparam>
/// <typeparam name="TOut">The output type</typeparam>
/// <typeparam name="TMat">The materialized type</typeparam>
/// <param name="flow">The underlying graph</param>
/// <param name="name">The name of the <see cref="LogSource"/></param>
/// <param name="extract">Optional. Extract the content that will be captured by the logger</param>
/// <param name="log">Optional. Use an external logging adapter</param>
/// <param name="logLevel">Optional. The log level being logged. Defaults to <see cref="LogLevel.DebugLevel"/></param>
public static Flow<TIn, TOut, TMat> Log<TIn, TOut, TMat>(this Flow<TIn, TOut, TMat> flow, string name, Func<TOut, object> extract = null, ILoggingAdapter log = null, LogLevel logLevel = LogLevel.DebugLevel)
{
return (Flow<TIn, TOut, TMat>)InternalFlowOperations.Log(flow, name, extract, log, logLevel);
}

/// <summary>
Expand Down
18 changes: 9 additions & 9 deletions src/core/Akka.Streams/Dsl/Internal/InternalFlowOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2017,17 +2017,17 @@ public static IFlow<T, TMat> InitialDelay<T, TMat>(this IFlow<T, TMat> flow, Tim
/// </para>
/// Cancels when downstream cancels
/// </summary>
/// <typeparam name="T">TBD</typeparam>
/// <typeparam name="TMat">TBD</typeparam>
/// <param name="flow">TBD</param>
/// <param name="name">TBD</param>
/// <param name="extract">TBD</param>
/// <param name="log">TBD</param>
/// <returns>TBD</returns>
/// <typeparam name="T">The output type</typeparam>
/// <typeparam name="TMat">The materialized type</typeparam>
/// <param name="flow">The underlying graph</param>
/// <param name="name">The name of the <see cref="LogSource"/></param>
/// <param name="extract">Optional. Extract the content that will be captured by the logger</param>
/// <param name="log">Optional. Use an external logging adapter</param>
/// <param name="logLevel">Optional. The log level being logged. Defaults to <see cref="LogLevel.DebugLevel"/></param>
public static IFlow<T, TMat> Log<T, TMat>(this IFlow<T, TMat> flow, string name, Func<T, object> extract = null,
ILoggingAdapter log = null)
ILoggingAdapter log = null, LogLevel logLevel = LogLevel.DebugLevel)
{
return flow.Via(new Fusing.Log<T>(name, extract ?? Identity<T>(), log));
return flow.Via(new Fusing.Log<T>(name, extract ?? Identity<T>(), log, logLevel));
}

/// <summary>
Expand Down
28 changes: 16 additions & 12 deletions src/core/Akka.Streams/Dsl/SourceOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1984,18 +1984,22 @@ public static Source<TOut, TMat> InitialDelay<TOut, TMat>(this Source<TOut, TMat
/// </para>
/// Cancels when downstream cancels
/// </summary>
/// <typeparam name="TOut">TBD</typeparam>
/// <typeparam name="TMat">TBD</typeparam>
/// <param name="flow">TBD</param>
/// <param name="name">TBD</param>
/// <param name="extract">TBD</param>
/// <param name="log">TBD</param>
/// <returns>TBD</returns>
public static Source<TOut, TMat> Log<TOut, TMat>(this Source<TOut, TMat> flow, string name, Func<TOut, object> extract = null, ILoggingAdapter log = null)
{
return (Source<TOut, TMat>)InternalFlowOperations.Log(flow, name, extract, log);
}

/// <typeparam name="TOut">The output type</typeparam>
/// <typeparam name="TMat">The materialized type</typeparam>
/// <param name="flow">The underlying graph</param>
/// <param name="name">The name of the <see cref="LogSource"/></param>
/// <param name="extract">Optional. Extract the content that will be captured by the logger</param>
/// <param name="log">Optional. Use an external logging adapter</param>
/// <param name="logLevel">Optional. The log level being logged. Defaults to <see cref="LogLevel.DebugLevel"/></param>
public static Source<TOut, TMat> Log<TOut, TMat>(
this Source<TOut, TMat> flow,
string name, Func<TOut, object> extract = null,
ILoggingAdapter log = null,
LogLevel logLevel = LogLevel.DebugLevel)
{
return (Source<TOut, TMat>)InternalFlowOperations.Log(flow, name, extract, log, logLevel);
}

/// <summary>
/// Combine the elements of current flow and the given <see cref="Source{TOut,TMat}"/> into a stream of tuples.
/// <para>
Expand Down
19 changes: 11 additions & 8 deletions src/core/Akka.Streams/Implementation/Fusing/Ops.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3000,7 +3000,14 @@ public override void OnDownstreamFinish(Exception cause)

public override void PreStart()
{
_logLevels = _inheritedAttributes.GetAttribute(DefaultLogLevels);
if (_inheritedAttributes.Contains<Attributes.LogLevels>())
{
_logLevels = _inheritedAttributes.GetAttribute(DefaultLogLevels);
}
else
{
_logLevels = new Attributes.LogLevels(_stage._defaultLogLevel, _stage._defaultLogLevel, LogLevel.ErrorLevel);
}
if (_stage._adapter != null)
_log = _stage._adapter;
else
Expand All @@ -3017,18 +3024,14 @@ public override void PreStart()
private readonly string _name;
private readonly Func<T, object> _extract;
private readonly ILoggingAdapter _adapter;
private readonly LogLevel _defaultLogLevel;

/// <summary>
/// TBD
/// </summary>
/// <param name="name">TBD</param>
/// <param name="extract">TBD</param>
/// <param name="adapter">TBD</param>
public Log(string name, Func<T, object> extract, ILoggingAdapter adapter)
public Log(string name, Func<T, object> extract, ILoggingAdapter adapter, LogLevel defaultLogLevel)
{
_name = name;
_extract = extract;
_adapter = adapter;
_defaultLogLevel = defaultLogLevel;
}

// TODO more optimisations can be done here - prepare logOnPush function etc
Expand Down
Loading