Auto-update: Mon Jun 24 23:21:14 PDT 2024

This commit is contained in:
sanj 2024-06-24 23:21:14 -07:00
parent 0936cf8808
commit ded78ba571
5 changed files with 143 additions and 122 deletions

View file

@ -10,7 +10,6 @@ from pydantic import BaseModel
import traceback import traceback
import logging import logging
from .logs import Logger from .logs import Logger
from .purgenc import process_nc
# from sijapi.config.config import load_config # from sijapi.config.config import load_config
# cfg = load_config() # cfg = load_config()
@ -91,9 +90,10 @@ VISUALCROSSING_API_KEY = os.getenv("VISUALCROSSING_API_KEY")
### Obsidian & notes ### Obsidian & notes
ALLOWED_FILENAME_CHARS = r'[^\w \.-]'
MAX_FILENAME_LENGTH = 255
OBSIDIAN_VAULT_DIR = Path(os.getenv("OBSIDIAN_BASE_DIR") or HOME_DIR / "Nextcloud" / "notes") OBSIDIAN_VAULT_DIR = Path(os.getenv("OBSIDIAN_BASE_DIR") or HOME_DIR / "Nextcloud" / "notes")
OBSIDIAN_JOURNAL_DIR = OBSIDIAN_VAULT_DIR / "journal" OBSIDIAN_JOURNAL_DIR = OBSIDIAN_VAULT_DIR / "journal"
process_nc(OBSIDIAN_JOURNAL_DIR, True)
OBSIDIAN_RESOURCES_DIR = "obsidian/resources" OBSIDIAN_RESOURCES_DIR = "obsidian/resources"
OBSIDIAN_BANNER_DIR = f"{OBSIDIAN_RESOURCES_DIR}/banners" OBSIDIAN_BANNER_DIR = f"{OBSIDIAN_RESOURCES_DIR}/banners"
os.makedirs(Path(OBSIDIAN_VAULT_DIR) / OBSIDIAN_BANNER_DIR, exist_ok=True) os.makedirs(Path(OBSIDIAN_VAULT_DIR) / OBSIDIAN_BANNER_DIR, exist_ok=True)

View file

@ -18,8 +18,9 @@ from dotenv import load_dotenv
from pathlib import Path from pathlib import Path
from datetime import datetime from datetime import datetime
import argparse import argparse
from . import LOGGER, LOGS_DIR from . import LOGGER, LOGS_DIR, OBSIDIAN_VAULT_DIR
from .logs import Logger from .logs import Logger
from .utilities import fix_nextcloud_filenames
parser = argparse.ArgumentParser(description='Personal API.') parser = argparse.ArgumentParser(description='Personal API.')
parser.add_argument('--debug', action='store_true', help='Set log level to INFO') parser.add_argument('--debug', action='store_true', help='Set log level to INFO')
@ -137,6 +138,8 @@ def main(argv):
for router_name in ROUTERS: for router_name in ROUTERS:
load_router(router_name) load_router(router_name)
journal = OBSIDIAN_VAULT_DIR / "journal"
fix_nextcloud_filenames(journal, rename=True)
config = Config() config = Config()
config.keep_alive_timeout = 1200 config.keep_alive_timeout = 1200
config.bind = [HOST] config.bind = [HOST]

View file

@ -1,67 +0,0 @@
import os
import re
from pathlib import Path
# Maximum file-name length NextCloud will accept.
MAX_FILENAME_LENGTH = 255

# Characters NextCloud rejects in file names.
IMPERMISSIBLE_CHARS = r'[<>:"/\\|?*\n]'


def sanitize_file_name(file_name):
    """Return *file_name* with impermissible characters replaced by underscores
    and the total length capped at MAX_FILENAME_LENGTH (extension preserved)."""
    cleaned = re.sub(IMPERMISSIBLE_CHARS, '_', file_name)
    if len(cleaned) <= MAX_FILENAME_LENGTH:
        return cleaned
    # Too long: keep the extension intact and trim the stem to fit.
    ext = Path(cleaned).suffix
    return cleaned[:MAX_FILENAME_LENGTH - len(ext)] + ext
def check_file_name(file_name):
    """Return True when *file_name* is too long for NextCloud or contains
    impermissible characters; False otherwise."""
    too_long = len(file_name) > MAX_FILENAME_LENGTH
    has_bad_chars = re.search(IMPERMISSIBLE_CHARS, file_name) is not None
    return too_long or has_bad_chars
