Package reference

Base class for extractors

class cognite.extractorutils.Extractor(*, name: str, description: str, version: str | None = None, run_handle: ~typing.Callable[[~cognite.client._cognite_client.CogniteClient, ~cognite.extractorutils.statestore.AbstractStateStore, ~cognite.extractorutils.base.CustomConfigClass, ~threading.Event], None] | None = None, config_class: ~typing.Type[~cognite.extractorutils.base.CustomConfigClass], metrics: ~cognite.extractorutils.metrics.BaseMetrics | None = None, use_default_state_store: bool = True, cancellation_token: ~threading.Event = <threading.Event object>, config_file_path: str | None = None, continuous_extractor: bool = False, heartbeat_waiting_time: int = 600, handle_interrupts: bool = True, reload_config_interval: int | None = 300, reload_config_action: ~cognite.extractorutils.base.ReloadConfigAction = ReloadConfigAction.DO_NOTHING)[source]

Bases: Generic[CustomConfigClass]

Base class for extractors.

When used as a context manager, the Extractor class will parse command line arguments, load a configuration file, set up everything needed for the extractor to run, and call the run_handle (a usage sketch is shown after the parameter list). If the extractor raises an exception, the Extractor class will handle it, logging and reporting it as an error.

Parameters:
  • name – Name of the extractor, how it’s invoked from the command line.

  • description – A short 1-2 sentence description of the extractor.

  • version – Version number, following semantic versioning.

  • run_handle – A function to call when setup is done that runs the extractor, taking a CogniteClient, a state store, a config object and a shutdown event as arguments.

  • config_class – A class (based on the BaseConfig class) that defines the configuration schema for the extractor

  • metrics – Metrics collection, a default one will be created if omitted.

  • use_default_state_store – Create a simple instance of the LocalStateStore to provide to the run handle. If false, a NoStateStore will be created in its place.

  • cancellation_token – An event that will be set when the extractor should shut down; a new one will be created if omitted.

  • config_file_path – If supplied, the extractor will not use command line arguments to get a config file, but rather use the supplied path.

  • continuous_extractor – If True, the extractor will report a successful run both at startup and at shutdown. Otherwise, a run is only reported on exit.

  • heartbeat_waiting_time – Time interval between each heartbeat to the extraction pipeline in seconds.
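
A minimal usage sketch (the extractor name, config class and run handle below are illustrative assumptions, not part of the library):

from threading import Event

from cognite.client import CogniteClient
from cognite.extractorutils import Extractor
from cognite.extractorutils.configtools import BaseConfig
from cognite.extractorutils.statestore import AbstractStateStore


def run(client: CogniteClient, states: AbstractStateStore, config: BaseConfig, stop_event: Event) -> None:
    # Extract data from the source until the extractor is asked to shut down
    while not stop_event.is_set():
        ...


with Extractor(
    name="my_extractor",
    description="An example extractor",
    version="1.0.0",
    config_class=BaseConfig,
    run_handle=run,
) as extractor:
    extractor.run()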

classmethod get_current_config() CustomConfigClass[source]
classmethod get_current_statestore() AbstractStateStore[source]
reload_config_callback()[source]
run() None[source]

Run the extractor. Ensures that the Extractor is set up correctly (i.e. that run is called within a with block) and calls the run_handle.

Can be overridden in subclasses.

configtools - Utilities for reading, parsing and validating config files

The configtools module consists of tools for loading and verifying config files for extractors.

Extractor configurations are conventionally written in hyphen-cased YAML. These are typically loaded and serialized as dataclasses in Python.

Config loader

cognite.extractorutils.configtools.load_yaml(source: TextIO | str, config_type: Type[CustomConfigClass], case_style: str = 'hyphen', expand_envvars=True) CustomConfigClass[source]

Read a YAML file, and create a config object based on its contents.

Parameters:
  • source – Input stream (as returned by open(…)) or string containing YAML.

  • config_type – Class of config type (i.e. your custom subclass of BaseConfig).

  • case_style – Casing convention of config file. Valid options are 'snake', 'hyphen' or 'camel'. Defaults to 'hyphen'.

  • expand_envvars – Substitute values with the pattern ${VAR} with the content of the environment variable VAR

Returns:

An initialized config object.

Raises:

InvalidConfigError – If any config field is given as an invalid type, is missing or is unknown
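
A minimal sketch of loading a hyphen-cased YAML document into a custom config class. The SourceConfig/MyConfig classes and the YAML contents are illustrative assumptions; the exact keys required under cognite depend on your IdP setup and library version:

from dataclasses import dataclass

from cognite.extractorutils.configtools import BaseConfig, load_yaml


@dataclass
class SourceConfig:
    host: str
    poll_interval: int


@dataclass
class MyConfig(BaseConfig):
    source: SourceConfig


config = load_yaml(
    """
    cognite:
        project: my-project
        idp-authentication:
            client-id: ${CLIENT_ID}
            secret: ${CLIENT_SECRET}
            token-url: https://login.example.com/token
            scopes:
                - https://api.cognitedata.com/.default

    logger:
        console:
            level: INFO

    source:
        host: https://source.example.com
        poll-interval: 60
    """,
    MyConfig,
)
print(config.source.poll_interval)  # 60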

Base classes

The configtools module contains several prebuilt config classes for many common parameters. The class BaseConfig is intended as a starting point for a custom configuration schema, containing parameters for config version, CDF connection and logging.

Example:

from dataclasses import dataclass
from typing import Optional

from cognite.extractorutils.configtools import BaseConfig, StateStoreConfig


@dataclass
class ExtractorConfig:
    state_store: Optional[StateStoreConfig]
    ...

@dataclass
class SourceConfig:
    ...


@dataclass
class MyConfig(BaseConfig):
    extractor: ExtractorConfig
    source: SourceConfig
class cognite.extractorutils.configtools.BaseConfig(type: ConfigType | None, cognite: CogniteConfig, version: str | int | None, logger: LoggingConfig)[source]

Basis for an extractor config, containing config version, CogniteConfig and LoggingConfig

class cognite.extractorutils.configtools.CogniteConfig(project: str, idp_authentication: ~cognite.extractorutils.configtools.elements.AuthenticatorConfig, data_set: ~cognite.extractorutils.configtools.elements.EitherIdConfig | None, data_set_id: int | None, data_set_external_id: str | None, extraction_pipeline: ~cognite.extractorutils.configtools.elements.EitherIdConfig | None, timeout: ~cognite.extractorutils.configtools.elements.TimeIntervalConfig = 30s, external_id_prefix: str = '', host: str = 'https://api.cognitedata.com')[source]

Configuration parameters for CDF connection, such as project name, host address and authentication

class cognite.extractorutils.configtools.LoggingConfig(console: _ConsoleLoggingConfig | None, file: _FileLoggingConfig | None, metrics: bool | None = False)[source]

Logging settings, such as log levels and path to log file

class cognite.extractorutils.configtools.MetricsConfig(push_gateways: List[_PushGatewayConfig] | None, cognite: _CogniteMetricsConfig | None, server: _PromServerConfig | None)[source]

Destination(s) for metrics, including options for one or several Prometheus push gateways, and pushing as CDF Time Series.

class cognite.extractorutils.configtools.RawDestinationConfig(database: str, table: str)[source]
class cognite.extractorutils.configtools.StateStoreConfig(raw: cognite.extractorutils.configtools.elements.RawStateStoreConfig | NoneType = None, local: cognite.extractorutils.configtools.elements.LocalStateStoreConfig | NoneType = None)[source]
class cognite.extractorutils.configtools.RawStateStoreConfig(database: str, table: str, upload_interval: cognite.extractorutils.configtools.elements.TimeIntervalConfig = 30s)[source]
class cognite.extractorutils.configtools.LocalStateStoreConfig(path: str, save_interval: cognite.extractorutils.configtools.elements.TimeIntervalConfig = 30s)[source]

Exceptions

throttle - Tools for throttling

metrics - Automatic pushers of performance metrics

Module containing tools for pushers for metric reporting.

The classes in this module scrape the default Prometheus registry and upload it periodically to either a Prometheus push gateway, or to CDF as time series.

The BaseMetrics class forms the basis for a metrics collection for an extractor, containing some general metrics that all extractors should report. To create your own set of metrics, subclass this class and populate it with extractor-specific metrics, as such:

from prometheus_client import Counter

from cognite.extractorutils.metrics import BaseMetrics


class MyMetrics(BaseMetrics):
    def __init__(self):
        super().__init__(extractor_name="my_extractor", extractor_version=__version__)

        self.a_counter = Counter("my_extractor_example_counter", "An example counter")
        ...

The metrics module also contains some Pusher classes that are used to routinely send metrics to a remote server. These can be created automatically with the start_pushers method described in configtools, or set up manually as sketched below.
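
A sketch of a manually created pusher (the gateway URL and job name are assumptions for illustration):

from cognite.extractorutils.metrics import PrometheusPusher

pusher = PrometheusPusher(
    job_name="my_extractor",
    url="http://prometheus-gateway.example.com:9091",
    push_interval=30,
)

pusher.start()
try:
    ...  # run the extractor; the default registry is pushed every 30 seconds
finally:
    pusher.stop()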

class cognite.extractorutils.metrics.AbstractMetricsPusher(push_interval: int | None = None, thread_name: str | None = None, cancellation_token: ~threading.Event = <threading.Event object>)[source]

Bases: ABC

Base class for metric pushers. A metric pusher spawns a thread that routinely pushes metrics to a configured destination.

Contains all the logic for starting and running threads.

Parameters:
  • push_interval – Seconds between each upload call

  • thread_name – Name of thread to start. If omitted, a standard name such as Thread-4 will be generated.

  • cancellation_token – Event object to be used as a thread cancellation event

start() None[source]

Starts a thread that pushes the default registry to the configured gateway at certain intervals.

stop() None[source]

Stop the push loop.

class cognite.extractorutils.metrics.BaseMetrics(extractor_name: str, extractor_version: str, process_scrape_interval: float = 15)[source]

Bases: object

Base collection of extractor metrics. The class also spawns a collector thread on init that regularly fetches process information and updates the process_* gauges.

To create a set of metrics for an extractor, create a subclass of this class.

Note that only one instance of this class (or any subclass) can exist simultaneously.

The collection includes the following metrics:
  • startup: Startup time (unix epoch)

  • finish: Finish time (unix epoch)

  • process_num_threads: Number of active threads. Set automatically.

  • process_memory_bytes: Memory usage of extractor. Set automatically.

  • process_cpu_percent: CPU usage of extractor. Set automatically.

Parameters:
  • extractor_name – Name of extractor, used to prefix metric names

  • process_scrape_interval – Interval (in seconds) between each fetch of data for the process_* gauges

class cognite.extractorutils.metrics.CognitePusher(cdf_client: ~cognite.client._cognite_client.CogniteClient, external_id_prefix: str, push_interval: int, asset: ~cognite.client.data_classes.assets.Asset | None = None, thread_name: str | None = None, cancellation_token: ~threading.Event = <threading.Event object>)[source]

Bases: AbstractMetricsPusher

Pusher to CDF. Creates time series in CDF for all Gauges and Counters in the default Prometheus registry.

Optional contextualization with an Asset to make the time series observable in Asset Data Insight. The given asset will be created at root level in the tenant if it doesn’t already exist.

Parameters:
  • cdf_client – The CDF tenant to upload time series to

  • external_id_prefix – Unique external ID prefix for this pusher.

  • push_interval – Seconds between each upload call

  • asset – Optional contextualization.

  • thread_name – Name of thread to start. If omitted, a standard name such as Thread-4 will be generated.

  • cancellation_token – Event object to be used as a thread cancellation event
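
A minimal sketch (the asset and external ID prefix are illustrative assumptions):

from cognite.client import CogniteClient
from cognite.client.data_classes import Asset
from cognite.extractorutils.metrics import CognitePusher

client = CogniteClient()
pusher = CognitePusher(
    cdf_client=client,
    external_id_prefix="my_extractor:",
    push_interval=30,
    asset=Asset(name="My extractor", external_id="my_extractor"),
)
pusher.start()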

start() None

Starts a thread that pushes the default registry to the configured gateway at certain intervals.

stop() None

Stop the push loop.

class cognite.extractorutils.metrics.PrometheusPusher(job_name: str, url: str, push_interval: int, username: str | None = None, password: str | None = None, thread_name: str | None = None, cancellation_token: ~threading.Event = <threading.Event object>)[source]

Bases: AbstractMetricsPusher

Pusher to a Prometheus push gateway.

Parameters:
  • job_name – Prometheus job name

  • username – Push gateway credentials

  • password – Push gateway credentials

  • url – URL (with port number) of push gateway

  • push_interval – Seconds between each upload call

  • thread_name – Name of thread to start. If omitted, a standard name such as Thread-4 will be generated.

  • cancellation_token – Event object to be used as a thread cancellation event

clear_gateway() None[source]

Delete metrics stored at the gateway (reset gateway).

start() None

Starts a thread that pushes the default registry to the configured gateway at certain intervals.

stop() None

Stop the push loop.

cognite.extractorutils.metrics.safe_get(cls: Type, *args, **kwargs) T[source]

A factory for instances of metrics collections.

Since Prometheus doesn't allow multiple metrics with the same name, any subclass of BaseMetrics must never be created more than once. This function creates an instance of the given class on the first call and stores it; any subsequent calls with the same class as argument will return the same instance.

>>> a = safe_get(MyMetrics)  # This will create a new instance of MyMetrics
>>> b = safe_get(MyMetrics)  # This will return the same instance
>>> a is b
True
Parameters:

cls – Metrics class to either create or get a cached version of

Returns:

An instance of the given class

statestore - Storing extractor state between runs locally or remotely

The statestore module contains classes for keeping track of the extraction state of individual items, facilitating incremental load and speeding up startup times.

At the beginning of a run the extractor typically calls the initialize method, which loads the states from the remote store (which can either be a local JSON file or a table in CDF RAW), and during and/or at the end of a run, the synchronize method is called, which saves the current states to the remote store.

You choose the back-end for your state store by which class you instantiate:

# A state store using a JSON file as remote storage:
states = LocalStateStore("state.json")
states.initialize()

# A state store using a RAW table as remote storage:
states = RawStateStore(
    cdf_client = CogniteClient(),
    database = "extractor_states",
    table = "my_extractor_deployment"
)
states.initialize()

You can now use this state store to get states:

low, high = states.get_state(external_id = "my-id")

You can set states:

states.set_state(external_id = "another-id", high=100)

and similarly for low. The set_state(...) method always overwrites the current state. Sometimes you may want to set the state only if it is larger than the previous state; in that case, consider expand_state(...):

# High watermark of another-id is already 100, nothing happens in this call:
states.expand_state(external_id = "another-id", high=50)

# This will set high to 150 as it is larger than the previous state
states.expand_state(external_id = "another-id", high=150)

To store the state to the remote store, use the synchronize() method:

states.synchronize()

You can set a state store to automatically update on upload triggers from an upload queue by using the post_upload_function in the upload queue:

states = LocalStateStore("state.json")
states.initialize()

uploader = TimeSeriesUploadQueue(
    cdf_client = CogniteClient(),
    max_upload_interval = 10,
    post_upload_function = states.post_upload_handler()
)

# The state store is now updated automatically!

states.synchronize()
class cognite.extractorutils.statestore.AbstractStateStore(save_interval: int | None = None, trigger_log_level: str = 'DEBUG', thread_name: str | None = None, cancellation_token: ~threading.Event = <threading.Event object>)[source]

Bases: ABC

Base class for a state store.

Parameters:
  • save_interval – Automatically trigger synchronize every save_interval seconds when run as a thread (use start/stop methods).

  • trigger_log_level – Log level to log synchronize triggers to.

  • thread_name – Thread name of synchronize thread.

  • cancellation_token – Event used to cancel the synchronize thread from elsewhere. Set when stop is called.

delete_state(external_id: str) None[source]

Delete an external ID from the state store.

Parameters:

external_id – External ID to remove

expand_state(external_id: str, low: Any | None = None, high: Any | None = None) None[source]

Like set_state, but only sets the state if the proposed state is outside the stored state. That is, if e.g. low is lower than the stored low.

Parameters:
  • external_id – External ID of e.g. time series to store state of

  • low – Low watermark

  • high – High watermark

get_state(external_id: str | List[str]) Tuple[Any, Any] | List[Tuple[Any, Any]][source]

Get state(s) for external ID(s)

Parameters:

external_id – An external ID or list of external IDs to get states for

Returns:

A tuple with (low, high) watermarks, or a list of tuples

abstract initialize(force: bool = False) None[source]

Get states from remote store

outside_state(external_id: str, new_state: Any) bool[source]

Check if a new proposed state is outside state interval (ie, if a new datapoint should be processed).

Returns true if new_state is outside of stored state or if external_id is previously unseen.

Parameters:
  • external_id – External ID to test

  • new_state – Proposed new state to test

Returns:

True if new_state is higher than the stored high watermark or lower than the low watermark.

post_upload_handler() Callable[[List[Dict[str, str | List[Dict[str, int | float | str | datetime] | Tuple[int | float | datetime, int | float | str]]]]], None][source]

Get a callable suitable for passing to a time series upload queue as post_upload_function, that will automatically update the states in this state store when that upload queue is uploading.

Returns:

A function that expands the current states with the values given

set_state(external_id: str, low: Any | None = None, high: Any | None = None) None[source]

Set/update state of a single external ID.

Parameters:
  • external_id – External ID of e.g. time series to store state of

  • low – Low watermark

  • high – High watermark

start() None[source]

Start saving state periodically if save_interval is set. This calls the synchronize method every save_interval seconds.

stop(ensure_synchronize: bool = True) None[source]

Stop synchronize thread if running, and ensure state is saved if ensure_synchronize is True.

Parameters:

ensure_synchronize (bool) – (Optional). Call synchronize one last time after shutting down thread.

abstract synchronize() None[source]

Upload states to remote store

class cognite.extractorutils.statestore.LocalStateStore(file_path: str, save_interval: int | None = None, trigger_log_level: str = 'DEBUG', thread_name: str | None = None, cancellation_token: ~threading.Event = <threading.Event object>)[source]

Bases: AbstractStateStore

An extractor state store using a local JSON file as backend.

Parameters:
  • file_path – File path to JSON file to use

  • save_interval – Automatically trigger synchronize every save_interval seconds when run as a thread (use start/stop methods).

  • trigger_log_level – Log level to log synchronize triggers to.

  • thread_name – Thread name of synchronize thread.

  • cancellation_token – Event used to cancel the synchronize thread from elsewhere. Set when stop is called.

delete_state(external_id: str) None

Delete an external ID from the state store.

Parameters:

external_id – External ID to remove

expand_state(external_id: str, low: Any | None = None, high: Any | None = None) None

Like set_state, but only sets the state if the proposed state is outside the stored state. That is, if e.g. low is lower than the stored low.

Parameters:
  • external_id – External ID of e.g. time series to store state of

  • low – Low watermark

  • high – High watermark

get_state(external_id: str | List[str]) Tuple[Any, Any] | List[Tuple[Any, Any]]

Get state(s) for external ID(s)

Parameters:

external_id – An external ID or list of external IDs to get states for

Returns:

A tuple with (low, high) watermarks, or a list of tuples

initialize(force: bool = False) None[source]

Load states from specified JSON file

Parameters:

force – Enable re-initialization, ie overwrite when called multiple times

outside_state(external_id: str, new_state: Any) bool

Check if a new proposed state is outside state interval (ie, if a new datapoint should be processed).

Returns true if new_state is outside of stored state or if external_id is previously unseen.

Parameters:
  • external_id – External ID to test

  • new_state – Proposed new state to test

Returns:

True if new_state is higher than the stored high watermark or lower than the low watermark.

post_upload_handler() Callable[[List[Dict[str, str | List[Dict[str, int | float | str | datetime] | Tuple[int | float | datetime, int | float | str]]]]], None]

Get a callable suitable for passing to a time series upload queue as post_upload_function, that will automatically update the states in this state store when that upload queue is uploading.

Returns:

A function that expands the current states with the values given

set_state(external_id: str, low: Any | None = None, high: Any | None = None) None

Set/update state of a single external ID.

Parameters:
  • external_id – External ID of e.g. time series to store state of

  • low – Low watermark

  • high – High watermark

start() None

Start saving state periodically if save_interval is set. This calls the synchronize method every save_interval seconds.

stop(ensure_synchronize: bool = True) None

Stop synchronize thread if running, and ensure state is saved if ensure_synchronize is True.

Parameters:

ensure_synchronize (bool) – (Optional). Call synchronize one last time after shutting down thread.

synchronize() None[source]

Save states to specified JSON file

class cognite.extractorutils.statestore.NoStateStore[source]

Bases: AbstractStateStore

A state store that only keeps states in memory and never stores or initializes from external sources.

delete_state(external_id: str) None

Delete an external ID from the state store.

Parameters:

external_id – External ID to remove

expand_state(external_id: str, low: Any | None = None, high: Any | None = None) None

Like set_state, but only sets the state if the proposed state is outside the stored state. That is, if e.g. low is lower than the stored low.

Parameters:
  • external_id – External ID of e.g. time series to store state of

  • low – Low watermark

  • high – High watermark

get_state(external_id: str | List[str]) Tuple[Any, Any] | List[Tuple[Any, Any]]

Get state(s) for external ID(s)

Parameters:

external_id – An external ID or list of external IDs to get states for

Returns:

A tuple with (low, high) watermarks, or a list of tuples

initialize(force: bool = False) None[source]

Get states from remote store

outside_state(external_id: str, new_state: Any) bool

Check if a new proposed state is outside state interval (ie, if a new datapoint should be processed).

Returns true if new_state is outside of stored state or if external_id is previously unseen.

Parameters:
  • external_id – External ID to test

  • new_state – Proposed new state to test

Returns:

True if new_state is higher than the stored high watermark or lower than the low watermark.

post_upload_handler() Callable[[List[Dict[str, str | List[Dict[str, int | float | str | datetime] | Tuple[int | float | datetime, int | float | str]]]]], None]

Get a callable suitable for passing to a time series upload queue as post_upload_function, that will automatically update the states in this state store when that upload queue is uploading.

Returns:

A function that expands the current states with the values given

set_state(external_id: str, low: Any | None = None, high: Any | None = None) None

Set/update state of a single external ID.

Parameters:
  • external_id – External ID of e.g. time series to store state of

  • low – Low watermark

  • high – High watermark

start() None

Start saving state periodically if save_interval is set. This calls the synchronize method every save_interval seconds.

stop(ensure_synchronize: bool = True) None

Stop synchronize thread if running, and ensure state is saved if ensure_synchronize is True.

Parameters:

ensure_synchronize (bool) – (Optional). Call synchronize one last time after shutting down thread.

synchronize() None[source]

Upload states to remote store

class cognite.extractorutils.statestore.RawStateStore(cdf_client: ~cognite.client._cognite_client.CogniteClient, database: str, table: str, save_interval: int | None = None, trigger_log_level: str = 'DEBUG', thread_name: str | None = None, cancellation_token: ~threading.Event = <threading.Event object>)[source]

Bases: AbstractStateStore

An extractor state store based on CDF RAW.

Parameters:
  • cdf_client – Cognite client to use

  • database – Name of CDF database

  • table – Name of CDF table

  • save_interval – Automatically trigger synchronize every save_interval seconds when run as a thread (use start/stop methods).

  • trigger_log_level – Log level to log synchronize triggers to.

  • thread_name – Thread name of synchronize thread.

  • cancellation_token – Event used to cancel the synchronize thread from elsewhere. Set when stop is called.

delete_state(external_id: str) None

Delete an external ID from the state store.

Parameters:

external_id – External ID to remove

expand_state(external_id: str, low: Any | None = None, high: Any | None = None) None

Like set_state, but only sets the state if the proposed state is outside the stored state. That is, if e.g. low is lower than the stored low.

Parameters:
  • external_id – External ID of e.g. time series to store state of

  • low – Low watermark

  • high – High watermark

get_state(external_id: str | List[str]) Tuple[Any, Any] | List[Tuple[Any, Any]]

Get state(s) for external ID(s)

Parameters:

external_id – An external ID or list of external IDs to get states for

Returns:

A tuple with (low, high) watermarks, or a list of tuples

initialize(force: bool = False) None[source]

Get states from remote store

outside_state(external_id: str, new_state: Any) bool

Check if a new proposed state is outside state interval (ie, if a new datapoint should be processed).

Returns true if new_state is outside of stored state or if external_id is previously unseen.

Parameters:
  • external_id – External ID to test

  • new_state – Proposed new state to test

Returns:

True if new_state is higher than the stored high watermark or lower than the low watermark.

post_upload_handler() Callable[[List[Dict[str, str | List[Dict[str, int | float | str | datetime] | Tuple[int | float | datetime, int | float | str]]]]], None]

Get a callable suitable for passing to a time series upload queue as post_upload_function, that will automatically update the states in this state store when that upload queue is uploading.

Returns:

A function that expands the current states with the values given

set_state(external_id: str, low: Any | None = None, high: Any | None = None) None

Set/update state of a single external ID.

Parameters:
  • external_id – External ID of e.g. time series to store state of

  • low – Low watermark

  • high – High watermark

start() None

Start saving state periodically if save_interval is set. This calls the synchronize method every save_interval seconds.

stop(ensure_synchronize: bool = True) None

Stop synchronize thread if running, and ensure state is saved if ensure_synchronize is True.

Parameters:

ensure_synchronize (bool) – (Optional). Call synchronize one last time after shutting down thread.

synchronize() None[source]

Upload states to remote store

uploader - Batching upload queues with automatic upload triggers

Module containing upload queue classes. The upload queue classes chunk items together and upload them to CDF in batches, both to minimize the load on the API and to speed up uploading, as individual requests can be slow.

Each upload queue comes with some configurable conditions that, when met, automatically triggers an upload.

Note: You cannot assume that an element is uploaded when it is added to the queue, since the upload may be delayed. To ensure that everything is uploaded you should set the post_upload_function callback to verify. For example, for a time series queue you might want to check the latest time stamp, as such (assuming incremental time stamps and using timestamp-value tuples as data point format):
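
The snippet below is a sketch of such a callback; the "externalId" and "datapoints" keys of the uploaded entries are assumptions for illustration:

latest_uploaded = {}

def store_latest(uploaded):
    # Each entry describes one time series and the datapoints that were just uploaded.
    for entry in uploaded:
        external_id = entry.get("externalId") or entry.get("external_id")
        timestamps = [ts for ts, _value in entry["datapoints"]]
        if timestamps:
            latest_uploaded[external_id] = max(timestamps)

Pass store_latest as the post_upload_function when creating the queue, as shown below.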

You can create an upload queue manually like this:

queue = TimeSeriesUploadQueue(cdf_client=my_cognite_client)

and then call queue.upload() to upload all data in the queue to CDF. However, you can also set some upload conditions and have the queue perform the uploads automatically, for example:

client = CogniteClient()
upload_queue = TimeSeriesUploadQueue(cdf_client=client, max_upload_interval=10)

upload_queue.start()

while not stop:
    timestamp, value = source.query()
    upload_queue.add_to_upload_queue((timestamp, value), external_id="my-timeseries")

upload_queue.stop()

The max_upload_interval specifies the maximum time (in seconds) between each API call. The upload method will be called on stop() as well, so no datapoints are lost. You can also use the queue as a context manager:

client = CogniteClient()

with TimeSeriesUploadQueue(cdf_client=client, max_upload_interval=1) as upload_queue:
    while not stop:
        timestamp, value = source.query()

        upload_queue.add_to_upload_queue((timestamp, value), external_id="my-timeseries")

This will call the start() and stop() methods automatically.

You can also trigger uploads after a given amount of data is added, by using the max_queue_size keyword argument instead. If both are used, whichever condition is met first will trigger the upload.
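
For example, a sketch combining both conditions (the numbers are arbitrary):

from cognite.client import CogniteClient
from cognite.extractorutils.uploader import TimeSeriesUploadQueue

upload_queue = TimeSeriesUploadQueue(
    cdf_client=CogniteClient(),
    max_queue_size=5000,     # upload once 5000 datapoints have been queued...
    max_upload_interval=10,  # ...or after 10 seconds, whichever comes first
)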

util - Miscellaneous utilities

The util package contains miscellaneous functions and classes that can sometimes be useful while developing extractors.

class cognite.extractorutils.util.EitherId(**kwargs)[source]

Bases: object

Class representing an ID in CDF, which can either be an external or internal ID. An EitherId can only hold one ID type, not both.

Parameters:
  • id – Internal ID

  • external_id (or externalId) – External ID

Raises:

TypeError – If none or both of the ID types are set.

content() int | str[source]

Get the value of the ID

Returns:

The ID

type() str[source]

Get the type of the ID

Returns:

‘id’ if the EitherId represents an internal ID, ‘externalId’ if the EitherId represents an external ID
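
A brief sketch of how an EitherId behaves (assuming the keyword names listed above):

from cognite.extractorutils.util import EitherId

internal = EitherId(id=123)
external = EitherId(external_id="my-asset")

internal.type()     # 'id'
internal.content()  # 123
external.type()     # 'externalId'
external.content()  # 'my-asset'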

cognite.extractorutils.util.add_extraction_pipeline(extraction_pipeline_ext_id: str, cognite_client: CogniteClient, heartbeat_waiting_time: int = 600, added_message: str = '')[source]

This is to be used as a decorator for extractor functions to add extraction pipeline information

Parameters:
  • extraction_pipeline_ext_id

  • cognite_client

  • heartbeat_waiting_time

  • added_message

Usage:

If you have a function named extract_data(*args, **kwargs) and want to connect it to an extraction pipeline, you can apply this decorator as follows:

@add_extraction_pipeline(
    extraction_pipeline_ext_id=<INSERT EXTERNAL ID>,
    cognite_client=<INSERT COGNITE CLIENT OBJECT>,
)
def extract_data(*args, **kwargs):
    <INSERT FUNCTION BODY>

cognite.extractorutils.util.ensure_assets(cdf_client: CogniteClient, assets: Iterable[Asset]) None[source]

Ensure that all the given assets exists in CDF.

Searches the tenant for the external IDs of the given assets, and creates those that are missing.

Parameters:
  • cdf_client – Tenant to create assets in

  • assets – Assets to create

cognite.extractorutils.util.ensure_time_series(cdf_client: CogniteClient, time_series: Iterable[TimeSeries]) None[source]

Ensure that all the given time series exists in CDF.

Searches the tenant for the external IDs of the given time series, and creates those that are missing.

Parameters:
  • cdf_client – Tenant to create time series in

  • time_series – Time series to create
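
A minimal sketch (the external IDs and names are illustrative assumptions):

from cognite.client import CogniteClient
from cognite.client.data_classes import TimeSeries
from cognite.extractorutils.util import ensure_time_series

client = CogniteClient()
ensure_time_series(
    client,
    [
        TimeSeries(external_id="my_extractor/temperature", name="Temperature"),
        TimeSeries(external_id="my_extractor/pressure", name="Pressure"),
    ],
)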

cognite.extractorutils.util.retry(cancellation_token: ~threading.Event = <threading.Event object>, exceptions: ~typing.Iterable[~typing.Type[Exception]] = <class 'Exception'>, tries: int = -1, delay: float = 0, max_delay: float | None = None, backoff: float = 1, jitter: float | ~typing.Tuple[float, float] = 0)[source]

Returns a retry decorator.

This is adapted from https://github.com/invl/retry

Parameters:
  • cancellation_token – a threading Event that is waited on between attempts.

  • exceptions – an exception or a tuple of exceptions to catch. default: Exception.

  • tries – the maximum number of attempts. default: -1 (infinite).

  • delay – initial delay between attempts. default: 0.

  • max_delay – the maximum value of delay. default: None (no limit).

  • backoff – multiplier applied to delay between attempts. default: 1 (no backoff).

  • jitter – extra seconds added to delay between attempts. default: 0. fixed if a number, random if a range tuple (min, max)

  • logger – logger.warning(fmt, error, delay) will be called on failed attempts. default: retry.logging_logger. if None, logging is disabled.
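
A minimal usage sketch (the source function, URL and exception type are assumptions for illustration):

import requests

from cognite.extractorutils.util import retry


@retry(exceptions=(requests.ConnectionError,), tries=5, delay=1, backoff=2, jitter=(0, 1))
def fetch_from_source() -> dict:
    # Retried up to 5 times with exponential backoff if the connection fails
    return requests.get("https://source.example.com/data", timeout=10).json()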

cognite.extractorutils.util.set_event_on_interrupt(stop_event: Event) None[source]

Set given event on SIGINT (Ctrl-C) instead of throwing a KeyboardInterrupt exception.

Parameters:

stop_event – Event to set
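
A minimal sketch of a graceful shutdown loop:

from threading import Event

from cognite.extractorutils.util import set_event_on_interrupt

stop_event = Event()
set_event_on_interrupt(stop_event)  # Ctrl-C now sets stop_event instead of raising

while not stop_event.is_set():
    ...  # extract data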

cognite.extractorutils.util.throttled_loop(target_time: int, cancellation_token: Event) Generator[None, None, None][source]

A loop generator that automatically sleeps until each iteration has taken the desired amount of time. Useful for when you want to avoid overloading a source system with requests.

Example

This example will throttle printing to only print every 10th second:

for _ in throttled_loop(10, stop_event):
    print("Hello every 10 seconds!")
Parameters:
  • target_time – How long (in seconds) an iteration should take in total

  • cancellation_token – An Event object that will act as the stop event. When set, the loop will stop.

Returns:

A generator that will only yield when the target iteration time is met