Skip to content

Commit

Permalink
dp: introduce dp_queue
Browse files Browse the repository at this point in the history
DP queue is a lockless circular buffer
providing safe consumer/producer cached operations cross cores

prerequisites:
 1) incoming and outgoing data rate MUST be the same
 2) Both data consumer and data producer declare
    max chunk sizes they want to use (IBS/OBS)

required Buffer size:
	- 2*MAX(IBS,OBS) if the larger of IBS/OBS
          is multiplication of smaller
	- 3*MAX(IBS,OBS) otherwise

The queue may work in 2 modes
1) local mode
   in case both receiver and sender are located on
   the same core and cache coherency
   does not matter. dp_queue structure is located in cached memory
   In this case DP Queue is a simple ring buffer

2) shared mode
   In this case we need to writeback cache when new data
   arrive and invalidate cache on secondary core.
   dp_queue structure is located in shared memory

dpQueue is a lockless consumer/producer safe buffer.
It is achieved by having only 2 shared variables:

 write_offset - can be modified by data producer only
 read_offset - can be modified by data consumer only

 as 32 bit operations are atomic, it is multi-thread and multi-core save

There some explanation needed how free_space and
available_data are calculated

number of avail data in circular buffer may be calculated as:
	data_avail = write_offset - read_offset
  and check for wrap around
	if (data_avail < 0) data_avail = buffer_size + data_avail

The problem is when write_offset == read_offset,
!!! it may mean either that the buffer is empty
    or the buffer is completely filled !!!

To solve the above issue having only 2 variables mentioned before:
 - allow both offsets to point from 0 to DOUBLE buffer_size
 - when calculating pointers to data, use: data_bufer[offset % buffer_size]
 - use double buffer size in wrap around check when calculating available data

And now:
  - write_offset == read_offset
		always means "buffer empty"
  - write_offset == read_offset + buffer_size
		always means "buffer full"

  - data_avail = write_offset - read_offset
	if (data_avail < 0) data_avail = 2 * buffer_size + data_avail

Signed-off-by: Marcin Szkudlinski <marcin.szkudlinski@intel.com>
  • Loading branch information
marcinszkudlinski committed Aug 25, 2023
1 parent d2f96d4 commit 8c902e9
Show file tree
Hide file tree
Showing 3 changed files with 468 additions and 0 deletions.
351 changes: 351 additions & 0 deletions src/audio/dp_queue.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,351 @@
// SPDX-License-Identifier: BSD-3-Clause
//
// Copyright(c) 2021 Intel Corporation. All rights reserved.
//

#include <sof/common.h>
#include <sof/trace/trace.h>
#include <sof/lib/uuid.h>

#include <sof/audio/dp_queue.h>
#include <sof/audio/sink_api_implementation.h>
#include <sof/audio/source_api_implementation.h>
#include <sof/audio/audio_stream.h>

#include <rtos/alloc.h>
#include <ipc/topology.h>

LOG_MODULE_REGISTER(dp_queue, CONFIG_SOF_LOG_LEVEL);

/* 393608d8-4188-11ee-be56-0242ac120002 */
DECLARE_SOF_RT_UUID("dp_queue", dp_queue_uuid, 0x393608d8, 0x4188, 0x11ee,
0xbe, 0x56, 0x02, 0x42, 0xac, 0x12, 0x20, 0x02);
DECLARE_TR_CTX(dp_queue_tr, SOF_UUID(dp_queue_uuid), LOG_LEVEL_INFO);


struct dp_queue {
struct sof_source source_api; /**< src api handler */
struct sof_sink sink_api; /**< sink api handler */

size_t ibs;
size_t obs;
uint32_t flags; /* DP_QUEUE_MODE_* */

size_t data_buffer_size;
uint8_t __sparse_cache *data_buffer;

uint32_t write_offset; /* to be modified by data producer ONLY */
uint32_t read_offset; /* to be modified by data consumer ONLY */

bool hw_params_configured;
struct sof_audio_stream_params audio_stream_params;
};

static inline bool dp_queue_is_shared(struct dp_queue *dp_queue)
{
return !!(dp_queue->flags & DP_QUEUE_MODE_SHARED);
}

static inline uint8_t __sparse_cache *dp_queue_buffer_end(struct dp_queue *dp_queue)
{
return dp_queue->data_buffer + dp_queue->data_buffer_size;
}

static inline struct dp_queue *dp_queue_from_sink(struct sof_sink *sink)
{
return container_of(sink, struct dp_queue, sink_api);
}

static inline struct dp_queue *dp_queue_from_source(struct sof_source *source)
{
return container_of(source, struct dp_queue, source_api);
}

