Support State TTL #148

Closed
hzxa21 opened this issue Jan 18, 2022 · 7 comments

@hzxa21
Collaborator

hzxa21 commented Jan 18, 2022

State TTL is needed because:

  1. Old and unused state occupies storage space and may harm performance, which is cost-inefficient. Requiring users to issue deletes on internal states adds overhead to their applications and can be error-prone.
  2. TTL is required by some regulations (e.g. GDPR).

Things to consider:

  • Interface
  • Metadata
  • Choice of timestamp
  • Leverage compaction (using compaction filter)
ghost assigned ludics Jan 18, 2022
@skyzh
Contributor

skyzh commented Jan 18, 2022

Why is this needed?

Old and unused state

How can we know that a key belongs to an unused state?

TTL is required by some regulations

If we know a table row will expire on a certain date, how can we notify the related executors to delete this row from their internal states?

@hzxa21
Collaborator Author

hzxa21 commented Jan 18, 2022

  1. It is hard for applications to GC states for streaming sources (except for CDC), since users may not know exactly which rows to delete. Users may only know that states older than a certain time are no longer needed. There are several ways to do the GC:
    1.1. The user explicitly executes DROP MV and recreates the MV. This puts the burden on users and complicates their application design, so I don't think it is the right way to go.
    1.2. The user provides a TTL configuration on the MV, and our system takes care of the GC.
  2. After we introduce the concept of windowing, we will need to GC internal states (e.g. windowed join/aggregation states).

I haven't thought too deeply about this topic, but my gut feeling is that 1) is a special case of 2). We will need a careful design to support this without breaking consistency. For non-overlapping windows, we can just inject a barrier to reset and delete operator states when the window ends. For overlapping windows, we may need to introduce a timestamp and store it with the states, which may require TTL support in Hummock.
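To make the two cases concrete, here is a minimal Python sketch. All class and method names are hypothetical; none of them are RisingWave or Hummock APIs.

- A non-overlapping (tumbling) window can drop its whole state once a barrier shows the window has closed.
- Overlapping windows need a timestamp on each state entry, so that entries can be expired individually.

```python
from collections import defaultdict


class TumblingWindowCount:
    """Hypothetical tumbling-window count operator; state is keyed by window start."""

    def __init__(self, size):
        self.size = size
        self.state = defaultdict(int)  # window_start -> count

    def on_event(self, ts):
        start = ts - ts % self.size
        self.state[start] += 1

    def on_barrier(self, now):
        # Windows never overlap, so every window whose end <= now is complete:
        # emit it and drop its state as a whole.
        closed = {s: c for s, c in self.state.items() if s + self.size <= now}
        for s in closed:
            del self.state[s]
        return closed


class TimestampedState:
    """Hypothetical state for overlapping windows: every entry carries its own timestamp."""

    def __init__(self, ttl):
        self.ttl = ttl
        self.entries = []  # (ts, value) pairs

    def put(self, ts, value):
        self.entries.append((ts, value))

    def expire(self, now):
        # An entry may belong to several open windows, so it is kept until it is
        # older than `ttl` (standing in here for the longest window span).
        self.entries = [(ts, v) for ts, v in self.entries if ts > now - self.ttl]
```

The tumbling case needs no per-entry metadata because the window start already encodes when the state becomes garbage. The overlapping case does need per-entry timestamps, which is where storage-level TTL support would come in.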

hzxa21 changed the title Jan 18, 2022
@fuyufjh
Member

fuyufjh commented Jan 19, 2022

In our current design, the time window is determined by a time column, and that column is nothing special: it is just a regular column, and nothing guarantees that it holds real-world time. For example, a table `order` may have pay_time, ship_time and delivered_time, and users could do a window aggregation on any one of these columns.

Generally, a table must not forget its data; otherwise, data consistency is broken. The only correct way to 'clean up' a row is to delete it from the source.

As a result, for now, I would suggest treating time windows and TTL as two different, unrelated things, even though they are connected under some conditions.

@twocode
Contributor

twocode commented Jan 24, 2022

The primary purpose of state TTL is to reduce the storage size of long-running streaming jobs, with the following tradeoffs:

  1. Only processing-time-based TTL is supported initially (as in current Flink).
  2. It might break consistency: if the streaming job needs a state that has already expired, it won't get it.
  3. Theoretically, only TTLs significantly larger than window size and watermark can be "conditionally guaranteed" to be safe.
  4. Only a limited set of states is supported. If I read the code correctly, the Flink Table API only supports TTL states for the following executors:
```
twocode@twocode-pc:~/others/flink$ grep -rsni "getIdleStateRetention" . | grep "plan/node" | awk -F '[/:]' '{print $12"/"$13"/"$14"/"$15"/"$16}'
plan/nodes/exec/stream/StreamExecGlobalGroupAggregate.java
plan/nodes/exec/stream/StreamExecChangelogNormalize.java
plan/nodes/exec/stream/StreamExecIncrementalGroupAggregate.java
plan/nodes/exec/stream/StreamExecGroupTableAggregate.java
plan/nodes/exec/stream/StreamExecGroupAggregate.java
plan/nodes/exec/stream/StreamExecGroupAggregate.java
plan/nodes/exec/stream/StreamExecRank.java
plan/nodes/exec/common/CommonExecSink.java
```
  5. TTL is not precisely timely: an expired state is only fully deleted once a compactor processes it after the TTL condition is met.
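Here is a minimal sketch of tradeoffs 1 and 2. It assumes Flink-like processing-time semantics; `TtlValueState` and `count_per_key` are hypothetical names, not a real API. Once the per-key count expires, the operator silently restarts from zero:

```python
class TtlValueState:
    """Hypothetical processing-time TTL keyed state (Flink-like semantics assumed)."""

    def __init__(self, ttl):
        self.ttl = ttl
        self.data = {}  # key -> (value, last_write_time)

    def get(self, key, now):
        entry = self.data.get(key)
        if entry is None:
            return None
        value, written = entry
        if now - written >= self.ttl:
            # Expired: behave exactly as if the state had never existed.
            del self.data[key]
            return None
        return value

    def put(self, key, value, now):
        self.data[key] = (value, now)


def count_per_key(events, ttl):
    """Running count per key; `events` are (processing_time, key) pairs."""
    state = TtlValueState(ttl)
    out = []
    for now, key in events:
        current = state.get(key, now) or 0
        state.put(key, current + 1, now)
        out.append((key, current + 1))
    return out
```

With `ttl=50`, the events `[(0, "a"), (1, "a"), (100, "a")]` produce a count of 1 for the third event instead of 3. That is the consistency break described in item 2.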

Implementation-wise, there are at least three options:

  1. Compaction filter with loose time. TTL is set with compute-node time, while compactors could run anywhere.
  2. StreamingMgr maintains a mapping from states to timestamps. When the time is up, it deletes the state by calling a storage interface such as batchdelete().
  3. Hummock implicitly adds a timestamp column (processing time) to incoming states. When the time is up, the streaming manager deletes the state by calling a storage interface such as deletebytimestamp().

Flink used to use timer-based TTL for table states, hit many issues, and migrated to storage TTL instead. Apparently, #1 is preferred.
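Option 1 could look roughly like the following sketch. It assumes two things:

- each stored entry carries a `write_time` stamped by the compute node;
- the compactor evaluates the TTL against its own clock (the "loose time" above).

`Entry`, `ttl_compaction_filter` and `compact` are illustrative names, not Hummock APIs.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Entry:
    key: bytes
    value: bytes
    write_time: int  # processing time stamped by the compute node (assumed encoding)


def ttl_compaction_filter(ttl, compactor_now):
    """Return a predicate that keeps an entry only while it is younger than `ttl`.

    `compactor_now` comes from the compactor's clock, which may differ from the
    compute-node clock that produced `write_time`.
    """
    def keep(entry):
        return compactor_now - entry.write_time < ttl
    return keep


def compact(runs, keep):
    """Merge runs (newest first), keep the newest version per key, then filter.

    Assumes every version of a key takes part in this compaction; dropping only
    the newest version in a partial compaction could expose an older version
    that lives in a level not being compacted.
    """
    merged = {}
    for run in runs:
        for e in run:
            merged.setdefault(e.key, e)  # earlier (newer) runs win
    return [e for _, e in sorted(merged.items()) if keep(e)]
```

Expired data simply disappears whenever compaction happens to reach it. This is also why deletion is not precisely timely (item 5 above).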

@hzxa21
Collaborator Author

hzxa21 commented Jan 25, 2022

Theoretically, only TTLs significantly larger than window size and watermark can be "conditionally guaranteed" to be safe.

Do we also want to support TTL for non-windowed states? Consider a non-windowed count: it is hard to implement TTL without breaking consistency because it has an implicit infinite window. At the very least, we need to track count states by time so that we can still get the correct count for [t2, t3) after we expire the [t1, t2) states out of [t1, t3).
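Here is one possible reading of "track count states by time", as a hypothetical sketch. The count is kept in fixed-size time buckets instead of a single scalar, so expiring the [t1, t2) buckets leaves the exact count for [t2, t3). The bucket granularity is an assumption; expiry is only exact at bucket boundaries.

```python
from collections import Counter


class BucketedCount:
    """Hypothetical count state split into time buckets so old buckets can expire."""

    def __init__(self, bucket_size):
        self.bucket_size = bucket_size
        self.buckets = Counter()  # bucket_start -> count

    def add(self, ts, n=1):
        self.buckets[ts - ts % self.bucket_size] += n

    def expire_before(self, cutoff):
        # Drop only whole buckets that end at or before `cutoff`.
        for start in [s for s in self.buckets if s + self.bucket_size <= cutoff]:
            del self.buckets[start]

    def count(self):
        return sum(self.buckets.values())
```

Suppose events arrive at times 1, 5, 12, 15 and 18 with a bucket size of 10. Then `count()` is 5, and after `expire_before(10)` it is 3, the count for [10, 20). A single scalar count of 5 could not be corrected this way.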

  1. Compaction filter with loose time. TTL is set with compute-node time, while compactors could run anywhere.

IIUC, this also requires a timestamp column? The difference between 1) and 3) is that 1) lets storage expire states on its own, without cooperating with streaming.

@fuyufjh
Member

fuyufjh commented Jan 25, 2022

Do we also want to support TTL for non-windowed states

Just a reminder that we don't have "windowed state" yet. The reasons include:

  1. Windowed state requires users to group rows by event time, but we have not exposed event time as a column yet.
  2. Windowed state requires the source to guarantee that no event older than the watermark is sent, but we don't support watermarks and sources do not guarantee this for now.
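For illustration only, here is a sketch of what the missing watermark guarantee would enable. None of this exists in the codebase, and all names are hypothetical. Once the watermark passes a window's end, that window's state can be emitted and freed. An event behind the watermark can no longer be applied consistently:

```python
class WatermarkedWindowAgg:
    """Hypothetical tumbling-window sum that relies on a watermark to free state."""

    def __init__(self, size):
        self.size = size
        self.watermark = float("-inf")
        self.windows = {}  # window_start -> running sum
        self.late = []     # events that arrived behind the watermark

    def on_event(self, event_time, value):
        if event_time < self.watermark:
            # The source promised not to send these; if it does, the window's
            # state may already be gone, so the event is set aside instead.
            self.late.append((event_time, value))
            return
        start = event_time - event_time % self.size
        self.windows[start] = self.windows.get(start, 0) + value

    def on_watermark(self, wm):
        self.watermark = max(self.watermark, wm)
        done = sorted(s for s in self.windows if s + self.size <= self.watermark)
        # Emit finished windows and drop their state.
        return [(s, self.windows.pop(s)) for s in done]
```

Without the source-side guarantee, the `late` branch would be hit in normal operation, and freeing window state would change query results.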

twocode transferred this issue from another repository Feb 8, 2022
@hzxa21
Collaborator Author

hzxa21 commented Jul 22, 2022

Duplicate of #3298.

hzxa21 closed this as not planned Jul 22, 2022