def list_and_correct_impermissible_files(root_dir, rename: bool = False):
    """Walk *root_dir* and collect files whose names NextCloud would reject.

    Each offending file is reported to stdout. When *rename* is True the file
    is renamed to a sanitized name, suffixed with _1, _2, ... if the sanitized
    target already exists. Returns the list of original offending paths.
    """
    impermissible_files = []
    for dirpath, _, filenames in os.walk(root_dir):
        for filename in filenames:
            if not check_file_name(filename):
                continue
            file_path = Path(dirpath) / filename
            impermissible_files.append(file_path)
            print(f"Impermissible file found: {file_path}")

            # Build a sanitized, collision-free replacement name.
            new_filename = sanitize_file_name(filename)
            new_file_path = Path(dirpath) / new_filename
            if new_file_path.exists():
                counter = 1
                base_name, ext = os.path.splitext(new_filename)
                while new_file_path.exists():
                    new_filename = f"{base_name}_{counter}{ext}"
                    new_file_path = Path(dirpath) / new_filename
                    counter += 1

            # Idiomatic truth test (was `if rename == True:`).
            if rename:
                os.rename(file_path, new_file_path)
                print(f"Renamed: {file_path} -> {new_file_path}")
    return impermissible_files
def process_nc(dir_to_fix, rename: bool = False):
    """Report (and, when *rename* is True, fix) NextCloud-incompatible
    file names under *dir_to_fix*."""
    offenders = list_and_correct_impermissible_files(dir_to_fix, rename)
    if not offenders:
        print("No impermissible files found.")
        return
    print("\nList of impermissible files found and corrected:")
    for file in offenders:
        print(file)

View file

