
Action

record_show(resource_id, record_id, version=None)

Compatibility action for vds_data_get: it accepts the same parameters and returns the result of vds_data_get unchanged.

Parameters:

Name         Type           Description                                                  Default
resource_id  str            the resource's ID                                            required
record_id    str            the record's ID                                              required
version      Optional[int]  the version of the record to get, or None to get the latest  None

Returns:

Type  Description
dict  the record data, as returned by vds_data_get

Source code in ckanext/versioned_datastore/logic/data/action.py
@action(schema.vds_data_get(), helptext.vds_data_get, get=True)
def record_show(resource_id: str, record_id: str, version: Optional[int] = None):
    """
    Compatibility action for vds_data_get.

    :param resource_id: the resource's ID
    :param record_id: the record's ID
    :param version: the version to get of the record, or None to get the latest
    :returns: the record data
    """
    return vds_data_get(resource_id, record_id, version)
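
Because record_show only forwards to vds_data_get, clients can call either name. The sketch below builds a GET URL for it, assuming the @action decorator registers the function under its own name at CKAN's standard /api/3/action/<name> endpoint and that get=True means GET requests are accepted. The helper name record_show_url, the site URL and the IDs are hypothetical.

```python
from typing import Optional
from urllib.parse import urlencode


def record_show_url(base_url: str, resource_id: str, record_id: str,
                    version: Optional[int] = None) -> str:
    """Build a GET URL for the record_show action.

    Assumption: the action is exposed at CKAN's standard /api/3/action/<name>
    endpoint and accepts GET requests (suggested by get=True, not confirmed).
    """
    params = {'resource_id': resource_id, 'record_id': record_id}
    if version is not None:
        # leave version out entirely to get the latest version of the record
        params['version'] = version
    return f"{base_url.rstrip('/')}/api/3/action/record_show?{urlencode(params)}"


# hypothetical site, IDs and version, for illustration only
print(record_show_url('https://data.example.org', 'abc-123', '42', version=3))
# https://data.example.org/api/3/action/record_show?resource_id=abc-123&record_id=42&version=3
```

Omitting version rather than sending an empty value mirrors the action's own default of None, meaning "latest".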

vds_data_add(context, original_data_dict, resource_id, replace, reingest=False)

Add data to the datastore for the given resource. The data added will be the data found at the resource's URL unless a list of dict records is provided, in which case those records are added instead. The addition is asynchronous, so the result returned by this action is information about the queued jobs. Two jobs are queued: an ingest job, which stores the data found at the URL or in the provided records, and then a sync job, which updates the search index so the data becomes searchable (the sync job only runs if the ingest succeeds).

If the replace flag is passed as True, all the existing data in the datastore for this resource will be deleted before the new data is added, resulting in the new data replacing the old data. This is the kind of behaviour users expect when uploading a fresh new file.

If the reingest flag is True, the file will be ingested even if the database believes it has already ingested a file with the same file hash. This is mostly for handling errors and shouldn't generally be needed.
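
The skip-unless-forced behaviour of the reingest flag can be illustrated with a minimal sketch. It is not the extension's code: the helper names file_hash and should_ingest are hypothetical, and SHA-256 is an arbitrary choice because the hashing algorithm the database actually uses is not documented here.

```python
import hashlib
from typing import Optional


def file_hash(data: bytes) -> str:
    """Hash raw file contents; identical files produce identical hashes.

    Assumption: SHA-256 is used purely for illustration.
    """
    return hashlib.sha256(data).hexdigest()


def should_ingest(data: bytes, last_ingested_hash: Optional[str],
                  reingest: bool = False) -> bool:
    """Skip a file whose hash matches the last ingested one, unless forced."""
    if reingest:
        # reingest=True bypasses the duplicate check entirely
        return True
    return file_hash(data) != last_ingested_hash


previous = file_hash(b'id,name\n1,moth\n')
print(should_ingest(b'id,name\n1,moth\n', previous))                 # False
print(should_ingest(b'id,name\n1,moth\n', previous, reingest=True))  # True
print(should_ingest(b'id,name\n1,beetle\n', previous))               # True
```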

Before the ingest and sync jobs are queued, the datastore's parsing options are updated if required. If they are altered, the new options version is returned in the result dict as new_options_version.

Parameters:

Name                Type  Description                                                            Default
context             dict  the CKAN context                                                       required
original_data_dict  dict  the action's data dict, from which an optional 'records' list is read  required
resource_id         str   the resource ID                                                        required
replace             bool  whether to replace all existing records with the new ones              required
reingest            bool  whether to ingest even if the previous file is identical               False

Returns:

Type  Description
dict  information about the queued jobs: new_options_version, queued_at, ingest_job_id and sync_job_id

Source code in ckanext/versioned_datastore/logic/data/action.py
@action(schema.vds_data_add(), helptext.vds_data_add)
def vds_data_add(
    context: dict,
    original_data_dict: dict,
    resource_id: str,
    replace: bool,
    reingest: bool = False,
):
    """
    Add data to the datastore for the given resource. The data added will be the data
    found at the resource's URL unless a list of dict records is provided in which case
    this will be added. The addition is asynchronous and so the result returned by
    calling this action is information about the queued jobs. Two jobs will be queued,
    an ingest job to store the data found at the URL/in the provided records, and then a
    sync job, which updates the data search index making the data available for
    searching (this will only run if the ingest succeeded).

    If the replace flag is passed as True, all the existing data in the datastore for
    this resource will be deleted before the new data is added, resulting in the new
    data replacing the old data. This is the kind of behaviour users expect when
    uploading a fresh new file.

    If the reingest flag is True, the file will be ingested even if the database
    believes it has already ingested a file with the same file hash. This is mostly for
    handling errors and shouldn't generally be needed.

    Before the ingest and sync jobs are queued, the datastore's parsing options are
    updated if required. If they are altered before ingest, the new version will be
    returned in the result dict.

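    :param context: the CKAN context
    :param original_data_dict: the action's data dict, from which an optional
        'records' list of dicts is read
    :param resource_id: the resource ID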
    :param replace: whether to replace all existing records with the new ones
    :param reingest: whether to ingest even if the previous file is identical
    :returns: a dict containing information about the add
    """
    if is_resource_read_only(resource_id):
        raise ReadOnlyResourceException('This resource has been marked as read only')

    records = original_data_dict.get('records', None)
    if records is not None:
        records = list_of_dicts_validator(records, context)

    # make sure the options are in sync
    new_options_version = update_options(resource_id)

    user = toolkit.get_action('user_show')(context, {'id': context['user']})
    resource = toolkit.get_action('resource_show')(context, {'id': resource_id})

    if resource.get('disable_parsing', False):
        raise RawResourceException('Ingestion has been disabled for this resource')

    ingest_job, sync_job = queue_ingest(
        resource, replace, user['apikey'], records, reingest
    )

    return {
        'new_options_version': new_options_version,
        'queued_at': ingest_job.enqueued_at.isoformat(),
        'ingest_job_id': ingest_job.id,
        'sync_job_id': sync_job.id,
    }
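
A minimal client-side sketch of building this action's data dict and reading its result, assuming the action is called through CKAN's JSON action API. The keys come from the signature and the returned dict above; the helper names build_add_payload and describe_add_result are hypothetical, and treating a None new_options_version as "options unchanged" is an assumption.

```python
import json
from typing import List, Optional


def build_add_payload(resource_id: str, replace: bool,
                      records: Optional[List[dict]] = None,
                      reingest: bool = False) -> str:
    """Serialise a data dict for vds_data_add (hypothetical helper)."""
    payload = {'resource_id': resource_id, 'replace': replace, 'reingest': reingest}
    if records is not None:
        # provided records are added instead of the file at the resource's URL
        payload['records'] = records
    return json.dumps(payload)


def describe_add_result(result: dict) -> str:
    """Summarise the dict returned by vds_data_add."""
    parts = [
        f"ingest job {result['ingest_job_id']}",
        f"sync job {result['sync_job_id']}",
        f"queued at {result['queued_at']}",
    ]
    # assumption: new_options_version is None when the options were not altered
    if result.get('new_options_version') is not None:
        parts.append(f"options updated to version {result['new_options_version']}")
    return ', '.join(parts)


# hypothetical result values, for illustration only
example = {'new_options_version': None, 'queued_at': '2024-01-01T12:00:00',
           'ingest_job_id': 'job-1', 'sync_job_id': 'job-2'}
print(describe_add_result(example))
# ingest job job-1, sync job job-2, queued at 2024-01-01T12:00:00
```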

vds_data_delete(context, resource_id)

Deletes all the data from the given resource. This is an asynchronous operation, so the action returns details about the queued jobs rather than the outcome of the deletion. The data is deleted by creating new, deleted versions of the records in the resource, so the old data remains available in old versions while the latest version of the resource is empty.

Two jobs will be queued, one to delete all the records in MongoDB and then one to sync the changes with Elasticsearch.
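
To make the versioning model concrete, here is a toy, in-memory sketch of "deleting" by appending deleted versions. It is not Splitgill's storage format or the extension's code: the History type, delete_all and data_at are hypothetical, and versions are assumed to be integers appended in ascending order.

```python
from typing import Dict, List, Optional, Tuple

# Toy model: each record ID maps to its history of (version, data) pairs,
# where data=None marks a deletion. Versions are assumed to be ascending.
History = Dict[str, List[Tuple[int, Optional[dict]]]]


def delete_all(history: History, version: int) -> None:
    """Delete every live record by appending a deleted version, never erasing."""
    for versions in history.values():
        if versions and versions[-1][1] is not None:
            versions.append((version, None))


def data_at(history: History, version: int) -> Dict[str, dict]:
    """Return the records visible at a version (latest entry <= that version)."""
    visible = {}
    for record_id, versions in history.items():
        current = None
        for v, data in versions:
            if v <= version:
                current = data
        if current is not None:
            visible[record_id] = current
    return visible


history: History = {'1': [(1, {'name': 'moth'})], '2': [(1, {'name': 'beetle'})]}
delete_all(history, version=2)
print(data_at(history, 1))  # {'1': {'name': 'moth'}, '2': {'name': 'beetle'}}
print(data_at(history, 2))  # {}
```

The old data is still reachable by asking for an earlier version, while the latest version is empty, which is the behaviour the description promises.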

Parameters:

Name         Type  Description                              Default
context      dict  the CKAN action context                  required
resource_id  str   the resource ID to delete the data from  required

Returns:

Type  Description
dict  information about the queued jobs: queued_at, delete_job_id and sync_job_id

Source code in ckanext/versioned_datastore/logic/data/action.py
@action(schema.vds_data_delete(), helptext.vds_data_delete)
def vds_data_delete(context: dict, resource_id: str):
    """
    Deletes all the data from the given resource. This is an asynchronous operation,
    so the action returns details about the queued jobs rather than the outcome of the
    deletion. The data is deleted by creating new, deleted versions of the records in
    the resource, so the old data remains available in old versions while the latest
    version of the resource is empty.

    Two jobs will be queued, one to delete all the records in MongoDB and then one to
    sync the changes with Elasticsearch.

    :param context: the CKAN action context
    :param resource_id: the resource ID to delete the data from
    :returns: a dict containing information about the delete
    """
    if is_resource_read_only(resource_id):
        raise ReadOnlyResourceException('This resource has been marked as read only')

    resource = toolkit.get_action('resource_show')(context, {'id': resource_id})
    delete_job, sync_job = queue_delete(resource)

    return {
        'queued_at': delete_job.enqueued_at.isoformat(),
        'delete_job_id': delete_job.id,
        'sync_job_id': sync_job.id,
    }

vds_data_get(resource_id, record_id, version=None)

Retrieves the data for one record from the given resource, optionally at a given version, and returns it. If the record can't be found, a toolkit.ObjectNotFound exception is raised.
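
When the action is called over HTTP rather than in-process, the ObjectNotFound exception does not reach the client directly. CKAN's action API wraps responses in a JSON envelope with success, result and error keys and reports ObjectNotFound as an error whose __type is "Not Found Error". A minimal sketch of unwrapping that envelope follows; the helper name unwrap_record is hypothetical, and returning None for a missing record is one design choice among several.

```python
from typing import Optional


def unwrap_record(response_json: dict) -> Optional[dict]:
    """Return the record data from a CKAN action API response envelope.

    Returns None when the record was not found; raises on any other error.
    """
    if response_json.get('success'):
        # the action's return value sits under 'result'; the record is under 'data'
        return response_json['result']['data']
    error = response_json.get('error', {})
    if error.get('__type') == 'Not Found Error':
        return None
    raise RuntimeError(f'action failed: {error}')


ok = {'success': True, 'result': {'data': {'name': 'moth'}, 'resource_id': 'abc-123'}}
missing = {'success': False, 'error': {'__type': 'Not Found Error', 'message': 'Not found'}}
print(unwrap_record(ok))       # {'name': 'moth'}
print(unwrap_record(missing))  # None
```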

Parameters:

Name         Type           Description                                                  Default
resource_id  str            the resource's ID                                            required
record_id    str            the record's ID                                              required
version      Optional[int]  the version of the record to get, or None to get the latest  None

Returns:

Type  Description
dict  the record's data under 'data' and the resource ID under 'resource_id'

Source code in ckanext/versioned_datastore/logic/data/action.py
@action(schema.vds_data_get(), helptext.vds_data_get, get=True)
def vds_data_get(resource_id: str, record_id: str, version: Optional[int] = None):
    """
    Retrieves the data for one record from the given resource at the given optional
    version and returns it. If the record can't be found an ObjectNotFound exception is
    raised.

    :param resource_id: the resource's ID
    :param record_id: the record's ID
    :param version: the version to get of the record, or None to get the latest
    :returns: a dict with the record's data under 'data' and the resource ID under
        'resource_id'
    """
    query = DirectQuery(
        [resource_id], version, Q('term', **{DocumentField.ID: record_id})
    )
    request = SearchRequest(query, size=1)
    response = request.run()
    try:
        return {
            'data': response.data[0],
            # TODO: do we actually need to add the fields?
            # get_fields(resource_id, request.query.version)
            'resource_id': resource_id,
        }
    except IndexError:
        # if we don't have a result, raise not found
        raise toolkit.ObjectNotFound
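
vds_data_get looks the record up with a single term query on the record ID field and asks for at most one hit (size=1); Q('term', ...) serialises to a {'term': {...}} clause. Roughly the same request body, written as a plain dict, is sketched below. id_field is a placeholder for whatever DocumentField.ID resolves to, record_term_query is a hypothetical name, and the resource and version arguments passed to DirectQuery are deliberately not represented.

```python
import json


def record_term_query(record_id: str, id_field: str = 'id') -> dict:
    """Build a plain Elasticsearch body matching one record by ID.

    id_field is a placeholder; the real field name comes from DocumentField.ID.
    """
    return {'size': 1, 'query': {'term': {id_field: record_id}}}


print(json.dumps(record_term_query('42')))
# {"size": 1, "query": {"term": {"id": "42"}}}
```

A term query matches the exact stored value, which suits an ID lookup; combined with size=1, an empty hit list is what triggers the IndexError and then ObjectNotFound in the function above.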

vds_data_sync(context, resource_id, full=False)

Queues a sync operation to ensure that the resource's data in MongoDB matches the data in Elasticsearch.

With full=False (the default), only unsynced changes are synced. With full=True, all records are resynced. This is managed by Splitgill and may delete records before syncing; see the Splitgill documentation for details.
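
The difference between an incremental and a full sync can be pictured with a toy selection function. This is not Splitgill's algorithm: records_to_sync is a hypothetical name, and tracking integer modification markers against a last-synced marker is an assumption made only for illustration.

```python
from typing import Dict, List


def records_to_sync(modified_at: Dict[str, int], last_synced: int,
                    full: bool = False) -> List[str]:
    """Pick which record IDs a sync would push to the search index.

    Toy model: full=True resyncs every record; otherwise only records modified
    after the last sync marker are selected.
    """
    if full:
        return sorted(modified_at)
    return sorted(rid for rid, modified in modified_at.items() if modified > last_synced)


changes = {'a': 1, 'b': 5, 'c': 9}
print(records_to_sync(changes, last_synced=5))             # ['c']
print(records_to_sync(changes, last_synced=5, full=True))  # ['a', 'b', 'c']
```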

Parameters:

Name         Type  Description                                                                  Default
context      dict  the CKAN action context                                                      required
resource_id  str   the resource ID to sync                                                      required
full         bool  whether to sync all records again or just those changed since the last sync  False

Returns:

Type  Description
dict  information about the queued job: queued_at and job_id

Source code in ckanext/versioned_datastore/logic/data/action.py
@action(schema.vds_data_sync(), helptext.vds_data_sync)
def vds_data_sync(context: dict, resource_id: str, full: bool = False):
    """
    Queues a sync operation to ensure that the resource's data in MongoDB matches the
    data in Elasticsearch.

    With full=False (the default), only unsynced changes will be synced. With
    full=True, all records will be resynced. This is managed by Splitgill and may delete
    records before syncing (check Splitgill docs).

    :param context: the CKAN action context
    :param resource_id: the resource ID to sync
    :param full: whether to sync all records again or just the changed ones since the
        last sync (default is False).
    :returns: a dict containing information about the sync
    """
    if is_resource_read_only(resource_id):
        raise ReadOnlyResourceException('This resource has been marked as read only')

    resource = toolkit.get_action('resource_show')(context, {'id': resource_id})

    if resource.get('disable_parsing', False):
        raise RawResourceException('Ingestion has been disabled for this resource')

    job = queue_sync(resource, full)

    return {
        'queued_at': job.enqueued_at.isoformat(),
        'job_id': job.id,
    }
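
Since vds_data_sync changes state, a client would normally POST to it. A minimal sketch of building that request with the standard library, assuming the action is exposed at CKAN's /api/3/action/vds_data_sync endpoint and that an API token is sent in the Authorization header as CKAN's token authentication expects. The helper name build_sync_request, the site URL and the token are hypothetical.

```python
import json
import urllib.request


def build_sync_request(base_url: str, api_token: str, resource_id: str,
                       full: bool = False) -> urllib.request.Request:
    """Build (but do not send) a POST request for vds_data_sync."""
    body = json.dumps({'resource_id': resource_id, 'full': full}).encode('utf-8')
    return urllib.request.Request(
        f"{base_url.rstrip('/')}/api/3/action/vds_data_sync",
        data=body,
        # assumption: the token goes in the Authorization header
        headers={'Content-Type': 'application/json', 'Authorization': api_token},
        method='POST',
    )


req = build_sync_request('https://data.example.org', 'my-token', 'abc-123', full=True)
print(req.get_method(), req.full_url)
# POST https://data.example.org/api/3/action/vds_data_sync
```

Sending it with urllib.request.urlopen(req) would, on success, return the CKAN envelope whose result holds queued_at and job_id.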