#!/usr/bin/env python3
# gmipay - Gemini Paywall CGI
# Author: Simon Volpert <simon@simonvolpert.com>
# Project page: gemini://simonvolpert.com/gmipay/
# This program is free software, released under the MIT license. See the LICENSE file for more information
__version__ = '1.3'
import os
import sys
from pathlib import Path
import urllib.parse
import urllib.request
import json
import mimetypes
import datetime
import qrcode
import io
# Packages installed via pip as root are unreachable from Jetforce CGI; patch the module search path to make them discoverable
# See https://github.com/michael-lazar/jetforce/issues/63
sys.path.append(f'/usr/local/lib/python{sys.version_info.major}.{sys.version_info.minor}/dist-packages')
import coinaddress
# Candidate state directories, highest priority first. Readers and writers
# walk this list in order and use the first location that works.
state_locations = [
    Path('/etc') / 'gmipay',  # System-wide config directory
    Path.home().joinpath('.config', 'gmipay'),  # User config directory
    Path.cwd(),  # Current working directory
    Path('/tmp') / 'gmipay',  # Transient directory
]
# Timeout passed to the block explorer HTTP request
TIMEOUT = 5
# Field positions within a payment request record
R_NAME, R_STATUS, R_ADDR, R_AMOUNT, R_TIME = range(5)
# Configuration defaults; values found in paywall.conf replace these
file_path = ''
xpub = ''
default_price = 1000.0
unlock_time = 0
global_unlock = 'no'
# Block explorer API defaults
api_url = 'https://rest.bch.actorforth.org/v2/address/details/{address}'
api_key_balance = 'balanceSat'
api_key_unconfirmed = ''
api_unit = 'satoshi'
# Lets the configuration template (an f-string) emit a literal '{address}'
address = '{address}'
# Default contents of paywall.conf, written out when no configuration file is
# found. This is an f-string: every {name} below is filled in from the
# configuration defaults defined above. The `address` default is the string
# '{address}', which is what keeps the literal '{address}' placeholder intact
# in the generated file.
CONFIG_TEMPLATE = f'''# gmipay configuration file
# Set this to the absolute path where your paywalled files are located. This
# location should be in a place inaccessible from your capsule (outside the
# document root):
path={file_path}
# Set this to the extended public key (xPub) of your receiving wallet, which
# will be used to generate a fresh receiving address for every payment.
xpub=
# Set this to the default price any given file should be in BITs (1,000,000th
# of a Bitcoin Cash):
default_price={default_price}
# The period of time for which the files should stay unlocked, in days. If set
# to 0, the files will stay unlocked indefinitely.
unlock_time={unlock_time}
# Set this to "yes" to have a single payment blanket unlock all paywalled
# files:
global_unlock={global_unlock}
# Block explorer API options
# You can replace these with parameters for your preferred block explorer.
# Any occurrences of the string '{address}' will be replaced with the queried
# address. Check the relevant block explorer's API documentation.
# The URL to GET data from:
api_url={api_url}
# The JSON hierarchy representing the address balance, nested fields delimited
# with dots:
api_key_balance={api_key_balance}
# The same, but for unconfirmed balance, only if balance above key doesn't
# already include it:
api_key_unconfirmed={api_key_unconfirmed}
# The unit the balance is represented in ("native" or "satoshi"):
api_unit={api_unit}
# Be wary of block explorer API rate limits. If you perform many requests per
# minute, consider running your own private block explorer instance.'''
# Read persistent state
def read_state(filename):
    """Return the non-empty lines of a state file, or None if it is unreadable.

    The directories in ``state_locations`` are tried in order; the first one
    from which ``filename`` can be read wins. A location that raises OSError
    (missing file, no permission, ...) is skipped.
    """
    for directory in state_locations:
        candidate = directory / filename
        try:
            contents = candidate.read_text()
        except OSError:
            continue
        return [entry for entry in contents.split('\n') if entry != '']
    # No location held a readable copy of the file
    return None
# Write persistent state
def write_state(filename, data):
    """Store ``data`` (a list of strings) in a state file, one item per line.

    The directories in ``state_locations`` are tried in order, creating each
    one if needed; the function returns after the first successful write.
    If every location fails, a Gemini 42 error response is printed and the
    process exits.
    """
    for directory in state_locations:
        try:
            directory.mkdir(parents=True, exist_ok=True)
            target = directory / filename
            target.write_text('\n'.join(data) + '\n')
        except OSError:
            # This location is not usable; try the next one
            continue
        else:
            return
    print('42 Unable to write state\r')
    sys.exit()
# Write to log file
def write_log(string):
    """Append ``string`` to paywall.log and echo it to stderr.

    The message is always written to stderr, so it is not lost when no log
    file can be opened. It is then appended to paywall.log in the first
    location of ``state_locations`` that accepts the write. Logging is best
    effort: if no location is writable the function returns silently.
    """
    # Echo first so the message survives even if every log location fails
    sys.stderr.write(string)
    for location in state_locations:
        try:
            with open(location / 'paywall.log', 'a') as log:
                log.write(string)
        except OSError:
            # Not writable here; fall through to the next candidate
            continue
        return
# Get the value at the end of a dot-separated key path
def get_value(json_object, key_path):
    """Walk ``json_object`` along ``key_path`` and return the value found there.

    ``key_path`` is a dot-separated list of keys, e.g. ``'data.0.balance'``.
    A path component that is present as a key of the current dict is used
    as-is, so JSON objects with numeric-looking keys (``{"0": ...}``) work.
    Otherwise a component that parses as an integer is used as an integer
    index, which is what list elements require.

    Raises KeyError, IndexError or TypeError if the path does not exist.
    """
    for key in key_path.split('.'):
        # Only coerce to an integer when the string form is not itself a key
        if not (isinstance(json_object, dict) and key in json_object):
            try:
                key = int(key)
            except ValueError:
                pass
        # Expand the key
        json_object = json_object[key]
    return json_object
# Get the current balance of an address from a block explorer (in BITs)
def get_balance(address):
    """Query the configured block explorer and return the balance in BIT.

    The request URL and the JSON key paths come from the module-level
    ``api_url``, ``api_key_balance`` and ``api_key_unconfirmed`` settings;
    any '{address}' placeholder in them is replaced with ``address``. The
    unconfirmed balance is added only when ``api_key_unconfirmed`` is set.

    The raw figure is converted to BIT according to ``api_unit``:
    1 BIT = 100 satoshi, and 1 coin ("native") = 1,000,000 BIT.

    On any failure a Gemini 42 response is printed, the error is logged and
    the process exits.
    """
    try:
        request = urllib.request.Request(api_url.replace('{address}', address), headers={})
        with urllib.request.urlopen(request, timeout=TIMEOUT) as webpage:
            data = json.loads(webpage.read().decode('UTF-8'))
        unconfirmed = 0.0
        if api_key_unconfirmed != '':
            unconfirmed = float(get_value(data, api_key_unconfirmed.replace('{address}', address)))
        balance = float(get_value(data, api_key_balance.replace('{address}', address))) + unconfirmed
        if api_unit == 'satoshi':
            # 100 satoshi make one BIT
            balance /= 100
        elif api_unit == 'native':
            # One whole coin is 1,000,000 BIT, so scale up, not down
            balance *= 1000000
        return balance
    except Exception as exc:
        print(f'42 Error getting address balance: {exc}\r')
        sys.stderr.write(f'Error getting address balance: {exc}\n')
        write_log(f'Error getting address balance: {exc}\n')
        sys.exit()
# Format and write out access data
def write_access_data():
    """Persist ``access_data`` to this client's access file.

    Each record is joined into one space-separated line and the result is
    written to the state file named after ``cert_hash``.
    """
    serialized = [' '.join(fields) for fields in access_data]
    write_state(cert_hash, serialized)
# Render an invoice page
def show_invoice():
    """Print a text/gemini invoice page for the current payment request.

    Reads the module-level ``record``, ``default_price``, ``unlock_time``,
    ``request_filename`` and ``server_name``. The complete response (status
    line, payment link, explanatory text and two ASCII QR codes, normal and
    inverted) is written to stdout.
    """
    address = record[R_ADDR]
    # Convert the price from BIT to whole coins. Fixed-point formatting keeps
    # small amounts out of scientific notation (e.g. 5e-05), which is not a
    # valid amount in a payment URI; trailing zeros are trimmed afterwards
    native_amount = f'{float(default_price) / 1000000:.8f}'.rstrip('0').rstrip('.')
    description = request_filename
    if unlock_time == 0:
        period = 'indefinitely'
        if request_filename == '*':
            description = f'full unrestricted access to {server_name}'
    else:
        plural = 's' if unlock_time > 1 else ''
        period = f'for {unlock_time} day{plural}'
        if request_filename == '*':
            description = f'{server_name} subscription'
    filename = urllib.parse.quote_plus(description)
    payment_url = f'bitcoincash:{address}?amount={native_amount}&message={filename}'
    print('20 text/gemini\r')
    print(f'# Paying for {description}')
    print(f'Please send at least {default_price} BIT to {address}')
    print(f'=> {payment_url} Open this payment request in your wallet\n')
    print('After sending your payment, reload this page to receive your file.')
    files = 'All files' if request_filename == '*' else 'This file'
    print(f'{files} will remain unlocked for this client certificate {period}.')
    print('Payment request QR codes:')
    qr = qrcode.QRCode()
    qr.add_data(payment_url)
    # Render both polarities into one buffer
    stream = io.StringIO()
    qr.print_ascii(out=stream)
    qr.print_ascii(out=stream, invert=True)
    stream.seek(0)
    # Replace non-breaking spaces in the rendering with plain spaces
    output = stream.read().replace('\xa0', ' ')
    print(f'```\n{output}```')
