
Spark stream consumption commit in kafka consumer group


HashLoad/freeza-offset

freeza-offset


What is it?

freeza-offset is a Python package that provides a simple way to commit the offsets consumed by Spark Streaming to a Kafka consumer group. This commit is used only for consumption tracking.

Main Features

Here are just a few of the things that freeza-offset does well:

  • Commits the consumed offsets to Kafka
  • Tracks Spark consumption lag in Kafka
  • Offsets are no longer under the control of Spark alone
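Once the offsets are committed to a consumer group, lag can be read per partition as the log end offset minus the committed offset. The sketch below only illustrates that arithmetic and is not part of freeza-offset: the function name `consumer_lag` and the two input dictionaries are hypothetical, and in practice the offsets would come from a Kafka client or monitoring tool.

```python
from typing import Dict, Tuple

# (topic, partition) pair used as the dictionary key; a hypothetical alias
TopicPartition = Tuple[str, int]


def consumer_lag(
    committed: Dict[TopicPartition, int],
    log_end: Dict[TopicPartition, int],
) -> Dict[TopicPartition, int]:
    """Return the lag per partition: log end offset minus committed offset."""
    lag = {}
    for tp, end in log_end.items():
        # Simplification: a partition without a commit is counted from offset 0
        done = committed.get(tp, 0)
        # Never report a negative lag if the two readings are slightly out of sync
        lag[tp] = max(end - done, 0)
    return lag


# Made-up offsets for illustration only
committed = {("topic-name", 0): 120, ("topic-name", 1): 95}
log_end = {("topic-name", 0): 150, ("topic-name", 1): 95, ("topic-name", 2): 10}
print(consumer_lag(committed, log_end))
```

A lag that keeps growing for a partition suggests the streaming query is falling behind the producers on that partition.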

Where to get it

The source code is currently hosted on GitHub at: https://github.com/HashLoad/freeza-offset

Binary installers for the latest released version are available from the Python Package Index (PyPI) and on conda.

# conda
conda install freeza-offset
# PyPI
pip install freeza-offset
# Databricks
dbutils.library.installPyPI("freeza-offset")

Dependencies

Installation from sources

In the freeza-offset directory (same one where you found this file after cloning the git repo), execute:

python setup.py install

Example:

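pip install freeza-offset

import freeza
from pyspark.sql import SparkSession

# Kafka brokers shared by the Spark reader and the freeza committer
bootstrap_servers = "kafka1:9092,kafka2:9092,kafka3:9092"

spark = SparkSession \
    .builder \
    .appName("FreezaCommitTest") \
    .getOrCreate()

# Read the topic as a stream
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", bootstrap_servers) \
    .option("subscribe", "topic-name") \
    .option("startingOffsets", "earliest") \
    .option("kafka.group.id", "spark-freeza-runner") \
    .load()

# Keep only the record key and value
df = df.selectExpr("key", "value")

# Print each micro-batch to the console
qry = df.writeStream \
    .format("console") \
    .option("truncate", "false") \
    .start()

# Start the background thread that commits the consumed offsets to the consumer group
tr = freeza.start_commiter_thread(
    query=qry,
    bootstrap_servers=bootstrap_servers,
    group_id="spark-freeza-commiter"
)

# Check that the committer thread is running, assuming it is a threading.Thread
# (Thread.isAlive() was removed in Python 3.9; use is_alive())
tr.is_alive()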
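
The committer in the example above is handed the running query, and a query exposes the offsets it has processed through its progress reports. As a rough illustration of where such offsets can come from, the sketch below extracts the Kafka end offsets from a progress dictionary. It is not freeza-offset's implementation: the helper name `end_offsets_from_progress` is hypothetical, and it assumes the progress is a plain dict (as `StreamingQuery.lastProgress` returns in PySpark 3.x) whose Kafka sources report `endOffset` as a topic to partition to offset mapping.

```python
import json
from typing import Any, Dict, Mapping, Tuple


def end_offsets_from_progress(progress: Mapping[str, Any]) -> Dict[Tuple[str, int], int]:
    """Collect {(topic, partition): end offset} from a streaming progress dict."""
    offsets: Dict[Tuple[str, int], int] = {}
    for source in progress.get("sources", []):
        end = source.get("endOffset")
        # Some versions may report the offsets as a JSON string; parse it if so
        if isinstance(end, str):
            try:
                end = json.loads(end)
            except ValueError:
                continue
        # Non-Kafka sources may report offsets in another shape; skip them
        if not isinstance(end, dict):
            continue
        for topic, partitions in end.items():
            if not isinstance(partitions, dict):
                continue
            for partition, offset in partitions.items():
                # Partition ids arrive as JSON object keys, so they are strings
                offsets[(topic, int(partition))] = int(offset)
    return offsets


# Hand-written sample shaped like a progress report; the values are made up
sample_progress = {
    "sources": [
        {
            "description": "KafkaV2[Subscribe[topic-name]]",
            "endOffset": {"topic-name": {"0": 42, "1": 17}},
        }
    ]
}
print(end_offsets_from_progress(sample_progress))
```

With a live query, the input would be `qry.lastProgress`, which is `None` until the first progress report is available, so a caller would need to check for that before extracting offsets.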

Getting Help

For usage questions, the best place to go is to open a new issue in the GitHub repository.

Contributing to freeza-offset

All contributions, bug reports, bug fixes, documentation improvements, enhancements, and ideas are welcome.

License

MIT