Auto-update: Thu Aug 8 04:45:25 PDT 2024
This commit is contained in:
parent
961bdfc530
commit
72235f4272
2 changed files with 212 additions and 246 deletions
|
@ -4,28 +4,22 @@ Used to scrape, process, summarize, markdownify, and speechify news articles.
|
|||
# routers/news.py
|
||||
|
||||
import os
|
||||
import uuid
|
||||
import asyncio
|
||||
import shutil
|
||||
import requests
|
||||
from bs4 import BeautifulSoup
|
||||
from urllib.parse import urlparse
|
||||
from datetime import datetime as dt_datetime, timedelta
|
||||
from typing import Optional, List, Tuple
|
||||
import aiohttp
|
||||
import aiofiles
|
||||
import newspaper
|
||||
import trafilatura
|
||||
import newspaper
|
||||
from newspaper import Article
|
||||
from readability import Document
|
||||
import math
|
||||
from urllib.parse import urlparse
|
||||
from markdownify import markdownify as md
|
||||
from better_profanity import profanity
|
||||
from requests.adapters import HTTPAdapter
|
||||
from urllib3.util.retry import Retry
|
||||
from fastapi import APIRouter, BackgroundTasks, UploadFile, Form, HTTPException, Query, Path as FastAPIPath
|
||||
from pathlib import Path
|
||||
from sijapi import L, News, Archivist, OBSIDIAN_VAULT_DIR, OBSIDIAN_RESOURCES_DIR, DEFAULT_11L_VOICE, DEFAULT_VOICE
|
||||
from sijapi.utilities import html_to_markdown, sanitize_filename, assemble_journal_path, assemble_archive_path, contains_profanity, is_ad_or_tracker
|
||||
from sijapi.utilities import html_to_markdown, download_file, sanitize_filename, assemble_journal_path, assemble_archive_path, contains_profanity, is_ad_or_tracker
|
||||
from sijapi.routers import gis, llm, tts, note
|
||||
|
||||
news = APIRouter()
|
||||
|
@ -36,6 +30,126 @@ def warn(text: str): logger.warning(text)
|
|||
def err(text: str): logger.error(text)
|
||||
def crit(text: str): logger.critical(text)
|
||||
|
||||
|
||||
@news.post("/clip")
|
||||
async def clip_post(
|
||||
bg_tasks: BackgroundTasks,
|
||||
url: str = Form(...),
|
||||
title: Optional[str] = Form(None),
|
||||
tts: str = Form('summary'),
|
||||
voice: str = Form(DEFAULT_VOICE),
|
||||
):
|
||||
result = await process_and_save_article(bg_tasks, url, title, tts, voice)
|
||||
return {"message": "Clip saved successfully", "result": result}
|
||||
|
||||
@news.get("/clip")
|
||||
async def clip_get(
|
||||
bg_tasks: BackgroundTasks,
|
||||
url: str,
|
||||
voice: str = Query(DEFAULT_VOICE)
|
||||
):
|
||||
result = await process_and_save_article(bg_tasks, url, None, tts, voice)
|
||||
return {"message": "Clip saved successfully", "result": result}
|
||||
|
||||
@news.get("/news/refresh")
|
||||
async def news_refresh_endpoint(bg_tasks: BackgroundTasks):
|
||||
tasks = [process_news_site(site, bg_tasks) for site in News.sites]
|
||||
await asyncio.gather(*tasks)
|
||||
return "OK"
|
||||
|
||||
def is_article_within_date_range(article: Article, days_back: int) -> bool:
|
||||
earliest_date = dt_datetime.now().date() - timedelta(days=days_back)
|
||||
return article.publish_date.date() >= earliest_date
|
||||
|
||||
async def generate_summary(text: str) -> str:
|
||||
summary = await llm.summarize_text(text, "Summarize the provided text. Respond with the summary and nothing else.")
|
||||
return summary.replace('\n', ' ')
|
||||
|
||||
async def handle_tts(bg_tasks: BackgroundTasks, article: Article, title: str, tts_mode: str, voice: str, summary: str) -> Optional[str]:
|
||||
if tts_mode in ["full", "content"]:
|
||||
tts_text = article.text
|
||||
elif tts_mode in ["summary", "excerpt"]:
|
||||
tts_text = summary
|
||||
else:
|
||||
return None
|
||||
|
||||
audio_filename = f"{article.publish_date.strftime('%Y-%m-%d')} {title}"
|
||||
try:
|
||||
audio_path = await tts.generate_speech(
|
||||
bg_tasks=bg_tasks,
|
||||
text=tts_text,
|
||||
voice=voice,
|
||||
model="xtts",
|
||||
podcast=True,
|
||||
title=audio_filename,
|
||||
output_dir=Path(OBSIDIAN_VAULT_DIR) / OBSIDIAN_RESOURCES_DIR
|
||||
)
|
||||
return f"![[{Path(audio_path).name}]]"
|
||||
except HTTPException as e:
|
||||
err(f"Failed to generate TTS: {str(e)}")
|
||||
return None
|
||||
|
||||
|
||||
|
||||
def get_banner_markdown(image_url: str) -> str:
|
||||
if not image_url:
|
||||
return ''
|
||||
try:
|
||||
banner_image = download_file(image_url, Path(OBSIDIAN_VAULT_DIR) / OBSIDIAN_RESOURCES_DIR)
|
||||
return f"![[{OBSIDIAN_RESOURCES_DIR}/{banner_image}]]" if banner_image else ''
|
||||
except Exception as e:
|
||||
err(f"Failed to download banner image: {str(e)}")
|
||||
return ''
|
||||
|
||||
|
||||
async def save_markdown_file(filename: str, content: str):
|
||||
async with aiofiles.open(filename, 'w', encoding='utf-8') as f:
|
||||
await f.write(content)
|
||||
|
||||
|
||||
async def process_news_site(site, bg_tasks: BackgroundTasks):
|
||||
info(f"Downloading articles from {site.name}...")
|
||||
|
||||
earliest_date = dt_datetime.now().date() - timedelta(days=site.days_back)
|
||||
|
||||
try:
|
||||
news_source = newspaper.build(site.url, memoize_articles=False)
|
||||
|
||||
tasks = []
|
||||
for article in news_source.articles[:site.max_articles]:
|
||||
task = asyncio.create_task(download_and_save_article(
|
||||
article,
|
||||
site.name,
|
||||
earliest_date,
|
||||
bg_tasks,
|
||||
tts_mode=site.tts if hasattr(site, 'tts') else "off",
|
||||
voice=site.voice if hasattr(site, 'voice') else DEFAULT_11L_VOICE
|
||||
))
|
||||
tasks.append(task)
|
||||
|
||||
results = await asyncio.gather(*tasks)
|
||||
articles_downloaded = sum(results)
|
||||
|
||||
info(f"Downloaded {articles_downloaded} articles from {site.name}")
|
||||
except Exception as e:
|
||||
err(f"Error processing {site.name}: {str(e)}")
|
||||
|
||||
|
||||
async def download_and_save_article(article, site_name, earliest_date, bg_tasks: BackgroundTasks, tts_mode: str = "off", voice: str = DEFAULT_11L_VOICE):
|
||||
try:
|
||||
url = article.url
|
||||
parsed_article = await fetch_and_parse_article(url)
|
||||
|
||||
if not is_article_within_date_range(parsed_article, earliest_date):
|
||||
return False
|
||||
|
||||
return await process_and_save_article(bg_tasks, url, None, tts_mode, voice, site_name=site_name)
|
||||
|
||||
except Exception as e:
|
||||
err(f"Error processing article from {article.url}: {str(e)}")
|
||||
return False
|
||||
|
||||
|
||||
async def process_and_save_article(
|
||||
bg_tasks: BackgroundTasks,
|
||||
url: str,
|
||||
|
@ -93,252 +207,48 @@ async def fetch_and_parse_article(url: str) -> Article:
|
|||
|
||||
return article
|
||||
|
||||
def is_article_within_date_range(article: Article, days_back: int) -> bool:
|
||||
earliest_date = dt_datetime.now().date() - timedelta(days=days_back)
|
||||
return article.publish_date.date() >= earliest_date
|
||||
|
||||
async def generate_summary(text: str) -> str:
|
||||
summary = await llm.summarize_text(text, "Summarize the provided text. Respond with the summary and nothing else.")
|
||||
return summary.replace('\n', ' ')
|
||||
def generate_markdown_content(article, title: str, summary: str, audio_link: Optional[str], site_name: Optional[str] = None) -> str:
|
||||
def format_date(date):
|
||||
return date.strftime("%Y-%m-%d") if date else "Unknown Date"
|
||||
|
||||
async def handle_tts(bg_tasks: BackgroundTasks, article: Article, title: str, tts_mode: str, voice: str, summary: str) -> Optional[str]:
|
||||
if tts_mode in ["full", "content"]:
|
||||
tts_text = article.text
|
||||
elif tts_mode in ["summary", "excerpt"]:
|
||||
tts_text = summary
|
||||
else:
|
||||
return None
|
||||
def estimate_reading_time(text, words_per_minute=200):
|
||||
word_count = len(text.split())
|
||||
return math.ceil(word_count / words_per_minute)
|
||||
|
||||
audio_filename = f"{article.publish_date.strftime('%Y-%m-%d')} {title}"
|
||||
try:
|
||||
audio_path = await tts.generate_speech(
|
||||
bg_tasks=bg_tasks,
|
||||
text=tts_text,
|
||||
voice=voice,
|
||||
model="xtts",
|
||||
podcast=True,
|
||||
title=audio_filename,
|
||||
output_dir=Path(OBSIDIAN_VAULT_DIR) / OBSIDIAN_RESOURCES_DIR
|
||||
)
|
||||
return f"![[{Path(audio_path).name}]]"
|
||||
except HTTPException as e:
|
||||
err(f"Failed to generate TTS: {str(e)}")
|
||||
return None
|
||||
def format_tags(tags):
|
||||
return '\n'.join(f' - {tag}' for tag in (tags or []))
|
||||
|
||||
def get_banner_markdown(image_url):
|
||||
return image_url if image_url else ""
|
||||
|
||||
# Prepare metadata
|
||||
publish_date = format_date(article.publish_date)
|
||||
added_date = dt_datetime.now().strftime("%b %d, %Y at %H:%M")
|
||||
reading_time = estimate_reading_time(article.text)
|
||||
|
||||
def generate_markdown_content(article: Article, title: str, summary: str, audio_link: Optional[str], site_name: Optional[str] = None) -> str:
|
||||
frontmatter = f"""---
|
||||
title: {title}
|
||||
authors: {', '.join(f'[[{author}]]' for author in article.authors)}
|
||||
published: {article.publish_date}
|
||||
added: {dt_datetime.now().strftime('%b %d, %Y at %H:%M')}
|
||||
published: {publish_date}
|
||||
added: {added_date}
|
||||
banner: "{get_banner_markdown(article.top_image)}"
|
||||
url: {article.url}
|
||||
reading_minutes: {reading_time}
|
||||
tags:
|
||||
{chr(10).join(f' - {tag}' for tag in article.meta_keywords)}
|
||||
"""
|
||||
{format_tags(article.meta_keywords)}"""
|
||||
|
||||
if site_name:
|
||||
frontmatter += f"site: {site_name}\n"
|
||||
frontmatter += "---\n\n"
|
||||
frontmatter += f"\nsite: {site_name}"
|
||||
frontmatter += "\n---\n\n"
|
||||
|
||||
body = f"# {title}\n\n"
|
||||
if article.top_image:
|
||||
body += f"![{title}]({article.top_image})\n\n"
|
||||
if audio_link:
|
||||
body += f"{audio_link}\n\n"
|
||||
body += f"by {', '.join(article.authors)} in [{article.source_url}]({article.url})\n\n"
|
||||
body += f"> [!summary]+\n> {summary}\n\n"
|
||||
body += article.text
|
||||
|
||||
return frontmatter + body
|
||||
|
||||
|
||||
def get_banner_markdown(image_url: str) -> str:
|
||||
if not image_url:
|
||||
return ''
|
||||
try:
|
||||
banner_image = download_file(image_url, Path(OBSIDIAN_VAULT_DIR) / OBSIDIAN_RESOURCES_DIR)
|
||||
return f"![[{OBSIDIAN_RESOURCES_DIR}/{banner_image}]]" if banner_image else ''
|
||||
except Exception as e:
|
||||
err(f"Failed to download banner image: {str(e)}")
|
||||
return ''
|
||||
|
||||
async def save_markdown_file(filename: str, content: str):
|
||||
async with aiofiles.open(filename, 'w', encoding='utf-8') as f:
|
||||
await f.write(content)
|
||||
|
||||
|
||||
async def download_and_save_article(article, site_name, earliest_date, bg_tasks: BackgroundTasks, tts_mode: str = "off", voice: str = DEFAULT_11L_VOICE):
|
||||
try:
|
||||
url = article.url
|
||||
parsed_article = await fetch_and_parse_article(url)
|
||||
|
||||
if not is_article_within_date_range(parsed_article, earliest_date):
|
||||
return False
|
||||
|
||||
return await process_and_save_article(bg_tasks, url, None, tts_mode, voice, site_name=site_name)
|
||||
|
||||
except Exception as e:
|
||||
err(f"Error processing article from {article.url}: {str(e)}")
|
||||
return False
|
||||
|
||||
|
||||
async def process_news_site(site, bg_tasks: BackgroundTasks):
|
||||
info(f"Downloading articles from {site.name}...")
|
||||
|
||||
earliest_date = dt_datetime.now().date() - timedelta(days=site.days_back)
|
||||
|
||||
try:
|
||||
news_source = newspaper.build(site.url, memoize_articles=False)
|
||||
|
||||
tasks = []
|
||||
for article in news_source.articles[:site.max_articles]:
|
||||
task = asyncio.create_task(download_and_save_article(
|
||||
article,
|
||||
site.name,
|
||||
earliest_date,
|
||||
bg_tasks,
|
||||
tts_mode=site.tts if hasattr(site, 'tts') else "off",
|
||||
voice=site.voice if hasattr(site, 'voice') else DEFAULT_11L_VOICE
|
||||
))
|
||||
tasks.append(task)
|
||||
|
||||
results = await asyncio.gather(*tasks)
|
||||
articles_downloaded = sum(results)
|
||||
|
||||
info(f"Downloaded {articles_downloaded} articles from {site.name}")
|
||||
except Exception as e:
|
||||
err(f"Error processing {site.name}: {str(e)}")
|
||||
|
||||
|
||||
@news.get("/news/refresh")
|
||||
async def news_refresh_endpoint(bg_tasks: BackgroundTasks):
|
||||
tasks = [process_news_site(site, bg_tasks) for site in News.sites]
|
||||
await asyncio.gather(*tasks)
|
||||
return "OK"
|
||||
|
||||
|
||||
async def generate_path(article, site_name):
|
||||
publish_date = await gis.dt(article.publish_date, 'UTC') if article.publish_date else await gis.dt(dt_datetime.now(), 'UTC')
|
||||
title_slug = "".join(c if c.isalnum() else "_" for c in article.title)
|
||||
filename = f"{site_name} - {title_slug[:50]}.md"
|
||||
absolute_path, relative_path = assemble_journal_path(publish_date, 'Articles', filename, extension='.md', no_timestamp=True)
|
||||
return absolute_path, relative_path
|
||||
|
||||
|
||||
async def save_article_to_file(content, output_path):
|
||||
output_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
async with aiofiles.open(output_path, 'w', encoding='utf-8') as file:
|
||||
await file.write(content)
|
||||
|
||||
|
||||
@news.post("/clip")
|
||||
async def clip_post(
|
||||
bg_tasks: BackgroundTasks,
|
||||
url: str = Form(...),
|
||||
title: Optional[str] = Form(None),
|
||||
tts: str = Form('summary'),
|
||||
voice: str = Form(DEFAULT_VOICE),
|
||||
):
|
||||
result = await process_and_save_article(bg_tasks, url, title, tts, voice)
|
||||
return {"message": "Clip saved successfully", "result": result}
|
||||
|
||||
@news.get("/clip")
|
||||
async def clip_get(
|
||||
bg_tasks: BackgroundTasks,
|
||||
url: str,
|
||||
tts: str = Query('summary'),
|
||||
voice: str = Query(DEFAULT_VOICE)
|
||||
):
|
||||
result = await process_and_save_article(bg_tasks, url, None, tts, voice)
|
||||
return {"message": "Clip saved successfully", "result": result}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
async def parse_article(url: str, source: Optional[str] = None) -> Article:
|
||||
source = source if source else trafilatura.fetch_url(url)
|
||||
traf = trafilatura.extract_metadata(filecontent=source, default_url=url)
|
||||
|
||||
# Create and parse the newspaper3k Article
|
||||
article = Article(url)
|
||||
article.set_html(source)
|
||||
article.parse()
|
||||
|
||||
info(f"Parsed {article.title}")
|
||||
|
||||
# Update or set properties based on trafilatura and additional processing
|
||||
article.title = article.title or traf.title or url
|
||||
article.authors = article.authors or (traf.author if isinstance(traf.author, list) else [traf.author])
|
||||
|
||||
article.publish_date = article.publish_date or traf.date
|
||||
try:
|
||||
article.publish_date = await gis.dt(article.publish_date, "UTC")
|
||||
except:
|
||||
debug(f"Failed to localize {article.publish_date}")
|
||||
article.publish_date = await gis.dt(dt_datetime.now(), "UTC")
|
||||
|
||||
article.meta_description = article.meta_description or traf.description
|
||||
article.text = trafilatura.extract(source, output_format="markdown", include_comments=False) or article.text
|
||||
article.top_image = article.top_image or traf.image
|
||||
article.source_url = traf.sitename or urlparse(url).netloc.replace('www.', '').title()
|
||||
article.meta_keywords = article.meta_keywords or traf.categories or traf.tags
|
||||
article.meta_keywords = article.meta_keywords if isinstance(article.meta_keywords, list) else [article.meta_keywords]
|
||||
|
||||
# Set additional data in the additional_data dictionary
|
||||
article.additional_data = {
|
||||
'excerpt': article.meta_description,
|
||||
'domain': article.source_url,
|
||||
'tags': article.meta_keywords,
|
||||
'content': article.text # Store the markdown content here
|
||||
}
|
||||
|
||||
return article
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def download_file(url, folder):
|
||||
os.makedirs(folder, exist_ok=True)
|
||||
filename = str(uuid.uuid4()) + os.path.splitext(urlparse(url).path)[-1]
|
||||
filepath = os.path.join(folder, filename)
|
||||
|
||||
session = requests.Session()
|
||||
retries = Retry(total=5, backoff_factor=1, status_forcelist=[502, 503, 504])
|
||||
session.mount('http://', HTTPAdapter(max_retries=retries))
|
||||
session.mount('https://', HTTPAdapter(max_retries=retries))
|
||||
|
||||
headers = {
|
||||
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'
|
||||
}
|
||||
|
||||
try:
|
||||
response = session.get(url, headers=headers, timeout=10)
|
||||
if response.status_code == 200:
|
||||
if 'image' in response.headers.get('Content-Type', ''):
|
||||
with open(filepath, 'wb') as f:
|
||||
f.write(response.content)
|
||||
else:
|
||||
err(f"Failed to download image: {url}, invalid content type: {response.headers.get('Content-Type')}")
|
||||
return None
|
||||
else:
|
||||
err(f"Failed to download image: {url}, status code: {response.status_code}")
|
||||
return None
|
||||
except Exception as e:
|
||||
err(f"Failed to download image: {url}, error: {str(e)}")
|
||||
return None
|
||||
return filename
|
||||
|
||||
|
||||
def copy_file(local_path, folder):
|
||||
os.makedirs(folder, exist_ok=True)
|
||||
filename = os.path.basename(local_path)
|
||||
destination_path = os.path.join(folder, filename)
|
||||
shutil.copy(local_path, destination_path)
|
||||
return filename
|
||||
|
||||
|
||||
async def save_file(file: UploadFile, folder: Path) -> Path:
|
||||
file_path = folder / f"{dt_datetime.now().strftime('%Y%m%d_%H%M%S')}_{file.filename}"
|
||||
with open(file_path, 'wb') as f:
|
||||
shutil.copyfileobj(file.file, f)
|
||||
return file_path
|
||||
return frontmatter + body
|
|
@ -11,13 +11,21 @@ import paramiko
|
|||
from dateutil import parser
|
||||
from pathlib import Path
|
||||
import filetype
|
||||
import shutil
|
||||
import uuid
|
||||
import hashlib
|
||||
import requests
|
||||
from requests.adapters import HTTPAdapter
|
||||
from urllib3.util.retry import Retry
|
||||
from urllib.parse import urlparse
|
||||
from PyPDF2 import PdfReader
|
||||
from better_profanity import profanity
|
||||
from adblockparser import AdblockRules
|
||||
from pdfminer.high_level import extract_text as pdfminer_extract_text
|
||||
import pytesseract
|
||||
from readability import Document
|
||||
from pdf2image import convert_from_path
|
||||
from datetime import datetime, date, time
|
||||
from datetime import datetime as dt_datetime, date, time
|
||||
from typing import Optional, Union, Tuple, List
|
||||
import asyncio
|
||||
from PIL import Image
|
||||
|
@ -70,7 +78,8 @@ def validate_api_key(request: Request, api_key: str = Depends(api_key_header)):
|
|||
raise HTTPException(status_code=401, detail="Invalid or missing API key")
|
||||
|
||||
|
||||
def assemble_archive_path(filename: str, extension: str = None, date_time: datetime = datetime.now(), subdir: str = None) -> Tuple[Path, Path]:
|
||||
def assemble_archive_path(filename: str, extension: str = None, date_time: dt_datetime = None, subdir: str = None) -> Tuple[Path, Path]:
|
||||
date_time = date_time or dt_datetime.now()
|
||||
year = date_time.strftime(YEAR_FMT)
|
||||
month = date_time.strftime(MONTH_FMT)
|
||||
day = date_time.strftime(DAY_FMT)
|
||||
|
@ -122,7 +131,7 @@ def assemble_archive_path(filename: str, extension: str = None, date_time: datet
|
|||
|
||||
|
||||
|
||||
def assemble_journal_path(date_time: datetime, subdir: str = None, filename: str = None, extension: str = None, no_timestamp: bool = False) -> Tuple[Path, Path]:
|
||||
def assemble_journal_path(date_time: dt_datetime, subdir: str = None, filename: str = None, extension: str = None, no_timestamp: bool = False) -> Tuple[Path, Path]:
|
||||
'''
|
||||
Obsidian helper. Takes a datetime and optional subdirectory name, filename, and extension.
|
||||
If an extension is provided, it ensures the path is to a file with that extension.
|
||||
|
@ -300,7 +309,7 @@ def str_to_bool(value: str) -> bool:
|
|||
"""
|
||||
|
||||
def get_timestamp():
|
||||
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
return dt_datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
|
||||
|
||||
async def extract_text(file_path: str) -> str:
|
||||
|
@ -476,10 +485,10 @@ HOURLY_COLUMNS_MAPPING = {
|
|||
def convert_to_12_hour_format(datetime_obj_or_str):
|
||||
if isinstance(datetime_obj_or_str, str):
|
||||
try:
|
||||
datetime_obj = datetime.strptime(datetime_obj_or_str, "%Y-%m-%d %H:%M:%S")
|
||||
datetime_obj = dt_datetime.strptime(datetime_obj_or_str, "%Y-%m-%d %H:%M:%S")
|
||||
except ValueError:
|
||||
try:
|
||||
datetime_obj = datetime.strptime(datetime_obj_or_str, "%H:%M:%S")
|
||||
datetime_obj = dt_datetime.strptime(datetime_obj_or_str, "%H:%M:%S")
|
||||
except ValueError:
|
||||
return "Invalid datetime string format"
|
||||
elif isinstance(datetime_obj_or_str, time):
|
||||
|
@ -522,6 +531,53 @@ def resize_and_convert_image(image_path, max_size=2160, quality=80):
|
|||
return img_byte_arr
|
||||
|
||||
|
||||
def download_file(url, folder):
|
||||
os.makedirs(folder, exist_ok=True)
|
||||
filename = str(uuid.uuid4()) + os.path.splitext(urlparse(url).path)[-1]
|
||||
filepath = os.path.join(folder, filename)
|
||||
|
||||
session = requests.Session()
|
||||
retries = Retry(total=5, backoff_factor=1, status_forcelist=[502, 503, 504])
|
||||
session.mount('http://', HTTPAdapter(max_retries=retries))
|
||||
session.mount('https://', HTTPAdapter(max_retries=retries))
|
||||
|
||||
headers = {
|
||||
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'
|
||||
}
|
||||
|
||||
try:
|
||||
response = session.get(url, headers=headers, timeout=10)
|
||||
if response.status_code == 200:
|
||||
if 'image' in response.headers.get('Content-Type', ''):
|
||||
with open(filepath, 'wb') as f:
|
||||
f.write(response.content)
|
||||
else:
|
||||
err(f"Failed to download image: {url}, invalid content type: {response.headers.get('Content-Type')}")
|
||||
return None
|
||||
else:
|
||||
err(f"Failed to download image: {url}, status code: {response.status_code}")
|
||||
return None
|
||||
except Exception as e:
|
||||
err(f"Failed to download image: {url}, error: {str(e)}")
|
||||
return None
|
||||
return filename
|
||||
|
||||
|
||||
def copy_file(local_path, folder):
|
||||
os.makedirs(folder, exist_ok=True)
|
||||
filename = os.path.basename(local_path)
|
||||
destination_path = os.path.join(folder, filename)
|
||||
shutil.copy(local_path, destination_path)
|
||||
return filename
|
||||
|
||||
|
||||
async def save_file(file: UploadFile, folder: Path) -> Path:
|
||||
file_path = folder / f"{dt_datetime.now().strftime('%Y%m%d_%H%M%S')}_{file.filename}"
|
||||
with open(file_path, 'wb') as f:
|
||||
shutil.copyfileobj(file.file, f)
|
||||
return file_path
|
||||
|
||||
|
||||
def index_to_braille(v1a, v1b, v2a, v2b, v3a, v3b):
|
||||
return (v1a * 1 + v1b * 8 + v2a * 2 + v2b * 16 + v3a * 4 + v3b * 32)
|
||||
|
||||
|
|
Loading…
Reference in a new issue