"""Augments the standard original_post_discovery algorithm with a
reverse lookup that supports posts without a backlink or citation.
Performs a reverse-lookup that scans the activity's author's ``h-feed``
for posts with rel=syndication links. As we find syndicated copies,
save the relationship. If we find the original post for the activity
in question, return the original's URL.
See http://indiewebcamp.com/posse-post-discovery for more detail.
This feature adds costs in terms of HTTP requests and database
lookups in the following primary cases:
* If the author's domain is known to be invalid or blocklisted, there will
be 0 requests and 0 DB lookups.
* If a syndicated post has been seen previously (regardless of
whether discovery was successful), there will be 0 requests and 1
DB lookup.
* The first time a syndicated post has been seen:
* 1 to 2 HTTP requests to get and parse the ``h-feed`` plus 1 additional
request for *each* post permalink that has not been seen before.
* 1 DB query for the initial check plus 1 additional DB query for
*each* post permalink.
"""
import collections
import itertools
import logging
import mf2util
from granary import as1
from granary import microformats2
from webutil.appengine_info import DEBUG
import models
from models import SyndicatedPost
import util
logger = logging.getLogger(__name__)

# Caps on how many post permalinks _process_author() collects from one author
# URL's feed items. Users for whom source.is_beta_user() is true get the
# larger cap.
MAX_PERMALINK_FETCHES = 10
MAX_PERMALINK_FETCHES_BETA = 50
# _find_feed_items() truncates the list of feed items it returns to this length.
MAX_FEED_ENTRIES = 100
# discover() prunes the candidate original/mention URL sets down to these sizes
# before resolving them; MAX_ORIGINAL_CANDIDATES is also passed to granary as
# max_redirect_fetches.
MAX_ORIGINAL_CANDIDATES = 10
MAX_MENTION_CANDIDATES = 10
# Batch size for the SyndicatedPost.original.IN(...) queries in
# _process_author().
# this was 30 in google.appengine.ext.ndb. haven't found it in google.cloud.ndb
# yet, or whether it's even there at all, but we only rarely hit it anyway, so
# let's just keep it as is for now.
MAX_ALLOWABLE_QUERIES = 30
# ``type`` value of mf2 ``alternates`` entries that _process_author() treats as
# additional feed URLs.
MF2_HTML_MIME_TYPE= 'text/mf2+html'
def discover(source, activity, fetch_hfeed=True, include_redirect_sources=True,
             already_fetched_hfeeds=None):
    r"""Augments the standard original post discovery algorithm with a
    reverse lookup that supports posts without a backlink or citation.

    If ``fetch_hfeed`` is False, then we will check the db for previously found
    :class:`models.SyndicatedPost`\s but will not do posse-post-discovery to find
    new ones. This also applies to attachments (e.g. quoted posts) that are
    discovered recursively.

    Args:
      source (models.Source): subclass. Changes to property values (e.g.
        ``domains``, ``domain_urls``, ``last_syndication_url``) are stored in
        ``source.updates``\; they should be updated transactionally later.
      activity (dict)
      fetch_hfeed (bool)
      include_redirect_sources (bool): whether to include URLs that redirect as
        well as their final destination URLs
      already_fetched_hfeeds (set of str): URLs that we have already fetched and
        run posse-post-discovery on, so we can avoid running it multiple times.
        Mutated in place as new author URLs are fetched.

    Returns:
      (set of str, set of str) tuple: (original post URLs, mention URLs)
    """
    label = activity.get('url') or activity.get('id')
    logger.debug(f'discovering original posts for: {label}')

    if not source.updates:
        source.updates = {}

    if already_fetched_hfeeds is None:
        already_fetched_hfeeds = set()

    originals, mentions = as1.original_post_discovery(
        activity, domains=source.domains,
        include_redirect_sources=include_redirect_sources,
        include_reserved_hosts=DEBUG, max_redirect_fetches=MAX_ORIGINAL_CANDIDATES,
        headers=util.request_headers(source=source))

    # only include mentions of the author themselves.
    # (mostly just for Mastodon; other silos' domains are all in the blocklist, so
    # their mention URLs get dropped later anyway.)
    # (these are originally added in Source._inject_user_urls() and in poll step 2.)
    obj = activity.get('object', {})
    other_user_mentions = {
        t.get('url') for t in obj.get('tags', [])
        if t.get('objectType') == 'person' and t.get('url') not in source.domain_urls}
    originals -= other_user_mentions
    mentions -= other_user_mentions

    # original posts are only from the author themselves
    owner = activity.get('actor') or obj.get('author') or {}
    owner_ids = util.trim_nulls([owner.get('id'), owner.get('username')])
    source_ids = util.trim_nulls([source.key.id(), source.user_tag_id()])
    if source.USERNAME_KEY_ID:
        # usernames are compared case-insensitively
        owner_ids = [owner_id.lower() for owner_id in owner_ids]
        source_ids = [source_id.lower() for source_id in source_ids]

    if owner_ids and not set(owner_ids) & set(source_ids):
        logger.info(f"Demoting original post links because user ids {source_ids} don't match author ids {owner_ids}")
        # this is someone else's post, so all links must be mentions
        mentions.update(originals)
        originals = set()

    # look for original URL of attachments (e.g. quote tweets)
    for att in obj.get('attachments', []):
        if (att.get('objectType') in ('note', 'article')
                and att.get('author', {}).get('id') == source.user_tag_id()):
            logger.debug(f"running original post discovery on attachment: {att.get('id')}")
            # propagate fetch_hfeed and already_fetched_hfeeds so that attachments
            # honor the caller's request not to fetch, and so that we don't
            # re-fetch author URLs that were already processed this round.
            att_origs, _ = discover(
                source, att, fetch_hfeed=fetch_hfeed,
                include_redirect_sources=include_redirect_sources,
                already_fetched_hfeeds=already_fetched_hfeeds)
            logger.debug(f'original post discovery found originals for attachment, {att_origs}')
            mentions.update(att_origs)

    # sorted() makes the pruning deterministic
    if len(originals) > MAX_ORIGINAL_CANDIDATES:
        logger.info(f'{len(originals)} originals, pruning down to {MAX_ORIGINAL_CANDIDATES}')
        originals = sorted(originals)[:MAX_ORIGINAL_CANDIDATES]
    if len(mentions) > MAX_MENTION_CANDIDATES:
        logger.info(f'{len(mentions)} mentions, pruning down to {MAX_MENTION_CANDIDATES}')
        mentions = sorted(mentions)[:MAX_MENTION_CANDIDATES]

    def resolve(urls):
        """Returns the subset of urls (and/or their final destinations) that
        are valid webmention targets outside of the silo's own domain."""
        resolved = set()
        for url in urls:
            final, domain, send = util.get_webmention_target(url)
            if send and domain != source.gr_source.DOMAIN:
                resolved.add(final)
                if include_redirect_sources:
                    resolved.add(url)
        return resolved

    originals = resolve(originals)
    mentions = resolve(mentions)

    if not source.get_author_urls():
        logger.debug('no author url(s), cannot find h-feed')
        return ((originals, mentions) if not source.BACKFEED_REQUIRES_SYNDICATION_LINK
                else (set(), set()))

    # TODO possible optimization: if we've discovered a backlink to a post on the
    # author's domain (i.e., it included a link or citation), then skip the rest
    # of this.
    syndicated = []
    syndication_url = obj.get('url') or activity.get('url')
    if syndication_url:
        # use the canonical syndication url on both sides, so that we have
        # the best chance of finding a match. Some silos allow several
        # different permalink formats to point to the same place.
        syndication_url = source.canonicalize_url(syndication_url)
        if syndication_url:
            syndicated = _posse_post_discovery(source, activity, syndication_url,
                                               fetch_hfeed, already_fetched_hfeeds)
            originals.update(syndicated)
            originals = set(util.dedupe_urls(originals))

    if not syndication_url:
        logger.debug(f'no {source.SHORT_NAME} syndication url, cannot process h-entries')

    return ((originals, mentions) if not source.BACKFEED_REQUIRES_SYNDICATION_LINK
            else (set(syndicated), set()))
