#!/usr/bin/env python3
# bean-add - Beancount transaction entry assistant
# Author: Simon Volpert <simon@simonvolpert.com>
# Project page: https://simonvolpert.com/bean-add/
# This program is free software, released under the Apache License, Version 2.0. See the LICENSE file for more information
# Consult the README file for usage instructions and other helpful hints
import locale
import re
import readline
import sys
import datetime
import subprocess
import os
# Set global constants
# Scratch file handed to $EDITOR when a record is edited externally.
# $USER is not exported in every environment (cron jobs, containers), so fall
# back to $LOGNAME and finally to the numeric user id instead of crashing with
# a KeyError at start-up.
tempfile = '/tmp/bean-add.%s.tmp.bnct' % (os.environ.get('USER') or os.environ.get('LOGNAME') or os.getuid())
# Help text describing the command line arguments
usage = '''Usage: bean-add [-n] FILENAME
Add or edit transactions in the beancount journal FILENAME.
-n immediately enter a new transaction with today's date
-h, --help show this help message
See the README file for more information.'''
# Init transaction data
class data:
    """Program-wide mutable state, read and written as plain class attributes."""

    # ----- Journal/transactions -----
    # Journal records, one string per record, addressed by index (the "txid")
    journal = []
    saved = []
    external_journal = []
    file_name = ''
    # Mapping from a transaction description to the accounts last used with it
    defaults = {}
    # Records taken out of the journal by a non-silent removal
    restore = []
    mtime = 0
    # Conversion rates into `quote_currency`, keyed by currency
    prices = {}
    quote_currency = ''
    # Per-account collections of permitted currencies
    constraints = {}
    # A mapping from known accounts to their respective default currencies
    accounts = {}

    # ----- Completion -----
    currencies = []
    tags = []
    # Candidates offered by the readline completer for the current prompt
    vocab = []
    # One independent input history per prompt context
    history = {
        context: []
        for context in (
            'command',
            'date',
            'description',
            'account',
            'amount',
            'tag',
            'seek',
            'lookup',
            'note',
            'txids',
            'currency',
        )
    }
    history_context = 'command'

    # ----- Current transaction -----
    description = ''
    last_currency = None
    balance = 0
    balance_currency = None
    known_balances = {}
    tag = 'bean-add'
    # Index of the current record; negative while there is none
    txid = -1
    date = datetime.date.today().isoformat()
    date_prompt = '%Y-%m-%d'
    lookup_string = ''
    eof = False

    # ----- Statements -----
    statement = []
    statement_pattern = ' ! '
    statement_replacement = ' * '
    target = 0
    statement_currency = ''
    transaction_account = ''
    funding_account = ''

    # ----- Options -----
    sort_by_date = True
    date_delimiter = '-'
    indentation = None
    use_defaults = None
    paranoid_write = False
    external_write = False
    use_beancount_accounts = False
    date_preview = True
    # Number of neighbouring records shown around the current one
    context = 10
    auto_flag = []
    lookup_seek = None
    max_expr_length = 40
    auto_new = False
    amount_width = 1
    colors = False
    last_bold = True
    precisions = {}
    default_precision = 8
# Ask for confirmation: str(prompt) -> bool(result)
def confirm(prompt='', default_yes=None):
    """Ask a yes/no question and return the answer as a bool.

    prompt -- text shown on every attempt
    default_yes -- answer assumed on empty input: truthy for yes, falsy for
        no, None to insist on an explicit answer

    Only `y` and `n` (in either case) are accepted; any other input repeats
    the question. Completion is switched off while asking and put back
    afterwards, also when the prompt is interrupted.
    """
    # Unset completion
    saved_vocab = data.vocab
    saved_delims = readline.get_completer_delims()
    data.vocab = []
    readline.set_completer_delims('')
    try:
        while True:
            answer = input(prompt).lower()
            if answer == '' and default_yes is not None:
                return bool(default_yes)
            # Compare whole answers: a substring test against 'yn' would
            # also let '' and 'yn' through and make the function return None
            if answer in ('y', 'n'):
                return answer == 'y'
    finally:
        # Reset completion
        data.vocab = saved_vocab
        readline.set_completer_delims(saved_delims)
# Cast a number into the appropriate format: number -> int or rounded float
def cast_number(number, precision=data.default_precision):
    """Round `number` to `precision` decimals; return an int when nothing is left after the point."""
    rounded = round(float(number), precision)
    if rounded.is_integer():
        return int(rounded)
    return rounded
# Condense whitespace and return a list
def condense(line):
    """Split `line` into its whitespace-separated tokens.

    Any run of whitespace (spaces, tabs, ...) counts as a single separator,
    so tab-aligned journal lines are tokenized the same way as space-aligned
    ones. Blank input gives an empty list.
    """
    return line.split()
# Naive word pluralization
def pluralize(number, word):
    """Return '<number> <word>' with a naive English plural `s` where needed.

    Only a count of exactly one takes the singular form; numbers that merely
    end in 1 (21, 101, ...) are plural in English.
    """
    number = str(number)
    singular = number in ('1', '-1')
    return '%s %s' % (number, word if singular else word + 's')
# Completion
def complete(text, state):
    """Readline completer: return match number `state` for `text`, or None after the last one.

    A vocabulary entry matches when it contains `text` anywhere, ignoring
    case. The internal pseudo-descriptions are never offered.
    """
    hidden = ('__balance__', '__pad__', '__note__')
    needle = text.lower()
    matches = []
    for candidate in data.vocab:
        if candidate in hidden:
            continue
        if needle in candidate.lower():
            matches.append(candidate)
    # The trailing None marks the end of the candidate list
    matches.append(None)
    return matches[state]
# Set up histories
def set_history_context(context):
    """Switch readline's input history over to the one kept for `context`.

    The live readline history is first stored under the context being left,
    keeping only the most recent occurrence of each entry. Then readline's
    history is replaced by the entries remembered for `context`.
    """
    # Cache history for current context; re-inserting a key moves it to the
    # end, which drops the older duplicate
    remembered = {}
    for position in range(1, readline.get_current_history_length() + 1):
        entry = readline.get_history_item(position)
        remembered.pop(entry, None)
        remembered[entry] = True
    data.history[data.history_context] = list(remembered)
    # Populate completion history with new context
    readline.clear_history()
    for entry in data.history[context]:
        readline.add_history(entry)
    data.history_context = context
# Insert a transaction in an appropriate place in the file: str(transaction) -> int(index)
def insert_transaction(tx):
    """Insert record `tx` into the journal and make it the current record.

    With `data.sort_by_date` set, the record goes in front of the first
    dated record with a later date. Balance and price statements go in
    front of the first record of their own date. Otherwise, or when no such
    position exists, the record is appended. The chosen index is printed,
    stored in `data.txid` and passed to update_statement_refs().
    """
    position = None
    if data.sort_by_date:
        goes_first = tx[10:19] == ' balance ' or tx[10:17] == ' price '
        for index, record in enumerate(data.journal):
            # Ignore records that don't start with a date; an empty record
            # has no first character to inspect at all
            if not record or record[0] not in '0123456789':
                continue
            # Insert balance and price statements as the first transaction of the date
            if goes_first and record[0:10] == tx[0:10]:
                position = index
                break
            # Seek forward to first bigger date
            if record[0:10] > tx[0:10]:
                position = index
                break
    if position is None:
        # If not found, insert at the end of file
        position = len(data.journal)
        data.journal.append(tx)
    else:
        data.journal.insert(position, tx)
    print(f'\nTransaction added as record {position}\n\n{tx}')
    data.txid = position
    update_statement_refs(position, True)
# Date from string wrapper: str(date) -> date(date)
def strpdate(d):
    """Parse the string `d` according to `data.date_prompt` and return a datetime.date."""
    parsed = datetime.datetime.strptime(d, data.date_prompt)
    return parsed.date()
# Figure out the full date from a day-only input
def sliding_window(day):
    """Return the date nearest to `data.date` that falls on day-of-month `day`.

    Looks up to 32 days in each direction. A tie goes to the past date.
    Returns None when neither direction reaches such a day.
    """
    today = strpdate(data.date)
    if today.day == day:
        return today

    def walk(step):
        # Move one day at a time; report where the walk stopped and the
        # last loop offset (31 when the day was never reached)
        cursor = today
        for offset in range(32):
            cursor = cursor + step
            if cursor.day == day:
                break
        return cursor, offset

    one_day = datetime.timedelta(days=1)
    # Seek to the nearest same-date in the next month, mark the delta
    future, future_delta = walk(one_day)
    # Seek to the nearest same-date in the previous month, mark the delta
    past, past_delta = walk(-one_day)
    # Compare the deltas and pick the smallest
    if past_delta == future_delta == 31:
        return None
    if future_delta < past_delta:
        return future
    return past
# Figure out the full date from a day-month input.
def sliding_window_with_month(day, month):
    """Return the `day`/`month` date closest to `data.date`.

    The current, previous and next year are tried, in that order, and an
    earlier candidate wins a tie. Returns None when the combination is not
    a valid date in any of the three years.
    """
    today = strpdate(data.date)
    # Pick the selected date in the three nearest years
    candidates = []
    for year_offset in (0, -1, 1):
        try:
            candidates.append(datetime.date(today.year + year_offset, month, day))
        except ValueError:
            pass
    if not candidates:
        return None
    # Return the date which is closest to the present
    return min(candidates, key=lambda candidate: abs(today - candidate))
# Read and validate a date: str(default) -> str(date)
def read_date(default):
    """Prompt for a transaction date and return it as an ISO formatted string.

    default -- date string offered as the default, in `data.date_prompt` format

    Accepted input: nothing (take the default), `?` or `h` (format hint),
    `+N`/`-N` (days relative to the default), `today`, a day of month, a
    day/month pair separated by `.` or `/` (month first in the en_US
    locale), or a full date in `data.date_prompt` format. Invalid input
    repeats the prompt. With `data.date_preview` set, a changed date is
    first shown as the new default and has to be confirmed with an empty input.
    """
    # Unset completion
    data.vocab = ['today']
    readline.set_completer_delims('')
    set_history_context('date')
    # Get a valid date
    while True:
        reading = input(f'Enter transaction date (`h` or `?` for a format hint) [{default}]: ')
        # Accepting the default
        if reading == '':
            return default
        # Formatting hint
        if reading in ('?', 'h', '?h'):
            partial_prompt = '%m/%d' if locale.getlocale()[0] == 'en_US' else '%d/%m'
            print(f'\nAccepted formats: {data.date_prompt}, or %d, or {partial_prompt}, or +/-days, or "today"\n')
            continue
        date = strpdate(default)
        # Relative date
        if reading.startswith('+') or reading.startswith('-'):
            try:
                date += datetime.timedelta(int(reading))
            # OverflowError: the offset does not fit a timedelta, or the
            # result falls outside the range supported by datetime.date
            except (ValueError, OverflowError):
                continue
        # Explicitly today
        elif reading == 'today':
            date = datetime.date.today()
        # Partial notation (day only)
        elif len(reading) == 1 or len(reading) == 2:
            try:
                date = sliding_window(int(reading))
            except ValueError:
                continue
            if date is None:
                continue
        # Partial notation (day and month)
        elif len(reading) <= 5 and ('.' in reading or '/' in reading):
            separator = '.' if '.' in reading else '/'
            try:
                day, month = reading.split(separator)
                # American order with month first
                if locale.getlocale()[0] == 'en_US':
                    month, day = day, month
                date = sliding_window_with_month(int(day), int(month))
            except ValueError:
                continue
            if date is None:
                continue
        # Absolute date
        else:
            try:
                date = strpdate(reading)
            except ValueError:
                continue
        date = date.isoformat()
        # Show a changed date as the new default first, so it can be reviewed
        if default != date and data.date_preview:
            default = date
            continue
        print(f'\nAdding new transaction at {date}.\n')
        return date
