Source code for leuvenmapmatching.matcher.simple

# encoding: utf-8
"""
leuvenmapmatching.matcher.simple
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

:author: Wannes Meert
:copyright: Copyright 2015-2018 DTAI, KU Leuven and Sirris.
:license: Apache License, Version 2.0, see LICENSE for details.
"""
import math
from scipy.stats import halfnorm, norm

from .base import BaseMatcher, BaseMatching
from ..util.segment import Segment


class SimpleMatching(BaseMatching):
    pass


[docs] class SimpleMatcher(BaseMatcher): def __init__(self, *args, **kwargs): """A simple matcher that prefers paths where each matched location is as close as possible to the observed position. :param avoid_goingback: Change the transition probability to be lower for the direction the path is coming from. :param kwargs: Arguments passed to :class:`BaseMatcher`. """ if "matching" not in kwargs: kwargs['matching'] = SimpleMatching super().__init__(*args, **kwargs) self.obs_noise_dist = halfnorm(scale=self.obs_noise) self.obs_noise_dist_ne = halfnorm(scale=self.obs_noise_ne) # normalize to max 1 to simulate a prob instead of density self.obs_noise_logint = math.log(self.obs_noise * math.sqrt(2 * math.pi) / 2) self.obs_noise_logint_ne = math.log(self.obs_noise_ne * math.sqrt(2 * math.pi) / 2) # Transition probability is divided (in logprob_trans) by this factor if we move back on the # current edge. self.avoid_goingback = kwargs.get('avoid_goingback', True) self.gobackonedge_factor_log = math.log(0.99) # Transition probability is divided (in logprob_trans) by this factor if the next state is # also the previous state, thus if we go back self.gobacktoedge_factor_log = math.log(0.5) # Transition probability is divided (in logprob_trans) by this factor if a transition is made # This is to try to stay on the same node unless there is a good reason self.transition_factor = math.log(0.9)
[docs] def logprob_trans(self, prev_m: BaseMatching, edge_m: Segment, edge_o: Segment, is_prev_ne=False, is_next_ne=False): """Transition probability. Note: In contrast with a regular HMM, this is not a probability density function, it needs to be a proper probability (thus values between 0.0 and 1.0). """ logprob = 0 if prev_m.edge_m.label == edge_m.label: # Staying in same state if self.avoid_goingback and edge_m.key == prev_m.edge_m.key and edge_m.ti < prev_m.edge_m.ti: # Going back on edge logprob += self.gobackonedge_factor_log # prefer not going back else: # Moving states logprob += self.transition_factor if self.avoid_goingback: # Goin back on state going_back = False for m in prev_m.prev: if edge_m.label == m.edge_m.label: going_back = True break if going_back: logprob += self.gobacktoedge_factor_log # prefer not going back return logprob, {} # All probabilities are 1 (thus technically not a distribution)
[docs] def logprob_obs(self, dist, prev_m, new_edge_m, new_edge_o, is_ne=False): """Emission probability. Note: In contrast with a regular HMM, this is not a probability density function, it needs to be a proper probability (thus values between 0.0 and 1.0). """ if is_ne: result = self.obs_noise_dist_ne.logpdf(dist) + self.obs_noise_logint_ne else: result = self.obs_noise_dist.logpdf(dist) + self.obs_noise_logint # print("logprob_obs: {} -> {:.5f} = {:.5f}".format(dist, result, math.exp(result))) return result, {}