From dc8743841d5e9d6f183136371df8d45f487b7c8b Mon Sep 17 00:00:00 2001
From: sanj <67624670+iodrift@users.noreply.github.com>
Date: Tue, 25 Jun 2024 16:59:10 -0700
Subject: [PATCH] Auto-update: Tue Jun 25 16:59:10 PDT 2024

---
 sijapi/__init__.py               |  33 +--
 sijapi/classes.py                | 182 ++++++++++++++++-
 sijapi/config/.env-example       |   8 +-
 sijapi/config/email.yaml-example |  70 +++++++
 sijapi/routers/asr.py            |   1 -
 sijapi/routers/email.py          | 297 ++++++++++++++-------------
 sijapi/routers/llm.py            | 213 ++++++++++++++++++--
 sijapi/routers/note.py           |  38 ++--
 sijapi/routers/serve.py          |   5 +-
 sijapi/routers/tts.py            |  48 ++++-
 sijapi/routers/weather.py        | 331 +++++++++++++++----------------
 sijapi/utilities.py              | 125 +++---------
 12 files changed, 873 insertions(+), 478 deletions(-)
 create mode 100644 sijapi/config/email.yaml-example

diff --git a/sijapi/__init__.py b/sijapi/__init__.py
index 6013a75..254fba5 100644
--- a/sijapi/__init__.py
+++ b/sijapi/__init__.py
@@ -12,7 +12,7 @@ from typing import List, Optional
 import traceback
 import logging
 from .logs import Logger
-from .classes import AutoResponder, IMAPConfig, SMTPConfig, EmailAccount, EmailContact, IncomingEmail
+from .classes import AutoResponder, IMAPConfig, SMTPConfig, EmailAccount, EmailContact, IncomingEmail, TimezoneTracker, Database
 
 # from sijapi.config.config import load_config
 # cfg = load_config()
@@ -43,6 +43,7 @@ os.makedirs(LOGS_DIR, exist_ok=True)
 load_dotenv(ENV_PATH)
 
 ### API essentials
+DB = Database.from_env()
 ROUTERS = os.getenv('ROUTERS', '').split(',')
 PUBLIC_SERVICES = os.getenv('PUBLIC_SERVICES', '').split(',')
 GLOBAL_API_KEY = os.getenv("GLOBAL_API_KEY")
@@ -68,29 +69,19 @@ os.makedirs(REQUESTS_DIR, exist_ok=True)
 REQUESTS_LOG_PATH = LOGS_DIR / "requests.log"
 
 
-### Databases
-DB = os.getenv("DB", 'sijdb')
-DB_HOST = os.getenv("DB_HOST", "127.0.0.1")
-DB_PORT = os.getenv("DB_PORT", 5432)
-DB_USER = os.getenv("DB_USER", 'sij')
-DB_PASS = os.getenv("DB_PASS")
-DB_SSH = os.getenv("DB_SSH", "100.64.64.15")
-DB_SSH_USER = os.getenv("DB_SSH_USER")
-DB_SSH_PASS = os.getenv("DB_SSH_ENV")
-DB_URL = f'postgresql://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/{DB}'
-
 ### LOCATE AND WEATHER LOCALIZATIONS
 USER_FULLNAME = os.getenv('USER_FULLNAME')
 USER_BIO = os.getenv('USER_BIO')
-TZ = tz.gettz(os.getenv("TZ", "America/Los_Angeles"))
 HOME_ZIP = os.getenv("HOME_ZIP") # unimplemented
-LOCATION_OVERRIDES = DATA_DIR / "loc_overrides.json"
-LOCATIONS_CSV = DATA_DIR / "US.csv"
+NAMED_LOCATIONS = CONFIG_DIR / "named-locations.yaml"
 # DB = DATA_DIR / "weatherlocate.db" # deprecated
 VISUALCROSSING_BASE_URL = os.getenv("VISUALCROSSING_BASE_URL", "https://weather.visualcrossing.com/VisualCrossingWebServices/rest/services/timeline")
 VISUALCROSSING_API_KEY = os.getenv("VISUALCROSSING_API_KEY")
-
+GEONAMES_TXT = DATA_DIR / "geonames.txt"
+LOCATIONS_CSV = DATA_DIR / "US.csv"
+TZ = tz.gettz(os.getenv("TZ", "America/Los_Angeles"))
+DynamicTZ = TimezoneTracker(DB)
 
 ### Obsidian & notes
 ALLOWED_FILENAME_CHARS = r'[^\w \.-]'
@@ -131,7 +122,7 @@ COMFYUI_URL = os.getenv('COMFYUI_URL', "http://localhost:8188")
 COMFYUI_DIR = Path(os.getenv('COMFYUI_DIR'))
 COMFYUI_OUTPUT_DIR = COMFYUI_DIR / 'output'
 COMFYUI_LAUNCH_CMD = os.getenv('COMFYUI_LAUNCH_CMD', 'mamba activate comfyui && python main.py')
-SD_CONFIG_PATH = CONFIG_DIR / 'sd.json'
+SD_CONFIG_PATH = CONFIG_DIR / 'sd.yaml'
 
 ### Summarization
 SUMMARY_CHUNK_SIZE = int(os.getenv("SUMMARY_CHUNK_SIZE", 4000)) # measured in tokens
@@ -155,7 +146,7 @@ TTS_DIR = DATA_DIR / "tts"
 os.makedirs(TTS_DIR, exist_ok=True)
 VOICE_DIR = TTS_DIR / 'voices'
 os.makedirs(VOICE_DIR, exist_ok=True)
-PODCAST_DIR = TTS_DIR / "sideloads"
+PODCAST_DIR = os.getenv("PODCAST_DIR", TTS_DIR / "sideloads")
 os.makedirs(PODCAST_DIR, exist_ok=True)
 TTS_OUTPUT_DIR = TTS_DIR / 'outputs'
 os.makedirs(TTS_OUTPUT_DIR, exist_ok=True)
@@ -169,13 +160,7 @@ ICAL_TOGGLE = True if os.getenv("ICAL_TOGGLE") == "True" else False
 ICS_PATH = DATA_DIR / 'calendar.ics' # deprecated now, but maybe revive?
 ICALENDARS = os.getenv('ICALENDARS', 'NULL,VOID').split(',')
 
-def load_email_accounts(yaml_path: str) -> List[EmailAccount]:
-    with open(yaml_path, 'r') as file:
-        config = yaml.safe_load(file)
-    return [EmailAccount(**account) for account in config['accounts']]
-
 EMAIL_CONFIG = CONFIG_DIR / "email.yaml"
-EMAIL_ACCOUNTS = load_email_accounts(EMAIL_CONFIG)
 AUTORESPOND = True
 
 ### Courtlistener & other webhooks
diff --git a/sijapi/classes.py b/sijapi/classes.py
index 559bdb1..809c8f8 100644
--- a/sijapi/classes.py
+++ b/sijapi/classes.py
@@ -1,6 +1,65 @@
 from pydantic import BaseModel
-from typing import List, Optional, Any
-from datetime import datetime
+from typing import List, Optional, Any, Tuple, Dict, Union, Tuple
+from datetime import datetime, timedelta
+import asyncio
+import asyncpg
+import json
+from pydantic import BaseModel, Field
+from typing import Optional
+import asyncpg
+import os
+
+from pydantic import BaseModel, Field
+from typing import Optional
+
+from pydantic import BaseModel, Field
+from typing import Optional
+import asyncpg
+
+from pydantic import BaseModel, Field
+from typing import Optional
+import asyncpg
+from contextlib import asynccontextmanager
+
+class Database(BaseModel):
+    host: str = Field(..., description="Database host")
+    port: int = Field(5432, description="Database port")
+    user: str = Field(..., description="Database user")
+    password: str = Field(..., description="Database password")
+    database: str = Field(..., description="Database name")
+    db_schema: Optional[str] = Field(None, description="Database schema")
+
+    @asynccontextmanager
+    async def get_connection(self):
+        conn = await asyncpg.connect(
+            host=self.host,
+            port=self.port,
+            user=self.user,
+            password=self.password,
+            database=self.database
+        )
+        try:
+            if self.db_schema:
+                await conn.execute(f"SET search_path TO {self.db_schema}")
+            yield conn
+        finally:
+            await conn.close()
+
+    @classmethod
+    def from_env(cls):
+        import os
+        return cls(
+            host=os.getenv("DB_HOST", "localhost"),
+            port=int(os.getenv("DB_PORT", 5432)),
+            user=os.getenv("DB_USER"),
+            password=os.getenv("DB_PASSWORD"),
+            database=os.getenv("DB_NAME"),
+            db_schema=os.getenv("DB_SCHEMA")
+        )
+
+    def to_dict(self):
+        return self.dict(exclude_none=True)
+
 
 class AutoResponder(BaseModel):
     name: str
@@ -8,7 +67,7 @@ class AutoResponder(BaseModel):
     context: str
     whitelist: List[str]
     blacklist: List[str]
-    img_gen_prompt: Optional[str] = None
+    image_prompt: Optional[str] = None
 
 class IMAPConfig(BaseModel):
     username: str
@@ -26,20 +85,131 @@ class SMTPConfig(BaseModel):
 
 class EmailAccount(BaseModel):
     name: str
+    refresh: int
     fullname: Optional[str]
     bio: Optional[str]
+    summarize: bool = False
+    podcast: bool = False
     imap: IMAPConfig
     smtp: SMTPConfig
     autoresponders: Optional[List[AutoResponder]]
 
 class EmailContact(BaseModel):
     email: str
-    name: str
+    name: Optional[str] = None
 
 class IncomingEmail(BaseModel):
     sender: str
-    recipients: List[EmailContact]
     datetime_received: datetime
+    recipients: List[EmailContact]
     subject: str
     body: str
