Simon Volpert fediscover / master fediscover
master

Tree @master (Download .tar.gz)

fediscover @masterraw · history · blame

#!/usr/bin/env python3
# fediscover - A user profile discovery tool for ActivityPub social servers
# Author: Simon Volpert <simon@simonvolpert.com>
# Project page: https://simonvolpert.com/fediscover/
# This program is free software, released under the Apache License, Version 2.0. See the LICENSE file for more information
# Consult the README file for usage instructions and other helpful hints

import pathlib
import argparse
import subprocess
import random
import requests
import re
import sys

# Set up the environment
random.seed()
base_dir = pathlib.Path('~/.cache/fediscover').expanduser()
base_dir.mkdir(mode=0o755, parents=True, exist_ok=True)
html_cache = base_dir / 'page.html'
profile_regex = re.compile('https?://[a-zA-Z0-9._-]+/users/[a-zA-Z0-9_-]+')
follow_page_regex = re.compile('/users/[a-zA-Z0-9_-]+/follow[ingers]{3}[?]page=[0-9]+')


# A slightly extended list
class CacheFile(list):
	path = None


# Print a log message to standard error
def stderr(string):
	sys.stderr.write(string)
	sys.stderr.write('\n')


# Print a low-priority message to standard error
def verbose(string):
	if args.verbose:
		stderr(string)


# Load a cache file from the cache directory and return a list that is aware of its on-disk location
def load_file(_name):
	_data = CacheFile()
	_path = base_dir / _name
	_data.path = _path
	if _path.is_file():
		for line in _path.read_text().split('\n'):
			line = line.strip()
			if line == '' or line in _data:
				continue
			_data.append(line)
	return _data


# Save open cache files
def save_cache():
	if args.dry_run:
		return
	for _file in [urls.done, urls.new, urls.failed, users.done, users.new, blacklist]:
		try:
			_file.path.write_text('\n'.join(_file) + '\n')
		except (IOError, OSError) as exc:
			stderr(exc)


# Load and cache a web page
def load_url(url):
	page = requests.get(url, timeout=10).text
	html_cache.write_text(page)
	return page


# Add the URL to the appropriate cache
def store_url(url, front=False):
	# Select the correct cache
	if profile_regex.fullmatch(url):
		cache = users
	else:
		cache = urls
	# Already seen URL
	if url in cache.new:
		verbose('Seen {}'.format(url))
	# Already processed URL
	elif url in cache.done:
		verbose('Done {}'.format(url))
	# Blacklisted URL
	elif is_blacklisted(url):
			verbose('Blacklisted {}'.format(url))
			cache.skipped += 1
	# Previously unseen
	else:
		verbose('New {}'.format(url))
		if front:
			cache.new.insert(0, url)
		else:
			cache.new.append(url)
		cache.added += 1


# Check whether the URL is blacklisted
def is_blacklisted(url):
	for word in blacklist:
		if word in url:
			return True
	return False


# Scrape a web page for profile and crawlable URLs
def scrape_page(page, url):
	domain = '/'.join(url.split('/')[0:3])
	# Normalize the page between Mastodon/Pleroma username formats
	page = page.replace('/@', '/users/')
	# Extract all profile URLs
	profile_links = set(profile_regex.findall(page))
	# Cache all previosly unseen profile URLs
	for profile in profile_links:
		store_url(profile)
		# Add following/followers URLs to scraping queue
		for suffix in ['/following', '/followers']:
			store_url(profile + suffix)
	# Find all other following/followers pages
	follow_pages = set(follow_page_regex.findall(page))
	for _url in follow_pages:
		store_url(domain + _url, True)
	# Report scraping results
	stderr('{} new profile URLs added'.format(users.added))
	stderr('{} new crawlable URLs discovered'.format(urls.added))
	stderr('{} URLs skipped due to blacklist'.format(users.skipped + urls.skipped))


# Process URLs from the queue
def crawl():
	while users.added == 0:
		# The crawlable URL queue is empty
		if urls.new == []:
			stderr('No more URLs to crawl. Try adding some URLs using')
			stderr('  fediscover crawl URL [URL ...]')
			raise SystemExit
		# Load the next URL
		url = urls.new.pop(0)
		if is_blacklisted(url):
			verbose('Blacklisted {}'.format(url))
			return
		# If the URL is a profile URL, mark it as seen
		if profile_regex.fullmatch(url):
			if url in users.done:
				users.done.remove(url)
			users.done.append(url)
			# Add following/followers URLs to scraping queue
			for suffix in ['/following', '/followers']:
				store_url(url + suffix)
		stderr('Loading {}'.format(url))
		try:
			page = load_url(url)
		except OSError as exc:
			stderr('Error: {}'.format(exc))
			if url in urls.failed:
				stderr('Skipping {}'.format(url))
				urls.done.append(url)
			else:
				urls.failed.append(url)
				urls.new.append(url)
			continue
		# Process page and mark URL as complete
		scrape_page(page, url)
		if url in urls.done:
			urls.done.remove(url)
		urls.done.append(url)


