Skip to content

Utils

RawResourceException

Bases: ValidationError

Raised when trying to ingest a resource that has been marked with "disable_parsing".

Source code in ckanext/versioned_datastore/lib/utils.py
252
253
254
255
256
257
class RawResourceException(toolkit.ValidationError):
    """
    Validation error signalling that ingestion was attempted on a resource flagged
    with "disable_parsing".
    """

ReadOnlyResourceException

Bases: ValidationError

Raised when a write operation of some variety is attempted on a resource which has been marked as read only.

Source code in ckanext/versioned_datastore/lib/utils.py
243
244
245
246
247
248
249
class ReadOnlyResourceException(toolkit.ValidationError):
    """
    Validation error signalling that some kind of write operation was attempted on a
    resource that has been marked as read only.
    """

es_client()

Retrieves an Elasticsearch client for the cluster currently in use. If Splitgill is not yet configured on the VDS plugin, an exception is raised.

Returns:

Type Description
Elasticsearch

an Elasticsearch object

Source code in ckanext/versioned_datastore/lib/utils.py
165
166
167
168
169
170
171
172
173
174
175
def es_client() -> Elasticsearch:
    """
    Fetches the Elasticsearch client held by the versioned_datastore plugin.

    :returns: the plugin's Elasticsearch client
    :raises Exception: if the plugin reports that Splitgill is not configured yet
    """
    plugin = get_plugin('versioned_datastore')
    if plugin.is_sg_configured:
        return plugin.elasticsearch_client
    raise Exception('VDS plugin not configured yet')

get_available_datastore_resources(ignore_auth=False, user_id='')

Simple wrapper around get_available_resources which provides the set of datastore resources available to the currently logged-in user.

Parameters:

Name Type Description Default
ignore_auth bool

whether to ignore authentication (default: False)

False

Returns:

Type Description
Set[str]

a set of resource IDs

Source code in ckanext/versioned_datastore/lib/utils.py
23
24
25
26
27
28
29
30
31
32
33
34
35
def get_available_datastore_resources(
    ignore_auth: bool = False, user_id: str = ''
) -> Set[str]:
    """
    Convenience wrapper around get_available_resources which always asks for datastore
    resources only (datastore_only=True).

    :param ignore_auth: whether to ignore authentication (default: False)
    :param user_id: forwarded unchanged to get_available_resources (default: '')
    :returns: a set of resource IDs
    """
    passthrough = {'ignore_auth': ignore_auth, 'user_id': user_id}
    return get_available_resources(datastore_only=True, **passthrough)

get_available_resources(datastore_only, ignore_auth=False, user_id='')

Get a set of resource IDs that are available to the currently logged-in user and, if datastore_only is set to True, are datastore active. If no user is logged-in, all public datastore resource IDs are returned. The resource IDs are returned as a set to enable quick checking between a list of requested IDs and the list of available IDs.

Parameters:

Name Type Description Default
datastore_only bool

whether to only return resource IDs that are datastore active

required
ignore_auth bool

whether to ignore authentication (default: False)

False

Returns:

Type Description
Set[str]

a set of resource IDs

Source code in ckanext/versioned_datastore/lib/utils.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
def get_available_resources(
    datastore_only: bool, ignore_auth: bool = False, user_id: str = ''
) -> Set[str]:
    """
    Collects the IDs of the resources returned by the
    current_package_list_with_resources action, optionally keeping only the ones
    flagged as datastore active. The packages are requested in pages of 100 until an
    empty page comes back. The IDs are returned as a set to enable quick checking
    between a list of requested IDs and the list of available IDs.

    :param datastore_only: whether to only return resource IDs that are datastore active
    :param ignore_auth: whether to ignore authentication (default: False). When True,
        the action context carries ignore_auth and no user
    :param user_id: the value passed as the "user" in the action context when
        ignore_auth is False (default: '')
    :returns: a set of resource IDs
    """
    list_packages = toolkit.get_action('current_package_list_with_resources')
    available = set()
    position = 0

    while True:
        # a fresh context is built for every page: either auth is bypassed entirely or
        # the request is made on behalf of the given user
        if ignore_auth:
            context = {'ignore_auth': True}
        else:
            context = {'user': user_id}

        page = list_packages(context, {'offset': position, 'limit': 100})
        if not page:
            return available

        for package in page:
            for resource in package['resources']:
                # when filtering, skip anything not flagged as datastore active
                if datastore_only and not resource.get('datastore_active', False):
                    continue
                available.add(resource['id'])
        position += len(page)

get_database(resource_id)

