#!/usr/bin/env python3
# MiniPOS - A self-hosted, 0-confirmation Bitcoin Cash point-of-sale server
# Author: Simon Volpert <simon@simonvolpert.com>
# Project page: https://simonvolpert.com/minipos/
# This program is free software, released under the Apache License, Version 2.0. See the LICENSE file for more information
# Consult the README file for usage instructions and other helpful hints
import os
import sys
from wsgiref.simple_server import make_server
import urllib.parse
import datetime
import qrcode
import io
import random
import base64
import threading
import logging
# Local library files
import bch
import sendmail
import tridenticon
# Useful constants
# Fee thresholds compared against tx.fee_per_byte in check_payment():
# at or above NORMAL_FEE the propagation check is skipped,
# below LOW_FEE the payment request is flagged as low-fee
NORMAL_FEE = 1.0
LOW_FEE = 0.5
# Help text printed for the -h / --help command line switches
usage = '''Usage: minipos [DATA_DIRECTORY]
See the README file for more information.'''
# Runtime settings and state; options read from minipos.cfg are merged into this dict at startup
config = {
# Receiving addresses that are currently free for use
'addresses': [],
# Active payment requests keyed by their tag
'lock': {
# Status of the last log mailing: None until email_sent() stores the outcome
'@': None,
},
# Txids of payments that have already been recorded
'cache': [],
}
# File extensions that load_file() opens in binary mode
binary_extensions = ['png', 'jpg', 'gif', 'ico']
# Content-Type values by file extension, used by serve_static_file()
mime_types = {
'txt': 'text/plain',
'css': 'text/css',
'js': 'text/javascript',
'png': 'image/png',
'jpg': 'image/jpeg',
'gif': 'image/gif',
'svg': 'image/svg+xml',
'ico': 'image/x-icon',
}
# A list of files which will always be served regardless of access control settings
file_whitelist = ['style.css', 'logo.svg', 'logo.png', 'logo.gif', 'logo.jpg', 'favicon.png', 'favicon.ico']
# Set up logging: timestamped records are written to standard output.
# The handler lets everything through; the root logger decides the verbosity.
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s'))
handler.setLevel(logging.DEBUG)
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(handler)
# Look for the directory containing the configuration files.
# Candidates are tried in order; the first one holding a minipos.cfg wins and
# becomes the working directory. If none qualifies, the last candidate that
# could be entered stays current.
lib_dir = os.path.dirname(os.path.abspath(__file__))
data_dir_locations = [
    os.path.join(os.path.expanduser('~'), *parts)
    for parts in (('.minipos',), ('.config', 'minipos'))
] + [lib_dir, os.getcwd()]
if len(sys.argv) > 1:
    if sys.argv[1] in ('-h', '--help'):
        print(usage)
        sys.exit(0)
    # A directory given on the command line takes precedence over the defaults
    data_dir_locations.insert(0, os.path.abspath(sys.argv[1]))
    if not os.path.isdir(data_dir_locations[0]):
        print('No such directory: ' + data_dir_locations[0])
for data_dir in data_dir_locations:
    try:
        os.chdir(data_dir)
    except OSError:
        # Missing, not a directory or not accessible: try the next candidate
        continue
    if os.path.isfile('minipos.cfg'):
        logging.info('Using {} as data directory'.format(data_dir))
        break
# Load the config file (looked up in the current working directory)
try:
    with open('minipos.cfg', 'r', encoding='UTF-8') as f:
        lines = f.readlines()
except OSError as error:
    logging.warning('Could not open configuration file, using default settings ({})'.format(error))
    lines = []
for line in lines:
    # Skip comments and blank lines
    if line.startswith('#') or not line.strip():
        continue
    # Everything before the first "=" is the key, everything after it is the value
    key, _, value = line.strip().partition('=')
    key = key.strip()
    value = value.strip()
    # Skip empty values
    if not value:
        continue
    if key in ('sightings', 'index'):
        logging.warning('The "{}" option is depreciated'.format(key))
        continue
    # "address" may be given many times; every other key keeps its last value
    if key == 'address':
        config['addresses'].append(value)
    else:
        config[key] = value
# Read the auxiliary address list, if present
try:
    with open('address.list', 'r') as f:
        lines = f.readlines()
except OSError:
    lines = []
for _addr in (entry.strip() for entry in lines):
    if not _addr:
        continue
    if _addr in config['addresses']:
        logging.warning('Discarding duplicate address {}'.format(_addr))
        continue
    config['addresses'].append(_addr)
# Utility config parsing functions
def cast_config_type(key, _type, default):
    """Convert config[key] with _type in place.

    The default is stored instead when the key is missing or the
    conversion raises ValueError.
    """
    try:
        converted = _type(config[key])
    except (KeyError, ValueError):
        converted = default
    config[key] = converted
def clamp_config_value(_name, _min=None, _max=None, default=0):
    """Replace config[_name] with default when it lies outside [_min, _max].

    A bound of None is not checked. Note that an out-of-range value is
    reset to the default, not clipped to the nearest bound.
    """
    below = _min is not None and config[_name] < _min
    above = not below and _max is not None and config[_name] > _max
    if below or above:
        logging.warning('Invalid "{}" value, falling back to {}'.format(_name, default))
        config[_name] = default
def split_config_key(key, default):
    """Turn the comma-separated string in config[key] into a list in place.

    Whitespace around each item is removed and empty items are dropped, so
    "USD, EUR" yields ['USD', 'EUR']. When the key is missing, or nothing
    usable remains, the result is [default].
    """
    if key not in config:
        config[key] = [default]
        return
    items = [item.strip() for item in config[key].split(',')]
    config[key] = [item for item in items if item] or [default]
def pick_config_list(key, value_list):
    """Restrict config[key] to one of value_list.

    The first list entry is the fallback, used silently when the key is
    missing and with a warning when the configured value is not in the list.
    """
    fallback = value_list[0]
    if key not in config:
        config[key] = fallback
        return
    if config[key] in value_list:
        return
    logging.warning('Invalid "{}" value, falling back to "{}"'.format(key, fallback))
    config[key] = fallback
# Sanitize the config file
# Numeric options: convert first, then range-check the ones that have limits
for _key, _cast, _fallback in (
    ('taxrate', float, 0),
    ('port', int, 8080),
    ('propagation', int, 60),
    ('welcome_timeout', int, 120),
):
    cast_config_type(_key, _cast, _fallback)
clamp_config_value('propagation', 0, 100, 60)
clamp_config_value('welcome_timeout', 0, None, 120)
# List options
split_config_key('currencies', 'USD')
split_config_key('allowed_ips', '127.0.0.1')
# Options with a fixed set of choices (the first choice is the default)
pick_config_list('unit', ['native', 'bits', 'cash', 'satoshi'])
pick_config_list('payment_return', ['request', 'welcome'])
pick_config_list('log_order', ['ascending', 'descending'])
# The first day of the week is stored as a day offset: 0 for monday, 1 for sunday
_week_start = config.get('week_start', 'monday')
if _week_start == 'sunday':
    config['week_start'] = 1
else:
    if _week_start != 'monday':
        logging.warning('Invalid "week_start" value, falling back to "monday"')
    config['week_start'] = 0
if config.get('label', '') == '':
    config['label'] = 'MiniPOS'
# The exchange rate source must be one of the exchanges known to the bch module
exchange_list = [e['name'] for e in bch.exchanges]
if config.get('price_source') not in exchange_list:
    logging.info('Using default exchange rate source "{}"'.format(exchange_list[0]))
    config['price_source'] = exchange_list[0]
# Boolean switches
_truthy = ('1', 'yes', 'on', 'true')
if 'custom_unit_satoshi' in config:
    # Left unset when absent, so that the custom explorer setup can detect the missing key
    config['custom_unit_satoshi'] = config['custom_unit_satoshi'].lower() in _truthy
for _flag in ('auto_cents', 'fingerprinting', 'receive_notify'):
    config[_flag] = _flag in config and config[_flag].lower() in _truthy
# Prune meaningless values from configuration
config['allowed_ips'] = set(config['allowed_ips'])
config['allowed_ips'].discard('127.0.0.1')
# Try to set up a custom block explorer.
# A definition is only used when every custom_* option is present. On success
# it is placed first in bch.explorers and custom_explorer holds its name;
# otherwise custom_explorer stays None.
custom_explorer = None
try:
    _definition = {
        # The explorer is named after the last two labels of the URL host name
        'name': '.'.join(config['custom_explorer_url'].split('/')[2].split('.')[-2:]),
        'url': config['custom_explorer_url'],
    }
    for _field in (
        'tx_url',
        'balance_key',
        'confirmed_key',
        'unconfirmed_key',
        'last_tx_key',
        'tx_time_key',
        'tx_inputs_key',
        'tx_in_double_spend_key',
        'tx_outputs_key',
        'tx_out_value_key',
        'tx_out_address_key',
        'tx_double_spend_key',
        'tx_fee_key',
        'tx_size_key',
        'unit_satoshi',
        'prefixes',
    ):
        _definition[_field] = config['custom_' + _field]
    for key in _definition:
        # The literal text "none" unsets a field. Only strings are checked:
        # unit_satoshi has already been converted to a bool at this point.
        if isinstance(_definition[key], str) and _definition[key].lower() == 'none':
            _definition[key] = None
    bch.explorers.insert(0, _definition)
    custom_explorer = _definition['name']
    logging.info('Using custom explorer definition: {}'.format(custom_explorer))
except KeyError as error:
    # A missing URL simply means that no custom explorer is configured
    if error.args[0] != 'custom_explorer_url':
        logging.warning('Missing key in custom explorer definition: {}'.format(error))
except IndexError:
    # The URL has no "scheme://host" part to take the name from
    logging.warning('Invalid custom explorer URL: {}'.format(config['custom_explorer_url']))
# Write cached address list to address.list
def write_address_list():
    """Save all known receiving addresses to address.list in the data directory.

    Free addresses are written first, followed by the addresses of pending
    payment requests. Addresses with a derivation index are stored as
    "address index". A write failure is logged and otherwise ignored.
    """
    # Free receiving addresses
    rows = [
        '{} {}'.format(*item) if type(item) is tuple else item
        for item in config['addresses']
    ]
    # Locked receiving addresses (non-dict lock entries carry no address)
    for pending in config['lock'].values():
        if type(pending) is not dict:
            continue
        if 'index' in pending:
            rows.append('{} {}'.format(pending['address'], pending['index']))
        else:
            rows.append(pending['address'])
    # Perform the write
    target = os.path.join(data_dir, 'address.list')
    try:
        with open(target, 'w') as f:
            f.write('\n'.join(rows) + '\n')
    except OSError as error:
        logging.error('Could not write address.list: {}'.format(error))
# Utility wrapper function
def load_file(filename, override=False, null=False):
    """Return the contents of a template, log or asset file.

    The extension selects the sub-directory ("templates" for html, "logs"
    for log, "assets" otherwise) and whether the file is read as bytes
    (see binary_extensions) or as UTF-8 text. The copy in the data directory
    is tried first when override is set, and always for log files; the copy
    in the program directory is the fallback.

    When the file cannot be read, '' is returned if null is set, otherwise
    the error from the last attempt is raised.
    """
    extension = filename.split('.')[-1]
    file_mode, encoding = ('rb', None) if extension in binary_extensions else ('r', 'UTF-8')
    directory = {'html': 'templates', 'log': 'logs'}.get(extension, 'assets')
    roots = [lib_dir]
    if override or directory == 'logs':
        roots.insert(0, data_dir)
    last = len(roots) - 1
    for position, root in enumerate(roots):
        try:
            with open(os.path.join(root, directory, filename), file_mode, encoding=encoding) as src:
                return src.read()
        except OSError:
            if position < last:
                continue
            if null:
                return ''
            raise
# Cast amount into preferred units
def format_amount(amount):
    """Express a BCH amount in the unit selected by config['unit'].

    Returns a tuple (amount, token). For the "native" unit the amount is
    passed through untouched and the token is "BCH"; for the other units the
    token is the unit name. "bits" and "cash" are converted by bch.bits(),
    "satoshi" is returned as an integer string.
    """
    unit = config['unit']
    token = 'BCH' if unit == 'native' else unit
    if unit in ['bits', 'cash']:
        amount = bch.bits(float(amount))
    elif unit == 'satoshi':
        # Round before truncating: the binary float product can land just
        # below the exact integer and int() alone would drop a satoshi
        amount = str(int(round(float(amount) * 100000000)))
    return amount, token
# Generate a new receiving address and add it in the cache
def generate_new_address():
    """Derive the address at the current derivation index from the xpub.

    The (address, index) pair is appended to the pool of free addresses and
    the derivation index is advanced by one.
    """
    xpub = config['xpub']
    index = config['index']
    address = bch.generate_address(xpub, index)
    logging.debug('Generated new address {} with derivation index {}'.format(address, index))
    config['addresses'].append((address, index))
    config['index'] = index + 1
# Create a payment request QR page
def create_invoice(parameters):
    """Create a payment request and return the invoice page.

    parameters is the parsed query string: "amount" is required, "currency"
    defaults to the first configured currency. A free receiving address is
    reserved under a freshly generated tag, the fiat amount is converted at
    the current exchange rate and a payment URI is rendered as a QR code.

    Returns the HTML of the invoice page, or of the "noaddrs"/"timeout"
    pages when no address is available or a network lookup fails.
    Raises KeyError when "amount" is missing and ValueError when the amount
    is not a positive number or exceeds the possible coin supply. Whenever
    the request fails after an address was reserved, the address is released.
    """
    if 'currency' not in parameters:
        parameters['currency'] = config['currencies']
    currency = parameters['currency'][0]
    divider = 100 if config['auto_cents'] else 1
    fiat = float(parameters['amount'][0]) / divider
    # Written as a negated comparison so that NaN is rejected as well
    if not fiat > 0.0:
        raise ValueError('Requested amount must be positive')
    # Check for address lock timeouts
    for k in list(config['lock']):
        if k != '@' and check_lock_timeout(k):
            logging.info('Payment request {} timed out'.format(k))
            unlock_address(k)
    # Use the next available address
    if not config['addresses']:
        if 'xpub' in config:
            generate_new_address()
            write_address_list()
        else:
            return load_file('noaddrs.html')
    address = config['addresses'].pop(0)
    # Generate a lock tag that is not in use by another pending request
    alphabet = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ'
    tag = ''.join(random.choice(alphabet) for _ in range(7))
    while tag in config['lock']:
        tag = ''.join(random.choice(alphabet) for _ in range(7))
    # Lock the address
    lock_address(tag)
    request = config['lock'][tag]
    if isinstance(address, tuple):
        request['address'], request['index'] = address
        address = request['address']
    else:
        request['address'] = address
    # Get the exchange rate
    try:
        price = bch.get_price(currency, exchange=config['price_source'])
    except Exception:
        logging.error(sys.exc_info()[1])
        unlock_address(tag)
        return load_file('timeout.html')
    # Calculate amount
    try:
        amount = bch.btc(fiat / price)
        if float(amount) > 20999950:
            raise ValueError('Requested amount is greater than logically possible')
    except Exception:
        # Do not leave the address reserved for a request that was rejected
        unlock_address(tag)
        raise
    # Get current address state
    try:
        txid = bch.get_last_txid(address, explorer=custom_explorer)
    except Exception:
        logging.error(sys.exc_info()[1])
        unlock_address(tag)
        return load_file('timeout.html')
    # A transaction that is already on the address must not count as the payment
    request['seen_txids'] = [] if txid is None else [txid]
    request['amount'] = amount
    request['fiat'] = bch.fiat(fiat)
    request['currency'] = currency
    # Generate the invoice URI and QR code
    logging.info('New invoice {tag}: {amount} BCH ({fiat} {currency}) to {address}'.format(tag=tag, **request))
    label = urllib.parse.quote('%s ID:%s' % (config['label'], tag))
    data = 'bitcoincash:{addr}?amount={amt}&message={label}'.format(addr=address, amt=amount, label=label)
    image = qrcode.make(data, box_size=7, error_correction=qrcode.constants.ERROR_CORRECT_L)
    # Overlay a visual fingerprint over the QR code
    if 'hash' in config:
        icon = config['hash']
        image = image.get_image().convert('RGB')
        image_width, image_height = image.size
        icon_width, icon_height = icon.size
        # Centre the icon on the code
        image.paste(icon, (image_width // 2 - icon_width // 2, image_height // 2 - icon_height // 2))
    # Convert QR code into text data
    output = io.BytesIO()
    image.save(output, format='PNG')
    output = base64.b64encode(output.getvalue()).decode('UTF-8')
    amt, token = format_amount(amount)
    filler = {
        'addr': address,
        'amt': amt,
        'token': token,
        'qr': output,
        'request': data,
        'fiat': bch.fiat(fiat),
        'cur': currency,
        'price': bch.fiat(price),
        'tag': tag,
        'return': config['payment_return'],
        'label': config['label'],
    }
    # Load user template override; one that contains a script is not trusted
    invoice_text = load_file('invoice_text.html', override=True)
    if '<script' in invoice_text.lower():
        logging.warning('Script detected in user invoice_text.html, override rejected')
        invoice_text = load_file('invoice_text.html')
    filler['text'] = invoice_text.format_map(filler)
    return load_file('invoice.html').format_map(filler)
# API check if a payment was received
def check_payment(parameters):
    """Report the state of a payment request to the polling client.

    parameters is the parsed query string; parameters['id'][0] is the
    request tag. Every call refreshes the lock of a live request.

    Returns one of these response strings:
      '0'        - not yet received
      '1 <txid>' - payment detected
      '2'        - payment request timed out
      '3'        - server connection error
      '5 <txid>' - double spend detected
      '6 <txid>' - low fee
    (Code 4, client connection error, is not produced here.)
    """
    tag = parameters['id'][0]
    if tag not in config['lock']:
        return '2'
    # Update address lock
    if check_lock_timeout(tag):
        logging.info('Payment request {} timed out'.format(tag))
        unlock_address(tag)
        return '2'
    lock_address(tag)
    # Check address state
    request = config['lock'][tag]
    address = request['address']
    amount = float(request['amount'])
    # No previously detected transaction
    if 'txid' not in request:
        try:
            txid = bch.get_last_txid(address)
        except Exception:
            logging.warning('Could not fetch address info: {}'.format(sys.exc_info()[1]))
            return '3'
        # No new transactions, or one that was already examined or recorded
        if txid is None or txid in request['seen_txids'] or txid in config['cache']:
            return '0'
        # New transaction
        try:
            tx = bch.TxInfo(txid, explorer=bch.explorers[-1]['name'])
        except bch.TxNotFoundError:
            logging.warning('Anomalous event: tx not found on reporting explorer')
            return '0'  # TODO anomaly!
        except Exception:
            logging.warning('Could not fetch address info: {}'.format(sys.exc_info()[1]))
            return '3'
        # Transaction is known-old
        if tx.time < request['ctime']:
            logging.info('Ignoring old tx {}'.format(txid))
            request['seen_txids'].append(txid)
            return '0'
        if address not in tx.outputs:
            logging.info('Ignoring mis-addressed tx {}'.format(txid))  # TODO anomaly!
            request['seen_txids'].append(txid)
            return '0'
        # Wrong transaction amount
        if tx.outputs[address] != amount:
            logging.info('Ignoring tx with wrong amount {}'.format(txid))
            request['seen_txids'].append(txid)
            return '0'
        # All checks passed make note of the transaction
        request['txid'] = txid
        logging.info('Payment {} detected'.format(tag))
        # Double spend check
        if tx.double_spend:
            logging.warning('Double spend detected, waiting for confitmation')
            request['wait_confirm'] = True
            return '5 ' + txid
        # Check transaction fee
        if tx.fee_per_byte >= NORMAL_FEE:
            logging.debug('Tx has sufficient fee ({:.3f}), skipping propagation check'.format(tx.fee_per_byte))
        else:
            if tx.fee_per_byte < LOW_FEE:
                logging.debug('Tx has low fee ({:.3f})'.format(tx.fee_per_byte))
                request['low_fee'] = True
            # Propagation check needed, defer processing to the next poll
            if config['propagation'] > 0:
                return '0'
            logging.debug('Skipping propagation check by user config')
    # Previously detected transaction
    else:
        # Needed by both branches below and by the bookkeeping at the end
        txid = request['txid']
        # Currently waiting for confirmation
        if 'wait_confirm' in request:
            try:
                tx = bch.TxInfo(txid)
            except Exception:
                logging.warning('Could not get transaction info: {}'.format(sys.exc_info()[1]))
                return '3'
            if tx.confirmations == 0:
                if tx.double_spend:
                    return '5 ' + txid
                return '0'
        # Not currently waiting for confirmation
        else:
            try:
                propagation, double_spend = bch.get_tx_propagation(txid, threshold=config['propagation'], stop_on_double_spend=True)
            except Exception:
                logging.warning('Could not get propagation information: {}'.format(sys.exc_info()[1]))
                return '3'
            # Is double spend
            if double_spend:
                request['wait_confirm'] = True
                return '5 ' + txid
            # Low propagation
            if propagation < config['propagation']:
                if 'low_fee' in request:
                    return '6 ' + txid
                return '0'
    # Record the payment
    record_payment(tag)
    config['cache'].append(txid)
    unlock_address(tag)
    # Remove this address from future use if generated
    # (unlock_address has just returned it to the pool)
    if 'xpub' in config and 'index' in request:
        config['addresses'].remove((address, request['index']))
        logging.debug('Removing used address {}'.format(address))
        # Generate a new address if on the last derivation index to keep the index saved
        if request['index'] + 1 == config['index']:
            generate_new_address()
        write_address_list()
    return '1 ' + txid
# Write the details of the payment to a log file
def record_payment(tag):
    """Append the payment of request `tag` to today's log file.

    The log directory is created on first use and writes are serialized
    with log_lock. When receive notifications are enabled and an email
    address is configured, a notification message is sent as well.
    """
    logging.info('Payment {} received'.format(tag))
    request = config['lock'][tag]
    log_dir = os.path.join(data_dir, 'logs')
    if not os.path.isdir(log_dir):
        os.mkdir(log_dir)
    logfile = os.path.join(log_dir, datetime.date.today().isoformat() + '.log')
    with log_lock, open(logfile, 'a') as log:
        stamp = datetime.datetime.now().isoformat()
        log.write('{date} {address} {amount} {fiat} {currency} {tag} {txid}\n'.format(date=stamp, tag=tag, **request))
    # Send a notification
    if not (config['receive_notify'] and 'email' in config):
        return
    subject = '[MiniPOS] New payment received, ID: {}'.format(tag)
    _amount, token = format_amount(request['amount'])
    template = '\n'.join([
        'Date and time: {date}',
        'Address: {address}',
        'Amount: {amt} {token}',
        'Amount (fiat): {fiat} {currency}',
        'TxID: https://bch.btc.com/{txid}',
    ])
    # Human readable timestamp: no fractional seconds, space instead of "T"
    when = datetime.datetime.now().isoformat().split('.')[0].replace('T', ' ')
    body = template.format(date=when, token=token, amt=_amount, **request)
    sendmail.background_send(config, config['email'], subject, body)
# Lock an address to prevent concurrent access
def lock_address(tag):
    """Create the lock entry for `tag` if needed and refresh its timestamp.

    'ctime' records when the entry was created, 'time' when it was last
    touched; the latter is what check_lock_timeout() looks at.
    """
    locks = config['lock']
    if tag not in locks:
        locks[tag] = {'ctime': datetime.datetime.now()}
    locks[tag]['time'] = datetime.datetime.now()
# Free address lock
def unlock_address(tag):
    """Drop the lock entry for `tag` and return its address to the free pool.

    Unknown tags are ignored, and so are entries that are not payment
    requests, such as the '@' mail status slot, which must never be removed.
    """
    request = config['lock'].get(tag)
    if not isinstance(request, dict):
        return
    if 'index' in request:
        config['addresses'].append((request['address'], request['index']))
    else:
        config['addresses'].append(request['address'])
    del config['lock'][tag]
# Check address lock timeout
def check_lock_timeout(tag):
    """Tell whether the lock for `tag` has expired.

    Returns True when there is no such lock or when it was last refreshed
    60 seconds ago or earlier, False otherwise.
    """
    locks = config['lock']
    if tag in locks:
        age = datetime.datetime.now() - locks[tag]['time']
        return age >= datetime.timedelta(seconds=60)
    return True
# Parse a log file and add its contents to the table
def read_log_file(filename, plaintext=False, txids=False, reverse_order=False):
    """Parse one payment log file.

    filename is relative to the data directory. Every record holds the
    whitespace-separated fields written by record_payment():
    date, address, amount, fiat, currency, tag and txid. Records with
    trailing fields missing are accepted and blank lines are skipped.

    Returns a tuple (totals, table). totals maps each currency to the sum of
    the fiat amounts. table is a list of txids when txids is set, plain text
    when plaintext is set and HTML table rows otherwise; reverse_order puts
    the newest record first. The HTML rows contain %STYLE% and %ROW%
    placeholders for the caller to fill in.

    A file that cannot be opened yields empty results. When a record cannot
    be parsed, reading stops and a notice is added to the text/HTML table.
    Raises RuntimeError when both plaintext and txids are requested.
    """
    if plaintext and txids:
        raise RuntimeError('read_log_file: the "plaintext" and "txids" parameters are incompatible')
    table = [] if txids else ''
    totals = {}
    try:
        logfile = open(os.path.join(data_dir, filename), 'r')
    except (FileNotFoundError, PermissionError):
        # Days without payments have no log file, this is not worth a warning
        return totals, table
    except (OSError, ValueError) as error:
        logging.warning(error)
        return totals, table
    text_row = '{date} {amt} {fiat} {cur} {tag}\n Address: {addr}\n TxID: {txid}\n'
    html_row = (
        '<tr class="%STYLE%">\n'
        '<td><a id="toggle%ROW%" href="javascript:toggleRow(%ROW%);">+</a></td>\n'
        '<td>{date}</td><td>{fiat} {cur}</td><td>{tag}</td>\n'
        '</tr>\n'
        '<tr class="%STYLE% expand" id="row%ROW%">\n'
        '<td colspan="4"><strong>Address:</strong> <a href="https://bch.btc.com/{addr}" class="address" target="_blank"><img class="icon" src="icon.svg" alt="">{addr}</a><br>\n'
        '<strong>Amount:</strong> <span>{amt} {token}</span><br>\n'
        '<strong>TxID:</strong> <span class="txid"><a href="https://bch.btc.com/{txid}" target="_blank">{txid}</a></span></td></tr>\n'
    )
    with logfile:
        try:
            for record in logfile:
                fields = record.split()
                if not fields:
                    continue
                # Tolerate records with trailing fields missing
                fields += [''] * (7 - len(fields))
                date, address, amount, fiat, currency, tag, txid = fields
                amount, token = format_amount(amount)
                totals[currency] = totals.get(currency, 0) + float(fiat)
                # Drop fractional seconds and the "T" separator from the timestamp
                date = date.split('.')[0].replace('T', ' ')
                if txids:
                    if txid != '':
                        table.append(txid)
                    continue
                if plaintext:
                    line = text_row.format(date=date, addr=address, amt=str(amount).rjust(17 + len(token)), fiat=str(fiat).rjust(15), cur=currency, tag=tag, txid=txid)
                else:
                    line = html_row.format(date=date, amt=amount, fiat=fiat, cur=currency, tag=tag, token=token, addr=address, txid=txid)
                table = line + table if reverse_order else table + line
        except Exception:
            logging.warning('Log file is corrupted: {file} ({error})'.format(file=filename, error=sys.exc_info()[1]))
            # Name the log by its date, whatever path separator the platform uses
            log_name = os.path.basename(filename).split('.')[0]
            msg = 'The log file for {file} is corrupted!'.format(file=log_name)
            if not plaintext:
                msg = '<tr class="%STYLE%"><td colspan="5" class="error">' + msg + '</td></tr>'
            if not txids:
                table = msg + table if reverse_order else table + msg
    return totals, table
# Display a log of recent transactions
def show_logs(parameters, plaintext=False):
    """Build the transaction log listing for a day, week, month or year.

    parameters is the parsed query string. parameters['date'][0] selects the
    period by its format: YYYY-MM-DD, YYYY-Www, YYYY-MM or YYYY; today is
    used when it is absent. Day and week listings show every transaction,
    month and year listings show one tally row per day with transactions.

    Returns the filled-in logs.html page, or a plain text report (used for
    mailing) when plaintext is set. Raises ValueError when the date is not
    in one of the supported formats.
    """
    if 'date' not in parameters:
        date = datetime.date.today().isoformat()
    else:
        date = parameters['date'][0]
    # Process the current and calculate next and previous date
    days = []
    tally = False
    # Day scope
    if len(date) == 10:
        d = datetime.datetime.strptime(date, '%Y-%m-%d')
        # Use the canonical spelling from here on
        date = d.date().isoformat()
        delta = datetime.timedelta(1)
        next_date = (d + delta).date().isoformat()
        prev_date = (d - delta).date().isoformat()
        tag_s = 'W'
        scope_s = '%s-W%02d' % d.isocalendar()[0:2]
        tag_m = 'M'
        scope_m = '%s-%s' % (d.year, str(d.month).zfill(2))
        tag_l = 'Y'
        scope_l = str(d.year)
        days = [date]
    # Week scope
    elif len(date) == 8:
        if date[4:6] != '-W':
            raise ValueError('Invalid week format: expected YYYY-Www')
        # Convert ISO week to Python date
        _year = int(date[0:4])
        _week = int(date[6:8])
        # The date is echoed into the page, so keep only its canonical spelling
        date = '%04d-W%02d' % (_year, _week)
        # January 4th always falls into the first ISO week of its year
        ref_date = datetime.date(_year, 1, 4)
        ref_week, ref_day = ref_date.isocalendar()[1:3]
        d = ref_date + datetime.timedelta(days=1 - ref_day, weeks=_week - ref_week)
        # Calculate offsets
        delta = datetime.timedelta(7)
        next_date = '%s-W%02d' % (d + delta).isocalendar()[0:2]
        prev_date = '%s-W%02d' % (d - delta).isocalendar()[0:2]
        tag_s = 'D'
        scope_s = (d + datetime.timedelta(3)).isoformat()
        tag_m = 'M'
        scope_m = '%s-%s' % (d.year, str(d.month).zfill(2))
        tag_l = 'Y'
        scope_l = str(d.year)
        # Populate date list, shifted back a day when the week starts on sunday
        for i in range(7):
            days.append((d + datetime.timedelta(i - config['week_start'])).isoformat())
    # Month scope
    elif len(date) == 7:
        d = datetime.datetime.strptime(date, '%Y-%m')
        if d.month == 12:
            year, month = d.year + 1, 1
        else:
            year, month = d.year, d.month + 1
        next_date = '%s-%s' % (year, str(month).zfill(2))
        if d.month == 1:
            year, month = d.year - 1, 12
        else:
            year, month = d.year, d.month - 1
        prev_date = '%s-%s' % (year, str(month).zfill(2))
        tag_s = 'D'
        scope_s = '%s-%s-15' % (d.year, str(d.month).zfill(2))
        tag_m = 'W'
        scope_m = '%s-W%02d' % (d + datetime.timedelta(15)).isocalendar()[0:2]
        tag_l = 'Y'
        scope_l = str(d.year)
        # Populate date list
        _date = datetime.date(d.year, d.month, 1)
        while _date.month == d.month:
            days.append(_date.isoformat())
            _date += datetime.timedelta(1)
        tally = True
    # Year scope
    elif len(date) == 4:
        d = datetime.datetime.strptime(date, '%Y')
        next_date = str(d.year + 1)
        prev_date = str(d.year - 1)
        tag_s = 'D'
        scope_s = '%s-06-15' % d.year
        tag_m = 'W'
        scope_m = '%s-W26' % d.year
        tag_l = 'M'
        scope_l = '%s-06' % d.year
        # Populate date list
        _date = datetime.date(d.year, 1, 1)
        while _date.year == d.year:
            days.append(_date.isoformat())
            _date += datetime.timedelta(1)
        tally = True
    else:
        raise ValueError('Unsupported date format')
    # Create a transaction table and calculate totals
    if plaintext:
        page = '===== Summary for {date} ====='.format(date=date)
    else:
        page = load_file('logs.html')
    table = ''
    table_head = ''
    table_foot = ''
    summary = ''
    totals = {}
    # Compile transaction table and calculate date totals
    reverse_order = config['log_order'] == 'descending'
    if reverse_order:
        days.reverse()
    for _date in days:
        _totals, _table = read_log_file(os.path.join('logs', _date + '.log'), plaintext=plaintext, reverse_order=reverse_order)
        if tally:
            _amounts = ['{} {}'.format(bch.fiat(v), k) for k, v in _totals.items()]
            # Every transaction contributes exactly one of these markers
            if plaintext:
                _amounts = ', '.join(_amounts)
                tx_count = _table.count('TxID:')
            else:
                _amounts = '<br>'.join(_amounts)
                tx_count = _table.count('javascript:toggleRow(')
            if tx_count > 0:
                # The singular form is padded to keep the plain text columns aligned
                plural = ' ' if tx_count == 1 else 's'
                if plaintext:
                    chunk = '{} transaction{}'.format(tx_count, plural)
                    table += '{} {:>17} {}\n'.format(_date, chunk, _amounts)
                else:
                    table += '<tr class="%STYLE%"><td><a href="logs?date={date}">{date}</a></td><td>{count} transaction{plural}</td><td>{amounts}</td></tr>\n'.format(date=_date, count=tx_count, plural=plural, amounts=_amounts)
        else:
            table += _table
        for k in _totals:
            totals[k] = totals.get(k, 0) + _totals[k]
    for sign in totals:
        if totals[sign] != 0:
            chunk = '{fiat} {cur}\n' if plaintext else '<p>{fiat} {cur}</p>\n'
            summary += chunk.format(fiat=bch.fiat(totals[sign]), cur=sign)
    # Format and return the logs page
    if table != '':
        if plaintext:
            _, token = format_amount(0)
            if tally:
                table_head = '== Date ==||===== Count =====||===== Amount (fiat) ====='
            else:
                table_head = '=== Date & Time ===||=== Amount ({token}) ===||== Amount (fiat) ==||== ID =='.format(token=token)
        else:
            if tally:
                table_head = '<h2>Transactions:</h2>\n<table class="listing">\n<tr><th>Date</th><th>Count</th><th>Amount (fiat)</th></tr>\n'
            else:
                table_head = '<h2>Transactions:</h2>\n<table class="listing">\n<tr><th></th><th>Date and time</th><th>Amount</th><th>ID</th></tr>\n'
            table_foot = '</table>\n'
    else:
        summary = 'No transactions.' if plaintext else '<p>No transactions.</p>'
    if plaintext:
        return '\n'.join([page, summary, table_head, table])
    # Load print header and footer
    header = load_file('log_header.html', override=True, null=True)
    footer = load_file('log_footer.html', override=True, null=True)
    # Number the rows and alternate their style
    row_count = 1
    style = 'odd'
    if tally:
        while '%STYLE%' in table:
            table = table.replace('%STYLE%', style, 1)
            style = 'even' if style == 'odd' else 'odd'
    else:
        # Each transaction consists of two table rows sharing a number and a style
        while '%ROW%' in table:
            table = table.replace('%ROW%', str(row_count), 3)
            table = table.replace('%STYLE%', style, 2)
            style = 'even' if style == 'odd' else 'odd'
            row_count += 1
        # A "corrupted log" notice has no row number, do not leave its placeholder behind
        table = table.replace('%STYLE%', style)
    table = table_head + table + table_foot
    # Pack the above variables into a filler dict
    params = {
        'date': date,
        'prev_date': prev_date,
        'tag_s': tag_s,
        'scope_s': scope_s,
        'tag_m': tag_m,
        'scope_m': scope_m,
        'tag_l': tag_l,
        'scope_l': scope_l,
        'next_date': next_date,
        'header': header,
        'summary': summary,
        'table': table,
        'footer': footer,
        'label': config['label'],
    }
    return page.format_map(params)
# Serve a static file or return a 404
def serve_static_file(request):
    """Load a static file for the web server.

    request is the file name taken from the URL. Returns a tuple
    (status, headers, page): the file contents with a content type chosen
    by extension, or a 404 error page when the file cannot be loaded.
    """
    status = '200 OK'
    headers = [('Content-type', 'text/html; charset=UTF-8')]
    # Handle specific content-types
    extension = request.split('.')[-1]
    if extension in mime_types:
        headers = [('Content-Type', mime_types[extension])]
    # Scripts are never taken from the data directory, everything else may be overridden there
    override = extension != 'js'
    # Try to load the requested file
    try:
        page = load_file(request, override=override)
    except Exception:
        headers = [('Content-type', 'text/html; charset=UTF-8')]
        status = '404 Not Found'
        page = load_file('error.html').format(err=status)
        logging.warning(sys.exc_info()[1])
    return status, headers, page
# Set the log mailing return status
def email_sent(success):
    """Store the outcome of the last log mailing in the '@' slot of the lock table.

    Passed as the completion callback to sendmail.background_send().
    """
    lock_table = config['lock']
    lock_table['@'] = success
# Main webapp function
def minipos(environ, start_response):
    """WSGI entry point: route a request to the matching page or API handler.

    The last path component selects the action (invoice, check, cancel,
    logs, email, welcome, request); anything else is served as a static
    file. Clients that are not covered by the allowed_ips setting only get
    the whitelisted files and a 403 page for everything else.

    Returns the response body as a list with one bytes object.
    """
    headers = [('Content-type', 'text/html; charset=UTF-8')]
    status = '200 OK'
    page = ''
    # Target of a redirect, sent as a Location header when set
    location = None
    # NOTE(review): X-Real-IP is trusted unconditionally; when the server is not
    # behind a reverse proxy a client can set it to bypass the address check
    if 'HTTP_X_REAL_IP' in environ:
        ip_addr = environ['HTTP_X_REAL_IP']
    else:
        ip_addr = environ['REMOTE_ADDR']
    # An allowed_ips entry ending in .0 admits the whole /24 network
    subnet = '.'.join(ip_addr.split('.')[0:3]) + '.0'
    request = environ['PATH_INFO'].lstrip('/').split('/')[-1]
    allowed = config['allowed_ips']
    if ip_addr != '127.0.0.1' and '0.0.0.0' not in allowed and ip_addr not in allowed and subnet not in allowed and request not in file_whitelist:
        status = '403 Not Allowed'
        page = load_file('error.html').format(err=status)
        start_response(status, headers)
        return [bytes(page, 'UTF-8')]
    parameters = urllib.parse.parse_qs(environ.get('QUERY_STRING', ''))
    # Handle specific app pages
    if request == 'invoice':
        try:
            page = create_invoice(parameters)
        except ValueError:
            # Unusable amount: send the client back to the request form
            location = 'request'
        except Exception:
            if sys.exc_info()[0] is KeyError:
                logging.error('Missing required GET argument: {}'.format(sys.exc_info()[1]))
            else:
                logging.error(sys.exc_info()[1])
            status = '400 Bad Request'
            page = load_file('error.html').format(err=status)
    elif request == 'check':
        if 'id' not in parameters:
            status = '400 Bad Request'
            page = load_file('error.html').format(err=status)
        else:
            tag = parameters['id'][0]
            # Welcome page JavaScript check
            if tag == '0':
                page = '2'
            # Email sending check
            elif tag == '@':
                sent = config['lock']['@']
                page = '-1' if sent is None else '1' if sent else '2'
            # Payment received check
            else:
                page = check_payment(parameters)
            headers = [('Content-type', 'text/plain')]
    elif request == 'cancel':
        try:
            tag = parameters['id'][0]
            if tag:
                logging.info('Payment {} cancelled'.format(tag))
                unlock_address(tag)
        except Exception:
            if sys.exc_info()[0] is KeyError:
                logging.error('Missing required GET argument: {}'.format(sys.exc_info()[1]))
            else:
                logging.error(sys.exc_info()[1])
        location = config['payment_return']
    elif request == 'logs':
        try:
            page = show_logs(parameters)
        except Exception:
            logging.error(sys.exc_info()[1])
            status = '400 Bad Request'
            page = load_file('error.html').format(err=status)
    elif request == 'email':
        headers = [('Content-type', 'text/plain')]
        if 'email' not in config:
            logging.error('Sendmail failed: Email address is not set')
            page = '0'
        else:
            # Unset previous status
            config['lock']['@'] = None
            # Assemble message parts
            if 'date' not in parameters:
                date = datetime.date.today().isoformat()
            else:
                date = parameters['date'][0]
            try:
                listing = show_logs(parameters, plaintext=True)
            except Exception:
                # Most likely an unsupported date; nothing is sent
                logging.error(sys.exc_info()[1])
                status = '400 Bad Request'
                page = '0'
            else:
                subject = '[MiniPOS] Transaction listing for {}'.format(date)
                # Send an email in a background thread
                sendmail.background_send(config, config['email'], subject, listing, callback=email_sent)
                page = '-1'
    elif request == 'welcome':
        footer = load_file('welcome_footer.html', override=True, null=True)
        page = load_file('welcome.html').format(label=config['label'], welcome_footer=footer)
    elif request == 'request':
        tax = 'Discount' if config['taxrate'] < 0 else 'Tax'
        filler = {
            'currencies': repr(config['currencies']),
            'timeout': config['welcome_timeout'],
            'cur': config['currencies'][0],
            'tax': tax,
            'taxrate': config['taxrate'],
            'label': config['label'],
            'centkey': '00' if config['auto_cents'] else '.',
        }
        filler['currency_disabled'] = 'disabled' if len(config['currencies']) == 1 else ''
        filler['tax_disabled'] = 'disabled' if config['taxrate'] == 0 else ''
        page = load_file('request.html').format_map(filler)
    # Redirect blank request to main page
    elif request == '':
        location = 'welcome' if config['welcome_timeout'] > 0 else 'request'
    if location is not None:
        # The status line must not carry control characters, so the target
        # goes into a header of its own
        status = '303 See Other'
        headers = [('Content-type', 'text/html; charset=UTF-8'), ('Location', location)]
        page = 'Redirecting...'
    # Load non-generated files from disk
    if page == '':
        status, headers, page = serve_static_file(request)
    # Serve the page
    start_response(status, headers)
    if type(page) is bytes:
        return [page]
    return [bytes(page, 'UTF-8')]
# Populate txid cache from recent log entries, so that a payment recorded
# today or yesterday is not picked up a second time after a restart
_today = datetime.datetime.today().strftime('logs/%Y-%m-%d.log')
_yesterday = (datetime.datetime.today() - datetime.timedelta(1)).strftime('logs/%Y-%m-%d.log')
for _logname in (_today, _yesterday):
    _, _known_txids = read_log_file(_logname, txids=True)
    config['cache'].extend(_known_txids)
# Make sure xpub works; without a usable key address generation is switched
# off by removing the option altogether
if 'xpub' in config:
    try:
        _xpub_ok = bch.validate_key(config['xpub'])
    except ImportError:
        logging.warning('pycoin is not installed, address generation unavailable')
        del config['xpub']
    else:
        if not _xpub_ok:
            logging.warning('xpub is invalid, address generation unavailable')
            del config['xpub']
# Validate addresses
# Every configured entry is either dropped with a warning or kept, in its
# original position. Entries of the form "address index" are kept as
# (address, index) tuples. With an xpub configured only addresses that the
# key derives at the stated index survive, and the derivation index ends up
# one past the highest index seen.
config['index'] = 0
_validated = []
_have_xpub = 'xpub' in config
for address in config['addresses']:
    if address.startswith('xp'):
        logging.warning('Discarding extended key from address list')
        continue
    try:
        addr, idx = address.split(' ')
    except ValueError:
        if _have_xpub:
            logging.warning('Discarding address {} without derivation index'.format(address))
            continue
        addr = address
    if not _have_xpub:
        if not bch.validate_key(addr):
            logging.warning('Discarding invalid address {}'.format(addr))
            continue
        # Plain entries stay strings, indexed ones become ready-to-use tuples
        _validated.append(address if addr == address else (addr, idx))
        continue
    try:
        idx = int(idx)
    except ValueError:
        logging.warning('Discarding address {} with invalid derivation index {}'.format(addr, idx))
        continue
    if not bch.validate_key(addr):
        logging.warning('Discarding invalid address {}'.format(addr))
        continue
    # Regenerate the address in the same notation it was written in
    if addr[0] in 'qp':
        generated = bch.generate_address(config['xpub'], idx)
    else:
        generated = bch.generate_address(config['xpub'], idx, False)
    if addr != generated:
        logging.debug('Generated address: {}'.format(generated))
        logging.warning('Discarding address {} with mismatching derivation index {}'.format(addr, idx))
        continue
    _validated.append((addr, idx))
    if idx >= config['index']:
        config['index'] = idx + 1
config['addresses'][:] = _validated
plural_addrs = 'es' if len(config['addresses']) > 1 else ''
logging.debug('{} receiving address{} loaded'.format(len(config['addresses']), plural_addrs))
if 'xpub' in config:
    logging.debug('Current derivation index set to {}'.format(config['index']))
# General initialization
# Serializes writes to the payment log (see record_payment)
log_lock = threading.Lock()
# Seed the generator used for invoice tags
random.seed()
# Generate visual hash
# The icon is derived from the xpub when there is one, otherwise from the
# sorted list of receiving addresses
if config['fingerprinting']:
    if 'xpub' in config:
        _hash_source = config['xpub']
    else:
        _hash_source = ''.join(sorted(
            ''.join(entry) if type(entry) is tuple else entry
            for entry in config['addresses']
        ))
    config['hash'] = tridenticon.generate(_hash_source, scale=7)
# Start the web server
if __name__ == "__main__":
    # Start the web server; refuse to run when there is nothing to receive payments with
    if not config['addresses'] and 'xpub' not in config:
        logging.critical('No receiving addresses available. Please add some receiving addresses or an extended public key to your config file.')
        sys.exit(2)
    # An empty host name listens on all interfaces
    httpd = make_server('', config['port'], minipos)
    logging.info('Serving minipos on port {}...'.format(config['port']))
    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        logging.info('Server stopped.')