# Open the working cache files
class urls(object):
	done = load_file('urls.done')
	new = load_file('urls.new')
	failed = load_file('urls.failed')
	added = 0
	skipped = 0


class users(object):
	done = load_file('users.done')
	new = load_file('users.new')
	added = 0
	skipped = 0


blacklist = load_file('black.list')


# Set up command line
parser = argparse.ArgumentParser()
parser.add_argument('--dry-run', action='store_true', help="don't update the state cache")
parser.add_argument('--verbose', action='store_true', help='print more information about what is done')
subparsers = parser.add_subparsers(help='commands', dest='command', required=True)
subparsers.add_parser('stats', help='show cache statistics')
# Options for "crawl"
crawl_parser = subparsers.add_parser('crawl', help='extract user profile links from the next set of URLs')
crawl_parser.add_argument('URL', nargs='*', help='URLs to add to the crawling list')
crawl_parser.add_argument('--cached', action='store_true', help='parse cached page again instead of advancing to the next URL')
# Options for "next"
next_parser = subparsers.add_parser('next', help='show the next profile URL to visit')
next_parser.add_argument('--random', action='store_true', help='pick a random profile URL from the queue instead of the next one')
next_parser.add_argument('--clip', action='store_true', help='copy profile URL to clipboard (requires xclip)')
next_parser.add_argument('--open', action='store_true', help='open profile URL in your web browser')
# Options for "blacklist"
blacklist_parser = subparsers.add_parser('blacklist', help='show or manage the blacklist')
blacklist_parser.add_argument('WORD', nargs='*', help='strings to add to the blacklist')
# Read command line arguments
args = parser.parse_args()


# Print cache stats
if args.command == 'stats':
	print('User profiles to check:', len(users.new))
	print('Processed user profiles:', len(users.done))
	print('URLs to crawl:', len(urls.new))
	print('Processed URLs:', len(urls.done))
	print('Blacklisted strings:', len(blacklist))
# Add a string to the blacklist
elif args.command == 'blacklist':
	if args.WORD == []:
		for word in blacklist:
			print(word)
		raise SystemExit
	for word in args.WORD:
		if args.WORD not in blacklist:
			blacklist.append(args.WORD)
	save_cache()
# Crawl the next known URL
elif args.command == 'crawl':
	# Add URLs passed on the command line to the front of the queue
	for url in reversed(args.URL):
		# Normalize the page between Mastodon/Pleroma username formats
		url = url.replace('/@', '/users/')
		if url in urls.done:
			urls.done.remove(url)
		store_url(url, True)
		# Add following/followers URLs to scraping queue
		if profile_regex.fullmatch(url):
			if url in users.new:
				users.new.remove(url)
			if url in users.done:
				users.done.remove(url)
			users.done.append(url)
			for suffix in ['/following', '/followers']:
				store_url(url + suffix)
	# Process the cached page again
	if args.cached:
		if html_cache.is_file():
			scrape_page(html_cache.read_text(), urls.done[-1])
			save_cache()
			raise SystemExit
		else:
			stderr('No cached page')
	crawl()
	save_cache()
# Show and open the next profile URL for examination
elif args.command == 'next':
	while True:
		# Crawl some pages if no more users
		if users.new == []:
			crawl()
		# Pick a profile URL
		if args.random:
			_user = users.new.pop(random.randint(0, len(users.new) - 1))
		else:
			_user = users.new.pop(0)
		# Check for blacklist
		if not is_blacklisted(_user):
			users.done.append(_user)
			break
	# Print to console
	print(_user)
	save_cache()
	# Copy to clipboard
	if args.clip:
		xclip = subprocess.Popen(['xclip', '-selection', 'c'], stdin=subprocess.PIPE)
		xclip.communicate(bytes(_user, 'UTF-8'))
	# Open in default web browser
	if args.open:
		subprocess.Popen(['xdg-open', _user])