def refetch(source):
    r"""Refetch the author's URLs and look for new or updated syndication
    links that might not have been there the first time we looked.

    Args:
      source (models.Source): Changes to property values (e.g. ``domains``,
        ``domain_urls``, ``last_syndication_url``) are stored in source.updates;
        they should be updated transactionally later.

    Returns:
      dict: mapping syndicated_url to a list of new :class:`models.SyndicatedPost`\s
    """
    logger.debug(f'attempting to refetch h-feed for {source.label()}')

    if not source.updates:
        source.updates = {}

    results = {}
    for url in _get_author_urls(source):
        # merge the per-author lists instead of using dict.update(), which would
        # replace relationships found via an earlier author URL whenever a later
        # author URL yields the same syndication URL.
        for synd_url, syndposts in _process_author(source, url, refetch=True).items():
            results.setdefault(synd_url, []).extend(syndposts)

    return results
def targets_for_response(resp, originals, mentions):
    """Returns the URLs that we should send webmentions to for a given response.

    ...specifically, all responses except posts get sent to original post URLs,
    but only posts and comments get sent to mentioned URLs.

    Args:
      resp (dict): ActivityStreams response object
      originals, mentions (sequence of str) URLs; any iterable of str works,
        including sets and lists

    Returns:
      set of str: URLs
    """
    resp_type = models.Response.get_type(resp)

    # set.update() accepts any iterable, whereas ``|=`` raises TypeError when the
    # right-hand side is a list or tuple.
    targets = set()
    if resp_type != 'post':
        targets.update(originals)
    if resp_type in ('post', 'comment'):
        targets.update(mentions)
    return targets
def _posse_post_discovery(source, activity, syndication_url, fetch_hfeed,
                          already_fetched_hfeeds):
    """Performs the actual meat of the posse-post-discovery.

    Looks up stored :class:`models.SyndicatedPost` relationships for
    ``syndication_url``. If there are none and ``fetch_hfeed`` is true, fetches
    and processes the author's URLs to try to find one, and stores a blank
    relationship if that still finds nothing.

    Args:
      source (models.Source)
      activity (dict)
      syndication_url (str): url of the syndicated copy for which we are
        trying to find an original
      fetch_hfeed (bool): whether or not to fetch and parse the
        author's feed if we don't have a previously stored
        relationship
      already_fetched_hfeeds (set of str): URLs we've already fetched in a
        previous iteration. Author URLs fetched here are added to it.

    Returns:
      list of str: original post urls, possibly empty
    """
    logger.info(f'starting posse post discovery with syndicated {syndication_url}')

    relationships = SyndicatedPost.query(
        SyndicatedPost.syndication == syndication_url,
        ancestor=source.key).fetch()

    if source.IGNORE_SYNDICATION_LINK_FRAGMENTS:
        relationships += SyndicatedPost.query(
            # prefix search to find any instances of this synd link with a fragment
            SyndicatedPost.syndication > f'{syndication_url}#',
            SyndicatedPost.syndication < f'{syndication_url}#\ufffd',
            ancestor=source.key).fetch()

    if not relationships and fetch_hfeed:
        # a syndicated post we haven't seen before! fetch the author's URLs to see
        # if we can find it.
        #
        # TODO: Consider using the actor's url, with get_author_urls() as the
        # fallback in the future to support content from non-Bridgy users.
        results = {}
        for url in _get_author_urls(source):
            if url not in already_fetched_hfeeds:
                # merge lists per syndication URL; dict.update() would discard
                # relationships found via an earlier author URL when a later
                # one yields the same syndication URL.
                for synd_url, syndposts in _process_author(source, url).items():
                    results.setdefault(synd_url, []).extend(syndposts)
                already_fetched_hfeeds.add(url)
            else:
                logger.debug(f'skipping {url}, already fetched this round')

        relationships = results.get(syndication_url, [])

    if not relationships:
        # No relationships were found. Remember that we've seen this
        # syndicated post to avoid reprocessing it every time
        logger.debug(f'posse post discovery found no relationship for {syndication_url}')
        if fetch_hfeed:
            SyndicatedPost.insert_syndication_blank(source, syndication_url)

    # blank relationships have no original; skip them
    originals = [r.original for r in relationships if r.original]
    if originals:
        logger.debug(f'posse post discovery found relationship(s) {syndication_url} -> {originals}')
    return originals
