Auto-update: Wed Jun 26 10:02:40 PDT 2024
This commit is contained in:
parent
45059a9555
commit
04cd4c5bf2
5 changed files with 246 additions and 167 deletions
|
@ -161,7 +161,8 @@ ICS_PATH = DATA_DIR / 'calendar.ics' # deprecated now, but maybe revive?
|
|||
ICALENDARS = os.getenv('ICALENDARS', 'NULL,VOID').split(',')
|
||||
|
||||
EMAIL_CONFIG = CONFIG_DIR / "email.yaml"
|
||||
AUTORESPOND = True
|
||||
EMAIL_LOGS = LOGS_DIR / "email"
|
||||
os.makedirs(EMAIL_LOGS, exist_ok = True)
|
||||
|
||||
### Courtlistener & other webhooks
|
||||
COURTLISTENER_DOCKETS_DIR = DATA_DIR / "courtlistener" / "dockets"
|
||||
|
|
64
sijapi/helpers/log_prior_emails.py
Normal file
64
sijapi/helpers/log_prior_emails.py
Normal file
|
@ -0,0 +1,64 @@
|
|||
import asyncio
|
||||
from pathlib import Path
|
||||
from sijapi import DEBUG, INFO, ERR
|
||||
from sijapi import EMAIL_CONFIG, EMAIL_LOGS
|
||||
from sijapi.classes import EmailAccount
|
||||
from sijapi.routers import email
|
||||
|
||||
async def initialize_log_files():
|
||||
summarized_log = EMAIL_LOGS / "summarized.txt"
|
||||
autoresponded_log = EMAIL_LOGS / "autoresponded.txt"
|
||||
diagnostic_log = EMAIL_LOGS / "diagnostic.txt"
|
||||
for log_file in [summarized_log, autoresponded_log, diagnostic_log]:
|
||||
log_file.parent.mkdir(parents=True, exist_ok=True)
|
||||
log_file.write_text("")
|
||||
DEBUG(f"Log files initialized: {summarized_log}, {autoresponded_log}, {diagnostic_log}")
|
||||
return summarized_log, autoresponded_log, diagnostic_log
|
||||
|
||||
async def process_all_emails(account: EmailAccount, summarized_log: Path, autoresponded_log: Path, diagnostic_log: Path):
|
||||
try:
|
||||
with email.get_imap_connection(account) as inbox:
|
||||
DEBUG(f"Connected to {account.name}, processing all emails...")
|
||||
all_messages = inbox.messages()
|
||||
unread_messages = set(uid for uid, _ in inbox.messages(unread=True))
|
||||
|
||||
processed_count = 0
|
||||
for identifier, message in all_messages:
|
||||
# Log diagnostic information
|
||||
with open(diagnostic_log, 'a') as f:
|
||||
f.write(f"Account: {account.name}, Raw Identifier: {identifier}, Type: {type(identifier)}\n")
|
||||
|
||||
# Attempt to get a string representation of the identifier
|
||||
if isinstance(identifier, bytes):
|
||||
id_str = identifier.decode()
|
||||
elif isinstance(identifier, (int, str)):
|
||||
id_str = str(identifier)
|
||||
else:
|
||||
id_str = repr(identifier)
|
||||
|
||||
if identifier not in unread_messages:
|
||||
processed_count += 1
|
||||
for log_file in [summarized_log, autoresponded_log]:
|
||||
with open(log_file, 'a') as f:
|
||||
f.write(f"{id_str}\n")
|
||||
|
||||
INFO(f"Processed {processed_count} non-unread emails for account {account.name}")
|
||||
except Exception as e:
|
||||
ERR(f"An error occurred while processing emails for account {account.name}: {e}")
|
||||
|
||||
async def main():
|
||||
email_accounts = email.load_email_accounts(EMAIL_CONFIG)
|
||||
summarized_log, autoresponded_log, diagnostic_log = await initialize_log_files()
|
||||
|
||||
DEBUG(f"Processing {len(email_accounts)} email accounts")
|
||||
|
||||
tasks = [process_all_emails(account, summarized_log, autoresponded_log, diagnostic_log) for account in email_accounts]
|
||||
await asyncio.gather(*tasks)
|
||||
|
||||
# Final verification
|
||||
with open(summarized_log, 'r') as f:
|
||||
final_count = len(f.readlines())
|
||||
INFO(f"Final non-unread email count: {final_count}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
|
@ -1,65 +1,36 @@
|
|||
import os
|
||||
import sys
|
||||
import logging
|
||||
from logging.handlers import RotatingFileHandler
|
||||
from colorama import Fore, Back, Style, init as colorama_init
|
||||
from loguru import logger
|
||||
import traceback
|
||||
|
||||
# Force colorama to initialize for the current platform
|
||||
colorama_init(autoreset=True, strip=False, convert=True)
|
||||
|
||||
class ColorFormatter(logging.Formatter):
|
||||
"""Custom formatter to add colors to log levels."""
|
||||
COLOR_MAP = {
|
||||
logging.DEBUG: Fore.CYAN,
|
||||
logging.INFO: Fore.GREEN,
|
||||
logging.WARNING: Fore.YELLOW,
|
||||
logging.ERROR: Fore.RED,
|
||||
logging.CRITICAL: Fore.MAGENTA + Back.WHITE,
|
||||
}
|
||||
|
||||
def format(self, record):
|
||||
log_message = super().format(record)
|
||||
color = self.COLOR_MAP.get(record.levelno, '')
|
||||
return f"{color}{log_message}{Style.RESET_ALL}"
|
||||
|
||||
class Logger:
|
||||
def __init__(self, name, logs_dir):
|
||||
self.logs_dir = logs_dir
|
||||
self.logger = logging.getLogger(name)
|
||||
self.logger.setLevel(logging.DEBUG)
|
||||
self.name = name
|
||||
self.logger = logger.bind(name=name)
|
||||
|
||||
def setup_from_args(self, args):
|
||||
if not os.path.exists(self.logs_dir):
|
||||
os.makedirs(self.logs_dir)
|
||||
|
||||
# Remove default logger
|
||||
logger.remove()
|
||||
|
||||
# File handler
|
||||
handler_path = os.path.join(self.logs_dir, 'app.log')
|
||||
file_handler = RotatingFileHandler(handler_path, maxBytes=2000000, backupCount=10)
|
||||
file_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
|
||||
file_handler.setLevel(logging.DEBUG)
|
||||
logger.add(handler_path, rotation="2 MB", compression="zip", level="DEBUG", format="{time:YYYY-MM-DD HH:mm:ss} - {name} - {level} - {message}")
|
||||
|
||||
# Console handler
|
||||
console_handler = logging.StreamHandler(sys.stdout) # Explicitly use sys.stdout
|
||||
console_formatter = ColorFormatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
||||
console_handler.setFormatter(console_formatter)
|
||||
|
||||
# Set console handler level based on args
|
||||
if args.debug:
|
||||
console_handler.setLevel(logging.DEBUG)
|
||||
else:
|
||||
console_handler.setLevel(logging.INFO)
|
||||
|
||||
# Add handlers to logger
|
||||
self.logger.addHandler(file_handler)
|
||||
self.logger.addHandler(console_handler)
|
||||
log_format = "<cyan>{time:YYYY-MM-DD HH:mm:ss}</cyan> - <cyan>{name}</cyan> - <level>{level: <8}</level> - <level>{message}</level>"
|
||||
console_level = "DEBUG" if args.debug else "INFO"
|
||||
logger.add(sys.stdout, format=log_format, level=console_level, colorize=True)
|
||||
|
||||
# Test color output
|
||||
self.logger.debug("Debug message (should be Cyan)")
|
||||
self.logger.info("Info message (should be Green)")
|
||||
self.logger.warning("Warning message (should be Yellow)")
|
||||
self.logger.error("Error message (should be Red)")
|
||||
self.logger.critical("Critical message (should be Magenta on White)")
|
||||
self.logger.critical("Critical message (should be Magenta)")
|
||||
|
||||
def get_logger(self):
|
||||
return self.logger
|
||||
|
@ -71,9 +42,9 @@ if __name__ == "__main__":
|
|||
parser.add_argument('--debug', action='store_true')
|
||||
args = parser.parse_args()
|
||||
|
||||
logger = Logger("test", "logs")
|
||||
logger.setup_from_args(args)
|
||||
test_logger = logger.get_logger()
|
||||
logger_instance = Logger("test", "logs")
|
||||
logger_instance.setup_from_args(args)
|
||||
test_logger = logger_instance.get_logger()
|
||||
|
||||
print("FORCE_COLOR:", os.environ.get('FORCE_COLOR'))
|
||||
print("NO_COLOR:", os.environ.get('NO_COLOR'))
|
||||
|
|
|
@ -1,67 +1,50 @@
|
|||
'''
|
||||
Automatic Speech Recognition module relying on the `whisper_cpp` implementation of OpenAI's Whisper model.
|
||||
Depends on:
|
||||
LOGGER, ASR_DIR, WHISPER_CPP_MODELS, GARBAGE_COLLECTION_INTERVAL, GARBAGE_TTL, WHISPER_CPP_DIR
|
||||
Notes:
|
||||
Performs exceptionally well on Apple Silicon. Other devices will benefit from future updates to optionally use `faster_whisper`, `insanely_faster_whisper`, and/or `whisper_jax`.
|
||||
'''
|
||||
|
||||
from fastapi import APIRouter, HTTPException, Form, UploadFile, File
|
||||
import os
|
||||
import sys
|
||||
import uuid
|
||||
import json
|
||||
import asyncio
|
||||
import tempfile
|
||||
import subprocess
|
||||
from urllib.parse import unquote
|
||||
from fastapi import APIRouter, HTTPException, Form, UploadFile, File, BackgroundTasks
|
||||
from fastapi.responses import JSONResponse
|
||||
from pydantic import BaseModel, Field
|
||||
from typing import Optional
|
||||
import tempfile
|
||||
from fastapi.responses import JSONResponse, FileResponse
|
||||
from pydantic import BaseModel, HttpUrl
|
||||
from whisperplus.pipelines import mlx_whisper
|
||||
from youtube_dl import YoutubeDL
|
||||
from urllib.parse import unquote
|
||||
import subprocess
|
||||
import os
|
||||
import uuid
|
||||
from threading import Thread
|
||||
import multiprocessing
|
||||
import asyncio
|
||||
import subprocess
|
||||
import tempfile
|
||||
from sijapi import DEBUG, INFO, WARN, ERR, CRITICAL, ASR_DIR, WHISPER_CPP_MODELS, GARBAGE_COLLECTION_INTERVAL, GARBAGE_TTL, WHISPER_CPP_DIR, MAX_CPU_CORES
|
||||
|
||||
from sijapi import DEBUG, INFO, WARN, ERR, CRITICAL, ASR_DIR, WHISPER_CPP_MODELS, GARBAGE_COLLECTION_INTERVAL, GARBAGE_TTL, WHISPER_CPP_DIR, MAX_CPU_CORES
|
||||
|
||||
asr = APIRouter()
|
||||
|
||||
class TranscribeParams(BaseModel):
|
||||
model: str = Field(default="small")
|
||||
output_srt : Optional[bool] = Field(default=False)
|
||||
language : Optional[str] = Field(None)
|
||||
split_on_word : Optional[bool] = Field(default=False)
|
||||
temperature : Optional[float] = Field(default=0)
|
||||
temp_increment : Optional[int] = Field(None)
|
||||
translate : Optional[bool] = Field(default=False)
|
||||
diarize : Optional[bool] = Field(default=False)
|
||||
tiny_diarize : Optional[bool] = Field(default=False)
|
||||
no_fallback : Optional[bool] = Field(default=False)
|
||||
output_json : Optional[bool] = Field(default=False)
|
||||
detect_language : Optional[bool] = Field(default=False)
|
||||
dtw : Optional[str] = Field(None)
|
||||
threads : Optional[int] = Field(None)
|
||||
output_srt: Optional[bool] = Field(default=False)
|
||||
language: Optional[str] = Field(None)
|
||||
split_on_word: Optional[bool] = Field(default=False)
|
||||
temperature: Optional[float] = Field(default=0)
|
||||
temp_increment: Optional[int] = Field(None)
|
||||
translate: Optional[bool] = Field(default=False)
|
||||
diarize: Optional[bool] = Field(default=False)
|
||||
tiny_diarize: Optional[bool] = Field(default=False)
|
||||
no_fallback: Optional[bool] = Field(default=False)
|
||||
output_json: Optional[bool] = Field(default=False)
|
||||
detect_language: Optional[bool] = Field(default=False)
|
||||
dtw: Optional[str] = Field(None)
|
||||
threads: Optional[int] = Field(None)
|
||||
|
||||
from urllib.parse import unquote
|
||||
import json
|
||||
# Global dictionary to store transcription results
|
||||
transcription_results = {}
|
||||
|
||||
@asr.post("/asr")
|
||||
@asr.post("/transcribe")
|
||||
@asr.post("/v1/audio/transcription")
|
||||
async def transcribe_endpoint(
|
||||
background_tasks: BackgroundTasks,
|
||||
file: UploadFile = File(...),
|
||||
params: str = Form(...)
|
||||
):
|
||||
try:
|
||||
# Decode the URL-encoded string
|
||||
decoded_params = unquote(params)
|
||||
|
||||
# Parse the JSON string
|
||||
parameters_dict = json.loads(decoded_params)
|
||||
|
||||
# Create TranscribeParams object
|
||||
parameters = TranscribeParams(**parameters_dict)
|
||||
except json.JSONDecodeError as json_err:
|
||||
raise HTTPException(status_code=400, detail=f"Invalid JSON: {str(json_err)}")
|
||||
|
@ -72,12 +55,30 @@ async def transcribe_endpoint(
|
|||
temp_file.write(await file.read())
|
||||
temp_file_path = temp_file.name
|
||||
|
||||
transcription = await transcribe_audio(file_path=temp_file_path, params=parameters)
|
||||
return transcription
|
||||
transcription_job = await transcribe_audio(file_path=temp_file_path, params=parameters, background_tasks=background_tasks)
|
||||
job_id = transcription_job["job_id"]
|
||||
|
||||
async def transcribe_audio(file_path, params: TranscribeParams):
|
||||
# Poll for completion
|
||||
max_wait_time = 600 # 10 minutes
|
||||
poll_interval = 2 # 2 seconds
|
||||
elapsed_time = 0
|
||||
|
||||
file_path = convert_to_wav(file_path)
|
||||
while elapsed_time < max_wait_time:
|
||||
if job_id in transcription_results:
|
||||
result = transcription_results[job_id]
|
||||
if result["status"] == "completed":
|
||||
return JSONResponse(content={"status": "completed", "result": result["result"]})
|
||||
elif result["status"] == "failed":
|
||||
return JSONResponse(content={"status": "failed", "error": result["error"]}, status_code=500)
|
||||
|
||||
await asyncio.sleep(poll_interval)
|
||||
elapsed_time += poll_interval
|
||||
|
||||
# If we've reached this point, the transcription has taken too long
|
||||
return JSONResponse(content={"status": "timeout", "message": "Transcription is taking longer than expected. Please check back later."}, status_code=202)
|
||||
|
||||
async def transcribe_audio(file_path, params: TranscribeParams, background_tasks: BackgroundTasks):
|
||||
file_path = await convert_to_wav(file_path)
|
||||
model = params.model if params.model in WHISPER_CPP_MODELS else 'small'
|
||||
model_path = WHISPER_CPP_DIR / 'models' / f'ggml-{model}.bin'
|
||||
command = [str(WHISPER_CPP_DIR / 'build' / 'bin' / 'main')]
|
||||
|
@ -115,35 +116,50 @@ async def transcribe_audio(file_path, params: TranscribeParams):
|
|||
command.extend(['-f', file_path])
|
||||
|
||||
DEBUG(f"Command: {command}")
|
||||
|
||||
# Create a unique ID for this transcription job
|
||||
job_id = str(uuid.uuid4())
|
||||
|
||||
# Store the job status
|
||||
transcription_results[job_id] = {"status": "processing", "result": None}
|
||||
|
||||
# Run the transcription in a background task
|
||||
background_tasks.add_task(process_transcription, command, file_path, job_id)
|
||||
|
||||
return {"job_id": job_id}
|
||||
|
||||
async def process_transcription(command, file_path, job_id):
|
||||
try:
|
||||
result = await run_transcription(command, file_path)
|
||||
transcription_results[job_id] = {"status": "completed", "result": result}
|
||||
except Exception as e:
|
||||
transcription_results[job_id] = {"status": "failed", "error": str(e)}
|
||||
finally:
|
||||
# Clean up the temporary file
|
||||
os.remove(file_path)
|
||||
|
||||
async def run_transcription(command, file_path):
|
||||
proc = await asyncio.create_subprocess_exec(
|
||||
*command,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE
|
||||
)
|
||||
stdout, stderr = await proc.communicate()
|
||||
|
||||
if proc.returncode != 0:
|
||||
raise Exception(f"Error running command: {stderr.decode()}")
|
||||
return stdout.decode().strip()
|
||||
|
||||
result = stdout.decode().strip()
|
||||
DEBUG(f"Result: {result}")
|
||||
return result
|
||||
|
||||
|
||||
def convert_to_wav(file_path: str):
|
||||
async def convert_to_wav(file_path: str):
|
||||
wav_file_path = os.path.join(ASR_DIR, f"{uuid.uuid4()}.wav")
|
||||
subprocess.run(["ffmpeg", "-y", "-i", file_path, "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1", wav_file_path], check=True)
|
||||
proc = await asyncio.create_subprocess_exec(
|
||||
"ffmpeg", "-y", "-i", file_path, "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1", wav_file_path,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE
|
||||
)
|
||||
stdout, stderr = await proc.communicate()
|
||||
if proc.returncode != 0:
|
||||
raise Exception(f"Error converting file to WAV: {stderr.decode()}")
|
||||
return wav_file_path
|
||||
def download_from_youtube(url: str):
|
||||
temp_file = os.path.join(ASR_DIR, f"{uuid.uuid4()}.mp3")
|
||||
ytdl_opts = {
|
||||
'outtmpl': temp_file,
|
||||
'postprocessors': [{'key': 'FFmpegExtractAudio', 'preferredcodec': 'mp3', 'preferredquality': '192'}],
|
||||
'nooverwrites': True
|
||||
}
|
||||
with YoutubeDL(ytdl_opts) as ydl:
|
||||
ydl.download([url])
|
||||
return convert_to_wav(temp_file)
|
||||
|
||||
def format_srt_timestamp(seconds: float):
|
||||
milliseconds = round(seconds * 1000.0)
|
||||
|
|
|
@ -15,21 +15,16 @@ from email.mime.multipart import MIMEMultipart
|
|||
from email.mime.text import MIMEText
|
||||
from email.mime.image import MIMEImage
|
||||
import ssl
|
||||
from datetime import datetime as dt_datetime
|
||||
from pydantic import BaseModel
|
||||
from typing import List, Optional, Any
|
||||
import yaml
|
||||
from typing import List, Dict, Optional
|
||||
from pydantic import BaseModel
|
||||
from sijapi import DEBUG, ERR, LLM_SYS_MSG
|
||||
from typing import List, Dict, Optional, Set
|
||||
from datetime import datetime as dt_datetime
|
||||
from typing import Dict
|
||||
from sijapi import DEBUG, INFO, WARN, ERR, CRITICAL
|
||||
from sijapi import PODCAST_DIR, DEFAULT_VOICE, EMAIL_CONFIG
|
||||
from sijapi import PODCAST_DIR, DEFAULT_VOICE, EMAIL_CONFIG, EMAIL_LOGS
|
||||
from sijapi.routers import tts, llm, sd, locate
|
||||
from sijapi.utilities import clean_text, assemble_journal_path, extract_text, prefix_lines
|
||||
from sijapi.classes import EmailAccount, IMAPConfig, SMTPConfig, IncomingEmail, EmailContact, AutoResponder
|
||||
|
||||
from sijapi import DEBUG, INFO, ERR, LOGS_DIR
|
||||
from sijapi.classes import EmailAccount
|
||||
|
||||
email = APIRouter(tags=["private"])
|
||||
|
||||
|
@ -76,8 +71,6 @@ def get_smtp_connection(account: EmailAccount):
|
|||
return SMTP(account.smtp.host, account.smtp.port)
|
||||
|
||||
|
||||
|
||||
|
||||
def get_matching_autoresponders(this_email: IncomingEmail, account: EmailAccount) -> List[AutoResponder]:
|
||||
def matches_list(item: str, this_email: IncomingEmail) -> bool:
|
||||
if '@' in item:
|
||||
|
@ -161,50 +154,6 @@ async def extract_attachments(attachments) -> List[str]:
|
|||
return attachment_texts
|
||||
|
||||
|
||||
|
||||
async def process_account(account: EmailAccount):
|
||||
while True:
|
||||
start_time = dt_datetime.now()
|
||||
try:
|
||||
DEBUG(f"Connecting to {account.name} to check for unread emails...")
|
||||
with get_imap_connection(account) as inbox:
|
||||
DEBUG(f"Connected to {account.name}, checking for unread emails now...")
|
||||
unread_messages = inbox.messages(unread=True)
|
||||
for uid, message in unread_messages:
|
||||
recipients = [EmailContact(email=recipient['email'], name=recipient.get('name', '')) for recipient in message.sent_to]
|
||||
localized_datetime = await locate.localize_datetime(message.date)
|
||||
this_email = IncomingEmail(
|
||||
sender=message.sent_from[0]['email'],
|
||||
datetime_received=localized_datetime,
|
||||
recipients=recipients,
|
||||
subject=message.subject,
|
||||
body=clean_email_content(message.body['html'][0]) if message.body['html'] else clean_email_content(message.body['plain'][0]) or "",
|
||||
attachments=message.attachments
|
||||
)
|
||||
DEBUG(f"\n\nProcessing email for account {account.name}: {this_email.subject}\n\n")
|
||||
save_success = await save_email(this_email, account)
|
||||
respond_success = await autorespond(this_email, account)
|
||||
if save_success and respond_success:
|
||||
inbox.mark_seen(uid)
|
||||
except Exception as e:
|
||||
ERR(f"An error occurred for account {account.name}: {e}")
|
||||
|
||||
# Calculate the time taken for processing
|
||||
processing_time = (dt_datetime.now() - start_time).total_seconds()
|
||||
|
||||
# Calculate the remaining time to wait
|
||||
wait_time = max(0, account.refresh - processing_time)
|
||||
|
||||
# Wait for the remaining time
|
||||
await asyncio.sleep(wait_time)
|
||||
|
||||
|
||||
async def process_all_accounts():
|
||||
email_accounts = load_email_accounts(EMAIL_CONFIG)
|
||||
tasks = [asyncio.create_task(process_account(account)) for account in email_accounts]
|
||||
await asyncio.gather(*tasks)
|
||||
|
||||
|
||||
async def save_email(this_email: IncomingEmail, account: EmailAccount):
|
||||
try:
|
||||
md_path, md_relative = assemble_journal_path(this_email.datetime_received, "Emails", this_email.subject, ".md")
|
||||
|
@ -262,6 +211,7 @@ tags:
|
|||
|
||||
async def autorespond(this_email: IncomingEmail, account: EmailAccount):
|
||||
matching_profiles = get_matching_autoresponders(this_email, account)
|
||||
DEBUG(f"Matching profiles: {matching_profiles}")
|
||||
for profile in matching_profiles:
|
||||
DEBUG(f"Auto-responding to {this_email.subject} with profile: {profile.name}")
|
||||
auto_response_subject = f"Auto-Response Re: {this_email.subject}"
|
||||
|
@ -297,6 +247,83 @@ async def send_auto_response(to_email, subject, body, profile, account):
|
|||
return False
|
||||
|
||||
|
||||
|
||||
|
||||
async def load_processed_uids(filename: Path) -> Set[str]:
|
||||
if filename.exists():
|
||||
with open(filename, 'r') as f:
|
||||
return set(line.strip().split(':')[-1] for line in f)
|
||||
return set()
|
||||
|
||||
async def save_processed_uid(filename: Path, account_name: str, uid: str):
|
||||
with open(filename, 'a') as f:
|
||||
f.write(f"{account_name}:{uid}\n")
|
||||
|
||||
async def process_account_summarization(account: EmailAccount):
|
||||
summarized_log = EMAIL_LOGS / "summarized.txt"
|
||||
while True:
|
||||
try:
|
||||
processed_uids = await load_processed_uids(summarized_log)
|
||||
with get_imap_connection(account) as inbox:
|
||||
unread_messages = inbox.messages(unread=True)
|
||||
for uid, message in unread_messages:
|
||||
uid_str = uid.decode() if isinstance(uid, bytes) else str(uid)
|
||||
if uid_str not in processed_uids:
|
||||
recipients = [EmailContact(email=recipient['email'], name=recipient.get('name', '')) for recipient in message.sent_to]
|
||||
localized_datetime = await locate.localize_datetime(message.date)
|
||||
this_email = IncomingEmail(
|
||||
sender=message.sent_from[0]['email'],
|
||||
datetime_received=localized_datetime,
|
||||
recipients=recipients,
|
||||
subject=message.subject,
|
||||
body=clean_email_content(message.body['html'][0]) if message.body['html'] else clean_email_content(message.body['plain'][0]) or "",
|
||||
attachments=message.attachments
|
||||
)
|
||||
if account.summarize:
|
||||
save_success = await save_email(this_email, account)
|
||||
if save_success:
|
||||
await save_processed_uid(summarized_log, account.name, uid_str)
|
||||
DEBUG(f"Summarized email: {uid_str}")
|
||||
except Exception as e:
|
||||
ERR(f"An error occurred during summarization for account {account.name}: {e}")
|
||||
|
||||
await asyncio.sleep(account.refresh)
|
||||
|
||||
async def process_account_autoresponding(account: EmailAccount):
|
||||
autoresponded_log = EMAIL_LOGS / "autoresponded.txt"
|
||||
while True:
|
||||
try:
|
||||
processed_uids = await load_processed_uids(autoresponded_log)
|
||||
with get_imap_connection(account) as inbox:
|
||||
unread_messages = inbox.messages(unread=True)
|
||||
for uid, message in unread_messages:
|
||||
uid_str = uid.decode() if isinstance(uid, bytes) else str(uid)
|
||||
if uid_str not in processed_uids:
|
||||
recipients = [EmailContact(email=recipient['email'], name=recipient.get('name', '')) for recipient in message.sent_to]
|
||||
localized_datetime = await locate.localize_datetime(message.date)
|
||||
this_email = IncomingEmail(
|
||||
sender=message.sent_from[0]['email'],
|
||||
datetime_received=localized_datetime,
|
||||
recipients=recipients,
|
||||
subject=message.subject,
|
||||
body=clean_email_content(message.body['html'][0]) if message.body['html'] else clean_email_content(message.body['plain'][0]) or "",
|
||||
attachments=message.attachments
|
||||
)
|
||||
respond_success = await autorespond(this_email, account)
|
||||
if respond_success:
|
||||
await save_processed_uid(autoresponded_log, account.name, uid_str)
|
||||
DEBUG(f"Auto-responded to email: {uid_str}")
|
||||
except Exception as e:
|
||||
ERR(f"An error occurred during auto-responding for account {account.name}: {e}")
|
||||
|
||||
await asyncio.sleep(account.refresh)
|
||||
|
||||
async def process_all_accounts():
|
||||
email_accounts = load_email_accounts(EMAIL_CONFIG)
|
||||
summarization_tasks = [asyncio.create_task(process_account_summarization(account)) for account in email_accounts]
|
||||
autoresponding_tasks = [asyncio.create_task(process_account_autoresponding(account)) for account in email_accounts]
|
||||
await asyncio.gather(*summarization_tasks, *autoresponding_tasks)
|
||||
|
||||
@email.on_event("startup")
|
||||
async def startup_event():
|
||||
asyncio.create_task(process_all_accounts())
|
Loading…
Reference in a new issue