Skip to content

Schema

Schema

Bases: ABC

Abstract base class for a query schema.

Should be extended by all registered query schemas.

Source code in ckanext/versioned_datastore/lib/query/schema.py
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
class Schema(abc.ABC):
    """
    Base class which every registered query schema is expected to extend.

    Concrete schemas must implement the validate, translate and hash methods.
    """

    @abc.abstractmethod
    def validate(self, query: dict):
        """
        Check the given query against this schema. A failed validation is signalled
        by raising a jsonschema exception.

        :param query: the query dict to check
        """
        ...

    @abc.abstractmethod
    def translate(self, query: dict) -> DSLQuery:
        """
        Convert the query into an Elasticsearch DSL object.

        :param query: the complete query dict
        :returns: an instantiated Elasticsearch DSL object
        """
        ...

    @abc.abstractmethod
    def hash(self, query: dict) -> str:
        """
        Compute a hash of the query and return it as a hex digest.

        :param query: the complete query dict
        :returns: the hex digest string
        """
        ...

    def normalise(self, query: dict) -> dict:
        """
        Fix some (small) common mistakes in a query, e.g. removing empty groups.

        :param query: the query dict
        :returns: the corrected/normalised query dict
        """
        return query  # the base implementation makes no corrections

hash(query) abstractmethod

Hashes the query and returns the hex digest.

Parameters:

Name Type Description Default
query dict

the whole query dict

required

Returns:

Type Description
str

a string hex digest

Source code in ckanext/versioned_datastore/lib/query/schema.py
166
167
168
169
170
171
172
173
174
@abc.abstractmethod
def hash(self, query: dict) -> str:
    """
    Compute a hash of the query and return it as a hex digest.

    :param query: the complete query dict
    :returns: the hex digest string
    """
    ...

normalise(query)

Corrects some (small) common query errors, e.g. removing empty groups.

Parameters:

Name Type Description Default
query dict

the query dict

required

Returns:

Type Description
dict

the corrected/normalised query dict

Source code in ckanext/versioned_datastore/lib/query/schema.py
176
177
178
179
180
181
182
183
def normalise(self, query: dict) -> dict:
    """
    Fix some (small) common mistakes in a query, e.g. removing empty groups.

    :param query: the query dict
    :returns: the corrected/normalised query dict
    """
    return query  # the base implementation makes no corrections

translate(query) abstractmethod

Translates the query into an Elasticsearch DSL object.

Parameters:

Name Type Description Default
query dict

the whole query dict

required

Returns:

Type Description
Query

an instantiated Elasticsearch DSL object

Source code in ckanext/versioned_datastore/lib/query/schema.py
156
157
158
159
160
161
162
163
164
@abc.abstractmethod
def translate(self, query: dict) -> DSLQuery:
    """
    Convert the query into an Elasticsearch DSL object.

    :param query: the complete query dict
    :returns: an instantiated Elasticsearch DSL object
    """
    ...

validate(query) abstractmethod

Validate the given query against this schema. Failures are signalled by raising jsonschema exceptions.

Parameters:

Name Type Description Default
query dict

the query dict to validate

required
Source code in ckanext/versioned_datastore/lib/query/schema.py
146
147
148
149
150
151
152
153
154
@abc.abstractmethod
def validate(self, query: dict):
    """
    Check the given query against this schema. A failed validation is signalled
    by raising a jsonschema exception.

    :param query: the query dict to check
    """
    ...

get_latest_query_version()

Gets the latest query version from the registered schemas dict.

Returns:

Type Description
str

the latest query version

Source code in ckanext/versioned_datastore/lib/query/schema.py
38
39
40
41
42
43
44
def get_latest_query_version() -> str:
    """
    Gets the latest query version from the registered schemas dict.

    The schemas dict is re-sorted by version in ascending order every time a schema is
    registered, so the latest version is its last key rather than its first.

    :returns: the latest query version
    :raises StopIteration: if no schemas have been registered
    """
    # the registry runs from oldest to newest version, so take the final key
    return next(reversed(schemas))

hash_query(query, version)

Hashes the given query at the given version and returns the unique digest.

Parameters:

Name Type Description Default
query dict

the query dict

required
version str

the query version

required

Returns:

Type Description
str

the hash

Source code in ckanext/versioned_datastore/lib/query/schema.py
85
86
87
88
89
90
91
92
93
94
95
96
def hash_query(query: dict, version: str) -> str:
    """
    Hashes the query using the schema for the given version, returning the digest.

    :param query: the query dict
    :param version: the query version
    :returns: the hash
    :raises InvalidQuerySchemaVersionError: if the version isn't a registered schema
    """
    if version not in schemas:
        raise InvalidQuerySchemaVersionError(version)
    return get_schema(version).hash(query)

load_core_schema(version)

Given a query schema version, loads the schema from the schema_base_path directory.

Parameters:

Name Type Description Default
version

the version to load

required

Returns:

Type Description

the loaded schema (as a dict) and a jsonschema validator object for the schema

