Merge pull request #11 from idoleat/aba
Add ABA problem subsection in "Atomic operations as building blocks"
jserv authored Aug 1, 2024
2 parents f0ee6bc + ea19cad commit 68dae00
Showing 4 changed files with 392 additions and 2 deletions.
89 changes: 89 additions & 0 deletions concurrency-primer.tex
@@ -883,6 +883,95 @@ \subsection{Conclusion about lock-free}
Balancing complexity and performance is essential in concurrency,
a domain fraught with challenges.

\subsection{ABA problem}
CAS was introduced earlier as one of the read-modify-write operations.
However, the fact that the target object is unchanged at the moment of comparison does not necessarily mean that no other thread modified it in the meantime.
If another thread changes the target object and then changes it back, the comparison still succeeds.
In that case the target object has in fact been modified, yet the operation appears to observe it unchanged, compromising its atomicity.
This is called the \introduce{ABA problem}.
Consider the following scenario:

\inputminted{c}{./examples/simple_aba_example.c}

The execution result would be:

\begin{ccode}
A: v = 42
B: v = 47
B: v = 42
A: v = 52
\end{ccode}

In the example provided, the ABA problem leaves thread A unaware that variable \cc{v} has been altered.
Since the comparison indicates that \cc{v} is unchanged, \cc{v + 10} is swapped in.
The sleeping here only serves to make the ABA problem reproducible.
In a real-world scenario, instead of sleeping, thread A could be paused by a context switch, including preemption by a higher-priority task.
This example looks harmless, but things can get nasty when atomic \textsc{RMW} operations are used on more complex data structures.

In a broader context, the ABA problem occurs when the target object changes between the load and the comparison, but the comparison cannot tell that the observed state is stale, yielding a false positive.

Returning to the thread pool example in \secref{rmw}, it contains an ABA problem as well.
In the \monobox{worker} function, each thread tries to claim a job:

\begin{ccode}
job_t *job = atomic_load(&thrd_pool->head->prev);
...
while (!atomic_compare_exchange_weak(&thrd_pool->head->prev, &job,
job->prev))
;
\end{ccode}

Consider the following scenario:
\begin{enumerate}
\item There is only one job left.
\item Thread A loads the pointer to the job by \cc{atomic_load()}.
\item Thread A is preempted.
\item Thread B claims the job and successfully updates \cc{thrd_pool->head->prev}.
\item Thread B sets thread pool state to idle.
\item Main thread finishes waiting and adds more jobs.
\item Memory allocator reuses the recently freed memory as the addresses of new jobs.
\item By coincidence, the first added job has the same address as the one thread A holds.
\item Thread A is back in the running state. The comparison result is equal, so it updates \cc{thrd_pool->head->prev} with the old \cc{job->prev}, which is already a dangling pointer.
\item Another thread loads the dangling pointer from \cc{thrd_pool->head->prev}.
\end{enumerate}

Notice that even though \cc{job->prev} is not loaded explicitly before the comparison, the compiler could place the load before the comparison.
In the end, the dangling pointer could either point to garbage or trigger a segmentation fault.
It could be even worse if a nested ABA problem occurs in thread B.
Also, the chance of allocating a job at the same address is higher when a memory pool is used, making the ABA problem more likely to occur.
In fact, pre-allocated memory should be used to achieve lock-freedom, since \monobox{malloc} may involve a mutex in multi-threaded environments.

Being unable to determine through the comparison whether the target object has changed means that a true return value from CAS can be a false positive.
Thus, the atomicity provided by CAS is not guaranteed.
The general approach to this problem is to add more information so that different states become distinguishable, and then decide whether to act on the old state or retry with the new one.
If acting on the old state is chosen, safe memory reclamation must be considered, as the memory may already have been freed by other threads.
More radically, one might adopt a programming paradigm in which no operation on the target object has the side effect of modifying it.
In a later section, we will introduce a different way of implementing atomic \textsc{RMW} operations using LL/SC instructions. The exclusive access provided by LL/SC avoids the pitfall introduced by comparison.

To make different states distinguishable, a common solution is to increment a version number each time the target object is changed.
By bundling the target object and the version number into one comparison, every change leaves a distinguishable mark.
Given a sufficiently large version number, there should be no repeated values within the window that matters.
There are multiple ways to store the version number, depending on how long it takes before the number wraps around.
In the thread pool example, the target object is a pointer, so the unused bits of the pointer can be utilized to store the version number.
Alternatively, an additional 32-bit or 64-bit value can be placed next to the target object to hold the version number.
This requires a compare-and-swap instruction capable of comparing the wider value at once, sometimes referred to as \introduce{double-width compare-and-swap}.
On x86-64 processors, atomic instructions that load or store more than a CPU word need additional hardware support.
You can use \monobox{grep cx16 /proc/cpuinfo} to check whether the processor supports 16-byte compare-and-swap.
On hardware that lacks support for the desired size, software implementations, which may involve locks, are used instead, as mentioned in \secref{atomictype}.
Back to the example: the ABA problem in the following code is fixed with a version number that is incremented each time a job is added to an empty queue. On x86-64, add the compiler flag \monobox{-mcx16} to enable 16-byte compare-and-swap in the \monobox{worker} function.

