Skip to content

Commit

Permalink
#362 (and #379): fix sound issues
Browse files Browse the repository at this point in the history
* use is-live=False solves most problems..
* remove remaining volume stuff (unused)
* add queue state info

git-svn-id: https://xpra.org/svn/Xpra/trunk@4040 3bb7dfac-3a0b-4e04-842a-767bc560f471
  • Loading branch information
totaam committed Aug 1, 2013
1 parent e91eb98 commit 1b6d7e7
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 110 deletions.
46 changes: 23 additions & 23 deletions src/xpra/client/ui_client_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -837,27 +837,28 @@ def stop_sending_sound_thread():
def start_receiving_sound(self):
""" ask the server to start sending sound and emit the client signal """
soundlog("start_receiving_sound() sound sink=%s", self.sound_sink)
if self.sound_sink is not None and self.sound_sink.get_state()=="active":
soundlog("start_receiving_sound: we are already receiving sound!")
if self.sound_sink is not None:
soundlog("start_receiving_sound: we already have a sound sink")
return
elif not self.server_sound_send:
log.error("cannot start receiving sound: support not enabled on the server")
else:
#choose a codec:
from xpra.sound.gstreamer_util import CODEC_ORDER
matching_codecs = [x for x in self.server_sound_encoders if x in self.speaker_codecs]
ordered_codecs = [x for x in CODEC_ORDER if x in matching_codecs]
if len(ordered_codecs)==0:
log.error("no matching codecs between server (%s) and client (%s)", self.server_sound_encoders, self.speaker_codecs)
return
codec = ordered_codecs[0]
self.speaker_enabled = True
self.emit("speaker-changed")
def sink_ready(*args):
soundlog("sink_ready(%s) codec=%s", args, codec)
self.send("sound-control", "start", codec)
return False
self.on_sink_ready = sink_ready
self.start_sound_sink(codec)
return
#choose a codec:
from xpra.sound.gstreamer_util import CODEC_ORDER
matching_codecs = [x for x in self.server_sound_encoders if x in self.speaker_codecs]
ordered_codecs = [x for x in CODEC_ORDER if x in matching_codecs]
if len(ordered_codecs)==0:
log.error("no matching codecs between server (%s) and client (%s)", self.server_sound_encoders, self.speaker_codecs)
return
codec = ordered_codecs[0]
self.speaker_enabled = True
self.emit("speaker-changed")
def sink_ready(*args):
soundlog("sink_ready(%s) codec=%s", args, codec)
self.send("sound-control", "start", codec)
return False
self.on_sink_ready = sink_ready
self.start_sound_sink(codec)

def stop_receiving_sound(self):
""" ask the server to stop sending sound, toggle flag so we ignore further packets and emit client signal """
Expand Down Expand Up @@ -920,8 +921,8 @@ def restart():
soundlog("restart() sound_sink=%s, codec=%s, server_sound_sequence=%s", self.sound_sink, codec, self.server_sound_sequence)
if self.server_sound_sequence:
self.send_new_sound_sequence()
self.sink_restart_pending = False
self.start_receiving_sound()
self.sink_restart_pending = False
return False
self.timeout_add(200, restart)

Expand Down Expand Up @@ -962,9 +963,8 @@ def _process_sound_data(self, packet):
self.sound_sink.stop()
return
if self.sound_sink is None:
soundlog("sound data received, creating a sound sink for it")
if not self.start_sound_sink(codec):
return
soundlog("no sound sink to process sound data, dropping it")
return
elif self.sound_sink.get_state()=="stopped":
soundlog("sound data received, sound sink is stopped - starting it")
self.sound_sink.start()
Expand Down
163 changes: 76 additions & 87 deletions src/xpra/sound/sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,20 @@
GST_QUEUE_NO_LEAK = 0
GST_QUEUE_LEAK_UPSTREAM = 1
GST_QUEUE_LEAK_DOWNSTREAM = 2
GST_QUEUE_LEAK_DEFAULT = GST_QUEUE_LEAK_DOWNSTREAM

