-
-
Notifications
You must be signed in to change notification settings - Fork 314
Geographic clustering analysis #498
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
Devin-Apps
wants to merge
3
commits into
ExpDev07:master
from
cognition-evals:Geographic-Clustering-Analysis
Closed
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,85 @@ | ||
| from fastapi import APIRouter, HTTPException | ||
| from typing import List, Optional | ||
| from datetime import datetime | ||
| import tempfile | ||
| from pathlib import Path | ||
|
|
||
| from app.services.geographic_analysis import GeographicAnalyzer, GeoLocation | ||
|
|
||
# Router exposing the geographic-analysis endpoints; mounted by the application elsewhere.
router = APIRouter()
# Module-level analyzer shared by every request (constructed with default
# eps_km/min_samples — NOTE(review): confirm the defaults suit production data).
analyzer = GeographicAnalyzer()
| @router.get("/clusters") | ||
| async def get_clusters(min_cases: int = 0): | ||
| """Get geographic clusters of COVID-19 cases""" | ||
| try: | ||
| locations = [] # Placeholder for locations | ||
|
|
||
| # Filter by minimum cases if specified | ||
| if min_cases > 0: | ||
| locations = [loc for loc in locations if loc.cases >= min_cases] | ||
|
|
||
| clusters = analyzer.identify_clusters(locations) | ||
|
|
||
| # Convert to serializable format | ||
| return { | ||
| str(cluster_id): [ | ||
| { | ||
| "latitude": loc.latitude, | ||
| "longitude": loc.longitude, | ||
| "cases": loc.cases, | ||
| "timestamp": loc.timestamp.isoformat(), | ||
| "location_id": loc.location_id | ||
| } | ||
| for loc in cluster_locations | ||
| ] | ||
| for cluster_id, cluster_locations in clusters.items() | ||
| } | ||
| except Exception as e: | ||
| raise HTTPException(status_code=500, detail=str(e)) | ||
|
|
||
| @router.get("/spread-vectors") | ||
| async def get_spread_vectors(days: int = 7): | ||
| """Get virus spread vectors""" | ||
| try: | ||
| # Fetch historical data | ||
| historical_data = [] # Replace with actual data fetching | ||
|
|
||
| vectors = analyzer.calculate_spread_vectors(historical_data, days=days) | ||
|
|
||
| return [ | ||
| { | ||
| "latitude": lat, | ||
| "longitude": lon, | ||
| "magnitude": mag | ||
| } | ||
| for lat, lon, mag in vectors | ||
| ] | ||
| except Exception as e: | ||
| raise HTTPException(status_code=500, detail=str(e)) | ||
|
|
||
| @router.get("/heatmap") | ||
| async def get_heatmap( | ||
| center_lat: Optional[float] = None, | ||
| center_lon: Optional[float] = None | ||
| ): | ||
| """Get COVID-19 risk heatmap""" | ||
| try: | ||
| # Fetch locations | ||
| locations = [] # Replace with actual data fetching | ||
|
|
||
| # Set center coordinates if provided | ||
| center = None | ||
| if center_lat is not None and center_lon is not None: | ||
| center = (center_lat, center_lon) | ||
|
|
||
| # Generate heatmap | ||
| heatmap = analyzer.generate_risk_heatmap(locations, center=center) | ||
|
|
||
| # Save to temporary file | ||
| with tempfile.NamedTemporaryFile(suffix='.html', delete=False) as tmp: | ||
| heatmap.save(tmp.name) | ||
| return {"heatmap_path": tmp.name} | ||
|
|
||
| except Exception as e: | ||
| raise HTTPException(status_code=500, detail=str(e)) | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,164 @@ | ||
| from dataclasses import dataclass | ||
| from typing import List, Dict, Tuple, Optional | ||
| import numpy as np | ||
| from sklearn.cluster import DBSCAN | ||
| import folium | ||
| from folium import plugins | ||
| import pandas as pd | ||
| from datetime import datetime, timedelta | ||
|
|
||
@dataclass
class GeoLocation:
    # Coordinates in decimal degrees (latitude first) — presumably WGS84;
    # TODO confirm against the upstream data source.
    latitude: float
    longitude: float
    # Reported case count at this location and time (treated as a snapshot
    # value by calculate_spread_vectors, which differences two dates).
    cases: int
    # Observation time for the case count.
    timestamp: datetime
    # Stable identifier for the reporting location.
    location_id: str
