Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow pass params to gie instance #2885

Merged
merged 2 commits into from
Jun 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion coordinator/gscoordinator/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -449,8 +449,11 @@ def _match_frontend_endpoint(pattern, lines):
# create instance
object_id = request.object_id
schema_path = request.schema_path
params = request.params
try:
proc = self._launcher.create_interactive_instance(object_id, schema_path)
proc = self._launcher.create_interactive_instance(
object_id, schema_path, params
)
gie_manager = InteractiveQueryManager(object_id)
# Put it to object_manager to ensure it could be killed during coordinator cleanup
# If coordinator is shutdown by force when creating interactive instance
Expand Down
18 changes: 15 additions & 3 deletions coordinator/gscoordinator/kubernetes_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,12 @@ def _allocate_interactive_engine(self, object_id):
return self.deploy_interactive_engine(object_id)

def _distribute_interactive_process(
self, hosts, object_id: int, schema_path: str, engine_selector: str
self,
hosts,
object_id: int,
schema_path: str,
params: dict,
engine_selector: str,
):
"""
Args:
Expand All @@ -617,6 +622,10 @@ def _distribute_interactive_process(
env = os.environ.copy()
env["GRAPHSCOPE_HOME"] = GRAPHSCOPE_HOME
container = self._engine_cluster.interactive_executor_container_name

params = "\n".join([f"{k}={v}" for k, v in params.items()])
params = base64.b64encode(params.encode("utf-8")).decode("utf-8")

cmd = [
INTERACTIVE_ENGINE_SCRIPT,
"create_gremlin_instance_on_k8s",
Expand All @@ -630,6 +639,7 @@ def _distribute_interactive_process(
str(self._interactive_port + 2), # frontend port
self._coordinator_name,
engine_selector,
params,
]
self._interactive_port += 3
logger.info("Create GIE instance with command: %s", " ".join(cmd))
Expand All @@ -648,7 +658,9 @@ def _distribute_interactive_process(
)
return process

def create_interactive_instance(self, object_id: int, schema_path: str):
def create_interactive_instance(
self, object_id: int, schema_path: str, params: dict
):
pod_name_list, _, _ = self._allocate_interactive_engine(object_id)
if not pod_name_list:
raise RuntimeError("Failed to allocate interactive engine")
Expand All @@ -661,7 +673,7 @@ def create_interactive_instance(self, object_id: int, schema_path: str):
)

return self._distribute_interactive_process(
hosts, object_id, schema_path, engine_selector
hosts, object_id, schema_path, params, engine_selector
)

def close_interactive_instance(self, object_id):
Expand Down
4 changes: 3 additions & 1 deletion coordinator/gscoordinator/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ def create_analytical_instance(self):
pass

@abstractmethod
def create_interactive_instance(self, object_id: int, schema_path: str):
def create_interactive_instance(
self, object_id: int, schema_path: str, params: dict
):
pass

@abstractmethod
Expand Down
8 changes: 7 additions & 1 deletion coordinator/gscoordinator/local_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,9 @@ def create_analytical_instance(self):
"Analytical engine is listening on %s", self._analytical_engine_endpoint
)

def create_interactive_instance(self, object_id: int, schema_path: str):
def create_interactive_instance(
self, object_id: int, schema_path: str, params: dict
):
try:
logger.info("Java version: %s", get_java_version())
except: # noqa: E722
Expand All @@ -218,6 +220,9 @@ def create_interactive_instance(self, object_id: int, schema_path: str):
else:
num_workers = self._num_workers

params = "\n".join([f"{k}={v}" for k, v in params.items()])
params = base64.b64encode(params.encode("utf-8")).decode("utf-8")

cmd = [
INTERACTIVE_ENGINE_SCRIPT,
"create_gremlin_instance_on_local",
Expand All @@ -229,6 +234,7 @@ def create_interactive_instance(self, object_id: int, schema_path: str):
str(self._interactive_port + 1), # executor rpc port
str(self._interactive_port + 2 * num_workers), # frontend port
self.vineyard_socket,
params,
]
logger.info("Create GIE instance with command: %s", " ".join(cmd))
self._interactive_port += 3
Expand Down
9 changes: 9 additions & 0 deletions docs/interactive_engine/getting_started.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,15 @@ You may see something like:

The number 6 is printed, which is the number of vertices in the modern graph.

### Customize Configurations for a GIE Instance

You can pass additional key-value pairs through the `params` argument to customize the startup configuration of a GIE instance. For example:

```python
# Set the timeout value to 10 min
g = gs.gremlin(graph, params={'pegasus.timeout': 600000})
```

## What's Next
As shown in the example above, it is easy to use GraphScope to interactively query a graph with the Gremlin query language on your local machine. You can find more tutorials on basic Gremlin usage [here](https://tinkerpop.apache.org/docs/current/tutorials/getting-started/); most of the read-only queries in them can be executed seamlessly with the `g.execute()` function shown above.

Expand Down
25 changes: 21 additions & 4 deletions interactive_engine/assembly/src/bin/graphscope/giectl
Original file line number Diff line number Diff line change
Expand Up @@ -72,18 +72,23 @@ start_frontend() {
declare -r schema_path=$3
declare -r pegasus_hosts=$4
declare -r frontend_port=$5
declare -r params=$6

declare -r threads_per_worker=${THREADS_PER_WORKER:-2}

# create related directories
declare -r log_dir=${GS_LOG}/${object_id}
declare -r config_dir=${GRAPHSCOPE_RUNTIME}/config/${object_id}
declare -r pid_dir=${GRAPHSCOPE_RUNTIME}/pid/${object_id}

mkdir -p ${log_dir} ${config_dir} ${pid_dir}

# make a "current" link
unlink ${GS_LOG}/current || true
ln -s ${log_dir} ${GS_LOG}/current

decoded_params=`echo -n $params | base64 -d`

declare java_opt="-server
-verbose:gc
-Xloggc:${log_dir}/frontend.gc.log
Expand Down Expand Up @@ -113,6 +118,8 @@ start_frontend() {
-e "s@FRONTEND_SERVICE_PORT@${frontend_port}@g" \
-e "s@THREADS_PER_WORKER@${threads_per_worker}@g" \
${GRAPHSCOPE_HOME}/conf/frontend.vineyard.properties > ${config_dir}/frontend.vineyard.properties
echo -e "\n" >> ${config_dir}/frontend.vineyard.properties
echo $decoded_params >> ${config_dir}/frontend.vineyard.properties

# frontend service hold a handle client of coordinator
java ${java_opt} \
Expand Down Expand Up @@ -142,6 +149,8 @@ start_executor() {
declare -r server_size=$4
declare -r rpc_port=$5
declare -r network_servers=$6
declare -r params=$7

declare -r threads_per_worker=${THREADS_PER_WORKER:-2}

declare -r log_dir=${GS_LOG}/${object_id}
Expand All @@ -157,6 +166,8 @@ start_executor() {
unlink ${GS_LOG}/current || true
ln -s ${log_dir} ${GS_LOG}/current

decoded_params=`echo -n $params | base64 -d`

# set executor config file
sed -e "s@GRAPH_NAME@${object_id}@g" \
-e "s@VINEYARD_OBJECT_ID@${object_id}@g" \
Expand All @@ -166,6 +177,8 @@ start_executor() {
-e "s@NETWORK_SERVERS@${network_servers}@g" \
-e "s@THREADS_PER_WORKER@${threads_per_worker}@g" \
${GRAPHSCOPE_HOME}/conf/executor.vineyard.properties > ${config_dir}/executor.$server_id.vineyard.properties
echo -e "\n" >> ${config_dir}/executor.$server_id.vineyard.properties
echo $decoded_params >> ${config_dir}/executor.$server_id.vineyard.properties

cp ${GRAPHSCOPE_HOME}/conf/log4rs.yml ${config_dir}/log4rs.yml

Expand Down Expand Up @@ -201,6 +214,7 @@ create_gremlin_instance_on_local() {
declare -r executor_rpc_port=$6
declare -r frontend_port=$7
export VINEYARD_IPC_SOCKET=$8
declare -r params=$9

declare -r cluster_type="local"
declare -r executor_count="1" # local mode only start one executor
Expand Down Expand Up @@ -228,7 +242,7 @@ create_gremlin_instance_on_local() {
pegasus_hosts=${pegasus_hosts:1}

start_frontend ${GRAPHSCOPE_RUNTIME} ${object_id} ${schema_path} ${pegasus_hosts} \
${frontend_port}
${frontend_port} ${params}

log "FRONTEND_ENDPOINT:127.0.0.1:${frontend_port}"

Expand All @@ -237,7 +251,7 @@ create_gremlin_instance_on_local() {
current_executor_port=$(($executor_port + 2 * $server_id))
current_executor_rpc_port=$(($executor_rpc_port + 2 * $server_id))
start_executor ${GRAPHSCOPE_RUNTIME} ${object_id} ${server_id} ${server_size} ${current_executor_rpc_port} \
${network_servers}
${network_servers} ${params}
done
}

Expand Down Expand Up @@ -271,6 +285,7 @@ create_gremlin_instance_on_k8s() {
declare -r frontend_port=$8
declare -r coordinator_name=$9 # deployment name of coordinator
declare -r engine_selector=${10}
declare -r params=${11}

instance_id=${coordinator_name#*-}

Expand All @@ -289,7 +304,7 @@ create_gremlin_instance_on_k8s() {

launch_frontend_cmd="GRAPHSCOPE_HOME=${GRAPHSCOPE_HOME} \
${GRAPHSCOPE_HOME}/bin/giectl start_frontend \
${GRAPHSCOPE_RUNTIME} ${object_id} ${schema_path} ${pegasus_hosts} ${frontend_port}"
${GRAPHSCOPE_RUNTIME} ${object_id} ${schema_path} ${pegasus_hosts} ${frontend_port} '${params}'"
kubectl cp ${schema_path} ${frontend_name}:${schema_path}

kubectl exec ${frontend_name} -- /bin/bash -c "${launch_frontend_cmd}"
Expand All @@ -303,7 +318,9 @@ create_gremlin_instance_on_k8s() {
_server_id=0
for pod in $(echo ${pod_hosts})
do
launch_executor_cmd="GRAPHSCOPE_HOME=${GRAPHSCOPE_HOME} ${GRAPHSCOPE_HOME}/bin/giectl start_executor ${GRAPHSCOPE_RUNTIME} ${object_id} ${_server_id} ${server_size} ${executor_rpc_port} ${network_servers}"
launch_executor_cmd="GRAPHSCOPE_HOME=${GRAPHSCOPE_HOME} ${GRAPHSCOPE_HOME}/bin/giectl \
start_executor ${GRAPHSCOPE_RUNTIME} ${object_id} ${_server_id} ${server_size} \
${executor_rpc_port} ${network_servers} '${params}'"
# kubectl exec ${pod} -c ${engine_container} -- sudo mkdir -p /var/log/graphscope
# kubectl exec ${pod} -c ${engine_container} -- sudo chown -R graphscope:graphscope /var/log/graphscope
kubectl exec ${pod} -c ${engine_container} -- /bin/bash -c "${launch_executor_cmd}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,4 @@ frontend.service.port = FRONTEND_SERVICE_PORT

# disable the authentication if username or password is not set
#auth.username = default
#auth.password = default
#auth.password = default
6 changes: 5 additions & 1 deletion python/graphscope/client/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,10 +205,14 @@ def create_analytical_instance(self):
response = self._stub.CreateAnalyticalInstance(request)
return json.loads(response.engine_config), response.host_names

def create_interactive_instance(self, object_id, schema_path):
def create_interactive_instance(self, object_id, schema_path, params=None):
request = message_pb2.CreateInteractiveInstanceRequest(
session_id=self._session_id, object_id=object_id, schema_path=schema_path
)
if params is not None:
for k, v in params.items():
request.params[str(k)] = str(v)

response = self._stub.CreateInteractiveInstance(request)
return response.gremlin_endpoint

Expand Down
11 changes: 7 additions & 4 deletions python/graphscope/client/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -1304,7 +1304,7 @@ def _run_on_local(self):
self._config_params["port"] = None
self._config_params["vineyard_socket"] = ""

def gremlin(self, graph):
def gremlin(self, graph, params=None):
"""Get an interactive engine handler to execute gremlin queries.

