Source code for cognite.extractorutils.configtools

#  Copyright 2020 Cognite AS
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

"""
Module containing tools for loading and verifying config files, including a YAML loader that automatically
deserializes a config file into these dataclasses.

Configs are described as ``dataclass``\es, and use the ``BaseConfig`` class as a superclass to get a few things built
in: config version, Cognite project and logging. Use type hints to specify types, use the ``Optional`` type to mark a
config parameter as optional, and assign the attribute a value to give it a default.

For example, a config class for an extractor may look like the following:

.. code-block:: python

    @dataclass
    class ExtractorConfig:
        state_store: Optional[StateStoreConfig]
        parallelism: int = 10
        ...

    @dataclass
    class SourceConfig:
        host: str
        username: str
        password: str
        ...


    @dataclass
    class MyConfig(BaseConfig):
        extractor: ExtractorConfig
        source: SourceConfig

You can then load a YAML file into this dataclass with the ``load_yaml`` function:

.. code-block:: python

    with open("config.yaml") as infile:
        config: MyConfig = load_yaml(infile, MyConfig)
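
Since ``load_yaml`` also accepts a string with YAML content, here is a hedged sketch of what a config file for the
``MyConfig`` class above could contain (all values and environment variable names are purely illustrative):

.. code-block:: python

    config_file = '''
    version: 1

    cognite:
        project: my-project
        api-key: ${COGNITE_API_KEY}

    logger:
        console:
            level: INFO

    source:
        host: localhost
        username: extractor
        password: ${SOURCE_PASSWORD}

    extractor:
        parallelism: 10
        state-store:
            local:
                path: states.json
    '''

    config: MyConfig = load_yaml(config_file, MyConfig)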

The config object can additionally do several things, such as:

Creating a ``CogniteClient`` based on the config:

.. code-block:: python

    client = config.cognite.get_cognite_client("my-client")

Set up logging according to the config:

.. code-block:: python

    config.logger.setup_logging()

Start and stop threads that automatically push all the Prometheus metrics in the default Prometheus registry to the
configured push gateways:

.. code-block:: python

    config.metrics.start_pushers(client)

    # Extractor code

    config.metrics.stop_pushers()

Get a state store object as configured:

.. code-block:: python

    states = config.extractor.state_store.create_state_store()

However, all of these things will be done automatically for you if you are using the base ``Extractor`` class.
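
A minimal sketch of using the base ``Extractor`` class might look like the following. Treat it as an assumption-laden
illustration: the import path and the exact constructor and ``run_handle`` arguments depend on the version of this
library you are using.

.. code-block:: python

    from cognite.extractorutils import Extractor

    def run(client, states, config, stop_event):
        # Extractor logic goes here. The argument list is illustrative and must match the
        # run_handle signature expected by your version of the Extractor class.
        ...

    with Extractor(
        name="my_extractor",
        description="Example extractor",
        config_class=MyConfig,
        run_handle=run,
    ) as extractor:
        extractor.run()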
"""

import logging
import os
import re
import time
from dataclasses import dataclass
from enum import Enum
from logging.handlers import TimedRotatingFileHandler
from threading import Event
from time import sleep
from typing import Any, Dict, List, Optional, TextIO, Type, TypeVar, Union
from urllib.parse import urljoin

import dacite
import yaml

from cognite.client import CogniteClient
from cognite.client.data_classes import Asset, DataSet, ExtractionPipeline
from cognite.client.exceptions import CogniteNotFoundError

from .authentication import AuthenticatorConfig
from .exceptions import InvalidConfigError
from .logging_prometheus import export_log_stats_on_root_logger
from .metrics import AbstractMetricsPusher, CognitePusher, PrometheusPusher
from .statestore import AbstractStateStore, LocalStateStore, NoStateStore, RawStateStore
from .util import EitherId

_logger = logging.getLogger(__name__)


def _to_snake_case(dictionary: Dict[str, Any], case_style: str) -> Dict[str, Any]:
    """
    Ensure that all keys in the dictionary follow the snake case convention (recursively, so any sub-dictionaries are
    changed too).

    Args:
        dictionary: Dictionary to update.
        case_style: Existing casing convention. Either 'snake', 'hyphen' or 'camel'.

    Returns:
        An updated dictionary with keys in the given convention.
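
    For example, with ``case_style="camel"`` the key ``externalIdPrefix`` becomes ``external_id_prefix``, and with
    ``case_style="hyphen"`` the key ``external-id-prefix`` becomes ``external_id_prefix``.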
    """

    def fix_list(list_, key_translator):
        if list_ is None:
            return []

        new_list = [None] * len(list_)
        for i, element in enumerate(list_):
            if isinstance(element, dict):
                new_list[i] = fix_dict(element, key_translator)
            elif isinstance(element, list):
                new_list[i] = fix_list(element, key_translator)
            else:
                new_list[i] = element
        return new_list

    def fix_dict(dict_, key_translator):
        if dict_ is None:
            return {}

        new_dict = {}
        for key in dict_:
            if isinstance(dict_[key], dict):
                new_dict[key_translator(key)] = fix_dict(dict_[key], key_translator)
            elif isinstance(dict_[key], list):
                new_dict[key_translator(key)] = fix_list(dict_[key], key_translator)
            else:
                new_dict[key_translator(key)] = dict_[key]
        return new_dict

    def translate_hyphen(key):
        return key.replace("-", "_")

    def translate_camel(key):
        return re.sub(r"([A-Z]+)", r"_\1", key).strip("_").lower()

    if case_style == "snake" or case_style == "underscore":
        return dictionary
    elif case_style == "hyphen" or case_style == "kebab":
        return fix_dict(dictionary, translate_hyphen)
    elif case_style == "camel" or case_style == "pascal":
        return fix_dict(dictionary, translate_camel)
    else:
        raise ValueError(f"Invalid case style: {case_style}")


@dataclass
class EitherIdConfig:
    id: Optional[int]
    external_id: Optional[str]

    @property
    def either_id(self) -> EitherId:
        return EitherId(id=self.id, external_id=self.external_id)



@dataclass
class CogniteConfig:
    """
    Configuration parameters for CDF connection, such as project name, host address and API key
    """

    project: str
    api_key: Optional[str]
    idp_authentication: Optional[AuthenticatorConfig]
    data_set: Optional[EitherIdConfig]
    data_set_id: Optional[int]
    data_set_external_id: Optional[str]
    extraction_pipeline: Optional[EitherIdConfig]
    timeout: int = 30
    external_id_prefix: str = ""
    host: str = "https://api.cognitedata.com"

    def get_cognite_client(self, client_name: str, token_custom_args: Optional[Dict[str, str]] = None) -> CogniteClient:
        kwargs = {}

        if self.api_key:
            kwargs["api_key"] = self.api_key
        elif self.idp_authentication:
            if self.idp_authentication.token_url:
                kwargs["token_url"] = self.idp_authentication.token_url
            elif self.idp_authentication.tenant:
                base_url = urljoin(self.idp_authentication.authority, self.idp_authentication.tenant)
                kwargs["token_url"] = f"{base_url}/oauth2/v2.0/token"
            kwargs["token_client_id"] = self.idp_authentication.client_id
            kwargs["token_client_secret"] = self.idp_authentication.secret
            kwargs["token_scopes"] = self.idp_authentication.scopes
            if token_custom_args is None:
                token_custom_args = {}
            if self.idp_authentication.resource:
                token_custom_args["resource"] = self.idp_authentication.resource
            kwargs["token_custom_args"] = token_custom_args
        else:
            raise InvalidConfigError("No CDF credentials")

        return CogniteClient(
            project=self.project,
            base_url=self.host,
            client_name=client_name,
            disable_pypi_version_check=True,
            timeout=self.timeout,
            **kwargs,
        )

    def get_data_set(self, cdf_client: CogniteClient) -> Optional[DataSet]:
        if self.data_set_external_id:
            logging.getLogger(__name__).warning(
                "Using data-set-external-id is deprecated, please use data-set/external-id instead"
            )
            return cdf_client.data_sets.retrieve(external_id=self.data_set_external_id)

        if self.data_set_id:
            logging.getLogger(__name__).warning("Using data-set-id is deprecated, please use data-set/id instead")
            return cdf_client.data_sets.retrieve(id=self.data_set_id)

        if not self.data_set:
            return None

        return cdf_client.data_sets.retrieve(
            id=self.data_set.either_id.internal_id, external_id=self.data_set.either_id.external_id
        )

    def get_extraction_pipeline(self, cdf_client: CogniteClient) -> Optional[ExtractionPipeline]:
        if not self.extraction_pipeline:
            return None

        return cdf_client.extraction_pipelines.retrieve(
            id=self.extraction_pipeline.either_id.internal_id,
            external_id=self.extraction_pipeline.either_id.external_id,
        )


@dataclass
class _ConsoleLoggingConfig:
    level: str = "INFO"


@dataclass
class _FileLoggingConfig:
    path: str
    level: str = "INFO"
    retention: int = 7


@dataclass
class LoggingConfig:
    """
    Logging settings, such as log levels and path to log file
    """

    console: Optional[_ConsoleLoggingConfig]
    file: Optional[_FileLoggingConfig]
    # Enables metrics on the number of log messages recorded (per logger and level).
    # To collect or view the results, a MetricsConfig must be configured as well, so the metrics are
    # propagated to Prometheus and/or Cognite.
    metrics: Optional[bool] = False

    def setup_logging(self, suppress_console=False) -> None:
        """
        Sets up the root logger in the logging package to be configured as defined in this config object

        Args:
            suppress_console: Don't log to console regardless of config. Useful when running an extractor as a
                Windows service
        """
        fmt = logging.Formatter(
            "%(asctime)s.%(msecs)03d UTC [%(levelname)-8s] %(threadName)s - %(message)s",
            "%Y-%m-%d %H:%M:%S",
        )
        # Set logging to UTC
        fmt.converter = time.gmtime

        root = logging.getLogger()
        if self.metrics:
            export_log_stats_on_root_logger(root)

        if self.console and not suppress_console:
            console_handler = logging.StreamHandler()
            console_handler.setLevel(self.console.level)
            console_handler.setFormatter(fmt)
            root.addHandler(console_handler)

            if root.getEffectiveLevel() > console_handler.level:
                root.setLevel(console_handler.level)

        if self.file:
            file_handler = TimedRotatingFileHandler(
                filename=self.file.path,
                when="midnight",
                utc=True,
                backupCount=self.file.retention,
            )
            file_handler.setLevel(self.file.level)
            file_handler.setFormatter(fmt)
            root.addHandler(file_handler)

            if root.getEffectiveLevel() > file_handler.level:
                root.setLevel(file_handler.level)


@dataclass
class _PushGatewayConfig:
    host: str
    job_name: str
    username: Optional[str]
    password: Optional[str]
    clear_after: Optional[int]
    push_interval: int = 30


@dataclass
class _CogniteMetricsConfig:
    external_id_prefix: str
    asset_name: Optional[str]
    asset_external_id: Optional[str]
    push_interval: int = 30


@dataclass
class MetricsConfig:
    """
    Destination(s) for metrics, including options for one or several Prometheus push gateways, and pushing as CDF
    Time Series.
    """

    push_gateways: Optional[List[_PushGatewayConfig]]
    cognite: Optional[_CogniteMetricsConfig]

    def start_pushers(self, cdf_client: CogniteClient, cancelation_token: Event = Event()) -> None:
        self._pushers: List[AbstractMetricsPusher] = []
        self._clear_on_stop: Dict[PrometheusPusher, int] = {}

        push_gateways = self.push_gateways or []

        for counter, push_gateway in enumerate(push_gateways):
            pusher = PrometheusPusher(
                job_name=push_gateway.job_name,
                username=push_gateway.username,
                password=push_gateway.password,
                url=push_gateway.host,
                push_interval=push_gateway.push_interval,
                thread_name=f"MetricsPusher_{counter}",
                cancelation_token=cancelation_token,
            )

            pusher.start()
            self._pushers.append(pusher)

            if push_gateway.clear_after is not None:
                self._clear_on_stop[pusher] = push_gateway.clear_after

        if self.cognite:
            asset = None

            if self.cognite.asset_name is not None:
                asset = Asset(name=self.cognite.asset_name, external_id=self.cognite.asset_external_id)

            pusher = CognitePusher(
                cdf_client=cdf_client,
                external_id_prefix=self.cognite.external_id_prefix,
                push_interval=self.cognite.push_interval,
                asset=asset,
                thread_name="CogniteMetricsPusher",  # There is only one Cognite project as a target
                cancelation_token=cancelation_token,
            )

            pusher.start()
            self._pushers.append(pusher)

    def stop_pushers(self) -> None:
        pushers = self.__dict__.get("_pushers") or []

        for pusher in pushers:
            pusher.stop()

        if len(self._clear_on_stop) > 0:
            wait_time = max(self._clear_on_stop.values())
            _logger.debug("Waiting %d seconds before clearing gateways", wait_time)

            sleep(wait_time)

            for pusher in self._clear_on_stop.keys():
                pusher.clear_gateway()


@dataclass
class BaseConfig:
    """
    Basis for an extractor config, containing config version, ``CogniteConfig`` and ``LoggingConfig``
    """

    version: Optional[Union[str, int]]
    cognite: CogniteConfig
    logger: LoggingConfig


@dataclass
class RawDestinationConfig:
    database: str
    table: str


@dataclass
class RawStateStoreConfig(RawDestinationConfig):
    upload_interval: int = 30


@dataclass
class LocalStateStoreConfig:
    path: str
    save_interval: int = 30


@dataclass
class StateStoreConfig:
    raw: Optional[RawStateStoreConfig] = None
    local: Optional[LocalStateStoreConfig] = None

    def create_state_store(
        self, cdf_client: Optional[CogniteClient] = None, default_to_local: bool = True
    ) -> AbstractStateStore:
        """
        Create a state store object based on the config.

        Args:
            cdf_client: CogniteClient object to use in case of a RAW state store (ignored otherwise)
            default_to_local: If true, return a LocalStateStore if no state store is configured. Otherwise return
                a NoStateStore

        Returns:
            An (uninitialized) state store
        """
        if self.raw and self.local:
            raise ValueError("Only one state store can be used simultaneously")

        if self.raw:
            if cdf_client is None:
                raise TypeError("A cognite client object must be provided when state store is RAW")

            return RawStateStore(
                cdf_client=cdf_client,
                database=self.raw.database,
                table=self.raw.table,
                save_interval=self.raw.upload_interval,
            )

        if self.local:
            return LocalStateStore(file_path=self.local.path, save_interval=self.local.save_interval)

        if default_to_local:
            return LocalStateStore(file_path="states.json")
        else:
            return NoStateStore()


CustomConfigClass = TypeVar("CustomConfigClass", bound=BaseConfig)


def load_yaml(
    source: Union[TextIO, str], config_type: Type[CustomConfigClass], case_style: str = "hyphen", expand_envvars=True
) -> CustomConfigClass:
    """
    Read a YAML file, and create a config object based on its contents.

    Args:
        source: Input stream (as returned by open(...)) or string containing YAML.
        config_type: Class of config type (i.e. your custom subclass of BaseConfig).
        case_style: Casing convention of config file. Valid options are 'snake', 'hyphen' or 'camel'. Defaults to
            'hyphen'.
        expand_envvars: Substitute values with the pattern ${VAR} with the content of the environment variable VAR

    Returns:
        An initialized config object.

    Raises:
        InvalidConfigError: If any config field is given as an invalid type, is missing or is unknown
    """

    def env_constructor(_: yaml.SafeLoader, node):
        # Expand ${VAR} references from the environment, and turn the strings "true"/"false" into booleans
        bool_values = {
            "true": True,
            "false": False,
        }
        expanded_value = os.path.expandvars(node.value)
        return bool_values.get(expanded_value.lower(), expanded_value)

    class EnvLoader(yaml.SafeLoader):
        pass

    EnvLoader.add_implicit_resolver("!env", re.compile(r"\$\{([^}^{]+)\}"), None)
    EnvLoader.add_constructor("!env", env_constructor)

    loader = EnvLoader if expand_envvars else yaml.SafeLoader

    # Safe to use load instead of safe_load since both loader classes are based on SafeLoader
    config_dict = yaml.load(source, Loader=loader)

    config_dict = _to_snake_case(config_dict, case_style)

    try:
        config = dacite.from_dict(
            data=config_dict, data_class=config_type, config=dacite.Config(strict=True, cast=[Enum])
        )
    except (dacite.WrongTypeError, dacite.MissingValueError, dacite.UnionMatchError, dacite.UnexpectedDataError) as e:
        raise InvalidConfigError(str(e))
    except dacite.ForwardReferenceError as e:
        raise ValueError(f"Invalid config class: {str(e)}")

    return config