
Monitoring changes to a time series

Doug Schmidt edited this page Jul 16, 2018 · 4 revisions

The Publish API Reference Guide describes a "changes since" pattern available for locations, time-series, rating-models, and field-visits. Browse to the Publish endpoint on your AQTS server to read the detailed online documentation for this feature.

See the TimeSeriesChangeMonitor project for a working code example

The TimeSeriesChangeMonitor project in the examples repo uses the "changes since" API pattern to monitor a configurable set of time-series for any changes. Running the tool and reading its source code can help you understand the "changes since" concept.

Changes-since concept

The idea behind the "changes since" pattern is to enable multiple clients to synchronize their state with the state of the AQTS server, without requiring AQTS to store any extra per-client information.

Instead, a client that wants to keep in sync with AQTS is required to perform these steps:

  1. Persist a "token" value: a timestamp available on many Publish response DTOs as the NextToken property.
  2. On the next request to AQTS, set the ChangesSinceToken request property to the persisted token value from the previous call. AQTS will respond only with changes that occurred since the token.
  3. Process all the changes in the response however the client needs to.
  4. Finally, update the persisted token value with the response.NextToken received in step 2.

If an error or exception occurs on the client during step 3, that's OK, because the client's persisted token has not yet been updated. The next time the client runs its synchronization step, it will simply supply AQTS with the ChangesSinceToken value from the last successful sequence. No changed data will be skipped.
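The four steps and the failure case above can be sketched without an AQTS server. The following is a minimal, self-contained illustration, not the SDK's implementation: FakeServer, ChangesResponse, SyncClient, and ChangesSinceDemo are hypothetical names invented for this sketch, and the fake server's filtering rule is an assumption. Only the ChangesSinceToken and NextToken concepts come from the Publish API.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for a Publish response DTO (not the SDK's type).
public class ChangesResponse
{
    public DateTime NextToken { get; }
    public List<string> Changes { get; }

    public ChangesResponse(DateTime nextToken, List<string> changes)
    {
        NextToken = nextToken;
        Changes = changes;
    }
}

// Hypothetical in-memory "server" that keeps a timestamped change log
// and stores nothing about its clients.
public class FakeServer
{
    private readonly List<(DateTime Time, string Item)> _log = new List<(DateTime, string)>();

    public void Record(DateTime time, string item) => _log.Add((time, item));

    // Returns the changes at or after the token and before 'now',
    // with 'now' as the token to send on the next request.
    public ChangesResponse GetChangesSince(DateTime changesSinceToken, DateTime now) =>
        new ChangesResponse(now, _log
            .Where(e => e.Time >= changesSinceToken && e.Time < now)
            .Select(e => e.Item)
            .ToList());
}

public class SyncClient
{
    // Step 1: the persisted token (held in memory in this sketch).
    public DateTime PersistedToken { get; private set; }

    public SyncClient(DateTime initialToken) => PersistedToken = initialToken;

    public void Synchronize(FakeServer server, DateTime now, Action<string> process)
    {
        // Step 2: send the persisted token as the "changes since" value.
        var response = server.GetChangesSince(PersistedToken, now);

        // Step 3: process every change. An exception here skips step 4.
        foreach (var change in response.Changes)
            process(change);

        // Step 4: advance the persisted token only after processing succeeded.
        PersistedToken = response.NextToken;
    }
}

public static class ChangesSinceDemo
{
    public static List<string> Run()
    {
        var t0 = new DateTime(2018, 7, 16, 0, 0, 0, DateTimeKind.Utc);
        var server = new FakeServer();
        var client = new SyncClient(t0);
        var processed = new List<string>();

        server.Record(t0.AddMinutes(1), "A");

        // The first attempt fails during step 3, so the token stays at t0.
        try
        {
            client.Synchronize(server, t0.AddMinutes(2),
                _ => throw new InvalidOperationException("simulated client error"));
        }
        catch (InvalidOperationException)
        {
        }

        server.Record(t0.AddMinutes(3), "B");

        // The retry re-sends the old token, so "A" is delivered again along with "B".
        client.Synchronize(server, t0.AddMinutes(4), processed.Add);
        return processed;
    }
}
```

In this sketch, ChangesSinceDemo.Run() returns both "A" and "B": the failed first attempt never reached step 4, so the retry asked again for everything since the original token.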

MonitorTimeSeries()

The MonitorTimeSeries() method below will monitor a time-series for changes, and log any newly changed points as they are detected.

Basic logic:

  • Use the GetTimeSeriesUniqueId() example method to get the unique ID of the target time-series.
  • Persist the "changes since" token in memory only (starting from 2 hours ago).
  • Loop:
    • Issue a GET /GetTimeSeriesUniqueIdList request for the location that owns the time-series, adding the persisted token as the ChangesSinceToken value.
    • Search the results for the target time-series unique ID.
    • If the target time-series changed, issue a GET /TimeSeriesCorrectedData request to read and print the changed points.
    • Sleep for a minute.
  • Throw an exception when something goes wrong.

private void MonitorTimeSeries(string timeSeriesIdentifier)
{
    var uniqueId = GetTimeSeriesUniqueId(timeSeriesIdentifier);
    var location = ParseLocationIdentifier(timeSeriesIdentifier);

    // The "changes since" token is held in memory only, starting from 2 hours ago.
    var changesSinceTime = Instant.FromDateTimeUtc(DateTime.UtcNow).Minus(Duration.FromHours(2));
    var pollInterval = Duration.FromMinutes(1);

    while (true)
    {
        Console.WriteLine($"Polling for changes to {timeSeriesIdentifier} since {changesSinceTime} ...");

        // Ask only for time-series at this location that changed since the token.
        var request = new TimeSeriesUniqueIdListServiceRequest
        {
            ChangesSinceToken = changesSinceTime.ToDateTimeUtc(),
            LocationIdentifier = location
        };
        var response = _client.Publish.Get(request);

        // Fall back to one hour ago if the response carries no NextToken.
        var nextToken = Instant.FromDateTimeUtc(response.NextToken ?? DateTime.UtcNow.AddHours(-1));

        var hasTokenExpired = response.TokenExpired ?? false;

        if (hasTokenExpired)
        {
            Console.WriteLine($"The changes since token expired. Some data may have been missed. Resetting to '{response.NextToken}'");

            // Adopt the new token and poll again immediately.
            changesSinceTime = nextToken;
            continue;
        }

        var targetTimeSeries =
            response.TimeSeriesUniqueIds.SingleOrDefault(t => t.UniqueId == uniqueId);

        if (targetTimeSeries?.FirstPointChanged != null)
        {
            // Read only the points from the first changed point onwards.
            var changedPoints = _client.Publish
                .Get(new TimeSeriesDataCorrectedServiceRequest
                {
                    TimeSeriesUniqueId = uniqueId,
                    QueryFrom = targetTimeSeries.FirstPointChanged.Value,
                    GetParts = "PointsOnly"
                })
                .Points;

            foreach (var point in changedPoints)
            {
                Console.WriteLine($"{point.Timestamp.DateTimeOffset} => {point.Value.Numeric}");
            }
        }

        // The changes were processed, so advance the token (step 4 of the pattern).
        changesSinceTime = nextToken;

        Console.WriteLine($"Sleeping for {pollInterval} ...");
        Thread.Sleep(pollInterval.ToTimeSpan());
    }
}
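
MonitorTimeSeries() keeps its token only in memory, so a restart falls back to "2 hours ago". A client that needs to survive restarts could persist the token somewhere durable instead. The sketch below shows one possible approach using a small text file. FileTokenStore and its file path are hypothetical and are not part of the AQTS SDK or the examples repo; the only assumption about AQTS is that the token is a UTC timestamp, as described above.

```csharp
using System;
using System.Globalization;
using System.IO;

// Hypothetical helper for this sketch; not part of the AQTS SDK.
public class FileTokenStore
{
    private readonly string _path;

    public FileTokenStore(string path) => _path = path;

    // Returns the saved token, or the fallback when nothing usable has been saved yet.
    public DateTime Load(DateTime fallbackUtc)
    {
        if (!File.Exists(_path))
            return fallbackUtc;

        var text = File.ReadAllText(_path).Trim();

        // "O" is the .NET round-trip format; RoundtripKind preserves the UTC kind.
        return DateTime.TryParseExact(text, "O", CultureInfo.InvariantCulture,
                   DateTimeStyles.RoundtripKind, out var token)
            ? token
            : fallbackUtc;
    }

    // Call this only after the changes in a response have been processed successfully.
    public void Save(DateTime tokenUtc) =>
        File.WriteAllText(_path, tokenUtc.ToString("O", CultureInfo.InvariantCulture));
}
```

To use something like this in MonitorTimeSeries(), the starting value could come from store.Load(DateTime.UtcNow.AddHours(-2)), and store.Save(nextToken.ToDateTimeUtc()) could be called once the changed points have been handled.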