
[EN Performance] Reuse ledger state for about -200GB peak RAM, -160GB disk i/o, and about -32 minutes duration #2792

Merged
fxamacker merged 38 commits into master from fxamacker/reuse-mtrie-state-for-checkpointing-2 on Aug 2, 2022

Conversation

fxamacker (Member) commented Jul 13, 2022

EDIT: When deployed on August 24, 2022, this PR reduced peak RAM use by over 250GB (out of an over 300GB total reduction). The initial estimate of -150GB was based on the old, smaller checkpoint file; by August the checkpoint file had grown substantially, so the memory savings were better. Duration is about 14 minutes today (Sep 12); it was 46-58 minutes in mid-August and 12-17 hours in Dec 2021, depending on system load.

Avoid creating separate ledger state during checkpointing.

Closes #2286
Closes #2378 because this PR avoids reading the 160GB checkpoint file (except during EN startup).
Updates #1744

Impact

Based on EN2 logs (July 8, 2022), this will

  • Reduce operational memory (peak RAM) by roughly 150GB on the Grafana EN Memory Usage chart. Charts showing smaller peaks will show a correspondingly smaller reduction.
  • Reduce checkpoint duration by 24 mins (from 45 mins).
  • Reduce disk i/o by 160GB by not reading the checkpoint file (except during EN startup).
  • Reduce memory allocations (TBD)

Ledger state is continuously growing, so the memory and duration savings will exceed the figures listed above as time passes.

Context

A recent increase in transactions is causing WAL files to be created more frequently, which makes checkpoints happen more often, increases checkpoint file size, and grows the ledger state held in memory.

Date           File Size   Checkpoint Frequency
Early 2022     53 GB       0-2 times per day
July 8, 2022   126 GB      every 2 hours

Without PR #1944, checkpointing would currently be:

  • taking well over 20-30 hours each time, making it impossible to complete every 2 hours
  • requiring more operational RAM, making OOM crashes very frequent
  • creating billions more allocations and more GC pressure, consuming CPU cycles and slowing down the EN

After PR #1944 reduced the MTrie flattening and serialization phase to under 5 minutes (it sometimes took 17 hours on mainnet16), creating a separate MTrie state now accounts for most of the duration and memory used by checkpointing. This opens up new possibilities, such as reusing the ledger state, to significantly reduce checkpointing duration and operational RAM again.

Design

The design goal is to reduce operational RAM, reduce allocations, and speed up checkpointing by not creating a separate ledger state.

To achieve these goals, this PR:

  • reuses tries from the main ledger state for checkpointing
  • avoids blocking on time-consuming tasks such as creating a checkpoint

The Compactor:

  • receives trie updates and the new trie created from each update
  • saves encoded updates in WAL segment
  • tracks active segment number returned by RecordUpdate()
  • starts checkpointing async when enough finalized segments are accumulated

NOTE: To reuse ledger tries for checkpointing, each new trie must match its corresponding WAL update.
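A minimal sketch (in Go) of the loop described above. The names here — WALTrieUpdate, recordUpdate, segmentsPerCheckpoint — are simplified stand-ins for illustration, not the exact types and functions in this PR:

```go
package main

import "fmt"

// Simplified stand-ins; the real types live in the ledger/complete packages.
type TrieUpdate struct{ ID int }

type WALTrieUpdate struct {
	Update   *TrieUpdate
	ResultCh chan error    // Compactor reports the WAL write result here
	DoneCh   chan struct{} // Ledger closes this after adding the new trie to the forest
}

// recordUpdate stands in for the WAL's RecordUpdate: append the encoded
// update to the WAL and return the active segment number.
func recordUpdate(u *TrieUpdate) (segmentNum int, err error) {
	return u.ID / 2, nil // pretend a new segment starts every 2 updates
}

// run sketches the Compactor loop: write to WAL, report the result, wait for
// the matching state update, and trigger an async checkpoint once enough
// segments have been finalized.
func run(trieUpdateCh <-chan *WALTrieUpdate, segmentsPerCheckpoint int) {
	activeSegmentNum := -1
	finalizedSegments := 0

	for update := range trieUpdateCh {
		segmentNum, err := recordUpdate(update.Update)
		update.ResultCh <- err // unblock Ledger.Set
		<-update.DoneCh        // wait until the new trie is added to the forest

		if activeSegmentNum >= 0 && segmentNum != activeSegmentNum {
			finalizedSegments++ // previous segment was finalized
			if finalizedSegments >= segmentsPerCheckpoint {
				finalizedSegments = 0
				go fmt.Println("checkpointing asynchronously with reused tries")
			}
		}
		activeSegmentNum = segmentNum
	}
}

func main() {
	ch := make(chan *WALTrieUpdate)
	go func() {
		for i := 0; i < 6; i++ {
			resultCh := make(chan error, 1)
			doneCh := make(chan struct{})
			ch <- &WALTrieUpdate{Update: &TrieUpdate{ID: i}, ResultCh: resultCh, DoneCh: doneCh}
			<-resultCh    // wait for the WAL write
			close(doneCh) // state update done
		}
		close(ch)
	}()
	run(ch, 2)
}
```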

TODO:

  • add more tests
  • handle known edge cases
  • handle errors
  • add more tests related to concurrency (longer duration test runs on benchnet)
  • preliminary review of draft PR by @zhangchiqing
  • remove draft status early to get @m4ksio and @ramtinms reviews
  • preliminary review and approval of PR by @m4ksio
  • preliminary review and approval of PR by @ramtinms
  • incorporate feedback from preliminary reviews
  • add more tests
  • awesome suggestion by @ramtinms which should not be shortchanged "... run tests on benchnet by turning nodes off to make sure the logic is safe and no unknown case is there."
  • continue looking for edge cases we may have missed
  • cleanup code and follow best practices (e.g. document errors returned from functions, etc.)
  • request code reviews for non-urgent items (micro optimizations, improved design, etc.)

See TODOs listed in the source code for more details.

Avoid creating separate ledger state during checkpointing.

Based on EN2 logs (July 8, 2022), this will
- reduce operational memory (peak RAM) by at least 152GB
- reduce checkpoint duration by 24 mins (from 45 mins)
- reduce memory allocations (TBD)

Impact of this change will increase as ledger state grows larger,
so memory savings will be substantially better than 152GB this year.
@fxamacker fxamacker added Performance Execution Cadence Execution Team labels Jul 13, 2022
@fxamacker fxamacker self-assigned this Jul 13, 2022
@fxamacker fxamacker requested a review from AlexHentschel as a code owner July 13, 2022 15:36
@fxamacker fxamacker marked this pull request as draft July 13, 2022 15:50
codecov-commenter commented Jul 13, 2022

Codecov Report

Merging #2792 (f774617) into master (03634c1) will increase coverage by 0.08%.
The diff coverage is 72.64%.

@@            Coverage Diff             @@
##           master    #2792      +/-   ##
==========================================
+ Coverage   56.87%   56.96%   +0.08%     
==========================================
  Files         690      691       +1     
  Lines       62661    62930     +269     
==========================================
+ Hits        35638    35845     +207     
- Misses      24074    24124      +50     
- Partials     2949     2961      +12     
Flag Coverage Δ
unittests 56.96% <72.64%> (+0.08%) ⬆️

Flags with carried forward coverage won't be shown.

Impacted Files Coverage Δ
cmd/execution_builder.go 0.00% <0.00%> (ø)
ledger/complete/wal/wal.go 56.66% <37.50%> (-0.83%) ⬇️
ledger/complete/mtrie/forest.go 63.49% <42.85%> (+0.70%) ⬆️
ledger/complete/compactor.go 71.20% <71.20%> (ø)
...execution-state-extract/execution_state_extract.go 53.53% <78.57%> (+2.39%) ⬆️
ledger/complete/ledger.go 63.03% <80.00%> (+2.43%) ⬆️
cmd/bootstrap/run/execution_state.go 72.72% <83.33%> (+1.75%) ⬆️
ledger/complete/wal/triequeue.go 100.00% <100.00%> (ø)
module/mempool/epochs/transactions.go 90.32% <0.00%> (-9.68%) ⬇️
consensus/hotstuff/eventloop/event_loop.go 73.46% <0.00%> (-1.37%) ⬇️
... and 5 more

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update 03634c1...f774617.

fxamacker added 10 commits July 19, 2022 17:50
Improve concurrency, so goroutines don't close WAL file
before writes to it finish:
- Remove DiskWAL as standalone component in NodeBuilder.
- Make Ledger responsible for starting DiskWAL.
- Make Ledger responsible for shutting down DiskWAL to ensure that
  all WAL updates are completed before closing opened WAL segment.
- Make Compactor close a channel to signal to Ledger that all WAL
  updates are done during shutdown.
- Remove explicit DiskWAL shutdown when it is used by Ledger.
Retry checkpointing at next segment on
- failure to checkpoint
- failure to get tries snapshot
- failure to start checkpointing because
  previous checkpointing is still running

Add more logging.
observer.OnComplete() is called when Compactor starts shutting down,
which may close the channel that observer.OnNext() uses to send data.
Handle ledger updates in parallel for checkpointing by making
WAL update and state update atomic through channels between
Ledger and Compactor.

In order to reuse ledger state for checkpointing, these two ops
must be atomic:
- WAL update (logging encoded update to WAL)
- state update (adding new trie to forest)

In other words, when Compactor starts processing a new WAL update,
the previous WAL update and state update are complete.
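A minimal sketch of the handshake this commit describes, assuming simplified stand-in types; the real WALTrieUpdate, forest, and error handling in the PR are more involved:

```go
package main

import "fmt"

type TrieUpdate struct{ ID int }
type Trie struct{ ID int }

// WALTrieUpdate is the message Ledger.Set sends to the Compactor.
type WALTrieUpdate struct {
	Update   *TrieUpdate
	ResultCh chan error    // WAL write result comes back here
	DoneCh   chan struct{} // closed by Ledger once the forest holds the new trie
}

// ledgerSet sketches Ledger.set: the Compactor won't take the next message
// until DoneCh is closed, so "WAL update + state update" is atomic with
// respect to the next update.
func ledgerSet(trieUpdateCh chan<- *WALTrieUpdate, update *TrieUpdate) (*Trie, error) {
	resultCh := make(chan error, 1)
	doneCh := make(chan struct{})
	defer close(doneCh) // signals "state update complete" to the Compactor

	trieUpdateCh <- &WALTrieUpdate{Update: update, ResultCh: resultCh, DoneCh: doneCh}

	newTrie := &Trie{ID: update.ID} // new trie is computed concurrently with the WAL write

	if err := <-resultCh; err != nil { // wait for the WAL write before exposing new state
		return nil, err
	}
	// ...here the real Ledger adds newTrie to the forest...
	return newTrie, nil
}

func main() {
	trieUpdateCh := make(chan *WALTrieUpdate)
	done := make(chan struct{})

	// Compactor stand-in: one WAL write at a time, in order.
	go func() {
		defer close(done)
		for msg := range trieUpdateCh {
			msg.ResultCh <- nil // pretend the WAL write succeeded
			<-msg.DoneCh        // don't take the next update until the state update finishes
		}
	}()

	for i := 1; i <= 3; i++ {
		trie, err := ledgerSet(trieUpdateCh, &TrieUpdate{ID: i})
		fmt.Println("added trie", trie.ID, "err:", err)
	}
	close(trieUpdateCh)
	<-done
}
```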
@@ -343,12 +343,6 @@ func (e *ExecutionNodeBuilder) LoadComponentsAndModules() {
}
return nil
}).
Component("Write-Ahead Log", func(node *NodeConfig) (module.ReadyDoneAware, error) {
Member:

Why is this being moved down?

fxamacker (Member Author):

DiskWAL needs to be a dependent component of Ledger because I found that component shutdown happens in parallel (although it used to be sequential).

<-update.DoneCh
}

func (c *Compactor) checkpoint(ctx context.Context, tries []*trie.MTrie, checkpointNum int) error {
Member:

Checkpointing doesn't happen often. It's worth adding logs for when checkpointing starts and finishes, as well as when it's aborted or fails.

fxamacker (Member Author):

The checkpoint logging is already done in createCheckpoint().

}

if checkpointNum > 0 {
for observer := range c.observers {
Member:

Do we need to take a lock before looping through the observers?

Is it possible that Subscribe or Unsubscribe is called concurrently, which would modify the observers?

fxamacker (Member Author):

This code block is unchanged by this PR. I can look into this later.

Comment on lines +312 to +322
if len(checkpoints) > int(checkpointsToKeep) {
// if condition guarantees this never fails
checkpointsToRemove := checkpoints[:len(checkpoints)-int(checkpointsToKeep)]

for _, checkpoint := range checkpointsToRemove {
err := checkpointer.RemoveCheckpoint(checkpoint)
if err != nil {
return fmt.Errorf("cannot remove checkpoint %d: %w", checkpoint, err)
}
}
}
Member:

Better to split this into two functions: one to compute which files to remove, the other to remove the specified files.

The first function would be easier to write unit tests for.
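A minimal sketch of the suggested split, using hypothetical helper names (checkpointsToRemove, removeCheckpoints): the first is a pure function that is easy to unit test, the second does the file removal.

```go
package main

import "fmt"

// checkpointsToRemove returns the oldest checkpoint numbers beyond the keep limit.
// Pure function: easy to unit test.
func checkpointsToRemove(checkpoints []int, checkpointsToKeep uint) []int {
	if len(checkpoints) <= int(checkpointsToKeep) {
		return nil
	}
	return checkpoints[:len(checkpoints)-int(checkpointsToKeep)]
}

// checkpointRemover is whatever can delete a checkpoint file by number.
type checkpointRemover interface {
	RemoveCheckpoint(num int) error
}

// removeCheckpoints does the I/O: it deletes each of the given checkpoints.
func removeCheckpoints(r checkpointRemover, nums []int) error {
	for _, num := range nums {
		if err := r.RemoveCheckpoint(num); err != nil {
			return fmt.Errorf("cannot remove checkpoint %d: %w", num, err)
		}
	}
	return nil
}

type noopRemover struct{}

func (noopRemover) RemoveCheckpoint(int) error { return nil }

func main() {
	old := checkpointsToRemove([]int{100, 101, 102, 103, 104}, 3)
	fmt.Println(old) // [100 101]
	fmt.Println(removeCheckpoints(noopRemover{}, old))
}
```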

Comment on lines 144 to 145
// Wait for Compactor to finish all trie updates before closing WAL component.
<-l.trieUpdateDoneCh
Member:

Would this wait forever? What guarantees that eventually there will be a message pushed to this channel?

fxamacker (Member Author):

No, because Compactor closes trieUpdateDoneCh on shutdown.

Thanks for bringing this up 👍 , I think it would be safer to use defer to close trieUpdateDoneCh, so I pushed commit b8797cc.

Comment on lines 279 to 284
if l.trieUpdateCh == nil {
go func() {
_, _, err := l.wal.RecordUpdate(trieUpdate)
resultCh <- err
}()
} else {
Member:

What's the use case for creating a Ledger without the trieUpdateCh?

If the creator of a Ledger doesn't care about the trieUpdate, we can use a for-loop to ignore the updates. That would allow us to remove the if branch.

fxamacker (Member Author):

What's the use case for creating a Ledger without the trieUpdateCh?

It's easier to not require MockCompactor to be running when testing Ledger and maybe others.

If the creator of a Ledger doesn't care about the trieUpdate, we can use a for-loop to ignore the updates. That would allow us to remove the if branch.

I'm not sure I understand. Can you elaborate? I'm not a fan of this if branch either.

Comment on lines 163 to 173
case checkpointResult := <-checkpointResultCh:
if checkpointResult.err != nil {
c.logger.Error().Err(checkpointResult.err).Msgf(
"compactor failed to checkpoint %d", checkpointResult.num,
)

// Retry checkpointing after active segment is finalized.
nextCheckpointNum = activeSegmentNum
}

case update, ok := <-c.trieUpdateCh:
zhangchiqing (Member) commented Jul 22, 2022:

I think it's better to move the checkpoint-related logic into a separate module, rather than mixing it together here.

Basically, there are two pipelines:
The first pipeline subscribes to trieUpdateCh and decides whether a segment has been created. Once a segment is created, it passes the segment to the next pipeline.
The second pipeline subscribes to the segment events and decides whether to create a checkpoint. Since checkpointing takes time, it won't block the first pipeline from processing trie updates or passing segments, even if a checkpoint hasn't finished yet.

Correct me if wrong: the problem with the current implementation is that during checkpointing, we won't process trie updates.

I think the compactor should just be a module that wires the two pipelines together.

fxamacker (Member Author):

Correct me if wrong: the problem with the current implementation is that during checkpointing, we won't process trie updates.

That is incorrect. Checkpointing is done in a new goroutine.

Compactor can process trie updates while checkpointing is running (they execute in parallel). Also, a semaphore makes sure only one checkpoint runs at a time.

fxamacker (Member Author):

Basically, there are two pipelines:
The first pipeline subscribes to trieUpdateCh and decides whether a segment has been created. Once a segment is created, it passes the segment to the next pipeline.
The second pipeline subscribes to the segment events and decides whether to create a checkpoint. Since checkpointing takes time, it won't block the first pipeline from processing trie updates or passing segments, even if a checkpoint hasn't finished yet.

Although I agree that there are two pipelines, I think they need to have different responsibilities from what you mentioned. The goal is to not hold on to tries in memory that don't need to be checkpointed.

The first pipeline subscribes to trieUpdateCh, decides whether a segment has been created, and decides whether a checkpoint needs to be created. More importantly, it needs to take a snapshot of the main ledger state at the segment boundary when checkpointing is needed. Taking a snapshot requires communication with Ledger.Set to ensure that the WAL update and the ledger state update are in sync.

The second pipeline creates the checkpoint file from the ledger state snapshot taken by the first pipeline. Checkpointing is done in a separate goroutine so it doesn't block the first pipeline.

Thoughts?

}

// Try to checkpoint
if checkpointSem.TryAcquire(1) {
Member:

If a second call tries to acquire before the first call releases it, would the second call block and resume after the first call is done?

fxamacker (Member Author):

No, the second call would return without blocking. If it fails to acquire the semaphore, checkpointing is retried when the active segment is finalized.

zhangchiqing (Member) commented Jul 25, 2022:

I see. The lock is created outside of the for-loop.

Make Compactor shutdown use defer to close trieUpdateDoneCh.
Although it was closing trieUpdateDoneCh, using defer is better.
Comment on lines 115 to 116
// TrieUpdateDoneChan returns a channel which is closed when there are no more WAL updates.
// This is used to signal that it is safe to shutdown WAL component (closing opened WAL segment).
Member:

Why do we need the extra channel? Can we just close the channel to indicate there are no more updates?

fxamacker (Member Author):

Why do we need the extra channel? Can we just close the channel to indicate there are no more updates?

We use the extra channel because when the trie update channel is closed, Compactor still needs to drain and process the remaining trie updates in the closed channel. Ledger is responsible for closing the opened WAL segment, so TrieUpdateDoneChan is used to indicate that all WAL writing activity in Compactor is done.

On the other hand, if Compactor were responsible for closing the opened WAL segment, this extra communication wouldn't be needed. Thanks for bringing this up 👍, I'll look into it.
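A minimal sketch of that shutdown handshake with simplified names: the Compactor drains the closed update channel, then closes the done channel so the Ledger knows the WAL can be shut down.

```go
package main

import "fmt"

type TrieUpdate struct{ ID int }

// compactorRun drains every update still buffered after trieUpdateCh is
// closed, then closes trieUpdateDoneCh so the Ledger knows all WAL writes
// are done and the open WAL segment can be closed safely.
func compactorRun(trieUpdateCh <-chan *TrieUpdate, trieUpdateDoneCh chan<- struct{}) {
	defer close(trieUpdateDoneCh) // defer guarantees the signal even on an early return
	for update := range trieUpdateCh {
		fmt.Println("writing update", update.ID, "to WAL")
	}
}

func main() {
	trieUpdateCh := make(chan *TrieUpdate, 2)
	trieUpdateDoneCh := make(chan struct{})

	trieUpdateCh <- &TrieUpdate{ID: 1}
	trieUpdateCh <- &TrieUpdate{ID: 2}
	close(trieUpdateCh) // Ledger stops sending updates at shutdown

	go compactorRun(trieUpdateCh, trieUpdateDoneCh)

	<-trieUpdateDoneCh // Ledger waits here before shutting down the WAL
	fmt.Println("all WAL updates done; safe to close the open segment")
}
```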

return newState, trieUpdate, nil
}

func (l *Ledger) set(trieUpdate *ledger.TrieUpdate) (newState ledger.State, err error) {
Member:

A block might have multiple collections, and each collection will produce a trieUpdate.

If the Set method is called concurrently with multiple trieUpdates from different blocks, it's possible that the trieUpdates will be saved in a different order in the WAL files. Would it be possible for them to be saved into different WAL files?

For instance:

block_10_trie_update_for_collection_0 // saved to segment_30
block_10_trie_update_for_collection_1 // saved to segment_30
block_11_trie_update_for_collection_0 // saved to segment_30
---
block_10_trie_update_for_collection_2 // saved to segment_31
block_11_trie_update_for_collection_1 // saved to segment_31

The trie updates for block 10 are spread across segment_30 and segment_31; however, segment_31 is damaged. In the end, we only have incomplete trie updates for block 10 from segment_30.

If we drop the damaged segment_31 and re-execute block 10, then we would produce the trie updates from segment_30 again. Do we allow redundancy in the trie updates?

block_10_trie_update_for_collection_0 // saved to segment_30
block_10_trie_update_for_collection_1 // saved to segment_30
block_11_trie_update_for_collection_0 // saved to segment_30
---
block_10_trie_update_for_collection_2 // saved to segment_31 (damaged)
block_11_trie_update_for_collection_1 // saved to segment_31 (damaged)
---
block_10_trie_update_for_collection_0 // saved to segment_31 (newly generated after re-executing block 10,)
block_10_trie_update_for_collection_1 // saved to segment_31 (but segment_30 has this trie update already,)
block_10_trie_update_for_collection_2 // saved to segment_31 (would it become a problem?)
block_11_trie_update_for_collection_0 // saved to segment_31
block_11_trie_update_for_collection_1 // saved to segment_31

zhangchiqing (Member) commented Jul 22, 2022:

So the questions are:

  1. Is it OK for trie updates for the same block to be written into two different WAL segment files?
  2. Is EN able to tolerate incomplete trie updates for a block in a WAL file? If yes, how?
  3. Is EN able to tolerate duplicated trie updates in a WAL file?

Member:

I feel there are quite a few edge cases when allowing trie updates for different blocks to be written to the WAL concurrently. Those edge cases would cause incomplete trie updates and duplication in the WAL files.

I also wonder how loading trie updates that were saved out of order would work.
For instance, say block 10 has two trie updates, and the second update is based on a new node created by the first trie update. If the second trie update is written before the first one, how can we add the second trie update before the first? Wouldn't it fail because the new node created by the first trie update doesn't exist yet?

cc @m4ksio

fxamacker (Member Author):

I feel there are quite a few edge cases when allowing trie updates for different blocks to be written to the WAL concurrently. Those edge cases would cause incomplete trie updates and duplication in the WAL files.

I hope the meeting we had on Friday with @m4ksio resolved your concerns.

Checkpointing in this PR properly handles parallel trie updates. Changing or improving the use of concurrent trie updates outside of checkpointing is outside the scope of this PR.

Comment on lines 280 to 283
go func() {
_, _, err := l.wal.RecordUpdate(trieUpdate)
resultCh <- err
}()
Member:

How does concurrent writing to a disk WAL file work? Is it ultimately only one file writer writing at a time, with all other writers blocked waiting?

If so, we might want to make WAL writing sequential, in order to ensure all trie updates of a block are written to a segment file before writing the next.

fxamacker (Member Author):

The code you highlighted is existing code currently running on mainnet. With this PR, that block of code is only used in tests for Ledger, etc. without having to run a MockCompactor. Getting rid of it is on my mind, but it can be done later because it doesn't impact the main goal of this PR.

How does concurrent writing to a disk WAL file work? Is it ultimately only one file writer writing at a time, with all other writers blocked waiting?

Yes. The wal package uses a write lock, so only one write happens at a time.

If so, we might want to make WAL writing sequential, in order to ensure all trie updates of a block are written to a segment file before writing the next.

WAL writing is already sequential: trieUpdates are sent over a channel, so Compactor processes them in sequence.

} else {
doneCh := make(chan struct{})
defer close(doneCh)
l.trieUpdateCh <- &WALTrieUpdate{Update: trieUpdate, ResultCh: resultCh, DoneCh: doneCh}
}

// are finalized for next checkpointing, retry checkpointing
// again when next segment is finalized.
// This avoids having more tries in memory than needed.
checkpointSem := semaphore.NewWeighted(1)
Member:

I don’t understand how this works. Every call will create a new semaphore object. Wouldn’t “TryAcquire” always return true? Don’t we need to share the semaphore object between different calls?

fxamacker (Member Author):

I don’t understand how this works. Every call will create a new semaphore object.

No, that is incorrect. checkpointSem is initialized outside of the goroutine's for loop, which waits for and processes messages from channels. So it is created once and used inside the for loop to limit checkpointing to one at a time.

Wouldn’t “TryAcquire” always return true? Don’t we need to share the semaphore object between different calls?

No, TryAcquire doesn't always return true. All checkpoint attempts need to acquire the same semaphore, so this limits checkpointing to one at a time, and all other calls to TryAcquire return false immediately (non-blocking).
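A minimal sketch of that pattern using golang.org/x/sync/semaphore: the weighted semaphore is created once, outside the loop, so every TryAcquire competes for the same single slot and returns false immediately when a checkpoint is already running. Details here are simplified for illustration.

```go
package main

import (
	"fmt"
	"time"

	"golang.org/x/sync/semaphore"
)

func main() {
	// Created once, outside the processing loop, so all checkpoint attempts
	// share the same single-slot semaphore.
	checkpointSem := semaphore.NewWeighted(1)

	for segment := 1; segment <= 3; segment++ {
		if checkpointSem.TryAcquire(1) {
			go func(n int) {
				defer checkpointSem.Release(1)
				fmt.Println("checkpointing at segment", n)
				time.Sleep(50 * time.Millisecond) // pretend checkpointing takes a while
			}(segment)
		} else {
			// Non-blocking: note the failure and retry when the next segment is finalized.
			fmt.Println("checkpoint delayed at segment", segment, "- prior checkpointing is ongoing")
		}
	}
	time.Sleep(100 * time.Millisecond)
}
```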

resultCh <- err
}()
} else {
doneCh := make(chan struct{})
Member:

Suggested change
doneCh := make(chan struct{})
// When checkpointing is triggered by the compactor, we call forest.GetTries() to
// get the last X tries to be checkpointed. However, adding the trie to the forest
// and processing the trie update happen in parallel, so it's possible that when
// checkpointing is triggered, the trie has not been added to the forest yet; in that
// case the result of forest.GetTries will not include the trie from the trie update
// that triggered the checkpointing.
// In order to ensure the forest has seen the trie, we make a done channel to block the
// compactor from processing the next trie update, and wait until the trie has been
// added to the forest.
// Note: checkpointing does not need the trie that triggers the checkpointing, only the
// previous tries; since we won't process the next trie update until the previous trie
// has been processed and added to the forest, it's guaranteed that GetTries will see
// every trie update up to the previous one.
doneCh := make(chan struct{})

Member:

The 3 channels (trieUpdateCh, resultCh, doneCh) introduce some complexity. I had to add the above comments to explain why we need them and how they behave.

Here is an idea to simplify, possibly by getting rid of them.

The reason why we need so many channels

The reason we use the trieUpdate channel is to create a "message queue" for the compactor running on a different thread to subscribe to. And later we want to:

  1. wait until the compactor finishes processing the trie update (has saved the trie update to the WAL files)
  2. ensure concurrency safety: if another Set call happens concurrently, we want that call to be blocked until the first is done. This is achieved by introducing the doneCh. The compactor runs a for-select loop to consume each trieUpdate from the trieUpdateCh. When the compactor finishes writing to the WAL, it won't process the next trieUpdate; instead it waits on the doneCh, which waits until the ledger has written to the forest.

This means that if ledger.Set is called concurrently by two calls, one of the calls will be blocked until the first call is completed.

Will Ledger.Set be called concurrently?

Although we try to ensure ledger.Set is concurrency-safe, the question is whether ledger.Set would be called concurrently at all. If not, we might be able to simplify it and still be concurrency-safe.

ledger.Set is called by CommitDelta, CommitDelta is called by CommitView, and CommitView is called by the block committer, which processes from a channel one at a time.

In other words, even though ledger.Set needs to be concurrency-safe, it's actually called sequentially, and never called concurrently.

Why is ledger.Set always called sequentially?

It actually makes sense for ledger.Set to be called sequentially, because Ledger works at the trieUpdate level, and a trieUpdate might depend on another trieUpdate in the same block, so a trieUpdate might not be able to be added to the forest until the previous trieUpdate has been added.

However, Ledger doesn't have visibility into the dependencies between trieUpdates, so it's safe to just make ledger.Set process trieUpdates sequentially even if it is called concurrently.

How to simplify if ledger.Set is always called sequentially

Given that, if we want ledger.Set to process trieUpdates sequentially even if called concurrently, we probably don't need 3 channels here; we could simply use an RWMutex, taking the write lock when ledger.Set is called to block the second call. Then, within ledger.Set, we can pass the trieUpdate to the compactor and simply wait for it to finish processing.

IMO, the behavior of an RWMutex is easier to understand than the doneCh, even though both can be used to ensure concurrency safety.

And the reason to choose an RWMutex instead of a Mutex is that Ledger.Get needs to answer script queries while we are updating the ledger.
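A minimal sketch of the RWMutex alternative described in this comment, using a hypothetical simplified Ledger (not this PR's implementation): Set takes the write lock so a second concurrent Set blocks, while Get takes read locks so script queries aren't blocked by each other.

```go
package main

import (
	"fmt"
	"sync"
)

// Hypothetical simplified Ledger used only to illustrate the locking idea.
type Ledger struct {
	mu    sync.RWMutex
	state map[string]string
}

func (l *Ledger) Set(key, value string) {
	l.mu.Lock() // a second concurrent Set blocks until this one finishes
	defer l.mu.Unlock()
	// ...write the update to the WAL, then add the new trie to the forest...
	l.state[key] = value
}

func (l *Ledger) Get(key string) string {
	l.mu.RLock() // reads (script queries) don't block each other
	defer l.mu.RUnlock()
	return l.state[key]
}

func main() {
	l := &Ledger{state: map[string]string{}}
	l.Set("a", "1")
	fmt.Println(l.Get("a"))
}
```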

fxamacker (Member Author):

This means that if ledger.Set is called concurrently by two calls, one of the calls will be blocked until the first call is completed.

No, that's not exactly how it works.

When Ledger.Set is called concurrently by two calls, both send their trieUpdate message to Compactor without blocking, and both create new tries from the updates concurrently. What blocks is that Compactor can only process one trieUpdate at a time, so the second Ledger.Set waits for Compactor to process its message in turn.

In other words, two Ledger.Set calls create new tries concurrently, while updating the WAL and adding the new trie to the ledger state happen in sequence.

fxamacker (Member Author):

IMO, the behavior of an RWMutex is easier to understand than the doneCh, even though both can be used to ensure concurrency safety.

I understand what you're saying, but adding an RWMutex doesn't help because we need the extra channel, based on the discussions @m4ksio, you, and I had on Friday morning.

Based on Friday's meeting, I replaced doneCh with trieCh to send the new trie from Ledger.Set to Compactor. It is safer to use a FIFO queue in Compactor to store tries for checkpointing than to take a snapshot of the main ledger state.

With this change, Ledger.Set should communicate the new trie to Compactor using the third channel.

One alternative approach, using 2 channels instead of 3, is:

  • sending both the trieUpdate and the new trie from Ledger.Set to Compactor in the first channel
  • waiting for the WAL update result in the second channel

However, the alternative approach would make updates take longer, so it isn't worthwhile.

Member:

Yes, you are right. Adding trie updates to the forest could be concurrent. But the problem is that it's not safe. TrieUpdates can have dependencies, and they have to be applied sequentially if they do. For instance, if a block has two trieUpdates, the second update might build on a node created by the first update; adding them concurrently would fail.
Concurrent updates could only work if the blocks of the updates are from different forks, and all the updates they depend on have been added to the forest.
We've been running this code for a long time and never run into a problem. Is it because concurrent updates actually worked? Are we actually adding updates to the forest concurrently? That's why I checked the code to see whether Ledger.Set could be called concurrently. It turns out it's always called sequentially, which means the concurrent update case never happens, and we never run into a problem.

In other words, Ledger.Set is not safe to call concurrently with two updates. Ledger has no protocol-level knowledge and no visibility into the dependencies between two updates, so it's only safe to call sequentially for now, which is what we currently do.

fxamacker (Member Author):

Adding trie updates to the forest could be concurrent. But the problem is that it's not safe.

No, that is incorrect.

In this PR, creating a new trie from an update is concurrent. However, adding the new trie to the forest is sequential (through the use of channels).

fxamacker (Member Author):

For instance, if a block has two trieUpdates, the second update might build on a node created by the first update; adding them concurrently would fail.

No, it won't fail, because trie updates within a block are applied in sequence, so this case is handled properly.

Concurrent updates could only work if the blocks of the updates are from different forks, and all the updates they depend on have been added to the forest.

Yes, @m4ksio confirmed in #flow-execution that concurrent trie updates only happen when block forks happen.

CheckpointQueue is a fixed-size FIFO queue. Compactor
stores new tries in CheckpointQueue and retrieves all
stored tries for checkpointing when needed.
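A minimal sketch of a fixed-size FIFO along those lines; the real TrieQueue in the wal package differs in its details, and this ring buffer only illustrates the idea.

```go
package main

import "fmt"

type Trie struct{ ID int }

// TrieQueue sketch: a fixed-capacity FIFO backed by a ring buffer.
// When full, pushing a new trie evicts the oldest one.
type TrieQueue struct {
	ts       []*Trie
	capacity int
	tail     int // next write position
	count    int
}

func NewTrieQueue(capacity int) *TrieQueue {
	return &TrieQueue{ts: make([]*Trie, capacity), capacity: capacity}
}

// Push appends a trie, evicting the oldest trie when the queue is full.
func (q *TrieQueue) Push(t *Trie) {
	q.ts[q.tail] = t
	q.tail = (q.tail + 1) % q.capacity
	if q.count < q.capacity {
		q.count++
	}
}

// Tries returns the stored tries in FIFO order (oldest first), which is what
// a checkpoint would consume.
func (q *TrieQueue) Tries() []*Trie {
	out := make([]*Trie, 0, q.count)
	start := (q.tail - q.count + q.capacity) % q.capacity
	for i := 0; i < q.count; i++ {
		out = append(out, q.ts[(start+i)%q.capacity])
	}
	return out
}

func main() {
	q := NewTrieQueue(3)
	for i := 1; i <= 5; i++ {
		q.Push(&Trie{ID: i})
	}
	for _, t := range q.Tries() {
		fmt.Print(t.ID, " ") // 3 4 5
	}
	fmt.Println()
}
```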
return activeSegmentNum, -1, nil
}

// In the remaining code: nextCheckpointNum == prevSegmentNum
Member:

Is it possible that prevSegmentNum > nextCheckpointNum? Especially in the case when a checkpoint triggers again before the previous checkpoint finishes? Just to double-check.

If this is possible, we might want to use prevSegmentNum to trigger the checkpoint. Either way, I think it's always safer to use prevSegmentNum than nextCheckpointNum.

fxamacker (Member Author):

Is it possible that prevSegmentNum > nextCheckpointNum?

No. It is an invariant that nextCheckpointNum >= activeSegmentNum. Because prevSegmentNum is initialized to activeSegmentNum when the active segment is finalized and becomes the previous segment, it follows that nextCheckpointNum >= prevSegmentNum.

Especially in the case when a checkpoint triggers again before the previous checkpoint finishes?

When a checkpoint triggers again before the previous checkpoint finishes, we fail to acquire the semaphore and this block is executed, so checkpointing is retried when the active segment is finalized.

c.logger.Info().Msgf("compactor delayed checkpoint %d because prior checkpointing is ongoing", nextCheckpointNum)
nextCheckpointNum = activeSegmentNum

zhangchiqing (Member) left a comment:

Great work! Looks good to me.

fxamacker and others added 7 commits July 27, 2022 15:36
Rename CheckpointQueue to TrieQueue as suggested in PR review.

Since the name TrieQueue is not specific to checkpointing,
also moved it from complete package to wal package.

Putting it in the wal package makes it clearer that TrieQueue
is specific to checkpointing, and prevents it from being used
for other purposes, getting modified, and introducing
side effects into checkpointing.
Co-authored-by: Leo Zhang <zhangchiqing@gmail.com>
Co-authored-by: Leo Zhang <zhangchiqing@gmail.com>
Co-authored-by: Leo Zhang <zhangchiqing@gmail.com>
@fxamacker fxamacker changed the title [Execution Node] [WIP] Reuse ledger state in checkpoints for -152GB RAM and -24 minutes [Execution Node] Reuse ledger state in checkpoints for -152GB RAM and -24 minutes Jul 28, 2022
@fxamacker fxamacker merged commit a908e36 into master Aug 2, 2022
@fxamacker fxamacker deleted the fxamacker/reuse-mtrie-state-for-checkpointing-2 branch August 2, 2022 19:07
@fxamacker fxamacker changed the title [Execution Node] Reuse ledger state in checkpoints for -152GB RAM and -24 minutes [EN Performance] Reuse ledger state in checkpoints for -152GB RAM and -24 minutes Aug 9, 2022
@fxamacker fxamacker changed the title [EN Performance] Reuse ledger state in checkpoints for -152GB RAM and -24 minutes [EN Performance] Reuse ledger state to reduce peak RAM use by ~152+ GB and checkpoint duration by ~24 minutes Aug 10, 2022
@fxamacker fxamacker changed the title [EN Performance] Reuse ledger state to reduce peak RAM use by ~152+ GB and checkpoint duration by ~24 minutes [EN Performance] Reuse ledger state to reduce peak RAM use by ~150 GB and checkpoint duration by ~24 minutes Aug 12, 2022
@fxamacker fxamacker changed the title [EN Performance] Reuse ledger state to reduce peak RAM use by ~150 GB and checkpoint duration by ~24 minutes [EN Performance] Reuse ledger state for about -330GB peak RAM, -160GB disk i/o, and about -32 minutes duration Sep 8, 2022
@fxamacker fxamacker changed the title [EN Performance] Reuse ledger state for about -330GB peak RAM, -160GB disk i/o, and about -32 minutes duration [EN Performance] Reuse ledger state for about -200GB peak RAM, -160GB disk i/o, and about -32 minutes duration Sep 8, 2022
Labels
Execution Cadence Execution Team Performance
Projects
None yet
5 participants