Retrieves a SplitgillDatabase object for the given resource ID. If the SplitgillClient on the VDS plugin isn't yet configured, an exception is raised.

Parameters:

Name Type Description Default
resource_id str

the resource's ID

required

Returns:

Type Description
SplitgillDatabase

a SplitgillDatabase

Source code in ckanext/versioned_datastore/lib/utils.py
140
141
142
143
144
145
146
147
148
149
def get_database(resource_id: str) -> SplitgillDatabase:
    """
    Looks up the SplitgillDatabase object for a resource. The resource ID is converted
    to its Splitgill name with get_sg_name, and the database is then requested from the
    client returned by sg_client().

    :param resource_id: the resource's ID
    :returns: a SplitgillDatabase
    """
    database_name = get_sg_name(resource_id)
    client = sg_client()
    return client.get_database(database_name)

get_latest_resource_fields(resource_ids)

Retrieves a list of fields available on the latest indices for the given resources.

Does not do any authentication. Only call from within other authenticated functions.

Parameters:

Name Type Description Default
resource_ids List[str]

list of resource IDs

required

Returns:

Type Description
Dict[str, Dict[str, Dict]]

dict of field names, the resources they're found in, and their details within those resources

Source code in ckanext/versioned_datastore/lib/utils.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
@cache_region('vds', 'resource_fields')
def get_latest_resource_fields(resource_ids: List[str]) -> Dict[str, Dict[str, Dict]]:
    """
    Retrieves the fields available for the given resources, grouped by field path.

    Does not do any authentication. Only call from within other authenticated functions.

    Resources whose field lookup raises a not found error are skipped; the fields of all
    the other requested resources are still returned.

    :param resource_ids: list of resource IDs
    :returns: dict keyed by field path, then by resource ID, holding that field's
        details (name, path and type flags) within that resource
    """
    fields = defaultdict(dict)
    for resource_id in resource_ids:
        database = get_database(resource_id)

        try:
            parsed_fields = database.get_field_names()
        # NOTE(review): confirm that toolkit actually exposes NotFoundError; if it does
        # not, this clause will itself fail when the lookup raises
        except toolkit.NotFoundError:
            # temporary fix for splitgill#38 (so we can ignore unavailable resources).
            # Only this resource is skipped; returning here would silently drop every
            # resource that comes after it in the list
            continue

        for field in parsed_fields:
            fields[field.path][resource_id] = {
                'name': field.name,
                'path': field.path,
                'text': field.is_text,
                'keyword': field.is_keyword,
                'boolean': field.is_boolean,
                'date': field.is_date,
                'number': field.is_number,
                'geo': field.is_geo,
            }

    return fields

get_latest_version(resource_id)

Retrieves the latest version of the given resource from the status index.

Parameters:

Name Type Description Default
resource_id

the resource's id

required

Returns:

Type Description
Optional[int]

the version or None if the resource isn't indexed

Source code in ckanext/versioned_datastore/lib/utils.py
191
192
193
194
195
196
197
198
def get_latest_version(resource_id) -> Optional[int]:
    """
    Asks the resource's Splitgill database for its Elasticsearch version and returns
    it.

    :param resource_id: the resource's id
    :returns: the version or None if the resource isn't indexed
    """
    database = get_database(resource_id)
    return database.get_elasticsearch_version()

get_public_resources()

Retrieves a list of public resources and whether they are present in the datastore.

Returns:

Type Description
Dict[str, bool]

dict of publicly available resource IDs and whether they are available in the datastore

Source code in ckanext/versioned_datastore/lib/utils.py
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
@cache_region('vds', 'public_resources')
def get_public_resources() -> Dict[str, bool]:
    """
    Builds a mapping of resource IDs visible without a user to whether each one is
    present in the datastore. Packages are requested in pages of 100 until an empty
    page comes back.

    :returns: dict of publicly available resource IDs and whether they are available in
        the datastore
    """
    list_packages = toolkit.get_action('current_package_list_with_resources')
    public = {}
    position = 0

    while True:
        # auth is enforced and no user is supplied
        anonymous_context = {'ignore_auth': False, 'user': None}
        page = list_packages(anonymous_context, {'offset': position, 'limit': 100})
        if not page:
            break

        for package in page:
            for resource in package.get('resources', []):
                resource_id = resource['id']
                public[resource_id] = is_datastore_resource(resource_id)
        position += len(page)

    return public

get_sg_name(resource_id)

Adds the configured prefix to the start of the resource id to get the index name for the resource data in elasticsearch.

Parameters:

Name Type Description Default
resource_id str

the resource id

required

Returns:

Type Description
str

the resource's Splitgill database name

Source code in ckanext/versioned_datastore/lib/utils.py
201
202
203
204
205
206
207
208
209
210
def get_sg_name(resource_id: str) -> str:
    """
    Builds the Splitgill database name for a resource by putting the configured prefix
    (the ckanext.versioned_datastore.sg_prefix option, empty if unset) in front of the
    resource id.

    :param resource_id: the resource id
    :returns: the resource's Splitgill database name
    """
    prefix_option = 'ckanext.versioned_datastore.sg_prefix'
    configured_prefix = toolkit.config.get(prefix_option, '')
    return '{}{}'.format(configured_prefix, resource_id)

is_datastore_only_resource(resource_url)

Checks whether the resource url is a datastore only resource url. When uploading data directly to the API without using a source file/URL, the url of the resource will be set to "_datastore_only_resource" to indicate this. This function checks to see if the resource URL provided is one of these URLs. Note that we check a few different scenarios as CKAN sometimes has the nasty habit of adding a protocol onto the front of these URLs when saving the resource.

Parameters:

Name Type Description Default
resource_url str

the URL of the resource

required

Returns:

Type Description
bool

True if the resource is a datastore only resource, False if not

Source code in ckanext/versioned_datastore/lib/utils.py
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
def is_datastore_only_resource(resource_url: str) -> bool:
    """
    Tests whether the given URL is the datastore only resource marker
    (common.DATASTORE_ONLY_RESOURCE). The marker is accepted both bare and with an
    http:// or https:// protocol in front of it, as CKAN sometimes adds a protocol to
    these URLs when saving the resource.

    :param resource_url: the URL of the resource
    :returns: True if the resource is a datastore only resource, False if not
    """
    marker = common.DATASTORE_ONLY_RESOURCE
    accepted_urls = (marker, f'http://{marker}', f'https://{marker}')
    return any(resource_url == candidate for candidate in accepted_urls)

is_datastore_resource(resource_id)

Checks if any data has made it to Elasticsearch for this resource ID. Note that this only checks Elasticsearch, it doesn't check MongoDB, and is therefore intended to simply test if there is any searchable data for the resource.

Parameters:

Name Type Description Default
resource_id str

the resource id

required

Returns:

Type Description
bool

True if the resource is a datastore resource, False if not

Source code in ckanext/versioned_datastore/lib/utils.py
273
274
275
276
277
278
279
280
281
282
def is_datastore_resource(resource_id: str) -> bool:
    """
    Reports whether the resource's Splitgill database has an Elasticsearch version,
    i.e. whether any data has made it to Elasticsearch for this resource ID. Only
    Elasticsearch is consulted, not MongoDB, so this is a check for searchable data.

    :param resource_id: the resource id
    :returns: True if the resource is a datastore resource, False if not
    """
    elasticsearch_version = get_database(resource_id).get_elasticsearch_version()
    return elasticsearch_version is not None

is_ingestible(resource)

Returns True if the resource can be ingested into the datastore and False if not. To be ingestible, the resource must either be a datastore only resource (signified by the url being set to _datastore_only_resource) or have a format that we can ingest (the format field on the resource is used for this, not the URL). If the url is None, False is returned. This is technically not possible due to a Resource model constraint, but it's worth covering off anyway.

Parameters:

Name Type Description Default
resource dict

the resource dict

required

Returns:

Type Description
bool

True if it is, False if not

Source code in ckanext/versioned_datastore/lib/utils.py
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
def is_ingestible(resource: dict) -> bool:
    """
    Returns True if the resource can be ingested into the datastore and False if not. To
    be ingestible, the resource must not be marked with "disable_parsing" and must
    either be a datastore only resource (signified by the url being set to
    _datastore_only_resource) or have a format that we can ingest (the format field on
    the resource is used for this, not the URL). If the url is None, False is returned.
    This is technically not possible due to a Resource model constraint, but it's worth
    covering off anyway.

    :param resource: the resource dict
    :returns: True if it is, False if not
    """
    url = resource.get('url')
    if url is None:
        return False

    # raw resources are never ingestible, whatever their url or format. This has to be
    # its own guard: written as `not_raw and a or b`, `and` binds tighter than `or`, so
    # a raw resource with an ingestible format would still be reported as ingestible
    if resource.get('disable_parsing', False):
        return False

    if is_datastore_only_resource(url):
        return True

    resource_format = resource.get('format')
    return (
        resource_format is not None
        and resource_format.lower() in common.ALL_FORMATS
    )

is_resource_read_only(resource_id)

Loops through the plugin implementations checking if any of them want the given resource id to be read only.

Returns:

Type Description
bool

True if the resource should be treated as read only, False if not

Source code in ckanext/versioned_datastore/lib/utils.py
260
261
262
263
264
265
266
267
268
269
270
def is_resource_read_only(resource_id: str) -> bool:
    """
    Asks each implementation returned by ivds_implementations() in turn whether the
    given resource id should be read only, stopping at the first one that says yes.

    :param resource_id: the resource id
    :returns: True if the resource should be treated as read only, False if not
    """
    for implementation in ivds_implementations():
        if implementation.vds_is_read_only_resource(resource_id):
            return True
    return False

mongo_client()

Retrieves a Mongo client for the database instance currently in use. If Splitgill is not yet configured on the VDS plugin, an exception is raised.

Returns:

Type Description
MongoClient

a MongoClient object

Source code in ckanext/versioned_datastore/lib/utils.py
178
179
180
181
182
183
184
185
186
187
188
def mongo_client() -> MongoClient:
    """
    Fetches the Mongo client held by the versioned_datastore plugin.

    :returns: a MongoClient object
    :raises Exception: if the plugin reports that Splitgill is not configured yet
    """
    plugin = get_plugin('versioned_datastore')
    if plugin.is_sg_configured:
        return plugin.mongo_client
    raise Exception('VDS plugin not configured yet')

sg_client()

Retrieves a Splitgill client object. If Splitgill is not configured yet on the VDS plugin, an exception is raised.

Returns:

Type Description
SplitgillClient

a SplitgillClient object

Source code in ckanext/versioned_datastore/lib/utils.py
152
153
154
155
156
157
158
159
160
161
162
def sg_client() -> SplitgillClient:
    """
    Fetches the Splitgill client held by the versioned_datastore plugin.

    :returns: a SplitgillClient object
    :raises Exception: if the plugin reports that Splitgill is not configured yet
    """
    plugin = get_plugin('versioned_datastore')
    if plugin.is_sg_configured:
        return plugin.sg_client
    raise Exception('VDS plugin not configured yet')

unprefix_index_name(sg_index_name)

Extracts the resource ID from the given Splitgill index name by removing the Splitgill specific parts, plus removing the prefix (if one is configured). If the resource ID cannot be extracted, a ValueError is raised.

Parameters:

Name Type Description Default
sg_index_name str

the Splitgill index name

required

Returns:

Type Description
str

the resource's ID

Source code in ckanext/versioned_datastore/lib/utils.py
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
def unprefix_index_name(sg_index_name: str) -> str:
    """
    Extracts the resource ID from the given Splitgill index name by removing the
    Splitgill specific parts, plus removing the prefix (if one is configured). If the
    resource ID cannot be extracted, a ValueError is raised.

    The whole index name must have one of the expected shapes: anything with extra
    characters after the -latest or -arc-# suffix is rejected.

    :param sg_index_name: the Splitgill index name
    :returns: the resource's ID
    :raises ValueError: if the index name is not of the form data-<name>-latest or
        data-<name>-arc-<number>
    """
    # all indexes have data- at the start and -latest or -arc-# on the end
    patterns = (r'data-(.*)-latest', r'data-(.*)-arc-[0-9]+')
    for pattern in patterns:
        # fullmatch so that the suffix really is at the end of the name. A start-only
        # match would accept trailing junk and would cut a name containing "-latest"
        # short at that point
        match = re.fullmatch(pattern, sg_index_name)
        if match:
            return unprefix_sg_name(match.group(1))
    raise ValueError(f'Failed to extract resource name from index: {sg_index_name}')

unprefix_sg_name(sg_name)

Removes the configured prefix from the start of the index name to get the resource id.

Parameters:

Name Type Description Default
sg_name str

the Splitgill database name

required

Returns:

Type Description
str

the resource's id

Source code in ckanext/versioned_datastore/lib/utils.py
213
214
215
216
217
218
219
220
221
222
def unprefix_sg_name(sg_name: str) -> str:
    """
    Drops as many leading characters from the name as the configured prefix (the
    ckanext.versioned_datastore.sg_prefix option, empty if unset) is long, leaving the
    resource id. The name is not checked to actually start with the prefix.

    :param sg_name: the Splitgill database name
    :returns: the resource's id
    """
    prefix_option = 'ckanext.versioned_datastore.sg_prefix'
    prefix_length = len(toolkit.config.get(prefix_option, ''))
    return sg_name[prefix_length:]