# Import/Catalogue/DAE Emilia-Romagna/translation.py
import json
import re
from os import path
from urllib import parse, request
import ogr2osm
from osgeo import ogr
# --- Behaviour switches -----------------------------------------------------

# Extra diagnostics: keeps the raw source attributes in the output tags, stores
# "duplicate_*" tags when merging and prints the geocoding distance details.
DEBUG = False

# Inverts the feature filter: emit only the features that would be discarded.
KEEP_ONLY_BAD_FEATURES = False

# Always fold "nome" into defibrillator:location (and never emit operator).
ALWAYS_MERGE_NOME_IN_LOCATION = False

# When two features share a geometry and an operator, merge their locations.
MERGE_DUPLICATE_LOCATIONS = True

# --- Geocoding --------------------------------------------------------------

# Base URL of the Nominatim instance; a falsy value disables the geocoding check.
NOMINATIM_ENDPOINT = "http://localhost:8080"

# JSON file used to persist geocoding results; a falsy value disables the cache file.
GEOCODE_CACHE_FILE = "./geocode_cache.json"

# Maximum accepted distance between a feature and one of its geocoding results,
# expressed in the units of the geometry coordinates (presumably degrees -- TODO confirm).
# A falsy value disables the geocoding check.
MAX_DISTANCE_FROM_GEOCODING = 0.004

# --- Manual overrides -------------------------------------------------------

# Features verified manually as good (matched against the raw "nome" field)
NAME_WHITELIST = {
    "PMS CREAZIONE CERAMICHE",
    "Ufficio Turistico",
    "FRAZ. SAN LORENZO DI CASTELL'ARQUATO",
    "MC DONALDS -",
    "CHIESA DI SAN FRANCESCO D'ASSISI _ LOC BOSCONE",
    "CONFARTIGIANATO DI CESENA",
    "Chiesa di Vitignano",
    "TENSIOSTRUTTURA IMPIANTI SPORTIVI",
    "UFFICIO INFORMAZIONE E ACCOGLIENZA TURISTICA",
    "FONDAZIONE SAN GIUSEPPE CFP C.E.S.T.A.",
    "PIAZZA COMUNE DI LUZZARA",
    "PALESTRA COMUNALE PLESSO SCOLASTICO \"C. COLLODI\"",
    "Parcheggio Ufficio postale",
    "NUOVA FARMACIA BAISO",
    "Pizzeria Il Carrobbio",
}

# Features verified manually as bad (matched against the raw "nome" field)
NAME_BLACKLIST = {
    "Farmacia Cuoghi",
}

