# Import/Catalogue/DAE Emilia-Romagna/translation.py
#
# From OpenStreetMap Wiki
import json
import re
from os import path
from urllib import parse, request

import ogr2osm
from osgeo import ogr

# When true: extra diagnostics are printed, filter_tags() keeps the source
# attributes next to the OSM tags and merge_tags() records the tags of
# duplicates under "duplicate_<key>".
DEBUG = False

# When true, filter_feature() inverts its selection and keeps only the
# features rejected by should_keep_feature().
KEEP_ONLY_BAD_FEATURES = False

# When true, filter_tags() always writes "nome" into defibrillator:location
# and never into operator.
ALWAYS_MERGE_NOME_IN_LOCATION = False

# When true, merge_tags() combines the locations of duplicate geometries
# that share the same operator.
MERGE_DUPLICATE_LOCATIONS = True

# Base URL of the Nominatim instance queried by find_latlon(). A falsy value
# disables the geocoding check in should_keep_feature().
NOMINATIM_ENDPOINT = "http://localhost:8080"

# JSON file the geocoding results are loaded from and saved to. A falsy
# value disables the persistent cache.
GEOCODE_CACHE_FILE = "./geocode_cache.json"

# Largest accepted distance between a feature and one of its geocoded
# positions, compared with the result of ogr Geometry.Distance().
# NOTE(review): presumably expressed in degrees - confirm against the
# coordinate system of the source layer. A falsy value disables the check.
MAX_DISTANCE_FROM_GEOCODING = 0.004

# Features verified manually as good.
# Matched against the raw "nome" attribute: should_keep_feature() keeps them
# without comparing their position with the geocoding result.
NAME_WHITELIST = {
    "PMS CREAZIONE CERAMICHE",
    "Ufficio Turistico",
    "FRAZ. SAN LORENZO DI CASTELL'ARQUATO",
    "MC DONALDS -",
    "CHIESA DI  SAN FRANCESCO D'ASSISI _ LOC BOSCONE",
    "CONFARTIGIANATO DI CESENA",
    "Chiesa di Vitignano",
    "TENSIOSTRUTTURA IMPIANTI SPORTIVI",
    "UFFICIO INFORMAZIONE E ACCOGLIENZA TURISTICA",
    "FONDAZIONE SAN GIUSEPPE CFP C.E.S.T.A.",
    "PIAZZA COMUNE DI LUZZARA",
    "PALESTRA COMUNALE PLESSO SCOLASTICO \"C. COLLODI\"",
    "Parcheggio Ufficio postale",
    "NUOVA FARMACIA BAISO",
    "Pizzeria Il Carrobbio",
}

# Features verified manually as bad.
# Matched against the raw "nome" attribute: should_keep_feature() always
# discards them.
NAME_BLACKLIST = {
    "Farmacia Cuoghi",
}

# Lower-case values of the "citta" attribute that should_keep_feature()
# always discards (no geocoding data is available for San Marino).
CITY_BLACKLIST = {
    "repubblica di san marino",
    "castello citta di san marino",
    "castello serravalle",
    "castello borgo maggiore",
    "castello acquaviva",
    "castello faetano",
}

