Skip to content

Request

ResultRecord dataclass

A wrapper on a hit from the response.

Unless source fields have been used in the search request, this will be wrapping a version of a record.

Source code in ckanext/versioned_datastore/lib/query/search/request.py
Lines 213–260
@dataclasses.dataclass
class ResultRecord:
    """
    Thin wrapper around a single hit taken from an Elasticsearch response.

    When the originating search request did not restrict the returned source fields,
    the wrapped hit holds one complete version of a record.
    """

    hit: AttrDict

    @property
    def id(self) -> str:
        """
        :returns: the ID of the record this hit represents
        """
        record_id = self.hit[DocumentField.ID]
        return record_id

    @property
    def version(self) -> int:
        """
        :returns: the version of the record this hit represents
        """
        record_version = self.hit[DocumentField.VERSION]
        return record_version

    @property
    def data(self) -> dict:
        """
        Reconstructs the record's data from the raw hit using Splitgill's
        rebuild_data function.

        :returns: the record data as a dict
        """
        raw_data = self.hit[DocumentField.DATA]
        return rebuild_data(raw_data.to_dict())

    @property
    def index(self) -> str:
        """
        :returns: the name of the index the hit was found in
        """
        hit_meta = self.hit.meta
        return hit_meta.index

    @property
    def resource_id(self) -> str:
        """
        :returns: the ID of the resource the hit belongs to, derived from the index
            name
        """
        index_name = self.index
        return unprefix_index_name(index_name)

data property

Rebuilds the record data from the hit in the raw Elasticsearch response using Splitgill's rebuild_data function and returns the data as a dict.

Returns:

Type Description
dict

the record data

id property

Returns:

Type Description
str

the record's ID

index property

Returns:

Type Description
str

the index this record comes from

resource_id property

Returns:

Type Description
str

the resource this record comes from

version property

Returns:

Type Description
int

the record's version

SearchRequest dataclass

Class representing a search request.