# Generate a new receiving address for use in invoices
def generate_address():
    """Derive the next unused receiving address from the configured xPub.

    The last used derivation index is kept in the 'xpub_index' state file.
    It starts at 0 when the file does not exist and is otherwise incremented
    by one; the new index is stored before the address is derived. The
    address is returned without its 'bitcoincash:' prefix.
    """
    stored = read_state('xpub_index')
    if stored is None:
        next_index = 0
    else:
        next_index = int(stored[0]) + 1
    write_state('xpub_index', [str(next_index)])
    derived = coinaddress.address_from_xpub(
        network='bitcoin_cash',
        xpub=xpub,
        path=f'0/{next_index}',
    )
    return derived.replace('bitcoincash:', '')
# Collect request parameters from the CGI environment; absent variables
# become empty strings
server_name = os.environ.get('SERVER_NAME', '')
# Surrounding slashes are dropped so the value can be joined onto a directory
path_info = os.environ.get('PATH_INFO', '').strip('/')
query_string = os.environ.get('QUERY_STRING', '')
remote_addr = os.environ.get('REMOTE_ADDR', '')
# Lower-cased with ':' replaced by '_'; used as the access file name
cert_hash = os.environ.get('TLS_CLIENT_HASH', '').lower().replace(':', '_')
# Find and read the configuration file
_data = read_state('paywall.conf')
# If no config file is found, write out defaults
if _data is None:
    write_state('paywall.conf', [CONFIG_TEMPLATE])
    _data = CONFIG_TEMPLATE.split('\n')
# Scan the config file for relevant options. Each option is a "key=value"
# line; everything after the first '=' is the value. Lines without a known
# key (comments, blanks) are ignored.
for line in _data:
    _key, _sep, _value = line.partition('=')
    if _sep == '':
        continue
    try:
        if _key == 'path':
            file_path = _value
        elif _key == 'xpub':
            xpub = _value
        elif _key == 'default_price':
            default_price = round(float(_value), 2)
        elif _key == 'unlock_time':
            unlock_time = int(_value)
        elif _key == 'global_unlock':
            global_unlock = _value
        elif _key == 'api_url':
            api_url = _value
        elif _key == 'api_key_balance':
            api_key_balance = _value
        elif _key == 'api_key_unconfirmed':
            api_key_unconfirmed = _value
        elif _key == 'api_unit':
            api_unit = _value
    except ValueError:
        # A non-numeric price or unlock time must produce a proper Gemini
        # error response rather than an uncaught traceback
        print(f'42 Bad configuration value for {_key}: {_value}\r')
        sys.exit()
# From here on global_unlock is a boolean
global_unlock = global_unlock == 'yes'
# Check configuration validity
if file_path == '':
    print('42 Missing required configuration value: path\r')
    sys.exit()
if xpub == '':
    print('42 Missing required configuration value: xpub\r')
    sys.exit()
if api_unit not in ['native', 'satoshi']:
    print(f'42 Bad configuration value for api_unit: {api_unit}\r')
    sys.exit()
# Validate the provided xPub by deriving one address from it
try:
    coinaddress.address_from_xpub(network='bitcoin_cash', xpub=xpub, path='0/0')
except ValueError:
    print(f'42 Bad configuration value for xpub: {xpub}\r')
    sys.exit()
# Require a filename parameter
if path_info == '':
    print('59 Missing file name\r')
    sys.exit()
# The path comes straight from the client: refuse parent-directory
# references so a request cannot escape the paywalled directory
if '..' in Path(path_info).parts:
    print('51 Not found\r')
    sys.exit()
# Check if the file exists
request_file = Path(file_path) / path_info
if not request_file.is_file():
    print('51 Not found\r')
    sys.exit()
# Check authentication
if cert_hash == '':
    print('60 Client certificate required\r')
    sys.exit()
