# encoding: utf-8
"""
leuvenmapmatching.matcher.distance
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
:author: Wannes Meert
:copyright: Copyright 2018 DTAI, KU Leuven and Sirris.
:license: Apache License, Version 2.0, see LICENSE for details.
"""
import logging
import math
from .base import BaseMatching, BaseMatcher
from ..util.segment import Segment
from ..util.debug import printd
MYPY = False
if MYPY:
from typing import Tuple, Any, Dict
logger = logging.getLogger("be.kuleuven.cs.dtai.mapmatching")
class DistanceMatching(BaseMatching):
__slots__ = ['d_s', 'd_o', 'lpe', 'lpt'] # Additional fields
def __init__(self, *args, d_s=0.0, d_o=0.0, lpe=0.0, lpt=0.0, **kwargs):
"""
:param args: Arguments for BaseMatching
:param d_s: Distance between two (interpolated) states
:param d_o: Distance between two (interpolated) observations
:param lpe: Log probability of emission
:param lpt: Log probablity of transition
:param kwargs: Arguments for BaseMatching
"""
super().__init__(*args, **kwargs)
self.d_o: float = d_o
self.d_s: float = d_s
self.lpe: float = lpe
self.lpt: float = lpt
def _update_inner(self, m_other):
# type: (DistanceMatching, DistanceMatching) -> None
super()._update_inner(m_other)
self.d_s = m_other.d_s
self.d_o = m_other.d_o
self.lpe = m_other.lpe
self.lpt = m_other.lpt
@staticmethod
def repr_header(label_width=None, stop=""):
res = BaseMatching.repr_header(label_width)
res += f" {'dt(o)':<6} | {'dt(s)':<6} |"
if logger.isEnabledFor(logging.DEBUG):
res += f" {'lg(Pr-t)':<9} | {'lg(Pr-e)':<9} |"
return res
def __str__(self, label_width=None):
res = super().__str__(label_width)
res += f" {self.d_o:>6.2f} | {self.d_s:>6.2f} |"
if logger.isEnabledFor(logging.DEBUG):
res += f" {self.lpt:>9.2f} | {self.lpe:>9.2f} |"
return res
def __repr__(self):
return self.label
[docs]
class DistanceMatcher(BaseMatcher):
"""
Map Matching that takes into account the distance between matched locations on the map compared to
the distance between the observations (that are matched to these locations). It thus prefers matched
paths that have a similar distance than the observations.
Inspired on the method presented in:
P. Newson and J. Krumm. Hidden markov map matching through noise and sparseness.
In Proceedings of the 17th ACM SIGSPATIAL international conference on advances
in geographic information systems, pages 336–343. ACM, 2009.
The options available in :class:``BaseMatcher`` are inherited. Additionally, this class
offers:
- Transition probability is lower if the distance between observations and states is different
- Transition probability is lower if the the next match is going back on an edge or to a previous edge
- Transition probability is lower if two neighboring states represent not connected edges
- Skip non-emitting states if distance between states and observations is close to each other
"""
def __init__(self, *args, **kwargs):
"""Create a new object.
:param map_con: Map object to connect to map database
:param obs_noise: Standard deviation of noise
:param obs_noise_ne: Standard deviation of noise for non-emitting states (is set to obs_noise if not given)
:param max_dist_init: Maximum distance from start location (if not given, uses max_dist)
:param max_dist: Maximum distance from path (this is a hard cut, min_prob_norm should be better)
:param min_prob_norm: Minimum normalized probability of observations (ema)
:param non_emitting_states: Allow non-emitting states. A non-emitting state is a state that is
not associated with an observation. Here we assume it can be associated with a location in between
two observations to allow for pruning. It is advised to set min_prob_norm and/or max_dist to avoid
visiting all possible nodes in the graph.
:param non_emitting_length_factor: Reduce the probability of a sequence of non-emitting states the longer it
is. This can be used to prefer shorter paths. This is separate from the transition probabilities because
transition probabilities are averaged for non-emitting states and thus the length is also averaged out.
:param max_lattice_width: Restrict the lattice (or possible candidate states per observation) to this value.
If there are more possible next states, the states with the best likelihood so far are selected.
:param dist_noise: Standard deviation of difference between distance between states and distance
between observatoins. If not given, set to obs_noise
:param dist_noise_ne: If not given, set to dist_noise
:param restrained_ne: Avoid non-emitting states if the distance between states and between
observations is close to each other.
:param avoid_goingback: If true, the probability is lowered for a transition that returns back to a
previous edges or returns to a position on an edge.
:param args: Arguments for BaseMatcher
:param kwargs: Arguments for BaseMatcher
"""
if not kwargs.get("only_edges", True):
logger.warning("The MatcherDistance method only works on edges as states. Nodes have been disabled.")
kwargs["only_edges"] = True
if "matching" not in kwargs:
kwargs["matching"] = DistanceMatching
super().__init__(*args, **kwargs)
self.use_original = kwargs.get('use_original', False)
# if not use_original, the following value for beta gives a prob of 0.5 at dist=x_half:
# beta = np.sqrt(np.power(x_half, 2) / (np.log(2)*2))
self.dist_noise = kwargs.get('dist_noise', self.obs_noise)
self.dist_noise_ne = kwargs.get('dist_noise_ne', self.dist_noise)
self.beta = 2 * self.dist_noise ** 2
self.beta_ne = 2 * self.dist_noise_ne ** 2
self.sigma = 2 * self.obs_noise ** 2
self.sigma_ne = 2 * self.obs_noise_ne ** 2
self.restrained_ne = kwargs.get('restrained_ne', True)
self.restrained_ne_thr = 1.25 # Threshold
self.exact_dt_s = True # Newson and Krumm is 'True'
self.avoid_goingback = kwargs.get('avoid_goingback', True)
self.gobackonedge_factor_log = math.log(0.5)
self.gobacktoedge_factor_log = math.log(0.5)
self.first_farend_penalty = math.log(0.75) # should be > gobacktoedge_factor_log
self.notconnectededges_factor_log = math.log(0.5)
[docs]
def logprob_trans(self, prev_m, edge_m, edge_o,
is_prev_ne=False, is_next_ne=False):
# type: (DistanceMatcher, DistanceMatching, Segment, Segment, bool, bool) -> Tuple[float, Dict[str, Any]]
"""Transition probability.
The probability is defined with a formula from the exponential family.
:math:`P(dt) = exp(-d_t^2 / (2 * dist_{noise}^2))`
with :math:`d_t = |d_s - d_o|,
d_s = |loc_{prev\_state} - loc_{cur\_state}|,
d_o = |loc_{prev\_obs} - loc_{cur\_obs}|`
This function is more tolerant for low values. The intuition is that values under a certain
distance should all be close to probability 1.0.
Note: We should also smooth the distance between observations to handle outliers better.
:param prev_m: Previous matching / state
:param edge_m: Edge between matchings / states
:param edge_o: Edge between observations
:param is_prev_ne: Is previous state non-emitting
:param is_next_ne: Is the next state non-emitting
:param dist_o: First output of distance_progress
:param dist_m: Second output of distance_progress
:return:
"""
d_z = self.map.distance(prev_m.edge_o.pi, edge_o.pi)
is_same_edge = False
if (prev_m.edge_m.l1 == edge_m.l1 and prev_m.edge_m.l2 == edge_m.l2) or \
(prev_m.edge_m.l1 == edge_m.l2 and prev_m.edge_m.l2 == edge_m.l1):
is_same_edge = True
if ((not self.exact_dt_s) or
is_same_edge or # On same edge
prev_m.edge_m.l2 != edge_m.l1): # Edges are not connected
d_x = self.map.distance(prev_m.edge_m.pi, edge_m.pi)
else:
# Take into account the curvature
d_x = self.map.distance(prev_m.edge_m.pi, prev_m.edge_m.p2) + self.map.distance(prev_m.edge_m.p2, edge_m.pi)
if is_next_ne:
# For non-emitting states, the distances are added
# Otherwise it can map to a sequence of short segments and stay at the same
# observation because the difference is then always small.
d_z += prev_m.d_o
d_x += prev_m.d_s
d_t = abs(d_z - d_x)
# p_dt = 1 / beta * math.exp(-d_t / beta)
if is_prev_ne or is_next_ne:
beta = self.beta_ne
else:
beta = self.beta
logprob = -d_t ** 2 / beta
# Penalties
if prev_m.edge_m.label == edge_m.label:
# Staying in same state
if self.avoid_goingback and edge_m.key == prev_m.edge_m.key and edge_m.ti < prev_m.edge_m.ti:
# Going back on edge (direction is from p1 to p2 of the segment)
logprob += self.gobackonedge_factor_log # Prefer not going back
elif (prev_m.edge_m.l1, prev_m.edge_m.l2) == (edge_m.l2, edge_m.l1):
if self.avoid_goingback:
logprob += self.gobackonedge_factor_log
else:
# Moving states
if prev_m.edge_m.l2 != edge_m.l1:
# We are moving between states that represent edges that are not connected through a node
logprob += self.notconnectededges_factor_log
elif self.avoid_goingback:
# Goin back on state
going_back = False
for m in prev_m.prev:
if edge_m.label == m.edge_m.label:
going_back = True
break
if going_back:
logprob += self.gobacktoedge_factor_log # prefer not going back
props = {
'd_o': d_z,
'd_s': d_x,
'lpt': logprob
}
return logprob, props
[docs]
def logprob_obs(self, dist, prev_m=None, new_edge_m=None, new_edge_o=None, is_ne=False):
# type: (DistanceMatcher, float, DistanceMatching, Segment, Segment, bool) -> Tuple[float, Dict[str, Any]]
"""Emission probability for emitting states.
Exponential family:
:math:`P(dt) = exp(-d_o^2 / (2 * obs_{noise}^2))`
with :math:`d_o = |loc_{state} - loc_{obs}|`
"""
if is_ne:
sigma = self.sigma_ne
else:
sigma = self.sigma
result = -dist ** 2 / sigma
props = {
'lpe': result
}
return result, props
def _skip_ne_states(self, next_ne_m):
# type: (DistanceMatcher, DistanceMatching) -> bool
# Skip searching for non-emitting states when the distances between nodes
# on the map are similar to the distances between the observation
if not self.restrained_ne:
return False
if next_ne_m.d_s > 0:
factor = (next_ne_m.d_o + next_ne_m.dist_obs) / next_ne_m.d_s
else:
factor = 0
if factor < self.restrained_ne_thr:
logger.debug(f"Skip non-emitting states to {next_ne_m.label}: {factor} < {self.restrained_ne_thr} "
"(observations close enough to each other)")
return True
return False