Auto-update: Fri Jun 28 22:22:58 PDT 2024

This commit is contained in:
sanj 2024-06-28 22:22:58 -07:00
parent 48b61867f5
commit 6e960dca9e
9 changed files with 1518 additions and 914 deletions

View file

@ -12,7 +12,7 @@ from typing import List, Optional
import traceback
import logging
from .logs import Logger
from .classes import AutoResponder, IMAPConfig, SMTPConfig, EmailAccount, EmailContact, IncomingEmail, TimezoneTracker, Database, PyGeolocator
from .classes import AutoResponder, IMAPConfig, SMTPConfig, EmailAccount, EmailContact, IncomingEmail, TimezoneTracker, Database, Geocoder
# from sijapi.config.config import load_config
# cfg = load_config()
@ -68,7 +68,7 @@ GEONAMES_TXT = DATA_DIR / "geonames.txt"
LOCATIONS_CSV = DATA_DIR / "US.csv"
TZ = tz.gettz(os.getenv("TZ", "America/Los_Angeles"))
TZ_CACHE = DATA_DIR / "tzcache.json"
DynamicTZ = TimezoneTracker(TZ_CACHE)
GEO = Geocoder(NAMED_LOCATIONS, TZ_CACHE)
### Obsidian & notes
ALLOWED_FILENAME_CHARS = r'[^\w \.-]'
@ -90,7 +90,6 @@ YEAR_FMT = os.getenv("YEAR_FMT")
MONTH_FMT = os.getenv("MONTH_FMT")
DAY_FMT = os.getenv("DAY_FMT")
DAY_SHORT_FMT = os.getenv("DAY_SHORT_FMT")
GEOLOCATOR = PyGeolocator
### Large language model
LLM_URL = os.getenv("LLM_URL", "http://localhost:11434")

View file

