Broken metadata values encoding #56

Closed
slavirok opened this issue Jul 10, 2019 · 13 comments

@slavirok

slavirok commented Jul 10, 2019

Description

We have been using MirrorMaker to copy data from EventHub to Kafka for a while. Everything has worked well except for one thing.

When consuming messages from Kafka, we noticed that the header values arrive with a strange encoding. Please see the screenshot below.

[Screenshot: Screen Shot 2019-07-10 at 16 53 33]

P.S. I skipped the checklist because I don't think it would bring any value.

@arerlend
Contributor

Thanks for the report @slavirok, I'll look into it.

@arerlend
Contributor

Can you paste your MirrorMaker configs? I'll set up a repro. I wonder if this may be IoT Hub sending in a specific encoding.

@arerlend
Contributor

Yup - if you have an EH producer adding headers to AMQP messages (e.g. IoT Hub enriching messages), the message headers will be AMQP encoded when read by a Kafka consumer. EH is encoding-agnostic, we just pass around bytes. I'll still do a repro on my own, but you should try doing an AMQP decoding on the headers.
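
To make the "AMQP encoded" point concrete, here is a minimal sketch of how the AMQP 1.0 type system lays out a string value, which is roughly what a Kafka consumer would see in the raw header bytes. The helper name `amqp_encode_string` is hypothetical, and the sketch covers only the two string encodings from the spec (`str8-utf8` and `str32-utf8`).

```python
import struct


def amqp_encode_string(text: str) -> bytes:
    """Encode text the way AMQP 1.0 encodes a string value (illustrative helper)."""
    payload = text.encode("utf-8")
    if len(payload) <= 0xFF:
        # str8-utf8: constructor 0xa1, a 1-byte length, then the UTF-8 bytes
        return bytes([0xA1, len(payload)]) + payload
    # str32-utf8: constructor 0xb1, a 4-byte big-endian length, then the UTF-8 bytes
    return b"\xb1" + struct.pack(">I", len(payload)) + payload


raw = amqp_encode_string("Hello")
print(raw.hex(" "))  # a1 05 48 65 6c 6c 6f
```

A consumer that interprets those bytes directly as text gets the type and length bytes mixed in with the string, which would explain the odd-looking header values.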

@slavirok
Author

Producer:

max.in.flight.requests.per.connection=1
security.protocol=ssl
ssl.keystore.type=PKCS12
ssl.truststore.type=JKS
client.id=${KAFKA_CLIENT_ID}
bootstrap.servers=${KAFKA_BOOTSTRAP_SERVER}
ssl.keystore.location=${KAFKA_SSL_KEYSTORE_LOCATION}
ssl.truststore.location=${KAFKA_SSL_TRUSTSTORE_LOCATION}
ssl.keystore.password=${KAFKA_SSL_PASSWORD}
ssl.key.password=${KAFKA_SSL_PASSWORD}
ssl.truststore.password=${KAFKA_SSL_PASSWORD}

Consumer:

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${DOLLAR}ConnectionString" password="${EVENTHUB_CONNECTION_STRING}";
group.id=${EVENTHUB_CLIENT_ID}
client.id=${EVENTHUB_CLIENT_ID}
exclude.internal.topics=true
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
auto.offset.reset=earliest

@slavirok
Author

@arerlend, thanks for the reply.

Do you by any chance have an example of what AMQP decoding of the headers looks like?

@arerlend
Contributor

Sorry about taking a while to respond, that is a good question...

Here's the spec - http://docs.oasis-open.org/amqp/core/v1.0/amqp-core-types-v1.0.xml

But I don't know if that's the most helpful answer....

@xinchen10
Member

The .NET AMQP library (https://www.nuget.org/packages/Microsoft.Azure.Amqp/) supports it. Here is one example:

using System;
using Microsoft.Azure.Amqp;
using Microsoft.Azure.Amqp.Encoding;

namespace ConsoleApplication1
{
    class Program
    {
        static void Main(string[] args)
        {
            byte[] bytes = { 0xa1, 0x05, 0x48, 0x65, 0x6c, 0x6c, 0x6f };
            Console.WriteLine(Decode(bytes));
        }

        static object Decode(byte[] bytes)
        {
            return AmqpEncoding.DecodeObject(new ByteBuffer(bytes, 0, bytes.Length));
        }
    }
}

If you use Java, take a look at the proton-j package (https://mvnrepository.com/artifact/org.apache.qpid/proton-j). Using the DecoderImpl class you should be able to read an object from a buffer.

@arerlend
Contributor

I see, thanks Xin.

Link to docs for proton-j decoder class - https://qpid.apache.org/releases/qpid-proton-j-0.33.1/api/index.html

@slavirok
Author

Thanks for your help, @arerlend, @xinchen10.

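In case someone is interested, here is Java code to decode Kafka header values using the proton-j library:

  import java.nio.ByteBuffer;
  import org.apache.qpid.proton.codec.AMQPDefinedTypes;
  import org.apache.qpid.proton.codec.DecoderImpl;
  import org.apache.qpid.proton.codec.EncoderImpl;

  // Set up a decoder that knows the standard AMQP 1.0 types.
  DecoderImpl decoder = new DecoderImpl();
  EncoderImpl encoder = new EncoderImpl(decoder);
  AMQPDefinedTypes.registerAllTypes(decoder, encoder);

  // header is a Kafka Header; its value holds the AMQP-encoded bytes.
  decoder.setByteBuffer(ByteBuffer.wrap(header.value()));
  Object obj = decoder.readObject();
  // String.valueOf avoids a NullPointerException when the value is an AMQP null.
  String result = String.valueOf(obj);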

@JimThorstad

Does anyone know of a way to decode these values in a Python UDF? We have a use case where we can't run the scala example.

@seb-emmot

Have you looked into the pamqp package? https://github.com/gmr/pamqp
I went with the Scala implementation myself, but if that isn't an option for you, look into the package above.

@epa095

epa095 commented Feb 15, 2023

If someone happens to want to decode AMQP-encoded strings in Spark (e.g. IoT Hub headers), I found that this works, as long as you know the values are strings, since all it does is drop the type information in the first bytes:

from pyspark.sql import functions as F

def decode_amqp_str(val_col):
  """AMQP adds some bytes of type metadata. We ditch them and parse the rest as a string."""
  # 2147483647: https://stackoverflow.com/questions/57867088/pyspark-substr-without-length
  return F.substring(val_col, 3, 2147483647).cast("string")
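
The caveat matters because a fixed offset only matches the short string encoding. Here is a plain-Python illustration (no Spark needed; the sample byte values are constructed just for this example) of what skipping two bytes does to each of the two AMQP string encodings:

```python
import struct

# str8-utf8: constructor 0xa1 + 1-byte length = 2 bytes of type metadata
short_value = b"\xa1\x05Hello"

# str32-utf8: constructor 0xb1 + 4-byte length = 5 bytes of type metadata
long_text = "x" * 300
long_value = b"\xb1" + struct.pack(">I", len(long_text)) + long_text.encode("utf-8")

# Spark's substring position is 1-based, so position 3 corresponds to index 2 here.
short_result = short_value[2:].decode("utf-8")
long_result = long_value[2:].decode("utf-8")

print(repr(short_result))      # clean text
print(repr(long_result[:6]))   # three leftover length bytes, then the text
```

So the fixed-offset approach looks safe only for strings of up to 255 bytes; longer values would keep part of the length prefix at the front.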

@mike-schenk

Here are some slightly more robust ones (the match statement requires Python 3.10 or later):

from pyspark.sql.functions import udf

@udf("string")
def string_from_amqp(val: bytes):
    # See https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html#type-string
    if val is None:
        return None
    match val[0]:
        # An AMQP null value
        case 0x40:
            return None
        # An AMQP string of up to 2^8 - 1 octets of UTF-8 (no byte order mark);
        # skip the constructor byte and the 1-byte length
        case 0xa1:
            return val[2:].decode('UTF-8', 'strict')
        # An AMQP string of up to 2^32 - 1 octets of UTF-8 (no byte order mark);
        # skip the constructor byte and the 4-byte length
        case 0xb1:
            return val[5:].decode('UTF-8', 'strict')

    # TODO: maybe this should fail here instead
    return val.hex()

@udf("boolean")
def bool_from_amqp(val: bytes):
    # See https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html#type-boolean
    if val is None:
        return None
    match val[0]:
        # Boolean followed by one octet: 0x00 is false, 0x01 is true
        case 0x56:
            return val[1] == 0x01
        # Single-byte encoding of true
        case 0x41:
            return True
        # Single-byte encoding of false
        case 0x42:
            return False

    # TODO: maybe this should fail here instead
    return None
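
If header values can be other primitive types as well, the same constructor-byte dispatch could be generalized. Below is a rough sketch in plain Python, assuming each header value holds exactly one AMQP 1.0 primitive. The function name `decode_amqp_primitive` is made up for this example, only a subset of the spec's types is covered, and it raises on anything it does not recognize instead of returning a fallback.

```python
import struct

# Constructor byte -> (struct format, payload size) for fixed-width primitives.
_FIXED = {
    0x56: (">?", 1),  # boolean with a following octet
    0x50: (">B", 1),  # ubyte
    0x60: (">H", 2),  # ushort
    0x70: (">I", 4),  # uint
    0x52: (">B", 1),  # smalluint
    0x80: (">Q", 8),  # ulong
    0x53: (">B", 1),  # smallulong
    0x51: (">b", 1),  # byte
    0x61: (">h", 2),  # short
    0x71: (">i", 4),  # int
    0x54: (">b", 1),  # smallint
    0x81: (">q", 8),  # long
    0x55: (">b", 1),  # smalllong
    0x72: (">f", 4),  # float
    0x82: (">d", 8),  # double
}
# Variable-width types: constructor byte -> width of the length prefix in bytes.
_VARIABLE = {0xA0: 1, 0xA1: 1, 0xA3: 1, 0xB0: 4, 0xB1: 4, 0xB3: 4}


def decode_amqp_primitive(val):
    """Decode one AMQP 1.0 primitive value; raise ValueError on anything unexpected."""
    if val is None:
        return None
    data = bytes(val)
    if not data:
        raise ValueError("empty AMQP value")
    code, body = data[0], data[1:]
    if code == 0x40:            # null
        return None
    if code in (0x41, 0x42):    # single-byte true / false
        return code == 0x41
    if code in (0x43, 0x44):    # uint0 / ulong0
        return 0
    if code in _FIXED:
        fmt, size = _FIXED[code]
        if len(body) != size:
            raise ValueError(f"expected {size} payload bytes, got {len(body)}")
        return struct.unpack(fmt, body)[0]
    if code in _VARIABLE:
        width = _VARIABLE[code]
        if len(body) < width:
            raise ValueError("truncated length prefix")
        length = int.from_bytes(body[:width], "big")
        payload = body[width:]
        if len(payload) != length:
            raise ValueError(f"declared length {length}, got {len(payload)}")
        if code in (0xA0, 0xB0):    # binary
            return payload
        # symbols (0xa3, 0xb3) are ASCII; strings (0xa1, 0xb1) are UTF-8
        return payload.decode("ascii" if code in (0xA3, 0xB3) else "utf-8")
    raise ValueError(f"unsupported AMQP constructor 0x{code:02x}")
```

Because the return type varies with the input, using this in Spark would probably need one thin UDF wrapper per expected column type rather than a single UDF.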
