Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GIE] Support PathExpand with OPT=ALL_V_E in GIE #2841

Merged
merged 18 commits into from
Jun 13, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@

public enum ResultOpt implements IntEnum<ResultOpt> {
EndV,
AllV;
AllV,
AllVe;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AllVe is strange, either AllVE, if the compiler complains, change everything to full name, ie., EndVertex, AllVertex, AllVertexEdge

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated as AllVE.


@Override
public int getInt() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,14 @@ public void configure(final Object... keyValues) {
+ " insensitive)");
}
} else if (key.equals("ResultOpt")) {
if (value.equals("AllV") || value.equals("EndV")) {
if (value.equals("AllV") || value.equals("EndV") || value.equals("AllVe")) {
this.resultOpt = ResultOpt.valueOf(value);
} else {
throw new ExtendGremlinStepException(
"value "
+ originalVal
+ " is invalid, use ALL_V or END_V instead (case insensitive)");
+ " is invalid, use ALL_V, END_V, ALL_VE instead (case"
+ " insensitive)");
}
} else if (key.equals("Until")) {
this.untilCondition = ObjectUtils.requireNonEmpty(originalVal);
Expand Down
1 change: 1 addition & 0 deletions interactive_engine/executor/ir/core/src/plan/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2175,6 +2175,7 @@ mod graph {
pub enum PathResultOpt {
EndV = 0,
AllV = 1,
AllVe = 2,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as the above AllVe

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated as AllVE.

}

/// To initialize an path expand operator from an edge_expand base
Expand Down
4 changes: 4 additions & 0 deletions interactive_engine/executor/ir/core/src/plan/physical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ impl AsPhysical for pb::PathExpand {
// 1. the previous op is ExpandE, and with no alias (which means that the edges won't be accessed later).
// 2. `GetV` is GetV(Adj) (i.e., opt=Start/End/Other) without any filters or further query semantics.
// 3. the direction should be: outE + inV = out; inE + outV = in; and bothE + otherV = both
// In addition, if PathExpand + GetV, make opt of GetV to be `End`.
fn build_and_try_fuse_get_v(builder: &mut JobBuilder, mut get_v: pb::GetV) -> IrResult<()> {
if get_v.opt == 4 {
return Err(IrError::Unsupported("Try to fuse GetV with Opt=Self into ExpandE".to_string()));
Expand Down Expand Up @@ -352,6 +353,9 @@ fn build_and_try_fuse_get_v(builder: &mut JobBuilder, mut get_v: pb::GetV) -> Ir
return Ok(());
}
}
} else if let physical_pb::physical_opr::operator::OpKind::Path(ref _path) = op_kind {
// make opt of getV after path expand as End.
get_v.opt = physical_pb::get_v::VOpt::End as i32;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

using std::mem::transmute ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}
}
builder.get_v(get_v);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use pegasus_common::impl_as_any;
use crate::apis::{read_id, write_id, Details, DynDetails, Element, GraphElement, PropertyValue, ID};
use crate::utils::expr::eval::Context;

#[derive(Clone, Debug)]
#[derive(Clone, Debug, Default)]
pub struct Edge {
id: ID,
label: Option<LabelId>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use ahash::HashMap;
use dyn_type::{BorrowObject, Object};
pub use edge::Edge;
use ir_common::{LabelId, NameOrId};
pub use path::GraphPath;
pub use path::{GraphPath, VertexOrEdge};
pub use property::{Details, DynDetails, PropKey, PropertyValue};
pub use vertex::Vertex;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
//! See the License for the specific language governing permissions and
//! limitations under the License.

use std::any::Any;
use std::cmp::Ordering;
use std::convert::{TryFrom, TryInto};
use std::hash::{Hash, Hasher};
Expand All @@ -26,46 +25,81 @@ use ir_common::generated::algebra::path_expand::ResultOpt;
use ir_common::generated::results as result_pb;
use ir_common::{LabelId, NameOrId};
use pegasus::codec::{Decode, Encode, ReadExt, WriteExt};
use pegasus_common::downcast::*;
use pegasus_common::downcast::Any;
use pegasus_common::downcast::AsAny;
use pegasus_common::impl_as_any;

use crate::apis::{Element, GraphElement, PropertyValue, Vertex, ID};
use crate::apis::{Edge, Element, GraphElement, PropertyValue, Vertex, ID};

#[derive(Clone, Debug, Hash, PartialEq, PartialOrd)]
pub enum VertexOrEdge {
V(Vertex),
E(Edge),
}

impl From<Vertex> for VertexOrEdge {
fn from(v: Vertex) -> Self {
Self::V(v)
}
}

impl From<Edge> for VertexOrEdge {
fn from(e: Edge) -> Self {
Self::E(e)
}
}

impl VertexOrEdge {
pub fn as_vertex(&self) -> Option<&Vertex> {
match self {
VertexOrEdge::V(v) => Some(v),
_ => None,
}
}

pub fn as_edge(&self) -> Option<&Edge> {
match self {
VertexOrEdge::E(e) => Some(e),
_ => None,
}
}
}

#[derive(Clone, Debug)]
pub enum GraphPath {
AllV(Vec<Vertex>),
SimpleAllV(Vec<Vertex>),
EndV((Vertex, usize)),
SimpleEndV((Vertex, Vec<ID>)),
AllV(Vec<VertexOrEdge>),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is already a complex combination, we may want to include two enum, one is PathOpt, one is ResultOpt, as what we have defined in proto. namely,

pub struct GraphPath {
     path_opt: PathOpt,
     reult_opt: ResultOpt, 
     data: Vec<VertexOrEdge>
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some path may not need to preserve the whole path data of Vec<VertexOrEdge> (e.g., when EndV or SimpleEndV here).

SimpleAllV(Vec<VertexOrEdge>),
EndV((VertexOrEdge, usize)),
SimpleEndV((VertexOrEdge, Vec<ID>)),
}

impl_as_any!(GraphPath);

impl GraphPath {
pub fn new(entry: Vertex, path_opt: PathOpt, result_opt: ResultOpt) -> Self {
pub fn new<E: Into<VertexOrEdge>>(entry: E, path_opt: PathOpt, result_opt: ResultOpt) -> Self {
match result_opt {
ResultOpt::EndV => match path_opt {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This ResultOpt and PathOpt are both from pb, includes the whole path, like: pb::xx::xx::ResultOpt. In addition, I know ResultOpt::AllVe is generated, but can we avoid generating this name (that is inconsistent with our naming convention).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

PathOpt::Arbitrary => GraphPath::EndV((entry, 1)),
PathOpt::Arbitrary => GraphPath::EndV((entry.into(), 1)),
PathOpt::Simple => {
let entry = entry.into();
let id = entry.id();
GraphPath::SimpleEndV((entry, vec![id]))
}
},
ResultOpt::AllV => match path_opt {
PathOpt::Arbitrary => GraphPath::AllV(vec![entry]),
PathOpt::Simple => GraphPath::SimpleAllV(vec![entry]),
ResultOpt::AllV | ResultOpt::AllVe => match path_opt {
PathOpt::Arbitrary => GraphPath::AllV(vec![entry.into()]),
PathOpt::Simple => GraphPath::SimpleAllV(vec![entry.into()]),
},
}
}

// append an entry and return the flag of whether the entry has been appended or not.
pub fn append(&mut self, entry: Vertex) -> bool {
pub fn append<E: Into<VertexOrEdge>>(&mut self, entry: E) -> bool {
match self {
GraphPath::AllV(ref mut path) => {
path.push(entry);
path.push(entry.into());
true
}
GraphPath::SimpleAllV(ref mut path) => {
let entry = entry.into();
if path.contains(&entry) {
false
} else {
Expand All @@ -74,37 +108,91 @@ impl GraphPath {
}
}
GraphPath::EndV((ref mut e, ref mut weight)) => {
*e = entry;
*e = entry.into();
*weight += 1;
true
}
GraphPath::SimpleEndV((ref mut e, ref mut path)) => {
let entry = entry.into();
if path.contains(&entry.id()) {
false
} else {
path.push(entry.id());
*e = entry;
*e = entry.into();
true
}
}
}
}

pub fn get_path_end(&self) -> &Vertex {
pub fn get_path_end(&self) -> &VertexOrEdge {
match self {
GraphPath::AllV(ref p) | GraphPath::SimpleAllV(ref p) => p.last().unwrap(),
GraphPath::EndV((ref e, _)) | GraphPath::SimpleEndV((ref e, _)) => e,
}
}

pub fn take_path(self) -> Option<Vec<Vertex>> {
pub fn take_path(self) -> Option<Vec<VertexOrEdge>> {
match self {
GraphPath::AllV(p) | GraphPath::SimpleAllV(p) => Some(p),
GraphPath::EndV(_) | GraphPath::SimpleEndV(_) => None,
}
}
}

impl Element for VertexOrEdge {
fn as_graph_element(&self) -> Option<&dyn GraphElement> {
match self {
VertexOrEdge::V(v) => v.as_graph_element(),
VertexOrEdge::E(e) => e.as_graph_element(),
}
}

fn len(&self) -> usize {
match self {
VertexOrEdge::V(v) => v.len(),
VertexOrEdge::E(e) => e.len(),
}
}

fn as_borrow_object(&self) -> BorrowObject {
match self {
VertexOrEdge::V(v) => v.as_borrow_object(),
VertexOrEdge::E(e) => e.as_borrow_object(),
}
}
}

impl GraphElement for VertexOrEdge {
fn id(&self) -> ID {
match self {
VertexOrEdge::V(v) => v.id(),
VertexOrEdge::E(e) => e.id(),
}
}

fn label(&self) -> Option<LabelId> {
match self {
VertexOrEdge::V(v) => v.label(),
VertexOrEdge::E(e) => e.label(),
}
}

fn get_property(&self, key: &NameOrId) -> Option<PropertyValue> {
match self {
VertexOrEdge::V(v) => v.get_property(key),
VertexOrEdge::E(e) => e.get_property(key),
}
}

fn get_all_properties(&self) -> Option<HashMap<NameOrId, Object>> {
match self {
VertexOrEdge::V(v) => v.get_all_properties(),
VertexOrEdge::E(e) => e.get_all_properties(),
}
}
}

impl Element for GraphPath {
fn as_graph_element(&self) -> Option<&dyn GraphElement> {
Some(self)
Expand Down Expand Up @@ -161,7 +249,6 @@ impl PartialEq for GraphPath {
}
}
}

impl PartialOrd for GraphPath {
// We define partial_cmp by structure, ignoring path weight
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Expand All @@ -179,6 +266,39 @@ impl PartialOrd for GraphPath {
}
}

impl Encode for VertexOrEdge {
fn write_to<W: WriteExt>(&self, writer: &mut W) -> std::io::Result<()> {
match self {
VertexOrEdge::V(v) => {
writer.write_u8(0)?;
v.write_to(writer)?;
}
VertexOrEdge::E(e) => {
writer.write_u8(1)?;
e.write_to(writer)?;
}
}
Ok(())
}
}

impl Decode for VertexOrEdge {
fn read_from<R: ReadExt>(reader: &mut R) -> std::io::Result<Self> {
let e = reader.read_u8()?;
match e {
0 => {
let v = <Vertex>::read_from(reader)?;
Ok(VertexOrEdge::V(v))
}
1 => {
let e = <Edge>::read_from(reader)?;
Ok(VertexOrEdge::E(e))
}
_ => Err(std::io::Error::new(std::io::ErrorKind::Other, "unreachable")),
}
}
}

impl Encode for GraphPath {
fn write_to<W: WriteExt>(&self, writer: &mut W) -> std::io::Result<()> {
match self {
Expand Down Expand Up @@ -210,20 +330,20 @@ impl Decode for GraphPath {
let opt = reader.read_u8()?;
match opt {
0 => {
let path = <Vec<Vertex>>::read_from(reader)?;
let path = <Vec<VertexOrEdge>>::read_from(reader)?;
Ok(GraphPath::AllV(path))
}
1 => {
let vertex_or_edge = <Vertex>::read_from(reader)?;
let vertex_or_edge = <VertexOrEdge>::read_from(reader)?;
let weight = <u64>::read_from(reader)? as usize;
Ok(GraphPath::EndV((vertex_or_edge, weight)))
}
2 => {
let path = <Vec<Vertex>>::read_from(reader)?;
let path = <Vec<VertexOrEdge>>::read_from(reader)?;
Ok(GraphPath::SimpleAllV(path))
}
3 => {
let vertex_or_edge = <Vertex>::read_from(reader)?;
let vertex_or_edge = <VertexOrEdge>::read_from(reader)?;
let path = <Vec<ID>>::read_from(reader)?;
Ok(GraphPath::SimpleEndV((vertex_or_edge, path)))
}
Expand All @@ -232,7 +352,7 @@ impl Decode for GraphPath {
}
}

impl TryFrom<result_pb::graph_path::VertexOrEdge> for Vertex {
impl TryFrom<result_pb::graph_path::VertexOrEdge> for VertexOrEdge {
type Error = ParsePbError;
fn try_from(e: result_pb::graph_path::VertexOrEdge) -> Result<Self, Self::Error> {
let vertex_or_edge = e
Expand All @@ -241,10 +361,11 @@ impl TryFrom<result_pb::graph_path::VertexOrEdge> for Vertex {
match vertex_or_edge {
result_pb::graph_path::vertex_or_edge::Inner::Vertex(v) => {
let vertex = v.try_into()?;
Ok(vertex)
Ok(VertexOrEdge::V(vertex))
}
result_pb::graph_path::vertex_or_edge::Inner::Edge(_) => {
Err(ParsePbError::Unsupported("Path with edges".to_string()))
result_pb::graph_path::vertex_or_edge::Inner::Edge(e) => {
let edge = e.try_into()?;
Ok(VertexOrEdge::E(edge))
}
}
}
Expand All @@ -265,10 +386,10 @@ impl TryFrom<result_pb::GraphPath> for GraphPath {
impl Hash for GraphPath {
fn hash<H: Hasher>(&self, state: &mut H) {
match self {
GraphPath::AllV(p) => p.hash(state),
GraphPath::SimpleAllV(p) => p.hash(state),
GraphPath::EndV((e, _)) => e.hash(state),
GraphPath::SimpleEndV((e, _)) => e.hash(state),
GraphPath::AllV(p) | GraphPath::SimpleAllV(p) => p.hash(state),
GraphPath::EndV((e, _)) | GraphPath::SimpleEndV((e, _)) => e.hash(state),
}
}
}

impl_as_any!(GraphPath);
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ impl QueryParams {
}

pub fn is_queryable(&self) -> bool {
!(self.labels.is_empty() && self.filter.is_none() && self.limit.is_none() && self.columns.is_none())
!(self.filter.is_none() && self.limit.is_none() && self.columns.is_none())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why remove labels.is_empty() here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed this function as it is not used.

}

pub fn get_extra_param(&self, key: &str) -> Option<&String> {
Expand Down
Loading