@ -1,54 +1,88 @@
from pydantic import BaseModel
from typing import List, Optional, Any, Tuple, Dict, Union, Tuple
from datetime import datetime, timedelta
import asyncio
import asyncpg
import json
from pydantic import BaseModel, Field
from typing import Optional
import asyncpg
import os
from typing import Optional, Tuple, Union
from datetime import datetime, timedelta
from typing import List, Optional, Any, Tuple, Dict, Union, Tuple
from datetime import datetime, timedelta, timezone
import asyncio
import json
from timezonefinder import TimezoneFinder
from pathlib import Path
from pydantic import BaseModel, Field
from typing import Optional
from pydantic import BaseModel, Field
from typing import Optional
import asyncpg
from pydantic import BaseModel, Field
from typing import Optional
import asyncpg
import aiohttp
import aiofiles
from contextlib import asynccontextmanager
from concurrent.futures import ThreadPoolExecutor
import reverse_geocoder as rg
from timezonefinder import TimezoneFinder
from srtm import get_data
class PyGeolocator:
def __init__(self):
class Location(BaseModel):
latitude: float
longitude: float
datetime: datetime
elevation: Optional[float] = None
altitude: Optional[float] = None
zip: Optional[str] = None
street: Optional[str] = None
city: Optional[str] = None
state: Optional[str] = None
country: Optional[str] = None
context: Optional[Dict[str, Any]] = None
class_: Optional[str] = None
type: Optional[str] = None
name: Optional[str] = None
display_name: Optional[str] = None
boundingbox: Optional[List[str]] = None
amenity: Optional[str] = None
house_number: Optional[str] = None
road: Optional[str] = None
quarter: Optional[str] = None
neighbourhood: Optional[str] = None
suburb: Optional[str] = None
county: Optional[str] = None
country_code: Optional[str] = None
class Config:
json_encoders = {
datetime: lambda dt: dt.isoformat(),
}
class Geocoder:
def __init__(self, named_locs: Union[str, Path] = None, cache_file: Union[str, Path] = 'timezone_cache.json'):
self.tf = TimezoneFinder()
self.srtm_data = get_data()
self.named_locs = Path(named_locs) if named_locs else None
self.cache_file = Path(cache_file)
self.last_timezone: str = "America/Los_Angeles"
self.last_update: Optional[datetime] = None
self.last_location: Optional[Tuple[float, float]] = None
self.executor = ThreadPoolExecutor()
def get_location(self, lat, lon):
result = rg.search((lat, lon))
return result[0]['name'], result[0]['admin1'], result[0]['cc']
async def location(self, lat: float, lon: float):
loop = asyncio.get_running_loop()
return await loop.run_in_executor(self.executor, rg.search, [(lat, lon)])
def get_elevation(self, lat, lon):
return self.srtm_data.get_elevation(lat, lon)
async def elevation(self, latitude: float, longitude: float, unit: str = "m") -> float:
loop = asyncio.get_running_loop()
elevation = await loop.run_in_executor(self.executor, self.srtm_data.get_elevation, latitude, longitude)
if unit == "m":
return elevation
elif unit == "km":
return elevation / 1000
elif unit == "ft" or unit == "'":
return elevation * 3.280839895
else:
raise ValueError(f"Unsupported unit: {unit}")
def get_timezone(self, lat, lon):
return self.tf.timezone_at(lat=lat, lng=lon)
async def timezone(self, lat: float, lon: float):
loop = asyncio.get_running_loop()
timezone = await loop.run_in_executor(self.executor, self.tf.timezone_at, lat, lon)
return timezone if timezone else 'Unknown'
def lookup(self, lat, lon):
city, state, country = self.get_location(lat, lon)
elevation = self.get_elevation(lat, lon)
timezone = self.get_timezone(lat, lon)
async def lookup(self, lat: float, lon: float):
city, state, country = (await self.location(lat, lon))[0]['name'], (await self.location(lat, lon))[0]['admin1'], (await self.location(lat, lon))[0]['cc']
elevation = await self.elevation(lat, lon)
timezone = await self.timezone(lat, lon)
return {
"city": city,
@ -58,6 +92,179 @@ class PyGeolocator:
"timezone": timezone
}
async def code(self, locations: Union[Location, Tuple[float, float], List[Union[Location, Tuple[float, float]]]]) -> Union[Location, List[Location]]:
if isinstance(locations, (Location, tuple)):
locations = [locations]
processed_locations = []
for loc in locations:
if isinstance(loc, tuple):
processed_locations.append(Location(latitude=loc[0], longitude=loc[1]))
elif isinstance(loc, Location):
processed_locations.append(loc)
else:
raise ValueError(f"Unsupported location type: {type(loc)}")
coordinates = [(location.latitude, location.longitude) for location in processed_locations]
geocode_results = await self.location(*zip(*coordinates))
elevations = await asyncio.gather(*[self.elevation(lat, lon) for lat, lon in coordinates])
timezones = await asyncio.gather(*[self.timezone(lat, lon) for lat, lon in coordinates])
geocoded_locations = []
for location, result, elevation, timezone in zip(processed_locations, geocode_results, elevations, timezones):
geocoded_location = Location(
latitude=location.latitude,
longitude=location.longitude,
elevation=elevation,
datetime=location.datetime or datetime.now(timezone.utc),
zip=result.get("admin2"),
city=result.get("name"),
state=result.get("admin1"),
country=result.get("cc"),
context=location.context or {},
name=result.get("name"),
display_name=f"{result.get('name')}, {result.get('admin1')}, {result.get('cc')}",
country_code=result.get("cc"),
timezone=timezone
)
# Merge original location data with geocoded data
for field in location.__fields__:
if getattr(location, field) is None:
setattr(location, field, getattr(geocoded_location, field))
geocoded_locations.append(location)
return geocoded_locations[0] if len(geocoded_locations) == 1 else geocoded_locations
async def geocode_osm(self, latitude: float, longitude: float, email: str):
url = f"https://nominatim.openstreetmap.org/reverse?format=json&lat={latitude}&lon={longitude}"
headers = {
'User-Agent': f'sijapi/1.0 ({email})', # replace with your app name and email
}
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=headers) as response:
response.raise_for_status()
data = await response.json()
address = data.get("address", {})
elevation = await self.elevation(latitude, longitude)
return Location(
latitude=latitude,
longitude=longitude,
elevation=elevation,
datetime=datetime.now(timezone.utc),
zip=address.get("postcode"),
street=address.get("road"),
city=address.get("city"),
state=address.get("state"),
country=address.get("country"),
context={},
class_=data.get("class"),
type=data.get("type"),
name=data.get("name"),
display_name=data.get("display_name"),
amenity=address.get("amenity"),
house_number=address.get("house_number"),
road=address.get("road"),
quarter=address.get("quarter"),
neighbourhood=address.get("neighbourhood"),
suburb=address.get("suburb"),
county=address.get("county"),
country_code=address.get("country_code"),
timezone=await self.timezone(latitude, longitude)
)
def load_override_locations(self):
if self.named_locs and self.named_locs.exists():
with open(self.named_locs, 'r') as file:
return yaml.safe_load(file)
return []
def haversine(self, lat1, lon1, lat2, lon2):
R = 6371 # Earth's radius in kilometers
lat1, lon1, lat2, lon2 = map(radians, [lat1, lon1, lat2, lon2])
dlat = lat2 - lat1
dlon = lon2 - lon1
a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
c = 2 * atan2(sqrt(a), sqrt(1-a))
return R * c
async def find_override_location(self, lat: float, lon: float) -> Optional[str]:
closest_location = None
closest_distance = float('inf')
for location in self.override_locations:
loc_name = location.get("name")
loc_lat = location.get("latitude")
loc_lon = location.get("longitude")
loc_radius = location.get("radius")
distance = self.haversine(lat, lon, loc_lat, loc_lon)
if distance <= loc_radius:
if distance < closest_distance:
closest_distance = distance
closest_location = loc_name
return closest_location
async def refresh_timezone(self, location: Union[Location, Tuple[float, float]], force: bool = False) -> str:
if isinstance(location, Location):
lat, lon = location.latitude, location.longitude
else:
lat, lon = location
current_time = datetime.now()
if (force or
not self.last_update or
current_time - self.last_update > timedelta(hours=1) or
self.last_location != (lat, lon)):
new_timezone = await self.timezone(lat, lon)
self.last_timezone = new_timezone
self.last_update = current_time
self.last_location = (lat, lon)
await self.tz_save()
return self.last_timezone
async def tz_save(self):
cache_data = {
'last_timezone': self.last_timezone,
'last_update': self.last_update.isoformat() if self.last_update else None,
'last_location': self.last_location
}
async with aiofiles.open(self.cache_file, 'w') as f:
await f.write(json.dumps(cache_data))
async def tz_cached(self):
try:
async with aiofiles.open(self.cache_file, 'r') as f:
cache_data = json.loads(await f.read())
self.last_timezone = cache_data.get('last_timezone')
self.last_update = datetime.fromisoformat(cache_data['last_update']) if cache_data.get('last_update') else None
self.last_location = tuple(cache_data['last_location']) if cache_data.get('last_location') else None
except (FileNotFoundError, json.JSONDecodeError):
# If file doesn't exist or is invalid, we'll start fresh
pass
async def tz_current(self, location: Union[Location, Tuple[float, float]]) -> str:
await self.tz_cached()
return await self.refresh_timezone(location)
async def tz_last(self) -> Optional[str]:
await self.tz_cached()
return self.last_timezone
def __del__(self):
self.executor.shutdown()
class Database(BaseModel):
host: str = Field(..., description="Database host")
port: int = Field(5432, description="Database port")
@ -98,15 +305,6 @@ class Database(BaseModel):
return self.dict(exclude_none=True)
class AutoResponder(BaseModel):
name: str
style: str
context: str
ollama_model: str = "llama3"
whitelist: List[str]
blacklist: List[str]
image_prompt: Optional[str] = None
class IMAPConfig(BaseModel):
username: str
password: str
@ -121,6 +319,16 @@ class SMTPConfig(BaseModel):
port: int
encryption: str = None
class AutoResponder(BaseModel):
name: str
style: str
context: str
ollama_model: str = "llama3"
whitelist: List[str]
blacklist: List[str]
image_prompt: Optional[str] = None
smtp: SMTPConfig
class EmailAccount(BaseModel):
name: str
refresh: int
@ -129,7 +337,6 @@ class EmailAccount(BaseModel):
summarize: bool = False
podcast: bool = False
imap: IMAPConfig
smtp: SMTPConfig
autoresponders: Optional[List[AutoResponder]]
class EmailContact(BaseModel):
@ -143,95 +350,3 @@ class IncomingEmail(BaseModel):
subject: str
body: str
attachments: List[dict] = []
class Location(BaseModel):
latitude: float
longitude: float
datetime: datetime
elevation: Optional[float] = None
altitude: Optional[float] = None
zip: Optional[str] = None
street: Optional[str] = None
city: Optional[str] = None
state: Optional[str] = None
country: Optional[str] = None
context: Optional[Dict[str, Any]] = None
class_: Optional[str] = None
type: Optional[str] = None
name: Optional[str] = None
display_name: Optional[str] = None
boundingbox: Optional[List[str]] = None
amenity: Optional[str] = None
house_number: Optional[str] = None
road: Optional[str] = None
quarter: Optional[str] = None
neighbourhood: Optional[str] = None
suburb: Optional[str] = None
county: Optional[str] = None
country_code: Optional[str] = None
class Config:
json_encoders = {
datetime: lambda dt: dt.isoformat(),
}
class TimezoneTracker:
def __init__(self, cache_file: Union[str, Path] = 'timezone_cache.json'):
self.cache_file = Path(cache_file)
self.last_timezone: str = "America/Los_Angeles"
self.last_update: Optional[datetime] = None
self.last_location: Optional[Tuple[float, float]] = None
self.tf = TimezoneFinder()
def find(self, lat: float, lon: float) -> str:
timezone = self.tf.timezone_at(lat=lat, lng=lon)
return timezone if timezone else 'Unknown'
async def refresh(self, location: Union[Location, Tuple[float, float]], force: bool = False) -> str:
if isinstance(location, Location):
lat, lon = location.latitude, location.longitude
else:
lat, lon = location
current_time = datetime.now()
if (force or
not self.last_update or
current_time - self.last_update > timedelta(hours=1) or
self.last_location != (lat, lon)):
new_timezone = self.find(lat, lon)
self.last_timezone = new_timezone
self.last_update = current_time
self.last_location = (lat, lon)
await self.save_to_cache()
return self.last_timezone
async def save_to_cache(self):
cache_data = {
'last_timezone': self.last_timezone,
'last_update': self.last_update.isoformat() if self.last_update else None,
'last_location': self.last_location
}
with self.cache_file.open('w') as f:
json.dump(cache_data, f)
async def load_from_cache(self):
try:
with self.cache_file.open('r') as f:
cache_data = json.load(f)
self.last_timezone = cache_data.get('last_timezone')
self.last_update = datetime.fromisoformat(cache_data['last_update']) if cache_data.get('last_update') else None
self.last_location = tuple(cache_data['last_location']) if cache_data.get('last_location') else None
except (FileNotFoundError, json.JSONDecodeError):
# If file doesn't exist or is invalid, we'll start fresh
pass
async def get_current(self, location: Union[Location, Tuple[float, float]]) -> str:
await self.load_from_cache()
return await self.refresh(location)
async def get_last(self) -> Optional[str]:
await self.load_from_cache()
return self.last_timezone

View file

