How To: Extend Forge in my Application
This page contains features that were built on top of Forge-integrated applications, utilizing the dynamic extensibility of Forge. Hopefully these notes can help other application owners implement similar features. New contributions to this page are always welcome!
A state persistence story is a necessary component for many applications. Let's walk through how a ServiceFabric application implemented state persistence on top of the IForgeDictionary.
ReliableForgeDictionary properties
/// <summary>
/// The ReliableForgeDictionary class defines the methods for accessing forge state.
/// An IReliableDictionary is used as a persistent backing store.
/// A ForgeDictionary is used as the locally cached backing store.
/// Note: The KeyPrefix should always precede the key when using the forgeStateTable to limit the scope to the current SessionId.
/// Note: This class uses byte[] as the hardcoded value type. All values will be Json serialized then serialized to byte[] before being set.
/// </summary>
public class ReliableForgeDictionary : IForgeDictionary
{
    /// <summary>
    /// The unique identifier for this session.
    /// </summary>
    public Guid SessionId { get; set; }

    /// <summary>
    /// The ServiceFabric ReliableDictionary holding the forge state.
    /// Maps the string key to serialized byte[] value.
    /// The key should always be preceded by the KeyPrefix. This ensures the scope of the table is limited to the current SessionId.
    /// </summary>
    private IReliableDictionary<string, byte[]> forgeStateTable;

    /// <summary>
    /// Service fabric transaction helper.
    /// </summary>
    private readonly IServiceFabricTransactionHelper serviceFabricTransactionHelper;

    /// <summary>
    /// The key prefix that should precede the keys when using the forgeStateTable.
    /// This ensures the scope of the table is limited to the current SessionId.
    /// </summary>
    private string keyPrefix;

    /// <summary>
    /// The locally cached forge state.
    /// Optimization to speed up GetValue by returning the cached object directly instead of deserializing the byte[] from persisted state.
    /// </summary>
    internal ForgeDictionary forgeStateCached;

    // ... methods are listed under "ReliableForgeDictionary methods" ...
}
- ServiceFabric uses IReliableDictionary (and other reliable collections) to persist state. These behave similarly to Dictionary objects, but reads and writes take transactions per row. This works well for holding Forge state: every tree walking session writes to different rows, so no write transactions overlap.
- byte[] is used as the value type instead of object because ServiceFabric cannot persist dynamic object types well. Since Forge persists multiple data types in a single dictionary (string and ActionResponse), each value is serialized to a byte[] before it is stored.
- Similar to ForgeDictionary, a keyPrefix is used to ensure that each session can only access its own state.
- A locally cached ForgeDictionary object is also used to avoid the costly deserialization on reads.
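To make the key-prefix scoping concrete, here is a minimal, self-contained sketch. It is not the ReliableForgeDictionary code: a plain in-memory Dictionary<string, byte[]> stands in for the IReliableDictionary, JSON serialization is left out (values are assumed to be already-serialized strings), and the SessionScopedStore class, its members, and the "<SessionId>_" prefix format are hypothetical choices made for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical stand-in: one shared table, scoped per session by a key prefix.
public class SessionScopedStore
{
    private readonly Dictionary<string, byte[]> sharedTable;
    private readonly string keyPrefix;

    public SessionScopedStore(Dictionary<string, byte[]> sharedTable, Guid sessionId)
    {
        this.sharedTable = sharedTable;
        // Assumption: the prefix is derived from the SessionId. The real format may differ.
        this.keyPrefix = sessionId + "_";
    }

    // Stores an already-serialized JSON string as UTF-8 bytes under the prefixed key.
    public void Set(string key, string serializedJson)
    {
        this.sharedTable[this.keyPrefix + key] = Encoding.UTF8.GetBytes(serializedJson);
    }

    // Reads back only keys written under this session's prefix.
    public bool TryGet(string key, out string serializedJson)
    {
        byte[] bytes;
        if (this.sharedTable.TryGetValue(this.keyPrefix + key, out bytes))
        {
            serializedJson = Encoding.UTF8.GetString(bytes);
            return true;
        }

        serializedJson = null;
        return false;
    }
}
```

Two SessionScopedStore instances created over the same table with different SessionIds can both write the key "CurrentTreeNode" without seeing or overwriting each other's value, because the rows they touch differ by prefix.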
ReliableForgeDictionary methods
public async Task Set<T>(string key, T value)
{
    // Convert the value into a serialized Json and then byte[] before setting the final value.
    string serializedJson = JsonConvert.SerializeObject(value);
    byte[] byteArray = Encoding.UTF8.GetBytes(serializedJson);

    await this.serviceFabricTransactionHelper.RunInsideWriteTransactionAsync(
        "SetForgeState",
        async (tx) =>
        {
            await this.forgeStateTable.SetAsync(tx, this.keyPrefix + key, byteArray);
            await this.forgeStateCached.Set<T>(key, value);
        });
}
public async Task<T> GetValue<T>(string key)
{
    // First attempt to get the value from the forgeStateCached table.
    try
    {
        return await this.forgeStateCached.GetValue<T>(key);
    }
    catch
    {
        // Cache miss: fall through to the persisted store.
    }

    // Fall back to deserializing the value from persisted store.
    ConditionalValue<byte[]> conditionalValue = default(ConditionalValue<byte[]>);
    await this.serviceFabricTransactionHelper.RunInsideReadTransactionAsync(
        "GetValueForgeState",
        async (tx) =>
        {
            conditionalValue = await this.forgeStateTable.TryGetValueAsync(tx, this.keyPrefix + key);
        });

    // The key is in neither the cache nor the persisted store.
    // Without this check, GetString below would throw an ArgumentNullException.
    if (!conditionalValue.HasValue)
    {
        throw new KeyNotFoundException("Forge state key not found: " + key);
    }

    // Convert byte[] into serialized Json and then deserialize to get the final value.
    string serializedJson = Encoding.UTF8.GetString(conditionalValue.Value);
    T finalValue = JsonConvert.DeserializeObject<T>(serializedJson);

    // Add cache-missed value to cache.
    await this.forgeStateCached.Set<T>(key, finalValue);
    return finalValue;
}
public async Task<bool> RemoveKey(string key)
{
    ConditionalValue<byte[]> removedRecord = default(ConditionalValue<byte[]>);
    await this.serviceFabricTransactionHelper.RunInsideWriteTransactionAsync(
        "RemoveKeyForgeState",
        async (tx) =>
        {
            removedRecord = await this.forgeStateTable.TryRemoveAsync(tx, this.keyPrefix + key);
            await this.forgeStateCached.RemoveKey(key);
        });

    return removedRecord.HasValue;
}
Helper class for handling ServiceFabric transactions.
public async Task RunInsideWriteTransactionAsync(string funcName, Func<ITransaction, Task> untransactedWriteActionAsync)
{
    Stopwatch sw = new Stopwatch();
    sw.Start();

    try
    {
        using (ITransaction tx = this.stateManager.CreateTransaction())
        {
            await untransactedWriteActionAsync(tx);
            await tx.CommitAsync();
        }

        this.instrumentation.EmitserviceFabricTransactionDuration(sw.GetElapsedMicroseconds(), "Write", funcName, "(none)");
    }
    catch (TimeoutException te)
    {
        this.instrumentation.EmitserviceFabricTransactionDuration(sw.GetElapsedMicroseconds(), "Write", funcName, te.GetType().Name);
        this.TraceException("TransactionHelper::{0}: TimeoutException Received", funcName);
        throw;
    }
    catch (Exception e)
    {
        this.instrumentation.EmitserviceFabricTransactionDuration(sw.GetElapsedMicroseconds(), "Write", funcName, e.GetType().Name);
        this.TraceException("TransactionHelper::{0}: Exception Received: {1}", funcName, e.ToString());
        throw;
    }
}

public async Task<T> LoadTableAsync<T>(string tableName) where T : IReliableState
{
    return await this.stateManager.GetOrAddAsync<T>(tableName);
}
public class ExampleApp
{
    /// <summary>
    /// The ServiceFabric ReliableDictionary holding the forge state.
    /// Maps the string key to serialized byte[] value.
    /// </summary>
    private Task<IReliableDictionary<string, byte[]>> forgeStateTableTask;

    public ExampleApp()
    {
        // Start loading the table without awaiting it, so construction does not block.
        this.forgeStateTableTask = this.serviceFabricTransactionHelper.LoadTableAsync<IReliableDictionary<string, byte[]>>("ForgeStateTable");
    }

    public async Task<string> WalkTree()
    {
        // The table task is awaited here, the first time the dictionary is actually needed.
        IForgeDictionary forgeState = new ReliableForgeDictionary(await this.forgeStateTableTask, this.serviceFabricTransactionHelper, sessionId);
        ...
    }
}
- The forgeStateTableTask contains the IReliableDictionary and is loaded asynchronously. This helps the App boot up quickly without having to block on each individual IReliableDictionary. This task is awaited when the value is needed.
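The deferred-load pattern described above can be sketched in isolation. This is a minimal illustration under stated assumptions, not the example App's code: LazyTableOwner and UseTableAsync are hypothetical names, and a string stands in for the IReliableDictionary so the snippet has no ServiceFabric dependency.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical sketch: start an async load in the constructor, await it on first use.
public class LazyTableOwner
{
    // Started in the constructor; not awaited until the value is needed.
    private readonly Task<string> tableTask;

    public LazyTableOwner(Func<Task<string>> loadTableAsync)
    {
        // Kick off the load without blocking construction.
        this.tableTask = loadTableAsync();
    }

    public async Task<string> UseTableAsync()
    {
        // Every caller awaits the same task, so the load runs only once.
        // Once the task has completed, awaiting it again returns the cached result.
        string table = await this.tableTask;
        return "using " + table;
    }
}
```

Because the Task itself is stored rather than its result, any number of later callers share the single load that was started at construction time.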
Your application may want to support rate limiting. Perhaps you want to only allow a TreeNode to be visited 10 times per day. Or you want to only allow 5 concurrent requests at a time for a particular path. Here I'll lay out the high-level design of how Rate Limiting has been implemented in the example App.
"RateLimitExampleNode": {
    "Type": "Leaf",
    "Properties": {
        "Notes": "RateLimits of RateLimitExampleNode is 50 times per day per resource",
        "RateLimits": {
            "0": {
                "Expression": "50:1d:ResourceId",
                "Block": true
            }
        }
    }
}
TreeNode.Properties is a dynamic property that is passed from the ForgeSchema to the Callbacks and Actions, making it a great place to extend new features for your application. Here we are passing a RateLimits dictionary holding an Expression, Block, and other properties.
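The Expression format is not specified on this page. Reading "50:1d:ResourceId" alongside the note "50 times per day per resource" suggests a "<limit>:<window>:<groupBy>" layout, and the sketch below parses it under that assumption. The RateLimitExpression class, its property names, and the supported window units (m, h, d) are hypothetical and may not match the example App's actual parser.

```csharp
using System;

// Hypothetical parser for expressions shaped like "<limit>:<window>:<groupBy>".
public class RateLimitExpression
{
    public int Limit { get; private set; }
    public TimeSpan Window { get; private set; }
    public string GroupBy { get; private set; }

    public static RateLimitExpression Parse(string expression)
    {
        string[] parts = expression.Split(':');
        if (parts.Length != 3)
        {
            throw new FormatException("Expected <limit>:<window>:<groupBy>, got: " + expression);
        }

        // The window is a number followed by a single unit character, e.g. "1d".
        string window = parts[1];
        if (window.Length < 2)
        {
            throw new FormatException("Invalid window: " + window);
        }

        int amount = int.Parse(window.Substring(0, window.Length - 1));
        char unit = window[window.Length - 1];

        TimeSpan span;
        switch (unit)
        {
            case 'm': span = TimeSpan.FromMinutes(amount); break;
            case 'h': span = TimeSpan.FromHours(amount); break;
            case 'd': span = TimeSpan.FromDays(amount); break;
            default: throw new FormatException("Unknown window unit: " + unit);
        }

        return new RateLimitExpression
        {
            Limit = int.Parse(parts[0]),
            Window = span,
            GroupBy = parts[2]
        };
    }
}
```

Under this reading, RateLimitExpression.Parse("50:1d:ResourceId") yields a limit of 50, a window of one day, and a grouping key of ResourceId.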
V1 Interface defined in ITreeWalkerCallback
public async Task BeforeVisitNode(
    Guid sessionId,
    string treeNodeKey,
    dynamic properties,
    dynamic userContext,
    CancellationToken token)
{
    bool isRateLimited = false;
    bool isBlocked = false;
    IDictionary<string, RateLimitObject> rateLimits = null;

    // Try to get RateLimits if it exists in Properties.
    if (properties != null)
    {
        IDictionary<string, dynamic> propertyValues = properties.ToObject<IDictionary<string, dynamic>>();
        if (propertyValues.ContainsKey("RateLimits"))
        {
            rateLimits = propertyValues["RateLimits"].ToObject<IDictionary<string, RateLimitObject>>();
        }
    }

    try
    {
        if (rateLimits != null)
        {
            // Check if rate limit was hit.
            isRateLimited = await this.rateLimitManager.IsRateLimitHit(
                treeNodeKey,
                userContext.FaultObject);
            ...
        }
    }
    ...
}
V2 Interface defined in ITreeWalkerCallbackV2
/*
Basically, all input parameters are now included in TreeNodeContext.
There is an extended parameter CurrentNodeSkipActionContext. When
CurrentNodeSkipActionContext is set to non-null, the tree walker will
skip all actions defined in the current tree node, and proceed to
AfterVisitNode then ChildSelector.
Update this property inside BeforeVisitNode if you wish to use this
feature for the current tree node.
The string context is available to check in the current TreeNode's
ChildSelector via Session.GetCurrentNodeSkipActionContext().
*/
public async Task BeforeVisitNode(TreeNodeContext treeNodeContext)
{
    // Populate local variables from the input treeNodeContext.
    Guid sessionId = treeNodeContext.SessionId;
    string treeNodeKey = treeNodeContext.TreeNodeKey;
    dynamic properties = treeNodeContext.Properties;
    ForgeUserContext userContext = (ForgeUserContext)treeNodeContext.UserContext;
    string treeName = treeNodeContext.TreeName;
    Guid requestIdentifier = treeNodeContext.RootSessionId;
    CancellationToken token = treeNodeContext.Token;

    bool isDropped = false;
    bool isBlocked = false;
    bool isSkippedAndGotoChildSelector = false; // When true, skip all actions in the current tree node, and move to the ChildSelector directly.
    string resultMessage = string.Empty;
    AnvilRequest anvilRequest = null;

    token.ThrowIfCancellationRequested();

    try
    {
        anvilRequest = userContext.AnvilRequest;
    }
    ...
}
In BeforeVisitNode, we check whether the RateLimits dictionary exists and, if so, pass it to the RateLimitManager to check whether the rate limit was hit. The RateLimitManager does so by matching its persisted records against the incoming FaultObject and checking whether the given threshold has been reached.
The RateLimitManager is in charge of persisting RateLimitRecord objects and checking IsRateLimitHit. Here are the technologies it uses, at a high level.
- ServiceFabric - IReliableDictionary is used to persist RateLimitRecords. Each time a TreeNode is visited, BeforeVisitNode sends data to RateLimitManager about the event. This is translated into a RateLimitRecord object and persisted. Local caching is also used for quicker look-ups.
- LINQ - queries are generated based on the given Expression to query the RateLimitRecords table.
- Roslyn - is used to execute the generated LINQ queries. Caching is used here so Roslyn only needs to compile each unique query once, and can run it quickly thereafter.
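As a rough illustration of the check, the sketch below counts matching records inside a time window with a hand-written LINQ query. It is a simplification under stated assumptions: the real RateLimitManager generates its queries from the Expression and runs them through Roslyn, whereas here the query is fixed, the records live in memory, and the RateLimitRecord fields and the IsRateLimitHit signature are hypothetical stand-ins.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical record shape; the real RateLimitRecord is not shown on this page.
public class RateLimitRecord
{
    public string TreeNodeKey { get; set; }
    public string ResourceId { get; set; }
    public DateTime TimestampUtc { get; set; }
}

public static class RateLimitCheck
{
    // Returns true when the number of visits for this node and resource
    // inside the window has reached the limit.
    public static bool IsRateLimitHit(
        IEnumerable<RateLimitRecord> records,
        string treeNodeKey,
        string resourceId,
        int limit,
        TimeSpan window,
        DateTime nowUtc)
    {
        DateTime windowStart = nowUtc - window;

        int visits = records.Count(r =>
            r.TreeNodeKey == treeNodeKey &&
            r.ResourceId == resourceId &&
            r.TimestampUtc > windowStart);

        return visits >= limit;
    }
}
```

Passing nowUtc in as a parameter, rather than reading the clock inside the method, keeps the check deterministic and easy to unit test.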
The ForgeTree/ForgeSchema is highly dynamic: users can put any object they wish in the different properties and write any C# code for Roslyn to evaluate. But what if you want some type safety? How can you confirm that the JSON schema doesn't contain a typo or an unexpected type?
Let's talk about ForgeSchemaValidationRules, and how it can be expanded to your specific needs using JSON Schema Validation.
The ForgeSchemaValidationRules.json file comes packaged with the Forge.TreeWalker NuGet package, and contains JSON schema validation rules that can be run against any ForgeTree/ForgeSchema to ensure its validity. In effect, these rules define the ForgeTree data contract.
...
"ActionDefinition": {
    "type": "object",
    "properties": {
        "Action": {
            "type": "string"
        },
        "Input": {
            "type": "object"
        },
        "Properties": {
            "type": "object"
        },
        "Timeout": {
            "type": [ "number", "string" ]
        },
        "ContinuationOnTimeout": {
            "type": "boolean"
        },
        "RetryPolicy": {
            "$ref": "#/definitions/RetryPolicy"
        },
        "ContinuationOnRetryExhaustion": {
            "type": "boolean"
        }
    },
    "additionalProperties": false,
    "required": [ "Action" ]
}
...
Here is a simple unit test that performs JSON Schema Validation using ForgeSchemaValidationRules.json on a test MyForgeSchema.json.
public void ValidateSchema_ForgeRules()
{
    string schemaJSON = File.ReadAllText("MyForgeSchema.json");
    string forgeRulesJSON = File.ReadAllText("ForgeSchemaValidationRules.json");

    JObject schema = JObject.Parse(schemaJSON);
    JSchema rules = JSchema.Parse(forgeRulesJSON);

    // Collect every validation error and print it before asserting.
    IList<string> results;
    bool isValid = schema.IsValid(rules, out results);
    foreach (string message in results)
    {
        Console.WriteLine(message);
    }

    Assert.IsTrue(isValid);
}
ForgeSchemaValidationRules can be extended to suit your application's needs. In this example, we're strictly defining that only specified Actions can be included in the ForgeSchema with specified inputs.
"ActionDefinition": {
    "oneOf": [
        {
            "allOf": [
                {
                    "$ref": "file://ForgeSchemaValidationRules.json#/definitions/ActionDefinition"
                },
                {
                    "properties": {
                        "Action": {
                            "enum": ["CollectDiagnosticsAction"]
                        },
                        "Input": {
                            "type": "object",
                            "properties": {
                                "Command": {
                                    "type": "string"
                                }
                            },
                            "additionalProperties": false,
                            "required": ["Command"]
                        }
                    },
                    "required": ["Action", "Input"]
                }
            ]
        }
        ...
    ]
}
- allOf is used so that the base rules are applied as well.
- The CollectDiagnosticsAction name and its Command input are called out explicitly.
- additionalProperties and required are set to ensure that only the defined Inputs can be added.
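To see what the extended rule accepts and rejects, consider the two action definitions below. This is an illustrative fragment, not taken from a real ForgeSchema: the keys ValidAction and InvalidAction and the Command value are hypothetical. The first definition satisfies the CollectDiagnosticsAction branch. The second misspells Command, so on that branch it fails both the required check and the additionalProperties check; whether it is rejected overall assumes none of the elided oneOf branches accept it.

```json
{
    "ValidAction": {
        "Action": "CollectDiagnosticsAction",
        "Input": {
            "Command": "RunCollectDiagnostics.exe"
        }
    },
    "InvalidAction": {
        "Action": "CollectDiagnosticsAction",
        "Input": {
            "Commnad": "RunCollectDiagnostics.exe"
        }
    }
}
```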
Here is a simple unit test that performs JSON Schema Validation using ExampleAppForgeSchemaValidationRules.json on a test MyForgeSchema.json.
public void ValidateSchema_ExampleAppForgeRules()
{
    string schemaJSON = File.ReadAllText("MyForgeSchema.json");
    string forgeRulesJSON = File.ReadAllText("ForgeSchemaValidationRules.json");
    string exampleAppForgeRulesJSON = File.ReadAllText("ExampleAppForgeSchemaValidationRules.json");

    JObject schema = JObject.Parse(schemaJSON);

    // Preload the base rules so the $ref in the extended rules can be resolved.
    JSchemaPreloadedResolver resolver = new JSchemaPreloadedResolver();
    resolver.Add(new Uri("//ForgeSchemaValidationRules.json"), forgeRulesJSON);
    JSchema rules = JSchema.Parse(exampleAppForgeRulesJSON, resolver);

    IList<string> results;
    bool isValid = schema.IsValid(rules, out results);
    foreach (string message in results)
    {
        Console.WriteLine(message);
    }

    Assert.IsTrue(isValid);
}
Linking the two JSON schema validation files takes a few extra steps (the resolver setup), but otherwise this is the same as the unit test above.
See the BaseAction Inheritance section for details.
Good analytics are priceless. We've found the below pattern provides great logging for data analysis and debugging. The ForgeEditor (coming soon) can read these logs from Azure Data Explorer to provide analytics, such as a visualized heat-map over the tree schema.
Our ForgeEvents table in Azure Data Explorer has the following columns:
- SessionId
- SchemaIdentifier - unique id for each unique ForgeSchema file
- MessageTrigger - who is emitting this log
- TreeNodeKey
- TreeActionName
- TreeActionInput
- Properties
- TaskStatus - included when either an Action completed or the walk tree session completed
- Message - verbose string
The logs are emitted at the following MessageTriggers:
- OnBeforeWalkTree - in App
- OnBeforeVisitNode - ITreeWalkerCallbacks
- OnExecuteAction - ForgeActions
- OnAfterVisitNode - ITreeWalkerCallbacks
- OnAfterWalkTree - in App
- OnCleanUpWalkTree - in App
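The column list above maps naturally onto a small event type. The sketch below is one hypothetical way to model a row before it is sent to your logging pipeline: the ForgeEvent class, the ForgeEventLog helper, and the choice of string for every column except SessionId are assumptions, and the actual ingestion into Azure Data Explorer is left out.

```csharp
using System;

// Hypothetical model of one row in the ForgeEvents table.
public class ForgeEvent
{
    public Guid SessionId { get; set; }
    public string SchemaIdentifier { get; set; }
    public string MessageTrigger { get; set; }
    public string TreeNodeKey { get; set; }
    public string TreeActionName { get; set; }
    public string TreeActionInput { get; set; }
    public string Properties { get; set; }
    public string TaskStatus { get; set; }
    public string Message { get; set; }
}

public static class ForgeEventLog
{
    // Builds the event emitted from the BeforeVisitNode callback.
    // No action has run yet, so the action columns and TaskStatus stay null.
    public static ForgeEvent OnBeforeVisitNode(
        Guid sessionId,
        string schemaIdentifier,
        string treeNodeKey,
        string properties)
    {
        return new ForgeEvent
        {
            SessionId = sessionId,
            SchemaIdentifier = schemaIdentifier,
            MessageTrigger = "OnBeforeVisitNode",
            TreeNodeKey = treeNodeKey,
            Properties = properties,
            Message = "Visiting node " + treeNodeKey
        };
    }
}
```

Keeping one factory method per MessageTrigger makes it harder to emit a row with the wrong trigger name or with columns that do not apply at that point in the walk.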