Multiple Aggregates #254

Open · wants to merge 8 commits into main
94 changes: 94 additions & 0 deletions pipeline_dp/private_beam.py
@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import collections
import dataclasses
import typing
from apache_beam.transforms import ptransform
@@ -481,3 +482,96 @@ def expand(self, pcol: pvalue.PCollection):
        # dp_result : (partition_key, result)

        return dp_result


# Cache for namedtuple types. It should be used only in the
# '_get_or_create_named_tuple()' function.
_agg_named_tuple_cache = {}


def _get_or_create_named_tuple(type_name: str,
                               field_names: tuple) -> 'MetricsTuple':
    """Creates a namedtuple type with a custom serializer."""

    # The custom serializer is required to support serialization of
    # namedtuples in Apache Beam.
    cache_key = (type_name, field_names)
    named_tuple = _agg_named_tuple_cache.get(cache_key)
    if named_tuple is None:
        named_tuple = collections.namedtuple(type_name, field_names)
        named_tuple.__reduce__ = lambda self: (_create_named_tuple_instance,
                                               (type_name, field_names,
                                                tuple(self)))
        _agg_named_tuple_cache[cache_key] = named_tuple
    return named_tuple


def _create_named_tuple_instance(type_name: str, field_names: tuple, values):
    return _get_or_create_named_tuple(type_name, field_names)(*values)

Collaborator (on _get_or_create_named_tuple): Nice, this is a correct approach to generate dynamic tuples!
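
Pickle looks a class up by its module path, so instances of a namedtuple type created at runtime cannot normally be unpickled on Beam workers; the __reduce__ override above sidesteps this by rebuilding the type from its name and fields. A minimal sketch of the round trip (the 'Row' type here is illustrative, not part of the PR):

import pickle

# A dynamically created namedtuple type. Without the __reduce__ override,
# pickling its instances would fail because pickle cannot import the type.
row_type = _get_or_create_named_tuple('Row', ('pid', 'sum'))
row = row_type(pid='pk1', sum=42)

# The instance round-trips because it is rebuilt on deserialization via
# the module-level _create_named_tuple_instance function.
restored = pickle.loads(pickle.dumps(row))
assert restored == row and type(restored).__name__ == 'Row'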


class Aggregate(PrivatePTransform):
    """Transform class for performing multiple aggregations on a PrivatePCollection."""

    def __init__(self, label=None):
        super().__init__(return_anonymized=True, label=label)

Collaborator (on __init__): I've thought about this in more detail: in most use cases the aggregations will share the same parameters (and sharing the same parameters will also help to optimize the performance and utility of queries). Could you please

  1. add an argument params of type AggregateParams,
  2. add an argument partition_extractor_fn.

Those arguments will be used in each aggregation. A sketch of the suggested shape follows.
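
A hedged sketch of that suggestion; the parameter names come from the comment above, and the body is an assumption rather than part of this PR:

class Aggregate(PrivatePTransform):
    """Sketch: multiple aggregations sharing common parameters."""

    def __init__(self,
                 params: pipeline_dp.AggregateParams,
                 partition_extractor_fn,
                 label=None):
        super().__init__(return_anonymized=True, label=label)
        # Shared by every aggregation added later via aggregate_value().
        self._params = params
        self._partition_extractor_fn = partition_extractor_fn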

    def aggregate_value(self, *args, col_name: str,
                        agg_type: pipeline_dp.Metrics):
        """Returns the _Aggregate transform corresponding to agg_type.

        Args:
            args: args for the aggregate transform, e.g. SumParams.
            col_name: name of the column for the resulting aggregate value.
            agg_type: the pipeline_dp.Metrics value identifying the
                aggregate to calculate."""
        return _Aggregate([args], col_name=[col_name], agg_type=[agg_type])

Collaborator (on aggregate_value): Since all shared parameters will be provided in the constructor, only a few parameters are needed here, e.g. value_extractor.
Collaborator (on the return statement): It looks like we can have only one class, Aggregate, without _Aggregate, namely:

  1. aggregate_value returns self and saves the information about the aggregations in a member variable.
  2. expand works as in _Aggregate.

The advantage is that it will be simpler, with no need to create multiple instances of _Aggregate. WDYT? A minimal sketch is below.
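
A minimal sketch of that single-class builder shape, assuming the same fields that _Aggregate keeps below (illustrative only, not the PR's implementation):

class Aggregate(PrivatePTransform):
    """Sketch: aggregate_value records each request and returns self."""

    def __init__(self, label=None):
        super().__init__(return_anonymized=True, label=label)
        self._aggregations = []  # one (args, col_name, agg_type) per call

    def aggregate_value(self, *args, col_name: str,
                        agg_type: pipeline_dp.Metrics):
        self._aggregations.append((args, col_name, agg_type))
        return self  # enables chaining without new _Aggregate instances

    # expand() would then iterate over self._aggregations exactly as
    # _Aggregate.expand() below iterates over its parallel lists.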



class _Aggregate(PrivatePTransform):

    def __init__(self,
                 *args,
                 col_name: str,
                 agg_type: pipeline_dp.Metrics,
                 label: Optional[str] = None):
        super().__init__(return_anonymized=True, label=label)
        self.args = args
        self.col_name = col_name
        self.agg_type = agg_type

    def aggregate_value(self, *args, col_name: str,
                        agg_type: pipeline_dp.Metrics):
        return _Aggregate(list(*self.args) + [args],
                          col_name=list(self.col_name) + [col_name],
                          agg_type=list(self.agg_type) + [agg_type])

    def expand(self, pcol: pvalue.PCollection):
        columns = {
            self.col_name[i]: pcol | "agg " + str(i) >> self._getTransform(
                self.agg_type[i], *self.args[0][i])
            for i in range(len(self.col_name))
        }
        return columns | 'LeftJoiner: Combine' >> beam.CoGroupByKey(
        ) | beam.Map(lambda x: _create_named_tuple_instance(
            'AggregatesTuple', tuple(["pid"] + [k for k in x[1]]),
            tuple([x[0]] + [x[1][k][0] for k in x[1]])))

Collaborator (on the "agg " + str(i) label): Adding numbers is the right idea to solve the problem of duplicated labels! Nit: f"Aggregation{i}". In BeamBackend such a function was implemented with the UniqueLabelGenerator class, but this case is simple enough that appending numbers instead of using UniqueLabelGenerator makes sense.
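
To make the join concrete: each per-metric transform emits (partition_key, value) pairs, CoGroupByKey groups them into a per-partition dict of single-element lists keyed by column name, and the final Map flattens that into one named tuple per partition. A worked example of the lambda on one grouped element (the values here are made up):

grouped = ('pk1', {'privacy_id_count': [60], 'sum': [130.0]})
row = _create_named_tuple_instance(
    'AggregatesTuple',
    tuple(['pid'] + [k for k in grouped[1]]),
    tuple([grouped[0]] + [grouped[1][k][0] for k in grouped[1]]))
# row == AggregatesTuple(pid='pk1', privacy_id_count=60, sum=130.0)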

    def _getTransform(self, agg_type: pipeline_dp.Metrics, *args):
        """Gets the correct transform corresponding to agg_type."""
        transform = None
        if agg_type == pipeline_dp.Metrics.MEAN:
            transform = Mean(*args)
        elif agg_type == pipeline_dp.Metrics.SUM:
            transform = Sum(*args)
        elif agg_type == pipeline_dp.Metrics.COUNT:
            transform = Count(*args)
        elif agg_type == pipeline_dp.Metrics.PRIVACY_ID_COUNT:
            transform = PrivacyIdCount(*args)
        else:
            raise NotImplementedError(
                f"Transform for agg_type: {agg_type} is not implemented.")
        transform.set_additional_parameters(
            budget_accountant=self._budget_accountant)
        return transform

Collaborator (on _getTransform): I'd suggest using DPEngine.aggregate instead of the PrivatePTransforms Mean, Sum, Count. The benefit is that in the future we can optimize performance/utility by computing multiple aggregations within DPEngine.aggregate. A sketch of that route follows.
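
For reference, a hedged sketch of the DPEngine route the comment suggests, reusing the parameters from the test below; the exact signatures (DPEngine, AggregateParams with a metrics list, DataExtractors) should be checked against the pipeline_dp version in use:

import pipeline_dp

def _aggregate_all_at_once(col, budget_accountant, backend):
    """Sketch: compute several DP metrics in one DPEngine.aggregate call."""
    engine = pipeline_dp.DPEngine(budget_accountant, backend)
    params = pipeline_dp.AggregateParams(
        noise_kind=pipeline_dp.NoiseKind.GAUSSIAN,
        # Several metrics in one pass instead of one transform per metric.
        metrics=[
            pipeline_dp.Metrics.SUM, pipeline_dp.Metrics.PRIVACY_ID_COUNT
        ],
        max_partitions_contributed=2,
        max_contributions_per_partition=3,
        min_value=1.55,
        max_value=2.7889)
    data_extractors = pipeline_dp.DataExtractors(
        privacy_id_extractor=lambda x: x[0],
        partition_extractor=lambda x: x[1],
        value_extractor=lambda x: x[2])
    return engine.aggregate(col, params, data_extractors)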
68 changes: 68 additions & 0 deletions tests/private_beam_test.py
@@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import unittest
import apache_beam as beam
from apache_beam.runners.portability import fn_api_runner
@@ -41,6 +42,14 @@ def value_per_key_within_tolerance(expected, actual, tolerance):
        return actual[0] == expected[0] and abs(actual[1] -
                                                expected[1]) <= tolerance

    @staticmethod
    def value_per_key_within_tolerance_named_tuple(expected, actual, tolerance):
        expected_dict = expected._asdict()
        actual_dict = actual._asdict()
        return all([(actual_dict[k] == expected_dict[k]) or
                    (abs(actual_dict[k] - expected_dict[k]) <= tolerance)
                    for k in actual_dict])

    def test_make_private_transform_succeeds(self):
        runner = fn_api_runner.FnApiRunner()
        with beam.Pipeline(runner=runner) as pipeline:
@@ -769,6 +778,65 @@ def test_combine_per_returns_sensible_result(self):
                equals_fn=lambda e, a: PrivateBeamTest.
                value_per_key_within_tolerance(e, a, 10.0)))

    def test_multiple_aggregates(self):
        with TestPipeline() as pipeline:
            # Arrange
            col = [(u, "pk1", 100) for u in range(30)]
            col += [(f"{u + 20}", "pk2", 100) for u in range(30)]
            col += [(f"{u + 30}", "pk1", -100.0) for u in range(30)]
            pcol = pipeline | 'Create produce' >> beam.Create(col)
            # Use very high epsilon and delta to minimize noise and test
            # flakiness.
            budget_accountant = budget_accounting.NaiveBudgetAccountant(
                total_epsilon=800, total_delta=0.999)
            private_collection = (
                pcol | 'Create private collection' >> private_beam.MakePrivate(
                    budget_accountant=budget_accountant,
                    privacy_id_extractor=lambda x: x[0]))

            privacy_id_count_params = aggregate_params.PrivacyIdCountParams(
                noise_kind=pipeline_dp.NoiseKind.GAUSSIAN,
                max_partitions_contributed=2,
                budget_weight=1,
                partition_extractor=lambda x: x[1])
            sum_params = aggregate_params.SumParams(
                noise_kind=pipeline_dp.NoiseKind.GAUSSIAN,
                max_partitions_contributed=2,
                max_contributions_per_partition=3,
                min_value=1.55,
                max_value=2.7889,
                budget_weight=1,
                partition_extractor=lambda x: x[1],
                value_extractor=lambda x: x[2])

            # Act
            result = private_collection | private_beam.Aggregate(
            ).aggregate_value(
                privacy_id_count_params,
                col_name='privacy_id_count',
                agg_type=pipeline_dp.Metrics.PRIVACY_ID_COUNT).aggregate_value(
                    sum_params,
                    col_name='sum',
                    agg_type=pipeline_dp.Metrics.SUM)
            budget_accountant.compute_budgets()

            # Assert
            # This is a health check to validate that the result is sensible.
            # Hence, we use a very large tolerance to reduce test flakiness.
            beam_util.assert_that(
                result,
                beam_util.equal_to(
                    [
                        collections.namedtuple(
                            "AggregatesTuple",
                            ['pid', 'privacy_id_count', 'sum'])('pk1', 60, 130),
                        collections.namedtuple(
                            "AggregatesTuple",
                            ['pid', 'privacy_id_count', 'sum'])('pk2', 30, 83)
                    ],
                    equals_fn=lambda e, a: PrivateBeamTest.
                    value_per_key_within_tolerance_named_tuple(e, a, 10)))
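
As a back-of-the-envelope check of the expected values (assuming each contribution to the sum is clipped to [min_value, max_value] = [1.55, 2.7889]): pk1 receives 30 contributions of 100 (clipped down to 2.7889) and 30 of -100.0 (clipped up to 1.55), so its sum is about 30 * 2.7889 + 30 * 1.55 = 130.2, while pk2 receives 30 clipped contributions, about 30 * 2.7889 = 83.7. The integer ids 0..29 and the string ids "30".."59" are distinct, so pk1 has 60 privacy ids and pk2 has 30.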


class SumCombineFn(private_beam.PrivateCombineFn):
"""Test-only, not private combine_fn."""