# Source code for leuvenmapmatching.map.sqlite

# encoding: utf-8
"""
leuvenmapmatching.map.sqlite
~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Map representation based on a sqlite database. Not optimized for production purposes.

:author: Wannes Meert
:copyright: Copyright 2018 DTAI, KU Leuven and Sirris.
:license: Apache License, Version 2.0, see LICENSE for details.
"""
import sqlite3
import tempfile
import logging
import time
from pathlib import Path
import pickle
from functools import partial
try:
    import pyproj
except ImportError:
    pyproj = None
try:
    import tqdm
except ImportError:
    tqdm = None


from .base import BaseMap


# Typing-only definitions. At runtime MYPY is False, so this branch never runs
# and `typing` is not imported. Type checkers such as mypy treat a module-level
# `MYPY` name as true, so the aliases below are visible during type checking.
MYPY = False
if MYPY:
    from typing import Optional, Set, Tuple, Dict, Union
    LabelType = Union[int, str]  # node label/key
    LocType = Tuple[float, float]  # (lat, lon) or (y, x) pair
    EdgeType = Tuple[LabelType, LabelType]  # (start node label, end node label)


# Logger under the package-wide "be.kuleuven.cs.dtai.mapmatching" namespace.
logger = logging.getLogger("be.kuleuven.cs.dtai.mapmatching")


