Auto-update: Mon Jun 24 16:28:41 PDT 2024

This commit is contained in:
sanj 2024-06-24 16:28:41 -07:00
parent 45fd56895c
commit 0936cf8808
4 changed files with 229 additions and 65 deletions

View file

@ -10,8 +10,8 @@ from pydantic import BaseModel
import traceback import traceback
import logging import logging
from .logs import Logger from .logs import Logger
from .purgenc import process_nc
# from sijapi.config.config import load_config # from sijapi.config.config import load_config
# cfg = load_config() # cfg = load_config()
@ -93,6 +93,7 @@ VISUALCROSSING_API_KEY = os.getenv("VISUALCROSSING_API_KEY")
### Obsidian & notes ### Obsidian & notes
OBSIDIAN_VAULT_DIR = Path(os.getenv("OBSIDIAN_BASE_DIR") or HOME_DIR / "Nextcloud" / "notes") OBSIDIAN_VAULT_DIR = Path(os.getenv("OBSIDIAN_BASE_DIR") or HOME_DIR / "Nextcloud" / "notes")
OBSIDIAN_JOURNAL_DIR = OBSIDIAN_VAULT_DIR / "journal" OBSIDIAN_JOURNAL_DIR = OBSIDIAN_VAULT_DIR / "journal"
process_nc(OBSIDIAN_JOURNAL_DIR, True)
OBSIDIAN_RESOURCES_DIR = "obsidian/resources" OBSIDIAN_RESOURCES_DIR = "obsidian/resources"
OBSIDIAN_BANNER_DIR = f"{OBSIDIAN_RESOURCES_DIR}/banners" OBSIDIAN_BANNER_DIR = f"{OBSIDIAN_RESOURCES_DIR}/banners"
os.makedirs(Path(OBSIDIAN_VAULT_DIR) / OBSIDIAN_BANNER_DIR, exist_ok=True) os.makedirs(Path(OBSIDIAN_VAULT_DIR) / OBSIDIAN_BANNER_DIR, exist_ok=True)

67
sijapi/purgenc.py Normal file
View file

@ -0,0 +1,67 @@
import os
import re
from pathlib import Path
# Set the maximum permissible file name length for NextCloud
MAX_FILENAME_LENGTH = 255
# Define impermissible characters for NextCloud
IMPERMISSIBLE_CHARS = r'[<>:"/\\|?*\n]'
def sanitize_file_name(file_name):
"""Sanitize the file name by replacing impermissible characters and truncating if necessary."""
# Replace impermissible characters with underscores
sanitized_name = re.sub(IMPERMISSIBLE_CHARS, '_', file_name)
# Truncate the file name if it exceeds the maximum length
if len(sanitized_name) > MAX_FILENAME_LENGTH:
ext = Path(sanitized_name).suffix
base_name = sanitized_name[:MAX_FILENAME_LENGTH - len(ext)]
sanitized_name = base_name + ext
return sanitized_name
def check_file_name(file_name):
"""Check if the file name is impermissibly long or contains impermissible characters."""
if len(file_name) > MAX_FILENAME_LENGTH:
return True
if re.search(IMPERMISSIBLE_CHARS, file_name):
return True
return False
def list_and_correct_impermissible_files(root_dir, rename: bool = False):
"""List and correct all files with impermissible names."""
impermissible_files = []
for dirpath, _, filenames in os.walk(root_dir):
for filename in filenames:
if check_file_name(filename):
file_path = Path(dirpath) / filename
impermissible_files.append(file_path)
print(f"Impermissible file found: {file_path}")
# Sanitize the file name
new_filename = sanitize_file_name(filename)
new_file_path = Path(dirpath) / new_filename
# Ensure the new file name does not already exist
if new_file_path.exists():
counter = 1
base_name, ext = os.path.splitext(new_filename)
while new_file_path.exists():
new_filename = f"{base_name}_{counter}{ext}"
new_file_path = Path(dirpath) / new_filename
counter += 1
# Rename the file
if rename == True:
os.rename(file_path, new_file_path)
print(f"Renamed: {file_path} -> {new_file_path}")
return impermissible_files
def process_nc(dir_to_fix, rename: bool = False):
impermissible_files = list_and_correct_impermissible_files(dir_to_fix, rename)
if impermissible_files:
print("\nList of impermissible files found and corrected:")
for file in impermissible_files:
print(file)
else:
print("No impermissible files found.")

View file

