Support incremental update of Beancount transactions, embeddings

This commit is contained in:
Debanjum Singh Solanky 2022-09-10 20:55:32 +03:00
parent cfaf7aa6f4
commit 91aac83c6a


@@ -9,7 +9,7 @@ import re
 import logging

 # Internal Packages
-from src.utils.helpers import get_absolute_path, is_none_or_empty
+from src.utils.helpers import get_absolute_path, is_none_or_empty, mark_entries_for_update
 from src.utils.constants import empty_escape_sequences
 from src.utils.jsonl import dump_jsonl, compress_jsonl_data
@@ -28,10 +28,20 @@ def beancount_to_jsonl(beancount_files, beancount_file_filter, output_file, prev
     beancount_files = get_beancount_files(beancount_files, beancount_file_filter)

     # Extract Entries from specified Beancount files
-    entries, transaction_to_file_map = extract_beancount_entries(beancount_files)
+    extracted_transactions, transaction_to_file_map = extract_beancount_entries(beancount_files)
+
+    # Convert Extracted Transactions to Dictionaries
+    current_entries = convert_transactions_to_maps(extracted_transactions, transaction_to_file_map)
+
+    # Identify, mark and merge any new entries with previous entries
+    if not previous_entries:
+        entries_with_ids = list(enumerate(current_entries))
+    else:
+        entries_with_ids = mark_entries_for_update(current_entries, previous_entries, key='compiled', logger=logger)

     # Process Each Entry from All Notes Files
-    jsonl_data = convert_beancount_entries_to_jsonl(entries, transaction_to_file_map)
+    entries = list(map(lambda entry: entry[1], entries_with_ids))
+    jsonl_data = convert_transaction_maps_to_jsonl(entries)

     # Compress JSONL formatted Data
     if output_file.suffix == ".gz":
@@ -39,7 +49,7 @@ def beancount_to_jsonl(beancount_files, beancount_file_filter, output_file, prev
     elif output_file.suffix == ".jsonl":
         dump_jsonl(jsonl_data, output_file)

-    return list(enumerate(entries))
+    return entries_with_ids


 def get_beancount_files(beancount_files=None, beancount_file_filter=None):
@@ -87,17 +97,20 @@ def extract_beancount_entries(beancount_files):
     return entries, transaction_to_file_map


-def convert_beancount_entries_to_jsonl(entries, transaction_to_file_map):
-    "Convert each Beancount transaction to JSON and collate as JSONL"
-    jsonl = ''
+def convert_transactions_to_maps(entries: list[str], transaction_to_file_map) -> list[dict]:
+    "Convert each Beancount transaction into a dictionary"
+    entry_maps = []
     for entry_id, entry in enumerate(entries):
-        entry_dict = {'compiled': entry, 'raw': entry, 'file': f'{transaction_to_file_map[entry_id]}'}
-        # Convert Dictionary to JSON and Append to JSONL string
-        jsonl += f'{json.dumps(entry_dict, ensure_ascii=False)}\n'
+        entry_maps.append({'compiled': entry, 'raw': entry, 'file': f'{transaction_to_file_map[entry_id]}'})

-    logger.info(f"Converted {len(entries)} to jsonl format")
+    logger.info(f"Converted {len(entries)} transactions to dictionaries")

-    return jsonl
+    return entry_maps
+
+
+def convert_transaction_maps_to_jsonl(entries: list[dict]) -> str:
+    "Convert each Beancount transaction dictionary to JSON and collate as JSONL"
+    return ''.join([f'{json.dumps(entry_dict, ensure_ascii=False)}\n' for entry_dict in entries])


 if __name__ == '__main__':
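
Note: the mark_entries_for_update helper imported above is defined in src.utils.helpers and is not part of this diff. Judging only from its call site (current and previous entry lists, a key to compare entries on, and a logger), a minimal sketch of such a helper could look like the code below; the body is an illustrative assumption, not the implementation shipped in this commit.

# Illustrative sketch only: the real mark_entries_for_update lives in src.utils.helpers.
# Assumption: entries whose 'compiled' text already appears in previous_entries keep their
# previous id (so their embeddings can be reused), while new or edited entries get id -1.
import logging

def mark_entries_for_update(current_entries: list[dict], previous_entries: list[dict], key='compiled', logger: logging.Logger = None) -> list[tuple[int, dict]]:
    "Pair each current entry with its previous id, or -1 if the entry is new or changed"
    # Index previous entries by the comparison key, e.g. the compiled transaction text
    previous_ids_by_key = {entry[key]: id for id, entry in enumerate(previous_entries)}

    # Unchanged entries keep their previous id; new or edited ones are marked with -1
    entries_with_ids = [(previous_ids_by_key.get(entry[key], -1), entry) for entry in current_entries]

    if logger:
        reused = sum(1 for id, _ in entries_with_ids if id != -1)
        logger.info(f"Entries unchanged: {reused}, entries to update: {len(entries_with_ids) - reused}")

    return entries_with_ids

Whatever the actual implementation, the fallback branch entries_with_ids = list(enumerate(current_entries)) fixes the expected shape: a list of (id, entry) tuples, which is what beancount_to_jsonl now returns, presumably to drive the incremental embedding update named in the commit title.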