Source code in ckanext/versioned_datastore/lib/query/search/request.py
Lines 22–210
@dataclasses.dataclass
class SearchRequest:
    """
    A search to be run against one or more resource indexes in Elasticsearch.

    Bundles the query together with paging, sorting, source-field selection and
    aggregation options, and can turn itself into an Elasticsearch Search and run it.
    """

    query: Query
    size: Optional[int] = 100
    offset: Optional[int] = 0
    after: Optional[Any] = None
    sorts: List[Sort] = dataclasses.field(default_factory=list)
    fields: List[str] = dataclasses.field(default_factory=list)
    aggs: Dict[str, A] = dataclasses.field(default_factory=dict)
    # filters applied on top of the main query but kept out of the query itself
    extra_filter: Bool = dataclasses.field(default_factory=Bool)
    # when True, versioning is ignored: every index version is searched and no
    # version filter is applied (useful e.g. for aggregating across all versions of a
    # record or resource)
    force_no_version: bool = False
    # the data dict of the action call this request was built from, if any; None
    # means no (relevant) data dict is available
    data_dict: Optional[dict] = None
    # NOTE(review): not read anywhere in this class; presumably consulted by callers
    # to decide whether auth checks are skipped - confirm
    ignore_auth: bool = False
    # extra parameters forwarded to the _msearch call when the search is run
    req_params: dict = dataclasses.field(default_factory=dict)

    def add_param(self, param: str, value: Any):
        """
        Registers a request parameter to send along with this search. Searches are
        executed through Elasticsearch's _msearch endpoint, so only parameters that
        endpoint accepts should be added.

        :param param: name of the parameter
        :param value: value of the parameter
        """
        self.req_params.update({param: value})

    def add_sort(self, field: str, ascending: bool = True):
        """
        Shortcut for appending a sort on the given field to this request's sorts.

        :param field: name of the field to sort by
        :param ascending: True (the default) for ascending order, False for
            descending
        """
        new_sort = Sort(field, ascending)
        self.sorts.append(new_sort)

    def add_agg(self, name, agg_type, *args, **kwargs):
        """
        Builds an aggregation and stores it on this request under the given name.

        :param name: key the aggregation is stored and reported under
        :param agg_type: the type of aggregation to build
        :param args: positional arguments passed on to the aggregation
        :param kwargs: keyword arguments passed on to the aggregation
        """
        aggregation = A(agg_type, *args, **kwargs)
        self.aggs[name] = aggregation

    def indexes(self) -> List[str]:
        """
        Works out which indexes this request should search, based on the resource
        IDs in the query.

        :returns: index names, possibly containing wildcards
        """
        resource_ids = self.query.resource_ids

        if not self.force_no_version and self.query.version is None:
            # only the current data is needed, so the latest index suffices
            return [get_database(rid).indices.latest for rid in resource_ids]

        # todo: could check latest version for each and optimise to latest?
        return [get_database(rid).indices.wildcard for rid in resource_ids]

    def set_no_results(self):
        """
        Zeroes the request size and clears every paging option.

        Handy for requests that only need aggregations or a count and no hits.
        """
        self.size, self.offset, self.after = 0, 0, None

    def get_safe_size(self) -> int:
        """
        Computes the size that will really be sent to Elasticsearch: `self.size`
        clamped into the range 0 to 1000, with 100 substituted when `self.size` is
        None.

        :returns: an integer between 0 and 1000 inclusive
        """
        requested = 100 if self.size is None else self.size
        if requested <= 0:
            return 0
        if requested > 1000:
            return 1000
        return requested

    def to_search(self) -> Search:
        """
        Assembles an Elasticsearch Search object covering this request's indexes,
        query, filters, paging, source fields, sorts and aggregations.

        :returns: a freshly built Elasticsearch Search object
        """
        # an exact total hit count is always requested, whatever the cost
        search = Search().index(self.indexes()).extra(track_total_hits=True)
        search = search.query(self.query.to_dsl())

        # restrict to the requested version unless versioning is switched off
        if not self.force_no_version and self.query.version is not None:
            search = search.filter(version_query(self.query.version))

        # only attach the extra filter when it actually contains clauses
        if self.extra_filter.to_dict()['bool']:
            search = search.filter(self.extra_filter)

        requested = self.get_safe_size()
        if requested != 0:
            # fetch one more hit than asked for so we can tell if more results exist
            fetch = requested + 1
            search = search.extra(size=fetch)

            # offset takes priority; after is only used when no offset is given
            if self.offset:
                # Elasticsearch rejects from + size > 10000, so fail early with a
                # clearer message than the one Elasticsearch would give
                window = fetch + self.offset
                if window > 10000:
                    raise toolkit.ValidationError(
                        f'Search window size out of bounds: {window} (from: '
                        f'{self.offset}, size: {fetch})'
                    )
                search = search.extra(from_=self.offset)
            elif self.after is not None:
                search = search.extra(search_after=self.after)

            # limit the returned source when specific fields were requested
            if self.fields:
                source_fields = [f'{DocumentField.DATA}.{name}' for name in self.fields]
                # ResultRecord needs the ID and version to be present
                source_fields += [DocumentField.ID, DocumentField.VERSION]
                search = search.source(source_fields)

            if self.sorts:
                sort_specs = [sort.to_sort() for sort in self.sorts]
            else:
                sort_specs = [{DocumentField.VERSION: 'desc'}]
            # TODO: can we use _doc here? Is it safe across indexes?
            # tie-breakers so every hit has a unique search_after value
            sort_specs += [Sort.desc('_id').to_sort(), {'_index': 'desc'}]
            search = search.sort(*sort_specs)
        else:
            # hits aren't wanted at all
            search = search.extra(size=0)

        for agg_name, agg in self.aggs.items():
            search.aggs[agg_name] = agg

        return search

    def run(self) -> 'SearchResponse':
        """
        Builds this request's search, executes it and wraps the result.

        :returns: the SearchResponse for this request
        """
        for implementation in ivds_implementations():
            implementation.vds_before_search(self)

        search = self.to_search()

        # The search is wrapped in a multisearch to avoid URL length issues. A plain
        # search puts every index name (comma separated) into the URL, which can
        # exceed the length Elasticsearch allows when many indexes are queried. A
        # multisearch is sent as a POST request with the indexes in the payload.
        multi_search = (
            MultiSearch(using=es_client())
            .add(search)
            .params(**self.req_params)
        )
        responses = multi_search.execute()
        return SearchResponse(self, next(iter(responses)))

add_agg(name, agg_type, *args, **kwargs)

Creates an aggregation and adds it to the dict of aggregations on this request.

Parameters:

Name Type Description Default
name

the name of the aggregation

required
agg_type

the aggregation type

required
args

the aggregation arguments

()
kwargs

the aggregation keyword arguments

{}

Returns:

Type Description
Source code in ckanext/versioned_datastore/lib/query/search/request.py
Lines 71–81
def add_agg(self, name, agg_type, *args, **kwargs):
    """
    Builds an aggregation and stores it on this request under the given name.

    :param name: key the aggregation is stored and reported under
    :param agg_type: the type of aggregation to build
    :param args: positional arguments passed on to the aggregation
    :param kwargs: keyword arguments passed on to the aggregation
    """
    aggregation = A(agg_type, *args, **kwargs)
    self.aggs[name] = aggregation

