Source code for tasks

"""Task queue handlers."""
import datetime
import gc
import logging

from flask import g, request
from flask.views import View
from google.cloud import ndb
from google.cloud.ndb._datastore_types import _MAX_STRING_LENGTH
from granary import as1
from webutil import logs, webmention
from webutil.flask_util import error
from webutil.util import json_dumps, json_loads

import models, original_post_discovery, util
from flask_background import app
from models import Response
from util import ERROR_HTTP_RETURN_CODE
# need to import model class definitions since poll creates and saves entities.
import bluesky, facebook, flickr, github, instagram, mastodon, reddit, tumblr, twitter, wordpress_rest

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Sentinel stored in the webmention endpoint cache when discovery ran for a
# target but found no endpoint. It is truthy, so a cached "no endpoint" result
# is distinguishable from a cache miss and discovery isn't repeated.
NO_ENDPOINT = 'NONE'

# Timeout for sending a single webmention; passed to webmention.send() as
# seconds via .total_seconds().
WEBMENTION_SEND_TIMEOUT = datetime.timedelta(seconds=30)


def is_public(obj):
    """Checks that an object, its author, and its actor are all public.

    Each check is delegated to ``as1.is_public(..., unlisted=False)``. The
    checks run in order (object, then author, then actor) and stop at the
    first falsy result, which is returned as is.

    Args:
      obj (dict): AS1 object or activity

    Returns:
      the first falsy ``as1.is_public`` result, or the result for the actor
      if the object and author both passed
    """
    verdict = as1.is_public(obj, unlisted=False)
    for field in ('author', 'actor'):
        if not verdict:
            break
        verdict = as1.is_public(as1.get_object(obj, field), unlisted=False)
    return verdict
def is_quote_mention(activity, source):
    """Returns whether an activity quotes a post written by the source's user.

    Looks at the attachments of the activity's ``object`` (or of the activity
    itself if it has no ``object``). An attachment counts as a quote of the
    user when its ``objectType`` is ``note`` or ``article`` and its author id
    equals ``source.user_tag_id()``.

    Args:
      activity (dict): AS1 activity or object
      source: object with a ``user_tag_id()`` method, eg :class:`models.Source`

    Returns:
      bool: True if at least one attachment is a quoted post by the user,
      False otherwise
    """
    obj = activity.get('object') or activity
    # "or" guards: tolerate fields that are present but explicitly None.
    # source.user_tag_id() is only called for note/article attachments.
    return any(
        att.get('objectType') in ('note', 'article')
        and (att.get('author') or {}).get('id') == source.user_tag_id()
        for att in obj.get('attachments') or [])