def _process_author(source, author_url, refetch=False, store_blanks=True):
    r"""Fetch the author's domain URL, and look for syndicated posts.

    Collects feed items from the author page and from any rel=feed / mf2+html
    alternate pages it links to, then runs :func:`process_entry` on each
    ``h-entry`` permalink, up to a per-user cap.

    Args:
      source (models.Source)
      author_url (str): the author's homepage URL
      refetch (bool): whether to refetch and process entries we've seen before
      store_blanks (bool): whether we should store blank
        :class:`models.SyndicatedPost`\s when we don't find a relationship

    Returns:
      dict: maps syndicated_url to a list of new :class:`models.SyndicatedPost`\s.
      Empty if the author URL isn't a valid target, can't be fetched, or has
      no mf2.
    """
    # for now use whether the url is a valid webmention target
    # as a proxy for whether it's worth searching it.
    author_url, _, ok = util.get_webmention_target(author_url)
    if not ok:
        return {}

    logger.debug(f'fetching author url {author_url}')
    try:
        author_mf2 = util.fetch_mf2(author_url)
    except AssertionError:
        raise  # for unit tests
    except BaseException:
        # TODO limit allowed failures, cache the author's h-feed url
        # or the # of times we've failed to fetch it
        logger.info(f'Could not fetch author url {author_url}', exc_info=True)
        return {}

    if not author_mf2:
        logger.debug('nothing found')
        return {}

    feeditems = _find_feed_items(author_mf2)

    # try rel=feeds and rel=alternates
    feed_urls = set()
    candidates = (author_mf2['rels'].get('feed', []) +
                  [a.get('url') for a in author_mf2.get('alternates', [])
                   if a.get('type') == MF2_HTML_MIME_TYPE])
    for feed_url in candidates:
        # check that it's html, not too big, etc
        feed_url, _, feed_ok = util.get_webmention_target(feed_url)
        if feed_url == author_url:
            logger.debug('author url is the feed url, ignoring')
        elif not feed_ok:
            logger.debug("skipping feed since it's not HTML or otherwise bad")
        else:
            feed_urls.add(feed_url)

    # fetch each extra feed and merge its items into the author page's items.
    # a failure on one feed is logged and does not stop the others.
    for feed_url in feed_urls:
        try:
            logger.debug(f"fetching author's rel-feed {feed_url}")
            feed_mf2 = util.fetch_mf2(feed_url)
            if not feed_mf2:
                logger.debug('nothing found')
                continue
            feeditems = _merge_hfeeds(feeditems, _find_feed_items(feed_mf2))
            domain = util.domain_from_link(feed_url)
            if source.updates is not None and domain not in source.domains:
                # NOTE(review): when 'domains' isn't in source.updates yet,
                # setdefault stores and returns the source.domains object itself,
                # so the append below also mutates source.domains in place --
                # confirm that this is intended.
                domains = source.updates.setdefault('domains', source.domains)
                if domain not in domains:
                    logger.info(f'rel-feed found new domain {domain}! adding to source')
                    domains.append(domain)
        except AssertionError:
            raise  # reraise assertions for unit tests
        except BaseException:
            logger.info(f'Could not fetch h-feed url {feed_url}.', exc_info=True)

    # sort by dt-updated/dt-published
    def updated_or_published(item):
        # items with neither property sort as the empty string, i.e. last when
        # sorting in reverse
        props = microformats2.first_props(item.get('properties'))
        return props.get('updated') or props.get('published') or ''

    feeditems.sort(key=updated_or_published, reverse=True)

    # map each string u-url of each h-entry to its feed item, newest first
    permalink_to_entry = collections.OrderedDict()
    for child in feeditems:
        if 'h-entry' in child['type']:
            permalinks = child['properties'].get('url', [])
            if not permalinks:
                logger.debug('ignoring h-entry with no u-url!')
            for permalink in permalinks:
                if isinstance(permalink, str):
                    permalink_to_entry[permalink] = child
                else:
                    logger.warning(f'unexpected non-string "url" property: {permalink}')

        # the cap is checked once per feed item, after all of that item's
        # permalinks have been added, so the final count may exceed it
        max = (MAX_PERMALINK_FETCHES_BETA if source.is_beta_user()
               else MAX_PERMALINK_FETCHES)
        if len(permalink_to_entry) >= max:
            logger.info(f'Hit cap of {max} permalinks. Stopping.')
            break

    # query all preexisting permalinks at once, instead of once per link
    permalinks_list = list(permalink_to_entry.keys())
    # fetch the maximum allowed entries (currently 30) at a time
    preexisting_list = itertools.chain.from_iterable(
        SyndicatedPost.query(
            SyndicatedPost.original.IN(permalinks_list[i:i + MAX_ALLOWABLE_QUERIES]),
            ancestor=source.key)
        for i in range(0, len(permalinks_list), MAX_ALLOWABLE_QUERIES))
    # group the stored SyndicatedPosts by their original (permalink) URL
    preexisting = {}
    for r in preexisting_list:
        preexisting.setdefault(r.original, []).append(r)

    results = {}
    for permalink, entry in permalink_to_entry.items():
        logger.debug(f'processing permalink: {permalink}')
        new_results = process_entry(
            source, permalink, entry, refetch, preexisting.get(permalink, []),
            store_blanks=store_blanks)
        # accumulate lists per syndication URL across all permalinks
        for key, value in new_results.items():
            results.setdefault(key, []).extend(value)

    if source.updates is not None and results:
        # keep track of the last time we've seen rel=syndication urls for
        # this author. this helps us decide whether to refetch periodically
        # and look for updates.
        # Source will be saved at the end of each round of polling
        source.updates['last_syndication_url'] = util.now()

    return results
