Table restore error when the server has compacted the changelog-topic by segment time #176

Open

PJ-Schulz opened this issue Aug 9, 2021 · 3 comments · May be fixed by #373
Labels
bug (Something isn't working), duplicate (This issue or pull request already exists)

Comments

@PJ-Schulz
Contributor

Hello

When I restart my application after some time, I get this error while recovering the table: [^---Recovery]: No event received for active tp TopicPartition(topic='philipp.match.04-matchtable-changelog', partition=0) since recovery start (started 33.43 seconds ago).

Full traceback

[2021-08-05 14:11:10,553] [6] [INFO] [^---Recovery]: Highwater for active changelog partitions:
┌Highwater - Active─────────────────────┬───────────┬───────────┐
│ topic                                 │ partition │ highwater │
├───────────────────────────────────────┼───────────┼───────────┤
│ philipp.match.04-matchtable-changelog │ 0         │ 19477     │
└───────────────────────────────────────┴───────────┴───────────┘ 
[2021-08-05 14:11:10,589] [6] [INFO] [^---Recovery]: active offsets at start of reading:
┌Reading Starts At - Active─────────────┬───────────┬────────┐
│ topic                                 │ partition │ offset │
├───────────────────────────────────────┼───────────┼────────┤
│ philipp.match.04-matchtable-changelog │ 0         │ 17091  │
└───────────────────────────────────────┴───────────┴────────┘ 
[2021-08-05 14:11:10,596] [6] [INFO] [^---Recovery]: Restoring state from changelog topics... 
[2021-08-05 14:11:10,597] [6] [INFO] [^---Recovery]: Resuming flow... 
[2021-08-05 14:11:10,600] [6] [INFO] [^---Fetcher]: Starting... 
[2021-08-05 14:11:12,132] [6] [INFO] Fetch offset 17091 is out of range for partition TopicPartition(topic='philipp.match.04-matchtable-changelog', partition=0), resetting offset 
[2021-08-05 14:11:44,025] [6] [WARNING] [^---Recovery]: No event received for active tp TopicPartition(topic='philipp.match.04-matchtable-changelog', partition=0) since recovery start (started 33.43 seconds ago) 
[2021-08-05 14:11:49,028] [6] [WARNING] [^---Recovery]: No event received for active tp TopicPartition(topic='philipp.match.04-matchtable-changelog', partition=0) since recovery start (started 38.43 seconds ago) 
[2021-08-05 14:11:54,030] [6] [WARNING] [^---Recovery]: No event received for active tp TopicPartition(topic='philipp.match.04-matchtable-changelog', partition=0) since recovery start (started 43.43 seconds ago) 
[2021-08-05 14:11:59,031] [6] [WARNING] [^---Recovery]: No event received for active tp TopicPartition(topic='philipp.match.04-matchtable-changelog', partition=0) since recovery start (started 48.44 seconds ago) 

Steps to reproduce

I have set the segment time to 10 minutes, the retention time to 100 minutes, and the cleanup policy to compact,delete. I use Confluent Cloud as the Kafka service.

extra_topic_configs = {
    'segment.ms': 600000,      # 10 minutes
    'retention.ms': 6000000,   # 100 minutes
    'cleanup.policy': 'compact,delete',
}

The error occurs when I switch off the app for about 30 minutes and then restart it.
When I wait much longer than the retention time (for example 10 hours), the error does not occur and the app starts with an empty table, just as I would expect.
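For context, a minimal sketch of how such a dict is typically wired into the table definition, assuming it is passed through faust's extra_topic_configs table keyword (which the variable name above suggests); app name, broker address, and partition count are placeholders:

import faust

app = faust.App('philipp.match.04', broker='kafka://localhost:9092')

extra_topic_configs = {
    'segment.ms': 600000,      # 10 minutes
    'retention.ms': 6000000,   # 100 minutes
    'cleanup.policy': 'compact,delete',
}

# The changelog topic backing this table is created with the extra configs applied.
match_table = app.Table(
    'matchtable',
    default=dict,
    partitions=1,
    extra_topic_configs=extra_topic_configs,
)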

In my opinion, the error comes from the fact that some messages are no longer available because of the segment time: the Kafka service has already compacted and deleted several older messages with the same key. As a result, offset 17091 is no longer available, but the changelog topic is not empty either; otherwise the app would start with an empty table.
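A quick way to confirm this diagnosis is to compare the broker's earliest available offset with the offset Faust tries to resume from. A minimal sketch using kafka-python (my choice of client, not something from the issue; the broker address is a placeholder):

from kafka import KafkaConsumer, TopicPartition

tp = TopicPartition('philipp.match.04-matchtable-changelog', 0)
consumer = KafkaConsumer(bootstrap_servers='localhost:9092')

earliest = consumer.beginning_offsets([tp])[tp]  # first offset still on the broker
highwater = consumer.end_offsets([tp])[tp]       # next offset to be written

# After segment-time compaction/deletion, 'earliest' can be greater than the
# offset persisted by Faust (17091 here) even though the topic is not empty.
print(f'earliest={earliest}, highwater={highwater}')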

Expected behavior

Faust starts recovery from the next available offset in the changelog topic.

Actual behavior

If the changelog topic is not empty, a warning is issued in an endless loop and the agents do not work.

Versions

  • Python version 3.7.9
  • Faust version 0.6.9

Has anyone had the same problem, or does anyone have a solution for me?

@wbarnha added the bug and duplicate labels on Aug 31, 2022
@bvanelli

bvanelli commented Sep 2, 2022

I can confirm this error; here is the exact line where it is raised:

Traceback (most recent call last):
  File ".\venv\lib\site-packages\mode\services.py", line 835, in _execute_task
    await task
  File ".\faust\tables\recovery.py", line 409, in _restart_recovery
    raise ConsistencyError(
faust.exceptions.ConsistencyError: The persisted offset for changelog topic partition TP(topic='ca-produced_process-changelog', partition=0) is higher
than the last offset in that topic (highwater) (524 > -1).
Most likely you have removed data from the topics without
removing the RocksDB database file for this partition.

The suggestion that the database file for this partition has to be removed is also wrong, as it does not solve the issue. It is actually printed regardless of whether you use RocksDB or not.

Altering the consistency check in faust/tables/recovery.py, line 402, to:

                if self.app.conf.recovery_consistency_check:
                    for tp in assigned_active_tps:
                        if (
                            active_offsets[tp]
                            and active_highwaters[tp]
                            and active_offsets[tp] > active_highwaters[tp]
                        ):
                            # Instead of raising ConsistencyError, clamp the
                            # persisted offset down to the current highwater.
                            active_offsets[tp] = active_highwaters[tp]
                            continue

causes the offset to be reset later on. I'm not sure whether state retrieval then works properly.

[2022-09-02 13:16:34,842] [146628] [INFO] Fetch offset 524 is out of range for partition TopicPartition(topic='process-changelog', partition=0), resetting offset 
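For reference, the excerpt above is gated on the recovery_consistency_check setting, so the check can also be disabled outright at the App level. A sketch, assuming the setting is passed at App construction like other faust settings (note this only suppresses the ConsistencyError; it does not recover the missing offsets):

import faust

app = faust.App(
    'myapp',
    broker='kafka://localhost:9092',
    recovery_consistency_check=False,  # skip the persisted-offset vs. highwater check
)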

@elrik75

elrik75 commented Sep 29, 2022

Same error for me. I found a workaround...

As you can confirm, the active offset you have (17091 or 524) is one less than the first offset in Kafka.
You can check the first Kafka offset with this:

$ kafka-console-consumer.sh --bootstrap-server <bootstrap> --topic <TOPIC NAME> --partition XX --from-beginning --max-messages 1 --property print.offset=true
Offset: 17092 (or 525)

You should see an offset that is one higher than what Faust is expecting.

So I searched the tables/recovery.py code to find out how it works and why it does not use the right value, and I found this code:

    async def _build_offsets(
        self, consumer: ConsumerT, tps: Set[TP], destination: Counter[TP], title: str
    ) -> None:
        # -- Update offsets
        # Offsets may have been compacted, need to get to the recent ones
        earliest = await consumer.earliest_offsets(*tps)
        # FIXME To be consistent with the offset -1 logic
        earliest = {
            tp: offset - 1 if offset is not None else None
            for tp, offset in earliest.items()
        }

As you can see, there is a FIXME comment and an offset - 1. If you remove the -1, you get this: tp: offset if offset is not None else None

Then restart your Faust worker and now it works fine.

BE CAREFUL: it worked for me, but I'm not a Faust developer and I made this change without really understanding what the -1 stands for.
@wbarnha maybe you are able to understand why it is there and maybe remove it?

edit: of course, the resulting code could be simplified by just not re-creating earliest.
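For clarity, this is what the start of the patched _build_offsets would look like with the -1 removed and the redundant dict comprehension dropped. This is only a sketch of the workaround described above, not merged faust code; the rest of the method stays unchanged:

    async def _build_offsets(
        self, consumer: ConsumerT, tps: Set[TP], destination: Counter[TP], title: str
    ) -> None:
        # -- Update offsets
        # Offsets may have been compacted, need to get to the recent ones.
        # Workaround: use the broker's earliest offsets as-is instead of
        # subtracting 1 from each of them.
        earliest = await consumer.earliest_offsets(*tps)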

@wbarnha
Member

wbarnha commented Sep 29, 2022

tp: offset - 1 if offset is not None else None

has been around since Ask Solem originally committed it. I appreciate you finding the workaround because I'm quite eager to include this functionality in Faust. I'll start a PR and test further.

@wbarnha linked a pull request on Sep 29, 2022 that will close this issue