From 04cd4c5bf27d1818e10acb72dd4e39e41d5ce96b Mon Sep 17 00:00:00 2001
From: sanj <67624670+iodrift@users.noreply.github.com>
Date: Wed, 26 Jun 2024 10:02:40 -0700
Subject: [PATCH] Auto-update: Wed Jun 26 10:02:40 PDT 2024
---
sijapi/__init__.py | 3 +-
sijapi/helpers/log_prior_emails.py | 64 +++++++++++++
sijapi/logs.py | 59 +++---------
sijapi/routers/asr.py | 148 ++++++++++++++++-------------
sijapi/routers/email.py | 139 ++++++++++++++++-----------
5 files changed, 246 insertions(+), 167 deletions(-)
create mode 100644 sijapi/helpers/log_prior_emails.py
diff --git a/sijapi/__init__.py b/sijapi/__init__.py
index 254fba5..973e44c 100644
--- a/sijapi/__init__.py
+++ b/sijapi/__init__.py
@@ -161,7 +161,8 @@ ICS_PATH = DATA_DIR / 'calendar.ics' # deprecated now, but maybe revive?
ICALENDARS = os.getenv('ICALENDARS', 'NULL,VOID').split(',')
EMAIL_CONFIG = CONFIG_DIR / "email.yaml"
-AUTORESPOND = True
+EMAIL_LOGS = LOGS_DIR / "email"
+os.makedirs(EMAIL_LOGS, exist_ok = True)
### Courtlistener & other webhooks
COURTLISTENER_DOCKETS_DIR = DATA_DIR / "courtlistener" / "dockets"
diff --git a/sijapi/helpers/log_prior_emails.py b/sijapi/helpers/log_prior_emails.py
new file mode 100644
index 0000000..1be6267
--- /dev/null
+++ b/sijapi/helpers/log_prior_emails.py
@@ -0,0 +1,64 @@
+import asyncio
+from pathlib import Path
+from sijapi import DEBUG, INFO, ERR
+from sijapi import EMAIL_CONFIG, EMAIL_LOGS
+from sijapi.classes import EmailAccount
+from sijapi.routers import email
+
+async def initialize_log_files():
+ summarized_log = EMAIL_LOGS / "summarized.txt"
+ autoresponded_log = EMAIL_LOGS / "autoresponded.txt"
+ diagnostic_log = EMAIL_LOGS / "diagnostic.txt"
+ for log_file in [summarized_log, autoresponded_log, diagnostic_log]:
+ log_file.parent.mkdir(parents=True, exist_ok=True)
+ log_file.write_text("")
+ DEBUG(f"Log files initialized: {summarized_log}, {autoresponded_log}, {diagnostic_log}")
+ return summarized_log, autoresponded_log, diagnostic_log
+
+async def process_all_emails(account: EmailAccount, summarized_log: Path, autoresponded_log: Path, diagnostic_log: Path):
+ try:
+ with email.get_imap_connection(account) as inbox:
+ DEBUG(f"Connected to {account.name}, processing all emails...")
+ all_messages = inbox.messages()
+ unread_messages = set(uid for uid, _ in inbox.messages(unread=True))
+
+ processed_count = 0
+ for identifier, message in all_messages:
+ # Log diagnostic information
+ with open(diagnostic_log, 'a') as f:
+ f.write(f"Account: {account.name}, Raw Identifier: {identifier}, Type: {type(identifier)}\n")
+
+ # Attempt to get a string representation of the identifier
+ if isinstance(identifier, bytes):
+ id_str = identifier.decode()
+ elif isinstance(identifier, (int, str)):
+ id_str = str(identifier)
+ else:
+ id_str = repr(identifier)
+
+ if identifier not in unread_messages:
+ processed_count += 1
+ for log_file in [summarized_log, autoresponded_log]:
+ with open(log_file, 'a') as f:
+ f.write(f"{id_str}\n")
+
+ INFO(f"Processed {processed_count} non-unread emails for account {account.name}")
+ except Exception as e:
+ ERR(f"An error occurred while processing emails for account {account.name}: {e}")
+
+async def main():
+ email_accounts = email.load_email_accounts(EMAIL_CONFIG)
+ summarized_log, autoresponded_log, diagnostic_log = await initialize_log_files()
+
+ DEBUG(f"Processing {len(email_accounts)} email accounts")
+
+ tasks = [process_all_emails(account, summarized_log, autoresponded_log, diagnostic_log) for account in email_accounts]
+ await asyncio.gather(*tasks)
+
+ # Final verification
+ with open(summarized_log, 'r') as f:
+ final_count = len(f.readlines())
+ INFO(f"Final non-unread email count: {final_count}")
+
+if __name__ == "__main__":
+ asyncio.run(main())
\ No newline at end of file
diff --git a/sijapi/logs.py b/sijapi/logs.py
index c0ba953..dab689b 100644
--- a/sijapi/logs.py
+++ b/sijapi/logs.py
@@ -1,65 +1,36 @@
import os
import sys
-import logging
-from logging.handlers import RotatingFileHandler
-from colorama import Fore, Back, Style, init as colorama_init
+from loguru import logger
import traceback
-# Force colorama to initialize for the current platform
-colorama_init(autoreset=True, strip=False, convert=True)
-
-class ColorFormatter(logging.Formatter):
- """Custom formatter to add colors to log levels."""
- COLOR_MAP = {
- logging.DEBUG: Fore.CYAN,
- logging.INFO: Fore.GREEN,
- logging.WARNING: Fore.YELLOW,
- logging.ERROR: Fore.RED,
- logging.CRITICAL: Fore.MAGENTA + Back.WHITE,
- }
-
- def format(self, record):
- log_message = super().format(record)
- color = self.COLOR_MAP.get(record.levelno, '')
- return f"{color}{log_message}{Style.RESET_ALL}"
-
class Logger:
def __init__(self, name, logs_dir):
self.logs_dir = logs_dir
- self.logger = logging.getLogger(name)
- self.logger.setLevel(logging.DEBUG)
+ self.name = name
+ self.logger = logger.bind(name=name)
def setup_from_args(self, args):
if not os.path.exists(self.logs_dir):
os.makedirs(self.logs_dir)
+ # Remove default logger
+ logger.remove()
+
# File handler
handler_path = os.path.join(self.logs_dir, 'app.log')
- file_handler = RotatingFileHandler(handler_path, maxBytes=2000000, backupCount=10)
- file_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
- file_handler.setLevel(logging.DEBUG)
+ logger.add(handler_path, rotation="2 MB", compression="zip", level="DEBUG", format="{time:YYYY-MM-DD HH:mm:ss} - {name} - {level} - {message}")
# Console handler
- console_handler = logging.StreamHandler(sys.stdout) # Explicitly use sys.stdout
- console_formatter = ColorFormatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
- console_handler.setFormatter(console_formatter)
-
- # Set console handler level based on args
- if args.debug:
- console_handler.setLevel(logging.DEBUG)
- else:
- console_handler.setLevel(logging.INFO)
-
- # Add handlers to logger
- self.logger.addHandler(file_handler)
- self.logger.addHandler(console_handler)
+ log_format = "{time:YYYY-MM-DD HH:mm:ss} - {name} - {level: <8} - {message}"
+ console_level = "DEBUG" if args.debug else "INFO"
+ logger.add(sys.stdout, format=log_format, level=console_level, colorize=True)
# Test color output
self.logger.debug("Debug message (should be Cyan)")
self.logger.info("Info message (should be Green)")
self.logger.warning("Warning message (should be Yellow)")
self.logger.error("Error message (should be Red)")
- self.logger.critical("Critical message (should be Magenta on White)")
+ self.logger.critical("Critical message (should be Magenta)")
def get_logger(self):
return self.logger
@@ -71,9 +42,9 @@ if __name__ == "__main__":
parser.add_argument('--debug', action='store_true')
args = parser.parse_args()
- logger = Logger("test", "logs")
- logger.setup_from_args(args)
- test_logger = logger.get_logger()
+ logger_instance = Logger("test", "logs")
+ logger_instance.setup_from_args(args)
+ test_logger = logger_instance.get_logger()
print("FORCE_COLOR:", os.environ.get('FORCE_COLOR'))
print("NO_COLOR:", os.environ.get('NO_COLOR'))
@@ -85,4 +56,4 @@ if __name__ == "__main__":
test_logger.info("This is an info message")
test_logger.warning("This is a warning message")
test_logger.error("This is an error message")
- test_logger.critical("This is a critical message")
\ No newline at end of file
+ test_logger.critical("This is a critical message")
diff --git a/sijapi/routers/asr.py b/sijapi/routers/asr.py
index f7f157d..10a14eb 100644
--- a/sijapi/routers/asr.py
+++ b/sijapi/routers/asr.py
@@ -1,67 +1,50 @@
-'''
-Automatic Speech Recognition module relying on the `whisper_cpp` implementation of OpenAI's Whisper model.
-Depends on:
- LOGGER, ASR_DIR, WHISPER_CPP_MODELS, GARBAGE_COLLECTION_INTERVAL, GARBAGE_TTL, WHISPER_CPP_DIR
-Notes:
- Performs exceptionally well on Apple Silicon. Other devices will benefit from future updates to optionally use `faster_whisper`, `insanely_faster_whisper`, and/or `whisper_jax`.
-'''
-
-from fastapi import APIRouter, HTTPException, Form, UploadFile, File
+import os
+import sys
+import uuid
+import json
+import asyncio
+import tempfile
+import subprocess
+from urllib.parse import unquote
+from fastapi import APIRouter, HTTPException, Form, UploadFile, File, BackgroundTasks
+from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field
from typing import Optional
-import tempfile
-from fastapi.responses import JSONResponse, FileResponse
-from pydantic import BaseModel, HttpUrl
-from whisperplus.pipelines import mlx_whisper
-from youtube_dl import YoutubeDL
-from urllib.parse import unquote
-import subprocess
-import os
-import uuid
-from threading import Thread
-import multiprocessing
-import asyncio
-import subprocess
-import tempfile
-from sijapi import DEBUG, INFO, WARN, ERR, CRITICAL, ASR_DIR, WHISPER_CPP_MODELS, GARBAGE_COLLECTION_INTERVAL, GARBAGE_TTL, WHISPER_CPP_DIR, MAX_CPU_CORES
+from sijapi import DEBUG, INFO, WARN, ERR, CRITICAL, ASR_DIR, WHISPER_CPP_MODELS, GARBAGE_COLLECTION_INTERVAL, GARBAGE_TTL, WHISPER_CPP_DIR, MAX_CPU_CORES
asr = APIRouter()
class TranscribeParams(BaseModel):
model: str = Field(default="small")
- output_srt : Optional[bool] = Field(default=False)
- language : Optional[str] = Field(None)
- split_on_word : Optional[bool] = Field(default=False)
- temperature : Optional[float] = Field(default=0)
- temp_increment : Optional[int] = Field(None)
- translate : Optional[bool] = Field(default=False)
- diarize : Optional[bool] = Field(default=False)
- tiny_diarize : Optional[bool] = Field(default=False)
- no_fallback : Optional[bool] = Field(default=False)
- output_json : Optional[bool] = Field(default=False)
- detect_language : Optional[bool] = Field(default=False)
- dtw : Optional[str] = Field(None)
- threads : Optional[int] = Field(None)
+ output_srt: Optional[bool] = Field(default=False)
+ language: Optional[str] = Field(None)
+ split_on_word: Optional[bool] = Field(default=False)
+ temperature: Optional[float] = Field(default=0)
+ temp_increment: Optional[int] = Field(None)
+ translate: Optional[bool] = Field(default=False)
+ diarize: Optional[bool] = Field(default=False)
+ tiny_diarize: Optional[bool] = Field(default=False)
+ no_fallback: Optional[bool] = Field(default=False)
+ output_json: Optional[bool] = Field(default=False)
+ detect_language: Optional[bool] = Field(default=False)
+ dtw: Optional[str] = Field(None)
+ threads: Optional[int] = Field(None)
-from urllib.parse import unquote
-import json
+# Global dictionary to store transcription results
+transcription_results = {}
@asr.post("/asr")
@asr.post("/transcribe")
@asr.post("/v1/audio/transcription")
async def transcribe_endpoint(
+ background_tasks: BackgroundTasks,
file: UploadFile = File(...),
params: str = Form(...)
):
try:
- # Decode the URL-encoded string
decoded_params = unquote(params)
-
- # Parse the JSON string
parameters_dict = json.loads(decoded_params)
-
- # Create TranscribeParams object
parameters = TranscribeParams(**parameters_dict)
except json.JSONDecodeError as json_err:
raise HTTPException(status_code=400, detail=f"Invalid JSON: {str(json_err)}")
@@ -72,12 +55,30 @@ async def transcribe_endpoint(
temp_file.write(await file.read())
temp_file_path = temp_file.name
- transcription = await transcribe_audio(file_path=temp_file_path, params=parameters)
- return transcription
+ transcription_job = await transcribe_audio(file_path=temp_file_path, params=parameters, background_tasks=background_tasks)
+ job_id = transcription_job["job_id"]
-async def transcribe_audio(file_path, params: TranscribeParams):
+ # Poll for completion
+ max_wait_time = 600 # 10 minutes
+ poll_interval = 2 # 2 seconds
+ elapsed_time = 0
- file_path = convert_to_wav(file_path)
+ while elapsed_time < max_wait_time:
+ if job_id in transcription_results:
+ result = transcription_results[job_id]
+ if result["status"] == "completed":
+ return JSONResponse(content={"status": "completed", "result": result["result"]})
+ elif result["status"] == "failed":
+ return JSONResponse(content={"status": "failed", "error": result["error"]}, status_code=500)
+
+ await asyncio.sleep(poll_interval)
+ elapsed_time += poll_interval
+
+ # If we've reached this point, the transcription has taken too long
+ return JSONResponse(content={"status": "timeout", "message": "Transcription is taking longer than expected. Please check back later."}, status_code=202)
+
+async def transcribe_audio(file_path, params: TranscribeParams, background_tasks: BackgroundTasks):
+ file_path = await convert_to_wav(file_path)
model = params.model if params.model in WHISPER_CPP_MODELS else 'small'
model_path = WHISPER_CPP_DIR / 'models' / f'ggml-{model}.bin'
command = [str(WHISPER_CPP_DIR / 'build' / 'bin' / 'main')]
@@ -115,35 +116,50 @@ async def transcribe_audio(file_path, params: TranscribeParams):
command.extend(['-f', file_path])
DEBUG(f"Command: {command}")
+
+ # Create a unique ID for this transcription job
+ job_id = str(uuid.uuid4())
+
+ # Store the job status
+ transcription_results[job_id] = {"status": "processing", "result": None}
+
+ # Run the transcription in a background task
+ background_tasks.add_task(process_transcription, command, file_path, job_id)
+
+ return {"job_id": job_id}
+
+async def process_transcription(command, file_path, job_id):
+ try:
+ result = await run_transcription(command, file_path)
+ transcription_results[job_id] = {"status": "completed", "result": result}
+ except Exception as e:
+ transcription_results[job_id] = {"status": "failed", "error": str(e)}
+ finally:
+ # Clean up the temporary file
+ os.remove(file_path)
+
+async def run_transcription(command, file_path):
proc = await asyncio.create_subprocess_exec(
*command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await proc.communicate()
-
if proc.returncode != 0:
raise Exception(f"Error running command: {stderr.decode()}")
-
- result = stdout.decode().strip()
- DEBUG(f"Result: {result}")
- return result
+ return stdout.decode().strip()
-
-def convert_to_wav(file_path: str):
+async def convert_to_wav(file_path: str):
wav_file_path = os.path.join(ASR_DIR, f"{uuid.uuid4()}.wav")
- subprocess.run(["ffmpeg", "-y", "-i", file_path, "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1", wav_file_path], check=True)
+ proc = await asyncio.create_subprocess_exec(
+ "ffmpeg", "-y", "-i", file_path, "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1", wav_file_path,
+ stdout=asyncio.subprocess.PIPE,
+ stderr=asyncio.subprocess.PIPE
+ )
+ stdout, stderr = await proc.communicate()
+ if proc.returncode != 0:
+ raise Exception(f"Error converting file to WAV: {stderr.decode()}")
return wav_file_path
-def download_from_youtube(url: str):
- temp_file = os.path.join(ASR_DIR, f"{uuid.uuid4()}.mp3")
- ytdl_opts = {
- 'outtmpl': temp_file,
- 'postprocessors': [{'key': 'FFmpegExtractAudio', 'preferredcodec': 'mp3', 'preferredquality': '192'}],
- 'nooverwrites': True
- }
- with YoutubeDL(ytdl_opts) as ydl:
- ydl.download([url])
- return convert_to_wav(temp_file)
def format_srt_timestamp(seconds: float):
milliseconds = round(seconds * 1000.0)
diff --git a/sijapi/routers/email.py b/sijapi/routers/email.py
index ca27520..4dafbb0 100644
--- a/sijapi/routers/email.py
+++ b/sijapi/routers/email.py
@@ -15,21 +15,16 @@ from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.image import MIMEImage
import ssl
-from datetime import datetime as dt_datetime
-from pydantic import BaseModel
-from typing import List, Optional, Any
import yaml
-from typing import List, Dict, Optional
-from pydantic import BaseModel
-from sijapi import DEBUG, ERR, LLM_SYS_MSG
+from typing import List, Dict, Optional, Set
from datetime import datetime as dt_datetime
-from typing import Dict
from sijapi import DEBUG, INFO, WARN, ERR, CRITICAL
-from sijapi import PODCAST_DIR, DEFAULT_VOICE, EMAIL_CONFIG
+from sijapi import PODCAST_DIR, DEFAULT_VOICE, EMAIL_CONFIG, EMAIL_LOGS
from sijapi.routers import tts, llm, sd, locate
from sijapi.utilities import clean_text, assemble_journal_path, extract_text, prefix_lines
from sijapi.classes import EmailAccount, IMAPConfig, SMTPConfig, IncomingEmail, EmailContact, AutoResponder
-
+from sijapi import DEBUG, INFO, ERR, LOGS_DIR
+from sijapi.classes import EmailAccount
email = APIRouter(tags=["private"])
@@ -76,8 +71,6 @@ def get_smtp_connection(account: EmailAccount):
return SMTP(account.smtp.host, account.smtp.port)
-
-
def get_matching_autoresponders(this_email: IncomingEmail, account: EmailAccount) -> List[AutoResponder]:
def matches_list(item: str, this_email: IncomingEmail) -> bool:
if '@' in item:
@@ -161,50 +154,6 @@ async def extract_attachments(attachments) -> List[str]:
return attachment_texts
-
-async def process_account(account: EmailAccount):
- while True:
- start_time = dt_datetime.now()
- try:
- DEBUG(f"Connecting to {account.name} to check for unread emails...")
- with get_imap_connection(account) as inbox:
- DEBUG(f"Connected to {account.name}, checking for unread emails now...")
- unread_messages = inbox.messages(unread=True)
- for uid, message in unread_messages:
- recipients = [EmailContact(email=recipient['email'], name=recipient.get('name', '')) for recipient in message.sent_to]
- localized_datetime = await locate.localize_datetime(message.date)
- this_email = IncomingEmail(
- sender=message.sent_from[0]['email'],
- datetime_received=localized_datetime,
- recipients=recipients,
- subject=message.subject,
- body=clean_email_content(message.body['html'][0]) if message.body['html'] else clean_email_content(message.body['plain'][0]) or "",
- attachments=message.attachments
- )
- DEBUG(f"\n\nProcessing email for account {account.name}: {this_email.subject}\n\n")
- save_success = await save_email(this_email, account)
- respond_success = await autorespond(this_email, account)
- if save_success and respond_success:
- inbox.mark_seen(uid)
- except Exception as e:
- ERR(f"An error occurred for account {account.name}: {e}")
-
- # Calculate the time taken for processing
- processing_time = (dt_datetime.now() - start_time).total_seconds()
-
- # Calculate the remaining time to wait
- wait_time = max(0, account.refresh - processing_time)
-
- # Wait for the remaining time
- await asyncio.sleep(wait_time)
-
-
-async def process_all_accounts():
- email_accounts = load_email_accounts(EMAIL_CONFIG)
- tasks = [asyncio.create_task(process_account(account)) for account in email_accounts]
- await asyncio.gather(*tasks)
-
-
async def save_email(this_email: IncomingEmail, account: EmailAccount):
try:
md_path, md_relative = assemble_journal_path(this_email.datetime_received, "Emails", this_email.subject, ".md")
@@ -262,6 +211,7 @@ tags:
async def autorespond(this_email: IncomingEmail, account: EmailAccount):
matching_profiles = get_matching_autoresponders(this_email, account)
+ DEBUG(f"Matching profiles: {matching_profiles}")
for profile in matching_profiles:
DEBUG(f"Auto-responding to {this_email.subject} with profile: {profile.name}")
auto_response_subject = f"Auto-Response Re: {this_email.subject}"
@@ -296,7 +246,84 @@ async def send_auto_response(to_email, subject, body, profile, account):
ERR(f"Error in preparing/sending auto-response from account {account.name}: {e}")
return False
-
+
+
+
+async def load_processed_uids(filename: Path) -> Set[str]:
+ if filename.exists():
+ with open(filename, 'r') as f:
+ return set(line.strip().split(':')[-1] for line in f)
+ return set()
+
+async def save_processed_uid(filename: Path, account_name: str, uid: str):
+ with open(filename, 'a') as f:
+ f.write(f"{account_name}:{uid}\n")
+
+async def process_account_summarization(account: EmailAccount):
+ summarized_log = EMAIL_LOGS / "summarized.txt"
+ while True:
+ try:
+ processed_uids = await load_processed_uids(summarized_log)
+ with get_imap_connection(account) as inbox:
+ unread_messages = inbox.messages(unread=True)
+ for uid, message in unread_messages:
+ uid_str = uid.decode() if isinstance(uid, bytes) else str(uid)
+ if uid_str not in processed_uids:
+ recipients = [EmailContact(email=recipient['email'], name=recipient.get('name', '')) for recipient in message.sent_to]
+ localized_datetime = await locate.localize_datetime(message.date)
+ this_email = IncomingEmail(
+ sender=message.sent_from[0]['email'],
+ datetime_received=localized_datetime,
+ recipients=recipients,
+ subject=message.subject,
+ body=clean_email_content(message.body['html'][0]) if message.body['html'] else clean_email_content(message.body['plain'][0]) or "",
+ attachments=message.attachments
+ )
+ if account.summarize:
+ save_success = await save_email(this_email, account)
+ if save_success:
+ await save_processed_uid(summarized_log, account.name, uid_str)
+ DEBUG(f"Summarized email: {uid_str}")
+ except Exception as e:
+ ERR(f"An error occurred during summarization for account {account.name}: {e}")
+
+ await asyncio.sleep(account.refresh)
+
+async def process_account_autoresponding(account: EmailAccount):
+ autoresponded_log = EMAIL_LOGS / "autoresponded.txt"
+ while True:
+ try:
+ processed_uids = await load_processed_uids(autoresponded_log)
+ with get_imap_connection(account) as inbox:
+ unread_messages = inbox.messages(unread=True)
+ for uid, message in unread_messages:
+ uid_str = uid.decode() if isinstance(uid, bytes) else str(uid)
+ if uid_str not in processed_uids:
+ recipients = [EmailContact(email=recipient['email'], name=recipient.get('name', '')) for recipient in message.sent_to]
+ localized_datetime = await locate.localize_datetime(message.date)
+ this_email = IncomingEmail(
+ sender=message.sent_from[0]['email'],
+ datetime_received=localized_datetime,
+ recipients=recipients,
+ subject=message.subject,
+ body=clean_email_content(message.body['html'][0]) if message.body['html'] else clean_email_content(message.body['plain'][0]) or "",
+ attachments=message.attachments
+ )
+ respond_success = await autorespond(this_email, account)
+ if respond_success:
+ await save_processed_uid(autoresponded_log, account.name, uid_str)
+ DEBUG(f"Auto-responded to email: {uid_str}")
+ except Exception as e:
+ ERR(f"An error occurred during auto-responding for account {account.name}: {e}")
+
+ await asyncio.sleep(account.refresh)
+
+async def process_all_accounts():
+ email_accounts = load_email_accounts(EMAIL_CONFIG)
+ summarization_tasks = [asyncio.create_task(process_account_summarization(account)) for account in email_accounts]
+ autoresponding_tasks = [asyncio.create_task(process_account_autoresponding(account)) for account in email_accounts]
+ await asyncio.gather(*summarization_tasks, *autoresponding_tasks)
+
@email.on_event("startup")
async def startup_event():
asyncio.create_task(process_all_accounts())
\ No newline at end of file