From b595562d9c0746be7fe991d221fc83998c856e4f Mon Sep 17 00:00:00 2001 From: totaam Date: Tue, 9 May 2023 21:47:36 +0700 Subject: [PATCH] #3750 add pure video mode if the client supports h264 --- xpra/codecs/gstreamer/capture.py | 58 ++++++++++++++- xpra/platform/posix/fd_portal_shadow.py | 98 +++++++++++++++++++------ 2 files changed, 129 insertions(+), 27 deletions(-) diff --git a/xpra/codecs/gstreamer/capture.py b/xpra/codecs/gstreamer/capture.py index 988ebbf13d..4c42b2d7ce 100755 --- a/xpra/codecs/gstreamer/capture.py +++ b/xpra/codecs/gstreamer/capture.py @@ -7,7 +7,7 @@ from gi.repository import GObject # @UnresolvedImport from xpra.gst_common import import_gst, GST_FLOW_OK -from xpra.gtk_common.gobject_util import one_arg_signal +from xpra.gtk_common.gobject_util import n_arg_signal from xpra.gst_pipeline import Pipeline from xpra.codecs.gstreamer.codec_common import ( get_version, get_type, get_info, @@ -28,7 +28,7 @@ class Capture(Pipeline): Uses a GStreamer pipeline to capture the screen """ __gsignals__ = Pipeline.__generic_signals__.copy() - __gsignals__["new-image"] = one_arg_signal + __gsignals__["new-image"] = n_arg_signal(3) def __init__(self, element : str="ximagesrc", pixel_format : str="BGRX", width : int=0, height : int=0): @@ -83,7 +83,7 @@ def on_new_sample(self, _bus): except Full: log("image queue is already full") else: - self.emit("new-image", self.frames) + self.emit("new-image", self.frames, self.pixel_format, image) return GST_FLOW_OK def on_new_preroll(self, _appsink): @@ -105,6 +105,58 @@ def refresh(self): def clean(self): self.stop() +GObject.type_register(Capture) + + +class CaptureAndEncode(Capture): + """ + Uses a GStreamer pipeline to capture the screen + and encode it to a video stream + """ + + def create_pipeline(self, + capture_element:str="ximagesrc", + encode_element:str="x264enc pass=4 speed-preset=1 tune=4 byte-stream=true quantizer=51 qp-max=51 qp-min=50"): + #encode_element="x264enc threads=8 pass=4 speed-preset=1 tune=zerolatency byte-stream=true quantizer=51 qp-max=51 qp-min=50"): + #encode_element="vp8enc deadline=1 min-quantizer=60 max-quantizer=63 cq-level=61"): + #encode_element="vp9enc deadline=1 error-resilient=1 min-quantizer=60 end-usage=2"): + elements = [ + capture_element, #ie: ximagesrc or pipewiresrc + #"videorate", + #"video/x-raw,framerate=20/1", + #"queue leaky=2 max-size-buffers=1", + "videoconvert", + encode_element, + "appsink name=sink emit-signals=true max-buffers=1 drop=false sync=false async=true qos=true", + ] + if not self.setup_pipeline_and_bus(elements): + raise RuntimeError("failed to setup gstreamer pipeline") + self.sink = self.pipeline.get_by_name("sink") + def sh(sig, handler): + self.element_connect(self.sink, sig, handler) + sh("new-sample", self.on_new_sample) + sh("new-preroll", self.on_new_preroll) + + def on_new_sample(self, _bus): + sample = self.sink.emit("pull-sample") + buf = sample.get_buffer() + size = buf.get_size() + log("on_new_sample size=%s", size) + if size: + data = buf.extract_dup(0, size) + self.frames += 1 + self.emit("new-image", self.frames, "h264", data) + return GST_FLOW_OK + + def on_new_preroll(self, _appsink): + log("new-preroll") + return GST_FLOW_OK + + def refresh(self): + return True + + def clean(self): + self.stop() GObject.type_register(Capture) diff --git a/xpra/platform/posix/fd_portal_shadow.py b/xpra/platform/posix/fd_portal_shadow.py index 90f62ce58c..652182b539 100755 --- a/xpra/platform/posix/fd_portal_shadow.py +++ b/xpra/platform/posix/fd_portal_shadow.py @@ -7,13 +7,16 @@ import os import random +from time import monotonic from dbus.types import UInt32 from dbus.types import Dictionary from xpra.exit_codes import ExitCode -from xpra.util import typedict, ConnectionMessage +from xpra.util import typedict, envbool, ConnectionMessage +from xpra.net.compression import Compressed from xpra.dbus.helper import dbus_to_native -from xpra.codecs.gstreamer.capture import Capture +from xpra.codecs.gstreamer.capture import Capture, CaptureAndEncode +from xpra.codecs.image_wrapper import ImageWrapper from xpra.server.shadow.root_window_model import RootWindowModel from xpra.server.shadow.gtk_shadow_server_base import GTKShadowServerBase from xpra.platform.posix.fd_portal import ( @@ -29,6 +32,8 @@ session_counter : int = random.randint(0, 2**24) +VIDEO_MODE = envbool("XPRA_PIPEWIRE_VIDEO_MODE", True) + class PipewireWindowModel(RootWindowModel): __slots__ = ("pipewire_id", "pipewire_props") @@ -57,18 +62,8 @@ def notify_new_user(self, ss): def last_client_exited(self): super().last_client_exited() - c = self.capture - if c: - self.capture = None - c.stop() - if self.session: - #https://gitlab.gnome.org/-/snippets/1122 - log(f"trying to close the session {self.session}") - try: - self.session.Close(dbus_interface=PORTAL_SESSION_INTERFACE) - except Exception as e: - log(f"ignoring error closing session {self.session}: {e}") - self.session = None + self.stop_capture() + self.stop_session() def client_auth_error(self, message): self.disconnect_authenticating_client(ConnectionMessage.AUTHENTICATION_FAILED, message) @@ -113,6 +108,18 @@ def cleanup(self): self.portal_interface = None + def stop_session(self): + s = self.session + if not s: + return + self.session = None + #https://gitlab.gnome.org/-/snippets/1122 + log(f"trying to close the session {s}") + try: + s.Close(dbus_interface=PORTAL_SESSION_INTERFACE) + except Exception as e: + log(f"ignoring error closing session {s}: {e}") + def create_session(self): global session_counter session_counter += 1 @@ -231,8 +238,22 @@ def on_start_response(self, response, results): log.warn(" keyboard and pointer events cannot be forwarded") + def create_capture_pipeline(self, fd : int, node_id : int, w : int, h : int): + el = f"pipewiresrc fd={fd} path={node_id} do-timestamp=true" + c = self.authenticating_client + if VIDEO_MODE: + encs = getattr(c, "core_encodings", ()) + log(f"core_encodings({c})={encs}") + if "h264" in encs: + return CaptureAndEncode(el, pixel_format="BGRX", width=w, height=h) + return Capture(el, pixel_format="BGRX", width=w, height=h) + def start_pipewire_capture(self, node_id, props): log(f"start_pipewire_capture({node_id}, {props})") + x, y = props.inttupleget("position", (0, 0)) + w, h = props.inttupleget("size", (0, 0)) + if w<=0 or h<=0: + raise ValueError(f"invalid dimensions: {w}x{h}") empty_dict = Dictionary(signature="sv") fd_object = self.portal_interface.OpenPipeWireRemote( self.session_handle, @@ -240,10 +261,7 @@ def start_pipewire_capture(self, node_id, props): dbus_interface=SCREENCAST_IFACE, ) fd = fd_object.take() - x, y = props.inttupleget("position", (0, 0)) - w, h = props.inttupleget("size", (0, 0)) - el = f"pipewiresrc fd={fd} path={node_id}" - self.capture = Capture(el, pixel_format="BGRX", width=w, height=h) + self.capture = self.create_capture_pipeline(fd, node_id, w, h) self.capture.node_id = node_id self.capture.connect("state-changed", self.capture_state_changed) self.capture.connect("error", self.capture_error) @@ -258,18 +276,50 @@ def start_pipewire_capture(self, node_id, props): #must be called from the main thread: log(f"new model: {model}") self.do_add_new_window_common(node_id, model) - - def capture_new_image(self, capture, frame): - log(f"capture_new_image({capture}, {frame})") - #FIXME: only match the window that just got refreshed! - for w in tuple(self._id_to_window.values()): - self.refresh_window(w) + self._send_new_window_packet(model) + + def capture_new_image(self, capture, frame, coding, data): + wid = capture.node_id + model = self._id_to_window.get(wid) + log(f"capture_new_image({capture}, {frame}, {coding}, {type(data)}) model({wid})={model}") + if not model: + log.error(f"Error: cannot find window model for node {wid}") + return + if isinstance(data, ImageWrapper): + self.refresh_window(model) + return + if not isinstance(data, bytes): + log.warn(f"Warning: unexpected image datatype: {type(data)}") + return + #this is a frame from a compressed stream, + #send it to all the window sources for this window: + cdata = Compressed(coding, data) + client_options = { + "frame" : frame, + } + options = {} + x = y = 0 + w, h = model.geometry[2:4] + outstride = 0 + damage_time = process_damage_time = monotonic() + for ss in tuple(self._server_sources.values()): + if not hasattr(ss, "get_window_source"): + #client is not showing any windows + continue + ws = ss.get_window_source(wid) + if not ws: + #client not showing this window + continue + log(f"sending {len(data)} bytes packet of {coding} stream to {ws} of {ss}") + packet = ws.make_draw_packet(x, y, w, h, coding, cdata, outstride, client_options, options) + ws.queue_damage_packet(packet, damage_time, process_damage_time, options) def capture_error(self, capture, message): log(f"capture_error({capture}, {message})") log.error("Error capturing screen:") log.estr(message) + #perhaps we should "lose" the window here? def capture_state_changed(self, capture, state): log(f"screencast capture state: {state}")