Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wfqueue MCSP stucking at spin function scope wfqueue.c [30|32] #11

Open
Taheta opened this issue Jun 24, 2019 · 6 comments
Open

wfqueue MCSP stucking at spin function scope wfqueue.c [30|32] #11

Taheta opened this issue Jun 24, 2019 · 6 comments

Comments

@Taheta
Copy link

Taheta commented Jun 24, 2019

Hi chaoran, I can confirm that the queue gets stuck in the MCSP (multiple consumers, single producer) case.
Nproc = 16
consumer: 14
producer: 1

0x0000000000400e5f in running_wfq_test (arg_producer=, arg_consumer=, arg_producing=,
arg_consuming=, total_threads=15, test_type=0x401fbd "MCSP") at main_test.c:121
121 while (__sync_fetch_and_add(&config.nConsuming, 0) < TEST_MAX_INPUT * (config.nProducer)) {
Missing separate debuginfos, use: debuginfo-install glibc-2.17-222.el7.x86_64
(gdb) info threads
Id Target Id Frame
16 Thread 0x7fffbf7fe700 (LWP 25694) "test" spin (p=0x7fffc8031980) at wfqueue.c:32
15 Thread 0x7fffbffff700 (LWP 25693) "test" spin (p=0x7fffc8032180) at wfqueue.c:32
14 Thread 0x7fffdcff9700 (LWP 25692) "test" spin (p=0x7fffc80324c0) at wfqueue.c:30
13 Thread 0x7fffdd7fa700 (LWP 25691) "test" 0x0000000000401788 in spin (p=0x7fffc8032a80) at wfqueue.c:30
12 Thread 0x7fffddffb700 (LWP 25690) "test" 0x0000000000401788 in spin (p=0x7fffc8032d80) at wfqueue.c:30
11 Thread 0x7fffde7fc700 (LWP 25689) "test" 0x0000000000401788 in spin (p=0x7fffc8033200) at wfqueue.c:30
10 Thread 0x7fffdeffd700 (LWP 25688) "test" spin (p=0x7fffc8033540) at wfqueue.c:32
9 Thread 0x7fffdf7fe700 (LWP 25687) "test" spin (p=0x7fffc80338c0) at wfqueue.c:32
8 Thread 0x7fffdffff700 (LWP 25686) "test" 0x0000000000401788 in spin (p=0x7fffc8033cc0) at wfqueue.c:30
7 Thread 0x7ffff4fec700 (LWP 25685) "test" 0x00000000004017f9 in help_enq (i=91476149, c=0x7fffc8033ec0, th=0x7fffd8001000, q=0x605000)
at wfqueue.c:213
6 Thread 0x7ffff57ed700 (LWP 25684) "test" 0x0000000000401788 in spin (p=0x7fffe4013080) at wfqueue.c:30
5 Thread 0x7ffff5fee700 (LWP 25683) "test" 0x0000000000401788 in spin (p=0x7fffe4013200) at wfqueue.c:30
4 Thread 0x7ffff67ef700 (LWP 25682) "test" 0x0000000000401788 in spin (p=0x7fffc802cd80) at wfqueue.c:30
3 Thread 0x7ffff6ff0700 (LWP 25681) "test" 0x0000000000401788 in spin (p=0x7fffe4013380) at wfqueue.c:30

  • 1 Thread 0x7ffff7fe0740 (LWP 25676) "test" 0x0000000000400e5f in running_wfq_test (arg_producer=, arg_consumer=,
    arg_producing=, arg_consuming=, total_threads=15, test_type=0x401fbd "MCSP") at main_test.c:121
@chaoran
Copy link
Owner

chaoran commented Jun 24, 2019

Is this the same issue as #10 ? Or a different one?

@Taheta
Copy link
Author

Taheta commented Jun 24, 2019

This is a different issue: this one is MCSP. It is a confirmed issue. I have sent you the code, so you can take a look.

@Taheta Taheta changed the title wfqueue MPSC stucking at spin function scope wfqueue.c [30|32] wfqueue MCSP stucking at spin function scope wfqueue.c [30|32] Jun 24, 2019
@Taheta
Copy link
Author

Taheta commented Jun 24, 2019

Hi Chaoran,

This is the MCSP code; it still gets stuck after I added a barrier.

/*
 * compile : g++ -std=c++11 -I./ OverallTest.cpp -pthread -Wall -o overalltest
 * execute
 * valgrind --fair-sched=yes ./overalltest
 */
// #include <iostream>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <pthread.h>
// #include <thread>

#include "queue.h"
#include <unistd.h>
#include <assert.h>
#include <sys/time.h>

#define MILLION  1000000
#define TEST_MAX_INPUT  MILLION

/* Barrier shared by all worker threads: each producer and consumer waits on
 * it once before entering its work loop and once more after leaving it.
 * It is initialized in main(). */
static pthread_barrier_t barrier;
/* Payload carried through the queue: a single number assigned by the
 * producer and inspected (and occasionally printed) by the consumer. */
typedef struct {
    size_t v; /* value set by newval() */
} MyVal;

/* State shared between the test driver and every worker thread. */
typedef struct {
    size_t nProducer;   /* producer count; TEST_MAX_INPUT * nProducer is the total the consumers wait for */
    size_t nConsumer;   /* consumer count as passed to the driver (stored only; not read in this file) */
    size_t nProducing;  /* atomically incremented once per enqueue; the pre-increment value becomes the payload */
    size_t nConsuming;  /* atomically incremented once per dequeued item */
    queue_t *q;         /* queue under test */
    handle_t **hds;     /* per-thread queue handles, indexed by the id each thread draws on startup */
} wfq_test_config_t;

/* Number of completed test runs; incremented and printed in the summary
 * line at the end of running_wfq_test(). */
int TEST_COUNT = 0;


/*
 * Allocate a queue payload holding `digit`.
 *
 * The caller owns the returned object and releases it with free().
 * Never returns NULL: if the allocation fails the process reports the
 * error on stderr and aborts, instead of dereferencing a null pointer.
 */
MyVal* newval(size_t digit) {
    MyVal *data = malloc(sizeof *data);
    if (data == NULL) {
        fprintf(stderr, "newval: out of memory\n");
        abort();
    }
    data->v = digit;
    return data;
}


/* Next free thread slot. Each worker thread atomically fetches-and-increments
 * this counter on startup; the value it gets is its index into the handle
 * array and the id it passes to queue_register(). */
static int id = 0;
void * producing_fn(void *v) {
    int z;
    wfq_test_config_t* config = (wfq_test_config_t*)v;
    queue_t *q = config->q;
    handle_t **hds = config->hds;
    int _id = __sync_fetch_and_add(&id, 1);
    hds[_id] = align_malloc(PAGE_SIZE, sizeof(handle_t));
    queue_register(q, hds[_id], _id);
    pthread_barrier_wait(&barrier);
    for (z = 0; z < TEST_MAX_INPUT; z++) {
        MyVal* s = newval(__sync_fetch_and_add(&config->nProducing, 1));

        enqueue(q, hds[_id], s);
        // wfq_sleep(1);
        // if (xx % 100000 == 0)
        //     printf("%zu\n", xx);
    }

    pthread_barrier_wait(&barrier);
    queue_free(q, hds[_id]);
    return NULL;
}
void * consuming_fn(void *v) {
    wfq_test_config_t* config = (wfq_test_config_t*)v;
    queue_t *q = config->q;
    handle_t **hds = config->hds;
    int _id = __sync_fetch_and_add(&id, 1);
    hds[_id] = align_malloc(PAGE_SIZE, sizeof(handle_t));
    queue_register(q, hds[_id], _id);
    pthread_barrier_wait(&barrier);

    for (;;) {
        MyVal* s;
        while ( (s = (MyVal*)dequeue(q, hds[_id]) )  ) {
            if (s->v % 100000 == 0) {
                printf("t %zu\n", s->v);
            }
            free(s);
            __sync_fetch_and_add(&config->nConsuming, 1);
        }
        if (__sync_fetch_and_add(&config->nConsuming, 0) >= TEST_MAX_INPUT * (config->nProducer)) {
            break;
        }
    }

    pthread_barrier_wait(&barrier);
    queue_free(q, hds[_id]);
    return NULL;
}

int running_wfq_test(size_t arg_producer, size_t arg_consumer, size_t arg_producing, size_t arg_consuming, const size_t total_threads, const char * test_type) {

    size_t i = 0;
    struct timeval start_t, end_t;
    double diff_t;
    wfq_test_config_t config;

    assert ((total_threads >= (arg_producer + arg_consumer)) && "not enough thread to test");

    pthread_t testThreads[total_threads];


    config.nProducer = arg_producer;
    config.nProducing = arg_producing;
    config.nConsumer = arg_consumer;
    config.nConsuming = arg_consuming;
    config.q = align_malloc(PAGE_SIZE, sizeof(queue_t));
    queue_init(config.q, total_threads);
    config.hds = align_malloc(PAGE_SIZE, sizeof(handle_t * [total_threads]));


    char *testname = (char*)"Fixed size wfqueue test";

    gettimeofday(&start_t, NULL);
    for (i = 0; i < arg_producer ; i++) {
        pthread_create(testThreads + i, 0, producing_fn,  &config);
    }
    for (; i < total_threads ; i++) {
        pthread_create(testThreads + i, 0, consuming_fn,  &config);
    }

    while (__sync_fetch_and_add(&config.nConsuming, 0) < TEST_MAX_INPUT * (config.nProducer)) {
      struct timeval curr;
      gettimeofday(&curr, NULL);
      if ((curr.tv_usec - start_t.tv_usec) >= (120 * 1000 * 1000)) { // 2 minute
          assert(0 && " too long to consuming the queue ");
      }
    }

    for (i = 0; i < total_threads; i++) {
        void *ret;
        pthread_join(testThreads[i], &ret);
        // free(ret);
    }

    gettimeofday(&end_t, NULL);


    diff_t = (double)(end_t.tv_usec - start_t.tv_usec) / 1000000 + (double)(end_t.tv_sec - start_t.tv_sec);


    printf("===END Test= %d - %s, test type %s ===\n", ++TEST_COUNT, testname, test_type);
    printf("======Total consuming = %zu\n", __sync_fetch_and_add(&config.nConsuming, 0));
    // printf("======Left over = %zu\n", wfq_size(config.q));
    printf("Execution time = %f\n", diff_t);
    // assert(wfq_size(config.q) == 0 && " still left over queue inside ");

    // wfq_destroy(config.q);
    free(config.q);
    free(config.hds);
    sleep(1);
    return 0;
}

/*
 * Test driver: runs the MCSP scenario (one producer, all remaining online
 * processors as consumers).
 *
 * Returns the result of the last test run, or -1 when the processor count
 * cannot be determined, fewer than two processors are online, or the shared
 * barrier cannot be initialized.
 */
int main(void) {
    int ret = 0;

    /* sysconf() returns a long and reports failure as -1, so check it before
     * narrowing it to an unsigned thread count. */
    long nproc = sysconf(_SC_NPROCESSORS_ONLN); // Linux / MAC OS
    if (nproc < 0) {
        perror("sysconf(_SC_NPROCESSORS_ONLN)");
        return -1;
    }
    if (nproc <= 1) {
        printf("One thread is not enough for testing\n");
        return -1;
    }

    /* One barrier participant per worker thread: 1 producer + (nproc - 1)
     * consumers. */
    if (pthread_barrier_init(&barrier, NULL, (unsigned int)nproc) != 0) {
        fprintf(stderr, "pthread_barrier_init failed\n");
        return -1;
    }

    int num_producer = 1;
    int num_consumer = (int)(nproc - 1);
    int running_set = 1;
    for (int i = 0; i < running_set; i++) {
        ret = running_wfq_test(num_producer, num_consumer, 0, 0, num_producer + num_consumer, "MCSP");
    }

    pthread_barrier_destroy(&barrier);
    return ret;
}

@Taheta
Copy link
Author

Taheta commented Jun 26, 2019

I thought it could be a gcc 4.8.5 issue, but I also tried clang on my Mac (Apple LLVM version 10.0.0, clang-1000.11.45.5).

I still have the same issue.

@chaoran
Copy link
Owner

chaoran commented Jun 26, 2019

It may be a bug in the wfqueue implementation. I don't have time to debug it right now. But I will leave this issue open until I fix it.

@Pslydhh
Copy link
Contributor

Pslydhh commented Mar 12, 2020

Hi, I have made #13. With it, the posted code no longer gets stuck in my tests. Or is there something that is still not right?
Maybe you can test it again.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants