Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Funnels reorganization, persons pagination, and conversion window support #4810

Merged
merged 15 commits into from
Jun 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions ee/clickhouse/generate_local.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import uuid

from django.db import connection

from ee.clickhouse.client import sync_execute
from ee.clickhouse.models.event import create_event
from posthog.models import EventDefinition, Person, Team


class GenerateLocal:
    """Seed sample funnel data for a team in a local development setup.

    ``generate`` creates ``number`` persons with distinct ids ``user_1`` ..
    ``user_<number>``, the event definitions "step one" .. "step five", and a
    set of events in which each later step is performed by fewer users.
    ``destroy`` removes the rows that were written through the Django
    connection; see the note in ``destroy`` for the ClickHouse side.
    """

    team: Team
    number: int

    def __init__(self, team, number=250):
        """Remember the target team and how many users to generate."""
        self.number = number
        self.team = team

    def generate(self):
        """Insert persons, then event definitions, then events."""
        self._insert_persons()
        self._insert_event_definitions()
        self._insert_events()

    def destroy(self):
        """Delete the generated rows reachable through the Django connection."""
        # You'll need to manually clean up the clickhouse database by:
        # 1. docker compose -f ee/docker-compose.ch.yml down clickhouse zookeeper kafka
        # 2. DEBUG=1;DJANGO_SETTINGS_MODULE=posthog.settings;PRIMARY_DB=clickhouse;CLICKHOUSE_HOST=clickhouse;CLICKHOUSE_DATABASE=posthog;CLICKHOUSE_SECURE=false;CLICKHOUSE_VERIFY=false python migrate.py migrate_clickhouse
        cleanup_statements = (
            "delete from posthog_persondistinctid where distinct_id like 'user_%%'",
            "delete from posthog_person where properties->> 'name' like 'user_%'",
            "delete from posthog_eventdefinition where name like 'step %'",
        )

        with connection.cursor() as cursor:
            for statement in cleanup_statements:
                cursor.execute(statement)

    def _insert_event_definitions(self):
        """Ensure an EventDefinition exists for each of the five funnel steps."""
        for step_name in ("step one", "step two", "step three", "step four", "step five"):
            EventDefinition.objects.get_or_create(team=self.team, name=step_name)

    def _insert_persons(self):
        """Create one Person per generated user and mirror its distinct id via ``sync_execute``.

        A failure for one user is printed and does not stop the remaining users.
        """
        for index in range(1, self.number + 1):
            try:
                user_key = f"user_{index}"
                person = Person.objects.create(
                    distinct_ids=[user_key], team=self.team, properties={"name": user_key}
                )
                self._insert_person_distinct_ids(user_key, person.uuid)
            except Exception as error:
                print(str(error))

    def _insert_person_distinct_ids(self, distinct_id, person_uuid):
        """Insert a single ``person_distinct_id`` row for this team via ``sync_execute``."""
        sync_execute(
            f"""
insert into person_distinct_id (distinct_id, person_id, team_id, _timestamp) values
('{distinct_id}', '{person_uuid}', '{self.team.id}', now());
"""
        )

    def _insert_events(self):
        """Create the funnel events: every user does step one, roughly 1/k of them do step k."""
        upper_bound = self.number + 1
        # (event name, exclusive upper user index, event timestamp)
        funnel_plan = (
            ("step one", upper_bound, "2021-05-01 00:00:00"),
            ("step two", round(upper_bound / 2), "2021-05-03 00:00:00"),
            ("step three", round(upper_bound / 3), "2021-05-05 00:00:00"),
            ("step four", round(upper_bound / 4), "2021-05-07 00:00:00"),
            ("step five", round(upper_bound / 5), "2021-05-09 00:00:00"),
        )

        for event_name, stop, timestamp in funnel_plan:
            for index in range(1, stop):
                create_event(uuid.uuid4(), event_name, self.team, f"user_{index}", timestamp)
3 changes: 2 additions & 1 deletion ee/clickhouse/queries/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from .clickhouse_funnel import ClickhouseFunnel
from ee.clickhouse.queries.funnels.funnel import ClickhouseFunnel

from .clickhouse_paths import ClickhousePaths
from .clickhouse_retention import ClickhouseRetention
from .clickhouse_session_recording import SessionRecording
Expand Down
75 changes: 0 additions & 75 deletions ee/clickhouse/queries/clickhouse_funnel.py

This file was deleted.

44 changes: 0 additions & 44 deletions ee/clickhouse/queries/clickhouse_funnel_base.py

This file was deleted.

Empty file.
122 changes: 122 additions & 0 deletions ee/clickhouse/queries/funnels/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
from abc import ABC, abstractmethod
from typing import List, Tuple

from django.utils import timezone