MS_TO_NS = 1000000
QUEUE_LEAK = int(os.environ.get("XPRA_SOUND_QUEUE_LEAK", GST_QUEUE_LEAK_DEFAULT))
if QUEUE_LEAK not in (GST_QUEUE_NO_LEAK, GST_QUEUE_LEAK_UPSTREAM, GST_QUEUE_LEAK_DOWNSTREAM):
log.error("invalid leak option %s", QUEUE_LEAK)
QUEUE_LEAK = GST_QUEUE_LEAK_DEFAULT
QUEUE_TIME = int(os.environ.get("XPRA_SOUND_QUEUE_TIME", "450"))*MS_TO_NS
QUEUE_MIN_TIME = int(os.environ.get("XPRA_SOUND_QUEUE_MIN_TIME", "50"))*MS_TO_NS
QUEUE_TIME = max(0, QUEUE_TIME)
QUEUE_MIN_TIME = max(0, min(QUEUE_TIME, QUEUE_MIN_TIME))
DEFAULT_SINK = os.environ.get("XPRA_SOUND_SINK", DEFAULT_SINK)
if DEFAULT_SINK not in SINKS:
log.error("invalid default sound sink: '%s' is not in %s, using %s instead", DEFAULT_SINK, SINKS, SINKS[0])
DEFAULT_SINK = SINKS[0]

VOLUME = False