\inputminted{c}{./examples/rmw_example_aba.c}

Notice that in \cc{struct idle_job}, a union is used for type punning, which bundles the pointer and the version number for compare-and-swap.
Directly casting a job pointer to a pointer to a 16-byte object is undefined behavior (the two types have different alignment), so type punning is used instead.
With this technique, \cc{struct idle_job} can still be accessed normally in other places, minimizing code modification.
Compiler optimizations are conservative around type punning, which is acceptable for atomic operations.
See \secref{fusing}.
Another way to prevent the ABA problem in the example is to use a safe memory reclamation mechanism.
In contrast to acting on the old state as mentioned above, the address of a job is not freed until no one is using it.
This prevents the memory allocator or memory pool from reusing the address and causing the problem.

\section{Sequential consistency on weakly-ordered hardware}

Different hardware architectures offer distinct \introduce{memory models}.
8 changes: 6 additions & 2 deletions examples/Makefile
@@ -1,6 +1,10 @@
all:
	$(CC) -Wall -o rmw_example rmw_example.c -pthread -lm
	$(CC) -Wall -o rmw_example_aba rmw_example_aba.c -pthread -lm -mcx16
	$(CC) -Wall -o simple_aba_example simple_aba_example.c -pthread
clean:
	rm -f rmw_example rmw_example_aba simple_aba_example
check: all
	./rmw_example
	./rmw_example_aba
	./simple_aba_example
258 changes: 258 additions & 0 deletions examples/rmw_example_aba.c
@@ -0,0 +1,258 @@
#include <stdio.h>
#include <stdatomic.h>
#include <threads.h>
#include <stdlib.h>
#include <stdbool.h>
#include <assert.h>
#include <math.h>

#define PRECISION 100 /* upper bound in BPP sum */
#define CACHE_LINE_SIZE 64
#define N_THREADS 64

struct tpool_future {
void *result;
void *arg;
atomic_flag flag;
};

typedef struct job {
void *(*func)(void *);
struct tpool_future *future;
struct job *next, *prev;
} job_t;

typedef struct idle_job {
union {
struct {
_Atomic(job_t *) prev;
unsigned long long version;
};
_Atomic struct versioned_prev {
job_t *ptr;
unsigned long long _version;
} v_prev;
};
char padding[CACHE_LINE_SIZE - sizeof(_Atomic(job_t *)) -
sizeof(unsigned long long)]; /* avoid false sharing */
job_t job;
} idle_job_t;

enum state { idle, running, cancelled };

typedef struct tpool {
atomic_flag initialezed;
int size;
thrd_t *pool;
atomic_int state;
thrd_start_t func;
idle_job_t *head; /* job queue is a SPMC ring buffer */
} tpool_t;

static struct tpool_future *tpool_future_create(void *arg)
{
struct tpool_future *future = malloc(sizeof(struct tpool_future));
if (future) {
future->result = NULL;
future->arg = arg;
atomic_flag_clear(&future->flag);
atomic_flag_test_and_set(&future->flag);
}
return future;
}

void tpool_future_wait(struct tpool_future *future)
{
while (atomic_flag_test_and_set(&future->flag))
;
}

void tpool_future_destroy(struct tpool_future *future)
{
free(future->result);
free(future);
}

static int worker(void *args)
{
if (!args)
return EXIT_FAILURE;
tpool_t *thrd_pool = (tpool_t *)args;

while (1) {
/* worker is laid off */
if (atomic_load(&thrd_pool->state) == cancelled)
return EXIT_SUCCESS;
if (atomic_load(&thrd_pool->state) == running) {
/* worker takes the job */
struct versioned_prev job = atomic_load(&thrd_pool->head->v_prev);
/* worker checks if there is only an idle job in the job queue */
if (job.ptr == &thrd_pool->head->job) {
/* worker says it is idle */
atomic_store(&thrd_pool->state, idle);
thrd_yield();
continue;
}

struct versioned_prev next;
/* compare 16 bytes at once */
do {
next.ptr = job.ptr->prev;
next._version = job._version;
} while (!atomic_compare_exchange_weak(&thrd_pool->head->v_prev,
&job, next));

job.ptr->future->result =
(void *)job.ptr->func(job.ptr->future->arg);
atomic_flag_clear(&job.ptr->future->flag);
free(job.ptr);
} else {
/* worker is idle */
thrd_yield();
}
};
return EXIT_SUCCESS;
}

