diff --git a/src/xpra/client/ui_client_base.py b/src/xpra/client/ui_client_base.py index 7ec0806597..8a066a8b25 100644 --- a/src/xpra/client/ui_client_base.py +++ b/src/xpra/client/ui_client_base.py @@ -1719,12 +1719,6 @@ def sound_process_stopped(self, sound_sink, *args): soundlog.warn("the sound process has stopped") self.stop_receiving_sound() - def sound_sink_overrun(self, sound_sink, *args): - if sound_sink!=self.sound_sink: - soundlog("sound_sink_overrun() not the current sink, ignoring it") - else: - soundlog.warn("sound sink overrun") - def sound_sink_exit(self, sound_sink, *args): log("sound_sink_exit(%s, %s) sound_sink=%s", sound_sink, args, self.sound_sink) ss = self.sound_sink @@ -1736,8 +1730,6 @@ def sound_sink_exit(self, sound_sink, *args): #we use the "codec" field as guard to ensure we only print this warning once.. log.warn("the %s sound sink has stopped", ss.codec) ss.codec = "" - #if we had an overrun, we should have restarted things already - #(and the guard at the top ensures we don't end up stopping the new sink) self.stop_receiving_sound() def start_sound_sink(self, codec): @@ -1752,7 +1744,6 @@ def start_sound_sink(self, codec): self.sound_sink = ss ss.connect("state-changed", self.sound_sink_state_changed) ss.connect("error", self.sound_sink_error) - ss.connect("overrun", self.sound_sink_overrun) ss.connect("exit", self.sound_sink_exit) from xpra.net.protocol import Protocol ss.connect(Protocol.CONNECTION_LOST, self.sound_process_stopped) diff --git a/src/xpra/server/source.py b/src/xpra/server/source.py index 9b30c480ff..016d9a2a58 100644 --- a/src/xpra/server/source.py +++ b/src/xpra/server/source.py @@ -959,8 +959,6 @@ def sound_data(self, codec, data, metadata, *args): def sound_sink_error(*args): log.warn("stopping sound input because of error") self.stop_receiving_sound() - def sound_sink_overrun(*args): - log.warn("sound overrun") from xpra.sound.wrapper import start_receiving_sound ss = start_receiving_sound(codec) if not ss: @@ -968,7 +966,6 @@ def sound_sink_overrun(*args): self.sound_sink = ss soundlog("sound_data(..) created sound sink: %s", self.sound_sink) ss.connect("error", sound_sink_error) - ss.connect("overrun", sound_sink_overrun) ss.start() soundlog("sound_data(..) sound sink started") except Exception: diff --git a/src/xpra/sound/sink.py b/src/xpra/sound/sink.py index dee8a75903..7daabe5cb0 100755 --- a/src/xpra/sound/sink.py +++ b/src/xpra/sound/sink.py @@ -4,7 +4,9 @@ # Xpra is released under the terms of the GNU GPL v2, or, at your option, any # later version. See the file COPYING for details. -import sys, os +import sys, os, time +from collections import deque +from threading import Lock from xpra.sound.sound_pipeline import SoundPipeline, gobject, glib, one_arg_signal from xpra.sound.pulseaudio_util import has_pa @@ -29,8 +31,6 @@ if os.name=="posix": SINKS += ["alsasink", "osssink", "oss4sink", "jackaudiosink"] -#SINK_SHARED_DEFAULT_ATTRIBUTES = {"sync" : False, -# "async" : True} SINK_SHARED_DEFAULT_ATTRIBUTES = {"sync" : False, "async" : True, "qos" : True @@ -47,6 +47,10 @@ QUEUE_SILENT = 0 QUEUE_TIME = get_queue_time(450) +GRACE_PERIOD = int(os.environ.get("XPRA_SOUND_GRACE_PERIOD", "2000")) +#percentage: from 0 for no margin, to 200% which triples the buffer target +MARGIN = max(0, min(200, int(os.environ.get("XPRA_SOUND_MARGIN", "50")))) + GST_FORMAT_BUFFERS = 4 def sink_has_device_attribute(sink): @@ -57,8 +61,6 @@ class SoundSink(SoundPipeline): __gsignals__ = SoundPipeline.__generic_signals__.copy() __gsignals__.update({ - "underrun" : one_arg_signal, - "overrun" : one_arg_signal, "eos" : one_arg_signal, }) @@ -73,15 +75,17 @@ def __init__(self, sink_type=None, sink_options={}, codecs=CODECS, codec_options decoder, parser = get_decoder_parser(codec) SoundPipeline.__init__(self, codec) self.sink_type = sink_type + self.levels = deque(maxlen=100) decoder_str = plugin_str(decoder, codec_options) pipeline_els = [] - pipeline_els.append("appsrc"+ - " name=src"+ - " emit-signals=0"+ - " block=0"+ - " is-live=0"+ - " stream-type=stream"+ - " format=%s" % GST_FORMAT_BUFFERS) + appsrc_el = ["appsrc", + "name=src", + "emit-signals=0", + "block=0", + "is-live=0", + "stream-type=stream", + "format=%s" % GST_FORMAT_BUFFERS] + pipeline_els.append(" ".join(appsrc_el)) pipeline_els.append(parser) pipeline_els.append(decoder_str) pipeline_els.append("audioconvert") @@ -107,7 +111,14 @@ def __init__(self, sink_type=None, sink_options={}, codecs=CODECS, codec_options self.src = self.pipeline.get_by_name("src") self.queue = self.pipeline.get_by_name("queue") self.overruns = 0 + self.underruns = 0 + self.overrun_events = deque(maxlen=100) + self.underrun_events = deque(maxlen=100) self.queue_state = "starting" + self.last_underrun = 0 + self.last_overrun = 0 + self.last_max_update = time.time() + self.level_lock = Lock() if QUEUE_SILENT==0: self.queue.connect("overrun", self.queue_overrun) self.queue.connect("underrun", self.queue_underrun) @@ -124,84 +135,102 @@ def cleanup(self): def queue_pushing(self, *args): - ltime = self.queue.get_property("current-level-time")/MS_TO_NS - log("queue pushing: level=%i", ltime) - self.check_levels("pushing") - return 0 + self.queue_state = "pushing" + self.emit_info() + return True def queue_running(self, *args): - ltime = self.queue.get_property("current-level-time")/MS_TO_NS - log("queue running: level=%s", ltime) - self.check_levels("running") - return 0 - - def check_levels(self, new_state): - if self.queue_state=="underrun": - #lift min time restrictions: - self.queue.set_property("min-threshold-time", 0) - elif self.queue_state == "overrun": - clt = self.queue.get_property("current-level-time") - qpct = min(QUEUE_TIME, clt)*100//QUEUE_TIME - log("resetting max-size-time back to %ims (level=%ims, %i%%)", QUEUE_TIME//MS_TO_NS, clt//MS_TO_NS, qpct) - self.queue.set_property("max-size-time", QUEUE_TIME) - self.queue_state = new_state + self.queue_state = "running" + self.set_min_level() + self.set_max_level() self.emit_info() - + return True def queue_underrun(self, *args): - if self.queue_state=="underrun" or self.queue_state=="starting": + now = time.time() + if self.queue_state=="starting" or 1000*(now-self.start_time)2: + self.last_underrun = now + self.set_min_level() + self.underrun_events.append(now) + self.emit_info() + return 1 + + def get_level_range(self, mintime=2, maxtime=10): + now = time.time() + filtered = [v for t,v in list(self.levels) if (now-t)>=mintime and (now-t)<=maxtime] + if len(filtered)>=10: + maxl = max(filtered) + minl = min(filtered) + #range of the levels recorded: + return maxl-minl return 0 + def set_min_level(self): + if not self.level_lock.acquire(False): + return + try: + lrange = self.get_level_range() + if lrange>0: + cmtt = self.queue.get_property("min-threshold-time")//MS_TO_NS + #from 100% down to 0% in 2 seconds after underrun: + now = time.time() + pct = max(0, int((self.last_underrun+2-now)*50)) + mtt = min(50, pct*lrange//200) + log("set_min_level pct=%2i, cmtt=%3i, mtt=%3i", pct, cmtt, mtt) + if cmtt!=mtt: + self.queue.set_property("min-threshold-time", mtt*MS_TO_NS) + log("set_min_level min-threshold-time=%s", mtt) + finally: + self.level_lock.release() + + def set_max_level(self, force=False): + if not self.level_lock.acquire(False): + return + try: + lrange = self.get_level_range(mintime=0) + now = time.time() + log("set_max_level lrange=%3i, last_max_update=%is", lrange, int(now-self.last_max_update)) + #more than one second since last update and we have a range: + if now-self.last_max_update>1 and lrange>0: + cmst = self.queue.get_property("max-size-time")//MS_TO_NS + #overruns in the last minute: + olm = len([x for x in list(self.overrun_events) if now-x<60]) + #increase target if we have more than 5 overruns in the last minute: + target_mst = lrange*(100 + MARGIN + min(100, olm*20))//100 + #from 100% down to 0% in 2 seconds after underrun: + pct = max(0, int((self.last_overrun+2-now)*50)) + #use this last_overrun percentage value to temporarily decrease the target + #(causes overruns that drop packets and lower the buffer level) + target_mst = max(50, int(target_mst - pct*lrange//100)) + mst = (cmst + target_mst)//2 + log("set_max_level overrun count=%-2i, margin=%3i, pct=%2i, cmst=%3i, mst=%3i", olm, MARGIN, pct, cmst, mst) + if force or abs(cmst-mst)>=max(50, lrange//2): + self.queue.set_property("max-size-time", mst*MS_TO_NS) + self.last_max_update = now + finally: + self.level_lock.release() + def queue_overrun(self, *args): - if self.queue_state=="overrun": + now = time.time() + if self.queue_state=="starting" or 1000*(now-self.start_time)2: + self.last_overrun = now + self.set_max_level() + self.overrun_events.append(now) self.overruns += 1 - self.emit("overrun", ltime) - def restore(): - if self.queue.get_property("max-size-time")==QUEUE_TIME: - log("max-size-time already restored!") - return False - ltime = self.queue.get_property("current-level-time")//MS_TO_NS - if ltime>=REDUCED_QT: - log("max-size-time not restored! (still overrun: %ims)", ltime) - return True - log("raising the max-size-time back to %ims", QUEUE_TIME//MS_TO_NS) - self.queue.set_property("max-size-time", QUEUE_TIME) - self.queue_state = "running" - return False - glib.timeout_add(1000, restore) - return 0 + return 1 def eos(self): log("eos()") @@ -222,6 +251,7 @@ def get_info(self): "cur" : clt//MS_TO_NS, "pct" : min(QUEUE_TIME, clt)*100//qmax, "overruns" : self.overruns, + "underruns" : self.underruns, "state" : self.queue_state}) return info @@ -247,8 +277,12 @@ def add_data(self, data, metadata=None): if self.push_buffer(buf): self.buffer_count += 1 self.byte_count += len(data) - ltime = self.queue.get_property("current-level-time")//MS_TO_NS - log("pushed %s bytes, new buffer level: %sms, queue state=%s", len(data), ltime, self.queue_state) + clt = self.queue.get_property("current-level-time")//MS_TO_NS + log("pushed %5i bytes, new buffer level: %3ims, queue state=%s", len(data), clt, self.queue_state) + self.levels.append((time.time(), clt)) + if self.queue_state=="pushing": + self.set_min_level() + self.set_max_level() self.emit_info() def push_buffer(self, buf): @@ -262,8 +296,8 @@ def push_buffer(self, buf): if r!=gst.FLOW_OK: log.error("push-buffer error: %s", r) self.emit('error', "push-buffer error: %s" % r) - return False - return True + return 0 + return 1 gobject.type_register(SoundSink) diff --git a/src/xpra/sound/wrapper.py b/src/xpra/sound/wrapper.py index f64b0c9f59..106dc4246c 100644 --- a/src/xpra/sound/wrapper.py +++ b/src/xpra/sound/wrapper.py @@ -14,7 +14,6 @@ DEBUG_SOUND = os.environ.get("XPRA_SOUND_DEBUG", "0")=="1" SUBPROCESS_DEBUG = os.environ.get("XPRA_SOUND_SUBPROCESS_DEBUG", "").split(",") -FAKE_OVERRUN = int(os.environ.get("XPRA_SOUND_FAKE_OVERRUN", "0")) FAKE_START_FAILURE = os.environ.get("XPRA_SOUND_FAKE_START_FAILURE", "0")=="1" FAKE_EXIT = int(os.environ.get("XPRA_SOUND_FAKE_EXIT", "0")) FAKE_CRASH = int(os.environ.get("XPRA_SOUND_FAKE_CRASH", "0")) @@ -96,13 +95,7 @@ class sound_play(sound_subprocess): def __init__(self, *pipeline_args): from xpra.sound.sink import SoundSink sound_pipeline = SoundSink(*pipeline_args) - sound_subprocess.__init__(self, sound_pipeline, ["add_data"], ["underrun", "overrun"]) - if FAKE_OVERRUN>0: - def fake_overrun(*args): - wo = self.wrapped_object - if wo: - wo.emit("overrun", 500) - glib.timeout_add(FAKE_OVERRUN*1000, fake_overrun) + sound_subprocess.__init__(self, sound_pipeline, ["add_data"], []) def run_sound(mode, error_cb, options, args):