forked from alltheplaces/alltheplaces
-
Notifications
You must be signed in to change notification settings - Fork 0
/
apply_nsi_categories.py
120 lines (93 loc) · 4.34 KB
/
apply_nsi_categories.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
from locations.categories import get_category_tags
from locations.name_suggestion_index import NSI
class ApplyNSICategoriesPipeline:
    """Item pipeline that enriches items with tags from the Name Suggestion Index (NSI).

    An item carrying a ``brand_wikidata`` or ``operator_wikidata`` code is looked
    up in NSI. When the candidate entries can be narrowed down to exactly one
    (directly, by country code, or by category tags), that entry's id and tags
    are copied onto the item without overriding anything already set.
    """

    nsi = NSI()

    # Maps a wikidata code to the list of NSI entries found for it.
    # Declared on the class, so it is shared by every instance of the pipeline.
    wikidata_cache = {}

    def process_item(self, item, spider):
        """Try to find a single NSI entry for the item and apply its tags.

        Outcomes are counted on ``spider.crawler.stats`` under the ``atp/nsi/*`` keys.

        :param item: item being processed; read through ``get`` and updated in place
        :param spider: spider that produced the item, used only for its stats collector
        :return: the same item, enriched when exactly one NSI entry could be selected
        """
        if item.get("nsi_id"):
            # Already matched to an NSI entry, nothing to do
            return item

        # Fall back with `or` rather than a `.get()` default: a brand_wikidata that is
        # present but empty/None must not hide a usable operator_wikidata.
        brand_code = item.get("brand_wikidata")
        code = brand_code or item.get("operator_wikidata")
        if not code:
            return item

        if code not in self.wikidata_cache:
            # wikidata_cache will usually only hold one thing, but can contain more with more complex spiders
            # The key thing is that we don't have to call nsi.iter_nsi on every process_item
            self.wikidata_cache[code] = list(self.nsi.iter_nsi(code))
        matches = self.wikidata_cache[code]

        if not matches:
            missing_stat = "atp/nsi/brand_missing" if brand_code else "atp/nsi/operator_missing"
            spider.crawler.stats.inc_value(missing_stat)
            return item

        if len(matches) == 1:
            spider.crawler.stats.inc_value("atp/nsi/perfect_match")
            return self.apply_tags(matches[0], item)

        # Needed by both narrowing steps below; the item is not modified in between,
        # so the tags are looked up once.
        categories = get_category_tags(item)

        if cc := item.get("country"):
            matches = self.filter_cc(matches, cc.lower(), categories)

            if len(matches) == 1:
                spider.crawler.stats.inc_value("atp/nsi/cc_match")
                return self.apply_tags(matches[0], item)

        if categories:
            # Operates on whatever the country filter left over (if it ran)
            matches = self.filter_categories(matches, categories)

            if len(matches) == 1:
                spider.crawler.stats.inc_value("atp/nsi/category_match")
                return self.apply_tags(matches[0], item)

        spider.crawler.stats.inc_value("atp/nsi/match_failed")
        return item

    def filter_cc(self, matches: list[dict], cc: str, categories: dict = None) -> list[dict]:
        """Filter matches by country code, attempt to find a better match if category is supplied.

        Entries that explicitly include the country are preferred; entries marked
        as global ("001") are only returned when no country specific entry is left.

        :param matches: list of matches from NSI
        :param cc: country code in lower case
        :param categories: category tags
        :return: filtered list of matches
        """
        includes = []
        globals_matches = []

        for match in matches:
            location_set = match.get("locationSet") or {}

            if cc in (location_set.get("exclude") or []):
                continue

            # Ignore non string such as: {"include":[[-122.835,45.5,2]]}
            # and reduce sub-regions to their country: "gb-eng" -> "gb"
            include = {
                entry.split("-")[0] for entry in (location_set.get("include") or []) if isinstance(entry, str)
            }

            if cc in include:
                includes.append(match)
            if "001" in include:  # 001 being global in NSI
                globals_matches.append(match)

        if categories:
            includes = self.filter_categories(includes, categories)
            globals_matches = self.filter_categories(globals_matches, categories)

        return includes or globals_matches

    def filter_categories(self, matches: list[dict], categories: dict) -> list[dict]:
        """Filter matches by category tags. If two category tags are supplied,
        both tags have to be present on the NSI item for a match to occur.

        A match is kept only when the category tags extracted from its own tags
        are equal to ``categories``.

        :param matches: list of matches from NSI
        :param categories: category tags
        :return: filtered list of matches
        """
        return [match for match in matches if get_category_tags(match["tags"]) == categories]

    def apply_tags(self, match, item):
        """Copy the id and tags of an NSI entry onto the item.

        :param match: NSI entry, read for its "id" and "tags"
        :param item: item to update in place
        :return: the updated item
        """
        # `or {}` also covers an "extras" that is present but None
        extras = item.get("extras") or {}
        item["nsi_id"] = match["id"]

        # NSI tag names that are stored under a different key on the item
        renamed_keys = {
            "brand:wikidata": "brand_wikidata",
            "operator:wikidata": "operator_wikidata",
        }

        # Apply NSI tags to item
        for key, value in match["tags"].items():
            key = renamed_keys.get(key, key)

            # Fields defined in Feature are added directly otherwise add them to extras
            # Never override anything set by the spider
            if key in item.fields:
                if item.get(key) is None:
                    item[key] = value
            elif extras.get(key) is None:
                extras[key] = value

        item["extras"] = extras
        return item