diff --git a/src/xpra/client/ui_client_base.py b/src/xpra/client/ui_client_base.py index 6e6e34ee45..fa48d327b9 100644 --- a/src/xpra/client/ui_client_base.py +++ b/src/xpra/client/ui_client_base.py @@ -837,27 +837,28 @@ def stop_sending_sound_thread(): def start_receiving_sound(self): """ ask the server to start sending sound and emit the client signal """ soundlog("start_receiving_sound() sound sink=%s", self.sound_sink) - if self.sound_sink is not None and self.sound_sink.get_state()=="active": - soundlog("start_receiving_sound: we are already receiving sound!") + if self.sound_sink is not None: + soundlog("start_receiving_sound: we already have a sound sink") + return elif not self.server_sound_send: log.error("cannot start receiving sound: support not enabled on the server") - else: - #choose a codec: - from xpra.sound.gstreamer_util import CODEC_ORDER - matching_codecs = [x for x in self.server_sound_encoders if x in self.speaker_codecs] - ordered_codecs = [x for x in CODEC_ORDER if x in matching_codecs] - if len(ordered_codecs)==0: - log.error("no matching codecs between server (%s) and client (%s)", self.server_sound_encoders, self.speaker_codecs) - return - codec = ordered_codecs[0] - self.speaker_enabled = True - self.emit("speaker-changed") - def sink_ready(*args): - soundlog("sink_ready(%s) codec=%s", args, codec) - self.send("sound-control", "start", codec) - return False - self.on_sink_ready = sink_ready - self.start_sound_sink(codec) + return + #choose a codec: + from xpra.sound.gstreamer_util import CODEC_ORDER + matching_codecs = [x for x in self.server_sound_encoders if x in self.speaker_codecs] + ordered_codecs = [x for x in CODEC_ORDER if x in matching_codecs] + if len(ordered_codecs)==0: + log.error("no matching codecs between server (%s) and client (%s)", self.server_sound_encoders, self.speaker_codecs) + return + codec = ordered_codecs[0] + self.speaker_enabled = True + self.emit("speaker-changed") + def sink_ready(*args): + soundlog("sink_ready(%s) codec=%s", args, codec) + self.send("sound-control", "start", codec) + return False + self.on_sink_ready = sink_ready + self.start_sound_sink(codec) def stop_receiving_sound(self): """ ask the server to stop sending sound, toggle flag so we ignore further packets and emit client signal """ @@ -920,8 +921,8 @@ def restart(): soundlog("restart() sound_sink=%s, codec=%s, server_sound_sequence=%s", self.sound_sink, codec, self.server_sound_sequence) if self.server_sound_sequence: self.send_new_sound_sequence() - self.sink_restart_pending = False self.start_receiving_sound() + self.sink_restart_pending = False return False self.timeout_add(200, restart) @@ -962,9 +963,8 @@ def _process_sound_data(self, packet): self.sound_sink.stop() return if self.sound_sink is None: - soundlog("sound data received, creating a sound sink for it") - if not self.start_sound_sink(codec): - return + soundlog("no sound sink to process sound data, dropping it") + return elif self.sound_sink.get_state()=="stopped": soundlog("sound data received, sound sink is stopped - starting it") self.sound_sink.start() diff --git a/src/xpra/sound/sink.py b/src/xpra/sound/sink.py index 74728772de..2579ca12b9 100755 --- a/src/xpra/sound/sink.py +++ b/src/xpra/sound/sink.py @@ -29,19 +29,20 @@ GST_QUEUE_NO_LEAK = 0 GST_QUEUE_LEAK_UPSTREAM = 1 GST_QUEUE_LEAK_DOWNSTREAM = 2 +GST_QUEUE_LEAK_DEFAULT = GST_QUEUE_LEAK_DOWNSTREAM MS_TO_NS = 1000000 +QUEUE_LEAK = int(os.environ.get("XPRA_SOUND_QUEUE_LEAK", GST_QUEUE_LEAK_DEFAULT)) +if QUEUE_LEAK not in (GST_QUEUE_NO_LEAK, GST_QUEUE_LEAK_UPSTREAM, GST_QUEUE_LEAK_DOWNSTREAM): + log.error("invalid leak option %s", QUEUE_LEAK) + QUEUE_LEAK = GST_QUEUE_LEAK_DEFAULT QUEUE_TIME = int(os.environ.get("XPRA_SOUND_QUEUE_TIME", "450"))*MS_TO_NS -QUEUE_MIN_TIME = int(os.environ.get("XPRA_SOUND_QUEUE_MIN_TIME", "50"))*MS_TO_NS QUEUE_TIME = max(0, QUEUE_TIME) -QUEUE_MIN_TIME = max(0, min(QUEUE_TIME, QUEUE_MIN_TIME)) DEFAULT_SINK = os.environ.get("XPRA_SOUND_SINK", DEFAULT_SINK) if DEFAULT_SINK not in SINKS: log.error("invalid default sound sink: '%s' is not in %s, using %s instead", DEFAULT_SINK, SINKS, SINKS[0]) DEFAULT_SINK = SINKS[0] -VOLUME = False - def sink_has_device_attribute(sink): return sink not in ("autoaudiosink", "jackaudiosink", "directsoundsink") @@ -63,79 +64,67 @@ def __init__(self, sink_type=DEFAULT_SINK, options={}, codec=MP3, decoder_option self.sink_type = sink_type decoder_str = plugin_str(decoder, decoder_options) pipeline_els = [] - pipeline_els.append("appsrc name=src max-bytes=512") + pipeline_els.append("appsrc"+ + " name=src"+ + " max-bytes=32768"+ + " emit-signals=0"+ + " block=0"+ + " is-live=0"+ + " stream-type=stream"+ + " format=4") pipeline_els.append(parser) pipeline_els.append(decoder_str) - if VOLUME: - pipeline_els.append("volume name=volume") pipeline_els.append("audioconvert") pipeline_els.append("audioresample") - if QUEUE_TIME>0: - pipeline_els.append("queue" + - " name=queue"+ - " min-threshold-time=%s" % QUEUE_MIN_TIME+ - " max-size-time=%s" % QUEUE_TIME+ - " leaky=%s" % GST_QUEUE_LEAK_DOWNSTREAM) - else: - pipeline_els.append("queue leaky=%s" % GST_QUEUE_LEAK_DOWNSTREAM) + pipeline_els.append("queue" + + " name=queue"+ + " max-size-buffers=0"+ + " max-size-bytes=0"+ + " max-size-time=%s" % QUEUE_TIME+ + " leaky=%s" % QUEUE_LEAK) pipeline_els.append(sink_type) self.setup_pipeline_and_bus(pipeline_els) - self.volume = self.pipeline.get_by_name("volume") self.src = self.pipeline.get_by_name("src") - self.src.set_property('emit-signals', True) - self.src.set_property('stream-type', 'stream') - self.src.set_property('block', False) - self.src.set_property('format', 4) - self.src.set_property('is-live', True) self.queue = self.pipeline.get_by_name("queue") self.overruns = 0 + self.queue_state = "starting" self.queue.connect("overrun", self.queue_overrun) self.queue.connect("underrun", self.queue_underrun) - self.src.connect("need-data", self.need_data) - self.src.connect("enough-data", self.on_enough_data) + self.queue.connect("running", self.queue_running) + self.queue.connect("pushing", self.queue_pushing) + + def queue_pushing(self, *args): + ltime = int(self.queue.get_property("current-level-time")/MS_TO_NS) + debug("sound sink queue pushing: level=%s", ltime) + self.queue_state = "pushing" + + def queue_running(self, *args): + ltime = int(self.queue.get_property("current-level-time")/MS_TO_NS) + debug("sound sink queue running: level=%s", ltime) + self.queue_state = "running" def queue_underrun(self, *args): - ltime = int(self.queue.get_property("current-level-time")/1000000) + ltime = int(self.queue.get_property("current-level-time")/MS_TO_NS) debug("sound sink queue underrun: level=%s", ltime) - self.emit("underrun", ltime) + self.queue_state = "underrun" def queue_overrun(self, *args): - ltime = self.queue.get_property("current-level-time") - mtime = int(ltime/1000000) - debug("sound sink queue overrun: level=%s", mtime) + ltime = int(self.queue.get_property("current-level-time")/MS_TO_NS) + self.queue_state = "overrun" #no overruns for the first 2 seconds: - if time.time()-self.start_time<2.0 or ltime0 - self.queue.set_property("max-size-time", ms*1000000) - log("queue delay set to %s, current-level-time=%s", ms, int(self.queue.get_property("current-level-time")/1000/1000)) - - def set_mute(self, mute): - self.volume.set_property('mute', mute) - - def is_muted(self): - return bool(self.volume.get_property("mute")) - - def get_volume(self): - assert VOLUME and self.volume - return self.volume.get_property("volume") - - def set_volume(self, volume): - assert VOLUME and self.volume - assert volume>=0 and volume<=100 - self.volume.set_property('volume', float(volume)/10.0) - def eos(self): debug("eos()") if self.src: @@ -148,43 +137,43 @@ def get_info(self): clt = self.queue.get_property("current-level-time") info["queue.used_pct"] = int(min(QUEUE_TIME, clt)*100.0/QUEUE_TIME) info["queue.overruns"] = self.overruns - if VOLUME and self.volume: - info["mute"] = self.volume.get_property("mute") - info["volume"] = int(100.0*self.volume.get_property("volume")) + info["queue.state"] = self.queue_state return info def add_data(self, data, metadata=None): - debug("sound sink: adding %s bytes to %s", len(data), self.src) - #debug("sound sink: adding %s bytes to %s, metadata: %s, level=%s", len(data), self.src, metadata, int(self.queue.get_property("current-level-time")/1000000)) - if self.src: - buf = gst.Buffer(data) - if metadata: - ts = metadata.get("timestamp") - if ts is not None: - buf.timestamp = ts - d = metadata.get("duration") - if d is not None: - buf.duration = d - #buf.size = size - #buf.timestamp = timestamp - #buf.duration = duration - #buf.offset = offset - #buf.offset_end = offset_end - #buf.set_caps(gst.caps_from_string(caps)) - r = self.src.emit("push-buffer", buf) - if r!=gst.FLOW_OK: - log.error("push-buffer error: %s", r) - self.emit('error', "push-buffer error: %s" % r) - else: - self.buffer_count += 1 - self.byte_count += len(data) - debug("sound sink: new level=%s", int(self.queue.get_property("current-level-time")/1000000)) - - def need_data(self, src_arg, needed): - debug("need_data: %s bytes in %s", needed, src_arg) - - def on_enough_data(self, *args): - debug("on_enough_data(%s)", args) + #debug("sound sink: adding %s bytes to %s, metadata: %s, level=%s", len(data), self.src, metadata, int(self.queue.get_property("current-level-time")/MS_TO_NS)) + if not self.src: + return + buf = gst.Buffer(data) + d = 10*MS_TO_NS + if metadata and False: + ts = metadata.get("timestamp") + if ts is not None: + buf.timestamp = ts + d = metadata.get("duration") + if d is not None: + buf.duration = d + debug("add_data(..) queue_state=%s", self.queue_state) + self.push_buffer(buf) + + def push_buffer(self, buf): + #buf.size = size + #buf.timestamp = timestamp + #buf.duration = duration + #buf.offset = offset + #buf.offset_end = offset_end + #buf.set_caps(gst.caps_from_string(caps)) + r = self.src.emit("push-buffer", buf) + if r!=gst.FLOW_OK: + log.error("push-buffer error: %s", r) + self.emit('error', "push-buffer error: %s" % r) + return False + self.buffer_count += 1 + self.byte_count += len(buf.data) + ltime = int(self.queue.get_property("current-level-time")/MS_TO_NS) + debug("sound sink: pushed %s bytes, new buffer level: %sms", len(buf.data), ltime) + return True + def main(): @@ -219,7 +208,7 @@ def main(): import logging logging.basicConfig(format="%(asctime)s %(message)s") - logging.root.setLevel(logging.INFO) + logging.root.setLevel(logging.DEBUG) f = open(filename, "rb") data = f.read() f.close()