Auto-update: Mon Aug 12 21:28:09 PDT 2024
This commit is contained in:
parent
7f5cb9df77
commit
6c6c3a7b65
2 changed files with 18 additions and 11 deletions
|
@ -144,12 +144,8 @@ class Database:
|
||||||
result = await session.execute(text(query), serialized_kwargs)
|
result = await session.execute(text(query), serialized_kwargs)
|
||||||
await session.commit()
|
await session.commit()
|
||||||
|
|
||||||
# Calculate result checksum
|
|
||||||
result_str = str(result.fetchall())
|
|
||||||
result_checksum = hashlib.md5(result_str.encode()).hexdigest()
|
|
||||||
|
|
||||||
# Add the write query to the query_tracking table
|
# Add the write query to the query_tracking table
|
||||||
await self.add_query_to_tracking(query, serialized_kwargs, result_checksum)
|
await self.add_query_to_tracking(query, kwargs)
|
||||||
|
|
||||||
# Initiate async operations
|
# Initiate async operations
|
||||||
asyncio.create_task(self._async_sync_operations())
|
asyncio.create_task(self._async_sync_operations())
|
||||||
|
@ -165,6 +161,7 @@ class Database:
|
||||||
l.error(f"Traceback: {traceback.format_exc()}")
|
l.error(f"Traceback: {traceback.format_exc()}")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
async def _async_sync_operations(self):
|
async def _async_sync_operations(self):
|
||||||
try:
|
try:
|
||||||
# Call /db/sync on all online servers
|
# Call /db/sync on all online servers
|
||||||
|
@ -174,20 +171,20 @@ class Database:
|
||||||
l.error(f"Traceback: {traceback.format_exc()}")
|
l.error(f"Traceback: {traceback.format_exc()}")
|
||||||
|
|
||||||
|
|
||||||
async def add_query_to_tracking(self, query: str, kwargs: dict, result_checksum: str):
|
async def add_query_to_tracking(self, query: str, kwargs: dict):
|
||||||
async with self.sessions[self.local_ts_id]() as session:
|
async with self.sessions[self.local_ts_id]() as session:
|
||||||
new_query = QueryTracking(
|
new_query = QueryTracking(
|
||||||
ts_id=self.local_ts_id,
|
ts_id=self.local_ts_id,
|
||||||
query=query,
|
query=query,
|
||||||
args=json_dumps(kwargs),
|
args=json_dumps(kwargs),
|
||||||
completed_by={self.local_ts_id: True},
|
completed_by={self.local_ts_id: True}
|
||||||
result_checksum=result_checksum
|
|
||||||
)
|
)
|
||||||
session.add(new_query)
|
session.add(new_query)
|
||||||
await session.commit()
|
await session.commit()
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
async def pull_query_tracking_from_primary(self):
|
async def pull_query_tracking_from_primary(self):
|
||||||
primary_ts_id = await self.get_primary_server()
|
primary_ts_id = await self.get_primary_server()
|
||||||
if not primary_ts_id:
|
if not primary_ts_id:
|
||||||
|
|
|
@ -205,7 +205,12 @@ async def store_weather_to_db(date_time: dt_datetime, weather_data: dict):
|
||||||
if daily_weather_result is None:
|
if daily_weather_result is None:
|
||||||
raise ValueError("Failed to insert daily weather data: no result returned")
|
raise ValueError("Failed to insert daily weather data: no result returned")
|
||||||
|
|
||||||
daily_weather_id = daily_weather_result.fetchone()[0]
|
daily_weather_row = daily_weather_result.fetchone()
|
||||||
|
if daily_weather_row is None:
|
||||||
|
raise ValueError("Failed to retrieve inserted daily weather ID: fetchone() returned None")
|
||||||
|
|
||||||
|
daily_weather_id = daily_weather_row[0]
|
||||||
|
|
||||||
l.debug(f"Inserted daily weather data with id: {daily_weather_id}")
|
l.debug(f"Inserted daily weather data with id: {daily_weather_id}")
|
||||||
|
|
||||||
# Hourly weather insertion
|
# Hourly weather insertion
|
||||||
|
@ -262,7 +267,11 @@ async def store_weather_to_db(date_time: dt_datetime, weather_data: dict):
|
||||||
if hourly_result is None:
|
if hourly_result is None:
|
||||||
l.warning(f"Failed to insert hourly weather data for {hour_data.get('datetimeEpoch')}")
|
l.warning(f"Failed to insert hourly weather data for {hour_data.get('datetimeEpoch')}")
|
||||||
else:
|
else:
|
||||||
hourly_id = hourly_result.fetchone()[0]
|
hourly_row = hourly_result.fetchone()
|
||||||
|
if hourly_row is None:
|
||||||
|
l.warning(f"Failed to retrieve inserted hourly weather ID for {hour_data.get('datetimeEpoch')}")
|
||||||
|
else:
|
||||||
|
hourly_id = hourly_row[0]
|
||||||
l.debug(f"Inserted hourly weather data with id: {hourly_id}")
|
l.debug(f"Inserted hourly weather data with id: {hourly_id}")
|
||||||
|
|
||||||
return "SUCCESS"
|
return "SUCCESS"
|
||||||
|
@ -271,6 +280,7 @@ async def store_weather_to_db(date_time: dt_datetime, weather_data: dict):
|
||||||
l.error(f"Traceback: {traceback.format_exc()}")
|
l.error(f"Traceback: {traceback.format_exc()}")
|
||||||
return "FAILURE"
|
return "FAILURE"
|
||||||
|
|
||||||
|
|
||||||
async def get_weather_from_db(date_time: dt_datetime, latitude: float, longitude: float):
|
async def get_weather_from_db(date_time: dt_datetime, latitude: float, longitude: float):
|
||||||
l.debug(f"Using {date_time.strftime('%Y-%m-%d %H:%M:%S')} as our datetime in get_weather_from_db.")
|
l.debug(f"Using {date_time.strftime('%Y-%m-%d %H:%M:%S')} as our datetime in get_weather_from_db.")
|
||||||
query_date = date_time.date()
|
query_date = date_time.date()
|
||||||
|
|
Loading…
Reference in a new issue