class Poll(View):
    r"""Task handler that fetches and processes new responses from a single source.

    Request parameters:

    * ``source_key``: string key of source entity
    * ``last_polled``: timestamp, ``YYYY-MM-DD-HH-MM-SS``

    Inserts a propagate task for each response that hasn't been seen before.

    Steps:

    1. Fetch activities: posts by the user, links to the user's domain(s).
    2. Extract responses, store their activities.
    3. Filter out responses we've already seen, using :class:`models.Response`\s
       in the datastore.
    4. Store new responses and enqueue propagate tasks.
    5. Possibly refetch updated syndication urls.

    1-4 are in :meth:`backfeed`; 5 is in :meth:`poll`.
    """
    RESTART_EXISTING_TASKS = False  # overridden in Discover

    def _last_poll_url(self, source):
        """Returns the host-qualified ``logs.url()`` for the source's last poll attempt."""
        return util.host_url(logs.url(source.last_poll_attempt, source.key))

    @staticmethod
    def _is_newer_id(candidate, current):
        """Returns True if silo activity id ``candidate`` sorts after ``current``.

        Compares numerically when both ids parse as ints, otherwise as strings.
        If ``current`` is empty or None, ie nothing has been stored yet, every
        candidate counts as newer. Without that check, a None ``current`` fell
        through to comparing against the literal string ``'None'``, which
        rejects every numeric id, so a new source never recorded one.

        Args:
          candidate (str): silo activity id
          current (str): currently stored id, or None

        Returns:
          bool
        """
        if not current:
            return True
        try:
            # try numeric comparison first
            return int(candidate) > int(current)
        except (TypeError, ValueError):
            return str(candidate) > str(current)

    def dispatch_request(self):
        """Flask entry point: loads the source, runs :meth:`poll`, saves results.

        Drops the task (returns ``''``) if the source is missing, disabled, or
        doesn't have the ``listen`` feature, or if this is a duplicate of
        another poll task. Otherwise marks the source as polling, polls, stores
        ``source.updates``, and enqueues the next poll task for auto-polled
        sources.

        Returns:
          str: ``'OK'`` on completion, ``''`` if the task was dropped

        Raises:
          Exception: anything :meth:`poll` raises that isn't a disable or
            rate limit condition. ``source.updates`` is still saved first.
        """
        logger.debug(f'Params: {list(request.values.items())}')

        key = request.values['source_key']
        source = g.source = ndb.Key(urlsafe=key).get()
        if not source or source.status == 'disabled' or 'listen' not in source.features:
            logger.error('Source not found or disabled. Dropping task.')
            return ''
        logger.info(f'Source: {source.label()} {source.key_id()}, {source.bridgy_url()}')

        if source.AUTO_POLL:
            # the task's last_polled param must match the source's stored value;
            # if it doesn't, another task has polled since this one was created.
            last_polled = request.values['last_polled']
            if last_polled != source.last_polled.strftime(util.POLL_TASK_DATETIME_FORMAT):
                logger.warning('duplicate poll task! deferring to the other task.')
                return ''

        logger.info(f'Last poll: {self._last_poll_url(source)}')

        # mark this source as polling
        source.updates = {
            'poll_status': 'polling',
            'last_poll_attempt': util.now(),
            'rate_limited': False,
        }
        source = models.Source.put_updates(source)

        source.updates = {}
        try:
            self.poll(source)
        except Exception as e:
            source.updates['poll_status'] = 'error'
            code, _ = util.interpret_http_exception(e)
            if code in source.DISABLE_HTTP_CODES or isinstance(e, models.DisableSource):
                # the user deauthorized the bridgy app, so disable this source.
                # let the task complete successfully so that it's not retried.
                logger.warning(f'Disabling source due to: {e}', exc_info=True)
                source.updates.update({
                    'status': 'disabled',
                    'poll_status': 'ok',
                })
            elif code in source.RATE_LIMIT_HTTP_CODES:
                logger.info(f'Rate limited. Marking as error and finishing. {e}')
                source.updates['rate_limited'] = True
            else:
                raise
        finally:
            # always persist whatever updates were collected, even on failure
            source = models.Source.put_updates(source)

        if source.AUTO_POLL:
            util.add_poll_task(source)

        # feeble attempt to avoid hitting the instance memory limit
        source = None
        gc.collect()

        return 'OK'

    def poll(self, source):
        """Actually runs the poll.

        Fetches the user's activities and links to their domains, hands them to
        :meth:`backfeed`, and then possibly refetches the user's h-feed to look
        for new syndication URLs.

        Stores property names and values to update in ``source.updates``.

        Args:
          source (models.Source)
        """
        if source.last_activities_etag or source.last_activity_id:
            logger.debug(f'Using ETag {source.last_activities_etag}, last activity id {source.last_activity_id}')

        #
        # Step 1: fetch activities:
        # * posts by the user
        # * search all posts for the user's domain URLs to find links
        #
        cache = util.CacheDict()
        if source.last_activities_cache_json:
            cache.update(json_loads(source.last_activities_cache_json))

        # search for links first so that the user's activities and responses
        # override them if they overlap
        links = source.search_for_links()

        # this user's own activities (and user mentions)
        resp = source.get_activities_response(
            fetch_replies=True, fetch_likes=True, fetch_shares=True,
            fetch_mentions=True, count=30, etag=source.last_activities_etag,
            min_id=source.last_activity_id, cache=cache)
        etag = resp.get('etag')  # used later
        user_activities = resp.get('items', [])

        # these map ids to AS objects.
        # backfeed all links as responses, but only include the user's own links as
        # activities, since their responses also get backfeed.
        responses = {a['id']: a for a in links}

        user_id = source.user_tag_id()
        links_by_user = [a for a in links
                         if a.get('object', {}).get('author', {}).get('id') == user_id]
        activities = {a['id']: a for a in links_by_user + user_activities}

        # extract silo activity ids, update last_activity_id
        silo_activity_ids = set()
        last_activity_id = source.last_activity_id
        for id in activities:
            # maybe replace stored last activity id
            parsed = util.parse_tag_uri(id)
            if parsed:
                id = parsed[1]
            silo_activity_ids.add(id)
            if self._is_newer_id(id, last_activity_id):
                last_activity_id = id

        if last_activity_id and last_activity_id != source.last_activity_id:
            source.updates['last_activity_id'] = last_activity_id

        # trim cache to just the returned activity ids, so that it doesn't grow
        # without bound. (WARNING: depends on get_activities_response()'s cache key
        # format, e.g. 'PREFIX ACTIVITY_ID'!)
        source.updates['last_activities_cache_json'] = json_dumps(
            {k: v for k, v in cache.items() if k.split()[-1] in silo_activity_ids})

        self.backfeed(source, responses, activities=activities)

        source.updates.update({'last_polled': source.last_poll_attempt,
                               'poll_status': 'ok'})
        if etag and etag != source.last_activities_etag:
            source.updates['last_activities_etag'] = etag

        #
        # Possibly refetch updated syndication urls.
        #
        # if the author has added syndication urls since the first time
        # original_post_discovery ran, we'll miss them. this cleanup task will
        # periodically check for updated urls. only kicks in if the author has
        # *ever* published a rel=syndication url
        if source.should_refetch():
            logger.info(f'refetching h-feed for source {source.label()}')
            relationships = original_post_discovery.refetch(source)

            now = util.now()
            source.updates['last_hfeed_refetch'] = now

            if relationships:
                logger.info(f'refetch h-feed found new rel=syndication relationships: {relationships}')
                try:
                    self.repropagate_old_responses(source, relationships)
                except BaseException as e:
                    # repropagating is best effort: swallow errors that look like
                    # timeouts or connection failures; re-raise everything else.
                    if ('BadRequestError' in str(e.__class__) or
                            'Timeout' in str(e.__class__) or
                            util.is_connection_failure(e)):
                        logger.info('Timeout while repropagating responses.', exc_info=True)
                    else:
                        raise
        else:
            logger.info(
                'skipping refetch h-feed. last-syndication-url %s, last-refetch %s',
                source.last_syndication_url, source.last_hfeed_refetch)

    def backfeed(self, source, responses=None, activities=None):
        """Processes responses and activities and generates propagate tasks.

        Stores property names and values to update in source.updates.

        Args:
          source (models.Source)
          responses (dict): maps AS response id to AS object. Modified in place.
          activities (dict): maps AS activity id to AS object
        """
        if responses is None:
            responses = {}
        if activities is None:
            activities = {}

        # Cache to make sure we only fetch the author's h-feed(s) the
        # first time we see it
        fetched_hfeeds = set()

        # narrow down to just public activities
        public = {}
        private = {}
        for id, activity in activities.items():
            (public if is_public(activity) else private)[id] = activity
        logger.info(f'Found {len(public)} public activities: {public.keys()}')
        logger.info(f'Found {len(private)} private activities: {private.keys()}')

        # published timestamps are compared as ISO 8601 strings
        last_public_post = (source.last_public_post or util.EPOCH).isoformat()
        public_published = util.trim_nulls(
            [a.get('object', {}).get('published') for a in public.values()])
        if public_published:
            max_published = max(public_published)
            if max_published > last_public_post:
                last_public_post = max_published
                source.updates['last_public_post'] = \
                    util.as_utc(datetime.datetime.fromisoformat(max_published))

        source.updates['recent_private_posts'] = \
            len([a for a in private.values()
                 if a.get('object', {}).get('published', util.EPOCH_ISO) > last_public_post])

        #
        # Step 2: extract responses, store their activities in response['activities']
        #
        # WARNING: this creates circular references in link posts found by search
        # queries in step 1, since they are their own activity. We use
        # prune_activity() and prune_response() in step 4 to remove these before
        # serializing to JSON.
        #
        for id, activity in public.items():
            obj = activity.get('object') or activity

            # handle user mentions
            user_id = source.user_tag_id()
            if obj.get('author', {}).get('id') != user_id and activity.get('verb') != 'share':
                for tag in obj.get('tags', []):
                    urls = tag.get('urls')
                    if tag.get('objectType') == 'person' and tag.get('id') == user_id and urls:
                        activity['originals'], activity['mentions'] = \
                            original_post_discovery.discover(
                                source, activity, fetch_hfeed=True,
                                include_redirect_sources=False,
                                already_fetched_hfeeds=fetched_hfeeds)
                        activity['mentions'].update(u.get('value') for u in urls)
                        _merge_activity_into_response(activity, responses)
                        break

            # handle quote mentions
            if is_quote_mention(activity, source):
                # now that we've confirmed that one exists, OPD will dig
                # into the actual attachments
                if 'originals' not in activity or 'mentions' not in activity:
                    activity['originals'], activity['mentions'] = \
                        original_post_discovery.discover(
                            source, activity, fetch_hfeed=True,
                            include_redirect_sources=False,
                            already_fetched_hfeeds=fetched_hfeeds)
                _merge_activity_into_response(activity, responses)

            # extract replies, likes, reactions, reposts, and rsvps
            replies = obj.get('replies', {}).get('items', [])
            tags = obj.get('tags', [])
            likes = [t for t in tags if Response.get_type(t) == 'like']
            reactions = [t for t in tags if Response.get_type(t) == 'react']
            reposts = [t for t in tags if Response.get_type(t) == 'repost']
            rsvps = as1.get_rsvps_from_event(obj)

            # coalesce responses. drop if missing id or author is blocked, non-public,
            # or opted out
            for resp in replies + likes + reactions + reposts + rsvps:
                id = resp.get('id')
                if not id:
                    logger.error(f'Skipping response without id: {json_dumps(resp, indent=2)}')
                    continue

                owner = as1.get_object(resp, 'actor') or as1.get_object(resp, 'author')
                if source.is_blocked(resp) or util.is_opt_out(owner):
                    logger.info(f'Skipping blocked/opt out user: {json_dumps(owner, indent=2)}')
                    continue
                elif not is_public(resp):
                    logger.info(f'Skipping non-public response {id} or author')
                    continue

                resp.setdefault('activities', []).append(activity)

                # when we find two responses with the same id, the earlier one may have
                # come from a link post or user mention, and this one is probably better
                # since it probably came from the user's activity, so prefer this one.
                # background: https://github.com/snarfed/bridgy/issues/533
                _merge_activity_into_response(resp, responses)

        #
        # Step 3: filter out responses we've already seen
        #
        # seen responses (JSON objects) for each source are stored in its entity.
        unchanged_responses = []
        if source.seen_responses_cache_json:
            for seen in json_loads(source.seen_responses_cache_json):
                id = seen['id']
                resp = responses.get(id)
                if (resp and not as1.activity_changed(seen, resp, log=True)
                        and not resp.get('activities_changed')):
                    unchanged_responses.append(seen)
                    del responses[id]

        #
        # Step 4: store new responses and enqueue propagate tasks
        #
        pruned_responses = []
        source.blocked_ids = None

        for id, resp in responses.items():
            resp_type = Response.get_type(resp)
            activities = resp.pop('activities', [])
            if not activities and (resp_type in ('post', 'comment')
                                   or is_quote_mention(resp, source)):
                # posts, comments, and quote mentions are their own activity
                activities = [resp]
            too_long = set()
            urls_to_activity = {}
            for i, activity in enumerate(activities):
                # we'll usually have multiple responses for the same activity, and the
                # objects in resp['activities'] are shared, so cache each activity's
                # discovered webmention targets inside its object.
                if 'originals' not in activity or 'mentions' not in activity:
                    activity['originals'], activity['mentions'] = \
                        original_post_discovery.discover(
                            source, activity, fetch_hfeed=True,
                            include_redirect_sources=False,
                            already_fetched_hfeeds=fetched_hfeeds)

                targets = original_post_discovery.targets_for_response(
                    resp, originals=activity['originals'], mentions=activity['mentions'])
                if targets:
                    logger.info(f"{activity.get('url')} has {len(targets)} webmention target(s): {' '.join(targets)}")
                    # new response to propagate! load block list if we haven't already
                    if source.blocked_ids is None:
                        source.load_blocklist()

                for t in targets:
                    if len(t) <= _MAX_STRING_LENGTH:
                        urls_to_activity[t] = i
                    else:
                        logger.info(f'Giving up on target URL over {_MAX_STRING_LENGTH} chars! {t}')
                        too_long.add(t[:_MAX_STRING_LENGTH - 4] + '...')

            # store/update response entity. the prune_*() calls are important to
            # remove circular references in link responses, which are their own
            # activities. details in the step 2 comment above.
            pruned_response = util.prune_response(resp)
            pruned_responses.append(pruned_response)
            resp_entity = Response(
                id=id,
                source=source.key,
                activities_json=[json_dumps(util.prune_activity(a, source))
                                 for a in activities],
                response_json=json_dumps(pruned_response),
                type=resp_type,
                unsent=list(urls_to_activity.keys()),
                failed=list(too_long),
                original_posts=resp.get('originals', []))
            if urls_to_activity:
                resp_entity.urls_to_activity=json_dumps(urls_to_activity)
            resp_entity.get_or_save(source, restart=self.RESTART_EXISTING_TASKS)

        # update cache
        if pruned_responses:
            source.updates['seen_responses_cache_json'] = json_dumps(
                pruned_responses + unchanged_responses)

    def repropagate_old_responses(self, source, relationships):
        """Find old Responses that match a new SyndicatedPost and repropagate them.

        We look through as many responses as we can until the datastore query
        expires.

        Args:
          source (models.Source):
          relationships: refetch result. Used here as a mapping from
            canonicalized activity URL to a sequence of objects with
            ``original`` and ``syndication`` attributes.
        """
        for response in (Response.query(Response.source == source.key)
                         .order(-Response.updated)):
            new_orig_urls = set()
            for activity_json in response.activities_json:
                activity = json_loads(activity_json)
                activity_url = activity.get('url') or activity.get('object', {}).get('url')
                if not activity_url:
                    logger.warning(f'activity has no url {activity_json}')
                    continue

                activity_url = source.canonicalize_url(activity_url, activity=activity)
                if not activity_url:
                    continue

                # look for activity url in the newly discovered list of relationships
                for relationship in relationships.get(activity_url, []):
                    # won't re-propagate if the discovered link is already among
                    # these well-known upstream duplicates
                    if (relationship.original in response.sent or
                            relationship.original in response.original_posts):
                        logger.info(
                            '%s found a new rel=syndication link %s -> %s, but the '
                            'relationship had already been discovered by another method',
                            response.label(), relationship.original,
                            relationship.syndication)
                    else:
                        logger.info(
                            '%s found a new rel=syndication link %s -> %s, and '
                            'will be repropagated with a new target!',
                            response.label(), relationship.original,
                            relationship.syndication)
                        new_orig_urls.add(relationship.original)

            if new_orig_urls:
                # re-open a previously 'complete' propagate task
                response.status = 'new'
                response.unsent.extend(list(new_orig_urls))
                response.put()
                response.add_task()