static bool tpool_init(tpool_t *thrd_pool, size_t size)
{
if (atomic_flag_test_and_set(&thrd_pool->initialezed)) {
printf("This thread pool has already been initialized.\n");
return false;
}

assert(size > 0);
thrd_pool->pool = malloc(sizeof(thrd_t) * size);
if (!thrd_pool->pool) {
printf("Failed to allocate thread identifiers.\n");
return false;
}

idle_job_t *idle_job = malloc(sizeof(idle_job_t));
if (!idle_job) {
printf("Failed to allocate idle job.\n");
return false;
}

/* idle_job will always be the first job */
idle_job->job.next = &idle_job->job;
idle_job->job.prev = &idle_job->job;
idle_job->prev = &idle_job->job;
idle_job->version = 0ULL;
thrd_pool->func = worker;
thrd_pool->head = idle_job;
thrd_pool->state = idle;
thrd_pool->size = size;

/* employer hires many workers */
for (size_t i = 0; i < size; i++)
thrd_create(thrd_pool->pool + i, worker, thrd_pool);

return true;
}

static void tpool_destroy(tpool_t *thrd_pool)
{
if (atomic_exchange(&thrd_pool->state, cancelled))
printf("Thread pool cancelled with jobs still running.\n");

for (int i = 0; i < thrd_pool->size; i++)
thrd_join(thrd_pool->pool[i], NULL);

while (thrd_pool->head->prev != &thrd_pool->head->job) {
job_t *job = thrd_pool->head->prev->prev;
free(thrd_pool->head->prev);
thrd_pool->head->prev = job;
}
free(thrd_pool->head);
free(thrd_pool->pool);
atomic_fetch_and(&thrd_pool->state, 0);
atomic_flag_clear(&thrd_pool->initialezed);
}

/* Use Bailey–Borwein–Plouffe formula to approximate PI */
static void *bbp(void *arg)
{
int k = *(int *)arg;
double sum = (4.0 / (8 * k + 1)) - (2.0 / (8 * k + 4)) -
(1.0 / (8 * k + 5)) - (1.0 / (8 * k + 6));
double *product = malloc(sizeof(double));
if (!product)
return NULL;

*product = 1 / pow(16, k) * sum;
return (void *)product;
}

struct tpool_future *add_job(tpool_t *thrd_pool, void *(*func)(void *),
void *arg)
{
job_t *job = malloc(sizeof(job_t));
if (!job)
return NULL;

struct tpool_future *future = tpool_future_create(arg);
if (!future) {
free(job);
return NULL;
}

job->func = func;
job->future = future;
job->next = thrd_pool->head->job.next;
job->prev = &thrd_pool->head->job;
thrd_pool->head->job.next->prev = job;
thrd_pool->head->job.next = job;
if (thrd_pool->head->prev == &thrd_pool->head->job) {
thrd_pool->head->prev = job;
thrd_pool->head->version += 1;
/* the previous job of the idle job is itself */
thrd_pool->head->job.prev = &thrd_pool->head->job;
}
return future;
}

static inline void wait_until(tpool_t *thrd_pool, int state)
{
while (atomic_load(&thrd_pool->state) != state)
thrd_yield();
}

int main()
{
int bbp_args[PRECISION];
struct tpool_future *futures[PRECISION];
double bbp_sum = 0;

tpool_t thrd_pool = { .initialezed = ATOMIC_FLAG_INIT };
if (!tpool_init(&thrd_pool, N_THREADS)) {
printf("failed to init.\n");
return 0;
}
/* employer ask workers to work */
atomic_store(&thrd_pool.state, running);

/* employer wait ... until workers are idle */
wait_until(&thrd_pool, idle);

/* employer add more job to the job queue */
for (int i = 0; i < PRECISION; i++) {
bbp_args[i] = i;
futures[i] = add_job(&thrd_pool, bbp, &bbp_args[i]);
}

/* employer ask workers to work */
atomic_store(&thrd_pool.state, running);

/* employer wait for the result of job */
for (int i = 0; i < PRECISION; i++) {
tpool_future_wait(futures[i]);
bbp_sum += *(double *)(futures[i]->result);
tpool_future_destroy(futures[i]);
}

/* employer destroys the job queue and lays workers off */
tpool_destroy(&thrd_pool);
printf("PI calculated with %d terms: %.15f\n", PRECISION, bbp_sum);
return 0;
}