add_param(param, value)

Add a request parameter to be passed as part of this search request. The search request is completed using the _msearch endpoint on Elasticsearch so only parameters that work on that endpoint should be added.

Parameters:

Name Type Description Default
param str

the param name

required
value Any

the param value

required
Source code in ckanext/versioned_datastore/lib/query/search/request.py
Lines 49–58
def add_param(self, param: str, value: Any):
    """
    Registers a request parameter to send along with this search. Searches are
    executed through Elasticsearch's _msearch endpoint, so only parameters that
    endpoint accepts should be added.

    :param param: name of the parameter
    :param value: value of the parameter
    """
    self.req_params.update({param: value})

add_sort(field, ascending=True)

Convenience wrapper to add a sort to the sort list on the given field with the given direction.

Parameters:

Name Type Description Default
field str

the field to sort on

required
ascending bool

whether to sort in ascending (True, default) or descending (False)

True
Source code in ckanext/versioned_datastore/lib/query/search/request.py
Lines 60–69
def add_sort(self, field: str, ascending: bool = True):
    """
    Shortcut for appending a sort on the given field to this request's sorts.

    :param field: name of the field to sort by
    :param ascending: True (the default) for ascending order, False for descending
    """
    new_sort = Sort(field, ascending)
    self.sorts.append(new_sort)

get_safe_size()

The size that is actually going to be used in the request to Elasticsearch. This method returns a value based on the self.size property but capped between 0 and 1000, with a default value of 100 if self.size is set to None.

Returns:

Type Description
int

a number between 0 and 1000

Source code in ckanext/versioned_datastore/lib/query/search/request.py
Lines 109–117
def get_safe_size(self) -> int:
    """
    Computes the size that will really be sent to Elasticsearch: `self.size` clamped
    into the range 0 to 1000, with 100 substituted when `self.size` is None.

    :returns: an integer between 0 and 1000 inclusive
    """
    requested = 100 if self.size is None else self.size
    if requested <= 0:
        return 0
    if requested > 1000:
        return 1000
    return requested

indexes()

A list of the indexes this request will search over. This list is created from the resource_ids specified in the query.

Returns:

Type Description
List[str]

a list of index names, this could include wildcards

Source code in ckanext/versioned_datastore/lib/query/search/request.py
Lines 83–96
def indexes(self) -> List[str]:
    """
    Works out which indexes this request should search, based on the resource IDs
    in the query.

    :returns: index names, possibly containing wildcards
    """
    resource_ids = self.query.resource_ids

    if not self.force_no_version and self.query.version is None:
        # only the current data is needed, so the latest index suffices
        return [get_database(rid).indices.latest for rid in resource_ids]

    # todo: could check latest version for each and optimise to latest?
    return [get_database(rid).indices.wildcard for rid in resource_ids]

run()

Builds the search, runs it, and returns a SearchResponse object.

Returns:

Type Description
SearchResponse

a SearchResponse object

Source code in ckanext/versioned_datastore/lib/query/search/request.py
Lines 190–210
def run(self) -> 'SearchResponse':
    """
    Builds this request's search, executes it and wraps the result.

    :returns: the SearchResponse for this request
    """
    for implementation in ivds_implementations():
        implementation.vds_before_search(self)

    search = self.to_search()

    # The search is wrapped in a multisearch to avoid URL length issues. A plain
    # search puts every index name (comma separated) into the URL, which can exceed
    # the length Elasticsearch allows when many indexes are queried. A multisearch is
    # sent as a POST request with the indexes in the payload.
    multi_search = (
        MultiSearch(using=es_client())
        .add(search)
        .params(**self.req_params)
    )
    responses = multi_search.execute()
    return SearchResponse(self, next(iter(responses)))

set_no_results()

Sets the request size to zero and removes all pagination values.

This is useful for aggregation only results or counts where you don't need any hits.

Source code in ckanext/versioned_datastore/lib/query/search/request.py
Lines 98–107
def set_no_results(self):
    """
    Zeroes the request size and clears every paging option.

    Handy for requests that only need aggregations or a count and no hits.
    """
    self.size, self.offset, self.after = 0, 0, None

to_search()

Builds an Elasticsearch Search object with the query, indexes, sorts, size, and aggregations set.

Returns:

Type Description
Search

a new Elasticsearch Search object