def _merge_hfeeds(feed1, feed2):
r"""Merge items from two ``h-feeds`` into a composite feed.
Skips items in ``feed2`` that are already represented in ``feed1``\, based on
the ``url`` property.
Args:
feed1 (list of dict)
feed2 (list of dict)
Returns:
list of dict:
"""
seen = set()
for item in feed1:
for url in item.get('properties', {}).get('url', []):
if isinstance(url, str):
seen.add(url)
return feed1 + [item for item in feed2 if all(
(url not in seen) for url in item.get('properties', {}).get('url', []) if isinstance(url, str))]
def _find_feed_items(mf2):
    """Pull the feed items out of parsed microformats2 data.

    When the document contains one or more ``h-feed`` items, the result is the
    concatenation of their ``children``. Otherwise the top-level items are used.
    In both cases at most ``MAX_FEED_ENTRIES`` items are returned.

    Args:
      mf2 (dict): parsed mf2 data

    Returns:
      list of dict: each one representing an mf2 ``h-*`` item
    """
    top_level = mf2['items']
    feeds = mf2util.find_all_entries(mf2, ('h-feed',))

    if not feeds:
        logger.debug('No h-feed found, fallback to top-level h-entrys.')
        entries = top_level
    else:
        entries = [child for feed in feeds for child in feed.get('children', [])]

    count = len(entries)
    if count > MAX_FEED_ENTRIES:
        logger.info(f'Feed has {count} entries! only processing the first {MAX_FEED_ENTRIES}.')
        entries = entries[:MAX_FEED_ENTRIES]

    return entries