-    attachments: Optional[List[Any]] = None
\ No newline at end of file
+    attachments: List[dict] = []
+
+
+
+
+
+class Location(BaseModel):
+    latitude: float
+    longitude: float
+    datetime: datetime
+    elevation: Optional[float] = None
+    altitude: Optional[float] = None
+    zip: Optional[str] = None
+    street: Optional[str] = None
+    city: Optional[str] = None
+    state: Optional[str] = None
+    country: Optional[str] = None
+    context: Optional[Dict[str, Any]] = None
+    class_: Optional[str] = None
+    type: Optional[str] = None
+    name: Optional[str] = None
+    display_name: Optional[str] = None
+    boundingbox: Optional[List[str]] = None
+    amenity: Optional[str] = None
+    house_number: Optional[str] = None
+    road: Optional[str] = None
+    quarter: Optional[str] = None
+    neighbourhood: Optional[str] = None
+    suburb: Optional[str] = None
+    county: Optional[str] = None
+    country_code: Optional[str] = None
+
+    class Config:
+        json_encoders = {
+            datetime: lambda dt: dt.isoformat(),
+        }
+
+
+class TimezoneTracker:
+    def __init__(self, db_config: Database, cache_file: str = 'timezone_cache.json'):
+        self.db_config = db_config
+        self.cache_file = cache_file
+        self.last_timezone: str = "America/Los_Angeles"
+        self.last_update: Optional[datetime] = None
+        self.last_location: Optional[Tuple[float, float]] = None
+
+    async def find(self, lat: float, lon: float) -> str:
+        query = """
+        SELECT tzid
+        FROM timezones
+        WHERE ST_Contains(geom, ST_SetSRID(ST_MakePoint($1, $2), 4326))
+        LIMIT 1;
+        """
+        # get_connection() is an async context manager, so it is entered directly, not awaited
+        async with self.db_config.get_connection() as conn:
+            result = await conn.fetchrow(query, lon, lat)
+            return result['tzid'] if result else 'Unknown'
+
+    async def refresh(self, location: Union[Location, Tuple[float, float]], force: bool = False) -> str:
+        if isinstance(location, Location):
+            lat, lon = location.latitude, location.longitude
+        else:
+            lat, lon = location
+
+        current_time = datetime.now()
+
+        if (force or
+            not self.last_update or
+            current_time - self.last_update > timedelta(hours=1) or
+            self.last_location != (lat, lon)):
+
+            new_timezone = await self.find(lat, lon)
+
+            self.last_timezone = new_timezone
+            self.last_update = current_time
+            self.last_location = (lat, lon)
+
+            await self.save_to_cache()
+
+            return new_timezone
+
+        return self.last_timezone
+
+    async def save_to_cache(self):
+        cache_data = {
+            'last_timezone': self.last_timezone,
+            'last_update': self.last_update.isoformat() if self.last_update else None,
+            'last_location': self.last_location
+        }
+        with open(self.cache_file, 'w') as f:
+            json.dump(cache_data, f)
+
+    async def load_from_cache(self):
+        try:
+            with open(self.cache_file, 'r') as f:
+                cache_data = json.load(f)
+            self.last_timezone = cache_data.get('last_timezone')
+            self.last_update = datetime.fromisoformat(cache_data['last_update']) if cache_data.get('last_update') else None
+            self.last_location = tuple(cache_data['last_location']) if cache_data.get('last_location') else None
+        except (FileNotFoundError, json.JSONDecodeError):
+            # If file doesn't exist or is invalid, we'll start fresh
+            pass
+
+    async def get_current(self, location: Union[Location, Tuple[float, float]]) -> str:
+        await self.load_from_cache()
+        return await self.refresh(location)
+
+    async def get_last(self) -> Optional[str]:
+        await self.load_from_cache()
+        return self.last_timezone
diff --git a/sijapi/config/.env-example b/sijapi/config/.env-example
index 0c31cb7..e7f7986 100644
--- a/sijapi/config/.env-example
+++ b/sijapi/config/.env-example
@@ -96,7 +96,7 @@ TRUSTED_SUBNETS=127.0.0.1/32,10.13.37.0/24,100.64.64.0/24
 # ──────────
 #
 #─── router selection:
──────────────────────────────────────────────────────────── -ROUTERS=asr,calendar,cf,email,health,hooks,llm,locate,note,rag,sd,serve,summarize,time,tts,weather +ROUTERS=asr,calendar,cf,email,health,hooks,llm,locate,note,rag,sd,serve,time,tts,weather UNLOADED=ig #─── notes: ────────────────────────────────────────────────────────────────────── # @@ -218,18 +218,18 @@ TAILSCALE_API_KEY=¿SECRET? # <--- enter your own TS API key # ░░ ░ ░T̷ O̷ G̷ E̷ T̷ H̷ ░ R̷. ░ ░ ░ ░ ░ # J U S T ░ #─── frag, or weat,and locate modules:── H O L D M Y H A N D. -DB=db +DB_NAME=db # DB_HOST=127.0.0.1 DB_PORT=5432 # R E A L T I G H T. DB_USER=postgres -DB_PASS=¿SECRET? # <--- enter your own Postgres password' +DB_PASSWORD=¿SECRET? # <--- enter your own Postgres password' # Y E A H . . . DB_SSH=100.64.64.15 # . . . 𝙹 𝚄 𝚂 𝚃 𝙻 𝙸 𝙺 𝙴 𝚃 𝙷 𝙰 𝚃. DB_SSH_USER=sij -DB_SSH_PASS=¿SECRET? # <--- enter SSH password for pg server (if not localhost) +DB_SSH_PASS=¿SECRET? # <--- enter SSH password for pg server (if not localhost) #─── notes: ────────────────────────────────────────────────── S E E ? 𝕰 𝖅 - 𝕻 𝖅 # # DB, DB_HOST, DB_PORT, DB_USER, and DB_PASS should specify those respective diff --git a/sijapi/config/email.yaml-example b/sijapi/config/email.yaml-example new file mode 100644 index 0000000..c4ad15f --- /dev/null +++ b/sijapi/config/email.yaml-example @@ -0,0 +1,70 @@ +accounts: + - name: REDACT@email.com + fullname: Your full name + bio: 'an ai enthusiast' + imap: + username: REDACT@email.com + password: REDACT + host: '127.0.0.1' + port: 1142 + encryption: STARTTLS + smtp: + username: REDACT@email.com + password: REDACT + host: '127.0.0.1' + port: 1024 + encryption: SSL + autoresponders: + - name: work + style: professional + context: he is currently on leave and will return in late July + whitelist: + - '@work.org' + blacklist: + - 'spam@' + - unsubscribe + - 'no-reply@' + - name: ai + style: cryptic + context: respond to any inquiries with cryptic and vaguely menacing riddles, esoteric assertions, or obscure references. + image_prompt: using visually evocative words, phrases, and sentence fragments, describe an image inspired by the following prompt + whitelist: + - 'colleagues@work.org' + - 'jimbo@' + - 'internal work email:' + blacklist: + - personal + - private + - noneofyerdamnbusiness + - unsubscribe + - 'no-reply@' + - name: otherREDACT@email.com + fullname: sij.ai + bio: an AI bot that responds in riddles. + imap: + username: otherREDACT@email.com + password: REDACT + host: '127.0.0.1' + port: 1142 + encryption: STARTTLS + smtp: + username: otherREDACT@email.com + password: REDACT + host: '127.0.0.1' + port: 1024 + encryption: SSL + autoresponders: + - name: ai + style: cryptic + context: respond to any inquiries with cryptic and vaguely menacing riddles, esoteric assertions, or obscure references. 
+ image_prompt: using visually evocative words, phrases, and sentence fragments, describe an image inspired by the following prompt + whitelist: + - 'bestfriend@gmail.com' + - 'eximstalking@' + - uniquephraseinsubjectorbody + - 'internal work email:' + blacklist: + - work + - '@work.org' + - unsubscribe + - 'no-reply@' diff --git a/sijapi/routers/asr.py b/sijapi/routers/asr.py index bb82eec..f7f157d 100644 --- a/sijapi/routers/asr.py +++ b/sijapi/routers/asr.py @@ -23,7 +23,6 @@ import multiprocessing import asyncio import subprocess import tempfile - from sijapi import DEBUG, INFO, WARN, ERR, CRITICAL, ASR_DIR, WHISPER_CPP_MODELS, GARBAGE_COLLECTION_INTERVAL, GARBAGE_TTL, WHISPER_CPP_DIR, MAX_CPU_CORES diff --git a/sijapi/routers/email.py b/sijapi/routers/email.py index d35e0ca..23c8ba1 100644 --- a/sijapi/routers/email.py +++ b/sijapi/routers/email.py @@ -10,29 +10,38 @@ from pathlib import Path from shutil import move import tempfile import re -import ssl -from smtplib import SMTP_SSL +from smtplib import SMTP_SSL, SMTP from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from email.mime.image import MIMEImage +import ssl from datetime import datetime as dt_datetime from pydantic import BaseModel from typing import List, Optional, Any import yaml from typing import List, Dict, Optional from pydantic import BaseModel - +from sijapi import DEBUG, ERR, LLM_SYS_MSG +from datetime import datetime as dt_datetime +from typing import Dict from sijapi import DEBUG, INFO, WARN, ERR, CRITICAL -from sijapi import PODCAST_DIR, DEFAULT_VOICE, TZ, EMAIL_ACCOUNTS, EmailAccount, IMAPConfig, SMTPConfig -from sijapi.routers import summarize, tts, llm, sd -from sijapi.utilities import clean_text, assemble_journal_path, localize_datetime, extract_text, prefix_lines -from sijapi.classes import EmailAccount, IncomingEmail, EmailContact +from sijapi import PODCAST_DIR, DEFAULT_VOICE, EMAIL_CONFIG +from sijapi.routers import tts, llm, sd, locate +from sijapi.utilities import clean_text, assemble_journal_path, extract_text, prefix_lines +from sijapi.classes import EmailAccount, IMAPConfig, SMTPConfig, IncomingEmail, EmailContact email = APIRouter(tags=["private"]) +def load_email_accounts(yaml_path: str) -> List[EmailAccount]: + with open(yaml_path, 'r') as file: + config = yaml.safe_load(file) + return [EmailAccount(**account) for account in config['accounts']] + + def get_account_by_email(email: str) -> Optional[EmailAccount]: - for account in EMAIL_ACCOUNTS: + email_accounts = load_email_accounts(EMAIL_CONFIG) + for account in email_accounts: if account.imap.username.lower() == email.lower(): return account return None @@ -54,6 +63,18 @@ def get_imap_connection(account: EmailAccount): ssl=account.imap.encryption == 'SSL', starttls=account.imap.encryption == 'STARTTLS') +def get_smtp_connection(account: EmailAccount): + context = ssl._create_unverified_context() + + if account.smtp.encryption == 'SSL': + return SMTP_SSL(account.smtp.host, account.smtp.port, context=context) + elif account.smtp.encryption == 'STARTTLS': + smtp = SMTP(account.smtp.host, account.smtp.port) + smtp.starttls(context=context) + return smtp + else: + return SMTP(account.smtp.host, account.smtp.port) + def get_matching_autoresponders(email: IncomingEmail, account: EmailAccount) -> List[Dict]: matching_profiles = [] @@ -72,7 +93,7 @@ def get_matching_autoresponders(email: IncomingEmail, account: EmailAccount) -> 'USER_FULLNAME': account.fullname, 'RESPONSE_STYLE': profile.style, 'AUTORESPONSE_CONTEXT': 
profile.context, - 'IMG_GEN_PROMPT': profile.img_gen_prompt, + 'IMG_GEN_PROMPT': profile.image_prompt, 'USER_BIO': account.bio }) @@ -80,21 +101,44 @@ def get_matching_autoresponders(email: IncomingEmail, account: EmailAccount) -> async def generate_auto_response_body(e: IncomingEmail, profile: Dict) -> str: - age = dt_datetime.now(TZ) - e.datetime_received - prompt = f''' -Please generate a personalized auto-response to the following email. The email is from {e.sender} and was sent {age} ago with the subject line "{e.subject}." You are auto-responding on behalf of {profile['USER_FULLNAME']}, who is described by the following short bio (strictly for your context -- do not recite this in the response): "{profile['USER_BIO']}." {profile['USER_FULLNAME']} is unable to respond personally, because {profile['AUTORESPONSE_CONTEXT']}. Everything from here to ~~//END//~~ is the email body. + now = await locate.localize_datetime(dt_datetime.now()) + then = await locate.localize_datetime(e.datetime_received) + age = now - then + usr_prompt = f''' +Generate a personalized auto-response to the following email: +From: {e.sender} +Sent: {age} ago +Subject: "{e.subject}" +Body: {e.body} -~~//END//~~ -Keep your auto-response {profile['RESPONSE_STYLE']} and to the point, but do aim to make it responsive specifically to the sender's inquiry. - ''' + +Respond on behalf of {profile['USER_FULLNAME']}, who is unable to respond personally because {profile['AUTORESPONSE_CONTEXT']}. +Keep the response {profile['RESPONSE_STYLE']} and to the point, but responsive to the sender's inquiry. +Do not mention or recite this context information in your response. +''' + + sys_prompt = f"You are an AI assistant helping {profile['USER_FULLNAME']} with email responses. {profile['USER_FULLNAME']} is described as: {profile['USER_BIO']}" try: - response = await llm.query_ollama(prompt, 400) - return response + response = await llm.query_ollama(usr_prompt, sys_prompt, 400) + DEBUG(f"query_ollama response: {response}") + + if isinstance(response, str): + return response + elif isinstance(response, dict): + if "message" in response and "content" in response["message"]: + return response["message"]["content"] + else: + ERR(f"Unexpected response structure from query_ollama: {response}") + else: + ERR(f"Unexpected response type from query_ollama: {type(response)}") + + # If we reach here, we couldn't extract a valid response + raise ValueError("Could not extract valid response from query_ollama") + except Exception as e: ERR(f"Error generating auto-response: {str(e)}") - return "Thank you for your email. Unfortunately, an error occurred while generating the auto-response. We apologize for any inconvenience." - + return f"Thank you for your email regarding '{e.subject}'. We are currently experiencing technical difficulties with our auto-response system. We will review your email and respond as soon as possible. We apologize for any inconvenience." 
def clean_email_content(html_content): @@ -123,115 +167,113 @@ async def extract_attachments(attachments) -> List[str]: return attachment_texts -async def process_unread_emails(summarize_emails: bool = True, podcast: bool = True): + +async def process_account(account: EmailAccount): while True: - for account in EMAIL_ACCOUNTS: + start_time = dt_datetime.now() + try: DEBUG(f"Connecting to {account.name} to check for unread emails...") - try: - with get_imap_connection(account) as inbox: - DEBUG(f"Connected to {account.name}, checking for unread emails now...") - unread_messages = inbox.messages(unread=True) - for uid, message in unread_messages: - recipients = [EmailContact(email=recipient['email'], name=recipient.get('name', '')) for recipient in message.sent_to] - this_email = IncomingEmail( - sender=message.sent_from[0]['email'], - datetime_received=localize_datetime(message.date), - recipients=recipients, - subject=message.subject, - body=clean_email_content(message.body['html'][0]) if message.body['html'] else clean_email_content(message.body['plain'][0]) or "", - attachments=message.attachments - ) - - DEBUG(f"\n\nProcessing email for account {account.name}: {this_email.subject}\n\n") - - md_path, md_relative = assemble_journal_path(this_email.datetime_received, "Emails", this_email.subject, ".md") - tts_path, tts_relative = assemble_journal_path(this_email.datetime_received, "Emails", this_email.subject, ".wav") - if summarize_emails: - email_content = f'At {this_email.datetime_received}, {this_email.sender} sent an email with the subject line "{this_email.subject}". The email in its entirety reads: \n\n{this_email.body}\n"' - if this_email.attachments: - attachment_texts = await extract_attachments(this_email.attachments) - email_content += "\n—--\n" + "\n—--\n".join([f"Attachment: {text}" for text in attachment_texts]) - - summary = await summarize.summarize_text(email_content) - await tts.local_tts(text_content = summary, speed = 1.1, voice = DEFAULT_VOICE, podcast = podcast, output_path = tts_path) - - if podcast: - if PODCAST_DIR.exists(): - tts.copy_to_podcast_dir(tts_path) - else: - ERR(f"PODCAST_DIR does not exist: {PODCAST_DIR}") - - save_email_as_markdown(this_email, summary, md_path, tts_relative) - DEBUG(f"Email '{this_email.subject}' saved to {md_relative}.") - else: - save_email_as_markdown(this_email, None, md_path, None) - - matching_profiles = get_matching_autoresponders(this_email, account) - - for profile in matching_profiles: - DEBUG(f"Auto-responding to {this_email.subject} with profile: {profile['USER_FULLNAME']}") - auto_response_subject = f"Auto-Response Re: {this_email.subject}" - auto_response_body = await generate_auto_response_body(this_email, profile) - DEBUG(f"Auto-response: {auto_response_body}") - await send_auto_response(this_email.sender, auto_response_subject, auto_response_body, profile, account) - + with get_imap_connection(account) as inbox: + DEBUG(f"Connected to {account.name}, checking for unread emails now...") + unread_messages = inbox.messages(unread=True) + for uid, message in unread_messages: + recipients = [EmailContact(email=recipient['email'], name=recipient.get('name', '')) for recipient in message.sent_to] + localized_datetime = await locate.localize_datetime(message.date) + this_email = IncomingEmail( + sender=message.sent_from[0]['email'], + datetime_received=localized_datetime, + recipients=recipients, + subject=message.subject, + body=clean_email_content(message.body['html'][0]) if message.body['html'] else 
clean_email_content(message.body['plain'][0]) or "", + attachments=message.attachments + ) + DEBUG(f"\n\nProcessing email for account {account.name}: {this_email.subject}\n\n") + save_success = await save_email(this_email, account) + respond_success = await autorespond(this_email, account) + if save_success and respond_success: inbox.mark_seen(uid) - - await asyncio.sleep(30) - except Exception as e: - ERR(f"An error occurred for account {account.name}: {e}") - await asyncio.sleep(30) + except Exception as e: + ERR(f"An error occurred for account {account.name}: {e}") + + # Calculate the time taken for processing + processing_time = (dt_datetime.now() - start_time).total_seconds() + + # Calculate the remaining time to wait + wait_time = max(0, account.refresh - processing_time) + + # Wait for the remaining time + await asyncio.sleep(wait_time) +async def process_all_accounts(): + email_accounts = load_email_accounts(EMAIL_CONFIG) + tasks = [asyncio.create_task(process_account(account)) for account in email_accounts] + await asyncio.gather(*tasks) -def save_email_as_markdown(email: IncomingEmail, summary: str, md_path: Path, tts_path: Path): + +async def save_email(this_email: IncomingEmail, account: EmailAccount): + try: + md_path, md_relative = assemble_journal_path(this_email.datetime_received, "Emails", this_email.subject, ".md") + tts_path, tts_relative = assemble_journal_path(this_email.datetime_received, "Emails", this_email.subject, ".wav") + summary = "" + if account.summarize == True: + email_content = f'At {this_email.datetime_received}, {this_email.sender} sent an email with the subject line "{this_email.subject}". The email in its entirety reads: \n\n{this_email.body}\n"' + if this_email.attachments: + attachment_texts = await extract_attachments(this_email.attachments) + email_content += "\n—--\n" + "\n—--\n".join([f"Attachment: {text}" for text in attachment_texts]) + summary = await llm.summarize_text(email_content) + await tts.local_tts(text_content = summary, speed = 1.1, voice = DEFAULT_VOICE, podcast = account.podcast, output_path = tts_path) + summary = prefix_lines(summary, '> ') + + # Create the markdown content + markdown_content = f'''--- + date: {email.datetime_received.strftime('%Y-%m-%d')} + tags: + - email + --- + | | | | + | --: | :--: | :--: | + | *received* | **{email.datetime_received.strftime('%B %d, %Y at %H:%M:%S %Z')}** | | + | *from* | **[[{email.sender}]]** | | + | *to* | {', '.join([f'**[[{recipient}]]**' for recipient in email.recipients])} | | + | *subject* | **{email.subject}** | | ''' -Saves an email as a markdown file in the specified directory. -Args: - email (IncomingEmail): The email object containing email details. - summary (str): The summary of the email. - tts_path (str): The path to the text-to-speech audio file. 
+ + if summary: + markdown_content += f''' + > [!summary] Summary + > {summary} ''' - DEBUG(f"Saving email to {md_path}...") - # Sanitize filename to avoid issues with filesystems - filename = f"{email.datetime_received.strftime('%Y%m%d%H%M%S')}_{email.subject.replace('/', '-')}.md".replace(':', '-').replace(' ', '_') + + if tts_path.exists(): + markdown_content += f''' + ![[{tts_path}]] + ''' + + markdown_content += f''' + --- + {email.body} + ''' + + with open(md_path, 'w', encoding='utf-8') as md_file: + md_file.write(markdown_content) - summary = prefix_lines(summary, '> ') - # Create the markdown content - markdown_content = f'''--- -date: {email.datetime_received.strftime('%Y-%m-%d')} -tags: - - email ---- -| | | | -| --: | :--: | :--: | -| *received* | **{email.datetime_received.strftime('%B %d, %Y at %H:%M:%S %Z')}** | | -| *from* | **[[{email.sender}]]** | | -| *to* | {', '.join([f'**[[{recipient}]]**' for recipient in email.recipients])} | | -| *subject* | **{email.subject}** | | -''' + DEBUG(f"Saved markdown to {md_path}") + + return True - if summary: - markdown_content += f''' -> [!summary] Summary -> {summary} -''' - - if tts_path: - markdown_content += f''' -![[{tts_path}]] -''' - - markdown_content += f''' ---- -{email.body} -''' - - with open(md_path, 'w', encoding='utf-8') as md_file: - md_file.write(markdown_content) - - DEBUG(f"Saved markdown to {md_path}") + except Exception as e: + ERR(f"Exception: {e}") + return False +async def autorespond(this_email: IncomingEmail, account: EmailAccount): + matching_profiles = get_matching_autoresponders(this_email, account) + for profile in matching_profiles: + DEBUG(f"Auto-responding to {this_email.subject} with profile: {profile['USER_FULLNAME']}") + auto_response_subject = f"Auto-Response Re: {this_email.subject}" + auto_response_body = await generate_auto_response_body(this_email, profile) + DEBUG(f"Auto-response: {auto_response_body}") + await send_auto_response(this_email.sender, auto_response_subject, auto_response_body, profile, account) async def send_auto_response(to_email, subject, body, profile, account): DEBUG(f"Sending auto response to {to_email}...") @@ -243,35 +285,24 @@ async def send_auto_response(to_email, subject, body, profile, account): message.attach(MIMEText(body, 'plain')) if profile['IMG_GEN_PROMPT']: - jpg_path = sd.workflow(profile['IMG_GEN_PROMPT'], earlyout=False, downscale_to_fit=True) + jpg_path = await sd.workflow(profile['IMG_GEN_PROMPT'], earlyout=False, downscale_to_fit=True) if jpg_path and os.path.exists(jpg_path): with open(jpg_path, 'rb') as img_file: img = MIMEImage(img_file.read(), name=os.path.basename(jpg_path)) message.attach(img) - context = ssl._create_unverified_context() - with SMTP_SSL(account.smtp.host, account.smtp.port, context=context) as server: + with get_smtp_connection(account) as server: server.login(account.smtp.username, account.smtp.password) server.send_message(message) INFO(f"Auto-response sent to {to_email} concerning {subject} from account {account.name}") + return True except Exception as e: ERR(f"Error in preparing/sending auto-response from account {account.name}: {e}") - raise e - - + return False @email.on_event("startup") async def startup_event(): - asyncio.create_task(process_unread_emails()) - - - - - - #### - - - + asyncio.create_task(process_all_accounts()) \ No newline at end of file diff --git a/sijapi/routers/llm.py b/sijapi/routers/llm.py index b9e23cd..7338c49 100644 --- a/sijapi/routers/llm.py +++ b/sijapi/routers/llm.py @@ -1,10 +1,9 @@ 
#routers/llm.py -from fastapi import APIRouter, HTTPException, Request, Response -from fastapi.responses import StreamingResponse, JSONResponse -from starlette.responses import StreamingResponse +from fastapi import APIRouter, HTTPException, Request, Response, BackgroundTasks, File, Form, UploadFile +from fastapi.responses import StreamingResponse, JSONResponse, FileResponse from datetime import datetime as dt_datetime from dateutil import parser -from typing import List, Dict, Any, Union +from typing import List, Dict, Any, Union, Optional from pydantic import BaseModel, root_validator, ValidationError import aiofiles import os @@ -17,21 +16,20 @@ import base64 from pathlib import Path import ollama from ollama import AsyncClient as Ollama, list as OllamaList -import aiofiles import time import asyncio -from pathlib import Path -from fastapi import FastAPI, Request, HTTPException, APIRouter -from fastapi.responses import JSONResponse, StreamingResponse -from dotenv import load_dotenv -from sijapi import BASE_DIR, DATA_DIR, LOGS_DIR, CONFIG_DIR, LLM_SYS_MSG, DEFAULT_LLM, DEFAULT_VISION, REQUESTS_DIR, OBSIDIAN_CHROMADB_COLLECTION, OBSIDIAN_VAULT_DIR, DOC_DIR, OPENAI_API_KEY -from sijapi import DEBUG, INFO, WARN, ERR, CRITICAL -from sijapi.utilities import convert_to_unix_time, sanitize_filename +import tempfile +import shutil +import html2text +import markdown +from sijapi import LLM_SYS_MSG, DEFAULT_LLM, DEFAULT_VISION, REQUESTS_DIR, OBSIDIAN_CHROMADB_COLLECTION, OBSIDIAN_VAULT_DIR, DOC_DIR, OPENAI_API_KEY, DEBUG, INFO, WARN, ERR, CRITICAL, DEFAULT_VOICE, SUMMARY_INSTRUCT, SUMMARY_CHUNK_SIZE, SUMMARY_TPW, SUMMARY_CHUNK_OVERLAP, SUMMARY_LENGTH_RATIO, SUMMARY_TOKEN_LIMIT, SUMMARY_MIN_LENGTH, SUMMARY_MODEL +from sijapi.utilities import convert_to_unix_time, sanitize_filename, ocr_pdf, clean_text, should_use_ocr, extract_text_from_pdf, extract_text_from_docx, read_text_file, str_to_bool, get_extension +from sijapi.routers.tts import generate_speech +from sijapi.routers.asr import transcribe_audio + llm = APIRouter() - - # Initialize chromadb client client = chromadb.Client() OBSIDIAN_CHROMADB_COLLECTION = client.create_collection("obsidian") @@ -80,11 +78,11 @@ async def generate_response(prompt: str): return {"response": output['response']} -async def query_ollama(usr: str, sys: str = LLM_SYS_MSG, max_tokens: int = 200): +async def query_ollama(usr: str, sys: str = LLM_SYS_MSG, model: str = DEFAULT_LLM, max_tokens: int = 200): messages = [{"role": "system", "content": sys}, {"role": "user", "content": usr}] LLM = Ollama() - response = await LLM.chat(model=DEFAULT_LLM, messages=messages, options={"num_predict": max_tokens}) + response = await LLM.chat(model=model, messages=messages, options={"num_predict": max_tokens}) DEBUG(response) if "message" in response: @@ -482,3 +480,186 @@ def gpt4v(image_base64, prompt_sys: str, prompt_usr: str, max_tokens: int = 150) try_again = gpt4v(image_base64, prompt_sys, prompt_usr, max_tokens) return try_again + + +@llm.get("/summarize") +async def summarize_get(text: str = Form(None), instruction: str = Form(SUMMARY_INSTRUCT)): + summarized_text = await summarize_text(text, instruction) + return summarized_text + +@llm.post("/summarize") +async def summarize_post(file: Optional[UploadFile] = File(None), text: Optional[str] = Form(None), instruction: str = Form(SUMMARY_INSTRUCT)): + text_content = text if text else await extract_text(file) + summarized_text = await summarize_text(text_content, instruction) + return summarized_text + 
+@llm.post("/speaksummary") +async def summarize_tts_endpoint(background_tasks: BackgroundTasks, instruction: str = Form(SUMMARY_INSTRUCT), file: Optional[UploadFile] = File(None), text: Optional[str] = Form(None), voice: Optional[str] = Form(DEFAULT_VOICE), speed: Optional[float] = Form(1.2), podcast: Union[bool, str] = Form(False)): + + podcast = str_to_bool(str(podcast)) # Proper boolean conversion + text_content = text if text else extract_text(file) + final_output_path = await summarize_tts(text_content, instruction, voice, speed, podcast) + return FileResponse(path=final_output_path, filename=os.path.basename(final_output_path), media_type='audio/wav') + + +async def summarize_tts( + text: str, + instruction: str = SUMMARY_INSTRUCT, + voice: Optional[str] = DEFAULT_VOICE, + speed: float = 1.1, + podcast: bool = False, + LLM: Ollama = None +): + LLM = LLM if LLM else Ollama() + summarized_text = await summarize_text(text, instruction, LLM=LLM) + filename = await summarize_text(summarized_text, "Provide a title for this summary no longer than 4 words") + filename = sanitize_filename(filename) + filename = ' '.join(filename.split()[:5]) + timestamp = dt_datetime.now().strftime("%Y%m%d_%H%M%S") + filename = f"{timestamp}{filename}.wav" + + background_tasks = BackgroundTasks() + final_output_path = await generate_speech(background_tasks, summarized_text, voice, "xtts", speed=speed, podcast=podcast, title=filename) + DEBUG(f"summary_tts completed with final_output_path: {final_output_path}") + return final_output_path + + +async def get_title(text: str, LLM: Ollama() = None): + LLM = LLM if LLM else Ollama() + title = await process_chunk("Generate a title for this text", text, 1, 1, 12, LLM) + title = sanitize_filename(title) + return title + +def split_text_into_chunks(text: str) -> List[str]: + """ + Splits the given text into manageable chunks based on predefined size and overlap. 
+ """ + words = text.split() + adjusted_chunk_size = max(1, int(SUMMARY_CHUNK_SIZE / SUMMARY_TPW)) # Ensure at least 1 + adjusted_overlap = max(0, int(SUMMARY_CHUNK_OVERLAP / SUMMARY_TPW)) # Ensure non-negative + chunks = [] + for i in range(0, len(words), adjusted_chunk_size - adjusted_overlap): + DEBUG(f"We are on iteration # {i} if split_text_into_chunks.") + chunk = ' '.join(words[i:i + adjusted_chunk_size]) + chunks.append(chunk) + return chunks + + +def calculate_max_tokens(text: str) -> int: + tokens_count = max(1, int(len(text.split()) * SUMMARY_TPW)) # Ensure at least 1 + return min(tokens_count // 4, SUMMARY_CHUNK_SIZE) + + +async def extract_text(file: Union[UploadFile, bytes, bytearray, str, Path], background_tasks: BackgroundTasks = None) -> str: + if isinstance(file, UploadFile): + file_extension = get_extension(file) + temp_file_path = tempfile.mktemp(suffix=file_extension) + with open(temp_file_path, 'wb') as buffer: + shutil.copyfileobj(file.file, buffer) + file_path = temp_file_path + elif isinstance(file, (bytes, bytearray)): + temp_file_path = tempfile.mktemp() + with open(temp_file_path, 'wb') as buffer: + buffer.write(file) + file_path = temp_file_path + elif isinstance(file, (str, Path)): + file_path = str(file) + else: + raise ValueError("Unsupported file type") + + _, file_ext = os.path.splitext(file_path) + file_ext = file_ext.lower() + text_content = "" + + if file_ext == '.pdf': + text_content = await extract_text_from_pdf(file_path) + elif file_ext in ['.wav', '.m4a', '.m4v', '.mp3', '.mp4']: + text_content = await transcribe_audio(file_path=file_path) + elif file_ext == '.md': + text_content = await read_text_file(file_path) + text_content = markdown.markdown(text_content) + elif file_ext == '.html': + text_content = await read_text_file(file_path) + text_content = html2text.html2text(text_content) + elif file_ext in ['.txt', '.csv', '.json']: + text_content = await read_text_file(file_path) + elif file_ext == '.docx': + text_content = await extract_text_from_docx(file_path) + + if background_tasks and 'temp_file_path' in locals(): + background_tasks.add_task(os.remove, temp_file_path) + elif 'temp_file_path' in locals(): + os.remove(temp_file_path) + + return text_content + +async def summarize_text(text: str, instruction: str = SUMMARY_INSTRUCT, length_override: int = None, length_quotient: float = SUMMARY_LENGTH_RATIO, LLM: Ollama = None): + """ + Process the given text: split into chunks, summarize each chunk, and + potentially summarize the concatenated summary for long texts. 
+ """ + LLM = LLM if LLM else Ollama() + + chunked_text = split_text_into_chunks(text) + total_parts = max(1, len(chunked_text)) # Ensure at least 1 + + total_words_count = len(text.split()) + total_tokens_count = max(1, int(total_words_count * SUMMARY_TPW)) # Ensure at least 1 + total_summary_length = length_override if length_override else total_tokens_count // length_quotient + corrected_total_summary_length = min(total_summary_length, SUMMARY_TOKEN_LIMIT) + individual_summary_length = max(1, corrected_total_summary_length // total_parts) # Ensure at least 1 + + DEBUG(f"Text split into {total_parts} chunks.") + summaries = await asyncio.gather(*[ + process_chunk(instruction, chunk, i+1, total_parts, individual_summary_length, LLM) for i, chunk in enumerate(chunked_text) + ]) + + concatenated_summary = ' '.join(summaries) + + if total_parts > 1: + concatenated_summary = await process_chunk(instruction, concatenated_summary, 1, 1) + + return concatenated_summary + +async def process_chunk(instruction: str, text: str, part: int, total_parts: int, max_tokens: Optional[int] = None, LLM: Ollama = None) -> str: + """ + Process a portion of text using the ollama library asynchronously. + """ + + LLM = LLM if LLM else Ollama() + + words_count = max(1, len(text.split())) # Ensure at least 1 + tokens_count = max(1, int(words_count * SUMMARY_TPW)) # Ensure at least 1 + fraction_tokens = max(1, tokens_count // SUMMARY_LENGTH_RATIO) # Ensure at least 1 + if max_tokens is None: + max_tokens = min(fraction_tokens, SUMMARY_CHUNK_SIZE // max(1, total_parts)) # Ensure at least 1 + max_tokens = max(max_tokens, SUMMARY_MIN_LENGTH) # Ensure a minimum token count to avoid tiny processing chunks + + DEBUG(f"Summarizing part {part} of {total_parts}: Max_tokens: {max_tokens}") + + if part and total_parts > 1: + prompt = f"{instruction}. 
Part {part} of {total_parts}:\n{text}" + else: + prompt = f"{instruction}:\n\n{text}" + + DEBUG(f"Starting LLM.generate for part {part} of {total_parts}") + response = await LLM.generate( + model=SUMMARY_MODEL, + prompt=prompt, + stream=False, + options={'num_predict': max_tokens, 'temperature': 0.6} + ) + + text_response = response['response'] + DEBUG(f"Completed LLM.generate for part {part} of {total_parts}") + + return text_response + +async def title_and_summary(extracted_text: str): + title = await get_title(extracted_text) + processed_title = title.split("\n")[-1] + processed_title = processed_title.split("\r")[-1] + processed_title = sanitize_filename(processed_title) + summary = await summarize_text(extracted_text) + + return processed_title, summary \ No newline at end of file diff --git a/sijapi/routers/note.py b/sijapi/routers/note.py index 996c603..f83157a 100644 --- a/sijapi/routers/note.py +++ b/sijapi/routers/note.py @@ -17,13 +17,12 @@ from requests.adapters import HTTPAdapter import re import os from datetime import timedelta, datetime, time as dt_time, date as dt_date -from sijapi.utilities import localize_datetime from fastapi import HTTPException, status from pathlib import Path from fastapi import APIRouter, Query, HTTPException from sijapi import DEBUG, INFO, WARN, ERR, CRITICAL, INFO from sijapi import YEAR_FMT, MONTH_FMT, DAY_FMT, DAY_SHORT_FMT, OBSIDIAN_VAULT_DIR, OBSIDIAN_RESOURCES_DIR, BASE_URL, OBSIDIAN_BANNER_SCENE, DEFAULT_11L_VOICE, DEFAULT_VOICE, TZ -from sijapi.routers import tts, time, sd, locate, weather, asr, calendar, summarize +from sijapi.routers import tts, llm, time, sd, locate, weather, asr, calendar from sijapi.routers.locate import Location from sijapi.utilities import assemble_journal_path, convert_to_12_hour_format, sanitize_filename, convert_degrees_to_cardinal, HOURLY_COLUMNS_MAPPING @@ -39,7 +38,7 @@ async def build_daily_note_range_endpoint(dt_start: str, dt_end: str): results = [] current_date = start_date while current_date <= end_date: - formatted_date = localize_datetime(current_date) + formatted_date = await locate.localize_datetime(current_date) result = await build_daily_note(formatted_date) results.append(result) current_date += timedelta(days=1) @@ -58,7 +57,7 @@ Obsidian helper. Takes a datetime and creates a new daily note. 
Note: it uses th header = f"# [[{day_before}|← ]] {formatted_day} [[{day_after}| →]]\n\n" places = await locate.fetch_locations(date_time) - location = locate.reverse_geocode(places[0].latitude, places[0].longitude) + location = await locate.reverse_geocode(places[0].latitude, places[0].longitude) timeslips = await build_daily_timeslips(date_time) @@ -271,9 +270,9 @@ async def process_document( with open(file_path, 'wb') as f: f.write(document_content) - parsed_content = await summarize.extract_text(file_path) # Ensure extract_text is awaited + parsed_content = await llm.extract_text(file_path) # Ensure extract_text is awaited - llm_title, summary = await summarize.title_and_summary(parsed_content) + llm_title, summary = await llm.title_and_summary(parsed_content) try: readable_title = sanitize_filename(title if title else document.filename) @@ -342,7 +341,7 @@ async def process_article( timestamp = datetime.now().strftime('%b %d, %Y at %H:%M') - parsed_content = parse_article(url, source) + parsed_content = await parse_article(url, source) if parsed_content is None: return {"error": "Failed to retrieve content"} @@ -350,7 +349,7 @@ async def process_article( markdown_filename, relative_path = assemble_journal_path(datetime.now(), subdir="Articles", filename=readable_title, extension=".md") try: - summary = await summarize.summarize_text(parsed_content["content"], "Summarize the provided text. Respond with the summary and nothing else. Do not otherwise acknowledge the request. Just provide the requested summary.") + summary = await llm.summarize_text(parsed_content["content"], "Summarize the provided text. Respond with the summary and nothing else. Do not otherwise acknowledge the request. Just provide the requested summary.") summary = summary.replace('\n', ' ') # Remove line breaks if tts_mode == "full" or tts_mode == "content": @@ -427,7 +426,7 @@ tags: raise HTTPException(status_code=500, detail=str(e)) -def parse_article(url: str, source: Optional[str] = None): +async def parse_article(url: str, source: Optional[str] = None): source = source if source else trafilatura.fetch_url(url) traf = trafilatura.extract_metadata(filecontent=source, default_url=url) @@ -442,7 +441,12 @@ def parse_article(url: str, source: Optional[str] = None): title = np3k.title or traf.title authors = np3k.authors or traf.author authors = authors if isinstance(authors, List) else [authors] - date = np3k.publish_date or localize_datetime(traf.date) + date = np3k.publish_date or traf.date + try: + date = await locate.localize_datetime(date) + except: + DEBUG(f"Failed to localize {date}") + date = await locate.localize_datetime(datetime.now()) excerpt = np3k.meta_description or traf.description content = trafilatura.extract(source, output_format="markdown", include_comments=False) or np3k.text image = np3k.top_image or traf.image @@ -474,7 +478,7 @@ async def process_archive( timestamp = datetime.now().strftime('%b %d, %Y at %H:%M') - parsed_content = parse_article(url, source) + parsed_content = await parse_article(url, source) if parsed_content is None: return {"error": "Failed to retrieve content"} content = parsed_content["content"] @@ -635,7 +639,7 @@ async def banner_endpoint(dt: str, location: str = None, mood: str = None, other Endpoint (POST) that generates a new banner image for the Obsidian daily note for a specified date, taking into account optional additional information, then updates the frontmatter if necessary. 
''' DEBUG(f"banner_endpoint requested with date: {dt} ({type(dt)})") - date_time = localize_datetime(dt) + date_time = await locate.localize_datetime(dt) DEBUG(f"date_time after localization: {date_time} ({type(date_time)})") jpg_path = await generate_banner(date_time, location, mood=mood, other_context=other_context) return jpg_path @@ -643,7 +647,7 @@ async def banner_endpoint(dt: str, location: str = None, mood: str = None, other async def generate_banner(dt, location: Location = None, forecast: str = None, mood: str = None, other_context: str = None): DEBUG(f"Location: {location}, forecast: {forecast}, mood: {mood}, other_context: {other_context}") - date_time = localize_datetime(dt) + date_time = await locate.localize_datetime(dt) DEBUG(f"generate_banner called with date_time: {date_time}") destination_path, local_path = assemble_journal_path(date_time, filename="Banner", extension=".jpg", no_timestamp = True) DEBUG(f"destination path generated: {destination_path}") @@ -699,7 +703,7 @@ async def note_weather_get( ): try: - date_time = datetime.now() if date == "0" else localize_datetime(date) + date_time = datetime.now() if date == "0" else locate.localize_datetime(date) DEBUG(f"date: {date} .. date_time: {date_time}") content = await update_dn_weather(date_time) #, lat, lon) return JSONResponse(content={"forecast": content}, status_code=200) @@ -714,7 +718,7 @@ async def note_weather_get( @note.post("/update/note/{date}") async def post_update_daily_weather_and_calendar_and_timeslips(date: str) -> PlainTextResponse: - date_time = localize_datetime(date) + date_time = await locate.localize_datetime(date) await update_dn_weather(date_time) await update_daily_note_events(date_time) await build_daily_timeslips(date_time) @@ -1091,7 +1095,7 @@ async def format_events_as_markdown(event_data: Dict[str, Union[str, List[Dict[s # description = remove_characters(description) # description = remove_characters(description) if len(description) > 150: - description = await summarize.summarize_text(description, length_override=150) + description = await llm.summarize_text(description, length_override=150) event_markdown += f"\n * {description}" event_markdown += f"\n " @@ -1117,7 +1121,7 @@ async def format_events_as_markdown(event_data: Dict[str, Union[str, List[Dict[s @note.get("/note/events", response_class=PlainTextResponse) async def note_events_endpoint(date: str = Query(None)): - date_time = localize_datetime(date) if date else datetime.now(TZ) + date_time = await locate.localize_datetime(date) if date else datetime.now(TZ) response = await update_daily_note_events(date_time) return PlainTextResponse(content=response, status_code=200) diff --git a/sijapi/routers/serve.py b/sijapi/routers/serve.py index f9271c1..8ea6cc3 100644 --- a/sijapi/routers/serve.py +++ b/sijapi/routers/serve.py @@ -14,7 +14,8 @@ from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from pathlib import Path from sijapi import DEBUG, INFO, WARN, ERR, CRITICAL -from sijapi.utilities import bool_convert, sanitize_filename, assemble_journal_path, localize_datetime +from sijapi.utilities import bool_convert, sanitize_filename, assemble_journal_path +from sijapi.routers.locate import localize_datetime from sijapi import DATA_DIR, SD_IMAGE_DIR, PUBLIC_KEY, OBSIDIAN_VAULT_DIR serve = APIRouter(tags=["public"]) @@ -50,7 +51,7 @@ def is_valid_date(date_str: str) -> bool: @serve.get("/notes/{file_path:path}") async def get_file(file_path: str): try: - date_time = 
localize_datetime(file_path); + date_time = await localize_datetime(file_path); absolute_path, local_path = assemble_journal_path(date_time, no_timestamp = True) except ValueError as e: DEBUG(f"Unable to parse {file_path} as a date, now trying to use it as a local path") diff --git a/sijapi/routers/tts.py b/sijapi/routers/tts.py index 3d89f6e..2eca2d2 100644 --- a/sijapi/routers/tts.py +++ b/sijapi/routers/tts.py @@ -273,7 +273,17 @@ async def get_voice_file_path(voice: str = None, voice_file: UploadFile = None) return select_voice(DEFAULT_VOICE) -async def local_tts(text_content: str, speed: float, voice: str, voice_file = None, podcast: bool = False, background_tasks: BackgroundTasks = None, title: str = None, output_path: Optional[Path] = None) -> str: + +async def local_tts( + text_content: str, + speed: float, + voice: str, + voice_file = None, + podcast: bool = False, + background_tasks: BackgroundTasks = None, + title: str = None, + output_path: Optional[Path] = None +) -> str: if output_path: file_path = Path(output_path) else: @@ -286,27 +296,47 @@ async def local_tts(text_content: str, speed: float, voice: str, voice_file = No file_path.parent.mkdir(parents=True, exist_ok=True) voice_file_path = await get_voice_file_path(voice, voice_file) - XTTS = TTS(model_name=MODEL_NAME).to(DEVICE) + + # Initialize TTS model in a separate thread + XTTS = await asyncio.to_thread(TTS, model_name=MODEL_NAME) + await asyncio.to_thread(XTTS.to, DEVICE) + segments = split_text(text_content) combined_audio = AudioSegment.silent(duration=0) for i, segment in enumerate(segments): segment_file_path = TTS_SEGMENTS_DIR / f"segment_{i}.wav" DEBUG(f"Segment file path: {segment_file_path}") - segment_file = await asyncio.to_thread(XTTS.tts_to_file, text=segment, speed=speed, file_path=str(segment_file_path), speaker_wav=[voice_file_path], language="en") - DEBUG(f"Segment file generated: {segment_file}") - combined_audio += AudioSegment.from_wav(str(segment_file)) - # Delete the segment file immediately after adding it to the combined audio - segment_file_path.unlink() + + # Run TTS in a separate thread + await asyncio.to_thread( + XTTS.tts_to_file, + text=segment, + speed=speed, + file_path=str(segment_file_path), + speaker_wav=[voice_file_path], + language="en" + ) + DEBUG(f"Segment file generated: {segment_file_path}") + + # Load and combine audio in a separate thread + segment_audio = await asyncio.to_thread(AudioSegment.from_wav, str(segment_file_path)) + combined_audio += segment_audio + # Delete the segment file + await asyncio.to_thread(segment_file_path.unlink) + + # Export the combined audio in a separate thread if podcast: podcast_file_path = PODCAST_DIR / file_path.name - combined_audio.export(podcast_file_path, format="wav") + await asyncio.to_thread(combined_audio.export, podcast_file_path, format="wav") + + await asyncio.to_thread(combined_audio.export, file_path, format="wav") - combined_audio.export(file_path, format="wav") return str(file_path) + async def stream_tts(text_content: str, speed: float, voice: str, voice_file) -> StreamingResponse: voice_file_path = await get_voice_file_path(voice, voice_file) segments = split_text(text_content) diff --git a/sijapi/routers/weather.py b/sijapi/routers/weather.py index fd6b0f2..a2a825f 100644 --- a/sijapi/routers/weather.py +++ b/sijapi/routers/weather.py @@ -7,10 +7,9 @@ from typing import Dict from datetime import datetime from shapely.wkb import loads from binascii import unhexlify -from sijapi.utilities import localize_datetime from sijapi 
import DEBUG, INFO, WARN, ERR, CRITICAL -from sijapi import VISUALCROSSING_API_KEY, TZ -from sijapi.utilities import get_db_connection, haversine +from sijapi import VISUALCROSSING_API_KEY, TZ, DB +from sijapi.utilities import haversine from sijapi.routers import locate weather = APIRouter() @@ -25,7 +24,7 @@ async def get_weather(date_time: datetime, latitude: float, longitude: float): try: DEBUG(f"Daily weather data from db: {daily_weather_data}") last_updated = str(daily_weather_data['DailyWeather'].get('last_updated')) - last_updated = localize_datetime(last_updated) + last_updated = await locate.localize_datetime(last_updated) stored_loc_data = unhexlify(daily_weather_data['DailyWeather'].get('location')) stored_loc = loads(stored_loc_data) stored_lat = stored_loc.y @@ -84,182 +83,180 @@ async def get_weather(date_time: datetime, latitude: float, longitude: float): async def store_weather_to_db(date_time: datetime, weather_data: dict): - conn = await get_db_connection() + async with DB.get_connection() as conn: + try: + day_data = weather_data.get('days')[0] + DEBUG(f"day_data.get('sunrise'): {day_data.get('sunrise')}") + + # Handle preciptype and stations as PostgreSQL arrays + preciptype_array = day_data.get('preciptype', []) or [] + stations_array = day_data.get('stations', []) or [] + + date_str = date_time.strftime("%Y-%m-%d") + + # Get location details from weather data if available + longitude = weather_data.get('longitude') + latitude = weather_data.get('latitude') + elevation = locate.get_elevation(latitude, longitude) # 152.4 # default until we add a geocoder that can look up actual elevation; weather_data.get('elevation') # assuming 'elevation' key, replace if different + location_point = f"POINTZ({longitude} {latitude} {elevation})" if longitude and latitude and elevation else None + + # Correct for the datetime objects + day_data['datetime'] = await locate.localize_datetime(day_data.get('datetime')) #day_data.get('datetime')) + day_data['sunrise'] = day_data['datetime'].replace(hour=int(day_data.get('sunrise').split(':')[0]), minute=int(day_data.get('sunrise').split(':')[1])) + day_data['sunset'] = day_data['datetime'].replace(hour=int(day_data.get('sunset').split(':')[0]), minute=int(day_data.get('sunset').split(':')[1])) + + daily_weather_params = ( + day_data.get('sunrise'), day_data.get('sunriseEpoch'), + day_data.get('sunset'), day_data.get('sunsetEpoch'), + day_data.get('description'), day_data.get('tempmax'), + day_data.get('tempmin'), day_data.get('uvindex'), + day_data.get('winddir'), day_data.get('windspeed'), + day_data.get('icon'), datetime.now(), + day_data.get('datetime'), day_data.get('datetimeEpoch'), + day_data.get('temp'), day_data.get('feelslikemax'), + day_data.get('feelslikemin'), day_data.get('feelslike'), + day_data.get('dew'), day_data.get('humidity'), + day_data.get('precip'), day_data.get('precipprob'), + day_data.get('precipcover'), preciptype_array, + day_data.get('snow'), day_data.get('snowdepth'), + day_data.get('windgust'), day_data.get('pressure'), + day_data.get('cloudcover'), day_data.get('visibility'), + day_data.get('solarradiation'), day_data.get('solarenergy'), + day_data.get('severerisk', 0), day_data.get('moonphase'), + day_data.get('conditions'), stations_array, day_data.get('source'), + location_point + ) + except Exception as e: + ERR(f"Failed to prepare database query in store_weather_to_db! 
{e}") - try: - day_data = weather_data.get('days')[0] - DEBUG(f"day_data.get('sunrise'): {day_data.get('sunrise')}") - - # Handle preciptype and stations as PostgreSQL arrays - preciptype_array = day_data.get('preciptype', []) or [] - stations_array = day_data.get('stations', []) or [] - - date_str = date_time.strftime("%Y-%m-%d") - - # Get location details from weather data if available - longitude = weather_data.get('longitude') - latitude = weather_data.get('latitude') - elevation = locate.get_elevation(latitude, longitude) # 152.4 # default until we add a geocoder that can look up actual elevation; weather_data.get('elevation') # assuming 'elevation' key, replace if different - location_point = f"POINTZ({longitude} {latitude} {elevation})" if longitude and latitude and elevation else None - - # Correct for the datetime objects - day_data['datetime'] = localize_datetime(day_data.get('datetime')) #day_data.get('datetime')) - day_data['sunrise'] = day_data['datetime'].replace(hour=int(day_data.get('sunrise').split(':')[0]), minute=int(day_data.get('sunrise').split(':')[1])) - day_data['sunset'] = day_data['datetime'].replace(hour=int(day_data.get('sunset').split(':')[0]), minute=int(day_data.get('sunset').split(':')[1])) - - daily_weather_params = ( - day_data.get('sunrise'), day_data.get('sunriseEpoch'), - day_data.get('sunset'), day_data.get('sunsetEpoch'), - day_data.get('description'), day_data.get('tempmax'), - day_data.get('tempmin'), day_data.get('uvindex'), - day_data.get('winddir'), day_data.get('windspeed'), - day_data.get('icon'), datetime.now(), - day_data.get('datetime'), day_data.get('datetimeEpoch'), - day_data.get('temp'), day_data.get('feelslikemax'), - day_data.get('feelslikemin'), day_data.get('feelslike'), - day_data.get('dew'), day_data.get('humidity'), - day_data.get('precip'), day_data.get('precipprob'), - day_data.get('precipcover'), preciptype_array, - day_data.get('snow'), day_data.get('snowdepth'), - day_data.get('windgust'), day_data.get('pressure'), - day_data.get('cloudcover'), day_data.get('visibility'), - day_data.get('solarradiation'), day_data.get('solarenergy'), - day_data.get('severerisk', 0), day_data.get('moonphase'), - day_data.get('conditions'), stations_array, day_data.get('source'), - location_point - ) - except Exception as e: - ERR(f"Failed to prepare database query in store_weather_to_db! 
{e}") - - try: - daily_weather_query = ''' - INSERT INTO DailyWeather ( - sunrise, sunriseEpoch, sunset, sunsetEpoch, description, - tempmax, tempmin, uvindex, winddir, windspeed, icon, last_updated, - datetime, datetimeEpoch, temp, feelslikemax, feelslikemin, feelslike, - dew, humidity, precip, precipprob, precipcover, preciptype, - snow, snowdepth, windgust, pressure, cloudcover, visibility, - solarradiation, solarenergy, severerisk, moonphase, conditions, - stations, source, location - ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28, $29, $30, $31, $32, $33, $34, $35, $36, $37, $38) - RETURNING id - ''' - - # Debug logs for better insights - # DEBUG("Executing query: %s", daily_weather_query) - # DEBUG("With parameters: %s", daily_weather_params) - - # Execute the query to insert daily weather data - async with conn.transaction(): - daily_weather_id = await conn.fetchval(daily_weather_query, *daily_weather_params) - - - if 'hours' in day_data: - for hour_data in day_data['hours']: - try: - await asyncio.sleep(0.1) - # hour_data['datetime'] = parse_date(hour_data.get('datetime')) - hour_timestamp = date_str + ' ' + hour_data['datetime'] - hour_data['datetime'] = localize_datetime(hour_timestamp) - DEBUG(f"Processing hours now...") - # DEBUG(f"Processing {hour_data['datetime']}") - - hour_preciptype_array = hour_data.get('preciptype', []) or [] - hour_stations_array = hour_data.get('stations', []) or [] - hourly_weather_params = ( - daily_weather_id, - hour_data['datetime'], - hour_data.get('datetimeEpoch'), - hour_data['temp'], - hour_data['feelslike'], - hour_data['humidity'], - hour_data['dew'], - hour_data['precip'], - hour_data['precipprob'], - hour_preciptype_array, - hour_data['snow'], - hour_data['snowdepth'], - hour_data['windgust'], - hour_data['windspeed'], - hour_data['winddir'], - hour_data['pressure'], - hour_data['cloudcover'], - hour_data['visibility'], - hour_data['solarradiation'], - hour_data['solarenergy'], - hour_data['uvindex'], - hour_data.get('severerisk', 0), - hour_data['conditions'], - hour_data['icon'], - hour_stations_array, - hour_data.get('source', ''), - ) + try: + daily_weather_query = ''' + INSERT INTO DailyWeather ( + sunrise, sunriseEpoch, sunset, sunsetEpoch, description, + tempmax, tempmin, uvindex, winddir, windspeed, icon, last_updated, + datetime, datetimeEpoch, temp, feelslikemax, feelslikemin, feelslike, + dew, humidity, precip, precipprob, precipcover, preciptype, + snow, snowdepth, windgust, pressure, cloudcover, visibility, + solarradiation, solarenergy, severerisk, moonphase, conditions, + stations, source, location + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28, $29, $30, $31, $32, $33, $34, $35, $36, $37, $38) + RETURNING id + ''' + + # Debug logs for better insights + # DEBUG("Executing query: %s", daily_weather_query) + # DEBUG("With parameters: %s", daily_weather_params) + # Execute the query to insert daily weather data + async with conn.transaction(): + daily_weather_id = await conn.fetchval(daily_weather_query, *daily_weather_params) + + + if 'hours' in day_data: + for hour_data in day_data['hours']: try: - hourly_weather_query = ''' - INSERT INTO HourlyWeather (daily_weather_id, datetime, datetimeEpoch, temp, feelslike, humidity, dew, precip, precipprob, - preciptype, snow, snowdepth, windgust, windspeed, winddir, pressure, cloudcover, visibility, 
solarradiation, solarenergy, - uvindex, severerisk, conditions, icon, stations, source) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26) - RETURNING id - ''' - # Debug logs for better insights - # DEBUG("Executing query: %s", hourly_weather_query) - # DEBUG("With parameters: %s", hourly_weather_params) + await asyncio.sleep(0.1) + # hour_data['datetime'] = parse_date(hour_data.get('datetime')) + hour_timestamp = date_str + ' ' + hour_data['datetime'] + hour_data['datetime'] = await locate.localize_datetime(hour_timestamp) + DEBUG(f"Processing hours now...") + # DEBUG(f"Processing {hour_data['datetime']}") + + hour_preciptype_array = hour_data.get('preciptype', []) or [] + hour_stations_array = hour_data.get('stations', []) or [] + hourly_weather_params = ( + daily_weather_id, + hour_data['datetime'], + hour_data.get('datetimeEpoch'), + hour_data['temp'], + hour_data['feelslike'], + hour_data['humidity'], + hour_data['dew'], + hour_data['precip'], + hour_data['precipprob'], + hour_preciptype_array, + hour_data['snow'], + hour_data['snowdepth'], + hour_data['windgust'], + hour_data['windspeed'], + hour_data['winddir'], + hour_data['pressure'], + hour_data['cloudcover'], + hour_data['visibility'], + hour_data['solarradiation'], + hour_data['solarenergy'], + hour_data['uvindex'], + hour_data.get('severerisk', 0), + hour_data['conditions'], + hour_data['icon'], + hour_stations_array, + hour_data.get('source', ''), + ) + + try: + hourly_weather_query = ''' + INSERT INTO HourlyWeather (daily_weather_id, datetime, datetimeEpoch, temp, feelslike, humidity, dew, precip, precipprob, + preciptype, snow, snowdepth, windgust, windspeed, winddir, pressure, cloudcover, visibility, solarradiation, solarenergy, + uvindex, severerisk, conditions, icon, stations, source) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26) + RETURNING id + ''' + # Debug logs for better insights + # DEBUG("Executing query: %s", hourly_weather_query) + # DEBUG("With parameters: %s", hourly_weather_params) + + # Execute the query to insert hourly weather data + async with conn.transaction(): + hourly_weather_id = await conn.fetchval(hourly_weather_query, *hourly_weather_params) + # ERR(f"\n{hourly_weather_id}") + + except Exception as e: + ERR(f"EXCEPTION: {e}") - # Execute the query to insert hourly weather data - async with conn.transaction(): - hourly_weather_id = await conn.fetchval(hourly_weather_query, *hourly_weather_params) - # ERR(f"\n{hourly_weather_id}") - except Exception as e: ERR(f"EXCEPTION: {e}") - except Exception as e: - ERR(f"EXCEPTION: {e}") - - return "SUCCESS" - - except Exception as e: - ERR(f"Error in dailyweather storage: {e}") + return "SUCCESS" + + except Exception as e: + ERR(f"Error in dailyweather storage: {e}") async def get_weather_from_db(date_time: datetime, latitude: float, longitude: float): - conn = await get_db_connection() + async with DB.get_connection() as conn: + query_date = date_time.date() + try: + # Query to get daily weather data + query = ''' + SELECT DW.* FROM DailyWeather DW + WHERE DW.datetime::date = $1 + AND ST_DWithin(DW.location::geography, ST_MakePoint($2,$3)::geography, 8046.72) + ORDER BY ST_Distance(DW.location, ST_MakePoint($4, $5)::geography) ASC + LIMIT 1 + ''' - query_date = date_time.date() - try: - # Query to get daily weather data - query = ''' - SELECT DW.* FROM DailyWeather DW - WHERE DW.datetime::date = 
$1 - AND ST_DWithin(DW.location::geography, ST_MakePoint($2,$3)::geography, 8046.72) - ORDER BY ST_Distance(DW.location, ST_MakePoint($4, $5)::geography) ASC - LIMIT 1 - ''' + daily_weather_data = await conn.fetchrow(query, query_date, longitude, latitude, longitude, latitude) - daily_weather_data = await conn.fetchrow(query, query_date, longitude, latitude, longitude, latitude) + if daily_weather_data is None: + DEBUG(f"No daily weather data retrieved from database.") + return None + # else: + # DEBUG(f"Daily_weather_data: {daily_weather_data}") + # Query to get hourly weather data + query = ''' + SELECT HW.* FROM HourlyWeather HW + WHERE HW.daily_weather_id = $1 + ''' + hourly_weather_data = await conn.fetch(query, daily_weather_data['id']) - if daily_weather_data is None: - DEBUG(f"No daily weather data retrieved from database.") - return None - # else: - # DEBUG(f"Daily_weather_data: {daily_weather_data}") - # Query to get hourly weather data - query = ''' - SELECT HW.* FROM HourlyWeather HW - WHERE HW.daily_weather_id = $1 - ''' - hourly_weather_data = await conn.fetch(query, daily_weather_data['id']) - - day: Dict = { - 'DailyWeather': dict(daily_weather_data), - 'HourlyWeather': [dict(row) for row in hourly_weather_data], - } - # DEBUG(f"day: {day}") - return day - except Exception as e: - ERR(f"Unexpected error occurred: {e}") + day: Dict = { + 'DailyWeather': dict(daily_weather_data), + 'HourlyWeather': [dict(row) for row in hourly_weather_data], + } + # DEBUG(f"day: {day}") + return day + except Exception as e: + ERR(f"Unexpected error occurred: {e}") diff --git a/sijapi/utilities.py b/sijapi/utilities.py index 83b48ec..a89fae0 100644 --- a/sijapi/utilities.py +++ b/sijapi/utilities.py @@ -17,6 +17,8 @@ from datetime import datetime, date, time from typing import Optional, Union, Tuple import asyncio from PIL import Image +import pandas as pd +from scipy.spatial import cKDTree from dateutil.parser import parse as dateutil_parse from docx import Document import asyncpg @@ -24,7 +26,7 @@ from sshtunnel import SSHTunnelForwarder from fastapi import Depends, HTTPException, Request, UploadFile from fastapi.security.api_key import APIKeyHeader from sijapi import DEBUG, INFO, WARN, ERR, CRITICAL -from sijapi import DB, GLOBAL_API_KEY, DB, DB_HOST, DB_PORT, DB_USER, DB_PASS, TZ, YEAR_FMT, MONTH_FMT, DAY_FMT, DAY_SHORT_FMT, OBSIDIAN_VAULT_DIR, ALLOWED_FILENAME_CHARS, MAX_FILENAME_LENGTH +from sijapi import GLOBAL_API_KEY, YEAR_FMT, MONTH_FMT, DAY_FMT, DAY_SHORT_FMT, OBSIDIAN_VAULT_DIR, ALLOWED_FILENAME_CHARS, MAX_FILENAME_LENGTH api_key_header = APIKeyHeader(name="Authorization") @@ -141,64 +143,38 @@ def sanitize_filename(text, max_length=MAX_FILENAME_LENGTH): """Sanitize a string to be used as a safe filename while protecting the file extension.""" DEBUG(f"Filename before sanitization: {text}") - # Replace multiple spaces with a single space and remove other whitespace text = re.sub(r'\s+', ' ', text) - - # Remove any non-word characters except space, dot, and hyphen sanitized = re.sub(ALLOWED_FILENAME_CHARS, '', text) - - # Remove leading/trailing spaces sanitized = sanitized.strip() - - # Split the filename into base name and extension base_name, extension = os.path.splitext(sanitized) - # Calculate the maximum length for the base name max_base_length = max_length - len(extension) - - # Truncate the base name if necessary if len(base_name) > max_base_length: base_name = base_name[:max_base_length].rstrip() - - # Recombine the base name and extension final_filename = base_name + 
extension - # In case the extension itself is too long, truncate the entire filename - if len(final_filename) > max_length: - final_filename = final_filename[:max_length] - DEBUG(f"Filename after sanitization: {final_filename}") return final_filename - def check_file_name(file_name, max_length=255): """Check if the file name needs sanitization based on the criteria of the second sanitize_filename function.""" - DEBUG(f"Checking filename: {file_name}") needs_sanitization = False - # Check for length if len(file_name) > max_length: - DEBUG(f"Filename exceeds maximum length of {max_length}") + DEBUG(f"Filename exceeds maximum length of {max_length}: {file_name}") needs_sanitization = True - - # Check for non-word characters (except space, dot, and hyphen) if re.search(ALLOWED_FILENAME_CHARS, file_name): - DEBUG("Filename contains non-word characters (except space, dot, and hyphen)") + DEBUG(f"Filename contains non-word characters (except space, dot, and hyphen): {file_name}") needs_sanitization = True - - # Check for multiple consecutive spaces if re.search(r'\s{2,}', file_name): - DEBUG("Filename contains multiple consecutive spaces") + DEBUG(f"Filename contains multiple consecutive spaces: {file_name}") needs_sanitization = True - - # Check for leading/trailing spaces if file_name != file_name.strip(): - DEBUG("Filename has leading or trailing spaces") + DEBUG(f"Filename has leading or trailing spaces: {file_name}") needs_sanitization = True - DEBUG(f"Filename {'needs' if needs_sanitization else 'does not need'} sanitization") return needs_sanitization @@ -381,49 +357,6 @@ def convert_to_unix_time(iso_date_str): return int(dt.timestamp()) -async def get_db_connection(): - conn = await asyncpg.connect( - database=DB, - user=DB_USER, - password=DB_PASS, - host=DB_HOST, - port=DB_PORT - ) - return conn - -temp = """ -def get_db_connection_ssh(ssh: bool = True): - if ssh: - with SSHTunnelForwarder( - (DB_SSH, 22), - DB_SSH_USER=DB_SSH_USER, - DB_SSH_PASS=DB_SSH_PASS, - remote_bind_address=DB_SSH, - local_bind_address=(DB_HOST, DB_PORT) - ) as tunnel: conn = psycopg2.connect( - dbname=DB, - user=DB_USER, - password=DB_PASS, - host=DB_HOST, - port=DB_PORT - ) - else: - conn = psycopg2.connect( - dbname=DB, - user=DB_USER, - password=DB_PASS, - host=DB_HOST, - port=DB_PORT - ) - - return conn -""" - -def db_localized(): - # ssh = True if TS_IP == DB_SSH else False - return get_db_connection() - - def haversine(lat1, lon1, lat2, lon2): """ Calculate the great circle distance between two points on the earth specified in decimal degrees. """ lat1, lon1, lat2, lon2 = map(math.radians, [lat1, lon1, lat2, lon2]) @@ -445,30 +378,6 @@ def convert_degrees_to_cardinal(d): return dirs[ix % len(dirs)] -def localize_datetime(dt): - initial_dt = dt - try: - if isinstance(dt, str): - dt = dateutil_parse(dt) - DEBUG(f"{initial_dt} was a string so we attempted converting to datetime. Result: {dt}") - - - if isinstance(dt, datetime): - DEBUG(f"{dt} is a datetime object, so we will ensure it is tz-aware.") - if dt.tzinfo is None: - dt = dt.replace(tzinfo=TZ) - # DEBUG(f"{dt} should now be tz-aware. Returning it now.") - return dt - else: - # DEBUG(f"{dt} already was tz-aware. 
Returning it now.") - return dt - else: - ERR(f"Conversion failed") - raise TypeError("Conversion failed") - except Exception as e: - ERR(f"Error parsing datetime: {e}") - raise TypeError("Input must be a string or datetime object") - HOURLY_COLUMNS_MAPPING = { "12am": "00:00:00", @@ -531,4 +440,22 @@ def resize_and_convert_image(image_path, max_size=2160, quality=80): img.save(img_byte_arr, format='JPEG', quality=quality) img_byte_arr = img_byte_arr.getvalue() - return img_byte_arr \ No newline at end of file + return img_byte_arr + + +def load_geonames_data(path: str): + columns = ['geonameid', 'name', 'asciiname', 'alternatenames', + 'latitude', 'longitude', 'feature_class', 'feature_code', + 'country_code', 'cc2', 'admin1_code', 'admin2_code', 'admin3_code', + 'admin4_code', 'population', 'elevation', 'dem', 'timezone', 'modification_date'] + + data = pd.read_csv( + path, + sep='\t', + header=None, + names=columns, + low_memory=False + ) + + return data +