Skip to content

Commit

Permalink
#849:
Browse files Browse the repository at this point in the history
* remove overrun and underrun signals
* change sink buffer level adjustments code to try to lower the buffer levels

git-svn-id: https://xpra.org/svn/Xpra/trunk@10250 3bb7dfac-3a0b-4e04-842a-767bc560f471
  • Loading branch information
totaam committed Aug 11, 2015
1 parent c433847 commit 63774e4
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 102 deletions.
9 changes: 0 additions & 9 deletions src/xpra/client/ui_client_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1719,12 +1719,6 @@ def sound_process_stopped(self, sound_sink, *args):
soundlog.warn("the sound process has stopped")
self.stop_receiving_sound()

def sound_sink_overrun(self, sound_sink, *args):
if sound_sink!=self.sound_sink:
soundlog("sound_sink_overrun() not the current sink, ignoring it")
else:
soundlog.warn("sound sink overrun")

def sound_sink_exit(self, sound_sink, *args):
log("sound_sink_exit(%s, %s) sound_sink=%s", sound_sink, args, self.sound_sink)
ss = self.sound_sink
Expand All @@ -1736,8 +1730,6 @@ def sound_sink_exit(self, sound_sink, *args):
#we use the "codec" field as guard to ensure we only print this warning once..
log.warn("the %s sound sink has stopped", ss.codec)
ss.codec = ""
#if we had an overrun, we should have restarted things already
#(and the guard at the top ensures we don't end up stopping the new sink)
self.stop_receiving_sound()

def start_sound_sink(self, codec):
Expand All @@ -1752,7 +1744,6 @@ def start_sound_sink(self, codec):
self.sound_sink = ss
ss.connect("state-changed", self.sound_sink_state_changed)
ss.connect("error", self.sound_sink_error)
ss.connect("overrun", self.sound_sink_overrun)
ss.connect("exit", self.sound_sink_exit)
from xpra.net.protocol import Protocol
ss.connect(Protocol.CONNECTION_LOST, self.sound_process_stopped)
Expand Down
3 changes: 0 additions & 3 deletions src/xpra/server/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -959,16 +959,13 @@ def sound_data(self, codec, data, metadata, *args):
def sound_sink_error(*args):
log.warn("stopping sound input because of error")
self.stop_receiving_sound()
def sound_sink_overrun(*args):
log.warn("sound overrun")
from xpra.sound.wrapper import start_receiving_sound
ss = start_receiving_sound(codec)
if not ss:
return
self.sound_sink = ss
soundlog("sound_data(..) created sound sink: %s", self.sound_sink)
ss.connect("error", sound_sink_error)
ss.connect("overrun", sound_sink_overrun)
ss.start()
soundlog("sound_data(..) sound sink started")
except Exception:
Expand Down
198 changes: 116 additions & 82 deletions src/xpra/sound/sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
# Xpra is released under the terms of the GNU GPL v2, or, at your option, any
# later version. See the file COPYING for details.

import sys, os
import sys, os, time
from collections import deque
from threading import Lock

from xpra.sound.sound_pipeline import SoundPipeline, gobject, glib, one_arg_signal
from xpra.sound.pulseaudio_util import has_pa
Expand All @@ -29,8 +31,6 @@
if os.name=="posix":
SINKS += ["alsasink", "osssink", "oss4sink", "jackaudiosink"]

