diff --git a/sijapi/__init__.py b/sijapi/__init__.py index 254fba5..973e44c 100644 --- a/sijapi/__init__.py +++ b/sijapi/__init__.py @@ -161,7 +161,8 @@ ICS_PATH = DATA_DIR / 'calendar.ics' # deprecated now, but maybe revive? ICALENDARS = os.getenv('ICALENDARS', 'NULL,VOID').split(',') EMAIL_CONFIG = CONFIG_DIR / "email.yaml" -AUTORESPOND = True +EMAIL_LOGS = LOGS_DIR / "email" +os.makedirs(EMAIL_LOGS, exist_ok = True) ### Courtlistener & other webhooks COURTLISTENER_DOCKETS_DIR = DATA_DIR / "courtlistener" / "dockets" diff --git a/sijapi/helpers/log_prior_emails.py b/sijapi/helpers/log_prior_emails.py new file mode 100644 index 0000000..1be6267 --- /dev/null +++ b/sijapi/helpers/log_prior_emails.py @@ -0,0 +1,64 @@ +import asyncio +from pathlib import Path +from sijapi import DEBUG, INFO, ERR +from sijapi import EMAIL_CONFIG, EMAIL_LOGS +from sijapi.classes import EmailAccount +from sijapi.routers import email + +async def initialize_log_files(): + summarized_log = EMAIL_LOGS / "summarized.txt" + autoresponded_log = EMAIL_LOGS / "autoresponded.txt" + diagnostic_log = EMAIL_LOGS / "diagnostic.txt" + for log_file in [summarized_log, autoresponded_log, diagnostic_log]: + log_file.parent.mkdir(parents=True, exist_ok=True) + log_file.write_text("") + DEBUG(f"Log files initialized: {summarized_log}, {autoresponded_log}, {diagnostic_log}") + return summarized_log, autoresponded_log, diagnostic_log + +async def process_all_emails(account: EmailAccount, summarized_log: Path, autoresponded_log: Path, diagnostic_log: Path): + try: + with email.get_imap_connection(account) as inbox: + DEBUG(f"Connected to {account.name}, processing all emails...") + all_messages = inbox.messages() + unread_messages = set(uid for uid, _ in inbox.messages(unread=True)) + + processed_count = 0 + for identifier, message in all_messages: + # Log diagnostic information + with open(diagnostic_log, 'a') as f: + f.write(f"Account: {account.name}, Raw Identifier: {identifier}, Type: {type(identifier)}\n") + + # Attempt to get a string representation of the identifier + if isinstance(identifier, bytes): + id_str = identifier.decode() + elif isinstance(identifier, (int, str)): + id_str = str(identifier) + else: + id_str = repr(identifier) + + if identifier not in unread_messages: + processed_count += 1 + for log_file in [summarized_log, autoresponded_log]: + with open(log_file, 'a') as f: + f.write(f"{id_str}\n") + + INFO(f"Processed {processed_count} non-unread emails for account {account.name}") + except Exception as e: + ERR(f"An error occurred while processing emails for account {account.name}: {e}") + +async def main(): + email_accounts = email.load_email_accounts(EMAIL_CONFIG) + summarized_log, autoresponded_log, diagnostic_log = await initialize_log_files() + + DEBUG(f"Processing {len(email_accounts)} email accounts") + + tasks = [process_all_emails(account, summarized_log, autoresponded_log, diagnostic_log) for account in email_accounts] + await asyncio.gather(*tasks) + + # Final verification + with open(summarized_log, 'r') as f: + final_count = len(f.readlines()) + INFO(f"Final non-unread email count: {final_count}") + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/sijapi/logs.py b/sijapi/logs.py index c0ba953..dab689b 100644 --- a/sijapi/logs.py +++ b/sijapi/logs.py @@ -1,65 +1,36 @@ import os import sys -import logging -from logging.handlers import RotatingFileHandler -from colorama import Fore, Back, Style, init as colorama_init +from loguru import logger import traceback -# Force colorama to initialize for the current platform -colorama_init(autoreset=True, strip=False, convert=True) - -class ColorFormatter(logging.Formatter): - """Custom formatter to add colors to log levels.""" - COLOR_MAP = { - logging.DEBUG: Fore.CYAN, - logging.INFO: Fore.GREEN, - logging.WARNING: Fore.YELLOW, - logging.ERROR: Fore.RED, - logging.CRITICAL: Fore.MAGENTA + Back.WHITE, - } - - def format(self, record): - log_message = super().format(record) - color = self.COLOR_MAP.get(record.levelno, '') - return f"{color}{log_message}{Style.RESET_ALL}" - class Logger: def __init__(self, name, logs_dir): self.logs_dir = logs_dir - self.logger = logging.getLogger(name) - self.logger.setLevel(logging.DEBUG) + self.name = name + self.logger = logger.bind(name=name) def setup_from_args(self, args): if not os.path.exists(self.logs_dir): os.makedirs(self.logs_dir) + # Remove default logger + logger.remove() + # File handler handler_path = os.path.join(self.logs_dir, 'app.log') - file_handler = RotatingFileHandler(handler_path, maxBytes=2000000, backupCount=10) - file_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')) - file_handler.setLevel(logging.DEBUG) + logger.add(handler_path, rotation="2 MB", compression="zip", level="DEBUG", format="{time:YYYY-MM-DD HH:mm:ss} - {name} - {level} - {message}") # Console handler - console_handler = logging.StreamHandler(sys.stdout) # Explicitly use sys.stdout - console_formatter = ColorFormatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') - console_handler.setFormatter(console_formatter) - - # Set console handler level based on args - if args.debug: - console_handler.setLevel(logging.DEBUG) - else: - console_handler.setLevel(logging.INFO) - - # Add handlers to logger - self.logger.addHandler(file_handler) - self.logger.addHandler(console_handler) + log_format = "{time:YYYY-MM-DD HH:mm:ss} - {name} - {level: <8} - {message}" + console_level = "DEBUG" if args.debug else "INFO" + logger.add(sys.stdout, format=log_format, level=console_level, colorize=True) # Test color output self.logger.debug("Debug message (should be Cyan)") self.logger.info("Info message (should be Green)") self.logger.warning("Warning message (should be Yellow)") self.logger.error("Error message (should be Red)") - self.logger.critical("Critical message (should be Magenta on White)") + self.logger.critical("Critical message (should be Magenta)") def get_logger(self): return self.logger @@ -71,9 +42,9 @@ if __name__ == "__main__": parser.add_argument('--debug', action='store_true') args = parser.parse_args() - logger = Logger("test", "logs") - logger.setup_from_args(args) - test_logger = logger.get_logger() + logger_instance = Logger("test", "logs") + logger_instance.setup_from_args(args) + test_logger = logger_instance.get_logger() print("FORCE_COLOR:", os.environ.get('FORCE_COLOR')) print("NO_COLOR:", os.environ.get('NO_COLOR')) @@ -85,4 +56,4 @@ if __name__ == "__main__": test_logger.info("This is an info message") test_logger.warning("This is a warning message") test_logger.error("This is an error message") - test_logger.critical("This is a critical message") \ No newline at end of file + test_logger.critical("This is a critical message") diff --git a/sijapi/routers/asr.py b/sijapi/routers/asr.py index f7f157d..10a14eb 100644 --- a/sijapi/routers/asr.py +++ b/sijapi/routers/asr.py @@ -1,67 +1,50 @@ -''' -Automatic Speech Recognition module relying on the `whisper_cpp` implementation of OpenAI's Whisper model. -Depends on: - LOGGER, ASR_DIR, WHISPER_CPP_MODELS, GARBAGE_COLLECTION_INTERVAL, GARBAGE_TTL, WHISPER_CPP_DIR -Notes: - Performs exceptionally well on Apple Silicon. Other devices will benefit from future updates to optionally use `faster_whisper`, `insanely_faster_whisper`, and/or `whisper_jax`. -''' - -from fastapi import APIRouter, HTTPException, Form, UploadFile, File +import os +import sys +import uuid +import json +import asyncio +import tempfile +import subprocess +from urllib.parse import unquote +from fastapi import APIRouter, HTTPException, Form, UploadFile, File, BackgroundTasks +from fastapi.responses import JSONResponse from pydantic import BaseModel, Field from typing import Optional -import tempfile -from fastapi.responses import JSONResponse, FileResponse -from pydantic import BaseModel, HttpUrl -from whisperplus.pipelines import mlx_whisper -from youtube_dl import YoutubeDL -from urllib.parse import unquote -import subprocess -import os -import uuid -from threading import Thread -import multiprocessing -import asyncio -import subprocess -import tempfile -from sijapi import DEBUG, INFO, WARN, ERR, CRITICAL, ASR_DIR, WHISPER_CPP_MODELS, GARBAGE_COLLECTION_INTERVAL, GARBAGE_TTL, WHISPER_CPP_DIR, MAX_CPU_CORES +from sijapi import DEBUG, INFO, WARN, ERR, CRITICAL, ASR_DIR, WHISPER_CPP_MODELS, GARBAGE_COLLECTION_INTERVAL, GARBAGE_TTL, WHISPER_CPP_DIR, MAX_CPU_CORES asr = APIRouter() class TranscribeParams(BaseModel): model: str = Field(default="small") - output_srt : Optional[bool] = Field(default=False) - language : Optional[str] = Field(None) - split_on_word : Optional[bool] = Field(default=False) - temperature : Optional[float] = Field(default=0) - temp_increment : Optional[int] = Field(None) - translate : Optional[bool] = Field(default=False) - diarize : Optional[bool] = Field(default=False) - tiny_diarize : Optional[bool] = Field(default=False) - no_fallback : Optional[bool] = Field(default=False) - output_json : Optional[bool] = Field(default=False) - detect_language : Optional[bool] = Field(default=False) - dtw : Optional[str] = Field(None) - threads : Optional[int] = Field(None) + output_srt: Optional[bool] = Field(default=False) + language: Optional[str] = Field(None) + split_on_word: Optional[bool] = Field(default=False) + temperature: Optional[float] = Field(default=0) + temp_increment: Optional[int] = Field(None) + translate: Optional[bool] = Field(default=False) + diarize: Optional[bool] = Field(default=False) + tiny_diarize: Optional[bool] = Field(default=False) + no_fallback: Optional[bool] = Field(default=False) + output_json: Optional[bool] = Field(default=False) + detect_language: Optional[bool] = Field(default=False) + dtw: Optional[str] = Field(None) + threads: Optional[int] = Field(None) -from urllib.parse import unquote -import json +# Global dictionary to store transcription results +transcription_results = {} @asr.post("/asr") @asr.post("/transcribe") @asr.post("/v1/audio/transcription") async def transcribe_endpoint( + background_tasks: BackgroundTasks, file: UploadFile = File(...), params: str = Form(...) ): try: - # Decode the URL-encoded string decoded_params = unquote(params) - - # Parse the JSON string parameters_dict = json.loads(decoded_params) - - # Create TranscribeParams object parameters = TranscribeParams(**parameters_dict) except json.JSONDecodeError as json_err: raise HTTPException(status_code=400, detail=f"Invalid JSON: {str(json_err)}") @@ -72,12 +55,30 @@ async def transcribe_endpoint( temp_file.write(await file.read()) temp_file_path = temp_file.name - transcription = await transcribe_audio(file_path=temp_file_path, params=parameters) - return transcription + transcription_job = await transcribe_audio(file_path=temp_file_path, params=parameters, background_tasks=background_tasks) + job_id = transcription_job["job_id"] -async def transcribe_audio(file_path, params: TranscribeParams): + # Poll for completion + max_wait_time = 600 # 10 minutes + poll_interval = 2 # 2 seconds + elapsed_time = 0 - file_path = convert_to_wav(file_path) + while elapsed_time < max_wait_time: + if job_id in transcription_results: + result = transcription_results[job_id] + if result["status"] == "completed": + return JSONResponse(content={"status": "completed", "result": result["result"]}) + elif result["status"] == "failed": + return JSONResponse(content={"status": "failed", "error": result["error"]}, status_code=500) + + await asyncio.sleep(poll_interval) + elapsed_time += poll_interval + + # If we've reached this point, the transcription has taken too long + return JSONResponse(content={"status": "timeout", "message": "Transcription is taking longer than expected. Please check back later."}, status_code=202) + +async def transcribe_audio(file_path, params: TranscribeParams, background_tasks: BackgroundTasks): + file_path = await convert_to_wav(file_path) model = params.model if params.model in WHISPER_CPP_MODELS else 'small' model_path = WHISPER_CPP_DIR / 'models' / f'ggml-{model}.bin' command = [str(WHISPER_CPP_DIR / 'build' / 'bin' / 'main')] @@ -115,35 +116,50 @@ async def transcribe_audio(file_path, params: TranscribeParams): command.extend(['-f', file_path]) DEBUG(f"Command: {command}") + + # Create a unique ID for this transcription job + job_id = str(uuid.uuid4()) + + # Store the job status + transcription_results[job_id] = {"status": "processing", "result": None} + + # Run the transcription in a background task + background_tasks.add_task(process_transcription, command, file_path, job_id) + + return {"job_id": job_id} + +async def process_transcription(command, file_path, job_id): + try: + result = await run_transcription(command, file_path) + transcription_results[job_id] = {"status": "completed", "result": result} + except Exception as e: + transcription_results[job_id] = {"status": "failed", "error": str(e)} + finally: + # Clean up the temporary file + os.remove(file_path) + +async def run_transcription(command, file_path): proc = await asyncio.create_subprocess_exec( *command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await proc.communicate() - if proc.returncode != 0: raise Exception(f"Error running command: {stderr.decode()}") - - result = stdout.decode().strip() - DEBUG(f"Result: {result}") - return result + return stdout.decode().strip() - -def convert_to_wav(file_path: str): +async def convert_to_wav(file_path: str): wav_file_path = os.path.join(ASR_DIR, f"{uuid.uuid4()}.wav") - subprocess.run(["ffmpeg", "-y", "-i", file_path, "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1", wav_file_path], check=True) + proc = await asyncio.create_subprocess_exec( + "ffmpeg", "-y", "-i", file_path, "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1", wav_file_path, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + stdout, stderr = await proc.communicate() + if proc.returncode != 0: + raise Exception(f"Error converting file to WAV: {stderr.decode()}") return wav_file_path -def download_from_youtube(url: str): - temp_file = os.path.join(ASR_DIR, f"{uuid.uuid4()}.mp3") - ytdl_opts = { - 'outtmpl': temp_file, - 'postprocessors': [{'key': 'FFmpegExtractAudio', 'preferredcodec': 'mp3', 'preferredquality': '192'}], - 'nooverwrites': True - } - with YoutubeDL(ytdl_opts) as ydl: - ydl.download([url]) - return convert_to_wav(temp_file) def format_srt_timestamp(seconds: float): milliseconds = round(seconds * 1000.0) diff --git a/sijapi/routers/email.py b/sijapi/routers/email.py index ca27520..4dafbb0 100644 --- a/sijapi/routers/email.py +++ b/sijapi/routers/email.py @@ -15,21 +15,16 @@ from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from email.mime.image import MIMEImage import ssl -from datetime import datetime as dt_datetime -from pydantic import BaseModel -from typing import List, Optional, Any import yaml -from typing import List, Dict, Optional -from pydantic import BaseModel -from sijapi import DEBUG, ERR, LLM_SYS_MSG +from typing import List, Dict, Optional, Set from datetime import datetime as dt_datetime -from typing import Dict from sijapi import DEBUG, INFO, WARN, ERR, CRITICAL -from sijapi import PODCAST_DIR, DEFAULT_VOICE, EMAIL_CONFIG +from sijapi import PODCAST_DIR, DEFAULT_VOICE, EMAIL_CONFIG, EMAIL_LOGS from sijapi.routers import tts, llm, sd, locate from sijapi.utilities import clean_text, assemble_journal_path, extract_text, prefix_lines from sijapi.classes import EmailAccount, IMAPConfig, SMTPConfig, IncomingEmail, EmailContact, AutoResponder - +from sijapi import DEBUG, INFO, ERR, LOGS_DIR +from sijapi.classes import EmailAccount email = APIRouter(tags=["private"]) @@ -76,8 +71,6 @@ def get_smtp_connection(account: EmailAccount): return SMTP(account.smtp.host, account.smtp.port) - - def get_matching_autoresponders(this_email: IncomingEmail, account: EmailAccount) -> List[AutoResponder]: def matches_list(item: str, this_email: IncomingEmail) -> bool: if '@' in item: @@ -161,50 +154,6 @@ async def extract_attachments(attachments) -> List[str]: return attachment_texts - -async def process_account(account: EmailAccount): - while True: - start_time = dt_datetime.now() - try: - DEBUG(f"Connecting to {account.name} to check for unread emails...") - with get_imap_connection(account) as inbox: - DEBUG(f"Connected to {account.name}, checking for unread emails now...") - unread_messages = inbox.messages(unread=True) - for uid, message in unread_messages: - recipients = [EmailContact(email=recipient['email'], name=recipient.get('name', '')) for recipient in message.sent_to] - localized_datetime = await locate.localize_datetime(message.date) - this_email = IncomingEmail( - sender=message.sent_from[0]['email'], - datetime_received=localized_datetime, - recipients=recipients, - subject=message.subject, - body=clean_email_content(message.body['html'][0]) if message.body['html'] else clean_email_content(message.body['plain'][0]) or "", - attachments=message.attachments - ) - DEBUG(f"\n\nProcessing email for account {account.name}: {this_email.subject}\n\n") - save_success = await save_email(this_email, account) - respond_success = await autorespond(this_email, account) - if save_success and respond_success: - inbox.mark_seen(uid) - except Exception as e: - ERR(f"An error occurred for account {account.name}: {e}") - - # Calculate the time taken for processing - processing_time = (dt_datetime.now() - start_time).total_seconds() - - # Calculate the remaining time to wait - wait_time = max(0, account.refresh - processing_time) - - # Wait for the remaining time - await asyncio.sleep(wait_time) - - -async def process_all_accounts(): - email_accounts = load_email_accounts(EMAIL_CONFIG) - tasks = [asyncio.create_task(process_account(account)) for account in email_accounts] - await asyncio.gather(*tasks) - - async def save_email(this_email: IncomingEmail, account: EmailAccount): try: md_path, md_relative = assemble_journal_path(this_email.datetime_received, "Emails", this_email.subject, ".md") @@ -262,6 +211,7 @@ tags: async def autorespond(this_email: IncomingEmail, account: EmailAccount): matching_profiles = get_matching_autoresponders(this_email, account) + DEBUG(f"Matching profiles: {matching_profiles}") for profile in matching_profiles: DEBUG(f"Auto-responding to {this_email.subject} with profile: {profile.name}") auto_response_subject = f"Auto-Response Re: {this_email.subject}" @@ -296,7 +246,84 @@ async def send_auto_response(to_email, subject, body, profile, account): ERR(f"Error in preparing/sending auto-response from account {account.name}: {e}") return False - + + + +async def load_processed_uids(filename: Path) -> Set[str]: + if filename.exists(): + with open(filename, 'r') as f: + return set(line.strip().split(':')[-1] for line in f) + return set() + +async def save_processed_uid(filename: Path, account_name: str, uid: str): + with open(filename, 'a') as f: + f.write(f"{account_name}:{uid}\n") + +async def process_account_summarization(account: EmailAccount): + summarized_log = EMAIL_LOGS / "summarized.txt" + while True: + try: + processed_uids = await load_processed_uids(summarized_log) + with get_imap_connection(account) as inbox: + unread_messages = inbox.messages(unread=True) + for uid, message in unread_messages: + uid_str = uid.decode() if isinstance(uid, bytes) else str(uid) + if uid_str not in processed_uids: + recipients = [EmailContact(email=recipient['email'], name=recipient.get('name', '')) for recipient in message.sent_to] + localized_datetime = await locate.localize_datetime(message.date) + this_email = IncomingEmail( + sender=message.sent_from[0]['email'], + datetime_received=localized_datetime, + recipients=recipients, + subject=message.subject, + body=clean_email_content(message.body['html'][0]) if message.body['html'] else clean_email_content(message.body['plain'][0]) or "", + attachments=message.attachments + ) + if account.summarize: + save_success = await save_email(this_email, account) + if save_success: + await save_processed_uid(summarized_log, account.name, uid_str) + DEBUG(f"Summarized email: {uid_str}") + except Exception as e: + ERR(f"An error occurred during summarization for account {account.name}: {e}") + + await asyncio.sleep(account.refresh) + +async def process_account_autoresponding(account: EmailAccount): + autoresponded_log = EMAIL_LOGS / "autoresponded.txt" + while True: + try: + processed_uids = await load_processed_uids(autoresponded_log) + with get_imap_connection(account) as inbox: + unread_messages = inbox.messages(unread=True) + for uid, message in unread_messages: + uid_str = uid.decode() if isinstance(uid, bytes) else str(uid) + if uid_str not in processed_uids: + recipients = [EmailContact(email=recipient['email'], name=recipient.get('name', '')) for recipient in message.sent_to] + localized_datetime = await locate.localize_datetime(message.date) + this_email = IncomingEmail( + sender=message.sent_from[0]['email'], + datetime_received=localized_datetime, + recipients=recipients, + subject=message.subject, + body=clean_email_content(message.body['html'][0]) if message.body['html'] else clean_email_content(message.body['plain'][0]) or "", + attachments=message.attachments + ) + respond_success = await autorespond(this_email, account) + if respond_success: + await save_processed_uid(autoresponded_log, account.name, uid_str) + DEBUG(f"Auto-responded to email: {uid_str}") + except Exception as e: + ERR(f"An error occurred during auto-responding for account {account.name}: {e}") + + await asyncio.sleep(account.refresh) + +async def process_all_accounts(): + email_accounts = load_email_accounts(EMAIL_CONFIG) + summarization_tasks = [asyncio.create_task(process_account_summarization(account)) for account in email_accounts] + autoresponding_tasks = [asyncio.create_task(process_account_autoresponding(account)) for account in email_accounts] + await asyncio.gather(*summarization_tasks, *autoresponding_tasks) + @email.on_event("startup") async def startup_event(): asyncio.create_task(process_all_accounts()) \ No newline at end of file