Commit

finish minibatch, parameter is still hardcoded, debug function need fix, f_long is not copied back
xenshinu committed Oct 27, 2023
1 parent 5598718 commit 5e1abe8
Showing 6 changed files with 330 additions and 71 deletions.
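
For orientation, here is a minimal, self-contained sketch (not part of the commit) of the micro-batch partitioning that plchain_cal_score_async adopts in the gpu/plchain.cu diff below: reads are grouped greedily until a batch holds roughly total_n / MICRO_BATCH anchors. chain_read_t, the per-read anchor count .n, and MICRO_BATCH come from the diff; the MICRO_BATCH value and the stub type are illustrative assumptions (the commit message notes the parameter is still hardcoded).

#include <stddef.h>

#define MICRO_BATCH 4   /* assumed value; the real constant is hardcoded elsewhere */

typedef struct { size_t n; } chain_read_t;   /* stub: only the anchor count is needed here */

/* Greedily split n_read reads into MICRO_BATCH groups of roughly equal anchor count. */
static void split_micro_batches(const chain_read_t *reads, int n_read) {
    if (n_read <= 0) return;
    size_t total_n = 0;
    for (int i = 0; i < n_read; i++) total_n += reads[i].n;   /* total anchors in this batch */
    if (total_n == 0) return;
    size_t batch_cap = (total_n - 1) / MICRO_BATCH + 1;       /* ceil(total_n / MICRO_BATCH) */

    int read_start = 0;
    for (int uid = 0; uid < MICRO_BATCH; uid++) {
        size_t batch_n = 0;
        int read_end;
        for (read_end = read_start; read_end < n_read; read_end++) {
            if (batch_n > batch_cap) break;    /* budget exceeded: close this micro batch */
            batch_n += reads[read_end].n;
        }
        /* reads[read_start .. read_end) form micro batch uid; its kernels are launched here */
        read_start = read_end;
    }
}

The same boundary logic appears in the new loop inside plchain_cal_score_async, interleaved with per-batch grid/cut-size bookkeeping and the short/mid kernel launches.
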
2 changes: 1 addition & 1 deletion .gitignore
@@ -16,7 +16,7 @@ verf
trace
ncu
nsys
profile_output*
*_output*
workloads
.cmake/**
.depend
172 changes: 121 additions & 51 deletions gpu/plchain.cu
@@ -200,13 +200,14 @@ void plchain_cal_score_launch(chain_read_t **reads_, int *n_read_, Misc misc, st
int stream_id = plchain_schedule_stream(stream_setup, batchid);
if (stream_setup.streams[stream_id].busy) {
#ifdef DEBUG_PRINT
fprintf(stderr, "[Info] %s (%s:%d) stream %d sync, total_n %lu\n", __func__, __FILE__, __LINE__, stream_id, stream_setup.streams[stream_id].host_mem.total_n);
fprintf(stderr, "[Info] %s (%s:%d) stream %d sync, total_n %lu\n", __func__, __FILE__, __LINE__, stream_id, stream_setup.streams[stream_id].host_mems[0].total_n);
#endif // DEBUG_PRINT
// cleanup previous batch in the stream
plchain_backtracking(&stream_setup.streams[stream_id].host_mem,
plchain_backtracking(&stream_setup.streams[stream_id].host_mems[0],
stream_setup.streams[stream_id].reads, misc, km);

*reads_ = stream_setup.streams[stream_id].reads;
*n_read_ = stream_setup.streams[stream_id].host_mem.size;
*n_read_ = stream_setup.streams[stream_id].host_mems[0].size;
stream_setup.streams[stream_id].busy = false;
}

@@ -236,7 +237,7 @@ void plchain_cal_score_launch(chain_read_t **reads_, int *n_read_, Misc misc, st
assert(stream_setup.max_num_cut >= cut_num);

plmem_reorg_input_arr(reads, n_read,
&stream_setup.streams[stream_id].host_mem,
&stream_setup.streams[stream_id].host_mems[0],
range_kernel_config);

plmem_async_h2d_memcpy(&stream_setup.streams[stream_id]);
@@ -416,11 +417,14 @@ void plchain_cal_sc_pair_density(size_t total_n, size_t num_cut, deviceMemPtr* d
#ifdef DEBUG_CHECK

void plchain_debug_analysis(stream_ptr_t stream){
size_t total_n = stream.host_mem.total_n;
// TODO: analysis multiple or current host mems
// TODO: this needs to be recalculated
size_t uid = 0;
size_t total_n = stream.host_mems[uid].total_n;
chain_read_t* reads = stream.reads;
deviceMemPtr* dev_mem = &stream.dev_mem;
hostMemPtr* host_mem = &stream.host_mem;
size_t cut_num = stream.host_mem.cut_num;
hostMemPtr* host_mem = &stream.host_mems[uid];
size_t cut_num = stream.host_mems[uid].cut_num;

unsigned int num_mid_seg, num_long_seg;
cudaMemcpy(&num_mid_seg, dev_mem->d_mid_seg_count, sizeof(unsigned int),
@@ -466,7 +470,7 @@ void plchain_debug_analysis(stream_ptr_t stream){
#if defined(DEBUG_VERBOSE) && 0
int32_t* ax = (int32_t*) malloc(sizeof(int32_t) * dev_mem->buffer_size_long);
cudaMemcpy(ax, dev_mem->d_ax_long, sizeof(int32_t) * dev_mem->buffer_size_long, cudaMemcpyDeviceToHost);
debug_print_segs(host_mem->long_segs, reads, host_mem->long_segs_num, stream.host_mem.size);
debug_print_segs(host_mem->long_segs, reads, host_mem->long_segs_num, stream.host_mems[uid].size);
debug_check_anchors(host_mem->long_segs, host_mem->long_segs_num, ax, host_mem->ax);
#endif
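
The TODOs at the top of plchain_debug_analysis ("analysis multiple or current host mems", "this needs to be recalculated") suggest summing over every micro-batch host memory instead of reading only host_mems[0]. A hedged sketch of that aggregation, using stub types because the real hostMemPtr and stream_ptr_t definitions are not shown in this diff:

#include <stddef.h>

#define MICRO_BATCH 4   /* assumed value, matching the hardcoded parameter */

typedef struct { size_t total_n, cut_num; } host_mem_stub;             /* stand-in for hostMemPtr */
typedef struct { host_mem_stub host_mems[MICRO_BATCH]; } stream_stub;  /* stand-in for stream_ptr_t */

/* Sum anchor and cut counts over all micro batches held by one stream. */
static void debug_totals(const stream_stub *stream, size_t *total_n, size_t *cut_num) {
    *total_n = 0;
    *cut_num = 0;
    for (int uid = 0; uid < MICRO_BATCH; uid++) {
        *total_n += stream->host_mems[uid].total_n;
        *cut_num += stream->host_mems[uid].cut_num;
    }
}
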

@@ -493,74 +497,114 @@ void plchain_cal_score_async(chain_read_t **reads_, int *n_read_, Misc misc, str
chain_read_t* reads = *reads_;
*reads_ = NULL;
int n_read = *n_read_;
fprintf(stderr, "[Debug] %s (%s:%d) n_read %d\n", __func__, __FILE__, __LINE__, n_read);
*n_read_ = 0;
/* sync stream and process previous batch */
int stream_id = thread_id;
if (stream_setup.streams[stream_id].busy) {
cudaStreamSynchronize(stream_setup.streams[stream_id].cudastream);

#ifdef DEBUG_PRINT
float milliseconds = 0;
cudaEventElapsedTime(&milliseconds, stream_setup.streams[stream_id].startevent, stream_setup.streams[stream_id].cudaevent);
fprintf(stderr, "[Info] %s (%s:%d) last launch runtime: %f ms\n", __func__, __FILE__, __LINE__, milliseconds);
float milliseconds = 0;
cudaEventElapsedTime(&milliseconds, stream_setup.streams[stream_id].startevent, stream_setup.streams[stream_id].cudaevent);
fprintf(stderr, "[Info] %s (%s:%d) last launch runtime: %f ms\n", __func__, __FILE__, __LINE__, milliseconds);
#endif

// DEBUG: debug analysis that involves synchronizing the whole device
#if defined(DEBUG_CHECK)
plchain_debug_analysis(stream_setup.streams[stream_id]);
#endif // DEBUG_VERBOSE
// reset values
cudaMemsetAsync(stream_setup.streams[stream_id].dev_mem.d_long_seg_count, 0, sizeof(unsigned int),
stream_setup.streams[stream_id].cudastream);
cudaMemsetAsync(stream_setup.streams[stream_id].dev_mem.d_mid_seg_count, 0, sizeof(unsigned int),
stream_setup.streams[stream_id].cudastream);
cudaMemsetAsync(stream_setup.streams[stream_id].dev_mem.d_total_n_long, 0, sizeof(size_t),
stream_setup.streams[stream_id].cudastream);
seg_t* long_segs = stream_setup.streams[stream_id].long_mem.long_segs;
size_t long_seg_idx = 0;
for (int uid = 0; uid < MICRO_BATCH; uid++) {
// reorg long to each host mem ptr
// NOTE: this is the number of long segs till this microbatch
size_t long_segs_num = stream_setup.streams[stream_id].host_mems[uid].long_segs_num;
for (; long_seg_idx < long_segs_num; long_seg_idx++) {
// TODO: write long_segs + long_seg_idx to f/p
}

// backtrack after p/f is copied
plchain_backtracking(&stream_setup.streams[stream_id].host_mems[uid],
stream_setup.streams[stream_id].reads + *n_read_, misc, km);
// accumulate n_reads
*n_read_ += stream_setup.streams[stream_id].host_mems[uid].size;
}

// cleanup previous batch in the stream
plchain_backtracking(&stream_setup.streams[stream_id].host_mem,
stream_setup.streams[stream_id].reads, misc, km);
*reads_ = stream_setup.streams[stream_id].reads;
*n_read_ = stream_setup.streams[stream_id].host_mem.size;
fprintf(stderr, "[Debug] %s finish (%s:%d) n_read %d\n", __func__, __FILE__, __LINE__, *n_read_);
stream_setup.streams[stream_id].busy = false;
}

cudaEventRecord(stream_setup.streams[stream_id].startevent,
stream_setup.streams[stream_id].cudastream);
// size sanity check
size_t total_n = 0, cut_num = 0;
int griddim = 0;
size_t total_n = 0;
for (int i = 0; i < n_read; i++) {
total_n += reads[i].n;
int an_p_block = range_kernel_config.anchor_per_block;
int an_p_cut = range_kernel_config.blockdim;
int block_num = (reads[i].n - 1) / an_p_block + 1;
griddim += block_num;
cut_num += (reads[i].n - 1) / an_p_cut + 1;
}
if (stream_setup.max_anchors_stream < total_n){
fprintf(stderr, "max_anchors_stream %lu total_n %lu n_read %d\n",
stream_setup.max_anchors_stream, total_n, n_read);
}
} // compute total_n first

if (stream_setup.max_range_grid < griddim) {
fprintf(stderr, "max_range_grid %d griddim %d, total_n %lu n_read %d\n",
stream_setup.max_range_grid, griddim, total_n, n_read);
}
stream_setup.streams[stream_id].reads = reads;
int read_start = 0;
for (int uid = 0; uid < MICRO_BATCH; uid++) {
// decide the size of micro batch
size_t batch_n = 0;
int read_end = 0;
size_t cut_num = 0;
int griddim = 0;
for (read_end = read_start; read_end < n_read; read_end++) {
if (batch_n > (total_n - 1) / MICRO_BATCH + 1) {
break;
}
batch_n += reads[read_end].n;
int an_p_block = range_kernel_config.anchor_per_block;
int an_p_cut = range_kernel_config.blockdim;
int block_num = (reads[read_end].n - 1) / an_p_block + 1;
griddim += block_num;
cut_num += (reads[read_end].n - 1) / an_p_cut + 1;
}
fprintf(stderr, "[Debug] %s (%s:%d) batch_n %lu, read_start %d, read_end %d\n", __func__, __FILE__, __LINE__, batch_n, read_start, read_end);
// sanity check
if (stream_setup.max_anchors_stream < total_n){
fprintf(stderr, "max_anchors_stream %lu total_n %lu n_read %d\n",
stream_setup.max_anchors_stream, total_n, n_read);
}

assert(stream_setup.max_anchors_stream >= total_n);
assert(stream_setup.max_range_grid >= griddim);
assert(stream_setup.max_num_cut >= cut_num);
if (stream_setup.max_range_grid < griddim) {
fprintf(stderr, "max_range_grid %d griddim %d, total_n %lu n_read %d\n",
stream_setup.max_range_grid, griddim, total_n, n_read);
}

plmem_reorg_input_arr(reads, n_read,
&stream_setup.streams[stream_id].host_mem,
assert(stream_setup.max_anchors_stream >= total_n);
assert(stream_setup.max_range_grid >= griddim);
assert(stream_setup.max_num_cut >= cut_num);
// work on micro batch
plmem_reorg_input_arr(reads + read_start, read_end - read_start,
&stream_setup.streams[stream_id].host_mems[uid],
range_kernel_config);

plmem_async_h2d_memcpy(&stream_setup.streams[stream_id]);
plrange_async_range_selection(&stream_setup.streams[stream_id].dev_mem,
&stream_setup.streams[stream_id].cudastream);
// plscore_async_naive_forward_dp(&stream_setup.streams[stream_id].dev_mem,
// &stream_setup.streams[stream_id].cudastream);
plscore_async_long_short_forward_dp(&stream_setup.streams[stream_id].dev_mem,
plmem_async_h2d_short_memcpy(&stream_setup.streams[stream_id], uid);
plrange_async_range_selection(&stream_setup.streams[stream_id].dev_mem,
&stream_setup.streams[stream_id].cudastream);
plscore_async_short_mid_forward_dp(&stream_setup.streams[stream_id].dev_mem,
&stream_setup.streams[stream_id].cudastream);
plmem_async_d2h_short_memcpy(&stream_setup.streams[stream_id], uid);
// update index
read_start = read_end;
}

plscore_async_long_forward_dp(&stream_setup.streams[stream_id].dev_mem,
&stream_setup.streams[stream_id].cudastream);
plmem_async_d2h_memcpy(&stream_setup.streams[stream_id]);
plmem_async_d2h_long_memcpy(&stream_setup.streams[stream_id]);
cudaEventRecord(stream_setup.streams[stream_id].cudaevent,
stream_setup.streams[stream_id].cudastream);
stream_setup.streams[stream_id].busy = true;
stream_setup.streams[stream_id].reads = reads;
cudaCheck();
}
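
The commit message flags that f_long is not copied back yet; the empty for (; long_seg_idx < long_segs_num; ...) loops above and in finish_stream_gpu below mark where that copy would go. A purely hypothetical sketch of the fill-in: the seg_t fields (start_idx, end_idx), the f_long/p_long host buffers, and the element types are all assumptions; only long_segs, long_segs_num, and the f/p targets are mentioned in the diff.

#include <stddef.h>
#include <stdint.h>

/* Stand-in for seg_t: assumed to describe a half-open anchor index range. */
typedef struct { size_t start_idx, end_idx; } seg_t;

/* Hypothetical body for the "write long_segs + long_seg_idx to f/p" TODO:
 * copy scores (f) and predecessors (p) produced by the long-segment kernel
 * back into the buffers that plchain_backtracking consumes. */
static void copy_long_scores_back(const seg_t *long_segs, size_t long_segs_num,
                                  const int32_t *f_long, const int32_t *p_long,
                                  int32_t *f, int32_t *p) {
    for (size_t s = 0; s < long_segs_num; s++) {
        for (size_t j = long_segs[s].start_idx; j < long_segs[s].end_idx; j++) {
            f[j] = f_long[j];   /* chaining score from the long kernel */
            p[j] = p_long[j];   /* predecessor index from the long kernel */
        }
    }
}
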

@@ -710,7 +754,7 @@ void finish_stream_gpu(const mm_idx_t *mi, const mm_mapopt_t *opt, chain_read_t*
}

chain_read_t* reads;
int n_read;
int n_read = 0;
cudaStreamSynchronize(stream_setup.streams[t].cudastream);
cudaCheck();

@@ -725,15 +769,39 @@ void finish_stream_gpu(const mm_idx_t *mi, const mm_mapopt_t *opt, chain_read_t*
plchain_debug_analysis(stream_setup.streams[t]);
#endif // DEBUG_CHECK

plchain_backtracking(&stream_setup.streams[t].host_mem,
stream_setup.streams[t].reads, misc, km);
// TODO: backtrack multiple pending batches
// reset values
cudaMemsetAsync(stream_setup.streams[t].dev_mem.d_long_seg_count, 0, sizeof(unsigned int),
stream_setup.streams[t].cudastream);
cudaMemsetAsync(stream_setup.streams[t].dev_mem.d_mid_seg_count, 0, sizeof(unsigned int),
stream_setup.streams[t].cudastream);
cudaMemsetAsync(stream_setup.streams[t].dev_mem.d_total_n_long, 0, sizeof(size_t),
stream_setup.streams[t].cudastream);
seg_t* long_segs = stream_setup.streams[t].long_mem.long_segs;
size_t long_seg_idx = 0;
for (int uid = 0; uid < MICRO_BATCH; uid++) {
// reorg long to each host mem ptr
// NOTE: this is the number of long segs till this microbatch
size_t long_segs_num = stream_setup.streams[t].host_mems[uid].long_segs_num;
for (; long_seg_idx < long_segs_num; long_seg_idx++) {
// TODO: write long_segs + long_seg_idx to f/p
}

// backtrack after p/f is copied
plchain_backtracking(&stream_setup.streams[t].host_mems[uid],
stream_setup.streams[t].reads + *n_read_, misc, km);
// accumulate n_reads
n_read += stream_setup.streams[t].host_mems[uid].size;
}
reads = stream_setup.streams[t].reads;
n_read = stream_setup.streams[t].host_mem.size;
fprintf(stderr, "[Debug] %s finish (%s:%d) n_read %d\n", __func__, __FILE__, __LINE__, n_read);
stream_setup.streams[t].busy = false;

for (int i = 0; i < n_read; i++) {
post_chaining_helper(mi, opt, &reads[i], misc, km);
}
stream_setup.streams[t].busy = false;

// FIXME: return an array of reads
*reads_ = reads;
*n_read_ = n_read;

Expand All @@ -742,7 +810,9 @@ void finish_stream_gpu(const mm_idx_t *mi, const mm_mapopt_t *opt, chain_read_t*

void free_stream_gpu(int n_threads){
for (int t = 0; t < n_threads; t++){
plmem_free_host_mem(&stream_setup.streams[t].host_mem);
for (int i = 0; i < MICRO_BATCH; i++){
plmem_free_host_mem(&stream_setup.streams[t].host_mems[i]);
}
plmem_free_device_mem(&stream_setup.streams[t].dev_mem);
}
#ifdef DEBUG_PRINT