Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert BYTES DataType to DECIMAL in Apache Pinot #14510

Open
rajat-sr1704 opened this issue Nov 21, 2024 · 2 comments
Open

Convert BYTES DataType to DECIMAL in Apache Pinot #14510

rajat-sr1704 opened this issue Nov 21, 2024 · 2 comments

Comments

@rajat-sr1704
Copy link

I am working on ingesting realtime data from apache kafka topic. The data is stored in Avro format with Schema Registry, I want to ingest the real time data into apache pinot. As the data in Avro is in nested format I am using JSONPATH(query, 'query_field') to extract data from the nested data. All the data are coming correctly but cost, tax, cod_charges, entry_tax these fields are stored in avro topic in BYTES data type, the actual data type for these fields are integer in the original dataset and it got converted into bytes while ingesting into kafka topic as avro format. I want to convert it again into decimal or integer format while ingesting into apache pinot, there's no function for extracting bytes data type from the nested data topic.

my ingestionConfig looks like:

"ingestionConfig": { "transformConfigs": [ { "columnName": "id", "transformFunction": "JSONPATHLONG(after, '$.id')" }, { "columnName": "order_id", "transformFunction": "JSONPATHLONG(after, '$.order_id')" }, { "columnName": "company_id", "transformFunction": "JSONPATHLONG(after, '$.company_id')" }, { "columnName": "channel_id", "transformFunction": "JSONPATHLONG(after, '$.channel_id')" }, { "columnName": "invoice_no", "transformFunction": "JSONPATHSTRING(after, '$.invoice_no')" }, { "columnName": "encrypt_invoice_name", "transformFunction": "JSONPATHSTRING(after, '$.encrypt_invoice_name')" }, { "columnName": "courier", "transformFunction": "JSONPATHSTRING(after, '$.courier')" }, { "columnName": "sr_courier_id", "transformFunction": "JSONPATHLONG(after, '$.sr_courier_id')" }, { "columnName": "code", "transformFunction": "JSONPATHSTRING(after, '$.code')" }, { "columnName": "awb", "transformFunction": "JSONPATHSTRING(after, '$.awb')" }, { "columnName": "pickup_token_number", "transformFunction": "JSONPATHSTRING(after, '$.pickup_token_number')" }, { "columnName": "dhl_handover_id", "transformFunction": "JSONPATHSTRING(after, '$.dhl_handover_id')" }, { "columnName": "dhl_handover_url", "transformFunction": "JSONPATHSTRING(after, '$.dhl_handover_url')" }, { "columnName": "pickup_address_id", "transformFunction": "JSONPATHLONG(after, '$.pickup_address_id')" }, { "columnName": "pickup_reshedule_count", "transformFunction": "JSONPATHDOUBLE(after, '$.pickup_reshedule_count')" }, { "columnName": "return_pickup_address_id", "transformFunction": "JSONPATHDOUBLE(after, '$.return_pickup_address_id')" }, { "columnName": "dhl_pickup_url", "transformFunction": "JSONPATHSTRING(after, '$.dhl_pickup_url')" }, { "columnName": "method", "transformFunction": "JSONPATHSTRING(after, '$.method')" }, { "columnName": "channel_shipment_id", "transformFunction": "JSONPATHSTRING(after, '$.channel_shipment_id')" }, { "columnName": "weight", "transformFunction": "JSONPATHSTRING(after, '$.weight')" }, { "columnName": "dimensions", "transformFunction": "JSONPATHSTRING(after, '$.dimensions')" }, { "columnName": "volumetric_weight", "transformFunction": "JSONPATHSTRING(after, '$.volumetric_weight')" }, { "columnName": "quantity", "transformFunction": "JSONPATHDOUBLE(after, '$.quantity')" }, { "columnName": "status", "transformFunction": "JSONPATHDOUBLE(after, '$.status')" }, { "columnName": "state_type", "transformFunction": "JSONPATHDOUBLE(after, '$.state_type')" }, { "columnName": "sub_status", "transformFunction": "JSONPATHDOUBLE(after, '$.sub_status')" }, { "columnName": "status_code", "transformFunction": "JSONPATHSTRING(after, '$.status_code')" }, { "columnName": "shipment_zone", "transformFunction": "JSONPATHDOUBLE(after, '$.shipment_zone')" }, { "columnName": "label_url", "transformFunction": "JSONPATHSTRING(after, '$.label_url')" }, { "columnName": "manifest_url", "transformFunction": "JSONPATHSTRING(after, '$.manifest_url')" }, { "columnName": "is_locked", "transformFunction": "JSONPATHSTRING(after, '$.is_locked')" }, { "columnName": "customer_gstin", "transformFunction": "JSONPATHSTRING(after, '$.customer_gstin')" }, { "columnName": "eway_bill_number", "transformFunction": "JSONPATHSTRING(after, '$.eway_bill_number')" }, { "columnName": "pod", "transformFunction": "JSONPATHSTRING(after, '$.pod')" }, { "columnName": "frozen_weight", "transformFunction": "JSONPATHDOUBLE(after, '$.frozen_weight')" }, { "columnName": "isd_code", "transformFunction": "JSONPATHSTRING(after, '$.isd_code')" }, { "columnName": "seller_address", "transformFunction": "JSONPATHSTRING(after, '$.seller_address')" }, { "columnName": "shipping_address", "transformFunction": "JSONPATHSTRING(after, '$.shipping_address')" }, { "columnName": "customer_details", "transformFunction": "JSONPATHSTRING(after, '$.customer_details')" }, { "columnName": "comment", "transformFunction": "JSONPATHSTRING(after, '$.comment')" }, { "columnName": "others", "transformFunction": "JSONPATHSTRING(after, '$.others')" }, { "columnName": "entry_tax", "transformFunction": "JSONPATHDOUBLE(after, '$.entry_tax')" }, { "columnName": "cost", "transformFunction": "JSONPATHDOUBLE(after, '$.cost')" }, { "columnName": "tax", "transformFunction": "JSONPATHDOUBLE(after, '$.tax')" }, { "columnName": "cod_charges", "transformFunction": "JSONPATHDOUBLE(after, '$.cod_charges')" }, { "columnName": "total", "transformFunction": "JSONPATHDOUBLE(after, '$.total')" }, { "columnName": "invoice_date", "transformFunction": "JSONPATHLONG(after, '$.invoice_date')" }, { "columnName": "awb_assign_date", "transformFunction": "JSONPATHLONG(after, '$.awb_assign_date')" }, { "columnName": "pickup_generated_date", "transformFunction": "JSONPATHLONG(after, '$.pickup_generated_date')" }, { "columnName": "pickup_scheduled_date", "transformFunction": "JSONPATHLONG(after, '$.pickup_scheduled_date')" }, { "columnName": "out_for_pickup_date", "transformFunction": "JSONPATHSTRING(after, '$.out_for_pickup_date')" }, { "columnName": "created_at", "transformFunction": "JSONPATHSTRING(after, '$.created_at')" }, { "columnName": "updated_at", "transformFunction": "JSONPATHSTRING(after, '$.updated_at')" }, { "columnName": "rto_initiated_date", "transformFunction": "JSONPATHSTRING(after, '$.rto_initiated_date')" }, { "columnName": "rto_delivered_date", "transformFunction": "JSONPATHSTRING(after, '$.rto_delivered_date')" }, { "columnName": "updated_on", "transformFunction": "JSONPATHSTRING(after, '$.updated_on')" }, { "columnName": "etd", "transformFunction": "JSONPATHLONG(after, '$.etd')" }, { "columnName": "promised_pickup_tat", "transformFunction": "JSONPATHDOUBLE(after, '$.promised_pickup_tat')" }, { "columnName": "promised_delivery_tat", "transformFunction": "JSONPATHDOUBLE(after, '$.promised_delivery_tat')" }, { "columnName": "promised_rto_tat", "transformFunction": "JSONPATHDOUBLE(after, '$.promised_rto_tat')" }, { "columnName": "promised_cod_remittance_tat", "transformFunction": "JSONPATHDOUBLE(after, '$.promised_cod_remittance_tat')" }, { "columnName": "shipped_date", "transformFunction": "JSONPATHSTRING(after, '$.shipped_date')" }, { "columnName": "delivered_date", "transformFunction": "JSONPATHSTRING(after, '$.delivered_date')" }, { "columnName": "returned_date", "transformFunction": "JSONPATHSTRING(after, '$.returned_date')" }, { "columnName": "eway_bill_date", "transformFunction": "JSONPATHSTRING(after, '$.returned_date')" }, { "columnName": "mps_data", "transformFunction": "JSONPATHSTRING(after, '$.returned_date')" } ],

and my schemaFile.json:

{ "schemaName": "shipments", "enableColumnBasedNullHandling": true, "dimensionFieldSpecs": [ { "name": "id", "dataType": "LONG" }, { "name": "order_id", "dataType": "LONG" }, { "name": "company_id", "dataType": "LONG" }, { "name": "channel_id", "dataType": "LONG" }, { "name": "invoice_no", "dataType": "STRING" }, { "name": "encrypt_invoice_name", "dataType": "STRING" }, { "name": "courier", "dataType": "STRING" }, { "name": "sr_courier_id", "dataType": "INT" }, { "name": "code", "dataType": "STRING" }, { "name": "awb", "dataType": "STRING" }, { "name": "pickup_token_number", "dataType": "STRING" }, { "name": "pickup_address_id", "dataType": "LONG" }, { "name": "return_pickup_address_id", "dataType": "INT" }, { "name": "dhl_handover_id", "dataType": "STRING" }, { "name": "dhl_handover_url", "dataType": "STRING" }, { "name": "pickup_reshedule_count", "dataType": "INT" }, { "name": "dhl_pickup_url", "dataType": "STRING" }, { "name": "method", "dataType": "STRING" }, { "name": "channel_shipment_id", "dataType": "STRING" }, { "name": "weight", "dataType": "STRING" }, { "name": "dimensions", "dataType": "STRING" }, { "name": "volumetric_weight", "dataType": "STRING" }, { "name": "quantity", "dataType": "INT" }, { "name": "status", "dataType": "INT" }, { "name": "state_type", "dataType": "INT" }, { "name": "sub_status", "dataType": "INT" }, { "name": "status_code", "dataType": "STRING" }, { "name": "shipment_zone", "dataType": "INT" }, { "name": "label_url", "dataType": "STRING" }, { "name": "manifest_url", "dataType": "STRING" }, { "name": "is_locked", "dataType": "INT" }, { "name": "customer_gstin", "dataType": "STRING" }, { "name": "eway_bill_number", "dataType": "STRING" }, { "name": "pod", "dataType": "STRING" }, { "name": "frozen_weight", "dataType": "INT" }, { "name": "isd_code", "dataType": "STRING" }, { "name": "seller_address", "dataType": "STRING" }, { "name": "shipping_address", "dataType": "STRING" }, { "name": "customer_details", "dataType": "STRING" }, { "name": "comment", "dataType": "STRING" }, { "name": "others", "dataType": "STRING" }, { "name": "mps_data", "dataType": "STRING" } ], "metricFieldSpecs": [ { "name": "entry_tax", "dataType": "DOUBLE", "defaultNullValue": null }, { "name": "cost", "dataType": "DOUBLE", "defaultNullValue": null }, { "name": "tax", "dataType": "DOUBLE", "defaultNullValue": null }, { "name": "cod_charges", "dataType": "DOUBLE", "defaultNullValue": null }, { "name": "total", "dataType": "DOUBLE", "defaultNullValue": null } ], "dateTimeFieldSpecs": [ { "name": "invoice_date", "dataType": "LONG", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:MILLISECONDS" }, { "name": "awb_assign_date", "dataType": "LONG", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:MILLISECONDS" }, { "name": "pickup_generated_date", "dataType": "LONG", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:MILLISECONDS" }, { "name": "pickup_scheduled_date", "dataType": "LONG", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:MILLISECONDS" }, { "name": "out_for_pickup_date", "dataType": "STRING", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:DAYS" }, { "name": "created_at", "dataType": "STRING", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:DAYS" }, { "name": "updated_at", "dataType": "STRING", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:DAYS" }, { "name": "rto_initiated_date", "dataType": "STRING", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:DAYS" }, { "name": "rto_delivered_date", "dataType": "STRING", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:DAYS" }, { "name": "updated_on", "dataType": "STRING", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:DAYS" }, { "name": "etd", "dataType": "LONG", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:MILLISECONDS" }, { "name": "promised_pickup_tat", "dataType": "STRING", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:DAYS" }, { "name": "promised_delivery_tat", "dataType": "STRING", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:DAYS" }, { "name": "promised_rto_tat", "dataType": "STRING", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:DAYS" }, { "name": "promised_cod_remittance_tat", "dataType": "STRING", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:DAYS" }, { "name": "shipped_date", "dataType": "STRING", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:DAYS" }, { "name": "delivered_date", "dataType": "STRING", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:DAYS" }, { "name": "returned_date", "dataType": "STRING", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:DAYS" }, { "name": "eway_bill_date", "dataType": "STRING", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:DAYS" } ] }

Please let me know if you need anything to debug this issue.

I have tried making it into Double type data type and then using JSONPATHDOUBLE but the data that is coming to pinot is all null.
bZfoEV3U

@Jackie-Jiang
Copy link
Contributor

How do the values being converted from bytes?
There are 2 solutions:

  1. Do not convert them into bytes from first place
  2. Understand the conversion algorithm, and use a UDF to convert it back during ingestion

cc @swaminathanmanish @KKcorps

@rajat-sr1704
Copy link
Author

It is converted into Bytes using Kafka Connect's "connect.default": "\u0000", "connect.name": "org.apache.kafka.connect.data.Decimal", "logicalType": "decimal" . It is converted into Bytes for some other reason cannot stop that process as it will cause to re run all the clusters in kafka to store data.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants