Skip to content

Tasks

Task

Bases: ABC

Abstract base class for VDS queued tasks, for example importing or downloading data.

Source code in ckanext/versioned_datastore/lib/tasks.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
class Task(abc.ABC):
    """
    Abstract base class for VDS queued tasks, for example importing or downloading
    data.

    Subclasses implement :meth:`run`. :meth:`queue` hands :meth:`start` to CKAN's
    ``enqueue_job`` as the job callable, and :meth:`start` then wraps :meth:`run`
    with a temporary directory and the :meth:`post_run` hook.
    """

    def __init__(
        self,
        queue_name: str,
        title: str,
        timeout: int = 3600,
    ):
        """
        :param queue_name: name of the RQ queue the task is placed on
        :param title: human-readable title, used as the job title and in log
            messages
        :param timeout: number of seconds RQ allows the task to run before timing
            it out (default 3600, i.e. one hour)
        """
        self.log = logging.getLogger(__name__)
        self.timeout = timeout
        self.title = title
        self.queue_name = queue_name

    @abc.abstractmethod
    def run(self, tmpdir: Path):
        """
        Do the task's actual work; every subclass must implement this.

        Called by :meth:`start` once its temporary directory has been created.

        :param tmpdir: scratch directory the task may use; it is removed as soon as
            this method finishes, whether it returns or raises
        """

    def post_run(self):
        """
        Hook called by :meth:`start` after :meth:`run` has completed successfully.

        Clears the ``versioned_datastore`` cache region. A failure to clear it is
        logged rather than raised.
        """
        try:
            clear_cache_region('versioned_datastore', utils, cache_name='vds')
        except CacheClearError as error:
            self.log.error(error)

    def start(self):
        """
        Run the task: create a temporary directory, call :meth:`run` with it,
        remove the directory and then call :meth:`post_run`.

        If :meth:`run` raises, the exception propagates and :meth:`post_run` is
        not called.
        """
        with tempfile.TemporaryDirectory() as scratch_name:
            self.log.info(f'Starting {self.title}')
            scratch_dir = Path(scratch_name)
            self.run(scratch_dir)
        # by this point the temporary directory has already been deleted
        self.post_run()
        self.log.info(f'Finished {self.title}')

    def queue(
        self,
        depends_on: Optional[Union[Job, List[Job]]] = None,
        timeout: Optional[int] = None,
    ) -> Job:
        """
        Put this task on its RQ queue using CKAN's ``enqueue_job``.

        :param depends_on: a job, or list of jobs, that must finish successfully
            before this task starts (see the RQ documentation). Ignored when falsy,
            for example None or an empty list.
        :param timeout: per-call timeout in seconds that overrides ``self.timeout``.
            A falsy value (None or 0) falls back to ``self.timeout``.
        :returns: whatever CKAN's enqueue_job returns for the queued job
        """
        job_options = dict(timeout=timeout or self.timeout)
        if depends_on:
            job_options['depends_on'] = depends_on
        return toolkit.enqueue_job(
            self.start,
            rq_kwargs=job_options,
            title=self.title,
            queue=self.queue_name,
        )

__init__(queue_name, title, timeout=3600)

Parameters:

Name Type Description Default
queue_name str

the name of the queue this task should run on

required
title str

the title of the task

required
timeout int

how long, in seconds, RQ should wait for this task to complete before timing it out (defaults to 3600, i.e. 1 hour)

3600
Source code in ckanext/versioned_datastore/lib/tasks.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
def __init__(
    self,
    queue_name: str,
    title: str,
    timeout: int = 3600,
):
    """
    :param queue_name: name of the RQ queue the task is placed on
    :param title: human-readable title, used as the job title and in log
        messages
    :param timeout: number of seconds RQ allows the task to run before timing
        it out (default 3600, i.e. one hour)
    """
    self.log = logging.getLogger(__name__)
    self.timeout = timeout
    self.title = title
    self.queue_name = queue_name

post_run()

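Runs after self.run() has completed successfully and its temporary directory has been removed. It clears the versioned_datastore cache region and logs, rather than raises, any CacheClearError.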

Source code in ckanext/versioned_datastore/lib/tasks.py
48
49
50
51
52
53
54
55
def post_run(self):
    """
    Hook called by start() after run() has completed successfully.

    Clears the ``versioned_datastore`` cache region. A failure to clear it is
    logged rather than raised.
    """
    try:
        clear_cache_region('versioned_datastore', utils, cache_name='vds')
    except CacheClearError as error:
        self.log.error(error)

queue(depends_on=None, timeout=None)

Queues this task on the appropriate RQ queue.

Parameters:

Name Type Description Default
depends_on Optional[Union[Job, List[Job]]]

a job, or list of jobs, which this task depends on. If given, this task will only start when the other jobs finish successfully. Check the RQ docs for more information.

None
timeout Optional[int]

a timeout in seconds for this task. If provided, it overrides the timeout defined on the task; otherwise the task's own timeout is used. A falsy value such as 0 is treated as not provided.

None

Returns:

Type Description
Job

the result of calling CKAN's enqueue_job, which provides details about the queued job for this task

Source code in ckanext/versioned_datastore/lib/tasks.py
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
def queue(
    self,
    depends_on: Optional[Union[Job, List[Job]]] = None,
    timeout: Optional[int] = None,
) -> Job:
    """
    Put this task on its RQ queue using CKAN's ``enqueue_job``.

    :param depends_on: a job, or list of jobs, that must finish successfully
        before this task starts (see the RQ documentation). Ignored when falsy,
        for example None or an empty list.
    :param timeout: per-call timeout in seconds that overrides ``self.timeout``.
        A falsy value (None or 0) falls back to ``self.timeout``.
    :returns: whatever CKAN's enqueue_job returns for the queued job
    """
    job_options = dict(timeout=timeout or self.timeout)
    if depends_on:
        job_options['depends_on'] = depends_on
    return toolkit.enqueue_job(
        self.start,
        rq_kwargs=job_options,
        title=self.title,
        queue=self.queue_name,
    )

run(tmpdir) abstractmethod

Abstract method that subclasses must implement. It is called automatically by start() once setup (creating a temporary directory) is complete.

Parameters:

Name Type Description Default
tmpdir Path

a temporary directory which can be used by the task and will be cleaned up after this run method completes

required
Source code in ckanext/versioned_datastore/lib/tasks.py
37
38
39
40
41
42
43
44
45
46
@abc.abstractmethod
def run(self, tmpdir: Path):
    """
    Do the task's actual work; every subclass must implement this.

    Called by start() once its temporary directory has been created.

    :param tmpdir: scratch directory the task may use; it is removed as soon as
        this method finishes, whether it returns or raises
    """

start()

Starts the task, performs some setup and then calls self.run with a temporary directory.

Source code in ckanext/versioned_datastore/lib/tasks.py
57
58
59
60
61
62
63
64
65
66
def start(self):
    """
    Run the task: create a temporary directory, call run() with it, remove the
    directory and then call post_run().

    If run() raises, the exception propagates and post_run() is not called.
    """
    with tempfile.TemporaryDirectory() as scratch_name:
        self.log.info(f'Starting {self.title}')
        scratch_dir = Path(scratch_name)
        self.run(scratch_dir)
    # by this point the temporary directory has already been deleted
    self.post_run()
    self.log.info(f'Finished {self.title}')

get_queue_length(queue_name)

This will only get the pending jobs in a queue, not any jobs that are currently processing.

Parameters:

Name Type Description Default
queue_name

the name of the queue to check, e.g. 'download'

required

Returns:

Type Description

the number of pending jobs in the queue, as an int

Source code in ckanext/versioned_datastore/lib/tasks.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
@cached(cache=TTLCache(maxsize=10, ttl=300))
def get_queue_length(queue_name):
    """
    Count the jobs waiting in a queue, as reported by CKAN's ``job_list`` action.

    This will only get the pending jobs in a queue, not any jobs that are currently
    processing. Results are cached per ``queue_name`` for up to 300 seconds (at
    most 10 entries), so the value may be slightly stale.

    :param queue_name: the name of the queue to check, e.g. 'download'
    :returns: the number of pending jobs, as an int
    """
    list_jobs = toolkit.get_action('job_list')
    pending_jobs = list_jobs({'ignore_auth': True}, {'queues': [queue_name]})
    return len(pending_jobs)