Draft pull request: changes from all commits
2 changes: 2 additions & 0 deletions .gitignore
@@ -8,3 +8,5 @@ static/data
.vscode/settings.json
.env
.evidence/meta

.meltano/
40 changes: 40 additions & 0 deletions meltano.yml
@@ -0,0 +1,40 @@
version: 1
default_environment: dev
project_id: 64e752dd-4728-4bbd-b8d0-9af37a481f6b
environments:
- name: dev
- name: staging
- name: prod
send_anonymous_usage_stats: false
plugins:
  extractors:
  - name: tap-gharchive
    namespace: tap_gharchive
    pip_url: -e .
    executable: tap-gharchive
    config:
      api_url: https://data.gharchive.org
      streams:
      - name: github_events
        path: /2023-10-01-0.json.gz
        primary_keys:
        - id
        schema:
          properties:
            type:
              type: string
            actor:
              type: object
            repo:
              type: object
            created_at:
              type: string
            org:
              type: object
            payload:
              type: object
      start_date: '2023-10-01'
  loaders:
  - name: target-jsonl
    variant: andyh1203
    pip_url: target-jsonl
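With the extractor declared (the custom tap is installed from the local checkout via `pip_url: -e .`), the standard Meltano flow should apply; as a sketch: `meltano install` to create the plugin environments, then `meltano run tap-gharchive target-jsonl` to pull the single configured archive hour (`/2023-10-01-0.json.gz`) and write one JSONL file per stream.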
218 changes: 218 additions & 0 deletions pipeline/gharchive_pipeline.py
@@ -0,0 +1,218 @@
import dlt
from dlt.sources.sql_database import sql_table
import pendulum
from typing import Iterator, Dict, List
from collections import defaultdict
import os


def process_events_for_stats(events: List[Dict]) -> Dict:
    """Process a batch of events to calculate statistics."""
    stats = {
        "repo_metrics": defaultdict(
            lambda: {
                "stars": 0,
                "forks": 0,
                "watchers": 0,
            }
        ),
        "traffic": defaultdict(
            lambda: {
                "views": defaultdict(int),
                "clones": defaultdict(int),
                "unique_views": defaultdict(int),
                "unique_clones": defaultdict(int),
            }
        ),
        "contributors": defaultdict(
            lambda: {
                "commits": 0,
                "additions": 0,
                "deletions": 0,
                "first_contribution": None,
            }
        ),
        "issues": {
            "total": 0,
            "open": 0,
            "closed": 0,
            "response_times": [],
            "close_times": [],
            "daily_opened": defaultdict(int),
            "daily_closed": defaultdict(int),
        },
        "pull_requests": {
            "total": 0,
            "open": 0,
            "closed": 0,
            "response_times": [],
            "close_times": [],
            "daily_opened": defaultdict(int),
            "daily_closed": defaultdict(int),
        },
    }

    for event in events:
        repo_name = event.get("repo", {}).get("name", "").split("/")[-1]
        event_type = event.get("type")

        # Process different event types
        if event_type == "WatchEvent":
            stats["repo_metrics"][repo_name]["stars"] += 1
        elif event_type == "ForkEvent":
            stats["repo_metrics"][repo_name]["forks"] += 1
        elif event_type == "PushEvent":
            author = event.get("actor", {}).get("login")
            if author:
                stats["contributors"][author]["commits"] += len(
                    event.get("payload", {}).get("commits", [])
                )
                for commit in event.get("payload", {}).get("commits", []):
                    stats["contributors"][author]["additions"] += commit.get(
                        "stats", {}
                    ).get("additions", 0)
                    stats["contributors"][author]["deletions"] += commit.get(
                        "stats", {}
                    ).get("deletions", 0)
        elif event_type == "IssuesEvent":
            action = event.get("payload", {}).get("action")
            issue = event.get("payload", {}).get("issue", {})
            is_pr = "pull_request" in issue

            stats_key = "pull_requests" if is_pr else "issues"
            stats[stats_key]["total"] += 1

            if action == "opened":
                stats[stats_key]["open"] += 1
                created_date = pendulum.parse(issue.get("created_at")).format(
                    "YYYY-MM-DD"
                )
                stats[stats_key]["daily_opened"][created_date] += 1
            elif action == "closed":
                stats[stats_key]["closed"] += 1
                closed_date = pendulum.parse(event.get("created_at")).format(
                    "YYYY-MM-DD"
                )
                stats[stats_key]["daily_closed"][closed_date] += 1

                # Calculate close time
                if issue.get("created_at"):
                    created_time = pendulum.parse(issue["created_at"])
                    closed_time = pendulum.parse(event.get("created_at"))
                    close_time = (closed_time - created_time).total_seconds()
                    stats[stats_key]["close_times"].append(close_time)

        elif event_type == "IssueCommentEvent":
            issue = event.get("payload", {}).get("issue", {})
            is_pr = "pull_request" in issue
            stats_key = "pull_requests" if is_pr else "issues"

            # Calculate response time for first comment
            if issue.get("created_at") and event.get("created_at"):
                created_time = pendulum.parse(issue["created_at"])
                comment_time = pendulum.parse(event.get("created_at"))
                response_time = (comment_time - created_time).total_seconds()
                stats[stats_key]["response_times"].append(response_time)

    return stats
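
# Illustrative usage (comment only, not executed by the pipeline): one
# synthetic WatchEvent is enough to see the shape of the returned dict.
# Field names follow the GitHub Events API; the values are made up.
#
#     >>> stats = process_events_for_stats(
#     ...     [{"type": "WatchEvent", "repo": {"name": "nf-core/tools"}}]
#     ... )
#     >>> stats["repo_metrics"]["tools"]["stars"]
#     1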


@dlt.source
def gharchive_source(start_date: str = "2024-01-01", end_date: str = None):
    """
    Load GitHub events for nf-core organization from GH Archive BigQuery dataset.

    Args:
        start_date: Start date in YYYY-MM-DD format
        end_date: End date in YYYY-MM-DD format (defaults to current date)
    """
    if end_date is None:
        end_date = pendulum.now().format("YYYY-MM-DD")

    # Get BigQuery credentials path from environment; the connection string
    # below needs it, so this check cannot stay commented out
    credentials_path = os.getenv("GOOGLE_APPLICATION_CREDENTIALS")
    if not credentials_path:
        raise ValueError(
            "GOOGLE_APPLICATION_CREDENTIALS environment variable must be set"
        )

    # Construct BigQuery connection string for ConnectorX
    conn = f"bigquery://{credentials_path}"

    @dlt.resource(write_disposition="append", name="raw_events")
    def events(
        start_date: str = start_date, end_date: str = end_date
    ) -> Iterator[Dict]:
        """Query GitHub events from BigQuery."""
        query = f"""
        SELECT *
        FROM `githubarchive.day.*`
        WHERE _TABLE_SUFFIX BETWEEN FORMAT_DATE('%Y%m%d', DATE('{start_date}'))
            AND FORMAT_DATE('%Y%m%d', DATE('{end_date}'))
            AND JSON_EXTRACT_SCALAR(payload, '$.organization.login') = 'nf-core'
        """

        # Create sql_table with ConnectorX backend
        table = sql_table(
            conn,  # BigQuery connection string
            query=query,
            chunk_size=100000,  # Process in chunks for better memory usage
            backend="connectorx",  # Use ConnectorX backend for better performance
            reflection_level="full_with_precision",  # Get exact data types
            backend_kwargs={"return_type": "arrow"},  # Use Arrow for better performance
        )
        yield from table

    @dlt.resource(write_disposition="merge", primary_key="date", name="daily_stats")
    def daily_statistics(
        start_date: str = start_date, end_date: str = end_date
    ) -> Iterator[Dict]:
        """Calculate daily statistics from BigQuery events."""
        start = pendulum.parse(start_date)
        end = pendulum.parse(end_date)
        current = start

        while current <= end:
            query = f"""
            SELECT *
            FROM `githubarchive.day.{current.format("YYYYMMDD")}`
            WHERE JSON_EXTRACT_SCALAR(payload, '$.organization.login') = 'nf-core'
            """

            # Create sql_table with ConnectorX backend
            table = sql_table(
                conn,
                query=query,
                chunk_size=100000,
                backend="connectorx",
                reflection_level="full_with_precision",
                backend_kwargs={"return_type": "arrow"},
            )
            daily_events = list(table)

            if daily_events:
                stats = process_events_for_stats(daily_events)
                yield {"date": current.format("YYYY-MM-DD"), "stats": stats}

            current = current.add(days=1)

    return events, daily_statistics


if __name__ == "__main__":
    # Initialize the pipeline
    pipeline = dlt.pipeline(
        pipeline_name="gharchive",
        destination="duckdb",
        dataset_name="github_events",
        progress="log",  # Add progress logging
        dev_mode=True,  # Enable dev mode for better debugging
    )

    # Run the pipeline for 2024 data
    load_info = pipeline.run(
        gharchive_source(
            start_date="2024-01-01", end_date=pendulum.now().format("YYYY-MM-DD")
        ),
        loader_file_format="parquet",  # Use parquet for better performance
    )

    print(load_info)
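
A quick sanity check on the output (a sketch, assuming dlt's default of writing to <pipeline_name>.duckdb in the working directory; note that dev_mode=True suffixes the dataset name, so the schema may not be literally "github_events"):

import duckdb

con = duckdb.connect("gharchive.duckdb")
# dev_mode appends a run suffix to the dataset, so list schemas first
print(con.sql("SELECT schema_name FROM information_schema.schemata"))
print(con.sql("SELECT date FROM github_events.daily_stats ORDER BY date LIMIT 5"))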
5 changes: 4 additions & 1 deletion pipeline/pyproject.toml
@@ -9,7 +9,10 @@ dependencies = [
    "dlt[cli,duckdb,motherduck]>=1.4.1",
    "python-dotenv==1.0.0",
    "requests>=2.31.0",

Review comment (Member): this version seems to not be available?

-   "slack-sdk>=3.33.5",
+   "slack-sdk>=3.33.0",
+   "pendulum>=2.1.2",

Review comment on lines +12 to +13 (Member): same for these ones

Review comment (Member): I am trying to resolve the environment with poetry and it has been running for an hour now 😢 ...

+   # "google-cloud-bigquery>=3.17.2",
+   # "connectorx>=0.3.2"
]
requires-python = ">=3.9"
12 changes: 10 additions & 2 deletions pipeline/uv.lock

(Generated file; diff not rendered.)

64 changes: 64 additions & 0 deletions plugins/extractors/tap-bigquery--anelendata.lock
@@ -0,0 +1,64 @@
{
  "plugin_type": "extractors",
  "name": "tap-bigquery",
  "namespace": "tap_bigquery",
  "variant": "anelendata",
  "label": "BigQuery",
  "docs": "https://hub.meltano.com/extractors/tap-bigquery--anelendata",
  "repo": "https://github.com/anelendata/tap-bigquery",
  "pip_url": "tap-bigquery",
  "description": "BigQuery data warehouse extractor",
  "logo_url": "https://hub.meltano.com/assets/logos/extractors/bigquery.png",
  "capabilities": [
    "catalog",
    "discover",
    "state"
  ],
  "settings_group_validation": [
    [
      "streams",
      "start_datetime",
      "credentials_path"
    ]
  ],
  "settings": [
    {
      "name": "streams",
      "kind": "array",
      "label": "Streams",
      "description": "Array of objects with `name`, `table`, `columns`, `datetime_key`, and `filters` keys:\n\n- `name`: The entity name, used by most loaders as the name of the table to be created.\n- `table`: Fully qualified table name in BigQuery, with format `` `<project>.<dataset>.<table>` ``. Since backticks have special meaning in YAML, values in `meltano.yml` should be wrapped in double quotes.\n- `columns`: Array of column names to select. Using `[\"*\"]` is not recommended as it can become very expensive for a table with a large number of columns.\n- `datetime_key`: Name of datetime column to use as [replication key](https://docs.meltano.com/guide/integration#replication-key).\n- `filters`: Optional array of `WHERE` clauses to filter extracted data, e.g. `\"column='value'\"`.\n"
    },
    {
      "name": "credentials_path",
      "env": "GOOGLE_APPLICATION_CREDENTIALS",
      "value": "$MELTANO_PROJECT_ROOT/client_secrets.json",
      "label": "Credentials Path",
      "description": "Fully qualified path to `client_secrets.json` for your service account.\n\nSee the [\"Activate the Google BigQuery API\" section of the repository's README](https://github.com/anelendata/tap-bigquery#step-1-activate-the-google-bigquery-api) and <https://cloud.google.com/docs/authentication/production>.\n\nBy default, this file is expected to be at the root of your project directory.\n"
    },
    {
      "name": "start_datetime",
      "kind": "date_iso8601",
      "label": "Start Datetime",
      "description": "Determines how much historical data will be extracted. Please be aware that the larger the time period and amount of data, the longer the initial extraction can be expected to take."
    },
    {
      "name": "end_datetime",
      "kind": "date_iso8601",
      "label": "End Datetime",
      "description": "Date up to when historical data will be extracted."
    },
    {
      "name": "limit",
      "kind": "integer",
      "label": "Limit",
      "description": "Limits the number of records returned in each stream, applied as a limit in the query."
    },
    {
      "name": "start_always_inclusive",
      "kind": "boolean",
      "value": true,
      "label": "Start Always Inclusive",
      "description": "When replicating incrementally, disable to only select records whose `datetime_key` is greater than the maximum value replicated in the last run, by excluding records whose timestamps match exactly. This could cause records to be missed that were created after the last run finished, but during the same second and with the same timestamp."
    }
  ]
}
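For reference only (hypothetical values, not part of this PR): wiring this extractor up in `meltano.yml` would mean a `streams` entry pairing a `name` such as `github_events` with a `table` like "`githubarchive.day.20231001`" (double-quoted because of the backticks), a `columns` list, and a `datetime_key` such as `created_at`, plus `start_datetime` and a `credentials_path` pointing at the service-account JSON.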