def process_entry(source, permalink, feed_entry, refetch, preexisting,
                  store_blanks=True):
    r"""Fetch and process an h-entry and save a new :class:`models.SyndicatedPost`.

    First looks for syndication links on the ``h-feed`` version of the entry.
    If it has none, and either the source has never had syndication links found
    in its feed or ``feed_entry`` is empty, fetches the permalink page and
    looks there too.

    Args:
      source (models.Source): ``source.updates`` must be a dict;
        ``last_feed_syndication_url`` is stored there when the feed entry itself
        has syndication links.
      permalink (str): url of the unprocessed post
      feed_entry (dict): the ``h-feed`` version of the ``h-entry``\, often contains
        a partial version of the ``h-entry`` at the permalink
      refetch (bool): whether to refetch and process entries we've seen before
      preexisting (list): of previously discovered :class:`models.SyndicatedPost`\s
        for this permalink. Relationships that have disappeared from the site
        are deleted from the datastore and removed from this list in place.
      store_blanks (bool): whether we should store blank
        :class:`models.SyndicatedPost`\s when we don't find a relationship

    Returns:
      dict: maps syndicated url to a list of new :class:`models.SyndicatedPost`\s
    """
    # if the post has already been processed, do not add to the results
    # since this method only returns *newly* discovered relationships.
    if preexisting:
        # if we're refetching and this one is blank, do not return.
        # if there is a blank entry, it should be the one and only entry,
        # but go ahead and check 'all' of them to be safe.
        if not refetch:
            return {}
        synds = [s.syndication for s in preexisting if s.syndication]
        if synds:
            logger.debug(f'previously found relationship(s) for original {permalink}: {synds}')

    # first try with the h-entry from the h-feed. if we find the syndication url
    # we're looking for, we don't have to fetch the permalink
    permalink, _, type_ok = util.get_webmention_target(permalink)
    usynd = feed_entry.get('properties', {}).get('syndication', [])
    usynd_urls = {url for url in usynd if isinstance(url, str)}
    if usynd_urls:
        logger.debug(f'u-syndication links on the h-feed h-entry: {usynd_urls}')
    results = _process_syndication_urls(source, permalink, usynd_urls, preexisting)
    success = True

    if results:
        source.updates['last_feed_syndication_url'] = util.now()
    elif not source.last_feed_syndication_url or not feed_entry:
        # fetch the full permalink page if we think it might have more details
        mf2 = None
        try:
            if type_ok:
                logger.debug(f'fetching post permalink {permalink}')
                mf2 = util.fetch_mf2(permalink)
        except AssertionError:
            raise  # for unit tests
        except BaseException:
            # TODO limit the number of allowed failures
            logger.info(f'Could not fetch permalink {permalink}', exc_info=True)
            # a failed fetch tells us nothing about whether links were removed,
            # so don't delete existing relationships below
            success = False

        if mf2:
            syndication_urls = set()
            relsynd = mf2['rels'].get('syndication', [])
            if relsynd:
                logger.debug(f'rel-syndication links: {relsynd}')
            syndication_urls.update(url for url in relsynd
                                    if isinstance(url, str))
            # there should only be one h-entry on a permalink page, but
            # we'll check all of them just in case.
            for hentry in (item for item in mf2['items']
                           if 'h-entry' in item['type']):
                usynd = hentry.get('properties', {}).get('syndication', [])
                if usynd:
                    logger.debug(f'u-syndication links: {usynd}')
                syndication_urls.update(url for url in usynd
                                        if isinstance(url, str))
            results = _process_syndication_urls(
                source, permalink, syndication_urls, preexisting)

    # detect and delete SyndicatedPosts that were removed from the site
    if success:
        result_syndposts = list(itertools.chain(*results.values()))
        # iterate over a snapshot: removing from the list we're iterating would
        # skip the element following each removal, leaving stale relationships
        # undeleted.
        for syndpost in list(preexisting):
            if syndpost.syndication and syndpost not in result_syndposts:
                logger.info(f'deleting relationship that disappeared: {syndpost}')
                syndpost.key.delete()
                preexisting.remove(syndpost)

    if not results:
        logger.debug(f'no syndication links from {permalink} to current source {source.label()}.')
        results = {}
        if store_blanks and not preexisting:
            # remember that this post doesn't have syndication links for this
            # particular source
            logger.debug(f'saving empty relationship so that {permalink} will not be searched again')
            SyndicatedPost.insert_original_blank(source, permalink)

    # only return results that are not in the preexisting list
    new_results = {}
    for syndurl, syndposts_for_url in results.items():
        for syndpost in syndposts_for_url:
            if syndpost not in preexisting:
                new_results.setdefault(syndurl, []).append(syndpost)

    if new_results:
        logger.debug(f'discovered relationships {new_results}')
    return new_results
def _process_syndication_urls(source, permalink, syndication_urls,
preexisting):
r"""Process a list of syndication URLs looking for one that matches the
current source. If one is found, stores a new :class:`models.SyndicatedPost`
in the db.
Args:
source (models.Source)
permalink (str): the current ``h-entry`` permalink
syndication_urls (sequence of str): the unfitered list of syndication urls
preexisting: list of models.SyndicatedPost: previously discovered
Returns:
dict: maps str syndication url to list of :class:`models.SyndicatedPost`\s
"""
results = {}
# save the results (or lack thereof) to the db, and put them in a
# map for immediate use
for url in syndication_urls:
# source-specific logic to standardize the URL
url = source.canonicalize_url(url)
if not url:
continue
# TODO: save future lookups by saving results for other sources too (note:
# query the appropriate source subclass by author.domains, rather than
# author.domain_urls)
#
# we may have already seen this relationship, save a DB lookup by
# finding it in the preexisting list
relationship = next((sp for sp in preexisting
if sp.syndication == url
and sp.original == permalink), None)
if not relationship:
logger.debug(f'saving discovered relationship {url} -> {permalink}')
relationship = SyndicatedPost.insert(source, syndication=url, original=permalink)
results.setdefault(url, []).append(relationship)
return results
def _get_author_urls(source):
    """Returns the source's author URLs, capped at ``models.MAX_AUTHOR_URLS``.

    Logs a warning listing the kept and skipped URLs when the cap is exceeded.

    Args:
      source (models.Source)

    Returns:
      sequence of str: at most ``models.MAX_AUTHOR_URLS`` author URLs
    """
    limit = models.MAX_AUTHOR_URLS
    urls = source.get_author_urls()
    if len(urls) <= limit:
        return urls

    logger.warning(f'user has over {limit} URLs! only running PPD on {urls[:limit]}. skipping {urls[limit:]}.')
    return urls[:limit]