Auto-update: Tue Aug 6 23:07:27 PDT 2024

This commit is contained in:
sanj 2024-08-06 23:07:27 -07:00
parent fbb11c9e59
commit 53177d249d
2 changed files with 79 additions and 60 deletions

View file

@ -849,40 +849,28 @@ class APIConfig(BaseModel):
if table_name in self.SPECIAL_TABLES: if table_name in self.SPECIAL_TABLES:
result = await self._execute_special_table_write(conn, query, *args, table_name=table_name) result = await self._execute_special_table_write(conn, query, *args, table_name=table_name)
else: else:
# Get table columns
columns = await self.get_table_columns(conn, table_name)
# Remove 'id', 'version', and 'server_id' from the columns list
insert_cols = [col for col in columns if col not in ['id', 'version', 'server_id']]
# Prepare the INSERT ... ON CONFLICT ... query # Prepare the INSERT ... ON CONFLICT ... query
placeholders = [f'${i+1}' for i in range(len(args))] insert_cols = ', '.join(f'"{col}"' for col in query.split('(')[1].split(')')[0].split(','))
insert_cols_str = ', '.join(insert_cols) update_cols = ', '.join([f'"{col}" = EXCLUDED."{col}"' for col in query.split('(')[1].split(')')[0].split(',') if col.strip() not in ['id', 'version', 'server_id']])
insert_vals = ', '.join(placeholders[:len(insert_cols)])
update_cols = ', '.join([f"{col} = EXCLUDED.{col}" for col in insert_cols])
query = f""" modified_query = f"""
WITH new_version AS ( WITH new_version AS (
SELECT COALESCE(MAX(version), 0) + 1 as next_version SELECT COALESCE(MAX(version), 0) + 1 as next_version
FROM {table_name} FROM {table_name}
WHERE id = (SELECT id FROM {table_name} WHERE {insert_cols[0]} = $1 FOR UPDATE) WHERE id = (SELECT id FROM {table_name} WHERE {insert_cols.split(',')[0]} = $1 FOR UPDATE)
) )
INSERT INTO {table_name} ({insert_cols_str}, version, server_id) INSERT INTO {table_name} ({insert_cols}, version, server_id)
VALUES ({insert_vals}, (SELECT next_version FROM new_version), ${len(args)+1}) VALUES ({', '.join(f'${i+1}' for i in range(len(args)))}, (SELECT next_version FROM new_version), '{local_ts_id}')
ON CONFLICT (id) DO UPDATE SET ON CONFLICT (id) DO UPDATE SET
{update_cols}, {update_cols},
version = (SELECT next_version FROM new_version), version = (SELECT next_version FROM new_version),
server_id = ${len(args)+1} server_id = '{local_ts_id}'
WHERE {table_name}.version < (SELECT next_version FROM new_version) WHERE {table_name}.version < (SELECT next_version FROM new_version)
OR ({table_name}.version = (SELECT next_version FROM new_version) AND {table_name}.server_id < ${len(args)+1}) OR ({table_name}.version = (SELECT next_version FROM new_version) AND {table_name}.server_id < '{local_ts_id}')
RETURNING id, version RETURNING id, version
""" """
# Add server_id to args result = await conn.fetch(modified_query, *args)
args = list(args)
args.append(local_ts_id)
result = await conn.fetch(query, *args)
return result return result
except Exception as e: except Exception as e:
@ -895,6 +883,7 @@ class APIConfig(BaseModel):
return [] return []
async def get_table_columns(self, conn, table_name: str) -> List[str]: async def get_table_columns(self, conn, table_name: str) -> List[str]:
query = """ query = """

View file

