From ded78ba57109a74406f6ab5d6c3a6432646a93e2 Mon Sep 17 00:00:00 2001 From: sanj <67624670+iodrift@users.noreply.github.com> Date: Mon, 24 Jun 2024 23:21:14 -0700 Subject: [PATCH] Auto-update: Mon Jun 24 23:21:14 PDT 2024 --- sijapi/__init__.py | 4 +- sijapi/__main__.py | 5 +- sijapi/purgenc.py | 67 -------------------------- sijapi/routers/note.py | 83 ++++++++++++++++---------------- sijapi/utilities.py | 106 +++++++++++++++++++++++++++++++++++++---- 5 files changed, 143 insertions(+), 122 deletions(-) delete mode 100644 sijapi/purgenc.py diff --git a/sijapi/__init__.py b/sijapi/__init__.py index 735b92f..ff9f9a2 100644 --- a/sijapi/__init__.py +++ b/sijapi/__init__.py @@ -10,7 +10,6 @@ from pydantic import BaseModel import traceback import logging from .logs import Logger -from .purgenc import process_nc # from sijapi.config.config import load_config # cfg = load_config() @@ -91,9 +90,10 @@ VISUALCROSSING_API_KEY = os.getenv("VISUALCROSSING_API_KEY") ### Obsidian & notes +ALLOWED_FILENAME_CHARS = r'[^\w \.-]' +MAX_FILENAME_LENGTH = 255 OBSIDIAN_VAULT_DIR = Path(os.getenv("OBSIDIAN_BASE_DIR") or HOME_DIR / "Nextcloud" / "notes") OBSIDIAN_JOURNAL_DIR = OBSIDIAN_VAULT_DIR / "journal" -process_nc(OBSIDIAN_JOURNAL_DIR, True) OBSIDIAN_RESOURCES_DIR = "obsidian/resources" OBSIDIAN_BANNER_DIR = f"{OBSIDIAN_RESOURCES_DIR}/banners" os.makedirs(Path(OBSIDIAN_VAULT_DIR) / OBSIDIAN_BANNER_DIR, exist_ok=True) diff --git a/sijapi/__main__.py b/sijapi/__main__.py index b1a85ca..42d9eea 100755 --- a/sijapi/__main__.py +++ b/sijapi/__main__.py @@ -18,8 +18,9 @@ from dotenv import load_dotenv from pathlib import Path from datetime import datetime import argparse -from . import LOGGER, LOGS_DIR +from . import LOGGER, LOGS_DIR, OBSIDIAN_VAULT_DIR from .logs import Logger +from .utilities import fix_nextcloud_filenames parser = argparse.ArgumentParser(description='Personal API.') parser.add_argument('--debug', action='store_true', help='Set log level to INFO') @@ -137,6 +138,8 @@ def main(argv): for router_name in ROUTERS: load_router(router_name) + journal = OBSIDIAN_VAULT_DIR / "journal" + fix_nextcloud_filenames(journal, rename=True) config = Config() config.keep_alive_timeout = 1200 config.bind = [HOST] diff --git a/sijapi/purgenc.py b/sijapi/purgenc.py deleted file mode 100644 index ce6b3c9..0000000 --- a/sijapi/purgenc.py +++ /dev/null @@ -1,67 +0,0 @@ -import os -import re -from pathlib import Path - -# Set the maximum permissible file name length for NextCloud -MAX_FILENAME_LENGTH = 255 - -# Define impermissible characters for NextCloud -IMPERMISSIBLE_CHARS = r'[<>:"/\\|?*\n]' - -def sanitize_file_name(file_name): - """Sanitize the file name by replacing impermissible characters and truncating if necessary.""" - # Replace impermissible characters with underscores - sanitized_name = re.sub(IMPERMISSIBLE_CHARS, '_', file_name) - # Truncate the file name if it exceeds the maximum length - if len(sanitized_name) > MAX_FILENAME_LENGTH: - ext = Path(sanitized_name).suffix - base_name = sanitized_name[:MAX_FILENAME_LENGTH - len(ext)] - sanitized_name = base_name + ext - return sanitized_name - -def check_file_name(file_name): - """Check if the file name is impermissibly long or contains impermissible characters.""" - if len(file_name) > MAX_FILENAME_LENGTH: - return True - if re.search(IMPERMISSIBLE_CHARS, file_name): - return True - return False - -def list_and_correct_impermissible_files(root_dir, rename: bool = False): - """List and correct all files with impermissible names.""" - impermissible_files = [] - for dirpath, _, filenames in os.walk(root_dir): - for filename in filenames: - if check_file_name(filename): - file_path = Path(dirpath) / filename - impermissible_files.append(file_path) - print(f"Impermissible file found: {file_path}") - - # Sanitize the file name - new_filename = sanitize_file_name(filename) - new_file_path = Path(dirpath) / new_filename - - # Ensure the new file name does not already exist - if new_file_path.exists(): - counter = 1 - base_name, ext = os.path.splitext(new_filename) - while new_file_path.exists(): - new_filename = f"{base_name}_{counter}{ext}" - new_file_path = Path(dirpath) / new_filename - counter += 1 - - # Rename the file - if rename == True: - os.rename(file_path, new_file_path) - print(f"Renamed: {file_path} -> {new_file_path}") - - return impermissible_files - -def process_nc(dir_to_fix, rename: bool = False): - impermissible_files = list_and_correct_impermissible_files(dir_to_fix, rename) - if impermissible_files: - print("\nList of impermissible files found and corrected:") - for file in impermissible_files: - print(file) - else: - print("No impermissible files found.") diff --git a/sijapi/routers/note.py b/sijapi/routers/note.py index a48eca7..c6589b0 100644 --- a/sijapi/routers/note.py +++ b/sijapi/routers/note.py @@ -12,7 +12,7 @@ from typing import Optional, Union, Dict, List, Tuple from urllib.parse import urlparse from urllib3.util.retry import Retry from newspaper import Article -from trafilatura import fetch_url, extract +import trafilatura from requests.adapters import HTTPAdapter import re import os @@ -345,24 +345,16 @@ async def process_article( parsed_content = parse_article(url, source) if parsed_content is None: return {"error": "Failed to retrieve content"} - content = parsed_content["content"] - readable_title = sanitize_filename(title if title else parsed_content.get("title", "Untitled")) - if not readable_title: - readable_title = timestamp + readable_title = sanitize_filename(title or parsed_content.get("title") or timestamp) markdown_filename, relative_path = assemble_journal_path(datetime.now(), subdir="Articles", filename=readable_title, extension=".md") try: - tags = parsed_content.get('meta_keywords', []) - tags = [tag for tag in tags if tag] - tags.append('clipping') - tags_list = "\n - ".join(tags) - - summary = await summarize.summarize_text(content, "Summarize the provided text. Respond with the summary and nothing else. Do not otherwise acknowledge the request. Just provide the requested summary.") + summary = await summarize.summarize_text(parsed_content["content"], "Summarize the provided text. Respond with the summary and nothing else. Do not otherwise acknowledge the request. Just provide the requested summary.") summary = summary.replace('\n', ' ') # Remove line breaks if tts_mode == "full" or tts_mode == "content": - tts_text = content + tts_text = parsed_content["content"] elif tts_mode == "summary" or tts_mode == "excerpt": tts_text = summary else: @@ -370,27 +362,30 @@ async def process_article( banner_markdown = '' try: - banner_url = parsed_content.get('lead_image_url', '') + banner_url = parsed_content.get('image', '') if banner_url != '': - banner_image = download_file(parsed_content.get('lead_image_url', ''), Path(OBSIDIAN_VAULT_DIR / OBSIDIAN_RESOURCES_DIR)) + banner_image = download_file(banner_url, Path(OBSIDIAN_VAULT_DIR / OBSIDIAN_RESOURCES_DIR)) if banner_image: banner_markdown = f"![[{OBSIDIAN_RESOURCES_DIR}/{banner_image}]]" except Exception as e: ERR(f"No image found in article") + authors = ', '.join('[[{}]]'.format(author) for author in parsed_content.get('authors', ['Unknown'])) frontmatter = f"""--- title: {readable_title} -author: {parsed_content.get('author', 'Unknown')} +authors: {', '.join('[[{}]]'.format(author) for author in parsed_content.get('authors', ['Unknown']))} published: {parsed_content.get('date_published', 'Unknown')} added: {timestamp} -tags: - - {tags_list} excerpt: {parsed_content.get('excerpt', '')} banner: "{banner_markdown}" ---- +tags: + """ + frontmatter += '\n'.join(f" - {tag}" for tag in parsed_content.get('tags', [])) + frontmatter += '\n---\n' + body = f"# {readable_title}\n\n" if tts_text: @@ -403,20 +398,15 @@ banner: "{banner_markdown}" obsidian_link = f"![[{OBSIDIAN_RESOURCES_DIR}/{audio_filename}{audio_ext}]]" body += f"{obsidian_link}\n\n" except Exception as e: - ERR(f"Failed to generate TTS for article. {e}") + ERR(f"Failed to generate TTS for np3k. {e}") try: - authors = parsed_content.get('author', '') - authors_in_brackets = [f"[[{author.strip()}]]" for author in authors.split(",")] - authors_string = ", ".join(authors_in_brackets) - - body += f"by {authors_string} in [{parsed_content.get('domain', urlparse(url).netloc.replace('www.', ''))}]({parsed_content.get('url', url)}).\n\n" - - + body += f"by {authors} in [{parsed_content.get('domain', urlparse(url).netloc.replace('www.', ''))}]({url}).\n\n" body += f"> [!summary]+\n" body += f"> {summary}\n\n" - body += content + body += parsed_content["content"] markdown_content = frontmatter + body + except Exception as e: ERR(f"Failed to combine elements of article markdown.") @@ -438,31 +428,38 @@ banner: "{banner_markdown}" def parse_article(url: str, source: Optional[str] = None): - # Use trafilatura to download HTML content: - downloaded = source if source else fetch_url(url) + source = source if source else trafilatura.fetch_url(url) + traf = trafilatura.extract_metadata(filecontent=source, default_url=url) # Pass the HTML content to newspaper3k: - article = Article(url) - article.set_html(downloaded) - article.parse() + np3k = Article(url) + np3k.set_html(source) + np3k.parse() - # Use trafilatura to extract content in Markdown - trafilatura_result = extract(downloaded, output_format="markdown", include_comments=False) - content = trafilatura_result if trafilatura_result else article.text + INFO(f"Parsed {np3k.title}") + - domain = urlparse(url).netloc.replace('www.', '') - INFO(f"Parsed {article.title}") + title = np3k.title or traf.title + authors = np3k.authors or traf.author + authors = authors if isinstance(authors, List) else [authors] + date = np3k.publish_date or localize_dt(traf.date) + excerpt = np3k.meta_description or traf.description + content = trafilatura.extract(source, output_format="markdown", include_comments=False) or np3k.text + image = np3k.top_image or traf.image + domain = traf.sitename or urlparse(url).netloc.replace('www.', '').title() + tags = np3k.meta_keywords or traf.categories or traf.tags + tags = tags if isinstance(tags, List) else [tags] return { - 'title': article.title.replace(" ", " "), - 'author': ', '.join(article.authors) if article.authors else 'Unknown', - 'date_published': article.publish_date.strftime("%b %d, %Y at %H:%M") if article.publish_date else None, - 'excerpt': article.meta_description, + 'title': title.replace(" ", " "), + 'authors': authors, + 'date': date.strftime("%b %d, %Y at %H:%M"), + 'excerpt': excerpt, 'content': content, - 'lead_image_url': article.top_image, + 'image': image, 'url': url, 'domain': domain, - 'meta_keywords': article.meta_keywords + 'tags': np3k.meta_keywords } diff --git a/sijapi/utilities.py b/sijapi/utilities.py index e051d8c..b65d1ce 100644 --- a/sijapi/utilities.py +++ b/sijapi/utilities.py @@ -24,7 +24,7 @@ from sshtunnel import SSHTunnelForwarder from fastapi import Depends, HTTPException, Request, UploadFile from fastapi.security.api_key import APIKeyHeader from sijapi import DEBUG, INFO, WARN, ERR, CRITICAL -from sijapi import DB, GLOBAL_API_KEY, DB, DB_HOST, DB_PORT, DB_USER, DB_PASS, TZ, YEAR_FMT, MONTH_FMT, DAY_FMT, DAY_SHORT_FMT, OBSIDIAN_VAULT_DIR +from sijapi import DB, GLOBAL_API_KEY, DB, DB_HOST, DB_PORT, DB_USER, DB_PASS, TZ, YEAR_FMT, MONTH_FMT, DAY_FMT, DAY_SHORT_FMT, OBSIDIAN_VAULT_DIR, ALLOWED_FILENAME_CHARS, MAX_FILENAME_LENGTH api_key_header = APIKeyHeader(name="Authorization") @@ -136,24 +136,112 @@ def get_extension(file): raise e -def sanitize_filename(text, max_length=255): - """Sanitize a string to be used as a safe filename.""" + +def sanitize_filename(text, max_length=MAX_FILENAME_LENGTH): + """Sanitize a string to be used as a safe filename while protecting the file extension.""" DEBUG(f"Filename before sanitization: {text}") - + # Replace multiple spaces with a single space and remove other whitespace text = re.sub(r'\s+', ' ', text) - + # Remove any non-word characters except space, dot, and hyphen - sanitized = re.sub(r'[^\w \.-]', '', text) - + sanitized = re.sub(ALLOWED_FILENAME_CHARS, '', text) + # Remove leading/trailing spaces sanitized = sanitized.strip() - - final_filename = sanitized[:max_length] + + # Split the filename into base name and extension + base_name, extension = os.path.splitext(sanitized) + + # Calculate the maximum length for the base name + max_base_length = max_length - len(extension) + + # Truncate the base name if necessary + if len(base_name) > max_base_length: + base_name = base_name[:max_base_length].rstrip() + + # Recombine the base name and extension + final_filename = base_name + extension + + # In case the extension itself is too long, truncate the entire filename + if len(final_filename) > max_length: + final_filename = final_filename[:max_length] + DEBUG(f"Filename after sanitization: {final_filename}") return final_filename + +def check_file_name(file_name, max_length=255): + """Check if the file name needs sanitization based on the criteria of the second sanitize_filename function.""" + DEBUG(f"Checking filename: {file_name}") + + needs_sanitization = False + + # Check for length + if len(file_name) > max_length: + DEBUG(f"Filename exceeds maximum length of {max_length}") + needs_sanitization = True + + # Check for non-word characters (except space, dot, and hyphen) + if re.search(ALLOWED_FILENAME_CHARS, file_name): + DEBUG("Filename contains non-word characters (except space, dot, and hyphen)") + needs_sanitization = True + + # Check for multiple consecutive spaces + if re.search(r'\s{2,}', file_name): + DEBUG("Filename contains multiple consecutive spaces") + needs_sanitization = True + + # Check for leading/trailing spaces + if file_name != file_name.strip(): + DEBUG("Filename has leading or trailing spaces") + needs_sanitization = True + + DEBUG(f"Filename {'needs' if needs_sanitization else 'does not need'} sanitization") + return needs_sanitization + + +def list_and_correct_impermissible_files(root_dir, rename: bool = False): + """List and correct all files with impermissible names.""" + impermissible_files = [] + for dirpath, _, filenames in os.walk(root_dir): + for filename in filenames: + if check_file_name(filename): + file_path = Path(dirpath) / filename + impermissible_files.append(file_path) + print(f"Impermissible file found: {file_path}") + + # Sanitize the file name + new_filename = sanitize_filename(filename) + new_file_path = Path(dirpath) / new_filename + + # Ensure the new file name does not already exist + if new_file_path.exists(): + counter = 1 + base_name, ext = os.path.splitext(new_filename) + while new_file_path.exists(): + new_filename = f"{base_name}_{counter}{ext}" + new_file_path = Path(dirpath) / new_filename + counter += 1 + + # Rename the file + if rename: + os.rename(file_path, new_file_path) + print(f"Renamed: {file_path} -> {new_file_path}") + + return impermissible_files + +def fix_nextcloud_filenames(dir_to_fix, rename: bool = False): + impermissible_files = list_and_correct_impermissible_files(dir_to_fix, rename) + if impermissible_files: + print("\nList of impermissible files found and corrected:") + for file in impermissible_files: + print(file) + else: + print("No impermissible files found.") + + def bool_convert(value: str = Form(None)): return value.lower() in ["true", "1", "t", "y", "yes"]