diff --git a/ee/clickhouse/generate_local.py b/ee/clickhouse/generate_local.py new file mode 100644 index 0000000000000..257a179f9acb3 --- /dev/null +++ b/ee/clickhouse/generate_local.py @@ -0,0 +1,74 @@ +import uuid + +from django.db import connection + +from ee.clickhouse.client import sync_execute +from ee.clickhouse.models.event import create_event +from posthog.models import EventDefinition, Person, Team + + +class GenerateLocal: + team: Team + number: int + + def __init__(self, team, number=250): + self.team = team + self.number = number + + def generate(self): + self._insert_persons() + self._insert_event_definitions() + self._insert_events() + + def destroy(self): + # You'll need to manually clean up the clickhouse database by: + # 1. docker compose -f ee/docker-compose.ch.yml down clickhouse zookeeper kafka + # 2. DEBUG=1;DJANGO_SETTINGS_MODULE=posthog.settings;PRIMARY_DB=clickhouse;CLICKHOUSE_HOST=clickhouse;CLICKHOUSE_DATABASE=posthog;CLICKHOUSE_SECURE=false;CLICKHOUSE_VERIFY=false python migrate.py migrate_clickhouse + + with connection.cursor() as cursor: + cursor.execute("delete from posthog_persondistinctid where distinct_id like 'user_%%'") + cursor.execute("delete from posthog_person where properties->> 'name' like 'user_%'") + cursor.execute("delete from posthog_eventdefinition where name like 'step %'") + + def _insert_event_definitions(self): + EventDefinition.objects.get_or_create(team=self.team, name="step one") + EventDefinition.objects.get_or_create(team=self.team, name="step two") + EventDefinition.objects.get_or_create(team=self.team, name="step three") + EventDefinition.objects.get_or_create(team=self.team, name="step four") + EventDefinition.objects.get_or_create(team=self.team, name="step five") + + def _insert_persons(self): + for i in range(1, self.number + 1): + try: + person = Person.objects.create( + distinct_ids=[f"user_{i}"], team=self.team, properties={"name": f"user_{i}"} + ) + self._insert_person_distinct_ids(f"user_{i}", person.uuid) + except Exception as e: + print(str(e)) + + def _insert_person_distinct_ids(self, distinct_id, person_uuid): + sql = f""" + insert into person_distinct_id (distinct_id, person_id, team_id, _timestamp) values + ('{distinct_id}', '{person_uuid}', '{self.team.id}', now()); + """ + + sync_execute(sql) + + def _insert_events(self): + step_one = self.number + 1 + step_two = round(step_one / 2) + step_three = round(step_one / 3) + step_four = round(step_one / 4) + step_five = round(step_one / 5) + + for i in range(1, step_one): + create_event(uuid.uuid4(), "step one", self.team, f"user_{i}", "2021-05-01 00:00:00") + for i in range(1, step_two): + create_event(uuid.uuid4(), "step two", self.team, f"user_{i}", "2021-05-03 00:00:00") + for i in range(1, step_three): + create_event(uuid.uuid4(), "step three", self.team, f"user_{i}", "2021-05-05 00:00:00") + for i in range(1, step_four): + create_event(uuid.uuid4(), "step four", self.team, f"user_{i}", "2021-05-07 00:00:00") + for i in range(1, step_five): + create_event(uuid.uuid4(), "step five", self.team, f"user_{i}", "2021-05-09 00:00:00") diff --git a/ee/clickhouse/queries/__init__.py b/ee/clickhouse/queries/__init__.py index 15e5acf5b26b5..4da9ab069eb4f 100644 --- a/ee/clickhouse/queries/__init__.py +++ b/ee/clickhouse/queries/__init__.py @@ -1,4 +1,5 @@ -from .clickhouse_funnel import ClickhouseFunnel +from ee.clickhouse.queries.funnels.funnel import ClickhouseFunnel + from .clickhouse_paths import ClickhousePaths from .clickhouse_retention import ClickhouseRetention from .clickhouse_session_recording import SessionRecording diff --git a/ee/clickhouse/queries/clickhouse_funnel.py b/ee/clickhouse/queries/clickhouse_funnel.py deleted file mode 100644 index c7daac8ebc220..0000000000000 --- a/ee/clickhouse/queries/clickhouse_funnel.py +++ /dev/null @@ -1,75 +0,0 @@ -from typing import Any, Dict, List, Tuple - -from django.utils import timezone - -from ee.clickhouse.client import format_sql, sync_execute -from ee.clickhouse.models.property import parse_prop_clauses -from ee.clickhouse.queries.clickhouse_funnel_base import ClickhouseFunnelBase -from ee.clickhouse.queries.clickhouse_funnel_trends import ClickhouseFunnelTrends -from ee.clickhouse.queries.util import parse_timestamps -from ee.clickhouse.sql.funnels.funnel import FUNNEL_SQL -from ee.clickhouse.sql.person import GET_LATEST_PERSON_DISTINCT_ID_SQL -from posthog.constants import TRENDS_LINEAR -from posthog.utils import relative_date_parse - - -class ClickhouseFunnel(ClickhouseFunnelBase): - def run(self, *args, **kwargs) -> List[Dict[str, Any]]: - if len(self._filter.entities) == 0: - return [] - - if self._filter.display == TRENDS_LINEAR: - return ClickhouseFunnelTrends(self._filter, self._team).run() - else: - # Format of this is [step order, person count (that reached that step), array of person uuids] - results = self._exec_query() - - steps = [] - relevant_people = [] - total_people = 0 - - for step in reversed(self._filter.entities): - # Clickhouse step order starts at one, hence the +1 - result_step = [x for x in results if step.order + 1 == x[0]] - if len(result_step) > 0: - total_people += result_step[0][1] - relevant_people += result_step[0][2] - steps.append(self._serialize_step(step, total_people, relevant_people[0:100])) - - return steps[::-1] #  reverse - - def _exec_query(self) -> List[Tuple]: - prop_filters, prop_filter_params = parse_prop_clauses( - self._filter.properties, - self._team.pk, - prepend="global", - allow_denormalized_props=True, - filter_test_accounts=self._filter.filter_test_accounts, - ) - - # format default dates - data = {} - if not self._filter._date_from: - data.update({"date_from": relative_date_parse("-7d")}) - if not self._filter._date_to: - data.update({"date_to": timezone.now()}) - self._filter = self._filter.with_data(data) - - parsed_date_from, parsed_date_to, _ = parse_timestamps( - filter=self._filter, table="events.", team_id=self._team.pk - ) - self.params.update(prop_filter_params) - steps = [self._build_steps_query(entity, index) for index, entity in enumerate(self._filter.entities)] - query = FUNNEL_SQL.format( - team_id=self._team.id, - steps=", ".join(steps), - filters=prop_filters.replace("uuid IN", "events.uuid IN", 1), - parsed_date_from=parsed_date_from, - parsed_date_to=parsed_date_to, - top_level_groupby="", - extra_select="", - extra_groupby="", - within_time="6048000000000000", - latest_distinct_id_sql=GET_LATEST_PERSON_DISTINCT_ID_SQL, - ) - return sync_execute(query, self.params) diff --git a/ee/clickhouse/queries/clickhouse_funnel_base.py b/ee/clickhouse/queries/clickhouse_funnel_base.py deleted file mode 100644 index c89fb4374a546..0000000000000 --- a/ee/clickhouse/queries/clickhouse_funnel_base.py +++ /dev/null @@ -1,44 +0,0 @@ -from ee.clickhouse.models.action import format_action_filter -from ee.clickhouse.models.property import parse_prop_clauses -from posthog.constants import TREND_FILTER_TYPE_ACTIONS -from posthog.models import Action, Entity, Filter, Team -from posthog.queries.funnel import Funnel - - -class ClickhouseFunnelBase(Funnel): - _filter: Filter - _team: Team - - def __init__(self, filter: Filter, team: Team) -> None: - self._filter = filter - self._team = team - self.params = { - "team_id": self._team.pk, - "events": [], # purely a speed optimization, don't need this for filtering - } - - def _build_steps_query(self, entity: Entity, index: int) -> str: - filters = self._build_filters(entity, index) - if entity.type == TREND_FILTER_TYPE_ACTIONS: - action = Action.objects.get(pk=entity.id) - for action_step in action.steps.all(): - self.params["events"].append(action_step.event) - action_query, action_params = format_action_filter(action, "step_{}".format(index)) - if action_query == "": - return "" - - self.params.update(action_params) - content_sql = "{actions_query} {filters}".format(actions_query=action_query, filters=filters,) - else: - self.params["events"].append(entity.id) - content_sql = "event = '{event}' {filters}".format(event=entity.id, filters=filters) - return content_sql - - def _build_filters(self, entity: Entity, index: int) -> str: - prop_filters, prop_filter_params = parse_prop_clauses( - entity.properties, self._team.pk, prepend=str(index), allow_denormalized_props=True - ) - self.params.update(prop_filter_params) - if entity.properties: - return prop_filters - return "" diff --git a/ee/clickhouse/queries/funnels/__init__.py b/ee/clickhouse/queries/funnels/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/ee/clickhouse/queries/funnels/base.py b/ee/clickhouse/queries/funnels/base.py new file mode 100644 index 0000000000000..da770a0e94579 --- /dev/null +++ b/ee/clickhouse/queries/funnels/base.py @@ -0,0 +1,122 @@ +from abc import ABC, abstractmethod +from typing import List, Tuple + +from django.utils import timezone + +from ee.clickhouse.client import sync_execute +from ee.clickhouse.models.action import format_action_filter +from ee.clickhouse.models.property import parse_prop_clauses +from ee.clickhouse.queries.util import parse_timestamps +from ee.clickhouse.sql.person import GET_LATEST_PERSON_DISTINCT_ID_SQL +from posthog.constants import TREND_FILTER_TYPE_ACTIONS +from posthog.models import Action, Entity, Filter, Team +from posthog.models.filters.mixins.funnel_window_days import FunnelWindowDaysMixin +from posthog.queries.funnel import Funnel +from posthog.utils import relative_date_parse + + +class ClickhouseFunnelBase(ABC, Funnel): + _filter: Filter + _team: Team + + def __init__(self, filter: Filter, team: Team) -> None: + self._filter = filter + self._team = team + self.params = { + "team_id": self._team.pk, + "events": [], # purely a speed optimization, don't need this for filtering + } + + def run(self, *args, **kwargs): + if len(self._filter.entities) == 0: + return [] + + results = self._exec_query() + return self._format_results(results) + + def _format_results(self, results): + # Format of this is [step order, person count (that reached that step), array of person uuids] + steps = [] + relevant_people = [] + total_people = 0 + + for step in reversed(self._filter.entities): + # Clickhouse step order starts at one, hence the +1 + result_step = [x for x in results if step.order + 1 == x[0]] + if len(result_step) > 0: + total_people += result_step[0][1] + relevant_people += result_step[0][2] + steps.append(self._serialize_step(step, total_people, relevant_people[0:100])) + + return steps[::-1] #  reverse + + def _exec_query(self) -> List[Tuple]: + prop_filters, prop_filter_params = parse_prop_clauses( + self._filter.properties, + self._team.pk, + prepend="global", + allow_denormalized_props=True, + filter_test_accounts=self._filter.filter_test_accounts, + ) + + # format default dates + data = {} + if not self._filter._date_from: + data.update({"date_from": relative_date_parse("-7d")}) + if not self._filter._date_to: + data.update({"date_to": timezone.now()}) + self._filter = self._filter.with_data(data) + + parsed_date_from, parsed_date_to, _ = parse_timestamps( + filter=self._filter, table="events.", team_id=self._team.pk + ) + self.params.update(prop_filter_params) + steps = [self._build_steps_query(entity, index) for index, entity in enumerate(self._filter.entities)] + + format_properties = { + "team_id": self._team.id, + "steps": ", ".join(steps), + "filters": prop_filters.replace("uuid IN", "events.uuid IN", 1), + "parsed_date_from": parsed_date_from, + "parsed_date_to": parsed_date_to, + "top_level_groupby": "", + "extra_select": "", + "extra_groupby": "", + "within_time": FunnelWindowDaysMixin.microseconds_from_days(self._filter.funnel_window_days), + "latest_distinct_id_sql": GET_LATEST_PERSON_DISTINCT_ID_SQL, + "offset": self._filter.offset, + } + + query = self.get_query(format_properties) + + return sync_execute(query, self.params) + + def _build_steps_query(self, entity: Entity, index: int) -> str: + filters = self._build_filters(entity, index) + if entity.type == TREND_FILTER_TYPE_ACTIONS: + action = Action.objects.get(pk=entity.id) + for action_step in action.steps.all(): + self.params["events"].append(action_step.event) + action_query, action_params = format_action_filter(action, "step_{}".format(index)) + if action_query == "": + return "" + + self.params.update(action_params) + content_sql = "{actions_query} {filters}".format(actions_query=action_query, filters=filters,) + else: + self.params["events"].append(entity.id) + content_sql = "event = '{event}' {filters}".format(event=entity.id, filters=filters) + return content_sql + + def _build_filters(self, entity: Entity, index: int) -> str: + prop_filters, prop_filter_params = parse_prop_clauses( + entity.properties, self._team.pk, prepend=str(index), allow_denormalized_props=True + ) + self.params.update(prop_filter_params) + if entity.properties: + return prop_filters + return "" + + @abstractmethod + def get_query(self, format_properties): + pass diff --git a/ee/clickhouse/queries/funnels/funnel.py b/ee/clickhouse/queries/funnels/funnel.py new file mode 100644 index 0000000000000..76cd6479bd01d --- /dev/null +++ b/ee/clickhouse/queries/funnels/funnel.py @@ -0,0 +1,7 @@ +from ee.clickhouse.queries.funnels.base import ClickhouseFunnelBase +from ee.clickhouse.sql.funnels.funnel import FUNNEL_SQL + + +class ClickhouseFunnel(ClickhouseFunnelBase): + def get_query(self, format_properties): + return FUNNEL_SQL.format(**format_properties) diff --git a/ee/clickhouse/queries/funnels/funnel_persons.py b/ee/clickhouse/queries/funnels/funnel_persons.py new file mode 100644 index 0000000000000..02cee79410d69 --- /dev/null +++ b/ee/clickhouse/queries/funnels/funnel_persons.py @@ -0,0 +1,15 @@ +from ee.clickhouse.queries.funnels.base import ClickhouseFunnelBase +from ee.clickhouse.sql.funnels.funnel import FUNNEL_PERSONS_SQL +from posthog.models import Person + + +class ClickhouseFunnelPersons(ClickhouseFunnelBase): + def get_query(self, format_properties): + return FUNNEL_PERSONS_SQL.format(**format_properties) + + def _format_results(self, results): + formatted_results = [] + for row in results: + distinct_ids, email = Person.get_distinct_ids_and_email_by_id(row[1], self._team.id) + formatted_results.append({"max_step": row[0], "distinct_ids": distinct_ids, "email": email}) + return formatted_results diff --git a/ee/clickhouse/queries/clickhouse_funnel_trends.py b/ee/clickhouse/queries/funnels/funnel_trends.py similarity index 96% rename from ee/clickhouse/queries/clickhouse_funnel_trends.py rename to ee/clickhouse/queries/funnels/funnel_trends.py index e13c8705ad3b1..159bb4dea8bd0 100644 --- a/ee/clickhouse/queries/clickhouse_funnel_trends.py +++ b/ee/clickhouse/queries/funnels/funnel_trends.py @@ -2,7 +2,7 @@ from ee.clickhouse.client import sync_execute from ee.clickhouse.models.property import parse_prop_clauses -from ee.clickhouse.queries.clickhouse_funnel_base import ClickhouseFunnelBase +from ee.clickhouse.queries.funnels.base import ClickhouseFunnelBase from ee.clickhouse.queries.util import get_time_diff, get_trunc_func_ch, parse_timestamps from ee.clickhouse.sql.events import NULL_SQL_FUNNEL_TRENDS from ee.clickhouse.sql.funnels.funnel_trend import FUNNEL_TREND_SQL @@ -18,6 +18,9 @@ class ClickhouseFunnelTrends(ClickhouseFunnelBase): def run(self): + if len(self._filter.entities) == 0: + return [] + summary = self.perform_query() ui_response = self._get_ui_response(summary) return ui_response @@ -124,3 +127,6 @@ def _determine_complete(self, timestamp): compare_timestamp = timestamp.date() if type(timestamp) is datetime else timestamp is_incomplete = compare_timestamp > completed_end return not is_incomplete + + def get_query(self, format_properties): + pass diff --git a/ee/clickhouse/queries/funnels/funnel_trends_persons.py b/ee/clickhouse/queries/funnels/funnel_trends_persons.py new file mode 100644 index 0000000000000..1690eb056a55e --- /dev/null +++ b/ee/clickhouse/queries/funnels/funnel_trends_persons.py @@ -0,0 +1,6 @@ +from ee.clickhouse.queries.funnels.base import ClickhouseFunnelBase + + +class ClickhouseFunnelTrendsPersons(ClickhouseFunnelBase): + def get_query(self, format_properties): + pass diff --git a/ee/clickhouse/queries/funnels/test/__init__.py b/ee/clickhouse/queries/funnels/test/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/ee/clickhouse/queries/test/test_funnel.py b/ee/clickhouse/queries/funnels/test/test_funnel.py similarity index 79% rename from ee/clickhouse/queries/test/test_funnel.py rename to ee/clickhouse/queries/funnels/test/test_funnel.py index 60931793f7de0..130dc03b177f5 100644 --- a/ee/clickhouse/queries/test/test_funnel.py +++ b/ee/clickhouse/queries/funnels/test/test_funnel.py @@ -1,11 +1,16 @@ from uuid import uuid4 from ee.clickhouse.models.event import create_event -from ee.clickhouse.queries.clickhouse_funnel import ClickhouseFunnel +from ee.clickhouse.queries.funnels.funnel import ClickhouseFunnel from ee.clickhouse.util import ClickhouseTestMixin from posthog.models.person import Person from posthog.queries.test.test_funnel import funnel_test_factory +FORMAT_TIME = "%Y-%m-%d 00:00:00" +MAX_STEP_COLUMN = 0 +COUNT_COLUMN = 1 +PERSON_ID_COLUMN = 2 + def _create_person(**kwargs): person = Person.objects.create(**kwargs) diff --git a/ee/clickhouse/queries/funnels/test/test_funnel_persons.py b/ee/clickhouse/queries/funnels/test/test_funnel_persons.py new file mode 100644 index 0000000000000..dbd37d7ba6617 --- /dev/null +++ b/ee/clickhouse/queries/funnels/test/test_funnel_persons.py @@ -0,0 +1,104 @@ +from uuid import uuid4 + +from ee.clickhouse.models.event import create_event +from ee.clickhouse.queries.funnels.funnel import ClickhouseFunnel +from ee.clickhouse.queries.funnels.funnel_persons import ClickhouseFunnelPersons +from ee.clickhouse.util import ClickhouseTestMixin +from posthog.constants import INSIGHT_FUNNELS, TRENDS_FUNNEL +from posthog.models import Filter +from posthog.models.filters.mixins.funnel_window_days import FunnelWindowDaysMixin +from posthog.models.person import Person +from posthog.test.base import APIBaseTest + +FORMAT_TIME = "%Y-%m-%d 00:00:00" +MAX_STEP_COLUMN = 0 +COUNT_COLUMN = 1 +PERSON_ID_COLUMN = 2 + + +def _create_person(**kwargs): + person = Person.objects.create(**kwargs) + return Person(id=person.uuid, uuid=person.uuid) + + +def _create_event(**kwargs): + kwargs.update({"event_uuid": uuid4()}) + create_event(**kwargs) + + +class TestFunnel(ClickhouseTestMixin, APIBaseTest): + def setUp(self): + self._create_sample_data() + super().setUp() + + def _create_sample_data(self): + for i in range(250): + _create_person(distinct_ids=[f"user_{i}"], team=self.team) + _create_event(event="step one", distinct_id=f"user_{i}", team=self.team, timestamp="2021-05-01 00:00:00") + _create_event(event="step two", distinct_id=f"user_{i}", team=self.team, timestamp="2021-05-03 00:00:00") + _create_event(event="step three", distinct_id=f"user_{i}", team=self.team, timestamp="2021-05-05 00:00:00") + + def test_basic_offset(self): + data = { + "insight": INSIGHT_FUNNELS, + "display": TRENDS_FUNNEL, + "interval": "day", + "date_from": "2021-05-01 00:00:00", + "date_to": "2021-05-07 00:00:00", + "funnel_window_days": 7, + "events": [ + {"id": "step one", "order": 0}, + {"id": "step two", "order": 1}, + {"id": "step three", "order": 2}, + ], + } + + filter = Filter(data=data) + results = ClickhouseFunnelPersons(filter, self.team)._exec_query() + self.assertEqual(100, len(results)) + + filter_offset = Filter(data={**data, "offset": 100,}) + results = ClickhouseFunnelPersons(filter_offset, self.team).run() + self.assertEqual(100, len(results)) + + filter_offset = Filter(data={**data, "offset": 200,}) + results = ClickhouseFunnelPersons(filter_offset, self.team).run() + self.assertEqual(50, len(results)) + + def test_funnel_window_days_to_microseconds(self): + one_day = FunnelWindowDaysMixin.microseconds_from_days(1) + two_days = FunnelWindowDaysMixin.microseconds_from_days(2) + three_days = FunnelWindowDaysMixin.microseconds_from_days(3) + + self.assertEqual(86400000000, one_day) + self.assertEqual(172800000000, two_days) + self.assertEqual(259200000000, three_days) + + def test_basic_conversion_window(self): + data = { + "insight": INSIGHT_FUNNELS, + "display": TRENDS_FUNNEL, + "interval": "day", + "date_from": "2021-05-01 00:00:00", + "date_to": "2021-05-07 00:00:00", + "funnel_window_days": 7, + "events": [ + {"id": "step one", "order": 0}, + {"id": "step two", "order": 1}, + {"id": "step three", "order": 2}, + ], + } + + filter = Filter(data={**data, "funnel_window_days": 1,}) + results = ClickhouseFunnel(filter, self.team)._exec_query() + self.assertEqual(1, len(results)) + self.assertEqual(1, results[0][MAX_STEP_COLUMN]) + self.assertEqual(250, results[0][COUNT_COLUMN]) + self.assertEqual(100, len(results[0][PERSON_ID_COLUMN])) + + filter = Filter(data={**data, "funnel_window_days": 2,}) + results = ClickhouseFunnel(filter, self.team)._exec_query() + self.assertEqual(1, len(results)) + self.assertEqual(2, results[0][MAX_STEP_COLUMN]) + self.assertEqual(250, results[0][COUNT_COLUMN]) + self.assertEqual(100, len(results[0][PERSON_ID_COLUMN])) diff --git a/ee/clickhouse/queries/test/test_funnel_trends.py b/ee/clickhouse/queries/funnels/test/test_funnel_trends.py similarity index 99% rename from ee/clickhouse/queries/test/test_funnel_trends.py rename to ee/clickhouse/queries/funnels/test/test_funnel_trends.py index 9a4484689ba5f..fdf2671207677 100644 --- a/ee/clickhouse/queries/test/test_funnel_trends.py +++ b/ee/clickhouse/queries/funnels/test/test_funnel_trends.py @@ -2,7 +2,7 @@ from uuid import uuid4 from ee.clickhouse.models.event import create_event -from ee.clickhouse.queries.clickhouse_funnel_trends import ClickhouseFunnelTrends +from ee.clickhouse.queries.funnels.funnel_trends import ClickhouseFunnelTrends from ee.clickhouse.util import ClickhouseTestMixin from posthog.constants import INSIGHT_FUNNELS, TRENDS_LINEAR from posthog.models.filters import Filter diff --git a/ee/clickhouse/sql/funnels/funnel.py b/ee/clickhouse/sql/funnels/funnel.py index 7201a8070759c..8869360d67459 100644 --- a/ee/clickhouse/sql/funnels/funnel.py +++ b/ee/clickhouse/sql/funnels/funnel.py @@ -22,3 +22,35 @@ ORDER BY max_step {top_level_groupby} ASC ; """ + +FUNNEL_PERSONS_SQL = """ +SELECT max_step, id +FROM +( + SELECT + pid.person_id as id, + {extra_select} + windowFunnel({within_time})(toUInt64(toUnixTimestamp64Micro(timestamp)), + {steps} + ) as max_step + FROM + events + JOIN ( + SELECT person_id, distinct_id FROM ({latest_distinct_id_sql}) WHERE team_id = %(team_id)s + ) as pid + ON pid.distinct_id = events.distinct_id + WHERE + team_id = %(team_id)s + {filters} + {parsed_date_from} + {parsed_date_to} + AND event IN %(events)s + GROUP BY pid.person_id {extra_groupby} +) +WHERE max_step > 0 +GROUP BY max_step, id +ORDER BY max_step ASC +limit 100 +offset {offset} +; +""" diff --git a/ee/clickhouse/views/insights.py b/ee/clickhouse/views/insights.py index 27161ef44d535..0178903407d10 100644 --- a/ee/clickhouse/views/insights.py +++ b/ee/clickhouse/views/insights.py @@ -1,20 +1,19 @@ -from typing import Any, Dict, List +from typing import Any, Dict from rest_framework.decorators import action from rest_framework.request import Request from rest_framework.response import Response -from ee.clickhouse.queries.clickhouse_funnel import ClickhouseFunnel from ee.clickhouse.queries.clickhouse_paths import ClickhousePaths from ee.clickhouse.queries.clickhouse_retention import ClickhouseRetention from ee.clickhouse.queries.clickhouse_stickiness import ClickhouseStickiness +from ee.clickhouse.queries.funnels.funnel import ClickhouseFunnel from ee.clickhouse.queries.sessions.clickhouse_sessions import ClickhouseSessions from ee.clickhouse.queries.trends.clickhouse_trends import ClickhouseTrends from ee.clickhouse.queries.util import get_earliest_timestamp from posthog.api.insight import InsightViewSet from posthog.constants import INSIGHT_FUNNELS, INSIGHT_PATHS, INSIGHT_SESSIONS, INSIGHT_STICKINESS, TRENDS_STICKINESS from posthog.decorators import cached_function -from posthog.models import Event from posthog.models.filters import Filter from posthog.models.filters.path_filter import PathFilter from posthog.models.filters.retention_filter import RetentionFilter diff --git a/ee/clickhouse/views/person.py b/ee/clickhouse/views/person.py index 24f58d3b60785..179f3ae9b40eb 100644 --- a/ee/clickhouse/views/person.py +++ b/ee/clickhouse/views/person.py @@ -1,12 +1,16 @@ from rest_framework import request, response +from rest_framework.decorators import action from rest_framework.exceptions import NotFound from ee.clickhouse.models.person import delete_person from ee.clickhouse.queries.clickhouse_retention import ClickhouseRetention from ee.clickhouse.queries.clickhouse_stickiness import ClickhouseStickiness +from ee.clickhouse.queries.funnels.funnel_persons import ClickhouseFunnelPersons +from ee.clickhouse.queries.funnels.funnel_trends_persons import ClickhouseFunnelTrendsPersons from ee.clickhouse.queries.trends.lifecycle import ClickhouseLifecycle from posthog.api.person import PersonViewSet -from posthog.models import Event, Person +from posthog.api.utils import format_next_absolute_url, format_next_url +from posthog.models import Event, Filter, Person # TODO: Move grabbing all this to Clickhouse. See WIP-people-from-clickhouse branch. @@ -16,6 +20,30 @@ class ClickhousePersonViewSet(PersonViewSet): retention_class = ClickhouseRetention stickiness_class = ClickhouseStickiness + @action(methods=["GET"], detail=False) + def funnel(self, request: request.Request, **kwargs) -> response.Response: + if request.user.is_anonymous or not request.user.team: + return response.Response(data=[]) + + filter = Filter(request=request) + team = request.user.team + results = ClickhouseFunnelPersons(filter, team).run() + + next_url = format_next_absolute_url(request, filter.offset, 100) if len(results) > 99 else None + return response.Response(data={"results": results, "next": next_url}) + + @action(methods=["GET"], detail=False) + def funnel_trends(self, request: request.Request, **kwargs) -> response.Response: + if request.user.is_anonymous or not request.user.team: + return response.Response(data=[]) + + filter = Filter(request=request) + team = request.user.team + results = ClickhouseFunnelTrendsPersons(filter, team).run() + + next_url = format_next_absolute_url(request, filter.offset, 100) if len(results) > 99 else None + return response.Response(data={"results": results, "next": next_url}) + def destroy(self, request: request.Request, pk=None, **kwargs): # type: ignore try: person = Person.objects.get(team=self.team, pk=pk) diff --git a/ee/clickhouse/views/test/test_clickhouse_funnel_person.py b/ee/clickhouse/views/test/test_clickhouse_funnel_person.py new file mode 100644 index 0000000000000..492b45735a6e7 --- /dev/null +++ b/ee/clickhouse/views/test/test_clickhouse_funnel_person.py @@ -0,0 +1,50 @@ +import json + +from rest_framework import status + +from ee.clickhouse.generate_local import GenerateLocal +from ee.clickhouse.util import ClickhouseTestMixin +from posthog.constants import INSIGHT_FUNNELS, TRENDS_FUNNEL +from posthog.test.base import APIBaseTest + + +class TestFunnelPerson(ClickhouseTestMixin, APIBaseTest): + def setUp(self): + super().setUp() + GenerateLocal(self.team).generate() + + def test_basic_pagination(self): + request_data = { + "insight": INSIGHT_FUNNELS, + "display": TRENDS_FUNNEL, + "interval": "day", + "actions": json.dumps([]), + "events": json.dumps( + [{"id": "step one", "order": 0}, {"id": "step two", "order": 1}, {"id": "step three", "order": 2},] + ), + "properties": json.dumps([]), + "funnel_window_days": 14, + "filter_test_accounts": "false", + "new_entity": json.dumps([]), + "date_from": "2021-05-01", + "date_to": "2021-05-10", + } + + response = self.client.get("/api/person/funnel/", data=request_data) + self.assertEqual(response.status_code, status.HTTP_200_OK) + j = response.json() + next = j["next"] + self.assertEqual(100, len(j["results"])) + + response = self.client.get(next) + self.assertEqual(response.status_code, status.HTTP_200_OK) + j = response.json() + next = j["next"] + self.assertEqual(100, len(j["results"])) + self.assertNotEqual(None, next) + + response = self.client.get(next) + self.assertEqual(response.status_code, status.HTTP_200_OK) + j = response.json() + self.assertEqual(50, len(j["results"])) + self.assertEqual(None, j["next"]) diff --git a/posthog/api/utils.py b/posthog/api/utils.py index fabecdd264337..78cbff2140e88 100644 --- a/posthog/api/utils.py +++ b/posthog/api/utils.py @@ -22,9 +22,11 @@ def format_next_url(request: request.Request, offset: int, page_size: int): if not next_url: return None + new_offset = str(offset + page_size) + if "offset" in next_url: next_url = next_url[1:] - next_url = next_url.replace("offset=" + str(offset), "offset=" + str(offset + page_size)) + next_url = next_url.replace(f"offset={str(offset)}", f"offset={new_offset}") else: next_url = request.build_absolute_uri( "{}{}offset={}".format(next_url, "&" if "?" in next_url else "?", offset + page_size) @@ -32,6 +34,22 @@ def format_next_url(request: request.Request, offset: int, page_size: int): return next_url +def format_next_absolute_url(request: request.Request, offset: int, page_size: int): + next_url = request.get_raw_uri() + + if not next_url: + return None + + new_offset = str(offset + page_size) + + if "offset" in next_url: + next_url = next_url.replace(f"offset={str(offset)}", f"offset={new_offset}") + else: + next_url = next_url + ("&" if "?" in next_url else "?") + f"offset={new_offset}" + + return next_url + + def get_token(data, request) -> Tuple[Optional[str], bool]: token = None if request.method == "GET": diff --git a/posthog/models/filters/mixins/funnel_window_days.py b/posthog/models/filters/mixins/funnel_window_days.py index 460cf534b1a19..07e3487ae2705 100644 --- a/posthog/models/filters/mixins/funnel_window_days.py +++ b/posthog/models/filters/mixins/funnel_window_days.py @@ -15,5 +15,10 @@ def funnel_window_days_to_dict(self): @staticmethod def milliseconds_from_days(days): - second, minute, hour, day = [1000, 60, 60, 24] - return second * minute * hour * day * days + milliseconds, seconds, minutes, hours = [1000, 60, 60, 24] + return milliseconds * seconds * minutes * hours * days + + @staticmethod + def microseconds_from_days(days): + microseconds = 1000 + return microseconds * FunnelWindowDaysMixin.milliseconds_from_days(days) diff --git a/posthog/models/person.py b/posthog/models/person.py index 673701e94f4ba..b50dd94219c08 100644 --- a/posthog/models/person.py +++ b/posthog/models/person.py @@ -79,6 +79,22 @@ def merge_people(self, people_to_merge: List["Person"]): # Has an index on properties -> email, built concurrently # See migration 0121 + @staticmethod + def get_distinct_ids_and_email_by_ids(person_ids, team_id): + decorated = [] + for person_id in person_ids: + distinct_ids, email = Person.get_distinct_ids_and_email_by_id(person_id, team_id) + decorated.append({"distinct_ids": distinct_ids, "email": email}) + return decorated + + @staticmethod + def get_distinct_ids_and_email_by_id(person_id, team_id): + person = Person.objects.get(uuid=person_id) + distinct_ids = PersonDistinctId.objects.filter(person_id=person.id, team_id=team_id) + flat_distinct_ids = [row.distinct_id for row in distinct_ids] + email = person.properties.get("email", "") + return flat_distinct_ids, email + class PersonDistinctId(models.Model): class Meta: diff --git a/posthog/queries/base.py b/posthog/queries/base.py index 37b854af0a85d..1f155e8cc88b0 100644 --- a/posthog/queries/base.py +++ b/posthog/queries/base.py @@ -1,5 +1,3 @@ -import copy -from datetime import timedelta from typing import Any, Callable, Dict, List, Optional, Union, cast from dateutil.relativedelta import relativedelta diff --git a/posthog/tasks/update_cache.py b/posthog/tasks/update_cache.py index dc8dc46b2aff3..56e295b00ae13 100644 --- a/posthog/tasks/update_cache.py +++ b/posthog/tasks/update_cache.py @@ -19,6 +19,7 @@ INSIGHT_SESSIONS, INSIGHT_STICKINESS, INSIGHT_TRENDS, + TRENDS_LINEAR, TRENDS_STICKINESS, ) from posthog.decorators import CacheType @@ -151,7 +152,10 @@ def _calculate_funnel(filter: Filter, key: str, team_id: int) -> List[Dict[str, dashboard_items.update(refreshing=True) if is_clickhouse_enabled(): - insight_class = import_from("ee.clickhouse.queries.clickhouse_funnel", "ClickhouseFunnel") + if filter.display == TRENDS_LINEAR: + insight_class = import_from("ee.clickhouse.queries.funnels.funnel_trends", "ClickhouseFunnelTrends") + else: + insight_class = import_from("ee.clickhouse.queries.funnels.funnel", "ClickhouseFunnel") else: insight_class = import_from("posthog.queries.funnel", "Funnel")