# Cities (stripped and lower-cased "citta" values) whose features are always
# discarded: no geocoding data is available for San Marino.
CITY_BLACKLIST = {
    "repubblica di san marino",
    "castello citta di san marino",
    "castello serravalle",
    "castello borgo maggiore",
    "castello acquaviva",
    "castello faetano",
}
class EmiliaRomagnaDaeTranslation(ogr2osm.TranslationBase):
    """ogr2osm translation for the Emilia-Romagna defibrillator (DAE) dataset.

    Features are optionally validated by geocoding their address/name through
    Nominatim and comparing the result with the feature geometry; the source
    attributes ("nome", "indirizzo", "citta", "quartiere", "ubicazione",
    "orari", "note", "telefono") are then converted into OSM tags.
    """

    # Class-level default kept for backward compatibility; every instance gets
    # its own dict in __init__ so that instances never share (and mutate) it.
    geocode_cache: dict = {}

    # Prefixes / fragments of a (capitalized) "nome" that describe a place
    # rather than the operator of the defibrillator.
    _LOCATION_NAME_PREFIXES = (
        "Strada", "Via", "Piazza", "Giardin", "Fraz", "Parc", "Totem",
        "Portic", "Ingresso", "Area", "Edific", "Ex", "Teca",
    )
    _LOCATION_NAME_FRAGMENTS = (" presso ", " civico ", " fianco ")

    # Word rearrangements tried on 4/5/6-word addresses, as tuples of indexes
    # into the split address; for each length the first one that geocodes wins.
    _ADDRESS_WORD_VARIANTS = {
        4: ((0, 1, 3), (2, 1, 3)),
        5: ((0, 1, 2, 4), (0, 1, 3, 4), (2, 1, 3, 4)),
        6: ((0, 1, 3, 4, 5), (2, 1, 3, 4, 5), (1, 3, 2, 4, 5)),
    }

    def __init__(self):
        """Initialize the translation and load the geocode cache file, if any."""
        super().__init__()
        self.geocode_cache = {}
        if GEOCODE_CACHE_FILE and path.isfile(GEOCODE_CACHE_FILE):
            print("Reading geocode cache")
            with open(GEOCODE_CACHE_FILE, "r", encoding="utf-8") as myfile:
                self.geocode_cache = json.load(myfile)

    def __del__(self):
        """Persist the geocode cache to GEOCODE_CACHE_FILE (when configured).

        NOTE(review): relying on __del__ is fragile (it may run late during
        interpreter shutdown); kept because no other hook is visible here.
        """
        if GEOCODE_CACHE_FILE:
            print("Saving geocode cache")
            with open(GEOCODE_CACHE_FILE, "w", encoding="utf-8") as myfile:
                json.dump(self.geocode_cache, myfile)

    def merge_tags(self, geometry_type, tags_existing_geometry: dict, tags_new_geometry: dict):
        """Merge the tags of a feature whose geometry duplicates an existing one.

        Args:
            geometry_type: unused here.
            tags_existing_geometry: tags already attached to the geometry; each
                value is a list of strings.
            tags_new_geometry: tags of the duplicate feature; each value is a
                single string.

        Returns:
            ``tags_existing_geometry`` (modified in place), still mapping every
            key to a list of values.
        """
        if DEBUG:
            for key, value in tags_new_geometry.items():
                tags_existing_geometry[f"duplicate_{key}"] = [value]

        ex_operators = tags_existing_geometry.get("operator")
        ex_operator = ex_operators[0] if ex_operators else None
        new_operator = tags_new_geometry.get("operator")
        ex_locations = tags_existing_geometry.get("defibrillator:location")
        ex_location = ex_locations[0] if ex_locations else None
        new_location = tags_new_geometry.get("defibrillator:location")

        same_operator = bool(
            ex_operator and new_operator and ex_operator.lower() == new_operator.lower()
        )
        if MERGE_DUPLICATE_LOCATIONS and same_operator:
            print(f"Merging duplicate location for operator '{ex_operator}'")
            if new_location:
                # The existing geometry may have no location yet; never append None
                # nor a location that is already listed.
                locations = tags_existing_geometry.setdefault("defibrillator:location", [])
                if new_location not in locations:
                    locations.append(new_location)
            for key in ("addr:street", "addr:housenumber"):
                if key not in tags_existing_geometry and key in tags_new_geometry:
                    # Values of the existing tags are lists, so wrap the single new value
                    tags_existing_geometry[key] = [tags_new_geometry[key]]
        else:
            print("Ignoring duplicate geometry")
            print(f" operator: '{ex_operator}' VS '{new_operator}'")
            print(f" location: '{ex_location}' VS '{new_location}'")
        return tags_existing_geometry

    def get_street_and_number(self, attrs: dict):
        """Extract street name and house number from the "indirizzo" attribute.

        Args:
            attrs: attribute dictionary of a feature.

        Returns:
            A ``(street, number)`` tuple; each element is None when it cannot
            be determined (missing, placeholder or explicitly wrong address).
        """
        raw_address = attrs.get("indirizzo")
        if not raw_address:
            return None, None

        indirizzo = str(raw_address).lower()
        # Drop the placeholders used in the dataset for "no house number"
        for placeholder in ("snc", "n.d", "n:d", "xxx"):
            indirizzo = indirizzo.replace(placeholder, "")
        indirizzo = indirizzo.strip(" -")
        if not indirizzo or 'sbagliat' in indirizzo or 'sconosciut' in indirizzo:
            return None, None

        # The street is everything before the first dash, parenthesis, underscore or digit
        street_match = re.match(r'[^-(_\d]+', indirizzo)
        street = None
        if street_match:
            street = street_match.group(0).replace("p.zza", "piazza").strip().title() or None

        # A trailing "12/a"-like number is kept verbatim, otherwise the slash is dropped
        multiple_number_match = re.search(r'\d+\/\w+$', indirizzo)
        number_match = re.search(r'\d+\/?[a-z]*$', indirizzo)
        if multiple_number_match:
            number = multiple_number_match.group(0)
        elif number_match:
            number = number_match.group(0).replace("/", "")
        else:
            number = None
        return street, number

    def _query_nominatim(self, url: str) -> list:
        """Fetch a Nominatim search URL and return its results as [lon, lat] pairs."""
        with request.urlopen(url) as response:
            results = json.loads(response.read())
        return [[float(result["lon"]), float(result["lat"])] for result in results]

    def find_latlon(self, name: str|None, address: str|None, city: str|None) -> list:
        """Geocode a place through Nominatim, using the geocode cache.

        A structured query (amenity/street/city) is tried first, then a
        free-form query; the outcome (even when empty) is cached.

        Args:
            name: name of the place (sent as ``amenity``), if any.
            address: street address (sent as ``street``), if any.
            city: city name, if any.

        Returns:
            A new list of ``[lon, lat]`` pairs (at most 5), possibly empty.

        Raises:
            Whatever ``urllib.request.urlopen`` / ``json.loads`` raise on
            network or decoding errors.
        """
        # Single quotes inside the f-string: nested double quotes need Python 3.12+
        cache_key = f"{name or ''}, {address or ''}, {city or ''}"
        if cache_key in self.geocode_cache:
            # Return a copy: callers extend the result and must not alter the cache
            return list(self.geocode_cache[cache_key] or [])

        print("Searching", cache_key)
        # https://nominatim.org/release-docs/develop/api/Search/#structured-query
        structured_params = "".join(
            f"&{param}={parse.quote(value)}"
            for param, value in (("amenity", name), ("street", address), ("city", city))
            if value
        )
        urls = (
            f"{NOMINATIM_ENDPOINT}/search?limit=5{structured_params}",
            f"{NOMINATIM_ENDPOINT}/search?limit=5&q={parse.quote(cache_key)}",
        )
        coords = []
        for url in urls:
            coords = self._query_nominatim(url)
            if coords:
                break
        self.geocode_cache[cache_key] = coords
        return list(coords)

    def _first_geocoding(self, queries) -> list:
        """Return the first non-empty find_latlon result among (name, address, city) queries."""
        for name, address, city in queries:
            coords = self.find_latlon(name, address, city)
            if coords:
                return coords
        return []

    def _geocode_candidates(self, name: str|None, address: str|None, city: str) -> list:
        """Collect the geocoding results of the address, the name and their variants."""
        res = []
        if address:
            res += self.find_latlon(None, address, city)
            # Common street type mix-ups in the dataset
            if "Via " in address:
                res += self.find_latlon(None, address.replace("Via ", "Viale "), city)
            if "Viale " in address:
                res += self.find_latlon(None, address.replace("Viale ", "Via "), city)
            if "Piazzale " in address:
                res += self.find_latlon(None, address.replace("Piazzale ", "Piazza "), city)
            if "-" in city:
                res += self.find_latlon(None, address, city.split("-")[-1])

        if name:
            words = name.replace(city, "").strip(" ._-").split(" ")
            without_first_word = " ".join(words[1:]) if len(words) > 1 else name
            without_last_word = " ".join(words[:-1]) if len(words) > 1 else name
            res += self._first_geocoding(
                (candidate, None, city)
                for candidate in (name, without_first_word, without_last_word)
            )

        split_addr = address.split(" ") if address else []
        variants = self._ADDRESS_WORD_VARIANTS.get(len(split_addr), ())
        if variants:
            res += self._first_geocoding(
                (None, " ".join(split_addr[i] for i in indexes), city)
                for indexes in variants
            )
        if len(split_addr) > 2:
            res += self.find_latlon(None, " ".join(split_addr[1:]), city)
        return res

    def should_keep_feature(self, ogrfeature: ogr.Feature):
        """Tell whether a feature is reliable enough to be imported.

        A feature is kept when it is whitelisted or when at least one of its
        geocoding results lies within MAX_DISTANCE_FROM_GEOCODING of its
        geometry. Blacklisted names/cities, mobile defibrillators, features
        that cannot be geocoded and features raising any error are discarded.
        The check is skipped (everything is kept) when NOMINATIM_ENDPOINT or
        MAX_DISTANCE_FROM_GEOCODING is falsy.

        Args:
            ogrfeature: the OGR feature to check.

        Returns:
            True to keep the feature, False to discard it.
        """
        if not NOMINATIM_ENDPOINT or not MAX_DISTANCE_FROM_GEOCODING:
            return True

        city = str(ogrfeature["citta"]).strip().lower()

        name = None
        if ogrfeature.IsFieldSetAndNotNull("nome"):
            raw_name = ogrfeature["nome"]
            # Strip company suffixes, which only confuse the geocoder
            name = str(raw_name).lower()
            for suffix in (" srl", " s.r.l", " s.p.a"):
                name = name.replace(suffix, "")
            name = re.sub(r" spa$", "", name).strip(" .-")
            if raw_name in NAME_WHITELIST:
                print("Keeping feature in whitelist:", name, city)
                return True
            if raw_name in NAME_BLACKLIST:
                print("Discarding feature in blacklist:", name, city)
                return False

        if city in CITY_BLACKLIST:
            print("Discarding feature because we don't have geocoding data for San Marino:", name, city)
            return False

        if ogrfeature.IsFieldSetAndNotNull("ubicazione"):
            location = str(ogrfeature["ubicazione"]).lower()
            if any(word in location for word in ("vettura", "veicolo", "motonave", "piattaforma")):
                print("Discarding feature for mobile DAE:", name, city)
                return False

        street, number = self.get_street_and_number(ogrfeature.items())
        if street is None or number is None:
            address = street
        else:
            address = street + " " + number.split('/')[0]

        if ogrfeature.IsFieldSetAndNotNull("quartiere") and ogrfeature["quartiere"] != "FUORI BOLOGNA":
            city = str(ogrfeature["quartiere"]) + ", " + city

        # Deliberately best-effort: any geocoding/geometry error discards the feature
        try:
            res = self._geocode_candidates(name, address, city)
            if not res:
                print("Discarding feature with unknown address:", name, city)
                return False

            featureGeom: ogr.Geometry = ogrfeature.geometry()
            for coords in res:
                geocodeGeom = ogr.Geometry(ogr.wkbPoint)
                geocodeGeom.AddPoint_2D(coords[0], coords[1])
                dist = featureGeom.Distance(geocodeGeom)
                if DEBUG:
                    print(">", name, ",", address, "-", featureGeom.ExportToWkt(), geocodeGeom.ExportToIsoWkt(), dist)
                    if dist > 0.45:
                        print("! HUGE DISTANCE !")
                if dist < MAX_DISTANCE_FROM_GEOCODING:
                    print("Keeping feature:", name, city)
                    return True

            print("Discarding feature too far from its geocoding:", name, city)
            return False
        except Exception as e:
            print("Discarding feature with error:", name, city, e)
            return False

    def filter_feature(self, ogrfeature, layer_fields, reproject):
        """Return the feature when it must be emitted, None otherwise.

        With KEEP_ONLY_BAD_FEATURES the outcome of should_keep_feature is inverted.
        """
        return ogrfeature if self.should_keep_feature(ogrfeature) != KEEP_ONLY_BAD_FEATURES else None

    def _build_opening_hours(self, parsed: dict) -> str | None:
        """Convert a {weekday name: hours} dict into an OSM opening_hours value.

        Returns None when the schedule does not fit one of the supported
        shapes (same hours every listed day, or same hours on weekdays with
        different Saturday/Sunday hours).
        """
        # Normalize every day up front so that split schedules ("a / b") compare equal
        hours_by_day = {
            day: hours.replace(" / ", ",") if isinstance(hours, str) else hours
            for day, hours in parsed.items()
        }
        monday_hours = hours_by_day.get("LUNEDI")
        if not monday_hours:
            return None

        if all(hours == monday_hours for hours in hours_by_day.values()):
            # Opening hours are equal in all specified days of the week
            if "DOMENICA" in hours_by_day:
                return "24/7" if monday_hours in ("00:00-23:59", "00:01-23:59") else monday_hours
            if "SABATO" in hours_by_day:
                return f'Mo-Sa {monday_hours}; Su off'
            if "VENERDI" in hours_by_day:
                return f'Mo-Fr {monday_hours}; Sa-Su off'
            return None

        saturday_hours = hours_by_day.get("SABATO")
        if not saturday_hours:
            return None
        del hours_by_day["SABATO"]
        sunday_hours = hours_by_day.get("DOMENICA") or None
        if sunday_hours:
            del hours_by_day["DOMENICA"]
        # The remaining days must all share Monday's hours
        if not all(hours == monday_hours for hours in hours_by_day.values()):
            return None

        if monday_hours == saturday_hours:
            opening_hours = f'Mo-Sa {monday_hours}'
        else:
            opening_hours = f'Mo-Fr {monday_hours}; Sa {saturday_hours}'
        opening_hours += f'; Su {sunday_hours}' if sunday_hours else '; Su off'
        return opening_hours

    def filter_tags(self, tags: dict):
        """Translate the source attributes of a feature into OSM tags.

        Args:
            tags: source attributes of the feature.

        Returns:
            The OSM tags dictionary, or None when ``tags`` is empty. With DEBUG
            enabled the source attributes are kept (``tags`` is extended in place).
        """
        if not tags:
            return

        osmTags = tags if DEBUG else {}
        osmTags["emergency"] = "defibrillator"

        # --- Address ---
        street, number = self.get_street_and_number(tags)
        if street:
            osmTags["addr:street"] = street
        if number:
            osmTags["addr:housenumber"] = number
        if tags.get("citta"):
            osmTags["addr:city"] = str(tags["citta"]).strip().title()
        if tags.get("quartiere") and tags["quartiere"] != "FUORI BOLOGNA":
            osmTags["addr:suburb"] = str(tags["quartiere"]).strip().title()

        # --- Location and operator ---
        ubicazione = str(tags["ubicazione"]).split("\n")[0].strip().capitalize() if tags.get("ubicazione") else None
        nome = str(tags["nome"]).strip(" -").capitalize() if tags.get("nome") else None
        nome_is_location = bool(nome) and (
            nome.startswith(self._LOCATION_NAME_PREFIXES)
            or any(fragment in nome for fragment in self._LOCATION_NAME_FRAGMENTS)
        )

        if ALWAYS_MERGE_NOME_IN_LOCATION:
            location = f"{nome} - {ubicazione}" if (nome and ubicazione) else (nome or ubicazione)
        elif ubicazione and nome and ubicazione.startswith(nome):
            location = ubicazione
        elif ubicazione and nome_is_location:
            location = f"{nome} - {ubicazione}"
        elif ubicazione:
            location = ubicazione
        elif nome_is_location:
            location = nome
        else:
            location = None
        if location:
            # Never emit an empty/None location tag
            osmTags["defibrillator:location"] = location

        if not ALWAYS_MERGE_NOME_IN_LOCATION and nome and len(nome) > 1 and not nome_is_location:
            osmTags["operator"] = nome

        # "stern"/"ntern" match both "Esterno"/"esterno" and "Interno"/"interno"
        if ubicazione and ("stern" in ubicazione or "facciata" in ubicazione):
            osmTags["indoor"] = "no"
        elif ubicazione and (("ntern" in ubicazione and "nterno di una teca" not in ubicazione) or "l chiuso" in ubicazione):
            osmTags["indoor"] = "yes"

        # --- Opening hours ---
        if tags.get("orari"):
            # {'LUNEDI': '07:00-19:00', 'MARTEDI': '07:00-19:00', 'MERCOLEDI': '07:00-19:00', 'GIOVEDI': '07:00-19:00', 'VENERDI': '07:00-19:00', 'SABATO': '07:00-19:00'}
            raw_opening_hours = tags["orari"].replace("'", '"')
            try:
                opening_hours = self._build_opening_hours(json.loads(raw_opening_hours))
                if opening_hours:
                    # A missing/None note must not discard the parsed hours
                    raw_note = tags.get("note")
                    if raw_note and "feste" in str(raw_note).lower():
                        opening_hours += "; PH off"
                    osmTags["opening_hours"] = opening_hours
            except Exception as e:
                # Deliberately best-effort: unparsable hours are reported and skipped
                print(f"Failed parsing opening hours:\n {raw_opening_hours}\n {e}")

        # --- Note ---
        if tags.get("note"):
            note = str(tags["note"]).split("\n")[0].strip().lower()
            if len(note) > 4 and not note.endswith('disponibile.') and not note.endswith('del sito'):
                osmTags["note"] = note.capitalize()
            if "stern" in note:
                osmTags["indoor"] = "no"
            elif ("ntern" in note and "nterno di una teca" not in note) or "l chiuso" in note:
                osmTags["indoor"] = "yes"
            note_says_h24 = note.startswith("sempre accessibile") or "h24" in note.replace(" ", "").replace(".", "")
            if "opening_hours" not in osmTags and note_says_h24:
                osmTags["opening_hours"] = "24/7"

        # --- Phone ---
        if tags.get("telefono"):
            osmTags["contact:phone"] = tags["telefono"].strip()

        return osmTags