|
|
||
class GeographicAnalyzer:
    """Analyzes geographic patterns and clusters of COVID-19 cases."""

    # Mean Earth radius in kilometers; used for all great-circle conversions.
    EARTH_RADIUS_KM = 6371.0

    def __init__(self, eps_km: float = 100, min_samples: int = 5):
        """
        Initialize the Geographic Analyzer.

        Args:
            eps_km: The maximum distance (in km) between two points for them
                to be considered neighbors.
            min_samples: The minimum number of points required to form a
                dense region.
        """
        self.eps_km = eps_km
        self.min_samples = min_samples

    def _haversine_distance(self, lat1: float, lon1: float, lat2: float, lon2: float) -> float:
        """
        Calculate the great circle distance between two points on Earth.

        Args:
            lat1, lon1: Coordinates of first point (decimal degrees)
            lat2, lon2: Coordinates of second point (decimal degrees)

        Returns:
            Distance in kilometers
        """
        lat1, lon1, lat2, lon2 = map(np.radians, [lat1, lon1, lat2, lon2])
        dlat = lat2 - lat1
        dlon = lon2 - lon1

        # Standard haversine formula: central angle c, scaled by Earth radius.
        a = np.sin(dlat / 2) ** 2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon / 2) ** 2
        c = 2 * np.arcsin(np.sqrt(a))
        return self.EARTH_RADIUS_KM * c

    def identify_clusters(self, locations: List["GeoLocation"]) -> Dict[int, List["GeoLocation"]]:
        """
        Identify geographic clusters using the DBSCAN algorithm.

        Args:
            locations: List of GeoLocation objects

        Returns:
            Dictionary mapping cluster IDs to lists of locations
            (label -1 holds DBSCAN noise points)
        """
        if not locations:
            return {}

        # Extract coordinates as (lat, lon) pairs in degrees.
        coordinates = np.array([(loc.latitude, loc.longitude) for loc in locations])

        # DBSCAN's 'haversine' metric expects coordinates in RADIANS and
        # measures distance as the central angle (also radians).  The previous
        # version passed degrees and approximated eps as eps_km / 111, mixing
        # units and producing wrong neighborhoods; converting both the points
        # and eps to radians makes the km threshold exact.
        coordinates_rad = np.radians(coordinates)
        eps_rad = self.eps_km / self.EARTH_RADIUS_KM

        db = DBSCAN(eps=eps_rad, min_samples=self.min_samples, metric='haversine')
        labels = db.fit_predict(coordinates_rad)

        # Group locations by cluster label (cast numpy ints to plain int keys).
        clusters: Dict[int, List["GeoLocation"]] = {}
        for label, location in zip(labels, locations):
            clusters.setdefault(int(label), []).append(location)

        return clusters

    def calculate_spread_vectors(self,
                                 historical_data: List["GeoLocation"],
                                 days: int = 7) -> List[Tuple[float, float, float]]:
        """
        Calculate spread vectors based on changes in case concentrations.

        Args:
            historical_data: List of historical GeoLocation objects
            days: Number of days to analyze (window ending at the latest
                timestamp in the data)

        Returns:
            List of (latitude, longitude, magnitude) tuples representing
            spread vectors; magnitude is the case-count change over the window
        """
        # Guard the empty case: building a DataFrame from [] has no
        # 'timestamp' column and the previous version raised KeyError.
        if not historical_data:
            return []

        # Convert to DataFrame for easier manipulation.
        df = pd.DataFrame([
            {
                'latitude': loc.latitude,
                'longitude': loc.longitude,
                'cases': loc.cases,
                'timestamp': loc.timestamp
            }
            for loc in historical_data
        ])

        # Window boundaries: the latest observation and `days` before it.
        end_date = df['timestamp'].max()
        start_date = end_date - timedelta(days=days)

        # Snapshot case counts on the two boundary dates, keyed by (lat, lon).
        start_cases = df[df['timestamp'].dt.date == start_date.date()].set_index(['latitude', 'longitude'])['cases']
        end_cases = df[df['timestamp'].dt.date == end_date.date()].set_index(['latitude', 'longitude'])['cases']

        # Subtraction aligns on the (lat, lon) index; locations present in
        # only one snapshot yield NaN, which must be dropped — the previous
        # version appended NaN magnitudes because `NaN != 0` is True.
        case_changes = (end_cases - start_cases).dropna()

        # Emit one vector per location whose case count actually changed.
        return [
            (lat, lon, magnitude)
            for (lat, lon), magnitude in case_changes.items()
            if magnitude != 0
        ]

    def generate_risk_heatmap(self,
                              locations: List["GeoLocation"],
                              center: Optional[Tuple[float, float]] = None) -> "folium.Map":
        """
        Generate a risk heatmap based on case density.

        Args:
            locations: List of GeoLocation objects
            center: Optional (latitude, longitude) center point for the map

        Returns:
            Folium map object with heatmap layer

        Raises:
            ValueError: If no locations are provided.
        """
        if not locations:
            raise ValueError("No locations provided for heatmap generation")

        # Default the map center to the centroid of the input coordinates.
        if center is None:
            center = (
                np.mean([loc.latitude for loc in locations]),
                np.mean([loc.longitude for loc in locations])
            )

        # Create base map.
        m = folium.Map(location=center, zoom_start=4)

        # Each heat point is weighted by its case count.
        heat_data = [
            [loc.latitude, loc.longitude, loc.cases]
            for loc in locations
        ]

        # Add heatmap layer.
        plugins.HeatMap(heat_data).add_to(m)

        return m
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you define a `response_model` for each of the new endpoints? Otherwise the OpenAPI docs won't know how to describe them.
https://fastapi.tiangolo.com/tutorial/response-model/