It will return an instance of :class:`graphscope.interactive.query.InteractiveQuery`,
Expand All @@ -1319,6 +1319,7 @@ def gremlin(self, graph):
Args:
graph (:class:`graphscope.framework.graph.GraphDAGNode`):
The graph to create interactive instance.
params: A dict consists of configurations of GIE instance.

Raises:
InvalidArgumentError:
Expand All @@ -1343,7 +1344,9 @@ def gremlin(self, graph):

object_id = graph.vineyard_id
schema_path = graph.schema_path
endpoint = self._grpc_client.create_interactive_instance(object_id, schema_path)
endpoint = self._grpc_client.create_interactive_instance(
object_id, schema_path, params
)
interactive_query = InteractiveQuery(graph, endpoint)
self._interactive_instance_dict[object_id] = interactive_query
graph._attach_interactive_instance(interactive_query)
Expand Down Expand Up @@ -1728,7 +1731,7 @@ def g(
)


def gremlin(graph):
def gremlin(graph, params=None):
"""Create an interactive engine and get the handler to execute the gremlin queries.

See params detail in :meth:`graphscope.Session.gremlin`
Expand All @@ -1749,7 +1752,7 @@ def gremlin(graph):
assert (
graph._session is not None
), "The graph object is invalid" # pylint: disable=protected-access
return graph._session.gremlin(graph) # pylint: disable=protected-access
return graph._session.gremlin(graph, params) # pylint: disable=protected-access


def graphlearn(graph, nodes=None, edges=None, gen_labels=None):
Expand Down
1 change: 1 addition & 0 deletions python/graphscope/proto/message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ message CreateInteractiveInstanceRequest {
string session_id = 1;
int64 object_id = 2;
string schema_path = 3;
map<string, string> params = 4;
};

message CreateInteractiveInstanceResponse {
Expand Down