Skip to content

Commit

Permalink
even more xml docs, nullability change in OT, parameter rename in Per…
Browse files Browse the repository at this point in the history
…sistence (#1699)
  • Loading branch information
marcinbudny authored Jul 4, 2022
1 parent 9017e6e commit e15c9f4
Show file tree
Hide file tree
Showing 11 changed files with 241 additions and 40 deletions.
12 changes: 8 additions & 4 deletions src/Proto.OpenTelemetry/OpenTelemetryDecorators.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ public OpenTelemetryRootContextDecorator(IRootContext context, ActivitySetup sen
=> _sendActivitySetup = (activity, message)
=> {
activity?.SetTag(ProtoTags.ActorType, "<None>");
sendActivitySetup(activity, message);
if(activity != null)
sendActivitySetup(activity, message);
};

private static string Source => "Root";
Expand Down Expand Up @@ -55,13 +56,15 @@ ActivitySetup receiveActivitySetup
activity?.SetTag(ProtoTags.ActorType, actorType);
activity?.SetTag(ProtoTags.ActorPID, self);
activity?.SetTag(ProtoTags.SenderPID, self);
sendActivitySetup(activity, message);
if(activity != null)
sendActivitySetup(activity, message);
};
_receiveActivitySetup = (activity, message) => {
activity?.SetTag(ProtoTags.ActorType, actorType);
activity?.SetTag(ProtoTags.ActorPID, self);
activity?.SetTag(ProtoTags.TargetPID, self);
receiveActivitySetup(activity, message);
if(activity != null)
receiveActivitySetup(activity, message);
};
}

