# Copyright 2020 Cognite AS
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
The ``statestore`` module contains classes for keeping track of the extraction state of individual items, facilitating
incremental load and speeding up startup times.
At the beginning of a run the extractor typically calls the ``initialize`` method, which loads the states from the
remote store (which can either be a local JSON file or a table in CDF RAW), and during and/or at the end of a run, the
``synchronize`` method is called, which saves the current states to the remote store.
You choose the back-end for your state store through the class you instantiate:
.. code-block:: python
# A state store using a JSON file as remote storage:
states = LocalStateStore("state.json")
states.initialize()
# A state store using a RAW table as remote storage:
states = RawStateStore(
cdf_client = CogniteClient(),
database = "extractor_states",
table = "my_extractor_deployment"
)
states.initialize()
You can now use this state store to get states:
.. code-block:: python
low, high = states.get_state(external_id = "my-id")
You can set states:
.. code-block:: python
states.set_state(external_id = "another-id", high=100)
and similar for ``low``. The ``set_state(...)`` method will always overwrite the watermarks you pass (a watermark that
is left out is kept as is). Sometimes you might want to only set state *if larger* than the previous state; in that
case, consider ``expand_state(...)``:
.. code-block:: python
# High watermark of another-id is already 100, nothing happens in this call:
states.expand_state(external_id = "another-id", high=50)
# This will set high to 150 as it is larger than the previous state
states.expand_state(external_id = "another-id", high=150)
To store the state to the remote store, use the ``synchronize()`` method:
.. code-block:: python
states.synchronize()
You can set a state store to automatically update on upload triggers from an upload queue by using the
``post_upload_function`` in the upload queue:
.. code-block:: python
states = LocalStateStore("state.json")
states.initialize()
uploader = TimeSeriesUploadQueue(
cdf_client = CogniteClient(),
max_upload_interval = 10,
post_upload_function = states.post_upload_handler()
)
# The state store is now updated automatically!
states.synchronize()
"""
import json
from abc import ABC
from collections.abc import Callable, Iterator
from types import TracebackType
from typing import Any
from cognite.client import CogniteClient
from cognite.client.exceptions import CogniteAPIError
from cognite.extractorutils._inner_util import _DecimalDecoder, _DecimalEncoder
from cognite.extractorutils.threading import CancellationToken
from cognite.extractorutils.uploader import DataPointList
from cognite.extractorutils.util import cognite_exceptions, retry
from ._base import RETRIES, RETRY_BACKOFF_FACTOR, RETRY_DELAY, RETRY_MAX_DELAY, _BaseStateStore
[docs]
class AbstractStateStore(_BaseStateStore, ABC):
    """
    Base class for a state store.

    Holds an in-memory mapping from external ID to a ``{"low": ..., "high": ...}`` watermark dict, plus a list of
    external IDs deleted since the last synchronization. The get/set/expand/delete/outside methods guard their access
    to that mapping with ``self.lock`` (which is not created in this class).

    Args:
        save_interval: Automatically trigger synchronize each m seconds when run as a thread (use start/stop
            methods).
        trigger_log_level: Log level to log synchronize triggers to.
        thread_name: Thread name of synchronize thread.
        cancellation_token: Token to cancel event from elsewhere. Cancelled when stop is called.
    """

    def __init__(
        self,
        save_interval: int | None = None,
        trigger_log_level: str = "DEBUG",
        thread_name: str | None = None,
        cancellation_token: CancellationToken | None = None,
    ):
        super().__init__(
            save_interval=save_interval,
            trigger_log_level=trigger_log_level,
            thread_name=thread_name,
            cancellation_token=cancellation_token,
        )

        # external ID -> {"low": ..., "high": ...}
        self._local_state: dict[str, dict[str, Any]] = {}
        # external IDs removed through delete_state since the last synchronize
        self._deleted: list[str] = []

    def get_state(self, external_id: str | list[str]) -> tuple[Any, Any] | list[tuple[Any, Any]]:
        """
        Get state(s) for external ID(s)

        Unknown external IDs yield ``(None, None)``.

        Args:
            external_id: An external ID or list of external IDs to get states for

        Returns:
            A tuple with (low, high) watermarks, or a list of tuples
        """
        with self.lock:
            if isinstance(external_id, list):
                found = (self._local_state.get(e, {}) for e in external_id)
                return [(state.get("low"), state.get("high")) for state in found]

            state = self._local_state.get(external_id, {})
            return state.get("low"), state.get("high")

    def set_state(self, external_id: str, low: Any | None = None, high: Any | None = None) -> None:
        """
        Set/update state of a single external ID.

        A watermark passed as ``None`` leaves the stored value for that watermark untouched.

        Args:
            external_id: External ID of e.g. time series to store state of
            low: Low watermark
            high: High watermark
        """
        with self.lock:
            state = self._local_state.setdefault(external_id, {})
            state["low"] = low if low is not None else state.get("low")
            state["high"] = high if high is not None else state.get("high")

    def expand_state(self, external_id: str, low: Any | None = None, high: Any | None = None) -> None:
        """
        Like set_state, but only sets state if the proposed state is outside the stored state. That is if e.g. low is
        lower than the stored low.

        A stored watermark of ``None`` counts as "not set yet" and is replaced by the proposed value.

        Args:
            external_id: External ID of e.g. time series to store state of
            low: Low watermark
            high: High watermark
        """
        with self.lock:
            state = self._local_state.setdefault(external_id, {})
            stored_low = state.get("low")
            stored_high = state.get("high")

            # The keys may be present with a None value (set_state/expand_state store None for a watermark that was
            # never given), so None has to be handled explicitly rather than fed to min()/max().
            if low is not None:
                stored_low = low if stored_low is None else min(stored_low, low)
            if high is not None:
                stored_high = high if stored_high is None else max(stored_high, high)

            state["low"] = stored_low
            state["high"] = stored_high

    def delete_state(self, external_id: str) -> None:
        """
        Delete an external ID from the state store.

        The ID is also recorded in the list of deleted IDs, which subclasses read in ``synchronize``.

        Args:
            external_id: External ID to remove
        """
        with self.lock:
            self._local_state.pop(external_id, None)
            self._deleted.append(external_id)

    def post_upload_handler(self) -> Callable[[list[dict[str, str | DataPointList]]], None]:
        """
        Get a callable suitable for passing to a time series upload queue as post_upload_function, that will
        automatically update the states in this state store when that upload queue is uploading.

        Returns:
            A function that expands the current states with the values given
        """

        def callback(uploaded_points: list[dict[str, str | DataPointList]]) -> None:
            for time_series in uploaded_points:
                data_points = time_series["datapoints"]
                if not data_points:
                    continue

                # Use CDF timestamps. Only the first element of each datapoint is compared, so two datapoints with
                # the same timestamp never fall through to comparing their values.
                timestamps = [point[0] for point in data_points]
                external_id: str = time_series["externalId"]  # type: ignore  # known to be str from where we set it
                self.expand_state(external_id, min(timestamps), max(timestamps))

        return callback

    def outside_state(self, external_id: str, new_state: Any) -> bool:
        """
        Check if a new proposed state is outside state interval (ie, if a new datapoint should be processed).

        Returns true if new_state is outside of stored state or if external_id is previously unseen.

        Args:
            external_id: External ID to test
            new_state: Proposed new state to test

        Returns:
            True if new_state is higher than the stored high watermark or lower than the low watermark.
        """
        # Single locked read, so the "is it known" check and the watermark read see the same entry.
        with self.lock:
            state = self._local_state.get(external_id)
            if state is None:
                return True
            low, high = state.get("low"), state.get("high")

        if high is not None and new_state > high:
            return True
        if low is not None and new_state < low:
            return True
        return False

    def __getitem__(self, external_id: str) -> tuple[Any, Any]:
        """Return the (low, high) watermarks of ``external_id``; same as ``get_state`` with a single ID."""
        return self.get_state(external_id)  # type: ignore  # will not be list if input is single str

    def __setitem__(self, key: str, value: tuple[Any, Any]) -> None:
        """Set the state of ``key`` from a ``(low, high)`` pair; same as ``set_state``."""
        self.set_state(external_id=key, low=value[0], high=value[1])

    def __contains__(self, external_id: str) -> bool:
        """Return whether a state is stored locally for ``external_id``."""
        return external_id in self._local_state

    def __len__(self) -> int:
        """Return the number of external IDs with a locally stored state."""
        return len(self._local_state)

    def __iter__(self) -> Iterator[str]:
        """Iterate over the external IDs with a locally stored state."""
        yield from self._local_state
[docs]
class RawStateStore(AbstractStateStore):
    """
    An extractor state store based on CDF RAW.

    The constructor creates the database and table through the given client (see ``_ensure_table``).

    Args:
        cdf_client: Cognite client to use
        database: Name of CDF database
        table: Name of CDF table
        save_interval: Automatically trigger synchronize each m seconds when run as a thread (use start/stop
            methods).
        trigger_log_level: Log level to log synchronize triggers to.
        thread_name: Thread name of synchronize thread.
        cancellation_token: Token to cancel event from elsewhere. Cancelled when stop is called.
    """

    def __init__(
        self,
        cdf_client: CogniteClient,
        database: str,
        table: str,
        save_interval: int | None = None,
        trigger_log_level: str = "DEBUG",
        thread_name: str | None = None,
        cancellation_token: CancellationToken | None = None,
    ):
        super().__init__(save_interval, trigger_log_level, thread_name, cancellation_token)

        self._cdf_client = cdf_client
        self.database = database
        self.table = table

        self._ensure_table()

    def _ensure_table(self) -> None:
        """
        Create the RAW database and table, ignoring API errors with status code 400.

        Any other ``CogniteAPIError`` is re-raised (and is then subject to the retry policy).
        """

        @retry(
            exceptions=cognite_exceptions(),
            cancellation_token=self.cancellation_token,
            tries=RETRIES,
            delay=RETRY_DELAY,
            max_delay=RETRY_MAX_DELAY,
            backoff=RETRY_BACKOFF_FACTOR,
        )
        def impl() -> None:
            # NOTE(review): a 400 is presumably the "already exists" answer; the API may use 400 for other
            # request errors as well, which would be swallowed here too - confirm.
            try:
                self._cdf_client.raw.databases.create(self.database)
            except CogniteAPIError as e:
                if e.code != 400:
                    raise

            try:
                self._cdf_client.raw.tables.create(self.database, self.table)
            except CogniteAPIError as e:
                if e.code != 400:
                    raise

        impl()

    def initialize(self, force: bool = False) -> None:
        """
        Get all known states.

        Replaces the local states with the rows of the RAW table. Does nothing if the store is already initialized,
        unless ``force`` is set.

        Args:
            force: Enable re-initialization, ie overwrite when called multiple times
        """

        @retry(
            exceptions=cognite_exceptions(),
            cancellation_token=self.cancellation_token,
            tries=RETRIES,
            delay=RETRY_DELAY,
            max_delay=RETRY_MAX_DELAY,
            backoff=RETRY_BACKOFF_FACTOR,
        )
        def impl() -> None:
            if self._initialized and not force:
                return

            rows = self._cdf_client.raw.rows.list(db_name=self.database, table_name=self.table, limit=None)

            with self.lock:
                self._local_state.clear()
                for row in rows:
                    if row.key is None or row.columns is None:
                        self.logger.warning(f"None encountered in row: {str(row)}")
                        # should never happen, but type from sdk is optional
                        continue
                    self._local_state[row.key] = row.columns

            self._initialized = True

        impl()

    def synchronize(self) -> None:
        """
        Upload local state store to CDF

        Inserts every local state as a row, then deletes the rows of external IDs that were removed with
        ``delete_state`` and have not been given a new state since.
        """

        @retry(
            exceptions=cognite_exceptions(),
            cancellation_token=self.cancellation_token,
            tries=RETRIES,
            delay=RETRY_DELAY,
            max_delay=RETRY_MAX_DELAY,
            backoff=RETRY_BACKOFF_FACTOR,
        )
        def impl() -> None:
            with self.lock:
                self._cdf_client.raw.rows.insert(db_name=self.database, table_name=self.table, row=self._local_state)

                # An ID that was deleted and then set again is part of the insert above; deleting it here would
                # throw away the state that was just uploaded, so such IDs are skipped.
                # A new list is built on purpose (mock library stores the list it was called with, and as
                # self._deleted changes, the assertions fail).
                stale_keys = [key for key in self._deleted if key not in self._local_state]
                self._cdf_client.raw.rows.delete(db_name=self.database, table_name=self.table, key=stale_keys)
                self._deleted.clear()

        impl()

    def __enter__(self) -> "RawStateStore":
        """
        Wraps around start method, for use as context manager

        Returns:
            self
        """
        self.start()
        return self

    def __exit__(
        self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None
    ) -> None:
        """
        Wraps around stop method, for use as context manager

        Args:
            exc_type: Exception type
            exc_val: Exception value
            exc_tb: Traceback
        """
        self.stop()
[docs]
class LocalStateStore(AbstractStateStore):
    """
    An extractor state store using a local JSON file as backend.

    Args:
        file_path: File path to JSON file to use
        save_interval: Automatically trigger synchronize each m seconds when run as a thread (use start/stop
            methods).
        trigger_log_level: Log level to log synchronize triggers to.
        thread_name: Thread name of synchronize thread.
        cancellation_token: Token to cancel event from elsewhere. Cancelled when stop is called.
    """

    def __init__(
        self,
        file_path: str,
        save_interval: int | None = None,
        trigger_log_level: str = "DEBUG",
        thread_name: str | None = None,
        cancellation_token: CancellationToken | None = None,
    ):
        super().__init__(save_interval, trigger_log_level, thread_name, cancellation_token)

        self._file_path = file_path

    def initialize(self, force: bool = False) -> None:
        """
        Load states from specified JSON file

        A missing file is not an error: the local states are left as they are and the store counts as initialized.

        Args:
            force: Enable re-initialization, ie overwrite when called multiple times

        Raises:
            ValueError: If the file exists but does not contain valid JSON
        """
        if self._initialized and not force:
            return

        with self.lock:
            try:
                # Explicit encoding so the file is read the same way on every platform.
                with open(self._file_path, encoding="utf-8") as f:
                    self._local_state = json.load(f, cls=_DecimalDecoder)
            except FileNotFoundError:
                pass
            except json.decoder.JSONDecodeError as e:
                raise ValueError(f"Invalid JSON in state store file: {str(e)}") from e

        self._initialized = True

    def synchronize(self) -> None:
        """
        Save states to specified JSON file
        """
        with self.lock:
            # Serialize first: opening with "w" truncates the file, so a state that cannot be serialized must fail
            # before the previous contents are gone.
            serialized = json.dumps(self._local_state, cls=_DecimalEncoder)
            with open(self._file_path, "w", encoding="utf-8") as f:
                f.write(serialized)
            self._deleted.clear()

    def __enter__(self) -> "LocalStateStore":
        """
        Wraps around start method, for use as context manager

        Returns:
            self
        """
        self.start()
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """
        Wraps around stop method, for use as context manager

        Args:
            exc_type: Exception type
            exc_val: Exception value
            exc_tb: Traceback
        """
        self.stop()
[docs]
class NoStateStore(AbstractStateStore):
    """
    In-memory only state store: states are never loaded from, nor saved to, any external source.
    """

    def __init__(self) -> None:
        super().__init__()

    def initialize(self, force: bool = False) -> None:
        """Do nothing; there is no external source to load from. ``force`` is accepted and ignored."""
        return None

    def synchronize(self) -> None:
        """Do nothing; there is no external source to save to."""
        return None