def _merge_activity_into_response(activity, responses): """Merges an activity into the responses dict, preserving existing activities. If the activity's id is already in responses (eg from a previous iteration), merges their 'activities' lists together. Otherwise just adds it. Args: activity (dict): ActivityStreams activity to add/merge responses (dict): maps AS response id to AS object, modified in place Returns: dict: the merged/added response object """ id = activity['id'] if existing_resp := responses.get(id): if as1.activity_changed(activity, existing_resp, log=True): logger.warning(f'Got two different versions of same response!\n{existing_resp}\n{activity}') existing_activities = existing_resp.get('activities', []) new_activities = activity.setdefault('activities', []) logger.info(f'Merging response activities {id}: existing {[a.get("id") for a in existing_activities]}, new {[a.get("id") for a in new_activities]}') for existing_activity in existing_activities: if util.add(new_activities, existing_activity): activity['activities_changed'] = True logger.info(f'Added activity {existing_activity.get("id")} to merged response') responses[id] = activity
class Discover(Poll):
    """Task handler that fetches and processes new responses to a single post.

    Request parameters:

    * source_key (string): key of source entity
    * post_id (string): silo post id(s)
    * type (string): optional; if provided, must be ``event``

    Inserts a propagate task for each response that hasn't been seen before.
    """
    RESTART_EXISTING_TASKS = True

    def dispatch_request(self):
        """Fetches one post, backfeeds its responses, and follows its reply parent.

        Returns:
          str: ``'OK'`` on completion, ``''`` if the source is unusable or the
          post wasn't found
        """
        logger.debug(f'Params: {list(request.values.items())}')
        g.TRANSIENT_ERROR_HTTP_CODES = ('400', '404')

        post_type = request.values.get('type')
        assert not post_type or post_type in ('event',)

        source = g.source = util.load_source()
        usable = (source and source.status != 'disabled'
                  and 'listen' in source.features)
        if not usable:
            logger.error('Source not found or disabled. Dropping task.')
            return ''
        logger.info(f'Source: {source.label()} {source.key_id()}, {source.bridgy_url()}')

        post_id = request.values['post_id']
        source.updates = {}

        # events are fetched through the underlying granary source; everything
        # else goes through the regular activities lookup.
        if post_type == 'event':
            found = [source.gr_source.get_event(post_id)]
        else:
            found = source.get_activities(
                fetch_replies=True, fetch_likes=True, fetch_shares=True,
                activity_id=post_id, user_id=source.key_id())

        if not found or not found[0]:
            logger.info(f'Post {post_id} not found.')
            return ''
        assert len(found) == 1, found

        activity = found[0]
        by_id = {activity['id']: activity}

        # no more transactional tasks. https://github.com/googleapis/python-tasks/issues/26
        # they're still supported in the new "bundled services" thing, but that seems like a dead end.
        # https://groups.google.com/g/google-appengine/c/22BKInlWty0/m/05ObNEdsAgAJ
        self.backfeed(source, responses=by_id, activities=by_id)

        # if this post is itself a reply, also discover the post it replies to
        obj = activity.get('object') or activity
        in_reply_to = util.get_first(obj, 'inReplyTo')
        parsed = (util.parse_tag_uri(in_reply_to.get('id', ''))
                  if in_reply_to else None)
        if parsed:
            util.add_discover_task(source, parsed[1])

        return 'OK'