@ -140,6 +140,17 @@ async def clip_post(
markdown_filename = await process_article(background_tasks, url, title, encoding, source, tts, voice) markdown_filename = await process_article(background_tasks, url, title, encoding, source, tts, voice)
return {"message": "Clip saved successfully", "markdown_filename": markdown_filename} return {"message": "Clip saved successfully", "markdown_filename": markdown_filename}
@note.post("/archive")
async def archive_post(
background_tasks: BackgroundTasks,
file: UploadFile = None,
url: Optional[str] = Form(None),
source: Optional[str] = Form(None),
title: Optional[str] = Form(None),
encoding: str = Form('utf-8')
):
markdown_filename = await process_archive(background_tasks, url, title, encoding, source)
return {"message": "Clip saved successfully", "markdown_filename": markdown_filename}
@note.get("/clip") @note.get("/clip")
async def clip_get( async def clip_get(
@ -182,27 +193,68 @@ async def process_for_daily_note(file: Optional[UploadFile] = File(None), text:
absolute_path, relative_path = assemble_journal_path(now, subdir=subdir, filename=file.filename) absolute_path, relative_path = assemble_journal_path(now, subdir=subdir, filename=file.filename)
with open(absolute_path, 'wb') as f: with open(absolute_path, 'wb') as f:
f.write(file_content) f.write(file_content)
source_prefix = ""
if 'audio' in file_type: if 'audio' in file_type:
transcription = await asr.transcribe_audio(file_path=absolute_path, params=asr.TranscribeParams(model="small-en", language="en", threads=6)) transcription = await asr.transcribe_audio(file_path=absolute_path, params=asr.TranscribeParams(model="small-en", language="en", threads=6))
source_prefix = "voice note"
file_entry = f"![[{relative_path}]]" file_entry = f"![[{relative_path}]]"
elif 'image' in file_type: elif 'image' in file_type:
source_prefix = "image"
file_entry = f"![[{relative_path}]]" file_entry = f"![[{relative_path}]]"
else: else:
file_entry = f"[Source]({relative_path})" file_entry = f"[Source]({relative_path})"
if source:
source = f"{source_prefix} from {source}:"
else:
source = ""
text_entry = text if text else "" text_entry = text if text else ""
INFO(f"transcription: {transcription}\nfile_entry: {file_entry}\ntext_entry: {text_entry}") INFO(f"transcription: {transcription}\nfile_entry: {file_entry}\ntext_entry: {text_entry}")
return await add_to_daily_note(transcription, file_entry, text_entry, now, source) return await add_to_daily_note(transcription, file_entry, text_entry, now)
async def add_to_daily_note(transcription: str = None, file_link: str = None, additional_text: str = None, date_time: datetime = None):
date_time = date_time or datetime.now()
note_path, _ = assemble_journal_path(date_time, filename='Notes', extension=".md", no_timestamp = True)
time_str = date_time.strftime("%H:%M")
entry_lines = []
if additional_text and additional_text.strip():
entry_lines.append(f"\t* {additional_text.strip()}")
if transcription and transcription.strip():
entry_lines.append(f"\t* {transcription.strip()}")
if file_link and file_link.strip():
entry_lines.append(f"\t\t {file_link.strip()}")
entry = f"\n* **{time_str}**\n" + "\n".join(entry_lines)
# Write the entry to the end of the file
if note_path.exists():
with open(note_path, 'a', encoding='utf-8') as note_file:
note_file.write(entry)
else:
date_str = date_time.strftime("%Y-%m-%d")
frontmatter = f"""---
date: {date_str}
tags:
- notes
---
"""
content = frontmatter + entry
# If the file doesn't exist, create it and start with "Notes"
with open(note_path, 'w', encoding='utf-8') as note_file:
note_file.write(content)
return entry
async def handle_text(title:str, summary:str, extracted_text:str, date_time: datetime = None):
date_time = date_time if date_time else datetime.now()
absolute_path, relative_path = assemble_journal_path(date_time, filename=title, extension=".md", no_timestamp = True)
with open(absolute_path, "w") as file:
file.write(f"# {title}\n\n## Summary\n{summary}\n\n## Transcript\n{extracted_text}")
# add_to_daily_note(f"**Uploaded [[{title}]]**: *{summary}*", absolute_path)
return True
async def process_document( async def process_document(
background_tasks: BackgroundTasks, background_tasks: BackgroundTasks,
@ -293,13 +345,12 @@ async def process_article(
parsed_content = parse_article(url, source) parsed_content = parse_article(url, source)
if parsed_content is None: if parsed_content is None:
return {"error": "Failed to retrieve content"} return {"error": "Failed to retrieve content"}
readable_title = sanitize_filename(title if title else parsed_content.get("title", "Untitled"))
content = parsed_content["content"] content = parsed_content["content"]
markdown_filename, relative_path = assemble_journal_path(datetime.now(), "Articles", readable_title, extension=".md") readable_title = sanitize_filename(title if title else parsed_content.get("title", "Untitled"))
if not readable_title:
readable_title = timestamp
markdown_filename, relative_path = assemble_journal_path(datetime.now(), subdir="Articles", filename=readable_title, extension=".md")
try: try:
tags = parsed_content.get('meta_keywords', []) tags = parsed_content.get('meta_keywords', [])
@ -374,6 +425,7 @@ banner: "{banner_markdown}"
md_file.write(markdown_content) md_file.write(markdown_content)
INFO(f"Successfully saved to {markdown_filename}") INFO(f"Successfully saved to {markdown_filename}")
add_to_daily_note
return markdown_filename return markdown_filename
except Exception as e: except Exception as e:
@ -415,6 +467,66 @@ def parse_article(url: str, source: Optional[str] = None):
async def process_archive(
background_tasks: BackgroundTasks,
url: str,
title: Optional[str] = None,
encoding: str = 'utf-8',
source: Optional[str] = None,
):
timestamp = datetime.now().strftime('%b %d, %Y at %H:%M')
parsed_content = parse_article(url, source)
if parsed_content is None:
return {"error": "Failed to retrieve content"}
content = parsed_content["content"]
readable_title = sanitize_filename(title if title else parsed_content.get("title", "Untitled"))
if not readable_title:
readable_title = timestamp
markdown_path = OBSIDIAN_VAULT_DIR / "archive"
try:
frontmatter = f"""---
title: {readable_title}
author: {parsed_content.get('author', 'Unknown')}
published: {parsed_content.get('date_published', 'Unknown')}
added: {timestamp}
excerpt: {parsed_content.get('excerpt', '')}
---
"""
body = f"# {readable_title}\n\n"
try:
authors = parsed_content.get('author', '')
authors_in_brackets = [f"[[{author.strip()}]]" for author in authors.split(",")]
authors_string = ", ".join(authors_in_brackets)
body += f"by {authors_string} in [{parsed_content.get('domain', urlparse(url).netloc.replace('www.', ''))}]({parsed_content.get('url', url)}).\n\n"
body += content
markdown_content = frontmatter + body
except Exception as e:
ERR(f"Failed to combine elements of article markdown.")
try:
with open(markdown_path, 'w', encoding=encoding) as md_file:
md_file.write(markdown_content)
INFO(f"Successfully saved to {markdown_path}")
add_to_daily_note
return markdown_path
except Exception as e:
ERR(f"Failed to write markdown file")
raise HTTPException(status_code=500, detail=str(e))
except Exception as e:
ERR(f"Failed to clip {url}: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
def download_file(url, folder): def download_file(url, folder):
os.makedirs(folder, exist_ok=True) os.makedirs(folder, exist_ok=True)
filename = str(uuid.uuid4()) + os.path.splitext(urlparse(url).path)[-1] filename = str(uuid.uuid4()) + os.path.splitext(urlparse(url).path)[-1]
@ -463,50 +575,6 @@ async def save_file(file: UploadFile, folder: Path) -> Path:
async def add_to_daily_note(transcription: str, file_link: str, additional_text: str, date_time: datetime = None, source: str = None):
date_time = date_time or datetime.now()
note_path, _ = assemble_journal_path(date_time, filename='Notes', extension=".md", no_timestamp = True)
time_str = date_time.strftime("%H:%M")
entry_lines = []
if additional_text.strip():
entry_lines.append(f"\t* {additional_text.strip()}")
if transcription.strip():
entry_lines.append(f"\t* {transcription.strip()}")
if file_link.strip():
entry_lines.append(f"\t\t {file_link.strip()}")
entry = f"\n* **{time_str}**{source}\n" + "\n".join(entry_lines)
# Write the entry to the end of the file
if note_path.exists():
with open(note_path, 'a', encoding='utf-8') as note_file:
note_file.write(entry)
else:
date_str = date_time.strftime("%Y-%m-%d")
frontmatter = f"""---
date: {date_str}
tags:
- notes
---
"""
content = frontmatter + entry
# If the file doesn't exist, create it and start with "Notes"
with open(note_path, 'w', encoding='utf-8') as note_file:
note_file.write(content)
return entry
async def handle_text(title:str, summary:str, extracted_text:str, date_time: datetime = None):
date_time = date_time if date_time else datetime.now()
absolute_path, relative_path = assemble_journal_path(date_time, filename=title, extension=".md", no_timestamp = True)
with open(absolute_path, "w") as file:
file.write(f"# {title}\n\n## Summary\n{summary}\n\n## Transcript\n{extracted_text}")
# add_to_daily_note(f"**Uploaded [[{title}]]**: *{summary}*", absolute_path)
return True
### FRONTMATTER, BANNER ### FRONTMATTER, BANNER

View file

@ -62,16 +62,25 @@ def assemble_journal_path(date_time: datetime, subdir: str = None, filename: str
relative_path = relative_path / f"{day_short} {subdir}" relative_path = relative_path / f"{day_short} {subdir}"
if filename: if filename:
if no_timestamp: filename = sanitize_filename(filename)
filename = f"{day_short} {sanitize_filename(filename)}" filename = f"{day_short} {filename}" if no_timestamp else f"{day_short} {timestamp} {filename}"
else:
filename = f"{day_short} {timestamp} {sanitize_filename(filename)}"
if extension: if extension:
extension = extension if extension.startswith(".") else f".{extension}" extension = extension if extension.startswith(".") else f".{extension}"
filename = f"{filename}{extension}" if not filename.endswith(extension) else filename filename = f"{filename}{extension}" if not filename.endswith(extension) else filename
else:
if has_valid_extension(filename, [".md", ".m4a", ".wav", ".aiff", ".flac", ".mp3", ".mp4", ".pdf", ".js", ".json", ".yaml", ".py"]):
DEBUG(f"Provided filename has a valid extension, so we use that.")
else:
filename = f"{filename}.md"
DEBUG(f"We are forcing the file to be a .md")
relative_path = relative_path / filename relative_path = relative_path / filename
else:
DEBUG(f"This only happens, theoretically, when no filename nor subdirectory are provided, but an extension is. Which is kinda silly.")
return None, None
absolute_path = OBSIDIAN_VAULT_DIR / relative_path absolute_path = OBSIDIAN_VAULT_DIR / relative_path
@ -79,6 +88,16 @@ def assemble_journal_path(date_time: datetime, subdir: str = None, filename: str
return absolute_path, relative_path return absolute_path, relative_path
def has_valid_extension(filename, valid_extensions=None):
if valid_extensions is None:
# Check if there's any extension
return bool(os.path.splitext(filename)[1])
else:
# Check if the extension is in the list of valid extensions
return os.path.splitext(filename)[1].lower() in valid_extensions
def prefix_lines(text: str, prefix: str = '> ') -> str: def prefix_lines(text: str, prefix: str = '> ') -> str:
lines = text.split('\n') lines = text.split('\n')
prefixed_lines = [f"{prefix}{line.lstrip()}" for line in lines] prefixed_lines = [f"{prefix}{line.lstrip()}" for line in lines]
@ -117,15 +136,24 @@ def get_extension(file):
raise e raise e
def sanitize_filename(text, max_length=255): def sanitize_filename(text, max_length=255):
"""Sanitize a string to be used as a safe filename.""" """Sanitize a string to be used as a safe filename."""
DEBUG(f"Filename before sanitization: {text}") DEBUG(f"Filename before sanitization: {text}")
sanitized = re.sub(r'[^\w\s\.-]', '', text).strip()
# Replace multiple spaces with a single space and remove other whitespace
text = re.sub(r'\s+', ' ', text)
# Remove any non-word characters except space, dot, and hyphen
sanitized = re.sub(r'[^\w \.-]', '', text)
# Remove leading/trailing spaces
sanitized = sanitized.strip()
final_filename = sanitized[:max_length] final_filename = sanitized[:max_length]
DEBUG(f"Filename after sanitization: {final_filename}") DEBUG(f"Filename after sanitization: {final_filename}")
return final_filename return final_filename
def bool_convert(value: str = Form(None)): def bool_convert(value: str = Form(None)):
return value.lower() in ["true", "1", "t", "y", "yes"] return value.lower() in ["true", "1", "t", "y", "yes"]