@ -96,7 +96,7 @@ TRUSTED_SUBNETS=127.0.0.1/32,10.13.37.0/24,100.64.64.0/24
# ──────────
#
#─── router selection: ────────────────────────────────────────────────────────────
ROUTERS=asr,calendar,cf,email,health,llm,locate,note,rag,sd,serve,time,tts,weather
ROUTERS=asr,cal,cf,email,health,llm,loc,note,rag,sd,serve,time,tts,weather
UNLOADED=ig
#─── notes: ──────────────────────────────────────────────────────────────────────
#
@ -115,7 +115,7 @@ UNLOADED=ig
# asr: requires faster_whisper — $ pip install faster_whisper — and
# downloading the model file specified in ASR_DEFAULT_MODEL.
#
# calendar: requires (1) a Microsoft 365 account with a properly configured
# cal: requires (1) a Microsoft 365 account with a properly configured
# Azure Active Directory app, and/or (2) Calendars on macOS.
#
# cf: interfaces with the Cloudflare API and Caddy to register new
@ -138,7 +138,7 @@ UNLOADED=ig
# configured separately in the ig_config.json file; relies heavily
# on the llm and sd routers which have their own dependencies.
#
# locate: some endpoints work as is, but the core location tracking
# loc: some endpoints work as is, but the core location tracking
# functionality requires Postgresql + PostGIS extension and are
# designed specifically to pair with a mobile device where
# Pythonista is installed and configured to run the
@ -148,8 +148,8 @@ UNLOADED=ig
# note: designed for use with Obsidian plus the Daily Notes and Tasks
# core extensions; and the Admonitions, Banners, Icons (with the
# Lucide pack), and Make.md community extensions. Moreover `notes`
# relies heavily on the calendar, llm, locate, sd, summarize, time,
# tts, and weather routers and accordingly on the external
# relies heavily on the cal, llm, loc, sd, summarize, time, loc,
# and weather routers and accordingly on the external
# dependencies of each.
#
# sd: requires ComfyUI plus any modules and StableDiffusion models
@ -165,7 +165,7 @@ UNLOADED=ig
#
# weather: requires a VisualCrossing API key and is designed for (but doesn't
# itself strictly require) Postgresql with the PostGIS extension;
# (... but it presently relies on the locate router, which does).
# (... but it presently relies on the loc router, which does).
#
#
# ... Whew! that was a lot, right? I'm so glad we're in this together...
@ -217,7 +217,7 @@ TAILSCALE_API_KEY=¿SECRET? # <--- enter your own TS API key
# ░▒ ░ ░ ░ ▒ ░ ░ ░ ░ ░ ░ ░ ░░ ░ ░ ░ ░ ░
# ░░ ░ ░T̷ O̷ G̷ E̷ T̷ H̷ ░ R̷. ░ ░ ░ ░ ░
#
#─── frag, or weat,and locate modules:── .
#─── frag, or weat,and loc modules:────── .
DB_NAME=db
#
DB_HOST=127.0.0.1
@ -237,12 +237,12 @@ DB_SSH_PASS=¿SECRET? # <--- enter SSH password for pg server (if not l
# variables allow database access over an SSH tunnel.
#
# In the current implementation, we rely on Postgres to hold:
# i. user-logged location data (locate module), and
# i. user-logged location data (loc module), and
# ii. results from past weather forecast checks (weather module).
#
# A future version will hopefully make use of PostGIS's geocoding capabilities,
# and add a vector database for the LLM module. Until then it's up to you if the
# locate and weather modules are worth the hassle of maintaining Postgres.
# loc and weather modules are worth the hassle of maintaining Postgres.
# ──────────
#
#─────────────────────────────── 𝐼 𝐵 𝐸 𝑇 𝑌 𝑂 𝑈 ─────────────────────────────────

414
sijapi/routers/cal.py Normal file
View file