class SendWebmentions(View):
    """Abstract base task handler that can send webmentions.

    Subclasses must implement :meth:`source_url`. The expected call sequence
    is :meth:`lease`, then :meth:`send_webmentions`, which ends by calling
    either :meth:`complete` or :meth:`release`.

    Attributes:

    * entity (models.Webmentions): subclass instance (set in :meth:`lease`)

    The source entity is not stored on this object; :meth:`lease` loads it into
    flask's ``g.source``.
    """
    # request deadline (10m) plus some padding
    LEASE_LENGTH = datetime.timedelta(minutes=12)

    def source_url(self, target_url):
        """Return the source URL to use for a given target URL.

        Subclasses must implement.

        Args:
          target_url (str)

        Returns:
          str

        Raises:
          NotImplementedError: always, in this base class
        """
        raise NotImplementedError()

    def send_webmentions(self):
        """Tries to send each unsent webmention in self.entity.

        Uses :meth:`source_url()` to determine the source parameter for each
        webmention.

        :meth:`lease()` *must* be called before this!

        If :meth:`do_send_webmentions` raises, the entity is released with
        status ``error`` and the exception is re-raised.
        """
        logger.info(f'Starting {self.entity.label()}')

        try:
            self.do_send_webmentions()
        except:
            logger.info('Propagate task failed', exc_info=True)
            self.release('error')
            raise

    def do_send_webmentions(self):
        """Sends a webmention to each target in self.entity, recording results.

        Rechecks every URL in the entity's ``unsent``, ``error``, and ``failed``
        lists, then tries each one that passes. Each target ends up in exactly
        one of the entity's ``sent``, ``skipped``, ``failed``, or ``error``
        lists. Finishes by calling :meth:`release` with ``error`` if any target
        landed in ``error``, otherwise :meth:`complete`.
        """
        urls = self.entity.unsent + self.entity.error + self.entity.failed
        unsent = set()
        # error and failed are rebuilt from scratch by this run
        self.entity.error = []
        self.entity.failed = []

        for orig_url in urls:
            # recheck the url here since the checks may have failed during the poll
            # or streaming add.
            url, domain, ok = util.get_webmention_target(orig_url)
            if ok:
                if len(url) <= _MAX_STRING_LENGTH:
                    unsent.add(url)
                else:
                    logger.info(f'Giving up on target URL over {_MAX_STRING_LENGTH} chars! {url}')
                    self.entity.failed.append(orig_url)
        self.entity.unsent = sorted(unsent)

        while self.entity.unsent:
            # reset per target so that the error message below never reports a
            # previous target's response
            resp = None
            target = self.entity.unsent.pop(0)

            try:
                source_url = self.source_url(target)
                logger.info(f'Webmention from {source_url} to {target}')

                # see if we've cached webmention discovery for this domain. the cache
                # value is a string URL endpoint if discovery succeeded, NO_ENDPOINT if
                # no endpoint was found.
                cache_key = util.webmention_endpoint_cache_key(target)
                endpoint = util.webmention_endpoint_cache.get(cache_key)
                if endpoint:
                    logger.info(f'Webmention discovery: using cached endpoint {cache_key}: {endpoint}')

                # send! and handle response or error
                headers = util.request_headers(source=g.source)
                if not endpoint:
                    # cache miss: run discovery and cache the result, including
                    # the "nothing found" case
                    endpoint, resp = webmention.discover(target, follow_meta_refresh=True,
                                                         headers=headers)
                    with util.webmention_endpoint_cache_lock:
                        util.webmention_endpoint_cache[cache_key] = endpoint or NO_ENDPOINT

                if endpoint and endpoint != NO_ENDPOINT:
                    logger.info('Sending...')
                    resp = webmention.send(endpoint, source_url, target, headers=headers,
                                           timeout=WEBMENTION_SEND_TIMEOUT.total_seconds())
                    logger.info(f'Sent! {resp}')
                    self.record_source_webmention(endpoint, target)
                    self.entity.sent.append(target)
                else:
                    logger.info('Giving up this target.')
                    self.entity.skipped.append(target)

            except ValueError:
                logger.info('Bad URL; giving up this target.')
                self.entity.skipped.append(target)

            except BaseException as e:
                logger.info('', exc_info=True)
                # Give up on 4XX and DNS errors; we don't expect retries to succeed.
                code, _ = util.interpret_http_exception(e)
                if ((code and code.startswith('4') and code != '429')
                        or 'DNS lookup failed' in str(e)):
                    logger.info('Giving up this target.')
                    self.entity.failed.append(target)
                else:
                    # anything else (including 429) is recorded in error and
                    # marks the request as failed
                    self.fail(f'Error sending to endpoint: {resp}')
                    self.entity.error.append(target)

            if target in self.entity.unsent:
                self.entity.unsent.remove(target)

        if self.entity.error:
            logger.info('Some targets failed')
            self.release('error')
        else:
            self.complete()

    @ndb.transactional()
    def lease(self, key):
        """Attempts to acquire and lease the :class:`models.Webmentions` entity.

        Also loads and sets ``g.source``, and returns False if the source
        doesn't exist or is disabled.

        On success, the entity's status is set to ``processing`` and its
        ``leased_until`` to now plus :attr:`LEASE_LENGTH`, and it is stored.

        Args:
          key (ndb.Key):

        Returns:
          bool: True on success, False or None otherwise. None is returned when
          the entity is missing or another task holds an unexpired lease (both
          via :meth:`fail`), and when the entity is already complete.
        """
        self.entity = key.get()

        if self.entity is None:
            return self.fail('no entity!')
        elif self.entity.status == 'complete':
            # let this task return 200 and finish
            logger.warning('duplicate task already propagated this')
            return
        elif (self.entity.status == 'processing' and
              util.now() < self.entity.leased_until):
            return self.fail('duplicate task is currently processing!')

        g.source = self.entity.source.get()
        if not g.source or g.source.status == 'disabled':
            logger.error('Source not found or disabled. Dropping task.')
            return False
        logger.info(f'Source: {g.source.label()} {g.source.key_id()}, {g.source.bridgy_url()}')

        assert self.entity.status in ('new', 'processing', 'error'), self.entity.status
        self.entity.status = 'processing'
        self.entity.leased_until = util.now() + self.LEASE_LENGTH
        self.entity.put()
        return True

    @ndb.transactional()
    def complete(self):
        """Attempts to mark the :class:`models.Webmentions` entity completed.

        Re-reads the stored entity first and only stores ``complete`` if both
        the stored and in-memory copies are still ``processing``.

        Returns True on success, False otherwise.
        """
        existing = self.entity.key.get()
        if existing is None:
            self.fail('entity disappeared!')
        elif existing.status == 'complete':
            # let this task return 200 and finish
            logger.warning('another task stole and finished this. did my lease expire?')
        elif self.entity.status == 'complete':
            # let this task return 200 and finish
            logger.error('i already completed this task myself somehow?! '
                         'https://github.com/snarfed/bridgy/issues/610')
        elif existing.status == 'new':
            self.fail('went backward from processing to new!')
        else:
            assert existing.status == 'processing', existing.status
            assert self.entity.status == 'processing', self.entity.status
            self.entity.status = 'complete'
            self.entity.put()
            return True

        return False

    @ndb.transactional()
    def release(self, new_status):
        """Attempts to unlease the :class:`models.Webmentions` entity.

        Only stores the entity, with the new status and ``leased_until``
        cleared, if the stored copy is still ``processing``. Otherwise does
        nothing.

        Args:
          new_status (str):
        """
        existing = self.entity.key.get()
        # TODO: send_webmentions() edits self.entity.unsent etc, so if it fails and
        # hits here, those values may be lost mid flight!
        if existing and existing.status == 'processing':
            self.entity.status = new_status
            self.entity.leased_until = None
            self.entity.put()

    def fail(self, message):
        """Marks the request failed and logs an error message.

        Logs at warning level and sets ``g.failed`` to True. Returns None.

        Args:
          message (str): text to log
        """
        logger.warning(message)
        g.failed = True

    @ndb.transactional()
    def record_source_webmention(self, endpoint, target):
        """Sets this source's last_webmention_sent and maybe webmention_endpoint.

        Reloads ``g.source`` from the datastore before modifying and storing it.
        ``webmention_endpoint`` is only updated when the endpoint differs from
        the stored one and the target's domain is one of the source's domains.

        Args:
          endpoint (str): URL
          target (str): URL
        """
        g.source = g.source.key.get()
        logger.info('Setting last_webmention_sent')
        g.source.last_webmention_sent = util.now()

        if (endpoint != g.source.webmention_endpoint and
                util.domain_from_link(target) in g.source.domains):
            logger.info(f'Also setting webmention_endpoint to {endpoint} (discovered in {target}; was {g.source.webmention_endpoint})')
            g.source.webmention_endpoint = endpoint
        g.source.put()