# Check if client has an access file and create one if not
access_file = read_state(cert_hash)
if access_file is None:
    write_state(cert_hash, [])
    access_file = []
# Parse the access file into records of the form
# [name, status, address, amount, timestamp]
access_data = []
for line in access_file:
    if line == '':
        continue
    # The timestamp is the last field and itself contains a space
    # ("YYYY-MM-DD HH:MM:SS"), so split off only the leading fields
    fields = line.split(' ', R_TIME)
    if len(fields) > R_TIME:
        # Keep just the date and time tokens. Records rewritten after being
        # split on every space carry stale fragments after the timestamp
        fields[R_TIME] = ' '.join(fields[R_TIME].split(' ')[:2])
    access_data.append(fields)
# Override for global unlock
# NOTE: records are space-separated, so a file name containing a space
# cannot be matched against its own record
request_filename = '*' if global_unlock else request_file.name
# Find the row in the access file which refers to the requested file.
# `record` is left bound to the matching row for the code that follows
access_index = -1
for i, record in enumerate(access_data):
    if record[R_NAME] == request_filename:
        access_index = i
        break
now = datetime.datetime.now()
timestamp = now.isoformat(' ', timespec='seconds')
# First request to access file by this client
if access_index == -1:
    # Create an invoice on a fresh address; 'x' means no baseline balance
    # has been recorded yet
    address = generate_address()
    record = [request_filename, 'invoice', address, 'x', timestamp]
    access_data.append(record)
    write_access_data()
    # Write to log file
    write_log(f'{timestamp} -- New invoice for {request_filename} by {cert_hash} on address {address}\n')
# Subsequent requests to access file by this client
# Check access expiry time for timed unlocks
if unlock_time > 0 and record[R_STATUS] == 'paid':
    old_timestamp = datetime.datetime.fromisoformat(record[R_TIME].replace(' ', 'T'))
    # Calculate the length of the prepaid period: the amount paid buys
    # unlock_time days per default_price. With a zero price there is nothing
    # to scale by, so grant one plain unlock period instead of dividing by zero
    if default_price > 0:
        prepaid_time = unlock_time * float(record[R_AMOUNT]) / default_price
    else:
        prepaid_time = unlock_time
    delta = datetime.timedelta(days=prepaid_time)
    if now > old_timestamp + delta:
        record[R_STATUS] = 'expired'
        record[R_AMOUNT] = 'x'
# Check whether payment was received
if record[R_STATUS] != 'paid':
    record[R_TIME] = timestamp
    address = record[R_ADDR]
    new_balance = get_balance(address)
    if record[R_AMOUNT] == 'x':
        # No baseline yet: remember the current balance so that only funds
        # arriving from now on count as payment
        balance = new_balance
        record[R_AMOUNT] = str(balance)
    else:
        balance = float(record[R_AMOUNT])
    # Round to the price precision (2 decimals): raw float subtraction can
    # leave an exact payment a hair below the price
    change = round(new_balance - balance, 2)
    if change < default_price:
        # Not paid (in full) yet: show the invoice again
        if record[R_STATUS] == 'expired':
            write_log(f'{timestamp} -- Recreated invoice for {request_filename} by {cert_hash} on address {address}\n')
            record[R_STATUS] = 'invoice'
        write_access_data()
        show_invoice()
        sys.exit()
    # Mark payment as received; the amount field now holds the sum paid
    record[R_STATUS] = 'paid'
    record[R_AMOUNT] = str(change)
    write_access_data()
    # Write to log file
    write_log(f'{timestamp} -- Payment received of {change} BIT to {address} from {cert_hash} for {request_filename}\n')
# Give the client their file
if record[R_STATUS] == 'paid':
    # Patch mime type definitions with Gemini-specific types
    mimetypes.add_type('text/gemini', '.gmi', strict=False)
    # Determine the mime type of the served file. The path is passed as a
    # string because guess_type() only accepts path-like objects from
    # Python 3.8 onwards
    mime = mimetypes.guess_type(str(request_file), strict=False)[0]
    if mime is None:
        mime = 'application/octet-stream'
    # Serve the file. It is opened before the success header is sent, so an
    # unreadable file never follows a "20" status, and it is streamed in
    # chunks so large files are not held in memory in full
    with request_file.open('rb') as payload:
        print(f'20 {mime}\r')
        # Flush the text layer before writing raw bytes to the same stream
        sys.stdout.flush()
        for chunk in iter(lambda: payload.read(65536), b''):
            sys.stdout.buffer.write(chunk)