Source code in ckanext/versioned_datastore/lib/query/search/request.py
Lines 119–188
def to_search(self) -> Search:
    """
    Assembles an Elasticsearch Search object covering this request's indexes, query,
    filters, paging, source fields, sorts and aggregations.

    :returns: a freshly built Elasticsearch Search object
    """
    # an exact total hit count is always requested, whatever the cost
    search = Search().index(self.indexes()).extra(track_total_hits=True)
    search = search.query(self.query.to_dsl())

    # restrict to the requested version unless versioning is switched off
    if not self.force_no_version and self.query.version is not None:
        search = search.filter(version_query(self.query.version))

    # only attach the extra filter when it actually contains clauses
    if self.extra_filter.to_dict()['bool']:
        search = search.filter(self.extra_filter)

    requested = self.get_safe_size()
    if requested != 0:
        # fetch one more hit than asked for so we can tell if more results exist
        fetch = requested + 1
        search = search.extra(size=fetch)

        # offset takes priority; after is only used when no offset is given
        if self.offset:
            # Elasticsearch rejects from + size > 10000, so fail early with a
            # clearer message than the one Elasticsearch would give
            window = fetch + self.offset
            if window > 10000:
                raise toolkit.ValidationError(
                    f'Search window size out of bounds: {window} (from: '
                    f'{self.offset}, size: {fetch})'
                )
            search = search.extra(from_=self.offset)
        elif self.after is not None:
            search = search.extra(search_after=self.after)

        # limit the returned source when specific fields were requested
        if self.fields:
            source_fields = [f'{DocumentField.DATA}.{name}' for name in self.fields]
            # ResultRecord needs the ID and version to be present
            source_fields += [DocumentField.ID, DocumentField.VERSION]
            search = search.source(source_fields)

        if self.sorts:
            sort_specs = [sort.to_sort() for sort in self.sorts]
        else:
            sort_specs = [{DocumentField.VERSION: 'desc'}]
        # TODO: can we use _doc here? Is it safe across indexes?
        # tie-breakers so every hit has a unique search_after value
        sort_specs += [Sort.desc('_id').to_sort(), {'_index': 'desc'}]
        search = search.sort(*sort_specs)
    else:
        # hits aren't wanted at all
        search = search.extra(size=0)

    for agg_name, agg in self.aggs.items():
        search.aggs[agg_name] = agg

    return search

SearchResponse dataclass

Class wrapping the Elasticsearch search response object.

Source code in ckanext/versioned_datastore/lib/query/search/request.py
Lines 263–327
@dataclasses.dataclass
class SearchResponse:
    """
    Wraps the Elasticsearch response produced by running a SearchRequest.
    """

    request: SearchRequest
    response: Response

    def __post_init__(self):
        # Snapshot the safe size now, so later changes to the request can't affect
        # how this response trims its hits. Such changes are unlikely, but the
        # resulting bug would be very hard to track down.
        self._request_size = self.request.get_safe_size()

    @property
    def count(self) -> int:
        """
        :returns: how many documents in total matched the search request
        """
        total = self.response.hits.total
        return total.value

    @property
    def hits(self) -> List[ResultRecord]:
        """
        :returns: ResultRecords for the hits that were actually asked for
        """
        raw_hits = self.response.hits
        records = [ResultRecord(raw_hit) for raw_hit in raw_hits]
        # the request fetched one hit more than asked for; drop it if it came back
        overflowed = len(raw_hits) > self._request_size
        return records[:-1] if overflowed else records

    @property
    def data(self) -> List[dict]:
        """
        :returns: the data dict of each hit, in order
        """
        return [record.data for record in self.hits]

    @property
    def aggs(self) -> dict:
        """
        :returns: aggregation results keyed by the names used in the request
        """
        aggregations = self.response.aggs
        return aggregations.to_dict()

    @property
    def next_after(self) -> Optional[list]:
        """
        Gives the value to pass as `after` to fetch the following page of results,
        or None when there are no further results.

        :returns: None or a list of sort values
        """
        # more results exist only if the extra, unrequested hit came back
        more_available = len(self.response.hits) > self._request_size
        if self.count == 0 or not more_available:
            return None

        last_meta = self.hits[-1].hit.meta
        return list(last_meta['sort']) if 'sort' in last_meta else None

aggs property

Returns:

Type Description
dict

the aggregation results, keyed by the names specified in the request

count property

Returns:

Type Description
int

the number of documents which matched the search request

data property

Returns:

Type Description
List[dict]

a list of just the data dicts for each hit

hits property

Returns:

Type Description
List[ResultRecord]

returns the list of ResultRecords which were returned in the request

next_after property

Returns the after value to be used to get the next set of results. If there are no more results to get, None is returned.

Returns:

Type Description
Optional[list]

None or a list