class SqliteMap(BaseMap):
    """Map stored in a SQLite database.

    Nodes and edges live in the regular tables ``nodes`` and ``edges``. Their
    bounding boxes are mirrored in the SQLite R*Tree virtual tables
    ``nodes_index`` and ``edges_index``, which are used for bounding-box
    queries. Pairs of nearby parallel edges can be stored in ``close_edges``.
    Map settings (name, use_latlon, crs_lonlat, crs_xy) are kept in
    ``properties``.
    """

    def __init__(self, name, use_latlon=True, crs_lonlat=None, crs_xy=None, dir=None,
                 deserializing=False):
        """Store a map as a SQLite instance.

        This class supports:

        - Indexing using rtrees to allow for fast searching of points on the map.
          When using the rtree index, only integer numbers are allowed as node labels.
        - Serializing to write and read from files.
        - Projecting points to a different frame (e.g. GPS to Lambert)

        :param name: Name of database file. A ``.sqlite`` suffix is added when the
            name has no suffix.
        :param use_latlon: The locations represent latitude-longitude pairs,
            otherwise y-x coordinates are assumed.
        :param crs_lonlat: Coordinate reference system for the latitude-longitude
            coordinates (default 'EPSG:4326' for a new database).
        :param crs_xy: Coordinate reference system for the y-x coordinates
            (default 'EPSG:3395' for a new database).
        :param dir: Directory where to serialize to. If not given, a temporary
            location will be used.
        :param deserializing: Internal variable to indicate that the object is
            being built from a file. When False, any existing tables in the
            database file are dropped and recreated.
        :raises Exception: If deserializing and the file does not exist, or if
            the database cannot be opened.
        """
        super(SqliteMap, self).__init__(name, use_latlon=use_latlon)
        self.dir = Path(tempfile.gettempdir()) if dir is None else Path(dir)
        name = Path(name)
        if name.suffix == '':
            name = name.with_suffix('.sqlite')
        self.db_fn = self.dir / name
        if deserializing and not self.db_fn.exists():
            raise Exception(f"File not found: {self.db_fn}")
        logger.debug(f"Opening database: {self.db_fn}")
        try:
            self.db = sqlite3.connect(str(self.db_fn))
        except Exception as exc:
            raise Exception(f'Problem with database: {self.db_fn}') from exc
        self.crs_lonlat = crs_lonlat
        self.crs_xy = crs_xy
        self.use_latlon = use_latlon
        if deserializing:
            # Overwrites the attributes above with the values stored in the file.
            self.read_properties()
        else:
            self.create_db()
            if self.crs_lonlat is None:
                self.crs_lonlat = 'EPSG:4326'  # GPS
            if self.crs_xy is None:
                self.crs_xy = 'EPSG:3395'  # Mercator projection
            self.save_properties()
        if pyproj:
            # NOTE(review): pyproj.transform is deprecated in recent pyproj
            # releases in favour of pyproj.Transformer; confirm against the
            # pyproj version in use.
            proj_lonlat = pyproj.Proj(self.crs_lonlat, preserve_units=True)
            proj_xy = pyproj.Proj(self.crs_xy, preserve_units=True)
            self.lonlat2xy = partial(pyproj.transform, proj_lonlat, proj_xy)
            self.xy2lonlat = partial(pyproj.transform, proj_xy, proj_lonlat)
        else:
            def pyproj_notfound(*_args, **_kwargs):
                raise Exception("pyproj package not found")
            self.lonlat2xy = pyproj_notfound
            self.xy2lonlat = pyproj_notfound

    @staticmethod
    def _edge_id(node_a, node_b):
        """Return the integer id used for the edge (node_a, node_b).

        NOTE: the id is Python's ``hash()`` of the label tuple. For integer labels
        this is stable across runs. For str labels, Python salts string hashes per
        process (unless PYTHONHASHSEED is fixed), so ids written to a file would
        not match after reloading in another process. Different edges may also
        collide.
        """
        return hash((node_a, node_b))

    def read_properties(self):
        """Load the stored map settings into instance attributes.

        Each row of the ``properties`` table is unpickled and assigned as an
        attribute with the row's key as name.
        """
        c = self.db.cursor()
        for row in c.execute("SELECT key, value FROM properties;"):
            # SECURITY: pickle.loads executes arbitrary code from crafted data.
            # Only open database files from trusted sources.
            key, value = row[0], pickle.loads(row[1])
            self.__dict__[key] = value

    def save_properties(self):
        """Pickle and store name, use_latlon, crs_lonlat and crs_xy."""
        c = self.db.cursor()
        q = "INSERT INTO properties (key, value) VALUES (?, ?)"
        v = [('name', pickle.dumps(self.name)),
             ('use_latlon', pickle.dumps(self.use_latlon)),
             ('crs_lonlat', pickle.dumps(self.crs_lonlat)),
             ('crs_xy', pickle.dumps(self.crs_xy))]
        c.executemany(q, v)
        self.db.commit()

    def create_db(self):
        """Drop all map tables and indices, then recreate them empty."""
        logger.debug("Cleaning database file and creating new tables")
        c = self.db.cursor()
        c.execute("DROP INDEX IF EXISTS edges_from_index")
        c.execute("DROP INDEX IF EXISTS close_edges_index")
        c.execute("DROP TABLE IF EXISTS nodes_index")
        c.execute("DROP TABLE IF EXISTS nodes")
        c.execute("DROP TABLE IF EXISTS edges_index")
        c.execute("DROP TABLE IF EXISTS edges")
        c.execute("DROP TABLE IF EXISTS close_edges")
        c.execute("DROP TABLE IF EXISTS properties")
        self.db.commit()
        # Create tables
        q = ("CREATE VIRTUAL TABLE nodes_index USING rtree(\n"
             "id, -- Integer primary key\n"
             "minX, maxX, -- Minimum and maximum X coordinate\n"
             "minY, maxY -- Minimum and maximum Y coordinate\n"
             ")")
        c.execute(q)
        q = ("CREATE TABLE nodes(\n"
             "id INTEGER PRIMARY KEY,\n"
             "x REAL,\n"
             "y REAL\n"
             ")")
        c.execute(q)
        q = ("CREATE VIRTUAL TABLE edges_index USING rtree(\n"
             "id, -- Integer primary key\n"
             "minX, maxX, -- Minimum and maximum X coordinate\n"
             "minY, maxY -- Minimum and maximum Y coordinate\n"
             ")")
        c.execute(q)
        q = ("CREATE TABLE edges(\n"
             "id INTEGER PRIMARY KEY,\n"
             "path INTEGER,\n"  # Not necessarily unique, a pathway id can consist of multiple edges
             "pathnum INTEGER,\n"
             "id1 INTEGER,\n"  # node 1
             "id2 INTEGER,\n"  # node 2
             "speed REAL,\n"  # speed m/s
             "type INTEGER\n"  # extra field
             ")")
        c.execute(q)
        q = ("CREATE TABLE close_edges(\n"
             "id1 INTEGER,\n"  # edge 1
             "id2 INTEGER\n"  # edge 2
             ")")
        c.execute(q)
        q = ("CREATE TABLE properties(\n"
             "key TEXT,\n"
             "value BLOB\n"
             ")")
        c.execute(q)
        q = "CREATE INDEX edges_from_index ON edges(id1)"
        c.execute(q)
        q = "CREATE INDEX close_edges_index ON close_edges(id1)"
        c.execute(q)
        self.db.commit()

    @classmethod
    def from_file(cls, filename):
        """Read from an existing file.

        The file extension is stripped and ``__init__`` re-adds ``.sqlite``, so
        only ``.sqlite`` files are found this way.
        """
        filename = Path(filename).with_suffix('')
        nmap = cls(filename.name, dir=filename.parent, deserializing=True)
        return nmap

    def bb(self):
        """Bounding box of all indexed nodes.

        Computed from ``nodes_index``, so nodes added with ``no_index=True`` are
        only included after :meth:`reindex_nodes`.

        :return: (lat_min, lon_min, lat_max, lon_max) or (y_min, x_min, y_max, x_max)
        """
        c = self.db.cursor()
        c.execute('SELECT min(minX), max(maxX), min(minY), max(maxY) FROM nodes_index;')
        lon_min, lon_max, lat_min, lat_max = c.fetchone()
        return lat_min, lon_min, lat_max, lon_max

    def labels(self):
        """All labels."""
        c = self.db.cursor()
        c.execute('SELECT id FROM nodes;')
        return [row[0] for row in c.fetchall()]

    def size(self):
        """Number of nodes in the map."""
        c = self.db.cursor()
        c.execute('SELECT count(*) FROM nodes')
        return c.fetchone()[0]

    def node_coordinates(self, node_key):
        """Get the coordinates of the given node.

        :param node_key: Node label/key
        :return: (lat, lon)
        :raises Exception: If the node does not exist.
        """
        c = self.db.cursor()
        c.execute('SELECT y, x FROM nodes WHERE id = ?', (node_key, ))
        result = c.fetchone()
        if result is None:
            raise Exception(f"No coordinates found for node {node_key}")
        return result

    def add_node(self, node, loc, ignore_doubles=False, no_index=False, no_commit=False):
        """Add new node to the map.

        :param node: label
        :param loc: (lat, lon) or (y, x)
        :param ignore_doubles: When trying to add the same node, ignore it
        :param no_index: Do not add the node to the rtree index
            (use :meth:`reindex_nodes` afterwards)
        :param no_commit: Do not commit to database (remember to commit later)
        """
        c = self.db.cursor()
        lat, lon = loc
        # Nodes
        q = "INSERT INTO nodes VALUES(?, ?, ?)"
        try:
            c.execute(q, (node, lon, lat))
        except sqlite3.IntegrityError as exc:
            if ignore_doubles and "UNIQUE constraint failed: nodes.id" in str(exc):
                return
            logger.error(f"Problem with adding node {node} {loc}")
            raise exc
        # Nodes index
        if not no_index:
            q = "INSERT INTO nodes_index VALUES(?, ?, ?, ?, ?)"
            try:
                c.execute(q, (node, lon, lon, lat, lat))
            except sqlite3.IntegrityError as exc:
                logger.error(f"Problem with adding node to index {node} {loc}")
                raise exc
        if not no_commit:
            self.db.commit()

    def reindex_nodes(self):
        """Rebuild ``nodes_index`` from the ``nodes`` table."""
        logger.debug("Reindexing nodes ...")
        t_start = time.time()
        c = self.db.cursor()
        c.execute('DELETE FROM nodes_index')
        q = ("INSERT INTO nodes_index "
             "SELECT id, x, x, y, y FROM nodes")
        c.execute(q)
        self.db.commit()
        c.execute('SELECT count(*) FROM nodes_index')
        cnt = c.fetchone()[0]
        t_delta = time.time() - t_start
        logger.debug(f"... done, #rows = {cnt}, time = {t_delta} sec")

    def add_nodes(self, nodes):
        """Add list of nodes to database (both the table and the rtree index).

        :param nodes: List[Tuple[node_key, Tuple[lat, lon]]] (any iterable is accepted)
        """
        # The nodes are traversed twice below; materialize so that a generator
        # argument does not leave the `nodes` table empty on the second pass.
        nodes = list(nodes)
        c = self.db.cursor()
        q = "INSERT INTO nodes_index VALUES(?, ?, ?, ?, ?)"
        c.executemany(q, ((key, lon, lon, lat, lat) for key, (lat, lon) in nodes))
        q = "INSERT INTO nodes VALUES(?, ?, ?)"
        c.executemany(q, ((key, lon, lat) for key, (lat, lon) in nodes))
        self.db.commit()

    def del_node(self, node):
        """Removing nodes is not supported yet."""
        raise NotImplementedError("TODO")

    def add_edge(self, node_a, node_b, loc_a=None, loc_b=None, speed=None, edge_type=None,
                 path=None, pathnum=None, no_index=False, no_commit=False):
        """Add new edge to the map.

        An edge whose id already exists is silently ignored (INSERT OR IGNORE).

        :param node_a: Label for the node that is the start of the edge
        :param node_b: Label for the node that is the end of the edge
        :param loc_a: Location of node_a; looked up in ``nodes`` when None
        :param loc_b: Location of node_b; looked up in ``nodes`` when None
        :param speed: Speed stored with the edge (table column documented as m/s)
        :param edge_type: Value for the extra ``type`` column
        :param path: Path id the edge belongs to
        :param pathnum: Value for the ``pathnum`` column
        :param no_index: Do not add the edge to the rtree index
        :param no_commit: Do not commit to database (remember to commit later)
        """
        c = self.db.cursor()
        eid = self._edge_id(node_a, node_b)
        c.execute('INSERT OR IGNORE INTO edges(id, path, pathnum, id1, id2, type, speed) '
                  'VALUES (?, ?, ?, ?, ?, ?, ?)',
                  (eid, path, pathnum, node_a, node_b, edge_type, speed))
        if not no_index:
            if loc_a is None:
                c.execute('SELECT y, x FROM nodes WHERE id = ?;', (node_a, ))
                loc_a = c.fetchone()
            if loc_b is None:
                c.execute('SELECT y, x FROM nodes WHERE id = ?;', (node_b, ))
                loc_b = c.fetchone()
            lat1, lon1 = loc_a
            lat2, lon2 = loc_b
            # The rtree expects min <= max on each axis.
            if lat1 > lat2:
                lat1, lat2 = lat2, lat1
            if lon1 > lon2:
                lon1, lon2 = lon2, lon1
            c.execute('INSERT OR IGNORE INTO edges_index(id, minX, maxX, minY, maxY) '
                      'VALUES (?, ?, ?, ?, ?)',
                      (eid, lon1, lon2, lat1, lat2))
        if not no_commit:
            self.db.commit()

    def add_edges(self, edges, no_index=False):
        """Add list of edges to database.

        :param edges: Iterable of tuples
            (key_a, key_b[, path[, pathnum[, edge_type[, speed]]]]);
            missing trailing fields are stored as NULL.
        :param no_index: Do not rebuild the edge rtree index afterwards
        """
        c = self.db.cursor()

        def get_edge():
            for row in edges:
                row = list(row) + ([None] * (6 - len(row)))
                key_a, key_b, path, pathnum, edge_type, speed = row
                yield self._edge_id(key_a, key_b), path, pathnum, key_a, key_b, edge_type, speed

        q = "INSERT INTO edges(id, path, pathnum, id1, id2, type, speed) VALUES(?, ?, ?, ?, ?, ?, ?);"
        c.executemany(q, get_edge())
        self.db.commit()
        if not no_index:
            self.reindex_edges()

    def reindex_edges(self):
        """Rebuild ``edges_index`` from the ``edges`` and ``nodes`` tables."""
        logger.debug("Reindexing edges ...")
        t_start = time.time()
        c = self.db.cursor()
        c.execute('DELETE FROM edges_index')
        q = ('INSERT INTO edges_index '
             'SELECT e.id, MIN(n1.x,n2.x), MAX(n1.x,n2.x), '
             '       MIN(n1.y,n2.y), MAX(n1.y,n2.y) '
             'FROM edges e '
             'INNER JOIN nodes n1 ON n1.id = e.id1 '
             'INNER JOIN nodes n2 ON n2.id = e.id2')
        c.execute(q)
        self.db.commit()
        c.execute('SELECT count(*) FROM edges_index')
        cnt = c.fetchone()[0]
        t_delta = time.time() - t_start
        logger.debug(f"... done, #rows = {cnt}, time = {t_delta} sec")

    def all_edges(self, bb=None):
        """Return all indexed edges, optionally only those intersecting a bounding box.

        :param bb: Bounding box (min_y, min_x, max_y, max_x)
        :return: Generator of (key_a, loc_a, key_b, loc_b) with loc = (lat, lon)
        """
        c = self.db.cursor()
        q = ('SELECT e.id1, e.id2, n1.x AS n1x, n2.x AS n2x, n1.y AS n1y, n2.y AS n2y '
             'FROM edges e, edges_index ei '
             'LEFT JOIN nodes n1 ON n1.id = e.id1 '
             'LEFT JOIN nodes n2 ON n2.id = e.id2 '
             'WHERE ei.id == e.id')
        if bb:
            min_y, min_x, max_y, max_x = bb
            # Intersecting with query
            q += ' AND ei.maxX >= ? AND ei.minX <= ? AND ei.maxY >= ? AND ei.minY <= ?'
            c.execute(q, (min_x, max_x, min_y, max_y))
        else:
            c.execute(q)
        for row in c.fetchall():
            key_a, key_b, lon_a, lon_b, lat_a, lat_b = row
            yield key_a, (lat_a, lon_a), key_b, (lat_b, lon_b)

    def all_nodes(self, bb=None):
        """Return all indexed nodes, optionally only those inside a bounding box.

        :param bb: Bounding box (minY, minX, maxY, maxX)
        :return: Generator of (key, (lat, lon))
        """
        c = self.db.cursor()
        q = ('SELECT n.id, n.x, n.y '
             'FROM nodes n, nodes_index ni '
             'WHERE n.id = ni.id ')
        if bb:
            minY, minX, maxY, maxX = bb
            q += 'AND ni.minX >= ? AND ni.maxX <= ? AND ni.minY >= ? AND ni.maxY <= ?'
            c.execute(q, (minX, maxX, minY, maxY))
        else:
            c.execute(q)
        for row in c.fetchall():
            key_a, lon_a, lat_a = row
            yield key_a, (lat_a, lon_a)

    def purge(self):
        """No-op for this map type."""
        pass

    def to_xy(self, name=None):
        """Create a map that uses a projected XY representation on which Euclidean
        distances can be used.

        Returns ``self`` when the map already uses XY coordinates.

        :param name: Name for the new map (unused until implemented)
        :raises NotImplementedError: The lat-lon to XY conversion is not implemented.
        """
        if not self.use_latlon:
            return self
        # Raise before constructing the target map: creating a SqliteMap drops and
        # recreates the tables of a database file on disk.
        raise NotImplementedError("to implement")

    def latlon2xy(self, lat, lon):
        x, y = self.lonlat2xy(lon, lat)
        return x, y

    def latlon2yx(self, lat, lon):
        x, y = self.lonlat2xy(lon, lat)
        return y, x

    def xy2latlon(self, x, y):
        lon, lat = self.xy2lonlat(x, y)
        return lat, lon

    def yx2latlon(self, y, x):
        lon, lat = self.xy2lonlat(x, y)
        return lat, lon

    def nodes_closeto(self, loc, max_dist=None, max_elmt=None):
        """Return all nodes close to the given location.

        :param loc: Location
        :param max_dist: Maximal distance from the location; None means no limit
        :param max_elmt: Return only the most nearby nodes
        :return: List of (dist, key, loc) sorted by increasing distance
        """
        t_start = time.time()
        lat, lon = loc[:2]
        if max_dist is None:
            bb = None
        else:
            lat_b, lon_l, lat_t, lon_r = self.box_around_point((lat, lon), max_dist)
            bb = (lat_b, lon_l,  # y_min, x_min
                  lat_t, lon_r)  # y_max, x_max
        # Materialize: all_nodes is lazy, so the timing would otherwise be meaningless.
        nodes = list(self.all_nodes(bb=bb))
        t_delta_search = time.time() - t_start
        t_start = time.time()
        results = []
        for key_o, loc_o in nodes:
            dist = self.distance(loc, loc_o)
            if max_dist is None or dist < max_dist:
                results.append((dist, key_o, loc_o))
        results.sort()
        t_delta_dist = time.time() - t_start
        logger.debug(f"Found {len(results)} closeby nodes "
                     f"in {t_delta_search} sec and computed distances in {t_delta_dist} sec")
        if max_elmt is not None:
            results = results[:max_elmt]
        return results

    def edges_closeto(self, loc, max_dist=None, max_elmt=None):
        """Return all nodes that are on an edge that is close to the given location.

        :param loc: Location
        :param max_dist: Maximal distance from the location; None means no limit
        :param max_elmt: Return only the most nearby nodes
        :return: List of (dist, key_a, loc_a, key_b, loc_b, pi, ti) sorted by
            increasing distance; pi and ti come from ``distance_point_to_segment``
            (presumably the projected point and its position on the segment)
        """
        logger.debug(f"edges_closeto({loc})")
        t_start = time.time()
        lat, lon = loc[:2]
        if max_dist is None:
            bb = None
        else:
            lat_b, lon_l, lat_t, lon_r = self.box_around_point((lat, lon), max_dist)
            bb = (lat_b, lon_l,  # y_min, x_min
                  lat_t, lon_r)  # y_max, x_max
        logger.debug(f"Search in bounding box {bb}")
        edges = list(self.all_edges(bb=bb))
        t_delta_search = time.time() - t_start
        t_start = time.time()
        results = []
        for key_a, loc_a, key_b, loc_b in edges:
            dist, pi, ti = self.distance_point_to_segment(loc, loc_a, loc_b)
            if max_dist is None or dist < max_dist:
                results.append((dist, key_a, loc_a, key_b, loc_b, pi, ti))
        results.sort()
        t_delta_dist = time.time() - t_start
        logger.debug(f"Found {len(results)} closeby edges "
                     f"in {t_delta_search} sec and computed distances in {t_delta_dist} sec")
        if max_elmt is not None:
            results = results[:max_elmt]
        return results

    def nodes_nbrto(self, node):
        """Return the end nodes of all edges starting at ``node``.

        :return: List of (nbr_label, (lat, lon))
        """
        c = self.db.cursor()
        q = ('SELECT e.id2, n2.y, n2.x FROM edges e '
             'INNER JOIN nodes n2 ON n2.id = e.id2 '
             'WHERE e.id1 = ?')
        return [(nbr_label, (nbr_lat, nbr_lon))
                for nbr_label, nbr_lat, nbr_lon in c.execute(q, (node, ))]

    def edges_nbrto(self, edge):
        """Return edges that follow the given edge.

        These are the edges starting at the edge's end node, plus the edges linked
        to it in ``close_edges`` (see :meth:`connect_parallelroads`).

        :param edge: (label_start, label_end)
        :return: List of (label_a, loc_a, label_b, loc_b)
        """
        l1, l2 = edge
        c = self.db.cursor()
        c.execute('SELECT n.y, n.x FROM nodes n WHERE id = ?', (l2, ))
        p2 = c.fetchone()
        results = []
        # Edges that connect at end of this edge
        for l3, p3 in self.nodes_nbrto(l2):
            results.append((l2, p2, l3, p3))
        # Edges that are in parallel and close
        edge_id = self._edge_id(l1, l2)
        q = ('SELECT e.id1, e.id2, n1.y, n1.x, n2.y, n2.x FROM close_edges ce '
             'INNER JOIN edges e ON e.id = ce.id2 '
             'INNER JOIN nodes n1 ON n1.id = e.id1 '
             'INNER JOIN nodes n2 ON n2.id = e.id2 '
             'WHERE ce.id1 = ?')
        for l3, l4, p3lat, p3lon, p4lat, p4lon in c.execute(q, (edge_id,)):
            results.append((l3, (p3lat, p3lon), l4, (p4lat, p4lon)))
        return results

    def find_duplicates(self, func=None):
        """Find nodes with identical locations.

        :param func: Optional callback, called once per group of co-located nodes
            with an iterable of the integer node ids in that group
        """
        c = self.db.cursor()
        logger.debug('Find duplicates ...')
        t_start = time.time()
        q = ('select count(*)as qty, group_concat(id) '
             'from nodes '
             'group by y, x '
             'having qty > 1 ')
        # Fetch all groups first so `func` may modify the database without
        # interfering with an active SELECT on the same connection.
        groups = c.execute(q).fetchall()
        cnt = len(groups)
        if func is not None:
            for _ncnt, idxs in groups:
                func(int(idx) for idx in idxs.split(","))
        t_delta = time.time() - t_start
        logger.info(f"Found {cnt} doubles, time: {t_delta} seconds")

    def connect_parallelroads(self, dist=0.5, bb=None):
        """Link edges that run parallel and close to each other in ``close_edges``.

        Edges sharing a node are never linked. Each linked pair is stored once in
        each direction.

        :param dist: Passed as ``d`` to ``lines_parallel``
        :param bb: Only consider edges intersecting this bounding box
            (min_y, min_x, max_y, max_x)
        """
        c = self.db.cursor()
        q = 'INSERT INTO close_edges(id1, id2) VALUES (?, ?)'
        it = self.all_edges(bb=bb)
        if tqdm:
            it = tqdm.tqdm(list(it))
        # Each pair is encountered from both sides; remember what was inserted so
        # close_edges does not receive duplicate rows.
        linked = set()
        cnt = 0
        for key_a, loc_a, key_b, loc_b in it:
            e_id1 = self._edge_id(key_a, key_b)
            bb2 = [min(loc_a[0], loc_b[0]), min(loc_a[1], loc_b[1]),
                   max(loc_a[0], loc_b[0]), max(loc_a[1], loc_b[1])]
            for key_c, loc_c, key_d, loc_d in self.all_edges(bb=bb2):
                if key_a in (key_c, key_d) or key_b in (key_c, key_d):
                    continue
                e_id2 = self._edge_id(key_c, key_d)
                if (e_id1, e_id2) in linked:
                    continue
                if self.lines_parallel(loc_a, loc_b, loc_c, loc_d, d=dist):
                    c.execute(q, (e_id1, e_id2))
                    c.execute(q, (e_id2, e_id1))
                    linked.add((e_id1, e_id2))
                    linked.add((e_id2, e_id1))
                    cnt += 1
        logger.debug(f"Linked {cnt} edges")
        self.db.commit()

    def nodes_to_paths(self, nodes, ignore_nopath=True):
        """Convert a sequence of nodes to the sequence of path ids it traverses.

        Consecutive edges with the same path id produce a single entry.

        :param nodes: Sequence of node labels
        :param ignore_nopath: Skip edges without a path id (or without a stored edge)
        :return: List of path ids
        """
        c = self.db.cursor()
        prev_path = None
        paths = []
        for begin, end in zip(nodes[:-1], nodes[1:]):
            c.execute("SELECT path FROM edges WHERE id1=? AND id2=?", (begin, end))
            row = c.fetchone()
            # A missing edge is treated like an edge without a path id.
            path = None if row is None else row[0]
            if path is None and ignore_nopath:
                continue
            if path != prev_path:
                paths.append(path)
                prev_path = path
        return paths

    def path_dist(self, path):
        """Sum of the lengths of the edges of ``path`` that have ``pathnum > 0``."""
        c = self.db.cursor()
        dist = 0
        q = ('SELECT n1.y, n1.x, n2.y, n2.x FROM edges e '
             'INNER JOIN nodes n1 ON n1.id = e.id1 '
             'INNER JOIN nodes n2 ON n2.id = e.id2 '
             'WHERE e.pathnum>0 AND e.path=?')
        for lat1, lon1, lat2, lon2 in c.execute(q, (path,)):
            dist += self.distance((lat1, lon1), (lat2, lon2))
        return dist

    def print_stats(self):
        """Print the number of nodes to stdout."""
        print("Graph\n-----")
        print("Nodes: {}".format(self.size()))

    def __str__(self):
        c = self.db.cursor()
        c.execute("select sqlite_version()")
        version = c.fetchone()[0]
        return f"SqliteMap({self.name}, size={self.size()}, version={version})"