Skip to content

Commit

Permalink
Fix: sample traces per command, not globally (#73)
Browse files Browse the repository at this point in the history
We now have a few commands that happen 100 times less often than others.
These now rarely show up, while we are interested in stats per command,
not in stats of the full population.

While at it, moved some code around to separate the tracer code from
the rest.
  • Loading branch information
TrueBrain authored Oct 17, 2021
1 parent 8cec6f5 commit 53cb04c
Show file tree
Hide file tree
Showing 9 changed files with 144 additions and 109 deletions.
98 changes: 4 additions & 94 deletions game_coordinator/__main__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import asyncio
import beeline
import click
import logging
import random
import signal

from openttd_helpers import click_helper
Expand All @@ -22,87 +20,13 @@
click_application_turn,
)
from .database.redis import click_database_redis
from .tracer import click_tracer
from .web import start_webserver

# Defaults for the trace sampler; main() overwrites both from the
# --honeycomb-rate-limit and --honeycomb-sample-rate options.
TRACES_PER_HOUR = 100
TRACES_SAMPLE_RATE = 10

log = logging.getLogger(__name__)

# Currently active traces: maps a trace-id to whether that trace is sampled.
samples = {}
# Bucket with the amount of traces that are still allowed to go out.
# Refilled periodically by fill_samples_bucket().
samples_bucket = 0.0
# Window-based stats about how many traces were sent and how many there were
# in total. Index 0 is the current slot; fill_samples_bucket() shifts the
# window. Used to estimate the sample rate.
samples_accepted = [0] * 60
samples_total = [0] * 60


def beeline_sampler(event):
    """Decide whether an event is sent out, and with which sample rate.

    The decision is made once per trace, on the first event seen for a
    trace-id, and reused for every later event of that trace.

    :param event: Mapping with at least "trace.trace_id" and
        "trace.parent_id".
    :return: Tuple (send, sample_rate); sample_rate is 0 when send is False.
    """
    global samples_bucket

    trace_id = event["trace.trace_id"]

    # New trace. Check if we want to sample it.
    if trace_id not in samples:
        samples_total[0] += 1
        samples[trace_id] = False

        # Check if we can send this trace: the bucket has to allow it, and
        # only one in TRACES_SAMPLE_RATE traces is picked.
        if samples_bucket > 1 and random.randint(1, TRACES_SAMPLE_RATE) == 1:
            samples_bucket -= 1
            samples_accepted[0] += 1

            samples[trace_id] = True

    # Calculate the result and sample-rate.
    result = samples[trace_id]
    if result:
        accepted = sum(samples_accepted)
        # A trace that outlives the stats window no longer has its own
        # acceptance counted; fall back to the configured rate instead of
        # dividing by zero.
        sample_rate = sum(samples_total) // accepted if accepted else TRACES_SAMPLE_RATE
    else:
        sample_rate = 0

    # This trace is closing. So forget any information about it.
    if event["trace.parent_id"] is None:
        del samples[trace_id]

    return result, sample_rate


async def fill_samples_bucket():
    """Refill the sample bucket and rotate the stats window, forever.

    Every five seconds the bucket is topped up a bit, so traces can be
    sampled randomly during the hour. Every minute the window of
    samples_accepted / samples_total moves by one slot.
    """
    global samples_bucket

    interval = 5
    ticks_per_minute = 60 // interval
    ticks = 0

    while True:
        await asyncio.sleep(interval)

        # Don't overflow the bucket past the size of an hour.
        if samples_bucket < TRACES_PER_HOUR:
            samples_bucket += TRACES_PER_HOUR * interval / 3600

        # Update the samples-stats every minute. ">=" makes this fire on the
        # 12th tick (60 seconds); a strict ">" would wait for the 13th.
        ticks += 1
        if ticks >= ticks_per_minute:
            ticks = 0

            # Move the window of the samples-stats.
            samples_accepted[:] = [0] + samples_accepted[:-1]
            samples_total[:] = [0] + samples_total[:-1]


async def run_server(application, bind, port, ProtocolClass, honeycomb_api_key):
if honeycomb_api_key:
beeline.init(
writekey=honeycomb_api_key,
dataset="game-coordinator",
service_name=application.name,
sampler_hook=beeline_sampler,
)
asyncio.create_task(fill_samples_bucket())
log.info("Honeycomb beeline initialized")

