forked from AntSimi/py-eddy-tracker
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy patharea_tracker.py
More file actions
75 lines (60 loc) · 2.37 KB
/
area_tracker.py
File metadata and controls
75 lines (60 loc) · 2.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import logging
from numba import njit
from numpy import empty, ma, ones
from ..observations.observation import EddiesObservations as Model
# Module-level logger, obtained under the logger name "pet".
logger = logging.getLogger("pet")
class AreaTracker(Model):
    """
    Tracker which associates eddies by using the overlap of their areas.

    The similarity index is given by the
    :py:meth:`~py_eddy_tracker.observations.observation.EddiesObservations.match`
    method and should lie in [0:1].
    The `cmin` option sets the minimal similarity needed to validate an association.
    """

    __slots__ = ("cmin",)

    def __init__(self, *args, cmin=0.2, **kwargs):
        """
        Forward every argument to the parent class and store the threshold.

        :param cmin: minimal similarity to validate an association (0.2 by default)
        """
        super().__init__(*args, **kwargs)
        self.cmin = cmin

    def merge(self, *args, **kwargs):
        """
        Merge through the parent implementation and copy `cmin` onto the result.

        :return: object returned by the parent `merge`, with `cmin` set
        """
        merged = super().merge(*args, **kwargs)
        merged.cmin = self.cmin
        return merged

    @classmethod
    def needed_variable(cls):
        """
        List the variables needed by this tracker.

        :return: "longitude", "latitude", followed by the names given by
            `cls.intern(False, public_label=True)`
        :rtype: list
        """
        return ["longitude", "latitude", *cls.intern(False, public_label=True)]

    def tracking(self, other):
        """
        Core method to track: link observations of `self` with those of `other`.

        :param other: observations to associate with `self`
        :return: indices in `self`, indices in `other`, and the cost of each
            selected pair (1 - similarity)
        """
        shape = (self.shape[0], other.shape[0])
        i, j, c = self.match(other, intern=False)

        # Every pair starts masked; mask_cmin fills and unmasks in place only
        # the pairs whose similarity is above cmin.
        cost_values = empty(shape, dtype="f4")
        all_masked = ones(shape, dtype="bool")
        cost_mat = ma.array(cost_values, mask=all_masked)
        mask_cmin(i, j, c, self.cmin, cost_mat.data, cost_mat.mask)

        # NOTE(review): solve_function and post_process_link are defined
        # outside this class; presumably they select then filter the links.
        i_self, i_other = self.solve_function(cost_mat)
        i_self, i_other = self.post_process_link(other, i_self, i_other)
        logger.debug("%d matched with previous", i_self.shape[0])

        selected_cost = cost_mat[i_self, i_other]
        return i_self, i_other, selected_cost

    def propagate(
        self, previous_obs, current_obs, obs_to_extend, dead_track, nb_next, model
    ):
        """
        Build virtual observations with the parent method, then overwrite their
        contour fields with the ones of the given observations.

        Only the keys of `model.elements` containing "contour_" are modified.

        :return: virtual observations given by the parent `propagate`, updated
        """
        virtual = super().propagate(
            previous_obs, current_obs, obs_to_extend, dead_track, nb_next, model
        )
        nb_dead = len(previous_obs)
        nb_virtual_extend = nb_next - nb_dead
        for key in model.elements:
            if "contour_" in key:
                field = virtual[key]
                # The first nb_dead items come from the current observations
                field[:nb_dead] = current_obs[key]
                if nb_virtual_extend > 0:
                    # The remaining items come from the observations to extend
                    virtual[key][nb_dead:] = obs_to_extend[key]
        return virtual
@njit(cache=True)
def mask_cmin(i, j, c, cmin, cost_mat, mask):
    """
    Fill in place the cost matrix for pairs with a sufficient similarity.

    For each index k where `c[k]` is strictly greater than `cmin`, the cell
    (`i[k]`, `j[k]`) of `cost_mat` is set to `1 - c[k]` and the same cell of
    `mask` is set to False. Other cells are left untouched.

    :param i: row index of each pair
    :param j: column index of each pair
    :param c: similarity of each pair
    :param cmin: similarity threshold (exclusive)
    :param cost_mat: 2D array of costs, modified in place
    :param mask: 2D boolean array, modified in place
    """
    nb_pairs = c.shape[0]
    for idx in range(nb_pairs):
        score = c[idx]
        # Negated ">" (and not "<=") so a NaN score is skipped as well
        if not (score > cmin):
            continue
        row = i[idx]
        col = j[idx]
        cost_mat[row, col] = 1 - score
        mask[row, col] = False