#SINK_SHARED_DEFAULT_ATTRIBUTES = {"sync" : False,
# "async" : True}
SINK_SHARED_DEFAULT_ATTRIBUTES = {"sync" : False,
"async" : True,
"qos" : True
Expand All @@ -47,6 +47,10 @@
QUEUE_SILENT = 0
QUEUE_TIME = get_queue_time(450)

GRACE_PERIOD = int(os.environ.get("XPRA_SOUND_GRACE_PERIOD", "2000"))
#percentage: from 0 for no margin, to 200% which triples the buffer target
MARGIN = max(0, min(200, int(os.environ.get("XPRA_SOUND_MARGIN", "50"))))

GST_FORMAT_BUFFERS = 4

def sink_has_device_attribute(sink):
Expand All @@ -57,8 +61,6 @@ class SoundSink(SoundPipeline):

__gsignals__ = SoundPipeline.__generic_signals__.copy()
__gsignals__.update({
"underrun" : one_arg_signal,
"overrun" : one_arg_signal,
"eos" : one_arg_signal,
})

Expand All @@ -73,15 +75,17 @@ def __init__(self, sink_type=None, sink_options={}, codecs=CODECS, codec_options
decoder, parser = get_decoder_parser(codec)
SoundPipeline.__init__(self, codec)
self.sink_type = sink_type
self.levels = deque(maxlen=100)
decoder_str = plugin_str(decoder, codec_options)
pipeline_els = []
pipeline_els.append("appsrc"+
" name=src"+
" emit-signals=0"+
" block=0"+
" is-live=0"+
" stream-type=stream"+
" format=%s" % GST_FORMAT_BUFFERS)
appsrc_el = ["appsrc",
"name=src",
"emit-signals=0",
"block=0",
"is-live=0",
"stream-type=stream",
"format=%s" % GST_FORMAT_BUFFERS]
pipeline_els.append(" ".join(appsrc_el))
pipeline_els.append(parser)
pipeline_els.append(decoder_str)
pipeline_els.append("audioconvert")
Expand All @@ -107,7 +111,14 @@ def __init__(self, sink_type=None, sink_options={}, codecs=CODECS, codec_options
self.src = self.pipeline.get_by_name("src")
self.queue = self.pipeline.get_by_name("queue")
self.overruns = 0
self.underruns = 0
self.overrun_events = deque(maxlen=100)
self.underrun_events = deque(maxlen=100)
self.queue_state = "starting"
self.last_underrun = 0
self.last_overrun = 0
self.last_max_update = time.time()
self.level_lock = Lock()
if QUEUE_SILENT==0:
self.queue.connect("overrun", self.queue_overrun)
self.queue.connect("underrun", self.queue_underrun)
Expand All @@ -124,84 +135,102 @@ def cleanup(self):


def queue_pushing(self, *args):
ltime = self.queue.get_property("current-level-time")/MS_TO_NS
log("queue pushing: level=%i", ltime)
self.check_levels("pushing")
return 0
self.queue_state = "pushing"
self.emit_info()
return True

def queue_running(self, *args):
ltime = self.queue.get_property("current-level-time")/MS_TO_NS
log("queue running: level=%s", ltime)
self.check_levels("running")
return 0

def check_levels(self, new_state):
if self.queue_state=="underrun":
#lift min time restrictions:
self.queue.set_property("min-threshold-time", 0)
elif self.queue_state == "overrun":
clt = self.queue.get_property("current-level-time")
qpct = min(QUEUE_TIME, clt)*100//QUEUE_TIME
log("resetting max-size-time back to %ims (level=%ims, %i%%)", QUEUE_TIME//MS_TO_NS, clt//MS_TO_NS, qpct)
self.queue.set_property("max-size-time", QUEUE_TIME)
self.queue_state = new_state
self.queue_state = "running"
self.set_min_level()
self.set_max_level()
self.emit_info()

return True

def queue_underrun(self, *args):
if self.queue_state=="underrun" or self.queue_state=="starting":
now = time.time()
if self.queue_state=="starting" or 1000*(now-self.start_time)<GRACE_PERIOD:
log("ignoring underrun during startup")
return
ltime = self.queue.get_property("current-level-time")/MS_TO_NS
log("queue underrun: level=%i, previous state=%s", ltime, self.queue_state)
self.queue_state = "underrun"
MIN_QUEUE = QUEUE_TIME//2
mts = self.queue.get_property("min-threshold-time")
if mts==MIN_QUEUE:
return 0
#set min time restrictions to fill up queue:
log("increasing the min-threshold-time to %ims", MIN_QUEUE//MS_TO_NS)
self.queue.set_property("min-threshold-time", MIN_QUEUE)
def restore():
if self.queue.get_property("min-threshold-time")==0:
log("min-threshold-time already restored!")
return False
ltime = self.queue.get_property("current-level-time")//MS_TO_NS
if ltime==0:
log("not restored! (still underrun: %ims)", ltime)
return True
log("resetting the min-threshold-time back to %ims", 0)
self.queue.set_property("min-threshold-time", 0)
self.queue_state = "running"
return False
glib.timeout_add(1000, restore)
if now-self.last_underrun>2:
self.last_underrun = now
self.set_min_level()
self.underrun_events.append(now)
self.emit_info()
return 1

def get_level_range(self, mintime=2, maxtime=10):
now = time.time()
filtered = [v for t,v in list(self.levels) if (now-t)>=mintime and (now-t)<=maxtime]
if len(filtered)>=10:
maxl = max(filtered)
minl = min(filtered)
#range of the levels recorded:
return maxl-minl
return 0

def set_min_level(self):
if not self.level_lock.acquire(False):
return
try:
lrange = self.get_level_range()
if lrange>0:
cmtt = self.queue.get_property("min-threshold-time")//MS_TO_NS
#from 100% down to 0% in 2 seconds after underrun:
now = time.time()
pct = max(0, int((self.last_underrun+2-now)*50))
mtt = min(50, pct*lrange//200)
log("set_min_level pct=%2i, cmtt=%3i, mtt=%3i", pct, cmtt, mtt)
if cmtt!=mtt:
self.queue.set_property("min-threshold-time", mtt*MS_TO_NS)
log("set_min_level min-threshold-time=%s", mtt)
finally:
self.level_lock.release()

def set_max_level(self, force=False):
if not self.level_lock.acquire(False):
return
try:
lrange = self.get_level_range(mintime=0)
now = time.time()
log("set_max_level lrange=%3i, last_max_update=%is", lrange, int(now-self.last_max_update))
#more than one second since last update and we have a range:
if now-self.last_max_update>1 and lrange>0:
cmst = self.queue.get_property("max-size-time")//MS_TO_NS
#overruns in the last minute:
olm = len([x for x in list(self.overrun_events) if now-x<60])
#increase target if we have more than 5 overruns in the last minute:
target_mst = lrange*(100 + MARGIN + min(100, olm*20))//100
#from 100% down to 0% in 2 seconds after underrun:
pct = max(0, int((self.last_overrun+2-now)*50))
#use this last_overrun percentage value to temporarily decrease the target
#(causes overruns that drop packets and lower the buffer level)
target_mst = max(50, int(target_mst - pct*lrange//100))
mst = (cmst + target_mst)//2
log("set_max_level overrun count=%-2i, margin=%3i, pct=%2i, cmst=%3i, mst=%3i", olm, MARGIN, pct, cmst, mst)
if force or abs(cmst-mst)>=max(50, lrange//2):
self.queue.set_property("max-size-time", mst*MS_TO_NS)
self.last_max_update = now
finally:
self.level_lock.release()

def queue_overrun(self, *args):
if self.queue_state=="overrun":
now = time.time()
if self.queue_state=="starting" or 1000*(now-self.start_time)<GRACE_PERIOD:
log("ignoring overrun during startup")
return
ltime = self.queue.get_property("current-level-time")//MS_TO_NS
log("queue overrun: level=%i, previous state=%s", ltime, self.queue_state)
self.queue_state = "overrun"
REDUCED_QT = QUEUE_TIME//2
#empty the queue by reducing its max size:
log("queue overrun: halving the max-size-time to %ims", REDUCED_QT//MS_TO_NS)
self.queue.set_property("max-size-time", REDUCED_QT)
clt = self.queue.get_property("current-level-time")//MS_TO_NS
log.warn("overrun level=%ims", clt)
now = time.time()
#grace period of recording overruns:
#(because when we record an overrun, we lower the max-time,
# which causes more overruns!)
if self.last_overrun is None or now-self.last_overrun>2:
self.last_overrun = now
self.set_max_level()
self.overrun_events.append(now)
self.overruns += 1
self.emit("overrun", ltime)
def restore():
if self.queue.get_property("max-size-time")==QUEUE_TIME:
log("max-size-time already restored!")
return False
ltime = self.queue.get_property("current-level-time")//MS_TO_NS
if ltime>=REDUCED_QT:
log("max-size-time not restored! (still overrun: %ims)", ltime)
return True
log("raising the max-size-time back to %ims", QUEUE_TIME//MS_TO_NS)
self.queue.set_property("max-size-time", QUEUE_TIME)
self.queue_state = "running"
return False
glib.timeout_add(1000, restore)
return 0
return 1

def eos(self):
log("eos()")
Expand All @@ -222,6 +251,7 @@ def get_info(self):
"cur" : clt//MS_TO_NS,
"pct" : min(QUEUE_TIME, clt)*100//qmax,
"overruns" : self.overruns,
"underruns" : self.underruns,
"state" : self.queue_state})
return info

Expand All @@ -247,8 +277,12 @@ def add_data(self, data, metadata=None):
if self.push_buffer(buf):
self.buffer_count += 1
self.byte_count += len(data)
ltime = self.queue.get_property("current-level-time")//MS_TO_NS
log("pushed %s bytes, new buffer level: %sms, queue state=%s", len(data), ltime, self.queue_state)
clt = self.queue.get_property("current-level-time")//MS_TO_NS
log("pushed %5i bytes, new buffer level: %3ims, queue state=%s", len(data), clt, self.queue_state)
self.levels.append((time.time(), clt))
if self.queue_state=="pushing":
self.set_min_level()
self.set_max_level()
self.emit_info()

def push_buffer(self, buf):
Expand All @@ -262,8 +296,8 @@ def push_buffer(self, buf):
if r!=gst.FLOW_OK:
log.error("push-buffer error: %s", r)
self.emit('error', "push-buffer error: %s" % r)
return False
return True
return 0
return 1

gobject.type_register(SoundSink)

Expand Down
9 changes: 1 addition & 8 deletions src/xpra/sound/wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

DEBUG_SOUND = os.environ.get("XPRA_SOUND_DEBUG", "0")=="1"
SUBPROCESS_DEBUG = os.environ.get("XPRA_SOUND_SUBPROCESS_DEBUG", "").split(",")
FAKE_OVERRUN = int(os.environ.get("XPRA_SOUND_FAKE_OVERRUN", "0"))
FAKE_START_FAILURE = os.environ.get("XPRA_SOUND_FAKE_START_FAILURE", "0")=="1"
FAKE_EXIT = int(os.environ.get("XPRA_SOUND_FAKE_EXIT", "0"))
FAKE_CRASH = int(os.environ.get("XPRA_SOUND_FAKE_CRASH", "0"))
Expand Down Expand Up @@ -96,13 +95,7 @@ class sound_play(sound_subprocess):
def __init__(self, *pipeline_args):
from xpra.sound.sink import SoundSink
sound_pipeline = SoundSink(*pipeline_args)
sound_subprocess.__init__(self, sound_pipeline, ["add_data"], ["underrun", "overrun"])
if FAKE_OVERRUN>0:
def fake_overrun(*args):
wo = self.wrapped_object
if wo:
wo.emit("overrun", 500)
glib.timeout_add(FAKE_OVERRUN*1000, fake_overrun)
sound_subprocess.__init__(self, sound_pipeline, ["add_data"], [])


def run_sound(mode, error_cb, options, args):
Expand Down

0 comments on commit 63774e4

Please sign in to comment.