Skip to content

Commit

Permalink
#3750 add pure video mode if the client supports h264
Browse files Browse the repository at this point in the history
  • Loading branch information
totaam committed May 9, 2023
1 parent 687a657 commit b595562
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 27 deletions.
58 changes: 55 additions & 3 deletions xpra/codecs/gstreamer/capture.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from gi.repository import GObject # @UnresolvedImport

from xpra.gst_common import import_gst, GST_FLOW_OK
from xpra.gtk_common.gobject_util import one_arg_signal
from xpra.gtk_common.gobject_util import n_arg_signal
from xpra.gst_pipeline import Pipeline
from xpra.codecs.gstreamer.codec_common import (
get_version, get_type, get_info,
Expand All @@ -28,7 +28,7 @@ class Capture(Pipeline):
Uses a GStreamer pipeline to capture the screen
"""
__gsignals__ = Pipeline.__generic_signals__.copy()
__gsignals__["new-image"] = one_arg_signal
__gsignals__["new-image"] = n_arg_signal(3)

def __init__(self, element : str="ximagesrc", pixel_format : str="BGRX",
width : int=0, height : int=0):
Expand Down Expand Up @@ -83,7 +83,7 @@ def on_new_sample(self, _bus):
except Full:
log("image queue is already full")
else:
self.emit("new-image", self.frames)
self.emit("new-image", self.frames, self.pixel_format, image)
return GST_FLOW_OK

def on_new_preroll(self, _appsink):
Expand All @@ -105,6 +105,58 @@ def refresh(self):
def clean(self):
self.stop()

GObject.type_register(Capture)


class CaptureAndEncode(Capture):
"""
Uses a GStreamer pipeline to capture the screen
and encode it to a video stream
"""

def create_pipeline(self,
capture_element:str="ximagesrc",
encode_element:str="x264enc pass=4 speed-preset=1 tune=4 byte-stream=true quantizer=51 qp-max=51 qp-min=50"):
#encode_element="x264enc threads=8 pass=4 speed-preset=1 tune=zerolatency byte-stream=true quantizer=51 qp-max=51 qp-min=50"):
#encode_element="vp8enc deadline=1 min-quantizer=60 max-quantizer=63 cq-level=61"):
#encode_element="vp9enc deadline=1 error-resilient=1 min-quantizer=60 end-usage=2"):
elements = [
capture_element, #ie: ximagesrc or pipewiresrc
#"videorate",
#"video/x-raw,framerate=20/1",
#"queue leaky=2 max-size-buffers=1",
"videoconvert",
encode_element,
"appsink name=sink emit-signals=true max-buffers=1 drop=false sync=false async=true qos=true",
]
if not self.setup_pipeline_and_bus(elements):
raise RuntimeError("failed to setup gstreamer pipeline")
self.sink = self.pipeline.get_by_name("sink")
def sh(sig, handler):
self.element_connect(self.sink, sig, handler)
sh("new-sample", self.on_new_sample)
sh("new-preroll", self.on_new_preroll)

def on_new_sample(self, _bus):
sample = self.sink.emit("pull-sample")
buf = sample.get_buffer()
size = buf.get_size()
log("on_new_sample size=%s", size)
if size:
data = buf.extract_dup(0, size)
self.frames += 1
self.emit("new-image", self.frames, "h264", data)
return GST_FLOW_OK

def on_new_preroll(self, _appsink):
log("new-preroll")
return GST_FLOW_OK

def refresh(self):
return True

def clean(self):
self.stop()

GObject.type_register(Capture)

Expand Down
98 changes: 74 additions & 24 deletions xpra/platform/posix/fd_portal_shadow.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@

import os
import random
from time import monotonic
from dbus.types import UInt32
from dbus.types import Dictionary

from xpra.exit_codes import ExitCode
from xpra.util import typedict, ConnectionMessage
from xpra.util import typedict, envbool, ConnectionMessage
from xpra.net.compression import Compressed
from xpra.dbus.helper import dbus_to_native
from xpra.codecs.gstreamer.capture import Capture
from xpra.codecs.gstreamer.capture import Capture, CaptureAndEncode
from xpra.codecs.image_wrapper import ImageWrapper
from xpra.server.shadow.root_window_model import RootWindowModel
from xpra.server.shadow.gtk_shadow_server_base import GTKShadowServerBase
from xpra.platform.posix.fd_portal import (
Expand All @@ -29,6 +32,8 @@

session_counter : int = random.randint(0, 2**24)

VIDEO_MODE = envbool("XPRA_PIPEWIRE_VIDEO_MODE", True)


class PipewireWindowModel(RootWindowModel):
__slots__ = ("pipewire_id", "pipewire_props")
Expand Down Expand Up @@ -57,18 +62,8 @@ def notify_new_user(self, ss):

def last_client_exited(self):
super().last_client_exited()
c = self.capture
if c:
self.capture = None
c.stop()
if self.session:
#https://gitlab.gnome.org/-/snippets/1122
log(f"trying to close the session {self.session}")
try:
self.session.Close(dbus_interface=PORTAL_SESSION_INTERFACE)
except Exception as e:
log(f"ignoring error closing session {self.session}: {e}")
self.session = None
self.stop_capture()
self.stop_session()

def client_auth_error(self, message):
self.disconnect_authenticating_client(ConnectionMessage.AUTHENTICATION_FAILED, message)
Expand Down Expand Up @@ -113,6 +108,18 @@ def cleanup(self):
self.portal_interface = None


def stop_session(self):
s = self.session
if not s:
return
self.session = None
#https://gitlab.gnome.org/-/snippets/1122
log(f"trying to close the session {s}")
try:
s.Close(dbus_interface=PORTAL_SESSION_INTERFACE)
except Exception as e:
log(f"ignoring error closing session {s}: {e}")

def create_session(self):
global session_counter
session_counter += 1
Expand Down Expand Up @@ -231,19 +238,30 @@ def on_start_response(self, response, results):
log.warn(" keyboard and pointer events cannot be forwarded")


def create_capture_pipeline(self, fd : int, node_id : int, w : int, h : int):
el = f"pipewiresrc fd={fd} path={node_id} do-timestamp=true"
c = self.authenticating_client
if VIDEO_MODE:
encs = getattr(c, "core_encodings", ())
log(f"core_encodings({c})={encs}")
if "h264" in encs:
return CaptureAndEncode(el, pixel_format="BGRX", width=w, height=h)
return Capture(el, pixel_format="BGRX", width=w, height=h)

def start_pipewire_capture(self, node_id, props):
log(f"start_pipewire_capture({node_id}, {props})")
x, y = props.inttupleget("position", (0, 0))
w, h = props.inttupleget("size", (0, 0))
if w<=0 or h<=0:
raise ValueError(f"invalid dimensions: {w}x{h}")
empty_dict = Dictionary(signature="sv")
fd_object = self.portal_interface.OpenPipeWireRemote(
self.session_handle,
empty_dict,
dbus_interface=SCREENCAST_IFACE,
)
fd = fd_object.take()
x, y = props.inttupleget("position", (0, 0))
w, h = props.inttupleget("size", (0, 0))
el = f"pipewiresrc fd={fd} path={node_id}"
self.capture = Capture(el, pixel_format="BGRX", width=w, height=h)
self.capture = self.create_capture_pipeline(fd, node_id, w, h)
self.capture.node_id = node_id
self.capture.connect("state-changed", self.capture_state_changed)
self.capture.connect("error", self.capture_error)
Expand All @@ -258,18 +276,50 @@ def start_pipewire_capture(self, node_id, props):
#must be called from the main thread:
log(f"new model: {model}")
self.do_add_new_window_common(node_id, model)

def capture_new_image(self, capture, frame):
log(f"capture_new_image({capture}, {frame})")
#FIXME: only match the window that just got refreshed!
for w in tuple(self._id_to_window.values()):
self.refresh_window(w)
self._send_new_window_packet(model)

def capture_new_image(self, capture, frame, coding, data):
wid = capture.node_id
model = self._id_to_window.get(wid)
log(f"capture_new_image({capture}, {frame}, {coding}, {type(data)}) model({wid})={model}")
if not model:
log.error(f"Error: cannot find window model for node {wid}")
return
if isinstance(data, ImageWrapper):
self.refresh_window(model)
return
if not isinstance(data, bytes):
log.warn(f"Warning: unexpected image datatype: {type(data)}")
return
#this is a frame from a compressed stream,
#send it to all the window sources for this window:
cdata = Compressed(coding, data)
client_options = {
"frame" : frame,
}
options = {}
x = y = 0
w, h = model.geometry[2:4]
outstride = 0
damage_time = process_damage_time = monotonic()
for ss in tuple(self._server_sources.values()):
if not hasattr(ss, "get_window_source"):
#client is not showing any windows
continue
ws = ss.get_window_source(wid)
if not ws:
#client not showing this window
continue
log(f"sending {len(data)} bytes packet of {coding} stream to {ws} of {ss}")
packet = ws.make_draw_packet(x, y, w, h, coding, cdata, outstride, client_options, options)
ws.queue_damage_packet(packet, damage_time, process_damage_time, options)


def capture_error(self, capture, message):
log(f"capture_error({capture}, {message})")
log.error("Error capturing screen:")
log.estr(message)
#perhaps we should "lose" the window here?

def capture_state_changed(self, capture, state):
log(f"screencast capture state: {state}")

0 comments on commit b595562

Please sign in to comment.