From 2c870d00e7fa693e57b5f1321f3f7e3a53ffebf8 Mon Sep 17 00:00:00 2001 From: colin Date: Fri, 14 Jun 2024 16:27:08 +0800 Subject: [PATCH] Add TsFile Python. --- .github/workflows/unit-test.yml | 2 +- .gitignore | 8 + pom.xml | 24 +++ python/README.md | 72 +++++++ python/examlpes.py | 79 +++++++ python/pom.xml | 133 ++++++++++++ python/requirements.txt | 23 ++ python/setup.py | 99 +++++++++ python/test.py | 145 +++++++++++++ python/tsfile/__init__.py | 18 ++ python/tsfile/tsfile.pxd | 100 +++++++++ python/tsfile/tsfile.py | 138 ++++++++++++ python/tsfile/tsfile_pywrapper.pyx | 330 +++++++++++++++++++++++++++++ 13 files changed, 1170 insertions(+), 1 deletion(-) create mode 100644 python/README.md create mode 100644 python/examlpes.py create mode 100644 python/pom.xml create mode 100644 python/requirements.txt create mode 100644 python/setup.py create mode 100644 python/test.py create mode 100644 python/tsfile/__init__.py create mode 100644 python/tsfile/tsfile.pxd create mode 100644 python/tsfile/tsfile.py create mode 100644 python/tsfile/tsfile_pywrapper.pyx diff --git a/.github/workflows/unit-test.yml b/.github/workflows/unit-test.yml index f8d79fede..a95c39407 100644 --- a/.github/workflows/unit-test.yml +++ b/.github/workflows/unit-test.yml @@ -91,4 +91,4 @@ jobs: - name: Build and test with Maven (All others) shell: bash run: | - ./mvnw${{ steps.platform_suffix.outputs.platform_suffix }} -P with-java,with-cpp clean verify + ./mvnw${{ steps.platform_suffix.outputs.platform_suffix }} -P with-java,with-cpp,with-python clean verify diff --git a/.gitignore b/.gitignore index 77f4d73a8..e9f65e201 100644 --- a/.gitignore +++ b/.gitignore @@ -16,3 +16,11 @@ docs/node_modules/ docs/src/.vuepress/.cache/ docs/src/.vuepress/.temp/ docs/src/.vuepress/dist/ + +# python files +python/build +python/tsfile/__pycache__ +python/tsfile/*.so* +python/tsfile/TsFile_cwrapper.h +python/tsfile/*.cpp +python/data diff --git a/pom.xml b/pom.xml index 1a3c1b4ef..a3f9982a1 100644 --- a/pom.xml +++ b/pom.xml @@ -121,6 +121,7 @@ **/.clang-format **/tsfile/tsfile_pywrapper.cpp + **/venv/** @@ -579,6 +580,13 @@ cpp + + + with-python + + python + + .java-9-and-above @@ -653,6 +661,8 @@ linux linux-x86_64 Unix Makefiles + venv/bin/ + python3 @@ -669,6 +679,8 @@ linux linux-amd64 Unix Makefiles + venv/bin/ + python3 @@ -685,6 +697,8 @@ linux linux-${os.arch} Unix Makefiles + venv/bin/ + python3 @@ -700,6 +714,8 @@ mac mac-x86_64 Unix Makefiles + venv/bin/ + python3 @@ -715,6 +731,8 @@ mac mac-aarch64 Unix Makefiles + venv/bin/ + python3 @@ -732,6 +750,8 @@ MinGW Makefiles + venv/Scripts/ + python @@ -749,6 +769,8 @@ MinGW Makefiles + venv/Scripts/ + python @@ -766,6 +788,8 @@ MinGW Makefiles + venv/Scripts/ + python diff --git a/python/README.md b/python/README.md new file mode 100644 index 000000000..b8b6fa82d --- /dev/null +++ b/python/README.md @@ -0,0 +1,72 @@ + + +# TsFile Python Document + +
+___________    ___________.__.__          
+\__    ___/____\_   _____/|__|  |   ____  
+  |    | /  ___/|    __)  |  |  | _/ __ \ 
+  |    | \___ \ |     \   |  |  |_\  ___/ 
+  |____|/____  >\___  /   |__|____/\___  >  version 1.0.0
+             \/     \/                 \/  
+
+
+
+
+## Introduction
+
+This directory contains the Python implementation of TsFile. The Python version of TsFile is built on the CPP version: it uses the Cython package to expose the read and write capabilities of TsFile CPP in the Python environment, so users can read and write TsFile much like they use read_csv and to_csv in Pandas.
+
+The source code can be found in the `./tsfile` directory. Files ending with `.pyx` and `.pxd` are wrapper code written in Cython, and `tsfile/tsfile.py` defines the user-facing interfaces. You can find examples of reading and writing in `./examlpes.py`.
+
+
+## How to make contributions
+
+Using pylint to check the Python code is recommended. There is no comparable style checker for Cython code, so the Cython sources should follow the same Python style that pylint enforces.
+
+**Feature List**
+- [ ] In pywrapper, invoke the batch reading interface implemented in tsfile_cpp.
+
+
+
+## Build
+
+You can build tsfile_python with Maven or with a plain Python command.
+
+Build [tsfile_cpp](../cpp/README.md) first, because tsfile_python relies on the shared library files produced by tsfile_cpp.
+
+Build with Maven in this directory:
+
+```sh
+mvn initialize compile test
+```
+
+Build with Maven from the TsFile root directory:
+
+```sh
+mvn clean package -P with-python
+```
+
+Build directly with Python:
+```sh
+python3 setup.py build_ext --inplace
+```
+
diff --git a/python/examlpes.py b/python/examlpes.py
new file mode 100644
index 000000000..c0106cd29
--- /dev/null
+++ b/python/examlpes.py
@@ -0,0 +1,79 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License. 
+# + +import numpy as np +import pandas as pd +import os + +import tsfile as ts + + +# test writing data +data_dir = os.path.join(os.path.dirname(__file__), "test.tsfile") +TABLE_NAME = "test_table" + +# 1000 rows data +time = np.arange(1, 1001, dtype=np.int64) +level = np.linspace(2000, 3000, num=1000, dtype=np.float32) +num = np.arange(10000, 11000, dtype=np.int64) +df = pd.DataFrame({"Time": time, "level": level, "num": num}) + +if os.path.exists(data_dir): + os.remove(data_dir) +ts.write_tsfile(data_dir, TABLE_NAME, df) + + +# read data we already wrote +# with 20 chunksize +tsfile_ret = ts.read_tsfile(data_dir, TABLE_NAME, ["level", "num"], chunksize=20) +print(tsfile_ret.shape) + +# # with 100 chunksize +tsfile_ret = ts.read_tsfile(data_dir, TABLE_NAME, ["level", "num"], chunksize = 100) +print(tsfile_ret.shape) + +# # get all data +tsfile_ret = ts.read_tsfile(data_dir, TABLE_NAME, ["level", "num"]) +print(tsfile_ret.shape) + +# # with iterator +with ts.read_tsfile(data_dir, TABLE_NAME, ["level", "num"], iterator=True, chunksize=100) as reader: + for chunk in reader: + print(chunk.shape) + +# # with time scale and chunksize +tsfile_ret = ts.read_tsfile(data_dir, TABLE_NAME, + ["level"], start_time=50, end_time=100, chunksize=10) +print(tsfile_ret.shape) + +# with time scale +tsfile_ret = ts.read_tsfile(data_dir, TABLE_NAME, ["num"], start_time=50, end_time=100) +print(tsfile_ret.shape) + + +with ts.read_tsfile( + data_dir, + TABLE_NAME, + ["level", "num"], + iterator=True, + start_time=100, + end_time=500, + chunksize=100, +) as reader: + for chunk in reader: + print(chunk.shape) diff --git a/python/pom.xml b/python/pom.xml new file mode 100644 index 000000000..bef223663 --- /dev/null +++ b/python/pom.xml @@ -0,0 +1,133 @@ + + + + 4.0.0 + + org.apache.tsfile + tsfile-parent + 1.0.1-SNAPSHOT + + tsfile-python + pom + TsFile: Python + + 2.6.0 + 4.0.21 + + tsfile + ${project.build.directory}/build-wrapper-output + + + ${project.basedir} + + + org.codehaus.mojo + exec-maven-plugin + + + + python-venv + initialize + + exec + + + ${python.exe.bin} + + -m + venv + ${project.basedir}/venv + + + + + python-upgrade-pip + initialize + + exec + + + ${python.venv.bin}${python.exe.bin} + + -m + pip + install + --upgrade + pip + + + + + python-install-requirements + initialize + + exec + + + ${python.venv.bin}${python.exe.bin} + + -m + pip + install + -r + ${project.basedir}/requirements.txt + + + + + + + org.codehaus.mojo + exec-maven-plugin + 1.6.0 + + + compile-python-code + compile + + exec + + + python3 + + setup.py + build_ext + --inplace + + + + + run-python-tests + test + + exec + + + ${python.venv.bin}${python.exe.bin} + + ${project.basedir}/test.py + + + + + + + + diff --git a/python/requirements.txt b/python/requirements.txt new file mode 100644 index 000000000..893ae550f --- /dev/null +++ b/python/requirements.txt @@ -0,0 +1,23 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +cython==3.0.10 +numpy==1.26.4 +pandas==2.2.2 + diff --git a/python/setup.py b/python/setup.py new file mode 100644 index 000000000..b2628b498 --- /dev/null +++ b/python/setup.py @@ -0,0 +1,99 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from setuptools import setup, Extension +from setuptools.command.build_ext import build_ext +from Cython.Build import cythonize +import numpy as np +import platform +import shutil +import os + + +def copy_lib_files(system, source_dir, target_dir, file_ext, version=None): + if version: + lib_file_name = f"libtsfile.{file_ext}.{version}" + else: + lib_file_name = f"libtsfile.{file_ext}" + + source = os.path.join(source_dir, lib_file_name) + target = os.path.join(target_dir, lib_file_name) + shutil.copyfile(source, target) + + if system == "Linux" and version: + link_name = os.path.join(target_dir, f"libtsfile.{file_ext}") + if os.path.exists(link_name): + os.remove(link_name) + os.symlink(lib_file_name, link_name) + +def copy_header(source, target): + shutil.copyfile(source, target) + +class BuildExt(build_ext): + def build_extensions(self): + numpy_include = np.get_include() + for ext in self.extensions: + ext.include_dirs.append(numpy_include) + super().build_extensions() + + +project_dir = os.path.dirname(__file__) +libtsfile_shard_dir = os.path.join(project_dir, "..", "cpp", "build", "Debug", "lib") +libtsfile_dir = os.path.join(project_dir, "tsfile") +include_dir = os.path.join(project_dir, "tsfile") +source_file = os.path.join(project_dir, "tsfile", "tsfile_pywrapper.pyx") + +if platform.system() == "Darwin": + copy_lib_files("Darwin", libtsfile_shard_dir, libtsfile_dir, "dylib") +elif platform.system() == "Linux": + copy_lib_files("Linux", libtsfile_shard_dir, libtsfile_dir, "so", "1.0") +else: + raise Exception("Unsupported platform") + +source_include_dir = os.path.join(project_dir, "..", "cpp", "src", "cwrapper", "TsFile-cwrapper.h") +target_include_dir = os.path.join(project_dir, "tsfile", "TsFile-cwrapper.h") +copy_header(source_include_dir, target_include_dir) + + + +ext_modules_tsfile = [ + Extension( + "tsfile.tsfile_pywrapper", + sources=[source_file], + libraries=["tsfile"], + library_dirs=[libtsfile_dir], + include_dirs=[include_dir, np.get_include()], + runtime_library_dirs=[libtsfile_dir], + extra_compile_args=["-std=c++11"], + language="c++" + ) +] + +setup( + name="tsfile", + 
version="0.1", + description="Tsfile reader and writer for python", + url="https://tsfile.apache.org", + author='"Apache TsFile"', + packages=["tsfile"], + license="Apache 2.0", + ext_modules=cythonize(ext_modules_tsfile), + cmdclass={"build_ext": BuildExt}, + include_dirs=[np.get_include()], + package_data={"tsfile": ["*tsfile/*.so*", "*tsfile/*.dylib", "tsfile/tsfile.py"]} +) diff --git a/python/test.py b/python/test.py new file mode 100644 index 000000000..9fbf4f45c --- /dev/null +++ b/python/test.py @@ -0,0 +1,145 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import os +import shutil + +import unittest as ut +import numpy as np +import pandas as pd + + +import tsfile as ts + +TABLE_NAME = "test_table" +DATA_PATH = os.path.join(os.path.dirname(__file__), "data") + + +# test writing data +def test_write_tsfile(): + # test write empty data + df = pd.DataFrame() + ts.write_tsfile(DATA_PATH + "/empty.tsfile", TABLE_NAME, df) + assert not os.path.exists(DATA_PATH + "/empty.tsfile") + + # data without Time + # 1000 rows data + level = np.linspace(2000, 3000, num=1000, dtype=np.float32) + num = np.arange(10000, 11000, dtype=np.int64) + df = pd.DataFrame({"level": level, "num": num}) + with ut.TestCase().assertRaises(AttributeError): + ts.write_tsfile(DATA_PATH + "/no_time.tsfile", TABLE_NAME, df) + + # time with wrong type + time = np.arange(1, 1001, dtype=np.float32) + df = pd.DataFrame({"Time": time, "level": level, "num": num}) + with ut.TestCase().assertRaises(TypeError): + ts.write_tsfile(DATA_PATH + "/wrong_time_type.tsfile", TABLE_NAME, df)\ + + # TXT is not support yet + time = np.arange(1, 1001, dtype=np.int64) + text = np.random.choice(["a", "b", "c"], 1000) + df = pd.DataFrame({"Time": time, "text": text}) + with ut.TestCase().assertRaises(TypeError): + ts.write_tsfile(DATA_PATH + "/txt.tsfile", TABLE_NAME, df) + + # full datatypes test + time = np.arange(1, 1001, dtype=np.int64) # int64 + level = np.linspace(2000, 3000, num=1000, dtype=np.float32) # float32 + num = np.arange(10000, 11000, dtype=np.int64) # int64 + bools = np.random.choice([True, False], 1000) # bool + double = np.random.rand(1000) # double + df = pd.DataFrame({"Time": time, "level": level, "num": num, "bools": bools, "double": double}) + ts.write_tsfile(DATA_PATH + "/full_datatypes.tsfile", TABLE_NAME, df) + +# test reading data +def test_read_tsfile(): + # test read not exist file + with ut.TestCase().assertRaises(FileNotFoundError): + ts.read_tsfile(DATA_PATH + "/notexit.tsfile", TABLE_NAME, ["level", "num"]) + + # test read empty file + with open(DATA_PATH + "/empty.tsfile", "w", encoding="utf-8") as f: + pass + + with ut.TestCase().assertRaises(ValueError): + ts.read_tsfile(DATA_PATH + "/empty.tsfile", TABLE_NAME, ["level", "num"]) + + FILE_NAME= DATA_PATH + 
"/full_datatypes.tsfile" + # test read data + ## 1. read all data + df = ts.read_tsfile(FILE_NAME, TABLE_NAME, ["level", "num", "bools", "double"]) + assert df.shape == (1000, 5) + assert df["level"].dtype == np.float32 + assert df["Time"].dtype == np.int64 + assert df["num"].dtype == np.int64 + assert df["bools"].dtype == np.bool_ + assert df["double"].dtype == np.float64 + + + ## 2. read with chunksize + df = ts.read_tsfile(FILE_NAME, TABLE_NAME, ["level", "num"], chunksize = 100) + assert df.shape == (100, 3) + assert df["level"].dtype == np.float32 + assert df["Time"].sum() == np.arange(1, 101).sum() + + + ## 3. read with iterator + chunk_num = 0 + with ts.read_tsfile(FILE_NAME, TABLE_NAME, ["level", "num"], iterator=True, chunksize=100) as reader: + for chunk in reader: + assert chunk.shape == (100, 3) + assert chunk["level"].dtype == np.float32 + assert chunk["Time"].sum() == np.arange(1 + chunk_num *100, 101 + chunk_num * 100).sum() + chunk_num += 1 + assert chunk_num == 10 + + + ## 4. read with time scale + df = ts.read_tsfile(FILE_NAME, TABLE_NAME, ["num"], start_time=50, end_time=99) + assert df.shape == (50, 2) + assert df["num"][0] == 10049 + assert df["num"][9] == 10058 + + ## 5. read with time scale and chunksize + df = ts.read_tsfile(FILE_NAME, TABLE_NAME, ["num"], start_time=50, end_time=99, chunksize=10) + assert df.shape == (10, 2) + assert df["num"][0] == 10049 + assert df["num"][9] == 10058 + + ## 6. read with time scale and iterator + chunk_num = 0 + with ts.read_tsfile(FILE_NAME, TABLE_NAME, ["num"], start_time=50, end_time=99, iterator=True, chunksize=10) as reader: + for chunk in reader: + assert chunk.shape == (10, 2) + assert chunk["num"][0] == 10049 + chunk_num * 10 + assert chunk["num"][9] == 10058 + chunk_num * 10 + chunk_num += 1 + assert chunk_num == 5 + +if __name__ == '__main__': + if os.path.exists(DATA_PATH): + print("Remove old data") + shutil.rmtree(DATA_PATH) + os.makedirs(DATA_PATH) + else: + os.makedirs(DATA_PATH) + test_write_tsfile() + test_read_tsfile() + print("All tests passed") + shutil.rmtree(DATA_PATH) \ No newline at end of file diff --git a/python/tsfile/__init__.py b/python/tsfile/__init__.py new file mode 100644 index 000000000..b428537a3 --- /dev/null +++ b/python/tsfile/__init__.py @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +from .tsfile import read_tsfile, write_tsfile \ No newline at end of file diff --git a/python/tsfile/tsfile.pxd b/python/tsfile/tsfile.pxd new file mode 100644 index 000000000..a7b86cf53 --- /dev/null +++ b/python/tsfile/tsfile.pxd @@ -0,0 +1,100 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +#cython: language_level=3 +cdef extern from "./TsFile-cwrapper.h": + # common + ctypedef int ErrorCode + ctypedef long long timestamp + ctypedef long long SchemaInfo + + + + # for read data from tsfile + ctypedef void* CTsFileReader + ctypedef void* TsFileRowData + ctypedef void* QueryDataRetINTERNAL + ctypedef void* TimeFilterExpression + + cdef struct query_data_ret: + char** column_names + int column_num + QueryDataRetINTERNAL data + + ctypedef query_data_ret* QueryDataRet + + + # for writer data to tsfile + ctypedef void* CTsFileWriter + cdef struct column_schema: + char* name + SchemaInfo column_def + ctypedef column_schema ColumnSchema + + cdef struct TableSchema: + char* table_name + ColumnSchema** column_schema + int column_num + + cdef struct Tablet: + char* table_name + ColumnSchema** column_schema + int column_num + timestamp* times + bint** bitmap + void** value + int cur_num + int max_capacity + + ctypedef Tablet DataResult + + # Function Declarations + # reader:tsfile reader + CTsFileReader ts_reader_open(const char* path, ErrorCode* err_code) + ErrorCode ts_reader_close(CTsFileReader reader) + + # writer:tsfile writer + CTsFileWriter ts_writer_open(const char* path, ErrorCode* err_code) + ErrorCode ts_writer_close(CTsFileWriter writer) + + + # read tsfile data + QueryDataRet ts_reader_begin_end(CTsFileReader reader, const char* table_name, + char** columns, int colum_num, timestamp start_time, timestamp end_time) + QueryDataRet ts_reader_read(CTsFileReader reader, const char* table_name, + char** columns, int colum_num) + DataResult* ts_next(QueryDataRet data, int expect_line_count) + ErrorCode destory_query_dataret(QueryDataRet query_data_set) + ErrorCode destory_tablet(Tablet* tablet) + + # writer tsfile data + ErrorCode tsfile_register_table(CTsFileWriter writer, TableSchema* schema) + ErrorCode tsfile_register_table_column(CTsFileWriter writer, const char* table_name, ColumnSchema* schema) + TsFileRowData create_tsfile_row(const char* table_name, timestamp timestamp, int column_length) + ErrorCode insert_data_into_tsfile_row_int32(TsFileRowData row_data, char* column_name, int value) + ErrorCode insert_data_into_tsfile_row_int64(TsFileRowData row_data, char* column_name, long long value) + ErrorCode insert_data_into_tsfile_row_float(TsFileRowData row_data, char* column_name, float value) + ErrorCode insert_data_into_tsfile_row_double(TsFileRowData row_data, char* column_name, double value) + ErrorCode insert_data_into_tsfile_row_boolean(TsFileRowData row_data, char* column_name, bint value) + ErrorCode tsfile_write_row_data(CTsFileWriter writer, TsFileRowData data); + ErrorCode tsfile_flush_data(CTsFileWriter writer) + ErrorCode destory_tsfile_row(TsFileRowData data) + + + + \ No newline at end of file diff --git a/python/tsfile/tsfile.py b/python/tsfile/tsfile.py new file mode 100644 
index 000000000..1b83029d5 --- /dev/null +++ b/python/tsfile/tsfile.py @@ -0,0 +1,138 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import os + +from .tsfile_pywrapper import tsfile_reader, tsfile_writer +from typing import overload +from pandas import DataFrame + +TIMESTAMP_STR = "Time" + + +# default case -> Dataframe +@overload +def read_tsfile( + file_path: str, + table_name: str, + columns: list[str] | str, +) -> DataFrame: ... + + +# case with filter -> Dataframe +@overload +def read_tsfile( + file_path: str, + table_name: str, + columns: list[str] | str, + filter: str, + start_time: int, + end_time: int, +) -> DataFrame: ... + + +# chunksize = int -> Dataframe +@overload +def read_tsfile( + file_path: str, + table_name: str, + columns: list[str] | str, + chunksize: int, +) -> DataFrame: ... + + +@overload +def read_tsfile( + file_path: str, + table_name: str, + columns: list[str] | str, + filter: str, + start_time: int, + end_time: int, + chunksize: int, +) -> DataFrame: ... + + +# iterator = True -> Iterator +@overload +def read_tsfile( + file_path: str, + table_name: str, + columns: list[str] | str, + iterator: bool, + chunksize: int, +) -> tsfile_reader: ... + + +@overload +def read_tsfile( + file_path: str, + table_name: str, + columns: list[str] | str, + start_time: int, + end_time: int, + iterator: bool, + chunksize: int, +) -> tsfile_reader: ... + + +def read_tsfile( + file_path: str, + table_name: str, + columns: list[str] | str, + start_time: int = None, + end_time: int = None, + chunksize: int = None, + iterator: bool = False, +) -> DataFrame | tsfile_reader: + if not os.path.exists(file_path): + raise FileNotFoundError(f"File '{file_path}' does not exist") + if os.path.getsize(file_path) == 0: + raise ValueError(f"File '{file_path}' is empty") + reader = tsfile_reader( + file_path, table_name, columns, start_time, end_time, chunksize + ) + if iterator: + return reader + else: + return reader.read_tsfile() + + +def write_tsfile( + file_path: str, + table_name: str, + data: DataFrame, +): + if data.empty: + return + column_names = data.columns.tolist() + column_types = data.dtypes + + if TIMESTAMP_STR not in column_names: + raise AttributeError("Time column is missing") + if column_types[TIMESTAMP_STR] != "int64": + raise TypeError("Time column must be of type int64") + allowed_types = {"int64", "int32", "bool", "float32", "float64"} + + for col, dtype in column_types.items(): + if dtype.name not in allowed_types: + raise TypeError( + f"Column '{col}' has an invalid type '{dtype}'." 
+ ) + + writer = tsfile_writer(file_path) + writer.write_tsfile(table_name, data) diff --git a/python/tsfile/tsfile_pywrapper.pyx b/python/tsfile/tsfile_pywrapper.pyx new file mode 100644 index 000000000..ef9613d93 --- /dev/null +++ b/python/tsfile/tsfile_pywrapper.pyx @@ -0,0 +1,330 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +#cython: language_level=3 +from libc.string cimport strcpy +from libc.stdlib cimport malloc, free +import pandas as pd +from cpython.bytes cimport PyBytes_AsString +cimport numpy as cnp +import numpy as np +from .tsfile cimport * + +TIMESTAMP_STR = "Time" +TS_TYPE_INT32 = 1 << 8 +TS_TYPE_BOOLEAN = 1 << 9 +TS_TYPE_FLOAT = 1 << 10 +TS_TYPE_DOUBLE = 1 << 11 +TS_TYPE_INT64 = 1 << 12 +TS_TYPE_TEXT = 1 << 13 + +type_mapping = { + 'int32': TS_TYPE_INT32, + 'bool': TS_TYPE_BOOLEAN, + 'float32': TS_TYPE_FLOAT, + 'float64': TS_TYPE_DOUBLE, + 'int64': TS_TYPE_INT64 +} + +cdef class tsfile_reader: + + cdef CTsFileReader reader + cdef QueryDataRet ret + cdef int batch_size + + + def __init__(self, pathname, table_name, columns, start_time=None, end_time=None, batch_size=None): + self.open_reader(pathname) + self.query_data_ret(table_name, columns, start_time, end_time) + + if batch_size is not None: + self.batch_size = batch_size + else: + self.batch_size = -1 + + cdef open_reader(self, pathname): + cdef ErrorCode err_code + err_code = 0 + self.reader = ts_reader_open(pathname.encode('utf-8'), &err_code) + if (err_code != 0): + raise Exception("Failed to open tsfile: %s, %s" %( pathname, err_code)) + + cdef query_data_ret(self, table_name, columns, start_time = None, end_time=None): + cdef bytes py_table_name + cdef char** c_columns + py_table_name = table_name.encode('utf-8') + c_table_name = PyBytes_AsString(py_table_name) + if isinstance(columns, str): + columns = [columns] + + c_columns = malloc(len(columns) * sizeof(char*)) + if not c_columns: + raise MemoryError("Failed to allocate memory for columns") + + for i in range(len(columns)): + c_columns[i] = malloc(len(columns[i]) + 1) + if not c_columns[i]: + for j in range(i): + free(c_columns[j]) + free(c_columns) + raise MemoryError("Failed to allocate memory for columns") + column_binary = columns[i].encode('utf-8') + column = PyBytes_AsString(column_binary) + strcpy(c_columns[i], column) + # query data from tsfile + if start_time is not None or end_time is not None: + if start_time is None: + start_time = -1 + if end_time is None: + end_time = -1 + self.ret = ts_reader_begin_end(self.reader, c_table_name, c_columns, len(columns), start_time, end_time) + else: + self.ret = ts_reader_read(self.reader, table_name.encode('utf-8'), c_columns, len(columns)) + + + def read_tsfile(self): + # open tsfile to read + res = pd.DataFrame() + if self.batch_size == -1: + self.batch_size = 1024 
+ while True: + chunk = self.get_next_dataframe() + if chunk is not None: + res = pd.concat([res, chunk]) + else: + break + else: + res = self.get_next_dataframe() + self.free_resources() + return res + + def __iter__(self): + return self + + def __next__(self): + res = self.get_next_dataframe() + if res is None: + raise StopIteration + return res + + def get_next_dataframe(self): + cdef: + DataResult* result + ColumnSchema* schema = NULL + cnp.ndarray[cnp.int64_t, ndim=1, mode='c'] np_array_i64 + cnp.ndarray[cnp.int32_t, ndim=1, mode='c'] np_array_i32 + cnp.ndarray[cnp.float32_t, ndim=1, mode='c'] np_array_float + cnp.ndarray[cnp.float64_t, ndim=1, mode='c'] np_array_double + cnp.ndarray[bint, ndim=1, mode='c'] np_array_bool + cnp.npy_intp length + bint has_null + bytes pystr + str py_string + + res = {} + column_order = [] + + # Time column will be the first column + column_order.append(TIMESTAMP_STR) + + for i in range(self.ret.column_num): + pystr = self.ret.column_names[i] + py_string = pystr.decode('utf-8') + column_order.append(py_string) + res[py_string] = [] + + res[TIMESTAMP_STR] = [] + + if self.ret.data == NULL: + return None + + result = ts_next(self.ret, self.batch_size) + + # there is no data meet our requirement + if result.column_schema == NULL: + # free memory + if (destory_tablet(result) != 0): + raise Exception("Failed to destroy tablet") + return None + + # time column + length = result.cur_num + 1 + cdef cnp.ndarray[cnp.int64_t, ndim=1, mode='c'] data_array = \ + cnp.PyArray_SimpleNewFromData(1, &length, cnp.NPY_INT64, result.times) + res[TIMESTAMP_STR] = np.array(data_array, dtype = np.int64) + + for i in range(result.column_num): + + # column name + schema = result.column_schema[i] + pystr = schema.name + column_name = pystr.decode('utf-8') + + # column bitmap + is_not_null = np.empty(length, dtype = bool) + bool_ptr = result.bitmap[i] + has_null = False + for j in range(length): + is_not_null[j] = bool_ptr[j] != 0 + if bool_ptr[j] == 0 and ~has_null: + has_null = True + + + if schema.column_def == TS_TYPE_INT32: + np_array_i32 = cnp.PyArray_SimpleNewFromData(1, &length, cnp.NPY_INT32, result.value[i]) + arr = np.array(np_array_i32, dtype = np.int32) + + elif schema.column_def == TS_TYPE_BOOLEAN: + arr_bool_ = np.empty(length, dtype=np.bool_) + bool_ptr = result.value[i] + for j in range(length): + arr_bool_[j] = bool_ptr[j] != 0 + arr = np.array(arr_bool_, dtype = np.bool_) + + elif schema.column_def == TS_TYPE_FLOAT: + np_array_float = cnp.PyArray_SimpleNewFromData(1, &length, cnp.NPY_FLOAT32, result.value[i]) + arr = np.array(np_array_float, dtype = np.float32) + arr = np.where(is_not_null, arr, np.nan) + res[column_name]=arr + continue + + elif schema.column_def == TS_TYPE_DOUBLE: + np_array_double = cnp.PyArray_SimpleNewFromData(1, &length, cnp.NPY_FLOAT64, result.value[i]) + arr= np.array(np_array_double, dtype = np.float64) + arr = np.where(is_not_null, arr, np.nan) + res[column_name]=arr + continue + + elif schema.column_def == TS_TYPE_INT64: + np_array_i64 = cnp.PyArray_SimpleNewFromData(1, &length, cnp.NPY_INT64, result.value[i]) + arr = np.array(np_array_i64, dtype = np.int64) + else: + raise Exception("UnSupport column type") + + if has_null: + tmp_array = np.full(length, np.nan, np.float64) + tmp_array[is_not_null] = arr[is_not_null] + if schema.column_def == TS_TYPE_INT32: + arr = pd.Series(tmp_array).astype('Int32') + elif schema.column_def == TS_TYPE_BOOLEAN: + arr = pd.Series(tmp_array).astype(np.bool_) + elif schema.column_def == TS_TYPE_INT64: + 
arr = pd.Series(tmp_array).astype('Int64') + + res[column_name] = arr + if (destory_tablet(result) != 0): + raise Exception("Failed to destroy tablet") + return pd.DataFrame(res, columns = column_order) + + def __dealloc__(self): + self.free_resources() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.free_resources() + + cdef free_resources(self): + if self.reader: + if ts_reader_close(self.reader) != 0 : + raise Exception("Failed to close tsfile") + if self.ret: + if destory_query_dataret(self.ret) != 0: + raise Exception("Failed to free query data ret") + self.reader = NULL + self.ret = NULL + self.batch_size = -1 + +cdef class tsfile_writer: + cdef CTsFileWriter writer + cdef TsFileRowData row_data + + def __init__(self, pathname): + self.open_writer(pathname) + + + cdef open_writer(self, pathname): + cdef ErrorCode err_code + err_code = 0 + self.writer = ts_writer_open(pathname.encode('utf-8'), &err_code) + if (err_code != 0): + raise Exception("Failed to open tsfile: %s, %s" %( pathname, err_code)) + + def resister_timeseries(self, table_name, column_name, data_type): + cdef char* c_columns + cdef bytes py_table_name + cdef ColumnSchema schema + cdef bytes encoded_column_name = column_name.encode('utf-8') + py_table_name = table_name.encode('utf-8') + c_table_name = PyBytes_AsString(py_table_name) + schema.name = encoded_column_name + schema.column_def = data_type + if tsfile_register_table_column(self.writer, c_table_name, &schema) != 0: + raise Exception("Failed to register timeseries") + cdef create_row_data(self, table_name, time, column_length): + self.row_data = create_tsfile_row(table_name.encode('utf-8'), time, column_length) + def write_into_row_data(self, column_name, value, type): + cdef char* c_column_name = PyBytes_AsString(column_name.encode('utf-8')) + if type == TS_TYPE_INT32: + insert_data_into_tsfile_row_int32(self.row_data, c_column_name, value) + elif type == TS_TYPE_BOOLEAN: + insert_data_into_tsfile_row_boolean(self.row_data, c_column_name, value) + elif type == TS_TYPE_FLOAT: + insert_data_into_tsfile_row_float(self.row_data, c_column_name, value) + elif type == TS_TYPE_DOUBLE: + insert_data_into_tsfile_row_double(self.row_data, c_column_name, value) + elif type == TS_TYPE_INT64: + insert_data_into_tsfile_row_int64(self.row_data, c_column_name, value) + else: + raise TypeError("Unknown column type") + def write_tsfile(self, table_name, df): + column_names = df.columns.tolist() + column_types = df.dtypes + column_ctypes = [] + for i in range(len(column_names)): + column_type = column_types[i].name + if column_type in type_mapping: + column_ctypes.append(type_mapping[column_type]) + else: + raise TypeError("Unknown column type") + + if (column_names[i] != TIMESTAMP_STR): + self.resister_timeseries(table_name, column_names[i], column_ctypes[i]) + + + for i in range(len(df)): + time = df.iloc[i][TIMESTAMP_STR] + self.create_row_data(table_name, time, len(column_names)) + for j in range(1, len(column_names)): + column_name = column_names[j] + column_value = df.iloc[i][column_name] + column_ctype = column_ctypes[j] + self.write_into_row_data(column_name, column_value, column_ctype) + if tsfile_write_row_data(self.writer, self.row_data) != 0: + raise Exception("Failed to write row data") + + if tsfile_flush_data(self.writer) != 0: + raise Exception("Failed to flush data") + self.row_data = NULL + self.__exit__(None, None, None) + def __exit__(self, exc_type, exc_value, traceback): + if self.writer != NULL: + if 
ts_writer_close(self.writer) != 0: + raise Exception("Failed to close tsfile") + self.writer = NULL
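For quick reference, here is a minimal round-trip through the public API added in `python/tsfile/tsfile.py` (`write_tsfile` / `read_tsfile`), following the same pattern as `examlpes.py` and `test.py`. The file path and table name are illustrative, and the extension module is assumed to have been built (for example with `python3 setup.py build_ext --inplace`).

```python
import os

import numpy as np
import pandas as pd

import tsfile as ts

FILE_PATH = "sensor_data.tsfile"  # illustrative path
TABLE_NAME = "test_table"

# write_tsfile expects a DataFrame with an int64 "Time" column.
df = pd.DataFrame({
    "Time": np.arange(1, 1001, dtype=np.int64),
    "level": np.linspace(2000, 3000, num=1000, dtype=np.float32),
    "num": np.arange(10000, 11000, dtype=np.int64),
})

if os.path.exists(FILE_PATH):
    os.remove(FILE_PATH)
ts.write_tsfile(FILE_PATH, TABLE_NAME, df)

# Read everything back; "Time" always comes back as the first column.
full = ts.read_tsfile(FILE_PATH, TABLE_NAME, ["level", "num"])
print(full.shape)  # (1000, 3)

# Without iterator=True, chunksize returns only the first chunk.
head = ts.read_tsfile(FILE_PATH, TABLE_NAME, ["level", "num"], chunksize=100)
print(head.shape)  # (100, 3)

# Stream chunk by chunk within a time range, using the reader as a context manager.
with ts.read_tsfile(FILE_PATH, TABLE_NAME, ["num"],
                    start_time=50, end_time=99,
                    iterator=True, chunksize=10) as reader:
    for chunk in reader:
        print(chunk.shape)  # (10, 2) per chunk
```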
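`write_tsfile` validates the frame before touching the file: an empty DataFrame is silently skipped, a missing or non-int64 `Time` column is rejected, and only int32/int64/float32/float64/bool columns are accepted (TEXT is not supported yet). A small sketch of those failure modes, mirroring the cases covered in `test.py`; the paths are illustrative and nothing is written when an exception is raised.

```python
import numpy as np
import pandas as pd

import tsfile as ts

level = np.linspace(2000, 3000, num=10, dtype=np.float32)

# Missing "Time" column -> AttributeError.
try:
    ts.write_tsfile("no_time.tsfile", "test_table", pd.DataFrame({"level": level}))
except AttributeError as err:
    print(err)

# "Time" present but not int64 -> TypeError.
bad_time = pd.DataFrame({"Time": np.arange(10, dtype=np.float32), "level": level})
try:
    ts.write_tsfile("bad_time.tsfile", "test_table", bad_time)
except TypeError as err:
    print(err)

# Unsupported dtype (here object/strings) -> TypeError.
text = pd.DataFrame({"Time": np.arange(10, dtype=np.int64),
                     "text": np.array(["a", "b"] * 5, dtype=object)})
try:
    ts.write_tsfile("text.tsfile", "test_table", text)
except TypeError as err:
    print(err)
```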
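The public helpers are thin wrappers over the Cython classes in `tsfile_pywrapper.pyx`; they can also be driven directly, although `tsfile.read_tsfile`/`tsfile.write_tsfile` remain the intended entry points. A sketch under the assumption that the compiled extension is importable: `tsfile_writer(path)` plus `write_tsfile(table, df)` on the write side, and `tsfile_reader(path, table, columns, start_time, end_time, batch_size)` as an iterator/context manager on the read side (the file path and table name are again illustrative).

```python
import os

import numpy as np
import pandas as pd

from tsfile.tsfile_pywrapper import tsfile_reader, tsfile_writer

df = pd.DataFrame({
    "Time": np.arange(1, 101, dtype=np.int64),
    "level": np.linspace(0.0, 1.0, num=100, dtype=np.float32),
})

FILE_PATH = "raw_wrapper.tsfile"  # illustrative path
if os.path.exists(FILE_PATH):
    os.remove(FILE_PATH)

# Writer: registers one measurement per non-Time column, writes row by row,
# then flushes and closes the underlying C writer.
writer = tsfile_writer(FILE_PATH)
writer.write_tsfile("test_table", df)

# Reader: positional arguments are (path, table, columns, start_time, end_time, batch_size).
with tsfile_reader(FILE_PATH, "test_table", ["level"], None, None, 25) as reader:
    for chunk in reader:
        print(chunk.shape)  # up to (25, 2) per chunk: Time plus the requested column
```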