#!/usr/bin/env python3
# fediscover - A user profile discovery tool for ActivityPub social servers
# Author: Simon Volpert <simon@simonvolpert.com>
# Project page: https://simonvolpert.com/fediscover/
# This program is free software, released under the Apache License, Version 2.0. See the LICENSE file for more information
# Consult the README file for usage instructions and other helpful hints
import pathlib
import argparse
import subprocess
import random
import requests
import re
import sys
# Set up the environment
random.seed()
base_dir = pathlib.Path('~/.cache/fediscover').expanduser()
base_dir.mkdir(mode=0o755, parents=True, exist_ok=True)
html_cache = base_dir / 'page.html'
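# Profile URLs are expected to look like https://example.social/users/alice (hostname illustrative);
# Mastodon-style /@alice paths are normalized to /users/alice before matching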
profile_regex = re.compile(r'https?://[a-zA-Z0-9._-]+/users/[a-zA-Z0-9_-]+')
follow_page_regex = re.compile(r'/users/[a-zA-Z0-9_-]+/follow(?:ing|ers)[?]page=[0-9]+')
# A list that remembers the file it was loaded from, so save_cache() can write it back
class CacheFile(list):
path = None
# Print a log message to standard error
def stderr(string):
sys.stderr.write(string)
sys.stderr.write('\n')
# Print a low-priority message to standard error
def verbose(string):
if args.verbose:
stderr(string)
# Load a cache file from the cache directory and return a list that is aware of its on-disk location, dropping blank and duplicate lines
def load_file(_name):
_data = CacheFile()
_path = base_dir / _name
_data.path = _path
if _path.is_file():
for line in _path.read_text().split('\n'):
line = line.strip()
if line == '' or line in _data:
continue
_data.append(line)
return _data
# Write all open cache files back to disk (skipped when --dry-run is given)
def save_cache():
if args.dry_run:
return
for _file in [urls.done, urls.new, urls.failed, users.done, users.new, blacklist]:
try:
_file.path.write_text('\n'.join(_file) + '\n')
except (IOError, OSError) as exc:
			stderr(str(exc))
# Download a web page, caching the HTML so it can be reprocessed with "crawl --cached"
def load_url(url):
	response = requests.get(url, timeout=10)
	# Treat HTTP error statuses as failures; requests exceptions derive from OSError, so crawl() will catch and retry them
	response.raise_for_status()
	page = response.text
	html_cache.write_text(page)
	return page
# Add the URL to the appropriate cache, optionally at the front of the queue
def store_url(url, front=False):
# Select the correct cache
if profile_regex.fullmatch(url):
cache = users
else:
cache = urls
# Already seen URL
if url in cache.new:
verbose('Seen {}'.format(url))
# Already processed URL
elif url in cache.done:
verbose('Done {}'.format(url))
# Blacklisted URL
elif is_blacklisted(url):
verbose('Blacklisted {}'.format(url))
cache.skipped += 1
# Previously unseen
else:
verbose('New {}'.format(url))
if front:
cache.new.insert(0, url)
else:
cache.new.append(url)
cache.added += 1
# Check whether the URL is blacklisted
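# Entries match as plain substrings, so e.g. the (illustrative) entry "example.club" blocks every URL on that host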
def is_blacklisted(url):
for word in blacklist:
if word in url:
return True
return False
# Scrape a web page for profile and crawlable URLs
def scrape_page(page, url):
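	# Keep only the scheme and host of the URL, e.g. https://example.social (illustrative)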
domain = '/'.join(url.split('/')[0:3])
# Normalize the page between Mastodon/Pleroma username formats
page = page.replace('/@', '/users/')
# Extract all profile URLs
profile_links = set(profile_regex.findall(page))
	# Cache all previously unseen profile URLs
	for profile in profile_links:
		store_url(profile)
		# Also queue each discovered profile's following/followers pages for crawling
		for suffix in ['/following', '/followers']:
			store_url(profile + suffix)
# Find all other following/followers pages
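	# (these are queued at the front, so paginated listings are crawled before other URLs)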
follow_pages = set(follow_page_regex.findall(page))
for _url in follow_pages:
store_url(domain + _url, True)
# Report scraping results
stderr('{} new profile URLs added'.format(users.added))
stderr('{} new crawlable URLs discovered'.format(urls.added))
stderr('{} URLs skipped due to blacklist'.format(users.skipped + urls.skipped))
# Process URLs from the queue until at least one new profile URL has been found
def crawl():
while users.added == 0:
# The crawlable URL queue is empty
if urls.new == []:
stderr('No more URLs to crawl. Try adding some URLs using')
stderr(' fediscover crawl URL [URL ...]')
raise SystemExit
# Load the next URL
url = urls.new.pop(0)
if is_blacklisted(url):
verbose('Blacklisted {}'.format(url))
			continue
# Convert a username into a profile URL
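		# (assumes the instance exposes profiles under /users/<name>, as Mastodon and Pleroma do)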
if '@' in url and '://' not in url:
			username, domain = url.lstrip('@').split('@')
url = f'https://{domain}/users/{username}'
# If the URL is a profile URL, mark it as seen
if profile_regex.fullmatch(url):
if url in users.done:
users.done.remove(url)
users.done.append(url)
# Add following/followers URLs to scraping queue
for suffix in ['/following', '/followers']:
store_url(url + suffix)
stderr('Loading {}'.format(url))
try:
page = load_url(url)
except OSError as exc:
stderr('Error: {}'.format(exc))
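			# Retry a failed URL once: a first failure re-queues it at the end, a repeat failure marks it done and skips it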
if url in urls.failed:
stderr('Skipping {}'.format(url))
urls.done.append(url)
else:
urls.failed.append(url)
urls.new.append(url)
continue
# Process page and mark URL as complete
scrape_page(page, url)
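		# Move the URL to the end of urls.done; "crawl --cached" reparses the page for the most recent one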
if url in urls.done:
urls.done.remove(url)
urls.done.append(url)
# Open the working cache files
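# urls.* hold crawlable pages (profiles and their following/followers listings), users.* hold profile
# URLs to present with "next"; "new" is the queue, "done" records processed entries, "failed" records URLs that have already failed once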
class urls(object):
done = load_file('urls.done')
new = load_file('urls.new')
failed = load_file('urls.failed')
added = 0
skipped = 0
class users(object):
done = load_file('users.done')
new = load_file('users.new')
added = 0
skipped = 0
blacklist = load_file('black.list')
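# Typical invocations (see the README), with an illustrative hostname:
#   fediscover crawl https://example.social/users/alice
#   fediscover next --random --open
#   fediscover blacklist example.club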
# Set up command line
parser = argparse.ArgumentParser()
parser.add_argument('--dry-run', action='store_true', help="don't update the state cache")
parser.add_argument('--verbose', action='store_true', help='print more information about what is done')
subparsers = parser.add_subparsers(help='commands', dest='command', required=True)
subparsers.add_parser('stats', help='show cache statistics')
# Options for "crawl"
crawl_parser = subparsers.add_parser('crawl', help='extract user profile links from the next set of URLs')
crawl_parser.add_argument('URL', nargs='*', help='URLs to add to the crawling list')
crawl_parser.add_argument('--cached', action='store_true', help='parse cached page again instead of advancing to the next URL')
# Options for "next"
next_parser = subparsers.add_parser('next', help='show the next profile URL to visit')
next_parser.add_argument('--random', action='store_true', help='pick a random profile URL from the queue instead of the next one')
next_parser.add_argument('--clip', action='store_true', help='copy profile URL to clipboard (requires xclip)')
next_parser.add_argument('--open', action='store_true', help='open profile URL in your web browser (requires xdg-open)')
# Options for "blacklist"
blacklist_parser = subparsers.add_parser('blacklist', help='show or manage the blacklist')
blacklist_parser.add_argument('WORD', nargs='*', help='strings to add to the blacklist')
# Read command line arguments
args = parser.parse_args()
# Print cache stats
if args.command == 'stats':
print('User profiles to check:', len(users.new))
print('Processed user profiles:', len(users.done))
print('URLs to crawl:', len(urls.new))
print('Processed URLs:', len(urls.done))
print('Blacklisted strings:', len(blacklist))
# Add a string to the blacklist
elif args.command == 'blacklist':
if args.WORD == []:
for word in blacklist:
print(word)
raise SystemExit
for word in args.WORD:
		if word not in blacklist:
			blacklist.append(word)
save_cache()
# Crawl the next known URL
elif args.command == 'crawl':
# Add URLs passed on the command line to the front of the queue
for url in reversed(args.URL):
		# Normalize the URL between Mastodon/Pleroma username formats
url = url.replace('/@', '/users/')
# Add following/followers URLs to scraping queue
if profile_regex.fullmatch(url):
if url in users.new:
users.new.remove(url)
if url in users.done:
users.done.remove(url)
users.done.append(url)
for suffix in ['/following', '/followers']:
store_url(url + suffix)
# Refresh or add normal URLs to the scraping queue
else:
if url in urls.done:
urls.done.remove(url)
store_url(url, True)
# Process the cached page again
if args.cached:
		if html_cache.is_file() and urls.done:
scrape_page(html_cache.read_text(), urls.done[-1])
save_cache()
raise SystemExit
else:
stderr('No cached page')
crawl()
save_cache()
# Show and open the next profile URL for examination
elif args.command == 'next':
while True:
# Crawl some pages if no more users
if users.new == []:
crawl()
# Pick a profile URL
if args.random:
_user = users.new.pop(random.randint(0, len(users.new) - 1))
else:
_user = users.new.pop(0)
		# Skip blacklisted profiles and keep looking until an acceptable one is found
if not is_blacklisted(_user):
users.done.append(_user)
break
# Print to console
print(_user)
save_cache()
# Copy to clipboard
if args.clip:
xclip = subprocess.Popen(['xclip', '-selection', 'c'], stdin=subprocess.PIPE)
xclip.communicate(bytes(_user, 'UTF-8'))
# Open in default web browser
if args.open:
subprocess.Popen(['xdg-open', _user])