Expand Down Expand Up @@ -208,7 +211,8 @@ internal static async Task Receive(string source, MessageEnvelope envelope, Acti
{
if (envelope.Sender != null) activity?.SetTag(ProtoTags.SenderPID, envelope.Sender.ToString());

receiveActivitySetup?.Invoke(activity, message);
if(activity != null)
receiveActivitySetup?.Invoke(activity, message);

await receive().ConfigureAwait(false);
}
Expand Down
14 changes: 12 additions & 2 deletions src/Proto.OpenTelemetry/OpenTelemetryMetricsExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,20 @@ namespace Proto.OpenTelemetry;

public static class OpenTelemetryMetricsExtensions
{
/// <summary>
/// Histogram buckets definition for request-like operations
/// </summary>
public static readonly double[] RequestLikeHistogramBoundaries =
{.002, .005, .01, .025, .05, .075, .1, .25, .5, .75, 1, 2.5, 5, 7.5, 10, 20, 30};

/// <summary>
/// Histogram buckets definition for queue length
/// </summary>
public static readonly double[] QueueLengthHistogramBoundaries =
{0, 1, 2, 5, 10, 20, 50, 100, 200, 500, 1000};

/// <summary>
/// Adds Proto.Actor meter to the MeterProviderBuilder
/// Adds Proto.Actor metrics to the <see cref="MeterProviderBuilder"/>
/// </summary>
/// <param name="builder"></param>
/// <param name="useRecommendedHistogramBoundaries">If true, views will be added for histogram metrics to specify recommended histogram boundaries.</param>
Expand Down Expand Up @@ -59,10 +65,14 @@ public static MeterProviderBuilder AddProtoActorInstrumentation(this MeterProvid
builder.AddView("protocluster_identity_get_with_global_lock_duration",
new ExplicitBucketHistogramConfiguration {Boundaries = RequestLikeHistogramBoundaries}
);

builder.AddView("protocluster_identity_try_acquire_lock_duration",
new ExplicitBucketHistogramConfiguration {Boundaries = RequestLikeHistogramBoundaries}
);

builder.AddView("protoremote_write_duration",
new ExplicitBucketHistogramConfiguration {Boundaries = RequestLikeHistogramBoundaries}
);
}

return builder;
Expand Down
27 changes: 17 additions & 10 deletions src/Proto.OpenTelemetry/OpenTelemetryTracingExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,25 @@

namespace Proto.OpenTelemetry;

public delegate void ActivitySetup(Activity? activity, object message);
/// <summary>
/// Customizes the activity based on a message being processed
/// <param name="activity">Activity to be customized</param>
/// <param name="message">Message being processed</param>
/// </summary>
public delegate void ActivitySetup(Activity activity, object message);

public static class OpenTelemetryTracingExtensions
{
public static TracerProviderBuilder AddProtoActorInstrumentation(this TracerProviderBuilder builder)
=> builder.AddSource(ProtoTags.ActivitySourceName);

/// <summary>
/// Setup OpenTelemetry send middleware & decorator.
/// Adds OpenTelemetry tracing to actors spawned with given <see cref="Props"/>. Incoming and outgoing messages will create new activities.
/// Ensures <see cref="Activity"/> context propagation via message headers.
/// </summary>
/// <param name="props">props.</param>
/// <param name="sendActivitySetup">provide a way inject send activity customization according to the message.</param>
/// <param name="receiveActivitySetup">provide a way inject receive activity customization according to the message.</param>
/// <param name="props"><see cref="Props"/> to instrument</param>
/// <param name="sendActivitySetup">Optional delegate to customize the <see cref="Activity"/> on message receive</param>
/// <param name="receiveActivitySetup">Optional delegate to customize the <see cref="Activity"/> on message send</param>
/// <returns>props</returns>
public static Props WithTracing(
this Props props,
Expand All @@ -31,7 +37,7 @@ public static Props WithTracing(
}

/// <summary>
/// Adds trace headers to the message envelope, to propagate trace context.
/// Adds trace headers to the message envelope, to propagate trace context.
/// </summary>
public static Sender OpenTelemetrySenderMiddleware(Sender next)
=> async (context, target, envelope) => {
Expand All @@ -46,11 +52,12 @@ public static Sender OpenTelemetrySenderMiddleware(Sender next)
};

/// <summary>
/// Setup OpenTelemetry send decorator around RootContext.
/// Adds OpenTelemetry tracing to messages sent through <see cref="IRootContext"/>. Sent messages will create new activities.
/// Ensures <see cref="Activity"/> context propagation via message headers.
/// </summary>
/// <param name="context">Root context</param>
/// <param name="sendActivitySetup">provide a way inject send activity customization according to the message.</param>
/// <returns>IRootContext</returns>
/// <param name="context"><see cref="IRootContext"/> to instrument</param>
/// <param name="sendActivitySetup">Optional delegate to customize the <see cref="Activity"/> on message send</param>
/// <returns></returns>
public static IRootContext WithTracing(this IRootContext context, ActivitySetup? sendActivitySetup = null)
{
sendActivitySetup ??= OpenTelemetryHelpers.DefaultSetupActivity!;
Expand Down
22 changes: 15 additions & 7 deletions src/Proto.OpenTelemetry/ProtoTags.cs
Original file line number Diff line number Diff line change
@@ -1,32 +1,40 @@
namespace Proto.OpenTelemetry;
using System.Diagnostics;

namespace Proto.OpenTelemetry;

/// <summary>
/// Proto.actor specific tags on the <see cref="Activity"/>
/// </summary>
public static class ProtoTags
{
/// <summary>
/// Activity source name for Proto.Actor created activities
/// </summary>
public const string ActivitySourceName = "Proto.Actor";

/// <summary>
/// GetType().Name on the message
/// GetType().Name on the message
/// </summary>
public const string MessageType = "proto.messagetype";

/// <summary>
/// Message destination
/// Message destination PID string representation
/// </summary>
public const string TargetPID = "proto.targetpid";

/// <summary>
/// Message origin
/// Message sender PID string representation
/// </summary>
public const string SenderPID = "proto.senderpid";

/// <summary>
/// Current actor PID, when applicable (equals TargetPID when this is a receive activity, or SenderId when this is a
/// sending activity)
/// Current actor PID string representation, when applicable (equals TargetPID when this is a receive activity, or SenderId when this is a
/// sending activity)
/// </summary>
public const string ActorPID = "proto.actorpid";

/// <summary>
/// Type of the current actor, when applicable
/// Type of the current actor, when applicable
/// </summary>
public const string ActorType = "proto.actortype";
}
60 changes: 54 additions & 6 deletions src/Proto.Persistence/IProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,72 @@

namespace Proto.Persistence;

/// <summary>
/// Abstraction for the snapshot storage
/// </summary>
public interface ISnapshotStore
{
Task<(object? Snapshot, long Index)> GetSnapshotAsync(string actorName);
/// <summary>
/// Gets the last available snapshot for the specified actor
/// </summary>
/// <param name="actorId">Unique actor identifier</param>
/// <returns>A tuple of (<see cref="Snapshot"/>, last event index included in the snapshot + 1)</returns>
Task<(object? Snapshot, long Index)> GetSnapshotAsync(string actorId);

Task PersistSnapshotAsync(string actorName, long index, object snapshot);
/// <summary>
/// Stores a new snapshot for the specified actor
/// </summary>
/// <param name="actorId">Unique actor identifier</param>
/// <param name="index">Last event included in the snapshot + 1</param>
/// <param name="snapshot">Snapshot to store</param>
/// <returns></returns>
Task PersistSnapshotAsync(string actorId, long index, object snapshot);

Task DeleteSnapshotsAsync(string actorName, long inclusiveToIndex);
/// <summary>
/// Deletes snapshots for the specified actor
/// </summary>
/// <param name="actorId">Unique actor identifier</param>
/// <param name="inclusiveToIndex">Index stored along the snapshot has to be &lt;= to the value in this parameter for the snapshot to be deleted</param>
/// <returns></returns>
Task DeleteSnapshotsAsync(string actorId, long inclusiveToIndex);
}

/// <summary>
/// Abstraction for event storage. Responsible for writing and retrieving event streams.
/// </summary>
public interface IEventStore
{
Task<long> GetEventsAsync(string actorName, long indexStart, long indexEnd, Action<object> callback);
/// <summary>
/// Gets a stream of events for particular actor
/// </summary>
/// <param name="actorId">Unique actor identifier</param>
/// <param name="indexStart">Index of the first event to get (inclusive)</param>
/// <param name="indexEnd">Index of the last event to get (inclusive)</param>
/// <param name="callback">A callback which should be called for each read event, in the order the events are stored</param>
/// <returns>Index of the last read event or -1 if none</returns>
Task<long> GetEventsAsync(string actorId, long indexStart, long indexEnd, Action<object> callback);

Task<long> PersistEventAsync(string actorName, long index, object @event);
/// <summary>
/// Writes an event to event stream of particular actor
/// </summary>
/// <param name="actorId">Unique actor identifier</param>
/// <param name="index">Expected index this event should be written at. This can be used for optimistic concurrency, although most providers don't do that</param>
/// <param name="event">Event to be written</param>
/// <returns>Index for the next event</returns>
Task<long> PersistEventAsync(string actorId, long index, object @event);

Task DeleteEventsAsync(string actorName, long inclusiveToIndex);
/// <summary>
/// Deletes events from actor's event stream starting with the oldest available, ending at provided index
/// </summary>
/// <param name="actorId">Unique actor identifier</param>
/// <param name="inclusiveToIndex">Inclusive index of the last event to delete</param>
/// <returns></returns>
Task DeleteEventsAsync(string actorId, long inclusiveToIndex);
}

/// <summary>
/// Abstraction for persistence provider
/// </summary>
public interface IProvider : IEventStore, ISnapshotStore
{
}
8 changes: 8 additions & 0 deletions src/Proto.Persistence/ISnapshotStrategy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,15 @@
// -----------------------------------------------------------------------
namespace Proto.Persistence;

/// <summary>
/// A strategy that decides at what points in time to take snapshots
/// </summary>
public interface ISnapshotStrategy
{
/// <summary>
/// Returns true if for given <see cref="PersistedEvent"/> a snapshot should be stored
/// </summary>
/// <param name="persistedEvent">Event being persisted along with its index</param>
/// <returns>True if snapshot should be stored</returns>
bool ShouldTakeSnapshot(PersistedEvent persistedEvent);
}
40 changes: 37 additions & 3 deletions src/Proto.Persistence/Messages.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
// -----------------------------------------------------------------------
namespace Proto.Persistence;

/// <summary>
/// Actor's persistent event
/// </summary>
public class Event
{
public Event(object data, long index)
Expand All @@ -13,24 +16,40 @@ public Event(object data, long index)
Index = index;
}

/// <summary>
/// Event data
/// </summary>
public object Data { get; }

/// <summary>
/// Event index
/// </summary>
public long Index { get; }
}

/// <summary>
/// Represents snapshot to be persisted
/// </summary>
public class PersistedSnapshot : Snapshot
{
public PersistedSnapshot(object state, long index) : base(state, index)
{
}
}

/// <summary>
/// Represents snapshot that is being recovered
/// </summary>
public class RecoverSnapshot : Snapshot
{
public RecoverSnapshot(object state, long index) : base(state, index)
{
}
}


/// <summary>
/// Wrapper for persistent snapshot
/// </summary>
public class Snapshot
{
public Snapshot(object state, long index)
Expand All @@ -39,25 +58,40 @@ public Snapshot(object state, long index)
Index = index;
}

/// <summary>
/// Snapshot data
/// </summary>
public object State { get; }

/// <summary>
/// Index of the last event included in the snapshot + 1
/// </summary>
public long Index { get; }
}



/// <summary>
/// Represents an event being recovered
/// </summary>
public class RecoverEvent : Event
{
public RecoverEvent(object data, long index) : base(data, index)
{
}
}

/// <summary>
/// Represents an event being replayed
/// </summary>
public class ReplayEvent : Event
{
public ReplayEvent(object data, long index) : base(data, index)
{
}
}

/// <summary>
/// Represents an event being stored
/// </summary>
public class PersistedEvent : Event
{
public PersistedEvent(object data, long index) : base(data, index)
Expand Down
Loading

0 comments on commit e15c9f4

Please sign in to comment.