async def run_server(application, bind, port, ProtocolClass):
loop = asyncio.get_event_loop()

server = await loop.create_server(
Expand Down Expand Up @@ -141,13 +65,7 @@ async def close_server(loop, app_instance, server):
@click_helper.command()
@click_logging # Should always be on top, as it initializes the logging
@click_sentry
@click.option("--honeycomb-api-key", help="Honeycomb API key.")
@click.option(
"--honeycomb-rate-limit", help="How many traces to send to Honeycomb per hour.", default=100, show_default=True
)
@click.option(
"--honeycomb-sample-rate", help="The sample rate of traces to send to Honeycomb.", default=10, show_default=True
)
@click_tracer
@click.option(
"--bind", help="The IP to bind the server to", multiple=True, default=["::1", "127.0.0.1"], show_default=True
)
Expand Down Expand Up @@ -177,9 +95,6 @@ async def close_server(loop, app_instance, server):
@click_application_coordinator
@click_application_turn
def main(
honeycomb_api_key,
honeycomb_rate_limit,
honeycomb_sample_rate,
bind,
app,
coordinator_port,
Expand All @@ -189,11 +104,6 @@ def main(
db,
proxy_protocol,
):
global TRACES_PER_HOUR, TRACES_SAMPLE_RATE

TRACES_PER_HOUR = int(honeycomb_rate_limit)
TRACES_SAMPLE_RATE = int(honeycomb_sample_rate)

loop = asyncio.get_event_loop()

db_instance = db()
Expand All @@ -214,7 +124,7 @@ def main(
port = stun_port
protocol = StunProtocol

server = loop.run_until_complete(run_server(app_instance, bind, port, protocol, honeycomb_api_key))
server = loop.run_until_complete(run_server(app_instance, bind, port, protocol))

loop.add_signal_handler(signal.SIGTERM, lambda: asyncio.ensure_future(close_server(loop, app_instance, server)))
loop.add_signal_handler(signal.SIGINT, lambda: asyncio.ensure_future(close_server(loop, app_instance, server)))
Expand Down
2 changes: 0 additions & 2 deletions game_coordinator/application/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@


class Application:
name = "coordinator"

def __init__(self, database):
if not _shared_secret:
raise Exception("Please set --shared-secret for this application")
Expand Down
4 changes: 2 additions & 2 deletions game_coordinator/application/helpers/token_connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ async def stun_result_concluded(self, prefix, interface_number, result):

@tracer.untraced
async def _timeout(self):
tracer.add_context({"command": "connect.timeout"})
tracer.add_trace_field("command", "connect.timeout")

try:
await asyncio.sleep(TIMEOUT)
Expand All @@ -135,7 +135,7 @@ async def _timeout(self):
@tracer.untraced
@tracer.traced("token-connect")
async def _connect_guard(self):
tracer.add_context({"command": "connect.connect"})
tracer.add_trace_field("command", "connect.connect")

try:
await self._connect()
Expand Down
4 changes: 2 additions & 2 deletions game_coordinator/application/helpers/token_verify.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ async def stun_result_concluded(self, prefix, interface_number, result):
@tracer.untraced
@tracer.traced("token-verify")
async def _start_detection(self, interface_number, server_ip):
tracer.add_context({"command": "verify.start"})
tracer.add_trace_field("command", "verify.start")

try:
await asyncio.wait_for(self._create_connection(server_ip, self._server.server_port), TIMEOUT_DIRECT_CONNECT)
Expand Down Expand Up @@ -135,7 +135,7 @@ async def _start_detection(self, interface_number, server_ip):
@tracer.untraced
@tracer.traced("token-verify")
async def _conclude_detection(self):
tracer.add_context({"command": "verify.conclude"})
tracer.add_trace_field("command", "verify.conclude")

try:
await asyncio.wait_for(self._stun_done_event.wait(), TIMEOUT_VERIFY)
Expand Down
2 changes: 0 additions & 2 deletions game_coordinator/application/stun.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@


class Application:
name = "stun"

def __init__(self, database):
self.database = database

Expand Down
2 changes: 0 additions & 2 deletions game_coordinator/application/turn.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@


class Application:
name = "turn"

def __init__(self, database):
if not _turn_address:
raise Exception("Please set --turn-address for this application")
Expand Down
8 changes: 4 additions & 4 deletions game_coordinator/database/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,20 +107,20 @@ async def _monitor_expire(self):

with tracer.tracer("db.expire-handler"):
if message["data"].startswith("turn-server:"):
tracer.add_context({"command": "expire.turn-server"})
tracer.add_trace_field("command", "expire.turn-server")
_, _, turn_server = message["data"].partition(":")

await self.application.remove_turn_server(turn_server)

if message["data"].startswith("gc-newgrf:"):
tracer.add_context({"command": "expire.gc-newgrf"})
tracer.add_trace_field("command", "expire.gc-newgrf")
_, _, grfid_md5sum = message["data"].partition(":")
grfid, _, md5sum = grfid_md5sum.partition("-")

await self.application.remove_newgrf_from_table(grfid, md5sum)

if message["data"].startswith("gc-server:"):
tracer.add_context({"command": "expire.gc-server"})
tracer.add_trace_field("command", "expire.gc-server")
_, _, server_id = message["data"].partition(":")

await self._redis.delete(f"gc-direct-ipv4:{server_id}")
Expand Down Expand Up @@ -242,7 +242,7 @@ async def _follow_stream(self):
continue

with tracer.tracer("db.stream-handler"):
tracer.add_context({"command": f"dbstream.{entry['type']}"})
tracer.add_trace_field("command", f"dbstream.{entry['type']}")

proc = lookup_table.get(entry["type"])
if proc is None:
Expand Down
131 changes: 131 additions & 0 deletions game_coordinator/tracer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
import asyncio
import beeline
import click
import logging
import math

from collections import defaultdict
from openttd_helpers import click_helper

log = logging.getLogger(__name__)


# 2^32 - 1 as a float (math.pow() returns a float). Despite the name this is
# the largest *unsigned* 32-bit value; Sample.sampler() compares the first
# eight hex digits of a trace-id against a fraction of it.
MAX_INT32 = math.pow(2, 32) - 1

# Defaults; click_tracer() overwrites both when an API key is given.
TRACES_PER_HOUR = 5
TRACES_SAMPLE_RATE = 10


class Sample:
    """Sampling state and decision logic for one group of traces.

    Keeps a bucket that limits how many traces may be accepted, a 60-slot
    window with accepted / total counters to estimate the effective sample
    rate, and the decision made for every trace that is still open.
    """

    def __init__(self):
        # Snapshot of the module-level settings at creation time.
        self._traces_per_hour = TRACES_PER_HOUR
        self._sample_rate = TRACES_SAMPLE_RATE

        # Index 0 is the current slot; update_window() shifts the window.
        self._accepted = [0] * 60
        self._total = [0] * 60
        # Start with room for a single trace.
        self._bucket = 1
        # Maps trace-id to whether that trace is being sampled.
        self._traces = {}

    def sampler(self, trace_id, parent_id):
        """Return (send, sample_rate) for an event of the given trace.

        The decision is made on the first event of a trace and reused for
        all later events of that trace. The trace is forgotten once an event
        without parent arrives.

        :param trace_id: Trace identifier; the first eight characters have to
            be hexadecimal digits.
        :param parent_id: Parent span of the event, or None for the root span.
        :return: Tuple (send, sample_rate); sample_rate is 0 when send is
            False.
        """
        # A new trace. Calculate if we want to sample it.
        if trace_id not in self._traces:
            self._total[0] += 1
            self._traces[trace_id] = False

            # Check if we can and want to sample this trace. Using the
            # trace-id makes the decision deterministic per trace.
            if self._bucket >= 1 and int(trace_id[:8], 16) < MAX_INT32 / self._sample_rate:
                self._bucket -= 1
                self._accepted[0] += 1

                self._traces[trace_id] = True

        # Calculate the result and sample-rate.
        result = self._traces[trace_id]
        if result:
            accepted = sum(self._accepted)
            # A trace that stays open longer than the window covers no longer
            # has its own acceptance counted; fall back to the configured
            # rate instead of dividing by zero.
            sample_rate = sum(self._total) // accepted if accepted else self._sample_rate
        else:
            sample_rate = 0

        # This is the last event of this trace.
        if parent_id is None:
            del self._traces[trace_id]

        return result, sample_rate

    def fill_bucket(self, delta):
        """Add the allowance for ``delta`` seconds to the bucket.

        Nothing is added once the bucket holds an hour worth of traces.
        """
        if self._bucket < self._traces_per_hour:
            self._bucket += self._traces_per_hour * delta / 3600

    def update_window(self):
        """Shift both stats windows by one slot, dropping the oldest slot."""
        self._accepted[:] = [0] + self._accepted[:-1]
        self._total[:] = [0] + self._total[:-1]


# One Sample per key, created on first access; beeline_sampler() indexes this
# by the "app.command" field of an event.
samples = defaultdict(Sample)


def beeline_sampler(event):
    """Sampler hook: route an event to the Sample of its command.

    :param event: Mapping with the fields of the event.
    :return: Tuple (send, sample_rate).
    """
    try:
        command = event["app.command"]
    except KeyError:
        # Some redis backend calls are not called from a trace (like
        # disconnect), and as such carry no command. Never sample those.
        return False, 0

    trace_id = event["trace.trace_id"]
    parent_id = event["trace.parent_id"]
    return samples[command].sampler(trace_id, parent_id)


async def fill_bucket():
    """Top up the bucket of every known Sample every five seconds, forever."""
    interval = 5

    while True:
        await asyncio.sleep(interval)

        for entry in samples.values():
            entry.fill_bucket(interval)


async def update_window():
    """Shift the stats window of every known Sample once a minute, forever."""
    interval = 60

    while True:
        await asyncio.sleep(interval)

        for entry in samples.values():
            entry.update_window()


async def tracer_init(honeycomb_api_key, honeycomb_service_name):
    """Initialize beeline with our sampler and start the background tasks.

    :param honeycomb_api_key: Write key handed to beeline.
    :param honeycomb_service_name: Service name handed to beeline.
    """
    beeline.init(
        writekey=honeycomb_api_key,
        dataset="game-coordinator",
        service_name=honeycomb_service_name,
        sampler_hook=beeline_sampler,
    )

    # Keep the buckets filled and the stats windows moving in the background.
    for background_job in (fill_bucket, update_window):
        asyncio.create_task(background_job())

    log.info(
        "Honeycomb initialized with traces-per-hour=%d and sample-rate=%d",
        TRACES_PER_HOUR,
        TRACES_SAMPLE_RATE,
    )


@click_helper.extend
@click.option("--honeycomb-api-key", help="Honeycomb API key.")
@click.option(
    "--honeycomb-service-name",
    help="Honeycomb service name to use.",
    default="coordinator",
    show_default=True,
)
@click.option(
    "--honeycomb-rate-limit",
    help="How many traces per command per hour to send to Honeycomb.",
    default=5,
    show_default=True,
)
@click.option(
    "--honeycomb-sample-rate",
    help="The sample rate of traces to send to Honeycomb.",
    default=10,
    show_default=True,
)
def click_tracer(honeycomb_api_key, honeycomb_service_name, honeycomb_rate_limit, honeycomb_sample_rate):
    """Apply the Honeycomb command line options and initialize the tracer.

    Without an API key nothing is changed and the tracer is not initialized.
    """
    global TRACES_PER_HOUR, TRACES_SAMPLE_RATE

    if honeycomb_api_key:
        # Store the settings before initializing, so the log line reports
        # the values given on the command line.
        TRACES_PER_HOUR = int(honeycomb_rate_limit)
        TRACES_SAMPLE_RATE = int(honeycomb_sample_rate)

        event_loop = asyncio.get_event_loop()
        event_loop.run_until_complete(tracer_init(honeycomb_api_key, honeycomb_service_name))
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ idna==3.3
libhoney==1.11.1
multidict==5.2.0
openttd-helpers==1.0.1
openttd-protocol==1.4.0
openttd-protocol==1.4.1
pproxy==2.7.8
requests==2.26.0
sentry-sdk==1.4.3
Expand Down

0 comments on commit 53cb04c

Please sign in to comment.