This library helps to process streaming data using Polars.
pip install polars-streaming
Install from sources
Alternatively, clone the latest version from the repository and install it directly from the source code:
pip install -e .
Socket source, console sink:
>>> from polars_streaming import StreamProcessor
>>> s = StreamProcessor()
>>> s.readStream.format('socket').options({'host':'localhost','port':12345}).load()
>>> def transformation(df):
...     # Add your transformation code here
...     df = df.sum()  # For example purposes, calculate the sum.
...     return df  # Return the transformed dataframe
...
>>> s.add_transform(transformation)
>>> s.writeStream.format('console').trigger('3 seconds')
>>> s.start()
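To try the socket example locally, something has to be listening on `localhost:12345`. The sketch below is one way to do that with the standard library only. It assumes, without confirmation from this README, that the socket source connects as a TCP client and reads newline-delimited text; `start_feeder` and `_send_lines` are hypothetical helper names, not part of `polars_streaming`.

```python
import socket
import threading
import time


def _send_lines(server, lines, delay):
    """Accept one client on `server` and send it each line, newline-terminated."""
    with server:
        conn, _addr = server.accept()
        with conn:
            for line in lines:
                conn.sendall((line + "\n").encode("utf-8"))
                time.sleep(delay)  # space the records out so batches differ


def start_feeder(lines, host="localhost", port=12345, delay=1.0):
    """Start a one-client TCP feeder in a background thread.

    Returns (thread, bound_port). Pass port=0 to let the OS pick a free port.
    """
    # create_server binds and listens, so clients can connect immediately.
    server = socket.create_server((host, port))
    thread = threading.Thread(
        target=_send_lines, args=(server, lines, delay), daemon=True
    )
    thread.start()
    return thread, server.getsockname()[1]
```

Calling `start_feeder(["1", "2", "3"])` before `s.start()` would serve three records, one per second, to the first client that connects. The record format the processor expects is not stated here, so adjust the lines accordingly.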
Kafka source, console sink:
>>> from polars_streaming import StreamProcessor
>>> s = StreamProcessor()
>>> s.readStream.format('kafka').options({
...     'kafka.bootstrap.servers': 'localhost',
...     'subscribe': 'topic_name',
...     'startingOffsets': 'earliest',
...     'kafka.group.id': 'g1',
... }).load()
>>> def transformation(df):
...     # Add your transformation code here
...     df = df.sum()  # For example purposes, calculate the sum.
...     return df  # Return the transformed dataframe
...
>>> s.add_transform(transformation)
>>> s.writeStream.format('console').trigger('10 seconds')
>>> s.start()
CSV file source, Avro file sink:
>>> from polars_streaming import StreamProcessor
>>> s = StreamProcessor()
>>> s.readStream.format('csv').load('read_path_of_file_source')  # Reads each CSV file from the path once it is created
>>> def transformation(df):
...     # Add your transformation code here
...     df = df.sum()  # For example purposes, calculate the sum.
...     return df  # Return the transformed dataframe
...
>>> s.add_transform(transformation)
>>> s.writeStream.option('path', 'write_path').format('avro')  # Writes the processed data to the write path in Avro format
>>> s.start()
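Since the file source reads a file once it is created, a producer that writes slowly could expose a half-written file. One common way to avoid this is to write to a temporary file and rename it into place, sketched below with the standard library. This assumes the source ignores files that do not end in `.csv`, which this README does not confirm; `drop_csv` is a hypothetical helper, not part of `polars_streaming`.

```python
import csv
import os
import tempfile


def drop_csv(directory, filename, header, rows):
    """Write a CSV under a temporary name, then atomically rename it."""
    os.makedirs(directory, exist_ok=True)
    # Keep the temporary file in the same directory so the rename stays on
    # one filesystem; the ".tmp" suffix is assumed to be ignored by the reader.
    fd, tmp_path = tempfile.mkstemp(dir=directory, prefix=".", suffix=".tmp")
    try:
        with os.fdopen(fd, "w", newline="", encoding="utf-8") as handle:
            writer = csv.writer(handle)
            writer.writerow(header)
            writer.writerows(rows)
        final_path = os.path.join(directory, filename)
        os.replace(tmp_path, final_path)  # the complete file appears in one step
    except BaseException:
        # Clean up the partial file if writing or renaming failed.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
    return final_path
```

For example, `drop_csv('read_path_of_file_source', 'batch_001.csv', ['id', 'value'], [[1, 10], [2, 20]])` would place one complete file in the directory the stream is watching.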
Supported sources
- Socket
- File Sources
  - CSV
  - JSON
  - AVRO
  - PARQUET
- Kafka
Supported sinks
- Console
- File Sources
  - CSV
  - JSON
  - AVRO
  - PARQUET
- MongoDB
- ElasticSearch