Skip to content

Commit

Permalink
Add DML samples for BigQuery. (#546)
Browse files Browse the repository at this point in the history
Add BigQuery DML sample for inserts.

This sample reads a SQL file (for example: one that was output from
mysqldump) and executes each line as a query. At least in my
configuration of mysqldump, each insert statement was on a single line,
so I was able to write data from MySQL to BigQuery with this sample.
  • Loading branch information
tswast authored and Jon Wayne Parrott committed Sep 28, 2016
1 parent de3cf80 commit 0f85ad0
Show file tree
Hide file tree
Showing 9 changed files with 356 additions and 0 deletions.
1 change: 1 addition & 0 deletions bigquery/dml/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
sample_db_export.sql
15 changes: 15 additions & 0 deletions bigquery/dml/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# BigQuery DML Samples

<!-- auto-doc-link -->
These samples are used on the following documentation page:

> https://cloud.google.combigquery/docs/loading-data-sql-dml
<!-- end-auto-doc-link -->

To create a test database, run the `populate_db.py` script.

```
python populate_db.py 100 localhost root 'mysql-password' sample_db
```

78 changes: 78 additions & 0 deletions bigquery/dml/insert_sql.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
#!/usr/bin/env python

# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Sample that runs a file containing INSERT SQL statements in Big Query.
This could be used to run the INSERT statements in a mysqldump output such as
mysqldump --user=root \
--password='secret-password' \
--host=127.0.0.1 \
--no-create-info sample_db \
--skip-add-locks > sample_db_export.sql
To run, first create tables with the same names and columns as the sample
database. Then run this script.
python insert_sql.py my-project my_dataset sample_db_export.sql
"""

# [START insert_sql]
import argparse

from gcloud import bigquery


def insert_sql(project, default_dataset, sql_path):
"""Run all the SQL statements in a SQL file."""

client = bigquery.Client(project=project)

with open(sql_path) as f:
for line in f:
line = line.strip()

if not line.startswith('INSERT'):
continue

print('Running query: {}{}'.format(
line[:60],
'...' if len(line) > 60 else ''))
query = client.run_sync_query(line)

# Set use_legacy_sql to False to enable standard SQL syntax.
# This is required to use the Data Manipulation Language features.
#
# For more information about enabling standard SQL, see:
# https://cloud.google.com/bigquery/sql-reference/enabling-standard-sql
query.use_legacy_sql = False
query.default_dataset = client.dataset(default_dataset)
query.run()


if __name__ == "__main__":
parser = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument('project', help='Google Cloud project name')
parser.add_argument(
'default_dataset', help='Default BigQuery dataset name')
parser.add_argument('sql_path', help='Path to SQL file')

args = parser.parse_args()

insert_sql(args.project, args.default_dataset, args.sql_path)
# [END insert_sql]
32 changes: 32 additions & 0 deletions bigquery/dml/insert_sql_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os.path

from insert_sql import insert_sql


def test_insert_sql(cloud_config, capsys):
sql_path = os.path.join(
os.path.dirname(__file__),
'resources',
'insert_sql_test.sql')

insert_sql(cloud_config.project, 'test_dataset', sql_path)

out, _ = capsys.readouterr()

assert (
'INSERT INTO `test_table` (`Name`) VALUES (\'hello world\')'
in out)
182 changes: 182 additions & 0 deletions bigquery/dml/populate_db.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
#!/usr/bin/env python

# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Command-line tool to simulate user actions and write to SQL database.
"""

from __future__ import division

import argparse
import datetime
import random
import uuid

from six.moves.urllib import parse
import sqlalchemy
from sqlalchemy.ext import declarative
import sqlalchemy.orm


SECONDS_IN_DAY = 24 * 60 * 60
SECONDS_IN_2016 = 366 * SECONDS_IN_DAY

# Unix timestamp for the beginning of 2016.
# http://stackoverflow.com/a/19801806/101923
TIMESTAMP_2016 = (
datetime.datetime(2016, 1, 1, 0, 0, 0) -
datetime.datetime.fromtimestamp(0)).total_seconds()


Base = declarative.declarative_base()


class User(Base):
__tablename__ = 'Users'

id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True)
date_joined = sqlalchemy.Column(sqlalchemy.DateTime)


class UserSession(Base):
__tablename__ = 'UserSessions'

id = sqlalchemy.Column(sqlalchemy.String(length=36), primary_key=True)
user_id = sqlalchemy.Column(
sqlalchemy.Integer, sqlalchemy.ForeignKey('Users.id'))
login_time = sqlalchemy.Column(sqlalchemy.DateTime)
logout_time = sqlalchemy.Column(sqlalchemy.DateTime)
ip_address = sqlalchemy.Column(sqlalchemy.String(length=40))