@ -12,7 +12,7 @@ from typing import Optional, Union, Dict, List, Tuple
from urllib.parse import urlparse from urllib.parse import urlparse
from urllib3.util.retry import Retry from urllib3.util.retry import Retry
from newspaper import Article from newspaper import Article
from trafilatura import fetch_url, extract import trafilatura
from requests.adapters import HTTPAdapter from requests.adapters import HTTPAdapter
import re import re
import os import os
@ -345,24 +345,16 @@ async def process_article(
parsed_content = parse_article(url, source) parsed_content = parse_article(url, source)
if parsed_content is None: if parsed_content is None:
return {"error": "Failed to retrieve content"} return {"error": "Failed to retrieve content"}
content = parsed_content["content"]
readable_title = sanitize_filename(title if title else parsed_content.get("title", "Untitled")) readable_title = sanitize_filename(title or parsed_content.get("title") or timestamp)
if not readable_title:
readable_title = timestamp
markdown_filename, relative_path = assemble_journal_path(datetime.now(), subdir="Articles", filename=readable_title, extension=".md") markdown_filename, relative_path = assemble_journal_path(datetime.now(), subdir="Articles", filename=readable_title, extension=".md")
try: try:
tags = parsed_content.get('meta_keywords', []) summary = await summarize.summarize_text(parsed_content["content"], "Summarize the provided text. Respond with the summary and nothing else. Do not otherwise acknowledge the request. Just provide the requested summary.")
tags = [tag for tag in tags if tag]
tags.append('clipping')
tags_list = "\n - ".join(tags)
summary = await summarize.summarize_text(content, "Summarize the provided text. Respond with the summary and nothing else. Do not otherwise acknowledge the request. Just provide the requested summary.")
summary = summary.replace('\n', ' ') # Remove line breaks summary = summary.replace('\n', ' ') # Remove line breaks
if tts_mode == "full" or tts_mode == "content": if tts_mode == "full" or tts_mode == "content":
tts_text = content tts_text = parsed_content["content"]
elif tts_mode == "summary" or tts_mode == "excerpt": elif tts_mode == "summary" or tts_mode == "excerpt":
tts_text = summary tts_text = summary
else: else:
@ -370,27 +362,30 @@ async def process_article(
banner_markdown = '' banner_markdown = ''
try: try:
banner_url = parsed_content.get('lead_image_url', '') banner_url = parsed_content.get('image', '')
if banner_url != '': if banner_url != '':
banner_image = download_file(parsed_content.get('lead_image_url', ''), Path(OBSIDIAN_VAULT_DIR / OBSIDIAN_RESOURCES_DIR)) banner_image = download_file(banner_url, Path(OBSIDIAN_VAULT_DIR / OBSIDIAN_RESOURCES_DIR))
if banner_image: if banner_image:
banner_markdown = f"![[{OBSIDIAN_RESOURCES_DIR}/{banner_image}]]" banner_markdown = f"![[{OBSIDIAN_RESOURCES_DIR}/{banner_image}]]"
except Exception as e: except Exception as e:
ERR(f"No image found in article") ERR(f"No image found in article")
authors = ', '.join('[[{}]]'.format(author) for author in parsed_content.get('authors', ['Unknown']))
frontmatter = f"""--- frontmatter = f"""---
title: {readable_title} title: {readable_title}
author: {parsed_content.get('author', 'Unknown')} authors: {', '.join('[[{}]]'.format(author) for author in parsed_content.get('authors', ['Unknown']))}
published: {parsed_content.get('date_published', 'Unknown')} published: {parsed_content.get('date_published', 'Unknown')}
added: {timestamp} added: {timestamp}
tags:
- {tags_list}
excerpt: {parsed_content.get('excerpt', '')} excerpt: {parsed_content.get('excerpt', '')}
banner: "{banner_markdown}" banner: "{banner_markdown}"
--- tags:
""" """
frontmatter += '\n'.join(f" - {tag}" for tag in parsed_content.get('tags', []))
frontmatter += '\n---\n'
body = f"# {readable_title}\n\n" body = f"# {readable_title}\n\n"
if tts_text: if tts_text:
@ -403,20 +398,15 @@ banner: "{banner_markdown}"
obsidian_link = f"![[{OBSIDIAN_RESOURCES_DIR}/{audio_filename}{audio_ext}]]" obsidian_link = f"![[{OBSIDIAN_RESOURCES_DIR}/{audio_filename}{audio_ext}]]"
body += f"{obsidian_link}\n\n" body += f"{obsidian_link}\n\n"
except Exception as e: except Exception as e:
ERR(f"Failed to generate TTS for article. {e}") ERR(f"Failed to generate TTS for np3k. {e}")
try: try:
authors = parsed_content.get('author', '') body += f"by {authors} in [{parsed_content.get('domain', urlparse(url).netloc.replace('www.', ''))}]({url}).\n\n"
authors_in_brackets = [f"[[{author.strip()}]]" for author in authors.split(",")]
authors_string = ", ".join(authors_in_brackets)
body += f"by {authors_string} in [{parsed_content.get('domain', urlparse(url).netloc.replace('www.', ''))}]({parsed_content.get('url', url)}).\n\n"
body += f"> [!summary]+\n" body += f"> [!summary]+\n"
body += f"> {summary}\n\n" body += f"> {summary}\n\n"
body += content body += parsed_content["content"]
markdown_content = frontmatter + body markdown_content = frontmatter + body
except Exception as e: except Exception as e:
ERR(f"Failed to combine elements of article markdown.") ERR(f"Failed to combine elements of article markdown.")
@ -438,31 +428,38 @@ banner: "{banner_markdown}"
def parse_article(url: str, source: Optional[str] = None):
    """Fetch (if needed) and parse an article, merging newspaper3k and
    trafilatura results.

    Returns a dict with title, authors, publish date, excerpt, markdown
    content, lead image, url, domain and tags. Each field prefers the
    newspaper3k value and falls back to trafilatura metadata.
    """
    # Download the HTML unless the caller already supplied it.
    source = source if source else trafilatura.fetch_url(url)
    traf = trafilatura.extract_metadata(filecontent=source, default_url=url)

    # Parse the same HTML with newspaper3k as a second opinion.
    np3k = Article(url)
    np3k.set_html(source)
    np3k.parse()
    INFO(f"Parsed {np3k.title}")

    title = np3k.title or traf.title
    authors = np3k.authors or traf.author
    authors = authors if isinstance(authors, list) else [authors]
    date = np3k.publish_date or localize_dt(traf.date)
    excerpt = np3k.meta_description or traf.description
    # Prefer trafilatura's markdown extraction; fall back to newspaper3k text.
    content = trafilatura.extract(source, output_format="markdown", include_comments=False) or np3k.text
    image = np3k.top_image or traf.image
    domain = traf.sitename or urlparse(url).netloc.replace('www.', '').title()
    tags = np3k.meta_keywords or traf.categories or traf.tags
    tags = tags if isinstance(tags, list) else [tags]

    # Guard: publish date can legitimately be absent; don't crash on strftime.
    date_str = date.strftime("%b %d, %Y at %H:%M") if date else None

    return {
        # NOTE(review): original replaced what appears to be a non-breaking
        # space with a regular space — glyph is ambiguous in source; confirm.
        'title': title.replace("\u00a0", " "),
        'authors': authors,
        'date': date_str,
        'date_published': date_str,  # kept: callers still read this key
        'excerpt': excerpt,
        'content': content,
        'image': image,
        'url': url,
        'domain': domain,
        # Fix: return the merged `tags` computed above, not just
        # np3k.meta_keywords (which silently dropped the trafilatura fallback).
        'tags': tags,
    }

View file

@ -24,7 +24,7 @@ from sshtunnel import SSHTunnelForwarder
from fastapi import Depends, HTTPException, Request, UploadFile from fastapi import Depends, HTTPException, Request, UploadFile
from fastapi.security.api_key import APIKeyHeader from fastapi.security.api_key import APIKeyHeader
from sijapi import DEBUG, INFO, WARN, ERR, CRITICAL from sijapi import DEBUG, INFO, WARN, ERR, CRITICAL
from sijapi import DB, GLOBAL_API_KEY, DB, DB_HOST, DB_PORT, DB_USER, DB_PASS, TZ, YEAR_FMT, MONTH_FMT, DAY_FMT, DAY_SHORT_FMT, OBSIDIAN_VAULT_DIR from sijapi import DB, GLOBAL_API_KEY, DB, DB_HOST, DB_PORT, DB_USER, DB_PASS, TZ, YEAR_FMT, MONTH_FMT, DAY_FMT, DAY_SHORT_FMT, OBSIDIAN_VAULT_DIR, ALLOWED_FILENAME_CHARS, MAX_FILENAME_LENGTH
api_key_header = APIKeyHeader(name="Authorization") api_key_header = APIKeyHeader(name="Authorization")
@ -136,24 +136,112 @@ def get_extension(file):
raise e raise e
def sanitize_filename(text, max_length=MAX_FILENAME_LENGTH):
    """Sanitize a string to be used as a safe filename while protecting the file extension."""
    DEBUG(f"Filename before sanitization: {text}")
    # Collapse all whitespace runs to single spaces.
    text = re.sub(r'\s+', ' ', text)
    # Strip characters not allowed in filenames (pattern: ALLOWED_FILENAME_CHARS).
    sanitized = re.sub(ALLOWED_FILENAME_CHARS, '', text)
    # Remove leading/trailing spaces.
    sanitized = sanitized.strip()
    # Truncate the stem so stem + extension fits within max_length.
    base_name, extension = os.path.splitext(sanitized)
    max_base_length = max_length - len(extension)
    if len(base_name) > max_base_length:
        base_name = base_name[:max_base_length].rstrip()
    final_filename = base_name + extension
    # If the extension alone exceeds the limit, hard-truncate the whole name.
    if len(final_filename) > max_length:
        final_filename = final_filename[:max_length]
    DEBUG(f"Filename after sanitization: {final_filename}")
    return final_filename
def check_file_name(file_name, max_length=255):
    """Decide whether *file_name* requires sanitization.

    Mirrors the rules applied by sanitize_filename: maximum length, allowed
    character set, repeated internal spaces, and leading/trailing spaces.
    """
    DEBUG(f"Checking filename: {file_name}")
    # Each rule pairs its failure condition with the diagnostic to emit.
    rules = [
        (len(file_name) > max_length,
         f"Filename exceeds maximum length of {max_length}"),
        (re.search(ALLOWED_FILENAME_CHARS, file_name),
         "Filename contains non-word characters (except space, dot, and hyphen)"),
        (re.search(r'\s{2,}', file_name),
         "Filename contains multiple consecutive spaces"),
        (file_name != file_name.strip(),
         "Filename has leading or trailing spaces"),
    ]
    needs_sanitization = False
    for failed, message in rules:
        if failed:
            DEBUG(message)
            needs_sanitization = True
    DEBUG(f"Filename {'needs' if needs_sanitization else 'does not need'} sanitization")
    return needs_sanitization
def list_and_correct_impermissible_files(root_dir, rename: bool = False):
    """List and correct all files with impermissible names."""
    impermissible_files = []
    for dirpath, _, filenames in os.walk(root_dir):
        for filename in filenames:
            if not check_file_name(filename):
                continue
            file_path = Path(dirpath) / filename
            impermissible_files.append(file_path)
            print(f"Impermissible file found: {file_path}")

            # Choose a sanitized name, appending _1, _2, ... on collisions.
            new_filename = sanitize_filename(filename)
            new_file_path = Path(dirpath) / new_filename
            counter = 1
            base_name, ext = os.path.splitext(new_filename)
            while new_file_path.exists():
                new_filename = f"{base_name}_{counter}{ext}"
                new_file_path = Path(dirpath) / new_filename
                counter += 1

            if rename:
                os.rename(file_path, new_file_path)
                print(f"Renamed: {file_path} -> {new_file_path}")
    return impermissible_files
def fix_nextcloud_filenames(dir_to_fix, rename: bool = False):
    """Scan *dir_to_fix* for NextCloud-incompatible names, optionally renaming them."""
    offenders = list_and_correct_impermissible_files(dir_to_fix, rename)
    if not offenders:
        print("No impermissible files found.")
        return
    print("\nList of impermissible files found and corrected:")
    for file in offenders:
        print(file)
def bool_convert(value: str = Form(None)):
    """Coerce a form-field string to bool.

    Accepts "true"/"1"/"t"/"y"/"yes" (case-insensitive) as True; anything
    else — including an absent field — is False.
    """
    # Guard: the default is Form(None), and None.lower() would raise
    # AttributeError when the field is omitted from the request.
    return value is not None and value.lower() in ["true", "1", "t", "y", "yes"]