Skip to content

Commit

Permalink
Support uploading stash/filter from multiple block types
Browse files Browse the repository at this point in the history
  • Loading branch information
KevinMind committed Nov 22, 2024
1 parent 21429e9 commit 80d0bb9
Show file tree
Hide file tree
Showing 9 changed files with 373 additions and 99 deletions.
10 changes: 8 additions & 2 deletions src/olympia/blocklist/cron.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ def get_last_generation_time():


def get_base_generation_time():
return get_config(MLBF_BASE_ID_CONFIG_KEY, None, json_value=True)
return get_config(
MLBF_BASE_ID_CONFIG_KEY(BlockType.BLOCKED, compat=True), None, json_value=True
)


def get_blocklist_last_modified_time():
Expand Down Expand Up @@ -130,7 +132,11 @@ def _upload_mlbf_to_remote_settings(*, force_base=False):
else:
mlbf.generate_and_write_stash(previous_filter)

upload_filter.delay(generation_time, is_base=make_base_filter)
upload_filter.delay(
generation_time,
filter_list=[BlockType.BLOCKED.name] if make_base_filter else [],
create_stash=not make_base_filter,
)

if base_filter:
cleanup_old_files.delay(base_filter_id=base_filter.created_at)
Expand Down
7 changes: 3 additions & 4 deletions src/olympia/blocklist/mlbf.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,15 +207,14 @@ def hash_filter_inputs(cls, input_list):
for (guid, version) in input_list
]

@property
def filter_path(self):
def filter_path(self, _block_type: BlockType = BlockType.BLOCKED):
return self.storage.path('filter')

@property
def stash_path(self):
return self.storage.path('stash.json')

def generate_and_write_filter(self):
def generate_and_write_filter(self, block_type: BlockType = BlockType.BLOCKED):
stats = {}

bloomfilter = generate_mlbf(
Expand All @@ -225,7 +224,7 @@ def generate_and_write_filter(self):
)

# write bloomfilter
mlbf_path = self.filter_path
mlbf_path = self.filter_path(block_type)
with self.storage.open(mlbf_path, 'wb') as filter_file:
log.info(f'Writing to file {mlbf_path}')
bloomfilter.tofile(filter_file)
Expand Down
125 changes: 104 additions & 21 deletions src/olympia/blocklist/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import os
import re
from datetime import datetime, timedelta
from typing import List

from django.conf import settings
from django.contrib.admin.models import CHANGE, LogEntry
Expand All @@ -21,10 +22,10 @@
REMOTE_SETTINGS_COLLECTION_MLBF,
)
from olympia.lib.remote_settings import RemoteSettings
from olympia.zadmin.models import set_config
from olympia.zadmin.models import get_config, set_config

from .mlbf import MLBF
from .models import BlocklistSubmission
from .models import BlocklistSubmission, BlockType
from .utils import (
datetime_to_ts,
)
Expand All @@ -35,7 +36,15 @@
bracket_open_regex = re.compile(r'(?<!\\){')
bracket_close_regex = re.compile(r'(?<!\\)}')

BLOCKLIST_RECORD_MLBF_BASE = 'bloomfilter-base'

def BLOCKLIST_RECORD_MLBF_BASE(block_type: BlockType):
match block_type:
case BlockType.SOFT_BLOCKED:
return 'softblocks-bloomfilter-base'
case BlockType.BLOCKED:
return 'bloomfilter-base'
case _:
raise ValueError(f'Unknown block type: {block_type}')


@task
Expand Down Expand Up @@ -88,28 +97,43 @@ def monitor_remote_settings():


@task
def upload_filter(generation_time, is_base=True):
def upload_filter(generation_time, filter_list=None, create_stash=False):
# We cannot send enum values to tasks so we serialize them as strings
# and deserialize them here back to the enum values.
filter_list: List[BlockType] = (
[] if filter_list is None else [BlockType[filter] for filter in filter_list]
)
bucket = settings.REMOTE_SETTINGS_WRITER_BUCKET
server = RemoteSettings(
bucket, REMOTE_SETTINGS_COLLECTION_MLBF, sign_off_needed=False
)
mlbf = MLBF.load_from_storage(generation_time, error_on_missing=True)
is_base = len(filter_list) > 0
# Download old records before uploading new ones
# this ensures we do not delete any records we just uplaoded
old_records = server.records()
attachment_types_to_delete = []

