Skip to content

Commit

Permalink
Merge pull request #273 from analyst-collective/fix/tracking-import
Browse files Browse the repository at this point in the history
scope all tracking state to fix imported dbt runs
  • Loading branch information
drewbanin authored Feb 8, 2017
2 parents a3123ee + 05d4472 commit 2677f03
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 66 deletions.
9 changes: 8 additions & 1 deletion dbt/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ def handle(args):
# correct profiles.yml file
if not config.send_anonymous_usage_stats(parsed.profiles_dir):
dbt.tracking.do_not_track()
else:
dbt.tracking.initialize_tracking()

res = run_from_args(parsed)
dbt.tracking.flush()
Expand Down Expand Up @@ -92,10 +94,13 @@ def run_from_args(parsed):
log_path = proj.get('log-path', 'logs')

initialize_logger(parsed.debug, log_path)
logger.debug("Tracking: {}".format(dbt.tracking.active_user.state()))

dbt.tracking.track_invocation_start(project=proj, args=parsed)

result = None
try:
return task.run()
result = task.run()
dbt.tracking.track_invocation_end(
project=proj, args=parsed, result_type="ok", result=None
)
Expand All @@ -110,6 +115,8 @@ def run_from_args(parsed):
)
raise

return result


def invoke_dbt(parsed):
task = None
Expand Down
5 changes: 3 additions & 2 deletions dbt/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ def call_table_exists(schema, table):

self.context = {
"run_started_at": datetime.now(),
"invocation_id": dbt.tracking.invocation_id,
"invocation_id": dbt.tracking.active_user.invocation_id,
"get_columns_in_table": call_get_columns_in_table,
"get_missing_columns": call_get_missing_columns,
"already_exists": call_table_exists,
Expand Down Expand Up @@ -575,8 +575,9 @@ def on_complete(run_model_results):
run_model_result.execution_time
)

