Skip to content

Commit

Permalink
Datalog with-recursive operator
Browse files Browse the repository at this point in the history
  • Loading branch information
FiV0 committed Jul 15, 2023
1 parent 735f233 commit 3d1e90c
Show file tree
Hide file tree
Showing 2 changed files with 292 additions and 38 deletions.
162 changes: 124 additions & 38 deletions core/src/main/clojure/xtdb/datalog.clj
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,15 @@
:match ::match-spec,
:temporal-opts (s/? ::temporal-opts))))

(s/def ::match-recursive
(s/and list?
(s/cat :tag #{'$-recursive 'match-recursive}
:table ::table,
:match ::match-spec,
:temporal-opts (s/? ::temporal-opts)
:sub-parts (s/coll-of ::query :kind vector?))))


(s/def ::triple
(s/and vector?
(s/conformer identity vec)
Expand Down Expand Up @@ -166,6 +175,7 @@
:triple ::triple
:call ::call-clause
:match ::match
:match-recursive ::match-recursive
:rule ::rule))

(s/def ::where
Expand Down Expand Up @@ -337,12 +347,12 @@
(s/unform ::in-binding in))))
in-arg))))})

:match {:provided-vars (->> term-arg
:match
(map second)
(filter (comp #{:logic-var} first))
(map second)
set)}))
(:match :match-recursive) {:provided-vars (->> term-arg
:match
(map second)
(filter (comp #{:logic-var} first))
(map second)
set)}))

(defn- ->param-sym [lv]
(-> (symbol (str "?" (name lv)))
Expand Down Expand Up @@ -486,31 +496,97 @@
{:triples (map #(s/unform ::triple %) triples)})))
matches->eids))

(defn- plan-from [{:keys [triples matches]}]
(defn- plan-from [{:keys [triples matches] ::keys [recursive-tables]}]
(let [matches-eids (zipmap matches (map match->eids matches))
tables (->> (group-by :e triples)
(reduce add-triples-to-matches matches-eids)
keys)]
(vec
(for [{:keys [table match temporal-opts]} tables]
(let [match (->> match (mapv (fn [[a v]] (MapEntry/create (col-sym a) v))))
var->cols (-> match
(->> (keep (fn [[a [v-type v-arg]]]
(case v-type
:logic-var {:lv v-arg
:col (if (contains?
#{app-time-period-sym
system-time-period-sym}
a)
(col-sym v-arg)
a)}
:unwind {:lv (first v-arg), :col (attr->unwind-col a)}
nil)))
(group-by :lv))
(update-vals #(into #{} (map :col) %)))]
(-> (plan-scan table match temporal-opts)
(wrap-unwind match)
(wrap-unify var->cols)))))))
(if (contains? recursive-tables table)
(let [renames (->> match
(keep (fn [[a [v-type v-arg]]]
(case v-type
:logic-var (MapEntry/create (col-sym a) v-arg)
;; TODO
:unwind (throw (ex-info "unwind not supported in match-recursive!" {}))
nil)))
(into {}))
selects (->> match
(keep (fn [[a [v-type v-arg]]]
(case v-type
:literal (list '= (col-sym a) v-arg)
nil))))]
;; TODO var->cols
(-> [:rename renames (wrap-select table selects)]
(with-meta {::vars (vals renames)})))
(let [match (->> match (mapv (fn [[a v]] (MapEntry/create (col-sym a) v))))
var->cols (-> match
(->> (keep (fn [[a [v-type v-arg]]]
(case v-type
:logic-var {:lv v-arg
:col (if (contains?
#{app-time-period-sym
system-time-period-sym}
a)
(col-sym v-arg)
a)}
:unwind {:lv (first v-arg), :col (attr->unwind-col a)}
nil)))
(group-by :lv))
(update-vals #(into #{} (map :col) %)))]
(-> (plan-scan table match temporal-opts)
(wrap-unwind match)
(wrap-unify var->cols))))))))

;; This might need some more tests
(defn- referenced-tables [{:keys [where] :as _query}]
(reduce (fn [acc [term-type term-arg]]
(case term-type
:match (conj acc (:table term-arg))
:match-recursive
(-> (conj acc (:table term-arg))
(into (map referenced-tables (:sub-parts term-arg))))
(:inner-join :semi-join :anti-join :left-join :single-join)
(let [{:keys [sub-query]} term-arg]
(into acc (referenced-tables sub-query)))
:union-join
(into acc (map referenced-tables) (:branches term-arg))
acc))
#{} where))

(defn plan-recursive-clauses [recursive-clauses recursive-tables]
(for [{:keys [table match sub-parts]} recursive-clauses]
(let [renames (->> match
(keep (fn [[a [v-type v-arg]]]
(case v-type
:logic-var (MapEntry/create (col-sym a) v-arg)
:unwind (throw (ex-info "unwind not supported in match-recursive!" {}))
nil)))
(into {}))
selects (->> match
(keep (fn [[a [v-type v-arg]]]
(case v-type
:literal (list '= (col-sym a) v-arg)
nil))))
{recursive-qs true, non-recursive-qs false} (group-by (comp #(contains? % table)
referenced-tables)
sub-parts)
recursive-q (->> recursive-qs
(map (comp plan-query #(assoc % ::recursive-tables ((fnil conj #{}) recursive-tables table))))
(reduce (fn [acc plan] [:union-all acc plan])))
non-recursive-q (->> non-recursive-qs
(map plan-query)
(reduce (fn [acc plan] [:union-all acc plan])))]
;; TODO var->cols, empty renames
(-> [:rename renames
(wrap-select
[:fixpoint table
non-recursive-q
recursive-q]
selects)]
(with-meta {::vars (vals renames)
::provided-vars (vals renames)})))))

(def ^:dynamic *gensym* gensym)

Expand Down Expand Up @@ -566,12 +642,12 @@
(MapEntry/create param param-symbol)))
(into {}))))

(defn- plan-sub-query [sq-type {:keys [sub-query] :as term-arg}]
(defn- plan-sub-query [sq-type {:keys [sub-query] ::keys [recursive-tables] :as term-arg}]
(let [{:keys [required-vars provided-vars]} (term-vars [sq-type term-arg])
apply-mapping (->apply-mapping required-vars)]
(-> (plan-query (-> sub-query
(dissoc :in)
(assoc ::apply-mapping apply-mapping)))
(assoc ::apply-mapping apply-mapping ::recursive-tables recursive-tables)))
(vary-meta into {::provided-vars provided-vars
::required-vars required-vars
::apply-mapping apply-mapping}))))
Expand Down Expand Up @@ -652,14 +728,15 @@
(= :left-outer-join join-type) (update ::vars into provided-vars))))))
plan)))

