Block to IDs #2231
Exploding example
import pandas as pd
import splink.comparison_library as cl
from splink import DuckDBAPI, Linker, SettingsCreator
data_1 = [
{"unique_id": 1, "first_name": "John", "surname": "Doe", "postcode": ["A", "B"]},
{"unique_id": 2, "first_name": "John", "surname": "Doe", "postcode": ["B"]},
]
data_2 = [
{"unique_id": 3, "first_name": "John", "surname": "Smith", "postcode": ["A"]},
{"unique_id": 4, "first_name": "John", "surname": "Smith", "postcode": ["Z"]},
]
df_1 = pd.DataFrame(data_1)
df_2 = pd.DataFrame(data_2)
settings = SettingsCreator(
link_type="link_only",
blocking_rules_to_generate_predictions=[
{
"blocking_rule": "l.postcode = r.postcode and l.first_name = r.first_name",
"arrays_to_explode": ["postcode"],
},
"l.surname = r.surname",
],
comparisons=[
cl.ExactMatch("first_name"),
cl.ExactMatch("surname"),
cl.ExactMatch("postcode"),
],
retain_intermediate_calculation_columns=True,
)
db_api = DuckDBAPI()
linker = Linker(
[df_1, df_2], settings, database_api=db_api, input_table_aliases=["l", "r"]
)
linker._debug_mode = True
linker.inference.predict().as_pandas_dataframe()
# concat_with_tf = linker._initialise_df_concat_with_tf()
# sql = block_using_rules_sql(linker)
# linker._enqueue_sql(sql, "__splink__df_blocked")
# sql_gen = linker._pipeline._generate_pipeline([concat_with_tf])
# print(sql_gen)
# linker.predict().as_pandas_dataframe()

50k example
import logging
import time
import splink.comparison_library as cl
from splink import DuckDBAPI, Linker, SettingsCreator, block_on, splink_datasets
from splink.blocking_analysis import (
cumulative_comparisons_to_be_scored_from_blocking_rules_chart,
)
db_api = DuckDBAPI()
df = splink_datasets.historical_50k
df = df.head(2000)
blocking_rules = [
block_on("substr(first_name,1,2)", "substr(surname,1,1)", salting_partitions=4),
block_on("surname", "dob"),
block_on("first_name", "dob"),
block_on("postcode_fake", "first_name"),
block_on("postcode_fake", "surname"),
block_on("dob", "birth_place"),
block_on("substr(postcode_fake,1,3)", "dob"),
block_on("substr(postcode_fake,1,3)", "first_name"),
block_on("substr(postcode_fake,1,3)", "surname"),
block_on("substr(first_name,1,2)", "substr(surname,1,2)", "substr(dob,1,4)"),
]
settings = SettingsCreator(
link_type="dedupe_only",
blocking_rules_to_generate_predictions=blocking_rules,
comparisons=[
cl.ForenameSurnameComparison(
"first_name",
"surname",
forename_surname_concat_col_name="first_name_surname_concat",
),
cl.DateOfBirthComparison(
"dob", input_is_string=True, separate_1st_january=True
),
cl.PostcodeComparison("postcode_fake"),
cl.ExactMatch("birth_place").configure(term_frequency_adjustments=True),
cl.ExactMatch("occupation").configure(term_frequency_adjustments=True),
],
retain_intermediate_calculation_columns=True,
)
df["first_name_surname_concat"] = df["first_name"] + " " + df["surname"]
linker = Linker(df, settings, database_api=db_api, input_table_aliases=["l", "r"])
logging.basicConfig(format="%(message)s")
logging.getLogger("splink").setLevel(20)
linker.training.estimate_probability_two_random_records_match(
[block_on("first_name", "surname")],
recall=0.7,
)
linker.training.estimate_u_using_random_sampling(1e5, seed=2)
linker.training.estimate_parameters_using_expectation_maximisation(
blocking_rule=block_on("first_name", "surname"),
estimate_without_term_frequencies=True,
fix_u_probabilities=False,
)
Splink 3 test script
import logging
import time
import duckdb
import pandas as pd
import splink.duckdb.comparison_library as cl
from splink.datasets import splink_datasets
from splink.duckdb.blocking_rule_library import block_on
from splink.duckdb.linker import DuckDBLinker
df = splink_datasets.historical_50k
start_time = time.time()
blocking_rules = [
block_on(["substr(first_name,1,2)", "substr(surname,1,1)"]),
block_on(["substr(first_name,1,1)", "substr(surname,1,2)"]),
block_on(["surname", "dob"]),
block_on(["first_name", "dob"]),
block_on(["postcode_fake", "first_name"]),
block_on(["postcode_fake", "surname"]),
block_on(["dob", "birth_place"]),
block_on(["substr(postcode_fake,1,3)", "dob"]),
block_on(["substr(postcode_fake,1,3)", "first_name"]),
block_on(["substr(postcode_fake,1,3)", "surname"]),
block_on(["substr(first_name,1,2)", "substr(surname,1,2)", "substr(dob,1,4)"]),
]
settings = {
"link_type": "dedupe_only",
"blocking_rules_to_generate_predictions": blocking_rules,
"comparisons": [
cl.jaro_winkler_at_thresholds("first_name", term_frequency_adjustments=True),
cl.jaro_winkler_at_thresholds("surname", term_frequency_adjustments=True),
cl.levenshtein_at_thresholds("dob"),
cl.levenshtein_at_thresholds("postcode_fake"),
cl.exact_match("birth_place", term_frequency_adjustments=True),
cl.exact_match("occupation", term_frequency_adjustments=True),
],
"retain_intermediate_calculation_columns": True,
}
linker = DuckDBLinker(df, settings, input_table_aliases=["l", "r"])
logging.basicConfig(format="%(message)s")
logging.getLogger("splink").setLevel(20)
linker.estimate_probability_two_random_records_match(
[block_on(["first_name", "surname"])],
recall=0.7,
)
linker.estimate_u_using_random_sampling(1e6, seed=2)
linker.estimate_parameters_using_expectation_maximisation(
blocking_rule=block_on(["first_name", "surname"]),
estimate_without_term_frequencies=True,
)
elapsed_time = time.time() - start_time
print(f"Time taken to estimate parameters: {elapsed_time} seconds")
start_time = time.time()
pred = linker.predict(threshold_match_weight=-2)
end_time = time.time()
elapsed_time = end_time - start_time
print(f"Time taken to predict: {elapsed_time} seconds") Splink 4 eqiuvalentimport logging
import time
import splink.comparison_library as cl
from splink import DuckDBAPI, Linker, SettingsCreator, block_on, splink_datasets
from splink.blocking_analysis import (
cumulative_comparisons_to_be_scored_from_blocking_rules_chart,
)
db_api = DuckDBAPI()
df = splink_datasets.historical_50k
start_time = time.time()
blocking_rules = [
block_on("substr(first_name,1,2)", "substr(surname,1,1)"),
block_on("substr(first_name,1,1)", "substr(surname,1,2)"),
block_on("surname", "dob"),
block_on("first_name", "dob"),
block_on("postcode_fake", "first_name"),
block_on("postcode_fake", "surname"),
block_on("dob", "birth_place"),
block_on("substr(postcode_fake,1,3)", "dob"),
block_on("substr(postcode_fake,1,3)", "first_name"),
block_on("substr(postcode_fake,1,3)", "surname"),
block_on("substr(first_name,1,2)", "substr(surname,1,2)", "substr(dob,1,4)"),
]
settings = SettingsCreator(
link_type="dedupe_only",
blocking_rules_to_generate_predictions=blocking_rules,
comparisons=[
cl.JaroWinklerAtThresholds("first_name").configure(
term_frequency_adjustments=True
),
cl.JaroWinklerAtThresholds("surname").configure(
term_frequency_adjustments=True
),
cl.LevenshteinAtThresholds("dob"),
cl.LevenshteinAtThresholds("postcode_fake"),
cl.ExactMatch("birth_place").configure(term_frequency_adjustments=True),
cl.ExactMatch("occupation").configure(term_frequency_adjustments=True),
],
retain_intermediate_calculation_columns=True,
)
linker = Linker(df, settings, database_api=db_api, input_table_aliases=["l", "r"])
linker.training.estimate_probability_two_random_records_match(
[block_on("first_name", "surname")],
recall=0.7,
)
linker.training.estimate_u_using_random_sampling(1e6, seed=2)
linker.training.estimate_parameters_using_expectation_maximisation(
blocking_rule=block_on("first_name", "surname"),
estimate_without_term_frequencies=True,
)
elapsed_time = time.time() - start_time
print(f"Time taken to estimate parameters: {elapsed_time} seconds")
pred = linker.inference.predict(threshold_match_weight=-2)
end_time = time.time()
elapsed_time = end_time - start_time
print(f"Time taken to predict: {elapsed_time} seconds") Timings here are 50 seconds (Splink 3) vs 15.9 seconds (Splink4) Although if you set salting to, say a value of 4 in the first script, then it comes down to about 20.6 seconds vs 15.4 seconds |
Spark example
%load_ext autoreload
%autoreload 2
import logging
import time
from pyspark.context import SparkConf, SparkContext
from pyspark.sql import SparkSession
import splink.comparison_library as cl
from splink import Linker, SettingsCreator, SparkAPI, block_on, splink_datasets
from splink.backends.spark import similarity_jar_location
path = similarity_jar_location()
df_pandas = splink_datasets.historical_50k
# df_pandas.iteritems = df_pandas.items
conf = SparkConf()
conf.set("spark.jars", path)
conf.set("spark.driver.memory", "12g")
conf.set("spark.sql.shuffle.partitions", "12")
conf.set("spark.default.parallelism", "12")
sc = SparkContext.getOrCreate(conf=conf)
sc.setCheckpointDir("tmp_checkpoints/")
spark = SparkSession(sc)
print(spark)
display(spark)
df = spark.createDataFrame(df_pandas)
db_api = SparkAPI(
spark_session=spark,
break_lineage_method="parquet",
num_partitions_on_repartition=6,
)
blocking_rules = [
block_on("substr(first_name,1,2)", "substr(surname,1,1)"),
block_on("surname", "dob"),
block_on("first_name", "dob"),
block_on("postcode_fake", "first_name"),
block_on("postcode_fake", "surname"),
block_on("dob", "birth_place"),
block_on("substr(postcode_fake,1,3)", "dob"),
block_on("substr(postcode_fake,1,3)", "first_name"),
block_on("substr(postcode_fake,1,3)", "surname"),
block_on("substr(first_name,1,2)", "substr(surname,1,2)", "substr(dob,1,4)"),
]
settings = SettingsCreator(
link_type="dedupe_only",
blocking_rules_to_generate_predictions=blocking_rules,
comparisons=[
cl.JaroWinklerAtThresholds("first_name").configure(
term_frequency_adjustments=True
),
cl.JaroWinklerAtThresholds("surname").configure(
term_frequency_adjustments=True
),
cl.LevenshteinAtThresholds("dob"),
cl.PostcodeComparison("postcode_fake"),
cl.ExactMatch("birth_place").configure(term_frequency_adjustments=True),
cl.ExactMatch("occupation").configure(term_frequency_adjustments=True),
],
retain_intermediate_calculation_columns=True,
)
linker = Linker(df, settings, db_api)
import logging
logging.basicConfig(format="%(message)s")
logging.getLogger("splink").setLevel(10)
start = time.time()
df = linker.inference.predict(threshold_match_weight=2)
end_time = time.time()
elapsed_time = end_time - start
print(f"Elapsed time: {elapsed_time:.2f} seconds")
# 49 seconds
# 106- on splink4_dev
The latter is very closely related to cascading 😃
Initial tests suggest this approach makes Splink somewhere between 25% and 3x faster but, more importantly, probably also more scalable and requiring fewer 'tweaks' like salting and adjusting Spark parameters.
Main benefits of this approach: it allows `predict()` to be run in chunks. It would be straightforward to load in x% of the blocked pairs and run predictions on only those ones (see the sketch after this list).

This has three benefits:
- It appears to be faster. The speedup is variable: for some jobs there's no speedup, for others it goes 3x faster.
- It appears to offer scalability benefits. In particular, it seems to reduce the incidence of 'straggler tasks' (most CPUs finishing their work, but one or two still going and holding everything else up). I have a feeling it may also help with reducing spill to disk/memory usage.
- Since the blocking phase is distinct from prediction, it makes things easier to debug, because you can find out whether your job failed on the blocking phase or on the prediction phase.
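To illustrate the chunked-predict idea, here is a minimal, hypothetical sketch. None of this is Splink API: `blocked_pairs`, `score_chunk` and the `match_key` / `join_key_l` / `join_key_r` column names are placeholders for whatever the blocking phase would materialise and however scoring would be invoked on a subset of pairs.

```python
import numpy as np
import pandas as pd

# Hypothetical: a table of blocked pairs produced by the (separate) blocking
# phase, holding only IDs: match_key, join_key_l, join_key_r.
blocked_pairs = pd.DataFrame(
    {"match_key": [0, 0, 1], "join_key_l": [1, 1, 2], "join_key_r": [3, 4, 3]}
)

def score_chunk(pairs_chunk: pd.DataFrame) -> pd.DataFrame:
    # Placeholder for "join the underlying records back on and run prediction
    # for just these pairs"; in reality this would be a Splink scoring call.
    out = pairs_chunk.copy()
    out["match_probability"] = 0.5  # dummy value for the sketch
    return out

# Load in x% of the blocked pairs at a time and predict on only those ones.
n_chunks = 2  # i.e. 50% of the pairs per pass
predictions = pd.concat(
    score_chunk(chunk) for chunk in np.array_split(blocked_pairs, n_chunks)
)
print(predictions)
```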
Main changes:
- `BlockingRule.create_blocked_pairs` SQL now outputs pairs (`match_key`, `join_key_l`, `join_key_r`), but does not select the output columns.
- `compute_comparison_vector_values_sqls` therefore has to select the output columns.
- `compute_comparison_vector_values_sqls` needs a first CTE step to produce columns `my_col_l`, `my_col_r` etc., because otherwise comparisons would have to refer to `r.my_col`, `l.my_col` etc. (see the sketch below the TODO list).

TODO:
- `set_match_probability_to_one` arg to `compute_comparison_vector_values_sqls`
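To make the column-handling change concrete, here is a rough sketch of the two-step shape described above (not Splink's actual generated SQL): blocking emits only ID pairs, and the first CTE of the comparison-vector step joins the records back on and aliases columns to `my_col_l` / `my_col_r` style names. Table names, the `gamma_first_name` expression and the toy data are assumptions made up for the illustration.

```python
# Rough sketch only: blocking outputs ID pairs; comparison-vector SQL joins
# the records back on and aliases columns in a first CTE.
import duckdb
import pandas as pd

records = pd.DataFrame(
    {"unique_id": [1, 2, 3], "first_name": ["John", "John", "Jane"]}
)

con = duckdb.connect()
con.register("input_records", records)

# Step 1: blocking emits only ID pairs plus the match_key of the rule that
# generated them; it does not select any of the underlying columns.
con.execute(
    """
    CREATE TABLE blocked_pairs AS
    SELECT 0 AS match_key,
           l.unique_id AS join_key_l,
           r.unique_id AS join_key_r
    FROM input_records AS l
    JOIN input_records AS r
      ON l.first_name = r.first_name
     AND l.unique_id < r.unique_id
    """
)

# Step 2: the comparison-vector SQL joins the records back on via the IDs and,
# in a first CTE, aliases columns to my_col_l / my_col_r style names so that
# later comparison expressions don't have to refer to l.my_col / r.my_col.
result = con.execute(
    """
    WITH pairs_with_cols AS (
        SELECT b.match_key,
               l.first_name AS first_name_l,
               r.first_name AS first_name_r
        FROM blocked_pairs AS b
        JOIN input_records AS l ON b.join_key_l = l.unique_id
        JOIN input_records AS r ON b.join_key_r = r.unique_id
    )
    SELECT *,
           CASE WHEN first_name_l = first_name_r THEN 1 ELSE 0 END
               AS gamma_first_name
    FROM pairs_with_cols
    """
).df()
print(result)
```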