Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix debug assert in CachingReader #2309

Merged
merged 11 commits into from
Oct 3, 2019
67 changes: 37 additions & 30 deletions src/engine/cachingreader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,12 @@ CachingReader::CachingReader(QString group,
// The capacity of the back channel must be equal to the number of
// allocated chunks, because the worker use writeBlocking(). Otherwise
// the worker could get stuck in a hot loop!!!
m_stateFIFO(kNumberOfCachedChunksInMemory),
m_state(State::Idle),
m_readerStatusUpdateFIFO(kNumberOfCachedChunksInMemory),
m_state(STATE_IDLE),
m_mruCachingReaderChunk(nullptr),
m_lruCachingReaderChunk(nullptr),
m_sampleBuffer(CachingReaderChunk::kSamples * kNumberOfCachedChunksInMemory),
m_worker(group, &m_chunkReadRequestFIFO, &m_stateFIFO) {
m_worker(group, &m_chunkReadRequestFIFO, &m_readerStatusUpdateFIFO) {

m_allocatedCachingReaderChunks.reserve(kNumberOfCachedChunksInMemory);
// Divide up the allocated raw memory buffer into total_chunks
Expand Down Expand Up @@ -200,43 +200,39 @@ CachingReaderChunkForOwner* CachingReader::lookupChunkAndFreshen(SINT chunkIndex
return pChunk;
}

// Invoked from the UI thread!!
void CachingReader::newTrack(TrackPointer pTrack) {
// Feed the track to the worker as soon as possible
// to get ready while the reader switches its internal
// state. There are no race conditions, because the
// reader polls the worker.
m_worker.newTrack(pTrack);
m_worker.workReady();
// Don't accept any new read requests until the current
// track has been unloaded and the new track has been
// loaded.
m_state = State::TrackLoading;
// Free all chunks with sample data from the current track.
freeAllChunks();
auto newState = pTrack ? STATE_TRACK_LOADING : STATE_TRACK_UNLOADING;
auto oldState = m_state.fetchAndStoreAcquire(newState);
VERIFY_OR_DEBUG_ASSERT(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this can currently actually happen in case the worker is stalled for some reasons.
We need to prevent it in the engine that this case is really impossible.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait, from the caching reader perspective this is not longer an issue, since CachingReaderWorker::loadTrack handles the loading and is not interrupted. It is an engine issue, however we cannot judge from m_state, because this is delayed due to the fifos.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ups, the issue is the slotLoadingTrack(TrackPointer pNewTrack, TrackPointer pOldTrack) everywhere.
New Track is only a wild guess if we allow another new track loading. In most cases pNewTrack is not used but not in every case. We need to clean up this which is risky due to the string based signal slot connections.

So lets stick with this assertion, until we manage to make it happen, but not as critical.
I will add a comment.

oldState != STATE_TRACK_LOADING ||
newState != STATE_TRACK_LOADING) {
kLogger.critical()
<< "Cannot load new track while loading a track";
return;
}
m_worker.newTrack(std::move(pTrack));
Copy link
Contributor

@uklotzde uklotzde Oct 3, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The final line m_worker.workReady(); got lost along the way Ok, I see you moved it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

from your template :-)

}

void CachingReader::process() {
ReaderStatusUpdate update;
while (m_stateFIFO.read(&update, 1) == 1) {
DEBUG_ASSERT(m_state != State::Idle);
while (m_readerStatusUpdateFIFO.read(&update, 1) == 1) {
auto pChunk = update.takeFromWorker();
if (pChunk) {
// Result of a read request (with a chunk)
DEBUG_ASSERT(m_state.load() != STATE_IDLE);
DEBUG_ASSERT(
update.status == CHUNK_READ_SUCCESS ||
update.status == CHUNK_READ_EOF ||
update.status == CHUNK_READ_INVALID ||
update.status == CHUNK_READ_DISCARDED);
if (m_state == State::TrackLoading) {
// All chunks have been freed before loading the next track!
DEBUG_ASSERT(!m_mruCachingReaderChunk);
DEBUG_ASSERT(!m_lruCachingReaderChunk);
if (m_state.load() == STATE_TRACK_LOADING) {
// Discard all results from pending read requests for the
// previous track before the next track has been loaded.
freeChunk(pChunk);
continue;
}
DEBUG_ASSERT(m_state == State::TrackLoaded);
DEBUG_ASSERT(m_state.load() == STATE_TRACK_LOADED);
if (update.status == CHUNK_READ_SUCCESS) {
// Insert or freshen the chunk in the MRU/LRU list after
// obtaining ownership from the worker.
Expand All @@ -253,16 +249,27 @@ void CachingReader::process() {
}
} else {
// State update (without a chunk)
DEBUG_ASSERT(!m_mruCachingReaderChunk);
DEBUG_ASSERT(!m_lruCachingReaderChunk);
if (update.status == TRACK_LOADED) {
m_state = State::TrackLoaded;
// We have a new Track ready to go.
// Assert that we had STATE_TRACK_LOADING before and all old chunks
// in the m_readerStatusUpdateFIFO have been discarded.
DEBUG_ASSERT(m_state.load() == STATE_TRACK_LOADING);
// now purge also the recently used chunk list from the old track.
freeAllChunks();
DEBUG_ASSERT(!m_mruCachingReaderChunk);
DEBUG_ASSERT(!m_lruCachingReaderChunk);
// Reset the readable frame index range
m_readableFrameIndexRange = update.readableFrameIndexRange();
m_state.storeRelease(STATE_TRACK_LOADED);
} else {
DEBUG_ASSERT(update.status == TRACK_UNLOADED);
m_state = State::Idle;
// This message could be processed later when a new
// track is already loading! In this case the TRACK_LOADED will
// be the very next status update.
if (!m_state.testAndSetRelease(STATE_TRACK_UNLOADING, STATE_IDLE)) {
DEBUG_ASSERT(m_state.load() == STATE_TRACK_LOADING);
}
}
// Reset the readable frame index range
m_readableFrameIndexRange = update.readableFrameIndexRange();
}
}
}
Expand All @@ -286,7 +293,7 @@ CachingReader::ReadResult CachingReader::read(SINT startSample, SINT numSamples,
}

// If no track is loaded, don't do anything.
if (m_state != State::TrackLoaded) {
if (m_state.load() != STATE_TRACK_LOADED) {
return ReadResult::UNAVAILABLE;
}

Expand Down Expand Up @@ -483,7 +490,7 @@ CachingReader::ReadResult CachingReader::read(SINT startSample, SINT numSamples,

void CachingReader::hintAndMaybeWake(const HintVector& hintList) {
// If no file is loaded, skip.
if (m_state != State::TrackLoaded) {
if (m_state.load() != STATE_TRACK_LOADED) {
return;
}

Expand Down
23 changes: 12 additions & 11 deletions src/engine/cachingreader.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#ifndef ENGINE_CACHINGREADER_H
#define ENGINE_CACHINGREADER_H

#include <QtDebug>
#include <QAtomicInt>
#include <QList>
#include <QVector>
#include <QLinkedList>
Expand Down Expand Up @@ -78,9 +78,9 @@ class CachingReader : public QObject {
// Construct a CachingReader with the given group.
CachingReader(QString group,
UserSettingsPointer _config);
virtual ~CachingReader();
~CachingReader() override;

virtual void process();
void process();

enum class ReadResult {
// No samples read and buffer untouched(!), try again later in case of a cache miss
Expand All @@ -101,12 +101,12 @@ class CachingReader : public QObject {
// that is not in the cache. If any hints do request a chunk not in cache,
// then wake the reader so that it can process them. Must only be called
// from the engine callback.
virtual void hintAndMaybeWake(const HintVector& hintList);
void hintAndMaybeWake(const HintVector& hintList);

// Request that the CachingReader load a new track. These requests are
// processed in the work thread, so the reader must be woken up via wake()
// for this to take effect.
virtual void newTrack(TrackPointer pTrack);
void newTrack(TrackPointer pTrack);

void setScheduler(EngineWorkerScheduler* pScheduler) {
m_worker.setScheduler(pScheduler);
Expand All @@ -124,7 +124,7 @@ class CachingReader : public QObject {
// Thread-safe FIFOs for communication between the engine callback and
// reader thread.
FIFO<CachingReaderChunkReadRequest> m_chunkReadRequestFIFO;
FIFO<ReaderStatusUpdate> m_stateFIFO;
FIFO<ReaderStatusUpdate> m_readerStatusUpdateFIFO;

// Looks for the provided chunk number in the index of in-memory chunks and
// returns it if it is present. If not, returns nullptr. If it is present then
Expand All @@ -151,12 +151,13 @@ class CachingReader : public QObject {
// Gets a chunk from the free list, frees the LRU CachingReaderChunk if none available.
CachingReaderChunkForOwner* allocateChunkExpireLRU(SINT chunkIndex);

enum class State {
Idle,
TrackLoading,
TrackLoaded,
enum State {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why was this changed from an enum class to an enum? Please change it back to an enum class.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because we need integers for the atomic. I didn't check if this still works with an enum class.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So cast it to an int and back when needed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually had a version with a static_cast() one every use. But @uklotzde version with a plain enum conviced me, because it is less distracting. In this case we have no namespace pollution issue, because the enum is class private.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this special case the enum literals are just private integer constants and a classical enum is a good fit.

STATE_IDLE,
STATE_TRACK_LOADING,
STATE_TRACK_UNLOADING,
STATE_TRACK_LOADED,
};
State m_state;
QAtomicInt m_state;

// Keeps track of all CachingReaderChunks we've allocated.
QVector<CachingReaderChunkForOwner*> m_chunks;
Expand Down
20 changes: 10 additions & 10 deletions src/engine/cachingreaderworker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@ CachingReaderWorker::CachingReaderWorker(
m_stop(0) {
}

CachingReaderWorker::~CachingReaderWorker() {
}

ReaderStatusUpdate CachingReaderWorker::processReadRequest(
const CachingReaderChunkReadRequest& request) {
CachingReaderChunk* pChunk = request.chunk;
Expand Down Expand Up @@ -87,9 +84,12 @@ ReaderStatusUpdate CachingReaderWorker::processReadRequest(

// WARNING: Always called from a different thread (GUI)
void CachingReaderWorker::newTrack(TrackPointer pTrack) {
QMutexLocker locker(&m_newTrackMutex);
m_pNewTrack = pTrack;
m_newTrackAvailable = true;
{
QMutexLocker locker(&m_newTrackMutex);
m_pNewTrack = pTrack;
m_newTrackAvailable = true;
}
workReady();
}

void CachingReaderWorker::run() {
Expand Down Expand Up @@ -135,7 +135,7 @@ void CachingReaderWorker::loadTrack(const TrackPointer& pTrack) {

if (!pTrack) {
// If no new track is available then we are done
const auto update = ReaderStatusUpdate::trackNotLoaded();
const auto update = ReaderStatusUpdate::trackUnloaded();
m_pReaderStatusFIFO->writeBlocking(&update, 1);
return;
}
Expand All @@ -149,7 +149,7 @@ void CachingReaderWorker::loadTrack(const TrackPointer& pTrack) {
<< m_group
<< "File not found"
<< filename;
const auto update = ReaderStatusUpdate::trackNotLoaded();
const auto update = ReaderStatusUpdate::trackUnloaded();
m_pReaderStatusFIFO->writeBlocking(&update, 1);
emit trackLoadFailed(
pTrack, QString("The file '%1' could not be found.")
Expand All @@ -165,7 +165,7 @@ void CachingReaderWorker::loadTrack(const TrackPointer& pTrack) {
<< m_group
<< "Failed to open file"
<< filename;
const auto update = ReaderStatusUpdate::trackNotLoaded();
const auto update = ReaderStatusUpdate::trackUnloaded();
m_pReaderStatusFIFO->writeBlocking(&update, 1);
emit trackLoadFailed(
pTrack, QString("The file '%1' could not be loaded").arg(filename));
Expand All @@ -182,7 +182,7 @@ void CachingReaderWorker::loadTrack(const TrackPointer& pTrack) {
<< m_group
<< "Failed to open empty file"
<< filename;
const auto update = ReaderStatusUpdate::trackNotLoaded();
const auto update = ReaderStatusUpdate::trackUnloaded();
m_pReaderStatusFIFO->writeBlocking(&update, 1);
emit trackLoadFailed(
pTrack, QString("The file '%1' is empty and could not be loaded").arg(filename));
Expand Down
8 changes: 4 additions & 4 deletions src/engine/cachingreaderworker.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ typedef struct ReaderStatusUpdate {
return update;
}

static ReaderStatusUpdate trackNotLoaded() {
static ReaderStatusUpdate trackUnloaded() {
ReaderStatusUpdate update;
update.init(TRACK_UNLOADED, nullptr, mixxx::IndexRange());
return update;
Expand Down Expand Up @@ -101,14 +101,14 @@ class CachingReaderWorker : public EngineWorker {
CachingReaderWorker(QString group,
FIFO<CachingReaderChunkReadRequest>* pChunkReadRequestFIFO,
FIFO<ReaderStatusUpdate>* pReaderStatusFIFO);
virtual ~CachingReaderWorker();
~CachingReaderWorker() override = default;

// Request to load a new track. wake() must be called afterwards.
virtual void newTrack(TrackPointer pTrack);
void newTrack(TrackPointer pTrack);

// Run upkeep operations like loading tracks and reading from file. Run by a
// thread pool via the EngineWorkerScheduler.
virtual void run();
void run() override;

void quitWait();

Expand Down