Skip to content

Commit

Permalink
improved the correctness of Cache_bgdl()
Browse files Browse the repository at this point in the history
  • Loading branch information
fangfufu committed Apr 25, 2019
1 parent f5aceba commit df025b1
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 48 deletions.
112 changes: 67 additions & 45 deletions src/cache.c
Original file line number Diff line number Diff line change
Expand Up @@ -387,14 +387,19 @@ static Cache *Cache_alloc()
exit(EXIT_FAILURE);
}

if (pthread_mutex_init(&cf->rw_lock, NULL)) {
fprintf(stderr, "Cache_alloc(): rw_lock initialisation failed!\n");
if (pthread_mutexattr_init(&cf->rw_lock_attr)) {
fprintf(stderr,
"Cache_alloc(): rw_lock_attr initialisation failed!\n");
}

if (pthread_mutex_init(&cf->bgt_lock, NULL)) {
fprintf(stderr, "Cache_alloc(): seg_lock initialisation failed!\n");
if (pthread_mutexattr_setpshared(&cf->rw_lock_attr,
PTHREAD_PROCESS_SHARED)) {
fprintf(stderr, "Cache_alloc(): could not set rw_lock_attr!\n");
}

if (pthread_mutex_init(&cf->rw_lock, &cf->rw_lock_attr)) {
fprintf(stderr, "Cache_alloc(): rw_lock initialisation failed!\n");
}

if (pthread_mutexattr_init(&cf->bgt_lock_attr)) {
fprintf(stderr,
Expand Down Expand Up @@ -422,6 +427,10 @@ static void Cache_free(Cache *cf)
fprintf(stderr, "Cache_free(): could not destroy rw_lock!\n");
}

if (pthread_mutexattr_destroy(&cf->rw_lock_attr)) {
fprintf(stderr, "Cache_alloc(): could not destroy rw_lock_attr!\n");
}

if (pthread_mutex_destroy(&cf->bgt_lock)) {
fprintf(stderr, "Cache_free(): could not destroy bgt_lock!\n");
}
Expand Down Expand Up @@ -748,11 +757,12 @@ static void Seg_set(Cache *cf, off_t offset, int i)
* segment, we can spawn a pthread using this function to download the next
* segment.
*/
static void *Cache_bg_download(void *arg)
static void *Cache_bgdl(void *arg)
{
Cache *cf = (Cache *) arg;
pthread_mutex_lock(&cf->rw_lock);
uint8_t *recv_buf = calloc(DATA_BLK_SZ, sizeof(uint8_t));
fprintf(stderr, "Cache_bg_download(): ");
fprintf(stderr, "Cache_bgdl(): ");
long recv = path_download(cf->path, (char *) recv_buf, cf->blksz,
cf->next_offset);
if ( (recv == cf->blksz) ||
Expand All @@ -762,75 +772,87 @@ static void *Cache_bg_download(void *arg)
Seg_set(cf, cf->next_offset, 1);
} else {
fprintf(stderr,
"Cache_bg_download(): recv (%ld) < cf->blksz! \
"Cache_bgdl(): recv (%ld) < cf->blksz! \
Possible network error?\n",
recv);
}
free(recv_buf);
pthread_mutex_unlock(&cf->bgt_lock);
pthread_mutex_unlock(&cf->rw_lock);
pthread_exit(NULL);
}

long Cache_read(Cache *cf, char *output_buf, off_t len, off_t offset)
{
/*
* Quick fix for SIGFPE,
* this shouldn't happen in the first place!
*/
// size_t start = offset;
// size_t end = start + len;
// char range_str[64];
// snprintf(range_str, sizeof(range_str), "%lu-%lu", start, end);
// fprintf(stderr, "Cache_read(%s, %s);\n", cf->path, range_str);

/* SIGFPE prevention, although this shouldn't happen in the first place! */
if (!cf->blksz) {
fprintf(stderr,
"Cache_read(): Warning: cf->blksz: %d, directly downloading",
cf->blksz);
return path_download(cf->path, output_buf, len, offset);
}

pthread_mutex_lock(&cf->rw_lock);
long send;
/* Calculate the aligned offset */
off_t dl_offset = offset / cf->blksz * cf->blksz;

/* ------------------ Check if the segment already exists ---------------*/
if (Seg_exist(cf, offset)) {
/*
* The metadata shows the segment already exists. This part is easy,
* as you don't have to worry about alignment
*/
send = Data_read(cf, (uint8_t *) output_buf, len, offset);
goto bgdl;
} else {
/* Download the segment */
fprintf(stderr, "Cache_read(): ");
uint8_t *recv_buf = calloc(DATA_BLK_SZ, sizeof(uint8_t));
long recv = path_download(cf->path, (char *) recv_buf, cf->blksz,
dl_offset);
/*
* check if we have received enough data
* send it off, then write it to the disk
*
* Condition 1: received the exact amount as the segment size.
* Condition 2: offset is the last segment
*/
if ( (recv == cf->blksz) ||
(dl_offset == (cf->content_length / cf->blksz * cf->blksz)) )
{
memmove(output_buf, recv_buf + (offset - dl_offset), len);
send = len;
Data_write(cf, recv_buf, cf->blksz, dl_offset);
Seg_set(cf, dl_offset, 1);
} else {
memmove(output_buf, recv_buf + (offset - dl_offset), recv);
send = recv;
fprintf(stderr,
"Cache_read(): recv (%ld) < cf->blksz! Possible network error?\n",
recv);
/* Wait until the background thread finishes, then lock the I/O */
pthread_mutex_lock(&cf->rw_lock);
if (Seg_exist(cf, offset)) {
/* The segment already exists, send it off the unlock the I/O */
send = Data_read(cf, (uint8_t *) output_buf, len, offset);
pthread_mutex_unlock(&cf->rw_lock);
goto bgdl;
}
free(recv_buf);
}

/* ------------------------Download the segment -------------------------*/

uint8_t *recv_buf = calloc(DATA_BLK_SZ, sizeof(uint8_t));
fprintf(stderr, "Cache_read(): ");
long recv = path_download(cf->path, (char *) recv_buf, cf->blksz,
dl_offset);
/*
* check if we have received enough data
* send it off, then write it to the disk
*
* Condition 1: received the exact amount as the segment size.
* Condition 2: offset is the last segment
*/
if ( (recv == cf->blksz) ||
(dl_offset == (cf->content_length / cf->blksz * cf->blksz)) )
{
memmove(output_buf, recv_buf + (offset - dl_offset), len);
send = len;
Data_write(cf, recv_buf, cf->blksz, dl_offset);
Seg_set(cf, dl_offset, 1);
} else {
memmove(output_buf, recv_buf + (offset - dl_offset), recv);
send = recv;
fprintf(stderr,
"Cache_read(): recv (%ld) < cf->blksz! Possible network error?\n",
recv);
}
free(recv_buf);
pthread_mutex_unlock(&cf->rw_lock);

/* Download the next segment in background */
/* -----------Download the next segment in background -------------------*/
bgdl:
cf->next_offset = round_div(offset, cf->blksz) * cf->blksz;
if ( (cf->next_offset > dl_offset) && !Seg_exist(cf, cf->next_offset) ) {
/* Stop the spawning of multiple background pthreads */
if(!pthread_mutex_trylock(&cf->bgt_lock)) {
if (pthread_create(&cf->bgt, NULL, Cache_bg_download, cf)) {
if (pthread_create(&cf->bgt, NULL, Cache_bgdl, cf)) {
fprintf(stderr,
"Cache_read(): Error creating background download thread\n"
);
Expand Down
4 changes: 3 additions & 1 deletion src/cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@ typedef struct {

pthread_t bgt; /**< background pthread */
pthread_mutex_t bgt_lock; /**< mutex for spawning a background thread */
pthread_mutexattr_t bgt_lock_attr;
pthread_mutexattr_t bgt_lock_attr; /**< attributes for bgt_lock */
off_t next_offset; /**<the offset of the next segment to be
downloaded in background*/

pthread_mutex_t rw_lock; /**< mutex for read/write operation */
pthread_mutexattr_t rw_lock_attr; /**< attributes for rw_lock */

FILE *dfp; /**< The FILE pointer for the data file*/
FILE *mfp; /**< The FILE pointer for the metadata */
int blksz; /**<the block size of the data file */
Expand Down
3 changes: 1 addition & 2 deletions src/link.c
Original file line number Diff line number Diff line change
Expand Up @@ -411,13 +411,12 @@ long path_download(const char *path, char *output_buf, size_t size,
size_t end = start + size;
char range_str[64];
snprintf(range_str, sizeof(range_str), "%lu-%lu", start, end);
fprintf(stderr, "path_download(%s, %s);\n", path, range_str);

MemoryStruct buf;
buf.size = 0;
buf.memory = NULL;

fprintf(stderr, "path_download(%s, %s);\n", path, range_str);

CURL *curl = Link_to_curl(link);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *)&buf);
curl_easy_setopt(curl, CURLOPT_RANGE, range_str);
Expand Down

0 comments on commit df025b1

Please sign in to comment.