def sink_has_device_attribute(sink):
return sink not in ("autoaudiosink", "jackaudiosink", "directsoundsink")
Expand All @@ -63,79 +64,67 @@ def __init__(self, sink_type=DEFAULT_SINK, options={}, codec=MP3, decoder_option
self.sink_type = sink_type
decoder_str = plugin_str(decoder, decoder_options)
pipeline_els = []
pipeline_els.append("appsrc name=src max-bytes=512")
pipeline_els.append("appsrc"+
" name=src"+
" max-bytes=32768"+
" emit-signals=0"+
" block=0"+
" is-live=0"+
" stream-type=stream"+
" format=4")
pipeline_els.append(parser)
pipeline_els.append(decoder_str)
if VOLUME:
pipeline_els.append("volume name=volume")
pipeline_els.append("audioconvert")
pipeline_els.append("audioresample")
if QUEUE_TIME>0:
pipeline_els.append("queue" +
" name=queue"+
" min-threshold-time=%s" % QUEUE_MIN_TIME+
" max-size-time=%s" % QUEUE_TIME+
" leaky=%s" % GST_QUEUE_LEAK_DOWNSTREAM)
else:
pipeline_els.append("queue leaky=%s" % GST_QUEUE_LEAK_DOWNSTREAM)
pipeline_els.append("queue" +
" name=queue"+
" max-size-buffers=0"+
" max-size-bytes=0"+
" max-size-time=%s" % QUEUE_TIME+
" leaky=%s" % QUEUE_LEAK)
pipeline_els.append(sink_type)
self.setup_pipeline_and_bus(pipeline_els)
self.volume = self.pipeline.get_by_name("volume")
self.src = self.pipeline.get_by_name("src")
self.src.set_property('emit-signals', True)
self.src.set_property('stream-type', 'stream')
self.src.set_property('block', False)
self.src.set_property('format', 4)
self.src.set_property('is-live', True)
self.queue = self.pipeline.get_by_name("queue")
self.overruns = 0
self.queue_state = "starting"
self.queue.connect("overrun", self.queue_overrun)
self.queue.connect("underrun", self.queue_underrun)
self.src.connect("need-data", self.need_data)
self.src.connect("enough-data", self.on_enough_data)
self.queue.connect("running", self.queue_running)
self.queue.connect("pushing", self.queue_pushing)

def queue_pushing(self, *args):
ltime = int(self.queue.get_property("current-level-time")/MS_TO_NS)
debug("sound sink queue pushing: level=%s", ltime)
self.queue_state = "pushing"

def queue_running(self, *args):
ltime = int(self.queue.get_property("current-level-time")/MS_TO_NS)
debug("sound sink queue running: level=%s", ltime)
self.queue_state = "running"

def queue_underrun(self, *args):
ltime = int(self.queue.get_property("current-level-time")/1000000)
ltime = int(self.queue.get_property("current-level-time")/MS_TO_NS)
debug("sound sink queue underrun: level=%s", ltime)
self.emit("underrun", ltime)
self.queue_state = "underrun"

def queue_overrun(self, *args):
ltime = self.queue.get_property("current-level-time")
mtime = int(ltime/1000000)
debug("sound sink queue overrun: level=%s", mtime)
ltime = int(self.queue.get_property("current-level-time")/MS_TO_NS)
self.queue_state = "overrun"
#no overruns for the first 2 seconds:
if time.time()-self.start_time<2.0 or ltime<QUEUE_TIME:
elapsed = time.time()-self.start_time
if elapsed<2.0 or ltime<(QUEUE_TIME/MS_TO_NS/2*75/100):
debug("sound sink queue overrun ignored: level=%s, elapsed time=%.1f", ltime, elapsed)
return
debug("sound sink queue overrun: level=%s", ltime)
self.overruns += 1
self.emit("overrun", mtime)
self.emit("overrun", ltime)

def cleanup(self):
SoundPipeline.cleanup(self)
self.sink_type = ""
self.volume = None
self.src = None

def set_queue_delay(self, ms):
assert self.queue
assert ms>0
self.queue.set_property("max-size-time", ms*1000000)
log("queue delay set to %s, current-level-time=%s", ms, int(self.queue.get_property("current-level-time")/1000/1000))

def set_mute(self, mute):
self.volume.set_property('mute', mute)

def is_muted(self):
return bool(self.volume.get_property("mute"))

def get_volume(self):
assert VOLUME and self.volume
return self.volume.get_property("volume")

def set_volume(self, volume):
assert VOLUME and self.volume
assert volume>=0 and volume<=100
self.volume.set_property('volume', float(volume)/10.0)

def eos(self):
debug("eos()")
if self.src:
Expand All @@ -148,43 +137,43 @@ def get_info(self):
clt = self.queue.get_property("current-level-time")
info["queue.used_pct"] = int(min(QUEUE_TIME, clt)*100.0/QUEUE_TIME)
info["queue.overruns"] = self.overruns
if VOLUME and self.volume:
info["mute"] = self.volume.get_property("mute")
info["volume"] = int(100.0*self.volume.get_property("volume"))
info["queue.state"] = self.queue_state
return info

def add_data(self, data, metadata=None):
debug("sound sink: adding %s bytes to %s", len(data), self.src)
#debug("sound sink: adding %s bytes to %s, metadata: %s, level=%s", len(data), self.src, metadata, int(self.queue.get_property("current-level-time")/1000000))
if self.src:
buf = gst.Buffer(data)
if metadata:
ts = metadata.get("timestamp")
if ts is not None:
buf.timestamp = ts
d = metadata.get("duration")
if d is not None:
buf.duration = d
#buf.size = size
#buf.timestamp = timestamp
#buf.duration = duration
#buf.offset = offset
#buf.offset_end = offset_end
#buf.set_caps(gst.caps_from_string(caps))
r = self.src.emit("push-buffer", buf)
if r!=gst.FLOW_OK:
log.error("push-buffer error: %s", r)
self.emit('error', "push-buffer error: %s" % r)
else:
self.buffer_count += 1
self.byte_count += len(data)
debug("sound sink: new level=%s", int(self.queue.get_property("current-level-time")/1000000))

def need_data(self, src_arg, needed):
debug("need_data: %s bytes in %s", needed, src_arg)

def on_enough_data(self, *args):
debug("on_enough_data(%s)", args)
#debug("sound sink: adding %s bytes to %s, metadata: %s, level=%s", len(data), self.src, metadata, int(self.queue.get_property("current-level-time")/MS_TO_NS))
if not self.src:
return
buf = gst.Buffer(data)
d = 10*MS_TO_NS
if metadata and False:
ts = metadata.get("timestamp")
if ts is not None:
buf.timestamp = ts
d = metadata.get("duration")
if d is not None:
buf.duration = d
debug("add_data(..) queue_state=%s", self.queue_state)
self.push_buffer(buf)

def push_buffer(self, buf):
#buf.size = size
#buf.timestamp = timestamp
#buf.duration = duration
#buf.offset = offset
#buf.offset_end = offset_end
#buf.set_caps(gst.caps_from_string(caps))
r = self.src.emit("push-buffer", buf)
if r!=gst.FLOW_OK:
log.error("push-buffer error: %s", r)
self.emit('error', "push-buffer error: %s" % r)
return False
self.buffer_count += 1
self.byte_count += len(buf.data)
ltime = int(self.queue.get_property("current-level-time")/MS_TO_NS)
debug("sound sink: pushed %s bytes, new buffer level: %sms", len(buf.data), ltime)
return True



def main():
Expand Down Expand Up @@ -219,7 +208,7 @@ def main():

import logging
logging.basicConfig(format="%(asctime)s %(message)s")
logging.root.setLevel(logging.INFO)
logging.root.setLevel(logging.DEBUG)
f = open(filename, "rb")
data = f.read()
f.close()
Expand Down

0 comments on commit 1b6d7e7

Please sign in to comment.