class EmiliaRomagnaDaeTranslation(ogr2osm.TranslationBase):
    geocode_cache:dict = {}

    def __init__(self):
        """Initialise the translation and load the persisted geocode cache.

        The cache is read from GEOCODE_CACHE_FILE when that setting is
        non-empty and the file exists; otherwise the instance starts with an
        empty cache.
        """
        # A bare `super()` only creates a proxy object: the initialiser of the
        # base class has to be invoked explicitly.
        super().__init__()

        # Give every instance its own dict, so that new entries are not
        # written into the mutable dict shared at class level.
        self.geocode_cache = {}

        if GEOCODE_CACHE_FILE and path.isfile(GEOCODE_CACHE_FILE):
            print("Reading geocode cache")
            with open(GEOCODE_CACHE_FILE, "r", encoding="utf-8") as myfile:
                self.geocode_cache = json.load(myfile)

    def __del__(self):
        """Write the geocode cache to GEOCODE_CACHE_FILE when the object is destroyed.

        Nothing is written when GEOCODE_CACHE_FILE is empty.
        """
        if not GEOCODE_CACHE_FILE:
            return

        print("Saving geocode cache")
        with open(GEOCODE_CACHE_FILE, "w") as cache_file:
            serialized_cache = json.dumps(self.geocode_cache)
            cache_file.write(serialized_cache)

    def merge_tags(self, geometry_type, tags_existing_geometry:dict, tags_new_geometry:dict):
        """Merge the tags of a feature whose geometry duplicates an existing one.

        When MERGE_DUPLICATE_LOCATIONS is enabled and both features have the
        same operator (case-insensitive), the location of the new feature is
        added to the existing one and a missing street / house number is
        filled in. In every other case the tags of the new feature are dropped.

        Args:
            geometry_type: Kind of geometry being merged (unused here).
            tags_existing_geometry: Tags of the feature already stored; every
                value is a list of strings.
            tags_new_geometry: Tags of the duplicate feature; every value is a
                plain string.

        Returns:
            ``tags_existing_geometry``, updated in place.
        """
        if DEBUG:
            for key, value in tags_new_geometry.items():
                tags_existing_geometry[f"duplicate_{key}"] = [value]

        ex_operators = tags_existing_geometry.get("operator")
        ex_operator = ex_operators[0] if ex_operators else None
        new_operator = tags_new_geometry.get("operator")
        ex_locations = tags_existing_geometry.get("defibrillator:location")
        ex_location = ex_locations[0] if ex_locations else None
        new_location = tags_new_geometry.get("defibrillator:location")

        same_operator = bool(ex_operator and new_operator) and ex_operator.lower() == new_operator.lower()
        if MERGE_DUPLICATE_LOCATIONS and same_operator:
            print(f"Merging duplicate location for operator '{ex_operator}'")
            if new_location:
                # The existing feature may have no location yet, and the same
                # location must not be listed twice.
                locations = tags_existing_geometry.setdefault("defibrillator:location", [])
                if new_location not in locations:
                    locations.append(new_location)
            for key in ("addr:street", "addr:housenumber"):
                if key not in tags_existing_geometry and key in tags_new_geometry:
                    # Existing tags hold lists of values: wrap the string so it
                    # is not later treated as a sequence of characters.
                    tags_existing_geometry[key] = [tags_new_geometry[key]]
        else:
            print("Ignoring duplicate geometry")
            print(f"    operator: '{ex_operator}' VS '{new_operator}'")
        print(f"    location: '{ex_location}' VS '{new_location}'")

        return tags_existing_geometry
    
    def get_street_and_number(self, attrs:dict):
        """Split the free-text "indirizzo" attribute into street and house number.

        The address is lower-cased and cleaned from placeholder fragments
        ("snc", "n.d", "n:d", "xxx"). The street is the leading text up to the
        first digit, hyphen, underscore or opening parenthesis, title-cased;
        the house number is the trailing number, optionally followed by
        letters or by "/<suffix>".

        Args:
            attrs: Source attributes of the feature.

        Returns:
            A ``(street, number)`` tuple. Each element is None when it cannot
            be determined; both are None when the address is missing, empty
            or flagged as wrong/unknown.
        """
        raw_address = attrs["indirizzo"] if "indirizzo" in attrs else None
        if not raw_address:
            return None, None

        cleaned = str(raw_address).lower()
        for placeholder in ("snc", "n.d", "n:d", "xxx"):
            cleaned = cleaned.replace(placeholder, "")
        cleaned = cleaned.strip(" -")

        if not cleaned:
            return None, None
        if any(marker in cleaned for marker in ('sbagliat', 'sconosciut')):
            return None, None

        # Street: everything before the first separator or digit.
        street = None
        leading_text = re.match(r'^[^-(_\d]+', cleaned)
        if leading_text:
            street = leading_text.group(0).replace("p.zza", "piazza").strip().title()

        # House number: a "12/a"-like form is kept as it is, otherwise a
        # dangling slash is removed from the plain trailing number.
        number = None
        slashed_number = re.search(r'\d+\/\w+$', cleaned)
        if slashed_number:
            number = slashed_number.group(0)
        else:
            trailing_number = re.search(r'\d+\/?[a-z]*$', cleaned)
            if trailing_number:
                number = trailing_number.group(0).replace("/", "")

        return street, number
    
    def find_latlon(self, name:str|None, address:str|None, city:str|None) -> list:
        cache_key=f"{name or ""}, {address or ""}, {city or ""}"
        if cache_key in self.geocode_cache:
            return self.geocode_cache[cache_key] or []
        else:
            print("Searching", cache_key)
            # https://nominatim.org/release-docs/develop/api/Search/#structured-query
            amenity_param = ("&amenity=" + parse.quote(name)) if name else ""
            street_param = ("&street=" + parse.quote(address)) if address else ""
            city_param = ("&city=" + parse.quote(city)) if city else ""
            req=f"{NOMINATIM_ENDPOINT}/search?limit=5{amenity_param}{street_param}{city_param}"
            json_resp=request.urlopen(req).read()
            response=json.loads(json_resp)
            if len(response) > 0:
                coords = [[float(result["lon"]),float(result["lat"])] for result in response]
                self.geocode_cache[cache_key] = coords
                return coords
            
            req=f"{NOMINATIM_ENDPOINT}/search?limit=5&q={parse.quote(cache_key)}"
            json_resp=request.urlopen(req).read()
            response=json.loads(json_resp)
            if len(response) > 0:
                coords = [[float(result["lon"]),float(result["lat"])] for result in response]
                self.geocode_cache[cache_key] = coords
                return coords

            self.geocode_cache[cache_key]=[]
            return []

    def should_keep_feature(self, ogrfeature: ogr.Feature) -> bool:
        """Decide whether a source feature is reliable enough to be imported.

        A feature is kept when its name is in NAME_WHITELIST, or when at least
        one geocoding result for its address or name lies closer than
        MAX_DISTANCE_FROM_GEOCODING to the feature geometry. It is discarded
        when its name is in NAME_BLACKLIST, its city is in CITY_BLACKLIST, its
        "ubicazione" describes a mobile device, nothing can be geocoded, every
        geocoding result is too far away, or any error occurs while checking.

        Args:
            ogrfeature: Source feature; the fields read are "citta", "nome",
                "ubicazione", "indirizzo" (through get_street_and_number) and
                "quartiere".

        Returns:
            True to keep the feature, False to discard it. Always True when
            NOMINATIM_ENDPOINT or MAX_DISTANCE_FROM_GEOCODING is falsy.
        """
        # The geocoding check is disabled: accept everything.
        if not NOMINATIM_ENDPOINT or not MAX_DISTANCE_FROM_GEOCODING:
            return True

        city = str(ogrfeature["citta"]).strip().lower()
        
        if ogrfeature.IsFieldSetAndNotNull("nome"):
            # Lower-case name without company suffixes, used for geocoding and logging.
            name = re.sub(r" spa$", "", str(ogrfeature["nome"]).lower().replace(" srl","").replace(" s.r.l","").replace(" s.p.a","")).strip(" .-")

            # The manual lists are matched against the raw, unmodified name.
            if ogrfeature["nome"] in NAME_WHITELIST:
                print("Keeping feature in whitelist:", name, city)
                return True
            
            if ogrfeature["nome"] in NAME_BLACKLIST:
                print("Discarding feature in blacklist:", name, city)
                return False
        else:
            name = None
        
        if city in CITY_BLACKLIST:
            print("Discarding feature because we don't have geocoding data for San Marino:", name, city)
            return False
        
        if ogrfeature.IsFieldSetAndNotNull("ubicazione"):
            location = str(ogrfeature["ubicazione"]).lower()
            # Devices carried on a car, vehicle, motor ship or platform.
            if "vettura" in location or "veicolo" in location or "motonave" in location or "piattaforma" in location:
                print("Discarding feature for mobile DAE:", name, city)
                return False
        
        street,number = self.get_street_and_number(ogrfeature.items())
        if street is None or number is None:
            address = street
        else:
            # Only the part of the house number before "/" is used for geocoding.
            address = street + " " + number.split('/')[0]
        
        # From here on the city is prefixed with the suburb, when one is given.
        if ogrfeature.IsFieldSetAndNotNull("quartiere") and ogrfeature["quartiere"] != "FUORI BOLOGNA":
            city = str(ogrfeature["quartiere"]) + ", " + city

        try:
            # Candidate positions are collected from several variants of the query.
            res = []
            if address:
                res = self.find_latlon(None, address, city)
                # Also try the other street type: Via <-> Viale, Piazzale -> Piazza.
                if "Via " in address:
                    res += self.find_latlon(None, address.replace("Via ","Viale "), city)
                if "Viale " in address:
                    res += self.find_latlon(None, address.replace("Viale ","Via "), city)
                if "Piazzale " in address:
                    res += self.find_latlon(None, address.replace("Piazzale ","Piazza "), city)
                # For a hyphenated city, also search with the part after the last hyphen.
                if "-" in city:
                    res += self.find_latlon(None, address, city.split("-")[-1])

            if name:
                # NOTE(review): `city` may already carry the suburb prefix at this
                # point, in which case it is unlikely to occur in the name - confirm
                # whether the plain city was intended.
                split_name = name.replace(city,"").strip(" ._-").split(" ")
                # Despite the variable names: name_start is the name without its
                # first word, name_end is the name without its last word.
                name_start = " ".join(split_name[1:]) if len(split_name) > 1 else name
                name_end = " ".join(split_name[:-1]) if len(split_name) > 1 else name
                # `or` short-circuits: the shortened names are searched only when
                # the previous search returned nothing.
                res += self.find_latlon(name, None, city) or self.find_latlon(name_start, None, city) or self.find_latlon(name_end, None, city)

            
            # Retry addresses of 4, 5 or 6 words with some words dropped or swapped.
            split_addr = address.split(" ") if address else []
            if len(split_addr) == 4:
                [a,b,c,d] = split_addr
                res += self.find_latlon(None, " ".join([a,b,d]), city) or self.find_latlon(None, " ".join([c,b,d]), city)
            if len(split_addr) == 5:
                [a,b,c,d,e] = split_addr
                res += self.find_latlon(None, " ".join([a,b,c,e]), city) or self.find_latlon(None, " ".join([a,b,d,e]), city) or self.find_latlon(None, " ".join([c,b,d,e]), city)
            if len(split_addr) == 6:
                [a,b,c,d,e,f] = split_addr
                res += self.find_latlon(None, " ".join([a,b,d,e,f]), city) or self.find_latlon(None, " ".join([c,b,d,e,f]), city) or self.find_latlon(None, " ".join([b,d,c,e,f]), city)
            # Finally retry without the first word of the address.
            if len(split_addr) > 2:
                res += self.find_latlon(None, " ".join(split_addr[1:]), city)
            
            if not res:
                print("Discarding feature with unknown address:", name, city)
                return False
            
            featureGeom: ogr.Geometry = ogrfeature.geometry()
            # This initial value is overwritten before it is ever read.
            dist = 1
            for coords in res:
                # Each cached entry is a [lon, lat] pair.
                geocodeGeom = ogr.Geometry(ogr.wkbPoint)
                geocodeGeom.AddPoint_2D(coords[0], coords[1])
                # NOTE(review): the distance is in the units of the layer
                # coordinates, presumably degrees - confirm.
                dist = featureGeom.Distance(geocodeGeom)
                if DEBUG:
                    print(">", name, ",", address, "-", featureGeom.ExportToWkt(), geocodeGeom.ExportToIsoWkt(), dist)
                    if dist > 0.45:
                        print("! HUGE DISTANCE !")
                # One close enough candidate is sufficient to keep the feature.
                if dist < MAX_DISTANCE_FROM_GEOCODING:
                    print("Keeping feature:", name, city)
                    return True
            
            print("Discarding feature too far from its geocoding:", name, city)
            return False
        except Exception as e:
            # Any failure (for example an unreachable geocoder) discards the feature.
            print("Discarding feature with error:", name, city, e)
            return False
    
    def filter_feature(self, ogrfeature, layer_fields, reproject):
        """Select the features that take part in the import.

        Returns the feature unchanged when the outcome of
        should_keep_feature() differs from KEEP_ONLY_BAD_FEATURES, i.e. good
        features normally and rejected ones when that flag is enabled;
        returns None otherwise. ``layer_fields`` and ``reproject`` are not
        used.
        """
        passed_checks = self.should_keep_feature(ogrfeature)
        if passed_checks != KEEP_ONLY_BAD_FEATURES:
            return ogrfeature
        return None

    def filter_tags(self, tags:dict):
        """Translate the source attributes of a defibrillator into OSM tags.

        Args:
            tags: Source attributes of the feature. The keys read are
                "indirizzo", "citta", "quartiere", "ubicazione", "nome",
                "orari", "note" and "telefono".

        Returns:
            The dict of OSM tags, or None when ``tags`` is empty. With DEBUG
            enabled the returned dict is ``tags`` itself, so the source
            attributes are kept next to the OSM tags.
        """
        if not tags:
            return

        osmTags = tags if DEBUG else {}

        osmTags["emergency"] = "defibrillator"

        street,number = self.get_street_and_number(tags)
        if street:
            osmTags["addr:street"] = street
        if number:
            osmTags["addr:housenumber"] = number

        if tags.get("citta"):
            osmTags["addr:city"] = str(tags["citta"]).strip().title()

        quartiere = tags.get("quartiere")
        if quartiere and quartiere != "FUORI BOLOGNA":
            osmTags["addr:suburb"] = str(quartiere).strip().title()

        # Only the first line of "ubicazione" is used.
        ubicazione = str(tags["ubicazione"]).split("\n")[0].strip().capitalize() if tags.get("ubicazione") else None
        nome = str(tags["nome"]).strip(" -").capitalize() if tags.get("nome") else None

        # A name starting with, or containing, one of these words describes a
        # place rather than the operator of the device.
        location_prefixes = (
            "Strada", "Via", "Piazza", "Giardin", "Fraz", "Parc", "Totem",
            "Portic", "Ingresso", "Area", "Edific", "Ex", "Teca",
        )
        location_markers = (" presso ", " civico ", " fianco ")
        nome_is_location = bool(nome) and (
            nome.startswith(location_prefixes)
            or any(marker in nome for marker in location_markers)
        )

        if ALWAYS_MERGE_NOME_IN_LOCATION:
            location = f"{nome} - {ubicazione}" if (nome and ubicazione) else (nome or ubicazione)
        elif ubicazione and nome and ubicazione.startswith(nome):
            # The location already begins with the name: do not repeat it.
            location = ubicazione
        elif ubicazione and nome_is_location:
            location = f"{nome} - {ubicazione}"
        elif ubicazione:
            location = ubicazione
        elif nome_is_location:
            location = nome
        else:
            location = None
        # Never emit the tag with an empty or None value.
        if location:
            osmTags["defibrillator:location"] = location

        if not ALWAYS_MERGE_NOME_IN_LOCATION and nome and len(nome) > 1 and not nome_is_location:
            osmTags["operator"] = nome

        # `ubicazione` is capitalised above, so the keywords are searched in a
        # lower-case copy; otherwise a value starting with "Facciata" is missed.
        ubicazione_lc = ubicazione.lower() if ubicazione else ""
        if "stern" in ubicazione_lc or "facciata" in ubicazione_lc:
            osmTags["indoor"] = "no"
        elif ("ntern" in ubicazione_lc and "nterno di una teca" not in ubicazione_lc) or "l chiuso" in ubicazione_lc:
            osmTags["indoor"] = "yes"

        # Read once and coerced to text, so that a missing or empty note can
        # not make the opening hours handling below fail.
        raw_note = str(tags.get("note") or "")

        if tags.get("orari"):
            opening_hours = self._parse_opening_hours(tags["orari"])
            if opening_hours:
                if "feste" in raw_note.lower():
                    opening_hours += "; PH off"
                osmTags["opening_hours"] = opening_hours

        if raw_note:
            note = raw_note.split("\n")[0].strip().lower()
            if len(note) > 4 and not note.endswith('disponibile.') and not note.endswith('del sito'):
                osmTags["note"] = note.capitalize()

            # A hint found in the note overrides the one found in "ubicazione".
            if "stern" in note:
                osmTags["indoor"] = "no"
            elif ("ntern" in note and "nterno di una teca" not in note) or "l chiuso" in note:
                osmTags["indoor"] = "yes"

            note_says_h24 = note.startswith("sempre accessibile") or "h24" in note.replace(" ","").replace(".","")
            if "opening_hours" not in osmTags and note_says_h24:
                osmTags["opening_hours"] = "24/7"

        if tags.get("telefono"):
            osmTags["contact:phone"] = str(tags["telefono"]).strip()

        return osmTags

    def _parse_opening_hours(self, raw_orari):
        """Convert the weekly schedule of the source into an opening_hours value.

        The source value looks like
        {'LUNEDI': '07:00-19:00', 'MARTEDI': '07:00-19:00', 'MERCOLEDI': '07:00-19:00', 'GIOVEDI': '07:00-19:00', 'VENERDI': '07:00-19:00', 'SABATO': '07:00-19:00'}

        Only schedules with the same hours on every listed day, or the same
        hours on the listed weekdays plus specific Saturday / Sunday hours,
        are converted.

        Returns:
            The opening_hours string, or None when the value cannot be parsed
            or does not follow one of the supported patterns.
        """
        raw_opening_hours = str(raw_orari).replace("'", '"')
        try:
            parsed = json.loads(raw_opening_hours)
            # Normalise every day before comparing them: " / " separates
            # several time ranges in the source, "," does in opening_hours.
            hours_by_day = {
                day: hours.replace(" / ", ",") if isinstance(hours, str) else hours
                for day, hours in parsed.items()
            }
        except (ValueError, AttributeError) as e:
            print(f"Failed parsing opening hours:\n    {raw_opening_hours}\n    {e}")
            return None

        monday_hours = hours_by_day.get("LUNEDI")
        if not monday_hours:
            return None

        if all(hours == monday_hours for hours in hours_by_day.values()):
            # Opening hours are equal in all specified days of the week
            if "DOMENICA" in hours_by_day:
                return "24/7" if monday_hours in ("00:00-23:59", "00:01-23:59") else monday_hours
            if "SABATO" in hours_by_day:
                return f'Mo-Sa {monday_hours}; Su off'
            if "VENERDI" in hours_by_day:
                return f'Mo-Fr {monday_hours}; Sa-Su off'
            return None

        saturday_hours = hours_by_day.get("SABATO")
        if not saturday_hours:
            return None

        sunday_hours = hours_by_day.get("DOMENICA") or None
        # Every other listed day must match Monday. A Sunday entry without
        # hours stays in the comparison and therefore prevents the conversion.
        other_days_hours = [
            hours for day, hours in hours_by_day.items()
            if day != "SABATO" and not (day == "DOMENICA" and sunday_hours)
        ]
        if any(hours != monday_hours for hours in other_days_hours):
            return None

        if monday_hours == saturday_hours:
            opening_hours = f'Mo-Sa {monday_hours}'
        else:
            opening_hours = f'Mo-Fr {monday_hours}; Sa {saturday_hours}'
        opening_hours += f'; Su {sunday_hours}' if sunday_hours else '; Su off'
        return opening_hours