Skip to content

Tasks

IngestResourceTask

Bases: Task

Class representing a task to ingest a resource from a file/URL or from a given list of record dicts.

Source code in ckanext/versioned_datastore/lib/importing/tasks.py
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
class IngestResourceTask(Task):
    """
    Task which ingests records into a resource, taking them either from the
    resource's file/URL or from a list of record dicts handed in directly.
    """

    def __init__(
        self,
        resource: dict,
        replace: bool,
        api_key: str,
        records: Optional[List[dict]] = None,
        reingest: bool = False,
    ):
        """
        :param resource: the resource dict
        :param replace: whether the ingested records should replace all of the
                        existing records or not
        :param api_key: the user's API key, used for file access (this is needed
                        for private datasets)
        :param records: optional list of dicts to ingest in place of the resource's
                        file/URL
        :param reingest: ingest the data even if the new file hash is the same as
                         the hash of the previously ingested file
        """
        self.resource = resource
        self.replace = replace
        self.api_key = api_key
        self.records = records
        self.reingest = reingest

        # the title states where the data comes from and which options are in use
        data_info = (
            'data from file/url' if records is None else f'{len(records)} records'
        )
        title = (
            f'Ingest for {self.resource_id} of {data_info} '
            f'[replace: {replace}] [reingest: {reingest}]'
        )
        super().__init__('importing', title)

    @staticmethod
    def result_to_dict(result: IngestResult) -> dict:
        """
        Converts an IngestResult object into a plain dict of its counters.

        :param result: the IngestResult object
        :returns: a dict with the keys "inserted", "deleted" and "updated"
        """
        return {
            counter: getattr(result, counter)
            for counter in ('inserted', 'deleted', 'updated')
        }

    @property
    def resource_id(self) -> str:
        """
        :returns: the ID of the resource this ingest is operating on, read from the
                  resource dict
        """
        resource = self.resource
        return resource['id']

    def run(self, tmpdir: Path):
        """
        Performs the ingestion in two tracked stages: a prep stage which sorts out
        the data source and its reader, followed by the ingest stage itself.

        :param tmpdir: a dir to use for temporary storage
        """
        # prep stage: work out the source of the data, downloading it if needed
        with ImportStats.track(self.resource_id, PREP) as prep_stats:
            if self.records is not None:
                # records were passed in directly so there is no file to hash
                source = self.records
                file_hash = None
            else:
                source = tmpdir / 'source'
                file_hash = download_resource_data(self.resource, source, self.api_key)
                last_hash = get_last_file_hash(self.resource_id)
                is_duplicate = file_hash == last_hash
                if is_duplicate and not self.reingest:
                    # same file as last time and no reingest requested, so stop
                    # here and record why on the stats
                    dupe_message = get_dupe_message(file_hash)
                    prep_stats.update(error=dupe_message)
                    self.log.info(dupe_message)
                    return

            reader = choose_reader_for_resource(self.resource, source)
            reader_name = reader.get_name()
            self.log.info(f'Using reader {reader_name}')
            prep_stats.update(count=reader.get_count())

        # ingest stage: write the records without committing, then commit at the end
        with ImportStats.track(self.resource_id, INGEST) as ingest_stats:
            database = get_database(self.resource_id)
            try:
                operations = {}
                if self.replace and database.has_data():
                    # a replace is done by deleting every existing record before
                    # the new data is ingested
                    deletions = (
                        Record.delete(existing.id)
                        for existing in database.iter_records()
                    )
                    replace_result = database.ingest(deletions, commit=False)
                    operations['replace'] = self.result_to_dict(replace_result)

                incoming = iter_records(reader.read(), ingest_stats)
                ingest_result = database.ingest(incoming, commit=False)
                operations['ingest'] = self.result_to_dict(ingest_result)

                doc_count = database.data_collection.count_documents(
                    {'version': None}
                )
                ingest_stats.update(count=doc_count, operations=operations)
            except BaseException:
                # whatever was raised, roll back before passing the error on
                self.log.exception('Error while ingesting data, rolling back')
                database.rollback_records()
                raise

            version = database.commit()
            if version is None:
                self.log.info('No changes detected for data')
                ingest_stats.update(count=0)
                return

            ingest_stats.update(version=version)
            self.log.info(f'Ingested new data with version {version}')

            try:
                fields = reader.get_fields()
                create_details(self.resource_id, version, fields, file_hash)
            except Exception as e:
                # a failure here is only logged, it does not fail the task
                self.log.warning(
                    f'Failed to create DatastoreResourceDetails due to {e}'
                )

        self.log.info('Finished ingesting')

resource_id property

Returns:

Type Description
str

the ID of the resource this ingest is operating on

__init__(resource, replace, api_key, records=None, reingest=False)

Parameters:

Name Type Description Default
resource dict

the resource dict

required
replace bool

whether to replace all existing records with the records that will be ingested or not

required
api_key str

the user's API key to use for file access (this is needed for private datasets)

