from collections import OrderedDict
import numpy as np
from scipy.spatial import distance
from scipy.optimize import linear_sum_assignment
from motrackers.tracker import Tracker
from motrackers.track import KFTrackCentroid
from motrackers.utils.misc import get_centroid


def assign_tracks2detection_centroid_distances(bbox_tracks, bbox_detections, distance_threshold=10.):
    """
    Assigns detected bounding boxes to tracked bounding boxes using the Euclidean distance
    between bounding-box centroids as the cost metric.

    Args:
        bbox_tracks (numpy.ndarray): Tracked bounding boxes with shape (n, 4), each row as (xmin, ymin, width, height).
        bbox_detections (numpy.ndarray): Detection bounding boxes with shape (m, 4), each row as (xmin, ymin, width, height).
        distance_threshold (float): Maximum distance between the centroid of a tracked object and a new detection
            for the pair to be considered a match.

    Returns:
        tuple: Tuple containing the following elements:
            - matches (numpy.ndarray): Array of shape `(p, 2)` where `p` is the number of pairs formed after
                matching tracks to detections. Each row is a matched pair of indices `(track_index, detection_index)`.
            - unmatched_detections (numpy.ndarray): Array of shape `(m,)` where `m` is the number of unmatched detections.
            - unmatched_tracks (numpy.ndarray): Array of shape `(k,)` where `k` is the number of unmatched tracks.
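
    Example:
        A minimal, illustrative sketch; the boxes and the threshold below are made up::

            tracks = np.array([[10, 10, 20, 20], [100, 100, 20, 20]])
            detections = np.array([[12, 11, 20, 20], [300, 300, 20, 20]])
            matches, unmatched_dets, unmatched_trks = assign_tracks2detection_centroid_distances(
                tracks, detections, distance_threshold=10.)
            # matches -> [[0, 0]]; detection 1 and track 1 remain unmatched.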
    """

    if (bbox_tracks.size == 0) or (bbox_detections.size == 0):
        # Nothing to assign: every detection and every track remains unmatched.
        return (
            np.empty((0, 2), dtype=int),
            np.arange(len(bbox_detections), dtype=int),
            np.arange(len(bbox_tracks), dtype=int)
        )

    if len(bbox_tracks.shape) == 1:
        bbox_tracks = bbox_tracks[None, :]

    if len(bbox_detections.shape) == 1:
        bbox_detections = bbox_detections[None, :]

    estimated_track_centroids = get_centroid(bbox_tracks)
    detection_centroids = get_centroid(bbox_detections)
    centroid_distances = distance.cdist(estimated_track_centroids, detection_centroids)

    assigned_tracks, assigned_detections = linear_sum_assignment(centroid_distances)

    unmatched_detections, unmatched_tracks = [], []

    for d in range(bbox_detections.shape[0]):
        if d not in assigned_detections:
            unmatched_detections.append(d)

    for t in range(bbox_tracks.shape[0]):
        if t not in assigned_tracks:
            unmatched_tracks.append(t)
    # Filter out assigned pairs whose centroid distance exceeds the threshold.
    matches = []
    for t, d in zip(assigned_tracks, assigned_detections):
        if centroid_distances[t, d] > distance_threshold:
            unmatched_detections.append(d)
            unmatched_tracks.append(t)
        else:
            matches.append((t, d))

    if len(matches):
        matches = np.array(matches)
    else:
        matches = np.empty((0, 2), dtype=int)

    return matches, np.array(unmatched_detections), np.array(unmatched_tracks)


class CentroidKF_Tracker(Tracker):
    """
    Kalman filter based tracking of multiple detected objects.

    Parameters
    ----------
    max_lost : int
        Maximum number of consecutive frames an object can go undetected before its track is removed.
    centroid_distance_threshold : float
        Maximum centroid distance between a predicted track and a detection for the two to be matched.
    tracker_output_format : str
        Output format of the tracker.
    process_noise_scale : float or numpy.ndarray
        Process noise covariance matrix of shape (3, 3) or covariance magnitude as scalar value.
    measurement_noise_scale : float or numpy.ndarray
        Measurement noise covariance matrix of shape (1,) or covariance magnitude as scalar value.
    time_step : int or float
        Time step for Kalman Filter.
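
    Examples
    --------
    A minimal usage sketch; the boxes, scores and class ids below are made up::

        import numpy as np
        from motrackers import CentroidKF_Tracker

        tracker = CentroidKF_Tracker(max_lost=3, centroid_distance_threshold=30.)
        bboxes = np.array([[10, 10, 20, 20], [50, 60, 25, 30]])  # (xmin, ymin, width, height)
        scores = np.array([0.9, 0.8])
        class_ids = np.array([0, 1])
        tracks = tracker.update(bboxes, scores, class_ids)  # call once per frame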
    """

    def __init__(
            self,
            max_lost=1,
            centroid_distance_threshold=30.,
            tracker_output_format='mot_challenge',
            process_noise_scale=1.0,
            measurement_noise_scale=1.0,
            time_step=1
    ):
        self.time_step = time_step
        self.process_noise_scale = process_noise_scale
        self.measurement_noise_scale = measurement_noise_scale
        self.centroid_distance_threshold = centroid_distance_threshold
        self.kalman_trackers = OrderedDict()
        super().__init__(max_lost, tracker_output_format)

    def _add_track(self, frame_id, bbox, detection_confidence, class_id, **kwargs):
        self.tracks[self.next_track_id] = KFTrackCentroid(
            self.next_track_id, frame_id, bbox, detection_confidence, class_id=class_id,
            data_output_format=self.tracker_output_format, process_noise_scale=self.process_noise_scale,
            measurement_noise_scale=self.measurement_noise_scale, **kwargs
        )
        self.next_track_id += 1

    def update(self, bboxes, detection_scores, class_ids):
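        """
        Update the tracker with the detections of the current frame.

        Args:
            bboxes (numpy.ndarray): Detected bounding boxes with shape (m, 4), each row as (xmin, ymin, width, height).
            detection_scores (numpy.ndarray): Detection confidence scores with shape (m,).
            class_ids (numpy.ndarray): Class ids of the detections with shape (m,).

        Returns:
            list: Tracks active in the current frame, each formatted according to ``tracker_output_format``.
        """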
        self.frame_count += 1
        bbox_detections = np.array(bboxes, dtype='int')

        # Predict the current position of every existing track with its Kalman filter.
        track_ids = list(self.tracks.keys())
        bbox_tracks = []
        for track_id in track_ids:
            bbox_tracks.append(self.tracks[track_id].predict())
        bbox_tracks = np.array(bbox_tracks)

        # Associate predicted track positions with the new detections.
        matches, unmatched_detections, unmatched_tracks = assign_tracks2detection_centroid_distances(
            bbox_tracks, bbox_detections, distance_threshold=self.centroid_distance_threshold
        )

        # Update matched tracks with their assigned detections.
        for i in range(matches.shape[0]):
            t, d = matches[i, :]
            track_id = track_ids[t]
            bbox = bboxes[d, :]
            cid = class_ids[d]
            confidence = detection_scores[d]
            self._update_track(track_id, self.frame_count, bbox, confidence, cid, lost=0)

        # Start a new track for every detection that could not be matched.
        for d in unmatched_detections:
            bbox = bboxes[d, :]
            cid = class_ids[d]
            confidence = detection_scores[d]
            self._add_track(self.frame_count, bbox, confidence, cid)

        # Mark unmatched tracks as lost and remove those lost for more than `max_lost` frames.
        for t in unmatched_tracks:
            track_id = track_ids[t]
            bbox = bbox_tracks[t, :]
            confidence = self.tracks[track_id].detection_confidence
            cid = self.tracks[track_id].class_id
            self._update_track(track_id, self.frame_count, bbox, confidence, cid, lost=1)

            if self.tracks[track_id].lost > self.max_lost:
                self._remove_track(track_id)

        outputs = self._get_tracks(self.tracks)
        return outputs
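

if __name__ == "__main__":
    # Illustrative smoke test only: the boxes, scores and class ids below are
    # made up, and the thresholds are arbitrary. It runs the tracker on two
    # synthetic frames and prints the resulting tracks.
    tracker = CentroidKF_Tracker(max_lost=2, centroid_distance_threshold=30.)

    detection_scores = np.array([0.9, 0.8])
    class_ids = np.array([0, 1])
    frames = [
        np.array([[10, 10, 20, 20], [200, 150, 30, 40]]),
        np.array([[12, 11, 20, 20], [205, 152, 30, 40]]),
    ]

    for bboxes in frames:
        tracks = tracker.update(bboxes, detection_scores, class_ids)
        print(tracks)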