def generate_users(session, num_users):
users = []

for userid in range(1, num_users + 1):
year_portion = random.random()
date_joined = datetime.datetime.fromtimestamp(
TIMESTAMP_2016 + SECONDS_IN_2016 * year_portion)
user = User(id=userid, date_joined=date_joined)
users.append(user)
session.add(user)

session.commit()
return users


def random_ip():
"""Choose a random example IP address.
Examples are chosen from the test networks described in
https://tools.ietf.org/html/rfc5737
"""
network = random.choice([
'192.0.2', # RFC-5737 TEST-NET-1
'198.51.100', # RFC-5737 TEST-NET-2
'203.0.113', # RFC-5737 TEST-NET-3
])
ip_address = '{}.{}'.format(network, random.randrange(256))
return ip_address


def simulate_user_session(session, user, previous_user_session=None):
"""Simulates a single session (login to logout) of a user's history."""
login_time = user.date_joined

if previous_user_session is not None:
login_time = (
previous_user_session.logout_time +
datetime.timedelta(
days=1, seconds=random.randrange(SECONDS_IN_DAY)))

session_id = str(uuid.uuid4())
user_session = UserSession(
id=session_id,
user_id=user.id,
login_time=login_time,
ip_address=random_ip())
user_session.logout_time = (
login_time +
datetime.timedelta(seconds=(1 + random.randrange(59))))
session.commit()
session.add(user_session)
return user_session


def simulate_user_history(session, user):
"""Simulates the entire history of activity for a single user."""
total_sessions = random.randrange(10)
previous_user_session = None

for _ in range(total_sessions):
user_session = simulate_user_session(
session, user, previous_user_session)
previous_user_session = user_session


def run_simulation(session, users):
"""Simulates app activity for all users."""

for n, user in enumerate(users):
if n % 100 == 0 and n != 0:
print('Simulated data for {} users'.format(n))

simulate_user_history(session, user)

print('COMPLETE: Simulated data for {} users'.format(len(users)))


def populate_db(session, total_users=3):
"""Populate database with total_users simulated users and their actions."""
users = generate_users(session, total_users)
run_simulation(session, users)


def create_session(engine):
Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)
Session = sqlalchemy.orm.sessionmaker(bind=engine)
return Session()


def main(total_users, host, user, password, db_name):
engine = sqlalchemy.create_engine(
'mysql+pymysql://{user}:{password}@{host}/{db_name}'.format(
user=user,
password=parse.quote_plus(password),
host=host,
db_name=db_name))
session = create_session(engine)

try:
populate_db(session, total_users)
finally:
session.close()


if __name__ == '__main__':
parser = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument(
'total_users', help='How many simulated users to create.', type=int)
parser.add_argument('host', help='Host of the database to write to.')
parser.add_argument('user', help='User to connect to the database.')
parser.add_argument('password', help='Password for the database user.')
parser.add_argument('db', help='Name of the database to write to.')

args = parser.parse_args()

main(args.total_users, args.host, args.user, args.password, args.db)
34 changes: 34 additions & 0 deletions bigquery/dml/populate_db_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import sqlalchemy

from populate_db import create_session, populate_db


def test_populate_db_populates_users():
engine = sqlalchemy.create_engine('sqlite://')
session = create_session(engine)

try:
populate_db(session, total_users=10)

connection = session.connection().connection
cursor = connection.cursor()
cursor.execute('SELECT COUNT(*) FROM Users')
assert cursor.fetchone()[0] == 10
cursor.execute('SELECT COUNT(*) FROM UserSessions')
assert cursor.fetchone()[0] >= 10
finally:
session.close()
5 changes: 5 additions & 0 deletions bigquery/dml/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
flake8==3.0.4
gcloud==0.18.1
PyMySQL==0.7.7
six==1.10.0
SQLAlchemy==1.0.15
6 changes: 6 additions & 0 deletions bigquery/dml/resources/insert_sql_test.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
-- This file is used to test ../insert_sql.py.
-- These are comments.
-- Each query to be executed should be on a single line.

/* Another ignored line. */
INSERT INTO `test_table` (`Name`) VALUES ('hello world')
3 changes: 3 additions & 0 deletions scripts/resources/docs-links.json
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,9 @@
"/bigquery/docs/data": [
"bigquery/api/sync_query.py"
],
"bigquery/docs/loading-data-sql-dml": [
"bigquery/dml/insert_sql.py"
],
"/appengine/docs/python/memcache/examples": [
"appengine/memcache/snippets/snippets.py",
"appengine/memcache/guestbook/main.py"
Expand Down

0 comments on commit 0f85ad0

Please sign in to comment.