Skip to content

Commit

Permalink
dp: introduce dp_queue
Browse files Browse the repository at this point in the history
DP queue is a circular buffer providing safe
consumer/producer cached operations cross cores

Both data consumer and data producer declare max chunk sizes
they want to use (IBS/OBS)

The queue may work in 2 modes
 1) simple mode
    in case both receiver and sender are located on the
    same core and cache coherency does not matter.
    dp_queue structure is located in cached memory
    In this case DP Queue is a simple ring buffer
    Buffer size must be:
	- 2*MAX(IBS,OBS) if IBS(obs) is a multiplication of OBS(IBS)
	- 3*MAX(IBS,OBS) otherwise

 2) shared mode
    In this case we need to writeback cache when
    new data arrive and invalidate cache on secondary core.
    That means the whole cacheline must be used exclusively
    by sink or by source. Incoming data will be available
    to use when a cacheline is filled completely
    dp_queue structure is located in shared memory

    buffer size is always 3*MAX(IBS,OBS,CACHELINE) + CACHELINE

Signed-off-by: Marcin Szkudlinski <marcin.szkudlinski@intel.com>
  • Loading branch information
marcinszkudlinski committed Aug 16, 2023
1 parent 17cb6d9 commit 1239780
Show file tree
Hide file tree
Showing 3 changed files with 481 additions and 0 deletions.
391 changes: 391 additions & 0 deletions src/audio/dp_queue.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,391 @@
// SPDX-License-Identifier: BSD-3-Clause
//
// Copyright(c) 2021 Intel Corporation. All rights reserved.
//

#include <sof/audio/dp_queue.h>
#include <sof/audio/sink_api_implementation.h>
#include <sof/audio/source_api_implementation.h>
#include <sof/audio/audio_stream.h>

#include <rtos/alloc.h>
#include <ipc/topology.h>

struct dp_queue {
struct sof_source source_api; /**< src api handler */
struct sof_sink sink_api; /**< sink api handler */

uint32_t ibs;
uint32_t obs;
uint32_t flags;

size_t data_buffer_size;
uint8_t __sparse_cache *data_buffer;

uint32_t write_offset;
uint32_t read_offset;
uint32_t available_data; /* amount of data ready for immediate reading */
/* free buffer space
* NOTE!
* - when dp queue is shared, available_data + free_space DOES NOT eq data_buffer_size
* - when dp queue is not shared available_data + free_space always == data_buffer_size
*/
uint32_t free_space;

bool hw_params_configured;
struct sof_audio_stream_params audio_stream_params;

struct k_spinlock lock;
};

union lock_key {
k_spinlock_key_t spinlock;
int irq_lock;
};

static inline bool dp_queue_is_shared(struct dp_queue *dp_queue)
{
return !!(dp_queue->flags & DP_QUEUE_MODE_SHARED);
}

static inline uint8_t __sparse_cache *dp_queue_buffer_end(struct dp_queue *dp_queue)
{
return dp_queue->data_buffer + dp_queue->data_buffer_size;
}

static inline struct dp_queue *dp_queue_get_from_sink(struct sof_sink *sink)
{
return container_of(sink, struct dp_queue, sink_api);
}

static inline struct dp_queue *dp_queue_get_from_source(struct sof_source *source)
{
return container_of(source, struct dp_queue, source_api);
}

static inline void dp_queue_invalidate_shared(struct dp_queue *dp_queue,
void __sparse_cache *ptr, uint32_t size)
{
/* wrap-around? */
if ((uintptr_t)ptr + size > (uintptr_t)dp_queue_buffer_end(dp_queue)) {
/* writeback till the end of circular buffer */
dcache_invalidate_region
(ptr, (uintptr_t)dp_queue_buffer_end(dp_queue) - (uintptr_t)ptr);
size -= (uintptr_t)dp_queue_buffer_end(dp_queue) - (uintptr_t)ptr;
ptr = (__sparse_force void __sparse_cache *)dp_queue->data_buffer;
}
/* invalidate rest of data */
dcache_invalidate_region(ptr, size);
}

static inline union lock_key dp_queue_lock(struct dp_queue *dp_queue)
{
/* use cross-core spinlock in case of shared queue
* When shared, dp_queue structure is located in not cached memory
* as required by spinlock
*/
union lock_key key;

if (dp_queue_is_shared(dp_queue))
key.spinlock = k_spin_lock(&dp_queue->lock);
else
/* use faster irq_lock in case of not shared queue (located in cached mem) */
key.irq_lock = irq_lock();

return key;
}

static inline void dp_queue_unlock(struct dp_queue *dp_queue, union lock_key key)
{
if (dp_queue_is_shared(dp_queue))
k_spin_unlock(&dp_queue->lock, key.spinlock);
else
irq_unlock(key.irq_lock);
}

struct sof_sink *dp_queue_get_sink(struct dp_queue *dp_queue)
{
return &dp_queue->sink_api;
}

