
xds_from_ms memory bloat with frequency chunks #92

Closed
JSKenyon opened this issue Feb 28, 2020 · 9 comments

@JSKenyon
Collaborator

  • dask-ms version: 0.2.3
  • Python version: 3.6.9
  • Operating System: Pop!_OS 18.04 LTS

Using xds_from_ms with frequency chunks consumes substantially more memory than it should. I used the following example to expose this behaviour. To establish a baseline, I first ran it as is, with no frequency chunks:

import numpy as np
import dask.array as da
from daskms import xds_from_ms


xdss = xds_from_ms("C147_unflagged.MS/",
                   columns=("DATA",),
                   index_cols=("TIME",),
                   group_cols=("SCAN_NUMBER",
                               "FIELD_ID",
                               "DATA_DESC_ID"),
                   chunks={"row": 100000000})

results = [da.map_blocks(np.sum, xds.DATA.data, keepdims=True) for xds in xdss]

da.compute(results, scheduler="single-threaded")

Running under /usr/bin/time -v, this produced:

Maximum resident set size (kbytes): 235368

I then replaced chunks in the above example with {"row": 100000000, "chan": 32}. This produced:

Maximum resident set size (kbytes): 2927936

This is ~3GB relative to the ~230MB used previously. Finally, I changed the experiment to use manual rechunking as follows:

import numpy as np
import dask.array as da
from daskms import xds_from_ms


xdss = xds_from_ms("C147_unflagged.MS/",
                   columns=("DATA",),
                   index_cols=("TIME",),
                   group_cols=("SCAN_NUMBER",
                               "FIELD_ID",
                               "DATA_DESC_ID"),
                   chunks={"row": 100000000})

# Doing this in place is bad but simplifies the example. :-)
for xds in xdss:
    xds.DATA.data = xds.DATA.data.rechunk({1: 32})

results = [da.map_blocks(np.sum, xds.DATA.data, keepdims=True) for xds in xdss]

da.compute(results, scheduler="single-threaded")

This produced:

Maximum resident set size (kbytes): 234144

which is what I would expect, and is consistent with the row-only chunking baseline.
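As a dask-only illustration (no MS required), the rechunk({1: 32}) call in the workaround splits only the second (channel) axis into blocks of 32 while leaving the row chunking intact:

```python
import dask.array as da

# Toy stand-in for a DATA column: one large row chunk, full channel resolution.
x = da.ones((1000, 64, 4), chunks=(1000, 64, 4))

# Split only axis 1 (channels) into blocks of 32, as in the workaround above.
y = x.rechunk({1: 32})

print(y.chunks)  # ((1000,), (32, 32), (4,))
```

Because the row-resolution read happens before the rechunk, each on-disk access still covers all channels; the channel splitting happens purely in memory.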

To summarise, I believe there is a bug somewhere in the frequency chunking that causes more data than necessary to be read into memory.

@sjperkins sjperkins added the bug label Mar 24, 2020
@sjperkins
Member

I've produced a modified script. One thing that's significantly different from yours is that the default number of rows is smaller (100,000).

import argparse
import numpy as np
import dask.array as da
from dask.diagnostics import ResourceProfiler, visualize
from daskms import xds_from_ms

def create_parser():
    p = argparse.ArgumentParser()
    p.add_argument("ms")
    p.add_argument("-r", "--row", type=int, default=100000)
    p.add_argument("-c", "--chan", type=int, default=-1)
    return p


def script():
    args = create_parser().parse_args()

    xdss = xds_from_ms(args.ms,
                    columns=("DATA",),
                    index_cols=("TIME",),
                    group_cols=("SCAN_NUMBER",
                                "FIELD_ID",
                                "DATA_DESC_ID"),
                    chunks={"row": args.row, "chan": args.chan})

    results = [da.map_blocks(np.sum, xds.DATA.data, keepdims=True)
               for xds in xdss]
    
    with ResourceProfiler() as p:
        da.compute(results, scheduler="single-threaded")

    visualize(p, file_path="out.html", show=False, save=True)

if __name__ == "__main__":
    script()

For the full channel resolution (4k) measurement set, I see the following output from the time command:

$ /usr/bin/time -v python test_bloat.py ~/data/1527016443_sdp_l0.full_1284.full_pol.ms
        Command being timed: "python test_bloat.py /home/sperkins/data/1527016443_sdp_l0.full_1284.full_pol.ms"
        User time (seconds): 7.34
        System time (seconds): 15.56
        Percent of CPU this job got: 17%
        Elapsed (wall clock) time (h:mm:ss or m:ss): 2:13.04
        Average shared text size (kbytes): 0
        Average unshared data size (kbytes): 0
        Average stack size (kbytes): 0
        Average total size (kbytes): 0
        Maximum resident set size (kbytes): 3260140
        Average resident set size (kbytes): 0
        Major (requiring I/O) page faults: 202
        Minor (reclaiming a frame) page faults: 652485
        Voluntary context switches: 39653
        Involuntary context switches: 39968
        Swaps: 0
        File system inputs: 10225488
        File system outputs: 32
        Socket messages sent: 0
        Socket messages received: 0
        Signals delivered: 0
        Page size (bytes): 4096
        Exit status: 0

and the following resource profile:

[ResourceProfiler memory plot: 4k channels]

Chunking by 32 channels at a time produces the following output for time:

$ /usr/bin/time -v python test_bloat.py ~/data/1527016443_sdp_l0.full_1284.full_pol.ms -c 32
        Command being timed: "python test_bloat.py /home/sperkins/data/1527016443_sdp_l0.full_1284.full_pol.ms -c 32"
        User time (seconds): 79.22
        System time (seconds): 143.54
        Percent of CPU this job got: 72%
        Elapsed (wall clock) time (h:mm:ss or m:ss): 5:06.82
        Average shared text size (kbytes): 0
        Average unshared data size (kbytes): 0
        Average stack size (kbytes): 0
        Average total size (kbytes): 0
        Maximum resident set size (kbytes): 144112
        Average resident set size (kbytes): 0
        Major (requiring I/O) page faults: 219
        Minor (reclaiming a frame) page faults: 39838
        Voluntary context switches: 43899
        Involuntary context switches: 1107
        Swaps: 0
        File system inputs: 8850088
        File system outputs: 48
        Socket messages sent: 0
        Socket messages received: 0
        Signals delivered: 0
        Page size (bytes): 4096
        Exit status: 0

and the following resource profile:

[ResourceProfiler memory plot: 32-channel chunks]

@sjperkins
Member

I reduced my number of rows to 100,000 to avoid blowing up my laptop memory on a full 4k MS, but to me it seems like it's basically doing the right thing.

Any ideas on what might be causing the discrepancy? Could you re-run my script with your MS?

@JSKenyon
Collaborator Author

I am still seeing the same bloat on all my 3C147 measurement sets, @sjperkins. I have checked using dask-ms==0.2.3, dask==2.9.1 (prior to ordering changes) and dask==2.14.0. The problem persists in all cases (checked with your script). In the example you ran, how many xarray datasets were there?

@sjperkins
Member

@JSKenyon I can reproduce the issue with my script and your MS.

@sjperkins
Member

I've spent a fair amount of time inspecting this with guppy.

test_bloat.py
import argparse
import numpy as np
import dask
import dask.array as da
from dask.diagnostics import ResourceProfiler, visualize
from daskms import xds_from_ms

from guppy import hpy
import gc
import pickle
from pprint import pprint
import sys

def create_parser():
    p = argparse.ArgumentParser()
    p.add_argument("ms")
    p.add_argument("-r", "--row", type=int, default=1000000000)
    p.add_argument("-c", "--chan", type=int, default=-1)
    return p


hp = hpy()

def script():
    args = create_parser().parse_args()

    xdss = xds_from_ms(args.ms,
                    columns=("DATA",),
                    index_cols=("TIME",),
                    group_cols=("SCAN_NUMBER",
                                "FIELD_ID",
                                "DATA_DESC_ID"),
                    chunks={"row": args.row, "chan": args.chan})

    results = [[[xds.DATA.data.map_blocks(np.sum, keepdims=True, chunks=(1,1,1))]]
               for xds in xdss]

    results_heap = None

    results = da.block(results).sum()

    def heapy(a, block_info=None):
        nonlocal results_heap

        if block_info is None:
            return a

        heap = results_heap = hp.heap()

        print("block", block_info[0]['chunk-location'], heap)

        found = False

        for i in range(len(heap)):
            if heap[i].kind.arg == np.ndarray:
                found = True
                break

        if found:
            ndarrays = heap[i]

            ids = ndarrays.byid
            # Extract arrays for further inspection
            arrays = [ids[j].theone for j in range(len(ids))]

    results = results.map_blocks(heapy, dtype=results.dtype)
    initial_heap = hp.heap()

    dask.visualize(results, filename="out.pdf")

    with ResourceProfiler() as p:
        results = da.compute(results, scheduler="single-threaded")[0]

    visualize(p, file_path="out.html", show=False, save=True, optimize_graph=False)

    assert results_heap is not None
    diff = results_heap - initial_heap
    print("diff", diff)

if __name__ == "__main__":
    script()

    import gc
    gc.collect()

    print("final", hp.heap())

No chunking

See below:

  1. heap within the heapy call
  2. heap diff between (1) and initial heap
  3. final heap
  4. /usr/bin/time -v output showing 258448kb (0.26GB) memory usage
  5. ResourceProfiler memory plot
$ /usr/bin/time -v python test_bloat.py ~/data/for_simon/C147_unflagged.MS/ 
block () Partition of a set of 500337 objects. Total size = 73930495 bytes.
 Index  Count   %     Size   % Cumulative  % Kind (class / dict of class)
     0 132500  26 18979397  26  18979397  26 str
     1 148310  30 11291800  15  30271197  41 tuple
     2    233   0 11020694  15  41291891  56 numpy.ndarray
     3  66813  13  5109029   7  46400920  63 bytes
     4  33977   7  4921256   7  51322176  69 types.CodeType
     5  31728   6  4315008   6  55637184  75 function
     6   3404   1  3309576   4  58946760  80 type
     7   7260   1  2692672   4  61639432  83 dict (no owner)
     8   1375   0  2082240   3  63721672  86 dict of module
     9   3395   1  1837288   2  65558960  89 dict of type
<963 more rows. Type e.g. '_.more' to view.>
diff Partition of a set of 117892 objects. Total size = 25190687 bytes.
 Index  Count   %     Size   % Cumulative  % Kind (class / dict of class)
     0     80   0 11004000  44  11004000  44 numpy.ndarray
     1  33230  28  4056309  16  15060309  60 str
     2  35722  30  2676400  11  17736709  70 tuple
     3   7590   6  1099048   4  18835757  75 types.CodeType
     4  14883  13  1090340   4  19926097  79 bytes
     5   6916   6   940576   4  20866673  83 function
     6    840   1   873864   3  21740537  86 type
     7   1558   1   613672   2  22354209  89 dict (no owner)
     8    404   0   465352   2  22819561  91 dict of module
     9   1335   1   401952   2  23221513  92 set
<261 more rows. Type e.g. '_.more' to view.>
final Partition of a set of 564955 objects. Total size = 71034269 bytes.
 Index  Count   %     Size   % Cumulative  % Kind (class / dict of class)
     0 149655  26 21218575  30  21218575  30 str
     1 159346  28 12140448  17  33359023  47 tuple
     2  72060  13  5471325   8  38830348  55 bytes
     3  36894   7  5342856   8  44173204  62 types.CodeType
     4  34545   6  4698120   7  48871324  69 function
     5   3774   1  3708112   5  52579436  74 type
     6   7030   1  2689432   4  55268868  78 dict (no owner)
     7   1549   0  2329208   3  57598076  81 dict of module
     8   3765   1  2049216   3  59647292  84 dict of type
     9   3905   1  1431520   2  61078812  86 set
<1221 more rows. Type e.g. '_.more' to view.>
        Command being timed: "python test_bloat.py /home/sperkins/data/for_simon/C147_unflagged.MS/"
        User time (seconds): 6.52
        System time (seconds): 1.26
        Percent of CPU this job got: 100%
        Elapsed (wall clock) time (h:mm:ss or m:ss): 0:07.78
        Average shared text size (kbytes): 0
        Average unshared data size (kbytes): 0
        Average stack size (kbytes): 0
        Average total size (kbytes): 0
        Maximum resident set size (kbytes): 258448
        Average resident set size (kbytes): 0
        Major (requiring I/O) page faults: 0
        Minor (reclaiming a frame) page faults: 99224
        Voluntary context switches: 505
        Involuntary context switches: 245
        Swaps: 0
        File system inputs: 0
        File system outputs: 112
        Socket messages sent: 0
        Socket messages received: 0
        Signals delivered: 0
        Page size (bytes): 4096
        Exit status: 0

[ResourceProfiler bokeh plot: no chunking]

32 channel chunking

See below:

  1. heap within the heapy call
  2. heap diff between (1) and initial heap
  3. final heap
  4. /usr/bin/time -v output showing 2963544kb (2.9GB) memory usage
  5. ResourceProfiler memory plot
$ /usr/bin/time -v python test_bloat.py ~/data/for_simon/C147_unflagged.MS/ -c 32
block () Partition of a set of 501813 objects. Total size = 74106542 bytes.
 Index  Count   %     Size   % Cumulative  % Kind (class / dict of class)
     0 132701  26 18997188  26  18997188  26 str
     1 149266  30 11366624  15  30363812  41 tuple
     2    233   0 11020694  15  41384506  56 numpy.ndarray
     3  66813  13  5109029   7  46493535  63 bytes
     4  33977   7  4921256   7  51414791  69 types.CodeType
     5  31728   6  4315008   6  55729799  75 function
     6   3404   1  3309576   4  59039375  80 type
     7   7260   1  2720368   4  61759743  83 dict (no owner)
     8   1375   0  2082240   3  63841983  86 dict of module
     9   3395   1  1837288   2  65679271  89 dict of type
<963 more rows. Type e.g. '_.more' to view.>
diff Partition of a set of 118453 objects. Total size = 25288647 bytes.
 Index  Count   %     Size   % Cumulative  % Kind (class / dict of class)
     0     80   0 11004000  44  11004000  44 numpy.ndarray
     1  33270  28  4060945  16  15064945  60 str
     2  36082  30  2702640  11  17767585  70 tuple
     3   7590   6  1099048   4  18866633  75 types.CodeType
     4  14883  13  1090340   4  19956973  79 bytes
     5   6916   6   940576   4  20897549  83 function
     6    840   1   873864   3  21771413  86 type
     7   1558   1   640272   3  22411685  89 dict (no owner)
     8    404   0   465352   2  22877037  90 dict of module
     9   1495   1   437792   2  23314829  92 set
<261 more rows. Type e.g. '_.more' to view.>
final Partition of a set of 564960 objects. Total size = 71040216 bytes.
 Index  Count   %     Size   % Cumulative  % Kind (class / dict of class)
     0 149656  26 21223570  30  21223570  30 str
     1 159348  28 12141192  17  33364762  47 tuple
     2  72060  13  5471325   8  38836087  55 bytes
     3  36894   7  5342856   8  44178943  62 types.CodeType
     4  34545   6  4698120   7  48877063  69 function
     5   3774   1  3708112   5  52585175  74 type
     6   7030   1  2689432   4  55274607  78 dict (no owner)
     7   1549   0  2329208   3  57603815  81 dict of module
     8   3765   1  2049216   3  59653031  84 dict of type
     9   3905   1  1431520   2  61084551  86 set
<1221 more rows. Type e.g. '_.more' to view.>
        Command being timed: "python test_bloat.py /home/sperkins/data/for_simon/C147_unflagged.MS/ -c 32"
        User time (seconds): 7.18
        System time (seconds): 2.21
        Percent of CPU this job got: 104%
        Elapsed (wall clock) time (h:mm:ss or m:ss): 0:08.98
        Average shared text size (kbytes): 0
        Average unshared data size (kbytes): 0
        Average stack size (kbytes): 0
        Average total size (kbytes): 0
        Maximum resident set size (kbytes): 2963544
        Average resident set size (kbytes): 0
        Major (requiring I/O) page faults: 0
        Minor (reclaiming a frame) page faults: 813320
        Voluntary context switches: 739
        Involuntary context switches: 184
        Swaps: 0
        File system inputs: 0
        File system outputs: 160
        Socket messages sent: 0
        Socket messages received: 0
        Signals delivered: 0
        Page size (bytes): 4096
        Exit status: 0

[ResourceProfiler bokeh plot: 32-channel chunking]

Analysis

Both /usr/bin/time and the dask ResourceProfiler observe the unnecessary memory growth in the 32-channel chunking case (which internally uses the getcolslice data accessors).

However, guppy does not: it reports a ~73MB heap in both cases, of which ~11MB is taken up by numpy arrays. Inspection of these arrays shows that most of the space is occupied by the cached row id arrays introduced in #75. This is a deliberate tradeoff, as the row ids are generally small relative to the rest of the data, and it solves the dask parallelism issues described in #75.

We therefore have a situation where we see memory growth in the process, but no commensurate growth in the memory footprint of objects on the Python heap. This might point to issues in the C++ layer, perhaps internal casacore caching when using the getcolslice functionality, as opposed to getcol. @o-smirnov, @tammojan and @gervandiepen: does any of this ring a bell?

In any case, I suspect the next step is to try and isolate the problem in the casacore layer.

@JSKenyon
Collaborator Author

After discussion with @sjperkins, I followed up and attempted to reproduce this problem outside of dask-ms, using pyrap.tables directly. My first test was as follows:

import pyrap.tables as pt
import numpy as np

ms = pt.table("C147_unflagged.MS")

ms_nrow = ms.nrows()

sel_nrow = 10000

n_chan = 64
n_chan_per_slice = 32

results = []

for start_row in range(0, ms_nrow, sel_nrow):
    for start_chan in range(0, n_chan, n_chan_per_slice):

        nrow = sel_nrow if (ms_nrow - start_row) > sel_nrow \
                        else (ms_nrow - start_row)

        sel = ms.getcolslice("MODEL_DATA",
                             blc=(start_chan, 0),
                             trc=(start_chan + n_chan_per_slice - 1, 3),
                             startrow=start_row,
                             nrow=nrow)

        results.append(np.sum(sel))

This produced the following output from /usr/bin/time:

Maximum resident set size (kbytes): 2992436

This is unexpected: the above example reads a block of 10000 rows and 32 channels and sums it into a single number. This is done serially, so at no point should there be more than a single array in memory, and yet the example uses ~3GB. I believe the culprit is getcolslice.
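One way to localize the growth (a sketch, not part of the original script) is to sample the process's peak RSS inside the loop using the standard library's resource module; if the figure climbs with each getcolslice call, the growth is per-call rather than one large allocation:

```python
import resource
import sys

def peak_rss_kb():
    """Peak resident set size of this process in KiB.

    Linux reports ru_maxrss in KiB; macOS reports bytes, hence the
    platform adjustment.
    """
    rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    return rss // 1024 if sys.platform == "darwin" else rss

# Hypothetical placement inside the loop above:
#     sel = ms.getcolslice(...)
#     print(start_row, start_chan, peak_rss_kb())
print(peak_rss_kb())
```

Note that ru_maxrss is a high-water mark, so it only ever increases; a steadily climbing value across iterations is what would indicate leaked or retained allocations.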

I wanted to verify that this behaviour is also apparent in getcolslicenp. My second test was as follows:

import pyrap.tables as pt
import numpy as np

ms = pt.table("C147_unflagged.MS")

ms_nrow = ms.nrows()

sel_nrow = 10000

n_chan = 64
n_chan_per_slice = 32

results = []

for start_row in range(0, ms_nrow, sel_nrow):
    for start_chan in range(0, n_chan, n_chan_per_slice):

        nrow = sel_nrow if (ms_nrow - start_row) > sel_nrow \
                        else (ms_nrow - start_row)

        sel = np.empty((nrow, n_chan_per_slice, 4), dtype=np.complex64)

        ms.getcolslicenp("MODEL_DATA",
                         sel,
                         blc=(start_chan, 0),
                         trc=(start_chan + n_chan_per_slice - 1, 3),
                         startrow=start_row,
                         nrow=nrow)

        results.append(np.sum(sel))

This produced the following output from /usr/bin/time:

Maximum resident set size (kbytes): 2815340

The behaviour is similar and equally worrying.
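For reference, the blc/trc arguments in both scripts are inclusive (channel, correlation) coordinates, which is why the loops subtract 1 from the upper bound. A small hypothetical helper makes the convention explicit:

```python
def chan_slice_bounds(start_chan, n_chan_per_slice, ncorr=4):
    """Hypothetical helper: inclusive (blc, trc) coordinates for a
    channel slice covering all ncorr correlations, as in the loops above."""
    blc = (start_chan, 0)
    trc = (start_chan + n_chan_per_slice - 1, ncorr - 1)
    return blc, trc

# The second 32-channel slice of a 64-channel, 4-correlation column:
print(chan_slice_bounds(32, 32))  # ((32, 0), (63, 3))
```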

Finally, as a sanity check, I performed a similar experiment using getcol and foregoing slicing the frequency axis. This test was as follows:

import pyrap.tables as pt
import numpy as np

ms = pt.table("C147_unflagged.MS")

ms_nrow = ms.nrows()

sel_nrow = 10000

results = []

for start_row in range(0, ms_nrow, sel_nrow):

    sel = ms.getcol("MODEL_DATA",
                    startrow=start_row,
                    nrow=sel_nrow)

    results.append(np.sum(sel))

This produced the following output from /usr/bin/time:

Maximum resident set size (kbytes): 175656

This is roughly 175MB, more than an order of magnitude smaller than the getcolslice cases, despite the individual selections being twice the size due to the absence of slicing along frequency. This is, in my opinion, a pretty major problem.
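A back-of-envelope check supports this: assuming complex64 visibilities (8 bytes each, as in the preallocated buffer in the second script), a single 10000-row, 64-channel, 4-correlation getcol read is only about 20MB, so a ~175MB peak is plausible while ~3GB is not:

```python
nrow, nchan, ncorr = 10000, 64, 4
bytes_per_vis = 8  # complex64: 2 x 4-byte floats

# Size of one full-resolution getcol read.
chunk_bytes = nrow * nchan * ncorr * bytes_per_vis
print(chunk_bytes / 2**20)  # ~19.5 MiB
```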

@sjperkins
Member

I've raised this in casacore as casacore/casacore#1018.

Thanks for the reproducer, @JSKenyon

@sjperkins
Member

#107 now produces the following behaviour:

$ /usr/bin/time -v python test_bloat.py ~/data/for_simon/C147_unflagged.MS/ -c 32
block () Partition of a set of 501796 objects. Total size = 74100731 bytes.
 Index  Count   %     Size   % Cumulative  % Kind (class / dict of class)
     0 132723  26 18994749  26  18994749  26 str
     1 149228  30 11363656  15  30358405  41 tuple
     2    233   0 11020694  15  41379099  56 numpy.ndarray
     3  66811  13  5108593   7  46487692  63 bytes
     4  33976   7  4921112   7  51408804  69 types.CodeType
     5  31729   6  4315144   6  55723948  75 function
     6   3404   1  3309576   4  59033524  80 type
     7   7260   1  2720368   4  61753892  83 dict (no owner)
     8   1375   0  2082240   3  63836132  86 dict of module
     9   3395   1  1837288   2  65673420  89 dict of type
<963 more rows. Type e.g. '_.more' to view.>
diff Partition of a set of 118453 objects. Total size = 25288647 bytes.
 Index  Count   %     Size   % Cumulative  % Kind (class / dict of class)
     0     80   0 11004000  44  11004000  44 numpy.ndarray
     1  33270  28  4060945  16  15064945  60 str
     2  36082  30  2702640  11  17767585  70 tuple
     3   7590   6  1099048   4  18866633  75 types.CodeType
     4  14883  13  1090340   4  19956973  79 bytes
     5   6916   6   940576   4  20897549  83 function
     6    840   1   873864   3  21771413  86 type
     7   1558   1   640272   3  22411685  89 dict (no owner)
     8    404   0   465352   2  22877037  90 dict of module
     9   1495   1   437792   2  23314829  92 set
<261 more rows. Type e.g. '_.more' to view.>
final Partition of a set of 564995 objects. Total size = 71038709 bytes.
 Index  Count   %     Size   % Cumulative  % Kind (class / dict of class)
     0 149678  26 21221131  30  21221131  30 str
     1 159348  28 12141184  17  33362315  47 tuple
     2  72058  13  5470889   8  38833204  55 bytes
     3  36893   7  5342712   8  44175916  62 types.CodeType
     4  34546   6  4698256   7  48874172  69 function
     5   3774   1  3708112   5  52582284  74 type
     6   7030   1  2689432   4  55271716  78 dict (no owner)
     7   1549   0  2329208   3  57600924  81 dict of module
     8   3765   1  2049216   3  59650140  84 dict of type
     9   3905   1  1431520   2  61081660  86 set
<1221 more rows. Type e.g. '_.more' to view.>
        Command being timed: "python test_bloat.py /home/sperkins/data/for_simon/C147_unflagged.MS/ -c 32"
        User time (seconds): 5.63
        System time (seconds): 1.38
        Percent of CPU this job got: 102%
        Elapsed (wall clock) time (h:mm:ss or m:ss): 0:06.83
        Average shared text size (kbytes): 0
        Average unshared data size (kbytes): 0
        Average stack size (kbytes): 0
        Average total size (kbytes): 0
        Maximum resident set size (kbytes): 233920
        Average resident set size (kbytes): 0
        Major (requiring I/O) page faults: 0
        Minor (reclaiming a frame) page faults: 123706
        Voluntary context switches: 996
        Involuntary context switches: 289
        Swaps: 0
        File system inputs: 0
        File system outputs: 184
        Socket messages sent: 0
        Socket messages received: 0
        Signals delivered: 0
        Page size (bytes): 4096
        Exit status: 0

[ResourceProfiler bokeh plot: 32-channel chunking with #107]

This may disable some of the benefits of column caching, but I believe that this is an acceptable tradeoff for now.

@sjperkins
Member

Worked around in #107
