import os
import re
from pathlib import Path

# Maximum permissible file-name length for NextCloud.
MAX_FILENAME_LENGTH = 255

# Characters NextCloud does not permit in file names. Compiled once at module
# load: these helpers run against every file in the vault.
IMPERMISSIBLE_CHARS = r'[<>:"/\\|?*\n]'
_IMPERMISSIBLE_RE = re.compile(IMPERMISSIBLE_CHARS)


def sanitize_file_name(file_name):
    """Sanitize the file name by replacing impermissible characters and truncating if necessary.

    Impermissible characters are replaced with underscores; when the result is
    longer than MAX_FILENAME_LENGTH it is truncated with the extension kept.
    """
    sanitized_name = _IMPERMISSIBLE_RE.sub('_', file_name)
    if len(sanitized_name) > MAX_FILENAME_LENGTH:
        ext = Path(sanitized_name).suffix
        base_name = sanitized_name[:MAX_FILENAME_LENGTH - len(ext)]
        sanitized_name = base_name + ext
    return sanitized_name


def check_file_name(file_name):
    """Return True if the file name is impermissibly long or contains impermissible characters."""
    if len(file_name) > MAX_FILENAME_LENGTH:
        return True
    return bool(_IMPERMISSIBLE_RE.search(file_name))


def list_and_correct_impermissible_files(root_dir, rename: bool = False):
    """List all files under *root_dir* with impermissible names.

    When *rename* is True each offending file is renamed in place to its
    sanitized name (with a numeric suffix to avoid clobbering an existing
    file). Returns the list of offending paths (original names).
    """
    impermissible_files = []
    for dirpath, _, filenames in os.walk(root_dir):
        for filename in filenames:
            if not check_file_name(filename):
                continue
            file_path = Path(dirpath) / filename
            impermissible_files.append(file_path)
            print(f"Impermissible file found: {file_path}")

            # Sanitize the file name
            new_filename = sanitize_file_name(filename)
            new_file_path = Path(dirpath) / new_filename

            # Ensure the new file name does not already exist
            if new_file_path.exists():
                counter = 1
                base_name, ext = os.path.splitext(new_filename)
                while new_file_path.exists():
                    new_filename = f"{base_name}_{counter}{ext}"
                    new_file_path = Path(dirpath) / new_filename
                    counter += 1

            # Rename the file (idiomatic truth test; was `if rename == True`)
            if rename:
                os.rename(file_path, new_file_path)
                print(f"Renamed: {file_path} -> {new_file_path}")

    return impermissible_files


def process_nc(dir_to_fix, rename: bool = False):
    """Scan *dir_to_fix* for NextCloud-impermissible file names and report
    them, renaming when *rename* is True."""
    impermissible_files = list_and_correct_impermissible_files(dir_to_fix, rename)
    if impermissible_files:
        print("\nList of impermissible files found and corrected:")
        for file in impermissible_files:
            print(file)
    else:
        print("No impermissible files found.")
async def add_to_daily_note(transcription: str = None, file_link: str = None, additional_text: str = None, date_time: datetime = None):
    """Append a timestamped bullet entry to the daily 'Notes' file for
    *date_time* (default: now), creating the file with YAML frontmatter when
    it does not yet exist.

    Any of *transcription*, *file_link* and *additional_text* may be None or
    blank; only non-blank pieces are written. Returns the entry text.
    """
    date_time = date_time or datetime.now()
    note_path, _ = assemble_journal_path(date_time, filename='Notes', extension=".md", no_timestamp = True)
    time_str = date_time.strftime("%H:%M")

    entry_lines = []
    if additional_text and additional_text.strip():
        entry_lines.append(f"\t* {additional_text.strip()}")
    if transcription and transcription.strip():
        entry_lines.append(f"\t* {transcription.strip()}")
    if file_link and file_link.strip():
        entry_lines.append(f"\t\t {file_link.strip()}")

    entry = f"\n* **{time_str}**\n" + "\n".join(entry_lines)

    # Write the entry to the end of the file
    if note_path.exists():
        with open(note_path, 'a', encoding='utf-8') as note_file:
            note_file.write(entry)
    else:
        # First entry of the day: create the file with frontmatter.
        date_str = date_time.strftime("%Y-%m-%d")
        frontmatter = f"""---
date: {date_str}
tags:
 - notes
---

"""
        content = frontmatter + entry
        with open(note_path, 'w', encoding='utf-8') as note_file:
            note_file.write(content)

    return entry


async def handle_text(title: str, summary: str, extracted_text: str, date_time: datetime = None):
    """Write *extracted_text* with its *summary* to a journal markdown file
    named after *title*. Returns True on success."""
    date_time = date_time if date_time else datetime.now()
    absolute_path, relative_path = assemble_journal_path(date_time, filename=title, extension=".md", no_timestamp = True)
    # utf-8 explicitly, consistent with add_to_daily_note (was the platform
    # default encoding, which varies per OS).
    with open(absolute_path, "w", encoding='utf-8') as file:
        file.write(f"# {title}\n\n## Summary\n{summary}\n\n## Transcript\n{extracted_text}")

    # add_to_daily_note(f"**Uploaded [[{title}]]**: *{summary}*", absolute_path)

    return True
