refactor(python)!: use bidder indexing as input #11

Open · wants to merge 2 commits into base: main
12 changes: 12 additions & 0 deletions locustfiles/_utils.py
@@ -25,6 +25,18 @@ class AdRequest(typing.TypedDict):
     """Device used by the user visiting the publisher's site"""
 
 
+class BidderFactory(polyfactory.factories.TypedDictFactory[Bidder]):
+    __set_as_default_factory_for_type__ = True
+    _names = [
+        polyfactory.factories.TypedDictFactory.__faker__.unique.company()
+        for _ in range(20)
+    ]
+
+    @classmethod
+    def name(cls) -> str:
+        return str(cls.__random__.choice(cls._names))
+
+
 class AdRequestFactory(polyfactory.factories.TypedDictFactory[AdRequest]):
     """Utility object to generate random AdRequests"""
 
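Because BidderFactory registers itself as the default factory for the Bidder type, every factory that generates Bidder values (including AdRequestFactory) draws names from the same fixed pool of 20 Faker company names. A self-contained sketch of that mechanism, using a toy Bidder TypedDict as a stand-in for the project's own types:

import typing

import polyfactory.factories


class Bidder(typing.TypedDict):
    # Toy stand-in for the project's Bidder TypedDict.
    name: str


class BidderFactory(polyfactory.factories.TypedDictFactory[Bidder]):
    # Registers this factory as the default for any Bidder field elsewhere.
    __set_as_default_factory_for_type__ = True
    _names = [
        polyfactory.factories.TypedDictFactory.__faker__.unique.company()
        for _ in range(20)
    ]

    @classmethod
    def name(cls) -> str:
        return str(cls.__random__.choice(cls._names))


# Names always come from the bounded 20-entry pool, so fabrics keyed by bidder
# name reuse a small, stable key set across generated requests.
assert {b["name"] for b in BidderFactory.batch(size=100)} <= set(BidderFactory._names)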
6 changes: 3 additions & 3 deletions locustfiles/root.py
@@ -10,8 +10,8 @@ class Root(locust.FastHttpUser):
     def on_start(self) -> None:
         ad_request = AdRequestFactory.build()
 
-        self.fabrics = [
-            {
+        self.fabrics = {
+            bidder["name"]: {
                 "featureMap": {
                     "bidder": bidder["name"],
                     "userSynced": bidder.get("user_id") is not None,
@@ -22,7 +22,7 @@ def on_start(self) -> None:
                 "groundTruth": {"hasResponse": True},
             }
             for bidder in ad_request["bidders"]
-        ]
+        }
         return super().on_start()
 
     @locust.task(100)
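After this change self.fabrics is a mapping keyed by bidder name rather than a positional list. Roughly, the built payload has the shape sketched below; "acme-corp" is an invented bidder name, the real keys come from the Faker-generated bidders in the ad request:

# Illustrative shape only; values mirror the comprehension above.
fabrics = {
    "acme-corp": {
        "featureMap": {
            "bidder": "acme-corp",
            "userSynced": True,  # bidder.get("user_id") is not None
        },
        "groundTruth": {"hasResponse": True},
    },
}
# Indexing by name replaces positional lookups such as fabrics[0].
print(fabrics["acme-corp"]["featureMap"]["bidder"])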
16 changes: 10 additions & 6 deletions locustfiles/rtb.py
@@ -16,8 +16,8 @@ def handleAdRequest(self):
         ad_request = AdRequestFactory.build()
 
         # Build a list of features for each bidder in the ad request
-        feature_maps = [
-            {
+        fabrics = {
+            bidder["name"]: {
                 "featureMap": {
                     "bidder": bidder["name"],
                     "userSynced": bidder.get("user_id") is not None,
@@ -29,12 +29,14 @@ def handleAdRequest(self):
                 }
             }
             for bidder in ad_request["bidders"]
-        ]
+        }
         # Do a single call to the Greenbids Tailor service
-        fabrics = self.client.put("", json=feature_maps).json()
+        fabrics = self.client.put("", json=fabrics).json()
+        # Prepare a flag to know if you must make the POST call
+        is_exploration = False
 
         # Do your regular calls here to send a bid requests to the selected bidders
-        for fabric in fabrics:
+        for fabric in fabrics.values():
             if not fabric["prediction"]["shouldSend"]:
                 # Skip any bidder that as too few response probability
                 continue
@@ -49,7 +51,9 @@ def handleAdRequest(self):
             # Store the outcome in the fabric
             fabric["groundTruth"] = dict(hasResponse=hasResponse)
 
+            is_exploration = is_exploration or fabric["prediction"]["isExploration"]
+
         # For a sample of calls, report the outcomes to the Greenbids Tailor POST endpoint
-        if fabrics[0]["prediction"]["isExploration"]:
+        if is_exploration:
             # You may use a fire-and-forget mechanism
             self.client.post("", json=fabrics)
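The exploration decision is no longer read from fabrics[0] alone: the flag is accumulated over every fabric that actually got a bid request, and the POST happens only if at least one of them was an exploration. A self-contained sketch of that aggregation, with made-up prediction data standing in for the Greenbids Tailor response:

# Made-up response data; in the locustfile this dict comes from
# self.client.put("", json=fabrics).json().
fabrics = {
    "acme-corp": {"prediction": {"shouldSend": True, "isExploration": False}},
    "globex": {"prediction": {"shouldSend": False, "isExploration": True}},
}

is_exploration = False
for fabric in fabrics.values():
    if not fabric["prediction"]["shouldSend"]:
        continue  # skipped bidders do not contribute to the flag
    # ... send the bid request and fill fabric["groundTruth"] here ...
    is_exploration = is_exploration or fabric["prediction"]["isExploration"]

print(is_exploration)  # False here: the only exploring bidder was skipped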
1 change: 1 addition & 0 deletions python/src/greenbids/tailor/core/app/main.py
@@ -47,6 +47,7 @@ def _setup_logging():
     tracer_provider=telemetry.tracer_provider,
     meter_provider=telemetry.meter_provider,
     excluded_urls=f"{healthz.router.prefix}/.*",
+    http_capture_headers_server_response=["greenbids-tailor-.*"],
 )
 
 app.include_router(root.router)
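The new keyword asks the OpenTelemetry FastAPI instrumentation to record response headers matching greenbids-tailor-.* (the headers added in routers/root.py below) as span attributes. A standalone sketch of the same option; the call site being FastAPIInstrumentor.instrument_app is an assumption, since only the keyword arguments are visible in this hunk:

from fastapi import FastAPI
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor

app = FastAPI()
FastAPIInstrumentor.instrument_app(
    app,
    # Matching response headers end up on the server span, e.g. as
    # http.response.header.greenbids_tailor_is_exploration.
    http_capture_headers_server_response=["greenbids-tailor-.*"],
)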
40 changes: 33 additions & 7 deletions python/src/greenbids/tailor/core/app/routers/root.py
@@ -1,27 +1,53 @@
-from fastapi import APIRouter
-from greenbids.tailor.core import fabric
+from fastapi import APIRouter, Response
+from greenbids.tailor.core import fabric, telemetry
 from .. import resources
 
 router = APIRouter(tags=["Main"])
 
+_invokation_count = telemetry.meter.create_counter("greenbids_tailor_invokation_count")
+_fabrics_count = telemetry.meter.create_counter("greenbids_tailor_fabrics_count")
+
 
 @router.put("/")
 async def get_buyers_probabilities(
-    fabrics: list[fabric.Fabric],
-) -> list[fabric.Fabric]:
+    fabrics: dict[str, fabric.Fabric], response: Response
+) -> dict[str, fabric.Fabric]:
     """Compute the probability of the buyers to provide a bid.
 
     This must be called for each adcall.
     Only the feature map attribute of the fabrics needs to be present.
     The prediction attribute will be populated in the returned response.
     """
-    return resources.get_instance().gb_model.get_buyers_probabilities(fabrics)
+    inst = resources.get_instance()
+    results = inst.gb_model.get_buyers_probabilities(fabrics)
+    is_exploration = False
+    for k, f in results.items():
+        is_exploration = is_exploration or f.prediction.is_exploration
+        _fabrics_count.add(
+            1,
+            attributes={
+                "greenbids.tailor.model_name": inst.gb_model_name,
+                "greenbids.tailor.fabric_id": k,
+                "greenbids.tailor.should_send": f.prediction.should_send,
+            },
+        )
+    _invokation_count.add(
+        1,
+        {
+            "greenbids.tailor.model_name": inst.gb_model_name,
+            "greenbids.tailor.is_exploration": is_exploration,
+        },
+    )
+
+    response.headers["greenbids-tailor-is-exploration"] = str(is_exploration)
+    response.headers["greenbids-tailor-model-name"] = inst.gb_model_name
+    return results
 
 
 @router.post("/")
 async def report_buyers_status(
-    fabrics: list[fabric.Fabric],
-) -> list[fabric.Fabric]:
+    fabrics: dict[str, fabric.Fabric],
+) -> dict[str, fabric.Fabric]:
     """Train model according to actual outcome.
 
     This must NOT be called for each adcall, but only for exploration ones.
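Callers now exchange fabrics keyed by an arbitrary fabric id (the bidder name in the locustfiles), and the aggregated exploration flag plus model name are also surfaced as response headers. A hedged client sketch using httpx; the URL, port, and payload keys are assumptions, the authoritative schema lives in greenbids.tailor.core.fabric:

import httpx

# Hypothetical service URL and payload; key names follow the locustfile payloads.
payload = {"acme-corp": {"featureMap": {"bidder": "acme-corp", "userSynced": True}}}
resp = httpx.put("http://localhost:8000/", json=payload)
fabrics = resp.json()

# The headers let a thin caller decide on the POST without parsing the body;
# the value is str() of a Python bool, hence the "True" comparison.
if resp.headers["greenbids-tailor-is-exploration"] == "True":
    httpx.post("http://localhost:8000/", json=fabrics)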
29 changes: 18 additions & 11 deletions python/src/greenbids/tailor/core/models.py
@@ -17,15 +17,15 @@ class Model(ABC):
     @abstractmethod
     def get_buyers_probabilities(
         self,
-        fabrics: list[fabric.Fabric],
-    ) -> list[fabric.Fabric]:
+        fabrics: dict[str, fabric.Fabric],
+    ) -> dict[str, fabric.Fabric]:
         raise NotImplementedError
 
     @abstractmethod
     def report_buyers_status(
         self,
-        fabrics: list[fabric.Fabric],
-    ) -> list[fabric.Fabric]:
+        fabrics: dict[str, fabric.Fabric],
+    ) -> dict[str, fabric.Fabric]:
         raise NotImplementedError
 
     def dump(self, fp: typing.BinaryIO) -> None:
@@ -44,16 +44,23 @@ def __init__(self):
 
     def get_buyers_probabilities(
         self,
-        fabrics: list[fabric.Fabric],
-    ) -> list[fabric.Fabric]:
-        prediction = fabric.Prediction(exploration_rate=0.2)
-        return [f.model_copy(update=dict(prediction=prediction)) for f in fabrics]
+        fabrics: dict[str, fabric.Fabric],
+    ) -> dict[str, fabric.Fabric]:
+        prediction = fabric.Prediction(
+            exploration_rate=float(
+                os.environ.get("GREENBIDS_TAILOR_MODELS_NONE_EXPLORATION_RATE", 0.2)
+            )
+        )
+        return {
+            k: f.model_copy(update=dict(prediction=prediction))
+            for k, f in fabrics.items()
+        }
 
     def report_buyers_status(
         self,
-        fabrics: list[fabric.Fabric],
-    ) -> list[fabric.Fabric]:
-        self._logger.debug([f.feature_map.root for f in fabrics[:1]])
+        fabrics: dict[str, fabric.Fabric],
+    ) -> dict[str, fabric.Fabric]:
+        self._logger.debug(next(f.feature_map.root for f in fabrics.values()))
         return fabrics
 
 
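Model implementations now receive and return dict[str, fabric.Fabric], and the default model's exploration rate can be overridden via GREENBIDS_TAILOR_MODELS_NONE_EXPLORATION_RATE. A minimal sketch of a custom model against the new interface; ConstantModel is an invented name, and the sketch assumes get_buyers_probabilities and report_buyers_status are the only abstract methods:

from greenbids.tailor.core import fabric
from greenbids.tailor.core.models import Model


class ConstantModel(Model):
    """Invented example: tags every fabric with a fixed exploration rate."""

    def get_buyers_probabilities(
        self, fabrics: dict[str, fabric.Fabric]
    ) -> dict[str, fabric.Fabric]:
        prediction = fabric.Prediction(exploration_rate=0.1)
        return {
            k: f.model_copy(update=dict(prediction=prediction))
            for k, f in fabrics.items()
        }

    def report_buyers_status(
        self, fabrics: dict[str, fabric.Fabric]
    ) -> dict[str, fabric.Fabric]:
        return fabrics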
7 changes: 5 additions & 2 deletions python/src/greenbids/tailor/core/telemetry.py
@@ -22,14 +22,14 @@
 
 
 from .logging import RateLimitingFilter
-from greenbids.tailor.core import version
+from greenbids.tailor import core
 
 RESOURCE = resources.Resource.create(
     {
         resources.SERVICE_INSTANCE_ID: str(
             os.environ.get("GREENBIDS_TAILOR_API_USER", "Unknown")
         ),
-        resources.SERVICE_VERSION: version,
+        resources.SERVICE_VERSION: core.version,
     }
 )
 
@@ -41,10 +41,13 @@
 )
 metric_readers.append(PrometheusMetricReader())
 meter_provider = metrics.MeterProvider(metric_readers=metric_readers, resource=RESOURCE)
+_instrumentation_name = ".".join(core.__name__.split(".")[:-1])
+meter = meter_provider.get_meter(_instrumentation_name, core.version)
 
 _OTLP_TRACES_PROCESSOR = BatchSpanProcessor(OTLPSpanExporter())
 tracer_provider = trace.TracerProvider(resource=RESOURCE)
 tracer_provider.add_span_processor(_OTLP_TRACES_PROCESSOR)
+tracer = tracer_provider.get_tracer(_instrumentation_name, core.version)
 
 _OTLP_LOGS_PROCESSOR = BatchLogRecordProcessor(OTLPLogExporter())
 logger_provider = logs.LoggerProvider(resource=RESOURCE)
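The instrumentation scope name is derived by dropping the last component of the core package's module path, so meters and tracers are registered under the parent package name together with the core version. A worked example of that derivation:

# For the core package, __name__ is "greenbids.tailor.core",
# so the derived scope name is "greenbids.tailor".
module_name = "greenbids.tailor.core"
instrumentation_name = ".".join(module_name.split(".")[:-1])
print(instrumentation_name)  # greenbids.tailor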