Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP on python substreams handler #6

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions python/.idea/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions python/.idea/inspectionProfiles/Project_Default.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions python/.idea/inspectionProfiles/profiles_settings.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions python/.idea/misc.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions python/.idea/modules.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions python/.idea/python.iml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions python/.idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions python/cursor.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
block:
number: 300810091
cursor: U2Z2OCSd5WHhWt9z2SEHLqWwLpc_DFlvVQPkKhFIhIGOkS2UnNXySARzAXjVycz2o2CqHVvfmoa4bh8xoMkJvpbrxesenFV_FRFOzoPt-7TnefPyMAsRCdo9DqvGNpC-XVWyZ1OefM91oYPmSbeAEjhTKJUvdWT0jjkD2PQgN_B72y4=
263 changes: 198 additions & 65 deletions python/main.py
Original file line number Diff line number Diff line change
@@ -1,81 +1,214 @@
import grpc
import sys
import os
import requests
import time
import yaml
from collections import deque
from abc import ABC, abstractmethod
import sys

import sf.substreams.v1.package_pb2 as package_pb2
import sf.substreams.rpc.v2.service_pb2 as service_pb2
import sf.substreams.rpc.v2.service_pb2_grpc as service_pb2_grpc
import sf.solana.spl.token.v1.spl_token_pb2 as spl_token_pb2


def read_spkg_file(file_path):
    """Parse a local Substreams package (.spkg) file into a Package message.

    Args:
        file_path: path to the .spkg file on disk.

    Returns:
        A populated ``package_pb2.Package``.
    """
    with open(file_path, "rb") as spkg:
        blob = spkg.read()
    pkg = package_pb2.Package()
    pkg.ParseFromString(blob)
    return pkg

import sf.substreams.solana.type.v1.account_pb2 as account_pb2

# YAML file used to persist the last processed cursor and block number.
CURSOR_FILE = "cursor.yml"
# Maximum number of recently processed blocks retained for undo handling.
BLOCK_HISTORY_SIZE = 20
INACTIVITY_TIMEOUT = 2  # seconds without stream data before restarting


class EventProcessor(ABC):
    """Interface for consumers of decoded substreams events.

    Subclasses implement :meth:`process_event` to handle each event.
    """

    @abstractmethod
    def process_event(self, event):
        """Handle a single decoded event."""


class DefaultEventProcessor(EventProcessor):
    """Default processor: prints each event's address, base58-encoded."""

    def process_event(self, event):
        # base58 imported lazily so the module loads without the dependency.
        import base58
        encoded = base58.b58encode(event.address).decode('utf-8')
        print(f"Processing event:", encoded)


class SubstreamsClient:
def __init__(self, package_url, module, start_block_num=-1, stop_block_num=None):
    """Initialise the client and eagerly fetch/parse the package.

    Args:
        package_url: URL or local path of the .spkg package.
        module: output module name to stream.
        start_block_num: explicit start block; -1 means "use saved cursor".
        stop_block_num: optional block number at which to stop.
    """
    self.package_url = package_url
    self.module = module
    self.start_block_num = start_block_num
    self.stop_block_num = stop_block_num
    self.package = self.read_spkg_from_url_or_path(package_url)
    # Only consult the persisted cursor when no explicit start block is given.
    self.cursor_data = self.load_cursor() if start_block_num == -1 else None
    self.block_history = deque(maxlen=BLOCK_HISTORY_SIZE)
    self.last_data_time = time.time()

def read_spkg_from_url_or_path(self, spkg_path):
    """Return a parsed Package from either an http(s) URL or a local path.

    Args:
        spkg_path: URL (http:// or https://) or filesystem path to a .spkg.

    Returns:
        A populated ``package_pb2.Package``.
    """
    pkg = package_pb2.Package()
    if spkg_path.startswith(("http://", "https://")):
        print(f"Downloading package from URL: {spkg_path}")
        response = requests.get(spkg_path)
        response.raise_for_status()
        pkg.ParseFromString(response.content)
        return pkg
    with open(spkg_path, "rb") as fh:
        pkg.ParseFromString(fh.read())
    return pkg

def save_cursor(self, cursor, block_number):
    """Persist the stream cursor and block number to CURSOR_FILE as YAML."""
    payload = {"cursor": cursor, "block": {"number": block_number}}
    with open(CURSOR_FILE, "w") as fh:
        yaml.dump(payload, fh)
    print(f"Saved cursor: {cursor}, Block: {block_number}")

def load_cursor(self):
    """Load the persisted cursor mapping from CURSOR_FILE.

    Returns:
        The parsed YAML mapping, or None when no cursor file exists.
    """
    if not os.path.exists(CURSOR_FILE):
        return None
    with open(CURSOR_FILE, "r") as fh:
        data = yaml.safe_load(fh)
    print(f"Loaded cursor from file: {data}")
    return data

def handle_block_scoped_data(self, data, event_processor):
    """Process one block-scoped data message from the stream.

    Decodes the filtered accounts, dispatches each event to the processor,
    records the block in history, persists the cursor, and terminates once
    the configured stop block is reached.

    Args:
        data: a BlockScopedData response message.
        event_processor: EventProcessor invoked for each decoded account.
    """
    output = data.output.map_output
    events = account_pb2.FilteredAccounts()

    if not output.Unpack(events):
        print(f"Failed to unpack events for block {data.clock.number}")
        return

    block_number = data.clock.number
    cursor = data.cursor
    self.last_data_time = time.time()  # Reset the inactivity timer

    # Process events if any
    if not events.accounts:
        print(f"No events found in block {block_number}")

    for event in events.accounts:
        event_processor.process_event(event)

    # BUGFIX: record the block so undo handling and restart logic (both of
    # which read self.block_history) actually have history to work with —
    # previously nothing ever appended to it.
    self.block_history.append({"block_number": block_number, "cursor": cursor})

    # Save cursor and block number
    self.save_cursor(cursor, block_number)

    # Terminate if stop block number is reached
    if self.stop_block_num is not None and block_number >= self.stop_block_num:
        print(f"Reached stop block {self.stop_block_num}. Terminating...")
        sys.exit(0)

    # Light throttle between blocks.
    time.sleep(0.05)