# Read a description -> str(description)
def read_description():
    """Prompt for a transaction description.

    Surrounding spaces and double quotes are stripped from the input. An
    empty input reuses `data.description`; when that is empty as well, None
    is returned. A non-empty input is remembered in `data.description`.
    """
    # Set completion
    data.vocab = data.defaults
    readline.set_completer_delims('')
    set_history_context('description')
    entered = input(f'Enter transaction description [{data.description}]: ').strip(' "')
    if entered != '':
        data.description = entered
        return entered
    # Fall back to the previous description, if there is one
    return data.description if data.description != '' else None
# Read a valid account name
def read_account():
    """Prompt for an account name; return it, or None on empty input.

    An account missing from `data.accounts` is only accepted after
    confirmation. It then gets an `open` record dated `data.date` and is
    registered without a default currency. Declining repeats the prompt.
    """
    # Set completion
    data.vocab = data.accounts
    readline.set_completer_delims('')
    set_history_context('account')
    prompt = 'Enter an account name (tab to complete, enter to finish): '
    name = input(prompt)
    while name != '':
        if name in data.accounts:
            return name
        # Add new account option
        if confirm(f'The account `{name}` is not in the journal file. Add it? (Y/n) ', True):
            insert_transaction(f'{data.date} open {name}')
            print()
            data.accounts[name] = None
            return name
        name = input(prompt)
    return None
# Normalize amounts with unnecessary decimal places
# number, rounding precision -> string
def undecimal(number, precision=data.default_precision):
    """Format `number` as a string without unnecessary decimal places.

    number -- anything float() accepts
    precision -- decimal places to round to before formatting

    Whole numbers are returned without a fractional part, a single decimal
    is padded to two (`1.5` -> `1.50`), and at most eight decimals are
    kept. Negative zero is returned as `0`.
    Raises KeyboardInterrupt (after printing a message) when `number` is
    not a valid number.
    """
    try:
        number = round(float(number), precision)
    except ValueError:
        print('\nThis is not a valid number.')
        raise KeyboardInterrupt
    # The fixed-point form always contains a decimal point, so stripping
    # zeros can never eat into the integer part
    text = f'{number:.8f}'.rstrip('0')
    if text.endswith('.'):
        text = text[:-1]
    elif '.' in text and len(text.split('.')[1]) == 1:
        text += '0'
    # Negative zero, or a negative value too small to show, must not
    # come out as '-0'
    if text == '-0':
        text = '0'
    return text
# Calculate simple arithmetic expressions
def evaluate(expression):
    """Calculate a simple arithmetic expression and return the resulting number.

    Only digits, `.`, `+`, `-`, `*`, `/`, parentheses and `e` are allowed.
    The expression may not be longer than `data.max_expr_length` and may
    not use `**`.
    Raises SyntaxError, ValueError or an ArithmeticError subclass (after
    printing a message) when the expression is invalid or does not produce
    a number.
    """
    try:
        expression = str(expression)
        # Restrict to nothing but numbers and basic arithmetic operators and attempt to limit computation cost
        if len(expression) > data.max_expr_length:
            raise ValueError
        if '**' in expression:
            raise ValueError
        if any(char not in '0123456789.+-*/()e' for char in expression):
            raise ValueError
        try:
            # NOTE(security): eval() on user input. It is only reached after
            # the character whitelist above, and runs without builtins or names.
            result = eval(expression, {'__builtins__': {}}, {})
        except (NameError, TypeError, AttributeError) as exc:
            # Inputs such as `e`, `1(2)` or `(1).e` pass the whitelist but are
            # not arithmetic; report them the way callers already expect
            raise ValueError('not an arithmetic expression') from exc
        # `()` passes the whitelist but evaluates to a tuple
        if not isinstance(result, (int, float)):
            raise ValueError
        return result
    except (SyntaxError, ValueError, ArithmeticError):
        print('\nInvalid arithmetic expression.')
        raise
# Add previously unseen currency signs to the completion list
def detect_currencies(line):
    """Register currency signs found in a posting with the completion list.

    line -- a posting, either as a string or as a list of tokens

    The last token and, if present, the token in front of `@` are treated
    as currencies unless they are numbers. Every currency found becomes
    `data.last_currency` and is added to `data.currencies` if new.
    An empty line is ignored.
    """
    if isinstance(line, str):
        line = line.split()
    if not line:
        return
    candidates = [line[-1]]
    if '@' in line:
        candidates.append(line[line.index('@') - 1])
    for cur in candidates:
        # Skip anything that is a number: integers as well as decimals such
        # as `10.50`. Purely alphabetic tokens are kept even if float() would
        # accept them (`inf`, `nan`).
        try:
            float(cur)
        except ValueError:
            pass
        else:
            if not (isinstance(cur, str) and cur.isalpha()):
                continue
        data.last_currency = cur
        if cur not in data.currencies:
            data.currencies.append(cur)
# Read and normalize transaction amount
def read_amount(account):
    """Prompt for the amount of one posting and return it normalized.

    account -- account name (or label) shown in the prompt and used to look
        up the default currency and the currency constraints

    Empty input takes the outstanding balance in the default currency. A
    bare number gets the default currency appended. The default currency is
    the balancing currency, else the account's currency, else the last used
    one. The amount may be an arithmetic expression and may carry a
    conversion (`AMOUNT CUR @ RATE [CUR]`). `data.balance`,
    `data.balance_currency`, `data.last_currency`, `data.accounts[account]`
    and `data.amount_width` are updated along the way.
    Raises ValueError when the currency violates the account's constraints
    or when no currency is given and no default is known. Errors raised by
    evaluate() and deduce_commodity_price() propagate.
    """
    # Set completion
    data.vocab = data.currencies
    readline.set_completer_delims(' ')
    set_history_context('amount')
    # Read an amount, taking defaults into account (balancing currency, then default currency, then last used currency)
    default_currency = data.last_currency
    if account in data.accounts and data.accounts[account] is not None:
        default_currency = data.accounts[account]
    if data.balance_currency is not None:
        default_currency = data.balance_currency
    entered = input('Enter the amount for `%s` (including currency symbol) [%s %s]: ' % (account, undecimal(data.balance), default_currency)).replace(',', '')
    amount = [data.balance, default_currency] if entered == '' else entered.split()
    # Process currency sign
    if len(amount) == 1:
        if default_currency is None:
            # Nothing to fall back on; joining None into the result would
            # only crash later with a TypeError
            print('\nNo currency specified and no default currency is known.')
            raise ValueError
        amount.append(default_currency)
        data.accounts[account] = data.last_currency = default_currency
    else:
        if account in data.constraints and amount[1] not in data.constraints[account]:
            print('\nThis account is constrained to: {}'.format(', '.join(data.constraints[account])))
            raise ValueError
        # A conversion without a currency gets the last used one
        if len(amount) == 4:
            amount.append(data.last_currency)
        if None in amount:
            print('\nNo currency specified and no default currency is known.')
            raise ValueError
        detect_currencies(amount)
        data.accounts[account] = amount[1]
        data.last_currency = amount[-1]
    # Normalize numbers and process conversion rate
    amount[0] = undecimal(evaluate(amount[0]))
    whole_part = amount[0].split('.')[0] if '.' in amount[0] else amount[0]
    data.amount_width = max(data.amount_width, len(whole_part.strip('-')) + 3)
    if '@' in amount:
        amount[-2] = undecimal(evaluate(amount[-2]), 3)
        data.balance -= float(amount[0]) * float(amount[-2])
    else:
        data.balance -= float(amount[0])
    if amount[-1] in data.precisions:
        data.balance = round(data.balance, data.precisions[amount[-1]])
    deduce_commodity_price(amount)
    data.balance = cast_number(data.balance)
    data.balance_currency = amount[-1]
    # Finalize and return
    return ' '.join(amount)
# Read a currency sign
def read_currency():
    """Prompt for a currency symbol, completing from the known currencies, and return the input as typed."""
    # Set completion
    data.vocab = data.currencies
    readline.set_completer_delims('')
    set_history_context('currency')
    return input('Enter currency symbol: ')
# Figure out the implied price of a currency
def deduce_commodity_price(line):
    """Record the price implied by a price statement or a converted posting.

    line -- a journal line, either as a string or as a list of tokens

    Nothing happens unless `data.quote_currency` is set and the line is a
    `price` statement or contains an `@` conversion. The rate is stored in
    `data.prices` when one side of the conversion is the quote currency.
    Raises ValueError on a malformed conversion or price statement.
    """
    if data.quote_currency == '':
        return
    if isinstance(line, str):
        line = condense(line)
    # Slice comparison: a line shorter than two tokens simply doesn't match
    if line[1:2] == ['price']:
        base_currency = line[2]
        rate, currency = line[3:5]
    elif '@' in line:
        if len(line) < 5:
            print('\nInvalid exchange rate declaration.')
            raise ValueError
        p = line.index('@')
        base_currency = line[p - 1]
        rate, currency = line[p + 1:p + 3]
    else:
        return
    rate = float(rate)
    # base_currency @ rate [currency] <= quote_currency
    if currency == data.quote_currency:
        data.prices[base_currency] = rate
    # quote_currency => [base_currency] @ rate currency
    elif base_currency == data.quote_currency:
        # A zero rate cannot be inverted and carries no price information
        if rate != 0:
            data.prices[currency] = 1 / rate
# Toggle transaction flag
def toggle_flag():
    """Flip the flag of the current record between ` * ` and ` ! ` and print the record.

    Every occurrence of the flag found in the record is replaced. Records
    containing neither flag are left alone.
    """
    if data.txid < 0:
        print('\nNothing to flag, no current record.')
        return
    record = data.journal[data.txid]
    # (flag to look for, its replacement, whether the record ends up flagged)
    for present, swapped, now_flagged in ((' * ', ' ! ', True), (' ! ', ' * ', False)):
        if present in record:
            data.journal[data.txid] = record.replace(present, swapped)
            flagged = now_flagged
            break
    else:
        print('\nRecord type does not support flags.')
        return
    print('\nTransaction flag %sset.\n' % ('un' if not flagged else ''))
    print(data.journal[data.txid])
# Toggle the flags on individual transaction legs
def toggle_flag_with_legs(account=None):
    """Toggle the flag of the current record together with its leg flags.

    account -- account whose legs get flagged; asked for when omitted

    For a record carrying ` * `, every occurrence of `account` is prefixed
    with `! `. For any other record, `! ` is removed from the start of all
    legs. The record flag itself is then flipped by toggle_flag().
    """
    if data.txid < 0:
        # Without this guard index -1 would silently edit the last record
        print('\nNothing to flag, no current record.')
        return
    record = data.journal[data.txid]
    if ' * ' not in record and ' ! ' not in record:
        print('\nRecord type does not support flags.')
        return
    if ' * ' in record:
        if account is None:
            print('\nFlag which transaction legs?')
            account = read_account()
            if account is None:
                print('\nFlagging cancelled.')
                return
        # The second replace undoes the doubling on legs that were flagged already
        data.journal[data.txid] = record.replace(account, '! %s' % account).replace('! !', '!')
    else:
        # Replace any type/combination of whitespace used for indentation, followed by "! ", with just the leading whitespace
        data.journal[data.txid] = re.sub('\n(\\s+)! ', '\n\\1', record)
    toggle_flag()
# Add or remove transaction tags
def add_remove_tag():
    """Prompt for a tag and toggle it on the first line of the current record.

    An empty input reuses `data.tag`; any other input becomes the new
    `data.tag`. A tag already on the line is removed, otherwise it is
    appended. Single-line records cannot be tagged. Quoted strings on the
    line are kept intact. Sets `data.eof` when the input is closed.
    """
    if data.txid < 0:
        print('\nNothing to tag, no current record.')
        return
    # Set completion
    data.vocab = data.tags
    readline.set_completer_delims('')
    set_history_context('tag')
    # Make sure the transaction can be tagged
    tx = data.journal[data.txid].split('\n')
    if len(tx) == 1:
        print('\nCannot add tags to special transactions.')
        return
    # Get and normalize a tag
    try:
        _tag = input(f'Enter a tag [{data.tag}]: ')
    except KeyboardInterrupt:
        print('\nTagging cancelled.')
        return
    except EOFError:
        data.eof = True
        return
    _tag = _tag.strip('# ')
    if _tag == '':
        _tag = data.tag
    else:
        data.tag = _tag
    _tag = '#' + _tag
    # Tokenize the first line, keeping each quoted string as one token. A
    # plain split() would collapse whitespace inside the description and
    # could mistake a `#word` within it for a tag.
    tx_tags = re.findall(r'(?<!\S)"(?:[^"\\]|\\.)*"(?!\S)|\S+', tx[0])
    # If the tag is already there, remove it instead
    if _tag in tx_tags:
        tx_tags.remove(_tag)
    # Otherwise, apply the tag
    else:
        tx_tags.append(_tag)
    tx[0] = ' '.join(tx_tags)
    data.journal[data.txid] = '\n'.join(tx)
    print()
    print(data.journal[data.txid])
# Seek to a specific record in the journal file
def seek_to_transaction(autoseek=None):
    """Make another journal record the current one and print it.

    autoseek -- target to seek to without asking; when None the user is
        prompted

    The target is an absolute record number, or an offset from the current
    record when it starts with `+` or `-`. Targets beyond either end of the
    journal are clamped to the first or last record. Sets `data.eof` when
    the input is closed.
    """
    # Early return
    if data.journal == []:
        print('\nThe journal is empty.')
        return
    final_index = len(data.journal) - 1
    if autoseek is not None:
        # Scripted seeking
        reading = str(autoseek)
    else:
        # Manual seeking input
        set_history_context('seek')
        # Unset completion
        data.vocab = []
        readline.set_completer_delims('')
        if data.txid >= 0:
            print(f'\nCurrently at record {data.txid}.')
            or_count = ', or +/-COUNT'
        else:
            print('\nNo current record.')
            or_count = ''
        try:
            reading = input(f'Enter record number (0-{final_index}{or_count}): ')
        except KeyboardInterrupt:
            print('\nSeeking cancelled.')
            return
        except EOFError:
            data.eof = True
            return
    try:
        target = int(reading)
    except ValueError:
        if reading != '':
            print('\nSeeking cancelled.')
            return
        # Empty input stays on the current record
        target = data.txid
    # Relative value
    if reading.startswith(('+', '-')):
        if data.txid == -1:
            print('\nNo current record.' if autoseek is not None else '\nSeeking cancelled.')
            return
        target += data.txid
    if target < 0:
        # Beginning of journal
        if data.txid == 0:
            print('\nYou are at the first record.')
            return
        target = 0
    elif target > final_index:
        # End of journal
        if data.txid == final_index:
            print('\nYou are at the last record.')
            return
        target = final_index
    # Change to the specified record
    print(f'\nRecord {target}:\n')
    print(data.journal[target])
    data.txid = target
# Return a list of txids which contain all of the passed strings
def tx_lookup(patterns, prune_txids=None):
    """Return the indexes of all journal records that contain every pattern.

    patterns -- one string or a list of strings, matched ignoring case
    prune_txids -- record indexes to leave out of the result
    """
    if isinstance(patterns, str):
        patterns = [patterns]
    needles = [pattern.lower() for pattern in patterns]
    # A set makes the exclusion test independent of the number of pruned records
    excluded = set(prune_txids) if prune_txids else set()
    results = []
    for index, record in enumerate(data.journal):
        if index in excluded:
            continue
        haystack = record.lower()
        # Use an AND keyword joining
        if all(needle in haystack for needle in needles):
            results.append(index)
    return results
# Find transactions that contain a string
def find_transactions(autoseek=None):
    """Look up records containing a search string, list them, optionally seek to one.

    autoseek -- None to ask for a search string and apply `data.lookup_seek`;
        otherwise the previous search string is reused and the value picks
        the record to seek to: True for the last result, False for the
        first, 1 or -1 for the next or previous result

    Returns the list of matching record indexes, or None when the lookup
    was cancelled, found nothing, or had no further result to move to.
    Sets `data.eof` when the input is closed.
    """
    # Use all possible completion contexts for lookups
    data.vocab = list(data.accounts) + list(data.defaults) + ['#' + x for x in data.tags]
    readline.set_completer_delims('')
    set_history_context('lookup')
    # Autoseeking without doing a lookup first
    if autoseek is not None and data.lookup_string == '':
        print('\nNo lookup pattern.')
        return
    # Get a lookup pattern or reuse the previous one if autoseeking
    if autoseek is None:
        try:
            reading = input('Enter a search string: ').strip('\n')
        except KeyboardInterrupt:
            print('\nLookup cancelled.')
            return
        except EOFError:
            data.eof = True
            return
    else:
        reading = data.lookup_string
    if reading == '':
        print('\nLookup cancelled.')
        return
    results = tx_lookup(reading)
    # Use default lookup-seek option
    if autoseek is None:
        autoseek = data.lookup_seek
    data.lookup_string = reading
    if results == []:
        print('\nNo results.')
        return
    # Check for autoseek
    if autoseek is True:
        # Last result
        _txid = results[-1]
    elif autoseek is False:
        # First result
        _txid = results[0]
    elif autoseek == 1 or autoseek == -1:
        # Next and previous results
        _txid = data.txid
        while _txid >= 0 and _txid < len(data.journal):
            _txid += autoseek
            if _txid in results:
                break
        if _txid == len(data.journal):
            if data.txid > results[-1]:
                print('\nNo more results.')
            else:
                print('\nYou are on the last result.')
            return
        elif _txid == -1:
            if data.txid < results[0]:
                print('\nNo more results.')
            else:
                print('\nYou are on the first result.')
            return
    else:
        # No seeking requested: the listing is anchored on the current record
        _txid = data.txid
    if autoseek is not None:
        data.txid = _txid
    # Show a slice of the results around the anchor record. When the anchor
    # is not among the results (a plain lookup, or no current record) there
    # is nothing to center on, so the whole list is shown.
    if _txid in results:
        idx = results.index(_txid)
        start = max(0, idx - data.context)
        end = min(len(results), idx + data.context + 1)
        shown = results[start:end]
    else:
        shown = results
    # Print the transaction list; an interrupt only cuts the listing short
    try:
        for tx in shown:
            print_transaction(tx)
    except KeyboardInterrupt:
        pass
    # Seek to the proper record
    if autoseek is not None:
        seek_to_transaction(_txid)
    return results
# Get default accounts for a transaction
def get_default_accounts(description):
    """Return a copy of the accounts remembered for `description`, or an empty list.

    The remembered accounts are printed first. Whether they are used is
    decided by `data.use_defaults`, or by asking when that option is None.
    """
    if description not in data.defaults:
        return []
    remembered = data.defaults[description]
    print('Previously used accounts for this transaction:')
    for _account in remembered:
        print(f'\t{_account}')
    print()
    if data.use_defaults is None:
        accepted = confirm('Use these accounts? (Y/n) ', True)
    else:
        accepted = data.use_defaults
    if accepted:
        return remembered.copy()
    return []
# Enter a normal transaction
def read_normal_transaction(description=None):
    """Interactively enter a regular transaction and insert it into the journal.

    description -- transaction description; asked for when omitted

    Accounts come from the defaults remembered for the description first,
    then from the account prompt, until an empty account ends the entry.
    Legs with a zero amount are dropped. The accounts used become the new
    defaults for the description.
    Raises KeyboardInterrupt when no description is given or no leg remains.
    """
    data.balance = 0
    data.balance_currency = None
    if not data.auto_new:
        data.date = read_date(data.date)
    if description is None:
        description = read_description()
        if description is None:
            raise KeyboardInterrupt
    pieces = [f'{data.date} * "{description}"\n']
    legs = []
    pending = get_default_accounts(description)
    while True:
        # Use defaults, if available, then proceed to manual account input
        account = pending.pop(0) if pending else read_account()
        if account is None:
            if data.balance != 0 and not confirm('\nThe transaction is unbalanced! Really finish? (y/N) ', False):
                continue
            break
        legs.append(account)
        # Retry amount entry until valid input is received
        amount = None
        while amount is None:
            try:
                amount = read_amount(account)
            except (SyntaxError, ValueError, IndexError, ArithmeticError):
                pass
        # Zero amounts are meaningless in this context
        if amount.startswith('0 '):
            print(f'Removing zero-amount `{account}` from transaction.')
            legs.pop()
        else:
            pieces.append(f'{data.indentation}{account} {amount}\n')
    # The transaction is empty, no money moved anywhere
    if legs == []:
        raise KeyboardInterrupt
    output = ''.join(pieces)
    # Update defaults and insert the transaction
    data.defaults[description] = legs
    insert_transaction(output.strip())
    # Auto-flag if appropriate: only the first matching account is acted upon
    for _account in data.auto_flag:
        occurrences = output.count(_account)
        if occurrences > 1:
            toggle_flag_with_legs(_account)
            break
        if occurrences == 1:
            toggle_flag()
            break
# Read a balance assertion transaction
def read_balance_transaction():
    """Interactively enter a balance assertion and insert it into the journal.

    The account comes from the remembered `__balance__` default or from the
    account prompt. After insertion, the asserted amount is compared with
    the balance calculated from the records before the assertion date, and
    a warning is printed on a mismatch.
    Raises KeyboardInterrupt when no account is entered.
    """
    print('\nAdding a new balance assertion.')
    data.date = read_date(data.date)
    defaults = get_default_accounts('__balance__')
    if defaults == []:
        account = read_account()
        # An empty answer cancels the entry; it must not be remembered as a
        # default or written to the journal as the account `None`
        if account is None:
            raise KeyboardInterrupt
        data.defaults['__balance__'] = [account]
    else:
        account = defaults[0]
    data.balance = 0
    amount = read_amount(account)
    output = f'{data.date} balance {account} {amount}'
    number, cur = amount.split()[:2]
    balance = calculate_balance(account, before=data.date)[cur]
    insert_transaction(output)
    # Warn about mismatching balances
    delta = float(number) - balance
    difference = 'missing' if delta < 0 else 'extra'
    if delta != 0:
        print('\nWarning: The balance does not match ({expected} {cur} expected, {delta} {cur} {difference}).'.format(expected=undecimal(balance), difference=difference, delta=undecimal(abs(delta)), cur=cur))
# Read a pad statement
def read_pad_transaction():
    """Interactively enter a pad statement and insert it into the journal.

    The two accounts come from the remembered `__pad__` default or from two
    account prompts.
    Raises KeyboardInterrupt when either account is left empty.
    """
    print('\nAdding a new pad statement.')
    data.date = read_date(data.date)
    # Print a helpful hint
    if '__pad__' not in data.defaults:
        print('\nEnter the source and destination accounts.')
    defaults = get_default_accounts('__pad__')
    if defaults == []:
        account1 = read_account()
        # An empty answer cancels the entry instead of producing a
        # statement about the account `None`
        if account1 is None:
            raise KeyboardInterrupt
        account2 = read_account()
        if account2 is None:
            raise KeyboardInterrupt
        data.defaults['__pad__'] = [account1, account2]
    else:
        account1, account2 = defaults
    output = f'{data.date} pad {account1} {account2}'
    insert_transaction(output)
# Read a note statement
def read_note_transaction():
    """Interactively enter a note statement and insert it into the journal.

    The account comes from the remembered `__note__` default or from the
    account prompt. Surrounding spaces and double quotes are stripped from
    the note text.
    Raises KeyboardInterrupt when no account is entered.
    """
    print('\nAdding a new note statement.')
    data.date = read_date(data.date)
    defaults = get_default_accounts('__note__')
    if defaults == []:
        account = read_account()
        # An empty answer cancels the entry instead of being remembered
        # and written out as the account `None`
        if account is None:
            raise KeyboardInterrupt
        data.defaults['__note__'] = [account]
    else:
        account = defaults[0]
    # Unset completion. This has to happen right before the note prompt:
    # the date and account prompts above install their own completion and history.
    data.vocab = []
    readline.set_completer_delims('')
    set_history_context('note')
    description = input('Enter the note text: ').strip(' "')
    output = f'{data.date} note {account} "{description}"'
    insert_transaction(output)
# Read a price statement
def read_price_statement():
    """Interactively enter a price statement and insert it into the journal.

    Raises KeyboardInterrupt when no currency is entered or the quote is
    not a plain `AMOUNT CURRENCY` pair.
    """
    # Unset completion
    data.vocab = []
    readline.set_completer_delims('')
    set_history_context('currency')
    print('\nAdding a new price statement.')
    data.date = read_date(data.date)
    currency = read_currency().strip()
    # A price needs a currency to refer to
    if currency == '':
        raise KeyboardInterrupt
    try:
        rate, quote_currency = read_amount('the quote').split()
    # The same set of input errors that the transaction entry retries on
    except (SyntaxError, ValueError, IndexError, ArithmeticError):
        raise KeyboardInterrupt
    tx = f'{data.date} price {currency} {rate} {quote_currency}'
    insert_transaction(tx)
# Remove a transaction
def remove_transaction(silent_drop):
    """Remove the current record from the journal.

    silent_drop -- when true the record is discarded without a message;
        otherwise it is moved to `data.restore` and the removal is reported

    Statement references are adjusted, and the current record index is
    moved back by one when it would point past the end of the journal.
    """
    if data.txid < 0:
        print('\nNothing to remove, no current record.')
        return
    removed = data.journal.pop(data.txid)
    if not silent_drop:
        data.restore.append(removed)
        print(f'\nRemoved record {data.txid}')
    if data.txid in data.statement:
        data.statement.remove(data.txid)
    update_statement_refs(data.txid, False)
    # The removed record was the last one
    if data.txid >= len(data.journal):
        data.txid -= 1
# Edit the current transaction with an external editor
def edit_transaction():
    """Edit the current record in the program named by $EDITOR.

    The record is written to the temporary file, the editor is run on it,
    and the file is read back. A changed record replaces the original; when
    its date (first 10 characters) changed it is re-inserted at the
    appropriate place. An emptied file leaves the record untouched. The
    temporary file is always removed.
    """
    import shlex
    editor = os.environ.get('EDITOR', '')
    if editor.strip() == '':
        print('\nThe $EDITOR environmental variable is not set, editing unavailable.')
        return
    if data.txid < 0:
        print('\nNothing to edit, no current record.')
        return
    original = data.journal[data.txid]
    try:
        # Save the transaction in a temp file
        with open(tempfile, 'w') as tmp_file:
            tmp_file.write(original)
        try:
            # $EDITOR may carry arguments of its own (e.g. `emacs -nw`)
            subprocess.call(shlex.split(editor) + [tempfile])
        except Exception as exc:
            print('\nCould not start the editor: %s\n' % exc)
        # Reload the transaction from the file
        with open(tempfile, 'r') as tmp_file:
            _tx = tmp_file.read().strip('\n')
    finally:
        if os.path.exists(tempfile):
            os.unlink(tempfile)
    if _tx == original:
        print('Nothing changed.')
    elif _tx.strip() == '':
        # An empty string is not a usable journal record
        print('The record was left empty, nothing changed.')
    else:
        # Update the record
        if original[0:10] != _tx[0:10]:
            # Reinsert the transaction where appropriate if the date changed...
            remove_transaction(True)
            insert_transaction(_tx)
        else:
            # ...Update in-place otherwise
            data.journal[data.txid] = _tx
        print('Record updated.')
# Print a transaction in short format
def print_transaction(tx, tx_amount=None):
    """Print a one-line summary of journal record number `tx`.

    tx -- index of the record in `data.journal`
    tx_amount -- amount tokens to show; when omitted, the amount on the
        first leg of a flagged transaction is used, and nothing for any
        other record

    The line holds the record number (or a `>` marker for the current
    record), the first line of the record cut to the terminal width, and
    the amount. With `data.colors` set, every other line is printed in bold.
    """
    tx_string = data.journal[tx].split('\n')
    # Display the amount on the first leg in the preview unless given an explicit amount
    if tx_amount is None:
        try:
            # Don't bother figuring out amounts if it's not a standard transaction
            if '!' not in tx_string[0] and '*' not in tx_string[0]:
                raise IndexError
            first_account = condense(tx_string[1])
            if first_account[0] == '!':
                del first_account[0]
            tx_amount = first_account[1:]
        except IndexError:
            tx_amount = ''
    if tx_amount != '':
        # Extract number from amount
        try:
            number = tx_amount[0]
        except IndexError:
            number = ''
        remainder = ' '.join(tx_amount[1:])
        # Pad integer amounts
        if '.' not in number:
            number += ' '
        # Pad positive amounts
        if not number.startswith('-'):
            number = ' ' + number
        # Justify amount
        tx_amount = ' '.join([number.rjust(data.amount_width), remainder[:3]])
    # Output may not be attached to a terminal (pipe, redirect); fall back
    # to a conventional width instead of failing with OSError
    try:
        columns = os.get_terminal_size()[0]
    except OSError:
        columns = 80
    # Trim and pad description; a negative width would cut from the wrong end
    txid_width = len(str(len(data.journal)))
    desc_width = max(columns - txid_width - len(tx_amount) - 5, 0)
    description = tx_string[0][:desc_width].ljust(desc_width)
    # Mark current transaction
    tx_number = '>' * (txid_width + 2) if tx == data.txid else '[{}]'.format(tx).ljust(txid_width + 2)
    bold = '%s' if not data.colors or data.last_bold else '\33[1m%s\33[0m'
    data.last_bold = not data.last_bold
    print(bold % '%s %s %s' % (tx_number, description, tx_amount))
# Print the context to the current transaction (default: +-10 transactions)
def print_journal():
    """Print the current record together with `data.context` records on either side of it."""
    if data.txid == -1:
        print('\nNo current record.')
        return
    start = max(data.txid - data.context, 0)
    # The end of a range is exclusive: + 1 so that as many records are
    # shown after the current one as before it
    end = min(data.txid + data.context + 1, len(data.journal))
    data.last_bold = True
    for tx in range(start, end):
        print_transaction(tx)
# Attempt to deduce an implied amount: int txid -> bool transaction is correct
def deduce_implied_amount(txid):
    """Fill in the amount on the one leg of record `txid` that has none.

    Only lines starting with a known account (optionally behind a `!` flag)
    are treated as legs. The missing amount is the negated sum of the other
    legs, with conversions multiplied out.

    Returns True when the record needs no correction or was corrected, and
    also when it mixes currencies (a warning is printed, nothing changes).
    Returns False when more than one leg lacks an amount or the missing
    amount works out to zero.
    """
    _tx = data.journal[txid].split('\n')
    lineid = -1
    amount = 0
    cur = ''
    for lineno, line in enumerate(_tx):
        line = condense(line)
        # Blank lines have no tokens to look at
        if not line:
            continue
        # Ignore anything that isn't a transaction leg
        if line[0] == '!':
            del(line[0])
            # A lone flag with nothing behind it
            if not line:
                continue
        if line[0] not in data.accounts:
            continue
        # Find the leg that needs to be deduced
        if len(line) == 1:
            if lineid != -1:
                print('\nError: More than one implied amount in record %s.' % txid)
                return False
            lineid = lineno
        else:
            # FIXME multiple currencies are not supported
            if cur != line[-1] and cur != '':
                print('\nWarning: Multiple transactional currencies in record %s.' % txid)
                return True
            cur = line[-1]
            if '@' in line and len(line) == 5:
                line[-2] = undecimal(line[-2], 3)
                amount -= float(line[1]) * float(line[-2])
            else:
                amount -= float(line[1])
            amount = cast_number(amount)
    # The record is perfectly fine as-is
    if lineid == -1:
        return True
    # Something needs to be done
    if amount == 0:
        print('\nCould not figure out implied amount in record %s.' % txid)
        return False
    _tx[lineid] = _tx[lineid] + ' %s %s' % (undecimal(amount), cur)
    # Update the record
    data.journal[txid] = '\n'.join(_tx)
    print('Implied amounts corrected in record %s.' % txid)
    return True
# Calculate the balance of an account
def calculate_balance(account, txlist=None, statement_verification_quirks=False, before=None):
    """Calculate the balance of `account`, separately for every known currency.

    account -- account name to tally
    txlist -- journal record indexes to consider; defaults to the whole
        journal, or to the records dated before `before` when that is given
    statement_verification_quirks -- when true, a record that has a leg
        flagged with `!` contributes the amount of its first flagged leg
        instead of the amounts of the legs of `account`
    before -- date string; only used when `txlist` is None

    Returns a dict mapping every currency in `data.currencies` to its
    balance. A balance statement for the account replaces the running
    balance of its currency.
    """
    # If no list of transactions to consider is given, default to the entire journal
    if txlist is None:
        if before is None:
            txlist = list(range(len(data.journal)))
        # If end date is given, summarize only records before it
        else:
            txlist = []
            for txid, tx in enumerate(data.journal):
                # Plain string comparison of the first 10 characters of the record
                if tx[:10] < before:
                    txlist.append(txid)
    # Every known currency starts out at zero
    amounts = {}
    for currency in data.currencies:
        amounts[currency] = 0
    # Go through the transaction list
    for tx in txlist:
        # Ignore comments
        if data.journal[tx].startswith(';'):
            continue
        # See if it needs corrections; records that cannot be corrected are skipped
        if account in data.journal[tx]:
            if not deduce_implied_amount(tx):
                continue
        lines = data.journal[tx].split('\n')
        for _line in lines:
            # NOTE(review): len(_line) is the length of the line in characters
            # here, so this can only match an account name shorter than three
            # characters - confirm the intent
            if account in _line and len(_line) < 3:
                if not deduce_implied_amount(tx):
                    continue
        # If the record is a balance statement, use that
        line = condense(lines[0])
        if len(line) == 5 and line[1] == 'balance' and line[2] == account:
            amounts[line[4]] = cast_number(line[3])
            continue
        # If statement verification, look for the first flagged leg
        if statement_verification_quirks:
            flagid = -1
            for _line in lines[1:]:
                line = condense(_line)
                if line[0] == '!':
                    flagid = lines.index(_line)
                    break
            if flagid > -1:
                # `line` still holds the tokens of the flagged leg, flag included
                if len(line) < 3:
                    print('Missing amount ignored in record %s.' % tx)
                    continue
                # Drop the flag token, leaving: account, number, currency, ...
                line = condense(data.journal[tx].split('\n')[flagid])[1:]
                amounts[line[2]] = cast_number(amounts[line[2]] + float(line[1]))
                continue
        # Else, just add the amount to the balance
        for _line in lines[1:]:
            line = condense(_line)
            if line[0] == '!':
                del(line[0])
            if line[0] == account:
                if len(line) < 3:
                    print('Missing amount ignored in record %s.' % tx)
                    continue
                # Pick up conversion rates along the way
                deduce_commodity_price(line)
                amounts[line[2]] = cast_number(amounts[line[2]] + float(line[1]))
    return amounts
# Count and print the balances for an account
def print_account_balances():
    """Ask for an account and print its balance in every currency.

    The balances are recalculated and remembered in `data.known_balances`.
    A currency whose balance differs from the previously remembered one is
    shown together with the change. With `data.quote_currency` set, each
    balance is also shown converted at the known price. Currencies with a
    zero, unchanged balance are left out. Sets `data.eof` when the input
    is closed.
    """
    try:
        account = read_account()
        if account is None:
            raise KeyboardInterrupt
    except KeyboardInterrupt:
        print('\nCancelled.')
        return
    except EOFError:
        data.eof = True
        return
    # Previous snapshot, used only to report what changed since last time
    previous = data.known_balances[account].copy() if account in data.known_balances else None
    current = calculate_balance(account)
    data.known_balances[account] = current
    if previous is None:
        previous = current.copy()
    print('\nBalances for %s:' % account)
    output = ''
    for currency in data.currencies:
        value = ''
        if data.quote_currency != '' and currency != data.quote_currency:
            if currency not in data.prices:
                value = ' [Unknown conversion rate]'
            else:
                # Convert the balance being displayed, not the previous
                # snapshot, which may be stale or lack this currency
                value = ' [%s %s @ %s]' % (undecimal(current[currency] * data.prices[currency]), data.quote_currency, undecimal(data.prices[currency], 3))
        difference = current[currency] if currency not in previous else current[currency] - previous[currency]
        if difference == 0:
            if current[currency] != 0:
                output += '%s %s%s\n' % (undecimal(current[currency]), currency, value)
        else:
            difference = '%s%s' % ('+' if difference > 0 else '', undecimal(difference))
            output += '%s %s (%s %s)%s\n' % (undecimal(current[currency]), currency, difference, currency, value)
    if output == '':
        output = '(nothing)'
    print(output)
# Estimate the balance of the next statement
def estimate_statement_amount(account=None):
    """Estimate the amount of the next statement for an account.

    account -- account to estimate for; asked for when omitted

    The estimate is built from the records that contain both the account
    and `data.statement_pattern`. When the account was asked for, all
    non-zero amounts are printed and nothing is returned. When it was
    passed in, the first non-zero amount is returned as an
    `AMOUNT CURRENCY` string, or None when there is none. Sets `data.eof`
    when the input is closed.
    """
    interactive = account is None
    if interactive:
        try:
            account = read_account()
            if account is None:
                raise KeyboardInterrupt
        except KeyboardInterrupt:
            print('\nCancelled.')
            return
        except EOFError:
            data.eof = True
            return
    flagged = tx_lookup([account, data.statement_pattern])
    balances = calculate_balance(account, flagged, True)
    if interactive:
        print('Estimated amount of the next statement:')
    for currency, total in balances.items():
        if total == 0:
            continue
        line = '%s %s' % (undecimal(abs(total)), currency)
        if not interactive:
            return line
        print(line)
# Nudge txids in the statement following a journal modification
def update_statement_refs(txid, insertion):
    """Shift the record indexes held in `data.statement` after a journal change.

    txid -- index at which a record was inserted or removed
    insertion -- True after an insertion, False after a removal

    After an insertion, references at or past `txid` move up by one; after
    a removal, references past `txid` move down by one.
    """
    for position, ref in enumerate(data.statement):
        if insertion and ref >= txid:
            data.statement[position] = ref + 1
        elif not insertion and ref > txid:
            data.statement[position] = ref - 1
# Read the accounts related to a statement, or use the defaults
def read_statement_accounts():
    """Set data.transaction_account and data.funding_account for a statement.

    When both accounts are already known they are shown and, depending on
    data.use_defaults (None means ask), kept as they are. Otherwise both
    accounts are read interactively.

    Raises KeyboardInterrupt when either account is not provided, so that a
    None never ends up stored as an account name; EOFError from the prompts
    is passed through to the caller.
    """
    if data.transaction_account != '' and data.funding_account != '':
        print('\nPreviously used statement accounts:')
        print(' %s\n %s' % (data.transaction_account, data.funding_account))
        print()
        _use = confirm('Use these accounts? (Y/n) ', True) if data.use_defaults is None else data.use_defaults
        if _use:
            return
    print('\nEnter the account the statement refers to.')
    transaction_account = read_account()
    # read_account() may return None; treat it as a cancellation like the
    # other callers of read_account() do
    if transaction_account is None:
        raise KeyboardInterrupt
    print('\nEnter the account which funds the statement.')
    funding_account = read_account()
    if funding_account is None:
        raise KeyboardInterrupt
    # Store the pair only once both are known, so a cancelled entry cannot
    # leave a mismatched pair behind
    data.transaction_account = transaction_account
    data.funding_account = funding_account
# Verify a statement: interactively select journal records until their total
# matches a target amount, then optionally commit the result to the journal.
# Takes no arguments and returns None. Works on the statement state kept on
# `data` (statement, target, statement_currency, balance, last_currency) and
# sets data.eof when the input stream ends.
def verify_statement():
# If wasn't working on a statement before
if data.target == 0 or not confirm('Continue ongoing statement verification? (Y/n) ', True):
# Get some general information
try:
data.statement = []
read_statement_accounts()
print()
# NOTE(review): estimate_statement_amount() returns None when it finds no
# non-zero balance, and .split() would then raise an AttributeError that is
# not handled below - confirm whether that can happen here.
data.balance, data.last_currency = estimate_statement_amount(data.transaction_account).split()
data.balance = float(data.balance)
# Ask again for as long as read_amount() raises one of these errors
while True:
try:
amount = read_amount('the statement')
except (SyntaxError, ValueError, ArithmeticError):
continue
break
# The entered amount is split on whitespace into a number and a currency
data.target = cast_number(amount.split()[0])
# A target of zero is treated as a cancellation
if data.target == 0:
raise KeyboardInterrupt
data.statement_currency = amount.split()[1]
except KeyboardInterrupt:
print('\nCancelled.')
return
except EOFError:
data.eof = True
return
# Balances of Liabilities: and Income: accounts are multiplied by -1 before use
neg = -1 if data.transaction_account.startswith('Liabilities:') or data.transaction_account.startswith('Income:') else 1
set_history_context('txids')
# Selection loop: runs until the selected records add up to the target
while True:
# Tally the balance
balance = calculate_balance(data.transaction_account, data.statement, True)[data.statement_currency] * neg
print('\nTotal: %s %s (target: %s %s, difference: %s %s)' % (undecimal(balance), data.statement_currency, undecimal(data.target), data.statement_currency, undecimal(balance - data.target), data.statement_currency))
if balance == data.target:
break
# Look up the records containing the account and the statement pattern
# (data.statement is passed along; presumably those records are left out of
# the result - TODO confirm against tx_lookup)
transactions = tx_lookup([data.transaction_account, data.statement_pattern], data.statement)
# List matching records neatly justified
for title, txlist in [
['Statement Records', data.statement],
['Available Records', transactions]
]:
print('\n------- {} '.format(title).ljust(os.get_terminal_size()[0] + 1, '-'))
if txlist == []:
print(' (nothing)')
continue
# NOTE(review): last_bold is not read in this function; presumably it is
# used by print_transaction() - confirm
data.last_bold = True
for tx in txlist:
# Ignore commented out records
if data.journal[tx].startswith(';'):
continue
# Show each record together with its own sign-adjusted amount
_amount = undecimal(calculate_balance(data.transaction_account, [tx], True)[data.statement_currency] * neg)
print_transaction(tx, [_amount, data.statement_currency])
# Select records to add or remove
print('\nSelect records to add to the statement. Type a record number already on the statement to remove it.')
try:
reading = int(input('Enter a record number: ').strip())
# Only indices of existing journal records are accepted
if reading < 0 or reading > len(data.journal) - 1:
raise ValueError
except ValueError:
print('\nInvalid record number.')
continue
except KeyboardInterrupt:
# The statement state is left untouched so the verification can be continued later
print('\nPostponing statement verification.')
return
except EOFError:
data.eof = True
return
# Add the input to history (moved to the end if already present)
# NOTE(review): this updates the 'seek' history although the history context
# selected above is 'txids' - confirm this is intended
if str(reading) in data.history['seek']:
data.history['seek'].remove(str(reading))
data.history['seek'].append(str(reading))
# Add or remove records
if reading in data.statement:
data.statement.remove(reading)
elif reading in transactions:
data.statement.append(reading)
data.statement.sort()
else:
print('\nThis record is not relevant to the statement.')
continue
# Modify the journal
if confirm('Commit statement to journal? (Y/n) ', True):
try:
data.date = read_date(data.date)
desc = read_description()
except KeyboardInterrupt:
print('\nPostponing statement verification.')
return
except EOFError:
data.eof = True
return
print('\nSetting transaction flags.')
# A flagged leg is the account name preceded by '! '
lookup_string = '! %s' % data.transaction_account
for tx in data.statement:
# Unflag one of the legs first
if lookup_string in data.journal[tx]:
data.journal[tx] = data.journal[tx].replace(lookup_string, data.transaction_account, 1)
# And if it was the last one, unflag the transaction also
if lookup_string not in data.journal[tx]:
data.journal[tx] = data.journal[tx].replace(data.statement_pattern, data.statement_replacement)
# Add the statement transaction: the funding account receives the negated
# balance and the statement account the balance itself
output = '%s * "%s"\n%s%s %s %s\n%s%s %s %s' % (data.date, desc, data.indentation, data.funding_account, undecimal(balance * -1), data.statement_currency, data.indentation, data.transaction_account, undecimal(balance), data.statement_currency)
insert_transaction(output)
# Mark the verification as finished
data.target = 0
# Collect the new transactions destined for the external journal
def externalize_transactions():
    """Return the records of data.journal that are not present in data.saved.

    The result is a new list in journal order; neither data.journal nor
    data.saved is modified.
    """
    # Records are strings, so a set gives constant-time membership tests
    # instead of rescanning the saved list for every journal record
    saved = set(data.saved)
    return [_tx for _tx in data.journal if _tx not in saved]
# Duplicate the current transaction at another date
def duplicate_transaction():
    """Copy the current record (data.txid) into the journal.

    A record starting with a digit gets its first ten characters replaced
    by a newly read date and is handed to insert_transaction(); any other
    record is appended unchanged to the end of the journal. Prints a notice
    and does nothing when there is no current record. Exceptions raised
    while reading the date are passed through to the caller.
    """
    if data.txid == -1:
        print('\nNo current record.')
        return
    record = data.journal[data.txid]
    if record[0] in '0123456789':
        # Dated record: read the new date, then swap it in for the old one
        data.date = read_date(data.date)
        insert_transaction(data.date + data.journal[data.txid][10:])
        return
    # If the record does not start with a date, just copy it to the end of the file
    print('\nTransaction added as record %s\n\n%s' % (len(data.journal), record))
    data.journal.append(record)
# Print changes from saved journal
def print_journal_diff():
    """Print the records added to and removed from the journal.

    In external write mode the new records (see externalize_transactions())
    are compared against data.external_journal and only additions are shown;
    otherwise data.journal is compared against data.saved. Added records are
    prefixed with '+' and removed ones with '-' on every line, colorized when
    data.colors is set. Each distinct record is shown once, in the order it
    appears in its list, so the output is stable between runs.
    """
    if data.journal == data.saved:
        print('\nNo changes.')
        return
    endcolor = '\33[0m' if data.colors else ''
    if data.external_write:
        current = externalize_transactions()
        baseline = data.external_journal
    else:
        current = data.journal
        baseline = data.saved
    # Sets are used for membership tests only; iteration follows list order
    current_set = set(current)
    baseline_set = set(baseline)

    def show(record, marker, color):
        # Print one record preceded by a blank line, marking every line of it
        print()
        print('\n'.join(f'{color}{marker}{line}{endcolor}' for line in record.split('\n')))

    # Look for additions to the journal
    added_color = '\33[32m' if data.colors else ''
    for record in dict.fromkeys(current):
        if record not in baseline_set:
            show(record, '+', added_color)
    # Look for deletions from the journal (except for external writes)
    if data.external_write:
        return
    removed_color = '\33[31m' if data.colors else ''
    for record in dict.fromkeys(baseline):
        if record not in current_set:
            show(record, '-', removed_color)
# Process command line arguments
if len(sys.argv) < 2:
    print(usage)
    sys.exit(1)
elif sys.argv[1] == '-h' or sys.argv[1] == '--help':
    print(usage)
    sys.exit()
elif '-n' in sys.argv:
    sys.argv.remove('-n')
    data.auto_new = True
    # `-n` given without a FILENAME leaves nothing to open: show the usage
    # instead of failing on the missing argument further down
    if len(sys.argv) < 2:
        print(usage)
        sys.exit(1)
# Read the journal file into memory and remember its modification time
data.file_name = sys.argv[1]
try:
    with open(data.file_name) as journal_file:
        tx_file = journal_file.read()
    data.mtime = os.stat(data.file_name).st_mtime
except FileNotFoundError:
    # A missing journal is handled like an empty one
    tx_file = ''
except Exception as error:
    print('Could not open the file: %s' % error)
    sys.exit(2)
# Scan the journal line by line, collecting option strings, commodity prices,
# account names, currencies, tags and the accounts used with each description.
# `description` is the description of the transaction being read ('' when not
# inside one); `lastdate` is the date of the last transaction header accepted
# by the ordering check below.
description = ''
lastdate = '0'
for full_line in tx_file.split('\n'):
line = full_line.strip()
# A blank line ends the current transaction
if line == '':
# Transaction came out empty
if description in data.defaults and data.defaults[description] == []:
del(data.defaults[description])
description = ''
continue
# Skip comments, but make use of the option strings
elif line.startswith(';'):
if line.startswith(';*bean-add* '):
# These flags mark that the next word is the argument of f, q, p or v
_auto_flag = False
_quote = False
_precision = False
_statement = False
# Option words: a/aa reuse accounts always/never, s no date sorting,
# w paranoid write, e external write, d no date preview, l/ll seek to
# last/first lookup result, c colors, b beancount accounts,
# f ACCOUNT auto-flag, q CURRENCY quote currency,
# v TRANSACTION,FUNDING statement accounts, p CURRENCY:DIGITS precision
for i in line.split()[1:]:
if i == 'a':
data.use_defaults = True
elif i == 'aa':
data.use_defaults = False
elif i == 's':
data.sort_by_date = False
elif i == 'w':
data.paranoid_write = True
elif i == 'e':
data.external_write = True
elif i == 'd':
data.date_preview = False
elif i == 'l':
data.lookup_seek = True
elif i == 'll':
data.lookup_seek = False
elif i == 'c':
data.colors = True
elif i == 'b':
data.use_beancount_accounts = True
elif i == 'f':
_auto_flag = True
elif _auto_flag:
data.auto_flag.append(i)
_auto_flag = False
elif i == 'q':
_quote = True
elif _quote:
data.quote_currency = i
_quote = False
elif i == 'v':
_statement = True
elif _statement:
try:
data.transaction_account, data.funding_account = i.split(',')
except ValueError:
print('Malformed option: v %s' % i)
_statement = False
elif i == 'p':
_precision = True
elif _precision:
try:
currency, digits = i.split(':')
data.precisions[currency] = int(digits)
except ValueError:
print('Malformed option: p %s' % i)
_precision = False
else:
print('Unrecognized option: %s' % i)
print('Loaded options from journal file.')
continue
# Process and cache commodity prices
_data = condense(line)
deduce_commodity_price(_data)
try:
# Attach transaction legs to previous description
# (a leading '!' word, i.e. a flagged leg, is dropped first)
if _data[0] == '!':
del _data[0]
if _data[0] in data.accounts:
# Detect indentation type
if data.indentation is None:
_match = re.match('\\s+', full_line)
if _match is not None:
data.indentation = _match.group()
# NOTE(review): raises KeyError when `description` has no entry in
# data.defaults (e.g. a leg outside of any transaction); only IndexError
# is caught below - confirm this cannot happen
data.defaults[description].append(_data[0])
# Detect involved currencies and amounts
if len(_data) > 1:
detect_currencies(_data)
data.accounts[_data[0]] = data.last_currency
# Track the widest integer part seen among the amounts
# NOTE(review): float() raises ValueError for a non-numeric second word,
# which is not caught below - confirm the input is always numeric here
_amount = str(int(float(_data[1])))
data.amount_width = max(data.amount_width, len(_amount))
# Add account name to account list
elif _data[1] == 'open':
data.accounts[_data[2]] = None
# A fourth word is a comma-separated currency list; the first entry
# becomes the default currency of the account
if len(_data) > 3:
data.constraints[_data[2]] = _data[3].split(',')
data.accounts[_data[2]] = data.constraints[_data[2]][0]
# Remove closed accounts from completion
elif _data[1] == 'close' and _data[2] in data.accounts:
del data.accounts[_data[2]]
# Cache command data
elif _data[1] == 'balance':
data.defaults['__balance__'] = [_data[2]]
elif _data[1] == 'pad':
data.defaults['__pad__'] = [_data[2], _data[3]]
# Record a description
elif _data[1] in '!*':
_date = _data[0][0:10]
# Dates are compared as strings
if data.sort_by_date and _date < lastdate:
print('Sorting by date disabled due to inconsistent ordering at %s.' % _date)
data.sort_by_date = False
else:
lastdate = _date
# The fifth character of the date is taken as the date delimiter
data.date_delimiter = _data[0][4]
# The description starts at offset 13 of the line (assumes a 10-character
# date, a space, a one-character flag and a space before it); anything
# from the first '#' on and surrounding quotes and spaces are dropped
description = line[13:].split('#')[0].strip('" ')
data.defaults[description] = []
# Cache any tags present
if '#' in line:
for word in _data:
if word.startswith('#'):
if not word[1:] in data.tags:
data.tags.append(word[1:])
data.tag = word[1:]
# Pass through malformed (or unrecognized) lines without stopping
except IndexError:
continue
# Optionally ask Beancount (bean-report) for the account list
if data.use_beancount_accounts:
    try:
        _report = subprocess.run(
            ['bean-report', data.file_name, 'accounts'],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        for _row in _report.stdout.decode('UTF-8').split('\n'):
            _fields = condense(_row)
            if len(_fields) == 2:
                # Two fields: make sure the account is known
                if _fields[0] not in data.accounts:
                    data.accounts[_fields[0]] = None
            elif len(_fields) == 3 and _fields[0] in data.accounts:
                # Three fields: drop the account from the known ones
                del data.accounts[_fields[0]]
    except Exception as error:
        print('Unable to use beancount to get the account list: %s' % error)
# Widen the amount column by three characters
data.amount_width += 3
# Prepare file for transaction insertion: records are the chunks of the file
# separated by blank lines, stripped of surrounding whitespace
data.journal = tx_file.split('\n\n')
data.journal = [i.strip() for i in data.journal]
# Initialize modification tracking
# (the snapshot is taken before the clean-up below, so that the comparison at
# the end detects whether any normalization took place)
data.saved = data.journal.copy()
# Remove empty records
while '' in data.journal:
data.journal.remove('')
# Move balance statements to the beginning of each day
# (price statements are handled the same way)
for i, jtx in enumerate(data.journal):
if jtx[0] in '0123456789' and (jtx[10:19] == ' balance ' or jtx[10:17] == ' price '):
_date = jtx[0:10]
_tx = i
while True:
# Seek backwards until the first non-comment transaction or another balance statement with an earlier date
# NOTE(review): _tx is not checked against the start of the list; once it
# drops below zero the lookups below index from the end of the journal (and
# can eventually raise IndexError) - confirm how a statement with nothing
# but same-day or undated records before it should be handled
_tx -= 1
if data.journal[_tx][0] not in '0123456789':
continue
if not data.journal[_tx].startswith(_date) or (data.journal[_tx][10:19] == ' balance ' or data.journal[_tx][10:17] == ' price '):
# Attempt to move the balance statement up
# (nothing to do when the record found is the immediate predecessor)
if _tx == i - 1:
break
# Re-insert right after the record found; the original copy has shifted
# down by one as a result and is deleted from there
data.journal.insert(_tx + 1, data.journal[i])
del(data.journal[i + 1])
break
# Notify that some trivial fixes were made
if data.journal != data.saved:
print('Automatic normalizations applied.')
# The normalized journal becomes the new baseline for change detection
data.saved = data.journal.copy()
# Set up the completer: apply the readline settings, then register the completion function
for _binding in (
    'tab: menu-complete',
    r'"\e[Z": menu-complete-backward',
    'set show-all-if-ambiguous on',
    'set menu-complete-display-prefix on',
):
    readline.parse_and_bind(_binding)
readline.set_completer(complete)
# Print some statistics about what was loaded from the journal
print('\n%s processed' % pluralize(len(data.journal), 'record'))
if len(data.accounts) > 0:
print('%s, %s, %s and %s loaded' % (
pluralize(len(data.accounts), 'account name'),
pluralize(len(data.defaults), 'unique description'),
pluralize(len(data.currencies), 'currency sign'),
pluralize(len(data.tags), 'tag')))
# No account names were found
else:
# Empty file
if tx_file == '':
print('File does not exist, or is empty.')
# Not empty, but un-useful file
else:
print('File contains no account definitions.')
# Declining the confirmation (the default answer) exits the program
if not confirm('Proceed with editing? (y/N) ', False):
sys.exit()
# Adapt today's date and the date prompt to the delimiter found in the journal
if data.date_delimiter != '-':
    print('\nWarning: Non-standard date delimiter.')
    data.date = data.date.replace('-', data.date_delimiter)
    data.date_prompt = data.date_delimiter.join(('%Y', '%m', '%d'))
# Fall back to the default indentation when none was detected, warn about any other
# NOTE(review): this default used to be described as "a single tab", but the
# literal is a single space - confirm which one is intended
if data.indentation is None:
    data.indentation = ' '
elif data.indentation != ' ':
    print('\nWarning: Non-standard indentation.')
# Make sure there is at least one currency and a current one
if not data.currencies:
    data.last_currency = 'USD'
    data.currencies.append('USD')
elif data.last_currency is None:
    data.last_currency = data.currencies[0]
# Main data entry loop
# Each iteration picks a command - either forced by pending state (unsaved
# changes in paranoid write mode, end of input, the -n switch) or read from
# the prompt - and dispatches it through the if/elif chain below.
if data.paranoid_write:
print('\nChanges are written to disk immediately.')
else:
print('\nChanges stay in memory until you [w]rite them out.')
print('Type `h` or `?` for a list of available commands.')
while True:
# Check journal modifications for paranoid write mode
if data.paranoid_write and not data.external_write and data.journal != data.saved:
cmd = 'w'
# In external write mode the new records are compared with those already written out
elif data.paranoid_write and data.external_write and externalize_transactions() != data.external_journal:
cmd = 'w'
# A previous command ran into the end of input: quit
elif data.eof:
cmd = 'q'
print()
data.eof = False
# Pass-through transaction entry
elif data.auto_new:
print('\nAdding a new transaction with today\'s date.\n')
cmd = 'n'
# Get a command
else:
# Unset completion
data.vocab = []
# Set history
set_history_context('command')
try:
cmd = input('\nEnter command (h for help) ').strip()
except (KeyboardInterrupt, EOFError):
print()
cmd = 'q'
# Write the journal
if cmd in ['w', 'wq']:
# Cache last-used statement accounts
# NOTE(review): data.journal[0] raises IndexError when the journal is empty - confirm
if data.transaction_account != '' and data.funding_account != '':
if data.journal[0].startswith(';*bean-add* '):
# Replace cached statement accounts
if ' v' in data.journal[0]:
chunks = data.journal[0].split(' ')
chunks[chunks.index('v') + 1] = f'{data.transaction_account},{data.funding_account}'
data.journal[0] = ' '.join(chunks)
# Append cached statement accounts
else:
data.journal[0] += f' v {data.transaction_account},{data.funding_account}'
# Create empty option string with cached statement accounts
else:
data.journal.insert(0, f';*bean-add* v {data.transaction_account},{data.funding_account}')
# Detect modifications made to the file by something else since it was read or written
# NOTE(review): os.stat() raises here when the journal file does not exist
# yet, although a missing file is accepted when loading - confirm
if data.mtime != os.stat(data.file_name).st_mtime:
print('\nWARNING: The journal file was modified on disk after opening; The journal is most likely inconsistent.')
# In external write mode only the warning is printed
if not data.external_write and not confirm('Do you really want to overwrite it? (y/N) ', False):
continue
# External writes put only the new records into FILENAME.new
if data.external_write:
file_name = data.file_name + '.new'
data.external_journal = externalize_transactions()
journal = data.external_journal
else:
file_name = data.file_name
journal = data.journal
try:
with open(file_name, 'w') as source_file:
source_file.write('\n\n'.join(journal) + '\n')
print('\n%s written' % pluralize(len(journal), 'record'))
# NOTE(review): this stats data.file_name even when file_name + '.new' was the file written - confirm
data.mtime = os.stat(data.file_name).st_mtime
except Exception as exc:
print('\nCould not write the file: %s' % exc)
continue
else:
# Only a direct write updates the saved snapshot
if not data.external_write:
data.saved = data.journal.copy()
if cmd == 'wq':
sys.exit()
# Quit
elif cmd == 'q':
# Ask for confirmation when unsaved work would be lost
try:
if not data.external_write and data.journal != data.saved:
if not confirm('The journal file was modified. Do you really want to DESTROY ALL CHANGES and quit? (y/N) ', False):
continue
elif data.external_write:
journal = externalize_transactions()
# Look for a new record which has not been written to the external file
modified = False
for _tx in journal:
if _tx not in data.external_journal:
modified = True
break
if modified:
if not confirm('There are unsaved records in the external file buffer. Do you really want to DESTROY ALL CHANGES and quit? (y/N) ', False):
continue
elif data.target != 0:
if not confirm('Statement verification in progress. Do you really want to DESTROY ALL CHANGES and quit? (y/N) ', False):
continue
# Treat a second CTRL-C as a confirmation
except (KeyboardInterrupt, EOFError):
pass
sys.exit()
# Run bean-check
elif cmd == 'c':
# The in-memory journal is written to a temporary file for checking
with open(tempfile, 'w') as tmp_file:
tmp_file.write('\n\n'.join(data.journal) + '\n')
try:
subprocess.call(['bean-check', tempfile])
except Exception as exc:
print('\nCould not start the validator: %s' % exc)
else:
print('\nValidation finished.')
os.unlink(tempfile)
# New transaction
elif cmd == 'n':
try:
read_normal_transaction()
except KeyboardInterrupt:
print('\nTransaction entry cancelled.')
except EOFError:
data.eof = True
# Unset pass-through transaction entry
if data.auto_new:
data.auto_new = False
# New transaction sequence
elif cmd == 'nn':
# tx_count is the number of transactions completed so far
tx_count = 0
defaults = data.use_defaults
try:
read_normal_transaction()
# Set to auto-accept as much as possible
data.use_defaults = True
# Keep adding transactions with the same description until interrupted
while True:
tx_count += 1
print('\nAdding transaction #%s\n' % (tx_count + 1))
read_normal_transaction(data.description)
except KeyboardInterrupt:
print('\nTransaction entry cancelled.')
except EOFError:
data.eof = True
finally:
print('\n%s added.' % pluralize(tx_count, 'transaction'))
# Restore original behavior
data.use_defaults = defaults
# New balance assertion
elif cmd == 'B':
try:
read_balance_transaction()
except KeyboardInterrupt:
print('\nTransaction entry cancelled.')
except EOFError:
data.eof = True
# New padding transaction
elif cmd == 'P':
try:
read_pad_transaction()
except KeyboardInterrupt:
print('\nTransaction entry cancelled.')
except EOFError:
data.eof = True
# New note transaction
elif cmd == 'N':
try:
read_note_transaction()
except KeyboardInterrupt:
print('\nTransaction entry cancelled.')
except EOFError:
data.eof = True
# New price statement
elif cmd == 'R':
try:
read_price_statement()
except KeyboardInterrupt:
print('\nTransaction entry cancelled.')
except EOFError:
data.eof = True
# Toggle flag
elif cmd == 'f':
toggle_flag()
# Toggle deep flag
elif cmd == 'ff':
try:
toggle_flag_with_legs()
except KeyboardInterrupt:
print('\nCancelled.')
except EOFError:
data.eof = True
# Edit tags
elif cmd == 't':
add_remove_tag()
# Seeking
# NOTE(review): `sa` passes -1 and `sd` passes '+1', while the help text
# describes `sa` as forward and `sd` as backwards - confirm which is right
elif cmd == 's':
seek_to_transaction()
elif cmd == 'sa':
seek_to_transaction(-1)
elif cmd == 'sd':
seek_to_transaction('+1')
elif cmd == 'sq':
seek_to_transaction(0)
elif cmd == 'se':
seek_to_transaction(len(data.journal) - 1)
# Lookup
elif cmd == 'l':
find_transactions()
elif cmd == 'ld':
find_transactions(1)
elif cmd == 'la':
find_transactions(-1)
elif cmd == 'le':
find_transactions(True)
elif cmd == 'lq':
find_transactions(False)
# Remove record
elif cmd == 'r':
remove_transaction(False)
# Unremove record
elif cmd == 'u':
# The most recently stored record in the restore buffer is re-inserted first
if len(data.restore) > 0:
insert_transaction(data.restore.pop())
else:
print('\nNothing to undo, the restore buffer is empty.')
# Duplicate record
elif cmd == 'd':
try:
duplicate_transaction()
except KeyboardInterrupt:
print('\nCancelled.')
except EOFError:
data.eof = True
# External edit
elif cmd == 'e':
edit_transaction()
# Show context
elif cmd == 'j':
print_journal()
# Show changes
elif cmd == 'jj':
print_journal_diff()
# Tally account balances
elif cmd == 'b':
print_account_balances()
# Estimate statement amount
elif cmd == 'bv':
estimate_statement_amount()
# Begin statement verification
elif cmd == 'v':
# Disable marking of current record
txid_cache = data.txid
data.txid = -1
verify_statement()
# Restore marking of current record
data.txid = txid_cache
# Show options
# NOTE(review): the `od` line below spells "absoulute"; it is a user-facing
# message and is left untouched here
elif cmd == 'o':
print('Available option commands:')
print('oo write current options to the journal')
print('os sort transactions by date (%s)' % ('enabled' if data.sort_by_date else 'disabled'))
print('oa reuse last used accounts in new transactions (%s)' % ('ask' if data.use_defaults is None else 'always' if data.use_defaults else 'never'))
print('ow write journal after every change (%s)' % ('enabled' if data.paranoid_write else 'disabled'))
print('oe write changes to an external file (%s)' % ('enabled' if data.external_write else 'disabled'))
print('od preview non-absoulute dates during input (%s)' % ('enabled' if data.date_preview else 'disabled'))
print('of automatically flag transactions containing (%s)' % (None if data.auto_flag == [] else ', '.join(data.auto_flag)))
print('ol lookup result to seek to (%s)' % ('none' if data.lookup_seek is None else 'last' if data.lookup_seek else 'first'))
print('oc colorize output (%s)' % ('enabled' if data.colors else 'disabled'))
print('oq quote currency for all conversions (%s)' % ('none' if data.quote_currency == '' else data.quote_currency))
precision_string = ', '.join(['%s:%s' % (k, v) for k, v in data.precisions.items()])
print('op custom precision for commodities (%s)' % ('none' if precision_string == '' else precision_string))
print('ob use Beancount to load account names (%s)' % ('enabled' if data.use_beancount_accounts else 'disabled'))
# Option: sort by date
elif cmd == 'os':
data.sort_by_date = not data.sort_by_date
print('\nSorting by date is now %s.' % ('enabled' if data.sort_by_date else 'disabled'))
# Option: accept defaults
elif cmd == 'oa':
# Cycle through ask (None) -> always (True) -> never (False) -> ask
data.use_defaults = True if data.use_defaults is None else False if data.use_defaults else None
print('\nThe policy on reusing accounts for new transactions is now `%s`.' % ('ask' if data.use_defaults is None else 'always' if data.use_defaults else 'never'))
# Option: paranoid write
elif cmd == 'ow':
data.paranoid_write = not data.paranoid_write
print('\nWriting journal after every change is now %s.' % ('enabled' if data.paranoid_write else 'disabled'))
# Option: external write
elif cmd == 'oe':
data.external_write = not data.external_write
print('\nWriting changes to an external file is now %s.' % ('enabled' if data.external_write else 'disabled'))
# Option: date preview
elif cmd == 'od':
data.date_preview = not data.date_preview
print('\nPreviewing of non-absolute dates during input is now %s.' % ('enabled' if data.date_preview else 'disabled'))
# Option: output colorization
elif cmd == 'oc':
data.colors = not data.colors
print('\nColorizing of output is now %s.' % ('enabled' if data.colors else 'disabled'))
# Option: Beancount account loading
elif cmd == 'ob':
data.use_beancount_accounts = not data.use_beancount_accounts
print('\nUsing Beancount to load account names is now %s.' % ('enabled' if data.use_beancount_accounts else 'disabled'))
print('Save the options to the journal file with `oo` for it to take effect on the next journal load.')
# Option: auto-flagging of new records
elif cmd == 'of':
try:
_account = read_account()
# Not getting an account counts as a cancellation
if _account is None:
raise KeyboardInterrupt
# Entering an account which is already on the list removes it
if _account in data.auto_flag:
print('Removed from auto-flag list.')
data.auto_flag.remove(_account)
else:
print('Added to auto-flag list.')
data.auto_flag.append(_account)
data.auto_flag.sort()
print('\nAccounts to trigger automatic flagging:\n %s' % (None if data.auto_flag == [] else '\n '.join(data.auto_flag)))
except KeyboardInterrupt:
print('\nCancelled.')
except EOFError:
data.eof = True
# Option: auto-seek to a lookup result
elif cmd == 'ol':
# Cycle through none (None) -> last (True) -> first (False) -> none
data.lookup_seek = True if data.lookup_seek is None else False if data.lookup_seek else None
print('\nSeeking to lookup result is set to %s.' % ('none' if data.lookup_seek is None else 'last' if data.lookup_seek else 'first'))
# Option: default quote currency
elif cmd == 'oq':
try:
print('Enter new quote currency for all conversions, or leave blank to disable.')
data.quote_currency = read_currency()
print('\nQuote currency set to %s.' % ('None' if data.quote_currency == '' else data.quote_currency))
# Rebuild the price cache from the whole journal
# NOTE(review): the raw line is passed here, whereas the load-time scan
# passes condense(line) to deduce_commodity_price() - confirm
data.prices = {}
for tx in data.journal:
for line in tx.split('\n'):
deduce_commodity_price(line)
except KeyboardInterrupt:
print('\nCancelled.')
except EOFError:
data.eof = True
# Option: custom precisions
elif cmd == 'op':
try:
currency = read_currency()
precision = input(f'Enter precision [{data.default_precision}]: ')
if precision == '':
precision = data.default_precision
# A non-numeric answer raises ValueError, which is reported as a cancellation below
precision = int(precision)
# Only precisions differing from the default are stored
if precision == data.default_precision:
if currency in data.precisions:
del data.precisions[currency]
print(f'\nPrecision for {currency} reset to default.')
else:
data.precisions[currency] = precision
print(f'\nPrecision for {currency} set to {precision} decimals.')
except (KeyboardInterrupt, ValueError):
print('\nCancelled.')
except EOFError:
data.eof = True
# Save options to the journal
elif cmd == 'oo':
# Generate the option string
# (only settings differing from their defaults produce an option word)
optstring = ';*bean-add*'
optstring += '' if data.sort_by_date else ' s'
optstring += '' if data.use_defaults is None else ' a' if data.use_defaults else ' aa'
optstring += ' w' if data.paranoid_write else ''
optstring += ' e' if data.external_write else ''
optstring += '' if data.date_preview else ' d'
optstring += '' if data.lookup_seek is None else ' l' if data.lookup_seek else ' ll'
optstring += ' c' if data.colors else ''
optstring += ' q ' + data.quote_currency if data.quote_currency != '' else ''
optstring += ' b' if data.use_beancount_accounts else ''
for _account in data.auto_flag:
optstring += ' f %s' % _account
for currency, precision in data.precisions.items():
optstring += f' p {currency}:{precision}'
if data.transaction_account != '' and data.funding_account != '':
optstring += f' v {data.transaction_account},{data.funding_account}'
# Apply the option string
# NOTE(review): data.journal[0] raises IndexError when the journal is empty - confirm
if data.journal[0].startswith(';*bean-add* '):
data.journal[0] = optstring
else:
data.journal.insert(0, optstring)
# Delete all other option strings
# NOTE(review): list.remove() deletes the first equal record, which is
# record 0 if a later option string is identical to the new one - confirm
for _tx in data.journal[1:]:
if _tx.startswith(';*bean-add* '):
data.journal.remove(_tx)
print('\nOptions saved to the journal as record 0.')
# Help
# NOTE(review): the text lists `sa` as forward and `sd` as backwards, while
# the seek commands above pass -1 for `sa` and '+1' for `sd` - confirm
elif cmd == 'h' or cmd == '?':
print('''Available commands:
n new transaction
nn add a sequence of near-identical transactions
d duplicate the current transaction at another date
f toggle transaction flag
ff flag a transaction including its legs
t add or remove tags
s seek to another record
sa seek one record forward
sd seek one record backwards
sq seek to the first record
se seek to the last record
l look up records containing a string
ld go one lookup result forward
la go one lookup result backwards
le go to the last lookup result
lq go to the first lookup result
j show the context of the current record
jj show changes from saved journal file
b show the balance of an account
bv estimate the amount of the next statement
r remove the current record
u undo record removal
e edit the current record with your preferred editor
v verify a statement
B new balance assertion
P new pad statement
N new note statement
R new price statement
c run bean-check on the journal file
o view or change options
w write journal file
q quit
wq write journal file and quit immediately
h,? show this help message''')
# Repeat the prompt hint
elif cmd == '':
print('\nType `h` or `?` for a list of available commands.')
# Say what
else:
print('\nUnknown command.')