Skip to content

Commit

Permalink
test_darray works
Browse files Browse the repository at this point in the history
  • Loading branch information
jedwards4b committed Mar 12, 2018
1 parent ea0f97c commit 3189787
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 81 deletions.
132 changes: 69 additions & 63 deletions src/clib/pio_darray_int.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@
#include <pio.h>
#include <pio_internal.h>

#if USE_VARD
#define USE_VARD_READ 1
#define USE_VARD_WRITE 1
#endif


/* 10MB default limit. */
extern PIO_Offset pio_buffer_size_limit;

Expand Down Expand Up @@ -100,16 +106,14 @@ int get_vard_mpidatatype(io_desc_t *iodesc, MPI_Offset gdim0, PIO_Offset unlimdi
{

int sa_ndims;
int sacount[fndims];
int sastart[fndims];
int gdims[fndims];
int dim_offset;
int mpierr;
MPI_Aint displacements[rrcnt];
int blocklengths[rrcnt];

if(gdim0)
sacount[0] = 1;
if(rrcnt == 0)
return PIO_NOERR;

for ( int rc=0; rc<rrcnt; rc++)
{
Expand Down Expand Up @@ -153,22 +157,29 @@ int get_vard_mpidatatype(io_desc_t *iodesc, MPI_Offset gdim0, PIO_Offset unlimdi

for( int rc=0; rc<rrcnt; rc++)
{
for (int i=dim_offset; i< fndims; i++)
{
sacount[i-dim_offset] = (int) countlist[rc][i];
sastart[i-dim_offset] = (int) startlist[rc][i];
}
int sacount[fndims];
int sastart[fndims];
if(gdim0 > 0)
{
unlimdimoffset = gdim0;
sastart[0] = max(0, frame);
displacements[rc]=0;
}
else
{
sacount[0] = 1;
displacements[rc] = unlimdimoffset * max(0, frame);
}
for (int i=dim_offset; i< fndims; i++)
{
sacount[i-dim_offset] = (int) countlist[rc][i];
sastart[i-dim_offset] = (int) startlist[rc][i];
}

#if PIO_ENABLE_LOGGING
for (int i=0; i< sa_ndims; i++)
LOG((3, "vard: sastart[%d]=%d sacount[%d]=%d gdims[%d]=%d %ld %ld",
i,sastart[i], i,sacount[i], i, gdims[i], startlist[rc][i], countlist[rc][i]));
LOG((3, "vard: sastart[%d]=%d sacount[%d]=%d gdims[%d]=%d %ld %ld displacement = %ld un %d",
i,sastart[i], i,sacount[i], i, gdims[i], startlist[rc][i], countlist[rc][i], displacements[rc], unlimdimoffset));
#endif
if((mpierr = MPI_Type_create_subarray(sa_ndims, gdims,
sacount, sastart,MPI_ORDER_C
Expand All @@ -190,7 +201,7 @@ int get_vard_mpidatatype(io_desc_t *iodesc, MPI_Offset gdim0, PIO_Offset unlimdi

if((mpierr = MPI_Type_commit(filetype)))
return check_mpi(NULL, mpierr, __FILE__, __LINE__);

return PIO_NOERR;
}

#endif
Expand Down Expand Up @@ -296,8 +307,9 @@ int write_darray_multi_par(file_desc_t *file, int nvars, int fndims, const int *
var_desc_t *vdesc; /* Pointer to var info struct. */
int dsize; /* Data size (for one region). */
int ierr = PIO_NOERR;
#if USE_VARD
#if USE_VARD_WRITE
PIO_Offset gdim0; /* global size of first dimension if no unlimited dimension and ndims<fndims */
gdim0 = 0;
#endif
/* Check inputs. */
pioassert(file && file->iosystem && varids && varids[0] >= 0 && varids[0] <= PIO_MAX_VARS &&
Expand Down Expand Up @@ -325,8 +337,9 @@ int write_darray_multi_par(file_desc_t *file, int nvars, int fndims, const int *
PIO_Offset llen = fill ? iodesc->holegridsize : iodesc->llen;
void *iobuf = fill ? vdesc->fillbuf : file->iobuf;

#if USE_VARD
ierr = get_gdim0(file, iodesc, varids[0], fndims, &gdim0);
#if USE_VARD_WRITE
if (!ios->async || !ios->ioproc)
ierr = get_gdim0(file, iodesc, varids[0], fndims, &gdim0);
#endif

/* If this is an IO task write the data. */
Expand All @@ -339,22 +352,6 @@ int write_darray_multi_par(file_desc_t *file, int nvars, int fndims, const int *
int ndims = iodesc->ndims;
PIO_Offset *startlist[num_regions]; /* Array of start arrays for ncmpi_iput_varn(). */
PIO_Offset *countlist[num_regions]; /* Array of count arrays for ncmpi_iput_varn(). */
#if USE_VARD
PIO_Offset unlimdimoffset;
int mpierr;
MPI_Datatype filetype;
MPI_Datatype subarray[num_regions];
filetype = MPI_DATATYPE_NULL;

if (file->iotype == PIO_IOTYPE_PNETCDF && gdim0 == 0) /* if there is an unlimited dimension get the offset between records of a variable */
{
#ifdef _PNETCDF
if((ierr = ncmpi_inq_recsize(file->fh, &unlimdimoffset)))
return pio_err(NULL, file, ierr, __FILE__, __LINE__);
#endif
}
#endif
LOG((3, "num_regions = %d", num_regions));

/* Process each region of data to be written. */
for (int regioncnt = 0; regioncnt < num_regions; regioncnt++)
Expand Down Expand Up @@ -423,17 +420,29 @@ int write_darray_multi_par(file_desc_t *file, int nvars, int fndims, const int *
/* Do this when we reach the last region. */
if (regioncnt == num_regions - 1)
{
#ifdef USE_VARD_WRITE
MPI_Datatype filetype = MPI_DATATYPE_NULL;
MPI_Datatype subarray[rrcnt];
#endif
/* For each variable to be written. */
for (int nv = 0; nv < nvars; nv++)
{
/* Get the var info. */
if ((ierr = get_var_desc(varids[nv], &file->varlist, &vdesc)))
return pio_err(NULL, file, ierr, __FILE__, __LINE__);

#if USE_VARD
#if USE_VARD_WRITE
/* If this is the first variable or the frame has changed between variables (this should be rare) */
if(nv==0 || (nv > 0 && frame != NULL && frame[nv] != frame[nv-1])){
int thisframe;
PIO_Offset unlimdimoffset;
if (gdim0 == 0) /* if there is an unlimited dimension get the offset between records of a variable */
{
if((ierr = ncmpi_inq_recsize(file->fh, &unlimdimoffset)))
return pio_err(NULL, file, ierr, __FILE__, __LINE__);
LOG((3, "num_regions = %d unlimdimoffset %ld", num_regions, unlimdimoffset));
}else
unlimdimoffset = gdim0;
if (frame)
thisframe = frame[nv];
else
Expand Down Expand Up @@ -461,10 +470,20 @@ int write_darray_multi_par(file_desc_t *file, int nvars, int fndims, const int *
vdesc->request[i] = NC_REQ_NULL;
}

#if USE_VARD
LOG((3, "vard: call ncmpi_put_vard llen = %d %d ", llen, iodesc->mpitype_size ));
#if USE_VARD_WRITE
LOG((3, "vard: call ncmpi_put_vard llen = %d %d", llen, iodesc->mpitype_size ));
ierr = ncmpi_put_vard_all(file->fh, varids[nv], filetype, bufptr, llen, iodesc->mpitype);
LOG((3, "vard: return ncmpi_put_vard ierr = %d", ierr));
if(filetype != MPI_DATATYPE_NULL)
{
int mpierr;
for(int i=0; i<rrcnt; i++)
if (subarray[i] != MPI_DATATYPE_NULL)
if((mpierr = MPI_Type_free(subarray+i)))
return check_mpi(NULL, mpierr, __FILE__, __LINE__);
if((mpierr = MPI_Type_free(&filetype)))
return check_mpi(NULL, mpierr, __FILE__, __LINE__);
}
#else
/* Write, in non-blocking fashion, a list of subarrays. */
LOG((3, "about to call ncmpi_iput_varn() varids[%d] = %d rrcnt = %d, llen = %d",
Expand Down Expand Up @@ -497,18 +516,6 @@ int write_darray_multi_par(file_desc_t *file, int nvars, int fndims, const int *
if (region)
region = region->next;
} /* next regioncnt */
#if USE_VARD
if(filetype != MPI_DATATYPE_NULL)
{
int mpierr;
for(int i=0; i<rrcnt; i++)
if (subarray[i] != MPI_DATATYPE_NULL)
if((mpierr = MPI_Type_free(subarray+i)))
return check_mpi(NULL, mpierr, __FILE__, __LINE__);
if((mpierr = MPI_Type_free(&filetype)))
return check_mpi(NULL, mpierr, __FILE__, __LINE__);
}
#endif
} /* endif (ios->ioproc) */

/* Check the return code from the netCDF/pnetcdf call. */
Expand Down Expand Up @@ -946,6 +953,7 @@ int pio_read_darray_nc(file_desc_t *file, io_desc_t *iodesc, int vid, void *iobu
int ierr; /* Return code from netCDF functions. */
#ifdef USE_VARD_READ
MPI_Offset gdim0;
gdim0 = 0;
#endif

/* Check inputs. */
Expand All @@ -971,7 +979,8 @@ int pio_read_darray_nc(file_desc_t *file, io_desc_t *iodesc, int vid, void *iobu
if ((ierr = PIOc_inq_varndims(file->pio_ncid, vid, &fndims)))
return pio_err(ios, file, ierr, __FILE__, __LINE__);
#if USE_VARD_READ
ierr = get_gdim0(file, iodesc, vid, fndims, &gdim0);
if(!ios->async || !ios->ioproc)
ierr = get_gdim0(file, iodesc, vid, fndims, &gdim0);
#endif

/* IO procs will read the data. */
Expand All @@ -986,20 +995,6 @@ int pio_read_darray_nc(file_desc_t *file, io_desc_t *iodesc, int vid, void *iobu
PIO_Offset *startlist[iodesc->maxregions];
PIO_Offset *countlist[iodesc->maxregions];

#if USE_VARD_READ
PIO_Offset unlimdimoffset;
int mpierr;
MPI_Datatype filetype;
MPI_Datatype subarray[iodesc->maxregions];
filetype = MPI_DATATYPE_NULL;

if (gdim0 == 0) /* if there is an unlimited dimension get the offset between records of a variable */
{
if((ierr = ncmpi_inq_recsize(file->fh, &unlimdimoffset)))
return pio_err(NULL, file, ierr, __FILE__, __LINE__);
}
#endif

/* buffer is incremented by byte and loffset is in terms of
the iodessc->mpitype so we need to multiply by the size of
the mpitype. */
Expand Down Expand Up @@ -1146,6 +1141,16 @@ int pio_read_darray_nc(file_desc_t *file, io_desc_t *iodesc, int vid, void *iobu
{
#if USE_VARD_READ
MPI_Datatype filetype, subarray[rrlen];
PIO_Offset unlimdimoffset;
int mpierr;
if (gdim0 == 0) /* if there is an unlimited dimension get the offset between records of a variable */
{
if((ierr = ncmpi_inq_recsize(file->fh, &unlimdimoffset)))
return pio_err(NULL, file, ierr, __FILE__, __LINE__);
}
else
unlimdimoffset = gdim0;

filetype = MPI_DATATYPE_NULL;
for(int i=0; i<rrlen; i++)
subarray[i] = MPI_DATATYPE_NULL;
Expand Down Expand Up @@ -1528,7 +1533,7 @@ int flush_output_buffer(file_desc_t *file, bool force, PIO_Offset addsize)

/* Check inputs. */
pioassert(file, "invalid input", __FILE__, __LINE__);

LOG((1, "flush_output_buffer"));
/* Find out the buffer usage. */
if ((ierr = ncmpi_inq_buffer_usage(file->fh, &usage)))
/* allow the buffer to be undefined */
Expand All @@ -1549,9 +1554,10 @@ int flush_output_buffer(file_desc_t *file, bool force, PIO_Offset addsize)
if (usage > maxusage)
maxusage = usage;

LOG((2, "flush_output_buffer usage=%ld force=%d",usage, force));
/* If the user forces it, or the buffer has exceeded the size
* limit, then flush to disk. */
if (force || usage >= pio_buffer_size_limit)
if (force || (usage >= pio_buffer_size_limit))
{
int rcnt;
int maxreq;
Expand Down
12 changes: 8 additions & 4 deletions src/clib/pio_getput_int.c
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ int PIOc_get_vars_tc(int ncid, int varid, const PIO_Offset *start, const PIO_Off
* num_elem will remain 1). */
for (int vd = 0; vd < ndims; vd++)
num_elem *= count[vd];
LOG((2, "PIOc_get_vars_tc num_elem = %d", num_elem));
LOG((1, "PIOc_get_vars_tc num_elem = %d", num_elem));
}

/* If async is in use, and this is not an IO task, bcast the parameters. */
Expand Down Expand Up @@ -583,7 +583,7 @@ int PIOc_get_vars_tc(int ncid, int varid, const PIO_Offset *start, const PIO_Off
mpierr = MPI_Bcast(&num_elem, 1, MPI_OFFSET, ios->compmaster, ios->intercomm);
if (!mpierr)
mpierr = MPI_Bcast(&typelen, 1, MPI_OFFSET, ios->compmaster, ios->intercomm);
LOG((2, "PIOc_get_vars_tc ncid = %d varid = %d ndims = %d start_present = %d "
LOG((1, "PIOc_get_vars_tc ncid = %d varid = %d ndims = %d start_present = %d "
"count_present = %d stride_present = %d xtype = %d num_elem = %d", ncid, varid,
ndims, start_present, count_present, stride_present, xtype, num_elem));
}
Expand Down Expand Up @@ -937,13 +937,17 @@ int PIOc_put_vars_tc(int ncid, int varid, const PIO_Offset *start, const PIO_Off
return check_netcdf(file, ierr, __FILE__, __LINE__);
}

LOG((2, "ndims = %d typelen = %d", ndims, typelen));

/* How many elements of data? If no count array was passed,
* this is a scalar. */
if (count)
for (int vd = 0; vd < ndims; vd++)
num_elem *= count[vd];

LOG((1, "ndims = %d typelen = %d num_elem %d", ndims, typelen, num_elem));
if (count)
for (int vd = 0; vd < ndims; vd++)
LOG((1, "count[%d] = %d",vd,count[vd]));

}

/* If async is in use, and this is not an IO task, bcast the parameters. */
Expand Down
8 changes: 4 additions & 4 deletions src/clib/pioc.c
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ int PIOc_advanceframe(int ncid, int varid)
int mpierr = MPI_SUCCESS, mpierr2; /* Return code from MPI function codes. */
int ret;

LOG((1, "PIOc_advanceframe ncid = %d varid = %d"));
LOG((1, "PIOc_advanceframe ncid = %d varid = %d", ncid, varid));

/* Get the file info. */
if ((ret = pio_get_file(ncid, &file)))
Expand Down Expand Up @@ -877,7 +877,7 @@ int PIOc_Init_Intracomm(MPI_Comm comp_comm, int num_iotasks, int stride, int bas
/* Create a group for the IO tasks. */
if ((mpierr = MPI_Group_incl(compgroup, ios->num_iotasks, ios->ioranks,
&iogroup)))
return check_mpi2(ios, NULL, mpierr, __FILE__, __LINE__);
return check_mpi2(ios, NULL, mpierr, __FILE__, __LINE__);

/* Create an MPI communicator for the IO tasks. */
if ((mpierr = MPI_Comm_create(ios->comp_comm, iogroup, &ios->io_comm)))
Expand Down Expand Up @@ -1287,7 +1287,7 @@ int PIOc_init_async(MPI_Comm world, int num_io_procs, int *io_proc_list,
/* Determine which tasks to use for each computational component. */
if ((ret = determine_procs(num_io_procs, component_count, num_procs_per_comp,
proc_list, my_proc_list)))
return pio_err(NULL, NULL, ret, __FILE__, __LINE__);
return pio_err(NULL, NULL, ret, __FILE__, __LINE__);

/* Get rank of this task in world. */
if ((ret = MPI_Comm_rank(world, &my_rank)))
Expand Down Expand Up @@ -1519,7 +1519,7 @@ int PIOc_init_async(MPI_Comm world, int num_io_procs, int *io_proc_list,
if ((ret = MPI_Comm_rank(my_iosys->union_comm, &my_iosys->union_rank)))
return check_mpi(NULL, ret, __FILE__, __LINE__);
LOG((3, "my_iosys->union_rank %d", my_iosys->union_rank));

/* Set my_comm to union_comm for async. */
my_iosys->my_comm = my_iosys->union_comm;
LOG((3, "intracomm created for union cmp = %d union_rank = %d union_comm = %d",
Expand Down
6 changes: 6 additions & 0 deletions src/clib/pioc_support.c
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,12 @@ void pio_log(int severity, const char *fmt, ...)
ptr += strlen(rank_str);
rem_len -= strlen(rank_str);

/* Show the severity. */
snprintf(rank_str, MAX_RANK_STR, ":%d ", severity);
strncpy(ptr, rank_str, (rem_len > 0) ? rem_len : 0);
ptr += strlen(rank_str);
rem_len -= strlen(rank_str);

/* Print out the variable list of args with vprintf. */
va_start(argp, fmt);
vsnprintf(ptr, ((rem_len > 0) ? rem_len : 0), fmt, argp);
Expand Down
2 changes: 1 addition & 1 deletion tests/cunit/test_darray.c
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ int main(int argc, char **argv)

/* Initialize test. */
if ((ret = pio_test_init2(argc, argv, &my_rank, &ntasks, MIN_NTASKS,
MIN_NTASKS, -1, &test_comm)))
MIN_NTASKS, 3, &test_comm)))
ERR(ERR_INIT);

if ((ret = PIOc_set_iosystem_error_handling(PIO_DEFAULT, PIO_RETURN_ERROR, NULL)))
Expand Down
Loading

0 comments on commit 3189787

Please sign in to comment.