static inline void dp_queue_invalidate_shared(struct dp_queue *dp_queue,
void __sparse_cache *ptr, size_t size)
{
/* no cache required in case of not shared queue */
if (!dp_queue_is_shared(dp_queue))
return;

/* wrap-around? */
if ((uintptr_t)ptr + size > (uintptr_t)dp_queue_buffer_end(dp_queue)) {
/* writeback till the end of circular buffer */
dcache_invalidate_region
(ptr, (uintptr_t)dp_queue_buffer_end(dp_queue) - (uintptr_t)ptr);
size -= (uintptr_t)dp_queue_buffer_end(dp_queue) - (uintptr_t)ptr;
ptr = (__sparse_force void __sparse_cache *)dp_queue->data_buffer;
}
/* invalidate rest of data */
dcache_invalidate_region(ptr, size);
}

static inline void dp_queue_writeback_shared(struct dp_queue *dp_queue,
void __sparse_cache *ptr, size_t size)
{
/* no cache required in case of not shared queue */
if (!dp_queue_is_shared(dp_queue))
return;

/* wrap-around? */
if ((uintptr_t)ptr + size > (uintptr_t)dp_queue_buffer_end(dp_queue)) {
/* writeback till the end of circular buffer */
dcache_writeback_region
(ptr, (uintptr_t)dp_queue_buffer_end(dp_queue) - (uintptr_t)ptr);
size -= (uintptr_t)dp_queue_buffer_end(dp_queue) - (uintptr_t)ptr;
ptr = (__sparse_force void __sparse_cache *)dp_queue->data_buffer;
}
/* writeback rest of data */
dcache_writeback_region(ptr, size);
}

struct sof_sink *dp_queue_get_sink(struct dp_queue *dp_queue)
{
return &dp_queue->sink_api;
}

struct sof_source *dp_queue_get_source(struct dp_queue *dp_queue)
{
return &dp_queue->source_api;
}

static inline
uint8_t __sparse_cache *dp_queue_get_pointer(struct dp_queue *dp_queue, uint32_t offset)
{
/* check if offset is not in "double area" */
if (offset >= dp_queue->data_buffer_size)
offset -= dp_queue->data_buffer_size;
return dp_queue->data_buffer + offset;
}

static inline
uint32_t dp_queue_inc_offset(struct dp_queue *dp_queue, uint32_t offset, uint32_t inc)
{
offset += inc;
/* wrap around ? 2*size because of "double area" */
if (offset >= 2*dp_queue->data_buffer_size)
offset -= 2 * dp_queue->data_buffer_size;
return offset;
}

static inline
size_t _dp_queue_get_data_available(struct dp_queue *dp_queue)
{
int32_t avail_data = dp_queue->write_offset - dp_queue->read_offset;
/* wrap around ? 2*size because of "double area" */
if (avail_data < 0)
avail_data = 2 * dp_queue->data_buffer_size + avail_data;

return avail_data;

}

static size_t dp_queue_get_data_available(struct sof_source *source)
{
struct dp_queue *dp_queue = dp_queue_from_source(source);

return _dp_queue_get_data_available(dp_queue);
}

static size_t dp_queue_get_free_size(struct sof_sink *sink)
{
struct dp_queue *dp_queue = dp_queue_from_sink(sink);

return dp_queue->data_buffer_size - _dp_queue_get_data_available(dp_queue);
}

static int dp_queue_get_buffer(struct sof_sink *sink, size_t req_size,
void **data_ptr, void **buffer_start, size_t *buffer_size)
{
struct dp_queue *dp_queue = dp_queue_from_sink(sink);

if (req_size > dp_queue_get_free_size(sink))
return -ENODATA;

/* note! a sparse warning will be generated here till
* https://github.com/thesofproject/sof/issues/8006 is implemented
*/
*data_ptr = dp_queue_get_pointer(dp_queue, dp_queue->write_offset);
*buffer_start = dp_queue->data_buffer;
*buffer_size = dp_queue->data_buffer_size;

/* no need to invalidate cache - buffer is to be written only */
return 0;
}

static int dp_queue_commit_buffer(struct sof_sink *sink, size_t commit_size)
{
struct dp_queue *dp_queue = dp_queue_from_sink(sink);

if (commit_size) {
dp_queue_writeback_shared(dp_queue,
dp_queue_get_pointer(dp_queue, dp_queue->write_offset),
commit_size);

/* move write pointer */
dp_queue->write_offset =
dp_queue_inc_offset(dp_queue, dp_queue->write_offset, commit_size);
}

return 0;
}

static int dp_queue_get_data(struct sof_source *source, size_t req_size,
void **data_ptr, void **buffer_start, size_t *buffer_size)
{
struct dp_queue *dp_queue = dp_queue_from_source(source);

if (req_size > dp_queue_get_data_available(source))
return -ENODATA;

/*
* note! a sparse warning will be generated here till
* https://github.com/thesofproject/sof/issues/8006 is implemented
*/
*buffer_start = dp_queue->data_buffer;
*buffer_size = dp_queue->data_buffer_size;
*data_ptr = dp_queue_get_pointer(dp_queue, dp_queue->read_offset);

/* clean cache in provided data range */
dp_queue_invalidate_shared(dp_queue, *data_ptr, req_size);

return 0;
}