from ee.clickhouse.client import sync_execute
from ee.clickhouse.models.action import format_action_filter
from ee.clickhouse.models.property import parse_prop_clauses
from ee.clickhouse.queries.util import parse_timestamps
from ee.clickhouse.sql.person import GET_LATEST_PERSON_DISTINCT_ID_SQL
from posthog.constants import TREND_FILTER_TYPE_ACTIONS
from posthog.models import Action, Entity, Filter, Team
from posthog.models.filters.mixins.funnel_window_days import FunnelWindowDaysMixin
from posthog.queries.funnel import Funnel
from posthog.utils import relative_date_parse


class ClickhouseFunnelBase(ABC, Funnel):
    """Shared query building and execution for ClickHouse funnel queries.

    Subclasses supply the SQL template through ``get_query``; this class
    builds the per-step conditions, collects the query parameters in
    ``self.params`` and executes the final query with ``sync_execute``.
    """

    _filter: Filter
    _team: Team

    def __init__(self, filter: Filter, team: Team) -> None:
        """Store the filter and team and start the shared query-parameter dict."""
        self._filter = filter
        self._team = team
        self.params = {
            "team_id": self._team.pk,
            "events": [],  # purely a speed optimization, don't need this for filtering
        }

    def run(self, *args, **kwargs):
        """Execute the funnel query and return the formatted results.

        Returns an empty list without querying when the filter has no entities.
        """
        if len(self._filter.entities) == 0:
            return []

        results = self._exec_query()
        return self._format_results(results)

    def _format_results(self, results):
        """Turn raw result rows into serialized steps, ordered first step to last.

        Each row has the form [step order, person count (that reached that
        step), array of person uuids]. Steps are walked from last to first
        while accumulating, so every step also counts the people found for
        later steps.
        """
        steps = []
        relevant_people = []
        total_people = 0

        for step in reversed(self._filter.entities):
            # Clickhouse step order starts at one, hence the +1
            result_step = next((row for row in results if row[0] == step.order + 1), None)
            if result_step is not None:
                total_people += result_step[1]
                relevant_people += result_step[2]
            # Only the first 100 accumulated people are passed on per step.
            steps.append(self._serialize_step(step, total_people, relevant_people[0:100]))

        return steps[::-1]  # reverse back to first-to-last order

    def _exec_query(self) -> List[Tuple]:
        """Build the format properties, render the query via ``get_query`` and execute it."""
        prop_filters, prop_filter_params = parse_prop_clauses(
            self._filter.properties,
            self._team.pk,
            prepend="global",
            allow_denormalized_props=True,
            filter_test_accounts=self._filter.filter_test_accounts,
        )

        # format default dates
        data = {}
        if not self._filter._date_from:
            data.update({"date_from": relative_date_parse("-7d")})
        if not self._filter._date_to:
            data.update({"date_to": timezone.now()})
        self._filter = self._filter.with_data(data)

        parsed_date_from, parsed_date_to, _ = parse_timestamps(
            filter=self._filter, table="events.", team_id=self._team.pk
        )
        self.params.update(prop_filter_params)
        steps = [self._build_steps_query(entity, index) for index, entity in enumerate(self._filter.entities)]

        format_properties = {
            "team_id": self._team.id,
            "steps": ", ".join(steps),
            "filters": prop_filters.replace("uuid IN", "events.uuid IN", 1),
            "parsed_date_from": parsed_date_from,
            "parsed_date_to": parsed_date_to,
            "top_level_groupby": "",
            "extra_select": "",
            "extra_groupby": "",
            "within_time": FunnelWindowDaysMixin.microseconds_from_days(self._filter.funnel_window_days),
            "latest_distinct_id_sql": GET_LATEST_PERSON_DISTINCT_ID_SQL,
            "offset": self._filter.offset,
        }

        query = self.get_query(format_properties)

        return sync_execute(query, self.params)

    def _build_steps_query(self, entity: Entity, index: int) -> str:
        """Return the SQL condition for one funnel step and register its parameters.

        The returned fragment refers to placeholders stored in ``self.params``,
        so it must be executed together with ``self.params``. Returns an empty
        string when an action produces no filter.
        """
        filters = self._build_filters(entity, index)
        if entity.type == TREND_FILTER_TYPE_ACTIONS:
            action = Action.objects.get(pk=entity.id)
            for action_step in action.steps.all():
                self.params["events"].append(action_step.event)
            action_query, action_params = format_action_filter(action, "step_{}".format(index))
            if action_query == "":
                return ""

            self.params.update(action_params)
            content_sql = "{actions_query} {filters}".format(actions_query=action_query, filters=filters,)
        else:
            self.params["events"].append(entity.id)
            # The event name comes from the request filter, so bind it as a query
            # parameter instead of splicing it into the SQL text; a quote in the
            # name would otherwise break (or inject into) the query. str() keeps
            # the rendered literal identical to what str.format produced before.
            event_param = "funnel_step_event_{}".format(index)
            self.params[event_param] = str(entity.id)
            content_sql = "event = %({event_param})s {filters}".format(event_param=event_param, filters=filters)
        return content_sql

    def _build_filters(self, entity: Entity, index: int) -> str:
        """Return the property-filter SQL for one entity ("" when it has no properties).

        The parameters produced by ``parse_prop_clauses`` are always merged
        into ``self.params``.
        """
        prop_filters, prop_filter_params = parse_prop_clauses(
            entity.properties, self._team.pk, prepend=str(index), allow_denormalized_props=True
        )
        self.params.update(prop_filter_params)
        if entity.properties:
            return prop_filters
        return ""

    @abstractmethod
    def get_query(self, format_properties):
        """Return the SQL to execute, given the format properties built by ``_exec_query``."""
        pass