if is_base:
# clear the collection for the base - we want to be the only filter
server.delete_all_records()
statsd.incr('blocklist.tasks.upload_filter.reset_collection')
# Then the bloomfilter
data = {
'key_format': MLBF.KEY_FORMAT,
'generation_time': generation_time,
'attachment_type': BLOCKLIST_RECORD_MLBF_BASE,
}
with mlbf.storage.open(mlbf.filter_path, 'rb') as filter_file:
attachment = ('filter.bin', filter_file, 'application/octet-stream')
server.publish_attachment(data, attachment)
statsd.incr('blocklist.tasks.upload_filter.upload_mlbf')
statsd.incr('blocklist.tasks.upload_filter.upload_mlbf.base')
else:
for block_type in filter_list:
attachment_type = BLOCKLIST_RECORD_MLBF_BASE(block_type)
data = {
'key_format': MLBF.KEY_FORMAT,
'generation_time': generation_time,
'attachment_type': attachment_type,
}
with mlbf.storage.open(mlbf.filter_path(block_type), 'rb') as filter_file:
attachment = ('filter.bin', filter_file, 'application/octet-stream')
server.publish_attachment(data, attachment)
statsd.incr('blocklist.tasks.upload_filter.upload_mlbf')
# After we have succesfully uploaded the new filter
# we can safely delete others of that type
attachment_types_to_delete.append(attachment_type)

statsd.incr('blocklist.tasks.upload_filter.upload_mlbf.base')

# It is possible to upload a stash and a filter in the same task
if create_stash:
with mlbf.storage.open(mlbf.stash_path, 'r') as stash_file:
stash_data = json.load(stash_file)
# If we have a stash, write that
Expand All @@ -121,10 +145,69 @@ def upload_filter(generation_time, is_base=True):
server.publish_record(stash_upload_data)
statsd.incr('blocklist.tasks.upload_filter.upload_stash')

# Commit the changes to remote settings for review.
# only after this can we safely update config timestamps
server.complete_session()
set_config(MLBF_TIME_CONFIG_KEY, generation_time, json_value=True)
if is_base:
set_config(MLBF_BASE_ID_CONFIG_KEY, generation_time, json_value=True)

# Update the base_filter_id for uploaded filters
for block_type in filter_list:
# We currently write to the old singular config key for hard blocks
# to preserve backward compatibility.
# In https://github.com/mozilla/addons/issues/15193
# we can remove this and start writing to the new plural key.
if block_type == BlockType.BLOCKED:
set_config(
MLBF_BASE_ID_CONFIG_KEY(block_type, compat=True),
generation_time,
json_value=True,
)

set_config(
MLBF_BASE_ID_CONFIG_KEY(block_type), generation_time, json_value=True
)

oldest_base_filter_id: int | None = None

# Get the oldest base_filter_id from the set of defined IDs
# We should delete stashes that are older than this time
for block_type in BlockType:
base_filter_id = get_config(
# Currently we read from the old singular config key for
# hard blocks to preserve backward compatibility.
# In https://github.com/mozilla/addons/issues/15193
# we can remove this and start reading from the new plural key.
MLBF_BASE_ID_CONFIG_KEY(block_type, compat=True),
json_value=True,
)
if base_filter_id is not None:
if oldest_base_filter_id is None:
oldest_base_filter_id = base_filter_id
else:
oldest_base_filter_id = min(oldest_base_filter_id, base_filter_id)

for record in old_records:
# Delete attachment records that match the
# attachment types of filters we just uplaoded
# this ensures we only have one filter attachment
# per block_type
if 'attachment' in record:
attachment_type = record['attachment_type']

if attachment_type in attachment_types_to_delete:
server.delete_record(record['id'])

# Delete stash records that are older than the oldest
# pre-existing filter attachment records. These records
# cannot apply to any existing filter since we uploaded
elif 'stash' in record and oldest_base_filter_id is not None:
record_time = record['stash_time']

if record_time < oldest_base_filter_id:
server.delete_record(record['id'])

