
Improve the cache and make cache invalidation easier and more robust #987

Merged
52 commits merged on Feb 16, 2023

Changes from all commits (52 commits)
7eaef75
intermediate table lookup
RobinL Jan 21, 2023
ec570aa
estimate u works
RobinL Jan 21, 2023
2b62122
update
RobinL Jan 21, 2023
1a36d19
add cachedictwithlogging
RobinL Jan 22, 2023
0bb7b86
analyse blocking
RobinL Jan 22, 2023
d41c4b7
invalidate cache
RobinL Jan 22, 2023
439cff1
add register table methods
RobinL Jan 24, 2023
b17c225
progress with term frequencies
RobinL Jan 24, 2023
3a1473f
em training etc
RobinL Jan 27, 2023
ae78e7f
Adjust caching for our concat tables
ThomasHepworth Feb 2, 2023
22bb2c8
Add some quick and dirty tests to quickly check caching and materiali…
ThomasHepworth Feb 2, 2023
b2b9663
Add an additional check on `_cache_uuid`
ThomasHepworth Feb 2, 2023
9cdb129
lint with black
ThomasHepworth Feb 2, 2023
6115040
Revert "lint with black"
RobinL Feb 2, 2023
3be11ac
Merge pull request #1013 from moj-analytical-services/caching_adjustm…
RobinL Feb 2, 2023
108747f
fix merge conflicts
RobinL Feb 2, 2023
3f91ed4
lint with black
RobinL Feb 2, 2023
4a59025
Merge branch 'master' into 961-attempt-2
RobinL Feb 3, 2023
bab2a17
ensure splinkdataframes are copied when returned from cache
RobinL Feb 3, 2023
a3af262
Fix correctness of convergence test
RobinL Feb 3, 2023
bb518aa
revert lock to pyspark 3.2.1 so tests pass
RobinL Feb 4, 2023
cd1de8a
Refactor _initialise_df_concat to return tf table if exists
RobinL Feb 4, 2023
5a90107
_initialise_df_concat optionally returns list
RobinL Feb 4, 2023
94faf31
Merge pull request #1023 from moj-analytical-services/_initialise_df_…
RobinL Feb 4, 2023
091bb3a
Register table accepts string tablename, enable labels registration
RobinL Feb 4, 2023
897db3c
fix linting errors in connected components
RobinL Feb 4, 2023
954276c
Move left and right table logic into blocking
RobinL Feb 4, 2023
2df6f0a
refactor tf tables to work with compare two records
RobinL Feb 4, 2023
04b6345
fix bug in register_table_predict
RobinL Feb 4, 2023
221b749
ensure real time linking tables have uuids
RobinL Feb 4, 2023
7a5ebf8
df concat and df concat with tf return SplinkDataframe or None
RobinL Feb 7, 2023
1efc9a4
tests now pass
RobinL Feb 7, 2023
7c96cd0
add docstring
RobinL Feb 7, 2023
7c39564
Merge pull request #1033 from moj-analytical-services/simplify__initi…
RobinL Feb 7, 2023
d495124
add another (currently failing) chart call to test conditional branch
ADBond Feb 13, 2023
5755965
correct wrong variable use
ADBond Feb 13, 2023
0c03f10
let missingness function use inbuilt caching code rather than re-impl…
ADBond Feb 13, 2023
66db0fc
test some linker methods do not make db calls when cache options avai…
ADBond Feb 13, 2023
89d2f10
simple tests of invalidate_cache
ADBond Feb 14, 2023
ac2acd5
basic test of caching with two linkers in play
ADBond Feb 15, 2023
71a295e
adjust cache so that it only adds when type check passes, and test of…
ADBond Feb 15, 2023
952134d
lint with black
ADBond Feb 15, 2023
dd4c629
test register tf tables cache functions
ADBond Feb 15, 2023
315ed29
Merge branch 'cache-tests' of https://github.com/moj-analytical-servi…
ADBond Feb 15, 2023
695f599
lint with black
ADBond Feb 15, 2023
c967093
replace uuid with uid
RobinL Feb 16, 2023
93e7220
make uid more robust to no settings obj
RobinL Feb 16, 2023
183ee80
Merge pull request #1050 from moj-analytical-services/cache-tests
RobinL Feb 16, 2023
0677585
Merge branch '961-attempt-2' of github.com:moj-analytical-services/sp…
RobinL Feb 16, 2023
72f3a44
create uid if not provided
RobinL Feb 16, 2023
ca70230
fix convergence test
RobinL Feb 16, 2023
7ebfa65
remove superfluous argument from docstring
RobinL Feb 16, 2023
28 changes: 14 additions & 14 deletions poetry.lock

Some generated files are not rendered by default.

11 changes: 9 additions & 2 deletions splink/accuracy.py
@@ -153,6 +153,9 @@ def truth_space_table_from_labels_table(
linker, labels_tablename, threshold_actual=0.5, match_weight_round_to_nearest=None
):

# Read from the cache or generate
concat_with_tf = linker._initialise_df_concat_with_tf()

sqls = predictions_from_sample_of_pairwise_labels_sql(linker, labels_tablename)

for sql in sqls:
@@ -166,7 +169,7 @@ def truth_space_table_from_labels_table(
for sql in sqls:
linker._enqueue_sql(sql["sql"], sql["output_table_name"])

df_truth_space_table = linker._execute_sql_pipeline()
df_truth_space_table = linker._execute_sql_pipeline([concat_with_tf])

return df_truth_space_table

@@ -252,6 +255,10 @@ def prediction_errors_from_labels_table(
include_false_negatives=True,
threshold=0.5,
):

# Read from the cache or generate
nodes_with_tf = linker._initialise_df_concat_with_tf()

sqls = predictions_from_sample_of_pairwise_labels_sql(linker, labels_tablename)

for sql in sqls:
Expand Down Expand Up @@ -290,7 +297,7 @@ def prediction_errors_from_labels_table(

linker._enqueue_sql(sql, "__splink__labels_with_fp_fn_status")

return linker._execute_sql_pipeline()
return linker._execute_sql_pipeline([nodes_with_tf])


def _predict_from_label_column_sql(linker, label_colname):
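The accuracy functions above now follow a read-from-the-cache-or-generate pattern: `linker._initialise_df_concat_with_tf()` returns the concatenated-inputs-with-term-frequencies table (reusing a cached copy where one exists), and the resulting dataframe is passed to `_execute_sql_pipeline` as an input table. A minimal sketch of that pattern, using a plain dict as the cache; the class and names below are illustrative, not splink's actual internals:

```python
# Minimal sketch of "read from the cache or generate", assuming a dict-like
# cache keyed on templated table name. Illustrative only; not splink's API.
class TableCache:
    def __init__(self):
        self._cache = {}  # templated_name -> physical table handle

    def get_or_generate(self, templated_name, generate_fn):
        """Return the cached table if present, otherwise generate and cache it."""
        if templated_name in self._cache:
            print(f"Cache hit: {templated_name}")
            return self._cache[templated_name]
        print(f"Cache miss: {templated_name} - generating")
        table = generate_fn()
        self._cache[templated_name] = table
        return table

    def invalidate(self):
        """Drop every cached entry so the next access regenerates the table."""
        self._cache.clear()


cache = TableCache()
# The first call generates and caches; the second is a cache hit
concat_with_tf = cache.get_or_generate("__splink__df_concat_with_tf", lambda: "tbl_0123")
concat_with_tf = cache.get_or_generate("__splink__df_concat_with_tf", lambda: "tbl_0123")
```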
30 changes: 18 additions & 12 deletions splink/analyse_blocking.py
@@ -6,7 +6,6 @@
from .blocking import _sql_gen_where_condition, block_using_rules_sql

from .misc import calculate_cartesian, calculate_reduction_ratio
from .vertically_concatenate import vertically_concatenate_sql

# https://stackoverflow.com/questions/39740632/python-type-hinting-without-cyclic-imports
if TYPE_CHECKING:
@@ -43,19 +42,28 @@ def cumulative_comparisons_generated_by_blocking_rules(
output_chart=True,
):

# Deepcopy our original linker so we can safely adjust our settings.
# This is particularly important to ensure we don't overwrite our
# original blocking rules.
linker = deepcopy(linker)

settings_obj = linker._settings_obj
linker._settings_obj_ = settings_obj

# Flag that we are in analyse-blocking mode
# (the flag is reset to False before the function returns)
linker._analyse_blocking_mode = True

if blocking_rules:
brs_as_objs = settings_obj._brs_as_objs(blocking_rules)
linker._settings_obj_._blocking_rules_to_generate_predictions = brs_as_objs

# Turn tf off. No need to apply term frequencies to perform these calcs
settings_obj._retain_matching_columns = False
settings_obj._retain_intermediate_calculation_columns = False
for cc in settings_obj.comparisons:
for cl in cc.comparison_levels:
cl._level_dict["tf_adjustment_column"] = None

concat = linker._initialise_df_concat(materialise=True)

# Calculate the Cartesian Product
if output_chart:
# We only need the cartesian product if we want to output the chart view
@@ -64,23 +72,19 @@
else:
group_by_statement = "group by source_dataset"

sql = vertically_concatenate_sql(linker)
linker._enqueue_sql(sql, "__splink__df_concat")

sql = f"""
select count(*) as count
from __splink__df_concat
from {concat.physical_name}
{group_by_statement}
"""
linker._enqueue_sql(sql, "__splink__cartesian_product")
cartesian_count = linker._execute_sql_pipeline()
cartesian_count = linker._execute_sql_pipeline([concat])
row_count_df = cartesian_count.as_record_dict()
cartesian_count.drop_table_from_database()

cartesian = calculate_cartesian(row_count_df, settings_obj._link_type)

# Calculate the total number of rows generated by each blocking rule
linker._initialise_df_concat_with_tf(materialise=False)
sql = block_using_rules_sql(linker)
linker._enqueue_sql(sql, "__splink__df_blocked_data")

@@ -95,7 +99,7 @@
order by cast(match_key as int) asc
"""
linker._enqueue_sql(sql, "__splink__df_count_cumulative_blocks")
cumulative_blocking_rule_count = linker._execute_sql_pipeline()
cumulative_blocking_rule_count = linker._execute_sql_pipeline([concat])
br_n = cumulative_blocking_rule_count.as_pandas_dataframe()
cumulative_blocking_rule_count.drop_table_from_database()
br_count, br_keys = list(br_n.row_count), list(br_n["match_key"].astype("int"))
@@ -136,4 +140,6 @@ def cumulative_comparisons_generated_by_blocking_rules(

br_comparisons.append(out_dict.copy())

linker._analyse_blocking_mode = False

return br_comparisons
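Two details of this hunk are worth noting: the linker is deepcopied before term-frequency adjustments are switched off, so the caller's settings and blocking rules are never mutated, and the concatenated input is materialised once via `_initialise_df_concat(materialise=True)` and then referenced by its `physical_name` in both subsequent pipelines. A small illustration of the first point, using a stand-in settings class rather than splink's real objects:

```python
# Why the function deepcopies before mutating: changes to the working copy
# (e.g. switching off tf adjustment columns) must not leak back to the
# caller's object. FakeSettings is a stand-in, not splink's Settings class.
from copy import deepcopy


class FakeSettings:
    def __init__(self):
        self.retain_matching_columns = True
        self.tf_adjustment_columns = ["first_name", "surname"]


original = FakeSettings()
working = deepcopy(original)

# Turn tf adjustments off on the working copy only
working.retain_matching_columns = False
working.tf_adjustment_columns = []

# The caller's settings are untouched
assert original.retain_matching_columns is True
assert original.tf_adjustment_columns == ["first_name", "surname"]
```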
4 changes: 4 additions & 0 deletions splink/athena/athena_linker.py
@@ -341,6 +341,10 @@ def _execute_sql_against_backend(self, sql, templated_name, physical_name):

def register_table(self, input, table_name, overwrite=False):

# If the user has provided a table name, return it as a SplinkDataframe
if isinstance(input, str):
return self._table_to_splink_dataframe(table_name, input)

# Check if table name is already in use
exists = self._table_exists_in_database(table_name)
if exists:
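With this change, `register_table` can be given the name of a table that already exists in the backend, in which case it simply wraps that name as a SplinkDataFrame instead of writing any data. A rough sketch of the dispatch, using a stand-in class rather than the real AthenaLinker:

```python
# Sketch of the dispatch added in this hunk. FakeLinker is a stand-in; the
# real method returns a SplinkDataFrame and writes to the database backend.
class FakeLinker:
    def __init__(self):
        self.tables = {}

    def _table_to_splink_dataframe(self, templated_name, physical_name):
        return {"templated_name": templated_name, "physical_name": physical_name}

    def register_table(self, input, table_name, overwrite=False):
        # New behaviour: a string means the table already exists in the
        # database, so just wrap it rather than writing anything
        if isinstance(input, str):
            return self._table_to_splink_dataframe(table_name, input)
        # Otherwise write the supplied data under table_name
        if table_name in self.tables and not overwrite:
            raise ValueError(f"Table {table_name} already exists")
        self.tables[table_name] = input
        return self._table_to_splink_dataframe(table_name, table_name)


linker = FakeLinker()
wrapped = linker.register_table("existing_labels_table", "labels")
print(wrapped)  # {'templated_name': 'labels', 'physical_name': 'existing_labels_table'}
```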
25 changes: 25 additions & 0 deletions splink/blocking.py
@@ -116,6 +116,31 @@ def block_using_rules_sql(linker: Linker):
" will not be implemented for this run."
)

if (
linker._two_dataset_link_only
and not linker._find_new_matches_mode
and not linker._compare_two_records_mode
):
source_dataset_col = linker._settings_obj._source_dataset_column_name
# Need df_l to be the one with the lowest id to preserve the property
# that the left dataset is the one with the lowest concatenated id
keys = linker._input_tables_dict.keys()
keys = list(sorted(keys))
df_l = linker._input_tables_dict[keys[0]]
df_r = linker._input_tables_dict[keys[1]]

sql = f"""
select * from __splink__df_concat_with_tf
where {source_dataset_col} = '{df_l.templated_name}'
"""
linker._enqueue_sql(sql, "__splink__df_concat_with_tf_left")

sql = f"""
select * from __splink__df_concat_with_tf
where {source_dataset_col} = '{df_r.templated_name}'
"""
linker._enqueue_sql(sql, "__splink_df_concat_with_tf_right")

# Cover the case where there are no blocking rules
# This is a bit of a hack where if you do a self-join on 'true'
# you create a cartesian product, rather than having separate code
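For a two-dataset link-only job, this block splits `__splink__df_concat_with_tf` into left and right tables by filtering on the source dataset column, after sorting the input-table keys so the left table is always the one with the lowest key. An illustrative pandas version of that split (column and dataset names are made up):

```python
# Pandas stand-in for the left/right split the hunk generates in SQL.
import pandas as pd

concat_with_tf = pd.DataFrame(
    {
        "unique_id": [1, 2, 3, 4],
        "source_dataset": ["df_a", "df_a", "df_b", "df_b"],
        "first_name": ["amy", "bob", "amy", "rob"],
    }
)

# Sort the dataset names so the "left" table is deterministic (lowest key first)
keys = sorted(concat_with_tf["source_dataset"].unique())
df_l = concat_with_tf[concat_with_tf["source_dataset"] == keys[0]]
df_r = concat_with_tf[concat_with_tf["source_dataset"] == keys[1]]

print(len(df_l), len(df_r))  # 2 2
```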