7 changes: 7 additions & 0 deletions ee/clickhouse/queries/funnels/funnel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from ee.clickhouse.queries.funnels.base import ClickhouseFunnelBase
from ee.clickhouse.sql.funnels.funnel import FUNNEL_SQL


class ClickhouseFunnel(ClickhouseFunnelBase):
    """Funnel query rendered from the ``FUNNEL_SQL`` template."""

    def get_query(self, format_properties):
        """Fill ``FUNNEL_SQL`` with ``format_properties`` and return the resulting SQL."""
        rendered_sql = FUNNEL_SQL.format(**format_properties)
        return rendered_sql
15 changes: 15 additions & 0 deletions ee/clickhouse/queries/funnels/funnel_persons.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from ee.clickhouse.queries.funnels.base import ClickhouseFunnelBase
from ee.clickhouse.sql.funnels.funnel import FUNNEL_PERSONS_SQL
from posthog.models import Person


class ClickhouseFunnelPersons(ClickhouseFunnelBase):
    """Funnel persons query rendered from the ``FUNNEL_PERSONS_SQL`` template."""

    def get_query(self, format_properties):
        """Fill ``FUNNEL_PERSONS_SQL`` with ``format_properties`` and return the SQL."""
        rendered_sql = FUNNEL_PERSONS_SQL.format(**format_properties)
        return rendered_sql

    def _format_results(self, results):
        """Map each result row to a dict with ``max_step``, ``distinct_ids`` and ``email``.

        ``row[0]`` is used as the max step and ``row[1]`` is passed to
        ``Person.get_distinct_ids_and_email_by_id`` together with the team id.
        """

        def to_entry(row):
            distinct_ids, email = Person.get_distinct_ids_and_email_by_id(row[1], self._team.id)
            return {"max_step": row[0], "distinct_ids": distinct_ids, "email": email}

        return [to_entry(row) for row in results]
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from ee.clickhouse.client import sync_execute
from ee.clickhouse.models.property import parse_prop_clauses
from ee.clickhouse.queries.clickhouse_funnel_base import ClickhouseFunnelBase
from ee.clickhouse.queries.funnels.base import ClickhouseFunnelBase
from ee.clickhouse.queries.util import get_time_diff, get_trunc_func_ch, parse_timestamps
from ee.clickhouse.sql.events import NULL_SQL_FUNNEL_TRENDS
from ee.clickhouse.sql.funnels.funnel_trend import FUNNEL_TREND_SQL
Expand All @@ -18,6 +18,9 @@

class ClickhouseFunnelTrends(ClickhouseFunnelBase):
def run(self):
    """Run the trends query and return the UI response.

    Returns an empty list without querying when the filter has no entities.
    """
    entities = self._filter.entities
    if len(entities) == 0:
        return []

    return self._get_ui_response(self.perform_query())
Expand Down Expand Up @@ -124,3 +127,6 @@ def _determine_complete(self, timestamp):
compare_timestamp = timestamp.date() if type(timestamp) is datetime else timestamp
is_incomplete = compare_timestamp > completed_end
return not is_incomplete

def get_query(self, format_properties):
    """Implement the abstract ``get_query`` hook; builds no SQL and returns ``None``."""
    return None
6 changes: 6 additions & 0 deletions ee/clickhouse/queries/funnels/funnel_trends_persons.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from ee.clickhouse.queries.funnels.base import ClickhouseFunnelBase


class ClickhouseFunnelTrendsPersons(ClickhouseFunnelBase):
    """Funnel-trends persons query; ``get_query`` currently builds no SQL."""

    def get_query(self, format_properties):
        """Implement the abstract ``get_query`` hook; returns ``None``."""
        return None
Empty file.
Loading