class PropagateResponse(SendWebmentions):
    """Task handler that sends webmentions for a :class:`models.Response`.

    Attributes:

    * activities: parsed :attr:`models.Response.activities_json` list
    * response_obj: parsed :attr:`models.Response.response_json` object

    Request parameters:

    * response_key (str): key of :class:`models.Response` entity
    """

    def _task_result(self):
        """Returns the task's HTTP result: an error code if it failed, else OK."""
        if getattr(g, 'failed', None):
            return ('', ERROR_HTTP_RETURN_CODE)
        return 'OK'

    def dispatch_request(self):
        """Leases the response entity and sends its webmentions.

        Drops the response, marking it complete, if it, its author, or any of
        its activities is non-public.
        """
        logger.debug(f'Params: {list(request.values.items())}')

        response_key = ndb.Key(urlsafe=request.values['response_key'])
        if not self.lease(response_key):
            return self._task_result()

        # log a link to the poll that (probably) created this response
        source = g.source
        created_shortly_before = self.entity.created - datetime.timedelta(seconds=61)
        poll_url = util.host_url(logs.url(created_shortly_before, source.key))
        logger.info(f'Created by this poll: {poll_url}')

        self.activities = [json_loads(a) for a in self.entity.activities_json]
        self.response_obj = json_loads(self.entity.response_json)

        everything_public = (is_public(self.response_obj)
                             and all(is_public(a) for a in self.activities))
        if not everything_public:
            logger.info('Response or author or activity is non-public. Dropping.')
            self.complete()
            return ''

        self.send_webmentions()
        return self._task_result()

    def source_url(self, target_url):
        """Builds the source URL to send in the webmention for a target.

        Args:
          target_url (str)

        Returns:
          str: host-qualified URL path made of the response type, source short
          name, source key id, post id, and, for non-post responses, the
          response id (plus the reaction id for reactions)
        """
        # determine which activity to use. default to response.
        activity = self.response_obj
        if self.activities:
            activity = self.activities[0]
            url_indices = (json_loads(self.entity.urls_to_activity)
                           if self.entity.urls_to_activity else None)
            index = url_indices.get(target_url) if url_indices else None
            if index:
                activity = self.activities[index]

        # generate source URL
        activity_id = activity['id']
        tag_parts = util.parse_tag_uri(activity_id)
        post_id = tag_parts[1] if tag_parts else activity_id
        segments = [
            self.entity.type,
            g.source.SHORT_NAME,
            g.source.key.string_id(),
            g.source.format_for_source_url(post_id),
        ]

        if self.entity.type == 'post':
            return util.host_url('/'.join(segments))

        # parse and add response id. (we know Response key ids are always tag URIs)
        _, full_response_id = util.parse_tag_uri(self.entity.key.string_id())
        if self.entity.type in ('like', 'react', 'repost', 'rsvp'):
            # extract responder user id
            short_response_id = full_response_id.split('_')[-1]
        else:
            short_response_id = full_response_id
        segments.append(g.source.format_for_source_url(short_response_id))

        if self.entity.type == 'react':
            segments.append(g.source.format_for_source_url(full_response_id))

        return util.host_url('/'.join(segments))
