Source code for firexkit.revoke

from celery import current_app
from datetime import timedelta, datetime
from celery.utils.log import get_task_logger
from firexkit.inspect import get_revoked

logger = get_task_logger(__name__)


[docs] class RevokedRequests(object): """ Need to inspect the app for the revoked requests, because AsyncResult.state of a task that hasn't been de-queued and executed by a worker but was revoked is PENDING (i.e., the REVOKED state is only updated upon executing a task). This phenomenon makes the wait_for_results wait on such "revoked" tasks, and therefore required us to implement this work-around. """ _instance = None
[docs] @classmethod def instance(cls, existing_instance=None): if existing_instance is not None: cls._instance = existing_instance if cls._instance is None: cls._instance = RevokedRequests() return cls._instance
def __init__(self, timer_expiry_secs=60, skip_first_cycle=True): self.timer_expiry = timedelta(seconds=timer_expiry_secs) self.revoked_list = [] self.last_updated = datetime.utcnow() if skip_first_cycle else None
[docs] @classmethod def get_revoked_list_from_app(cls): revoked_list = list() v = get_revoked(retry_if_None_returned=False, timeout=60, destination=(f'{current_app.conf.primary_worker_name}@{current_app.conf.mc}', )) if not v: return revoked_list else: v = v.values() if v: for l in v: revoked_list += l return revoked_list
[docs] def update(self, verbose=False): self.revoked_list = self.get_revoked_list_from_app() self.last_updated = datetime.utcnow() if verbose: logger.debug('RevokedRequests list updated at %s to %r' % (self.last_updated, self.revoked_list))
def _task_in_revoked_list(self, result_id): if self.last_updated is None: self.update() return result_id in self.revoked_list
[docs] def is_revoked(self, result_id, timer_expiry_secs=None): if self._task_in_revoked_list(result_id): return True else: # Updating the revoked_list is an expensive operation, so only do it periodically timer_expiry = self.timer_expiry if timer_expiry_secs is None else \ timedelta(seconds=timer_expiry_secs) time_lapsed = datetime.utcnow()-self.last_updated if time_lapsed > timer_expiry: self.update() return self._task_in_revoked_list(result_id) else: return False
[docs] def get_chain_head(parent, child): if child == parent or parent is None or child is None: return child one_up = child.parent if one_up == parent or one_up is None: return child else: return get_chain_head(parent=parent, child=one_up)
[docs] def revoke_nodes_up_to_parent(starting_node, parent): from firexkit.result import get_result_logging_name node = starting_node parent_name = get_result_logging_name(parent) while node != parent: one_up = node.parent logger.info('Revoking child %r of parent %r' % (get_result_logging_name(node), parent_name)) node.revoke(terminate=True) node = one_up