def handle_block_undo_signal(self, undo_data):
"""Handle block undo signals by removing affected blocks."""
last_valid_block = undo_data.last_valid_block
print(f"Received block undo signal for block: {last_valid_block}")

while self.block_history and self.block_history[-1]["block_number"] > last_valid_block:
removed_block = self.block_history.pop()
print(f"Removed block {removed_block['block_number']} due to undo signal")


def start_stream(self, event_processor):
    """Open the gRPC stream and dispatch messages until done.

    Reconnects on gRPC errors, restarts on inactivity, and terminates via
    sys.exit once the stop block is reached.

    Args:
        event_processor: EventProcessor invoked for every decoded event.
    """
    sf_api_token = os.getenv("SUBSTREAMS_API_TOKEN")
    creds = grpc.ssl_channel_credentials()
    metadata = [("authorization", f"Bearer {sf_api_token}")]

    # Store the initial start block to differentiate between provided and
    # cursor-based streaming.
    initial_start_block = self.start_block_num

    while True:
        try:
            with grpc.secure_channel("accounts.mainnet.sol.streamingfast.io:443", creds) as channel:
                stub = service_pb2_grpc.StreamStub(channel)

                # Determine the starting point for streaming.
                if initial_start_block != -1:
                    # If a specific start block was provided, ignore the cursor.
                    print(f"Starting from block {initial_start_block} (ignoring saved cursor)")
                    start_block = initial_start_block
                    cursor = None
                elif self.cursor_data:
                    # No explicit start block: resume from the saved cursor.
                    print(f"Resuming from saved cursor at block {self.cursor_data['block']['number']}")
                    start_block = self.cursor_data["block"]["number"]
                    cursor = self.cursor_data.get("cursor")
                else:
                    print("No cursor found, starting from the earliest available block")
                    start_block = -1
                    cursor = None

                # Create the gRPC request
                request = service_pb2.Request(
                    start_block_num=start_block,
                    stop_block_num=self.stop_block_num,
                    modules=self.package.modules,
                    output_module=self.module,
                    production_mode=True,
                    start_cursor=cursor
                )

                print(f"Starting stream from block {start_block} with cursor: {cursor}")
                stream = stub.Blocks(request, metadata=metadata)

                for response in stream:
                    message_type = response.WhichOneof("message")
                    if message_type == "block_scoped_data":
                        self.handle_block_scoped_data(response.block_scoped_data, event_processor)
                    elif message_type == "block_undo_signal":
                        self.handle_block_undo_signal(response.block_undo_signal)

                    # Reset inactivity timer
                    self.last_data_time = time.time()

                    # Check if stop block is reached and terminate if so.
                    # BUGFIX: guard against empty history — the original
                    # indexed self.block_history[-1] unconditionally, which
                    # raises IndexError before any block was recorded.
                    if (self.stop_block_num is not None
                            and self.block_history
                            and self.block_history[-1]["block_number"] >= self.stop_block_num):
                        print(f"Reached stop block {self.stop_block_num}. Terminating...")
                        sys.exit(0)

                    # Check for inactivity timeout
                    if time.time() - self.last_data_time > INACTIVITY_TIMEOUT:
                        print("Inactivity detected.")

                        if initial_start_block == -1:
                            # Restart using the last processed block if no
                            # specific start block was provided.
                            last_processed_block = self.block_history[-1]["block_number"] if self.block_history else -1
                            print(f"Restarting stream from last processed block: {last_processed_block}")
                            self.start_block_num = last_processed_block
                            self.cursor_data = None  # Clear cursor to restart from last processed block
                        else:
                            # Restart using the original start block if provided.
                            print(f"Restarting stream from initial block {initial_start_block}")
                            self.start_block_num = initial_start_block

                        break  # Restart the stream

        except grpc.RpcError as e:
            print(f"gRPC error: {e}. Reconnecting in 5 seconds...")
            time.sleep(5)

            # Reload cursor only if no explicit start block is given.
            if initial_start_block == -1:
                self.cursor_data = self.load_cursor()


def main():
    """CLI entry point: parse arguments and run the substreams client.

    Usage:
        python main.py <package_url_or_path> <module_name> [start_block] [stop_block]

    Exits with status 1 when fewer than two positional arguments are given.
    """
    if len(sys.argv) < 3:
        print("Usage: python main.py <package_url_or_path> <module_name> [start_block] [stop_block]")
        sys.exit(1)

    package_url = sys.argv[1]
    module = sys.argv[2]
    # Optional start/stop blocks; -1 start means "resume from saved cursor".
    start_block_num = int(sys.argv[3]) if len(sys.argv) > 3 else -1
    stop_block_num = int(sys.argv[4]) if len(sys.argv) > 4 else None

    client = SubstreamsClient(package_url, module, start_block_num, stop_block_num)
    event_processor = DefaultEventProcessor()
    client.start_stream(event_processor)


if __name__ == "__main__":
Expand Down
Loading