Skip to content

Commit

Permalink
Merge pull request ESMCI#1285 from jedwards4b/add_pnetcdf_vard2
Browse files Browse the repository at this point in the history
Add pnetcdf vard2 write only
  • Loading branch information
jedwards4b authored Mar 9, 2018
2 parents f28f2f3 + 04b00aa commit 4836393
Show file tree
Hide file tree
Showing 10 changed files with 212 additions and 36 deletions.
8 changes: 8 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,16 @@ option (PIO_TEST_BIG_ENDIAN "Enable test to see if machine is big endian" ON)
option (PIO_USE_MPIIO "Enable support for MPI-IO auto detect" ON)
option (PIO_USE_MPISERIAL "Enable mpi-serial support (instead of MPI)" OFF)
option (PIO_USE_MALLOC "Use native malloc (instead of bget package)" OFF)
option (PIO_USE_PNETCDF_VARD "Use pnetcdf put_vard " OFF)
option (WITH_PNETCDF "Require the use of PnetCDF" ON)

# Set a variable that appears in the config.h.in file.
if(PIO_USE_PNETCDF_VARD)
set(USE_VARD 1)
else()
set(USE_VARD 0)
endif()

# Set a variable that appears in the config.h.in file.
if(PIO_USE_MALLOC)
set(USE_MALLOC 1)
Expand Down
4 changes: 3 additions & 1 deletion cmake_config.h.in
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/** @file
/** @file
*
* This is the template for the config.h file, which is created at
* build-time by cmake.
Expand Down Expand Up @@ -28,4 +28,6 @@
/* buffer size for darray data. */
#define PIO_BUFFER_SIZE @PIO_BUFFER_SIZE@

#define USE_VARD @USE_VARD@

