diff --git a/.gitignore b/.gitignore
index 59eb553..1cddc94 100644
--- a/.gitignore
+++ b/.gitignore
@@ -8,3 +8,5 @@ static/data
 .vscode/settings.json
 .env
 .evidence/meta
+
+.meltano/
\ No newline at end of file
diff --git a/meltano.yml b/meltano.yml
new file mode 100644
index 0000000..cd42753
--- /dev/null
+++ b/meltano.yml
@@ -0,0 +1,40 @@
+version: 1
+default_environment: dev
+project_id: 64e752dd-4728-4bbd-b8d0-9af37a481f6b
+environments:
+- name: dev
+- name: staging
+- name: prod
+send_anonymous_usage_stats: false
+plugins:
+  extractors:
+  - name: tap-gharchive
+    namespace: tap_gharchive
+    pip_url: -e .
+    executable: tap-gharchive
+    config:
+      api_url: https://data.gharchive.org
+      streams:
+      - name: github_events
+        path: /2023-10-01-0.json.gz
+        primary_keys:
+        - id
+        schema:
+          properties:
+            type:
+              type: string
+            actor:
+              type: object
+            repo:
+              type: object
+            created_at:
+              type: string
+            org:
+              type: object
+            payload:
+              type: object
+      start_date: '2023-10-01'
+  loaders:
+  - name: target-jsonl
+    variant: andyh1203
+    pip_url: target-jsonl
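+# A rough sketch of how this extractor/loader pair is typically exercised
+# (assumes the Meltano CLI is installed in the active environment):
+#
+#   meltano install
+#   meltano run tap-gharchive target-jsonl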
diff --git a/pipeline/gharchive_pipeline.py b/pipeline/gharchive_pipeline.py
new file mode 100644
index 0000000..a746e7b
--- /dev/null
+++ b/pipeline/gharchive_pipeline.py
@@ -0,0 +1,218 @@
+import dlt
+from dlt.sources.sql_database import sql_table
+import pendulum
+from typing import Iterator, Dict, List
+from collections import defaultdict
+import os
+
+
+def process_events_for_stats(events: List[Dict]) -> Dict:
+    """Process a batch of events to calculate statistics."""
+    stats = {
+        "repo_metrics": defaultdict(
+            lambda: {
+                "stars": 0,
+                "forks": 0,
+                "watchers": 0,
+            }
+        ),
+        "traffic": defaultdict(
+            lambda: {
+                "views": defaultdict(int),
+                "clones": defaultdict(int),
+                "unique_views": defaultdict(int),
+                "unique_clones": defaultdict(int),
+            }
+        ),
+        "contributors": defaultdict(
+            lambda: {
+                "commits": 0,
+                "additions": 0,
+                "deletions": 0,
+                "first_contribution": None,
+            }
+        ),
+        "issues": {
+            "total": 0,
+            "open": 0,
+            "closed": 0,
+            "response_times": [],
+            "close_times": [],
+            "daily_opened": defaultdict(int),
+            "daily_closed": defaultdict(int),
+        },
+        "pull_requests": {
+            "total": 0,
+            "open": 0,
+            "closed": 0,
+            "response_times": [],
+            "close_times": [],
+            "daily_opened": defaultdict(int),
+            "daily_closed": defaultdict(int),
+        },
+    }
+
+    for event in events:
+        repo_name = event.get("repo", {}).get("name", "").split("/")[-1]
+        event_type = event.get("type")
+
+        # Process different event types
+        if event_type == "WatchEvent":
+            stats["repo_metrics"][repo_name]["stars"] += 1
+        elif event_type == "ForkEvent":
+            stats["repo_metrics"][repo_name]["forks"] += 1
+        elif event_type == "PushEvent":
+            author = event.get("actor", {}).get("login")
+            if author:
+                stats["contributors"][author]["commits"] += len(
+                    event.get("payload", {}).get("commits", [])
+                )
+                for commit in event.get("payload", {}).get("commits", []):
+                    stats["contributors"][author]["additions"] += commit.get(
+                        "stats", {}
+                    ).get("additions", 0)
+                    stats["contributors"][author]["deletions"] += commit.get(
+                        "stats", {}
+                    ).get("deletions", 0)
+        elif event_type == "IssuesEvent":
+            action = event.get("payload", {}).get("action")
+            issue = event.get("payload", {}).get("issue", {})
+            is_pr = "pull_request" in issue
+
+            stats_key = "pull_requests" if is_pr else "issues"
+            stats[stats_key]["total"] += 1
+
+            if action == "opened":
+                stats[stats_key]["open"] += 1
+                created_date = pendulum.parse(issue.get("created_at")).format(
+                    "YYYY-MM-DD"
+                )
+                stats[stats_key]["daily_opened"][created_date] += 1
+            elif action == "closed":
+                stats[stats_key]["closed"] += 1
+                closed_date = pendulum.parse(event.get("created_at")).format(
+                    "YYYY-MM-DD"
+                )
+                stats[stats_key]["daily_closed"][closed_date] += 1
+
+                # Calculate close time
+                if issue.get("created_at"):
+                    created_time = pendulum.parse(issue["created_at"])
+                    closed_time = pendulum.parse(event.get("created_at"))
+                    close_time = (closed_time - created_time).total_seconds()
+                    stats[stats_key]["close_times"].append(close_time)
+
+        elif event_type == "IssueCommentEvent":
+            issue = event.get("payload", {}).get("issue", {})
+            is_pr = "pull_request" in issue
+            stats_key = "pull_requests" if is_pr else "issues"
+
+            # Calculate response time for first comment
+            if issue.get("created_at") and event.get("created_at"):
+                created_time = pendulum.parse(issue["created_at"])
+                comment_time = pendulum.parse(event.get("created_at"))
+                response_time = (comment_time - created_time).total_seconds()
+                stats[stats_key]["response_times"].append(response_time)
+
+    return stats
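+
+# Illustrative only (added comment, not part of the original pipeline): for a
+# batch containing a single WatchEvent on "nf-core/rnaseq", the structure
+# returned above collapses to roughly the following (repo keys use the short
+# repo name produced by split("/")[-1]):
+#
+#   {
+#       "repo_metrics": {"rnaseq": {"stars": 1, "forks": 0, "watchers": 0}},
+#       "traffic": {},
+#       "contributors": {},
+#       "issues": {"total": 0, "open": 0, "closed": 0, ...},
+#       "pull_requests": {"total": 0, "open": 0, "closed": 0, ...},
+#   }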
+
+
+@dlt.source
+def gharchive_source(start_date: str = "2024-01-01", end_date: str = None):
+    """
+    Load GitHub events for the nf-core organization from the GH Archive BigQuery dataset.
+
+    Args:
+        start_date: Start date in YYYY-MM-DD format
+        end_date: End date in YYYY-MM-DD format (defaults to current date)
+    """
+    if end_date is None:
+        end_date = pendulum.now().format("YYYY-MM-DD")
+
+    # Get BigQuery credentials path from environment (required below when building `conn`)
+    credentials_path = os.getenv("GOOGLE_APPLICATION_CREDENTIALS")
+    if not credentials_path:
+        raise ValueError("GOOGLE_APPLICATION_CREDENTIALS environment variable must be set")
+
+    # Construct BigQuery connection string for ConnectorX
+    conn = f"bigquery://{credentials_path}"
+
+    @dlt.resource(write_disposition="append", name="raw_events")
+    def events(
+        start_date: str = start_date, end_date: str = end_date
+    ) -> Iterator[Dict]:
+        """Query GitHub events from BigQuery."""
+        query = f"""
+        SELECT *
+        FROM `githubarchive.day.*`
+        WHERE _TABLE_SUFFIX BETWEEN FORMAT_DATE('%Y%m%d', DATE('{start_date}'))
+          AND FORMAT_DATE('%Y%m%d', DATE('{end_date}'))
+          AND JSON_EXTRACT_SCALAR(payload, '$.organization.login') = 'nf-core'
+        """
+
+        # Create sql_table with ConnectorX backend
+        table = sql_table(
+            conn,  # BigQuery connection string
+            query=query,
+            chunk_size=100000,  # Process in chunks for better memory usage
+            backend="connectorx",  # Use ConnectorX backend for better performance
+            reflection_level="full_with_precision",  # Get exact data types
+            backend_kwargs={"return_type": "arrow"},  # Use Arrow for better performance
+        )
+        yield from table
+
+    @dlt.resource(write_disposition="merge", primary_key="date", name="daily_stats")
+    def daily_statistics(
+        start_date: str = start_date, end_date: str = end_date
+    ) -> Iterator[Dict]:
+        """Calculate daily statistics from BigQuery events."""
+        start = pendulum.parse(start_date)
+        end = pendulum.parse(end_date)
+        current = start
+
+        while current <= end:
+            query = f"""
+            SELECT *
+            FROM `githubarchive.day.{current.format("YYYYMMDD")}`
+            WHERE JSON_EXTRACT_SCALAR(payload, '$.organization.login') = 'nf-core'
+            """
+
+            # Create sql_table with ConnectorX backend
+            table = sql_table(
+                conn,
+                query=query,
+                chunk_size=100000,
+                backend="connectorx",
+                reflection_level="full_with_precision",
+                backend_kwargs={"return_type": "arrow"},
+            )
+            daily_events = list(table)
+
+            if daily_events:
+                stats = process_events_for_stats(daily_events)
+                yield {"date": current.format("YYYY-MM-DD"), "stats": stats}
+
+            current = current.add(days=1)
+
+    return events, daily_statistics
+
+
+if __name__ == "__main__":
+    # Initialize the pipeline
+    pipeline = dlt.pipeline(
+        pipeline_name="gharchive",
+        destination="duckdb",
+        dataset_name="github_events",
+        progress="log",  # Add progress logging
+        dev_mode=True,  # Enable dev mode for better debugging
+    )
+
+    # Run the pipeline for 2024 data
+    load_info = pipeline.run(
+        gharchive_source(
+            start_date="2024-01-01", end_date=pendulum.now().format("YYYY-MM-DD")
+        ),
+        loader_file_format="parquet",  # Use parquet for better performance
+    )
+
+    print(load_info)
diff --git a/pipeline/pyproject.toml b/pipeline/pyproject.toml
index 83a1584..5a0ea0b 100644
--- a/pipeline/pyproject.toml
+++ b/pipeline/pyproject.toml
@@ -9,7 +9,10 @@ dependencies = [
     "dlt[cli,duckdb,motherduck]>=1.4.1",
     "python-dotenv==1.0.0",
     "requests>=2.31.0",
-    "slack-sdk>=3.33.5",
+    "slack-sdk>=3.33.0",
+    "pendulum>=2.1.2",
+# "google-cloud-bigquery>=3.17.2",
+# "connectorx>=0.3.2"
 ]
 requires-python = ">=3.9"
diff --git a/pipeline/uv.lock b/pipeline/uv.lock
index 25c2c16..d56504a 100644
--- a/pipeline/uv.lock
+++ b/pipeline/uv.lock
@@ -1,5 +1,11 @@
 version = 1
 requires-python = ">=3.9"
+resolution-markers = [
+    "python_full_version >= '3.13'",
+    "python_full_version >= '3.11' and python_full_version < '3.13'",
+    "python_full_version == '3.10.*'",
+    "python_full_version < '3.10'",
+]
 
 [[package]]
 name = "certifi"
@@ -99,7 +105,7 @@ name = "click"
 version = "8.1.7"
 source = { registry = "https://pypi.org/simple" }
 dependencies = [
-    { name = "colorama", marker = "platform_system == 'Windows'" },
+    { name = "colorama", marker = "sys_platform == 'win32'" },
 ]
 sdist = { url = "https://files.pythonhosted.org/packages/96/d3/f04c7bfcf5c1862a2a5b845c6b2b360488cf47af55dfa79c98f6a6bf98b5/click-8.1.7.tar.gz", hash = "sha256:ca9853ad459e787e2192211578cc907e7594e294c7ccc834310722b41b9ca6de", size = 336121 }
 wheels = [
@@ -317,6 +323,7 @@ version = "0.1.0"
 source = { virtual = "."
} dependencies = [ { name = "dlt", extra = ["cli", "duckdb", "motherduck"] }, + { name = "pendulum" }, { name = "python-dotenv" }, { name = "requests" }, { name = "slack-sdk" }, @@ -325,9 +332,10 @@ dependencies = [ [package.metadata] requires-dist = [ { name = "dlt", extras = ["cli", "duckdb", "motherduck"], specifier = ">=1.4.1" }, + { name = "pendulum", specifier = ">=2.1.2" }, { name = "python-dotenv", specifier = "==1.0.0" }, { name = "requests", specifier = ">=2.31.0" }, - { name = "slack-sdk", specifier = ">=3.33.5" }, + { name = "slack-sdk", specifier = ">=3.33.0" }, ] [[package]] diff --git a/plugins/extractors/tap-bigquery--anelendata.lock b/plugins/extractors/tap-bigquery--anelendata.lock new file mode 100644 index 0000000..851bc56 --- /dev/null +++ b/plugins/extractors/tap-bigquery--anelendata.lock @@ -0,0 +1,64 @@ +{ + "plugin_type": "extractors", + "name": "tap-bigquery", + "namespace": "tap_bigquery", + "variant": "anelendata", + "label": "BigQuery", + "docs": "https://hub.meltano.com/extractors/tap-bigquery--anelendata", + "repo": "https://github.com/anelendata/tap-bigquery", + "pip_url": "tap-bigquery", + "description": "BigQuery data warehouse extractor", + "logo_url": "https://hub.meltano.com/assets/logos/extractors/bigquery.png", + "capabilities": [ + "catalog", + "discover", + "state" + ], + "settings_group_validation": [ + [ + "streams", + "start_datetime", + "credentials_path" + ] + ], + "settings": [ + { + "name": "streams", + "kind": "array", + "label": "Streams", + "description": "Array of objects with `name`, `table`, `columns`, `datetime_key`, and `filters` keys:\n\n- `name`: The entity name, used by most loaders as the name of the table to be created.\n- `table`: Fully qualified table name in BigQuery, with format `` `..` ``. Since backticks have special meaning in YAML, values in `meltano.yml` should be wrapped in double quotes.\n- `columns`: Array of column names to select. Using `[\"*\"]` is not recommended as it can become very expensive for a table with a large number of columns.\n- `datetime_key`: Name of datetime column to use as [replication key](https://docs.meltano.com/guide/integration#replication-key).\n- `filters`: Optional array of `WHERE` clauses to filter extracted data, e.g. `\"column='value'\"`.\n" + }, + { + "name": "credentials_path", + "env": "GOOGLE_APPLICATION_CREDENTIALS", + "value": "$MELTANO_PROJECT_ROOT/client_secrets.json", + "label": "Credentials Path", + "description": "Fully qualified path to `client_secrets.json` for your service account.\n\nSee the [\"Activate the Google BigQuery API\" section of the repository's README](https://github.com/anelendata/tap-bigquery#step-1-activate-the-google-bigquery-api) and .\n\nBy default, this file is expected to be at the root of your project directory.\n" + }, + { + "name": "start_datetime", + "kind": "date_iso8601", + "label": "Start Datetime", + "description": "Determines how much historical data will be extracted. Please be aware that the larger the time period and amount of data, the longer the initial extraction can be expected to take." + }, + { + "name": "end_datetime", + "kind": "date_iso8601", + "label": "End Datetime", + "description": "Date up to when historical data will be extracted." + }, + { + "name": "limit", + "kind": "integer", + "label": "Limit", + "description": "Limits the number of records returned in each stream, applied as a limit in the query." 
+ }, + { + "name": "start_always_inclusive", + "kind": "boolean", + "value": true, + "label": "Start Always Inclusive", + "description": "When replicating incrementally, disable to only select records whose `datetime_key` is greater than the maximum value replicated in the last run, by excluding records whose timestamps match exactly. This could cause records to be missed that were created after the last run finished, but during the same second and with the same timestamp." + } + ] +} diff --git a/plugins/extractors/tap-github--meltanolabs.lock b/plugins/extractors/tap-github--meltanolabs.lock new file mode 100644 index 0000000..6357c1a --- /dev/null +++ b/plugins/extractors/tap-github--meltanolabs.lock @@ -0,0 +1,225 @@ +{ + "plugin_type": "extractors", + "name": "tap-github", + "namespace": "tap_github", + "variant": "meltanolabs", + "label": "GitHub", + "docs": "https://hub.meltano.com/extractors/tap-github--meltanolabs", + "repo": "https://github.com/MeltanoLabs/tap-github", + "pip_url": "meltanolabs-tap-github", + "description": "Code hosting platform", + "logo_url": "https://hub.meltano.com/assets/logos/extractors/github.png", + "capabilities": [ + "about", + "batch", + "catalog", + "discover", + "schema-flattening", + "state", + "stream-maps" + ], + "settings_group_validation": [ + [ + "repositories" + ], + [ + "organizations" + ], + [ + "searches" + ], + [ + "user_usernames" + ], + [ + "user_ids" + ] + ], + "settings": [ + { + "name": "additional_auth_tokens", + "kind": "array", + "label": "Additional Auth Tokens", + "description": "List of GitHub tokens to authenticate with. Streams will loop through them when hitting rate limits." + }, + { + "name": "auth_app_keys", + "kind": "array", + "label": "Auth App Keys", + "description": "List of GitHub App credentials to authenticate with. Each credential can be constructed by combining an App ID and App private key into the format `:app_id:;;-----BEGIN RSA PRIVATE KEY----- _YOUR_P_KEY_ -----END RSA PRIVATE KEY-----`." + }, + { + "name": "auth_token", + "kind": "string", + "label": "Auth Token", + "description": "GitHub token to authenticate with.", + "sensitive": true + }, + { + "name": "batch_config.encoding.compression", + "kind": "options", + "label": "Batch Compression Format", + "description": "Compression format to use for batch files.", + "options": [ + { + "label": "GZIP", + "value": "gzip" + }, + { + "label": "None", + "value": "none" + } + ] + }, + { + "name": "batch_config.encoding.format", + "kind": "options", + "label": "Batch Encoding Format", + "description": "Format to use for batch files.", + "options": [ + { + "label": "JSONL", + "value": "jsonl" + }, + { + "label": "Parquet", + "value": "parquet" + } + ] + }, + { + "name": "batch_config.storage.prefix", + "kind": "string", + "label": "Batch Storage Prefix", + "description": "Prefix to use when writing batch files." + }, + { + "name": "batch_config.storage.root", + "kind": "string", + "label": "Batch Storage Root", + "description": "Root path to use when writing batch files." 
+ }, + { + "name": "expiry_time_buffer", + "kind": "integer", + "label": "Expiry Time Buffer" + }, + { + "name": "faker_config.locale", + "kind": "array", + "label": "Faker Locale", + "description": "One or more LCID locale strings to produce localized output for: https://faker.readthedocs.io/en/master/#localization" + }, + { + "name": "faker_config.seed", + "kind": "string", + "label": "Faker Seed", + "description": "Value to seed the Faker generator for deterministic output: https://faker.readthedocs.io/en/master/#seeding-the-generator" + }, + { + "name": "flattening_enabled", + "kind": "boolean", + "label": "Enable Schema Flattening", + "description": "'True' to enable schema flattening and automatically expand nested properties." + }, + { + "name": "flattening_max_depth", + "kind": "integer", + "label": "Max Flattening Depth", + "description": "The max depth to flatten schemas." + }, + { + "name": "metrics_log_level", + "kind": "string", + "label": "Metrics Log Level", + "description": "The log level of the API response metrics." + }, + { + "name": "organizations", + "kind": "array", + "label": "Organizations", + "description": "An array of strings containing the github organizations to be included" + }, + { + "name": "rate_limit_buffer", + "kind": "integer", + "label": "Rate Limit Buffer", + "description": "Add a buffer to avoid consuming all query points for the token at hand. Defaults to 1000." + }, + { + "name": "repositories", + "kind": "array", + "label": "Repositories", + "description": "An array of strings containing the github repos to be included" + }, + { + "name": "searches", + "kind": "array", + "label": "Searches", + "description": "An array of search descriptor objects with the following properties. \"name\" - a human readable name for the search query. \"query\" - a github search string (generally the same as would come after ?q= in the URL)" + }, + { + "name": "skip_parent_streams", + "kind": "boolean", + "label": "Skip Parent Streams", + "description": "Set to true to skip API calls for the parent streams (such as repositories) if it is not selected but children are" + }, + { + "name": "start_date", + "kind": "date_iso8601", + "label": "Start Date" + }, + { + "name": "stream_map_config", + "kind": "object", + "label": "Stream Map Config" + }, + { + "name": "stream_maps", + "kind": "object", + "label": "Stream Maps" + }, + { + "name": "stream_options.milestones.state", + "kind": "options", + "value": "open", + "label": "Stream Options Milestones State", + "description": "Configures which states are of interest. Must be one of [open, closed, all], defaults to open.", + "options": [ + { + "label": "Open", + "value": "open" + }, + { + "label": "Closed", + "value": "closed" + }, + { + "label": "All", + "value": "all" + } + ] + }, + { + "name": "user_agent", + "kind": "string", + "label": "User Agent" + }, + { + "name": "user_ids", + "kind": "array", + "label": "User IDs", + "description": "A list of GitHub user ids." + }, + { + "name": "user_usernames", + "kind": "array", + "label": "User Usernames", + "description": "A list of GithHub usernames." 
+ } + ], + "select": [ + "*.*", + "!traffic_*.*" + ] +} diff --git a/plugins/extractors/tap-rest-api-msdk--widen.lock b/plugins/extractors/tap-rest-api-msdk--widen.lock new file mode 100644 index 0000000..40d66b1 --- /dev/null +++ b/plugins/extractors/tap-rest-api-msdk--widen.lock @@ -0,0 +1,357 @@ +{ + "plugin_type": "extractors", + "name": "tap-rest-api-msdk", + "namespace": "tap_rest_api_msdk", + "variant": "widen", + "label": "REST API", + "docs": "https://hub.meltano.com/extractors/tap-rest-api-msdk--widen", + "repo": "https://github.com/Widen/tap-rest-api-msdk", + "pip_url": "tap-rest-api-msdk", + "description": "REST API", + "logo_url": "https://hub.meltano.com/assets/logos/extractors/restapi.png", + "capabilities": [ + "about", + "batch", + "catalog", + "discover", + "schema-flattening", + "state", + "stream-maps" + ], + "settings_group_validation": [ + [ + "api_url" + ] + ], + "settings": [ + { + "name": "access_token_url", + "kind": "string", + "label": "Access Token URL", + "description": "Used for the OAuth2 authentication method. This is the end-point for the authentication server used to exchange the authorization codes for a access token.", + "sensitive": true + }, + { + "name": "api_keys", + "kind": "object", + "label": "API Keys", + "description": "A object of API Key/Value pairs used by the api_key auth method Example: { X-API-KEY: my secret value}." + }, + { + "name": "api_url", + "kind": "string", + "label": "API URL", + "description": "The base url/endpoint for the desired api" + }, + { + "name": "auth_method", + "kind": "string", + "value": "no_auth", + "label": "Auth Method", + "description": "The method of authentication used by the API. Supported options include oauth: for OAuth2 authentication, basic: Basic Header authorization - base64-encoded username + password config items, api_key: for API Keys in the header e.g. X-API-KEY,bearer_token: for Bearer token authorization, aws: for AWS Authentication. Defaults to `no_auth` which will take authentication parameters passed via the headersconfig." + }, + { + "name": "aws_credentials", + "kind": "object", + "label": "AWS Credentials", + "description": "An object of aws credentials to authenticate to access AWS services. This example is to access the AWS OpenSearch service. Example: { aws_access_key_id: my_aws_key_id, aws_secret_access_key: my_aws_secret_access_key, aws_region: us-east-1, aws_service: es, use_signed_credentials: true}" + }, + { + "name": "backoff_param", + "kind": "string", + "value": "Retry-After", + "label": "Backoff Param", + "description": "The header parameter to inspect for a backoff time. Optional: Defaults to `Retry-After`." + }, + { + "name": "backoff_time_extension", + "kind": "integer", + "value": 0, + "label": "Backoff Time Extension", + "description": "An additional extension (seconds) to the backoff time over and above a jitter value - use where an API is not precise in its backoff times. Optional: Defaults to `0`." + }, + { + "name": "backoff_type", + "kind": "string", + "label": "Backoff Type", + "description": "The style of Backoff [message|header] applied to rate limited APIs. Backoff times (seconds) come from response either the `message` or `header`. Optional: Defaults to `None`." 
+ }, + { + "name": "batch_config.encoding.compression", + "kind": "options", + "label": "Batch Config Encoding Compression", + "description": "Compression format to use for batch files.", + "options": [ + { + "label": "Gzip", + "value": "gzip" + }, + { + "label": "None", + "value": "none" + } + ] + }, + { + "name": "batch_config.encoding.format", + "kind": "options", + "label": "Batch Config Encoding Format", + "description": "Format to use for batch files.", + "options": [ + { + "label": "Jsonl", + "value": "jsonl" + } + ] + }, + { + "name": "batch_config.storage.prefix", + "kind": "string", + "label": "Batch Config Storage Prefix", + "description": "Prefix to use when writing batch files." + }, + { + "name": "batch_config.storage.root", + "kind": "string", + "label": "Batch Config Storage Root", + "description": "Root path to use when writing batch files." + }, + { + "name": "bearer_token", + "kind": "string", + "label": "Bearer Token", + "description": "Used for the Bearer Authentication method, which uses a token as part of the authorization header for authentication.", + "sensitive": true + }, + { + "name": "client_id", + "kind": "string", + "label": "Client ID", + "description": "Used for the OAuth2 authentication method. The public application ID that's assigned for Authentication. The client_id should accompany a client_secret.", + "sensitive": true + }, + { + "name": "client_secret", + "kind": "string", + "label": "Client Secret", + "description": "Used for the OAuth2 authentication method. The client_secret is a secret known only to the application and the authorization server. It is essential the application's own password.", + "sensitive": true + }, + { + "name": "except_keys", + "kind": "array", + "value": [], + "label": "Except Keys", + "description": "This tap automatically flattens the entire json structure and builds keys based on the corresponding paths. Keys, whether composite or otherwise, listed in this dictionary will not be recursively flattened, but instead their values will be; turned into a json string and processed in that format. This is also automatically done for any lists within the records; therefore, records are not duplicated for each item in lists." + }, + { + "name": "flattening_enabled", + "kind": "boolean", + "label": "Flattening Enabled", + "description": "'True' to enable schema flattening and automatically expand nested properties." + }, + { + "name": "flattening_max_depth", + "kind": "integer", + "label": "Flattening Max Depth", + "description": "The max depth to flatten schemas." + }, + { + "name": "grant_type", + "kind": "string", + "label": "Grant Type", + "description": "Used for the OAuth2 authentication method. The grant_type is required to describe the OAuth2 flow. Flows support by this tap include client_credentials, refresh_token, password." + }, + { + "name": "headers", + "kind": "object", + "label": "Headers", + "description": "An object of headers to pass into the api calls. Stream level headers will be merged with top-level params with streamlevel params overwriting top-level params with the same key." + }, + { + "name": "next_page_token_path", + "kind": "string", + "label": "Next Page Token Path", + "description": "A jsonpath string representing the path to the 'next page' token. 
Defaults to `$.next_page` for the `jsonpath_paginator` paginator only otherwise `None`.", + "sensitive": true + }, + { + "name": "num_inference_records", + "kind": "integer", + "value": 50, + "label": "Num Inference Records", + "description": "Number of records used to infer the stream's schema. Defaults to `50`." + }, + { + "name": "oauth_expiration_secs", + "kind": "integer", + "label": "OAuth Expiration Secs", + "description": "Used for OAuth2 authentication method. This optional setting is a timer for the expiration of a token in seconds. If not set the OAuth will use the default expiration set in the token by the authorization server." + }, + { + "name": "oauth_extras", + "kind": "object", + "label": "OAuth Extras", + "description": "A object of Key/Value pairs for additional oauth config parameters which may be required by the authorization server. Example: {resource: https://analysis.windows.net/powerbi/api}." + }, + { + "name": "pagination_limit_per_page_param", + "kind": "string", + "label": "Pagination Limit Per Page Param", + "description": "The name of the param that indicates the limit/per_page. Defaults to `None`." + }, + { + "name": "pagination_next_page_param", + "kind": "string", + "label": "Pagination Next Page Param", + "description": "The name of the param that indicates the page/offset. Defaults to `None`." + }, + { + "name": "pagination_page_size", + "kind": "integer", + "label": "Pagination Page Size", + "description": "The size of each page in records. Defaults to `None`." + }, + { + "name": "pagination_request_style", + "kind": "string", + "value": "default", + "label": "Pagination Request Style", + "description": "The pagination style to use for requests. Defaults to `default`." + }, + { + "name": "pagination_response_style", + "kind": "string", + "value": "default", + "label": "Pagination Response Style", + "description": "The pagination style to use for response. Defaults to `default`." + }, + { + "name": "pagination_results_limit", + "kind": "integer", + "label": "Pagination Results Limit", + "description": "Limits the max number of records. Defaults to `None`." + }, + { + "name": "pagination_total_limit_param", + "kind": "string", + "value": "total", + "label": "Pagination Total Limit Param", + "description": "The name of the param that indicates the total limit e.g. `total`, `count`. Defaults to `total`." + }, + { + "name": "params", + "kind": "object", + "value": {}, + "label": "Params", + "description": "An object providing the `params` in a `requests.get` method. Stream level params will be merged with top-level params with stream level params overwriting top-level params with the same key." + }, + { + "name": "password", + "kind": "string", + "label": "Password", + "description": "Used for a number of authentication methods that use a user password combination for authentication.", + "sensitive": true + }, + { + "name": "path", + "kind": "string", + "label": "Path", + "description": "The path appended to the `api_url`. Stream-level path will overwrite top-level path" + }, + { + "name": "primary_keys", + "kind": "array", + "label": "Primary Keys", + "description": "A list of the json keys of the primary key for the stream." + }, + { + "name": "records_path", + "kind": "string", + "label": "Records Path", + "description": "A jsonpath string representing the path in the requests response that contains the records to process. Defaults to `$[*]`. 
Stream level records_path will overwrite the top-level records_path" + }, + { + "name": "redirect_uri", + "kind": "string", + "label": "Redirect Uri", + "description": "Used for the OAuth2 authentication method. This is optional as the redirect_uri may be part of the token returned by the authentication server. If a redirect_uri is provided, it determines where the API server redirects the user after the user completes the authorization flow." + }, + { + "name": "refresh_token", + "kind": "string", + "label": "Refresh Token", + "description": "An OAuth2 Refresh Token is a string that the OAuth2 client can use to get a new access token without the user's interaction.", + "sensitive": true + }, + { + "name": "replication_key", + "kind": "string", + "label": "Replication Key", + "description": "The json response field representing the replication key. Note that this should be an incrementing integer or datetime object.", + "sensitive": true + }, + { + "name": "scope", + "kind": "string", + "label": "Scope", + "description": "Used for the OAuth2 authentication method. The scope is optional, it is a mechanism to limit the amount of access that is granted to an access token. One or more scopes can be provided delimited by a space." + }, + { + "name": "source_search_field", + "kind": "string", + "label": "Source Search Field", + "description": "An optional field name which can be used for querying specific records from supported API's. The intend for this parameter is to continue incrementally processing from a previous state. Example `last-updated`. Note: You must also set the replication_key, where the replication_key is json response representation of the API `source_search_field`. You should also supply the `source_search_query`, `replication_key` and `start_date`." + }, + { + "name": "source_search_query", + "kind": "string", + "label": "Source Search Query", + "description": "An optional query template to be issued against the API. Substitute the query field you are querying against with $last_run_date. At run-time, the tap will dynamically update the token with either the `start_date` or the last bookmark / state value. A simple template Example for FHIR APIs: gt$last_run_date. A more complex example against an Opensearch API, `\"{\\\"bool\\\": {\\\"filter\\\": [{\\\"range\\\": { \\\"meta.lastUpdated\\\": { \\\"gt\\\": \\\"$last_run_date\\\" }}}] }}\"`. Note: Any required double quotes in the query template must be escaped." + }, + { + "name": "start_date", + "kind": "date_iso8601", + "label": "Start Date", + "description": "An optional field. Normally required when using the replication_key. This is the initial starting date when using adate based replication key and there is no state available." + }, + { + "name": "store_raw_json_message", + "kind": "boolean", + "value": false, + "label": "Store Raw JSON Message", + "description": "An additional extension which will emit the whole message into an field. Optional: Defaults to `False`." + }, + { + "name": "stream_map_config", + "kind": "object", + "label": "Stream Map Config", + "description": "User-defined config values to be used within map expressions." + }, + { + "name": "stream_maps", + "kind": "object", + "label": "Stream Maps", + "description": "Config object for stream maps capability. For more information check out [Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html)." 
+ }, + { + "name": "streams", + "kind": "array", + "label": "Streams", + "description": "An array of streams, designed for separate paths using thesame base url.\n\nStream level config options.\n\nParameters that appear at the stream-level will overwrite their top-level counterparts except where noted below:\n\n- name: required: name of the stream.\n- path: optional: the path appended to the `api_url`.\n- params: optional: an object of objects that provide the `params` in a `requests.get` method. Stream level params will be merged with top-level params with stream level params overwriting top-level params with the same key.\n- headers: optional: an object of headers to pass into the api calls. Stream level headers will be merged with top-level params with stream level params overwriting top-level params with the same key\n- records_path: optional: a jsonpath string representing the path in the requests response that contains the records to process. Defaults to `$[*]`.\n- primary_keys: required: a list of the json keys of the primary key for the stream.\n- replication_key: optional: the json key of the replication key. Note that this should be an incrementing integer or datetime object.\n- except_keys: This tap automatically flattens the entire json structure and builds keys based on the corresponding paths. Keys, whether composite or otherwise, listed in this dictionary will not be recursively flattened, but instead their values will be turned into a json string and processed in that format. This is also automatically done for any lists within the records; therefore, records are not duplicated for each item in lists.\n- num_inference_keys: optional: number of records used to infer the stream's schema. Defaults to `50`.\n- schema: optional: A valid Singer schema or a path-like string that provides the path to a `.json` file that contains a valid Singer schema. If provided, the schema will not be inferred from the results of an api call.\n" + }, + { + "name": "use_request_body_not_params", + "kind": "boolean", + "value": false, + "label": "Use Request Body Not Params", + "description": "Sends the request parameters in the request body. This is normally not required, a few API's like OpenSearch require this. Defaults to `False`." + }, + { + "name": "username", + "kind": "string", + "label": "Username", + "description": "Used for a number of authentication methods that use a user password combination for authentication." + } + ] +} diff --git a/plugins/loaders/target-duckdb--jwills.lock b/plugins/loaders/target-duckdb--jwills.lock new file mode 100644 index 0000000..25028e2 --- /dev/null +++ b/plugins/loaders/target-duckdb--jwills.lock @@ -0,0 +1,136 @@ +{ + "plugin_type": "loaders", + "name": "target-duckdb", + "namespace": "target_duckdb", + "variant": "jwills", + "label": "DuckDB", + "docs": "https://hub.meltano.com/loaders/target-duckdb--jwills", + "repo": "https://github.com/jwills/target-duckdb", + "pip_url": "target-duckdb~=0.6", + "description": "DuckDB loader", + "logo_url": "https://hub.meltano.com/assets/logos/loaders/duckdb.png", + "settings_group_validation": [ + [ + "default_target_schema", + "filepath" + ] + ], + "settings": [ + { + "name": "add_metadata_columns", + "kind": "boolean", + "value": false, + "label": "Add Metadata Columns", + "description": "Metadata columns add extra row level information about data ingestions, (i.e. when was the row read in source, when was inserted or deleted in postgres etc.) 
Metadata columns are creating automatically by adding extra columns to the tables with a column prefix _SDC_. The column names are following the stitch naming conventions documented at https://www.stitchdata.com/docs/data-structure/integration-schemas#sdc-columns. Enabling metadata columns will flag the deleted rows by setting the _SDC_DELETED_AT metadata column. Without the add_metadata_columns option the deleted rows from singer taps will not be recognisable in DuckDB." + }, + { + "name": "batch_size_rows", + "kind": "integer", + "value": 100000, + "label": "Batch Size Rows", + "description": "Maximum number of rows in each batch. At the end of each batch, the rows in the batch are loaded into DuckDB." + }, + { + "name": "data_flattening_max_level", + "kind": "integer", + "value": 0, + "label": "Data Flattening Max Level", + "description": "Object type RECORD items from taps can be transformed to flattened columns by creating columns automatically.\n\nWhen value is 0 (default) then flattening functionality is turned off.\n" + }, + { + "name": "database", + "kind": "string", + "label": "Database name", + "description": "Alias of `dbname`." + }, + { + "name": "dbname", + "kind": "string", + "label": "Database", + "description": "The database name to write to; this will be inferred from the path property if it is not specified." + }, + { + "name": "default_target_schema", + "kind": "string", + "value": "$MELTANO_EXTRACT__LOAD_SCHEMA", + "label": "Default Target Schema", + "description": "Name of the schema where the tables will be created. If schema_mapping is not defined then every stream sent by the tap is loaded into this schema." + }, + { + "name": "delimiter", + "kind": "string", + "value": ",", + "label": "Delimiter", + "description": "The delimiter to use for the CSV files that are used for record imports." + }, + { + "name": "filepath", + "kind": "string", + "value": "${MELTANO_PROJECT_ROOT}/output/warehouse.duckdb", + "label": "File Path", + "description": "Alias of `path`.", + "placeholder": "/path/to/local/file.duckdb" + }, + { + "name": "flush_all_streams", + "kind": "boolean", + "value": false, + "label": "Flush All Streams", + "description": "Flush and load every stream into DuckDB when one batch is full. Warning - This may trigger the COPY command to use files with low number of records." + }, + { + "name": "hard_delete", + "kind": "boolean", + "value": false, + "label": "Hard Delete", + "description": "When hard_delete option is true then DELETE SQL commands will be performed in DuckDB to delete rows in tables. It's achieved by continuously checking the _SDC_DELETED_AT metadata column sent by the singer tap. Due to deleting rows requires metadata columns, hard_delete option automatically enables the add_metadata_columns option as well." + }, + { + "name": "path", + "kind": "string", + "label": "Connection Path", + "description": "The path to use for the `duckdb.connect` call; either a local file or a MotherDuck connection uri.", + "placeholder": "/path/to/local/file.duckdb" + }, + { + "name": "primary_key_required", + "kind": "boolean", + "value": true, + "label": "Primary Key Required", + "description": "Log based and Incremental replications on tables with no Primary Key cause duplicates when merging UPDATE events. When set to true, stop loading data if no Primary Key is defined." 
+ }, + { + "name": "quotechar", + "kind": "string", + "value": "\"", + "label": "Quote Character", + "description": "The quote character to use for the CSV files that are used for record imports." + }, + { + "name": "schema_mapping", + "kind": "object", + "label": "schema_mapping", + "description": "Useful if you want to load multiple streams from one tap to multiple DuckDB schemas.\n\nIf the tap sends the stream_id in - format then this option overwrites the default_target_schema value.\n" + }, + { + "name": "temp_dir", + "kind": "string", + "label": "Temporary Directory", + "description": "Directory of temporary CSV files with RECORD messages." + }, + { + "name": "token", + "kind": "string", + "label": "Token", + "description": "For MotherDuck connections, the auth token to use.", + "sensitive": true + }, + { + "name": "validate_records", + "kind": "boolean", + "value": false, + "label": "Validate Records", + "description": "Validate every single record message to the corresponding JSON schema. This option is disabled by default and invalid RECORD messages will fail only at load time by DuckDB. Enabling this option will detect invalid records earlier but could cause performance degradation." + } + ] +} diff --git a/plugins/loaders/target-jsonl--andyh1203.lock b/plugins/loaders/target-jsonl--andyh1203.lock new file mode 100644 index 0000000..11fa0ba --- /dev/null +++ b/plugins/loaders/target-jsonl--andyh1203.lock @@ -0,0 +1,34 @@ +{ + "plugin_type": "loaders", + "name": "target-jsonl", + "namespace": "target_jsonl", + "variant": "andyh1203", + "label": "JSON Lines (JSONL)", + "docs": "https://hub.meltano.com/loaders/target-jsonl--andyh1203", + "repo": "https://github.com/andyh1203/target-jsonl", + "pip_url": "target-jsonl", + "description": "JSONL loader", + "logo_url": "https://hub.meltano.com/assets/logos/loaders/jsonl.png", + "settings": [ + { + "name": "destination_path", + "kind": "string", + "value": "output", + "label": "Destination Path", + "description": "Sets the destination path the JSONL files are written to, relative\nto the project root.\n\nThe directory needs to exist already, it will not be created\nautomatically.\n\nTo write JSONL files to the project root, set an empty string (`\"\"`).\n" + }, + { + "name": "do_timestamp_file", + "kind": "boolean", + "value": false, + "label": "Include Timestamp in File Names", + "description": "Specifies if the files should get timestamped.\n\nBy default, the resulting file will not have a timestamp in the file name (i.e. `exchange_rate.jsonl`).\n\nIf this option gets set to `true`, the resulting file will have a timestamp associated with it (i.e. 
`exchange_rate-{timestamp}.jsonl`).\n"
+    },
+    {
+      "name": "custom_name",
+      "kind": "string",
+      "label": "Custom File Name Override",
+      "description": "Specifies a custom name for the filename, instead of the stream name.\n\nThe file name will be `{custom_name}-{timestamp}.jsonl`, if `do_timestamp_file` is `true`.\nOtherwise the file name will be `{custom_name}.jsonl`.\n\nIf custom name is not provided, the stream name will be used.\n"
+    }
+  ]
+}
diff --git a/setup.py b/setup.py
new file mode 100644
index 0000000..8e55606
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,13 @@
+from setuptools import setup
+
+setup(
+    name="tap-gharchive",
+    version="0.1.0",
+    py_modules=["tap_gharchive"],
+    install_requires=[
+        "tap-rest-api-msdk",
+    ],
+    entry_points={
+        "console_scripts": ["tap-gharchive=tap_gharchive:TapGHArchive.cli"],
+    },
+)
\ No newline at end of file
diff --git a/tap_gharchive.py b/tap_gharchive.py
new file mode 100644
index 0000000..4ac5bfd
--- /dev/null
+++ b/tap_gharchive.py
@@ -0,0 +1,45 @@
+from singer_sdk import Tap, Stream
+from singer_sdk import typing as th  # JSON schema typing helpers
+import gzip
+import io
+import json
+import requests
+
+
+class GHArchiveStream(Stream):
+    name = "github_events"
+    path = "/2023-10-01-0.json.gz"
+    primary_keys = ["id"]
+    replication_key = None
+    schema = th.PropertiesList(
+        th.Property("id", th.StringType),  # declared primary key, so it must be in the schema
+        th.Property("type", th.StringType),
+        th.Property("actor", th.ObjectType()),
+        th.Property("repo", th.ObjectType()),
+        th.Property("created_at", th.StringType),
+        th.Property("org", th.ObjectType()),
+        th.Property("payload", th.ObjectType()),
+    ).to_dict()
+
+    def get_records(self, context):
+        """Get records from the source."""
+        url = "https://data.gharchive.org" + self.path
+        response = requests.get(url)
+        gzdata = gzip.GzipFile(fileobj=io.BytesIO(response.content))
+        # Process NDJSON format - each line is a separate JSON object
+        for line in gzdata:
+            if line.strip():  # Skip empty lines
+                record = json.loads(line)
+                yield record
+
+    def parse_response(self, response):
+        """Parse a gzipped NDJSON response into an iterator of result rows."""
+        gzdata = gzip.GzipFile(fileobj=io.BytesIO(response.content))
+        for line in gzdata:
+            if line.strip():
+                yield json.loads(line)
+
+
+class TapGHArchive(Tap):
+    name = "tap-gharchive"
+
+    def discover_streams(self):
+        """Return a list of discovered streams."""
+        return [GHArchiveStream(self)]
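+
+# A minimal smoke-test sketch, assuming the package is installed with
+# `pip install -e .` (these are standard Singer SDK / Meltano CLI invocations;
+# adjust names to your environment):
+#
+#   tap-gharchive --about      # show tap metadata
+#   tap-gharchive --discover   # emit the github_events catalog
+#   meltano run tap-gharchive target-jsonl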