sijapi/sijapi/classes.py

353 lines
13 KiB
Python
Raw Normal View History

from pydantic import BaseModel, Field
from typing import List, Optional, Any, Tuple, Dict, Union, Tuple
from datetime import datetime, timedelta, timezone
import asyncio
import json
from timezonefinder import TimezoneFinder
from pathlib import Path
import asyncpg
import aiohttp
import aiofiles
from contextlib import asynccontextmanager
from concurrent.futures import ThreadPoolExecutor
import reverse_geocoder as rg
from timezonefinder import TimezoneFinder
from srtm import get_data
class Location(BaseModel):
latitude: float
longitude: float
datetime: datetime
elevation: Optional[float] = None
altitude: Optional[float] = None
zip: Optional[str] = None
street: Optional[str] = None
city: Optional[str] = None
state: Optional[str] = None
country: Optional[str] = None
context: Optional[Dict[str, Any]] = None
class_: Optional[str] = None
type: Optional[str] = None
name: Optional[str] = None
display_name: Optional[str] = None
boundingbox: Optional[List[str]] = None
amenity: Optional[str] = None
house_number: Optional[str] = None
road: Optional[str] = None
quarter: Optional[str] = None
neighbourhood: Optional[str] = None
suburb: Optional[str] = None
county: Optional[str] = None
country_code: Optional[str] = None
class Config:
json_encoders = {
datetime: lambda dt: dt.isoformat(),
}
class Geocoder:
def __init__(self, named_locs: Union[str, Path] = None, cache_file: Union[str, Path] = 'timezone_cache.json'):
self.tf = TimezoneFinder()
self.srtm_data = get_data()
self.named_locs = Path(named_locs) if named_locs else None
self.cache_file = Path(cache_file)
self.last_timezone: str = "America/Los_Angeles"
self.last_update: Optional[datetime] = None
self.last_location: Optional[Tuple[float, float]] = None
self.executor = ThreadPoolExecutor()
async def location(self, lat: float, lon: float):
loop = asyncio.get_running_loop()
return await loop.run_in_executor(self.executor, rg.search, [(lat, lon)])
async def elevation(self, latitude: float, longitude: float, unit: str = "m") -> float:
loop = asyncio.get_running_loop()
elevation = await loop.run_in_executor(self.executor, self.srtm_data.get_elevation, latitude, longitude)
if unit == "m":
return elevation
elif unit == "km":
return elevation / 1000
elif unit == "ft" or unit == "'":
return elevation * 3.280839895
else:
raise ValueError(f"Unsupported unit: {unit}")
async def timezone(self, lat: float, lon: float):
loop = asyncio.get_running_loop()
timezone = await loop.run_in_executor(self.executor, self.tf.timezone_at, lat, lon)
return timezone if timezone else 'Unknown'
async def lookup(self, lat: float, lon: float):
city, state, country = (await self.location(lat, lon))[0]['name'], (await self.location(lat, lon))[0]['admin1'], (await self.location(lat, lon))[0]['cc']
elevation = await self.elevation(lat, lon)
timezone = await self.timezone(lat, lon)
return {
"city": city,
"state": state,
"country": country,
"elevation": elevation,
"timezone": timezone
}
async def code(self, locations: Union[Location, Tuple[float, float], List[Union[Location, Tuple[float, float]]]]) -> Union[Location, List[Location]]:
if isinstance(locations, (Location, tuple)):
locations = [locations]
processed_locations = []
for loc in locations:
if isinstance(loc, tuple):
processed_locations.append(Location(latitude=loc[0], longitude=loc[1]))
elif isinstance(loc, Location):
processed_locations.append(loc)
else:
raise ValueError(f"Unsupported location type: {type(loc)}")
coordinates = [(location.latitude, location.longitude) for location in processed_locations]
geocode_results = await self.location(*zip(*coordinates))
elevations = await asyncio.gather(*[self.elevation(lat, lon) for lat, lon in coordinates])
timezones = await asyncio.gather(*[self.timezone(lat, lon) for lat, lon in coordinates])
geocoded_locations = []
for location, result, elevation, timezone in zip(processed_locations, geocode_results, elevations, timezones):
geocoded_location = Location(
latitude=location.latitude,
longitude=location.longitude,
elevation=elevation,
datetime=location.datetime or datetime.now(timezone.utc),
zip=result.get("admin2"),
city=result.get("name"),
state=result.get("admin1"),
country=result.get("cc"),
context=location.context or {},
name=result.get("name"),
display_name=f"{result.get('name')}, {result.get('admin1')}, {result.get('cc')}",
country_code=result.get("cc"),
timezone=timezone
)
# Merge original location data with geocoded data
for field in location.__fields__:
if getattr(location, field) is None:
setattr(location, field, getattr(geocoded_location, field))
geocoded_locations.append(location)
return geocoded_locations[0] if len(geocoded_locations) == 1 else geocoded_locations
async def geocode_osm(self, latitude: float, longitude: float, email: str):
url = f"https://nominatim.openstreetmap.org/reverse?format=json&lat={latitude}&lon={longitude}"
headers = {
'User-Agent': f'sijapi/1.0 ({email})', # replace with your app name and email
}
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=headers) as response:
response.raise_for_status()
data = await response.json()
address = data.get("address", {})
elevation = await self.elevation(latitude, longitude)
return Location(
latitude=latitude,
longitude=longitude,
elevation=elevation,
datetime=datetime.now(timezone.utc),
zip=address.get("postcode"),
street=address.get("road"),
city=address.get("city"),
state=address.get("state"),
country=address.get("country"),
context={},
class_=data.get("class"),
type=data.get("type"),
name=data.get("name"),
display_name=data.get("display_name"),
amenity=address.get("amenity"),
house_number=address.get("house_number"),
road=address.get("road"),
quarter=address.get("quarter"),
neighbourhood=address.get("neighbourhood"),
suburb=address.get("suburb"),
county=address.get("county"),
country_code=address.get("country_code"),
timezone=await self.timezone(latitude, longitude)
)
def load_override_locations(self):
if self.named_locs and self.named_locs.exists():
with open(self.named_locs, 'r') as file:
return yaml.safe_load(file)
return []
def haversine(self, lat1, lon1, lat2, lon2):
R = 6371 # Earth's radius in kilometers
lat1, lon1, lat2, lon2 = map(radians, [lat1, lon1, lat2, lon2])
dlat = lat2 - lat1
dlon = lon2 - lon1
a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
c = 2 * atan2(sqrt(a), sqrt(1-a))
return R * c
async def find_override_location(self, lat: float, lon: float) -> Optional[str]:
closest_location = None
closest_distance = float('inf')
for location in self.override_locations:
loc_name = location.get("name")
loc_lat = location.get("latitude")
loc_lon = location.get("longitude")
loc_radius = location.get("radius")
distance = self.haversine(lat, lon, loc_lat, loc_lon)
if distance <= loc_radius:
if distance < closest_distance:
closest_distance = distance
closest_location = loc_name
return closest_location
async def refresh_timezone(self, location: Union[Location, Tuple[float, float]], force: bool = False) -> str:
if isinstance(location, Location):
lat, lon = location.latitude, location.longitude
else:
lat, lon = location
current_time = datetime.now()
if (force or
not self.last_update or
current_time - self.last_update > timedelta(hours=1) or
self.last_location != (lat, lon)):
new_timezone = await self.timezone(lat, lon)
self.last_timezone = new_timezone
self.last_update = current_time
self.last_location = (lat, lon)
await self.tz_save()
return self.last_timezone
async def tz_save(self):
cache_data = {
'last_timezone': self.last_timezone,
'last_update': self.last_update.isoformat() if self.last_update else None,
'last_location': self.last_location
}
async with aiofiles.open(self.cache_file, 'w') as f:
await f.write(json.dumps(cache_data))
async def tz_cached(self):
try:
async with aiofiles.open(self.cache_file, 'r') as f:
cache_data = json.loads(await f.read())
self.last_timezone = cache_data.get('last_timezone')
self.last_update = datetime.fromisoformat(cache_data['last_update']) if cache_data.get('last_update') else None
self.last_location = tuple(cache_data['last_location']) if cache_data.get('last_location') else None
except (FileNotFoundError, json.JSONDecodeError):
# If file doesn't exist or is invalid, we'll start fresh
pass
async def tz_current(self, location: Union[Location, Tuple[float, float]]) -> str:
await self.tz_cached()
return await self.refresh_timezone(location)
async def tz_last(self) -> Optional[str]:
await self.tz_cached()
return self.last_timezone
def __del__(self):
self.executor.shutdown()
class Database(BaseModel):
host: str = Field(..., description="Database host")
port: int = Field(5432, description="Database port")
user: str = Field(..., description="Database user")
password: str = Field(..., description="Database password")
database: str = Field(..., description="Database name")
db_schema: Optional[str] = Field(None, description="Database schema")
@asynccontextmanager
async def get_connection(self):
conn = await asyncpg.connect(
host=self.host,
port=self.port,
user=self.user,
password=self.password,
database=self.database
)
try:
if self.db_schema:
await conn.execute(f"SET search_path TO {self.db_schema}")
yield conn
finally:
await conn.close()
@classmethod
def from_env(cls):
import os
return cls(
host=os.getenv("DB_HOST", "localhost"),
port=int(os.getenv("DB_PORT", 5432)),
user=os.getenv("DB_USER"),
password=os.getenv("DB_PASSWORD"),
database=os.getenv("DB_NAME"),
db_schema=os.getenv("DB_SCHEMA")
)
def to_dict(self):
return self.dict(exclude_none=True)
2024-06-25 03:12:07 -07:00
class IMAPConfig(BaseModel):
username: str
password: str
host: str
port: int
encryption: str = None
class SMTPConfig(BaseModel):
username: str
password: str
host: str
port: int
encryption: str = None
class AutoResponder(BaseModel):
name: str
style: str
context: str
ollama_model: str = "llama3"
whitelist: List[str]
blacklist: List[str]
image_prompt: Optional[str] = None
smtp: SMTPConfig
2024-06-25 03:12:07 -07:00
class EmailAccount(BaseModel):
name: str
refresh: int
2024-06-25 03:12:07 -07:00
fullname: Optional[str]
bio: Optional[str]
summarize: bool = False
podcast: bool = False
2024-06-25 03:12:07 -07:00
imap: IMAPConfig
autoresponders: Optional[List[AutoResponder]]
class EmailContact(BaseModel):
email: str
name: Optional[str] = None
2024-06-25 03:12:07 -07:00
class IncomingEmail(BaseModel):
sender: str
datetime_received: datetime
recipients: List[EmailContact]
2024-06-25 03:12:07 -07:00
subject: str
body: str
attachments: List[dict] = []