Source code for cognite.extractorutils.base
# Copyright 2021 Cognite AS
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
import sys
from collections.abc import Callable
from dataclasses import is_dataclass
from enum import Enum
from threading import Thread
from types import TracebackType
from typing import Any, Generic, TypeVar
from dotenv import find_dotenv, load_dotenv
from cognite.client import CogniteClient
from cognite.client.data_classes import ExtractionPipeline, ExtractionPipelineRun
from cognite.extractorutils.configtools import BaseConfig, ConfigResolver, StateStoreConfig
from cognite.extractorutils.exceptions import InvalidConfigError
from cognite.extractorutils.metrics import BaseMetrics
from cognite.extractorutils.statestore import AbstractStateStore, LocalStateStore, NoStateStore
from cognite.extractorutils.threading import CancellationToken
class ReloadConfigAction(Enum):
DO_NOTHING = 1
REPLACE_ATTRIBUTE = 2
SHUTDOWN = 3
CALLBACK = 4
CustomConfigClass = TypeVar("CustomConfigClass", bound=BaseConfig)
RunHandle = Callable[[CogniteClient, AbstractStateStore, CustomConfigClass, CancellationToken], None]
[docs]
class Extractor(Generic[CustomConfigClass]):
"""
Base class for extractors.
When used as a context manager, the Extractor class will parse command line arguments, load a configuration file,
set up everything needed for the extractor to run, and call the ``run_handle``. If the extractor raises an
exception, the exception will be handled by the Extractor class and logged and reported as an error.
Args:
name: Name of the extractor, how it's invoked from the command line.
description: A short 1-2 sentence description of the extractor.
version: Version number, following semantic versioning.
run_handle: A function to call when setup is done that runs the extractor, taking a cognite client, state store
config object and a shutdown event as arguments.
config_class: A class (based on the BaseConfig class) that defines the configuration schema for the extractor
metrics: Metrics collection, a default one with be created if omitted.
use_default_state_store: Create a simple instance of the LocalStateStore to provide to the run handle. If false
a NoStateStore will be created in its place.
cancellation_token: An event that will be set when the extractor should shut down, an empty one will be created
if omitted.
config_file_path: If supplied, the extractor will not use command line arguments to get a config file, but
rather use the supplied path.
continuous_extractor: If True, extractor will both successful start and end time. Else, only show run on exit.
heartbeat_waiting_time: Time interval between each heartbeat to the extraction pipeline in seconds.
"""
_config_singleton: CustomConfigClass | None = None
_statestore_singleton: AbstractStateStore | None = None
def __init__(
self,
*,
name: str,
description: str,
version: str | None = None,
run_handle: RunHandle | None = None,
config_class: type[CustomConfigClass],
metrics: BaseMetrics | None = None,
use_default_state_store: bool = True,
cancellation_token: CancellationToken | None = None,
config_file_path: str | None = None,
continuous_extractor: bool = False,
heartbeat_waiting_time: int = 600,
handle_interrupts: bool = True,
reload_config_interval: int | None = 300,
reload_config_action: ReloadConfigAction = ReloadConfigAction.DO_NOTHING,
):
self.name = name
self.description = description
self.run_handle = run_handle
self.config_class = config_class
self.use_default_state_store = use_default_state_store
self.version = version or "unknown"
self.cancellation_token = cancellation_token.create_child_token() if cancellation_token else CancellationToken()
self.config_file_path = config_file_path
self.continuous_extractor = continuous_extractor
self.heartbeat_waiting_time = heartbeat_waiting_time
self.handle_interrupts = handle_interrupts
self.reload_config_interval = reload_config_interval
self.reload_config_action = reload_config_action
self.started = False
self.configured_logger = False
self.cognite_client: CogniteClient
self.state_store: AbstractStateStore
self.config: CustomConfigClass
self.extraction_pipeline: ExtractionPipeline | None
self.logger: logging.Logger
self.should_be_restarted = False
if metrics:
self.metrics = metrics
else:
self.metrics = BaseMetrics(extractor_name=name, extractor_version=self.version)
def _initial_load_config(self, override_path: str | None = None) -> None:
"""
Load a configuration file, either from the specified path, or by a path specified by the user in a command line
arg. Will quit further execution of no path is given.
Args:
override_path: Optional override for file path, ie don't parse command line arguments
"""
if override_path:
self.config_resolver = ConfigResolver(override_path, self.config_class)
else:
self.config_resolver = ConfigResolver.from_cli(self.name, self.description, self.version, self.config_class)
self.config = self.config_resolver.config
Extractor._config_singleton = self.config # type: ignore
def config_refresher() -> None:
while not self.cancellation_token.is_cancelled:
self.cancellation_token.wait(self.reload_config_interval)
if self.config_resolver.has_changed:
self._reload_config()
if self.reload_config_interval and self.reload_config_action != ReloadConfigAction.DO_NOTHING:
Thread(target=config_refresher, name="ConfigReloader", daemon=True).start()
[docs]
def reload_config_callback(self) -> None:
self.logger.error("Method for reloading configs has not been overridden in subclass")
def _reload_config(self) -> None:
self.logger.info("Config file has changed")
if self.reload_config_action is ReloadConfigAction.REPLACE_ATTRIBUTE:
self.logger.info("Loading in new config file")
self.config_resolver.accept_new_config()
self.config = self.config_resolver.config
Extractor._config_singleton = self.config # type: ignore
elif self.reload_config_action is ReloadConfigAction.SHUTDOWN:
self.logger.info("Shutting down, expecting to be restarted")
self.cancellation_token.cancel()
elif self.reload_config_action is ReloadConfigAction.CALLBACK:
self.logger.info("Loading in new config file")
self.config_resolver.accept_new_config()
self.config = self.config_resolver.config
self.reload_config_callback()
def _load_state_store(self) -> None:
"""
Searches through the config object for a StateStoreConfig. If found, it will use that configuration to generate
a state store, if no such config is found it will either create a LocalStateStore or a NoStateStore depending
on whether the ``use_default_state_store`` argument to the constructor was true or false.
Either way, the state_store attribute is guaranteed to be set after calling this method.
"""
def recursive_find_state_store(d: dict[str, Any]) -> StateStoreConfig | None:
for k in d:
if is_dataclass(d[k]):
res = recursive_find_state_store(d[k].__dict__)
if res:
return res
if isinstance(d[k], StateStoreConfig):
return d[k]
return None
state_store_config = recursive_find_state_store(self.config.__dict__)
if state_store_config:
self.state_store = state_store_config.create_state_store(
cdf_client=self.cognite_client,
default_to_local=self.use_default_state_store,
cancellation_token=self.cancellation_token,
)
else:
self.state_store = (
LocalStateStore("states.json", cancellation_token=self.cancellation_token)
if self.use_default_state_store
else NoStateStore()
)
try:
self.state_store.initialize()
except ValueError:
self.logger.exception("Could not load state store, using an empty state store as default")
Extractor._statestore_singleton = self.state_store
def _report_success(self) -> None:
"""
Called on a successful exit of the extractor
"""
if self.extraction_pipeline:
self.logger.info("Reporting new successful run")
self.cognite_client.extraction_pipelines.runs.create(
ExtractionPipelineRun(
extpipe_external_id=self.extraction_pipeline.external_id,
status="success",
message="Successful shutdown",
)
)
def _report_error(self, exception: BaseException) -> None:
"""
Called on an unsuccessful exit of the extractor
Args:
exception: Exception object that caused the extractor to fail
"""
self.logger.error("Unexpected error during extraction", exc_info=exception)
if self.extraction_pipeline:
message = f"{type(exception).__name__}: {str(exception)}"[:1000]
self.logger.info(f"Reporting new failed run: {message}")
self.cognite_client.extraction_pipelines.runs.create(
ExtractionPipelineRun(
extpipe_external_id=self.extraction_pipeline.external_id, status="failure", message=message
)
)
def __enter__(self) -> "Extractor":
"""
Configures and initializes the global logger, cognite client, loads config file, state store, etc.
Returns:
self
"""
if str(os.getenv("COGNITE_FUNCTION_RUNTIME", False)).lower() != "true":
# Environment Variables
env_file_path = find_dotenv(usecwd=True)
if env_file_path:
load_dotenv(dotenv_path=env_file_path, override=True)
dotenv_message = f"Successfully ingested environment variables from {env_file_path}"
else:
dotenv_message = "No .env file found"
else:
dotenv_message = "No .env file imported when using Cognite Functions"
try:
self._initial_load_config(override_path=self.config_file_path)
except InvalidConfigError as e:
print("Critical error: Could not read config file", file=sys.stderr) # noqa: T201
print(str(e), file=sys.stderr) # noqa: T201
sys.exit(1)
if not self.configured_logger:
self.config.logger.setup_logging()
self.configured_logger = True
self.logger = logging.getLogger(__name__)
self.logger.info(dotenv_message)
self.logger.info(f"Loaded {'remote' if self.config_resolver.is_remote else 'local'} config file")
if self.handle_interrupts:
self.cancellation_token.cancel_on_interrupt()
self.cognite_client = self.config.cognite.get_cognite_client(self.name)
self._load_state_store()
self.state_store.start()
self.extraction_pipeline = self.config.cognite.get_extraction_pipeline(self.cognite_client)
try:
self.config.metrics.start_pushers(self.cognite_client) # type: ignore
except AttributeError:
pass
def heartbeat_loop() -> None:
while not self.cancellation_token.is_cancelled:
self.cancellation_token.wait(self.heartbeat_waiting_time)
if not self.cancellation_token.is_cancelled:
self.logger.info("Reporting new heartbeat")
try:
self.cognite_client.extraction_pipelines.runs.create(
ExtractionPipelineRun(
extpipe_external_id=self.extraction_pipeline.external_id, # type: ignore
status="seen",
)
)
except Exception:
self.logger.exception("Failed to report heartbeat")
if self.extraction_pipeline:
self.logger.info("Starting heartbeat loop")
Thread(target=heartbeat_loop, name="HeartbeatLoop", daemon=True).start()
else:
self.logger.info("No extraction pipeline configured")
if self.extraction_pipeline and self.continuous_extractor:
self.cognite_client.extraction_pipelines.runs.create(
ExtractionPipelineRun(
extpipe_external_id=self.extraction_pipeline.external_id,
status="success",
message=f"New startup of {self.name}",
)
)
self.started = True
return self
def __exit__(
self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None
) -> bool:
"""
Shuts down the extractor. Makes sure states are preserved, that all uploads of data and metrics are done, etc.
Args:
exc_type: Will be provided by runtime. If an unhandled exception occurred, this will be the type of that
exception.
exc_val: Will be provided by runtime. If an unhandled exception occurred, this will be the instance of that
exception.
exc_tb: Will be provided by runtime. If an unhandled exception occurred, this will be the stack trace of
that exception.
Returns:
True if the extractor shut down cleanly, False if the extractor was shut down due to an unhandled error
"""
self.cancellation_token.cancel()
if self.state_store:
self.state_store.synchronize()
try:
self.config.metrics.stop_pushers() # type: ignore
except AttributeError:
pass
if exc_val:
self._report_error(exc_val)
else:
self._report_success()
self.started = False
return exc_val is None
[docs]
def run(self) -> None:
"""
Run the extractor. Ensures that the Extractor is set up correctly (``run`` called within a ``with``) and calls
the ``run_handle``.
Can be overrided in subclasses.
"""
if not self.started:
raise ValueError("You must run the extractor in a context manager")
if self.run_handle:
self.run_handle(self.cognite_client, self.state_store, self.config, self.cancellation_token)
else:
raise ValueError("No run_handle defined")
[docs]
@classmethod
def get_current_config(cls) -> CustomConfigClass:
if Extractor._config_singleton is None: # type: ignore
raise ValueError("No config singleton created. Have a config file been loaded?")
return Extractor._config_singleton # type: ignore
[docs]
@classmethod
def get_current_statestore(cls) -> AbstractStateStore:
if Extractor._statestore_singleton is None:
raise ValueError("No state store singleton created. Have a state store been loaded?")
return Extractor._statestore_singleton