Skip to content

Stats

ImportStats

Bases: DomainObject

Object for holding resource import stats.

Source code in ckanext/versioned_datastore/model/stats.py
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
class ImportStats(DomainObject):
    """
    Object for holding resource import stats.
    """

    @classmethod
    def get(cls, stats_id: int) -> Optional['ImportStats']:
        return Session.query(cls).get(stats_id)

    def update(self, **kwargs):
        for field, value in kwargs.items():
            setattr(self, field, value)
        try:
            self.save()
        except InvalidRequestError:
            self.commit()

    @classmethod
    def begin(cls, resource_id: str, stat_type: str):
        stats = cls(
            resource_id=resource_id,
            type=stat_type,
            in_progress=True,
            start=datetime.now(timezone.utc),
        )
        stats.save()
        return stats

    @classmethod
    @contextmanager
    def track(cls, resource_id: str, stat_type: str):
        stats = cls.begin(resource_id, stat_type)
        try:
            yield stats
        except Exception as e:
            stats.mark_error(e)
            raise
        finally:
            stats.finish()

    def mark_error(self, error: Union[Exception, str]):
        """
        Marks the ImportStats object as having finished with an error. Just the error
        message is stored against the ImportStats object. "in_progress", "duration" and
        "end" are also updated.

        :param error: the exception object or string message
        """
        if isinstance(error, Exception):
            str_error = str(format_exception_only(type(error), error)[-1].strip())
        else:
            str_error = error
        self.update(error=str_error)
        self.finish()

    def finish(self):
        end = datetime.now(timezone.utc)
        self.update(
            in_progress=False, end=end, duration=(end - self.start).total_seconds()
        )

mark_error(error)

Marks the ImportStats object as having finished with an error. Just the error message is stored against the ImportStats object. "in_progress", "duration" and "end" are also updated.

Parameters:

Name Type Description Default
error Union[Exception, str]

the exception object or string message

required
Source code in ckanext/versioned_datastore/model/stats.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
def mark_error(self, error: Union[Exception, str]):
    """
    Marks the ImportStats object as having finished with an error. Just the error
    message is stored against the ImportStats object. "in_progress", "duration" and
    "end" are also updated.

    :param error: the exception object or string message
    """
    if isinstance(error, Exception):
        str_error = str(format_exception_only(type(error), error)[-1].strip())
    else:
        str_error = error
    self.update(error=str_error)
    self.finish()

get_all_stats(resource_id)

Retrieves and returns all the ImportStats from the database associated with the given resource. They are ordered by ID descending which will result in the newest results coming back first.

Parameters:

Name Type Description Default
resource_id

the id of the resource

required

Returns:

Type Description

a Query object which can be iterated over to retrieve all the results

Source code in ckanext/versioned_datastore/model/stats.py
122
123
124
125
126
127
128
129
130
131
132
133
134
135
def get_all_stats(resource_id):
    """
    Retrieves and returns all the ImportStats from the database associated with the
    given resource. They are ordered by ID descending which will result in the newest
    results coming back first.

    :param resource_id: the id of the resource
    :returns: a Query object which can be iterated over to retrieve all the results
    """
    return list(
        model.Session.query(ImportStats)
        .filter(ImportStats.resource_id == resource_id)
        .order_by(desc(ImportStats.id))
    )

get_last_ingest(resource_id)

Retrieve the last ingest stat object from the database, or None if there aren't any.

Parameters:

Name Type Description Default
resource_id

the resource id

required

Returns:

Type Description

an ImportStats object or None

Source code in ckanext/versioned_datastore/model/stats.py
138
139
140
141
142
143
144
145
146
147
148
149
150
151
def get_last_ingest(resource_id):
    """
    Retrieve the last ingest stat object from the database, or None if there aren't any.

    :param resource_id: the resource id
    :returns: an ImportStats object or None
    """
    return (
        model.Session.query(ImportStats)
        .filter(ImportStats.resource_id == resource_id)
        .filter(ImportStats.type == INGEST)
        .order_by(ImportStats.version.desc())
        .first()
    )