required
records Optional[List[dict]]

optional list of dicts to ingest instead of the resource's file/URL

None
reingest bool

reingest the dataset even if the new file hash matches the previously ingested file hash

False
Source code in ckanext/versioned_datastore/lib/importing/tasks.py
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
def __init__(
    self,
    resource: dict,
    replace: bool,
    api_key: str,
    records: Optional[List[dict]] = None,
    reingest: bool = False,
):
    """
    :param resource: the resource dict
    :param replace: whether the ingested records should replace all of the
                    existing records or not
    :param api_key: the user's API key, used for file access (this is needed
                    for private datasets)
    :param records: optional list of dicts to ingest in place of the resource's
                    file/URL
    :param reingest: ingest the data even if the new file hash is the same as
                     the hash of the previously ingested file
    """
    self.resource = resource
    self.replace = replace
    self.api_key = api_key
    self.records = records
    self.reingest = reingest

    # the title states where the data comes from and which options are in use
    data_info = (
        'data from file/url' if records is None else f'{len(records)} records'
    )
    title = (
        f'Ingest for {self.resource_id} of {data_info} '
        f'[replace: {replace}] [reingest: {reingest}]'
    )
    super().__init__('importing', title)

result_to_dict(result) staticmethod

Simple helper to convert an IngestResult object to a dict.

Parameters:

Name Type Description Default
result IngestResult

the IngestResult object

required

Returns:

Type Description
dict

a dict

Source code in ckanext/versioned_datastore/lib/importing/tasks.py
154
155
156
157
158
159
160
161
162
163
164
165
166
@staticmethod
def result_to_dict(result: IngestResult) -> dict:
    """
    Converts an IngestResult object into a plain dict of its counters.

    :param result: the IngestResult object
    :returns: a dict with the keys "inserted", "deleted" and "updated"
    """
    return {
        counter: getattr(result, counter)
        for counter in ('inserted', 'deleted', 'updated')
    }

run(tmpdir)

Performs the ingestion.

Parameters:

Name Type Description Default
tmpdir Path

a dir to use for temporary storage

required
Source code in ckanext/versioned_datastore/lib/importing/tasks.py
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
def run(self, tmpdir: Path):
    """
    Performs the ingestion in two tracked stages: a prep stage which sorts out
    the data source and its reader, followed by the ingest stage itself.

    :param tmpdir: a dir to use for temporary storage
    """
    # prep stage: work out the source of the data, downloading it if needed
    with ImportStats.track(self.resource_id, PREP) as prep_stats:
        if self.records is not None:
            # records were passed in directly so there is no file to hash
            source = self.records
            file_hash = None
        else:
            source = tmpdir / 'source'
            file_hash = download_resource_data(self.resource, source, self.api_key)
            last_hash = get_last_file_hash(self.resource_id)
            is_duplicate = file_hash == last_hash
            if is_duplicate and not self.reingest:
                # same file as last time and no reingest requested, so stop
                # here and record why on the stats
                dupe_message = get_dupe_message(file_hash)
                prep_stats.update(error=dupe_message)
                self.log.info(dupe_message)
                return

        reader = choose_reader_for_resource(self.resource, source)
        reader_name = reader.get_name()
        self.log.info(f'Using reader {reader_name}')
        prep_stats.update(count=reader.get_count())

    # ingest stage: write the records without committing, then commit at the end
    with ImportStats.track(self.resource_id, INGEST) as ingest_stats:
        database = get_database(self.resource_id)
        try:
            operations = {}
            if self.replace and database.has_data():
                # a replace is done by deleting every existing record before
                # the new data is ingested
                deletions = (
                    Record.delete(existing.id)
                    for existing in database.iter_records()
                )
                replace_result = database.ingest(deletions, commit=False)
                operations['replace'] = self.result_to_dict(replace_result)

            incoming = iter_records(reader.read(), ingest_stats)
            ingest_result = database.ingest(incoming, commit=False)
            operations['ingest'] = self.result_to_dict(ingest_result)

            doc_count = database.data_collection.count_documents(
                {'version': None}
            )
            ingest_stats.update(count=doc_count, operations=operations)
        except BaseException:
            # whatever was raised, roll back before passing the error on
            self.log.exception('Error while ingesting data, rolling back')
            database.rollback_records()
            raise

        version = database.commit()
        if version is None:
            self.log.info('No changes detected for data')
            ingest_stats.update(count=0)
            return

        ingest_stats.update(version=version)
        self.log.info(f'Ingested new data with version {version}')

        try:
            fields = reader.get_fields()
            create_details(self.resource_id, version, fields, file_hash)
        except Exception as e:
            # a failure here is only logged, it does not fail the task
            self.log.warning(
                f'Failed to create DatastoreResourceDetails due to {e}'
            )

    self.log.info('Finished ingesting')

get_dupe_message(file_hash)

Returns an error message to be used for duplicate resource ingestions where nothing needs to be done. No error has really occurred so raising an exception feels like a bad way to deal with this, hence this str generator.

