
Observable interval appears to leak #259


Open

AndrewLipscomb opened this issue Sep 27, 2018 · 6 comments


@AndrewLipscomb

In this test example

from __future__ import print_function

import multiprocessing
import threading
import os
import psutil
from time import sleep

from rx import Observable

# Synchronisable heartbeat
heartbeat = Observable.interval(100).publish()

heartbeat.connect()

heartbeat.subscribe(lambda beat: None)

# Mocks a 50Hz GPS signal
provider = Observable.interval(20) \
    .map(lambda i: (24, 24)) \
    .publish()

provider.connect()

heartbeat.with_latest_from(provider, lambda heartbeat, gps: gps) \
    .subscribe(lambda combined: None)  # or: lambda combined: print(combined)

try:
    while True:
        sleep(0.1)
        process = psutil.Process(os.getpid())
        print(str(process.memory_info().rss/1000) + " KB in mem; " + str(threading.active_count()) + " active threads")

except KeyboardInterrupt:
    print("Caught KeyboardInterrupt")

which is running on Python 2.7.15 on Kubuntu 18.04 with RxPY 1.6.1, I'm seeing the memory of this application slowly tick upwards. The output of psutil (and the system profiler) shows memory climbing steadily, while the thread count stays constant at around 9.

Full disclosure - Python is not my usual language, so I may be missing a language gotcha.

From having read an implementation of interval (in Java), I didn't believe anything here needs to be managed to avoid memory growth - Googling memory management for infinite sequences isn't giving me much either. Is this normal (and my code is badly written), or is there an issue here?

@AndrewLipscomb
Author

AndrewLipscomb commented Sep 27, 2018

FWIW, I wrote up a comparison test in my usual language (C++) and couldn't replicate any leaking.

#include <iostream>
#include <chrono>
#include "rxcpp/rx.hpp"

int main(int argc, char** argv) {

    auto threads = rxcpp::observe_on_new_thread();

    auto heartbeat = rxcpp::observable<>::interval( std::chrono::milliseconds(100), threads )
                        .observe_on(threads)
                        .publish();
    heartbeat.connect();

    auto gps = rxcpp::observable<>::interval( std::chrono::milliseconds(20), threads )
                        .observe_on(threads)
                        .publish();
    gps.connect();

    auto values = heartbeat.with_latest_from(gps);

    values
        .subscribe_on(threads)
        .subscribe(
            [](std::tuple<int, int> v) { printf("OnNext: %d, %d\n", std::get<0>(v), std::get<1>(v)); },
            []() { printf("OnCompleted\n"); }
        );

    int i;
    std::cout << "This blocks until Ctrl C";
    std::cin >> i;

    return 0;
}

Edit: fixed code

@rfum

rfum commented Mar 7, 2019

I have the following logging format for the standard logging lib:

logging.basicConfig(level=logging.INFO, format="[%(threadName)s] %(asctime)-15s %(message)s")

This prints the thread that executes each logging.info(foo) call.

I've created an interval stream and a consumer via the code below.

timer_stream = rx.Observable.interval(1000).subscribe_on(time_pool)
timer_stream.subscribe(
    lambda count: logging.info(count))

As you can see from the output, every tick of the interval executes on a new thread. I think this might be the cause of the issue. I'm using RxPY version 1.6.1. FYI.

[screenshot: log output showing a different thread name for each tick]

Edit: creating a scheduler and passing it via the scheduler argument of interval solves the problem.
The new code is below.

import rx.concurrency as rxc

time_pool = rxc.ThreadPoolScheduler(1)
timer_stream = rx.Observable.interval(1000, scheduler=time_pool)
timer_stream.subscribe(
    lambda count: logging.info(count))

[screenshot: log output after the fix, with all ticks on the same thread]

@jcafhe
Collaborator

jcafhe commented May 14, 2019

Hi,

I've been running a bunch of tests with a simpler sample and a slightly different way of monitoring memory, on RxPY3 (HEAD master), Python 3.7.3 & Linux. I must say I'm not an expert in memory management, but I believe the garbage collector can make memory measurements quite unreliable (?). I have no idea how and when Python frees unused memory. Also, it's not clear what the rss value really means (on Linux) and whether it's reliable in a Python context.

Basically, the script consists of a function that creates one or more interval observables and is executed in a second Python process. This process is monitored from the main Python process by polling and logging memory info (rss, vms) for the 'observables' process. IMO, this avoids some potential unwanted interactions and measurement bias.

I've tested with 2 versions of this script:

  • one or more hot interval observables (see below)
  • one or more cold interval observables (i.e. without the publish operator)
import time

import rx
from rx import operators as ops

def fn_interval_hot(dt, ninstances=1):
    observables = []
    disposables = []
    for i in range(ninstances):
        observable = rx.interval(dt).pipe(ops.publish())
        d = observable.subscribe(lambda _: None)
        observables.append(observable)
        disposables.append(d)

    # connect observables
    for o in observables:
        o.connect()

    # keep the process alive indefinitely
    while True:
        time.sleep(10.0)
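
For reference, a minimal sketch of this monitoring setup could look like the following (the monitor function and its parameters are illustrative, not the exact script):

# Hypothetical monitoring harness: run the observable factory in a child
# process and poll its memory (rss, vms) from the parent via psutil.
import multiprocessing
import time

import psutil

def monitor(fn, args=(), poll_s=1.0, duration_s=30 * 60):
    proc = multiprocessing.Process(target=fn, args=args, daemon=True)
    proc.start()
    ps = psutil.Process(proc.pid)
    end = time.monotonic() + duration_s
    while time.monotonic() < end:
        mem = ps.memory_info()
        print("rss=%.1f MB  vms=%.1f MB" % (mem.rss / 1e6, mem.vms / 1e6))
        time.sleep(poll_s)
    proc.terminate()

# e.g. monitor(fn_interval_hot, args=(0.05, 5))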

Tests have been run with:

  • interval type: hot, cold
  • interval period: 1ms, 50ms
  • duration: 30 min
  • number of simultaneous observables: 1, 5, 8

[Figure 1: rss memory over time for each test configuration]

We can see that in every case, rss quickly grows until it reaches an almost constant value, in an asymptotic fashion. So I would say that's not that bad after all.

@erikkemperman @dbrattli, the interval operator has TimeoutScheduler as its default scheduler. It doesn't implement schedule_periodic, so the one from SchedulerBase is used instead, calling schedule_relative recursively. Each time, schedule_relative creates a threading.Timer, which is a subclass of threading.Thread.
In short, as noticed by @rfum, periodic scheduling with TimeoutScheduler creates a thread per item. I don't think this is an issue right now, but it may be non-optimal. Maybe something to keep in mind for potential future scheduler improvements.
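
To illustrate the shape of the problem (a sketch, not the actual RxPY internals): a periodic schedule built recursively on schedule_relative with threading.Timer starts a brand-new thread for every tick, something like:

import threading

def schedule_periodic_via_relative(period_s, action):
    # Each tick re-arms a fresh threading.Timer, i.e. one new thread per item.
    def tick():
        action()
        timer = threading.Timer(period_s, tick)
        timer.daemon = True
        timer.start()
    threading.Timer(period_s, tick).start()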

@erikkemperman
Collaborator

Nice analysis!

Yes, TimeoutScheduler creates a lot of threads, where recycling is certainly possible. I've been meaning to try to make a pool or something, but have not found the time for that so far.

@dbrattli
Collaborator

Perhaps we should just retire TimeoutScheduler and instead use ThreadPoolScheduler as default. The TimeoutScheduler was ported from JS and might not make sense to use for Python since it's unbounded.

@erikkemperman
Collaborator

Yes, that's one option. I was hoping to try something slightly more ambitious, namely to postpone mapping scheduled items to threads until they're almost due. That way we avoid claiming threads from the pool only for them to sit idle for a relatively long time. But that kind of logic might make sense for the ThreadPoolScheduler as well, in which case I guess the two could be folded into a single class.
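
A rough sketch of that idea (all names hypothetical, nothing here is actual RxPY API): one timing thread keeps pending items in a heap and only submits each one to the pool when it comes due:

import heapq
import threading
import time
from concurrent.futures import ThreadPoolExecutor

class DeferredThreadScheduler:
    def __init__(self, max_workers=4):
        self._pool = ThreadPoolExecutor(max_workers=max_workers)
        self._heap = []  # (due_time, seq, action)
        self._seq = 0
        self._cond = threading.Condition()
        threading.Thread(target=self._run, daemon=True).start()

    def schedule_relative(self, delay_s, action):
        # Record the item; the timing thread decides when it gets a worker.
        with self._cond:
            heapq.heappush(self._heap, (time.monotonic() + delay_s, self._seq, action))
            self._seq += 1
            self._cond.notify()

    def _run(self):
        while True:
            with self._cond:
                while not self._heap:
                    self._cond.wait()
                due, _, action = self._heap[0]
                delay = due - time.monotonic()
                if delay > 0:
                    # Sleep until the earliest item is due, or until an
                    # earlier item is scheduled, then re-check.
                    self._cond.wait(timeout=delay)
                    continue
                heapq.heappop(self._heap)
            # Only now does the item claim a pool thread.
            self._pool.submit(action)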