cleanup_old_files.delay(base_filter_id=oldest_base_filter_id)
statsd.incr('blocklist.tasks.upload_filter.reset_collection')


@task
Expand Down
2 changes: 1 addition & 1 deletion src/olympia/blocklist/tests/test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,4 @@ def test_command(self):

call_command('export_blocklist', '1')
mlbf = MLBF.load_from_storage(1)
assert mlbf.storage.exists(mlbf.filter_path)
assert mlbf.storage.exists(mlbf.filter_path())
30 changes: 18 additions & 12 deletions src/olympia/blocklist/tests/test_cron.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def test_skip_update_unless_force_base(self):

# Check that a filter was created on the second attempt
mlbf = MLBF.load_from_storage(self.current_time)
assert mlbf.storage.exists(mlbf.filter_path)
assert mlbf.storage.exists(mlbf.filter_path())
assert not mlbf.storage.exists(mlbf.stash_path)

def test_skip_update_unless_no_base_mlbf(self):
Expand Down Expand Up @@ -220,11 +220,12 @@ def test_upload_stash_unless_force_base(self):
].call_args_list == [
mock.call(
self.current_time,
is_base=force_base,
filter_list=[BlockType.BLOCKED.name] if force_base else [],
create_stash=not force_base,
)
]
mlbf = MLBF.load_from_storage(self.current_time)
assert mlbf.storage.exists(mlbf.filter_path) == force_base
assert mlbf.storage.exists(mlbf.filter_path()) == force_base
assert mlbf.storage.exists(mlbf.stash_path) != force_base

def test_upload_stash_unless_missing_base_filter(self):
Expand All @@ -238,11 +239,12 @@ def test_upload_stash_unless_missing_base_filter(self):
].call_args_list == [
mock.call(
self.current_time,
is_base=False,
filter_list=[],
create_stash=True,
)
]
mlbf = MLBF.load_from_storage(self.current_time)
assert not mlbf.storage.exists(mlbf.filter_path)
assert not mlbf.storage.exists(mlbf.filter_path())
assert mlbf.storage.exists(mlbf.stash_path)

self.mocks[
Expand All @@ -252,11 +254,12 @@ def test_upload_stash_unless_missing_base_filter(self):
assert (
mock.call(
self.current_time,
is_base=True,
filter_list=[BlockType.BLOCKED.name],
create_stash=False,
)
in self.mocks['olympia.blocklist.cron.upload_filter.delay'].call_args_list
)
assert mlbf.storage.exists(mlbf.filter_path)
assert mlbf.storage.exists(mlbf.filter_path())

@mock.patch('olympia.blocklist.cron.BASE_REPLACE_THRESHOLD', 1)
def test_upload_stash_unless_enough_changes(self):
Expand All @@ -271,11 +274,12 @@ def test_upload_stash_unless_enough_changes(self):
].call_args_list == [
mock.call(
self.current_time,
is_base=False,
filter_list=[],
create_stash=True,
)
]
mlbf = MLBF.load_from_storage(self.current_time)
assert not mlbf.storage.exists(mlbf.filter_path)
assert not mlbf.storage.exists(mlbf.filter_path())
assert mlbf.storage.exists(mlbf.stash_path)

self._block_version(is_signed=True)
Expand All @@ -288,12 +292,13 @@ def test_upload_stash_unless_enough_changes(self):
assert (
mock.call(
self.current_time,
is_base=True,
filter_list=[BlockType.BLOCKED.name],
create_stash=False,
)
in self.mocks['olympia.blocklist.cron.upload_filter.delay'].call_args_list
)
new_mlbf = MLBF.load_from_storage(self.current_time)
assert new_mlbf.storage.exists(new_mlbf.filter_path)
assert new_mlbf.storage.exists(new_mlbf.filter_path())
assert not new_mlbf.storage.exists(new_mlbf.stash_path)

def test_cleanup_old_files(self):
Expand Down Expand Up @@ -374,7 +379,8 @@ def test_invalid_cache_results_in_diff(self):
assert (
mock.call(
self.current_time,
is_base=False,
filter_list=[],
create_stash=True,
)
in self.mocks['olympia.blocklist.cron.upload_filter.delay'].call_args_list
)
Expand Down
Loading

0 comments on commit 80d0bb9

Please sign in to comment.