@ -0,0 +1,414 @@
'''
Calendar module using macOS Calendars and/or Microsoft 365 via its Graph API.
Depends on:
LOGGER, ICAL_TOGGLE, ICALENDARS, MS365_TOGGLE, MS365_CLIENT_ID, MS365_SECRET, MS365_AUTHORITY_URL, MS365_SCOPE, MS365_REDIRECT_PATH, MS365_TOKEN_PATH
'''
from fastapi import APIRouter, Depends, HTTPException, status, Request
from fastapi.responses import RedirectResponse, JSONResponse
from fastapi.security import OAuth2PasswordBearer
import httpx
import json
import os
import time
from dateutil.parser import isoparse as parse_iso
import threading
from typing import Dict, List, Any
from datetime import datetime, timedelta
from Foundation import NSDate, NSRunLoop
import EventKit as EK
from sijapi import L, ICAL_TOGGLE, ICALENDARS, MS365_TOGGLE, MS365_CLIENT_ID, MS365_SECRET, MS365_AUTHORITY_URL, MS365_SCOPE, MS365_REDIRECT_PATH, MS365_TOKEN_PATH
from sijapi.routers import loc
cal = APIRouter()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token")
timeout = httpx.Timeout(12)
if MS365_TOGGLE is True:
L.CRIT(f"Visit https://api.sij.ai/o365/login to obtain your Microsoft 365 authentication token.")
@cal.get("/o365/login")
async def login():
L.DEBUG(f"Received request to /o365/login")
L.DEBUG(f"SCOPE: {MS365_SCOPE}")
if not MS365_SCOPE:
L.ERR("No scopes defined for authorization.")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="No scopes defined for authorization."
)
authorization_url = f"{MS365_AUTHORITY_URL}/oauth2/v2.0/authorize?client_id={MS365_CLIENT_ID}&response_type=code&redirect_uri={MS365_REDIRECT_PATH}&scope={'+'.join(MS365_SCOPE)}"
L.INFO(f"Redirecting to authorization URL: {authorization_url}")
return RedirectResponse(authorization_url)
@cal.get("/o365/oauth_redirect")
async def oauth_redirect(code: str = None, error: str = None):
L.DEBUG(f"Received request to /o365/oauth_redirect")
if error:
L.ERR(f"OAuth2 Error: {error}")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="OAuth2 Error"
)
L.INFO(f"Requesting token with authorization code: {code}")
token_url = f"{MS365_AUTHORITY_URL}/oauth2/v2.0/token"
data = {
"client_id": MS365_CLIENT_ID,
"client_secret": MS365_SECRET,
"code": code,
"redirect_uri": MS365_REDIRECT_PATH,
"grant_type": "authorization_code"
}
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.post(token_url, data=data)
L.DEBUG(f"Token endpoint response status code: {response.status_code}")
L.INFO(f"Token endpoint response text: {response.text}")
result = response.json()
if 'access_token' in result:
await save_token(result)
L.INFO("Access token obtained successfully")
return {"message": "Access token stored successfully"}
else:
L.CRIT(f"Failed to obtain access token. Response: {result}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to obtain access token"
)
@cal.get("/o365/me")
async def read_items():
L.DEBUG(f"Received request to /o365/me")
token = await load_token()
if not token:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Access token not found",
)
graph_url = "https://graph.microsoft.com/v1.0/me"
headers = {"Authorization": f"Bearer {token['access_token']}"}
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.get(graph_url, headers=headers)
if response.status_code == 200:
user = response.json()
L.INFO(f"User retrieved: {user}")
return user
else:
L.ERR("Invalid or expired token")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid or expired token",
headers={"WWW-Authenticate": "Bearer"},
)
async def save_token(token):
L.DEBUG(f"Saving token: {token}")
try:
token["expires_at"] = int(time.time()) + token["expires_in"]
with open(MS365_TOKEN_PATH, "w") as file:
json.dump(token, file)
L.DEBUG(f"Saved token to {MS365_TOKEN_PATH}")
except Exception as e:
L.ERR(f"Failed to save token: {e}")
async def load_token():
if os.path.exists(MS365_TOKEN_PATH):
try:
with open(MS365_TOKEN_PATH, "r") as file:
token = json.load(file)
except FileNotFoundError:
L.ERR("Token file not found.")
return None
except json.JSONDecodeError:
L.ERR("Failed to decode token JSON")
return None
if token:
token["expires_at"] = int(time.time()) + token["expires_in"]
L.DEBUG(f"Loaded token: {token}") # Add this line to log the loaded token
return token
else:
L.DEBUG("No token found.")
return None
else:
L.ERR(f"No file found at {MS365_TOKEN_PATH}")
return None
async def is_token_expired(token):
if "expires_at" not in token:
return True # Treat missing expiration time as expired token
expiry_time = datetime.fromtimestamp(token["expires_at"])
return expiry_time <= datetime.now()
async def is_token_expired2(token):
graph_url = "https://graph.microsoft.com/v1.0/me"
headers = {"Authorization": f"Bearer {token}"}
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.get(graph_url, headers=headers)
return response.status_code == 401
async def get_new_token_with_refresh_token(refresh_token):
token_url = f"{MS365_AUTHORITY_URL}/oauth2/v2.0/token"
data = {
"client_id": MS365_CLIENT_ID,
"client_secret": MS365_SECRET,
"refresh_token": refresh_token,
"grant_type": "refresh_token",
"scope": " ".join(MS365_SCOPE),
}
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.post(token_url, data=data)
result = response.json()
if "access_token" in result:
L.INFO("Access token refreshed successfully")
return result
else:
L.ERR("Failed to refresh access token")
return None
async def refresh_token():
token = await load_token()
if not token:
L.ERR("No token found in storage")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="No token found",
)
if 'refresh_token' not in token:
L.ERR("Refresh token not found in the loaded token")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Refresh token not found",
)
refresh_token = token['refresh_token']
L.DEBUG("Found refresh token, attempting to refresh access token")
new_token = await get_new_token_with_refresh_token(refresh_token)
if new_token:
await save_token(new_token)
L.INFO("Token refreshed and saved successfully")
else:
L.ERR("Failed to refresh token")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to refresh token",
)
def get_calendar_ids() -> Dict[str, str]:
event_store = EK.EKEventStore.alloc().init()
all_calendars = event_store.calendarsForEntityType_(0) # 0 corresponds to EKEntityTypeEvent
calendar_identifiers = {
calendar.title() : calendar.calendarIdentifier() for calendar in all_calendars
}
L.DEBUG(f"{calendar_identifiers}")
return calendar_identifiers
# Helper to convert datetime to NSDate
def datetime_to_nsdate(dt: datetime) -> NSDate:
return NSDate.dateWithTimeIntervalSince1970_(dt.timestamp())
@cal.get("/events")
async def get_events_endpoint(start_date: str, end_date: str):
start_dt = await loc.dt(start_date)
end_dt = await loc.dt(end_date)
datetime.strptime(start_date, "%Y-%m-%d") or datetime.now()
end_dt = datetime.strptime(end_date, "%Y-%m-%d") or datetime.now()
response = await get_events(start_dt, end_dt)
return JSONResponse(content=response, status_code=200)
async def get_events(start_dt: datetime, end_dt: datetime) -> List:
combined_events = []
if MS365_TOGGLE:
ms_events = await get_ms365_events(start_dt, end_dt)
combined_events.extend(ms_events) # Use extend instead of append
if ICAL_TOGGLE:
calendar_ids = ICALENDARS
macos_events = get_macos_calendar_events(start_dt, end_dt, calendar_ids)
combined_events.extend(macos_events) # Use extend instead of append
parsed_events = await parse_calendar_for_day(start_dt, end_dt, combined_events)
return parsed_events
def get_macos_calendar_events(start_date: datetime, end_date: datetime, calendar_ids: List[str] = None) -> List[Dict]:
event_store = EK.EKEventStore.alloc().init()
# Request access to EventKit
def request_access() -> bool:
access_granted = []
def completion_handler(granted, error):
if error is not None:
L.ERR(f"Error: {error}")
access_granted.append(granted)
# Notify the main thread that the completion handler has executed
with access_granted_condition:
access_granted_condition.notify()
access_granted_condition = threading.Condition()
with access_granted_condition:
event_store.requestAccessToEntityType_completion_(0, completion_handler) # 0 corresponds to EKEntityTypeEvent
# Wait for the completion handler to be called
access_granted_condition.wait(timeout=10)
# Verify that the handler was called and access_granted is not empty
if access_granted:
return access_granted[0]
else:
L.ERR("Request access timed out or failed")
return False
if not request_access():
L.ERR("Access to calendar data was not granted")
return []
ns_start_date = datetime_to_nsdate(start_date)
ns_end_date = datetime_to_nsdate(end_date)
# Retrieve all calendars
all_calendars = event_store.calendarsForEntityType_(0) # 0 corresponds to EKEntityTypeEvent
if calendar_ids:
selected_calendars = [cal for cal in all_calendars if cal.calendarIdentifier() in calendar_ids]
else:
selected_calendars = all_calendars
# Filtering events by selected calendars
predicate = event_store.predicateForEventsWithStartDate_endDate_calendars_(ns_start_date, ns_end_date, selected_calendars)
events = event_store.eventsMatchingPredicate_(predicate)
event_list = []
for event in events:
# Check if event.attendees() returns None
if event.attendees():
attendees = [{'name': att.name(), 'email': att.emailAddress()} for att in event.attendees() if att.emailAddress()]
else:
attendees = []
# Format the start and end dates properly
start_date_str = event.startDate().descriptionWithLocale_(None)
end_date_str = event.endDate().descriptionWithLocale_(None)
event_data = {
"subject": event.title(),
"id": event.eventIdentifier(),
"start": start_date_str,
"end": end_date_str,
"bodyPreview": event.notes() if event.notes() else '',
"attendees": attendees,
"location": event.location() if event.location() else '',
"onlineMeetingUrl": '', # Defaulting to empty as macOS EventKit does not provide this
"showAs": 'busy', # Default to 'busy'
"isAllDay": event.isAllDay()
}
event_list.append(event_data)
return event_list
async def get_ms365_events(start_date: datetime, end_date: datetime):
token = await load_token()
if token:
if await is_token_expired(token):
await refresh_token()
else:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Access token not found",
)
# this looks like it might need updating to use tz-aware datetimes converted to UTC...
graph_url = f"https://graph.microsoft.com/v1.0/me/events?$filter=start/dateTime ge '{start_date}T00:00:00' and end/dateTime le '{end_date}T23:59:59'"
headers = {
"Authorization": f"Bearer {token['access_token']}",
"Prefer": 'outlook.timezone="Pacific Standard Time"',
}
async with httpx.AsyncClient() as client:
response = await client.get(graph_url, headers=headers)
if response.status_code != 200:
L.ERR("Failed to retrieve events from Microsoft 365")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to retrieve events",
)
ms_events = response.json().get("value", [])
return ms_events
async def parse_calendar_for_day(range_start: datetime, range_end: datetime, events: List[Dict[str, Any]]):
range_start = await loc.dt(range_start)
range_end = await loc.dt(range_end)
event_list = []
for event in events:
L.INFO(f"Event: {event}")
start_str = event.get('start')
end_str = event.get('end')
if isinstance(start_str, dict):
start_str = start_str.get('dateTime')
else:
L.INFO(f"Start date string not a dict")
if isinstance(end_str, dict):
end_str = end_str.get('dateTime')
else:
L.INFO(f"End date string not a dict")
try:
start_date = await loc.dt(start_str) if start_str else None
except (ValueError, TypeError) as e:
L.ERR(f"Invalid start date format: {start_str}, error: {e}")
continue
try:
end_date = await loc.dt(end_str) if end_str else None
except (ValueError, TypeError) as e:
L.ERR(f"Invalid end date format: {end_str}, error: {e}")
continue
L.DEBUG(f"Comparing {start_date} with range {range_start} to {range_end}")
if start_date:
# Ensure start_date is timezone-aware
start_date = await loc.dt(start_date)
# If end_date is not provided, assume it's the same as start_date
if not end_date:
end_date = start_date
else:
end_date = await loc.dt(end_date)
# Check if the event overlaps with the given range
if (start_date < range_end) and (end_date > range_start):
attendees = [{'name': att['name'], 'email': att['email']} for att in event.get('attendees', []) if 'name' in att and 'email' in att]
location = event.get('location', '')
if isinstance(location, dict):
location = location.get('displayName', '')
event_data = {
"name": event.get('subject', ''),
"uid": event.get('id', ''),
"start": start_date.strftime('%H:%M'),
"end": end_date.strftime('%H:%M') if end_date else '',
"description": event.get('bodyPreview', ''),
"attendees": attendees,
"location": location,
"url": event.get('onlineMeetingUrl', ''),
"busystatus": event.get('showAs', ''),
"busy": event.get('showAs', '') in ['busy', 'tentative'],
"all_day": event.get('isAllDay', False)
}
L.INFO(f"Event_data: {event_data}")
event_list.append(event_data)
else:
L.DEBUG(f"Event outside of specified range: {start_date} to {end_date}")
else:
L.ERR(f"Invalid or missing start date for event: {event.get('id', 'Unknown ID')}")
return event_list

