diff --git a/chdb/driver/driver.go b/chdb/driver/driver.go index 22f7a57..bd0c101 100644 --- a/chdb/driver/driver.go +++ b/chdb/driver/driver.go @@ -214,6 +214,12 @@ func NewConnect(opts map[string]string) (ret *connector, err error) { if ok { ret.udfPath = udfPath } + if ret.session == nil { + ret.session, err = chdb.NewSession() + if err != nil { + return nil, err + } + } return } @@ -243,7 +249,8 @@ type conn struct { bufferSize int useUnsafe bool session *chdb.Session - QueryFun queryHandle + + QueryFun queryHandle } func prepareValues(values []driver.Value) []driver.NamedValue { @@ -267,6 +274,7 @@ func (c *conn) SetupQueryFun() { if c.session != nil { c.QueryFun = c.session.Query } + } func (c *conn) Query(query string, values []driver.Value) (driver.Rows, error) { @@ -334,7 +342,7 @@ func (c *conn) QueryContext(ctx context.Context, query string, args []driver.Nam } buf := result.Buf() - if buf == nil { + if len(buf) == 0 { return nil, fmt.Errorf("result is nil") } return c.driverType.PrepareRows(result, buf, c.bufferSize, c.useUnsafe) diff --git a/chdb/driver/driver_test.go b/chdb/driver/driver_test.go index ebbfcb1..d765e09 100644 --- a/chdb/driver/driver_test.go +++ b/chdb/driver/driver_test.go @@ -9,6 +9,39 @@ import ( "github.com/chdb-io/chdb-go/chdb" ) +var ( + session *chdb.Session +) + +func globalSetup() error { + sess, err := chdb.NewSession() + if err != nil { + return err + } + session = sess + return nil +} + +func globalTeardown() { + session.Cleanup() + session.Close() +} + +func TestMain(m *testing.M) { + if err := globalSetup(); err != nil { + fmt.Println("Global setup failed:", err) + os.Exit(1) + } + // Run all tests. + exitCode := m.Run() + + // Global teardown: clean up any resources here. + globalTeardown() + + // Exit with the code returned by m.Run(). + os.Exit(exitCode) +} + func TestDb(t *testing.T) { db, err := sql.Open("chdb", "") if err != nil { @@ -112,37 +145,74 @@ func TestDbWithOpt(t *testing.T) { } func TestDbWithSession(t *testing.T) { - sessionDir, err := os.MkdirTemp("", "unittest-sessiondata") + + session.Query( + "CREATE TABLE IF NOT EXISTS TestDbWithSession (id UInt32) ENGINE = MergeTree() ORDER BY id;") + + session.Query("INSERT INTO TestDbWithSession VALUES (1), (2), (3);") + + ret, err := session.Query("SELECT * FROM TestDbWithSession;") + if err != nil { + t.Fatalf("Query fail, err: %s", err) + } + if string(ret.Buf()) != "1\n2\n3\n" { + t.Errorf("Query result should be 1\n2\n3\n, got %s", string(ret.Buf())) + } + db, err := sql.Open("chdb", fmt.Sprintf("session=%s", session.ConnStr())) + if err != nil { + t.Fatalf("open db fail, err: %s", err) + } + if db.Ping() != nil { + t.Fatalf("ping db fail, err: %s", err) + } + rows, err := db.Query("select * from TestDbWithSession;") if err != nil { - t.Fatalf("create temp directory fail, err: %s", err) + t.Fatalf("exec create function fail, err: %s", err) } - defer os.RemoveAll(sessionDir) - session, err := chdb.NewSession(sessionDir) + defer rows.Close() + cols, err := rows.Columns() if err != nil { - t.Fatalf("new session fail, err: %s", err) + t.Fatalf("get result columns fail, err: %s", err) + } + if len(cols) != 1 { + t.Fatalf("result columns length shoule be 3, actual: %d", len(cols)) + } + var bar = 0 + var count = 1 + for rows.Next() { + err = rows.Scan(&bar) + if err != nil { + t.Fatalf("scan fail, err: %s", err) + } + if bar != count { + t.Fatalf("result is not match, want: %d actual: %d", count, bar) + } + count++ } - defer session.Cleanup() +} - session.Query("CREATE DATABASE IF NOT EXISTS testdb; " + - "CREATE TABLE IF NOT EXISTS testdb.testtable (id UInt32) ENGINE = MergeTree() ORDER BY id;") +func TestDbWithConnection(t *testing.T) { - session.Query("USE testdb; INSERT INTO testtable VALUES (1), (2), (3);") + session.Query( + "CREATE TABLE IF NOT EXISTS TestDbWithConnection (id UInt32) ENGINE = MergeTree() ORDER BY id;") - ret, err := session.Query("SELECT * FROM testtable;") + session.Query("INSERT INTO TestDbWithConnection VALUES (1), (2), (3);") + + ret, err := session.Query("SELECT * FROM TestDbWithConnection;") if err != nil { t.Fatalf("Query fail, err: %s", err) } if string(ret.Buf()) != "1\n2\n3\n" { t.Errorf("Query result should be 1\n2\n3\n, got %s", string(ret.Buf())) } - db, err := sql.Open("chdb", fmt.Sprintf("session=%s", sessionDir)) + db, err := sql.Open("chdb", fmt.Sprintf("session=%s", session.ConnStr())) if err != nil { t.Fatalf("open db fail, err: %s", err) } if db.Ping() != nil { t.Fatalf("ping db fail, err: %s", err) } - rows, err := db.Query("select * from testtable;") + rows, err := db.Query("select * from TestDbWithConnection;") if err != nil { t.Fatalf("exec create function fail, err: %s", err) } @@ -168,37 +238,73 @@ func TestDbWithSession(t *testing.T) { } } -func TestQueryRow(t *testing.T) { - sessionDir, err := os.MkdirTemp("", "unittest-sessiondata") +func TestDbWithConnectionSqlDriverOnly(t *testing.T) { + db, err := sql.Open("chdb", fmt.Sprintf("session=%s", session.ConnStr())) + if err != nil { + t.Fatalf("open db fail, err: %s", err) + } + if db.Ping() != nil { + t.Fatalf("ping db fail, err: %s", err) + } + + _, err = db.Exec( + "CREATE TABLE IF NOT EXISTS TestDbWithConnectionSqlDriverOnly (id UInt32) ENGINE = MergeTree() ORDER BY id;") if err != nil { - t.Fatalf("create temp directory fail, err: %s", err) + t.Fatalf("could not create database & table: %s", err) } - defer os.RemoveAll(sessionDir) - session, err := chdb.NewSession(sessionDir) + _, err = db.Exec("INSERT INTO TestDbWithConnectionSqlDriverOnly VALUES (1), (2), (3);") if err != nil { - t.Fatalf("new session fail, err: %s", err) + t.Fatalf("could not insert rows in the table: %s", err) } - defer session.Cleanup() - session.Query("CREATE DATABASE IF NOT EXISTS testdb; " + - "CREATE TABLE IF NOT EXISTS testdb.testtable (id UInt32) ENGINE = MergeTree() ORDER BY id;") - session.Query("USE testdb; INSERT INTO testtable VALUES (1), (2), (3);") + rows, err := db.Query("select * from TestDbWithConnectionSqlDriverOnly;") + if err != nil { + t.Fatalf("exec create function fail, err: %s", err) + } + defer rows.Close() + cols, err := rows.Columns() + if err != nil { + t.Fatalf("get result columns fail, err: %s", err) + } + if len(cols) != 1 { + t.Fatalf("result columns length shoule be 3, actual: %d", len(cols)) + } + var bar = 0 + var count = 1 + for rows.Next() { + err = rows.Scan(&bar) + if err != nil { + t.Fatalf("scan fail, err: %s", err) + } + if bar != count { + t.Fatalf("result is not match, want: %d actual: %d", count, bar) + } + count++ + } +} + +func TestQueryRow(t *testing.T) { + + session.Query( + "CREATE TABLE IF NOT EXISTS TestQueryRow (id UInt32) ENGINE = MergeTree() ORDER BY id;") - ret, err := session.Query("SELECT * FROM testtable;") + session.Query(" INSERT INTO TestQueryRow VALUES (1), (2), (3);") + + ret, err := session.Query("SELECT * FROM TestQueryRow;") if err != nil { t.Fatalf("Query fail, err: %s", err) } if string(ret.Buf()) != "1\n2\n3\n" { t.Errorf("Query result should be 1\n2\n3\n, got %s", string(ret.Buf())) } - db, err := sql.Open("chdb", fmt.Sprintf("session=%s", sessionDir)) + db, err := sql.Open("chdb", fmt.Sprintf("session=%s", session.ConnStr())) if err != nil { t.Fatalf("open db fail, err: %s", err) } if db.Ping() != nil { t.Fatalf("ping db fail, err: %s", err) } - rows := db.QueryRow("select * from testtable;") + rows := db.QueryRow("select * from TestQueryRow;") var bar = 0 var count = 1 @@ -217,20 +323,11 @@ func TestQueryRow(t *testing.T) { } func TestExec(t *testing.T) { - sessionDir, err := os.MkdirTemp("", "unittest-sessiondata") - if err != nil { - t.Fatalf("create temp directory fail, err: %s", err) - } - defer os.RemoveAll(sessionDir) - session, err := chdb.NewSession(sessionDir) - if err != nil { - t.Fatalf("new session fail, err: %s", err) - } - defer session.Cleanup() - session.Query("CREATE DATABASE IF NOT EXISTS testdb; " + - "CREATE TABLE IF NOT EXISTS testdb.testtable (id UInt32) ENGINE = MergeTree() ORDER BY id;") - db, err := sql.Open("chdb", fmt.Sprintf("session=%s", sessionDir)) + session.Query( + "CREATE TABLE IF NOT EXISTS TestExec (id UInt32) ENGINE = MergeTree() ORDER BY id;") + + db, err := sql.Open("chdb", fmt.Sprintf("session=%s", session.ConnStr())) if err != nil { t.Fatalf("open db fail, err: %s", err) } @@ -238,11 +335,25 @@ func TestExec(t *testing.T) { t.Fatalf("ping db fail, err: %s", err) } - _, err = db.Exec("INSERT INTO testdb.testtable VALUES (1), (2), (3);") + tables, err := db.Query("SHOW TABLES;") + if err != nil { + t.Fatalf(err.Error()) + } + defer tables.Close() + for tables.Next() { + var tblName string + if err := tables.Scan(&tblName); err != nil { + t.Fatal(err) + } + t.Log(tblName) + fmt.Printf("tblName: %v\n", tblName) + } + + _, err = db.Exec("INSERT INTO TestExec VALUES (1), (2), (3);") if err != nil { t.Fatalf("exec failed, err: %s", err) } - rows := db.QueryRow("select * from testdb.testtable;") + rows := db.QueryRow("select * from TestExec;") var bar = 0 var count = 1 diff --git a/chdb/driver/parquet_test.go b/chdb/driver/parquet_test.go index 44fdcc1..d21293c 100644 --- a/chdb/driver/parquet_test.go +++ b/chdb/driver/parquet_test.go @@ -3,10 +3,7 @@ package chdbdriver import ( "database/sql" "fmt" - "os" "testing" - - "github.com/chdb-io/chdb-go/chdb" ) func TestDbWithParquet(t *testing.T) { @@ -49,37 +46,74 @@ func TestDbWithParquet(t *testing.T) { } func TestDBWithParquetSession(t *testing.T) { - sessionDir, err := os.MkdirTemp("", "unittest-sessiondata") + + session.Query( + "CREATE TABLE IF NOT EXISTS TestDBWithParquetSession (id UInt32) ENGINE = MergeTree() ORDER BY id;") + + session.Query("INSERT INTO TestDBWithParquetSession VALUES (1), (2), (3);") + + ret, err := session.Query("SELECT * FROM TestDBWithParquetSession;") if err != nil { - t.Fatalf("create temp directory fail, err: %s", err) + t.Fatalf("Query fail, err: %s", err) } - defer os.RemoveAll(sessionDir) - session, err := chdb.NewSession(sessionDir) + if string(ret.Buf()) != "1\n2\n3\n" { + t.Errorf("Query result should be 1\n2\n3\n, got %s", string(ret.Buf())) + } + db, err := sql.Open("chdb", fmt.Sprintf("session=%s;driverType=%s", session.ConnStr(), "PARQUET")) if err != nil { - t.Fatalf("new session fail, err: %s", err) + t.Fatalf("open db fail, err: %s", err) } - defer session.Cleanup() + if db.Ping() != nil { + t.Fatalf("ping db fail, err: %s", err) + } + rows, err := db.Query("select * from TestDBWithParquetSession;") + if err != nil { + t.Fatalf("exec create function fail, err: %s", err) + } + defer rows.Close() + cols, err := rows.Columns() + if err != nil { + t.Fatalf("get result columns fail, err: %s", err) + } + if len(cols) != 1 { + t.Fatalf("result columns length shoule be 3, actual: %d", len(cols)) + } + var bar = 0 + var count = 1 + for rows.Next() { + err = rows.Scan(&bar) + if err != nil { + t.Fatalf("scan fail, err: %s", err) + } + if bar != count { + t.Fatalf("result is not match, want: %d actual: %d", count, bar) + } + count++ + } +} + +func TestDBWithParquetConnection(t *testing.T) { - session.Query("CREATE DATABASE IF NOT EXISTS testdb; " + - "CREATE TABLE IF NOT EXISTS testdb.testtable (id UInt32) ENGINE = MergeTree() ORDER BY id;") + session.Query( + "CREATE TABLE IF NOT EXISTS TestDBWithParquetConnection (id UInt32) ENGINE = MergeTree() ORDER BY id;") - session.Query("INSERT INTO testdb.testtable VALUES (1), (2), (3);") + session.Query("INSERT INTO TestDBWithParquetConnection VALUES (1), (2), (3);") - ret, err := session.Query("SELECT * FROM testdb.testtable;") + ret, err := session.Query("SELECT * FROM TestDBWithParquetConnection;") if err != nil { t.Fatalf("Query fail, err: %s", err) } if string(ret.Buf()) != "1\n2\n3\n" { t.Errorf("Query result should be 1\n2\n3\n, got %s", string(ret.Buf())) } - db, err := sql.Open("chdb", fmt.Sprintf("session=%s;driverType=%s", sessionDir, "PARQUET")) + db, err := sql.Open("chdb", fmt.Sprintf("session=%s;driverType=%s", session.ConnStr(), "PARQUET")) if err != nil { t.Fatalf("open db fail, err: %s", err) } if db.Ping() != nil { t.Fatalf("ping db fail, err: %s", err) } - rows, err := db.Query("select * from testdb.testtable;") + rows, err := db.Query("select * from TestDBWithParquetConnection;") if err != nil { t.Fatalf("exec create function fail, err: %s", err) } diff --git a/chdb/session.go b/chdb/session.go index 93d5fcb..a93f9d7 100644 --- a/chdb/session.go +++ b/chdb/session.go @@ -1,38 +1,55 @@ package chdb import ( - "io/ioutil" + "fmt" "os" "path/filepath" "github.com/chdb-io/chdb-go/chdbstable" ) +var ( + globalSession *Session +) + type Session struct { - path string - isTemp bool + conn *chdbstable.ChdbConn + connStr string + path string + isTemp bool } // NewSession creates a new session with the given path. // If path is empty, a temporary directory is created. // Note: The temporary directory is removed when Close is called. func NewSession(paths ...string) (*Session, error) { + if globalSession != nil { + return globalSession, nil + } + path := "" if len(paths) > 0 { path = paths[0] } - + isTemp := false if path == "" { // Create a temporary directory - tempDir, err := ioutil.TempDir("", "chdb_") + tempDir, err := os.MkdirTemp("", "chdb_") if err != nil { return nil, err } path = tempDir - return &Session{path: path, isTemp: true}, nil + isTemp = true + } + connStr := fmt.Sprintf("file:%s/chdb.db", path) - return &Session{path: path, isTemp: false}, nil + conn, err := initConnection(connStr) + if err != nil { + return nil, err + } + globalSession = &Session{connStr: connStr, path: path, isTemp: isTemp, conn: conn} + return globalSession, nil } // Query calls queryToBuffer with a default output format of "CSV" if not provided. @@ -41,7 +58,8 @@ func (s *Session) Query(queryStr string, outputFormats ...string) (result *chdbs if len(outputFormats) > 0 { outputFormat = outputFormats[0] } - return queryToBuffer(queryStr, outputFormat, s.path, "") + + return connQueryToBuffer(s.conn, queryStr, outputFormat) } // Close closes the session and removes the temporary directory @@ -49,9 +67,11 @@ func (s *Session) Query(queryStr string, outputFormats ...string) (result *chdbs // temporary directory is created when NewSession was called with an empty path. func (s *Session) Close() { // Remove the temporary directory if it starts with "chdb_" + s.conn.Close() if s.isTemp && filepath.Base(s.path)[:5] == "chdb_" { s.Cleanup() } + globalSession = nil } // Cleanup closes the session and removes the directory. @@ -65,6 +85,10 @@ func (s *Session) Path() string { return s.path } +func (s *Session) ConnStr() string { + return s.connStr +} + // IsTemp returns whether the session is temporary. func (s *Session) IsTemp() bool { return s.isTemp diff --git a/chdb/session_test.go b/chdb/session_test.go index f99ea62..c414a12 100644 --- a/chdb/session_test.go +++ b/chdb/session_test.go @@ -2,7 +2,6 @@ package chdb import ( "os" - "path/filepath" "testing" ) @@ -51,52 +50,3 @@ func TestSessionCleanup(t *testing.T) { t.Errorf("Session directory should be removed after Cleanup: %s", session.Path()) } } - -// TestQuery tests the Query method of the session. -func TestQuery(t *testing.T) { - path := filepath.Join(os.TempDir(), "chdb_test") - defer os.RemoveAll(path) - session, _ := NewSession(path) - defer session.Cleanup() - - session.Query("CREATE DATABASE IF NOT EXISTS testdb; " + - "CREATE TABLE IF NOT EXISTS testdb.testtable (id UInt32) ENGINE = MergeTree() ORDER BY id;") - - session.Query("USE testdb; INSERT INTO testtable VALUES (1), (2), (3);") - - ret, err := session.Query("SELECT * FROM testtable;") - if err != nil { - t.Errorf("Query failed: %s", err) - } - if string(ret.Buf()) != "1\n2\n3\n" { - t.Errorf("Query result should be 1\n2\n3\n, got %s", string(ret.Buf())) - } -} - -func TestSessionPathAndIsTemp(t *testing.T) { - // Create a new session and check its Path and IsTemp - session, _ := NewSession() - defer session.Cleanup() - - if session.Path() == "" { - t.Errorf("Session path should not be empty") - } - - if !session.IsTemp() { - t.Errorf("Session should be temporary") - } - - // Create a new session with a specific path and check its Path and IsTemp - path := filepath.Join(os.TempDir(), "chdb_test2") - defer os.RemoveAll(path) - session, _ = NewSession(path) - defer session.Cleanup() - - if session.Path() != path { - t.Errorf("Session path should be %s, got %s", path, session.Path()) - } - - if session.IsTemp() { - t.Errorf("Session should not be temporary") - } -} diff --git a/chdb/wrapper.go b/chdb/wrapper.go index be5ab63..ad0eb40 100644 --- a/chdb/wrapper.go +++ b/chdb/wrapper.go @@ -40,3 +40,16 @@ func queryToBuffer(queryStr, outputFormat, path, udfPath string) (result *chdbst // Call QueryStable with the constructed arguments return chdbstable.QueryStable(len(argv), argv) } + +func initConnection(connStr string) (result *chdbstable.ChdbConn, err error) { + argv := []string{connStr} + // Call NewConnection with the constructed arguments + return chdbstable.NewConnection(len(argv), argv) +} + +func connQueryToBuffer(conn *chdbstable.ChdbConn, queryStr, outputFormat string) (result *chdbstable.LocalResult, err error) { + if outputFormat == "" { + outputFormat = "CSV" + } + return conn.QueryConn(queryStr, outputFormat) +} diff --git a/chdbstable/chdb.go b/chdbstable/chdb.go index c02aba2..48fb506 100644 --- a/chdbstable/chdb.go +++ b/chdbstable/chdb.go @@ -8,6 +8,7 @@ package chdbstable import "C" import ( "errors" + "fmt" "runtime" "unsafe" ) @@ -29,6 +30,10 @@ type LocalResult struct { cResult *C.struct_local_result_v2 } +type ChdbConn struct { + conn *C.struct_chdb_conn +} + // newLocalResult creates a new LocalResult and sets a finalizer to free C memory. func newLocalResult(cResult *C.struct_local_result_v2) *LocalResult { result := &LocalResult{cResult: cResult} @@ -36,6 +41,30 @@ func newLocalResult(cResult *C.struct_local_result_v2) *LocalResult { return result } +// newChdbConn creates a new ChdbConn and sets a finalizer to close the connection (and thus free the memory) +func newChdbConn(conn *C.struct_chdb_conn) *ChdbConn { + result := &ChdbConn{conn: conn} + runtime.SetFinalizer(result, closeChdbConn) + return result +} + +func NewConnection(argc int, argv []string) (*ChdbConn, error) { + cArgv := make([]*C.char, len(argv)) + for i, s := range argv { + cArgv[i] = C.CString(s) + defer C.free(unsafe.Pointer(cArgv[i])) + } + conn := C.connect_chdb(C.int(argc), &cArgv[0]) + if conn == nil { + return nil, fmt.Errorf("could not create a chdb connection") + } + return newChdbConn(*conn), nil +} + +func closeChdbConn(conn *ChdbConn) { + C.close_conn(&conn.conn) +} + // freeLocalResult is called by the garbage collector. func freeLocalResult(result *LocalResult) { C.free_result_v2(result.cResult) @@ -62,6 +91,32 @@ func QueryStable(argc int, argv []string) (result *LocalResult, err error) { return newLocalResult(cResult), nil } +// QueryStable calls the C function query_conn. +func (c *ChdbConn) QueryConn(queryStr string, formatStr string) (result *LocalResult, err error) { + + query := C.CString(queryStr) + format := C.CString(formatStr) + // free the strings in the C heap + defer C.free(unsafe.Pointer(query)) + defer C.free(unsafe.Pointer(format)) + + cResult := C.query_conn(c.conn, query, format) + if cResult == nil { + // According to the C ABI of chDB v1.2.0, the C function query_stable_v2 + // returns nil if the query returns no data. This is not an error. We + // will change this behavior in the future. + return newLocalResult(cResult), nil + } + if cResult.error_message != nil { + return nil, &ChdbError{msg: C.GoString(cResult.error_message)} + } + return newLocalResult(cResult), nil +} + +func (c *ChdbConn) Close() { + C.close_conn(&c.conn) +} + // Accessor methods to access fields of the local_result_v2 struct. func (r *LocalResult) Buf() []byte { if r.cResult == nil { diff --git a/chdbstable/chdb.h b/chdbstable/chdb.h index 821e3e8..ebc2009 100644 --- a/chdbstable/chdb.h +++ b/chdbstable/chdb.h @@ -1,10 +1,15 @@ #pragma once #ifdef __cplusplus +# include # include # include +# include +# include +# include extern "C" { #else +# include # include # include #endif @@ -20,6 +25,18 @@ struct local_result uint64_t bytes_read; }; +#ifdef __cplusplus +struct local_result_v2 +{ + char * buf = nullptr; + size_t len = 0; + void * _vec = nullptr; // std::vector *, for freeing + double elapsed = 0.0; + uint64_t rows_read = 0; + uint64_t bytes_read = 0; + char * error_message = nullptr; +}; +#else struct local_result_v2 { char * buf; @@ -30,6 +47,7 @@ struct local_result_v2 uint64_t bytes_read; char * error_message; }; +#endif CHDB_EXPORT struct local_result * query_stable(int argc, char ** argv); CHDB_EXPORT void free_result(struct local_result * result); @@ -38,5 +56,68 @@ CHDB_EXPORT struct local_result_v2 * query_stable_v2(int argc, char ** argv); CHDB_EXPORT void free_result_v2(struct local_result_v2 * result); #ifdef __cplusplus -} +struct query_request +{ + std::string query; + std::string format; +}; + +struct query_queue +{ + std::mutex mutex; + std::condition_variable query_cv; // For query submission + std::condition_variable result_cv; // For query result retrieval + query_request current_query; + local_result_v2 * current_result = nullptr; + bool has_query = false; + bool shutdown = false; + bool cleanup_done = false; +}; #endif + +/** + * Connection structure for chDB + * Contains server instance, connection state, and query processing queue + */ +struct chdb_conn +{ + void * server; /* ClickHouse LocalServer instance */ + bool connected; /* Connection state flag */ + void * queue; /* Query processing queue */ +}; + +/** + * Creates a new chDB connection. + * Only one active connection is allowed per process. + * Creating a new connection with different path requires closing existing connection. + * + * @param argc Number of command-line arguments + * @param argv Command-line arguments array (--path= to specify database location) + * @return Pointer to connection pointer, or NULL on failure + * @note Default path is ":memory:" if not specified + */ +CHDB_EXPORT struct chdb_conn ** connect_chdb(int argc, char ** argv); + +/** + * Closes an existing chDB connection and cleans up resources. + * Thread-safe function that handles connection shutdown and cleanup. + * + * @param conn Pointer to connection pointer to close + */ +CHDB_EXPORT void close_conn(struct chdb_conn ** conn); + +/** + * Executes a query on the given connection. + * Thread-safe function that handles query execution in a separate thread. + * + * @param conn Connection to execute query on + * @param query SQL query string to execute + * @param format Output format string (e.g., "CSV", default format) + * @return Query result structure containing output or error message + * @note Returns error result if connection is invalid or closed + */ +CHDB_EXPORT struct local_result_v2 * query_conn(struct chdb_conn * conn, const char * query, const char * format); + +#ifdef __cplusplus +} +#endif \ No newline at end of file