struct sof_source *dp_queue_get_source(struct dp_queue *dp_queue)
{
return &dp_queue->source_api;
}

static size_t dp_queue_get_free_size(struct sof_sink *sink)
{
struct dp_queue *dp_queue = dp_queue_get_from_sink(sink);

return dp_queue->free_space;
}

static int dp_queue_get_buffer(struct sof_sink *sink, size_t req_size,
void **data_ptr, void **buffer_start, size_t *buffer_size)
{
struct dp_queue *dp_queue = dp_queue_get_from_sink(sink);

/* dp_queue_get_free_size will return free size with adjustment for cacheline if needed */
if (req_size > dp_queue_get_free_size(sink))
return -ENODATA;

/* no need to lock, just reading data that may be modified by commit_buffer only
*
* note! a sparse warning will be generated here till
* https://github.com/thesofproject/sof/issues/8006 is implemented
*/
*data_ptr = dp_queue->data_buffer + dp_queue->write_offset;
*buffer_start = dp_queue->data_buffer;
*buffer_size = dp_queue->data_buffer_size;

/* provided buffer is an empty space, the requester will perform write operations only
* no need to invalidate cache - will be overwritten anyway
*/
return 0;
}

static int dp_queue_commit_buffer(struct sof_sink *sink, size_t commit_size)
{
struct dp_queue *dp_queue = dp_queue_get_from_sink(sink);

if (commit_size) {
union lock_key key = dp_queue_lock(dp_queue);

if (dp_queue_is_shared(dp_queue)) {
/* a shared queue. We need to go through committed cachelines one-by-one
* and if the whole cacheline is committed - writeback cache
* and mark data as available for reading
*
* first, calculate the current and last committed cacheline
* as offsets from buffer start
*/
uint32_t current_cacheline = dp_queue->write_offset /
PLATFORM_DCACHE_ALIGN;

/* Last used cacheline may not be filled completely, calculate cacheline
* containing 1st free byte
*/
uint32_t last_cacheline = (dp_queue->write_offset + commit_size + 1) /
PLATFORM_DCACHE_ALIGN;
uint32_t total_num_of_cachelines = dp_queue->data_buffer_size /
PLATFORM_DCACHE_ALIGN;
/* wrap-around? */
last_cacheline %= total_num_of_cachelines;

/* now go one by one.
* if current_cacheline == last_full_cacheline - nothing to do
*/
while (current_cacheline != last_cacheline) {
/* writeback / invalidate */
uint8_t __sparse_cache *ptr = dp_queue->data_buffer +
(current_cacheline * PLATFORM_DCACHE_ALIGN);
dcache_writeback_region(ptr, PLATFORM_DCACHE_ALIGN);

/* mark data as available to read */
dp_queue->available_data += PLATFORM_DCACHE_ALIGN;
/* get next cacheline */
current_cacheline = (current_cacheline + 1) %
total_num_of_cachelines;
}
} else {
/* not shared */
dp_queue->available_data += commit_size;
}

/* move write pointer */
dp_queue->free_space -= commit_size;
dp_queue->write_offset = (dp_queue->write_offset + commit_size) %
dp_queue->data_buffer_size;

dp_queue_unlock(dp_queue, key);
}

return 0;
}

static size_t dp_queue_get_data_available(struct sof_source *source)
{
struct dp_queue *dp_queue = dp_queue_get_from_source(source);

/* access is read only, using uncached alias, no need to lock */
return dp_queue->available_data;
}

static int dp_queue_get_data(struct sof_source *source, size_t req_size,
void **data_ptr, void **buffer_start, size_t *buffer_size)
{
struct dp_queue *dp_queue = dp_queue_get_from_source(source);

/* no need to lock, just reading data */
if (req_size > dp_queue_get_data_available(source))
return -ENODATA;

/*
* note! a sparse warning will be generated here till
* https://github.com/thesofproject/sof/issues/8006 is implemented
*/
*buffer_start = dp_queue->data_buffer;
*buffer_size = dp_queue->data_buffer_size;
*data_ptr = dp_queue->data_buffer + dp_queue->read_offset;

/* clean cache in provided data range */
if (dp_queue_is_shared(dp_queue))
dp_queue_invalidate_shared(dp_queue,
(__sparse_force void __sparse_cache *)*data_ptr,
req_size);

return 0;
}

static int dp_queue_release_data(struct sof_source *source, size_t free_size)
{
struct dp_queue *dp_queue = dp_queue_get_from_source(source);

if (free_size) {
/* data consumed, free buffer space, no need for any special cache operations */
union lock_key key = dp_queue_lock(dp_queue);

dp_queue->available_data -= free_size;
dp_queue->free_space += free_size;
dp_queue->read_offset =
(dp_queue->read_offset + free_size) % dp_queue->data_buffer_size;

dp_queue_unlock(dp_queue, key);
}

return 0;
}