@ -144,16 +144,9 @@ async def store_weather_to_db(date_time: dt_datetime, weather_data: dict):
day_data = weather_data.get('days', [{}])[0] day_data = weather_data.get('days', [{}])[0]
debug(f"RAW DAY_DATA: {day_data}") debug(f"RAW DAY_DATA: {day_data}")
# Handle preciptype and stations as PostgreSQL arrays preciptype_array = day_data.get('preciptype', []) or []
preciptype_array = day_data.get('preciptype', []) stations_array = day_data.get('stations', []) or []
stations_array = day_data.get('stations', [])
preciptype_array = [] if preciptype_array is None else preciptype_array
stations_array = [] if stations_array is None else stations_array
date_str = date_time.strftime("%Y-%m-%d")
debug(f"Using date {date_str} for database query")
# Get location details
longitude = weather_data.get('longitude') longitude = weather_data.get('longitude')
latitude = weather_data.get('latitude') latitude = weather_data.get('latitude')
if longitude is None or latitude is None: if longitude is None or latitude is None:
@ -163,37 +156,63 @@ async def store_weather_to_db(date_time: dt_datetime, weather_data: dict):
elevation = await GEO.elevation(latitude, longitude) elevation = await GEO.elevation(latitude, longitude)
location_point = f"POINTZ({longitude} {latitude} {elevation})" if elevation else None location_point = f"POINTZ({longitude} {latitude} {elevation})" if elevation else None
debug(f"Uncorrected datetimes: datetime={day_data.get('datetime')}, sunrise={day_data.get('sunrise')}, sunset={day_data.get('sunset')}")
datetime_utc = await gis.dt(day_data.get('datetimeEpoch'))
sunrise_utc = await gis.dt(day_data.get('sunriseEpoch'))
sunset_utc = await gis.dt(day_data.get('sunsetEpoch'))
debug(f"Corrected datetimes: datetime={datetime_utc}, sunrise={sunrise_utc}, sunset={sunset_utc}")
daily_weather_params = [ daily_weather_params = [
location_point, location_point,
sunrise_utc, day_data.get('sunriseEpoch'), await gis.dt(day_data.get('sunriseEpoch')),
sunset_utc, day_data.get('sunsetEpoch'), day_data.get('sunriseEpoch'),
day_data.get('description'), day_data.get('tempmax'), await gis.dt(day_data.get('sunsetEpoch')),
day_data.get('tempmin'), day_data.get('uvindex'), day_data.get('sunsetEpoch'),
day_data.get('winddir'), day_data.get('windspeed'), day_data.get('description'),
day_data.get('icon'), dt_datetime.now(tz), day_data.get('tempmax'),
datetime_utc, day_data.get('datetimeEpoch'), day_data.get('tempmin'),
day_data.get('temp'), day_data.get('feelslikemax'), day_data.get('uvindex'),
day_data.get('feelslikemin'), day_data.get('feelslike'), day_data.get('winddir'),
day_data.get('dew'), day_data.get('humidity'), day_data.get('windspeed'),
day_data.get('precip'), day_data.get('precipprob'), day_data.get('icon'),
day_data.get('precipcover'), preciptype_array, dt_datetime.now(tz),
day_data.get('snow'), day_data.get('snowdepth'), await gis.dt(day_data.get('datetimeEpoch')),
day_data.get('windgust'), day_data.get('pressure'), day_data.get('datetimeEpoch'),
day_data.get('cloudcover'), day_data.get('visibility'), day_data.get('temp'),
day_data.get('solarradiation'), day_data.get('solarenergy'), day_data.get('feelslikemax'),
day_data.get('severerisk', 0), day_data.get('moonphase'), day_data.get('feelslikemin'),
day_data.get('conditions'), stations_array, day_data.get('source') day_data.get('feelslike'),
day_data.get('dew'),
day_data.get('humidity'),
day_data.get('precip'),
day_data.get('precipprob'),
day_data.get('precipcover'),
preciptype_array,
day_data.get('snow'),
day_data.get('snowdepth'),
day_data.get('windgust'),
day_data.get('pressure'),
day_data.get('cloudcover'),
day_data.get('visibility'),
day_data.get('solarradiation'),
day_data.get('solarenergy'),
day_data.get('severerisk', 0),
day_data.get('moonphase'),
day_data.get('conditions'),
stations_array,
day_data.get('source')
] ]
debug(f"Prepared daily_weather_params: {daily_weather_params}") daily_weather_query = '''
INSERT INTO dailyweather (
location, sunrise, sunriseepoch, sunset, sunsetepoch, description,
tempmax, tempmin, uvindex, winddir, windspeed, icon, last_updated,
datetime, datetimeepoch, temp, feelslikemax, feelslikemin, feelslike,
dew, humidity, precip, precipprob, precipcover, preciptype,
snow, snowdepth, windgust, pressure, cloudcover, visibility,
solarradiation, solarenergy, severerisk, moonphase, conditions,
stations, source
) VALUES (
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15,
$16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28,
$29, $30, $31, $32, $33, $34, $35, $36, $37, $38
) RETURNING id
'''
daily_weather_query = 'INSERT INTO dailyweather DEFAULT VALUES'
daily_weather_result = await API.execute_write_query(daily_weather_query, *daily_weather_params, table_name="dailyweather") daily_weather_result = await API.execute_write_query(daily_weather_query, *daily_weather_params, table_name="dailyweather")
if not daily_weather_result: if not daily_weather_result:
@ -206,12 +225,11 @@ async def store_weather_to_db(date_time: dt_datetime, weather_data: dict):
debug(f"Processing {len(day_data['hours'])} hourly records") debug(f"Processing {len(day_data['hours'])} hourly records")
for hour_data in day_data['hours']: for hour_data in day_data['hours']:
try: try:
hour_datetime = await gis.dt(hour_data.get('datetimeEpoch'))
hour_preciptype_array = hour_data.get('preciptype', []) or [] hour_preciptype_array = hour_data.get('preciptype', []) or []
hour_stations_array = hour_data.get('stations', []) or [] hour_stations_array = hour_data.get('stations', []) or []
hourly_weather_params = [ hourly_weather_params = [
daily_weather_id, daily_weather_id,
hour_datetime, await gis.dt(hour_data.get('datetimeEpoch')),
hour_data.get('datetimeEpoch'), hour_data.get('datetimeEpoch'),
hour_data['temp'], hour_data['temp'],
hour_data['feelslike'], hour_data['feelslike'],
@ -238,9 +256,20 @@ async def store_weather_to_db(date_time: dt_datetime, weather_data: dict):
hour_data.get('source', '') hour_data.get('source', '')
] ]
hourly_weather_query = 'INSERT INTO hourlyweather DEFAULT VALUES' hourly_weather_query = '''
INSERT INTO hourlyweather (
daily_weather_id, datetime, datetimeepoch, temp, feelslike,
humidity, dew, precip, precipprob, preciptype, snow, snowdepth,
windgust, windspeed, winddir, pressure, cloudcover, visibility,
solarradiation, solarenergy, uvindex, severerisk, conditions,
icon, stations, source
) VALUES (
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15,
$16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26
) RETURNING id
'''
hourly_result = await API.execute_write_query(hourly_weather_query, *hourly_weather_params, table_name="hourlyweather") hourly_result = await API.execute_write_query(hourly_weather_query, *hourly_weather_params, table_name="hourlyweather")
debug(f"Inserted hourly weather data for {hour_datetime}") debug(f"Inserted hourly weather data for {hour_data.get('datetimeEpoch')}")
except Exception as e: except Exception as e:
err(f"Error processing hourly data: {e}") err(f"Error processing hourly data: {e}")
err(f"Problematic hour_data: {hour_data}") err(f"Problematic hour_data: {hour_data}")
@ -255,6 +284,7 @@ async def store_weather_to_db(date_time: dt_datetime, weather_data: dict):
return "FAILURE" return "FAILURE"
async def get_weather_from_db(date_time: dt_datetime, latitude: float, longitude: float): async def get_weather_from_db(date_time: dt_datetime, latitude: float, longitude: float):
debug(f"Using {date_time.strftime('%Y-%m-%d %H:%M:%S')} as our datetime in get_weather_from_db.") debug(f"Using {date_time.strftime('%Y-%m-%d %H:%M:%S')} as our datetime in get_weather_from_db.")
query_date = date_time.date() query_date = date_time.date()