Major update - database sync methods finally work reliably

sanj 2024-08-02 11:15:52 -07:00
parent b3d8f0113b
commit cf31f7b8df
24 changed files with 810 additions and 77 deletions


@@ -44,4 +44,4 @@ IN DRAFT
IN DRAFT
## Extras
[Apple Shortcut for companion Pythonista script](https://www.icloud.com/shortcuts/d63b179f9c664b0cbbacb1a607767ef7)
[Apple Shortcut for location tracking Pythonista script](https://www.icloud.com/shortcuts/d63b179f9c664b0cbbacb1a607767ef7)


@@ -51,4 +51,4 @@ urllib3
anyio
semaphore
location
SRTM.py
SRTM.py


@@ -0,0 +1,10 @@
default: xtts
email: xtts
webclip: 11L
rss: xtts
podcast_dir: '{{ DIR.HOME }}/Library/Mobile Documents/iCloud~co~supertop~castro/Documents/Sideloads'
xtts:
voice: joanne
elevenlabs:
voice: luna
api_key: '{{ SECRET.ELEVENLABS_API_KEY }}'
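For context only (not part of this commit), here is a rough sketch of how the '{{ DIR.* }}' and '{{ SECRET.* }}' placeholders in a config like the one above might be expanded; the loader function and the SECRET/DIR lookup tables are assumptions, not this repo's actual loader:
# Hypothetical resolver for the '{{ DIR.* }}' / '{{ SECRET.* }}' placeholders.
import os
import re
from pathlib import Path
import yaml  # pyyaml

SECRETS = {"ELEVENLABS_API_KEY": os.environ.get("ELEVENLABS_API_KEY", "")}
DIRS = {"HOME": str(Path.home())}

def resolve_placeholders(value: str) -> str:
    # Swap '{{ SECRET.X }}' / '{{ DIR.X }}' for entries from the tables above.
    def sub(match: re.Match) -> str:
        scope, key = match.group(1), match.group(2)
        table = SECRETS if scope == "SECRET" else DIRS
        return table.get(key, match.group(0))
    return re.sub(r"\{\{\s*(SECRET|DIR)\.(\w+)\s*\}\}", sub, value)

def load_tts_config(path: str) -> dict:
    # Only resolves top-level string values; nested sections (xtts, elevenlabs)
    # would need the same treatment applied recursively.
    cfg = yaml.safe_load(Path(path).read_text())
    return {k: resolve_placeholders(v) if isinstance(v, str) else v for k, v in cfg.items()}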


@@ -0,0 +1,28 @@
-----BEGIN PRIVATE KEY-----
MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCikW67UW0RpncJ
h4Ha9HumZ/WzgEZWRWkgksVJIOJ8t1PftctizLUlz+xMWNl+Volp4crxnPpnczis
pOXU4g65XoFHHpF9nhF/3YDgxo5BDEM/mIIKEO9LFkIBQVBdE85qXnIVot5MfuNj
HeyEs7doRMBxilOSR/DkT8bTWu7m5yeHlF58iYVOxxssGhP3bo7CcAcaZD1LJBnP
Df+UBqzWQ9as903p5bFixHy2kVz8Qkd5k5tyIQ/tXqlhRfLLHG4AYHmBbS06CAg0
nEpKUeQx4l1J/ykAjQTwhHf70xv1z0p28mHcr5ib4UvpYK9fMM6FKWenwlqA3qrK
zQUJQ7E/AgMBAAECggEAQ5H/XIxzsSpnv+Y66y9DVd9QGNPwaFthXtCif8rTWNM6
YXnGl8JOaPELXpBvljuR0hivqc19pxIVNG01uk5boGDPiygBgRz6WRNQRh1Bc3gN
W5mgM17ml2cg+DSVmppo6X1oHeYcT99N1BzT+jRYv1YURx0fr2WHkt413hOlyQMR
b8ir/TOBx3olg4KBPDuysRC5BCIr3Mkz4jsh+9wVIOReKVezsy7nxJVzipcxOyZO
9VGgvlw4XLrRTOJEv4e3ldcg219j5KEGsJ4FFSziSmpj5fN4Vt+JmY7nueSHyL6P
3hX52lRfOcTXTEeiEV2cXkm3h8uQ3zfiZRYi3P0DQQKBgQDXGBZc3WnfXim0u1EV
JzZFwxBS7SHkyGgnd50ak6e9yDbdxOuYIOo4mBlc3ofd20EfT4TvR7Xyw+PD2fWJ
+isdwCEb9JZZ1H6RDGIzSDYXGNeGId4kMKBZdmKpEeLgStihsrYp/nxtwcE/8A7N
jCEKZj1ld7QfbQlGT/NJ4Jj80wKBgQDBfBpth6vMyCETKMJVqYd2qhVnJKiFLfRn
OD/Ck6xwUuedbfe9M34wNO3Pn2Xvu1xVsQGb2dmlT345Iq9Z1nbZCGXyY9yfLnTV
fz7F2utjUjaJtuiSb52SgX7MWZ8E4nbqqKnC4SYSIlaeuL9KK3r/x6bcNLAYPcdk
qKHexDkGZQKBgF0JGyshzhiChzGYUBMBOfVk0Ru9XAq0MHDZyQdk1Io/HpRAB9Nu
cUD3cQj9a/EnU/yyDYLeFrIhztO44/7BSYL9xpRr79h9FB2zKIqb8mF9KkPnREmN
Ct6HWVdd2C9B0H/oZ+i0HafvxaHdONnpgaUY4feQlkV9iSRzknzi++lnAoGAXOuu
/X80oMpUKBFhEyaxqemREdHnJN6nC5NV+6pUHDWUimSvn6vFJH2m4BlbKUC/3V9+
uExtXBjLM8FWmTyIIz8HRttyrvfuoEHV8ctrVG29R3ISS5FTCXMrZBR+bCgemB+c
N71NPVREaUGsjIBJN+G4XvTmxR2WTt81rfhqsokCgYEA1It9e9Ut2Krzf2FaPGLG
ozlKhWadMNntthg3uxze80Rx8WSvgJQdbVpdbni2B/9xdYBIIljW/LGivYBrCSSp
aXFpXL7ZGkvl3b0MkojfghIpXVGqu+8ISDtFgL0B1gZ5hq9xMBl94fLVfQgC9Cy6
uvDHlz+fjWaWKYUPiouAtVs=
-----END PRIVATE KEY-----


@@ -0,0 +1,19 @@
-----BEGIN CERTIFICATE-----
MIIDAzCCAeugAwIBAgIUc+EtilZslnS7N6MAx0u9HeP83wAwDQYJKoZIhvcNAQEL
BQAwETEPMA0GA1UEAwwGcHl0aG9uMB4XDTI0MDYwODEyNTcxM1oXDTI1MDYwODEy
NTcxM1owETEPMA0GA1UEAwwGcHl0aG9uMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8A
MIIBCgKCAQEAopFuu1FtEaZ3CYeB2vR7pmf1s4BGVkVpIJLFSSDifLdT37XLYsy1
Jc/sTFjZflaJaeHK8Zz6Z3M4rKTl1OIOuV6BRx6RfZ4Rf92A4MaOQQxDP5iCChDv
SxZCAUFQXRPOal5yFaLeTH7jYx3shLO3aETAcYpTkkfw5E/G01ru5ucnh5RefImF
TscbLBoT926OwnAHGmQ9SyQZzw3/lAas1kPWrPdN6eWxYsR8tpFc/EJHeZObciEP
7V6pYUXyyxxuAGB5gW0tOggINJxKSlHkMeJdSf8pAI0E8IR3+9Mb9c9KdvJh3K+Y
m+FL6WCvXzDOhSlnp8JagN6qys0FCUOxPwIDAQABo1MwUTAdBgNVHQ4EFgQUS74L
HD4Cdzh1ajatbvSHNQXIVvAwHwYDVR0jBBgwFoAUS74LHD4Cdzh1ajatbvSHNQXI
VvAwDwYDVR0TAQH/BAUwAwEB/zANBgkqhkiG9w0BAQsFAAOCAQEAhpwtVubDjsyq
/LiTwpXKhjB/eFb6Yse782Iq+9rsiGGhsN88IA25fKgsJ2AIkR/KA7QSle3ds+1Q
EY9/vqpWnfBdpvOi7oV7ozBe+t/5JLu1GQBzg+cVa4iLAWYCiqg1d5NDdIcYMfsM
Yq2a3eQoP8Xbj3fFMXdNopXARa1d1zHB3ugXIJYinwMlS0EoGXVQVaHhemOh8GwW
keRaA6TDTBFsp0Gl4jv/NrisAt4qg+rlqr0mNcQK92vRX65mDWa/cQKwpUH8+Seq
Jl717NnsIGcqYWg8SSvVlkbFfxYhwYICXT824MAdSZtpHNCN/TegxsviYnlDyJKj
OJzn4fCxnQ==
-----END CERTIFICATE-----


@@ -0,0 +1 @@
{"token_type": "Bearer", "scope": "Calendars.Read Calendars.ReadWrite User.Read profile openid email", "expires_in": 3962, "ext_expires_in": 3962, "access_token": "eyJ0eXAiOiJKV1QiLCJub25jZSI6IldDeU91YXllN1RFX2FPM0F1alhlYmtvYTdVRHpUR1dVNWt5d3lJeDZ1MGciLCJhbGciOiJSUzI1NiIsIng1dCI6InE3UDFOdnh1R1F3RE4yVGFpTW92alo4YVp3cyIsImtpZCI6InE3UDFOdnh1R1F3RE4yVGFpTW92alo4YVp3cyJ9.eyJhdWQiOiIwMDAwMDAwMy0wMDAwLTAwMDAtYzAwMC0wMDAwMDAwMDAwMDAiLCJpc3MiOiJodHRwczovL3N0cy53aW5kb3dzLm5ldC9iYWQ3ODA0OC1hNmUwLTQ3YjEtYTI0Yi00MDNjNDQ0YWEzNDkvIiwiaWF0IjoxNzE4Mzc0NzA5LCJuYmYiOjE3MTgzNzQ3MDksImV4cCI6MTcxODM3ODk3MiwiYWNjdCI6MCwiYWNyIjoiMSIsImFpbyI6IkFWUUFxLzhYQUFBQVRnWHJ0Q1pCVjlPa1M2WldldHVVSHNMSFN0LzErYVcxT1BSSjVOWjJEL1Bzd05mY1Fxb0JTNEFZRmhLR3UvaE5TNnNWOGtLQUpmcDNNTzdqRUlNMEZrY1VaZ0IyREh4cWdOK3lUQVBUYnRVPSIsImFtciI6WyJwd2QiLCJtZmEiXSwiYXBwX2Rpc3BsYXluYW1lIjoicHl0aG9uIiwiYXBwaWQiOiJjZThjYmQyNC1mMTQ2LTRkYzctOGVlNy01MWQ5YjY5ZGVjNTkiLCJhcHBpZGFjciI6IjEiLCJmYW1pbHlfbmFtZSI6IkluY2UtSm9oYW5uc2VuIiwiZ2l2ZW5fbmFtZSI6IlNhbmd5ZSIsImlkdHlwIjoidXNlciIsImlwYWRkciI6IjY4LjIzNS40NC4yMDIiLCJuYW1lIjoiU2FuZ3llIEluY2UtSm9oYW5uc2VuIiwib2lkIjoiMWNiMWQwNDAtZmM1OS00MjMxLTllMDUtOWRjNGI0MzJjY2MxIiwicGxhdGYiOiI1IiwicHVpZCI6IjEwMDMyMDAyQTNGQjU4RjIiLCJyaCI6IjAuQVgwQVNJRFh1dUNtc1VlaVMwQThSRXFqU1FNQUFBQUFBQUFBd0FBQUFBQUFBQUMxQUk4LiIsInNjcCI6IkNhbGVuZGFycy5SZWFkIENhbGVuZGFycy5SZWFkV3JpdGUgVXNlci5SZWFkIHByb2ZpbGUgb3BlbmlkIGVtYWlsIiwic2lnbmluX3N0YXRlIjpbImttc2kiXSwic3ViIjoiV0FYVFdIR0puVFhBTjlncmIyamlEU3U4ZENOMmc0dDFacERiVHlwM1k3USIsInRlbmFudF9yZWdpb25fc2NvcGUiOiJOQSIsInRpZCI6ImJhZDc4MDQ4LWE2ZTAtNDdiMS1hMjRiLTQwM2M0NDRhYTM0OSIsInVuaXF1ZV9uYW1lIjoic2FuZ3llaWpAd2VzdGVybmxhdy5vcmciLCJ1cG4iOiJzYW5neWVpakB3ZXN0ZXJubGF3Lm9yZyIsInV0aSI6InFHcVlEODRzaDBHMFBfSEdldlVXQUEiLCJ2ZXIiOiIxLjAiLCJ3aWRzIjpbImI3OWZiZjRkLTNlZjktNDY4OS04MTQzLTc2YjE5NGU4NTUwOSJdLCJ4bXNfaWRyZWwiOiIxIDIiLCJ4bXNfc3QiOnsic3ViIjoieXhjdzFhV1FiM2VrX0FvNFRuRy11SDN6ZndGbVRRUmMxVGpFaEdqZ2p2WSJ9LCJ4bXNfdGNkdCI6MTY0ODY4MTc1Mn0.ssgIrbYo1SPNusoB9bNIB7pLxCmwBKhox__KOnwRRtnE63vbfGWAl53ww1KpNWPdDfC3p94yuPybTRqjZnTPluv1oJgGINml4AleUnZJnJttRsFHvGflzKOLtXnzmhQGUBXxu7QucKTCMH4J36neeQAWthITMwCHbaGmSy0RLotaIsoEHIufxR9ZEYD4XP5e3sFX54eSnyf4P3GgHHC1y5xxWUlemG4G1BRas8i7oX9o-gqRFube6BMtCLir_HMTNPfrCG-lhd9msLhc6e_WJSmLMHQ7RVLo-GlTMY9UouE190GzBBVKUrTg462I3kP_GayO1kt6qopBrwnF6bDUsw", "refresh_token": "0.AX0ASIDXuuCmsUeiS0A8REqjSSS9jM5G8cdNjudR2bad7Fm1AI8.AgABAwEAAAApTwJmzXqdR4BN2miheQMYAgDs_wUA9P_rTWFRiXWkWxvihyyXonsZPLrulRvnKKRlZ9PxKUltEOQsxjlg86xvCYzAS6dYeDBQiQxRAS_WEuuXVmTqUWDVwqgwQOa3BCbLwxQhPwfG-O9uFY6D239Jo8rdXTrf8XOntGs6fCn3wuo5kvJr2D-FGRA_EepltvRxZgrWdHROKuqoL_ArjLDdoFP7zM95MKhVYTmCO7LCM7u6O9ItU4_6y2_lH864zUivT1LFG8-h9sx0Ln3wd8LBP3P5GSeXwtQlkbNpj1FNDl_Ex5SwGCTM7uDHj0dn5CdUMgLkOcAC__HJdzmlEryTquoXcjd1RAmkq1MqAGD7QQreI7NQTZXwTcjoMwiBg92-bk-_o2ajeIVqzgOVBQIu1W8gkN2F7PAqRc5lGB-2mAXchqKMoL31CLUPxgTMBjWgR4waAjfZXT4h2WqXAAdGFy2nzUJAjyEQa9ZW1J5B6asCf3cVJQwI6nWIN7OphrXkGHl0ffpfrC-skVG3N2vrelAutRvyvWi4bbMqAZNglRrkTn5G_kULmnyydZBcFSc5uPmKD7OkfBD5UpTa_KLTjYexWRVsBfG9czIVxOh3ojnnza9BjrN5cHwHhzPM1t67E5iqronvT2OR_r-4BerUfRNHXrxwrLvDUEZwQ8o5IRs2N5FH0y_QN049o_NTgqytCj6wrIB4T-ZBUK2AsFej7ipdHAMYtWLZdoAo1o4nMuPBb4syN0VYd1sLUP-RQ5iv7wIkMWmNjhjIErIktZ134pGK9TlWa904H6HUin0qNTXyTmX2feE0nBlm6xJbO1ISfFkaf8aEjcAMfeu9qiArKQqUgvY", "expires_at": 1718378971}


@@ -2,6 +2,7 @@
Uses whisper_cpp to create an OpenAI-compatible Whisper web service.
'''
# routers/asr.py
import os
import uuid
import json
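This hunk only adds the path comment and an import, so as a point of reference here is what a client call against an OpenAI-compatible transcription service generally looks like; the route path follows OpenAI's convention and the port is a placeholder, neither is confirmed for this router:
# Hypothetical client for an OpenAI-style /v1/audio/transcriptions endpoint.
import httpx

def transcribe(path: str, base_url: str = "http://localhost:4444") -> str:
    # base_url is a placeholder; the whisper_cpp-backed service is assumed to
    # accept multipart uploads the way OpenAI's API does.
    with open(path, "rb") as f:
        files = {"file": (path, f, "audio/mpeg")}
        data = {"model": "whisper-1", "response_format": "json"}
        resp = httpx.post(f"{base_url}/v1/audio/transcriptions",
                          files=files, data=data, timeout=600.0)
    resp.raise_for_status()
    return resp.json()["text"]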


@@ -3,6 +3,8 @@ Calendar module using macOS Calendars and/or Microsoft 365 via its Graph API.
Depends on:
LOGGER, ICAL_TOGGLE, ICALENDARS, MS365_TOGGLE, MS365_CLIENT_ID, MS365_SECRET, MS365_AUTHORITY_URL, MS365_SCOPE, MS365_REDIRECT_PATH, MS365_TOKEN_PATH
'''
#routers/cal.py
from fastapi import APIRouter, Depends, HTTPException, status, Request
from fastapi.responses import RedirectResponse, JSONResponse
from fastapi.security import OAuth2PasswordBearer
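The hunk above only shows the new header comment and imports; for reference, a minimal sketch of the refresh_token grant that the MS365_* constants in the docstring imply (the authority-URL composition and the function shape are assumptions, not this module's actual flow):
# Hedged sketch: refresh the saved MS365 token with the standard OAuth2
# refresh_token grant, then call the Graph API with the new access token.
import json
import httpx

async def refresh_ms365_token(token_path: str, client_id: str, client_secret: str,
                              scope: str, authority_url: str) -> dict:
    with open(token_path) as f:
        token = json.load(f)
    data = {
        "grant_type": "refresh_token",
        "refresh_token": token["refresh_token"],
        "client_id": client_id,
        "client_secret": client_secret,
        "scope": scope,
    }
    async with httpx.AsyncClient() as client:
        resp = await client.post(f"{authority_url}/oauth2/v2.0/token", data=data)
        resp.raise_for_status()
        new_token = resp.json()
        # Example Graph call with the fresh access token: list the user's calendars.
        cals = await client.get(
            "https://graph.microsoft.com/v1.0/me/calendars",
            headers={"Authorization": f"Bearer {new_token['access_token']}"},
        )
        cals.raise_for_status()
    with open(token_path, "w") as f:
        json.dump(new_token, f)
    return new_token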


@@ -1,6 +1,8 @@
'''
IN DEVELOPMENT - Cloudflare + Caddy module. Based on a bash script that's able to rapidly deploy new Cloudflare subdomains on new Caddy reverse proxy configurations, managing everything including restarting Caddy. The Python version needs more testing before actual use.
'''
#routers/cf.py
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from fastapi.responses import PlainTextResponse, JSONResponse


@@ -1,6 +1,8 @@
'''
Uses IMAP and SMTP login credentials to monitor an inbox, summarize incoming emails that match certain criteria, and save the text-to-speech-converted summaries into a specified "podcast" folder.
'''
#routers/email.py
from fastapi import APIRouter
import asyncio
import aiofiles

sijapi/routers/forward.py (new file, 124 lines)

@@ -0,0 +1,124 @@
'''
Used for port forwarding and reverse proxy configurations.
'''
#routers/forward.py
import os
import io
import string
import json
import time
import base64
import asyncpg
import asyncio
import subprocess
import requests
import random
import paramiko
import aiohttp
import httpx
from datetime import datetime
from hashlib import sha256
from pathlib import Path
from typing import List, Optional
from pydantic import BaseModel
from PyPDF2 import PdfReader
from fastapi import APIRouter, Form, HTTPException, Request, Response, BackgroundTasks, status, Path as PathParam
from fastapi.responses import HTMLResponse, FileResponse, PlainTextResponse, JSONResponse, RedirectResponse
from fastapi.templating import Jinja2Templates
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from sijapi import (
L, API, Serve, LOGS_DIR, TS_ID, CASETABLE_PATH, COURTLISTENER_DOCKETS_URL, COURTLISTENER_API_KEY,
COURTLISTENER_BASE_URL, COURTLISTENER_DOCKETS_DIR, COURTLISTENER_SEARCH_DIR, ALERTS_DIR,
MAC_UN, MAC_PW, MAC_ID, TS_TAILNET, IMG_DIR, PUBLIC_KEY, OBSIDIAN_VAULT_DIR
)
from sijapi.classes import WidgetUpdate
from sijapi.utilities import bool_convert, sanitize_filename, assemble_journal_path
from sijapi.routers import gis
forward = APIRouter()
logger = L.get_module_logger("forward")
def debug(text: str): logger.debug(text)
def info(text: str): logger.info(text)
def warn(text: str): logger.warning(text)
def err(text: str): logger.error(text)
def crit(text: str): logger.critical(text)
async def forward_traffic(reader: asyncio.StreamReader, writer: asyncio.StreamWriter, destination: str):
try:
dest_host, dest_port = destination.split(':')
dest_port = int(dest_port)
except ValueError:
warn(f"Invalid destination format: {destination}. Expected 'host:port'.")
writer.close()
await writer.wait_closed()
return
try:
dest_reader, dest_writer = await asyncio.open_connection(dest_host, dest_port)
except Exception as e:
warn(f"Failed to connect to destination {destination}: {str(e)}")
writer.close()
await writer.wait_closed()
return
async def forward(src, dst):
try:
while True:
data = await src.read(8192)
if not data:
break
dst.write(data)
await dst.drain()
except Exception as e:
warn(f"Error in forwarding: {str(e)}")
finally:
dst.close()
await dst.wait_closed()
await asyncio.gather(
forward(reader, dest_writer),
forward(dest_reader, writer)
)
async def start_server(source: str, destination: str):
if ':' in source:
host, port = source.split(':')
port = int(port)
else:
host = source
port = 80
server = await asyncio.start_server(
lambda r, w: forward_traffic(r, w, destination),
host,
port
)
async with server:
await server.serve_forever()
async def start_port_forwarding():
if hasattr(Serve, 'forwarding_rules'):
for rule in Serve.forwarding_rules:
asyncio.create_task(start_server(rule.source, rule.destination))
else:
warn("No forwarding rules found in the configuration.")
@forward.get("/forward_status")
async def get_forward_status():
if hasattr(Serve, 'forwarding_rules'):
return {"status": "active", "rules": Serve.forwarding_rules}
else:
return {"status": "inactive", "message": "No forwarding rules configured"}
@forward.on_event("startup")
async def _kick_off_port_forwarding():
    await start_port_forwarding()
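start_port_forwarding() only assumes each forwarding rule exposes .source and .destination; the real Serve config class is not part of this diff, so the following is a hypothetical shape for illustration:
# Hypothetical config shape for Serve.forwarding_rules; only the attribute names
# (source, destination) are taken from start_port_forwarding() above.
from typing import List
from pydantic import BaseModel

class ForwardingRule(BaseModel):
    source: str       # "host:port" to listen on, or a bare host (port defaults to 80)
    destination: str  # "host:port" the traffic is relayed to

class ServeConfig(BaseModel):
    forwarding_rules: List[ForwardingRule] = []

example = ServeConfig(forwarding_rules=[
    ForwardingRule(source="0.0.0.0:8443", destination="100.64.64.11:443"),
])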


@@ -1,3 +1,19 @@
'''
WIP. Will be used to manage and update Ghost blogs.
'''
#routers/ghost.py
from fastapi import APIRouter
from datetime import datetime
import os
import requests
import json
import yaml
import jwt
from sijapi import GHOST_API_KEY, GHOST_API_URL
ghost = APIRouter()
def generate_jwt_token():
key_id, key_secret = GHOST_API_KEY.split(':')
iat = int(datetime.now().timestamp())
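The hunk ends mid-function; for reference only, the remainder of a Ghost Admin API token is conventionally built along these lines (a sketch based on Ghost's documented Admin API, not necessarily this repo's exact code):
    # Sketch continuing generate_jwt_token(); the audience string varies by Ghost
    # version ('/admin/' on v5.x, '/v3/admin/' or '/v4/admin/' on older installs).
    payload = {"iat": iat, "exp": iat + 5 * 60, "aud": "/admin/"}
    return jwt.encode(payload, bytes.fromhex(key_secret),
                      algorithm="HS256", headers={"kid": key_id})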


@@ -1,6 +1,8 @@
'''
Uses Postgres/PostGIS for location tracking (data obtained via the companion mobile Pythonista scripts), and for geocoding purposes.
'''
#routers/gis.py
from fastapi import APIRouter, HTTPException, Query
from fastapi.responses import HTMLResponse, JSONResponse
import random


@@ -1,6 +1,8 @@
'''
IN DEVELOPMENT: Instagram AI bot module.
'''
#routers/ig.py
from fastapi import APIRouter, UploadFile
import os
import io


@@ -2,9 +2,10 @@
Image generation module using StableDiffusion and similar models by way of ComfyUI.
DEPENDS ON:
LLM module
COMFYUI_URL, COMFYUI_DIR, COMFYUI_OUTPUT_DIR, TS_SUBNET, TS_ADDRESS, DATA_DIR, IMG_CONFIG_DIR, IMG_DIR, IMG_WORKFLOWS_DIR, LOCAL_HOSTS, API.URL, PHOTOPRISM_USER*, PHOTOPRISM_URL*, PHOTOPRISM_PASS*
COMFYUI_URL, COMFYUI_DIR, COMFYUI_OUTPUT_DIR, TS_SUBNET, TS_ADDRESS, DATA_DIR, IMG_CONFIG_DIR, IMG_DIR, IMG_WORKFLOWS_DIR, LOCAL_HOSTS
*unimplemented.
'''
#routers/img.py
from fastapi import APIRouter, Request, Query
from fastapi.responses import JSONResponse, RedirectResponse


@@ -2,6 +2,7 @@
Interfaces with Ollama and creates an OpenAI-compatible relay API.
'''
# routers/llm.py
from fastapi import APIRouter, HTTPException, Request, Response, BackgroundTasks, File, Form, UploadFile
from fastapi.responses import StreamingResponse, JSONResponse, FileResponse
from datetime import datetime as dt_datetime


@@ -1,4 +1,8 @@
'''
Used to scrape, process, summarize, markdownify, and speechify news articles.
'''
# routers/news.py
import os
import uuid
import asyncio


@@ -2,6 +2,7 @@
Manages an Obsidian vault, in particular daily notes, using information and functionality drawn from the other routers, primarily calendar, email, ig, llm, rag, img, serve, time, tts, and weather.
'''
# routers/note.py
from fastapi import APIRouter, BackgroundTasks, File, UploadFile, Form, HTTPException, Response, Query, Path as FastAPIPath
from fastapi.responses import JSONResponse, PlainTextResponse
import os, re
@@ -17,7 +18,7 @@ from fastapi import HTTPException, status
from pathlib import Path
from fastapi import APIRouter, Query, HTTPException
from sijapi import API, L, OBSIDIAN_VAULT_DIR, OBSIDIAN_RESOURCES_DIR, OBSIDIAN_BANNER_SCENE, DEFAULT_11L_VOICE, DEFAULT_VOICE, GEO
from sijapi.routers import asr, cal, gis, img, llm, serve, time, tts, weather
from sijapi.routers import asr, cal, gis, img, llm, serve, timing, tts, weather
from sijapi.utilities import assemble_journal_path, convert_to_12_hour_format, sanitize_filename, convert_degrees_to_cardinal, check_file_name, HOURLY_COLUMNS_MAPPING
from sijapi.classes import Location
@@ -388,7 +389,7 @@ async def build_daily_timeslips(date):
'''
absolute_path, relative_path = assemble_journal_path(date, filename = "Timeslips", extension=".md", no_timestamp = True)
content = await time.process_timing_markdown(date, date)
content = await timing.process_timing_markdown(date, date)
# document_content = await document.read()
with open(absolute_path, 'wb') as f:
f.write(content.encode())


@@ -2,6 +2,7 @@
IN DEVELOPMENT: Retrieval-Augmented Generation module.
NOTES: Haven't yet decided whether this should depend on the Obsidian and Chat modules, whether they should depend on it, or some mix of the two.
'''
#routers/rag.py
from fastapi import APIRouter
from sijapi import L


@@ -1,3 +1,8 @@
'''
Used to scrape data from websites, watch sites for changes, and generate alerts when specific changes occur.
'''
#routers/scrape.py
import asyncio
import json
import re


@@ -1,6 +1,8 @@
'''
Web server module. Used by other modules when serving static content is required, e.g. the img image generation module. Also used to serve PUBLIC_KEY.
'''
#routers/serve.py
import os
import io
import string
@@ -512,75 +514,3 @@ async def get_analytics(short_code: str):
}
async def forward_traffic(reader: asyncio.StreamReader, writer: asyncio.StreamWriter, destination: str):
try:
dest_host, dest_port = destination.split(':')
dest_port = int(dest_port)
except ValueError:
warn(f"Invalid destination format: {destination}. Expected 'host:port'.")
writer.close()
await writer.wait_closed()
return
try:
dest_reader, dest_writer = await asyncio.open_connection(dest_host, dest_port)
except Exception as e:
warn(f"Failed to connect to destination {destination}: {str(e)}")
writer.close()
await writer.wait_closed()
return
async def forward(src, dst):
try:
while True:
data = await src.read(8192)
if not data:
break
dst.write(data)
await dst.drain()
except Exception as e:
warn(f"Error in forwarding: {str(e)}")
finally:
dst.close()
await dst.wait_closed()
await asyncio.gather(
forward(reader, dest_writer),
forward(dest_reader, writer)
)
async def start_server(source: str, destination: str):
if ':' in source:
host, port = source.split(':')
port = int(port)
else:
host = source
port = 80
server = await asyncio.start_server(
lambda r, w: forward_traffic(r, w, destination),
host,
port
)
async with server:
await server.serve_forever()
async def start_port_forwarding():
if hasattr(Serve, 'forwarding_rules'):
for rule in Serve.forwarding_rules:
asyncio.create_task(start_server(rule.source, rule.destination))
else:
warn("No forwarding rules found in the configuration.")
@serve.get("/forward_status")
async def get_forward_status():
if hasattr(Serve, 'forwarding_rules'):
return {"status": "active", "rules": Serve.forwarding_rules}
else:
return {"status": "inactive", "message": "No forwarding rules configured"}
asyncio.create_task(start_port_forwarding())

sijapi/routers/timing.py (new file, 577 lines)

@@ -0,0 +1,577 @@
'''
Uses the Timing.app API to get nicely formatted timeslip charts and spreadsheets.
'''
#routers/timing.py
import tempfile
import os
import json
import requests
import csv
import subprocess
import asyncio
import httpx
import io
import re
import pytz
import httpx
import sqlite3
import math
from httpx import Timeout
from fastapi import APIRouter, UploadFile, File, Response, Header, Query, Depends, FastAPI, Request, HTTPException, status
from fastapi.responses import FileResponse, JSONResponse
from pydantic import BaseModel, Field
from datetime import datetime, timedelta
from decimal import Decimal, ROUND_UP
from typing import Optional, List, Dict, Union, Tuple
from collections import defaultdict
from dotenv import load_dotenv
from traceback import format_exc
from sijapi import L, TIMING_API_KEY, TIMING_API_URL
from sijapi.routers import gis
timing = APIRouter(tags=["private"])
logger = L.get_module_logger("timing")
def debug(text: str): logger.debug(text)
def info(text: str): logger.info(text)
def warn(text: str): logger.warning(text)
def err(text: str): logger.error(text)
def crit(text: str): logger.critical(text)
script_directory = os.path.dirname(os.path.abspath(__file__))
# Configuration constants
pacific = pytz.timezone('America/Los_Angeles')
emoji_pattern = re.compile(r'^[\U0001F600-\U0001F64F\U0001F300-\U0001F5FF\U0001F680-\U0001F6FF\U0001F700-\U0001F77F\U0001F780-\U0001F7FF\U0001F800-\U0001F8FF\U0001F900-\U0001F9FF\U0001FA00-\U0001FA6F\U0001FA70-\U0001FAFF\U00002702-\U000027B0\U000024C2-\U0001F251]+ ')
timeout = Timeout(connect=30, read=600, write=120, pool=5)
# Define your models
class TimingRequest(BaseModel):
start_date: str = Field(..., pattern=r"\d{4}-\d{2}-\d{2}")
end_date: Optional[str] = Field(None, pattern=r"\d{4}-\d{2}-\d{2}")
output_format: Optional[str] = 'json'
####################
#### TIMING API ####
####################
@timing.post("/time/post")
async def post_time_entry_to_timing(entry: Dict):
url = 'https://web.timingapp.com/api/v1/time-entries'
headers = {
'Authorization': f'Bearer {TIMING_API_KEY}',
'Content-Type': 'application/json',
'Accept': 'application/json',
'X-Time-Zone': 'America/Los_Angeles'
}
debug(f"Received entry: {entry}")
response = None # Initialize response
try:
async with httpx.AsyncClient() as client:
response = await client.post(url, headers=headers, json=entry)
response.raise_for_status() # This will only raise for 4xx and 5xx responses
except httpx.HTTPStatusError as exc:
debug(f"HTTPStatusError caught: Status code: {exc.response.status_code}, Detail: {exc.response.text}")
raise HTTPException(status_code=exc.response.status_code, detail=str(exc.response.text))
except Exception as exc:
debug(f"General exception caught: {exc}")
raise HTTPException(status_code=500, detail="An unexpected error occurred")
if response:
return response.json()
else:
# Handle the case where the response was not set due to an error.
raise HTTPException(status_code=500, detail="Failed to make the external API request")
def project_sort_key(project):
# Remove any leading emoji characters for sorting
return emoji_pattern.sub('', project)
def prepare_date_range_for_query(start_date, end_date=None):
# Adjust the start date to include the day before
start_date_adjusted = (datetime.strptime(start_date, "%Y-%m-%d") - timedelta(days=1)).strftime("%Y-%m-%d")
# If end_date is not provided, use the original start_date as the end_date
end_date = end_date if end_date else start_date
# Format the end_date
end_date_formatted = f"{end_date}T23:59:59"
return f"{start_date_adjusted}T00:00:00", end_date_formatted
def truncate_project_title(title):
return title.split(' - ')[0] if ' - ' in title else title
async def fetch_and_prepare_timing_data(start: datetime, end: Optional[datetime] = None) -> List[Dict]:
# start_date = await gis.dt(start)
# end_date = await gis.dt(end) if end else None
# Adjust the start date to include the day before and format the end date
start_date_adjusted = (start - timedelta(days=1)).strftime("%Y-%m-%dT00:00:00")
end_date_formatted = f"{datetime.strftime(end, '%Y-%m-%d')}T23:59:59" if end else f"{datetime.strftime(start, '%Y-%m-%d')}T23:59:59"
# Fetch timing data from the API using TIMING_API_KEY
url = f"{TIMING_API_URL}/time-entries?start_date_min={start_date_adjusted}&start_date_max={end_date_formatted}&include_project_data=1"
headers = {
'Authorization': f'Bearer {TIMING_API_KEY}',
'Content-Type': 'application/json',
'Accept': 'application/json',
'X-Time-Zone': 'America/Los_Angeles'
}
processed_timing_data = []
async with httpx.AsyncClient() as client:
response = await client.get(url, headers=headers)
if response.status_code != 200:
response.raise_for_status()
raw_timing_data = response.json().get('data', [])
for entry in raw_timing_data:
entry_start_utc = datetime.strptime(entry['start_date'], '%Y-%m-%dT%H:%M:%S.%f%z')
entry_end_utc = datetime.strptime(entry['end_date'], '%Y-%m-%dT%H:%M:%S.%f%z')
entry_start_pacific = entry_start_utc.astimezone(pacific)
entry_end_pacific = entry_end_utc.astimezone(pacific)
while entry_start_pacific.date() < entry_end_pacific.date():
midnight = pacific.localize(datetime.combine(entry_start_pacific.date() + timedelta(days=1), datetime.min.time()))
duration_to_midnight = (midnight - entry_start_pacific).total_seconds()
if entry_start_pacific.date() >= start.date():
processed_entry = create_time_entry(entry, entry_start_pacific, midnight, duration_to_midnight)
processed_timing_data.append(processed_entry)
entry_start_pacific = midnight
if entry_start_pacific.date() >= start.date():
duration_remaining = (entry_end_pacific - entry_start_pacific).total_seconds()
processed_entry = create_time_entry(entry, entry_start_pacific, entry_end_pacific, duration_remaining)
processed_timing_data.append(processed_entry)
return processed_timing_data
def format_duration(duration):
duration_in_hours = Decimal(duration) / Decimal(3600)
rounded_duration = duration_in_hours.quantize(Decimal('0.1'), rounding=ROUND_UP)
return str(rounded_duration)
def create_time_entry(original_entry, start_time, end_time, duration_seconds):
"""Formats a time entry, preserving key details and adding necessary elements."""
# Format start and end times in the appropriate timezone
start_time_aware = start_time.astimezone(pacific)
end_time_aware = end_time.astimezone(pacific)
# Check if project is None and handle accordingly
if original_entry.get('project'):
project_title = original_entry['project'].get('title', 'No Project')
project_color = original_entry['project'].get('color', '#FFFFFF') # Default color
else:
project_title = 'No Project'
project_color = '#FFFFFF' # Default color
# Construct the processed entry
processed_entry = {
'start_time': start_time_aware.strftime('%Y-%m-%dT%H:%M:%S.%f%z'),
'end_time': end_time_aware.strftime('%Y-%m-%dT%H:%M:%S.%f%z'),
'start_date': start_time_aware.strftime('%Y-%m-%d'),
'end_date': end_time_aware.strftime('%Y-%m-%d'),
'duration': format_duration(duration_seconds),
'notes': original_entry.get('notes', ''),
'title': original_entry.get('title', 'Untitled'),
'is_running': original_entry.get('is_running', False),
'project': {
'title': project_title,
'color': project_color,
# Include other project fields as needed
},
# Additional original fields as required
}
return processed_entry
# TIMELINE
@timing.get("/time/line")
async def get_timing_timeline(
request: Request,
start_date: str = Query(..., regex=r"\d{4}-\d{2}-\d{2}"),
end_date: Optional[str] = Query(None, regex=r"\d{4}-\d{2}-\d{2}")
):
# Retain these for processing timeline data with the correct timezone
queried_start_date = datetime.strptime(start_date, "%Y-%m-%d").replace(tzinfo=pacific).date()
queried_end_date = (datetime.strptime(end_date, "%Y-%m-%d").replace(tzinfo=pacific).date()
if end_date else queried_start_date)
# Fetch and process timing data
start = await gis.dt(start_date)
end = await gis.dt(end_date) if end_date else None
timing_data = await fetch_and_prepare_timing_data(start, end)
# Process timeline data
timeline_formatted_data = process_timeline(timing_data, queried_start_date, queried_end_date)
return Response(content=timeline_formatted_data, media_type="text/markdown")
def process_timeline(timing_data, queried_start_date, queried_end_date):
timeline_output = []
entries_by_date = defaultdict(list)
for entry in timing_data:
# Convert start and end times to datetime objects and localize to Pacific timezone
start_datetime = datetime.strptime(entry['start_time'], '%Y-%m-%dT%H:%M:%S.%f%z').astimezone(pacific)
end_datetime = datetime.strptime(entry['end_time'], '%Y-%m-%dT%H:%M:%S.%f%z').astimezone(pacific)
project_title = truncate_project_title(entry['project']['title']) if entry.get('project') else 'No Project'
task_title = entry['title'] if entry.get('title') else 'Untitled'
# Check if the entry's date falls within the queried date range
if queried_start_date <= start_datetime.date() <= queried_end_date:
duration_seconds = (end_datetime - start_datetime).total_seconds()
duration_hours = format_duration(duration_seconds)
entries_by_date[start_datetime.date()].append(
(start_datetime.strftime('%H:%M:%S'), project_title, task_title, duration_hours)
)
# Sorting and outputting the timeline
for date, entries in sorted(entries_by_date.items()):
sorted_entries = sorted(entries, key=lambda x: x[0])
day_total_duration = sum(Decimal(entry[3]) for entry in sorted_entries)
if queried_start_date != queried_end_date:
timeline_output.append(f"## {date.strftime('%Y-%m-%d')} {date.strftime('%A')} [{day_total_duration}]\n")
for start_time, project, task, duration in sorted_entries:
timeline_output.append(f" - {start_time} {project} - {task} [{duration}]")
return "\n".join(timeline_output)
# CSV
@timing.get("/time/csv")
async def get_timing_csv(
request: Request,
start_date: str = Query(..., regex=r"\d{4}-\d{2}-\d{2}"),
end_date: Optional[str] = Query(None, regex=r"\d{4}-\d{2}-\d{2}")
):
# Fetch and process timing data
start = await gis.dt(start_date)
end = await gis.dt(end_date) if end_date else None
timing_data = await fetch_and_prepare_timing_data(start, end)
# Retain these for processing CSV data with the correct timezone
queried_start_date = datetime.strptime(start_date, "%Y-%m-%d").replace(tzinfo=pacific).date()
queried_end_date = (datetime.strptime(end_date, "%Y-%m-%d").replace(tzinfo=pacific).date()
if end_date else queried_start_date)
# Process CSV data
csv_data = process_csv(timing_data, queried_start_date, queried_end_date)
if not csv_data or csv_data.strip() == "":
return Response(content="No CSV data available for the specified date range.", media_type="text/plain")
return Response(content=csv_data, media_type="text/csv")
def process_csv(timing_data, queried_start_date, queried_end_date):
project_task_data = defaultdict(lambda: defaultdict(list))
for entry in timing_data:
# Convert start and end times to datetime objects and localize to Pacific timezone
start_datetime = datetime.strptime(entry['start_time'], '%Y-%m-%dT%H:%M:%S.%f%z').astimezone(pacific)
end_datetime = datetime.strptime(entry['end_time'], '%Y-%m-%dT%H:%M:%S.%f%z').astimezone(pacific)
# Ensure the entry's date falls within the queried date range
if queried_start_date <= start_datetime.date() <= queried_end_date:
duration_seconds = (end_datetime - start_datetime).total_seconds()
duration_hours = format_duration(duration_seconds) # Convert duration to hours
project_title = truncate_project_title(entry['project']['title']) if 'title' in entry['project'] else 'No Project'
project_task_data[start_datetime.date()][project_title].append(
(entry['title'] if entry.get('title') else 'Untitled', duration_hours)
)
output = io.StringIO()
writer = csv.writer(output, delimiter='|', quotechar='"', quoting=csv.QUOTE_MINIMAL)
writer.writerow(['Date', 'Project', 'Task', 'Notes', 'Duration'])
for date, project_tasks in sorted(project_task_data.items()):
day_total_duration = Decimal(0)
formatted_date = date.strftime('%Y-%m-%d %a')
for project, tasks in sorted(project_tasks.items(), key=lambda item: project_sort_key(item[0])):
task_summary = defaultdict(Decimal)
for task, duration in tasks:
task_summary[task] += Decimal(duration)
project_duration = sum(task_summary.values()).quantize(Decimal('0.1'))
day_total_duration += project_duration
tasks_formatted = "; ".join([f"{task.replace(';', ',')} [{str(task_summary[task].quantize(Decimal('0.1')))}]" for task in task_summary])
writer.writerow([formatted_date, project, tasks_formatted, '', str(project_duration.quantize(Decimal('0.1')))])
writer.writerow([formatted_date, 'Day Total', '', '', str(day_total_duration.quantize(Decimal('0.1')))])
writer.writerow(['', '', '', '', ''])
return output.getvalue()
# MARKDOWN
@timing.get("/time/markdown3")
async def get_timing_markdown3(
request: Request,
start_date: str = Query(..., regex=r"\d{4}-\d{2}-\d{2}"),
end_date: Optional[str] = Query(None, regex=r"\d{4}-\d{2}-\d{2}")
):
# Fetch and process timing data
start = await gis.dt(start_date)
end = await gis.dt(end_date) if end_date else None
timing_data = await fetch_and_prepare_timing_data(start, end)
# Retain these for processing Markdown data with the correct timezone
queried_start_date = start.replace(tzinfo=pacific).date()
queried_end_date = end.replace(tzinfo=pacific).date() if end else queried_start_date
# Process Markdown data
markdown_formatted_data = process_timing_markdown3(timing_data, queried_start_date, queried_end_date)
return Response(content=markdown_formatted_data, media_type="text/markdown")
def process_timing_markdown3(timing_data, queried_start_date, queried_end_date):
markdown_output = []
project_task_data = defaultdict(lambda: defaultdict(list))
for entry in timing_data:
# Convert start and end times to datetime objects and localize to Pacific timezone
start_datetime = datetime.strptime(entry['start_time'], '%Y-%m-%dT%H:%M:%S.%f%z').astimezone(pacific)
end_datetime = datetime.strptime(entry['end_time'], '%Y-%m-%dT%H:%M:%S.%f%z').astimezone(pacific)
# Check if the entry's date falls within the queried date range
if queried_start_date <= start_datetime.date() <= queried_end_date:
duration_seconds = (end_datetime - start_datetime).total_seconds()
duration_hours = format_duration(duration_seconds)
project_title = truncate_project_title(entry['project']['title']) if 'title' in entry['project'] else 'No Project'
project_task_data[start_datetime.date()][project_title].append(
(entry['title'] if entry.get('title') else 'Untitled', duration_hours)
)
for date, projects in sorted(project_task_data.items()):
day_total_duration = Decimal(0)
tasks_output = []
for project, tasks in sorted(projects.items(), key=lambda item: project_sort_key(item[0])):
task_summary = defaultdict(Decimal)
for task, duration in tasks:
task_summary[task] += Decimal(duration)
project_duration = sum(task_summary.values()).quantize(Decimal('0.1'))
day_total_duration += project_duration
tasks_formatted = "; ".join([f"{task.replace(';', ',')} [{duration}]" for task, duration in task_summary.items()])
tasks_output.append(f"- {project} - {tasks_formatted} - *{project_duration}*.")
if queried_start_date != queried_end_date:
markdown_output.append(f"## {date.strftime('%Y-%m-%d %A')} [{day_total_duration}]\n")
markdown_output.extend(tasks_output)
markdown_output.append("")
return "\n".join(markdown_output)
@timing.get("/time/markdown")
async def get_timing_markdown(
request: Request,
start: str = Query(..., regex=r"\d{4}-\d{2}-\d{2}"),
end: Optional[str] = Query(None, regex=r"\d{4}-\d{2}-\d{2}")
):
start_date = await gis.dt(start)
end_date = await gis.dt(end) if end else None
markdown_formatted_data = await process_timing_markdown(start_date, end_date)
return Response(content=markdown_formatted_data, media_type="text/markdown")
#return JSONResponse(content={"markdown": markdown_formatted_data}, media_type="text/markdown")
async def process_timing_markdown(start_date: datetime, end_date: datetime): # timing_data, queried_start_date, queried_end_date)
timing_data = await fetch_and_prepare_timing_data(start_date, end_date)
queried_start_date = start_date.replace(tzinfo=pacific).date()
queried_end_date = (end_date.replace(tzinfo=pacific).date() if end_date else queried_start_date)
markdown_output = []
project_task_data = defaultdict(lambda: defaultdict(list))
# pacific = pytz.timezone('US/Pacific')
for entry in timing_data:
# Convert start and end times to datetime objects and localize to Pacific timezone
start_datetime = datetime.strptime(entry['start_time'], '%Y-%m-%dT%H:%M:%S.%f%z').astimezone(pacific)
end_datetime = datetime.strptime(entry['end_time'], '%Y-%m-%dT%H:%M:%S.%f%z').astimezone(pacific)
# Check if the entry's date falls within the queried date range
if queried_start_date <= start_datetime.date() <= queried_end_date:
duration_seconds = (end_datetime - start_datetime).total_seconds()
duration_hours = format_duration(duration_seconds)
project_title = truncate_project_title(entry['project']['title']) if 'title' in entry['project'] else 'No Project'
project_task_data[start_datetime.date()][project_title].append(
(entry['title'] if entry.get('title') else 'Untitled', duration_hours)
)
for date, projects in sorted(project_task_data.items()):
day_total_duration = Decimal(0)
tasks_output = []
for project, tasks in sorted(projects.items(), key=lambda item: project_sort_key(item[0])):
task_summary = defaultdict(Decimal)
for task, duration in tasks:
task_summary[task] += Decimal(duration)
project_duration = sum(task_summary.values()).quantize(Decimal('0.1'))
day_total_duration += project_duration
tasks_formatted = "; ".join([f"{task.replace(';', ',')} [{duration}]" for task, duration in task_summary.items()])
tasks_output.append(f"|{project}|{tasks_formatted}|{project_duration}|")
if queried_start_date != queried_end_date:
markdown_output.append(f"## {date.strftime('%Y-%m-%d %A')} [{day_total_duration}]\n")
tableheader = """|Project|Task(s)|Duration|
|-------|-------|-------:|"""
markdown_output.append(tableheader)
markdown_output.extend(tasks_output)
markdown_output.append(f"|TOTAL| |{day_total_duration}|\n")
markdown_output.append("")
return "\n".join(markdown_output)
#JSON
@timing.get("/time/json")
async def get_timing_json(
request: Request,
start_date: str = Query(..., regex=r"\d{4}-\d{2}-\d{2}"),
end_date: Optional[str] = Query(None, regex=r"\d{4}-\d{2}-\d{2}")
):
# Fetch and process timing data
start = await gis.dt(start_date)
end = await gis.dt(end_date) if end_date else None
timing_data = await fetch_and_prepare_timing_data(start, end)
# Convert processed data to the required JSON structure
json_data = process_json(timing_data)
return JSONResponse(content=json_data)
def process_json(timing_data):
structured_data = defaultdict(lambda: defaultdict(lambda: defaultdict(list)))
for entry in timing_data:
date_key = entry['start_date'] # Already in 'YYYY-MM-DD' format
project_title = entry['project']['title'] if 'title' in entry['project'] else 'No Project'
task_title = entry['title']
structured_data[date_key][project_title][task_title].append(entry)
return dict(structured_data)
# ROCKETMATTER CSV PARSING
def load_project_names(filename):
with open(filename, 'r', encoding='utf-8') as file:
return json.load(file)
def parse_input(fields, project_name_mappings, start_times_by_date):
project_code = fields[3].strip()
project_name = project_name_mappings.get(project_code, project_code)
task_descriptions = fields[4].strip()
billing_date_str = fields[6].strip()
total_hours = float(fields[9].strip())
billing_date = datetime.strptime(billing_date_str, "%m/%d/%Y").date()
# If no start time is recorded for this billing_date, default to 8 AM
if billing_date not in start_times_by_date:
start_time = pacific.localize(datetime.combine(billing_date, datetime.min.time()).replace(hour=8))
else:
start_time = start_times_by_date[billing_date]
# Normalize the task descriptions: convert line breaks and the various closing separators (']' or ')' plus whitespace) into a uniform '],' separator
task_descriptions = re.sub(r'(\)|\])(\s+|$)(?=\[|\(|[A-Za-z])', '],', task_descriptions)
task_descriptions = re.sub(r'(\r?\n|\r)', ',', task_descriptions)
# Regex pattern to match task descriptions along with their respective durations.
task_pattern = re.compile(r'(.*?)[\[\(](\d+\.\d+)[\]\)]\s*,?')
tasks_with_durations = task_pattern.findall(task_descriptions)
tasks = []
total_calc_hours = 0
# Process tasks with explicit durations
for task in tasks_with_durations:
task_name, duration_hours = task[0].strip(' ,;'), float(task[1])
task_name = task_name if task_name else "Undefined Task"
tasks.append((task_name, duration_hours))
total_calc_hours += duration_hours
# If there are hours not accounted for, consider them for a task without a specific duration
remainder = total_hours - total_calc_hours
if remainder > 0:
# Include non-specific task or "Undefined Task"
non_duration_task = re.sub(task_pattern, '', task_descriptions).strip(' ,;')
if not non_duration_task:
non_duration_task = "Undefined Task"
tasks.append((non_duration_task, remainder))
# If no specific task durations are found in the description, treat the entire description as one task
if not tasks_with_durations:
task_name = task_descriptions if task_descriptions else "Undefined Task"
tasks.append((task_name, total_hours))
json_entries = []
for task_name, duration_hours in tasks:
duration = timedelta(hours=duration_hours)
end_time = start_time + duration
entry = {
"project": project_name,
"Task": task_name,
"Start_time": start_time.strftime("%Y-%m-%d %H:%M:%S-07:00"),
"End_time": end_time.strftime("%Y-%m-%d %H:%M:%S-07:00")
}
json_entries.append(entry)
start_time = end_time
# Update the start time for the billing_date in the dictionary
start_times_by_date[billing_date] = start_time
return json_entries
async def post_time_entry_to_timing(entry):
url = f"{TIMING_API_URL}/time-entries" # The URL for posting time entries
headers = {
"Authorization": f"Bearer {TIMING_API_KEY}",
"Content-Type": "application/json",
"Accept": "application/json",
'X-Time-Zone': 'America/Los_Angeles' # Set the timezone for the API request
}
data = {
"start_date": entry["Start_time"], # Format these according to the API's requirements
"end_date": entry["End_time"],
"project": entry["project"],
"title": entry["Task"],
"notes": "Automatically generated based on Rocketmatter reports.",
"replace_existing": False
}
async with httpx.AsyncClient() as client:
    response = await client.post(url, headers=headers, json=data)
return response.status_code, response.json()
@timing.get("/time/flagemoji/{country_code}")
def flag_emoji(country_code: str):
offset = 127397
flag = ''.join(chr(ord(char) + offset) for char in country_code.upper())
return {"emoji": flag}
@timing.head("/time/")
async def read_root():
return {}


@@ -1,6 +1,8 @@
'''
Uses xtts-v2 and/or the Elevenlabs API for text to speech.
'''
#routers/tts.py
from fastapi import APIRouter, UploadFile, HTTPException, Response, Form, File, BackgroundTasks, Depends, Request
from fastapi.responses import Response, StreamingResponse, FileResponse
from fastapi.responses import StreamingResponse, PlainTextResponse


@@ -1,6 +1,8 @@
'''
Uses the VisualCrossing API and Postgres/PostGIS to source local weather forecasts and history.
'''
#routers/weather.py
import asyncio
import traceback
from fastapi import APIRouter, HTTPException, Query