View file

@ -3,6 +3,7 @@ Uses IMAP and SMTP login credentials to monitor an inbox and summarize incoming
'''
from fastapi import APIRouter
import asyncio
import aiofiles
from imbox import Imbox
from bs4 import BeautifulSoup
import os
@ -19,35 +20,20 @@ import yaml
from typing import List, Dict, Optional, Set
from datetime import datetime as dt_datetime
from sijapi import L, PODCAST_DIR, DEFAULT_VOICE, EMAIL_CONFIG, EMAIL_LOGS
from sijapi.routers import tts, llm, sd, locate
from sijapi.routers import loc, tts, llm, sd
from sijapi.utilities import clean_text, assemble_journal_path, extract_text, prefix_lines
from sijapi.classes import EmailAccount, IMAPConfig, SMTPConfig, IncomingEmail, EmailContact, AutoResponder
from sijapi.classes import EmailAccount
email = APIRouter(tags=["private"])
def load_email_accounts(yaml_path: str) -> List[EmailAccount]:
with open(yaml_path, 'r') as file:
config = yaml.safe_load(file)
return [EmailAccount(**account) for account in config['accounts']]
def get_account_by_email(this_email: str) -> Optional[EmailAccount]:
email_accounts = load_email_accounts(EMAIL_CONFIG)
for account in email_accounts:
if account.imap.username.lower() == this_email.lower():
return account
return None
def get_imap_details(this_email: str) -> Optional[IMAPConfig]:
account = get_account_by_email(this_email)
return account.imap if account else None
def get_smtp_details(this_email: str) -> Optional[SMTPConfig]:
account = get_account_by_email(this_email)
return account.smtp if account else None
def get_imap_connection(account: EmailAccount):
return Imbox(account.imap.host,
username=account.imap.username,
@ -56,79 +42,17 @@ def get_imap_connection(account: EmailAccount):
ssl=account.imap.encryption == 'SSL',
starttls=account.imap.encryption == 'STARTTLS')
def get_smtp_connection(account: EmailAccount):
def get_smtp_connection(autoresponder: AutoResponder):
context = ssl._create_unverified_context()
if account.smtp.encryption == 'SSL':
return SMTP_SSL(account.smtp.host, account.smtp.port, context=context)
elif account.smtp.encryption == 'STARTTLS':
smtp = SMTP(account.smtp.host, account.smtp.port)
if autoresponder.smtp.encryption == 'SSL':
return SMTP_SSL(autoresponder.smtp.host, autoresponder.smtp.port, context=context)
elif autoresponder.smtp.encryption == 'STARTTLS':
smtp = SMTP(autoresponder.smtp.host, autoresponder.smtp.port)
smtp.starttls(context=context)
return smtp
else:
return SMTP(account.smtp.host, account.smtp.port)
def get_matching_autoresponders(this_email: IncomingEmail, account: EmailAccount) -> List[AutoResponder]:
L.DEBUG(f"Called get_matching_autoresponders for email \"{this_email.subject},\" account name \"{account.name}\"")
def matches_list(item: str, this_email: IncomingEmail) -> bool:
if '@' in item:
return item in this_email.sender
else:
return item.lower() in this_email.subject.lower() or item.lower() in this_email.body.lower()
matching_profiles = []
for profile in account.autoresponders:
whitelist_match = not profile.whitelist or any(matches_list(item, this_email) for item in profile.whitelist)
blacklist_match = any(matches_list(item, this_email) for item in profile.blacklist)
if whitelist_match and not blacklist_match:
L.DEBUG(f"We have a match for {whitelist_match} and no blacklist matches.")
matching_profiles.append(profile)
elif whitelist_match and blacklist_match:
L.DEBUG(f"Matched whitelist for {whitelist_match}, but also matched blacklist for {blacklist_match}")
else:
L.DEBUG(f"No whitelist or blacklist matches.")
return matching_profiles
async def generate_auto_response_body(this_email: IncomingEmail, profile: AutoResponder, account: EmailAccount) -> str:
now = await locate.localize_datetime(dt_datetime.now())
then = await locate.localize_datetime(this_email.datetime_received)
age = now - then
usr_prompt = f'''
Generate a personalized auto-response to the following email:
From: {this_email.sender}
Sent: {age} ago
Subject: "{this_email.subject}"
Body:
{this_email.body}
Respond on behalf of {account.fullname}, who is unable to respond personally because {profile.context}.
Keep the response {profile.style} and to the point, but responsive to the sender's inquiry.
Do not mention or recite this context information in your response.
'''
sys_prompt = f"You are an AI assistant helping {account.fullname} with email responses. {account.fullname} is described as: {account.bio}"
try:
# async def query_ollama(usr: str, sys: str = LLM_SYS_MSG, model: str = DEFAULT_LLM, max_tokens: int = 200):
response = await llm.query_ollama(usr_prompt, sys_prompt, profile.ollama_model, 400)
L.DEBUG(f"query_ollama response: {response}")
if isinstance(response, str):
response += "\n\n"
return response
elif isinstance(response, dict):
if "message" in response and "content" in response["message"]:
return response["message"]["content"]
else:
L.ERR(f"Unexpected response structure from query_ollama: {response}")
else:
L.ERR(f"Unexpected response type from query_ollama: {type(response)}")
# If we reach here, we couldn't extract a valid response
raise ValueError("Could not extract valid response from query_ollama")
except Exception as e:
L.ERR(f"Error generating auto-response: {str(e)}")
return f"Thank you for your email regarding '{this_email.subject}'. We are currently experiencing technical difficulties with our auto-response system. We will review your email and respond as soon as possible. We apologize for any inconvenience."
return SMTP(autoresponder.smtp.host, autoresponder.smtp.port)
def clean_email_content(html_content):
@ -156,119 +80,7 @@ async def extract_attachments(attachments) -> List[str]:
return attachment_texts
async def save_email(this_email: IncomingEmail, account: EmailAccount):
try:
md_path, md_relative = assemble_journal_path(this_email.datetime_received, "Emails", this_email.subject, ".md")
tts_path, tts_relative = assemble_journal_path(this_email.datetime_received, "Emails", this_email.subject, ".wav")
summary = ""
if account.summarize == True:
email_content = f'At {this_email.datetime_received}, {this_email.sender} sent an email with the subject line "{this_email.subject}". The email in its entirety reads: \n\n{this_email.body}\n"'
if this_email.attachments:
attachment_texts = await extract_attachments(this_email.attachments)
email_content += "\n—--\n" + "\n—--\n".join([f"Attachment: {text}" for text in attachment_texts])
summary = await llm.summarize_text(email_content)
await tts.local_tts(text_content = summary, speed = 1.1, voice = DEFAULT_VOICE, podcast = account.podcast, output_path = tts_path)
summary = prefix_lines(summary, '> ')
# Create the markdown content
markdown_content = f'''---
date: {this_email.datetime_received.strftime('%Y-%m-%d')}
tags:
- email
---
| | | |
| --: | :--: | :--: |
| *received* | **{this_email.datetime_received.strftime('%B %d, %Y at %H:%M:%S %Z')}** | |
| *from* | **[[{this_email.sender}]]** | |
| *to* | {', '.join([f'**[[{recipient.email}]]**' if not recipient.name else f'**[[{recipient.name}|{recipient.email}]]**' for recipient in this_email.recipients])} | |
| *subject* | **{this_email.subject}** | |
'''
if summary:
markdown_content += f'''
> [!summary] Summary
> {summary}
'''
if tts_path.exists():
markdown_content += f'''
![[{tts_path}]]
'''
markdown_content += f'''
---
{this_email.body}
'''
with open(md_path, 'w', encoding='utf-8') as md_file:
md_file.write(markdown_content)
L.DEBUG(f"Saved markdown to {md_path}")
return True
except Exception as e:
L.ERR(f"Exception: {e}")
return False
async def autorespond(this_email: IncomingEmail, account: EmailAccount):
L.DEBUG(f"Evaluating {this_email.subject} for autoresponse-worthiness...")
matching_profiles = get_matching_autoresponders(this_email, account)
L.DEBUG(f"Matching profiles: {matching_profiles}")
for profile in matching_profiles:
L.INFO(f"Generating auto-response to {this_email.subject} with profile: {profile.name}")
auto_response_subject = f"Auto-Response Re: {this_email.subject}"
auto_response_body = await generate_auto_response_body(this_email, profile, account)
L.DEBUG(f"Auto-response: {auto_response_body}")
success = await send_auto_response(this_email.sender, auto_response_subject, auto_response_body, profile, account)
if success == True:
return True
L.WARN(f"We were not able to successfully auto-respond to {this_email.subject}")
return False
async def send_auto_response(to_email, subject, body, profile, account):
try:
message = MIMEMultipart()
message['From'] = account.smtp.username
message['To'] = to_email
message['Subject'] = subject
message.attach(MIMEText(body, 'plain'))
if profile.image_prompt:
jpg_path = await sd.workflow(profile.image_prompt, earlyout=False, downscale_to_fit=True)
if jpg_path and os.path.exists(jpg_path):
with open(jpg_path, 'rb') as img_file:
img = MIMEImage(img_file.read(), name=os.path.basename(jpg_path))
message.attach(img)
L.DEBUG(f"Sending auto-response {to_email} concerning {subject} from account {account.name}...")
with get_smtp_connection(account) as server:
server.login(account.smtp.username, account.smtp.password)
server.send_message(message)
L.INFO(f"Auto-response sent to {to_email} concerning {subject} from account {account.name}!")
return True
except Exception as e:
L.ERR(f"Error in preparing/sending auto-response from account {account.name}: {e}")
return False
async def load_processed_uids(filename: Path) -> Set[str]:
if filename.exists():
with open(filename, 'r') as f:
return set(line.strip().split(':')[-1] for line in f)
return set()
async def save_processed_uid(filename: Path, account_name: str, uid: str):
with open(filename, 'a') as f:
f.write(f"{account_name}:{uid}\n")
async def process_account_summarization(account: EmailAccount):
async def process_account_archival(account: EmailAccount):
summarized_log = EMAIL_LOGS / account.name / "summarized.txt"
os.makedirs(summarized_log.parent, exist_ok = True)
@ -283,7 +95,7 @@ async def process_account_summarization(account: EmailAccount):
uid_str = uid.decode() if isinstance(uid, bytes) else str(uid)
if uid_str not in processed_uids:
recipients = [EmailContact(email=recipient['email'], name=recipient.get('name', '')) for recipient in message.sent_to]
localized_datetime = await locate.localize_datetime(message.date)
localized_datetime = await loc.dt(message.date)
this_email = IncomingEmail(
sender=message.sent_from[0]['email'],
datetime_received=localized_datetime,
@ -292,15 +104,15 @@ async def process_account_summarization(account: EmailAccount):
body=clean_email_content(message.body['html'][0]) if message.body['html'] else clean_email_content(message.body['plain'][0]) or "",
attachments=message.attachments
)
if account.summarize:
save_success = await save_email(this_email, account)
if save_success:
await save_processed_uid(summarized_log, account.name, uid_str)
L.INFO(f"Summarized email: {uid_str}")
else:
L.WARN(f"Failed to summarize {this_email.subject}")
md_path, md_relative = assemble_journal_path(this_email.datetime_received, "Emails", this_email.subject, ".md")
md_summary = await summarize_single_email(this_email, account.podcast) if account.summarize == True else None
md_content = await archive_single_email(this_email, md_summary)
save_success = await save_email(md_path, md_content)
if save_success:
await save_processed_uid(summarized_log, account.name, uid_str)
L.INFO(f"Summarized email: {uid_str}")
else:
L.INFO(f"account.summarize shows as false.")
L.WARN(f"Failed to summarize {this_email.subject}")
else:
L.DEBUG(f"Skipping {uid_str} because it was already processed.")
except Exception as e:
@ -308,51 +120,216 @@ async def process_account_summarization(account: EmailAccount):
await asyncio.sleep(account.refresh)
async def summarize_single_email(this_email: IncomingEmail, podcast: bool = False):
tts_path, tts_relative = assemble_journal_path(this_email.datetime_received, "Emails", this_email.subject, ".wav")
summary = ""
email_content = f'At {this_email.datetime_received}, {this_email.sender} sent an email with the subject line "{this_email.subject}". The email in its entirety reads: \n\n{this_email.body}\n"'
if this_email.attachments:
attachment_texts = await extract_attachments(this_email.attachments)
email_content += "\n—--\n" + "\n—--\n".join([f"Attachment: {text}" for text in attachment_texts])
summary = await llm.summarize_text(email_content)
await tts.local_tts(text_content = summary, speed = 1.1, voice = DEFAULT_VOICE, podcast = podcast, output_path = tts_path)
md_summary = f'```ad.summary\n'
md_summary += f'title: {this_email.subject}\n'
md_summary += f'{summary}\n'
md_summary += f'```\n\n'
md_summary += f'![[{tts_path}]]\n' if tts_path.exists() else ''
return md_summary
async def archive_single_email(this_email: IncomingEmail, summary: str = None):
try:
markdown_content = f'''---
date: {this_email.datetime_received.strftime('%Y-%m-%d')}
tags:
- email
---
| | | |
| --: | :--: | :--: |
| *received* | **{this_email.datetime_received.strftime('%B %d, %Y at %H:%M:%S %Z')}** | |
| *from* | **[[{this_email.sender}]]** | |
| *to* | {', '.join([f'**[[{recipient.email}]]**' if not recipient.name else f'**[[{recipient.name}|{recipient.email}]]**' for recipient in this_email.recipients])} | |
| *subject* | **{this_email.subject}** | |
'''
if summary:
markdown_content += summary
markdown_content += f'''
---
{this_email.body}
'''
return markdown_content
except Exception as e:
L.ERR(f"Exception: {e}")
return False
async def save_email(md_path, md_content):
try:
with open(md_path, 'w', encoding='utf-8') as md_file:
md_file.write(md_content)
L.DEBUG(f"Saved markdown to {md_path}")
return True
except Exception as e:
L.ERR(f"Failed to save email: {e}")
return False
def get_matching_autoresponders(this_email: IncomingEmail, account: EmailAccount) -> List[AutoResponder]:
L.DEBUG(f"Called get_matching_autoresponders for email \"{this_email.subject},\" account name \"{account.name}\"")
def matches_list(item: str, this_email: IncomingEmail) -> bool:
if '@' in item:
return item in this_email.sender
else:
return item.lower() in this_email.subject.lower() or item.lower() in this_email.body.lower()
matching_profiles = []
for profile in account.autoresponders:
whitelist_match = not profile.whitelist or any(matches_list(item, this_email) for item in profile.whitelist)
blacklist_match = any(matches_list(item, this_email) for item in profile.blacklist)
if whitelist_match and not blacklist_match:
L.DEBUG(f"We have a match for {whitelist_match} and no blacklist matches.")
matching_profiles.append(profile)
elif whitelist_match and blacklist_match:
L.DEBUG(f"Matched whitelist for {whitelist_match}, but also matched blacklist for {blacklist_match}")
else:
L.DEBUG(f"No whitelist or blacklist matches.")
return matching_profiles
async def process_account_autoresponding(account: EmailAccount):
autoresponded_log = EMAIL_LOGS / account.name / "autoresponded.txt"
os.makedirs(autoresponded_log.parent, exist_ok = True)
EMAIL_AUTORESPONSE_LOG = EMAIL_LOGS / account.name / "autoresponded.txt"
os.makedirs(EMAIL_AUTORESPONSE_LOG.parent, exist_ok=True)
while True:
try:
processed_uids = await load_processed_uids(autoresponded_log)
processed_uids = await load_processed_uids(EMAIL_AUTORESPONSE_LOG)
L.DEBUG(f"{len(processed_uids)} emails marked as already responded to are being ignored.")
with get_imap_connection(account) as inbox:
unread_messages = inbox.messages(unread=True)
L.DEBUG(f"There are {len(unread_messages)} unread messages.")
for uid, message in unread_messages:
uid_str = uid.decode() if isinstance(uid, bytes) else str(uid)
if uid_str not in processed_uids:
recipients = [EmailContact(email=recipient['email'], name=recipient.get('name', '')) for recipient in message.sent_to]
localized_datetime = await locate.localize_datetime(message.date)
this_email = IncomingEmail(
sender=message.sent_from[0]['email'],
datetime_received=localized_datetime,
recipients=recipients,
subject=message.subject,
body=clean_email_content(message.body['html'][0]) if message.body['html'] else clean_email_content(message.body['plain'][0]) or "",
attachments=message.attachments
)
L.DEBUG(f"Attempting autoresponse on {this_email.subject}")
respond_success = await autorespond(this_email, account)
if respond_success:
await save_processed_uid(autoresponded_log, account.name, uid_str)
L.WARN(f"Auto-responded to email: {this_email.subject}")
else:
L.WARN(f"Failed auto-response to {this_email.subject}")
L.DEBUG(f"Skipping {uid_str} because it was already processed.")
await autorespond_single_email(message, uid_str, account, EMAIL_AUTORESPONSE_LOG)
else:
L.DEBUG(f"Skipping {uid_str} because it was already processed.")
except Exception as e:
L.ERR(f"An error occurred during auto-responding for account {account.name}: {e}")
await asyncio.sleep(account.refresh)
async def process_all_accounts():
async def autorespond_single_email(message, uid_str: str, account: EmailAccount, log_file: Path):
this_email = await create_incoming_email(message)
L.DEBUG(f"Evaluating {this_email.subject} for autoresponse-worthiness...")
matching_profiles = get_matching_autoresponders(this_email, account)
L.DEBUG(f"Matching profiles: {matching_profiles}")
for profile in matching_profiles:
response_body = await generate_response(this_email, profile, account)
if response_body:
subject = f"Re: {this_email.subject}"
jpg_path = await sd.workflow(profile.image_prompt, earlyout=False, downscale_to_fit=True) if profile.image_prompt else None
success = await send_response(this_email.sender, subject, response_body, profile, account, jpg_path)
if success:
L.WARN(f"Auto-responded to email: {this_email.subject}")
await save_processed_uid(log_file, account.name, uid_str)
else:
L.WARN(f"Failed to send auto-response to {this_email.subject}")
else:
L.WARN(f"Unable to generate auto-response for {this_email.subject}")
async def generate_response(this_email: IncomingEmail, profile: AutoResponder, account: EmailAccount) -> Optional[str]:
L.INFO(f"Generating auto-response to {this_email.subject} with profile: {profile.name}")
now = await loc.dt(dt_datetime.now())
then = await loc.dt(this_email.datetime_received)
age = now - then
usr_prompt = f'''
Generate a personalized auto-response to the following email:
From: {this_email.sender}
Sent: {age} ago
Subject: "{this_email.subject}"
Body: {this_email.body}
---
Respond on behalf of {account.fullname}, who is unable to respond personally because {profile.context}. Keep the response {profile.style} and to the point, but responsive to the sender's inquiry. Do not mention or recite this context information in your response.
'''
sys_prompt = f"You are an AI assistant helping {account.fullname} with email responses. {account.fullname} is described as: {account.bio}"
try:
response = await llm.query_ollama(usr_prompt, sys_prompt, profile.ollama_model, 400)
L.DEBUG(f"query_ollama response: {response}")
if isinstance(response, dict) and "message" in response and "content" in response["message"]:
response = response["message"]["content"]
return response + "\n\n"
except Exception as e:
L.ERR(f"Error generating auto-response: {str(e)}")
return None
async def send_response(to_email: str, subject: str, body: str, profile: AutoResponder, image_attachment: Path = None) -> bool:
try:
message = MIMEMultipart()
message['From'] = profile.smtp.username
message['To'] = to_email
message['Subject'] = subject
message.attach(MIMEText(body, 'plain'))
if image_attachment and os.path.exists(image_attachment):
with open(image_attachment, 'rb') as img_file:
img = MIMEImage(img_file.read(), name=os.path.basename(image_attachment))
message.attach(img)
L.DEBUG(f"Sending auto-response to {to_email} concerning {subject} from account {profile.name}...")
with get_smtp_connection(profile) as server:
server.login(profile.smtp.username, profile.smtp.password)
server.send_message(message)
L.INFO(f"Auto-response sent to {to_email} concerning {subject} from account {profile.name}!")
return True
except Exception as e:
L.ERR(f"Error in preparing/sending auto-response from account {profile.name}: {e}")
return False
async def create_incoming_email(message) -> IncomingEmail:
recipients = [EmailContact(email=recipient['email'], name=recipient.get('name', '')) for recipient in message.sent_to]
localized_datetime = await loc.dt(message.date)
return IncomingEmail(
sender=message.sent_from[0]['email'],
datetime_received=localized_datetime,
recipients=recipients,
subject=message.subject,
body=clean_email_content(message.body['html'][0]) if message.body['html'] else clean_email_content(message.body['plain'][0]) or "",
attachments=message.attachments
)
async def load_processed_uids(filename: Path) -> Set[str]:
if filename.exists():
async with aiofiles.open(filename, 'r') as f:
return set(line.strip().split(':')[-1] for line in await f.readlines())
return set()
async def save_processed_uid(filename: Path, account_name: str, uid: str):
async with aiofiles.open(filename, 'a') as f:
await f.write(f"{account_name}:{uid}\n")
async def process_all_accounts():
email_accounts = load_email_accounts(EMAIL_CONFIG)
summarization_tasks = [asyncio.create_task(process_account_summarization(account)) for account in email_accounts]
summarization_tasks = [asyncio.create_task(process_account_archival(account)) for account in email_accounts]
autoresponding_tasks = [asyncio.create_task(process_account_autoresponding(account)) for account in email_accounts]
await asyncio.gather(*summarization_tasks, *autoresponding_tasks)
@email.on_event("startup")
async def startup_event():
await asyncio.sleep(5)
asyncio.create_task(process_all_accounts())
asyncio.create_task(process_all_accounts())

View file

@ -80,14 +80,6 @@ async def generate_response(prompt: str):
)
return {"response": output['response']}
@llm.post("/llm/query")
async def llm_query_endpoint(
message: str = Form(...),
file: Optional(UploadFile) = Form(...)
):
return None
async def query_ollama(usr: str, sys: str = LLM_SYS_MSG, model: str = DEFAULT_LLM, max_tokens: int = 200):
messages = [{"role": "system", "content": sys},

File diff suppressed because it is too large Load diff

View file

@ -33,7 +33,7 @@ from sijapi import (
MAC_UN, MAC_PW, MAC_ID, TS_TAILNET, DATA_DIR, SD_IMAGE_DIR, PUBLIC_KEY, OBSIDIAN_VAULT_DIR
)
from sijapi.utilities import bool_convert, sanitize_filename, assemble_journal_path
from sijapi.routers import note, locate
from sijapi.routers import loc, note
serve = APIRouter(tags=["public"])
@ -69,7 +69,7 @@ def is_valid_date(date_str: str) -> bool:
@serve.get("/notes/{file_path:path}")
async def get_file_endpoint(file_path: str):
try:
date_time = await locate.localize_datetime(file_path);
date_time = await loc.dt(file_path);
absolute_path, local_path = assemble_journal_path(date_time, no_timestamp = True)
except ValueError as e:
L.DEBUG(f"Unable to parse {file_path} as a date, now trying to use it as a local path")

View file

@ -10,9 +10,9 @@ from typing import Dict
from datetime import datetime
from shapely.wkb import loads
from binascii import unhexlify
from sijapi import L, VISUALCROSSING_API_KEY, TZ, DB
from sijapi import L, VISUALCROSSING_API_KEY, TZ, DB, GEO
from sijapi.utilities import haversine
from sijapi.routers import locate
from sijapi.routers import loc
weather = APIRouter()
@ -20,19 +20,19 @@ weather = APIRouter()
async def get_weather(date_time: datetime, latitude: float, longitude: float):
# request_date_str = date_time.strftime("%Y-%m-%d")
L.DEBUG(f"Called get_weather with lat: {latitude}, lon: {longitude}, date_time: {date_time}")
L.WARN(f"Using {date_time.strftime('%Y-%m-%d %H:%M:%S')} as our datetime in get_weather.")
daily_weather_data = await get_weather_from_db(date_time, latitude, longitude)
fetch_new_data = True
if daily_weather_data:
try:
L.DEBUG(f"Daily weather data from db: {daily_weather_data}")
last_updated = str(daily_weather_data['DailyWeather'].get('last_updated'))
last_updated = await locate.localize_datetime(last_updated)
last_updated = await loc.dt(last_updated)
stored_loc_data = unhexlify(daily_weather_data['DailyWeather'].get('location'))
stored_loc = loads(stored_loc_data)
stored_lat = stored_loc.y
stored_lon = stored_loc.x
stored_ele = stored_loc.z
hourly_weather = daily_weather_data.get('HourlyWeather')
@ -53,6 +53,7 @@ async def get_weather(date_time: datetime, latitude: float, longitude: float):
if fetch_new_data:
L.DEBUG(f"We require new data!")
request_date_str = date_time.strftime("%Y-%m-%d")
L.WARN(f"Using {date_time.strftime('%Y-%m-%d')} as our datetime for fetching new data.")
url = f"https://weather.visualcrossing.com/VisualCrossingWebServices/rest/services/timeline/{latitude},{longitude}/{request_date_str}/{request_date_str}?unitGroup=us&key={VISUALCROSSING_API_KEY}"
try:
async with AsyncClient() as client:
@ -85,6 +86,7 @@ async def get_weather(date_time: datetime, latitude: float, longitude: float):
async def store_weather_to_db(date_time: datetime, weather_data: dict):
L.WARN(f"Using {date_time.strftime('%Y-%m-%d %H:%M:%S')} as our datetime in store_weather_to_db")
async with DB.get_connection() as conn:
try:
day_data = weather_data.get('days')[0]
@ -95,16 +97,21 @@ async def store_weather_to_db(date_time: datetime, weather_data: dict):
stations_array = day_data.get('stations', []) or []
date_str = date_time.strftime("%Y-%m-%d")
L.WARN(f"Using {date_str} in our query in store_weather_to_db.")
# Get location details from weather data if available
longitude = weather_data.get('longitude')
latitude = weather_data.get('latitude')
elevation = await locate.get_elevation(latitude, longitude) # 152.4 # default until we add a geocoder that can look up actual elevation; weather_data.get('elevation') # assuming 'elevation' key, replace if different
elevation = await GEO.elevation(latitude, longitude)
location_point = f"POINTZ({longitude} {latitude} {elevation})" if longitude and latitude and elevation else None
# Correct for the datetime objects
day_data['datetime'] = await locate.localize_datetime(day_data.get('datetime')) #day_data.get('datetime'))
L.WARN(f"Uncorrected datetime in store_weather_to_db: {day_data['datetime']}")
day_data['datetime'] = await loc.dt(day_data.get('datetime')) #day_data.get('datetime'))
L.WARN(f"Corrected datetime in store_weather_to_db with localized datetime: {day_data['datetime']}")
L.WARN(f"Uncorrected sunrise time in store_weather_to_db: {day_data['sunrise']}")
day_data['sunrise'] = day_data['datetime'].replace(hour=int(day_data.get('sunrise').split(':')[0]), minute=int(day_data.get('sunrise').split(':')[1]))
L.WARN(f"Corrected sunrise time in store_weather_to_db with localized datetime: {day_data['sunrise']}")
day_data['sunset'] = day_data['datetime'].replace(hour=int(day_data.get('sunset').split(':')[0]), minute=int(day_data.get('sunset').split(':')[1]))
daily_weather_params = (
@ -160,7 +167,7 @@ async def store_weather_to_db(date_time: datetime, weather_data: dict):
await asyncio.sleep(0.1)
# hour_data['datetime'] = parse_date(hour_data.get('datetime'))
hour_timestamp = date_str + ' ' + hour_data['datetime']
hour_data['datetime'] = await locate.localize_datetime(hour_timestamp)
hour_data['datetime'] = await loc.dt(hour_timestamp)
L.DEBUG(f"Processing hours now...")
# L.DEBUG(f"Processing {hour_data['datetime']}")
@ -226,6 +233,7 @@ async def store_weather_to_db(date_time: datetime, weather_data: dict):
async def get_weather_from_db(date_time: datetime, latitude: float, longitude: float):
L.WARN(f"Using {date_time.strftime('%Y-%m-%d %H:%M:%S')} as our datetime in get_weather_from_db.")
async with DB.get_connection() as conn:
query_date = date_time.date()
try:
@ -238,25 +246,38 @@ async def get_weather_from_db(date_time: datetime, latitude: float, longitude: f
LIMIT 1
'''
daily_weather_data = await conn.fetchrow(query, query_date, longitude, latitude, longitude, latitude)
daily_weather_record = await conn.fetchrow(query, query_date, longitude, latitude, longitude, latitude)
if daily_weather_data is None:
if daily_weather_record is None:
L.DEBUG(f"No daily weather data retrieved from database.")
return None
# else:
# L.DEBUG(f"Daily_weather_data: {daily_weather_data}")
# Convert asyncpg.Record to a mutable dictionary
daily_weather_data = dict(daily_weather_record)
# Now we can modify the dictionary
daily_weather_data['datetime'] = await loc.dt(daily_weather_data.get('datetime'))
# Query to get hourly weather data
query = '''
SELECT HW.* FROM HourlyWeather HW
WHERE HW.daily_weather_id = $1
'''
hourly_weather_data = await conn.fetch(query, daily_weather_data['id'])
hourly_weather_records = await conn.fetch(query, daily_weather_data['id'])
hourly_weather_data = []
for record in hourly_weather_records:
hour_data = dict(record)
hour_data['datetime'] = await loc.dt(hour_data.get('datetime'))
hourly_weather_data.append(hour_data)
day: Dict = {
'DailyWeather': dict(daily_weather_data),
'HourlyWeather': [dict(row) for row in hourly_weather_data],
day = {
'DailyWeather': daily_weather_data,
'HourlyWeather': hourly_weather_data,
}
# L.DEBUG(f"day: {day}")
L.DEBUG(f"day: {day}")
return day
except Exception as e:
L.ERR(f"Unexpected error occurred: {e}")