
Conversation

@cloud-fan
Contributor

It's inefficient to drop memory blocks to disk inside a synchronized block, as IO is slow. As the TODO says, we only need to synchronize the selection of the blocks to be dropped. So my implementation is: in ensureFreeSpace, we iterate over entries and select blocks to be dropped. But instead of dropping blocks inside ensureFreeSpace, we just mark the selected entries as dropping and return those blocks, letting the caller do the dropping. When other threads call ensureFreeSpace again, they skip entries marked as dropping while iterating. The caller, tryToPut, then does the dropping before putting the new block into entries. In this way, we can do the dropping in parallel.
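The select-then-drop flow described above can be sketched roughly as follows. This is a minimal sketch, not the actual MemoryStore code; `Entry`, `DropSelector`, and `selectBlocksToDrop` are simplified stand-ins for the names discussed in this thread:

```scala
import scala.collection.mutable

// Simplified stand-in for MemoryStore's entry record, with the proposed flag.
case class Entry(size: Long, var dropping: Boolean = false)

class DropSelector {
  val entries = mutable.LinkedHashMap[String, Entry]()

  // Select victims while holding the entries lock, but only *mark* them;
  // the slow disk IO is left to the caller, outside any lock.
  def selectBlocksToDrop(space: Long): Seq[String] = entries.synchronized {
    var freed = 0L
    val selected = mutable.Buffer[String]()
    for ((id, e) <- entries if !e.dropping && freed < space) {
      e.dropping = true // later selections skip this entry
      freed += e.size
      selected += id
    }
    selected.toList
  }
}
```

Because marking happens in the same critical section as selection here, a second caller only sees entries whose dropping flag is still false and therefore picks different victims.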

@AmplabJenkins

Can one of the admins verify this patch?

@mridulm
Contributor

mridulm commented May 16, 2014

IMO this makes things fragile.
First off, it is not MT safe.
Secondly, it does not handle corner cases - for example, exception handling.

@cloud-fan
Contributor Author

This is thread safe. tryToPut calls ensureFreeSpace in a synchronized block, so only one thread can run ensureFreeSpace at a time, which means each thread will select different to-be-dropped blocks. @mridulm could you point out where a multi-threading problem I may have missed could arise?
About exception handling: it may happen that some entries are marked as dropping but no thread is dropping them. I will work on it.

@mridulm
Contributor

mridulm commented May 17, 2014

Use of dropping is not my safe

@cloud-fan
Contributor Author

As far as I know, the reasons for task failure may be: an exception during task execution, an executor being lost and relaunched, or the stage being cancelled by the user. But I'm not sure I've listed all the reasons. And I don't know the details of how Spark relaunches executors and cancels stages, or how to handle these while dropping memory blocks. Is a try-catch enough for it? I want to reset the dropping flag if the task is terminated.

@mridulm
Contributor

mridulm commented May 19, 2014

It should read MT safe - phone "autocorrected" it, sigh.

There could be any number of reasons for dropping a block to fail (including disk issues, etc.).
When it does fail, we should not be left in an inconsistent state.

@tdas
Contributor

tdas commented May 19, 2014

Can you please create a JIRA for this and update the title of the PR?

@cloud-fan cloud-fan changed the title improve performance of MemoryStore#tryToPut by elimating unnecessary lock [SPARK-1888] enhance MEMORY_AND_DISK mode by dropping blocks in parallel May 20, 2014
@cloud-fan
Contributor Author

@mridulm @tdas I have created a JIRA for this: https://issues.apache.org/jira/browse/SPARK-1888

@cloud-fan
Contributor Author

@mridulm Sorry, I may have misunderstood you because of my poor English :(
Let me list things one by one so that we can make it clear:

  1. Currently, Spark's MEMORY_AND_DISK mode is sometimes slower than DISK_ONLY mode because of the lock held during IO (dropping blocks).
  2. As the TODO says, the solution is to synchronize only the selection of to-be-dropped blocks and do the dropping in parallel.
  3. My solution is fragile, but it works if nothing goes wrong.
  4. My solution is not MT safe. For example, if a block is being dropped by one thread while another thread is trying to remove it, oops.
  5. There could be any number of reasons for dropping a block to fail, but not many kinds of them. As far as I know: an exception (including disk issues, etc.), executor loss, and stage cancellation.

I would appreciate it if you could discuss these with me one by one as listed above. Thanks!

@cloud-fan
Contributor Author

As we know, the memory store is used to add, read, and remove blocks. Reading and removing are quite simple, so let's focus on adding.
Adding may trigger a dropping action. As I said before, the dropping flag makes each thread select different to-be-dropped blocks, so it's safe to do the dropping in parallel.
When dropping and reading happen at the same time, the entry is still there until the drop finishes, so it's safe.
When dropping and removing happen at the same time: if the drop finishes first, the remove will fail, which is OK. If the remove finishes first, the dropping thread will try to remove the entry after writing the block to the disk store; this remove will fail because the entry has already been removed, and the dropping thread will then remove the block from the disk store to cancel the drop. You can find this logic in BlockManager#dropFromMemory.
So I think my solution is already MT safe.
As for task termination, one cause is an exception in the task itself (like a disk error), another is being killed by the executor. I now put the dropping code in a try-catch; if any exception is caught, I reset the dropping flag of the to-be-dropped blocks and rethrow the exception. I think this should handle the corner cases.

@mridulm
Contributor

mridulm commented May 20, 2014

It is not MT safe because the PR is checking/modifying shared state (like the dropping variable) in an unsafe manner.
I will comment in detail on the patch later today, since I don't seem to be conveying what I mean properly :-)

Contributor

You are modifying entry.dropping here - there is no guarantee this change will be visible to other threads anytime soon.

@cloud-fan
Contributor Author

@mridulm Thanks very much for your comments! I think a big difference is: the earlier code called BlockManager#dropFromMemory within putLock, but now we call it in parallel, so we have to check it carefully.
About the try-catch: I agree using finally would be better; I will work on it.
About entry.getSize: I found I needed pair.getValue twice, so I extracted it into val entry = pair.getValue. Is that right?
By the way, what do you think of declaring the dropping flag as volatile?

@cloud-fan
Contributor Author

ensureFreeSpace has 2 jobs: 1) iterate over entries and select blocks to be dropped; 2) if the to-be-dropped blocks can free enough space, mark them as dropping and return them to the caller.
ensureFreeSpace is called within putLock, so each thread will see the dropping-flag modifications (I will discuss flag resetting under exception handling later) and thus get different to-be-dropped blocks. Block reading doesn't touch the dropping flag, so there is no conflict there. Let's consider block removing and exception handling (resetting the dropping flag).
Job 1 of ensureFreeSpace (selecting) and removing are both synchronized on entries, so they must proceed in turn.
If a block is removed first, everything is OK.
If a block is removed after Job 2 of ensureFreeSpace (marking), which is also synchronized on entries (in my modification), the block will have been dropped to disk and managed by diskStore, which I think is OK.
If a block is removed between selecting and marking, the marking checks whether the entry is null, so it's OK too.
As for exception handling: flag resetting is also synchronized on entries, so it cannot run during selecting or marking.
If resetting happens before selecting, the selection will be able to select these blocks and re-drop them.
If resetting happens after selecting, the selected to-be-dropped blocks won't include the reset blocks, so there is no conflict.
In total there are 3 places that read or write the dropping flag (selecting, marking, and resetting), and they are all synchronized on entries, so I don't think we need to declare the flag volatile.
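The flag-reset-on-failure idea described here could look roughly like this. The helper is hypothetical; `entries`, `Entry`, and the `dropping` flag follow the sketch in this discussion, not the actual patch:

```scala
import scala.collection.mutable

case class Entry(size: Long, var dropping: Boolean = false)

// On any failure while dropping, clear the flags under the entries lock so a
// later selection can pick these blocks up again, then rethrow.
def dropAll(entries: mutable.LinkedHashMap[String, Entry],
            ids: Seq[String], drop: String => Unit): Unit = {
  try ids.foreach(drop)
  catch {
    case e: Throwable =>
      entries.synchronized {
        ids.foreach(id => entries.get(id).foreach(_.dropping = false))
      }
      throw e
  }
}
```

Since the reset itself runs inside entries.synchronized, it serializes with selecting and marking exactly as the comment above argues.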

@mridulm
Contributor

mridulm commented May 21, 2014

  • With the latest commit, the issue with the dropping flag is gone - which is great.
  • There is a change of behavior w.r.t. the earlier code.
    Whether the earlier code was the way it was intentionally or accidentally, I am not sure - I will let @mateiz or others comment.

Essentially there are a few things here:

a) What happens if an existing block is re-added? It looks like this was probably handled earlier too?
I went up the call tree a bit, and it did not look like this was prevented - but maybe I missed it. Any comments, @mateiz?

b) What happens if the same block is added in parallel by two threads?
If this was a supported use case, then the current PR breaks it - it is possible for the first thread to add it and the second to evict it from memory, in case it was not possible to host both copies in memory (according to the free space computed).

@cloud-fan
Contributor Author

@mridulm I checked the code of BlockManager#doPut.

val putBlockInfo = {
  val tinfo = new BlockInfo(level, tellMaster)
  // Do atomically !
  val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)

  if (oldBlockOpt.isDefined) {
    if (oldBlockOpt.get.waitForReady()) {
      logWarning("Block " + blockId + " already exists on this machine; not re-adding it")
      return updatedBlocks
    }

    // TODO: So the block info exists - but previous attempt to load it (?) failed.
    // What do we do now ? Retry on it ?
    oldBlockOpt.get
  } else {
    tinfo
  }
}

BlockManager creates a BlockInfo for the block to be added and calls val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo), so if multiple threads are adding the same block, one thread will put the BlockInfo successfully and the others will fail and stop the put.
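The dedup behavior relies on ConcurrentHashMap#putIfAbsent returning null only for the thread that actually inserted the value. A small standalone illustration (the block ID and values are made up):

```scala
import java.util.concurrent.ConcurrentHashMap

val blockInfo = new ConcurrentHashMap[String, String]()

// First adder wins: putIfAbsent returns null only on a successful insert.
val first = blockInfo.putIfAbsent("rdd_0_0", "info from thread 1")
// A concurrent second adder gets the existing value back and must bail out,
// which is exactly the "already exists ... not re-adding it" path in doPut.
val second = blockInfo.putIfAbsent("rdd_0_0", "info from thread 2")
```

Both calls are atomic on the map, so even with real concurrency exactly one caller proceeds with the put.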

@tdas
Contributor

tdas commented May 21, 2014

This seems really promising!! However, can you explain whether the following sequence of events is possible or not in ensureFreeSpace and tryToPut?

Both thread 1 and thread 2 want to insert blocks of 100 bytes. The existing blocks are block A and block B of 100 bytes each, and the total capacity is 200 bytes. Next,

  • Thread 1 selects block A (not marked yet) and exits the entries.synchronized { // select }
  • Thread 2 selects block A as well (as it is not marked yet) and exits entries.synchronized { // select }
  • Thread 1 enters entries.synchronized { // mark } and marks block A to be dropped
  • Thread 2 also enters entries.synchronized { // mark } and marks block A to be dropped again (this seems possible, since there is no double check of whether each block has already been marked)
  • Thread 1 then drops block A to disk
  • Thread 2 tries to drop block A to disk as well, but since it is already dropped, no further action is taken.
  • Both threads think that 100 bytes have been cleared. Hence 2 × 100 bytes are inserted after dropping only 100 bytes.

Is this sequence possible?
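For what it's worth, this interleaving can be replayed deterministically under two hypothetical assumptions: select and mark run in separate critical sections with no outer lock, and mark does not re-check the dropping flag. The names below are stand-ins for illustration, not the patch's code:

```scala
import scala.collection.mutable

case class Blk(size: Long, var dropping: Boolean = false)
val entries = mutable.LinkedHashMap("A" -> Blk(100), "B" -> Blk(100))

// Critical section 1: select a victim but do not mark it.
def select(): Seq[String] = entries.synchronized {
  entries.collect { case (id, b) if !b.dropping => id }.take(1).toList
}
// Critical section 2: mark, with no re-check of the dropping flag.
def mark(ids: Seq[String]): Long = entries.synchronized {
  ids.foreach(id => entries(id).dropping = true)
  ids.map(id => entries(id).size).sum
}

val s1 = select() // thread 1 picks A
val s2 = select() // thread 2 also picks A: nothing was marked yet
val freed = mark(s1) + mark(s2) // the same 100 bytes get counted twice
```

An outer lock around both sections (as tryToPut's putLock provides in the patch) removes the window between the two selects.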

@mridulm
Contributor

mridulm commented May 21, 2014

@cloud-fan there are multiple calls into memoryStore that directly put a block - not just from external addition.
So looking only at doPut might not help?

@mridulm
Contributor

mridulm commented May 21, 2014

@tdas there is a dropping flag which prevents this.
Or did I misunderstand your query ?

@tdas
Contributor

tdas commented May 21, 2014

@mridulm i may be missing something as well. Are you referring to the new dropping flag inside the case class Entry?

@mridulm
Contributor

mridulm commented May 21, 2014

@tdas yes - thread 1 should set A's dropping to true; so thread 2 should not select it

@tdas
Contributor

tdas commented May 21, 2014

@mridulm Is that so? Since selection and marking occur in different entries.synchronized blocks, they are not "atomic together". So two threads can select the same block before either marks it.

@cloud-fan
Contributor Author

@tdas you missed an important thing: tryToPut calls ensureFreeSpace within putLock, so one thread has to wait for another thread to finish both selection and marking, which means selection and marking are "atomic together" for the block-adding path.

@mridulm
Contributor

mridulm commented May 21, 2014

@tdas as @cloud-fan stated, the code relies on the implementation detail that the private method is always called within the context of the tryToPut lock - and not called by anyone else. I don't like having locking state spread out like this, but then this is how it already was, I guess...
Maybe we should at least annotate the method with a comment? And possibly assert that it is called within the tryToPut lock?

@cloud-fan
Contributor Author

@tdas @mridulm what about moving the putLock.synchronized into ensureFreeSpace and letting tryToPut call ensureFreeSpace directly? I think it will be clearer this way.

@mridulm
Contributor

mridulm commented May 21, 2014

@cloud-fan makes more sense.
Also, please rename it to something more appropriate (since it is no longer trying to put within that block!)

@tdas, can you also comment about the usecases/flows I mentioned above ?

@cloud-fan
Contributor Author

@tdas I think we shouldn't synchronize on this. When one thread is running ensureFreeSpace, others should not enter ensureFreeSpace, but they should still be able to add and remove blocks. So using a putLock is better.
About the test: I haven't yet, but I'm going to. Can spark-perf do this?

@cloud-fan
Contributor Author

@mridulm I checked all callers of MemoryStore#putValues and putBytes via the IDE; it shows that only BlockManager calls them, and with the block info synchronized. So maybe we don't need to worry about putting the same block in parallel?

@cloud-fan
Contributor Author

@mridulm @tdas I have moved putLock.synchronized into ensureFreeSpace and renamed the method to getToBeDroppedBlocks. I also updated the scaladoc to explain the selection, marking, then dropping process. Please take a look to see if I missed something.
Can one of the admins ask @AmplabJenkins to run the unit tests? I want to make sure my PR doesn't break basic functionality...

Contributor

Instead of 'get', can you rename it to 'find' or some such?

@cloud-fan
Contributor Author

did a manual merge :)

@SparkQA

SparkQA commented Sep 5, 2014

Can one of the admins verify this patch?

@andrewor14
Contributor

@cloud-fan This is now outdated. There have been relatively significant changes that went into MemoryStore recently. Do you mind updating this to master?

@andrewor14
Contributor

Actually, before you do that, have you looked at #2134, which seems to be doing something really similar on the new code?

@andrewor14
Contributor

Let me raise the same question here that I raised in #2134. If my understanding is correct, by the time ensureFreeSpace returns we aren't guaranteed to have actually freed the requested space, but we return "success" to the caller anyway. Doesn't this cause a potential race condition where we put a new block faster than the old block is dropped? This is likely if we put the new block into memory but the old block onto disk. I haven't followed the details of the conversation above, but the consequence of this condition could be an OOM if we're already at the edge of total memory.

@cloud-fan
Contributor Author

First, ensureFreeSpace (I renamed it to findToBeDroppedBlocks) doesn't always return true. If it can't find enough to-be-dropped blocks to "free space", it returns false.
If ensureFreeSpace returns true, you can regard it as returning a Future that promises to free a certain amount of memory once you complete it. Until you finish that Future, freeMemory is not updated (meaning the memory has not actually been freed yet).
So if we put new blocks faster than the old blocks are dropped, ensureFreeSpace will return false (because there is not enough memory), no to-be-dropped blocks are selected, and that thread will put the block into the disk store instead.

@cloud-fan
Contributor Author

It seems a big change has been made to the memory store; I will digest it and update my PR.

Contributor

@cloud-fan @andrewor14
Hi cloud-fan, I think there will be a problem when you don't update currentMemory. Assume there are two threads: the first gets the selectLock, finishes running, and releases the lock; at this point currentMemory has not yet been updated. Then the second thread gets the selectLock, and the value of currentMemory it sees is the same as the first thread saw. So freeMemory = maxMemory - currentMemory is counted twice by the two threads, which means the selectedMemory for the second thread is smaller than it actually requires.
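Hypothetical numbers make the double count concrete. The names maxMemory, currentMemory, and freeMemory follow the comment above; the values are made up for illustration:

```scala
val maxMemory = 200L
var currentMemory = 150L // stale: thread 1's put has not updated it yet

val freeSeenByThread1 = maxMemory - currentMemory // 50 bytes
// Thread 1 releases selectLock before bumping currentMemory...
val freeSeenByThread2 = maxMemory - currentMemory // still 50: counted twice

// Together the two threads believe 100 bytes are free when only 50 are,
// so thread 2 selects fewer victims than it actually needs.
```

Updating currentMemory (or an equivalent reservation counter) inside the same critical section that computes freeMemory would close this window.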

@cloud-fan
Contributor Author

Hi @liyezhang556520 , thanks for pointing this out! I have updated my PR, please review @andrewor14

Contributor

Hi @cloud-fan, you removed accountingLock.synchronized here, so more than one thread can call planFreeSpace here to reserve memory. And each thread will ask for memory of size maxUnrollMemory - currentUnrollMemory. I think this logic differs from the original intention.

Second question: what if maxUnrollMemory is large (maxMemory*unrollFraction might be dozens of GB) while the requested memory amountToRequest is small (maybe dozens of MB)? Then you only use one thread to free that size, spaceToEnsure, which doesn't seem to solve the IO issue.

Third, since you drop the to-be-dropped blocks lazily, how can you avoid the OOM that @andrewor14 pointed out (when the putting speed is faster than the dropping)?

Do these three problems exist in the current patch? Maybe I missed something.

@cloud-fan
Contributor Author

@liyezhang556520 Thanks for your comments. 1) Yes, the logic differed from the original intention; I have updated my PR to fix this. 2) The original logic for calculating spaceToEnsure assumed a single thread; I have updated that logic now. 3) I don't drop blocks lazily: unrollSafely calls DroppingTask#runTask and waits for it to finish.

@pwendell
Contributor

This has mostly gone stale, so I'd suggest we close this issue and revisit it later. This is a decent idea, but it complicates things a good amount, and this particular piece of code is IMO already quite complicated. As with any performance change, it would be useful to quantify the performance problems observed as a result of this issue. For instance, has it been observed as a bottleneck in real clusters? Putting information of this type on the JIRA would be useful.

@liyezhang556520
Contributor

@pwendell, I posted a design doc for SPARK-3000 several days ago that is also mainly aimed at resolving this issue; there might be performance problems in some cases. You can have a look at it.

@cloud-fan cloud-fan closed this Nov 10, 2014
agirish pushed a commit to HPEEzmeral/apache-spark that referenced this pull request May 5, 2022
udaynpusa pushed a commit to mapr/spark that referenced this pull request Jan 30, 2024
mapr-devops pushed a commit to mapr/spark that referenced this pull request May 8, 2025
turboFei pushed a commit to turboFei/spark that referenced this pull request Nov 6, 2025
turboFei pushed a commit to turboFei/spark that referenced this pull request Nov 6, 2025