diff --git a/src/metabase/driver/clickhouse.clj b/src/metabase/driver/clickhouse.clj
index 6c8750a..a8d0966 100644
--- a/src/metabase/driver/clickhouse.clj
+++ b/src/metabase/driver/clickhouse.clj
@@ -2,6 +2,7 @@
   "Driver for ClickHouse databases"
   #_{:clj-kondo/ignore [:unsorted-required-namespaces]}
   (:require [clojure.string :as str]
+            [honey.sql :as sql]
             [metabase [config :as config]]
             [metabase.driver :as driver]
             [metabase.driver.clickhouse-introspection]
@@ -12,7 +13,11 @@
             [metabase.driver.sql-jdbc [common :as sql-jdbc.common]
                                       [connection :as sql-jdbc.conn]]
             [metabase.driver.sql-jdbc.execute :as sql-jdbc.execute]
+            [metabase.driver.sql.query-processor :as sql.qp]
             [metabase.driver.sql.util :as sql.u]
+            [metabase.query-processor.writeback :as qp.writeback]
+            [metabase.test.data.sql :as sql.tx]
+            [metabase.upload :as upload]
             [metabase.util.log :as log]))
 
 (set! *warn-on-reflection* true)
@@ -32,7 +37,9 @@
                               :test/jvm-timezone-setting false
                               :connection-impersonation false
                               :schemas true
-                              :datetime-diff true}]
+                              :uploads true
+                              :datetime-diff true
+                              :upload-with-auto-pk false}]
   (defmethod driver/database-supports? [:clickhouse feature] [_driver _feature _db] supported?))
 
 
@@ -112,6 +119,90 @@
                          :semantic-version {:major (.getInt rset 2)
                                             :minor (.getInt rset 3)}})))))
 