#endif /* _PIO_CONFIG_ */
12 changes: 6 additions & 6 deletions examples/c/darray_no_async.c
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ int check_file(int iosysid, int ntasks, char *filename, int iotype,
for (int d = 0; d < NDIM3; d++)
{
char my_dim_name[NC_MAX_NAME];
PIO_Offset dimlen;
PIO_Offset dimlen;

if ((ret = PIOc_inq_dim(ncid, d, my_dim_name, &dimlen)))
return ret;
if (dimlen != (d ? dim_len[d] : NUM_TIMESTEPS) || strcmp(my_dim_name, dim_name[d]))
Expand All @@ -148,7 +148,7 @@ int check_file(int iosysid, int ntasks, char *filename, int iotype,
for (int t = 0; t < NUM_TIMESTEPS; t++)
{
int varid = 0; /* There's only one var in sample file. */

/* This is the data we expect for this timestep. */
for (int i = 0; i < elements_per_pe; i++)
buffer[i] = 100 * t + START_DATA_VAL + my_rank;
Expand Down Expand Up @@ -208,7 +208,7 @@ netcdf darray_no_async_iotype_1 {
int ioproc_stride = 1; /* Stride in the mpi rank between io tasks. */
int ioproc_start = 0; /* Rank of first task to be used for I/O. */
PIO_Offset elements_per_pe; /* Array elements per processing unit. */
int iosysid; /* The ID for the parallel I/O system. */
int iosysid; /* The ID for the parallel I/O system. */
int ncid; /* The ncid of the netCDF file. */
int dimid[NDIM3]; /* The dimension ID. */
int varid; /* The ID of the netCDF varable. */
Expand Down Expand Up @@ -245,7 +245,7 @@ netcdf darray_no_async_iotype_1 {
/* Turn on logging. */
if ((ret = PIOc_set_log_level(LOG_LEVEL)))
return ret;

/* Initialize the PIO IO system. This specifies how many and
* which processors are involved in I/O. */
if ((ret = PIOc_Init_Intracomm(MPI_COMM_WORLD, 1, ioproc_stride,
Expand Down Expand Up @@ -310,7 +310,7 @@ netcdf darray_no_async_iotype_1 {
/* Create some data for this timestep. */
for (int i = 0; i < elements_per_pe; i++)
buffer[i] = 100 * t + START_DATA_VAL + my_rank;

/* Write data to the file. */
printf("rank: %d Writing sample data...\n", my_rank);
if ((ret = PIOc_setframe(ncid, varid, t)))
Expand Down
4 changes: 2 additions & 2 deletions src/clib/pio.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ typedef struct var_desc_t
/** ID of each outstanding pnetcdf request for this variable. */
int *request;

/** Number of requests bending with pnetcdf. */
/** Number of requests pending with pnetcdf. */
int nreqs;

/* Holds the fill value of this var. */
Expand Down Expand Up @@ -278,7 +278,7 @@ typedef struct io_desc_t
* dimensions. */
int ndims;

/** An array of size ndims with the length of each dimension. */
/** An array of size ndims with the global length of each dimension. */
int *dimlen;

/** The actual number of IO tasks participating. */
Expand Down
182 changes: 169 additions & 13 deletions src/clib/pio_darray_int.c
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ int find_start_count(int ndims, int fndims, var_desc_t *vdesc,
}
else if (fndims == ndims)
{
/* ??? */
/* In some cases the unlimited dim is not treated as
the pio record dim */
start[0] += vdesc->record;
}
}
Expand Down Expand Up @@ -165,7 +166,9 @@ int write_darray_multi_par(file_desc_t *file, int nvars, int fndims, const int *
var_desc_t *vdesc; /* Pointer to var info struct. */
int dsize; /* Data size (for one region). */
int ierr = PIO_NOERR;

#if USE_VARD
PIO_Offset gdim0; /* global size of first dimension if no unlimited dimension and ndims<fndims */
#endif
/* Check inputs. */
pioassert(file && file->iosystem && varids && varids[0] >= 0 && varids[0] <= PIO_MAX_VARS &&
iodesc, "invalid input", __FILE__, __LINE__);
Expand All @@ -192,6 +195,29 @@ int write_darray_multi_par(file_desc_t *file, int nvars, int fndims, const int *
PIO_Offset llen = fill ? iodesc->holegridsize : iodesc->llen;
void *iobuf = fill ? vdesc->fillbuf : file->iobuf;

#if USE_VARD
if(file->iotype == PIO_IOTYPE_PNETCDF && iodesc->ndims < fndims)
{
int numunlimdims;
gdim0 = 0;
/* We need to confirm the file has an unlimited dimension and if it doesn't we need to find
the extent of the first variable dimension */
LOG((3,"look for numunlimdims"));
if ((ierr = PIOc_inq_unlimdims(file->pio_ncid, &numunlimdims, NULL)))
return check_netcdf(file, ierr, __FILE__, __LINE__);
LOG((3,"numunlimdims = %d", numunlimdims));
if (numunlimdims <= 0)
{
int dimids[fndims];
if ((ierr = PIOc_inq_vardimid(file->pio_ncid, varids[0], dimids)))
return check_netcdf(file, ierr, __FILE__, __LINE__);
if ((ierr = PIOc_inq_dimlen(file->pio_ncid, dimids[0], &gdim0)))
return check_netcdf(file, ierr, __FILE__, __LINE__);
}
}
LOG((3,"gdim0 = %d",gdim0));
#endif

/* If this is an IO task write the data. */
if (ios->ioproc)
{
Expand All @@ -202,7 +228,19 @@ int write_darray_multi_par(file_desc_t *file, int nvars, int fndims, const int *
int ndims = iodesc->ndims;
PIO_Offset *startlist[num_regions]; /* Array of start arrays for ncmpi_iput_varn(). */
PIO_Offset *countlist[num_regions]; /* Array of count arrays for ncmpi_iput_varn(). */

#if USE_VARD
PIO_Offset unlimdimoffset;
int mpierr;
MPI_Datatype filetype;
MPI_Datatype subarray[num_regions];
filetype = MPI_DATATYPE_NULL;

if (gdim0 == 0) /* if there is an unlimited dimension get the offset between records of a variable */
{
if((ierr = ncmpi_inq_recsize(file->fh, &unlimdimoffset)))
return pio_err(NULL, file, ierr, __FILE__, __LINE__);
}
#endif
LOG((3, "num_regions = %d", num_regions));

/* Process each region of data to be written. */
Expand Down Expand Up @@ -247,7 +285,6 @@ int write_darray_multi_par(file_desc_t *file, int nvars, int fndims, const int *
for (int i = 0; i < fndims; i++)
dsize *= count[i];
LOG((3, "dsize = %d", dsize));

/* For pnetcdf's ncmpi_iput_varn() function, we need
* to provide arrays of arrays for start/count. */
if (dsize > 0)
Expand Down Expand Up @@ -280,12 +317,113 @@ int write_darray_multi_par(file_desc_t *file, int nvars, int fndims, const int *
if ((ierr = get_var_desc(varids[nv], &file->varlist, &vdesc)))
return pio_err(NULL, file, ierr, __FILE__, __LINE__);

/* If this is a record var, set the start for
* the record dimension. */
#if USE_VARD
/* If this is the first variable or the frame has changed between variables (this should be rare) */
if(nv==0 || (nv > 0 && frame != NULL && frame[nv] != frame[nv-1])){
int sa_ndims;
int sacount[fndims];
int sastart[fndims];
int gdims[fndims];
int dim_offset;
int mpierr;
MPI_Aint displacements[rrcnt];
int blocklengths[rrcnt];

if(gdim0)
sacount[0] = 1;

for ( int rc=0; rc<rrcnt; rc++)
{
displacements[rc] = 0;
blocklengths[rc] = 1;
subarray[rc] = MPI_DATATYPE_NULL;
}
if(filetype != MPI_DATATYPE_NULL)
{
for( int rc=0; rc<rrcnt; rc++)
if ((mpierr = MPI_Type_free(subarray + rc)))
return check_mpi(NULL, mpierr, __FILE__, __LINE__);
if ((mpierr = MPI_Type_free(&filetype)))
return check_mpi(NULL, mpierr, __FILE__, __LINE__);
}
if(fndims > ndims)
{
if ( gdim0 > 0)
{
gdims[0] = gdim0;
sa_ndims = fndims;
dim_offset = 0;
for (int i=1; i < fndims; i++)
gdims[i] = iodesc->dimlen[i-1];
}
else
{
sa_ndims = ndims;
dim_offset = 1;
for (int i=0; i < ndims; i++)
gdims[i] = iodesc->dimlen[i];
}
}
else
{
sa_ndims = fndims;
dim_offset = 0;
for (int i=0; i < fndims; i++)
gdims[i] = iodesc->dimlen[i];
}

for( int rc=0; rc<rrcnt; rc++)
{
for (int i=dim_offset; i< fndims; i++)
{
sacount[i-dim_offset] = (int) countlist[rc][i];
sastart[i-dim_offset] = (int) startlist[rc][i];
}
if(gdim0 > 0)
{
if(frame != NULL)
{
sastart[0] = frame[nv];
displacements[rc]=0;
}
}
else
if (frame != NULL)
displacements[rc] = unlimdimoffset * frame[nv];
else
displacements[rc] = 0;

for (int i=0; i< sa_ndims; i++)
LOG((3, "vard: sastart[%d]=%d sacount[%d]=%d gdims[%d]=%d %ld %ld",
i,sastart[i], i,sacount[i], i, gdims[i], startlist[rc][i], countlist[rc][i]));

if((mpierr = MPI_Type_create_subarray(sa_ndims, gdims,
sacount, sastart,MPI_ORDER_C
,iodesc->mpitype, subarray + rc)))
return check_mpi(NULL, mpierr, __FILE__, __LINE__);

if((mpierr = MPI_Type_commit(subarray + rc)))
return check_mpi(NULL, mpierr, __FILE__, __LINE__);


LOG((3,"vard: blocklengths[%d]=%d displacement[%d]=%ld unlimdimoffset=%ld",rc,blocklengths[rc], rc, displacements[rc], unlimdimoffset));



}

if((mpierr = MPI_Type_create_struct(rrcnt, blocklengths, displacements, subarray, &filetype)))
return check_mpi(NULL, mpierr, __FILE__, __LINE__);

if((mpierr = MPI_Type_commit(&filetype)))
return check_mpi(NULL, mpierr, __FILE__, __LINE__);

}
#else
if (vdesc->record >= 0 && ndims < fndims)
for (int rc = 0; rc < rrcnt; rc++)
startlist[rc][0] = frame[nv];

#endif
/* Get a pointer to the data. */
bufptr = (void *)((char *)iobuf + nv * iodesc->mpitype_size * llen);

Expand All @@ -299,6 +437,11 @@ int write_darray_multi_par(file_desc_t *file, int nvars, int fndims, const int *
vdesc->request[i] = NC_REQ_NULL;
}

#if USE_VARD
LOG((3, "vard: call ncmpi_put_vard llen = %d %d ", llen, iodesc->mpitype_size ));
ierr = ncmpi_put_vard_all(file->fh, varids[nv], filetype, bufptr, llen, iodesc->mpitype);
LOG((3, "vard: return ncmpi_put_vard ierr = %d", ierr));
#else
/* Write, in non-blocking fashion, a list of subarrays. */
LOG((3, "about to call ncmpi_iput_varn() varids[%d] = %d rrcnt = %d, llen = %d",
nv, varids[nv], rrcnt, llen));
Expand All @@ -310,7 +453,8 @@ int write_darray_multi_par(file_desc_t *file, int nvars, int fndims, const int *
vdesc->request[vdesc->nreqs] = PIO_REQ_NULL;

vdesc->nreqs++;
}
#endif
}

/* Free resources. */
for (int i = 0; i < rrcnt; i++)
Expand All @@ -329,6 +473,18 @@ int write_darray_multi_par(file_desc_t *file, int nvars, int fndims, const int *
if (region)
region = region->next;
} /* next regioncnt */
#if USE_VARD
if(filetype != MPI_DATATYPE_NULL)
{
int mpierr;
for(int i=0; i<rrcnt; i++)
if (subarray[i] != MPI_DATATYPE_NULL)
if((mpierr = MPI_Type_free(subarray+i)))
return check_mpi(NULL, mpierr, __FILE__, __LINE__);
if((mpierr = MPI_Type_free(&filetype)))
return check_mpi(NULL, mpierr, __FILE__, __LINE__);
}
#endif
} /* endif (ios->ioproc) */

/* Check the return code from the netCDF/pnetcdf call. */
Expand Down Expand Up @@ -1062,10 +1218,10 @@ int pio_read_darray_nc_serial(file_desc_t *file, io_desc_t *iodesc, int vid,

/* If setframe was not set before this call, assume a value of
* 0. This is required for backward compatibility. */
if (fndims > ndims)
if (vdesc->record < 0)
if (fndims > ndims)
if (vdesc->record < 0)
vdesc->record = 0;

/* Put together start/count arrays for all regions. */
for (int regioncnt = 0; regioncnt < iodesc->maxregions; regioncnt++)
{
Expand Down Expand Up @@ -1318,7 +1474,7 @@ int flush_output_buffer(file_desc_t *file, bool force, PIO_Offset addsize)

/* If we are not forcing a flush, spread the usage to all IO
* tasks. */
if (!force && file->iosystem->io_comm != MPI_COMM_NULL)
if (!force && file->iosystem->ioproc)
{
usage += addsize;
if ((mpierr = MPI_Allreduce(MPI_IN_PLACE, &usage, 1, MPI_OFFSET, MPI_MAX,
Expand Down Expand Up @@ -1368,6 +1524,7 @@ int flush_output_buffer(file_desc_t *file, bool force, PIO_Offset addsize)
#endif
for (reqcnt = 0; reqcnt < vdesc->nreqs; reqcnt++)
request[rcnt++] = max(vdesc->request[reqcnt], NC_REQ_NULL);
LOG((3,"flush_output_buffer rcnt=%d",rcnt));

if (vdesc->request != NULL)
free(vdesc->request);
Expand Down Expand Up @@ -1540,4 +1697,3 @@ int flush_buffer(int ncid, wmulti_buffer *wmb, bool flushtodisk)

return PIO_NOERR;
}

1 change: 0 additions & 1 deletion src/clib/pio_file.c
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,6 @@ int PIOc_sync(int ncid)
#ifdef _PNETCDF
case PIO_IOTYPE_PNETCDF:
flush_output_buffer(file, true, 0);
ierr = ncmpi_sync(file->fh);
break;
#endif
default:
Expand Down
2 changes: 1 addition & 1 deletion src/clib/pioc_support.c
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ int check_netcdf2(iosystem_desc_t *ios, file_desc_t *file, int status,
LOG((2, "check_netcdf2 chose error handler = %d", eh));

/* Decide what to do based on the error handler. */
if (eh == PIO_INTERNAL_ERROR)
if (eh == PIO_INTERNAL_ERROR && status != PIO_NOERR)
{
char errmsg[PIO_MAX_NAME + 1]; /* Error message. */
PIOc_strerror(status, errmsg);
Expand Down
Loading

0 comments on commit 4836393

Please sign in to comment.