Documentation: https://bitswanpump.readthedocs.io/en/latest/index.html
- Write once, use many times
- Everything is a stream
- Schema-less
- Kappa architecture
- Real-time
- High performance
- Simple to use and well documented, so anyone can write their own stream processor (see the sketch after this list)
- Asynchronous via Python 3.5+ async/await and asyncio
- Event-driven architecture / Reactor pattern
- Single-threaded core but compatible with threads
- Compatible with PyPy, a Just-In-Time compiler capable of boosting Python code performance by more than 5x
- Good citizen of the Python ecosystem
- Modularized
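
As a taste of how custom stream processors look: a processor subclasses bspump.Processor and overrides process(), which receives a context and an event and returns the (possibly transformed) event. The uppercasing logic below is a hypothetical example, not part of the library:

```python
import bspump


class UppercaseProcessor(bspump.Processor):
    """Hypothetical processor: uppercase every string value in a dict event."""

    def process(self, context, event):
        # Return the transformed event; it is passed on to the next
        # processor (or sink) in the pipeline.
        return {
            key: value.upper() if isinstance(value, str) else value
            for key, value in event.items()
        }
```

A complete, runnable pump that receives JSON over TCP and stores it in ElasticSearch looks like this: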
```python
#!/usr/bin/env python3
import bspump
import bspump.socket
import bspump.common
import bspump.elasticsearch


class MyPipeline(bspump.Pipeline):

    def __init__(self, app):
        super().__init__(app)
        # Receive data over TCP, parse each event as JSON
        # and store it in ElasticSearch
        self.build(
            bspump.socket.TCPStreamSource(app, self),
            bspump.common.JSONParserProcessor(app, self),
            bspump.elasticsearch.ElasticSearchSink(app, self, "ESConnection")
        )


if __name__ == '__main__':
    app = bspump.BSPumpApplication()
    svc = app.get_service("bspump.PumpService")
    # Register the ElasticSearch connection that the sink refers to by name
    svc.add_connection(bspump.elasticsearch.ElasticSearchConnection(app, "ESConnection"))
    svc.add_pipeline(MyPipeline(app))
    app.run()
```
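
Once the pump above is running, you can smoke-test it by sending newline-delimited JSON over TCP. This is a minimal sketch: the listen address of TCPStreamSource comes from its configuration, so the host and port below are assumptions to adjust for your setup.

```python
import json
import socket

# Hypothetical listen address; configure TCPStreamSource accordingly.
HOST, PORT = "127.0.0.1", 8888

event = {"message": "hello", "level": "info"}
with socket.create_connection((HOST, PORT)) as sock:
    # One JSON document per line is a common framing for a TCP stream
    # feeding a JSON parser.
    sock.sendall((json.dumps(event) + "\n").encode("utf-8"))
```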
A Dockerfile and instructions are available in a separate repository.
Releases are made from a git tag (format: vYY.MM):

```
git tag -a v19.07
```

Then, following the PyPI packaging guide, generate the distribution package and upload it with:

```
python -m twine upload dist/*
```
You can clone the blank application from its own repository.
- bspump.amqp: AMQP/RabbitMQ connection, source and sink
- bspump.avro: Apache Avro file source and sink
- bspump.common: Common processors and parsers
- bspump.elasticsearch: ElasticSearch connection, source and sink
- bspump.file: File sources and sinks (plain files, JSON, CSV)
- bspump.filter: Content, Attribute and TimeDrift filters
- bspump.http.client: HTTP client source, WebSocket client sink
- bspump.http.web: HTTP server source and sink, WebSocket server source
- bspump.influxdb: InfluxDB connection and sink
- bspump.kafka: Kafka connection, source and sink
- bspump.mail: SMTP connection and sink
- bspump.mongodb: MongoDB connection and lookup
- bspump.mysql: MySQL connection, source and sink
- bspump.parquet: Apache Parquet file sink
- bspump.postgresql: PostgreSQL connection and sink
- bspump.slack: Slack connection and sink
- bspump.socket: TCP source, UDP source
- bspump.trigger: Opportunistic, PubSub and Periodic triggers (see the composition sketch after this list)
- bspump.crypto: Cryptography
  - Hashing: SHA224, SHA256, SHA384, SHA512, SHA1, MD5, BLAKE2b, BLAKE2s
  - Symmetric Encryption: AES 128, AES 192, AES 256
- bspump.declarative: Declarative processors and expressions
- bspump.analyzer:
  - Time Window analyzer
  - Session analyzer
  - Geographical analyzer
  - Time Drift analyzer
- bspump.lookup:
  - GeoIP Lookup
- bspump.unittest:
  - Interface for testing Processors / Pipelines
- bspump.web: Pump API endpoints for pipelines, lookups etc.
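
To illustrate how these modules compose, here is a sketch of a pipeline that reads a CSV file and pretty-prints each row, following the pattern used in the project's examples. FileCSVSource, OpportunisticTrigger and PPrintSink come from the modules listed above; the file path and config keys are assumptions to adapt to your data.

```python
#!/usr/bin/env python3
import bspump
import bspump.common
import bspump.file
import bspump.trigger


class CSVPipeline(bspump.Pipeline):

    def __init__(self, app):
        super().__init__(app)
        self.build(
            # A triggered (non-streaming) source: the OpportunisticTrigger
            # fires the file read whenever the pipeline is idle.
            # './input.csv' is a hypothetical path.
            bspump.file.FileCSVSource(app, self, config={'path': './input.csv'}).on(
                bspump.trigger.OpportunisticTrigger(app)
            ),
            # Print each parsed CSV row to stdout
            bspump.common.PPrintSink(app, self),
        )


if __name__ == '__main__':
    app = bspump.BSPumpApplication()
    svc = app.get_service("bspump.PumpService")
    svc.add_pipeline(CSVPipeline(app))
    app.run()
```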
Google Sheet with the technological compatibility matrix: https://docs.google.com/spreadsheets/d/1L1DvSuHuhKUyZ3FEFxqEKNpSoamPH2Z1ZaFuHyageoI/edit?usp=sharing
```python
from unittest.mock import MagicMock

from bspump.unittest import ProcessorTestCase

import my_project.processor  # the module that contains the processor under test


class MyProcessorTestCase(ProcessorTestCase):

    def test_my_processor(self):
        # set up the processor for the test
        self.set_up_processor(my_project.processor.MyProcessor, "proc-arg", proc="key_arg")

        # mock methods to suit your needs on the pipeline ...
        self.Pipeline.method = MagicMock()

        # ... or on the instance of the processor
        self.Pipeline.Processor.method = MagicMock()

        output = self.execute(
            [(None, {'foo': 'bar'})]  # (context, event)
        )

        # assert the output of the pipeline
        self.assertEqual(
            [event for context, event in output],
            [{'FOO': 'BAR'}]
        )

        # assert expected calls on `self.Pipeline.method` or `self.Pipeline.Processor.method`
        self.Pipeline.Processor.method.assert_called_with(**expected)
```
Run the tests with:

```
python3 -m unittest test
```

You can replace `test` with the location of your unit test module.
BSPump is open source software, available under the BSD 3-Clause License.