invocation_id = dbt.tracking.active_user.invocation_id
dbt.tracking.track_model_run({
"invocation_id": dbt.tracking.invocation_id,
"invocation_id": invocation_id,
"index": index,
"total": num_models,
"execution_time": run_model_result.execution_time,
Expand Down
170 changes: 107 additions & 63 deletions dbt/tracking.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,35 +31,56 @@
emitter = Emitter(COLLECTOR_URL, protocol=COLLECTOR_PROTOCOL, buffer_size=1)
tracker = Tracker(emitter, namespace="cf", app_id="dbt")

active_user = None

def __write_user():
user = {
"id": str(uuid.uuid4())
}

cookie_dir = os.path.dirname(COOKIE_PATH)
if not os.path.exists(cookie_dir):
os.makedirs(cookie_dir)
class User(object):

with open(COOKIE_PATH, "w") as fh:
yaml.dump(user, fh)
def __init__(self):
self.do_not_track = True

return user
self.id = None
self.invocation_id = None

def state(self):
return "do not track" if self.do_not_track else "tracking"

def get_user():
if os.path.isfile(COOKIE_PATH):
with open(COOKIE_PATH, "r") as fh:
try:
user = yaml.safe_load(fh)
if user is None:
user = __write_user()
except yaml.reader.ReaderError as e:
user = __write_user()
else:
user = __write_user()
def initialize(self):
self.do_not_track = False

self.invocation_id = str(uuid.uuid4())

cookie = self.get_cookie()
self.id = cookie.get('id')

subject = Subject()
subject.set_user_id(self.id)
tracker.set_subject(subject)

def set_cookie(self):
cookie_dir = os.path.dirname(COOKIE_PATH)
user = {"id": str(uuid.uuid4())}

if not os.path.exists(cookie_dir):
os.makedirs(cookie_dir)

return user
with open(COOKIE_PATH, "w") as fh:
yaml.dump(user, fh)

return user

def get_cookie(self):
if not os.path.isfile(COOKIE_PATH):
user = self.set_cookie()
else:
with open(COOKIE_PATH, "r") as fh:
try:
user = yaml.safe_load(fh)
if user is None:
user = self.set_cookie()
except yaml.reader.ReaderError as e:
user = self.set_cookie()
return user


def get_options(args):
Expand All @@ -75,11 +96,11 @@ def get_run_type(args):
return 'regular'


def get_invocation_context(invocation_id, user, project, args):
def get_invocation_context(user, project, args):
return {
"project_id": None if project is None else project.hashed_name(),
"user_id": user.get("id", None),
"invocation_id": invocation_id,
"user_id": user.id,
"invocation_id": user.invocation_id,

"command": args.which,
"options": get_options(args),
Expand All @@ -89,8 +110,8 @@ def get_invocation_context(invocation_id, user, project, args):
}


def get_invocation_start_context(invocation_id, user, project, args):
data = get_invocation_context(invocation_id, user, project, args)
def get_invocation_start_context(user, project, args):
data = get_invocation_context(user, project, args)

start_data = {
"progress": "start",
Expand All @@ -102,10 +123,8 @@ def get_invocation_start_context(invocation_id, user, project, args):
return SelfDescribingJson(INVOCATION_SPEC, data)


def get_invocation_end_context(
invocation_id, user, project, args, result_type, result
):
data = get_invocation_context(invocation_id, user, project, args)
def get_invocation_end_context(user, project, args, result_type, result):
data = get_invocation_context(user, project, args)

start_data = {
"progress": "end",
Expand All @@ -117,10 +136,8 @@ def get_invocation_end_context(
return SelfDescribingJson(INVOCATION_SPEC, data)


def get_invocation_invalid_context(
invocation_id, user, project, args, result_type, result
):
data = get_invocation_context(invocation_id, user, project, args)
def get_invocation_invalid_context(user, project, args, result_type, result):
data = get_invocation_context(user, project, args)

start_data = {
"progress": "invalid",
Expand Down Expand Up @@ -155,20 +172,9 @@ def get_dbt_env_context():

return SelfDescribingJson(INVOCATION_ENV_SPEC, data)

invocation_id = str(uuid.uuid4())
platform_context = get_platform_context()
env_context = get_dbt_env_context()

user = get_user()
subject = Subject()
subject.set_user_id(user.get("id", None))
tracker.set_subject(subject)

__is_do_not_track = False


def track(*args, **kwargs):
if __is_do_not_track:
def track(user, *args, **kwargs):
if user.do_not_track:
return
else:
logger.debug("Sending event: {}".format(kwargs))
Expand All @@ -181,43 +187,76 @@ def track(*args, **kwargs):


def track_invocation_start(project=None, args=None):
invocation_context = get_invocation_start_context(
invocation_id, user, project, args
context = [
get_invocation_start_context(active_user, project, args),
get_platform_context(),
get_dbt_env_context()
]

track(
active_user,
category="dbt",
action='invocation',
label='start',
context=context
)
context = [invocation_context, platform_context, env_context]
track(category="dbt", action='invocation', label='start', context=context)


def track_model_run(options):
context = [SelfDescribingJson(RUN_MODEL_SPEC, options)]
model_id = options['model_id']
track(
active_user,
category="dbt",
action='run_model',
label=invocation_id,
label=active_user.invocation_id,
context=context
)


def track_invocation_end(
project=None, args=None, result_type=None, result=None
):
invocation_context = get_invocation_end_context(
invocation_id, user, project, args, result_type, result
user = active_user
context = [
get_invocation_end_context(user, project, args, result_type, result),
get_platform_context(),
get_dbt_env_context()
]
track(
active_user,
category="dbt",
action='invocation',
label='end',
context=context
)
context = [invocation_context, platform_context, env_context]
track(category="dbt", action='invocation', label='end', context=context)


def track_invalid_invocation(
project=None, args=None, result_type=None, result=None
):

user = active_user
invocation_context = get_invocation_invalid_context(
invocation_id, user, project, args, result_type, result
user,
project,
args,
result_type,
result
)
context = [invocation_context, platform_context, env_context]

context = [
invocation_context,
get_platform_context(),
get_dbt_env_context()
]

track(
category="dbt", action='invocation', label='invalid', context=context
active_user,
category="dbt",
action='invocation',
label='invalid',
context=context
)


Expand All @@ -227,6 +266,11 @@ def flush():


def do_not_track():
global __is_do_not_track
logger.debug("Not sending anonymous usage events")
__is_do_not_track = True
global active_user
active_user = User()


def initialize_tracking():
global active_user
active_user = User()
active_user.initialize()

0 comments on commit 2677f03

Please sign in to comment.