static int dp_queue_set_ipc_params(struct dp_queue *dp_queue,
struct sof_ipc_stream_params *params,
bool force_update)
{
if (dp_queue->hw_params_configured && !force_update)
return 0;

dp_queue->audio_stream_params.frame_fmt = params->frame_fmt;
dp_queue->audio_stream_params.rate = params->rate;
dp_queue->audio_stream_params.channels = params->channels;
dp_queue->audio_stream_params.buffer_fmt = params->buffer_fmt;

dp_queue->hw_params_configured = true;

return 0;
}

/*
* note! a sparse warning will be generated here till
* https://github.com/thesofproject/sof/issues/8006 is implemented
*/

static int dp_queue_set_ipc_params_source(struct sof_source *source,
struct sof_ipc_stream_params *params,
bool force_update)
{
struct dp_queue *dp_queue = dp_queue_get_from_source(source);

return dp_queue_set_ipc_params(dp_queue, params, force_update);
}

static int dp_queue_set_ipc_params_sink(struct sof_sink *sink,
struct sof_ipc_stream_params *params,
bool force_update)
{
struct dp_queue *dp_queue = dp_queue_get_from_sink(sink);

return dp_queue_set_ipc_params(dp_queue, params, force_update);
}

/*
* note! a sparse warning will be generated here till
* https://github.com/thesofproject/sof/issues/8006 is implemented
*/
static const struct source_ops dp_queue_source_ops = {
.get_data_available = dp_queue_get_data_available,
.get_data = dp_queue_get_data,
.release_data = dp_queue_release_data,
.audio_set_ipc_params = dp_queue_set_ipc_params_source,
};

static const struct sink_ops dp_queue_sink_ops = {
.get_free_size = dp_queue_get_free_size,
.get_buffer = dp_queue_get_buffer,
.commit_buffer = dp_queue_commit_buffer,
.audio_set_ipc_params = dp_queue_set_ipc_params_sink,
};

struct dp_queue *dp_queue_create(uint32_t ibs, uint32_t obs, uint32_t flags)
{
uint32_t align;
struct dp_queue *dp_queue;

/* allocate DP structure */
if (flags & DP_QUEUE_MODE_SHARED)
dp_queue = rzalloc(SOF_MEM_ZONE_RUNTIME_SHARED, 0, SOF_MEM_CAPS_RAM,
sizeof(*dp_queue));
else
dp_queue = rzalloc(SOF_MEM_ZONE_RUNTIME, 0, SOF_MEM_CAPS_RAM, sizeof(*dp_queue));
if (!dp_queue)
return NULL;

dp_queue->flags = flags;

/* initiate sink/source provide uncached pointers to audio_stream_params
*
* note! a sparse warning will be generated here till
* https://github.com/thesofproject/sof/issues/8006 is implemented
*/
source_init(dp_queue_get_source(dp_queue), &dp_queue_source_ops,
&dp_queue->audio_stream_params);
sink_init(dp_queue_get_sink(dp_queue), &dp_queue_sink_ops,
&dp_queue->audio_stream_params);

/* set original ibs/obs as required by the module */
sink_set_min_free(&dp_queue->sink_api, ibs);
source_set_min_available(&dp_queue->source_api, obs);

/* calculate required buffer size */
if (dp_queue_is_shared(dp_queue)) {
ibs = MAX(ibs, PLATFORM_DCACHE_ALIGN);
obs = MAX(obs, PLATFORM_DCACHE_ALIGN);
align = PLATFORM_DCACHE_ALIGN;
dp_queue->data_buffer_size = 3 * MAX(ibs, obs) + PLATFORM_DCACHE_ALIGN;
/* reserve 1 cacheline to ensure exclusive use */
dp_queue->free_space = dp_queue->data_buffer_size - PLATFORM_DCACHE_ALIGN;
} else {
if ((ibs % obs == 0) && (obs % ibs == 0))
dp_queue->data_buffer_size = 2 * MAX(ibs, obs);
else
dp_queue->data_buffer_size = 3 * MAX(ibs, obs);
align = 0;
dp_queue->free_space = dp_queue->data_buffer_size;
}
dp_queue->available_data = 0;

/* allocate data buffer - always in cached memory */
dp_queue->data_buffer = (__sparse_force __sparse_cache void *)
rballoc_align(0, 0, dp_queue->data_buffer_size, align);
if (!dp_queue->data_buffer)
goto err;

/* return allocated structure */
return dp_queue;
err:
rfree(dp_queue);
return NULL;
}

void dp_queue_free(struct dp_queue *dp_queue)
{
dp_queue_invalidate_shared(dp_queue,
(__sparse_force void __sparse_cache *)dp_queue->data_buffer,
dp_queue->data_buffer_size);

rfree((__sparse_force void *)dp_queue->data_buffer);
rfree(dp_queue);
}

struct sof_audio_stream_params *dp_queue_get_audio_params(struct dp_queue *dp_queue)
{
return &dp_queue->audio_stream_params;
}
Loading

0 comments on commit 1239780

Please sign in to comment.