From b97d01640747b0a4d6d982f4049a96822422aa4f Mon Sep 17 00:00:00 2001 From: siyuan0322 Date: Tue, 13 Jun 2023 22:02:13 +0800 Subject: [PATCH 1/2] Allow pass params to gie instance --- coordinator/gscoordinator/coordinator.py | 5 +++- .../gscoordinator/kubernetes_launcher.py | 18 ++++++++++--- coordinator/gscoordinator/launcher.py | 4 ++- coordinator/gscoordinator/local_launcher.py | 8 +++++- .../assembly/src/bin/graphscope/giectl | 25 ++++++++++++++++--- .../graphscope/frontend.vineyard.properties | 2 +- python/graphscope/client/rpc.py | 6 ++++- python/graphscope/client/session.py | 10 +++++--- python/graphscope/proto/message.proto | 1 + 9 files changed, 63 insertions(+), 16 deletions(-) diff --git a/coordinator/gscoordinator/coordinator.py b/coordinator/gscoordinator/coordinator.py index 5c93544ed9c9..52062cb3a8c5 100644 --- a/coordinator/gscoordinator/coordinator.py +++ b/coordinator/gscoordinator/coordinator.py @@ -449,8 +449,11 @@ def _match_frontend_endpoint(pattern, lines): # create instance object_id = request.object_id schema_path = request.schema_path + params = request.params try: - proc = self._launcher.create_interactive_instance(object_id, schema_path) + proc = self._launcher.create_interactive_instance( + object_id, schema_path, params + ) gie_manager = InteractiveQueryManager(object_id) # Put it to object_manager to ensure it could be killed during coordinator cleanup # If coordinator is shutdown by force when creating interactive instance diff --git a/coordinator/gscoordinator/kubernetes_launcher.py b/coordinator/gscoordinator/kubernetes_launcher.py index 80d5b44f6af5..cdb62738bd6b 100644 --- a/coordinator/gscoordinator/kubernetes_launcher.py +++ b/coordinator/gscoordinator/kubernetes_launcher.py @@ -605,7 +605,12 @@ def _allocate_interactive_engine(self, object_id): return self.deploy_interactive_engine(object_id) def _distribute_interactive_process( - self, hosts, object_id: int, schema_path: str, engine_selector: str + self, + hosts, + object_id: int, + schema_path: str, + params: dict, + engine_selector: str, ): """ Args: @@ -617,6 +622,10 @@ def _distribute_interactive_process( env = os.environ.copy() env["GRAPHSCOPE_HOME"] = GRAPHSCOPE_HOME container = self._engine_cluster.interactive_executor_container_name + + params = "\n".join([f"{k}={v}" for k, v in params.items()]) + params = base64.b64encode(params.encode("utf-8")).decode("utf-8") + cmd = [ INTERACTIVE_ENGINE_SCRIPT, "create_gremlin_instance_on_k8s", @@ -630,6 +639,7 @@ def _distribute_interactive_process( str(self._interactive_port + 2), # frontend port self._coordinator_name, engine_selector, + params, ] self._interactive_port += 3 logger.info("Create GIE instance with command: %s", " ".join(cmd)) @@ -648,7 +658,9 @@ def _distribute_interactive_process( ) return process - def create_interactive_instance(self, object_id: int, schema_path: str): + def create_interactive_instance( + self, object_id: int, schema_path: str, params: dict + ): pod_name_list, _, _ = self._allocate_interactive_engine(object_id) if not pod_name_list: raise RuntimeError("Failed to allocate interactive engine") @@ -661,7 +673,7 @@ def create_interactive_instance(self, object_id: int, schema_path: str): ) return self._distribute_interactive_process( - hosts, object_id, schema_path, engine_selector + hosts, object_id, schema_path, params, engine_selector ) def close_interactive_instance(self, object_id): diff --git a/coordinator/gscoordinator/launcher.py b/coordinator/gscoordinator/launcher.py index 23cef60486a0..d2859cdccde5 100644 --- a/coordinator/gscoordinator/launcher.py +++ b/coordinator/gscoordinator/launcher.py @@ -87,7 +87,9 @@ def create_analytical_instance(self): pass @abstractmethod - def create_interactive_instance(self, object_id: int, schema_path: str): + def create_interactive_instance( + self, object_id: int, schema_path: str, params: dict + ): pass @abstractmethod diff --git a/coordinator/gscoordinator/local_launcher.py b/coordinator/gscoordinator/local_launcher.py index b0276c5fb13c..c7e27af2da51 100644 --- a/coordinator/gscoordinator/local_launcher.py +++ b/coordinator/gscoordinator/local_launcher.py @@ -192,7 +192,9 @@ def create_analytical_instance(self): "Analytical engine is listening on %s", self._analytical_engine_endpoint ) - def create_interactive_instance(self, object_id: int, schema_path: str): + def create_interactive_instance( + self, object_id: int, schema_path: str, params: dict + ): try: logger.info("Java version: %s", get_java_version()) except: # noqa: E722 @@ -218,6 +220,9 @@ def create_interactive_instance(self, object_id: int, schema_path: str): else: num_workers = self._num_workers + params = "\n".join([f"{k}={v}" for k, v in params.items()]) + params = base64.b64encode(params.encode("utf-8")).decode("utf-8") + cmd = [ INTERACTIVE_ENGINE_SCRIPT, "create_gremlin_instance_on_local", @@ -229,6 +234,7 @@ def create_interactive_instance(self, object_id: int, schema_path: str): str(self._interactive_port + 1), # executor rpc port str(self._interactive_port + 2 * num_workers), # frontend port self.vineyard_socket, + params, ] logger.info("Create GIE instance with command: %s", " ".join(cmd)) self._interactive_port += 3 diff --git a/interactive_engine/assembly/src/bin/graphscope/giectl b/interactive_engine/assembly/src/bin/graphscope/giectl index d80b4efe4a74..9ca9a46b6e7f 100755 --- a/interactive_engine/assembly/src/bin/graphscope/giectl +++ b/interactive_engine/assembly/src/bin/graphscope/giectl @@ -72,18 +72,23 @@ start_frontend() { declare -r schema_path=$3 declare -r pegasus_hosts=$4 declare -r frontend_port=$5 + declare -r params=$6 + declare -r threads_per_worker=${THREADS_PER_WORKER:-2} # create related directories declare -r log_dir=${GS_LOG}/${object_id} declare -r config_dir=${GRAPHSCOPE_RUNTIME}/config/${object_id} declare -r pid_dir=${GRAPHSCOPE_RUNTIME}/pid/${object_id} + mkdir -p ${log_dir} ${config_dir} ${pid_dir} # make a "current" link unlink ${GS_LOG}/current || true ln -s ${log_dir} ${GS_LOG}/current + decoded_params=`echo -n $params | base64 -d` + declare java_opt="-server -verbose:gc -Xloggc:${log_dir}/frontend.gc.log @@ -113,6 +118,8 @@ start_frontend() { -e "s@FRONTEND_SERVICE_PORT@${frontend_port}@g" \ -e "s@THREADS_PER_WORKER@${threads_per_worker}@g" \ ${GRAPHSCOPE_HOME}/conf/frontend.vineyard.properties > ${config_dir}/frontend.vineyard.properties + echo -e "\n" >> ${config_dir}/frontend.vineyard.properties + echo $decoded_params >> ${config_dir}/frontend.vineyard.properties # frontend service hold a handle client of coordinator java ${java_opt} \ @@ -142,6 +149,8 @@ start_executor() { declare -r server_size=$4 declare -r rpc_port=$5 declare -r network_servers=$6 + declare -r params=$7 + declare -r threads_per_worker=${THREADS_PER_WORKER:-2} declare -r log_dir=${GS_LOG}/${object_id} @@ -157,6 +166,8 @@ start_executor() { unlink ${GS_LOG}/current || true ln -s ${log_dir} ${GS_LOG}/current + decoded_params=`echo -n $params | base64 -d` + # set executor config file sed -e "s@GRAPH_NAME@${object_id}@g" \ -e "s@VINEYARD_OBJECT_ID@${object_id}@g" \ @@ -166,6 +177,8 @@ start_executor() { -e "s@NETWORK_SERVERS@${network_servers}@g" \ -e "s@THREADS_PER_WORKER@${threads_per_worker}@g" \ ${GRAPHSCOPE_HOME}/conf/executor.vineyard.properties > ${config_dir}/executor.$server_id.vineyard.properties + echo -e "\n" >> ${config_dir}/executor.$server_id.vineyard.properties + echo $decoded_params >> ${config_dir}/executor.$server_id.vineyard.properties cp ${GRAPHSCOPE_HOME}/conf/log4rs.yml ${config_dir}/log4rs.yml @@ -201,6 +214,7 @@ create_gremlin_instance_on_local() { declare -r executor_rpc_port=$6 declare -r frontend_port=$7 export VINEYARD_IPC_SOCKET=$8 + declare -r params=$9 declare -r cluster_type="local" declare -r executor_count="1" # local mode only start one executor @@ -228,7 +242,7 @@ create_gremlin_instance_on_local() { pegasus_hosts=${pegasus_hosts:1} start_frontend ${GRAPHSCOPE_RUNTIME} ${object_id} ${schema_path} ${pegasus_hosts} \ - ${frontend_port} + ${frontend_port} ${params} log "FRONTEND_ENDPOINT:127.0.0.1:${frontend_port}" @@ -237,7 +251,7 @@ create_gremlin_instance_on_local() { current_executor_port=$(($executor_port + 2 * $server_id)) current_executor_rpc_port=$(($executor_rpc_port + 2 * $server_id)) start_executor ${GRAPHSCOPE_RUNTIME} ${object_id} ${server_id} ${server_size} ${current_executor_rpc_port} \ - ${network_servers} + ${network_servers} ${params} done } @@ -271,6 +285,7 @@ create_gremlin_instance_on_k8s() { declare -r frontend_port=$8 declare -r coordinator_name=$9 # deployment name of coordinator declare -r engine_selector=${10} + declare -r params=${11} instance_id=${coordinator_name#*-} @@ -289,7 +304,7 @@ create_gremlin_instance_on_k8s() { launch_frontend_cmd="GRAPHSCOPE_HOME=${GRAPHSCOPE_HOME} \ ${GRAPHSCOPE_HOME}/bin/giectl start_frontend \ - ${GRAPHSCOPE_RUNTIME} ${object_id} ${schema_path} ${pegasus_hosts} ${frontend_port}" + ${GRAPHSCOPE_RUNTIME} ${object_id} ${schema_path} ${pegasus_hosts} ${frontend_port} '${params}'" kubectl cp ${schema_path} ${frontend_name}:${schema_path} kubectl exec ${frontend_name} -- /bin/bash -c "${launch_frontend_cmd}" @@ -303,7 +318,9 @@ create_gremlin_instance_on_k8s() { _server_id=0 for pod in $(echo ${pod_hosts}) do - launch_executor_cmd="GRAPHSCOPE_HOME=${GRAPHSCOPE_HOME} ${GRAPHSCOPE_HOME}/bin/giectl start_executor ${GRAPHSCOPE_RUNTIME} ${object_id} ${_server_id} ${server_size} ${executor_rpc_port} ${network_servers}" + launch_executor_cmd="GRAPHSCOPE_HOME=${GRAPHSCOPE_HOME} ${GRAPHSCOPE_HOME}/bin/giectl \ + start_executor ${GRAPHSCOPE_RUNTIME} ${object_id} ${_server_id} ${server_size} \ + ${executor_rpc_port} ${network_servers} '${params}'" # kubectl exec ${pod} -c ${engine_container} -- sudo mkdir -p /var/log/graphscope # kubectl exec ${pod} -c ${engine_container} -- sudo chown -R graphscope:graphscope /var/log/graphscope kubectl exec ${pod} -c ${engine_container} -- /bin/bash -c "${launch_executor_cmd}" diff --git a/interactive_engine/assembly/src/conf/graphscope/frontend.vineyard.properties b/interactive_engine/assembly/src/conf/graphscope/frontend.vineyard.properties index e4d074886b19..7628441d0d90 100644 --- a/interactive_engine/assembly/src/conf/graphscope/frontend.vineyard.properties +++ b/interactive_engine/assembly/src/conf/graphscope/frontend.vineyard.properties @@ -18,4 +18,4 @@ frontend.service.port = FRONTEND_SERVICE_PORT # disable the authentication if username or password is not set #auth.username = default -#auth.password = default \ No newline at end of file +#auth.password = default diff --git a/python/graphscope/client/rpc.py b/python/graphscope/client/rpc.py index 890be1a2c51d..ed259659c0a8 100644 --- a/python/graphscope/client/rpc.py +++ b/python/graphscope/client/rpc.py @@ -205,10 +205,14 @@ def create_analytical_instance(self): response = self._stub.CreateAnalyticalInstance(request) return json.loads(response.engine_config), response.host_names - def create_interactive_instance(self, object_id, schema_path): + def create_interactive_instance(self, object_id, schema_path, params=None): request = message_pb2.CreateInteractiveInstanceRequest( session_id=self._session_id, object_id=object_id, schema_path=schema_path ) + if params is not None: + for k, v in params.items(): + request.params[str(k)] = str(v) + response = self._stub.CreateInteractiveInstance(request) return response.gremlin_endpoint diff --git a/python/graphscope/client/session.py b/python/graphscope/client/session.py index 83eb68dd7d3c..d63fe0db7417 100755 --- a/python/graphscope/client/session.py +++ b/python/graphscope/client/session.py @@ -1304,7 +1304,7 @@ def _run_on_local(self): self._config_params["port"] = None self._config_params["vineyard_socket"] = "" - def gremlin(self, graph): + def gremlin(self, graph, params=None): """Get an interactive engine handler to execute gremlin queries. It will return an instance of :class:`graphscope.interactive.query.InteractiveQuery`, @@ -1343,7 +1343,9 @@ def gremlin(self, graph): object_id = graph.vineyard_id schema_path = graph.schema_path - endpoint = self._grpc_client.create_interactive_instance(object_id, schema_path) + endpoint = self._grpc_client.create_interactive_instance( + object_id, schema_path, params + ) interactive_query = InteractiveQuery(graph, endpoint) self._interactive_instance_dict[object_id] = interactive_query graph._attach_interactive_instance(interactive_query) @@ -1728,7 +1730,7 @@ def g( ) -def gremlin(graph): +def gremlin(graph, params=None): """Create an interactive engine and get the handler to execute the gremlin queries. See params detail in :meth:`graphscope.Session.gremlin` @@ -1749,7 +1751,7 @@ def gremlin(graph): assert ( graph._session is not None ), "The graph object is invalid" # pylint: disable=protected-access - return graph._session.gremlin(graph) # pylint: disable=protected-access + return graph._session.gremlin(graph, params) # pylint: disable=protected-access def graphlearn(graph, nodes=None, edges=None, gen_labels=None): diff --git a/python/graphscope/proto/message.proto b/python/graphscope/proto/message.proto index c03266eb6eb4..efc63e4f1065 100644 --- a/python/graphscope/proto/message.proto +++ b/python/graphscope/proto/message.proto @@ -184,6 +184,7 @@ message CreateInteractiveInstanceRequest { string session_id = 1; int64 object_id = 2; string schema_path = 3; + map params = 4; }; message CreateInteractiveInstanceResponse { From 7a5023f2c5467890189f4afd47073b45d237b575 Mon Sep 17 00:00:00 2001 From: siyuan0322 Date: Wed, 14 Jun 2023 11:13:08 +0800 Subject: [PATCH 2/2] update doc --- docs/interactive_engine/getting_started.md | 9 +++++++++ python/graphscope/client/session.py | 1 + 2 files changed, 10 insertions(+) diff --git a/docs/interactive_engine/getting_started.md b/docs/interactive_engine/getting_started.md index 81159bb24d64..453abe672fd0 100644 --- a/docs/interactive_engine/getting_started.md +++ b/docs/interactive_engine/getting_started.md @@ -89,6 +89,15 @@ You may see something like: The number 6 is printed, which is the number of vertices in modern graph. +### Customize Configurations for GIE instance + +You could pass additional key-value pairs to customize the startup configuration of GIE, for example: + +```python +# Set the timeout value to 10 min +g = gs.gremlin(graph, params={'pegasus.timeout': 600000}) +``` + ## What's the Next As shown in the above example, it is very easy to use GraphScope to interactively query a graph using the gremlin query language on your local machine. You may find more tutorials [here](https://tinkerpop.apache.org/docs/current/tutorials/getting-started/) for the basic Gremlin usage, in which most read-only queries can be seamlessly executed with the above `g.execute()` function. diff --git a/python/graphscope/client/session.py b/python/graphscope/client/session.py index d63fe0db7417..8b69c8ccfdf4 100755 --- a/python/graphscope/client/session.py +++ b/python/graphscope/client/session.py @@ -1319,6 +1319,7 @@ def gremlin(self, graph, params=None): Args: graph (:class:`graphscope.framework.graph.GraphDAGNode`): The graph to create interactive instance. + params: A dict consists of configurations of GIE instance. Raises: InvalidArgumentError: