Simon Volpert gmipay / master buy
master

Tree @master (Download .tar.gz)

buy @masterraw · history · blame

#!/usr/bin/env python3
# gmipay - Gemini Paywall CGI
# Author: Simon Volpert <simon@simonvolpert.com>
# Project page: gemini://simonvolpert.com/gmipay/
# This program is free software, released under the MIT license. See the LICENSE file for more information
__version__ = '1.2'

import os
import sys
from pathlib import Path
import urllib.parse
import urllib.request
import json
import mimetypes
import datetime
import qrcode
import io

# Packages installed via PIP as root are unreachable from Jetforce CGI; Patch the module search paths to make them discoverable
# See https://github.com/michael-lazar/jetforce/issues/63
sys.path.append(f'/usr/local/lib/python{sys.version_info.major}.{sys.version_info.minor}/dist-packages')
import coinaddress


# Candidate directories for persistent state, searched from highest to lowest priority
state_locations = [
	Path('/etc') / 'gmipay',             # System-wide config directory
	Path.home() / '.config' / 'gmipay',  # User config directory
	Path.cwd(),                          # Current working directory
	Path('/tmp') / 'gmipay',             # Transient directory
]

# Timeout passed to the block explorer request, in seconds
TIMEOUT = 5
# Field positions within a payment request record
R_NAME, R_STATUS, R_ADDR, R_AMOUNT, R_TIME = range(5)

# Built-in defaults, in effect until the configuration file overrides them
file_path = ''
xpub = ''
default_price = 1000.0
unlock_time = 0
global_unlock = 'no'
# Block explorer API defaults
api_url = 'https://api.blockchair.com/bitcoin-cash/dashboards/address/{address}'
api_key_balance = 'data.{address}.address.balance'
api_key_unconfirmed = ''
api_unit = 'satoshi'
# Not an option: lets the f-string configuration template emit a literal '{address}'
address = '{address}'

# Default contents of paywall.conf, written out when no configuration file is found.
# This is an f-string: every {name} placeholder below is filled in from the
# configuration defaults at import time. The text '{address}' comes out literally
# only because the `address` default is itself the string '{address}'.
# The xpub line is intentionally left blank for the operator to fill in.
CONFIG_TEMPLATE = f'''# gmipay configuration file
# Set this to the absolute path where your paywalled files are located. This
# location should be in a place inaccessible from your capsule (outside the
# document root):
path={file_path}
# Set this to the extended public key (xPub) of your receiving wallet, which
# will be used to generate a fresh receiving address for every payment.
xpub=
# Set this to the default price any given file should be in BITs (1,000,000th
# of a Bitcoin Cash):
default_price={default_price}
# The period of time for which the files should stay unlocked, in days. If set
# to 0, the files will stay unlocked indefinitely.
unlock_time={unlock_time}
# Set this to "yes" to have a single payment blanket unlock all paywalled
# files:
global_unlock={global_unlock}

# Block explorer API options
# You can replace these with parameters for your preferred block explorer.
# Any occurrences of the string '{address}' will be replaced with the queried
# address. Check the relevant block explorer's API documentation.
# The URL to GET data from:
api_url={api_url}
# The JSON hierarchy representing the address balance, nested fields delimited
# with dots:
api_key_balance={api_key_balance}
# The same, but for unconfirmed balance, only if balance above key doesn't
# already include it:
api_key_unconfirmed={api_key_unconfirmed}
# The unit the balance is represented in ("native" or "satoshi"):
api_unit={api_unit}
# Be wary of block explorer API rate limits. If you perform many requests per
# minute, consider running your own private block explorer instance.'''


# Load a state file from the first location that has a readable copy
def read_state(filename):
	"""Return the non-empty lines of `filename`, or None when no state location can supply it."""
	for directory in state_locations:
		try:
			contents = (directory / filename).read_text()
		except OSError:
			# Missing or unreadable here; fall through to the next location
			continue
		return [entry for entry in contents.split('\n') if entry != '']
	return None


# Persist a state file in the first location that accepts it
def write_state(filename, data):
	"""Write the strings in `data` to `filename`, one per line.

	Locations are tried in priority order. If none can be written, a Gemini
	`42` error is printed and the script exits.
	"""
	for directory in state_locations:
		try:
			directory.mkdir(parents=True, exist_ok=True)
			target = directory / filename
			target.write_text('\n'.join(data) + '\n')
		except OSError:
			# Not writable; try the next location
			continue
		else:
			return
	print('42 Unable to write state\r')
	sys.exit()


# Write to log file
def write_log(string):
	"""Append `string` to paywall.log in the first writable state location and echo it to stderr.

	Logging is best-effort: failures never interrupt the request. The message is
	sent to stderr even when no log file could be written, so it is not lost.
	"""
	for location in state_locations:
		try:
			with open(location / 'paywall.log', 'a') as log:
				log.write(string)
		except OSError:
			# This location is not usable for logging; try the next one
			continue
		break
	try:
		sys.stderr.write(string)
	except OSError:
		# stderr is unavailable; there is nowhere left to report to
		pass


# Get the value at the end of a dot-separated key path
def get_value(json_object, key_path):
	"""Walk `json_object` along the dot-separated `key_path` and return the value found there.

	Path segments that look like integers are used as list indices. When the current
	container is a dict that has the segment as a string key, the string key is used
	as-is, because JSON object keys are always strings even when they look numeric.

	Raises KeyError, IndexError or TypeError if the path does not exist.
	"""
	for k in key_path.split('.'):
		# Only fall back to an integer index when the literal string key is not present
		if not (isinstance(json_object, dict) and k in json_object):
			try:
				k = int(k)
			except ValueError:
				pass
		# Expand the key
		json_object = json_object[k]
	return json_object


# Get the current balance of an address from a block explorer (in BITs)
def get_balance(address):
	"""Query the configured block explorer and return the balance of `address` in BITs.

	The balance is read from the `api_key_balance` path of the JSON response, and the
	value at `api_key_unconfirmed` is added when that option is set. On any failure a
	Gemini `42` error is printed, the error is echoed to stderr and the script exits.
	"""
	try:
		request = urllib.request.Request(api_url.replace('{address}', address), headers={})
		with urllib.request.urlopen(request, timeout=TIMEOUT) as webpage:
			data = json.loads(webpage.read().decode('UTF-8'))
		unconfirmed = float(get_value(data, api_key_unconfirmed.replace('{address}', address))) if api_key_unconfirmed != '' else 0
		balance = float(get_value(data, api_key_balance.replace('{address}', address))) + unconfirmed
		# Convert to BITs: 1 BIT = 100 satoshi = 1/1,000,000 of a whole coin
		if api_unit == 'satoshi':
			balance /= 100
		elif api_unit == 'native':
			balance *= 1000000
		# A BIT has two decimal places (one satoshi); drop float noise below that
		return round(balance, 2)
	except Exception as exc:
		print(f'42 Error getting address balance: {exc}\r')
		sys.stderr.write(f'Error getting address balance: {exc}\n')
		sys.exit()


# Serialize the client's access records and persist them
def write_access_data():
	"""Join the fields of every record in `access_data` with the field separator and save
	the resulting lines to the state file named after the client's certificate hash."""
	write_state(cert_hash, ['	'.join(fields) for fields in access_data])


# Render an invoice page
def show_invoice():
	"""Print a text/gemini invoice page for the current payment record.

	Reads the module-level `record`, `default_price`, `unlock_time`, `request_filename`
	and `server_name`, and writes the complete response to standard output, including
	the payment link and two QR codes (normal and inverted) of the payment URI.
	"""
	address = record[R_ADDR]
	# Format the coin amount as a plain decimal. Default float formatting switches to
	# exponent notation for small prices (e.g. '1e-05'), which is not a valid amount
	# in a payment URI.
	native_amount = f'{float(default_price) / 1000000:.8f}'.rstrip('0').rstrip('.')
	description = request_filename
	if unlock_time == 0:
		period = 'indefinitely'
		if request_filename == '*':
			description = f'full unrestricted access to {server_name}'
	else:
		plural = 's' if unlock_time > 1 else ''
		period = f'for {unlock_time} day{plural}'
		if request_filename == '*':
			description = f'{server_name} subscription'
	filename = urllib.parse.quote_plus(description)
	payment_url = f'bitcoincash:{address}?amount={native_amount}&message={filename}'
	print('20 text/gemini\r')
	print(f'# Paying for {description}')
	print(f'Please send at least {default_price} BIT to {address}')
	print(f'=> {payment_url} Open this payment request in your wallet\n')
	print('After sending your payment, reload this page to receive your file.')
	files = 'All files' if request_filename == '*' else 'This file'
	print(f'{files} will remain unlocked for this client certificate {period}.')
	print('Payment request QR codes:')
	qr = qrcode.QRCode()
	qr.add_data(payment_url)
	stream = io.StringIO()
	# Emit both polarities so the code is scannable on light and dark terminal themes
	qr.print_ascii(out=stream)
	qr.print_ascii(out=stream, invert=True)
	stream.seek(0)
	# Swap non-breaking spaces for plain spaces in the rendered QR text
	output = stream.read().replace('\xa0', ' ')
	print(f'```\n{output}```')


# Generate a new receiving address for use in invoices
def generate_address():
	"""Derive the next unused receiving address from the configured xPub.

	The derivation index is kept in the `xpub_index` state file and incremented on
	every call. Returns the address without its 'bitcoincash:' prefix.
	"""
	idx = read_state('xpub_index')
	# A missing or empty index file both mean that no address has been issued yet
	idx = int(idx[0]) + 1 if idx else 0
	write_state('xpub_index', [str(idx)])
	address = coinaddress.address_from_xpub(network='bitcoin_cash', xpub=xpub, path=f'0/{idx}').replace('bitcoincash:', '')
	return address


# Gather the CGI request parameters, defaulting to empty strings when unset
server_name = os.environ.get('SERVER_NAME', '')
path_info = os.environ.get('PATH_INFO', '').strip('/')
query_string = os.environ.get('QUERY_STRING', '')
remote_addr = os.environ.get('REMOTE_ADDR', '')
# Normalized so the certificate hash can double as a state file name
cert_hash = os.environ.get('TLS_CLIENT_HASH', '').lower().replace(':', '_')

# Find and read the configuration file
_data = read_state('paywall.conf')
# If no config file is found, write out defaults
if _data is None:
	write_state('paywall.conf', [CONFIG_TEMPLATE])
	_data = CONFIG_TEMPLATE.split('\n')
# Scan the config file for relevant options; lines that match no option
# (comments, blanks, unknown keys) are ignored
for line in _data:
	try:
		if line.startswith('path='):
			file_path = line[5:]
		elif line.startswith('xpub='):
			xpub = line[5:]
		elif line.startswith('default_price='):
			default_price = round(float(line[14:]), 2)
		elif line.startswith('unlock_time='):
			unlock_time = int(line[12:])
		elif line.startswith('global_unlock='):
			global_unlock = line[14:]
		elif line.startswith('api_url='):
			api_url = line[8:]
		elif line.startswith('api_key_balance='):
			api_key_balance = line[16:]
		elif line.startswith('api_key_unconfirmed='):
			api_key_unconfirmed = line[20:]
		elif line.startswith('api_unit='):
			api_unit = line[9:]
	except ValueError:
		# Report a malformed number as a Gemini error instead of crashing with a traceback
		_option, _, _value = line.partition('=')
		print(f'42 Bad configuration value for {_option}: {_value}\r')
		sys.exit()

# A zero, negative or NaN price would unlock files without payment and break the
# prepaid-period calculation, which divides by the price
if not default_price > 0:
	print(f'42 Bad configuration value for default_price: {default_price}\r')
	sys.exit()

# Convert the yes/no option into a boolean
global_unlock = global_unlock == 'yes'

# Refuse to run with an incomplete or invalid configuration
for _name, _value in (('path', file_path), ('xpub', xpub)):
	if _value == '':
		print(f'42 Missing required configuration value: {_name}\r')
		sys.exit()
if api_unit not in ('native', 'satoshi'):
	print(f'42 Bad configuration value for api_unit: {api_unit}\r')
	sys.exit()
# Deriving the first address proves that the xPub is usable
try:
	coinaddress.address_from_xpub(network='bitcoin_cash', xpub=xpub, path='0/0')
except ValueError:
	print(f'42 Bad configuration value for xpub: {xpub}\r')
	sys.exit()
# Require a filename parameter
if path_info == '':
	print('59 Missing file name\r')
	sys.exit()
# The path comes from the client: refuse parent-directory references so that a
# request can never reach files outside the paywalled directory
if '..' in Path(path_info).parts:
	print('51 Not found\r')
	sys.exit()
# Check if the file exists
request_file = Path(file_path) / path_info
if not request_file.is_file():
	print('51 Not found\r')
	sys.exit()
# Check authentication
if cert_hash == '':
	print('60 Client certificate required\r')
	sys.exit()

# Load the client's access file, creating an empty one on first contact
access_file = read_state(cert_hash)
if access_file is None:
	access_file = []
	write_state(cert_hash, access_file)
# Split every stored line into its record fields
access_data = [entry.split('	') for entry in access_file if entry != '']

# With global unlock enabled, every request maps to the single wildcard record
request_filename = request_file.name
if global_unlock:
	request_filename = '*'

# Locate the access record for the requested file; `record` stays bound to the
# match after the loop so the code below can use it
access_index = -1
for position, record in enumerate(access_data):
	if record[R_NAME] != request_filename:
		continue
	access_index = position
	break

now = datetime.datetime.now()
timestamp = now.isoformat(sep=' ', timespec='seconds')
# No matching record: this is the client's first request for the file
if access_index < 0:
	# Issue an invoice on a fresh address; 'x' marks the amount as not yet known
	address = generate_address()
	record = [request_filename, 'invoice', address, 'x', timestamp]
	access_data.append(record)
	write_access_data()
	# Record the new invoice in the log
	write_log(f'{timestamp} -- New invoice for {request_filename} by {cert_hash} on address {address}\n')
# Subsequent requests to access file by this client
# Expire timed unlocks whose prepaid period has run out
if unlock_time > 0 and record[R_STATUS] == 'paid':
	paid_at = datetime.datetime.fromisoformat(record[R_TIME].replace(' ', 'T'))
	# The unlock period scales with the amount paid relative to the price
	prepaid_days = unlock_time * float(record[R_AMOUNT]) / default_price
	expires_at = paid_at + datetime.timedelta(days=prepaid_days)
	if now > expires_at:
		record[R_STATUS] = 'expired'
		record[R_AMOUNT] = 'x'
# Check whether payment was received
if record[R_STATUS] != 'paid':
	record[R_TIME] = timestamp
	address = record[R_ADDR]
	new_balance = get_balance(address)
	if record[R_AMOUNT] == 'x':
		# No baseline yet: remember the current balance so that only funds
		# arriving from now on count towards this invoice
		balance = new_balance
		record[R_AMOUNT] = str(balance)
	else:
		balance = float(record[R_AMOUNT])
	# Round to whole satoshis (1/100 BIT) so that float subtraction noise cannot
	# make an exact payment compare as slightly short
	change = round(new_balance - balance, 2)
	if change < default_price:
		# Not paid (in full) yet: show the invoice again
		if record[R_STATUS] == 'expired':
			write_log(f'{timestamp} -- Recreated invoice for {request_filename} by {cert_hash} on address {address}\n')
			record[R_STATUS] = 'invoice'
		write_access_data()
		show_invoice()
		sys.exit()
	# Mark payment as received; the stored amount becomes the sum actually paid
	record[R_STATUS] = 'paid'
	record[R_AMOUNT] = str(change)
	write_access_data()
	# Write to log file
	write_log(f'{timestamp} -- Payment received of {change} BIT to {address} from {cert_hash} for {request_filename}\n')
# Hand the paid-for file over to the client
if record[R_STATUS] == 'paid':
	# Teach mimetypes about Gemini's own text format
	mimetypes.add_type('text/gemini', '.gmi', strict=False)
	# Fall back to a generic binary type when the file type cannot be guessed
	guessed, _encoding = mimetypes.guess_type(request_file, strict=False)
	mime = 'application/octet-stream' if guessed is None else guessed
	# Send the response header as text, then the file body as raw bytes
	print(f'20 {mime}\r')
	sys.stdout.flush()
	sys.stdout.buffer.write(request_file.read_bytes())