class PropagateBlogPost(SendWebmentions):
    """Task handler that sends webmentions for a :class:`models.BlogPost`.

    Request parameters:

    * key (str): key of :class:`models.BlogPost` entity
    """

    def dispatch_request(self):
        """Leases the blog post entity and sends webmentions to its targets."""
        logger.debug(f'Params: {list(request.values.items())}')

        leased = self.lease(ndb.Key(urlsafe=request.values['key']))
        if leased:
            # resolve each unsent URL lazily, in order, and keep the ones that
            # pass. skip "self" links to this blog's domain
            resolved = (util.get_webmention_target(unsent_url)
                        for unsent_url in self.entity.unsent)
            targets = {url for url, domain, ok in resolved
                       if ok and domain not in g.source.domains}

            self.entity.unsent = list(targets)
            self.send_webmentions()

        if getattr(g, 'failed', None):
            return ('', ERROR_HTTP_RETURN_CODE)
        return 'OK'

    def source_url(self, target_url):
        """Returns the entity's key id, regardless of ``target_url``."""
        return self.entity.key.id()
def _register_task_routes():
    """Registers the task queue handlers on the flask app, POST only."""
    # (URL path, view class, endpoint name), in registration order
    routes = (
        ('/_ah/queue/poll', Poll, 'poll'),
        ('/_ah/queue/poll-now', Poll, 'poll-now'),
        ('/_ah/queue/discover', Discover, 'discover'),
        ('/_ah/queue/propagate', PropagateResponse, 'propagate'),
        ('/_ah/queue/propagate-blogpost', PropagateBlogPost, 'propagate_blogpost'),
    )
    for path, view_cls, endpoint_name in routes:
        app.add_url_rule(path, view_func=view_cls.as_view(endpoint_name),
                         methods=['POST'])


_register_task_routes()