diff --git a/src/frontend/src/instance/opentsdb.rs b/src/frontend/src/instance/opentsdb.rs
index 1ac8fe029048..47bb940a1bb2 100644
--- a/src/frontend/src/instance/opentsdb.rs
+++ b/src/frontend/src/instance/opentsdb.rs
@@ -12,13 +12,13 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use api::v1::InsertRequests;
 use async_trait::async_trait;
 use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
 use common_error::ext::BoxedError;
 use servers::error as server_error;
 use servers::error::AuthSnafu;
 use servers::opentsdb::codec::DataPoint;
+use servers::opentsdb::data_point_to_grpc_row_insert_requests;
 use servers::query_handler::OpentsdbProtocolHandler;
 use session::context::QueryContextRef;
 use snafu::prelude::*;
@@ -27,23 +27,27 @@ use crate::instance::Instance;
 
 #[async_trait]
 impl OpentsdbProtocolHandler for Instance {
-    async fn exec(&self, data_point: &DataPoint, ctx: QueryContextRef) -> server_error::Result<()> {
+    async fn exec(
+        &self,
+        data_points: Vec<DataPoint>,
+        ctx: QueryContextRef,
+    ) -> server_error::Result<usize> {
         self.plugins
             .get::<PermissionCheckerRef>()
             .as_ref()
             .check_permission(ctx.current_user(), PermissionReq::Opentsdb)
             .context(AuthSnafu)?;
 
-        let requests = InsertRequests {
-            inserts: vec![data_point.as_grpc_insert()],
-        };
-        let _ = self
-            .handle_inserts(requests, ctx)
+        let (requests, _) = data_point_to_grpc_row_insert_requests(data_points)?;
+        let output = self
+            .handle_row_inserts(requests, ctx)
             .await
             .map_err(BoxedError::new)
-            .with_context(|_| server_error::ExecuteQuerySnafu {
-                query: format!("{data_point:?}"),
-            })?;
-        Ok(())
+            .context(servers::error::ExecuteGrpcQuerySnafu)?;
+
+        Ok(match output {
+            common_query::Output::AffectedRows(rows) => rows,
+            _ => unreachable!(),
+        })
     }
 }
diff --git a/src/servers/src/http/opentsdb.rs b/src/servers/src/http/opentsdb.rs
index c5b90b42a438..054595252ad3 100644
--- a/src/servers/src/http/opentsdb.rs
+++ b/src/servers/src/http/opentsdb.rs
@@ -84,17 +84,19 @@ pub async fn put(
     let summary = params.contains_key("summary");
     let details = params.contains_key("details");
 
-    let data_points = parse_data_points(body).await?;
+    let data_point_requests = parse_data_points(body).await?;
+    let data_points = data_point_requests
+        .iter()
+        .map(|point| point.clone().into())
+        .collect::<Vec<_>>();
 
     let response = if !summary && !details {
-        for data_point in data_points.into_iter() {
-            if let Err(e) = opentsdb_handler.exec(&data_point.into(), ctx.clone()).await {
-                // Not debugging purpose, failed fast.
-                return error::InternalSnafu {
-                    err_msg: e.to_string(),
-                }
-                .fail();
+        if let Err(e) = opentsdb_handler.exec(data_points, ctx.clone()).await {
+            // Not debugging purpose, failed fast.
+            return error::InternalSnafu {
+                err_msg: e.to_string(),
             }
+            .fail();
         }
         (HttpStatusCode::NO_CONTENT, Json(OpentsdbPutResponse::Empty))
     } else {
@@ -108,15 +110,11 @@ pub async fn put(
             },
         };
 
-        for data_point in data_points.into_iter() {
-            let result = opentsdb_handler
-                .exec(&data_point.clone().into(), ctx.clone())
-                .await;
+        for (data_point, request) in data_points.into_iter().zip(data_point_requests) {
+            let result = opentsdb_handler.exec(vec![data_point], ctx.clone()).await;
             match result {
-                Ok(()) => response.on_success(),
-                Err(e) => {
-                    response.on_failed(data_point, e);
-                }
+                Ok(affected_rows) => response.on_success(affected_rows),
+                Err(e) => response.on_failed(request, e),
             }
         }
         (
@@ -151,8 +149,8 @@ pub struct OpentsdbDebuggingResponse {
 }
 
 impl OpentsdbDebuggingResponse {
-    fn on_success(&mut self) {
-        self.success += 1;
+    fn on_success(&mut self, affected_rows: usize) {
+        self.success += affected_rows as i32;
     }
 
     fn on_failed(&mut self, datapoint: DataPointRequest, error: impl ErrorExt) {
diff --git a/src/servers/src/opentsdb.rs b/src/servers/src/opentsdb.rs
index 61ed84167064..07cde1e14765 100644
--- a/src/servers/src/opentsdb.rs
+++ b/src/servers/src/opentsdb.rs
@@ -20,16 +20,20 @@ use std::future::Future;
 use std::net::SocketAddr;
 use std::sync::Arc;
 
+use api::v1::RowInsertRequests;
 use async_trait::async_trait;
 use common_runtime::Runtime;
 use common_telemetry::logging::error;
 use futures::StreamExt;
 use tokio::sync::broadcast;
 
+use self::codec::DataPoint;
 use crate::error::Result;
 use crate::opentsdb::connection::Connection;
 use crate::opentsdb::handler::Handler;
+use crate::prom_store::{FIELD_COLUMN_NAME, TIMESTAMP_COLUMN_NAME};
 use crate::query_handler::OpentsdbProtocolHandlerRef;
+use crate::row_writer::{self, MultiTableData};
 use crate::server::{AbortableStream, BaseTcpServer, Server};
 use crate::shutdown::Shutdown;
 
@@ -126,3 +130,38 @@ impl Server for OpentsdbServer {
         OPENTSDB_SERVER
     }
 }
+
+pub fn data_point_to_grpc_row_insert_requests(
+    data_points: Vec<DataPoint>,
+) -> Result<(RowInsertRequests, usize)> {
+    let mut multi_table_data = MultiTableData::new();
+
+    for mut data_point in data_points {
+        let tags: Vec<(String, String)> = std::mem::take(data_point.tags_mut());
+        let table_name = data_point.metric();
+        let value = data_point.value();
+        let timestamp = data_point.ts_millis();
+        // length of tags + 2 extra columns for greptime_timestamp and the value
+        let num_columns = tags.len() + 2;
+
+        let table_data = multi_table_data.get_or_default_table_data(table_name, num_columns, 1);
+        let mut one_row = table_data.alloc_one_row();
+
+        // tags
+        row_writer::write_tags(table_data, tags.into_iter(), &mut one_row)?;
+
+        // value
+        row_writer::write_f64(table_data, FIELD_COLUMN_NAME, value, &mut one_row)?;
+        // timestamp
+        row_writer::write_ts_millis(
+            table_data,
+            TIMESTAMP_COLUMN_NAME,
+            Some(timestamp),
+            &mut one_row,
+        )?;
+
+        table_data.add_row(one_row);
+    }
+
+    Ok(multi_table_data.into_row_insert_requests())
+}
diff --git a/src/servers/src/opentsdb/codec.rs b/src/servers/src/opentsdb/codec.rs
index 163e060adece..55e160460554 100644
--- a/src/servers/src/opentsdb/codec.rs
+++ b/src/servers/src/opentsdb/codec.rs
@@ -19,7 +19,7 @@ use crate::error::{self, Result};
 pub const OPENTSDB_TIMESTAMP_COLUMN_NAME: &str = "greptime_timestamp";
 pub const OPENTSDB_FIELD_COLUMN_NAME: &str = "greptime_value";
 
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct DataPoint {
     metric: String,
     ts_millis: i64,
@@ -115,6 +115,10 @@ impl DataPoint {
         &self.tags
     }
 
+    pub fn tags_mut(&mut self) -> &mut Vec<(String, String)> {
+        &mut self.tags
+    }
+
     pub fn ts_millis(&self) -> i64 {
         self.ts_millis
     }
diff --git a/src/servers/src/opentsdb/handler.rs b/src/servers/src/opentsdb/handler.rs
index 4cbe1731fe11..c260ccf1749f 100644
--- a/src/servers/src/opentsdb/handler.rs
+++ b/src/servers/src/opentsdb/handler.rs
@@ -94,7 +94,7 @@ impl<S: AsyncWrite + AsyncRead + Unpin> Handler<S> {
             match DataPoint::try_create(&line) {
                 Ok(data_point) => {
                     let _timer = timer!(crate::metrics::METRIC_TCP_OPENTSDB_LINE_WRITE_ELAPSED);
-                    let result = self.query_handler.exec(&data_point, ctx.clone()).await;
+                    let result = self.query_handler.exec(vec![data_point], ctx.clone()).await;
                     if let Err(e) = result {
                         self.connection.write_line(e.output_msg()).await?;
                     }
@@ -128,8 +128,8 @@ mod tests {
 
     #[async_trait]
     impl OpentsdbProtocolHandler for DummyQueryHandler {
-        async fn exec(&self, data_point: &DataPoint, _ctx: QueryContextRef) -> Result<()> {
-            let metric = data_point.metric();
+        async fn exec(&self, data_points: Vec<DataPoint>, _ctx: QueryContextRef) -> Result<usize> {
+            let metric = data_points.first().unwrap().metric();
             if metric == "should_failed" {
                 return error::InternalSnafu {
                     err_msg: "expected",
@@ -137,7 +137,7 @@ mod tests {
                 .fail();
             }
             self.tx.send(metric.to_string()).await.unwrap();
-            Ok(())
+            Ok(data_points.len())
         }
     }
 
diff --git a/src/servers/src/query_handler.rs b/src/servers/src/query_handler.rs
index ef8f74575e7c..4180c8469c8b 100644
--- a/src/servers/src/query_handler.rs
+++ b/src/servers/src/query_handler.rs
@@ -74,7 +74,7 @@ pub trait InfluxdbLineProtocolHandler {
 pub trait OpentsdbProtocolHandler {
     /// A successful request will not return a response.
     /// Only on error will the socket return a line of data.
-    async fn exec(&self, data_point: &DataPoint, ctx: QueryContextRef) -> Result<()>;
+    async fn exec(&self, data_points: Vec<DataPoint>, ctx: QueryContextRef) -> Result<usize>;
 }
 
 pub struct PromStoreResponse {
diff --git a/src/servers/tests/http/opentsdb_test.rs b/src/servers/tests/http/opentsdb_test.rs
index e77143d3b3a1..552ef6ecb940 100644
--- a/src/servers/tests/http/opentsdb_test.rs
+++ b/src/servers/tests/http/opentsdb_test.rs
@@ -51,7 +51,8 @@ impl GrpcQueryHandler for DummyInstance {
 
 #[async_trait]
 impl OpentsdbProtocolHandler for DummyInstance {
-    async fn exec(&self, data_point: &DataPoint, _ctx: QueryContextRef) -> Result<()> {
+    async fn exec(&self, data_points: Vec<DataPoint>, _ctx: QueryContextRef) -> Result<usize> {
+        let data_point = data_points.first().unwrap();
         if data_point.metric() == "should_failed" {
             return error::InternalSnafu {
                 err_msg: "expected",
@@ -59,7 +60,7 @@ impl OpentsdbProtocolHandler for DummyInstance {
             .fail();
         }
         let _ = self.tx.send(data_point.metric().to_string()).await;
-        Ok(())
+        Ok(data_points.len())
     }
 }
 
@@ -172,10 +173,7 @@ async fn test_opentsdb_put() {
     while let Ok(s) = rx.try_recv() {
         metrics.push(s);
     }
-    assert_eq!(
-        metrics,
-        vec!["m1".to_string(), "m2".to_string(), "m3".to_string()]
-    );
+    assert_eq!(metrics, vec!["m1".to_string(), "m2".to_string()]);
 }
 
 #[tokio::test]
diff --git a/src/servers/tests/opentsdb.rs b/src/servers/tests/opentsdb.rs
index 145fdc07dbe6..79ac2ba21939 100644
--- a/src/servers/tests/opentsdb.rs
+++ b/src/servers/tests/opentsdb.rs
@@ -37,8 +37,8 @@ struct DummyOpentsdbInstance {
 
 #[async_trait]
 impl OpentsdbProtocolHandler for DummyOpentsdbInstance {
-    async fn exec(&self, data_point: &DataPoint, _ctx: QueryContextRef) -> Result<()> {
-        let metric = data_point.metric();
+    async fn exec(&self, data_points: Vec<DataPoint>, _ctx: QueryContextRef) -> Result<usize> {
+        let metric = data_points.first().unwrap().metric();
         if metric == "should_failed" {
             return server_error::InternalSnafu {
                 err_msg: "expected",
@@ -47,7 +47,7 @@ impl OpentsdbProtocolHandler for DummyOpentsdbInstance {
         }
         let i = metric.parse::<i32>().unwrap();
         let _ = self.tx.send(i * i).await;
-        Ok(())
+        Ok(data_points.len())
     }
 }
 
diff --git a/tests-integration/src/opentsdb.rs b/tests-integration/src/opentsdb.rs
index 5d6338d94270..c5474ea4e240 100644
--- a/tests-integration/src/opentsdb.rs
+++ b/tests-integration/src/opentsdb.rs
@@ -46,6 +46,8 @@ mod tests {
 
     async fn test_exec(instance: &Arc<Instance>) {
         let ctx = QueryContext::arc();
+
+        // should create new table "my_metric_1" directly
         let data_point1 = DataPoint::new(
             "my_metric_1".to_string(),
             1000,
@@ -55,9 +57,8 @@ mod tests {
                 ("tagk2".to_string(), "tagv2".to_string()),
             ],
         );
-        // should create new table "my_metric_1" directly
-        instance.exec(&data_point1, ctx.clone()).await.unwrap();
 
+        // should create new column "tagk3" directly
         let data_point2 = DataPoint::new(
             "my_metric_1".to_string(),
             2000,
@@ -67,12 +68,12 @@ mod tests {
                 ("tagk3".to_string(), "tagv3".to_string()),
             ],
         );
-        // should create new column "tagk3" directly
-        instance.exec(&data_point2, ctx.clone()).await.unwrap();
 
-        let data_point3 = DataPoint::new("my_metric_1".to_string(), 3000, 3.0, vec![]);
         // should handle null tags properly
-        instance.exec(&data_point3, ctx.clone()).await.unwrap();
+        let data_point3 = DataPoint::new("my_metric_1".to_string(), 3000, 3.0, vec![]);
+
+        let data_points = vec![data_point1, data_point2, data_point3];
+        instance.exec(data_points, ctx.clone()).await.unwrap();
 
         let output = instance
             .do_query(
@@ -87,13 +88,13 @@ mod tests {
                 let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
                 let pretty_print = recordbatches.pretty_print().unwrap();
                 let expected = vec![
-                    "+---------------------+----------------+-------+-------+-------+",
-                    "| greptime_timestamp  | greptime_value | tagk1 | tagk2 | tagk3 |",
-                    "+---------------------+----------------+-------+-------+-------+",
-                    "| 1970-01-01T00:00:01 | 1.0            | tagv1 | tagv2 |       |",
-                    "| 1970-01-01T00:00:02 | 2.0            |       | tagv2 | tagv3 |",
-                    "| 1970-01-01T00:00:03 | 3.0            |       |       |       |",
-                    "+---------------------+----------------+-------+-------+-------+",
+                    "+-------+-------+----------------+---------------------+-------+",
+                    "| tagk1 | tagk2 | greptime_value | greptime_timestamp  | tagk3 |",
+                    "+-------+-------+----------------+---------------------+-------+",
+                    "| tagv1 | tagv2 | 1.0            | 1970-01-01T00:00:01 |       |",
+                    "|       | tagv2 | 2.0            | 1970-01-01T00:00:02 | tagv3 |",
+                    "|       |       | 3.0            | 1970-01-01T00:00:03 |       |",
+                    "+-------+-------+----------------+---------------------+-------+",
                 ]
                 .into_iter()
                 .join("\n");