CSV uploads for ClickHouse Cloud #236

Merged: 11 commits, May 17, 2024
2 changes: 1 addition & 1 deletion .github/workflows/check.yml
@@ -17,7 +17,7 @@ jobs:
uses: actions/checkout@v2
with:
repository: metabase/metabase
ref: v0.49.6
ref: v0.49.11

- name: Remove incompatible tests
# dataset-definition-test tests test data definition,
123 changes: 116 additions & 7 deletions src/metabase/driver/clickhouse.clj
@@ -1,7 +1,9 @@
(ns metabase.driver.clickhouse
"Driver for ClickHouse databases"
#_{:clj-kondo/ignore [:unsorted-required-namespaces]}
(:require [clojure.string :as str]
(:require [clojure.core.memoize :as memoize]
[clojure.string :as str]
[honey.sql :as sql]
[metabase [config :as config]]
[metabase.driver :as driver]
[metabase.driver.clickhouse-introspection]
@@ -12,8 +14,11 @@
[metabase.driver.sql-jdbc [common :as sql-jdbc.common]
[connection :as sql-jdbc.conn]]
[metabase.driver.sql-jdbc.execute :as sql-jdbc.execute]
[metabase.driver.sql.query-processor :as sql.qp]
[metabase.driver.sql.util :as sql.u]
[metabase.util.log :as log]))
[metabase.upload :as upload]
[metabase.util.log :as log])
(:import [com.clickhouse.jdbc.internal ClickHouseStatementImpl]))

(set! *warn-on-reflection* true)

@@ -32,17 +37,17 @@
:test/jvm-timezone-setting false
:connection-impersonation false
:schemas true
:datetime-diff true}]
:datetime-diff true
:upload-with-auto-pk false}]

(defmethod driver/database-supports? [:clickhouse feature] [_driver _feature _db] supported?))

(def ^:private default-connection-details
{:user "default" :password "" :dbname "default" :host "localhost" :port "8123"})

(defmethod sql-jdbc.conn/connection-details->spec :clickhouse
[_ details]
;; ensure defaults merge on top of nils
(let [details (reduce-kv (fn [m k v] (assoc m k (or v (k default-connection-details))))
(defn- connection-details->spec* [details]
(let [;; ensure defaults merge on top of nils
details (reduce-kv (fn [m k v] (assoc m k (or v (k default-connection-details))))
default-connection-details
details)
{:keys [user password dbname host port ssl use-no-proxy]} details
@@ -61,6 +66,33 @@
:product_name product-name}
(sql-jdbc.common/handle-additional-options details :separator-style :url))))

(def ^:private ^{:arglists '([db-details])} cloud?
"Is this a cloud DB?"
(memoize/ttl
(fn [db-details]
(sql-jdbc.execute/do-with-connection-with-options
:clickhouse
(connection-details->spec* db-details)
nil
(fn [^java.sql.Connection conn]
(with-open [stmt (.prepareStatement conn "SELECT value='1' FROM system.settings WHERE name='cloud_mode'")
rset (.executeQuery stmt)]
(when (.next rset)
(.getBoolean rset 1))))))
;; cache the results for 48 hours; TTL is here only to eventually clear out old entries
:ttl/threshold (* 48 60 60 1000)))
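The cloud_mode probe above can be sanity-checked by hand; a minimal sketch of the same query run directly against a server (the alias is illustrative):

```sql
-- Returns 1 on ClickHouse Cloud, 0 on self-managed servers that expose the
-- setting; no row at all means the server has no cloud_mode setting.
SELECT value = '1' AS is_cloud
FROM system.settings
WHERE name = 'cloud_mode';
```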

(defmethod sql-jdbc.conn/connection-details->spec :clickhouse
[_ details]
(cond-> (connection-details->spec* details)
(cloud? details)
Contributor commented:
From what I understand, select_sequential_consistency just limits which replica you query from, and doesn't affect the performance of the query on that specific replica.

This logic seems like it will break consistency for multi-replica on-prem installations, and needlessly disable it on single-replica ones. It seems best just to enable it unconditionally for now?

For any multi-replica installation, enabling it for all connections seems to wipe out most of the benefits of connecting to a distributed installation, but I don't see any way around this besides reworking metabase to support separate pools for different connection flavors.

Contributor Author @calherries replied (May 17, 2024):

I think you're mistaking the behaviour of select_sequential_consistency on CH Cloud (docs here) for its on-premise behaviour (the link you shared). We're only enabling select_sequential_consistency on CH Cloud, where it's much less of an issue.

There's more on SharedMergeTree in this blog post.

Unlike ReplicatedMergeTree, SharedMergeTree doesn't require replicas to communicate with each other. Instead, all communication happens through shared storage and clickhouse-keeper. SharedMergeTree implements asynchronous leaderless replication and uses clickhouse-keeper for coordination and metadata storage. This means that metadata doesn’t need to be replicated as your service scales up and down. This leads to faster replication, mutation, merges and scale-up operations. SharedMergeTree allows for hundreds of replicas for each table, making it possible to dynamically scale without shards. A distributed query execution approach is used in ClickHouse Cloud to utilize more compute resources for a query.

If your use case requires consistency guarantees that each server is delivering the same query result, then you can run the SYNC REPLICA system statement, which is a much more lightweight operation with the SharedMergeTree. Instead of syncing data (or metadata with zero-copy replication) between servers, each server just needs to fetch the current version of metadata from Keeper.

And I can't imagine fetching metadata from Keeper can be a bottleneck. I also got this DM from Serge on select_sequential_consistency:

With CH Cloud, there should not be any issues with that, as it uses “shared” merge tree and this is merely a metadata sync before executing the query (this is super fast, as it is a fetch of the meta from zookeeper).

Collaborator commented:

Yes, select_sequential_consistency is correctly enabled for the Cloud type of instance only.

For on-premise clusters (in a follow-up PR), we should just set insert_quorum equal to the nodes count so that replica sync won't be necessary and will be effectively done during the CSV insert, and it won't affect other read-only operations.
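A minimal sketch of that follow-up idea, assuming a hypothetical three-node on-prem cluster (the quorum value and table name are illustrative; the real value would have to be derived from the actual node count):

```sql
-- Hypothetical on-prem follow-up: make each CSV insert wait for all replicas,
-- so subsequent reads stay consistent without select_sequential_consistency.
SET insert_quorum = 3;
INSERT INTO my_upload_table (id, name) VALUES (1, 'a');
```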

;; select_sequential_consistency guarantees that we can query data from any replica in CH Cloud
;; immediately after it is written
(assoc :select_sequential_consistency true)))

(defmethod driver/database-supports? [:clickhouse :uploads] [_driver _feature db]
(cloud? (:details db)))

(defmethod driver/can-connect? :clickhouse
[driver details]
(if config/is-test?
Expand Down Expand Up @@ -112,6 +144,83 @@
:semantic-version {:major (.getInt rset 2)
:minor (.getInt rset 3)}})))))

(defmethod driver/upload-type->database-type :clickhouse
[_driver upload-type]
(case upload-type
::upload/varchar-255 "Nullable(String)"
::upload/text "Nullable(String)"
::upload/int "Nullable(Int64)"
::upload/float "Nullable(Float64)"
::upload/boolean "Nullable(Boolean)"
::upload/date "Nullable(Date32)"
::upload/datetime "Nullable(DateTime64(3))"
;; FIXME: should be `Nullable(DateTime64(3))`
::upload/offset-datetime nil))

(defmethod driver/table-name-length-limit :clickhouse
[_driver]
  ;; FIXME: This is a lie because you're really limited by the filesystem's limits, since ClickHouse uses
  ;; file names as table/column names. But it's an approximation.
206)

(defn- quote-name [s]
(let [parts (str/split (name s) #"\.")]
(str/join "." (map #(str "`" % "`") parts))))
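For reference, the helper splits on dots and backtick-quotes each part; an illustrative REPL check:

```clojure
;; Illustrative REPL session for the helper above.
(quote-name "default.my_table") ;; => "`default`.`my_table`"
(quote-name :events)            ;; => "`events`"
```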

(defn- create-table!-sql
  "Creates a ClickHouse table with the given name and column definitions. It assumes the engine is MergeTree,
   so it only works with ClickHouse Cloud and single-node on-premise deployments at the moment."
[driver table-name column-definitions & {:keys [primary-key]}]
(str/join "\n"
[(first (sql/format {:create-table (keyword table-name)
:with-columns (mapv (fn [[name type-spec]]
(vec (cons name [[:raw type-spec]])))
column-definitions)}
:quoted true
:dialect (sql.qp/quote-style driver)))
"ENGINE = MergeTree"
(format "ORDER BY (%s)" (str/join ", " (map quote-name primary-key)))]))

(defmethod driver/create-table! :clickhouse
[driver db-id table-name column-definitions & {:keys [primary-key]}]
(sql-jdbc.execute/do-with-connection-with-options
driver
db-id
{:write? true}
(fn [^java.sql.Connection conn]
(with-open [stmt (.createStatement conn)]
(let [^ClickHouseStatementImpl stmt (.unwrap stmt ClickHouseStatementImpl)
request (.getRequest stmt)]
(.set request "wait_end_of_query" "1")
(with-open [_response (-> request
(.query ^String (create-table!-sql driver table-name column-definitions :primary-key primary-key))
(.executeAndWait))]))))))

(defmethod driver/insert-into! :clickhouse
Contributor Author commented:
Rather than use the default sql-jdbc implementation, I've chosen to use the approach recommended here, to allow for large uploads.

[driver db-id table-name column-names values]
(when (seq values)
(sql-jdbc.execute/do-with-connection-with-options
driver
db-id
{:write? true}
(fn [^java.sql.Connection conn]
(let [sql (format "INSERT INTO %s (%s)" (quote-name table-name) (str/join ", " (map quote-name column-names)))]
(with-open [ps (.prepareStatement conn sql)]
(doseq [row values]
(when (seq row)
(doseq [[idx v] (map-indexed (fn [x y] [(inc x) y]) row)]
(condp isa? (type v)
Contributor commented:
Probably not a bottleneck, but making a protocol like (ps-set! v ps idx) could remove the type reflection and the O(n) branching of condp.

Contributor Author replied:
Yes, how do you imagine this being implemented exactly? The tricky part is this multimethod doesn't know the type of each column without using reflection, and values can be nil. I'm imagining this:

  • for each value, call (ps-set! v ps idx)
  • ps-set! looks up the method to use for the idx in a volatile map, e.g. setString etc. If the map contains a method for the idx, it uses it. Otherwise it calculates the method using a similar condp expression, saves it to the map under the key idx, and executes it.

That way we'll only use reflection once for every column.

Contributor replied:
Maybe I'm missing something, but I was thinking we'd just implement the protocol directly on each of these java types for v, eschewing any reflection or map look-up. I think you can just use Object for the fallback case.

Contributor Author replied:
Right, anyway this seems pretty inconsequential compared to all the slow stuff we're doing on the Metabase side. I think I'd prefer to keep it simple and not change this condp for now.

java.lang.String (.setString ps idx v)
java.lang.Boolean (.setBoolean ps idx v)
java.lang.Long (.setLong ps idx v)
java.lang.Double (.setFloat ps idx v)
java.math.BigInteger (.setObject ps idx v)
java.time.LocalDate (.setObject ps idx v)
java.time.LocalDateTime (.setObject ps idx v)
(.setString ps idx v)))
(.addBatch ps)))
(doall (.executeBatch ps))))))))
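For reference, the reviewer's protocol idea (not adopted in this PR) could look roughly like the sketch below; the protocol name and the exact set of extended types are illustrative:

```clojure
;; Sketch only: dispatching on the value's class replaces condp's linear scan
;; and per-value type reflection with a single protocol method lookup.
(defprotocol PSSet
  (ps-set! [v ps idx] "Bind v to parameter idx of the PreparedStatement ps."))

(extend-protocol PSSet
  String
  (ps-set! [v ps idx] (.setString ^java.sql.PreparedStatement ps idx ^String v))
  Boolean
  (ps-set! [v ps idx] (.setBoolean ^java.sql.PreparedStatement ps idx v))
  Long
  (ps-set! [v ps idx] (.setLong ^java.sql.PreparedStatement ps idx v))
  Double
  (ps-set! [v ps idx] (.setDouble ^java.sql.PreparedStatement ps idx v))
  ;; Fallback for BigInteger, LocalDate, LocalDateTime, and anything else.
  Object
  (ps-set! [v ps idx] (.setObject ^java.sql.PreparedStatement ps idx v))
  nil
  (ps-set! [_ ps idx] (.setObject ^java.sql.PreparedStatement ps idx nil)))
```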

;;; ------------------------------------------ User Impersonation ------------------------------------------

(defmethod driver.sql/set-role-statement :clickhouse
70 changes: 42 additions & 28 deletions test/metabase/driver/clickhouse_test.clj
@@ -6,6 +6,7 @@
[cljc.java-time.temporal.chrono-unit :as chrono-unit]
[clojure.test :refer :all]
[metabase.driver :as driver]
[metabase.driver.clickhouse :as clickhouse]
[metabase.driver.clickhouse-data-types-test]
[metabase.driver.clickhouse-introspection-test]
[metabase.driver.clickhouse-substitution-test]
@@ -56,34 +57,47 @@
(offset-date-time/parse shanghai-now date-time-formatter/iso-offset-date-time))))))))

(deftest ^:parallel clickhouse-connection-string
(testing "connection with no additional options"
(is (= ctd/default-connection-params
(sql-jdbc.conn/connection-details->spec
:clickhouse
{}))))
(testing "custom connection with additional options"
(is (= (merge
ctd/default-connection-params
{:subname "//myclickhouse:9999/foo?sessionTimeout=42"
:user "bob"
:password "qaz"
:use_no_proxy true
:ssl true})
(sql-jdbc.conn/connection-details->spec
:clickhouse
{:host "myclickhouse"
:port 9999
:user "bob"
:password "qaz"
:dbname "foo"
:use-no-proxy true
:additional-options "sessionTimeout=42"
:ssl true}))))
(testing "nil dbname handling"
(is (= ctd/default-connection-params
(sql-jdbc.conn/connection-details->spec
:clickhouse
{:dbname nil})))))
(mt/with-dynamic-redefs [;; This function's implementation requires the connection details to actually connect to the
;; database, which is orthogonal to the purpose of this test.
clickhouse/cloud? (constantly false)]
(testing "connection with no additional options"
(is (= ctd/default-connection-params
(sql-jdbc.conn/connection-details->spec
:clickhouse
{}))))
(testing "custom connection with additional options"
(is (= (merge
ctd/default-connection-params
{:subname "//myclickhouse:9999/foo?sessionTimeout=42"
:user "bob"
:password "qaz"
:use_no_proxy true
:ssl true})
(sql-jdbc.conn/connection-details->spec
:clickhouse
{:host "myclickhouse"
:port 9999
:user "bob"
:password "qaz"
:dbname "foo"
:use-no-proxy true
:additional-options "sessionTimeout=42"
:ssl true}))))
(testing "nil dbname handling"
(is (= ctd/default-connection-params
(sql-jdbc.conn/connection-details->spec
:clickhouse
{:dbname nil}))))))

(deftest ^:parallel clickhouse-connection-string-select-sequential-consistency
(mt/with-dynamic-redefs [;; This function's implementation requires the connection details to actually
;; connect to the database, which is orthogonal to the purpose of this test.
clickhouse/cloud? (constantly true)]
(testing "connection with no additional options"
(is (= (assoc ctd/default-connection-params :select_sequential_consistency true)
(sql-jdbc.conn/connection-details->spec
:clickhouse
{}))))))

(deftest ^:parallel clickhouse-tls
(mt/test-driver
2 changes: 2 additions & 0 deletions test/metabase/test/data/clickhouse.clj
@@ -113,6 +113,8 @@

(defmethod sql.tx/add-fk-sql :clickhouse [& _] nil) ; TODO - fix me

(defmethod sql.tx/session-schema :clickhouse [_] "default")

(defmethod tx/supports-time-type? :clickhouse [_driver] false)

(defn rows-without-index