Source code in ckanext/versioned_datastore/lib/query/schema.py
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
def load_core_schema(version):
    """
    Loads the core query schema for the given version from schema_base_path.

    :param version: the version to load
    :returns: the loaded schema (as a dict) and a jsonschema validator object for that
        schema
    """
    path = schema_base_path / version / f'{version}.json'
    loaded = json.loads(path.read_text('utf-8'))
    cls = validator_for(loaded)
    # make sure the schema itself is valid before building a validator from it
    cls.check_schema(loaded)
    # refs in the schema are resolved relative to the location of the schema file
    ref_resolver = RefResolver(base_uri=f'file://{path}', referrer=loaded)
    return loaded, cls(loaded, resolver=ref_resolver)

normalise_query(query, version)

Corrects some (small) common query errors, e.g. removing empty groups.

Parameters:

Name Type Description Default
query

the query dict

required
version

the query version

required

Returns:

Type Description

the corrected/normalised query

Source code in ckanext/versioned_datastore/lib/query/schema.py
107
108
109
110
111
112
113
114
115
116
117
118
def normalise_query(query, version):
    """
    Fixes some (small) common mistakes in the query, e.g. removing empty groups.

    :param query: the query dict
    :param version: the query version
    :returns: the corrected/normalised query
    :raises InvalidQuerySchemaVersionError: if the version isn't a registered schema
    """
    if version not in schemas:
        raise InvalidQuerySchemaVersionError(version)
    return schemas[version].normalise(query)

register_schema(version, schema)

Registers a new schema with the given version into the central schemas dict. The schema parameter should be an instance of a subclass of the Schema class but generally must at least provide the translate and validate methods. After registration, the schemas dict is updated to ensure the correct sort order is in use.

Parameters:

Name Type Description Default
version str

the query schema version

required
schema dict

the Schema object representing the query schema

required
Source code in ckanext/versioned_datastore/lib/query/schema.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
def register_schema(version: str, schema: 'Schema'):
    """
    Registers a schema under the given version in the central schemas dict. The schema
    parameter should be an instance of a Schema subclass but generally must at least
    provide the translate and validate methods. After registration, the schemas dict
    is rebuilt so that its keys are sorted by version in ascending order. Registering
    a version which is already present replaces the previously registered schema.

    :param version: the query schema version, a single character prefix followed by
        dot-separated integers (e.g. v1.0.0)
    :param schema: the Schema object representing the query schema
    :raises ValueError: if the version can't be parsed as described above
    """
    global schemas
    # copy the current registrations and then set the new one, this ensures the new
    # schema overrides anything previously registered under the same version
    registered = OrderedDict(schemas)
    registered[version] = schema
    # re-sort the schemas by version in ascending order, comparing the version parts
    # as integers rather than strings so that e.g. v1.10.0 sorts after v1.9.0
    # (h/t https://stackoverflow.com/a/2574090)
    schemas = OrderedDict(
        sorted(
            registered.items(),
            key=lambda vs: [int(part) for part in vs[0][1:].split('.')],
        )
    )

translate_query(query, version)

Translates the given query dict into an elasticsearch-dsl object using the Schema object associated with the given version. If the version doesn't match any registered schemas then an InvalidQuerySchemaVersionError will be raised.

Parameters:

Name Type Description Default
query dict

the whole query dict

required
version str

the query schema version to translate using

required

Returns:

Type Description
Query

an instantiated Elasticsearch DSL Query object

Source code in ckanext/versioned_datastore/lib/query/schema.py
69
70
71
72
73
74
75
76
77
78
79
80
81
82
def translate_query(query: dict, version: str) -> DSLQuery:
    """
    Builds an elasticsearch-dsl object from the given query dict by delegating to the
    Schema object registered for the given version.

    :param query: the whole query dict
    :param version: the query schema version to translate using
    :returns: an instantiated Elasticsearch DSL Query object
    :raises InvalidQuerySchemaVersionError: if the version isn't a registered schema
    """
    if version not in schemas:
        raise InvalidQuerySchemaVersionError(version)
    # the version is known, so hand the query over to its schema
    return get_schema(version).translate(query)

validate_query(query, version)

Validate the given query dict against the query schema for the given version. If the version doesn't match any registered schemas then an InvalidQuerySchemaVersionError will be raised.

Parameters:

Name Type Description Default
query dict

the query dict

required
version str

the query schema version to validate against

required

Returns:

Type Description
bool

True if the validation succeeded, otherwise jsonschema exceptions will be raised

Source code in ckanext/versioned_datastore/lib/query/schema.py
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
def validate_query(query: dict, version: str) -> bool:
    """
    Checks the given query dict against the query schema registered for the given
    version.

    :param query: the query dict
    :param version: the query schema version to validate against
    :returns: True if the validation succeeded, otherwise jsonschema exceptions will be
        raised
    :raises InvalidQuerySchemaVersionError: if the version isn't a registered schema
    """
    if version in schemas:
        get_schema(version).validate(query)
        return True
    raise InvalidQuerySchemaVersionError(version)