# encoding: utf-8
"""
leuvenmapmatching.map.inmem
~~~~~~~~~~~~~~~~~~~~~~~~~~~
Simple in-memory map representation. Not suited for production purposes.
Write your own map class that connects to your map (e.g. a database instance).
:author: Wannes Meert
:copyright: Copyright 2018 DTAI, KU Leuven and Sirris.
:license: Apache License, Version 2.0, see LICENSE for details.
"""
import logging
import time
from pathlib import Path
import pickle
from functools import partial
try:
import rtree
except ImportError:
rtree = None
try:
import pyproj
except ImportError:
pyproj = None
try:
import tqdm
except ImportError:
tqdm = None
from .base import BaseMap
MYPY = False
if MYPY:
from typing import Optional, Set, Tuple, Dict, Union
LabelType = Union[int, str]
LocType = Tuple[float, float]
EdgeType = Tuple[LabelType, LabelType]
logger = logging.getLogger("be.kuleuven.cs.dtai.mapmatching")
[docs]
class InMemMap(BaseMap):
def __init__(self, name, use_latlon=True, use_rtree=False, index_edges=False,
crs_lonlat=None, crs_xy=None, graph=None, linked_edges=None, dir=None, deserializing=False):
"""In-memory representation of a map.
This is a simple database-like object to perform experiments with map matching.
For production purposes it is recommended to use your own derived
class (e.g. to connect to your database instance).
This class supports:
- Indexing using rtrees to allow for fast searching of points on the map.
When using the rtree index, only integer numbers are allowed as node labels.
- Serializing to write and read from files.
- Projecting points to a different frame (e.g. GPS to Lambert)
:param name: Map name (mandatory)
:param use_latlon: The locations represent latitude-longitude pairs, otherwise y-x coordinates
are assumed.
:param use_rtree: Build an rtree index to quickly search for locations.
:param index_edges: Build an index for the edges in the map instead of the vertices.
:param crs_lonlat: Coordinate reference system for the latitude-longitude coordinates.
:param crs_xy: Coordiante reference system for the y-x coordinates.
:param graph: Initial graph of form Dict[label, Tuple[Tuple[y,x], List[neighbor]]]]
:param dir: Directory where to serialize to. If given, the rtree index structure will be written
to a file immediately.
:param deserializing: Internal variable to indicate that the object is being build from a file.
"""
super(InMemMap, self).__init__(name, use_latlon=use_latlon)
self.dir = None if dir is None else Path(dir)
self.index_edges = index_edges
self.graph = dict() if graph is None else graph
self.rtree = None
self.use_rtree = use_rtree
if self.use_rtree:
self.setup_index(deserializing=deserializing)
self.crs_lonlat = 'EPSG:4326' if crs_lonlat is None else crs_lonlat # GPS
self.crs_xy = 'EPSG:3395' if crs_xy is None else crs_xy # Mercator projection
if pyproj:
# proj_lonlat = pyproj.Proj(self.crs_lonlat, preserve_units=True)
# proj_xy = pyproj.Proj(self.crs_xy, preserve_units=True)
# self.lonlat2xy = partial(pyproj.transform, proj_lonlat, proj_xy)
# self.xy2lonlat = partial(pyproj.transform, proj_xy, proj_lonlat)
tr_lonlat2xy = pyproj.Transformer.from_crs(self.crs_lonlat, self.crs_xy)
self.lonlat2xy = tr_lonlat2xy.transform
tr_xy2lonlat = pyproj.Transformer.from_crs(self.crs_xy, self.crs_lonlat)
self.xy2lonlat = tr_xy2lonlat.transform
else:
def pyproj_notfound(*_args, **_kwargs):
raise Exception("pyproj package not found")
self.lonlat2xy = pyproj_notfound
self.xy2lonlat = pyproj_notfound
self.linked_edges = linked_edges # type: Optional[Dict[EdgeType, Set[Tuple[EdgeType]]]]
self.vertex_label_map = None
def vertex_label_to_int(self, label, create=False):
if type(label) is int:
return label
if self.vertex_label_map is None:
if not create:
return label
self.vertex_label_map = dict()
if label in self.vertex_label_map:
new_label = self.vertex_label_map[label]
else:
new_label = len(self.vertex_label_map)
self.vertex_label_map[label] = new_label
return new_label
def vertices_labels_to_int(self):
graph = dict()
for label, (loc, nbrs) in self.graph.items():
new_label = self.vertex_label_to_int(label, create=True)
new_nbrs = [self.vertex_label_to_int(nbr, create=True) for nbr in nbrs]
graph[new_label] = (loc, new_nbrs)
self.graph = graph
[docs]
def serialize(self):
"""Create a serializable data structure."""
data = {
"name": self.name,
"graph": self.graph,
"use_latlon": self.use_latlon,
"use_rtree": self.use_rtree,
"index_edges": self.index_edges,
"crs_lonlat": self.crs_lonlat,
"crs_xy": self.crs_xy,
"linked_edges": self.linked_edges
}
if self.dir is not None:
data["dir"] = self.dir
return data
[docs]
@classmethod
def deserialize(cls, data):
"""Create a new instance from a dictionary."""
nmap = cls(data["name"], dir=data.get("dir", None),
use_latlon=data["use_latlon"], use_rtree=data["use_rtree"],
index_edges=data["index_edges"],
crs_lonlat=data.get("crs_lonlat", None), crs_xy=data.get("crs_xy", None),
graph=data.get("graph", None), linked_edges=data.get("linked_edges", None),
deserializing=True)
return nmap
[docs]
def dump(self):
"""Serialize map using pickle.
All files will be saved to the `dir` directory using the `name` as filename.
"""
if self.dir is None:
logger.error(f"No directory set where to save (see InMemMap.__init__)")
return
filename = self.dir / (self.name + ".pkl")
with filename.open("wb") as ofile:
pickle.dump(self.serialize(), ofile)
logger.debug(f"Saved map to {filename}")
if self.rtree:
rtree_fn = self.rtree_fn()
if rtree_fn is not None:
self.rtree.close()
self.rtree = rtree.index.Index(str(rtree_fn))
[docs]
@classmethod
def from_pickle(cls, filename):
"""Deserialize map using pickle to the given filename."""
filename = Path(filename)
with filename.open("rb") as ifile:
data = pickle.load(ifile)
nmap = cls.deserialize(data)
return nmap
[docs]
def bb(self):
"""Bounding box.
:return: (lat_min, lon_min, lat_max, lon_max) or (y_min, x_min, y_max, x_max)
"""
if self.use_rtree:
lat_min, lon_min, lat_max, lon_max = self.rtree.bounds
else:
glat, glon = zip(*[t[0] for t in self.graph.values()])
lat_min, lat_max = min(glat), max(glat)
lon_min, lon_max = min(glon), max(glon)
return lat_min, lon_min, lat_max, lon_max
[docs]
def labels(self):
"""All labels."""
return self.graph.keys()
[docs]
def size(self):
return len(self.graph)
[docs]
def node_coordinates(self, node_key):
"""Get the coordinates of the given node.
:param node_key: Node label/key
:return: (lat, lon)
"""
return self.graph[node_key][0]
[docs]
def add_node(self, node, loc):
"""Add new node to the map.
:param node: label
:param loc: (lat, lon) or (y, x)
"""
if node in self.graph:
if self.graph[node][0] is None:
self.graph[node] = (loc, self.graph[node][1])
else:
self.graph[node] = (loc, [])
if self.use_rtree and self.rtree is not None and not self.index_edges:
if type(node) is not int:
raise Exception(f"Rtree index only supports integer keys for vertices")
self.rtree.insert(node, (loc[0], loc[1], loc[0], loc[1]))
def del_node(self, node):
if node not in self.graph:
return
if self.rtree:
data = self.graph[node]
loc = data[0]
self.rtree.delete(node, (loc[0], loc[1], loc[0], loc[1]))
del self.graph[node]
[docs]
def add_edge(self, node_a, node_b):
"""Add new edge to the map.
:param node_a: Label for the node that is the start of the edge
:param node_b: Label for the node that is the end of the edge
"""
if node_a not in self.graph:
raise ValueError(f"Add {node_a} first as node")
if node_b not in self.graph:
raise ValueError(f"Add {node_b} first as node")
if node_b not in self.graph[node_a][1]:
self.graph[node_a][1].append(node_b)
if self.use_rtree and self.rtree is not None and self.index_edges:
if type(node_a) is not int or type(node_b) is not int:
raise Exception(f"Rtree index only supports integer keys for vertices")
loc_a = self.graph[node_a][0]
loc_b = self.graph[node_b][0]
bb = (min(loc_a[0], loc_b[0]), min(loc_a[1], loc_b[1]), # y_min, x_min
max(loc_a[0], loc_b[0]), max(loc_a[1], loc_b[1])) # y_max, x_max
self.rtree.insert(node_a, bb)
# self.rtree.insert(node_b, bb)
def _items_in_bb(self, bb):
if self.rtree is not None:
node_idxs = self.rtree.intersection(bb)
for key in node_idxs:
yield (key, self.graph[key])
else:
lat_min, lon_min, lat_max, lon_max = bb
for key, value in self.graph.items():
((lat, lon), nbrs) = value
if lat_min <= lat <= lat_max and lon_min <= lon <= lon_max:
yield (key, value)
[docs]
def all_edges(self, bb=None):
"""Return all edges.
:param bb: Bounding box
:return: (key_a, loc_a, nbr, loc_b)
"""
if bb is None:
keyvals = self.graph.items()
else:
keyvals = self._items_in_bb(bb)
for key_a, (loc_a, nbrs) in keyvals:
if loc_a is not None:
for nbr in nbrs:
try:
loc_b, _ = self.graph[nbr]
if loc_b is not None:
yield (key_a, loc_a, nbr, loc_b)
except KeyError:
# print("Node not found: {}".format(nbr))
pass
[docs]
def all_nodes(self, bb=None):
"""Return all nodes.
:param bb: Bounding box
:return:
"""
if bb is None:
keyvals = self.graph.items()
else:
keyvals = self._items_in_bb(bb)
for key_a, (loc_a, nbrs) in keyvals:
if loc_a is not None:
yield key_a, loc_a
def purge(self):
cnt_noloc = 0
cnt_noedges = 0
remove = []
for node in self.graph.keys():
if self.graph[node][0] is None:
cnt_noloc += 1
remove.append(node)
# print("No location for node {}".format(node))
elif len(self.graph[node][1]) == 0:
cnt_noedges += 1
remove.append(node)
for node in remove:
self.del_node(node)
logger.debug("Removed {} nodes without location".format(cnt_noloc))
logger.debug("Removed {} nodes without edges".format(cnt_noedges))
def rtree_size(self):
bb = self.rtree.bounds
if bb[0] < bb[2] and bb[1] < bb[3]:
rtree_size = self.rtree.count(bb)
else:
rtree_size = 0
return rtree_size
def rtree_fn(self):
rtree_fn = None
if self.dir is not None:
rtree_fn = self.dir / self.name
return rtree_fn
def setup_index(self, force=False, deserializing=False):
if not self.use_rtree:
return
if self.rtree is not None and not force:
return
if rtree is None:
raise Exception("rtree package not found")
rtree_fn = self.rtree_fn()
args = []
if deserializing and (rtree_fn is None or not rtree_fn.exists()):
deserializing = False
if self.graph and len(self.graph) > 0 and not deserializing:
if self.index_edges:
logger.debug("Generator to index edges")
def generator_function():
for label, data in self.graph.items():
lat_min, lon_min = data[0]
lat_max, lon_max = lat_min, lon_min
for idx in data[1]:
olat, olon = self.graph[idx][0]
lat_min = min(lat_min, olat)
lat_max = max(lat_max, olat)
lon_min = min(lon_min, olon)
lon_max = max(lon_max, olon)
if type(label) is not int:
raise Exception(f"Rtree index only supports integer keys for vertices")
yield (label, (lat_min, lon_min, lat_max, lon_max), None)
else:
logger.debug("Generator to index nodes")
def generator_function():
for label, data in self.graph.items():
lat, lon = data[0]
if type(label) is not int:
raise Exception(f"Rtree index only supports integer keys for vertices")
yield (label, (lat, lon, lat, lon), None)
args.append(generator_function())
t_start = time.time()
if self.dir is not None:
# props = rtree.index.Property()
# if force:
# props.overwrite = True
logger.debug(f"Creating new file-based rtree index ({rtree_fn}) ...")
args.insert(0, str(rtree_fn))
elif deserializing:
raise Exception("Cannot deserialize, no directory given")
else:
logger.debug(f"Creating new in-memory rtree index (args={args}) ...")
self.rtree = rtree.index.Index(*args)
t_delta = time.time() - t_start
logger.debug(f"... done: rtree size = {self.rtree_size()}, time = {t_delta} sec")
def fill_index(self):
if not self.use_rtree or self.rtree is None:
return
for label, data in self.graph.items():
loc = data[0]
self.rtree.insert(label, (loc[1], loc[0], loc[1], loc[0]))
logger.debug(f"After filling rtree, size = {self.rtree_size()}")
[docs]
def to_xy(self, name=None, use_rtree=None):
"""Create a map that uses a projected XY representation on which Euclidean distances
can be used.
"""
if not self.use_latlon:
return self
if name is None:
name = self.name + "_xy"
if use_rtree is None:
use_rtree = self.use_rtree
ngraph = dict()
for label, row in self.graph.items():
lat, lon = row[0]
x, y = self.lonlat2xy(lon, lat)
ngraph[label] = ((y, x), row[1])
nmap = self.__class__(name, dir=self.dir, graph=ngraph,
use_latlon=False, use_rtree=use_rtree, index_edges=self.index_edges,
crs_xy=self.crs_xy, crs_lonlat=self.crs_lonlat)
return nmap
def latlon2xy(self, lat, lon):
x, y = self.lonlat2xy(lon, lat)
return x, y
def latlon2yx(self, lat, lon):
x, y = self.lonlat2xy(lon, lat)
return y, x
def xy2latlon(self, x, y):
lon, lat = self.xy2lonlat(x, y)
return lat, lon
def yx2latlon(self, y, x):
lon, lat = self.xy2lonlat(x, y)
return lat, lon
[docs]
def nodes_closeto(self, loc, max_dist=None, max_elmt=None):
"""Return all nodes close to the given location.
:param loc: Location
:param max_dist: Maximal distance from the location
:param max_elmt: Return only the most nearby nodes
"""
t_start = time.time()
lat, lon = loc[:2]
lat_b, lon_l, lat_t, lon_r = self.box_around_point((lat, lon), max_dist)
bb = (lat_b, lon_l, # y_min, x_min
lat_t, lon_r) # y_max, x_max
if self.rtree is not None and max_dist is not None:
logger.debug(f"Search closeby nodes to {loc}, bb={bb}")
nodes = self.rtree.intersection(bb)
else:
logger.warning("Searching closeby nodes with linear search, use an index and set max_dist")
if max_dist is not None:
nodes = (key for key, _ in self._items_in_bb(self.box_around_point((lat, lon), max_dist)))
else:
nodes = self.graph.keys()
t_delta_search = time.time() - t_start
t_start = time.time()
results = []
for label in nodes:
oloc, nbrs = self.graph[label]
dist = self.distance(loc, oloc)
if dist < max_dist:
results.append((dist, label, oloc))
results.sort()
t_delta_dist = time.time() - t_start
logger.debug(f"Found {len(results)} closeby nodes "
f"in {t_delta_search} sec and computed distances in {t_delta_dist} sec")
if max_elmt is not None:
results = results[:max_elmt]
return results
[docs]
def edges_closeto(self, loc, max_dist=None, max_elmt=None):
"""Return all nodes that are on an edge that is close to the given location.
:param loc: Location
:param max_dist: Maximal distance from the location
:param max_elmt: Return only the most nearby nodes
"""
t_start = time.time()
lat, lon = loc[:2]
if self.rtree is not None and max_dist is not None and self.index_edges:
lat_b, lon_l, lat_t, lon_r = self.box_around_point((lat, lon), max_dist)
bb = (lat_b, lon_l, # y_min, x_min
lat_t, lon_r) # y_max, x_max
logger.debug(f"Search closeby edges to {loc}, bb={bb}")
nodes = self.rtree.intersection(bb)
else:
if self.rtree is not None and max_dist is not None and not self.index_edges:
logger.warning("Index is built for nodes, not for edges, set the index_edges argument to true")
logger.warning("Searching closeby nodes with linear search, use an index and set max_dist")
if max_dist is not None:
bb = self.box_around_point((lat, lon), max_dist)
nodes = (key for key, _ in self._items_in_bb(bb))
else:
nodes = self.graph.keys()
t_delta_search = time.time() - t_start
t_start = time.time()
results = []
for label in nodes:
oloc, nbrs = self.graph[label]
for nbr in nbrs:
if label == nbr:
continue
nbr_data = self.graph[nbr]
dist, pi, ti = self.distance_point_to_segment(loc, oloc, nbr_data[0])
# print(f"label={label}/{oloc}, nbr={nbr}/{nbr_data[0]} -- loc={loc} -> {dist}, {pi}, {ti}")
if dist < max_dist:
results.append((dist, label, oloc, nbr, nbr_data[0], pi, ti))
results.sort()
t_delta_dist = time.time() - t_start
logger.debug(f"Found {len(results)} closeby edges "
f"in {t_delta_search} sec and computed distances in {t_delta_dist} sec")
if max_elmt is not None:
results = results[:max_elmt]
return results
[docs]
def nodes_nbrto(self, node):
results = []
if node not in self.graph:
return results
loc_node, nbrs = self.graph[node]
for nbr_label in nbrs + [node]:
try:
loc_nbr = self.graph[nbr_label][0]
if loc_nbr is not None:
results.append((nbr_label, loc_nbr))
except KeyError:
pass
return results
[docs]
def edges_nbrto(self, edge):
results = []
l1, l2 = edge
p1 = self.node_coordinates(l1)
p2 = self.node_coordinates(l2)
# Edges that connect at end of this edge
for l3, p3 in self.nodes_nbrto(l2):
results.append((l2, p2, l3, p3))
# Edges that are in parallel and close
if self.linked_edges:
for (l3, l4) in self.linked_edges.get(edge, []):
p3 = self.node_coordinates(l3)
p4 = self.node_coordinates(l4)
results.append((l3, p3, l4, p4))
return results
[docs]
def find_duplicates(self, func=None):
"""Find entries with identical locations."""
cnt = 0
for label, data in self.graph.items():
lat, lon = data[0]
idxs = list(self.rtree.nearest((lat, lon, lat, lon), num_results=1))
idxs.remove(label)
if len(idxs) > 0:
# logger.info(f"Found doubles for {label}: {idxs}")
if func:
func(label, idxs)
logger.info(f"Found {cnt} doubles")
def connect_parallelroads(self, dist=0.5, bb=None):
if self.rtree is None or not self.index_edges:
logger.error("Finding parallel roads requires and edge-based index")
return
self.linked_edges = {}
it = self.all_edges(bb=bb)
if tqdm:
it = tqdm.tqdm(list(it))
for key_a, loc_a, key_b, loc_b in it:
bb2 = [min(loc_a[0], loc_b[0]), min(loc_a[1], loc_b[1]),
max(loc_a[0], loc_b[0]), max(loc_a[1], loc_b[1])]
for key_c, loc_c, key_d, loc_d in self.all_edges(bb=bb2):
if key_a == key_c or key_a == key_d or key_b == key_c or key_b == key_d:
continue
# print(f"Test: ({key_a},{key_b}) - ({key_c},{key_d})")
if self.lines_parallel(loc_a, loc_b, loc_c, loc_d, d=dist):
# print(f"Parallel: ({key_a},{key_b}) - ({key_c},{key_d})")
key = (key_a, key_b)
if key in self.linked_edges:
self.linked_edges[key].add((key_c, key_d))
else:
self.linked_edges[key] = {(key_c, key_d)}
logger.debug(f"Linked {len(self.linked_edges)} edges")
def print_stats(self):
print("Graph\n-----")
print("Nodes: {}".format(len(self.graph)))
def __str__(self):
# s = ""
# for label, (loc, nbrs, _) in self.graph.items():
# s += f"{label:<10} - ({loc[0]:10.4f}, {loc[1]:10.4f})\n"
# return s
return f"InMemMap({self.name}, size={self.size()})"