+;; Maps a Metabase upload column type to the ClickHouse column type used for it.
+(defmethod driver/upload-type->database-type :clickhouse
+  [_driver upload-type]
+  (case upload-type
+    ::upload/varchar-255 "Nullable(String)"
+    ::upload/text        "Nullable(String)"
+    ::upload/int         "Nullable(Int64)"
+    ::upload/float       "Nullable(Float64)"
+    ::upload/boolean     "Nullable(Boolean)"
+    ::upload/date        "Nullable(Date32)"
+    ::upload/datetime    "Nullable(DateTime64(3))"
+    ;; FIXME: should be `Nullable(DateTime64(3))`
+    ::upload/offset-datetime nil))
+
+(defmethod driver/table-name-length-limit :clickhouse
+  [_driver]
+  ;; FIXME: This is a lie because you're really limited by the filesystem's limits, because ClickHouse uses
+  ;; filenames as table/column names. But it's an approximation.
+  206)
+
+(defn- quote-name
+  "Wraps every dot-separated part of the name `s` in backticks and re-joins the parts with dots."
+  [s]
+  (->> (str/split (name s) #"\.")
+       (map #(str "`" % "`"))
+       (str/join ".")))
+
+(defn- create-table!-sql
+  "Returns the CREATE TABLE statement for a MergeTree table named `table-name`.
+  Each entry of `column-definitions` is destructured as [column-name type-spec], and the type-spec string
+  is emitted verbatim. The optional `primary-key` column names feed both PRIMARY KEY and ORDER BY."
+  [driver table-name column-definitions & {:keys [primary-key]}]
+  (let [key-columns (str/join ", " (map quote-name primary-key))]
+    (str/join "\n"
+              [(first (sql/format {:create-table (keyword table-name)
+                                   :with-columns (mapv (fn [[column-name type-spec]]
+                                                         [column-name [:raw type-spec]])
+                                                       column-definitions)}
+                                  :quoted true
+                                  :dialect (sql.qp/quote-style driver)))
+               "ENGINE = MergeTree"
+               (format "PRIMARY KEY (%s)" key-columns)
+               ;; ClickHouse requires the primary key to be a prefix of the sorting key, so ORDER BY lists
+               ;; the same columns. Without a primary key this still renders `ORDER BY ()`.
+               (format "ORDER BY (%s)" key-columns)])))
+
+;; Creates the upload table by running the generated DDL as a write statement against `db-id`.
+(defmethod driver/create-table! :clickhouse
+  [driver db-id table-name column-definitions & {:keys [primary-key]}]
+  (let [sql (create-table!-sql driver table-name column-definitions :primary-key primary-key)]
+    (qp.writeback/execute-write-sql! db-id sql)))
+
+;; Inserts `values` (a sequence of rows) into `table-name` as one JDBC batch; does nothing when
+;; `values` is empty. Rows that are themselves empty are skipped.
+(defmethod driver/insert-into! :clickhouse
+  [driver db-id table-name column-names values]
+  (when (seq values)
+    (sql-jdbc.execute/do-with-connection-with-options
+     driver
+     db-id
+     {:write? true}
+     (fn [^java.sql.Connection conn]
+       (let [sql (format "insert into %s (%s)" (quote-name table-name) (str/join ", " (map quote-name column-names)))]
+         (with-open [ps (.prepareStatement conn sql)]
+           (doseq [row values]
+             (when (seq row)
+               ;; JDBC parameter indexes are 1-based, hence the `inc`.
+               (doseq [[idx v] (map-indexed (fn [x y] [(inc x) y]) row)]
+                 (condp isa? (type v)
+                   java.lang.String        (.setString ps idx v)
+                   java.lang.Boolean       (.setBoolean ps idx v)
+                   java.lang.Long          (.setLong ps idx v)
+                   ;; setDouble, not setFloat: uploaded floats map to Nullable(Float64) above, so the
+                   ;; value must not be narrowed to 32 bits on the way in.
+                   java.lang.Double        (.setDouble ps idx v)
+                   java.math.BigInteger    (.setObject ps idx v)
+                   java.time.LocalDate     (.setObject ps idx v)
+                   java.time.LocalDateTime (.setObject ps idx v)
+                   (.setString ps idx v)))
+               (.addBatch ps)))
+           (doall (.executeBatch ps))))))))
+
+(defmethod sql.tx/session-schema :clickhouse [_] "default")
+
 ;;; ------------------------------------------ User Impersonation ------------------------------------------
 
 (defmethod driver.sql/set-role-statement :clickhouse
diff --git a/src/metabase/driver/scratch.clj b/src/metabase/driver/scratch.clj
new file mode 100644
index 0000000..60fff9b
--- /dev/null
+++ b/src/metabase/driver/scratch.clj
@@ -0,0 +1,277 @@
+(ns metabase.driver.scratch
+  (:require [clojure.java.io :as io]
+            [clojure.java.jdbc :as jdbc]
+            [metabase [config :as config]]
+            [metabase.driver.clickhouse-introspection]
+            [metabase.driver.clickhouse-nippy]
+            [metabase.driver.clickhouse-qp]
+            [metabase.driver.sql-jdbc [common :as sql-jdbc.common]
+                                      [connection :as sql-jdbc.conn]]
+            [metabase.driver.sql-jdbc.execute :as sql-jdbc.execute]
+            [metabase.query-processor.writeback :as qp.writeback]
+            [metabase.test :as mt])
+  (:import [java.io ByteArrayInputStream]
+           [java.nio.charset StandardCharsets]
+           [com.clickhouse.data.value ClickHouseArrayValue]
+           [com.clickhouse.data ClickHouseFormat ClickHouseFile ClickHouseCompression ClickHouseDataStreamFactory
+            ClickHousePipedOutputStream ClickHouseInputStream]
+           [com.clickhouse.client ClickHouseNode ClickHouseProtocol ClickHouseCredentials ClickHouseClient]))
+
+(def server
+  (.. (ClickHouseNode/builder)
+      (host "127.0.0.1")
+      (port ClickHouseProtocol/HTTP)
+      (database "default")
+      (credentials (ClickHouseCredentials/fromUserAndPassword "default" ""))
+      (build)))
+
+(def file
+  (ClickHouseFile/of "/Users/callumherries/meta/mb49/t.csv" ClickHouseCompression/NONE ClickHouseFormat/CSV))
+
+(def client
+  (ClickHouseClient/newInstance (into-array ClickHouseProtocol [(.getProtocol server)])))
+
+(.. client
+    (write server)
+    (set "input_format_csv_skip_first_lines" "1")
+    (set "format_csv_delimiter" ",")
+    (table "t")
+    (data file)
+    (executeAndWait))
+
+(defn- new-input-stream [^String content]
+  (ByteArrayInputStream. (.getBytes content StandardCharsets/US_ASCII)))
+
+(let [input-stream (new-input-stream "1,1\n2,3")]
+  (.. client
+      (write server)
+      (set "input_format_csv_skip_first_lines" "1")
+      (set "format_csv_delimiter" ",")
+      (format ClickHouseFormat/CSV)
+      (table "t")
+      (data input-stream)
+      (executeAndWait)))
+
+(comment
+  (defmethod driver/create-table! :clickhouse
+    [driver db-id table-name column-definitions & {:keys [primary-key]}]
+    (let [sql (create-table!-sql driver table-name column-definitions :primary-key primary-key)]
+      (qp.writeback/execute-write-sql! db-id sql)))
+
+  (def db-id 6)
+
+  (def table-name "t")
+
+  (defn e! [e]
+    (jdbc/with-db-transaction [conn (sql-jdbc.conn/db->pooled-connection-spec db-id)]
+      (jdbc/execute! conn e)))
+
+  (defn q [e]
+    (jdbc/with-db-transaction [conn (sql-jdbc.conn/db->pooled-connection-spec db-id)]
+      (jdbc/query conn e)))
+
+  (e! "CREATE TABLE t (one Int32, two Int32) ENGINE = MergeTree ORDER BY ()")
+  (e! "INSERT INTO t FROM INFILE 't.csv' FORMAT CSV")
+  (q "SELECT * FROM t")
+  ;; Kept inside this form: e! and q are only defined here, so these calls cannot run at namespace load.
+  (q "SELECT * FROM t")
+  (e! "DELETE FROM t WHERE 1=1")
+
+  (e! "CREATE TABLE t (one Int32, two Int32) ENGINE = MergeTree ORDER BY ()")
+  (e! "DROP TABLE IF EXISTS t")
+  )
+
+;; ClickHouseNode server = ClickHouseNode.builder()
+;; .host( "127.0.0.1" )
+;; .port( ClickHouseProtocol.HTTP )
+;; .database( "mydatabase" )
+;; .credentials( ClickHouseCredentials
+;; .fromUserAndPassword( "username", "password" ) )
+;; .build();
+
+;; // Create a CSV file reference
+
+;; ClickHouseFile file = ClickHouseFile.of(
+;; "/path/to/file.csv", ClickHouseCompression.NONE, ClickHouseFormat.CSV );
+
+;; // Create a client
+
+;; ClickHouseClient client = ClickHouseClient.newInstance( server.getProtocol() );
+
+;; // Specify settings, load data to the specified table and wait for completion
+
+;; client.write( server )
+;; .set( "input_format_csv_skip_first_lines", "1" )
+;; .set( "format_csv_delimiter", "," )
+;; .table( "mytable" )
+;; .data( file )
+;; .executeAndWait();
+
+
+  ;; https://github.com/ClickHouse/clickhouse-java/blob/6a4ffc4ae0b76a6f65a57ee0db3246d45fbc78c1/examples/jdbc/src/main/java/com/clickhouse/examples/jdbc/Advanced.java#L37
+  ;; private static ByteArrayInputStream newInputStream (String content)
+  ;; {return new ByteArrayInputStream (content.getBytes (StandardCharsets.US_ASCII));
+  ;; }
+
+  ;; static String exteralTables(String url) throws SQLException {
+  ;; String sql = "select a.name as n1, b.name as n2 from {tt 'table1'} a inner join {tt 'table2'} b on a.id=b.id";
+  ;; try (Connection conn = getConnection(url); PreparedStatement ps = conn.prepareStatement(sql)) {
+  ;; ps.setObject(1,
+  ;; ClickHouseExternalTable.builder().name("table1").columns("id Int32, name Nullable(String)")
+  ;; .format(ClickHouseFormat.CSV)
+  ;; .content(newInputStream("1,a\n2,b")).build());
+  ;; ps.setObject(2,
+  ;; ClickHouseExternalTable.builder().name("table2").columns("id Int32, name String")
+  ;; .format(ClickHouseFormat.JSONEachRow)
+  ;; .content(newInputStream("{\"id\":3,\"name\":\"c\"}\n{\"id\":1,\"name\":\"d\"}")).build());
+  ;; try (ResultSet rs = ps.executeQuery()) {
+  ;; if (!rs.next()) {
+  ;; throw new IllegalStateException("Should have at least one record");
+  ;; }
+
+  ;; // n1=a, n2=d
+  ;; return String.format("n1=%s, n2=%s", rs.getString(1), rs.getString(2));
+  ;; }
+  ;; }
+  ;; }
+
+  ;; static long insert(ClickHouseNode server, String table) throws ClickHouseException {
+  ;; try (ClickHouseClient client = ClickHouseClient.newInstance(server.getProtocol())) {
+  ;; ClickHouseRequest.Mutation request = client.read(server).write().table(table)
+  ;; .format(ClickHouseFormat.RowBinary);
+  ;; ClickHouseConfig config = request.getConfig();
+  ;; CompletableFuture future;
+  ;; // back-pressuring is not supported, you can adjust the first two arguments
+  ;; try (ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance()
+  ;; .createPipedOutputStream(config, (Runnable) null)) {
+  ;; // in async mode, which is default, execution happens in a worker thread
+  ;; future = request.data(stream.getInputStream()).execute();
+
+  ;; // writing happens in main thread
+  ;; for (int i = 0; i < 10_000; i++) {
+  ;; BinaryStreamUtils.writeString(stream, String.valueOf(i % 16));
+  ;; BinaryStreamUtils.writeNonNull(stream);
+  ;; BinaryStreamUtils.writeString(stream, UUID.randomUUID().toString());
+  ;; }
+  ;; }
+
+  ;; // response should be always closed
+  ;; try (ClickHouseResponse response = future.get()) {
+  ;; ClickHouseResponseSummary summary = response.getSummary();
+  ;; return summary.getWrittenRows();
+  ;; }
+  ;; } catch (InterruptedException e) {
+  ;; Thread.currentThread().interrupt();
+  ;; throw ClickHouseException.forCancellation(e, server);
+  ;; } catch (ExecutionException | IOException e) {
+  ;; throw ClickHouseException.of(e, server);
+  ;; }
+  ;; }
+
+(defn create-file
+  "Writes `content` to `file-path`, creating the file or replacing its contents."
+  [file-path content]
+  (spit file-path content))
+
+(create-file "f.csv" "1,one\n2,two")
+
+(mt/db)
+
+(def spec (sql-jdbc.conn/connection-details->spec :clickhouse (:details (mt/db))))
+
+(sql-jdbc.execute/do-with-connection-with-options
+ :clickhouse spec nil
+ (fn [^java.sql.Connection conn]
+   (jdbc/execute! {:connection conn} "SELECT 1")))
+
+;; INFILE experiment: recreate the target table, then batch three `insert ... from infile ?` statements.
+(let [f    (io/file "f.csv")
+      spec (sql-jdbc.conn/connection-details->spec :clickhouse (:details (mt/db)))]
+  #_(when (.exists f)
+      (io/delete-file f))
+
+  (qp.writeback/execute-write-sql! (mt/id) "drop table if exists test_load_infile_with_params;")
+  (qp.writeback/execute-write-sql! (mt/id) "create table test_load_infile_with_params(n Int32, s String) engine=MergeTree ORDER BY ()")
+  ;; (qp.writeback/execute-write-sql! (mt/id) "insert into test_load_infile_with_params from infile 'f.csv' FORMAT CSV")
+  (with-open [conn (.getConnection (metabase.driver.sql-jdbc.execute/do-with-resolved-connection-data-source :clickhouse spec {}))
+              stmt (.prepareStatement conn "insert into test_load_infile_with_params from infile ? format CSV")]
+    (.setString stmt 1 (.getName f))
+    (.addBatch stmt)
+    (.setString stmt 1 (.getName f))
+    (.addBatch stmt)
+    ;; NOTE(review): the third entry is the file name plus "!" - presumably a deliberately bad path; confirm.
+    (.setString stmt 1 (str (.getName f) "!"))
+    (.addBatch stmt)
+    (.executeBatch stmt))
+  #_(sql-jdbc.execute/do-with-connection-with-options
+     :clickhouse spec nil
+     (fn [^java.sql.Connection conn]
+       (with-open [stmt (.prepareStatement conn "insert into test_load_infile_with_params from infile ? format CSV")]
+         (.setString stmt 1 (.getName f))
+         (.addBatch stmt)
+         (.setString stmt 1 (.getName f))
+         (.addBatch stmt)
+         (.setString stmt 1 (str (.getName f) "!"))
+         (.addBatch stmt)
+         (.executeBatch stmt)))))
+
+;; Use non-csv formatted data
+;; // 'format RowBinary' is the hint to use streaming mode, you may use different
+;; // format like JSONEachRow as needed
+;; sql = String.format("insert into %s format RowBinary", TABLE_NAME);
+;; try (PreparedStatement ps = conn.prepareStatement(sql)) {
+;; // it's streaming so there's only one parameter(could be one of String, byte[],
+;; // InputStream, File, ClickHouseWriter), and you don't have to process batch by
+;; // batch
+;; ps.setObject(1, new ClickHouseWriter() {
+;; @Override
+;; public void write(ClickHouseOutputStream output) throws IOException {
+;; // this will be executed in a separate thread
+;; for (int i = 0; i < 1_000_000; i++) {
+;; output.writeUnicodeString("a-" + i);
+;; output.writeBoolean(false); // non-null
+;; output.writeUnicodeString("b-" + i);
+;; }
+;; }
+;; });
+;; ps.executeUpdate();
+;; }
+;; ByteArrayInputStream
+  ;; try (InputStream inputStream = Files.newInputStream(file);
+  ;; ClickHouseStatement statement = clickHouseConnection.createStatement()) {
+  ;; statement.write()
+  ;; .table(tableName))
+  ;; .option("format_csv_delimiter", ";")
+  ;; .data(inputStream, ClickHouseFormat.CSV)
+  ;; .send();
+  ;; }
+
+
+
+;;;; EITHER THIS
+
+  ;; // 4. fastest(close to Java client) but requires manual serialization and it's
+  ;; // NOT portable(as it's limited to ClickHouse)
+  ;; // 'format RowBinary' is the hint to use streaming mode, you may use different
+  ;; // format like JSONEachRow as needed
+  ;; sql = String.format("insert into %s format RowBinary", TABLE_NAME);
+  ;; try (PreparedStatement ps = conn.prepareStatement(sql)) {
+  ;; // it's streaming so there's only one parameter(could be one of String, byte[],
+  ;; // InputStream, File, ClickHouseWriter), and you don't have to process batch by
+  ;; // batch
+  ;; ps.setObject(1, new ClickHouseWriter() {
+  ;; @Override
+  ;; public void write(ClickHouseOutputStream output) throws IOException {
+  ;; // this will be executed in a separate thread
+  ;; for (int i = 0; i < 1_000_000; i++) {
+  ;; output.writeUnicodeString("a-" + i);
+  ;; output.writeBoolean(false); // non-null
+  ;; output.writeUnicodeString("b-" + i);
+  ;; }
+  ;; }
+  ;; });
+  ;; ps.executeUpdate();
+  ;; }
+
+  ;; return count;
+  ;; }
\ No newline at end of file