Skip to content

Commit

Permalink
Merge pull request #2305 from uklotzde/2.2_cachingreader_reload_tracks
Browse files Browse the repository at this point in the history
Fix race condition(s) while unloading/loading tracks
  • Loading branch information
daschuer authored Sep 30, 2019
2 parents 95dde5d + 16655dc commit cd871db
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 56 deletions.
71 changes: 49 additions & 22 deletions src/engine/cachingreader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,12 @@ CachingReader::CachingReader(QString group,
// The capacity of the back channel must be equal to the number of
// allocated chunks, because the worker use writeBlocking(). Otherwise
// the worker could get stuck in a hot loop!!!
m_readerStatusFIFO(kNumberOfCachedChunksInMemory),
m_readerStatus(INVALID),
m_stateFIFO(kNumberOfCachedChunksInMemory),
m_state(State::Idle),
m_mruCachingReaderChunk(nullptr),
m_lruCachingReaderChunk(nullptr),
m_sampleBuffer(CachingReaderChunk::kSamples * kNumberOfCachedChunksInMemory),
m_worker(group, &m_chunkReadRequestFIFO, &m_readerStatusFIFO) {
m_worker(group, &m_chunkReadRequestFIFO, &m_stateFIFO) {

m_allocatedCachingReaderChunks.reserve(kNumberOfCachedChunksInMemory);
// Divide up the allocated raw memory buffer into total_chunks
Expand Down Expand Up @@ -201,15 +201,42 @@ CachingReaderChunkForOwner* CachingReader::lookupChunkAndFreshen(SINT chunkIndex
}

void CachingReader::newTrack(TrackPointer pTrack) {
// Feed the track to the worker as soon as possible
// to get ready while the reader switches its internal
// state. There are no race conditions, because the
// reader polls the worker.
m_worker.newTrack(pTrack);
m_worker.workReady();
// Don't accept any new read requests until the current
// track has been unloaded and the new track has been
// loaded.
m_state = State::TrackLoading;
// Free all chunks with sample data from the current track.
freeAllChunks();
}

void CachingReader::process() {
ReaderStatusUpdate update;
while (m_readerStatusFIFO.read(&update, 1) == 1) {
while (m_stateFIFO.read(&update, 1) == 1) {
DEBUG_ASSERT(m_state != State::Idle);
auto pChunk = update.takeFromWorker();
if (pChunk) {
// Result of a read request (with a chunk)
DEBUG_ASSERT(
update.status == CHUNK_READ_SUCCESS ||
update.status == CHUNK_READ_EOF ||
update.status == CHUNK_READ_INVALID ||
update.status == CHUNK_READ_DISCARDED);
if (m_state == State::TrackLoading) {
// All chunks have been freed before loading the next track!
DEBUG_ASSERT(!m_mruCachingReaderChunk);
DEBUG_ASSERT(!m_lruCachingReaderChunk);
// Discard all results from pending read requests for the
// previous track before the next track has been loaded.
freeChunk(pChunk);
continue;
}
DEBUG_ASSERT(m_state == State::TrackLoaded);
if (update.status == CHUNK_READ_SUCCESS) {
// Insert or freshen the chunk in the MRU/LRU list after
// obtaining ownership from the worker.
Expand All @@ -218,24 +245,24 @@ void CachingReader::process() {
// Discard chunks that don't carry any data
freeChunk(pChunk);
}
}
if (update.status == TRACK_NOT_LOADED) {
m_readerStatus = update.status;
} else if (update.status == TRACK_LOADED) {
m_readerStatus = update.status;
// Reset the max. readable frame index
m_readableFrameIndexRange = update.readableFrameIndexRange();
// Free all chunks with sample data from a previous track
freeAllChunks();
}
if (m_readerStatus == TRACK_LOADED) {
// Adjust the readable frame index range after loading or reading
m_readableFrameIndexRange = intersect(
m_readableFrameIndexRange,
update.readableFrameIndexRange());
// Adjust the readable frame index range (if available)
if (update.status != CHUNK_READ_DISCARDED) {
m_readableFrameIndexRange = intersect(
m_readableFrameIndexRange,
update.readableFrameIndexRange());
}
} else {
// State update (without a chunk)
DEBUG_ASSERT(!m_mruCachingReaderChunk);
DEBUG_ASSERT(!m_lruCachingReaderChunk);
if (update.status == TRACK_LOADED) {
m_state = State::TrackLoaded;
} else {
DEBUG_ASSERT(update.status == TRACK_UNLOADED);
m_state = State::Idle;
}
// Reset the readable frame index range
m_readableFrameIndexRange = mixxx::IndexRange();
m_readableFrameIndexRange = update.readableFrameIndexRange();
}
}
}
Expand All @@ -259,7 +286,7 @@ CachingReader::ReadResult CachingReader::read(SINT startSample, SINT numSamples,
}

// If no track is loaded, don't do anything.
if (m_readerStatus != TRACK_LOADED) {
if (m_state != State::TrackLoaded) {
return ReadResult::UNAVAILABLE;
}

Expand Down Expand Up @@ -456,7 +483,7 @@ CachingReader::ReadResult CachingReader::read(SINT startSample, SINT numSamples,

void CachingReader::hintAndMaybeWake(const HintVector& hintList) {
// If no file is loaded, skip.
if (m_readerStatus != TRACK_LOADED) {
if (m_state != State::TrackLoaded) {
return;
}

Expand Down
9 changes: 7 additions & 2 deletions src/engine/cachingreader.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ class CachingReader : public QObject {
// Thread-safe FIFOs for communication between the engine callback and
// reader thread.
FIFO<CachingReaderChunkReadRequest> m_chunkReadRequestFIFO;
FIFO<ReaderStatusUpdate> m_readerStatusFIFO;
FIFO<ReaderStatusUpdate> m_stateFIFO;

// Looks for the provided chunk number in the index of in-memory chunks and
// returns it if it is present. If not, returns nullptr. If it is present then
Expand All @@ -151,7 +151,12 @@ class CachingReader : public QObject {
// Gets a chunk from the free list, frees the LRU CachingReaderChunk if none available.
CachingReaderChunkForOwner* allocateChunkExpireLRU(SINT chunkIndex);

ReaderStatus m_readerStatus;
enum class State {
Idle,
TrackLoading,
TrackLoaded,
};
State m_state;

// Keeps track of all CachingReaderChunks we've allocated.
QVector<CachingReaderChunkForOwner*> m_chunks;
Expand Down
68 changes: 42 additions & 26 deletions src/engine/cachingreaderworker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,73 +122,89 @@ void CachingReaderWorker::run() {
}

void CachingReaderWorker::loadTrack(const TrackPointer& pTrack) {
ReaderStatusUpdate update;
update.init(TRACK_NOT_LOADED);
// Discard all pending read requests
CachingReaderChunkReadRequest request;
while (m_pChunkReadRequestFIFO->read(&request, 1) == 1) {
const auto update = ReaderStatusUpdate::readDiscarded(request.chunk);
m_pReaderStatusFIFO->writeBlocking(&update, 1);
}

// Unload the track
m_readableFrameIndexRange = mixxx::IndexRange();
m_pAudioSource.reset(); // Close open file handles

if (!pTrack) {
// Unload track
m_pAudioSource.reset(); // Close open file handles
m_readableFrameIndexRange = mixxx::IndexRange();
// If no new track is available then we are done
const auto update = ReaderStatusUpdate::trackNotLoaded();
m_pReaderStatusFIFO->writeBlocking(&update, 1);
return;
}

// Emit that a new track is loading, stops the current track
emit(trackLoading());
emit trackLoading();

QString filename = pTrack->getLocation();
if (filename.isEmpty() || !pTrack->exists()) {
kLogger.warning()
<< m_group
<< "File not found"
<< filename;
const auto update = ReaderStatusUpdate::trackNotLoaded();
m_pReaderStatusFIFO->writeBlocking(&update, 1);
emit(trackLoadFailed(
emit trackLoadFailed(
pTrack, QString("The file '%1' could not be found.")
.arg(QDir::toNativeSeparators(filename))));
.arg(QDir::toNativeSeparators(filename)));
return;
}

mixxx::AudioSource::OpenParams config;
config.setChannelCount(CachingReaderChunk::kChannels);
m_pAudioSource = SoundSourceProxy(pTrack).openAudioSource(config);
if (!m_pAudioSource) {
m_readableFrameIndexRange = mixxx::IndexRange();
kLogger.warning()
<< m_group
<< "Failed to open file"
<< filename;
const auto update = ReaderStatusUpdate::trackNotLoaded();
m_pReaderStatusFIFO->writeBlocking(&update, 1);
emit(trackLoadFailed(
pTrack, QString("The file '%1' could not be loaded.").arg(filename)));
emit trackLoadFailed(
pTrack, QString("The file '%1' could not be loaded").arg(filename));
return;
}

const SINT tempReadBufferSize = m_pAudioSource->frames2samples(CachingReaderChunk::kFrames);
if (m_tempReadBuffer.size() != tempReadBufferSize) {
mixxx::SampleBuffer(tempReadBufferSize).swap(m_tempReadBuffer);
}

// Initially assume that the complete content offered by audio source
// is available for reading. Later if read errors occur this value will
// be decreased to avoid repeated reading of corrupt audio data.
m_readableFrameIndexRange = m_pAudioSource->frameIndexRange();

update.init(TRACK_LOADED, nullptr, m_pAudioSource->frameIndexRange());
m_pReaderStatusFIFO->writeBlocking(&update, 1);

// Clear the chunks to read list.
CachingReaderChunkReadRequest request;
while (m_pChunkReadRequestFIFO->read(&request, 1) == 1) {
update.init(CHUNK_READ_INVALID, request.chunk);
if (m_readableFrameIndexRange.empty()) {
m_pAudioSource.reset(); // Close open file handles
kLogger.warning()
<< m_group
<< "Failed to open empty file"
<< filename;
const auto update = ReaderStatusUpdate::trackNotLoaded();
m_pReaderStatusFIFO->writeBlocking(&update, 1);
emit trackLoadFailed(
pTrack, QString("The file '%1' is empty and could not be loaded").arg(filename));
return;
}

// Adjust the internal buffer
const SINT tempReadBufferSize =
m_pAudioSource->frames2samples(CachingReaderChunk::kFrames);
if (m_tempReadBuffer.size() != tempReadBufferSize) {
mixxx::SampleBuffer(tempReadBufferSize).swap(m_tempReadBuffer);
}

const auto update =
ReaderStatusUpdate::trackLoaded(m_readableFrameIndexRange);
m_pReaderStatusFIFO->writeBlocking(&update, 1);

// Emit that the track is loaded.
const SINT sampleCount =
CachingReaderChunk::frames2samples(
m_pAudioSource->frameLength());
emit(trackLoaded(pTrack, m_pAudioSource->sampleRate(), sampleCount));
m_readableFrameIndexRange.length());
emit trackLoaded(pTrack, m_pAudioSource->sampleRate(), sampleCount);
}

void CachingReaderWorker::quitWait() {
Expand Down
33 changes: 27 additions & 6 deletions src/engine/cachingreaderworker.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ typedef struct CachingReaderChunkReadRequest {
} CachingReaderChunkReadRequest;

enum ReaderStatus {
INVALID,
TRACK_NOT_LOADED,
TRACK_LOADED,
TRACK_UNLOADED,
CHUNK_READ_SUCCESS,
CHUNK_READ_EOF,
CHUNK_READ_INVALID
CHUNK_READ_INVALID,
CHUNK_READ_DISCARDED, // response without frame index range!
};

// POD with trivial ctor/dtor/copy for passing through FIFO
Expand All @@ -45,15 +45,36 @@ typedef struct ReaderStatusUpdate {
ReaderStatus status;

void init(
ReaderStatus statusArg = INVALID,
CachingReaderChunk* chunkArg = nullptr,
const mixxx::IndexRange& readableFrameIndexRangeArg = mixxx::IndexRange()) {
ReaderStatus statusArg,
CachingReaderChunk* chunkArg,
const mixxx::IndexRange& readableFrameIndexRangeArg) {
status = statusArg;
chunk = chunkArg;
readableFrameIndexRangeStart = readableFrameIndexRangeArg.start();
readableFrameIndexRangeEnd = readableFrameIndexRangeArg.end();
}

static ReaderStatusUpdate readDiscarded(
CachingReaderChunk* chunk) {
ReaderStatusUpdate update;
update.init(CHUNK_READ_DISCARDED, chunk, mixxx::IndexRange());
return update;
}

static ReaderStatusUpdate trackLoaded(
const mixxx::IndexRange& readableFrameIndexRange) {
DEBUG_ASSERT(!readableFrameIndexRange.empty());
ReaderStatusUpdate update;
update.init(TRACK_LOADED, nullptr, readableFrameIndexRange);
return update;
}

static ReaderStatusUpdate trackNotLoaded() {
ReaderStatusUpdate update;
update.init(TRACK_UNLOADED, nullptr, mixxx::IndexRange());
return update;
}

CachingReaderChunkForOwner* takeFromWorker() {
CachingReaderChunkForOwner* pChunk = nullptr;
if (chunk) {
Expand Down

0 comments on commit cd871db

Please sign in to comment.