(defn- plan-union-join [{:keys [args branches] :as uj}]
(defn- plan-union-join [{:keys [args branches] ::keys [recursive-tables] :as uj}]
(let [{:keys [required-vars]} (term-vars [:union-join uj])
apply-mapping (->apply-mapping required-vars)]
(-> branches
(->> (mapv (fn [branch]
(plan-query
{:find (vec (for [arg args]
[:logic-var arg]))
::recursive-tables recursive-tables
::apply-mapping apply-mapping
:where branch})))
(reduce (fn [acc plan]
Expand Down Expand Up @@ -907,31 +984,40 @@
{:rule-name name})))))
rule-name->rules))

(defn- plan-body [{where-clauses :where, apply-mapping ::apply-mapping, rules :rules} in-rels]
(defn- plan-body [{where-clauses :where, apply-mapping ::apply-mapping, rules :rules
recursive-tables ::recursive-tables}
in-rels]
(let [{::keys [param-vars]} (meta in-rels)
rule-name->rules (->> rules
gensym-rules
(group-by (comp :name :head)))
_ (check-rule-arity rule-name->rules)
where-clauses (expand-rules rule-name->rules where-clauses)

{match-clauses :match, triple-clauses :triple, call-clauses :call
{match-clauses :match, recursive-clauses :match-recursive triple-clauses :triple, call-clauses :call
inner-join-clauses :inner-join, left-join-clauses :left-join
semi-join-clauses :semi-join, anti-join-clauses :anti-join, union-join-clauses :union-join}
(-> where-clauses
(->> (group-by first))
(update-vals #(mapv second %)))]
(update-vals #(mapv second %)))
assoc-recursive-tables #(if recursive-tables (assoc % ::recursive-tables recursive-tables) %)]

(loop [plan (mega-join (vec (concat in-rels (plan-from {:matches match-clauses
:triples triple-clauses})))
:triples triple-clauses
::recursive-tables recursive-tables})
(when recursive-clauses (plan-recursive-clauses recursive-clauses recursive-tables))))
(concat param-vars apply-mapping))

calls (some->> call-clauses (mapv plan-call))
union-joins (some->> union-join-clauses (mapv plan-union-join))
inner-joins (some->> inner-join-clauses (mapv (partial plan-sub-query :inner-join)))
left-joins (some->> left-join-clauses (mapv (partial plan-sub-query :left-join)))
semi-joins (some->> semi-join-clauses (mapv (partial plan-sub-query :semi-join)))
anti-joins (some->> anti-join-clauses (mapv (partial plan-sub-query :anti-join)))]
union-joins (some->> union-join-clauses (mapv (comp plan-union-join assoc-recursive-tables)))
inner-joins (some->> inner-join-clauses (mapv (comp (partial plan-sub-query :inner-join)
assoc-recursive-tables)))
left-joins (some->> left-join-clauses (mapv (comp (partial plan-sub-query :left-join)
assoc-recursive-tables)))
semi-joins (some->> semi-join-clauses (mapv (comp (partial plan-sub-query :semi-join)
assoc-recursive-tables)))
anti-joins (some->> anti-join-clauses (mapv (comp (partial plan-sub-query :anti-join)
assoc-recursive-tables)))]

(if (and (empty? calls) (empty? inner-joins) (empty? left-joins)
(empty? semi-joins) (empty? anti-joins) (empty? union-joins))
Expand Down Expand Up @@ -1216,7 +1302,7 @@

#_(doto clojure.pprint/pprint)
#_(->> (binding [*print-meta* true]))
(lp/rewrite-plan {})
#_(lp/rewrite-plan {})
#_(doto clojure.pprint/pprint)
(doto (lp/validate-plan)))]

Expand Down
Loading

0 comments on commit 3d1e90c

Please sign in to comment.