feat(decide): Add proper analytics for decide requests #15837

Merged: 19 commits, Jun 6, 2023
2 changes: 1 addition & 1 deletion bin/tests
@@ -30,4 +30,4 @@ PG_PASSWORD="${PGPASSWORD:=posthog}"
PG_PORT="${PGPORT:=5432}"
PGOPTIONS='--client-min-messages=warning' psql posthog -d "postgres://${PG_USER}:${PG_PASSWORD}@${PG_HOST}:${PG_PORT}" -c "drop database if exists test_posthog" 1> /dev/null

nodemon -w ./posthog -w ./ee --ext py --exec "OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES pytest --reuse-db --durations-min=2.0 ${MIGRATIONS} -s $* --snapshot-update; mypy posthog ee"
nodemon -w ./posthog -w ./ee --ext py --exec "OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES pytest --reuse-db --durations-min=2.0 ${MIGRATIONS} -s $* --snapshot-update; mypy -p posthog --exclude bin/migrate_kafka_data.py --exclude posthog/hogql/grammar/HogQLParser.py --exclude gunicorn.config.py --exclude posthog/batch_exports/models.py"
11 changes: 11 additions & 0 deletions posthog/api/decide.py
@@ -1,12 +1,15 @@
from random import random
import re
from typing import Any, Dict, List, Optional, Union
from urllib.parse import urlparse
from posthog.models.feature_flag.flag_analytics import increment_request_count
from posthog.models.filters.mixins.utils import process_bool

import structlog
import posthoganalytics
from django.http import HttpRequest, JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django.conf import settings
from rest_framework import status
from sentry_sdk import capture_exception
from statshog.defaults.django import statsd
@@ -207,6 +210,14 @@ def get_decide(request: HttpRequest):
# `test_decide_doesnt_error_out_when_database_is_down`
# which ensures that decide doesn't error out when the database is down

if feature_flags and settings.ENABLE_DECIDE_BILLING_ANALYTICS:
# Billing analytics for decide requests with feature flags

# Sampling to reduce the load on Redis
if settings.DECIDE_BILLING_SAMPLING_RATE and random() < settings.DECIDE_BILLING_SAMPLING_RATE:
count = int(1 / settings.DECIDE_BILLING_SAMPLING_RATE)
increment_request_count(team.pk, count)

# Analytics for decide requests with feature flags
# Only send once flag definitions are loaded
# don't block on loading flag definitions
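Aside (not part of the diff): a minimal standalone sketch of the sampling scheme above, with illustrative names. Each request is recorded with probability equal to the sampling rate, and every recorded request is weighted by int(1 / rate), so the expected total still matches the real request volume while Redis sees only a fraction of the writes.

from random import random

def sampled_count(n_requests: int, rate: float) -> int:
    # Record a request with probability `rate`; weight each recorded
    # request by int(1 / rate), mirroring the block above.
    total = 0
    for _ in range(n_requests):
        if rate and random() < rate:
            total += int(1 / rate)
    return total

# With rate=0.1 the result hovers around 10_000, using roughly a tenth of the Redis writes.
print(sampled_count(10_000, 0.1))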
90 changes: 90 additions & 0 deletions posthog/api/test/test_decide.py
@@ -1,12 +1,14 @@
import base64
import json
import random
from unittest.mock import patch
import time


from django.core.cache import cache
from django.db import connection
from django.test.client import Client
from freezegun import freeze_time
from rest_framework import status

from posthog.api.test.test_feature_flag import QueryTimeoutWrapper
@@ -18,6 +20,7 @@
from posthog.models.utils import generate_random_token_personal
from posthog.test.base import BaseTest, QueryMatchingTest, snapshot_postgres_queries, snapshot_postgres_queries_context
from posthog.utils import is_postgres_connected_cached_check
from posthog import redis


@patch("posthog.models.feature_flag.flag_matching.is_postgres_connected_cached_check", return_value=True)
@@ -30,6 +33,11 @@ class TestDecide(BaseTest, QueryMatchingTest):
def setUp(self, *args):
cache.clear()

# delete all keys in redis
r = redis.get_client()
for key in r.scan_iter("*"):
r.delete(key)

super().setUp()
# it is really important to know that /decide is CSRF exempt. Enforce checking in the client
self.client = Client(enforce_csrf_checks=True)
@@ -1806,6 +1814,88 @@ def test_rate_limits_dont_mix_teams(self, *args):
response = self._post_decide(api_version=3, data={"token": new_token, "distinct_id": "other id"})
self.assertEqual(response.status_code, 429)

@patch("posthog.models.feature_flag.flag_analytics.CACHE_BUCKET_SIZE", 10)
def test_decide_analytics_only_fires_when_enabled(self, *args):
FeatureFlag.objects.create(
team=self.team, rollout_percentage=50, name="Beta feature", key="beta-feature", created_by=self.user
)
self.client.logout()
with self.settings(ENABLE_DECIDE_BILLING_ANALYTICS="f"):
response = self._post_decide(api_version=3)
self.assertEqual(response.status_code, 200)

client = redis.get_client()
# check that no increments made it to redis
self.assertEqual(client.hgetall(f"posthog:decide_requests:{self.team.pk}"), {})

with self.settings(ENABLE_DECIDE_BILLING_ANALYTICS="true", DECIDE_BILLING_SAMPLING_RATE=1), freeze_time(
"2022-05-07 12:23:07"
):
response = self._post_decide(api_version=3)
self.assertEqual(response.status_code, 200)

client = redis.get_client()
# check that a single increment made it to redis
self.assertEqual(client.hgetall(f"posthog:decide_requests:{self.team.pk}"), {b"165192618": b"1"})

# @patch("posthog.api.decide.increment_request_count")
@patch("posthog.models.feature_flag.flag_analytics.CACHE_BUCKET_SIZE", 10)
def test_decide_analytics_samples_appropriately(self, *args):
random.seed(12345)
FeatureFlag.objects.create(
team=self.team, rollout_percentage=50, name="Beta feature", key="beta-feature", created_by=self.user
)
self.client.logout()
with self.settings(ENABLE_DECIDE_BILLING_ANALYTICS="true", DECIDE_BILLING_SAMPLING_RATE=0.5), freeze_time(
"2022-05-07 12:23:07"
):
for _ in range(5):
# given the seed, 4 out of 5 are sampled
response = self._post_decide(api_version=3)
self.assertEqual(response.status_code, 200)

client = redis.get_client()
# check that the sampled increments made it to redis
self.assertEqual(client.hgetall(f"posthog:decide_requests:{self.team.pk}"), {b"165192618": b"8"})

@patch("posthog.models.feature_flag.flag_analytics.CACHE_BUCKET_SIZE", 10)
def test_decide_analytics_samples_appropriately_with_small_sample_rate(self, *args):
random.seed(12345)
FeatureFlag.objects.create(
team=self.team, rollout_percentage=50, name="Beta feature", key="beta-feature", created_by=self.user
)
self.client.logout()
with self.settings(ENABLE_DECIDE_BILLING_ANALYTICS="true", DECIDE_BILLING_SAMPLING_RATE=0.02), freeze_time(
"2022-05-07 12:23:07"
):
for _ in range(5):
# given the seed, 1 out of 5 is sampled
response = self._post_decide(api_version=3)
self.assertEqual(response.status_code, 200)

client = redis.get_client()
# check that the sampled increments made it to redis
self.assertEqual(client.hgetall(f"posthog:decide_requests:{self.team.pk}"), {b"165192618": b"50"})

@patch("posthog.models.feature_flag.flag_analytics.CACHE_BUCKET_SIZE", 10)
def test_decide_analytics_samples_dont_break_with_zero_sampling(self, *args):
random.seed(12345)
FeatureFlag.objects.create(
team=self.team, rollout_percentage=50, name="Beta feature", key="beta-feature", created_by=self.user
)
self.client.logout()
with self.settings(ENABLE_DECIDE_BILLING_ANALYTICS="true", DECIDE_BILLING_SAMPLING_RATE=0), freeze_time(
"2022-05-07 12:23:07"
):
for _ in range(5):
# 0 out of 5 are sampled
response = self._post_decide(api_version=3)
self.assertEqual(response.status_code, 200)

client = redis.get_client()
# check that no increments made it to redis
self.assertEqual(client.hgetall(f"posthog:decide_requests:{self.team.pk}"), {})


class TestDatabaseCheckForDecide(BaseTest, QueryMatchingTest):
"""
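Aside (not part of the diff): a short worked check of the values those tests expect. The Redis hash field is the unix-time bucket, and each sampled request increments it by the inverse of the sampling rate.

from datetime import datetime, timezone

# The frozen time 2022-05-07 12:23:07 UTC, divided by the patched
# CACHE_BUCKET_SIZE of 10, yields the bucket key asserted above.
frozen = datetime(2022, 5, 7, 12, 23, 7, tzinfo=timezone.utc)
assert str(int(frozen.timestamp()) // 10) == "165192618"

# Each sampled request increments the bucket by int(1 / DECIDE_BILLING_SAMPLING_RATE).
assert 4 * int(1 / 0.5) == 8    # rate 0.5: 4 of 5 requests sampled
assert 1 * int(1 / 0.02) == 50  # rate 0.02: 1 of 5 requests sampled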
2 changes: 1 addition & 1 deletion posthog/apps.py
@@ -49,7 +49,7 @@ def ready(self):
posthoganalytics.disabled = True

# load feature flag definitions if not already loaded
if posthoganalytics.feature_flag_definitions() is None:
if not posthoganalytics.disabled and posthoganalytics.feature_flag_definitions() is None:
posthoganalytics.default_client.load_feature_flags()

if not settings.SKIP_SERVICE_VERSION_REQUIREMENTS:
26 changes: 25 additions & 1 deletion posthog/celery.py
@@ -18,7 +18,7 @@
from posthog.cloud_utils import is_cloud
from posthog.metrics import pushed_metrics_registry
from posthog.redis import get_client
from posthog.utils import get_crontab
from posthog.utils import get_crontab, get_instance_region

# set the default Django settings module for the 'celery' program.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "posthog.settings")
@@ -95,6 +95,10 @@ def setup_periodic_tasks(sender: Celery, **kwargs):
# Verify that persons data is in sync every day at 4 AM UTC
sender.add_periodic_task(crontab(hour=4, minute=0), verify_persons_data_in_sync.s())

if settings.ENABLE_DECIDE_BILLING_ANALYTICS:
# Every 30 minutes, send decide request counts to the main posthog instance
sender.add_periodic_task(crontab(minute="*/30"), calculate_decide_usage.s(), name="calculate decide usage")

# if is_cloud() or settings.DEMO:
# Reset master project data every Monday at Thursday at 5 AM UTC. Mon and Thu because doing this every day
# would be too hard on ClickHouse, and those days ensure most users will have data at most 3 days old.
@@ -748,6 +752,26 @@ def calculate_billing_daily_usage():
compute_daily_usage_for_organizations()


@app.task(ignore_result=True, max_retries=1)
def calculate_decide_usage() -> None:
from posthog.models.feature_flag.flag_analytics import capture_team_decide_usage
from posthog.models import Team
from django.db.models import Q
from posthoganalytics import Posthog

# send EU data to EU, US data to US
api_key = "phc_dZ4GK1LRjhB97XozMSkEwPXx7OVANaJEwLErkY1phUF" if get_instance_region() == "EU" else "sTMFPsFhdP1Ssg"

ph_client = Posthog(api_key)

for team in Team.objects.select_related("organization").exclude(
Q(organization__for_internal_metrics=True) | Q(is_demo=True)
):
capture_team_decide_usage(ph_client, team.id, team.uuid)

ph_client.shutdown()


@app.task(ignore_result=True)
def demo_reset_master_team():
from posthog.tasks.demo_reset_master_team import demo_reset_master_team
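Aside (a usage assumption, not part of the PR): since calculate_decide_usage is an ordinary Celery task, it can be triggered ad hoc instead of waiting for the 30-minute beat schedule.

# Hedged sketch: kick off one usage rollup manually from a Django/Celery shell.
from posthog.celery import calculate_decide_usage

calculate_decide_usage.apply()   # run synchronously in-process
# or, with a worker running:
calculate_decide_usage.delay()   # enqueue for the next available worker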
55 changes: 55 additions & 0 deletions posthog/models/feature_flag/flag_analytics.py
@@ -0,0 +1,55 @@
from typing import TYPE_CHECKING
from posthog.redis import redis, get_client
import time
from sentry_sdk import capture_exception

if TYPE_CHECKING:
from posthoganalytics import Posthog

REDIS_LOCK_TOKEN = "posthog:decide_analytics:lock"
CACHE_BUCKET_SIZE = 60 * 2 # duration in seconds


def get_team_request_key(team_id: int) -> str:
return f"posthog:decide_requests:{team_id}"


def increment_request_count(team_id: int, count: int = 1) -> None:
try:
client = get_client()
time_bucket = str(int(time.time() / CACHE_BUCKET_SIZE))
key_name = get_team_request_key(team_id)
client.hincrby(key_name, time_bucket, count)
except Exception as error:
capture_exception(error)


def capture_team_decide_usage(ph_client: "Posthog", team_id: int, team_uuid: str) -> None:
try:
client = get_client()
total_request_count = 0

with client.lock(f"{REDIS_LOCK_TOKEN}:{team_id}", timeout=60, blocking=False):
key_name = get_team_request_key(team_id)
existing_values = client.hgetall(key_name)
time_buckets = existing_values.keys()
# The latest bucket is still being filled, so we don't want to delete it nor count it.
# It will be counted in a later iteration, when it's not being filled anymore.
if time_buckets and len(time_buckets) > 1:
# redis returns encoded bytes, so we need to convert them into unix epoch for sorting
for time_bucket in sorted(time_buckets, key=lambda bucket: int(bucket))[:-1]:
total_request_count += int(existing_values[time_bucket])
client.hdel(key_name, time_bucket)

if total_request_count > 0:
ph_client.capture(
team_id,
"decide usage",
{"count": total_request_count, "team_id": team_id, "team_uuid": team_uuid},
)

except redis.exceptions.LockError:
# lock wasn't acquired, which means another worker is working on this, so we don't need to do anything
pass
except Exception as error:
capture_exception(error)
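Aside (not part of the PR): for intuition, a minimal sketch of what the flush loop in capture_team_decide_usage does, with a plain dict standing in for the Redis hash. Every bucket except the newest one is summed and deleted; the newest is left to be picked up on a later run.

# Hypothetical bucket contents; keys and values are bytes, as redis-py returns them.
buckets = {b"165192617": b"12", b"165192618": b"8", b"165192619": b"3"}

flushed = 0
for bucket in sorted(buckets.keys(), key=lambda key: int(key))[:-1]:
    flushed += int(buckets.pop(bucket))   # mirrors the sum + hdel above

assert flushed == 20                      # 12 + 8 reported as "decide usage"
assert list(buckets) == [b"165192619"]    # newest, still-filling bucket survives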
4 changes: 4 additions & 0 deletions posthog/settings/web.py
@@ -26,6 +26,10 @@
DECIDE_BUCKET_CAPACITY = get_from_env("DECIDE_BUCKET_CAPACITY", type_cast=int, default=500)
DECIDE_BUCKET_REPLENISH_RATE = get_from_env("DECIDE_BUCKET_REPLENISH_RATE", type_cast=float, default=10.0)

# Decide billing analytics

ENABLE_DECIDE_BILLING_ANALYTICS = get_from_env("ENABLE_DECIDE_BILLING_ANALYTICS", False, type_cast=str_to_bool)
DECIDE_BILLING_SAMPLING_RATE = get_from_env("DECIDE_BILLING_SAMPLING_RATE", 0.1, type_cast=float)

# Application definition

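Aside (illustrative, not part of the PR): both settings are read from the environment via the get_from_env/str_to_bool helpers shown above, so enabling the feature locally amounts to exporting two variables before the Django settings are loaded.

# Hedged sketch: example environment values; the literals are illustrations, not defaults.
import os

os.environ.setdefault("ENABLE_DECIDE_BILLING_ANALYTICS", "true")  # parsed via str_to_bool
os.environ.setdefault("DECIDE_BILLING_SAMPLING_RATE", "0.25")     # record ~1 in 4 requests, each counted as int(1 / 0.25) = 4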