async def process_archive(
    background_tasks: BackgroundTasks,
    url: str,
    title: Optional[str] = None,
    encoding: str = 'utf-8',
    source: Optional[str] = None,
):
    """Parse the article at *url* and archive it as a markdown file under the
    vault's "archive" folder.

    Returns the path of the written markdown file, or an error dict when the
    article cannot be retrieved. Raises HTTPException(500) on render/write
    failure.
    """
    timestamp = datetime.now().strftime('%b %d, %Y at %H:%M')

    parsed_content = parse_article(url, source)
    if parsed_content is None:
        return {"error": "Failed to retrieve content"}
    content = parsed_content["content"]

    readable_title = sanitize_filename(title if title else parsed_content.get("title", "Untitled"))
    if not readable_title:
        readable_title = timestamp

    # BUG FIX: previously this pointed at the archive *directory* itself, so
    # open(..., 'w') below tried to write to a directory. Write one file per
    # article instead, creating the directory if needed.
    archive_dir = OBSIDIAN_VAULT_DIR / "archive"
    archive_dir.mkdir(parents=True, exist_ok=True)
    markdown_path = archive_dir / f"{readable_title}.md"

    try:
        frontmatter = f"""---
title: {readable_title}
author: {parsed_content.get('author', 'Unknown')}
published: {parsed_content.get('date_published', 'Unknown')}
added: {timestamp}
excerpt: {parsed_content.get('excerpt', '')}
---
"""
        body = f"# {readable_title}\n\n"

        authors = parsed_content.get('author', '')
        authors_in_brackets = [f"[[{author.strip()}]]" for author in authors.split(",")]
        authors_string = ", ".join(authors_in_brackets)

        body += f"by {authors_string} in [{parsed_content.get('domain', urlparse(url).netloc.replace('www.', ''))}]({parsed_content.get('url', url)}).\n\n"
        body += content
        # BUG FIX: markdown_content is now assembled inside the same try as the
        # write; the old code logged an assembly failure and then fell through
        # to use an undefined markdown_content (NameError).
        markdown_content = frontmatter + body

        try:
            with open(markdown_path, 'w', encoding=encoding) as md_file:
                md_file.write(markdown_content)
            INFO(f"Successfully saved to {markdown_path}")
            return markdown_path
        except Exception as e:
            ERR(f"Failed to write markdown file: {e}")
            raise HTTPException(status_code=500, detail=str(e))

    except HTTPException:
        # Don't re-wrap the 500 raised above.
        raise
    except Exception as e:
        ERR(f"Failed to clip {url}: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
Path: -async def add_to_daily_note(transcription: str, file_link: str, additional_text: str, date_time: datetime = None, source: str = None): - date_time = date_time or datetime.now() - note_path, _ = assemble_journal_path(date_time, filename='Notes', extension=".md", no_timestamp = True) - time_str = date_time.strftime("%H:%M") - - entry_lines = [] - if additional_text.strip(): - entry_lines.append(f"\t* {additional_text.strip()}") - if transcription.strip(): - entry_lines.append(f"\t* {transcription.strip()}") - if file_link.strip(): - entry_lines.append(f"\t\t {file_link.strip()}") - - entry = f"\n* **{time_str}**{source}\n" + "\n".join(entry_lines) - - # Write the entry to the end of the file - if note_path.exists(): - with open(note_path, 'a', encoding='utf-8') as note_file: - note_file.write(entry) - else: - date_str = date_time.strftime("%Y-%m-%d") - frontmatter = f"""--- -date: {date_str} -tags: - - notes ---- - -""" - content = frontmatter + entry - # If the file doesn't exist, create it and start with "Notes" - with open(note_path, 'w', encoding='utf-8') as note_file: - note_file.write(content) - - return entry - -async def handle_text(title:str, summary:str, extracted_text:str, date_time: datetime = None): - date_time = date_time if date_time else datetime.now() - absolute_path, relative_path = assemble_journal_path(date_time, filename=title, extension=".md", no_timestamp = True) - with open(absolute_path, "w") as file: - file.write(f"# {title}\n\n## Summary\n{summary}\n\n## Transcript\n{extracted_text}") - - # add_to_daily_note(f"**Uploaded [[{title}]]**: *{summary}*", absolute_path) - - return True ### FRONTMATTER, BANNER diff --git a/sijapi/utilities.py b/sijapi/utilities.py index fb53a56..e051d8c 100644 --- a/sijapi/utilities.py +++ b/sijapi/utilities.py @@ -62,16 +62,25 @@ def assemble_journal_path(date_time: datetime, subdir: str = None, filename: str relative_path = relative_path / f"{day_short} {subdir}" if filename: - if no_timestamp: - filename 
def has_valid_extension(filename, valid_extensions=None):
    """Return True when *filename* carries any extension, or — when
    *valid_extensions* is given — one of those extensions (case-insensitive)."""
    ext = os.path.splitext(filename)[1]
    if valid_extensions is None:
        # No whitelist supplied: any extension at all counts.
        return bool(ext)
    return ext.lower() in valid_extensions
def sanitize_filename(text, max_length=255):
    """Sanitize a string to be used as a safe filename.

    Collapses runs of whitespace to single spaces, removes every character
    that is not a word character, space, dot or hyphen, and truncates to
    *max_length*. Trailing spaces/dots exposed by the truncation are stripped,
    since names ending in them are invalid on some filesystems (e.g. Windows).
    """
    DEBUG(f"Filename before sanitization: {text}")

    # Replace multiple spaces with a single space and remove other whitespace
    text = re.sub(r'\s+', ' ', text)

    # Remove any non-word characters except space, dot, and hyphen
    sanitized = re.sub(r'[^\w \.-]', '', text)

    # Remove leading/trailing spaces, then truncate; truncation itself can
    # expose a trailing space or dot, so strip those afterwards as well.
    sanitized = sanitized.strip()
    final_filename = sanitized[:max_length].rstrip(' .')

    DEBUG(f"Filename after sanitization: {final_filename}")
    return final_filename


def bool_convert(value: str = Form(None)):
    """Interpret a submitted form value as a boolean ("true", "1", "t", "y", "yes")."""
    return value.lower() in ["true", "1", "t", "y", "yes"]