Auto-update: Tue Aug 6 22:44:42 PDT 2024

This commit is contained in:
sanj 2024-08-06 22:44:42 -07:00
parent c0f3690beb
commit b14a6837e1
2 changed files with 26 additions and 17 deletions

View file

@ -839,12 +839,12 @@ class APIConfig(BaseModel):
for pool_entry in online_hosts:
if pool_entry['ts_id'] != local_ts_id:
continue # Only write to the local database
conn = await self.get_connection(pool_entry)
if conn is None:
err(f"Unable to connect to local database {pool_entry['ts_id']}. Write operation failed.")
return []
try:
if table_name in self.SPECIAL_TABLES:
result = await self._execute_special_table_write(conn, query, *args, table_name=table_name)
@ -859,19 +859,28 @@ class APIConfig(BaseModel):
update_cols = ', '.join([f"{col} = EXCLUDED.{col}" for col in columns[1:] if col not in ['id', 'version', 'server_id']])
query = f"""
INSERT INTO {table_name} ({insert_cols})
VALUES ({insert_vals})
WITH new_version AS (
SELECT COALESCE(MAX(version), 0) + 1 as next_version
FROM {table_name}
WHERE id = (SELECT id FROM {table_name} WHERE {columns[1]} = ${1} FOR UPDATE)
)
INSERT INTO {table_name} ({insert_cols}, version, server_id)
VALUES ({insert_vals}, (SELECT next_version FROM new_version), ${{len(args)+1}})
ON CONFLICT (id) DO UPDATE SET
{update_cols},
version = {table_name}.version + 1,
server_id = EXCLUDED.server_id
WHERE {table_name}.version < EXCLUDED.version
OR ({table_name}.version = EXCLUDED.version AND {table_name}.server_id < EXCLUDED.server_id)
version = (SELECT next_version FROM new_version),
server_id = ${{len(args)+1}}
WHERE {table_name}.version < (SELECT next_version FROM new_version)
OR ({table_name}.version = (SELECT next_version FROM new_version) AND {table_name}.server_id < ${{len(args)+1}})
RETURNING id, version
"""
# Add server_id to args
args = list(args)
args.append(local_ts_id)
result = await conn.fetch(query, *args)
return result
except Exception as e:
err(f"Error executing write query on {pool_entry['ts_id']}: {str(e)}")
@ -880,8 +889,9 @@ class APIConfig(BaseModel):
err(f"Traceback: {traceback.format_exc()}")
finally:
await conn.close()
return []
async def get_table_columns(self, conn, table_name: str) -> List[str]:
query = """

View file

@ -171,13 +171,13 @@ async def store_weather_to_db(date_time: dt_datetime, weather_data: dict):
daily_weather_params = [
location_point,
day_data.get('sunrise'), day_data.get('sunriseEpoch'),
day_data.get('sunset'), day_data.get('sunsetEpoch'),
day_data['sunrise'], day_data.get('sunriseEpoch'),
day_data['sunset'], day_data.get('sunsetEpoch'),
day_data.get('description'), day_data.get('tempmax'),
day_data.get('tempmin'), day_data.get('uvindex'),
day_data.get('winddir'), day_data.get('windspeed'),
day_data.get('icon'), dt_datetime.now(tz),
day_data.get('datetime'), day_data.get('datetimeEpoch'),
day_data['datetime'], day_data.get('datetimeEpoch'),
day_data.get('temp'), day_data.get('feelslikemax'),
day_data.get('feelslikemin'), day_data.get('feelslike'),
day_data.get('dew'), day_data.get('humidity'),
@ -188,8 +188,7 @@ async def store_weather_to_db(date_time: dt_datetime, weather_data: dict):
day_data.get('cloudcover'), day_data.get('visibility'),
day_data.get('solarradiation'), day_data.get('solarenergy'),
day_data.get('severerisk', 0), day_data.get('moonphase'),
day_data.get('conditions'), stations_array, day_data.get('source'),
os.environ.get('TS_ID') # server_id
day_data.get('conditions'), stations_array, day_data.get('source')
]
debug(f"Prepared daily_weather_params: {daily_weather_params}")
@ -236,8 +235,7 @@ async def store_weather_to_db(date_time: dt_datetime, weather_data: dict):
hour_data['conditions'],
hour_data['icon'],
hour_stations_array,
hour_data.get('source', ''),
os.environ.get('TS_ID') # server_id
hour_data.get('source', '')
]
hourly_weather_query = 'INSERT INTO hourlyweather DEFAULT VALUES'
@ -256,6 +254,7 @@ async def store_weather_to_db(date_time: dt_datetime, weather_data: dict):
err(f"Traceback: {traceback.format_exc()}")
return "FAILURE"
async def get_weather_from_db(date_time: dt_datetime, latitude: float, longitude: float):