-
-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix race condition(s) while unloading/loading tracks #2305
Changes from all commits
a7a1344
dc1302a
e0b94c3
89b5b40
16655dc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -56,12 +56,12 @@ CachingReader::CachingReader(QString group, | |
// The capacity of the back channel must be equal to the number of | ||
// allocated chunks, because the worker use writeBlocking(). Otherwise | ||
// the worker could get stuck in a hot loop!!! | ||
m_readerStatusFIFO(kNumberOfCachedChunksInMemory), | ||
m_readerStatus(INVALID), | ||
m_stateFIFO(kNumberOfCachedChunksInMemory), | ||
m_state(State::Idle), | ||
m_mruCachingReaderChunk(nullptr), | ||
m_lruCachingReaderChunk(nullptr), | ||
m_sampleBuffer(CachingReaderChunk::kSamples * kNumberOfCachedChunksInMemory), | ||
m_worker(group, &m_chunkReadRequestFIFO, &m_readerStatusFIFO) { | ||
m_worker(group, &m_chunkReadRequestFIFO, &m_stateFIFO) { | ||
|
||
m_allocatedCachingReaderChunks.reserve(kNumberOfCachedChunksInMemory); | ||
// Divide up the allocated raw memory buffer into total_chunks | ||
|
@@ -201,15 +201,42 @@ CachingReaderChunkForOwner* CachingReader::lookupChunkAndFreshen(SINT chunkIndex | |
} | ||
|
||
void CachingReader::newTrack(TrackPointer pTrack) { | ||
// Feed the track to the worker as soon as possible | ||
// to get ready while the reader switches its internal | ||
// state. There are no race conditions, because the | ||
// reader polls the worker. | ||
m_worker.newTrack(pTrack); | ||
m_worker.workReady(); | ||
// Don't accept any new read requests until the current | ||
// track has been unloaded and the new track has been | ||
// loaded. | ||
m_state = State::TrackLoading; | ||
// Free all chunks with sample data from the current track. | ||
freeAllChunks(); | ||
} | ||
|
||
void CachingReader::process() { | ||
ReaderStatusUpdate update; | ||
while (m_readerStatusFIFO.read(&update, 1) == 1) { | ||
while (m_stateFIFO.read(&update, 1) == 1) { | ||
DEBUG_ASSERT(m_state != State::Idle); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This one always fails after ejecting a track and loading a new one. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
While the reader is idle no update messages are expected! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The debug assertion is there for a reason. Idle means don't bother me with update messages or something is seriously wrong. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This might be true after your recent changes in the other PR, but this assumption is here wrong. Was this your assertion as well? |
||
auto pChunk = update.takeFromWorker(); | ||
if (pChunk) { | ||
// Result of a read request (with a chunk) | ||
DEBUG_ASSERT( | ||
update.status == CHUNK_READ_SUCCESS || | ||
update.status == CHUNK_READ_EOF || | ||
update.status == CHUNK_READ_INVALID || | ||
update.status == CHUNK_READ_DISCARDED); | ||
if (m_state == State::TrackLoading) { | ||
// All chunks have been freed before loading the next track! | ||
DEBUG_ASSERT(!m_mruCachingReaderChunk); | ||
DEBUG_ASSERT(!m_lruCachingReaderChunk); | ||
// Discard all results from pending read requests for the | ||
// previous track before the next track has been loaded. | ||
freeChunk(pChunk); | ||
continue; | ||
} | ||
DEBUG_ASSERT(m_state == State::TrackLoaded); | ||
if (update.status == CHUNK_READ_SUCCESS) { | ||
// Insert or freshen the chunk in the MRU/LRU list after | ||
// obtaining ownership from the worker. | ||
|
@@ -218,24 +245,24 @@ void CachingReader::process() { | |
// Discard chunks that don't carry any data | ||
freeChunk(pChunk); | ||
} | ||
} | ||
if (update.status == TRACK_NOT_LOADED) { | ||
m_readerStatus = update.status; | ||
} else if (update.status == TRACK_LOADED) { | ||
m_readerStatus = update.status; | ||
// Reset the max. readable frame index | ||
m_readableFrameIndexRange = update.readableFrameIndexRange(); | ||
// Free all chunks with sample data from a previous track | ||
freeAllChunks(); | ||
} | ||
if (m_readerStatus == TRACK_LOADED) { | ||
// Adjust the readable frame index range after loading or reading | ||
m_readableFrameIndexRange = intersect( | ||
m_readableFrameIndexRange, | ||
update.readableFrameIndexRange()); | ||
// Adjust the readable frame index range (if available) | ||
if (update.status != CHUNK_READ_DISCARDED) { | ||
m_readableFrameIndexRange = intersect( | ||
m_readableFrameIndexRange, | ||
update.readableFrameIndexRange()); | ||
} | ||
} else { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are we sure that we hit always the else branch? We need to receive status update without a chunk. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. This is caused by the coupled design. But I won't change that. The debug assertions clearly indicate that there are two kinds of updates:
|
||
// State update (without a chunk) | ||
DEBUG_ASSERT(!m_mruCachingReaderChunk); | ||
DEBUG_ASSERT(!m_lruCachingReaderChunk); | ||
if (update.status == TRACK_LOADED) { | ||
m_state = State::TrackLoaded; | ||
} else { | ||
DEBUG_ASSERT(update.status == TRACK_UNLOADED); | ||
m_state = State::Idle; | ||
} | ||
// Reset the readable frame index range | ||
m_readableFrameIndexRange = mixxx::IndexRange(); | ||
m_readableFrameIndexRange = update.readableFrameIndexRange(); | ||
} | ||
} | ||
} | ||
|
@@ -259,7 +286,7 @@ CachingReader::ReadResult CachingReader::read(SINT startSample, SINT numSamples, | |
} | ||
|
||
// If no track is loaded, don't do anything. | ||
if (m_readerStatus != TRACK_LOADED) { | ||
if (m_state != State::TrackLoaded) { | ||
return ReadResult::UNAVAILABLE; | ||
} | ||
|
||
|
@@ -456,7 +483,7 @@ CachingReader::ReadResult CachingReader::read(SINT startSample, SINT numSamples, | |
|
||
void CachingReader::hintAndMaybeWake(const HintVector& hintList) { | ||
// If no file is loaded, skip. | ||
if (m_readerStatus != TRACK_LOADED) { | ||
if (m_state != State::TrackLoaded) { | ||
return; | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -122,73 +122,89 @@ void CachingReaderWorker::run() { | |
} | ||
|
||
void CachingReaderWorker::loadTrack(const TrackPointer& pTrack) { | ||
ReaderStatusUpdate update; | ||
update.init(TRACK_NOT_LOADED); | ||
// Discard all pending read requests | ||
CachingReaderChunkReadRequest request; | ||
while (m_pChunkReadRequestFIFO->read(&request, 1) == 1) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is the fifo multi consumer aware? I think we need to move this into the run() method and the if (m_newTrackAvailable) { branch. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No and it doesn't have to be. loadTrack() is private and only invoked from run() within the same thread. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah yes, I misread and took it for newTrack() |
||
const auto update = ReaderStatusUpdate::readDiscarded(request.chunk); | ||
m_pReaderStatusFIFO->writeBlocking(&update, 1); | ||
} | ||
|
||
// Unload the track | ||
m_readableFrameIndexRange = mixxx::IndexRange(); | ||
m_pAudioSource.reset(); // Close open file handles | ||
|
||
if (!pTrack) { | ||
// Unload track | ||
m_pAudioSource.reset(); // Close open file handles | ||
m_readableFrameIndexRange = mixxx::IndexRange(); | ||
// If no new track is available then we are done | ||
const auto update = ReaderStatusUpdate::trackNotLoaded(); | ||
m_pReaderStatusFIFO->writeBlocking(&update, 1); | ||
return; | ||
} | ||
|
||
// Emit that a new track is loading, stops the current track | ||
emit(trackLoading()); | ||
emit trackLoading(); | ||
|
||
QString filename = pTrack->getLocation(); | ||
if (filename.isEmpty() || !pTrack->exists()) { | ||
kLogger.warning() | ||
<< m_group | ||
<< "File not found" | ||
<< filename; | ||
const auto update = ReaderStatusUpdate::trackNotLoaded(); | ||
m_pReaderStatusFIFO->writeBlocking(&update, 1); | ||
emit(trackLoadFailed( | ||
emit trackLoadFailed( | ||
pTrack, QString("The file '%1' could not be found.") | ||
.arg(QDir::toNativeSeparators(filename)))); | ||
.arg(QDir::toNativeSeparators(filename))); | ||
return; | ||
} | ||
|
||
mixxx::AudioSource::OpenParams config; | ||
config.setChannelCount(CachingReaderChunk::kChannels); | ||
m_pAudioSource = SoundSourceProxy(pTrack).openAudioSource(config); | ||
if (!m_pAudioSource) { | ||
m_readableFrameIndexRange = mixxx::IndexRange(); | ||
kLogger.warning() | ||
<< m_group | ||
<< "Failed to open file" | ||
<< filename; | ||
const auto update = ReaderStatusUpdate::trackNotLoaded(); | ||
m_pReaderStatusFIFO->writeBlocking(&update, 1); | ||
emit(trackLoadFailed( | ||
pTrack, QString("The file '%1' could not be loaded.").arg(filename))); | ||
emit trackLoadFailed( | ||
pTrack, QString("The file '%1' could not be loaded").arg(filename)); | ||
return; | ||
} | ||
|
||
const SINT tempReadBufferSize = m_pAudioSource->frames2samples(CachingReaderChunk::kFrames); | ||
if (m_tempReadBuffer.size() != tempReadBufferSize) { | ||
mixxx::SampleBuffer(tempReadBufferSize).swap(m_tempReadBuffer); | ||
} | ||
|
||
// Initially assume that the complete content offered by audio source | ||
// is available for reading. Later if read errors occur this value will | ||
// be decreased to avoid repeated reading of corrupt audio data. | ||
m_readableFrameIndexRange = m_pAudioSource->frameIndexRange(); | ||
|
||
update.init(TRACK_LOADED, nullptr, m_pAudioSource->frameIndexRange()); | ||
m_pReaderStatusFIFO->writeBlocking(&update, 1); | ||
|
||
// Clear the chunks to read list. | ||
CachingReaderChunkReadRequest request; | ||
while (m_pChunkReadRequestFIFO->read(&request, 1) == 1) { | ||
update.init(CHUNK_READ_INVALID, request.chunk); | ||
if (m_readableFrameIndexRange.empty()) { | ||
m_pAudioSource.reset(); // Close open file handles | ||
kLogger.warning() | ||
<< m_group | ||
<< "Failed to open empty file" | ||
<< filename; | ||
const auto update = ReaderStatusUpdate::trackNotLoaded(); | ||
m_pReaderStatusFIFO->writeBlocking(&update, 1); | ||
emit trackLoadFailed( | ||
pTrack, QString("The file '%1' is empty and could not be loaded").arg(filename)); | ||
return; | ||
} | ||
|
||
// Adjust the internal buffer | ||
const SINT tempReadBufferSize = | ||
m_pAudioSource->frames2samples(CachingReaderChunk::kFrames); | ||
if (m_tempReadBuffer.size() != tempReadBufferSize) { | ||
mixxx::SampleBuffer(tempReadBufferSize).swap(m_tempReadBuffer); | ||
} | ||
|
||
const auto update = | ||
ReaderStatusUpdate::trackLoaded(m_readableFrameIndexRange); | ||
m_pReaderStatusFIFO->writeBlocking(&update, 1); | ||
|
||
// Emit that the track is loaded. | ||
const SINT sampleCount = | ||
CachingReaderChunk::frames2samples( | ||
m_pAudioSource->frameLength()); | ||
emit(trackLoaded(pTrack, m_pAudioSource->sampleRate(), sampleCount)); | ||
m_readableFrameIndexRange.length()); | ||
emit trackLoaded(pTrack, m_pAudioSource->sampleRate(), sampleCount); | ||
} | ||
|
||
void CachingReaderWorker::quitWait() { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we set this bit before touching the worker?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not needed. The worker thread is not intercepting the reader. The reader polls for updates. But if it helps to understand what is happening I will reorder the instructions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
...it is even desired to inform the worker asap. A new comment explains why.