11"""A Device Tracker platform that combines one or more device trackers."""
2+
23from __future__ import annotations
34
45from collections .abc import Callable , Mapping , Sequence
@@ -126,6 +127,27 @@ class Location:
126127 accuracy : float
127128
128129
130+ @dataclass
131+ class CollectedState :
132+ """Collected state from an entity update (pure computation result)."""
133+
134+ entity_id : str
135+ last_seen : datetime
136+ source_type : SourceType
137+ location_name : str | None
138+ gps : GPSType | None
139+ gps_accuracy : float | None
140+ battery : int | None
141+ charging : bool | None
142+ entity_picture : str | None
143+ good_entity_ids : tuple [str , ...]
144+ # Speed calculation inputs (computed but not yet sent)
145+ prev_lat : float | None = None
146+ prev_lon : float | None = None
147+ prev_seen : datetime | None = None
148+ prev_entity_id : str | None = None
149+
150+
129151@dataclass
130152class EntityData :
131153 """Input entity data."""
@@ -365,9 +387,9 @@ async def _restore_state(self) -> None:
365387 # List of seen entity IDs used to be in ATTR_ENTITY_ID.
366388 # If present, move it to ATTR_ENTITIES.
367389 if ATTR_ENTITY_ID in self ._attr_extra_state_attributes :
368- self ._attr_extra_state_attributes [
369- ATTR_ENTITIES
370- ] = self . _attr_extra_state_attributes . pop ( ATTR_ENTITY_ID )
390+ self ._attr_extra_state_attributes [ATTR_ENTITIES ] = (
391+ self . _attr_extra_state_attributes . pop ( ATTR_ENTITY_ID )
392+ )
371393 with suppress (KeyError ):
372394 last_seen = dt_util .parse_datetime (
373395 self ._attr_extra_state_attributes [ATTR_LAST_SEEN ]
@@ -464,12 +486,12 @@ def _drive_ending_delayed(self) -> bool:
464486 """Return if end of driving state is being delayed."""
465487 return self ._remove_driving_ended is not None
466488
467- async def _entity_updated ( # noqa: C901
489+ def _collect_new_state ( # noqa: C901
468490 self , entity_id : str , new_state : State | None
469- ) -> None :
470- """Run when an input entity has changed state ."""
491+ ) -> CollectedState | None :
492+ """Collect new state from entity update (pure computation, no side effects) ."""
471493 if not new_state or new_state .state in (STATE_UNKNOWN , STATE_UNAVAILABLE ):
472- return
494+ return None
473495
474496 entity = self ._entities [entity_id ]
475497 new_attrs = Attributes (new_state .attributes )
@@ -497,7 +519,7 @@ def get_last_seen() -> datetime | None:
497519 old_last_seen = entity .seen
498520 if old_last_seen and last_seen < old_last_seen :
499521 entity .bad ("last_seen went backwards" )
500- return
522+ return None
501523
502524 # Try to get GPS and battery data.
503525 gps : GPSType | None = None
@@ -519,8 +541,10 @@ def get_last_seen() -> datetime | None:
519541 SourceType .GPS .value if gps and gps_accuracy is not None else None ,
520542 )
521543
544+ # Capture entity picture if applicable
545+ entity_picture : str | None = None
522546 if entity .use_picture :
523- self . _attr_entity_picture = new_attrs .get (ATTR_ENTITY_PICTURE )
547+ entity_picture = new_attrs .get (ATTR_ENTITY_PICTURE )
524548
525549 state = new_state .state
526550 # Don't use location_name unless we have to.
@@ -530,15 +554,15 @@ def get_last_seen() -> datetime | None:
530554 # GPS coordinates and accuracy are required.
531555 if not gps :
532556 entity .bad ("missing gps attributes" )
533- return
557+ return None
534558 if gps_accuracy is None :
535559 entity .bad ("missing gps_accuracy attribute" )
536- return
560+ return None
537561
538562 new_data = Location (gps , gps_accuracy )
539563 old_data = cast (Location | None , entity .data )
540564 if last_seen == old_last_seen and new_data == old_data :
541- return
565+ return None
542566 entity .good (last_seen , SourceType .GPS , new_data )
543567
544568 if self ._req_movement and old_data :
@@ -549,7 +573,7 @@ def get_last_seen() -> datetime | None:
549573 self .entity_id ,
550574 entity_id ,
551575 )
552- return
576+ return None
553577
554578 elif source_type in SourceType :
555579 # Convert 'on'/'off' state of binary_sensor
@@ -563,7 +587,7 @@ def get_last_seen() -> datetime | None:
563587 entity .good (last_seen , SourceType (source_type ), state ) # type: ignore[arg-type]
564588
565589 if not self ._use_non_gps_data (entity_id , state ):
566- return
590+ return None
567591
568592 # Don't use new GPS data if it's not complete.
569593 if not gps or gps_accuracy is None :
@@ -606,7 +630,7 @@ def get_last_seen() -> datetime | None:
606630
607631 else :
608632 entity .bad (f"unsupported source_type: { source_type } " )
609- return
633+ return None
610634
611635 # Is this newer info than last update?
612636 if self ._prev_seen and last_seen <= self ._prev_seen :
@@ -618,88 +642,116 @@ def get_last_seen() -> datetime | None:
618642 dt_util .as_local (last_seen ),
619643 dt_util .as_local (self ._prev_seen ),
620644 )
621- return
622-
623- _LOGGER .debug ("Updating %s from %s" , self .entity_id , entity_id )
645+ return None
624646
625- attrs = {
626- ATTR_ENTITIES : tuple (
627- entity_id
628- for entity_id , _entity in self ._entities .items ()
629- if _entity .is_good
630- ),
631- ATTR_LAST_ENTITY_ID : entity_id ,
632- ATTR_LAST_SEEN : dt_util .as_local (_nearest_second (last_seen )),
633- }
634- if charging is not None :
635- attrs [ATTR_BATTERY_CHARGING ] = charging
647+ _LOGGER .debug ("Collecting state for %s from %s" , self .entity_id , entity_id )
636648
637- self . _set_state (
638- location_name , gps , gps_accuracy , battery , attrs , SourceType ( source_type ) # type: ignore[arg-type]
649+ good_entity_ids = tuple (
650+ eid for eid , ent in self . _entities . items () if ent . is_good
639651 )
640652
641- self ._prev_seen = last_seen
642-
643- def _set_state (
644- self ,
645- location_name : str | None ,
646- gps : GPSType | None ,
647- gps_accuracy : float | None ,
648- battery : int | None ,
649- attributes : dict ,
650- source_type : SourceType ,
651- ) -> None :
652- """Set new state."""
653- # Save previously "seen" values before updating for speed calculations, etc.
654- prev_ent : str | None
655- prev_lat : float | None
656- prev_lon : float | None
653+ # Capture previous state for speed calculation
654+ prev_lat : float | None = None
655+ prev_lon : float | None = None
656+ prev_seen : datetime | None = None
657+ prev_entity_id : str | None = None
657658 if self ._prev_seen :
658- prev_ent = self ._attr_extra_state_attributes [ ATTR_LAST_ENTITY_ID ]
659+ prev_entity_id = self ._attr_extra_state_attributes . get ( ATTR_LAST_ENTITY_ID )
659660 prev_lat = self .latitude
660661 prev_lon = self .longitude
661- else :
662- # Don't use restored attributes.
663- prev_ent = prev_lat = prev_lon = None
662+ prev_seen = self ._prev_seen
663+
664+ return CollectedState (
665+ entity_id = entity_id ,
666+ last_seen = last_seen ,
667+ source_type = SourceType (source_type ), # type: ignore[arg-type]
668+ location_name = location_name ,
669+ gps = gps ,
670+ gps_accuracy = gps_accuracy ,
671+ battery = battery ,
672+ charging = charging ,
673+ entity_picture = entity_picture ,
674+ good_entity_ids = good_entity_ids ,
675+ prev_lat = prev_lat ,
676+ prev_lon = prev_lon ,
677+ prev_seen = prev_seen ,
678+ prev_entity_id = prev_entity_id ,
679+ )
680+
681+ def _apply_state (self , data : CollectedState ) -> None :
682+ """Apply collected state to internal fields (single mutation point)."""
683+ # Update entity picture if provided
684+ if data .entity_picture is not None :
685+ self ._attr_entity_picture = data .entity_picture
686+
687+ # Compute speed before updating position
688+ speed , angle , use_new_speed = self ._compute_speed (data )
689+
690+ # Check if we were driving before this update
664691 was_driving = (
665692 self ._prev_speed is not None
666693 and self ._driving_speed is not None
667694 and self ._prev_speed >= self ._driving_speed
668695 )
669696
670- self ._battery_level = battery
671- self ._attr_source_type = source_type
672- self ._attr_location_accuracy = gps_accuracy or 0
673- self ._attr_location_name = location_name
674- lat : float | None
675- lon : float | None
676- if gps :
677- lat , lon = gps
697+ # Update core state attributes
698+ self ._battery_level = data .battery
699+ self ._attr_source_type = data .source_type
700+ self ._attr_location_accuracy = data .gps_accuracy or 0
701+ self ._attr_location_name = data .location_name
702+
703+ if data .gps :
704+ self ._attr_latitude , self ._attr_longitude = data .gps
705+ else :
706+ self ._attr_latitude = self ._attr_longitude = None
707+
708+ # Build extra state attributes
709+ attrs : dict [str , Any ] = {
710+ ATTR_ENTITIES : data .good_entity_ids ,
711+ ATTR_LAST_ENTITY_ID : data .entity_id ,
712+ ATTR_LAST_SEEN : dt_util .as_local (_nearest_second (data .last_seen )),
713+ }
714+ if data .charging is not None :
715+ attrs [ATTR_BATTERY_CHARGING ] = data .charging
716+ self ._attr_extra_state_attributes = attrs
717+
718+ # Update timestamp tracking
719+ self ._prev_seen = data .last_seen
720+
721+ # Handle speed sensor updates
722+ if use_new_speed :
723+ self ._send_speed (speed , angle )
724+ self ._prev_speed = speed
725+ self ._start_speed_stale_monitor ()
678726 else :
679- lat = lon = None
680- self ._attr_latitude = lat
681- self ._attr_longitude = lon
727+ speed = self ._prev_speed
682728
683- self ._attr_extra_state_attributes = attributes
729+ # Handle driving state
730+ self ._update_driving_state (speed , was_driving )
684731
685- last_seen = cast (datetime , attributes [ATTR_LAST_SEEN ])
686- speed = None
687- angle = None
732+ def _compute_speed (
733+ self , data : CollectedState
734+ ) -> tuple [float | None , int | None , bool ]:
735+ """Compute speed and angle from collected state (pure computation)."""
736+ speed : float | None = None
737+ angle : int | None = None
688738 use_new_speed = True
739+
740+ lat = data .gps [0 ] if data .gps else None
741+ lon = data .gps [1 ] if data .gps else None
742+
689743 if (
690- prev_ent
691- and self . _prev_seen
692- and prev_lat is not None
693- and prev_lon is not None
744+ data . prev_entity_id
745+ and data . prev_seen
746+ and data . prev_lat is not None
747+ and data . prev_lon is not None
694748 and lat is not None
695749 and lon is not None
696750 ):
697- # It's ok that last_seen is in local tz and self._prev_seen is in UTC.
698- # last_seen's value will automatically be converted to UTC during the
699- # subtraction operation.
700- seconds = (last_seen - self ._prev_seen ).total_seconds ()
751+ last_seen_local = dt_util .as_local (_nearest_second (data .last_seen ))
752+ seconds = (last_seen_local - data .prev_seen ).total_seconds ()
701753 min_seconds = MIN_SPEED_SECONDS
702- if cast ( str , attributes [ ATTR_LAST_ENTITY_ID ]) != prev_ent :
754+ if data . entity_id != data . prev_entity_id :
703755 min_seconds *= 3
704756 if seconds < min_seconds :
705757 _LOGGER .debug (
@@ -710,24 +762,23 @@ def _set_state(
710762 )
711763 use_new_speed = False
712764 else :
713- meters = cast (float , distance (prev_lat , prev_lon , lat , lon ))
765+ meters = cast (float , distance (data . prev_lat , data . prev_lon , lat , lon ))
714766 try :
715767 speed = round (meters / seconds , 1 )
716768 except TypeError :
717769 _LOGGER .error ("%s: distance() returned None" , self .name )
718770 else :
719771 if speed > MIN_ANGLE_SPEED :
720- angle = round (degrees (atan2 (lon - prev_lon , lat - prev_lat )))
772+ angle = round (
773+ degrees (atan2 (lon - data .prev_lon , lat - data .prev_lat ))
774+ )
721775 if angle < 0 :
722776 angle += 360
723777
724- if use_new_speed :
725- self ._send_speed (speed , angle )
726- self ._prev_speed = speed
727- self ._start_speed_stale_monitor ()
728- else :
729- speed = self ._prev_speed
778+ return speed , angle , use_new_speed
730779
780+ def _update_driving_state (self , speed : float | None , was_driving : bool ) -> None :
781+ """Update driving state based on speed."""
731782 # Only set state to driving if it's currently "away" (i.e., not in a zone.)
732783 if self .state != STATE_NOT_HOME :
733784 self ._cancel_drive_ending_delay ()
@@ -747,6 +798,13 @@ def _set_state(
747798 if driving or self ._drive_ending_delayed :
748799 self ._attr_location_name = STATE_DRIVING
749800
801+ async def _entity_updated (self , entity_id : str , new_state : State | None ) -> None :
802+ """Run when an input entity has changed state."""
803+ data = self ._collect_new_state (entity_id , new_state )
804+ if data is None :
805+ return
806+ self ._apply_state (data )
807+
750808 def _use_non_gps_data (self , entity_id : str , state : str ) -> bool :
751809 """Determine if state should be used for non-GPS based entity."""
752810 if state == STATE_HOME or self ._entities [entity_id ].use_all_states :
0 commit comments