static int dp_queue_release_data(struct sof_source *source, size_t free_size)
{
struct dp_queue *dp_queue = dp_queue_from_source(source);

if (free_size) {
/* data consumed, free buffer space, no need for any special cache operations */
dp_queue->read_offset =
dp_queue_inc_offset(dp_queue, dp_queue->read_offset, free_size);
}

return 0;
}

static int dp_queue_set_ipc_params(struct dp_queue *dp_queue,
struct sof_ipc_stream_params *params,
bool force_update)
{
if (dp_queue->hw_params_configured && !force_update)
return 0;

dp_queue->audio_stream_params.frame_fmt = params->frame_fmt;
dp_queue->audio_stream_params.rate = params->rate;
dp_queue->audio_stream_params.channels = params->channels;
dp_queue->audio_stream_params.buffer_fmt = params->buffer_fmt;

dp_queue->hw_params_configured = true;

return 0;
}

/*
* note! a sparse warning will be generated here till
* https://github.com/thesofproject/sof/issues/8006 is implemented
*/

static int dp_queue_set_ipc_params_source(struct sof_source *source,
struct sof_ipc_stream_params *params,
bool force_update)
{
struct dp_queue *dp_queue = dp_queue_from_source(source);

return dp_queue_set_ipc_params(dp_queue, params, force_update);
}

static int dp_queue_set_ipc_params_sink(struct sof_sink *sink,
struct sof_ipc_stream_params *params,
bool force_update)
{
struct dp_queue *dp_queue = dp_queue_from_sink(sink);

return dp_queue_set_ipc_params(dp_queue, params, force_update);
}

/*
* note! a sparse warning will be generated here till
* https://github.com/thesofproject/sof/issues/8006 is implemented
*/
static const struct source_ops dp_queue_source_ops = {
.get_data_available = dp_queue_get_data_available,
.get_data = dp_queue_get_data,
.release_data = dp_queue_release_data,
.audio_set_ipc_params = dp_queue_set_ipc_params_source,
};

static const struct sink_ops dp_queue_sink_ops = {
.get_free_size = dp_queue_get_free_size,
.get_buffer = dp_queue_get_buffer,
.commit_buffer = dp_queue_commit_buffer,
.audio_set_ipc_params = dp_queue_set_ipc_params_sink,
};

struct dp_queue *dp_queue_create(size_t ibs, size_t obs, uint32_t flags)
{
struct dp_queue *dp_queue;

/* allocate DP structure */
if (flags & DP_QUEUE_MODE_SHARED)
dp_queue = rzalloc(SOF_MEM_ZONE_RUNTIME_SHARED, 0, SOF_MEM_CAPS_RAM,
sizeof(*dp_queue));
else
dp_queue = rzalloc(SOF_MEM_ZONE_RUNTIME, 0, SOF_MEM_CAPS_RAM, sizeof(*dp_queue));
if (!dp_queue)
return NULL;

dp_queue->flags = flags;

/* initiate sink/source
*
* note! a sparse warning will be generated here till
* https://github.com/thesofproject/sof/issues/8006 is implemented
*/
source_init(dp_queue_get_source(dp_queue), &dp_queue_source_ops,
&dp_queue->audio_stream_params);
sink_init(dp_queue_get_sink(dp_queue), &dp_queue_sink_ops,
&dp_queue->audio_stream_params);

/* set original ibs/obs as required by the module */
sink_set_obs(&dp_queue->sink_api, ibs);
source_set_ibs(&dp_queue->source_api, obs);

uint32_t max_ibs_obs = MAX(ibs, obs);
uint32_t min_ibs_obs = MIN(ibs, obs);

/* calculate required buffer size */
if (max_ibs_obs % min_ibs_obs == 0)
dp_queue->data_buffer_size = 2 * max_ibs_obs;
else
dp_queue->data_buffer_size = 3 * max_ibs_obs;

/* allocate data buffer - always in cached memory alias */
dp_queue->data_buffer_size = ALIGN_UP(dp_queue->data_buffer_size, PLATFORM_DCACHE_ALIGN);
dp_queue->data_buffer = (__sparse_force __sparse_cache void *)
rballoc_align(0, 0, dp_queue->data_buffer_size, PLATFORM_DCACHE_ALIGN);
if (!dp_queue->data_buffer)
goto err;

tr_info(&dp_queue_tr, "DpQueue created, shared: %u ibs: %u obs %u, size %u",
dp_queue_is_shared(dp_queue), ibs, obs, dp_queue->data_buffer_size);

/* return a pointer to allocated structure */
return dp_queue;
err:
tr_err(&dp_queue_tr, "DpQueue creation failure");
rfree(dp_queue);
return NULL;
}

void dp_queue_free(struct dp_queue *dp_queue)
{
rfree((__sparse_force void *)dp_queue->data_buffer);
rfree(dp_queue);
}

struct sof_audio_stream_params *dp_queue_get_audio_params(struct dp_queue *dp_queue)
{
return &dp_queue->audio_stream_params;
}
Loading

0 comments on commit 8c902e9

Please sign in to comment.