Auto-update: Tue Aug 6 21:21:14 PDT 2024
This commit is contained in:
2 changed files with 122 additions and 102 deletions
@ -766,20 +766,20 @@ class APIConfig(BaseModel):
results = []
max_version = -1
latest_result = None
for pool_entry in online_hosts:
conn = await self.get_connection(pool_entry)
if conn is None:
warn(f"Unable to connect to {pool_entry['ts_id']}. Skipping read.")
# Execute the query
result = await conn.fetch(query, *args)
if not result:
# Check version if it's not a special table
if table_name not in self.SPECIAL_TABLES:
@ -800,34 +800,38 @@ class APIConfig(BaseModel):
max_version = version
latest_result = result
debug(f"No data in table {table_name} for {pool_entry['ts_id']}")
debug(f"No version data in table {table_name} for {pool_entry['ts_id']}")
if latest_result is None:
latest_result = result
except asyncpg.exceptions.UndefinedColumnError:
warn(f"Version or server_id column does not exist in table {table_name} for {pool_entry['ts_id']}. Skipping version check.")
warn(f"Version column does not exist in table {table_name} for {pool_entry['ts_id']}. Using result without version check.")
if latest_result is None:
latest_result = result
# For special tables, just use the first result
if latest_result is None:
latest_result = result
results.append((pool_entry['ts_id'], result))
except Exception as e:
err(f"Error executing read query on {pool_entry['ts_id']}: {str(e)}")
err(f"Traceback: {traceback.format_exc()}")
await conn.close()
if not latest_result:
warn(f"No results found for query on table {table_name}")
return []
# Log results from all databases
for ts_id, result in results:
info(f"Read result from {ts_id}: {result}")
return [dict(r) for r in latest_result] # Convert Record objects to dictionaries
async def execute_write_query(self, query: str, *args, table_name: str):
conn = await self.get_connection()
if conn is None:
@ -31,118 +31,131 @@ async def get_refreshed_weather(
date: str = Query("%Y-%m-%d"), description="Enter a date in YYYY-MM-DD format, otherwise it will default to today."),
latlon: str = Query(default="None", description="Optionally enter latitude and longitude in the format 45.8411,-123.1765; if not provided it will use your recorded location."),
# date = await date
if latlon == "None":
date_time = await gis.dt(date)
place = await gis.fetch_last_location_before(date_time)
lat = place.latitude
lon = place.longitude
if not place:
raise HTTPException(status_code=404, detail="No location data found for the given date")
lat, lon = place.latitude, place.longitude
lat, lon = latlon.split(',')
lat, lon = map(float, latlon.split(','))
except ValueError:
raise HTTPException(status_code=400, detail="Invalid latitude/longitude format. Use format: 45.8411,-123.1765")
tz = await GEO.tz_at(lat, lon)
date_time = await gis.dt(date, tz)
debug(f"passing date_time {date_time.strftime('%Y-%m-%d %H:%M:%S')}, {lat}/{lon} into get_weather")
debug(f"Passing date_time {date_time.strftime('%Y-%m-%d %H:%M:%S')}, {lat}/{lon} into get_weather")
day = await get_weather(date_time, lat, lon, force_refresh=True)
day_str = str(day)
return JSONResponse(content={"weather": day_str}, status_code=200)
# Convert the day object to a JSON-serializable format
day_dict = {k: str(v) if isinstance(v, (dt_datetime, date)) else v for k, v in day.items()}
return JSONResponse(content={"weather": day_dict}, status_code=200)
except HTTPException as e:
err(f"HTTP Exception in get_refreshed_weather: {e.detail}")
return JSONResponse(content={"detail": str(e.detail)}, status_code=e.status_code)
except Exception as e:
err(f"Error in note_weather_get: {str(e)}")
raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}")
err(f"Unexpected error in get_refreshed_weather: {str(e)}")
err(f"Traceback: {traceback.format_exc()}")
return JSONResponse(content={"detail": "An unexpected error occurred"}, status_code=500)
async def get_weather(date_time: dt_datetime, latitude: float, longitude: float, force_refresh: bool = False):
fetch_new_data = True
if force_refresh == False:
daily_weather_data = await get_weather_from_db(date_time, latitude, longitude)
if daily_weather_data:
daily_weather_data = None
fetch_new_data = force_refresh
if not force_refresh:
daily_weather_data = await get_weather_from_db(date_time, latitude, longitude)
if daily_weather_data:
debug(f"Daily weather data from db: {daily_weather_data}")
last_updated = str(daily_weather_data['DailyWeather'].get('last_updated'))
last_updated = await gis.dt(last_updated)
last_updated = await gis.dt(str(daily_weather_data['DailyWeather'].get('last_updated')))
stored_loc_data = unhexlify(daily_weather_data['DailyWeather'].get('location'))
stored_loc = loads(stored_loc_data)
stored_lat = stored_loc.y
stored_lon = stored_loc.x
stored_ele = stored_loc.z
stored_lat, stored_lon, stored_ele = stored_loc.y, stored_loc.x, stored_loc.z
hourly_weather = daily_weather_data.get('HourlyWeather')
# debug(f"Hourly: {hourly_weather}")
request_haversine = haversine(latitude, longitude, stored_lat, stored_lon)
debug(f"\nINFO:\nlast updated {last_updated}\nstored lat: {stored_lat} - requested lat: {latitude}\nstored lon: {stored_lon} - requested lon: {longitude}\nHaversine: {request_haversine}")
if last_updated and (date_time <= and last_updated > date_time and request_haversine < 8) and hourly_weather and len(hourly_weather) > 0:
debug(f"We can use existing data... :')")
if (last_updated and date_time <= and
last_updated > date_time and request_haversine < 8 and
hourly_weather and len(hourly_weather) > 0):
debug(f"Using existing data")
fetch_new_data = False
except Exception as e:
err(f"Error in get_weather: {e}")
fetch_new_data = True
except Exception as e:
err(f"Error checking existing weather data: {e}")
fetch_new_data = True
if fetch_new_data:
debug(f"We require new data!")
debug(f"Fetching new weather data")
request_date_str = date_time.strftime("%Y-%m-%d")
debug(f"Using {date_time.strftime('%Y-%m-%d')} as our datetime for fetching new data.")
url = f"{latitude},{longitude}/{request_date_str}/{request_date_str}?unitGroup=us&key={VISUALCROSSING_API_KEY}"
async with AsyncClient() as client:
response = await client.get(url)
if response.status_code == 200:
debug(f"Successfully obtained data from VC...")
weather_data = response.json()
store_result = await store_weather_to_db(date_time, weather_data)
if store_result == "SUCCESS":
debug(f"New weather data for {request_date_str} stored in database...")
err(f"Failed to store weather data for {request_date_str} in database! {store_result}")
debug(f"Attempting to retrieve data for {date_time}, {latitude}, {longitude}")
daily_weather_data = await get_weather_from_db(date_time, latitude, longitude)
if daily_weather_data is not None:
return daily_weather_data
raise HTTPException(status_code=500, detail="Weather data was not properly stored.")
except Exception as e:
err(f"Problem parsing VC response or storing data: {e}")
weather_data = response.json()
store_result = await store_weather_to_db(date_time, weather_data)
if store_result != "SUCCESS":
raise HTTPException(status_code=500, detail=f"Failed to store weather data: {store_result}")
daily_weather_data = await get_weather_from_db(date_time, latitude, longitude)
if daily_weather_data is None:
raise HTTPException(status_code=500, detail="Weather data was not properly stored.")
err(f"Failed to fetch weather data: {response.status_code}, {response.text}")
raise HTTPException(status_code=response.status_code, detail=f"Failed to fetch weather data: {response.text}")
except HTTPException:
except Exception as e:
err(f"Exception during API call: {e}")
err(f"Exception during API call or data storage: {e}")
raise HTTPException(status_code=500, detail=f"Error fetching or storing weather data: {str(e)}")
if daily_weather_data is None:
raise HTTPException(status_code=404, detail="No weather data found")
return daily_weather_data
async def store_weather_to_db(date_time: dt_datetime, weather_data: dict):
debug(f"Using {date_time.strftime('%Y-%m-%d %H:%M:%S')} as our datetime in store_weather_to_db")
debug(f"Starting store_weather_to_db for datetime: {date_time.strftime('%Y-%m-%d %H:%M:%S')}")
day_data = weather_data.get('days')[0]
day_data = weather_data.get('days', [{}])[0]
debug(f"RAW DAY_DATA: {day_data}")
# Handle preciptype and stations as PostgreSQL arrays
preciptype_array = day_data.get('preciptype', []) or []
stations_array = day_data.get('stations', []) or []
preciptype_array = day_data.get('preciptype', [])
stations_array = day_data.get('stations', [])
preciptype_array = [] if preciptype_array is None else preciptype_array
stations_array = [] if stations_array is None else stations_array
date_str = date_time.strftime("%Y-%m-%d")
debug(f"Using {date_str} in our query in store_weather_to_db.")
# Get location details from weather data if available
debug(f"Using date {date_str} for database query")
# Get location details
longitude = weather_data.get('longitude')
latitude = weather_data.get('latitude')
if longitude is None or latitude is None:
raise ValueError("Missing longitude or latitude in weather data")
tz = await GEO.tz_at(latitude, longitude)
elevation = await GEO.elevation(latitude, longitude)
location_point = f"POINTZ({longitude} {latitude} {elevation})" if longitude and latitude and elevation else None
warn(f"Uncorrected datetimes in store_weather_to_db: {day_data['datetime']}, sunrise: {day_data['sunrise']}, sunset: {day_data['sunset']}")
location_point = f"POINTZ({longitude} {latitude} {elevation})" if elevation else None
debug(f"Uncorrected datetimes: datetime={day_data.get('datetime')}, sunrise={day_data.get('sunrise')}, sunset={day_data.get('sunset')}")
day_data['datetime'] = await gis.dt(day_data.get('datetimeEpoch'))
day_data['sunrise'] = await gis.dt(day_data.get('sunriseEpoch'))
day_data['sunset'] = await gis.dt(day_data.get('sunsetEpoch'))
warn(f"Corrected datetimes in store_weather_to_db: {day_data['datetime']}, sunrise: {day_data['sunrise']}, sunset: {day_data['sunset']}")
daily_weather_params = (
debug(f"Corrected datetimes: datetime={day_data['datetime']}, sunrise={day_data['sunrise']}, sunset={day_data['sunset']}")
daily_weather_params = [
day_data.get('sunrise'), day_data.get('sunriseEpoch'),
day_data.get('sunset'), day_data.get('sunsetEpoch'),
day_data.get('description'), day_data.get('tempmax'),
@ -162,12 +175,13 @@ async def store_weather_to_db(date_time: dt_datetime, weather_data: dict):
day_data.get('severerisk', 0), day_data.get('moonphase'),
day_data.get('conditions'), stations_array, day_data.get('source'),
except Exception as e:
err(f"Failed to prepare database query in store_weather_to_db! {e}")
return "FAILURE"
# Check for None values and replace with appropriate defaults
daily_weather_params = ['' if v is None else v for v in daily_weather_params]
debug(f"Prepared daily_weather_params: {daily_weather_params}")
daily_weather_query = '''
INSERT INTO dailyweather (
sunrise, sunriseepoch, sunset, sunsetepoch, description,
@ -180,19 +194,19 @@ async def store_weather_to_db(date_time: dt_datetime, weather_data: dict):
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28, $29, $30, $31, $32, $33, $34, $35, $36, $37, $38)
daily_weather_id = await API.execute_write_query(daily_weather_query, *daily_weather_params, table_name="dailyweather")
await API.execute_write_query(daily_weather_query, *daily_weather_params, table_name="dailyweather")
debug(f"Inserted daily weather data")
if 'hours' in day_data:
debug(f"Processing hours now...")
debug(f"Processing {len(day_data['hours'])} hourly records")
for hour_data in day_data['hours']:
await asyncio.sleep(0.01)
hour_data['datetime'] = await gis.dt(hour_data.get('datetimeEpoch'))
hour_preciptype_array = hour_data.get('preciptype', []) or []
hour_stations_array = hour_data.get('stations', []) or []
hourly_weather_params = (
hourly_weather_params = [
date_time, # Use the daily weather datetime as a foreign key
@ -218,28 +232,30 @@ async def store_weather_to_db(date_time: dt_datetime, weather_data: dict):
hour_data.get('source', ''),
hourly_weather_query = '''
INSERT INTO hourlyweather (daily_weather_id, datetime, datetimeepoch, temp, feelslike, humidity, dew, precip, precipprob,
preciptype, snow, snowdepth, windgust, windspeed, winddir, pressure, cloudcover, visibility, solarradiation, solarenergy,
uvindex, severerisk, conditions, icon, stations, source)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26)
hourly_weather_id = await API.execute_write_query(hourly_weather_query, *hourly_weather_params, table_name="hourlyweather")
debug(f"Done processing hourly_weather_id {hourly_weather_id}")
except Exception as e:
err(f"EXCEPTION: {e}")
# Check for None values and replace with appropriate defaults
hourly_weather_params = ['' if v is None else v for v in hourly_weather_params]
hourly_weather_query = '''
INSERT INTO hourlyweather (daily_weather_datetime, datetime, datetimeepoch, temp, feelslike, humidity, dew, precip, precipprob,
preciptype, snow, snowdepth, windgust, windspeed, winddir, pressure, cloudcover, visibility, solarradiation, solarenergy,
uvindex, severerisk, conditions, icon, stations, source)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26)
await API.execute_write_query(hourly_weather_query, *hourly_weather_params, table_name="hourlyweather")
debug(f"Inserted hourly weather data for {hour_data['datetime']}")
except Exception as e:
err(f"EXCEPTION: {e}")
err(f"Error processing hourly data: {e}")
err(f"Problematic hour_data: {hour_data}")
debug("Successfully stored weather data")
return "SUCCESS"
except Exception as e:
err(f"Error in dailyweather storage: {e}")
err(f"Error in weather storage: {e}")
err(f"Traceback: {traceback.format_exc()}")
return "FAILURE"
Add table
Reference in a new issue