pdef and template quick fix and test file #31

Merged 2 commits on Nov 30, 2021
Binary file added .DS_Store
Binary file added ctls/.DS_Store
12 changes: 12 additions & 0 deletions ctls/test8_5.ctl
@@ -0,0 +1,12 @@
dset ^test8_%y4%m2%d2%h2.dat
options template yrev big_endian
undef -9.99e33
pdef 53 25 lcc 30.618 110.375 144.500 135.500 40.00000 25.00000 105.00000 5000.000 5000.000

xdef 100 LINEAR 200 2.5
ydef 200 LINEAR 15 2.5
zdef 1 LEVELS 1000
tdef 4 LINEAR 01JAN2013 6hr
vars 1
air 0 99 air temperature
endvars
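
The dset and tdef lines above drive GrADS filename templating: %y4, %m2, %d2, and %h2 expand to the 4-digit year and 2-digit month, day, and hour of each time step, so the four 6-hourly times starting 01JAN2013 map to four separate .dat files. A minimal sketch of that expansion (the concrete filenames are implied by the template, not listed in this PR):

from datetime import datetime, timedelta

start, step = datetime(2013, 1, 1, 0), timedelta(hours=6)
for i in range(4):  # tdef 4 LINEAR 01JAN2013 6hr
    t = start + i * step
    # test8_%y4%m2%d2%h2.dat -> test8_YYYYMMDDHH.dat
    print(t.strftime('test8_%Y%m%d%H.dat'))
# test8_2013010100.dat, test8_2013010106.dat, test8_2013010112.dat, test8_2013010118.dat
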
12 changes: 12 additions & 0 deletions ctls/test8_6.ctl
@@ -0,0 +1,12 @@
dset ^test8_2013010100.dat
options yrev big_endian
undef -9.99e33
pdef 53 25 lcc 30.618 110.375 144.500 135.500 40.00000 25.00000 105.00000 5000.000 5000.000

xdef 100 LINEAR 200 2.5
ydef 200 LINEAR 15 2.5
zdef 1 LEVELS 1000
tdef 1 LINEAR 01JAN2013 6hr
vars 1
air 0 99 air temperature
endvars
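
For reference, a hedged usage sketch of how these two descriptors would be opened with the reader modified in this PR; it assumes the repository's ctls/ layout and that the matching .dat binaries exist next to the .ctl files:

from xgrads.io import open_CtlDataset

# template descriptor: 4 time steps spread over the expanded template files
ds_tpl = open_CtlDataset('ctls/test8_5.ctl')
# plain descriptor: a single file holding 1 time step
ds_one = open_CtlDataset('ctls/test8_6.ctl')

print(ds_tpl.dims, ds_one.dims)
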
109 changes: 56 additions & 53 deletions xgrads/io.py
@@ -60,18 +60,18 @@ def open_mfdataset(paths, parallel=False, encoding='GBK'):
open_ = dask.delayed(open_CtlDataset)
else:
open_ = open_CtlDataset

datasets = [open_(p, encoding=encoding) for p in paths]

if parallel:
# calling compute here will return the datasets/file_objs lists,
# the underlying datasets will still be stored as dask arrays
datasets = dask.compute(datasets)

return xr.concat(datasets[0], dim='time')

combined = xr.concat(datasets, dim='time')

return combined
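
# --- usage sketch (not part of the diff) ---------------------------------
# A minimal example of the parallel path above, assuming dask is installed;
# open_mfdataset wraps open_CtlDataset in dask.delayed and concatenates the
# per-descriptor datasets along 'time'. The path list here is illustrative.
from xgrads.io import open_mfdataset

paths = ['ctls/test8_6.ctl']
ds = open_mfdataset(paths, parallel=True)
print(ds)
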


@@ -81,14 +81,14 @@ def open_CtlDataset(desfile, returnctl=False, encoding='GBK'):
Open a 4D dataset described by a descriptor file ending with .ctl and
return an xarray.Dataset. This also uses dask to chunk
very large datasets, similar to xarray.open_dataset.

Parameters
----------
desfile: string
Path to the descriptor file ending with .ctl or .cts
returnctl: bool
Return dset and ctl as a tuple

Returns
-------
dset : xarray.Dataset
@@ -99,34 +99,34 @@ def open_CtlDataset(desfile, returnctl=False, encoding='GBK'):
if isinstance(desfile, str):
if not desfile.endswith('.ctl'):
raise Exception('unsupported file, suffix should be .ctl')

ctl = CtlDescriptor(encoding=encoding, file=desfile)
elif isinstance(desfile, CtlDescriptor):
ctl = desfile
else:
raise Exception('unsupported type of input ('+str(type(desfile))+'), ' +
'[CtlDescriptor or str] are allowed')

if ctl.template:
tcount = len(ctl.tdef.samples) # total number of time records
tcPerf = [] # number of time records per file

for file in ctl.dsetPath:
fsize = os.path.getsize(file)

if fsize % ctl.tRecLength != 0:
raise Exception('incomplete file for ' + file +
' (not a multiple of ' + str(ctl.tRecLength) +
' bytes)')

tcPerf.append(fsize // ctl.tRecLength)

total_size = sum(tcPerf)

if total_size < tcount:
raise Exception('not enough files for ' + str(tcount) +
' time records')

# get time record number in each file
rem = tcount
idx = 0
@@ -138,25 +138,25 @@ def open_CtlDataset(desfile, returnctl=False, encoding='GBK'):

tcPerf_m = tcPerf[:idx+1]
tcPerf_m[idx] = tcPerf[idx ] + rem

# print(ctl.dsetPath)
# print(tcPerf)
# print(tcPerf_m)

binData = __read_template_as_dask(ctl, tcPerf_m)

else:
expect = ctl.tRecLength * ctl.tdef.length()
actual = os.path.getsize(ctl.dsetPath)

if expect != actual:
print('WARNING: expected binary file size: {0}, actual size: {1}'
.format(expect, actual))

binData = __read_as_dask(ctl)

variables = []

if ctl.pdef is None:
for m, v in enumerate(ctl.vdef):
if v.dependZ:
@@ -182,18 +182,18 @@ def open_CtlDataset(desfile, returnctl=False, encoding='GBK'):

else:
PDEF = ctl.pdef

if PDEF.proj in ['lcc', 'lccr']:
ycoord = np.linspace(0, (PDEF.jsize-1) * PDEF.dx, PDEF.jsize)
xcoord = np.linspace(0, (PDEF.isize-1) * PDEF.dy, PDEF.isize)

elif PDEF.proj in ['sps', 'nps']:
inc = PDEF.gridinc * 1000 # change unit from km to m
ycoord = np.linspace(-(PDEF.jpole), (PDEF.jsize-PDEF.jpole),
PDEF.jsize) * inc
xcoord = np.linspace(-(PDEF.ipole), (PDEF.isize-PDEF.ipole),
PDEF.isize) * inc

for m, v in enumerate(ctl.vdef):
if v.dependZ:
da = xr.DataArray(name=v.name, data=binData[m],
@@ -225,10 +225,10 @@ def open_CtlDataset(desfile, returnctl=False, encoding='GBK'):
dset.attrs['title'] = ctl.title
dset.attrs['undef'] = ctl.undef
dset.attrs['pdef' ] = 'None'

if ctl.pdef:
dset.attrs['pdef' ] = ctl.pdef.proj

if returnctl:
return dset, ctl
else:
@@ -250,7 +250,7 @@ def __read_as_dask(dd):

totalNum = sum([reduce(lambda x, y:
x*y, (t,v.zcount,y,x)) for v in dd.vdef])

if dd.sequential:
sequentialSize = x * y + 2
else:
@@ -259,17 +259,17 @@ def __read_as_dask(dd):
# print(totalNum * 4.0 / 1024.0 / 1024.0)

binData = []

dtype = '<f4' if dd.byteOrder == 'little' else '>f4'

for m, v in enumerate(dd.vdef):
name = '@miniufo_' + tokenize(v, m)

if totalNum < (100 * 100 * 100 * 10): # about 40 MB, chunk all
# print('small')
chunk = (t, v.zcount, y, x)
shape = (t, v.zcount, y, x)

dsk = {(name, 0, 0, 0, 0):
(__read_var, dd.dsetPath, v, dd.tRecLength,
None, None, dtype, sequentialSize)}
@@ -281,7 +281,7 @@ def __read_as_dask(dd):
# print('large')
chunk = (1, 1, y, x)
shape = (t, v.zcount, y, x)

dsk = {(name, l, k, 0, 0):
(__read_var, dd.dsetPath, v, dd.tRecLength,
l, k, dtype, sequentialSize)
@@ -295,7 +295,7 @@ def __read_as_dask(dd):
# print('between')
chunk = (1, v.zcount, y, x)
shape = (t, v.zcount, y, x)

dsk = {(name, l, 0, 0, 0):
(__read_var, dd.dsetPath, v, dd.tRecLength,
l, None, dtype, sequentialSize)
@@ -311,11 +311,14 @@ def __read_template_as_dask(dd, tcPerf):
"""
Read template binary data and return as a dask array
"""
t, y, x = dd.tdef.length(), dd.ydef.length(), dd.xdef.length()
if dd.pdef is None:
t, y, x = dd.tdef.length(), dd.ydef.length(), dd.xdef.length()
else:
t, y, x = dd.tdef.length(), dd.pdef.jsize, dd.pdef.isize

totalNum = sum([reduce(lambda x, y:
x*y, (tcPerf[0],v.zcount,y,x)) for v in dd.vdef])

if dd.sequential:
sequentialSize = x * y + 2
else:
@@ -324,17 +327,17 @@ def __read_template_as_dask(dd, tcPerf):
# print(totalNum * 4.0 / 1024.0 / 1024.0)

binData = []

dtype = '<f4' if dd.byteOrder == 'little' else '>f4'

for m, v in enumerate(dd.vdef):
name = '@miniufo_' + tokenize(v, m)

if totalNum > (200 * 100 * 100 * 100): # about 800 MB, chunk 2D slice
# print('large')
chunk = (1, 1, y, x)
shape = (t, v.zcount, y, x)

dsk = {(name, l + sum(tcPerf[:m]), k, 0, 0):
(__read_var, f, v, dd.tRecLength,
l, k, dtype, sequentialSize)
Expand All @@ -349,7 +352,7 @@ def __read_template_as_dask(dd, tcPerf):
# print('between')
chunk = (1, v.zcount, y, x)
shape = (t, v.zcount, y, x)

dsk = {(name, l + sum(tcPerf[:m]), 0, 0, 0):
(__read_var, f, v, dd.tRecLength,
l, None, dtype, sequentialSize)
@@ -394,7 +397,7 @@ def __read_var(file, var, tstride, tstep, zstep, dtype, sequentialSize=-1):
pos = var.strPos
return __read_continuous(file, pos, shape, dtype,
sequentialShape=seqShp)

elif zstep is None and tstep is not None:
shape = (1, var.zcount, var.ycount, var.xcount)
if sequentialSize != -1:
@@ -404,10 +407,10 @@ def __read_var(file, var, tstride, tstep, zstep, dtype, sequentialSize=-1):
pos = var.strPos
return __read_continuous(file, pos, shape, dtype,
sequentialShape=seqShp)

elif tstep is None and zstep is not None:
raise Exception('not implemented in -1,20')

else:
shape = (1, 1, var.ycount, var.xcount)
zstri = var.ycount * var.xcount * 4
@@ -419,7 +422,7 @@ def __read_var(file, var, tstride, tstep, zstep, dtype, sequentialSize=-1):
pos = var.strPos + zstri * zstep
return __read_continuous(file, pos, shape, dtype,
sequentialShape=seqShp)

elif var.storage in ['99', '0', '00', '000', '1', '11', '111']:
# elif var.storage == '99' or var.storage == '0':
if tstep is None and zstep is None:
@@ -430,14 +433,14 @@ def __read_var(file, var, tstride, tstep, zstep, dtype, sequentialSize=-1):
seqShp = shape
pos = var.strPos
data = []

for l in range(var.tcount):
data.append(__read_continuous(file, pos, shape, dtype,
sequentialShape=seqShp))
pos += tstride

return np.concatenate(data)

elif zstep is None and tstep is not None:
shape = (1, var.zcount, var.ycount, var.xcount)
if sequentialSize != -1:
@@ -447,12 +450,12 @@ def __read_var(file, var, tstride, tstep, zstep, dtype, sequentialSize=-1):
pos = var.strPos + tstride * tstep
data = __read_continuous(file, pos, shape, dtype,
sequentialShape=seqShp)

return data

elif tstep is None and zstep is not None:
raise Exception('not implemented in 0,99')

else:
shape = (1, 1, var.ycount, var.xcount)
zstri = var.ycount * var.xcount * 4
@@ -464,7 +467,7 @@ def __read_var(file, var, tstride, tstep, zstep, dtype, sequentialSize=-1):
pos = var.strPos + tstride * tstep + zstri * zstep
return __read_continuous(file, pos, shape, dtype,
sequentialShape=seqShp)

else:
raise Exception('invalid storage ' + var.storage +
', only "99" or "-1,20" are supported')
@@ -493,15 +496,15 @@ def __read_continuous(file, offset=0, shape=None, dtype='<f4',
shape=sequentialShape, order='C')
else:
number_of_values = reduce(lambda x, y: x*y, sequentialShape)

f.seek(offset)
data = np.fromfile(f, dtype=dtype, count=number_of_values)

if sequentialShape != shape:
data = data.reshape((shape[0],shape[1],-1))[:,:,1:-1]

data = data.reshape(shape, order='C')

data.shape = shape

return data
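
For orientation, a standalone sketch of the bookkeeping behind the pdef and template handling above, using the numbers from the new test descriptors: with a pdef present, the y/x sizes come from pdef.jsize and pdef.isize (25 x 53) rather than ydef/xdef, and each template file contributes file_size // tRecLength time records. (For sequential files, each 2-D record also carries two 4-byte Fortran record markers, which is why sequentialSize is x * y + 2 and __read_continuous drops the first and last word of each record.) The file names below are the hypothetical template expansion, assumed to exist locally:

import os

isize, jsize = 53, 25                 # pdef 53 25 lcc ...
nz, nvars    = 1, 1                   # zdef 1 LEVELS 1000; vars 1 (air)
t_rec_length = isize * jsize * nz * nvars * 4   # float32 bytes per time step

# hypothetical expansion of test8_%y4%m2%d2%h2.dat for tdef 4 LINEAR 01JAN2013 6hr
files = ['test8_2013010100.dat', 'test8_2013010106.dat',
         'test8_2013010112.dat', 'test8_2013010118.dat']

tc_per_file = [os.path.getsize(f) // t_rec_length for f in files if os.path.exists(f)]
print(t_rec_length, tc_per_file, sum(tc_per_file))  # should cover the 4 records tdef declares
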