how to implement non-invertible user-defined aggregates? #130

Open
Ohyoukillkenny opened this issue Mar 2, 2020 · 2 comments

@Ohyoukillkenny

In addition to Trill’s built-in aggregates (i.e., Count, Sum, Average, Max, Min, TopK), Trill provides a
framework for users to create their own custom aggregates by implementing the IAggregate interface.

public interface IAggregate<TInput, TState, TResult>
{
    Expression<Func<TState>> InitialState();
    Expression<Func<TState, long, TInput, TState>> Accumulate();
    Expression<Func<TState, long, TInput, TState>> Deaccumulate();
    Expression<Func<TState, TState, TState>> Difference();
    Expression<Func<TState, TResult>> ComputeResult();
}

However, it looks to me that Trill assumes the user-defined aggregates are invertible and associative, as it requires users to specify the Deaccumulate and the Difference methods.

My question is "Is there a way to implement an aggregate that is meant to be non-invertible in Trill?"

More specifically, how can we implement an operator such as finite impulse response (FIR) filtering that follows the computation as below:

  • input: a stream of signal measurements of the form {val: double, ts: int}, where ts is the timestamp of the measurement (for simplicity, ts = 1, 2, 3, 4, ...) and val_{t} is the value measured at time t.
  • FIR parameters: [f1, f2, f3, f4, f5] (an array of doubles)
  • output: a stream of double values that are obtained by out_{t} = f1*val_{t-2} + f2*val_{t-1} + f3*val_{t} + f4*val_{t+1} + f5*val_{t+2}.

In general, FIR filtering computes the dot product of the FIR parameters with every five consecutive elements of the stream. As a first step, I think a hopping window should be applied to the input stream, but I am stuck on designing a user-defined aggregate to handle the remaining computation.
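For reference, the computation described above can be written directly over an in-memory list, without any streaming engine. This is only a minimal sketch of the arithmetic: the class and method names (`FirSketch`, `Filter`) are hypothetical, it assumes an odd number of coefficients, and it emits an output only where all five inputs exist.

```csharp
using System;
using System.Collections.Generic;

public static class FirSketch
{
    // Centered FIR filter: out_t = f1*val_{t-2} + f2*val_{t-1} + f3*val_t
    //                              + f4*val_{t+1} + f5*val_{t+2}.
    // Assumption: f has an odd length; positions without a full
    // neighbourhood (the first and last two samples) produce no output.
    public static List<double> Filter(IReadOnlyList<double> vals, IReadOnlyList<double> f)
    {
        var result = new List<double>();
        int half = f.Count / 2;
        for (int t = half; t + half < vals.Count; t++)
        {
            double acc = 0.0;
            for (int k = 0; k < f.Count; k++)
            {
                // f[0] pairs with val_{t-2}, f[4] with val_{t+2}.
                acc += f[k] * vals[t - half + k];
            }
            result.Add(acc);
        }
        return result;
    }
}
```

With inputs 1, 2, 3, 4, 5, 6 and all five coefficients set to 1, the two outputs are 15 and 20, which is a convenient check for any streaming version of the operator.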

Can anyone help me with this problem? I would really appreciate it.

@arunkm
Collaborator

arunkm commented Mar 20, 2020

You can start with a simple approach like this:

  1. Keep the state as a Deque<(ts, val)> (a deque of (ts, val) tuples).
  2. Accumulate: Add the entry to the state (or merge it if an entry with the same timestamp exists).
    Optionally, optimize memory by removing older elements that would not contribute to the result.
  3. Deaccumulate: Remove the entry from the deque/state.
    If you have optimized in Accumulate by dropping elements, you need to handle that appropriately here.
  4. Difference: Compute the difference of the two deques/states.
  5. ComputeResult:
    a. Select the K most recent items from the state/deque (where K is the number of items required for the FIR product).
    b. Compute out_{t} = f1*val_{t-2} + f2*val_{t-1} + f3*val_{t} + f4*val_{t+1} + f5*val_{t+2} with the values obtained above.

Then check if you can optimize further by representing the state in a more concise (less memory) form.
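The steps above might be sketched roughly as follows. This is not Trill code: `FirState` and `FirAggregateSketch` are hypothetical names, the methods are plain static helpers that only mirror the shape of the five IAggregate functions, and a sorted dictionary stands in for the deque. The sketch assumes that a repeated timestamp overwrites the earlier value and that the result is NaN until K samples are available; neither convention comes from this thread.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical state type: samples keyed by timestamp, kept in time order.
public sealed class FirState
{
    public SortedDictionary<long, double> Samples { get; } =
        new SortedDictionary<long, double>();
}

public static class FirAggregateSketch
{
    public static FirState InitialState() => new FirState();

    // Add a sample; a repeated timestamp overwrites the earlier value
    // (an assumed merge rule).
    public static FirState Accumulate(FirState state, long ts, double val)
    {
        state.Samples[ts] = val;
        return state;
    }

    // Remove the sample that has left the window.
    public static FirState Deaccumulate(FirState state, long ts, double val)
    {
        state.Samples.Remove(ts);
        return state;
    }

    // Samples present in 'left' but not in 'right', returned as a new state.
    public static FirState Difference(FirState left, FirState right)
    {
        var result = new FirState();
        foreach (var kv in left.Samples)
        {
            if (!right.Samples.ContainsKey(kv.Key))
            {
                result.Samples[kv.Key] = kv.Value;
            }
        }
        return result;
    }

    // Dot product of the coefficients with the K most recent samples,
    // oldest first. Returns NaN until K samples exist (assumed convention).
    public static double ComputeResult(FirState state, double[] coefficients)
    {
        int k = coefficients.Length;
        if (state.Samples.Count < k) return double.NaN;
        double[] recent = state.Samples.Values.Skip(state.Samples.Count - k).ToArray();
        double acc = 0.0;
        for (int i = 0; i < k; i++) acc += coefficients[i] * recent[i];
        return acc;
    }
}
```

Accumulating ts = 1..5 with val = ts and five coefficients of 1 gives 15; deaccumulating ts = 1 and accumulating ts = 6 gives 20. A window that ends at t+2 yields out_{t}, so the result lags the newest sample by two steps. To plug such helpers into an actual aggregate, each IAggregate method would presumably return an expression lambda that calls the matching helper, since expression trees cannot contain statement bodies.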

@Ohyoukillkenny
Author

Thanks for the reply. It was very helpful, and I successfully implemented the FIR operator following your suggestion.
May I ask whether there is any other way to implement custom operators?
I find it arduous to implement aggregates when the Deaccumulate and Difference methods are difficult or "unnatural" to implement.
