diff --git a/app/routes/geographic.py b/app/routes/geographic.py
new file mode 100644
index 00000000..bbdfce78
--- /dev/null
+++ b/app/routes/geographic.py
@@ -0,0 +1,83 @@
+from fastapi import APIRouter, HTTPException
+from typing import Optional
+import tempfile
+
+from app.services.geographic_analysis import GeographicAnalyzer
+
+router = APIRouter()
+analyzer = GeographicAnalyzer()
+
+@router.get("/clusters")
+async def get_clusters(min_cases: int = 0):
+    """Get geographic clusters of COVID-19 cases"""
+    try:
+        locations = []  # Placeholder: replace with actual data fetching
+
+        # Filter by minimum cases if specified
+        if min_cases > 0:
+            locations = [loc for loc in locations if loc.cases >= min_cases]
+
+        clusters = analyzer.identify_clusters(locations)
+
+        # Convert to a JSON-serializable format
+        return {
+            str(cluster_id): [
+                {
+                    "latitude": loc.latitude,
+                    "longitude": loc.longitude,
+                    "cases": loc.cases,
+                    "timestamp": loc.timestamp.isoformat(),
+                    "location_id": loc.location_id
+                }
+                for loc in cluster_locations
+            ]
+            for cluster_id, cluster_locations in clusters.items()
+        }
+    except Exception as e:
+        raise HTTPException(status_code=500, detail=str(e))
+
+@router.get("/spread-vectors")
+async def get_spread_vectors(days: int = 7):
+    """Get virus spread vectors"""
+    try:
+        # Fetch historical data
+        historical_data = []  # Placeholder: replace with actual data fetching
+
+        vectors = analyzer.calculate_spread_vectors(historical_data, days=days)
+
+        return [
+            {
+                "latitude": lat,
+                "longitude": lon,
+                "magnitude": mag
+            }
+            for lat, lon, mag in vectors
+        ]
+    except Exception as e:
+        raise HTTPException(status_code=500, detail=str(e))
+
+@router.get("/heatmap")
+async def get_heatmap(
+    center_lat: Optional[float] = None,
+    center_lon: Optional[float] = None
+):
+    """Get COVID-19 risk heatmap"""
+    try:
+        # Fetch locations
+        locations = []  # Placeholder: replace with actual data fetching
+
+        # Set center coordinates if both are provided
+        center = None
+        if center_lat is not None and center_lon is not None:
+            center = (center_lat, center_lon)
+
+        # Generate heatmap
+        heatmap = analyzer.generate_risk_heatmap(locations, center=center)
+
+        # Save to a temporary file (delete=False: the caller must clean it up)
+        with tempfile.NamedTemporaryFile(suffix='.html', delete=False) as tmp:
+            heatmap.save(tmp.name)
+            return {"heatmap_path": tmp.name}
+
+    except Exception as e:
+        raise HTTPException(status_code=500, detail=str(e))
\ No newline at end of file
diff --git a/app/services/geographic_analysis.py b/app/services/geographic_analysis.py
new file mode 100644
index 00000000..c062d3e1
--- /dev/null
+++ b/app/services/geographic_analysis.py
@@ -0,0 +1,164 @@
+from dataclasses import dataclass
+from typing import List, Dict, Tuple, Optional
+import numpy as np
+from sklearn.cluster import DBSCAN
+import folium
+from folium import plugins
+import pandas as pd
+from datetime import datetime, timedelta
+
+@dataclass
+class GeoLocation:
+    latitude: float
+    longitude: float
+    cases: int
+    timestamp: datetime
+    location_id: str
+
+class GeographicAnalyzer:
+    """Analyzes geographic patterns and clusters of COVID-19 cases"""
+
+    def __init__(self, eps_km: float = 100, min_samples: int = 5):
+        """
+        Initialize the Geographic Analyzer
+
+        Args:
+            eps_km: The maximum distance (in km) between two points for them to be considered neighbors
+            min_samples: The minimum number of points required to form a dense region
+        """
+        self.eps_km = eps_km
+        self.min_samples = min_samples
+
+    def _haversine_distance(self, lat1: float, lon1: float, lat2: float, lon2: float) -> float:
+        """
+        Calculate the great circle distance between two points on Earth
+
+        Args:
+            lat1, lon1: Coordinates of first point
+            lat2, lon2: Coordinates of second point
+
+        Returns:
+            Distance in kilometers
+        """
+        R = 6371  # Earth's radius in kilometers
+
+        lat1, lon1, lat2, lon2 = map(np.radians, [lat1, lon1, lat2, lon2])
+        dlat = lat2 - lat1
+        dlon = lon2 - lon1
+
+        a = np.sin(dlat/2)**2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon/2)**2
+        c = 2 * np.arcsin(np.sqrt(a))
+        return R * c
+
+    def identify_clusters(self, locations: List[GeoLocation]) -> Dict[int, List[GeoLocation]]:
+        """
+        Identify geographic clusters using the DBSCAN algorithm
+
+        Args:
+            locations: List of GeoLocation objects
+
+        Returns:
+            Dictionary mapping cluster IDs to lists of locations; noise points get unique negative IDs
+        """
+        if not locations:
+            return {}
+
+        # Extract coordinates in radians (sklearn's haversine metric expects radians)
+        coordinates = np.radians([(loc.latitude, loc.longitude) for loc in locations])
+
+        # Convert eps from km to radians on the unit sphere
+        eps = self.eps_km / 6371.0  # Earth's radius in km
+
+        # Perform DBSCAN clustering
+        db = DBSCAN(eps=eps, min_samples=self.min_samples, metric='haversine')
+        labels = db.fit_predict(coordinates)
+
+        # Group locations by cluster label; DBSCAN marks outliers as -1, and
+        # lumping all noise together would merge unrelated points, so each gets its own key
+        clusters = {}
+        for i, (label, location) in enumerate(zip(labels, locations)):
+            key = int(label) if label != -1 else -(i + 1)
+            clusters.setdefault(key, []).append(location)
+
+        return clusters
+
+    def calculate_spread_vectors(self,
+                                 historical_data: List[GeoLocation],
+                                 days: int = 7) -> List[Tuple[float, float, float]]:
+        """
+        Calculate spread vectors based on changes in case concentrations
+
+        Args:
+            historical_data: List of historical GeoLocation objects
+            days: Number of days to analyze
+
+        Returns:
+            List of (latitude, longitude, magnitude) tuples representing spread vectors
+        """
+        # Convert to a DataFrame for easier manipulation
+        df = pd.DataFrame([
+            {
+                'latitude': loc.latitude,
+                'longitude': loc.longitude,
+                'cases': loc.cases,
+                'timestamp': loc.timestamp
+            }
+            for loc in historical_data
+        ])
+
+        if df.empty:
+            return []
+
+        vectors = []
+        end_date = df['timestamp'].max()
+        start_date = end_date - timedelta(days=days)
+
+        # Compare case counts per location at the start and end of the window
+        start_cases = df[df['timestamp'].dt.date == start_date.date()].set_index(['latitude', 'longitude'])['cases']
+        end_cases = df[df['timestamp'].dt.date == end_date.date()].set_index(['latitude', 'longitude'])['cases']
+
+        # Drop locations missing at either endpoint, then build (lat, lon, Δcases) vectors
+        case_changes = (end_cases - start_cases).dropna()
+
+        for (lat, lon), magnitude in case_changes.items():
+            if magnitude != 0:
+                vectors.append((float(lat), float(lon), float(magnitude)))
+
+        return vectors
+
+    def generate_risk_heatmap(self,
+                              locations: List[GeoLocation],
+                              center: Optional[Tuple[float, float]] = None) -> folium.Map:
+        """
+        Generate a risk heatmap based on case density
+
+        Args:
+            locations: List of GeoLocation objects
+            center: Optional center point for the map
+
+        Returns:
+            Folium map object with heatmap layer
+        """
+        if not locations:
+            raise ValueError("No locations provided for heatmap generation")
+
+        # Calculate center point if not provided
+        if center is None:
+            center = (
+                np.mean([loc.latitude for loc in locations]),
+                np.mean([loc.longitude for loc in locations])
+            )
+
+        # Create base map
+        m = folium.Map(location=center, zoom_start=4)
+
+        # Prepare heatmap data
+        heat_data = [
+            [loc.latitude, loc.longitude, loc.cases]
+            for loc in locations
+        ]
+
+        # Add heatmap layer
+        plugins.HeatMap(heat_data).add_to(m)
+
+        return m
\ No newline at end of file
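For reviewers: a minimal, self-contained sketch of how `GeographicAnalyzer` is meant to be driven end to end. The coordinates, case counts, and location IDs below are illustrative, not taken from the real data source.

```python
from datetime import datetime

from app.services.geographic_analysis import GeographicAnalyzer, GeoLocation

# eps_km=100 treats points within ~100 km as neighbors; min_samples=2
# lets a pair of nearby locations form a cluster
analyzer = GeographicAnalyzer(eps_km=100, min_samples=2)

now = datetime.now()
locations = [
    GeoLocation(40.7128, -74.0060, 100, now, "nyc"),
    GeoLocation(40.7614, -73.9776, 150, now, "manhattan"),
    GeoLocation(34.0522, -118.2437, 200, now, "la"),
]

# NYC and Manhattan (~6 km apart) share a cluster; LA ends up as a
# noise point, i.e. a singleton cluster with a negative ID
clusters = analyzer.identify_clusters(locations)
for cluster_id, members in clusters.items():
    print(cluster_id, [loc.location_id for loc in members])

# The heatmap is a folium.Map; saving it produces a standalone HTML file
analyzer.generate_risk_heatmap(locations).save("risk_heatmap.html")
```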
diff --git a/app/services/location/jhu.py b/app/services/location/jhu.py
index 29b22db6..57bcfd82 100644
--- a/app/services/location/jhu.py
+++ b/app/services/location/jhu.py
@@ -100,7 +100,7 @@ async def get_category(category):
                 "country_code": countries.country_code(country),
                 "province": item["Province/State"],
                 "coordinates": {
-                    "lat": item["Lat"],
+                    "lat": item["Lat"],
                     "long": item["Long"],
                 },
                 "history": history,
@@ -206,13 +206,24 @@ async def get_locations():
     return locations
 
 
-def parse_history(key: tuple, locations: list):
+def parse_history(key: tuple, locations: list, index: int = None):
     """
     Helper for validating and extracting history content from
     locations data based on key. Validates with the current country/province
     key to make sure no index/column issue.
     """
+    # If index is provided, try to get location at that index first
+    if index is not None:
+        try:
+            location = locations[index]
+            if (location["country"], location["province"]) == key:
+                return location["history"]
+            return {}  # Key doesn't match at specified index
+        except IndexError:
+            return {}  # Index out of range
+
+    # Only search through all locations if no index was provided
     for i, location in enumerate(locations):
         if (location["country"], location["province"]) == key:
             return location["history"]
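The new `index` argument to `parse_history` is a fast path: a caller that already knows (or suspects) where a `(country, province)` series sits can skip the linear scan, and the key check fails closed on a mismatch instead of silently using the wrong row. A sketch of the intended call pattern, with illustrative records:

```python
locations = [
    {"country": "US", "province": "New York", "history": {"1/22/20": 0}},
    {"country": "US", "province": "Washington", "history": {"1/22/20": 1}},
]

# Fast path: the key is verified against the row at the given index
assert parse_history(("US", "Washington"), locations, index=1) == {"1/22/20": 1}

# A wrong index returns {} rather than another row's history
assert parse_history(("US", "Washington"), locations, index=0) == {}

# Without an index the behavior is unchanged: full scan by key
assert parse_history(("US", "New York"), locations) == {"1/22/20": 0}
```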
diff --git a/tests/test_geographic_analysis.py b/tests/test_geographic_analysis.py
new file mode 100644
index 00000000..336295d6
--- /dev/null
+++ b/tests/test_geographic_analysis.py
@@ -0,0 +1,113 @@
+import pytest
+from datetime import datetime, timedelta
+import numpy as np
+from app.services.geographic_analysis import GeographicAnalyzer, GeoLocation
+
+@pytest.fixture
+def analyzer():
+    return GeographicAnalyzer(eps_km=100, min_samples=2)
+
+@pytest.fixture
+def sample_locations():
+    """Create a sample dataset of locations"""
+    base_time = datetime.now()
+    return [
+        GeoLocation(40.7128, -74.0060, 100, base_time, "nyc"),        # New York
+        GeoLocation(40.7614, -73.9776, 150, base_time, "manhattan"),  # Manhattan
+        GeoLocation(34.0522, -118.2437, 200, base_time, "la"),        # Los Angeles
+        GeoLocation(51.5074, -0.1278, 80, base_time, "london"),       # London
+    ]
+
+@pytest.fixture
+def historical_data():
+    """Create historical data for testing spread vectors"""
+    locations = []
+    base_time = datetime.now()
+
+    # Add data for 8 days so the full 7-day analysis window is covered
+    for days in range(8):
+        current_time = base_time - timedelta(days=days)
+        locations.extend([
+            GeoLocation(40.7128, -74.0060, 100 + days*10, current_time, "nyc"),
+            GeoLocation(40.7614, -73.9776, 150 + days*15, current_time, "manhattan"),
+            GeoLocation(34.0522, -118.2437, 200 + days*20, current_time, "la"),
+        ])
+
+    return locations
+
+def test_haversine_distance(analyzer):
+    """Test the haversine distance calculation"""
+    # New York to Los Angeles
+    distance = analyzer._haversine_distance(40.7128, -74.0060, 34.0522, -118.2437)
+    assert 3935 <= distance <= 3945  # Approximately 3940 km
+
+def test_identify_clusters_empty(analyzer):
+    """Test cluster identification with empty input"""
+    clusters = analyzer.identify_clusters([])
+    assert clusters == {}
+
+def test_identify_clusters(analyzer, sample_locations):
+    """Test cluster identification with sample data"""
+    clusters = analyzer.identify_clusters(sample_locations)
+
+    # New York and Manhattan should be in the same cluster
+    ny_cluster = None
+    for cluster_id, locations in clusters.items():
+        if any(loc.location_id == "nyc" for loc in locations):
+            ny_cluster = cluster_id
+            break
+
+    assert ny_cluster is not None
+    cluster_locations = clusters[ny_cluster]
+    assert any(loc.location_id == "manhattan" for loc in cluster_locations)
+
+    # Los Angeles and London should be in different clusters
+    la_cluster = None
+    london_cluster = None
+    for cluster_id, locations in clusters.items():
+        if any(loc.location_id == "la" for loc in locations):
+            la_cluster = cluster_id
+        if any(loc.location_id == "london" for loc in locations):
+            london_cluster = cluster_id
+
+    assert la_cluster != london_cluster
+
+def test_calculate_spread_vectors(analyzer, historical_data):
+    """Test spread vector calculation"""
+    vectors = analyzer.calculate_spread_vectors(historical_data)
+
+    assert len(vectors) > 0
+    for vector in vectors:
+        assert len(vector) == 3  # lat, lon, magnitude
+        assert isinstance(vector[0], float)  # latitude
+        assert isinstance(vector[1], float)  # longitude
+        assert isinstance(vector[2], float)  # magnitude
+
+def test_generate_risk_heatmap(analyzer, sample_locations):
+    """Test heatmap generation"""
+    heatmap = analyzer.generate_risk_heatmap(sample_locations)
+    assert heatmap is not None
+
+    # Test with empty locations
+    with pytest.raises(ValueError):
+        analyzer.generate_risk_heatmap([])
+
+    # Test with custom center
+    custom_center = (0.0, 0.0)
+    heatmap = analyzer.generate_risk_heatmap(sample_locations, center=custom_center)
+    assert heatmap is not None
+
+def test_edge_cases(analyzer):
+    """Test edge cases and error handling"""
+    # A single location is noise and becomes its own singleton cluster
+    single_location = [GeoLocation(0.0, 0.0, 100, datetime.now(), "single")]
+    clusters = analyzer.identify_clusters(single_location)
+    assert len(clusters) == 1
+
+    # Locations at the same point satisfy min_samples=2 and form one cluster
+    same_point_locations = [
+        GeoLocation(0.0, 0.0, 100, datetime.now(), "point1"),
+        GeoLocation(0.0, 0.0, 200, datetime.now(), "point2"),
+    ]
+    clusters = analyzer.identify_clusters(same_point_locations)
+    assert len(clusters) == 1
\ No newline at end of file
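A quick way to exercise the new routes against the placeholder data sources. The router wiring and the `/v2/geographic` prefix are assumptions for illustration; the real app factory may mount it differently.

```python
from fastapi import FastAPI
from fastapi.testclient import TestClient

from app.routes.geographic import router

app = FastAPI()
app.include_router(router, prefix="/v2/geographic")  # hypothetical mount point

client = TestClient(app)

# With the placeholder (empty) data sources the response shapes are still stable:
resp = client.get("/v2/geographic/clusters", params={"min_cases": 10})
assert resp.status_code == 200 and resp.json() == {}

resp = client.get("/v2/geographic/spread-vectors", params={"days": 7})
assert resp.status_code == 200 and resp.json() == []
```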