Parameters:

Name Type Description Default
file_hash str

the file's hash

required

Returns:

Type Description
str

a str to be stored as the stat's "error"

Source code in ckanext/versioned_datastore/lib/importing/tasks.py
103
104
105
106
107
108
109
110
111
112
def get_dupe_message(file_hash: str) -> str:
    """
    Builds the message used when a resource's file has already been ingested and
    so nothing needs to be done. This isn't a real error, which is why a message
    is generated here instead of an exception being raised.

    :param file_hash: the file's hash
    :returns: a str to be stored as the stat's "error"
    """
    template = 'This file has been ingested before, ignoring [hash: {}]'
    return template.format(file_hash)

queue_delete(resource)

Queues a new delete job for the given resource to remove all records. This is a versioned delete, so in reality all the data is maintained, just the latest versions of all records are set to empty.

Parameters:

Name Type Description Default
resource dict

the resource dict

required

Returns:

Type Description
Tuple[Job, Job]

a 2-tuple of the created delete job and the sync job which is dependent on the delete job

Source code in ckanext/versioned_datastore/lib/importing/tasks.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
def queue_delete(resource: dict) -> Tuple[Job, Job]:
    """
    Queues a new delete job for the given resource to remove all of its records,
    followed by a sync job. The delete is versioned, so in reality all the data is
    maintained, just the latest versions of all records are set to empty.

    :param resource: the resource dict
    :returns: a 2-tuple of the created delete job and the sync job which is
        dependent on the delete job
    :raises ReadOnlyResourceException: if the resource is marked as read only
    """
    resource_id = resource['id']
    if is_resource_read_only(resource_id):
        raise ReadOnlyResourceException('This resource has been marked as read only')

    # queue the delete first so that the sync can be made dependent on it
    delete_job = DeleteResourceTask(resource).queue()
    sync_job = SyncResourceTask(resource).queue(depends_on=delete_job)

    return delete_job, sync_job

queue_ingest(resource, replace, api_key, records=None, reingest=False)

Queues a new ingest job on the importing queue for the given resource.

Parameters:

Name Type Description Default
resource dict

the resource dict

required
replace bool

whether any existing records should be deleted before adding the new records.

required
api_key str

the user's API key to use for file access (this is needed for private datasets)

required
records Optional[List[dict]]

optional list of dicts to ingest instead of the file/URL on the resource

None
reingest bool

reingest the dataset even if the new file hash matches the previously ingested file hash

False

Returns:

Type Description
Tuple[Job, Job]

a 2-tuple of the created ingest job and sync job which is dependent on the ingest job

Source code in ckanext/versioned_datastore/lib/importing/tasks.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def queue_ingest(
    resource: dict,
    replace: bool,
    api_key: str,
    records: Optional[List[dict]] = None,
    reingest: bool = False,
) -> Tuple[Job, Job]:
    """
    Queues a new ingest job on the importing queue for the given resource, followed
    by a sync job.

    :param resource: the resource dict
    :param replace: whether any existing records should be deleted before the new
        records are added
    :param api_key: the user's API key, used for file access (this is needed for
        private datasets)
    :param records: optional list of dicts to ingest in place of the file/URL on
        the resource
    :param reingest: ingest the data even if the new file hash is the same as the
        hash of the previously ingested file
    :returns: a 2-tuple of the created ingest job and the sync job which is
        dependent on the ingest job
    :raises ReadOnlyResourceException: if the resource is marked as read only
    """
    resource_id = resource['id']
    if is_resource_read_only(resource_id):
        raise ReadOnlyResourceException('This resource has been marked as read only')

    # queue the ingest first so that the sync can be made dependent on it
    ingest_job = IngestResourceTask(
        resource, replace, api_key, records, reingest
    ).queue()
    sync_job = SyncResourceTask(resource).queue(depends_on=ingest_job)

    return ingest_job, sync_job

queue_sync(resource, full=False)

Queues a sync job to synchronise any changes in MongoDB for this resource with Elasticsearch.

Parameters:

Name Type Description Default
resource dict

the resource dict

required
full bool

whether to completely resync the Elasticsearch data with MongoDB or just sync the changes (default: False, just sync the changes)

False

Returns:

Type Description
Job

the queued sync job

Source code in ckanext/versioned_datastore/lib/importing/tasks.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
def queue_sync(resource: dict, full: bool = False) -> Job:
    """
    Queues a sync job which synchronises any changes in MongoDB for this resource
    with Elasticsearch.

    :param resource: the resource dict
    :param full: whether to completely resync the Elasticsearch data with MongoDB
        or just sync the changes (default: False, just sync the changes)
    :returns: the queued sync job
    :raises ReadOnlyResourceException: if the resource is marked as read only
    """
    resource_id = resource['id']
    if is_resource_read_only(resource_id):
        raise ReadOnlyResourceException('This resource has been marked as read only')

    sync_task = SyncResourceTask(resource, full)
    return sync_task.queue()