Add Hive Support for S3 connector #237

Open
SandeepSolanki opened this issue Mar 29, 2019 · 26 comments

@SandeepSolanki

We have a use case for Hive support in the S3 connector, along the same lines as the Hive integration in the HDFS connector. Any thoughts on what a possible implementation for this would look like?
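
For a sense of scope: the HDFS connector's Hive integration is driven by a handful of connector properties (hive.integration, hive.metastore.uris, hive.database, schema.compatibility). A hypothetical S3 sink configuration mirroring that approach might look like the sketch below; the hive.* keys are borrowed from the HDFS connector and do not exist in the S3 connector today.

```
name=s3-sink-with-hive
connector.class=io.confluent.connect.s3.S3SinkConnector
storage.class=io.confluent.connect.s3.storage.S3Storage
format.class=io.confluent.connect.s3.format.avro.AvroFormat
topics=orders
s3.bucket.name=my-data-lake
s3.region=us-east-1
flush.size=1000

# Hypothetical Hive settings, copied from the HDFS connector's hive.integration
# support; none of these keys exist in the S3 connector today.
hive.integration=true
hive.metastore.uris=thrift://hive-metastore:9083
hive.database=default
schema.compatibility=BACKWARD
```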

@grantatspothero

+1 on this.

The only other service out there that can dump data from Kafka to S3 and create Hive tables/partitions is https://github.com/pinterest/secor, and it is no longer maintained long-term by Pinterest, as they built an internal replacement called Merced.

@kangtiann

+1, we need this!

@brokenjacobs

Emitting an event to a topic indicating a new partition has been created/finalized would go a long way toward this sort of integration.
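
As a rough illustration only, here is a minimal sketch of a helper that publishes a "partition created/finalized" notification to a configurable topic; the hook that would call it, the topic name, and the JSON payload shape are all hypothetical rather than anything that exists in the connector today.

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class PartitionEventEmitter {

    private final KafkaProducer<String, String> producer;
    private final String topic;

    public PartitionEventEmitter(String bootstrapServers, String topic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        this.producer = new KafkaProducer<>(props);
        this.topic = topic;
    }

    /** Publish a "partition finalized" event, keyed by the S3 partition path. */
    public void emit(String table, String partitionPath) {
        String payload = String.format(
                "{\"table\":\"%s\",\"partition\":\"%s\",\"finalizedAt\":%d}",
                table, partitionPath, System.currentTimeMillis());
        producer.send(new ProducerRecord<>(topic, partitionPath, payload));
        producer.flush();
    }

    public void close() {
        producer.close();
    }
}
```

A downstream consumer (or a Hive metastore client) could then react to these events by running ADD PARTITION.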

@OneCricketeer

Emitting an event to a topic indicating a new partition has been created/finalized would go a long way

Not perfect, but sounds like you want this

https://aws.amazon.com/blogs/big-data/data-lake-ingestion-automatically-partition-hive-external-tables-with-aws/

@grantatspothero

grantatspothero commented Mar 11, 2020

Emitting an event to a topic indicating a new partition has been created/finalized would go a long way

Not perfect, but sounds like you want this

https://aws.amazon.com/blogs/big-data/data-lake-ingestion-automatically-partition-hive-external-tables-with-aws/

While it technically works, there are major negatives:

  • Uses a bunch of proprietary AWS tech (Lambda, DynamoDB)
  • Uses S3 event notifications, which are lossy and can drop events
  • Lots of complexity, instead of dropping a config into Kafka Connect

@OneCricketeer

OneCricketeer commented Mar 11, 2020

You could build the same in OSS land.

  • Storage: MinIO / Ceph (both have eventing on object creation)
  • Event handlers: Debezium / Kafka Connect / a custom consumer
  • Database (used simply for de-duping partition paths): KSQL? Postgres / MariaDB (reuse the Hive remote metastore 🤔)? Maybe Redis?

Add in a Hive metastore client and you'd be close to the same thing...

Point being, you're already in AWS if you're using S3, probably on EC2/EKS if you're running Connect there, and probably using EMR / Glue / Athena if you want to use Hive.

Thus, what's so wrong with adding Lambda events on your S3? Granted, they may be lossy, but plenty of businesses rely on them for highly scaled serverless use cases.

@OneCricketeer

Additionally, the "bunch of tech" problem (by which I mean pieces to configure) could be reduced to a handful of Pulumi / Terraform / AWS-SDK + HTTP-REST scripts.

@grantatspothero

Not saying it is impossible to do (our company is building its own system in a similar but slightly different fashion), just that this feels like a common problem, and solving it here would help a lot of people avoid rolling their own.

Thus, what's so wrong with adding Lambda events on your S3? Granted, they may be lossy, but plenty of businesses rely on them for highly scaled serverless use cases.

See here:
http://www.hydrogen18.com/blog/aws-s3-event-notifications-probably-once.html

I don't feel comfortable potentially losing data; whether that's acceptable depends on your use case.

@OneCricketeer

OneCricketeer commented Mar 11, 2020

Understandable.

I'm happy to review/test a block of code that lives around here and happens to produce to some configurable topic.

@OneCricketeer

a block of code that lives around here

@teabot - You're working on something similar, too, yeah?

@teabot

teabot commented Mar 13, 2020

Hey @Cricket007. Yes, this is a familiar problem domain for me (regrettably). We'd previously solved this by building, from the ground up, services that focused only on landing data to S3 and registering it in Hive. In these systems we coordinated things slightly differently: we had a central coordinator that would instruct workers to periodically write sets of partition-offsets to S3, and then, if all workers completed successfully, we'd add the Hive partition.

The benefits of this were:

  • Simple
  • Coordination of Hive + S3 tightly integrated
  • Partition only added after all data successfully written
  • Hive partitions are immutable
  • Max event latency == landing interval
  • Write all events with the most recent schema (avoid splitting files)

The disadvantages:

  • Consumers are very bursty (large batches on each landing interval)
  • Bespoke distributed system to maintain

In our use cases it is super important that Hive partitions are immutable, and I'm of the opinion that it's not possible to robustly provide this behaviour with Kafka Connect. @brokenjacobs suggests that something should happen when a KC S3 partition is finalised, but I don't think that state exists in KC. We've achieved something similar by partitioning on wall-clock time, observing S3 paths, and adding the latest-but-one partition, but this increases the latency and has some potential edge cases, especially when concurrently writing from multiple regions.
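
For context, the "add the Hive partition once all workers have landed their files" step in a system like the one described above can be a single statement over Hive JDBC. A minimal sketch, assuming the hive-jdbc driver is on the classpath; the URL, table, partition values, and S3 location are purely illustrative.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class HivePartitionRegistrar {

    public static void main(String[] args) throws Exception {
        // Requires the org.apache.hive:hive-jdbc driver on the classpath.
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        String url = "jdbc:hive2://hive-server:10000/default";

        try (Connection conn = DriverManager.getConnection(url, "etl", "");
             Statement stmt = conn.createStatement()) {
            // Only run once every worker has confirmed that its files for this
            // interval are fully written, so the partition is effectively immutable.
            stmt.execute(
                    "ALTER TABLE events ADD IF NOT EXISTS " +
                    "PARTITION (dt='2020-03-13', hr='10') " +
                    "LOCATION 's3a://my-data-lake/topics/events/dt=2020-03-13/hr=10'");
        }
    }
}
```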

@OneCricketeer

Thanks for the response.

I want to ask about the definition of "finalized partitions". This makes sense in the context of the wall clock partitioner, but not others.

But with other partitioners, you get into the flow chart of whether the partition already exists (yes/no). In the yes case, nothing really needs to be done, IMO. In the no case, you only really need to run an ADD PARTITION statement on the first file dropped to S3.

Now, in the event of schema modifications, that's just ALTER TABLE SET PROPERTIES, AFAIK.

I had a working PR over in the HDFS connect repo to refactor the Hive module and make it general enough to be reused here. I may work on it again while I #quarantineandchill

@teabot

teabot commented Mar 13, 2020

I want to ask about the definition of "finalized partitions". This makes sense in the context of the wall clock partitioner, but not others.

Our Hive ETL consumers are driven by Hive events. For example, when a partition gets added, Hive ETLs receive an ADD PARTITION event and get to work processing it. This avoids best-guess chronological scheduling and the associated latencies. My question then is, as a downstream Hive ETL:

  • When do I know that I can process a partition? (I can't use the ADD PARTITION event, as data is still arriving.)
  • How can I ensure that I process all the data that resides in a partition, now and forever? (I somehow need to be told whenever a new file arrives in the partition - trigger with an ALTER PARTITION event, perhaps?)
  • How can I report on the data that I've processed? (I can't use the partition identifier, as this potentially refers to an ever-changing set of data.)

Other systems solve this by tracking files, not folders, but for Hive we have to live with this limitation.

@OneCricketeer

I can verify that, on HDFS at least, you can add a Hive partition before any data exists under the path. Whether you have one file or all the files that will ever be written to that path, the fact that the partition exists is all the query cares about. MSCK REPAIR doesn't refresh any file metadata...

So, when can you access it? Immediately. It will just be indefinitely incomplete outside of the wall-clock partitioning scheme. (Correct me if I'm way off here.)

AFAIK, this problem isn't inherent to Hive. Even in a traditional RDBMS, you could have updates to historical records, and without a last_updated field or some other eventing mechanism, the fact that the change happened would go unnoticed until queried.

@massdosage

@Cricket007 the above is true, but then the partitions aren't immutable: running a query against that partition will potentially return different results every time until it's "finished". What we'd want is to know when a partition is "finished", and only then process that block of immutable data and cascade it downstream in a similar chain of complete, immutable partition changes.

@grantatspothero

grantatspothero commented Mar 13, 2020

I think what @teabot is suggesting is that for ETL processes (not for querying), you need to know when the partition is finalized so you don't start processing too soon and lose out on data.

The tricky part: depending on your partitioning strategy, the partition may never be finalized. For example, if you partition by something other than wall-clock time (some property of the record, like a business_id), then it is possible for a partition never to be finalized; data can keep being added to it forever.

This feels like an intrinsic problem with the choice of partitioning strategy. If you want partitions to be immutable, partition by a field that won't have late-arriving data; if you don't care about immutable partitions, partition by whatever you want.
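
For reference, the S3 sink's existing partitioner settings already let you opt into the "finalizable" case by partitioning on wall-clock time; a sketch with illustrative values:

```
# Wall-clock partitioning: once an hour (plus the sink's rotate interval) has
# passed, no more files should land under that dt/hr prefix, so downstream
# consumers can reasonably treat the partition as finalized.
partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
timestamp.extractor=Wallclock
partition.duration.ms=3600000
path.format='dt'=YYYY-MM-dd/'hr'=HH
locale=en-US
timezone=UTC
```

With a field-based partitioner (e.g. on a business_id), late-arriving records can land in any existing partition, which is exactly the case where "finalized" has no meaning.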

@OneCricketeer

Yeah, I get that.

In that case, we could take the approach of the Hive-Kafka adapter and sink the actual event time (already possible with an SMT), but even there, producers are able to modify that field on their own... So 🤷🏼‍♂️

I wonder how Secor or Gobblin handle this problem

@massdosage

What we typically do is land the data from Kafka into Hive partitioned by some kind of wall-clock time, and then have a downstream process that takes this data and repartitions it using something more meaningful for end users or other ETLs. And yes, as @grantatspothero says, this might mean existing partitions are affected. But that's OK: we just merge the data into the existing partitions by creating a "new" location with the old and new data, and then changing the existing partition location to point to it. We then trigger events for all the affected partitions, which in turn cascade these changes downstream. The point is that each change is consistent at the partition grain. So having this feature in Kafka Connect would mean we could enable this pattern right from the source, which would be great.

@OneCricketeer

Some great discussion here now 😊

Linking to a comment I made on a Kafka JIRA about exposing callbacks in Connect tasks

https://issues.apache.org/jira/browse/KAFKA-7815

@teabot

teabot commented Mar 13, 2020

This feels like an intrinsic problem with the choice of partitioning strategy. If you want partitions to be immutable then ensure you partition by a field that won't have late arriving data but if you don't care about having immutable partitions then partition by whatever you want.

It is possible to solve this. Apache Iceberg does so by also modelling dataset changes as snapshots. You can partition the data however you want and then process snapshots as they are created. We also do this (much less efficiently) with Hive by using a copy-on-write approach when updating partitions. It adds more complexity, of course, but you get some nice guarantees.

@grantatspothero

grantatspothero commented Mar 13, 2020

We also do this (much less efficiently) with Hive by using a copy-on-write approach when updating partitions.

You don't need to do copy-on-write with this S3 sink though, right? The sink was designed to never need to delete files from S3, through deterministic processing and usage of S3 multipart upload (see the design here), so it is safe to have concurrent readers querying the data while writes are happening.

These are two separate problems:

  1. Can you have concurrent readers while writes are happening? (For this Kafka Connect S3 sink: yes.)
  2. Can you have immutable partitions that do not change once finalized? (For this Kafka Connect S3 sink: sort of; it depends on your choice of partitioning strategy.)

Iceberg solves 1 in a different, more general way (through snapshot isolation), but for this specific use case of an S3 Kafka Connect sink integrated with Hive, it shouldn't be necessary.

@ferozed

ferozed commented May 25, 2020

We also have this need. We currently solve it by landing data from Connect into a raw S3 bucket, then having a scheduled Airflow job that converts the data to Parquet, writes it to a different location, and then creates Hive tables. We handle the problem of late-arriving data by triggering the workflow 'N' minutes after the last hour. Our partitioning is all time-based (YYYY/MM/DD/HH).

Needless to say, the Airflow-based job is not ideal, and we want to see if we can merge that into the Kafka connector itself. Is anybody doing this currently? I don't want to duplicate efforts.

@jatink5251

jatink5251 commented Apr 29, 2021

@OneCricketeer can we add this support to the S3 sink? That would be a great help.
I think this will help: apache/kafka#6171

@OneCricketeer

That PR is for sources, not sinks, AFAIK

@jatink5251

@OneCricketeer are there any plans to complete this feature? It would be a great help.

@OneCricketeer

I no longer use S3 Connect and don't work for Confluent. PRs are open to anyone who wants to contribute.
