# Copyright 2020 Cognite AS
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
The ``util`` package contains miscellaneous functions and classes that can some times be useful while developing
extractors.
"""
import logging
import random
import signal
import threading
from functools import partial, wraps
from threading import Event, Thread
from time import time
from typing import Any, Dict, Generator, Iterable, Optional, Tuple, Type, Union
from cognite.client import CogniteClient
from cognite.client.data_classes import Asset, ExtractionPipelineRun, TimeSeries
from cognite.client.exceptions import CogniteNotFoundError
from decorator import decorator
def _ensure(endpoint: Any, items: Iterable[Any]) -> None:
try:
external_ids = [ts.external_id for ts in items]
# Not doing anything with the result, only want to make sure they exist. This will throw an exception if not.
endpoint.retrieve_multiple(external_ids=external_ids)
except CogniteNotFoundError as e:
# Create the missing time series
external_ids = [obj["externalId"] for obj in e.not_found]
create_these = [ts for ts in items if ts.external_id in external_ids]
endpoint.create(create_these)
def ensure_time_series(cdf_client: CogniteClient, time_series: Iterable[TimeSeries]) -> None:
    """
    Make sure every time series in the given collection exists in CDF.

    Looks up the external IDs of the given time series in the tenant, and
    creates any that are not found.

    Args:
        cdf_client: Tenant to create time series in
        time_series: Time series to create
    """
    _ensure(cdf_client.time_series, time_series)
def ensure_assets(cdf_client: CogniteClient, assets: Iterable[Asset]) -> None:
    """
    Make sure every asset in the given collection exists in CDF.

    Looks up the external IDs of the given assets in the tenant, and creates
    any that are not found.

    Args:
        cdf_client: Tenant to create assets in
        assets: Assets to create
    """
    _ensure(cdf_client.assets, assets)
def set_event_on_interrupt(stop_event: Event) -> None:
    """
    Replace the default SIGINT (Ctrl-C) behaviour: instead of raising a
    KeyboardInterrupt, set the given event so the extractor can stop gracefully.

    Args:
        stop_event: Event to set when an interrupt signal is received
    """

    def handle_interrupt(signum, stack_frame):
        log = logging.getLogger(__name__)
        log.warning("Interrupt signal received, stopping extractor gracefully")
        stop_event.set()
        log.info("Waiting for threads to complete. Send another interrupt to force quit.")
        # Restore the default handler so a second Ctrl-C raises KeyboardInterrupt
        signal.signal(signal.SIGINT, signal.default_int_handler)

    try:
        signal.signal(signal.SIGINT, handle_interrupt)
    except ValueError as e:
        # signal.signal only works in the main thread; log and carry on without a handler
        logging.getLogger(__name__).warning(f"Could not register handler for interrupt signals: {str(e)}")
class EitherId:
    """
    Class representing an ID in CDF, which can either be an external or internal ID. An EitherId can only hold one ID
    type, not both.

    Args:
        id: Internal ID
        externalId or external_id: external ID

    Raises:
        TypeError: If none or both of the ID types are set.
    """

    def __init__(self, **kwargs):
        internal_id = kwargs.get("id")
        # Explicit None checks instead of `or`-chaining, so a falsy (e.g. empty
        # string) externalId is not silently dropped in favour of external_id
        external_id = kwargs.get("externalId")
        if external_id is None:
            external_id = kwargs.get("external_id")

        if internal_id is None and external_id is None:
            raise TypeError("Either id or external_id must be set")
        if internal_id is not None and external_id is not None:
            raise TypeError("Only one of id and external_id can be set")

        # Exactly one of these is non-None after the checks above
        self.internal_id = internal_id
        self.external_id = external_id

    def type(self) -> str:
        """
        Get the type of the ID

        Returns:
            'id' if the EitherId represents an internal ID, 'externalId' if the EitherId represents an external ID
        """
        return "id" if self.internal_id is not None else "externalId"

    def content(self) -> Union[int, str]:
        """
        Get the value of the ID

        Returns:
            The ID
        """
        # Explicit None check so that a falsy internal ID (such as 0) is still returned
        return self.internal_id if self.internal_id is not None else self.external_id

    def __eq__(self, other: Any) -> bool:
        """
        Compare with another object. Only returns true if other is an EitherId with the same type and content

        Args:
            other: Another object

        Returns:
            True if the other object is equal to this
        """
        if not isinstance(other, EitherId):
            return False
        return self.internal_id == other.internal_id and self.external_id == other.external_id

    def __hash__(self) -> int:
        """
        Returns a hash of the internal or external ID

        Returns:
            Hash code of ID
        """
        return hash((self.internal_id, self.external_id))

    def __str__(self) -> str:
        """
        Get a string representation of the EitherId on the format "type: content".

        Returns:
            A string rep of the EitherId
        """
        return "{}: {}".format(self.type(), self.content())

    def __repr__(self) -> str:
        """
        Get a string representation of the EitherId on the format "type: content".

        Returns:
            A string rep of the EitherId
        """
        return self.__str__()
def throttled_loop(target_time: int, cancellation_token: Event) -> Generator[None, None, None]:
    """
    A loop generator that automatically sleeps until each iteration has taken the desired amount of time. Useful for
    when you want to avoid overloading a source system with requests.

    Example:
        This example will throttle printing to only print every 10th second:

        .. code-block:: python

            for _ in throttled_loop(10, stop_event):
                print("Hello every 10 seconds!")

    Args:
        target_time: How long (in seconds) an iteration should take in total
        cancellation_token: An Event object that will act as the stop event. When set, the loop will stop.

    Returns:
        A generator that will only yield when the target iteration time is met
    """
    log = logging.getLogger(__name__)

    while not cancellation_token.is_set():
        started_at = time()
        yield

        elapsed = time() - started_at
        remaining = target_time - elapsed
        if remaining < 0:
            log.warning("Iteration time longer than target time, will not sleep")
        else:
            log.debug(f"Iteration took {elapsed:.1f} s, sleeping {remaining:.1f} s")
            # Use the event's wait instead of time.sleep so cancellation wakes us early
            cancellation_token.wait(remaining)
def _retry_internal(
f,
cancellation_token: threading.Event = threading.Event(),
exceptions: Iterable[Type[Exception]] = Exception,
tries: int = -1,
delay: float = 0,
max_delay: Optional[float] = None,
backoff: float = 1,
jitter: Union[float, Tuple[float, float]] = 0,
):
logger = logging.getLogger(__name__)
while tries and not cancellation_token.is_set():
try:
return f()
except exceptions as e:
tries -= 1
if not tries:
raise
if logger is not None:
logger.warning("%s, retrying in %s seconds...", e, delay)
cancellation_token.wait(delay)
delay *= backoff
if isinstance(jitter, tuple):
delay += random.uniform(*jitter)
else:
delay += jitter
if max_delay is not None:
delay = min(delay, max_delay)
def retry(
    cancellation_token: Optional[threading.Event] = None,
    exceptions: Iterable[Type[Exception]] = Exception,
    tries: int = -1,
    delay: float = 0,
    max_delay: Optional[float] = None,
    backoff: float = 1,
    jitter: Union[float, Tuple[float, float]] = 0,
):
    """
    Returns a retry decorator.

    This is adapted from https://github.com/invl/retry

    Args:
        cancellation_token: a threading token that is waited on. A fresh event
            is created when omitted.
        exceptions: an exception or a tuple of exceptions to catch. default: Exception.
        tries: the maximum number of attempts. default: -1 (infinite).
        delay: initial delay between attempts. default: 0.
        max_delay: the maximum value of delay. default: None (no limit).
        backoff: multiplier applied to delay between attempts. default: 1 (no backoff).
        jitter: extra seconds added to delay between attempts. default: 0.
            fixed if a number, random if a range tuple (min, max)
    """
    # None-default instead of a mutable default argument: a module-level shared
    # Event would, once set, disable retrying for every decorated function.
    token = cancellation_token if cancellation_token is not None else threading.Event()

    @decorator
    def retry_decorator(f, *fargs, **fkwargs):
        return _retry_internal(
            partial(f, *fargs, **fkwargs),
            token,
            exceptions,
            tries,
            delay,
            max_delay,
            backoff,
            jitter,
        )

    return retry_decorator