Package reference

Base class for extractors

class cognite.extractorutils.Extractor(*, name: str, description: str, version: str | None = None, run_handle: Callable[[CogniteClient, AbstractStateStore, CustomConfigClass, CancellationToken], None] | None = None, config_class: type[CustomConfigClass], metrics: BaseMetrics | None = None, use_default_state_store: bool = True, cancellation_token: CancellationToken | None = None, config_file_path: str | None = None, continuous_extractor: bool = False, heartbeat_waiting_time: int = 600, handle_interrupts: bool = True, reload_config_interval: int | None = 300, reload_config_action: ReloadConfigAction = ReloadConfigAction.DO_NOTHING, success_message: str = 'Successful shutdown')[source]

Bases: Generic[CustomConfigClass]

Base class for extractors.

When used as a context manager, the Extractor class will parse command line arguments, load a configuration file, set up everything needed for the extractor to run, and call the run_handle. If the extractor raises an exception, the Extractor class will catch it, log it, and report it as an error.

Parameters:
  • name – Name of the extractor, how it’s invoked from the command line.

  • description – A short 1-2 sentence description of the extractor.

  • version – Version number, following semantic versioning.

  • run_handle – A function that is called once setup is done and that runs the extractor, taking a Cognite client, a state store, a config object and a cancellation token as arguments.

  • config_class – A class (based on the BaseConfig class) that defines the configuration schema for the extractor

  • metrics – Metrics collection, a default one will be created if omitted.

  • use_default_state_store – Create a simple instance of the LocalStateStore to provide to the run handle. If false, a NoStateStore will be created in its place.

  • cancellation_token – An event that will be set when the extractor should shut down; an empty one will be created if omitted.

  • config_file_path – If supplied, the extractor will not use command line arguments to get a config file, but rather use the supplied path.

  • continuous_extractor – If True, the extractor will report both successful start and end times. Otherwise, only a run on exit is reported.

  • heartbeat_waiting_time – Time interval between each heartbeat to the extraction pipeline in seconds.
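
A minimal usage sketch might look like the following; MyConfig is a hypothetical BaseConfig subclass (see configtools below), and the extraction logic inside run is omitted:

from cognite.extractorutils import Extractor

def run(cognite_client, state_store, config, cancellation_token):
    # Extraction logic goes here: read from the source and write to CDF
    # until the cancellation token is triggered.
    ...

with Extractor(
    name="my_extractor",
    description="Example extractor",
    version="1.0.0",
    config_class=MyConfig,  # hypothetical BaseConfig subclass
    run_handle=run,
) as extractor:
    extractor.run()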

classmethod get_current_config() CustomConfigClass[source]

Get the current configuration singleton.

Returns:

The current configuration singleton

Raises:

ValueError – If no configuration singleton has been created, meaning no config file has been loaded.

classmethod get_current_statestore() AbstractStateStore[source]

Get the current state store singleton.

Returns:

The current state store singleton

Raises:

ValueError – If no state store singleton has been created, meaning no state store has been loaded.

reload_config_callback() None[source]

If the reload_config_action was set to CALLBACK, this method will be called when the config file is reloaded.

This method should be overridden in subclasses to provide custom behavior when the config file is reloaded.

run() None[source]

Run the extractor.

Ensures that the Extractor is set up correctly (i.e. that run is called within a with block) and calls the run_handle.

Can be overridden in subclasses.

configtools - Utilities for reading, parsing and validating config files

The configtools module consists of tools for loading and verifying config files for extractors.

Extractor configurations are conventionally written in hyphen-cased YAML. These are typically loaded and deserialized into dataclasses in Python.

Config loader

cognite.extractorutils.configtools.load_yaml(source: TextIO | str, config_type: type[CustomConfigClass], case_style: str = 'hyphen', expand_envvars: bool = True, keyvault_loader: KeyVaultLoader | None = None) CustomConfigClass[source]

Read a YAML file, and create a config object based on its contents.

Parameters:
  • source – Input stream (as returned by open(…)) or string containing YAML.

  • config_type – Class of config type (i.e. your custom subclass of BaseConfig).

  • case_style – Casing convention of the config file. Valid options are ‘snake’, ‘hyphen’ or ‘camel’. Defaults to ‘hyphen’, which is the convention.

  • expand_envvars – Substitute values matching the pattern ${VAR} with the content of the environment variable VAR.

  • keyvault_loader – Pre-built loader for keyvault tags. Will be loaded from config if not set.

Returns:

An initialized config object.

Raises:

InvalidConfigError – If any config field is given as an invalid type, is missing or is unknown
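
For illustration, a hedged sketch of loading a config file into a custom config class; MyConfig is assumed to be a BaseConfig subclass, as in the example under Base classes below:

from cognite.extractorutils.configtools import load_yaml

with open("config.yaml") as config_file:
    config = load_yaml(config_file, MyConfig)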

cognite.extractorutils.configtools.load_yaml_dict(source: TextIO | str, case_style: str = 'hyphen', expand_envvars: bool = True, keyvault_loader: KeyVaultLoader | None = None) dict[str, Any][source]

Read a YAML file and return a dictionary from its contents.

Parameters:
  • source – Input stream (as returned by open(…)) or string containing YAML.

  • case_style – Casing convention of the config file. Valid options are ‘snake’, ‘hyphen’ or ‘camel’. Defaults to ‘hyphen’, which is the convention.

  • expand_envvars – Substitute values matching the pattern ${VAR} with the content of the environment variable VAR.

  • keyvault_loader – Pre-built loader for keyvault tags. Will be loaded from config if not set.

Returns:

A raw dict with the contents of the config file.

Raises:

InvalidConfigError – If any config field is given as an invalid type, is missing or is unknown

class cognite.extractorutils.configtools.KeyVaultAuthenticationMethod(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Enum representing the authentication methods for Azure KeyVault.

class cognite.extractorutils.configtools.KeyVaultLoader(config: dict | None)[source]

Class responsible for configuring keyvault for clients using Azure.

Parameters:

config – A dictionary containing the configuration for the keyvault.

Base classes

The configtools module contains several prebuilt config classes for many common parameters. The class BaseConfig is intended as a starting point for a custom configuration schema, containing parameters for config version, CDF connection and logging.

Example:

from dataclasses import dataclass
from typing import Optional

from cognite.extractorutils.configtools import BaseConfig, StateStoreConfig


@dataclass
class ExtractorConfig:
    state_store: Optional[StateStoreConfig]
    ...

@dataclass
class SourceConfig:
    ...


@dataclass
class MyConfig(BaseConfig):
    extractor: ExtractorConfig
    source: SourceConfig

class cognite.extractorutils.configtools.BaseConfig(type: ConfigType | None, cognite: CogniteConfig, version: str | int | None, logger: LoggingConfig)[source]

Basis for an extractor config, containing config version, CogniteConfig and LoggingConfig.

class cognite.extractorutils.configtools.CogniteConfig(project: str, idp_authentication: AuthenticatorConfig, data_set: EitherIdConfig | None = None, data_set_id: int | None = None, data_set_external_id: str | None = None, extraction_pipeline: EitherIdConfig | None = None, timeout: TimeIntervalConfig = <factory>, connection: ConnectionConfig = <factory>, security_categories: list[int] | None = None, external_id_prefix: str = '', host: str = 'https://api.cognitedata.com')[source]

Configuration parameters for CDF connection, such as project name, host address and authentication.

class cognite.extractorutils.configtools.EitherIdConfig(id: int | None, external_id: str | None)[source]

Configuration parameter representing an ID in CDF, which can either be an external or internal ID.

An EitherId can only hold one ID type, not both.

class cognite.extractorutils.configtools.ConnectionConfig(disable_gzip: bool = False, status_forcelist: list[int] = <factory>, max_retries: int = 10, max_retries_connect: int = 3, max_retry_backoff: int = 30, max_connection_pool_size: int = 50, disable_ssl: bool = False, proxies: dict[str, str] = <factory>)[source]

Configuration parameters for the Python SDK's global_config settings.

class cognite.extractorutils.configtools.AuthenticatorConfig(client_id: str, scopes: list[str], secret: str | None = None, tenant: str | None = None, token_url: str | None = None, resource: str | None = None, audience: str | None = None, authority: str = 'https://login.microsoftonline.com/', min_ttl: float = 30, certificate: CertificateConfig | None = None)[source]

Configuration parameters for an OIDC flow.

class cognite.extractorutils.configtools.CertificateConfig(path: str, password: str | None, authority_url: str | None = None)[source]

Configuration parameters for certificates.

class cognite.extractorutils.configtools.LoggingConfig(console: _ConsoleLoggingConfig | None, file: _FileLoggingConfig | None, metrics: bool | None = False)[source]

Logging settings, such as log levels and path to log file.

class cognite.extractorutils.configtools.MetricsConfig(push_gateways: list[_PushGatewayConfig] | None, cognite: _CogniteMetricsConfig | None, server: _PromServerConfig | None)[source]

Destination(s) for metrics.

Includes options for one or several Prometheus push gateways, as well as pushing metrics to CDF as time series.

class cognite.extractorutils.configtools.RawDestinationConfig(database: str, table: str)[source]

Configuration parameters for using Raw.

class cognite.extractorutils.configtools.StateStoreConfig(raw: RawStateStoreConfig | None = None, local: LocalStateStoreConfig | None = None)[source]

Configuration of the State Store, containing LocalStateStoreConfig or RawStateStoreConfig.

class cognite.extractorutils.configtools.RawStateStoreConfig(database: str, table: str, upload_interval: TimeIntervalConfig = <factory>)[source]

Configuration of a state store based on CDF RAW.

class cognite.extractorutils.configtools.LocalStateStoreConfig(path: Path, save_interval: TimeIntervalConfig = <factory>)[source]

Configuration of a state store using a local JSON file.

class cognite.extractorutils.configtools.TimeIntervalConfig(expression: str)[source]

Configuration parameter for setting a time interval.

class cognite.extractorutils.configtools.FileSizeConfig(expression: str)[source]

Configuration parameter for setting a file size.

class cognite.extractorutils.configtools.ConfigType(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Type of configuration, either local or remote.

Exceptions

exception cognite.extractorutils.configtools.InvalidConfigError(message: str, details: list[str] | None = None)[source]

Exception thrown from load_yaml and load_yaml_dict if the config file is invalid. This can be due to:

  • Missing fields

  • Incompatible types

  • Unknown fields

metrics - Automatic pushers of performance metrics

Module containing pushers for metric reporting.

The classes in this module scrape the default Prometheus registry and upload it periodically, either to a Prometheus push gateway or to CDF as time series.

The BaseMetrics class forms the basis for a metrics collection for an extractor, containing some general metrics that all extractors should report. To create your own set of metrics, subclass this class and populate it with extractor-specific metrics, as such:

from prometheus_client import Counter

from cognite.extractorutils.metrics import BaseMetrics


class MyMetrics(BaseMetrics):
    def __init__(self):
        # __version__ is assumed to be your extractor's version string
        super().__init__(extractor_name="my_extractor", extractor_version=__version__)

        self.a_counter = Counter("my_extractor_example_counter", "An example counter")
        ...

The metrics module also contains some Pusher classes that are used to routinely send metrics to a remote server. These can be created automatically with the start_pushers method described in configtools.
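
If you want to set up a pusher manually instead, a minimal sketch could look like this; the push gateway URL and job name are placeholders:

from cognite.extractorutils.metrics import PrometheusPusher

pusher = PrometheusPusher(
    job_name="my-extractor",
    url="http://localhost:9091",
    push_interval=30,
)

pusher.start()
# ... run the extractor ...
pusher.stop()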

class cognite.extractorutils.metrics.AbstractMetricsPusher(push_interval: int | None = None, thread_name: str | None = None, cancellation_token: CancellationToken | None = None)[source]

Bases: ABC

Base class for metric pushers.

Metric pushers spawn a thread that routinely pushes metrics to a configured destination.

Contains all the logic for starting and running threads.

Parameters:
  • push_interval – Seconds between each upload call

  • thread_name – Name of thread to start. If omitted, a standard name such as Thread-4 will be generated.

  • cancellation_token – Event object to be used as a thread cancelation event

start() None[source]

Starts a thread that pushes the default registry to the configured gateway at certain intervals.

stop() None[source]

Stop the push loop.

class cognite.extractorutils.metrics.BaseMetrics(extractor_name: str, extractor_version: str, process_scrape_interval: float = 15)[source]

Bases: object

Base collection of extractor metrics.

The class also spawns a collector thread on init that regularly fetches process information and updates the process_* gauges.

To create a set of metrics for an extractor, create a subclass of this class.

Note that only one instance of this class (or any subclass) can exist simultaneously.

The collection includes the following metrics:
  • startup: Startup time (unix epoch)

  • finish: Finish time (unix epoch)

  • process_num_threads: Number of active threads. Set automatically.

  • process_memory_bytes: Memory usage of extractor. Set automatically.

  • process_cpu_percent: CPU usage of extractor. Set automatically.

Parameters:
  • extractor_name – Name of extractor, used to prefix metric names

  • process_scrape_interval – Interval (in seconds) between each fetch of data for the process_* gauges

class cognite.extractorutils.metrics.CognitePusher(cdf_client: CogniteClient, external_id_prefix: str, push_interval: int, asset: Asset | None = None, data_set: EitherId | None = None, thread_name: str | None = None, cancellation_token: CancellationToken | None = None)[source]

Bases: AbstractMetricsPusher

Pusher to CDF. Creates time series in CDF for all Gauges and Counters in the default Prometheus registry.

Optional contextualization with an Asset to make the time series observable in Asset Data Insight. The given asset will be created at root level in the tenant if it doesn’t already exist.

Parameters:
  • cdf_client – The CDF tenant to upload time series to

  • external_id_prefix – Unique external ID prefix for this pusher.

  • push_interval – Seconds between each upload call

  • asset – Optional contextualization.

  • data_set – Data set to create the metric time series under.

  • thread_name – Name of thread to start. If omitted, a standard name such as Thread-4 will be generated.

  • cancellation_token – Event object to be used as a thread cancelation event

start() None

Starts a thread that pushes the default registry to the configured gateway at certain intervals.

stop() None

Stop the push loop.

class cognite.extractorutils.metrics.PrometheusPusher(job_name: str, url: str, push_interval: int, username: str | None = None, password: str | None = None, thread_name: str | None = None, cancellation_token: CancellationToken | None = None)[source]

Bases: AbstractMetricsPusher

Pusher to a Prometheus push gateway.

Parameters:
  • job_name – Prometheus job name

  • username – Push gateway credentials

  • password – Push gateway credentials

  • url – URL (with port number) of the push gateway

  • push_interval – Seconds between each upload call

  • thread_name – Name of thread to start. If omitted, a standard name such as Thread-4 will be generated.

  • cancellation_token – Event object to be used as a thread cancelation event

clear_gateway() None[source]

Delete metrics stored at the gateway (reset gateway).

start() None

Starts a thread that pushes the default registry to the configured gateway at certain intervals.

stop() None

Stop the push loop.

cognite.extractorutils.metrics.safe_get(cls: type[T], *args: Any, **kwargs: Any) T[source]

A factory for instances of metrics collections.

Since Prometheus doesn’t allow multiple metrics with the same name, any subclass of BaseMetrics must never be created more than once. This function creates an instance of the given class on the first call and stores it; any subsequent calls with the same class as argument will return the same instance.

>>> a = safe_get(MyMetrics)  # This will create a new instance of MyMetrics
>>> b = safe_get(MyMetrics)  # This will return the same instance
>>> a is b
True

Parameters:
  • cls – Metrics class to either create or get a cached version of

  • args – Arguments passed as-is to the class constructor

  • kwargs – Keyword arguments passed as-is to the class constructor

Returns:

An instance of given class

statestore - Storing extractor state between runs locally or remotely

Module containing state stores for extractors.

The statestore module contains classes for keeping track of the extraction state of individual items, facilitating incremental load and speeding up startup times.

At the beginning of a run the extractor typically calls the initialize method, which loads the states from the remote store (which can either be a local JSON file or a table in CDF RAW), and during and/or at the end of a run, the synchronize method is called, which saves the current states to the remote store.

You choose the backend for your state store by which class you instantiate:

from cognite.client import CogniteClient

from cognite.extractorutils.statestore import LocalStateStore, RawStateStore

# A state store using a JSON file as remote storage:
states = LocalStateStore("state.json")
states.initialize()

# A state store using a RAW table as remote storage:
states = RawStateStore(
    cdf_client = CogniteClient(),
    database = "extractor_states",
    table = "my_extractor_deployment"
)
states.initialize()

You can now use this state store to get states:

low, high = states.get_state(external_id = "my-id")

You can set states:

states.set_state(external_id = "another-id", high=100)

and similar for low. The set_state(...) method will always overwrite the current state. Sometimes you might want to only set the state if it is larger than the previous state; in that case, consider expand_state(...):

# High watermark of another-id is already 100, nothing happens in this call:
states.expand_state(external_id = "another-id", high=50)

# This will set high to 150 as it is larger than the previous state
states.expand_state(external_id = "another-id", high=150)

To store the state to the remote store, use the synchronize() method:

states.synchronize()

You can set a state store to automatically update on upload triggers from an upload queue by using the post_upload_function in the upload queue:

states = LocalStateStore("state.json")
states.initialize()

uploader = TimeSeriesUploadQueue(
    cdf_client = CogniteClient(),
    max_upload_interval = 10,
    post_upload_function = states.post_upload_handler()
)

# The state store is now updated automatically!

states.synchronize()

class cognite.extractorutils.statestore.AbstractHashStateStore(save_interval: int | None = None, trigger_log_level: str = 'DEBUG', thread_name: str | None = None, cancellation_token: CancellationToken | None = None)[source]

Bases: _BaseStateStore, ABC

Base class for state stores that use hashing to track changes.

This class is thread-safe.
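
As a hedged sketch of how a hash-based state store can be used (the external ID and row contents are illustrative), using the LocalHashStateStore subclass described below:

from cognite.extractorutils.statestore import LocalHashStateStore

states = LocalHashStateStore("hashes.json")
states.initialize()

row = {"externalId": "my-row", "value": 42}
if states.has_changed("my-row", row):
    # ... process and upload the row ...
    states.set_state("my-row", row)

states.synchronize()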

get_state(external_id: str) str | None[source]

Get the state for a given external ID as a hash digest.

Parameters:

external_id – The external ID for which to retrieve the state.

Returns:

The hash digest of the state if it exists, otherwise None.

has_changed(external_id: str, data: dict[str, Any]) bool[source]

Check if the provided data is different from the stored state for the given external ID.

This is done by comparing the hash of the provided data with the stored hash.

Parameters:
  • external_id – The external ID for which to check the state.

  • data – The data to hash and compare against the stored state.

Returns:

True if the data has changed (i.e., the hash is different or not present), otherwise False.

abstract initialize(force: bool = False) None

Get states from remote store.

set_state(external_id: str, data: dict[str, Any]) None[source]

Set the state for a given external ID based on a hash of the provided data.

Parameters:
  • external_id – The external ID for which to set the state.

  • data – The data to hash and store as the state.

start(initialize: bool = True) None

Start saving state periodically if save_interval is set.

This calls the synchronize method every save_interval seconds.

Parameters:

initialize (bool) – (Optional). If True, call initialize method before starting the thread.

stop(ensure_synchronize: bool = True) None

Stop synchronize thread if running, and ensure state is saved if ensure_synchronize is True.

Parameters:

ensure_synchronize (bool) – (Optional). Call synchronize one last time after shutting down thread.

abstract synchronize() None

Upload states to remote store.

class cognite.extractorutils.statestore.AbstractStateStore(save_interval: int | None = None, trigger_log_level: str = 'DEBUG', thread_name: str | None = None, cancellation_token: CancellationToken | None = None)[source]

Bases: _BaseStateStore, ABC

Base class for a state store.

This class is thread-safe.

Parameters:
  • save_interval – Automatically trigger synchronize each m seconds when run as a thread (use start/stop methods).

  • trigger_log_level – Log level to log synchronize triggers to.

  • thread_name – Thread name of synchronize thread.

  • cancellation_token – Token to cancel event from elsewhere. Cancelled when stop is called.

delete_state(external_id: str) None[source]

Delete an external ID from the state store.

Parameters:

external_id – External ID to remove

expand_state(external_id: str, low: Any | None = None, high: Any | None = None) None[source]

Only set/update state if the proposed state is outside the stored state.

Only updates the low watermark if the proposed low is lower than the stored low, and only updates the high watermark if the proposed high is higher than the stored high.

Parameters:
  • external_id – External ID of e.g. time series to store state of

  • low – Low watermark

  • high – High watermark

get_state(external_id: str | list[str]) tuple[Any, Any] | list[tuple[Any, Any]][source]

Get state(s) for external ID(s).

Parameters:

external_id – An external ID or list of external IDs to get states for

Returns:

A tuple with (low, high) watermarks, or a list of tuples

abstract initialize(force: bool = False) None

Get states from remote store.

outside_state(external_id: str, new_state: Any) bool[source]

Check if a new proposed state is outside the stored state interval (i.e. if a new datapoint should be processed).

Returns true if new_state is outside of stored state or if external_id is previously unseen.

Parameters:
  • external_id – External ID to test

  • new_state – Proposed new state to test

Returns:

True if new_state is higher than the stored high watermark or lower than the low watermark.

post_upload_handler() Callable[[list[dict[str, str | list[tuple[int | datetime, float] | tuple[int | datetime, str] | tuple[int | datetime, int] | tuple[int | datetime, float, StatusCode | int] | tuple[int | datetime, str, StatusCode | int]]]]], None][source]

Get a callback function to handle post-upload events.

This callable is suitable for passing to a time series upload queue as post_upload_function; it will automatically update the states in this state store when that upload queue uploads.

Returns:

A function that expands the current states with the values given

set_state(external_id: str, low: Any | None = None, high: Any | None = None) None[source]

Set/update state of a single external ID.

Consider using expand_state instead, since this method will overwrite the current state no matter if it is actually outside the current state.

Parameters:
  • external_id – External ID of e.g. time series to store state of

  • low – Low watermark

  • high – High watermark

start(initialize: bool = True) None

Start saving state periodically if save_interval is set.

This calls the synchronize method every save_interval seconds.

Parameters:

initialize (bool) – (Optional). If True, call initialize method before starting the thread.

stop(ensure_synchronize: bool = True) None

Stop synchronize thread if running, and ensure state is saved if ensure_synchronize is True.

Parameters:

ensure_synchronize (bool) – (Optional). Call synchronize one last time after shutting down thread.

abstract synchronize() None

Upload states to remote store.

class cognite.extractorutils.statestore.LocalHashStateStore(file_path: str, save_interval: int | None = None, trigger_log_level: str = 'DEBUG', thread_name: str | None = None, cancellation_token: CancellationToken | None = None)[source]

Bases: AbstractHashStateStore

A version of AbstractHashStateStore that uses a local JSON file to store and persist states.

All states are stored in a JSON file, where each key is an external ID and the value is a dictionary containing the hash digest of the data.

This class is thread-safe.

Parameters:
  • file_path – The path to the JSON file where states will be stored.

  • save_interval – If set, the state store will periodically synchronize with the JSON file.

  • trigger_log_level – The logging level to use for synchronization triggers.

  • thread_name – Name of the thread used for synchronization.

  • cancellation_token – A CancellationToken to control the lifecycle of the state store.

get_state(external_id: str) str | None

Get the state for a given external ID as a hash digest.

Parameters:

external_id – The external ID for which to retrieve the state.

Returns:

The hash digest of the state if it exists, otherwise None.

has_changed(external_id: str, data: dict[str, Any]) bool

Check if the provided data is different from the stored state for the given external ID.

This is done by comparing the hash of the provided data with the stored hash.

Parameters:
  • external_id – The external ID for which to check the state.

  • data – The data to hash and compare against the stored state.

Returns:

True if the data has changed (i.e., the hash is different or not present), otherwise False.

initialize(force: bool = False) None[source]

Load states from specified JSON file.

Unless force is set to True, this will not re-initialize the state store if it has already been initialized. Subsequent calls to this method will be no-ops unless force is set to True.

Parameters:

force – Enable re-initialization, i.e. overwrite when called multiple times

set_state(external_id: str, data: dict[str, Any]) None

Set the state for a given external ID based on a hash of the provided data.

Parameters:
  • external_id – The external ID for which to set the state.

  • data – The data to hash and store as the state.

start(initialize: bool = True) None

Start saving state periodically if save_interval is set.

This calls the synchronize method every save_interval seconds.

Parameters:

initialize (bool) – (Optional). If True, call initialize method before starting the thread.

stop(ensure_synchronize: bool = True) None

Stop synchronize thread if running, and ensure state is saved if ensure_synchronize is True.

Parameters:

ensure_synchronize (bool) – (Optional). Call synchronize one last time after shutting down thread.

synchronize() None[source]

Save states to specified JSON file.

class cognite.extractorutils.statestore.LocalStateStore(file_path: str, save_interval: int | None = None, trigger_log_level: str = 'DEBUG', thread_name: str | None = None, cancellation_token: CancellationToken | None = None)[source]

Bases: AbstractStateStore

An extractor state store using a local JSON file as backend.

Parameters:
  • file_path – File path to JSON file to use

  • save_interval – Automatically trigger synchronize each m seconds when run as a thread (use start/stop methods).

  • trigger_log_level – Log level to log synchronize triggers to.

  • thread_name – Thread name of synchronize thread.

  • cancellation_token – Token to cancel event from elsewhere. Cancelled when stop is called.

delete_state(external_id: str) None

Delete an external ID from the state store.

Parameters:

external_id – External ID to remove

expand_state(external_id: str, low: Any | None = None, high: Any | None = None) None

Only set/update state if the proposed state is outside the stored state.

Only updates the low watermark if the proposed low is lower than the stored low, and only updates the high watermark if the proposed high is higher than the stored high.

Parameters:
  • external_id – External ID of e.g. time series to store state of

  • low – Low watermark

  • high – High watermark

get_state(external_id: str | list[str]) tuple[Any, Any] | list[tuple[Any, Any]]

Get state(s) for external ID(s).

Parameters:

external_id – An external ID or list of external IDs to get states for

Returns:

A tuple with (low, high) watermarks, or a list of tuples

initialize(force: bool = False) None[source]

Load states from specified JSON file.

Parameters:

force – Enable re-initialization, i.e. overwrite when called multiple times

outside_state(external_id: str, new_state: Any) bool

Check if a new proposed state is outside the stored state interval (i.e. if a new datapoint should be processed).

Returns true if new_state is outside of stored state or if external_id is previously unseen.

Parameters:
  • external_id – External ID to test

  • new_state – Proposed new state to test

Returns:

True if new_state is higher than the stored high watermark or lower than the low watermark.

post_upload_handler() Callable[[list[dict[str, str | list[tuple[int | datetime, float] | tuple[int | datetime, str] | tuple[int | datetime, int] | tuple[int | datetime, float, StatusCode | int] | tuple[int | datetime, str, StatusCode | int]]]]], None]

Get a callback function to handle post-upload events.

This callable is suitable for passing to a time series upload queue as post_upload_function; it will automatically update the states in this state store when that upload queue uploads.

Returns:

A function that expands the current states with the values given

set_state(external_id: str, low: Any | None = None, high: Any | None = None) None

Set/update state of a single external ID.

Consider using expand_state instead, since this method will overwrite the current state no matter if it is actually outside the current state.

Parameters:
  • external_id – External ID of e.g. time series to store state of

  • low – Low watermark

  • high – High watermark

start(initialize: bool = True) None

Start saving state periodically if save_interval is set.

This calls the synchronize method every save_interval seconds.

Parameters:

initialize (bool) – (Optional). If True, call initialize method before starting the thread.

stop(ensure_synchronize: bool = True) None

Stop synchronize thread if running, and ensure state is saved if ensure_synchronize is True.

Parameters:

ensure_synchronize (bool) – (Optional). Call synchronize one last time after shutting down thread.

synchronize() None[source]

Save states to specified JSON file.

class cognite.extractorutils.statestore.NoStateStore[source]

Bases: AbstractStateStore

A state store that only keeps states in memory and never stores or initializes from external sources.

This class is thread-safe.

delete_state(external_id: str) None

Delete an external ID from the state store.

Parameters:

external_id – External ID to remove

expand_state(external_id: str, low: Any | None = None, high: Any | None = None) None

Only set/update state if the proposed state is outside the stored state.

Only updates the low watermark if the proposed low is lower than the stored low, and only updates the high watermark if the proposed high is higher than the stored high.

Parameters:
  • external_id – External ID of e.g. time series to store state of

  • low – Low watermark

  • high – High watermark

get_state(external_id: str | list[str]) tuple[Any, Any] | list[tuple[Any, Any]]

Get state(s) for external ID(s).

Parameters:

external_id – An external ID or list of external IDs to get states for

Returns:

A tuple with (low, high) watermarks, or a list of tuples

initialize(force: bool = False) None[source]

Does nothing.

outside_state(external_id: str, new_state: Any) bool

Check if a new proposed state is outside the stored state interval (i.e. if a new datapoint should be processed).

Returns true if new_state is outside of stored state or if external_id is previously unseen.

Parameters:
  • external_id – External ID to test

  • new_state – Proposed new state to test

Returns:

True if new_state is higher than the stored high watermark or lower than the low watermark.

post_upload_handler() Callable[[list[dict[str, str | list[tuple[int | datetime, float] | tuple[int | datetime, str] | tuple[int | datetime, int] | tuple[int | datetime, float, StatusCode | int] | tuple[int | datetime, str, StatusCode | int]]]]], None]

Get a callback function to handle post-upload events.

This callable is suitable for passing to a time series upload queue as post_upload_function; it will automatically update the states in this state store when that upload queue uploads.

Returns:

A function that expands the current states with the values given

set_state(external_id: str, low: Any | None = None, high: Any | None = None) None

Set/update state of a single external ID.

Consider using expand_state instead, since this method will overwrite the current state no matter if it is actually outside the current state.

Parameters:
  • external_id – External ID of e.g. time series to store state of

  • low – Low watermark

  • high – High watermark

start(initialize: bool = True) None

Start saving state periodically if save_interval is set.

This calls the synchronize method every save_interval seconds.

Parameters:

initialize (bool) – (Optional). If True, call initialize method before starting the thread.

stop(ensure_synchronize: bool = True) None

Stop synchronize thread if running, and ensure state is saved if ensure_synchronize is True.

Parameters:

ensure_synchronize (bool) – (Optional). Call synchronize one last time after shutting down thread.

synchronize() None[source]

Does nothing.

class cognite.extractorutils.statestore.RawHashStateStore(cdf_client: CogniteClient, database: str, table: str, save_interval: int | None = None, trigger_log_level: str = 'DEBUG', thread_name: str | None = None, cancellation_token: CancellationToken | None = None)[source]

Bases: AbstractHashStateStore

A version of AbstractHashStateStore that uses CDF RAW to store and persist states.

All states are stored in a CDF RAW table, where each row is identified by an external ID.

This class is thread-safe.

Parameters:
  • cdf_client – The CogniteClient instance to use for ingesting to/reading from RAW.

  • database – The name of the CDF RAW database.

  • table – The name of the CDF RAW table.

  • save_interval – If set, the state store will periodically synchronize with CDF RAW.

  • trigger_log_level – The logging level to use for synchronization triggers.

  • thread_name – Name of the thread used for synchronization.

  • cancellation_token – A CancellationToken to control the lifecycle of the state store.

get_state(external_id: str) str | None

Get the state for a given external ID as a hash digest.

Parameters:

external_id – The external ID for which to retrieve the state.

Returns:

The hash digest of the state if it exists, otherwise None.

has_changed(external_id: str, data: dict[str, Any]) bool

Check if the provided data is different from the stored state for the given external ID.

This is done by comparing the hash of the provided data with the stored hash.

Parameters:
  • external_id – The external ID for which to check the state.

  • data – The data to hash and compare against the stored state.

Returns:

True if the data has changed (i.e., the hash is different or not present), otherwise False.

initialize(force: bool = False) None[source]

Initialize the state store by loading all known states from CDF RAW.

Unless force is set to True, this will not re-initialize the state store if it has already been initialized. Subsequent calls to this method will be no-ops unless force is set to True.

Parameters:

force – Enable re-initialization, i.e. overwrite when called multiple times

set_state(external_id: str, data: dict[str, Any]) None

Set the state for a given external ID based on a hash of the provided data.

Parameters:
  • external_id – The external ID for which to set the state.

  • data – The data to hash and store as the state.

start(initialize: bool = True) None

Start saving state periodically if save_interval is set.

This calls the synchronize method every save_interval seconds.

Parameters:

initialize (bool) – (Optional). If True, call initialize method before starting the thread.

stop(ensure_synchronize: bool = True) None

Stop synchronize thread if running, and ensure state is saved if ensure_synchronize is True.

Parameters:

ensure_synchronize (bool) – (Optional). Call synchronize one last time after shutting down thread.

synchronize() None[source]

Upload local state store to CDF.

class cognite.extractorutils.statestore.RawStateStore(cdf_client: CogniteClient, database: str, table: str, save_interval: int | None = None, trigger_log_level: str = 'DEBUG', thread_name: str | None = None, cancellation_token: CancellationToken | None = None)[source]

Bases: AbstractStateStore

An extractor state store based on CDF RAW.

This class is thread-safe.

Parameters:
  • cdf_client – Cognite client to use

  • database – Name of CDF database

  • table – Name of CDF table

  • save_interval – Automatically trigger synchronize each m seconds when run as a thread (use start/stop methods).

  • trigger_log_level – Log level to log synchronize triggers to.

  • thread_name – Thread name of synchronize thread.

  • cancellation_token – Token to cancel event from elsewhere. Cancelled when stop is called.

delete_state(external_id: str) None

Delete an external ID from the state store.

Parameters:

external_id – External ID to remove

expand_state(external_id: str, low: Any | None = None, high: Any | None = None) None

Only set/update state if the proposed state is outside the stored state.

Only updates the low watermark if the proposed low is lower than the stored low, and only updates the high watermark if the proposed high is higher than the stored high.

Parameters:
  • external_id – External ID of e.g. time series to store state of

  • low – Low watermark

  • high – High watermark

get_state(external_id: str | list[str]) tuple[Any, Any] | list[tuple[Any, Any]]

Get state(s) for external ID(s).

Parameters:

external_id – An external ID or list of external IDs to get states for

Returns:

A tuple with (low, high) watermarks, or a list of tuples

initialize(force: bool = False) None[source]

Initialize the state store by loading all known states from CDF RAW.

Unless force is set to True, this will not re-initialize the state store if it has already been initialized. Subsequent calls to this method will be no-ops unless force is set to True.

Parameters:

force – Enable re-initialization, i.e. overwrite when called multiple times

outside_state(external_id: str, new_state: Any) bool

Check if a new proposed state is outside the stored state interval (i.e. if a new datapoint should be processed).

Returns true if new_state is outside of stored state or if external_id is previously unseen.

Parameters:
  • external_id – External ID to test

  • new_state – Proposed new state to test

Returns:

True if new_state is higher than the stored high watermark or lower than the low watermark.

post_upload_handler() Callable[[list[dict[str, str | list[tuple[int | datetime, float] | tuple[int | datetime, str] | tuple[int | datetime, int] | tuple[int | datetime, float, StatusCode | int] | tuple[int | datetime, str, StatusCode | int]]]]], None]

Get a callback function to handle post-upload events.

This callable is suitable for passing to a time series upload queue as post_upload_function; it will automatically update the states in this state store when that upload queue uploads.

Returns:

A function that expands the current states with the values given

set_state(external_id: str, low: Any | None = None, high: Any | None = None) None

Set/update state of a single external ID.

Consider using expand_state instead, since this method will overwrite the current state no matter if it is actually outside the current state.

Parameters:
  • external_id – External ID of e.g. time series to store state of

  • low – Low watermark

  • high – High watermark

start(initialize: bool = True) None

Start saving state periodically if save_interval is set.

This calls the synchronize method every save_interval seconds.

Parameters:

initialize (bool) – (Optional). If True, call initialize method before starting the thread.

stop(ensure_synchronize: bool = True) None

Stop synchronize thread if running, and ensure state is saved if ensure_synchronize is True.

Parameters:

ensure_synchronize (bool) – (Optional). Call synchronize one last time after shutting down thread.

synchronize() None[source]

Upload the contents of the state store to CDF RAW.

uploader - Batching upload queues with automatic upload triggers

Module containing upload queue classes.

The UploadQueue classes chunk items together and upload them to CDF in batches, both to minimize the load on the API and to speed up uploading, since individual requests can be slow.

Each upload queue comes with some configurable conditions that, when met, automatically triggers an upload.

Note: You cannot assume that an element is uploaded when it is added to the queue, since the upload may be delayed. To ensure that everything is uploaded you should set the post_upload_function callback to verify. For example, for a time series queue you might want to check the latest time stamp, as such (assuming incremental time stamps and using timestamp-value tuples as data point format):
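
# A sketch of such a callback; the "externalId" and "datapoints" keys and the
# my_cognite_client name are illustrative assumptions:
latest_timestamp: dict[str, int] = {}

def store_latest(uploaded_points):
    # Each element describes one uploaded time series and its uploaded datapoints
    for upload in uploaded_points:
        latest_timestamp[upload["externalId"]] = max(point[0] for point in upload["datapoints"])

queue = TimeSeriesUploadQueue(
    cdf_client=my_cognite_client,
    post_upload_function=store_latest,
)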

You can create an upload queue manually like this:

queue = TimeSeriesUploadQueue(cdf_client=my_cognite_client)

and then call queue.upload() to upload all data in the queue to CDF. However, you can set some upload conditions and have the queue perform the uploads automatically, for example:

client = CogniteClient()
upload_queue = TimeSeriesUploadQueue(cdf_client=client, max_upload_interval=10)

upload_queue.start()

while not stop:
    timestamp, value = source.query()
    upload_queue.add_to_upload_queue((timestamp, value), external_id="my-timeseries")

upload_queue.stop()

The max_upload_interval specifies the maximum time (in seconds) between each API call. The upload method will be called on stop() as well, so no datapoints are lost. You can also use the queue as a context manager:

client = CogniteClient()

with TimeSeriesUploadQueue(cdf_client=client, max_upload_interval=1) as upload_queue:
    while not stop:
        timestamp, value = source.query()

        upload_queue.add_to_upload_queue((timestamp, value), external_id="my-timeseries")

This will call the start() and stop() methods automatically.

You can also trigger uploads after a given amount of data is added, by using the max_queue_size keyword argument instead. If both are used, whichever condition is met first will trigger the upload.

class cognite.extractorutils.uploader.AssetUploadQueue(cdf_client: CogniteClient, post_upload_function: Callable[[list[Any]], None] | None = None, max_queue_size: int | None = None, max_upload_interval: int | None = None, trigger_log_level: str = 'DEBUG', thread_name: str | None = None, cancellation_token: CancellationToken | None = None)[source]

Bases: AbstractUploadQueue

Upload queue for assets.

Parameters:
  • cdf_client – Cognite Data Fusion client to use

  • post_upload_function – A function that will be called after each upload. The function will be given one argument: A list of the assets that were uploaded.

  • max_queue_size – Maximum size of upload queue. Defaults to no max size.

  • max_upload_interval – Automatically trigger an upload each m seconds when run as a thread (use start/stop methods).

  • trigger_log_level – Log level to log upload triggers to.

  • thread_name – Thread name of uploader thread.

  • cancellation_token – Cancellation token

add_to_upload_queue(asset: Asset) None[source]

Add asset to upload queue.

The queue will be uploaded if the queue size is larger than the threshold specified in the __init__.

Parameters:

asset – Asset to add

start() None

Start upload thread if max_upload_interval is set.

stop(ensure_upload: bool = True) None

Stop upload thread if running, and ensures that the upload queue is empty if ensure_upload is True.

Parameters:

ensure_upload (bool) – (Optional). Call upload one last time after shutting down thread to ensure empty upload queue.

upload() None[source]

Trigger an upload of the queue, clearing the queue afterwards.

class cognite.extractorutils.uploader.BytesUploadQueue(cdf_client: CogniteClient, post_upload_function: Callable[[list[FileMetadata | CogniteExtractorFileApply]], None] | None = None, max_queue_size: int | None = None, trigger_log_level: str = 'DEBUG', thread_name: str | None = None, overwrite_existing: bool = False, cancellation_token: CancellationToken | None = None, ssl_verify: bool | str = True)[source]

Bases: IOFileUploadQueue

Upload queue for bytes.

Parameters:
  • cdf_client – Cognite Data Fusion client to use

  • post_upload_function – A function that will be called after each upload. The function will be given one argument: A list of the files that were uploaded.

  • max_queue_size – Maximum size of upload queue.

  • trigger_log_level – Log level to log upload triggers to.

  • thread_name – Thread name of uploader thread.

  • overwrite_existing – If set to true, fields of files that already exist for the given external IDs may be overwritten.

add_entry_failure_logger(file_name: str, error: Exception) None

Add an entry to the failure logger if it exists.

add_io_to_upload_queue(file_meta: FileMetadata | CogniteExtractorFileApply, read_file: Callable[[], BinaryIO], extra_retries: tuple[type[Exception], ...] | dict[type[Exception], Callable[[Any], bool]] | None = None) None

Add file to upload queue.

The file will start uploading immediately. If the size of the queue is larger than the specified max size, this call will block until it’s completed the upload.

Parameters:
  • file_meta – File metadata-object

  • read_file – Callable that returns a BinaryIO stream to read the file from.

  • extra_retries – Exception types that might be raised by read_file that should be retried

add_to_upload_queue(content: bytes, file_meta: FileMetadata | CogniteExtractorFileApply) None[source]

Add file to upload queue.

The file will start uploading immediately. If the size of the queue is larger than the specified max size, this call will block until it’s completed the upload.

Parameters:
  • content – bytes object to upload

  • file_meta – File metadata-object
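
As a hedged sketch (queue is assumed to be a BytesUploadQueue, and the file metadata is illustrative):

from cognite.client.data_classes import FileMetadata

queue.add_to_upload_queue(
    content=b"hello, world",
    file_meta=FileMetadata(external_id="my-bytes-file", name="hello.txt"),
)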

flush_failure_logger() None

Flush the failure logger if it exists, writing all failures to the file.

get_failure_logger() FileFailureManager | None

Get the failure logger for this upload queue, if it exists.

initialize_failure_logging() None

Initialize the failure logging manager if a path is provided in the constructor.

start() None

Start upload thread if max_upload_interval is set.

stop(ensure_upload: bool = True) None

Stop upload thread if running, and ensures that the upload queue is empty if ensure_upload is True.

Parameters:

ensure_upload (bool) – (Optional). Call upload one last time after shutting down thread to ensure empty upload queue.

upload(fail_on_errors: bool = True, timeout: float | None = None) None

Wait for all uploads to finish.

class cognite.extractorutils.uploader.EventUploadQueue(cdf_client: CogniteClient, post_upload_function: Callable[[list[Event]], None] | None = None, max_queue_size: int | None = None, max_upload_interval: int | None = None, trigger_log_level: str = 'DEBUG', thread_name: str | None = None, cancellation_token: CancellationToken | None = None)[source]

Bases: AbstractUploadQueue

Upload queue for events.

Parameters:
  • cdf_client – Cognite Data Fusion client to use

  • post_upload_function – A function that will be called after each upload. The function will be given one argument: A list of the events that were uploaded.

  • max_queue_size – Maximum size of upload queue. Defaults to no max size.

  • max_upload_interval – Automatically trigger an upload each m seconds when run as a thread (use start/stop methods).

  • trigger_log_level – Log level to log upload triggers to.

  • thread_name – Thread name of uploader thread.

add_to_upload_queue(event: Event) None[source]

Add event to upload queue.

The queue will be uploaded if the queue size is larger than the threshold specified in the __init__.

Parameters:

event – Event to add

start() None

Start upload thread if max_upload_interval is set.

stop(ensure_upload: bool = True) None

Stop upload thread if running, and ensures that the upload queue is empty if ensure_upload is True.

Parameters:

ensure_upload (bool) – (Optional). Call upload one last time after shutting down thread to ensure empty upload queue.

upload() None[source]

Trigger an upload of the queue, clearing the queue afterwards.

class cognite.extractorutils.uploader.FileUploadQueue(cdf_client: CogniteClient, post_upload_function: Callable[[list[FileMetadata | CogniteExtractorFileApply]], None] | None = None, max_queue_size: int | None = None, max_upload_interval: int | None = None, trigger_log_level: str = 'DEBUG', thread_name: str | None = None, overwrite_existing: bool = False, cancellation_token: CancellationToken | None = None, ssl_verify: bool | str = True)[source]

Bases: IOFileUploadQueue

Upload queue for files.

Parameters:
  • cdf_client – Cognite Data Fusion client to use

  • post_upload_function – A function that will be called after each upload. The function will be given one argument: A list of the files that were uploaded.

  • max_queue_size – Maximum size of upload queue.

  • trigger_log_level – Log level to log upload triggers to.

  • thread_name – Thread name of uploader thread.

add_entry_failure_logger(file_name: str, error: Exception) None

Add an entry to the failure logger if it exists.

add_io_to_upload_queue(file_meta: FileMetadata | CogniteExtractorFileApply, read_file: Callable[[], BinaryIO], extra_retries: tuple[type[Exception], ...] | dict[type[Exception], Callable[[Any], bool]] | None = None) None

Add file to upload queue.

The file will start uploading immediately. If the size of the queue is larger than the specified max size, this call will block until it’s completed the upload.

Parameters:
  • file_meta – File metadata-object

  • read_file – Callable that returns a BinaryIO stream to read the file from.

  • extra_retries – Exception types that might be raised by read_file that should be retried

add_to_upload_queue(file_meta: FileMetadata | CogniteExtractorFileApply, file_name: str | PathLike) None[source]

Add file to upload queue.

The file will start uploading immediately. If the size of the queue is larger than the specified max size, this call will block until it’s completed the upload.

Parameters:
  • file_meta – File metadata-object

  • file_name – Path to file to be uploaded. If none, the file object will still be created, but no data is uploaded
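
As a hedged sketch (queue is assumed to be a FileUploadQueue, and the file path and external ID are placeholders):

from cognite.client.data_classes import FileMetadata

queue.add_to_upload_queue(
    file_meta=FileMetadata(external_id="my-file", name="report.pdf"),
    file_name="/path/to/report.pdf",
)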

flush_failure_logger() None

Flush the failure logger if it exists, writing all failures to the file.

get_failure_logger() FileFailureManager | None

Get the failure logger for this upload queue, if it exists.

initialize_failure_logging() None

Initialize the failure logging manager if a path is provided in the constructor.

start() None

Start upload thread if max_upload_interval is set.

stop(ensure_upload: bool = True) None

Stop upload thread if running, and ensures that the upload queue is empty if ensure_upload is True.

Parameters:

ensure_upload (bool) – (Optional). Call upload one last time after shutting down thread to ensure empty upload queue.

upload(fail_on_errors: bool = True, timeout: float | None = None) None

Wait for all uploads to finish.

class cognite.extractorutils.uploader.IOFileUploadQueue(cdf_client: CogniteClient, post_upload_function: Callable[[list[FileMetadata | CogniteExtractorFileApply]], None] | None = None, max_queue_size: int | None = None, trigger_log_level: str = 'DEBUG', thread_name: str | None = None, overwrite_existing: bool = False, cancellation_token: CancellationToken | None = None, max_parallelism: int | None = None, failure_logging_path: None | str = None, ssl_verify: bool | str = True)[source]

Bases: AbstractUploadQueue

Upload queue for files using BinaryIO.

Note that if the upload fails, the stream needs to be restarted, so the enqueued callback needs to produce a new IO object for each call.

Parameters:
  • cdf_client – Cognite Data Fusion client to use

  • post_upload_function – A function that will be called after each upload. The function will be given one argument: A list of the files that were uploaded.

  • max_queue_size – Maximum size of upload queue.

  • trigger_log_level – Log level to log upload triggers to.

  • thread_name – Thread name of uploader thread.

  • max_parallelism – Maximum number of parallel uploads. If nothing is given, the parallelism will be capped by the max_workers of the cognite client.

  • ssl_verify – Either a string (path to a CA bundle) or a bool (false to turn off completely, true to use standard CA bundle)

add_entry_failure_logger(file_name: str, error: Exception) None[source]

Add an entry to the failure logger if it exists.

add_io_to_upload_queue(file_meta: FileMetadata | CogniteExtractorFileApply, read_file: Callable[[], BinaryIO], extra_retries: tuple[type[Exception], ...] | dict[type[Exception], Callable[[Any], bool]] | None = None) None[source]

Add file to upload queue.

The file will start uploading immediately. If the size of the queue is larger than the specified max size, this call will block until it’s completed the upload.

Parameters:
  • file_meta – File metadata-object

  • read_file – Callable that returns a BinaryIO stream to read the file from.

  • extra_retries – Exception types that might be raised by read_file that should be retried
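
A hedged sketch of the point above about producing a fresh stream per call (queue is assumed to be an IOFileUploadQueue, and the file path is a placeholder):

from cognite.client.data_classes import FileMetadata

queue.add_io_to_upload_queue(
    file_meta=FileMetadata(external_id="my-file", name="my-file.bin"),
    # read_file is called to produce a fresh stream, so a failed upload can be retried
    read_file=lambda: open("/path/to/my-file.bin", "rb"),
)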

flush_failure_logger() None[source]

Flush the failure logger if it exists, writing all failures to the file.

get_failure_logger() FileFailureManager | None[source]

Get the failure logger for this upload queue, if it exists.

initialize_failure_logging() None[source]

Initialize the failure logging manager if a path is provided in the constructor.

start() None

Start upload thread if max_upload_interval is set.

stop(ensure_upload: bool = True) None

Stop upload thread if running, and ensures that the upload queue is empty if ensure_upload is True.

Parameters:

ensure_upload (bool) – (Optional). Call upload one last time after shutting down thread to ensure empty upload queue.

upload(fail_on_errors: bool = True, timeout: float | None = None) None[source]

Wait for all uploads to finish.

class cognite.extractorutils.uploader.RawUploadQueue(cdf_client: CogniteClient, post_upload_function: Callable[[list[Any]], None] | None = None, max_queue_size: int | None = None, max_upload_interval: int | None = None, trigger_log_level: str = 'DEBUG', thread_name: str | None = None, cancellation_token: CancellationToken | None = None)[source]

Bases: AbstractUploadQueue

Upload queue for RAW.

Parameters:
  • cdf_client – Cognite Data Fusion client to use

  • post_upload_function – A function that will be called after each upload. The function will be given one argument: A list of the rows that were uploaded.

  • max_queue_size – Maximum size of upload queue. Defaults to no max size.

  • max_upload_interval – Automatically trigger an upload each m seconds when run as a thread (use start/stop methods).

  • trigger_log_level – Log level to log upload triggers to.

  • thread_name – Thread name of uploader thread.

add_to_upload_queue(database: str, table: str, raw_row: Row) None[source]

Adds a row to the upload queue.

The queue will be uploaded if the queue size is larger than the threshold specified in the __init__.

Parameters:
  • database – The database to upload the Raw object to

  • table – The table to upload the Raw object to

  • raw_row – The row object
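
As a hedged sketch (queue is assumed to be a RawUploadQueue; the database, table and row contents are placeholders):

from cognite.client.data_classes import Row

queue.add_to_upload_queue(
    database="my_database",
    table="my_table",
    raw_row=Row(key="my-row-key", columns={"col1": "value1", "col2": 42}),
)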

start() None

Start upload thread if max_upload_interval is set.

stop(ensure_upload: bool = True) None

Stop upload thread if running, and ensures that the upload queue is empty if ensure_upload is True.

Parameters:

ensure_upload (bool) – (Optional). Call upload one last time after shutting down thread to ensure empty upload queue.

upload() None[source]

Trigger an upload of the queue, clearing the queue afterwards.
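
Example (a minimal sketch; assumes client is an already-configured CogniteClient):

from cognite.client.data_classes import Row
from cognite.extractorutils.uploader import RawUploadQueue

queue = RawUploadQueue(cdf_client=client, max_queue_size=1000, max_upload_interval=30)
queue.start()  # upload in a background thread every 30 seconds
queue.add_to_upload_queue("my_db", "my_table", Row(key="row-1", columns={"value": 42}))
queue.stop()   # flushes any remaining rows before returning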

class cognite.extractorutils.uploader.SequenceUploadQueue(cdf_client: CogniteClient, post_upload_function: Callable[[list[Any]], None] | None = None, max_queue_size: int | None = None, max_upload_interval: int | None = None, trigger_log_level: str = 'DEBUG', thread_name: str | None = None, create_missing: bool = False, cancellation_token: CancellationToken | None = None)[source]

Bases: AbstractUploadQueue

Upload queue for sequences.

Parameters:
  • cdf_client – Cognite Data Fusion client to use

  • post_upload_function – A function that will be called after each upload. The function will be given one argument: A list of the elements that were uploaded.

  • max_queue_size – Maximum size of upload queue. Defaults to no max size.

  • max_upload_interval – Automatically trigger an upload every max_upload_interval seconds when run as a thread (use the start/stop methods).

  • trigger_log_level – Log level to log upload triggers to.

  • thread_name – Thread name of uploader thread.

  • create_missing – Create missing sequences if possible (i.e., if an external ID is used).

add_to_upload_queue(rows: dict[int, list[int | float | str]] | list[tuple[int, int | float | str]] | list[dict[str, Any]] | SequenceData | SequenceRows, column_external_ids: list[dict] | None = None, id: int | None = None, external_id: str | None = None) None[source]

Add sequence rows to upload queue.

Mirrors implementation of SequenceApi.insert. Inserted rows will be cached until uploaded.

Parameters:
  • rows – The rows to be inserted. Can either be a list of tuples, a list of {"rowNumber": …, "values": …} objects, a dictionary of rowNumber: data, or a SequenceData object.

  • column_external_ids – list of external id for the columns of the sequence

  • id – Sequence internal ID. Use if external_id is None.

  • external_id – Sequence external ID. Use if id is None.

set_sequence_column_definition(col_def: list[dict[str, str]], id: int | None = None, external_id: str | None = None) None[source]

Set sequence column definition.

Parameters:
  • col_def – Sequence column definition

  • id – Sequence internal ID. Use if external_id is None.

  • external_id – Sequence external ID. Use if id is None.

set_sequence_metadata(metadata: dict[str, str | int | float], id: int | None = None, external_id: str | None = None, asset_external_id: str | None = None, dataset_external_id: str | None = None, name: str | None = None, description: str | None = None) None[source]

Set sequence metadata.

Metadata will be cached until the sequence is created. The metadata will be updated if the sequence already exists.

Parameters:
  • metadata – Sequence metadata

  • id – Sequence internal ID. Use if external_id is None.

  • external_id – Sequence external ID. Use if id is None.

  • asset_external_id – Sequence asset external ID

  • dataset_external_id – Sequence dataset external ID

  • name – Sequence name

  • description – Sequence description

start() None

Start upload thread if max_upload_interval is set.

stop(ensure_upload: bool = True) None

Stop the upload thread if it is running, and ensure that the upload queue is emptied if ensure_upload is True.

Parameters:

ensure_upload (bool) – Optional. Call upload one last time after shutting down the thread to ensure an empty upload queue.

upload() None[source]

Trigger an upload of the queue, clearing the queue afterwards.
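
Example (a minimal sketch; assumes client is an already-configured CogniteClient, and that the column definition below is a hypothetical schema you would adapt to your sequence):

from cognite.extractorutils.uploader import SequenceUploadQueue

queue = SequenceUploadQueue(cdf_client=client, create_missing=True)
queue.set_sequence_metadata(
    metadata={"source": "example"},
    external_id="my-sequence",
    name="My sequence",
)
queue.set_sequence_column_definition(
    col_def=[{"externalId": "temperature", "valueType": "DOUBLE"}],  # hypothetical column
    external_id="my-sequence",
)
queue.add_to_upload_queue(
    rows={1: [20.5], 2: [21.0]},  # row number -> list of column values
    external_id="my-sequence",
)
queue.upload()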

class cognite.extractorutils.uploader.TimeSeriesUploadQueue(cdf_client: CogniteClient, post_upload_function: Callable[[list[dict[str, str | list[tuple[int | datetime, float] | tuple[int | datetime, str] | tuple[int | datetime, int] | tuple[int | datetime, float, StatusCode | int] | tuple[int | datetime, str, StatusCode | int]]]]], None] | None = None, max_queue_size: int | None = None, max_upload_interval: int | None = None, trigger_log_level: str = 'DEBUG', thread_name: str | None = None, create_missing: Callable[[str, list[tuple[int | datetime, float] | tuple[int | datetime, str] | tuple[int | datetime, int] | tuple[int | datetime, float, StatusCode | int] | tuple[int | datetime, str, StatusCode | int]]], TimeSeries] | bool = False, data_set_id: int | None = None, cancellation_token: CancellationToken | None = None)[source]

Bases: AbstractUploadQueue

Upload queue for time series.

Parameters:
  • cdf_client – Cognite Data Fusion client to use

  • post_upload_function – A function that will be called after each upload. The function will be given one argument: A list of dicts containing the datapoints that were uploaded (in the same format as the kwargs for datapoint insertion in the Cognite SDK).

  • max_queue_size – Maximum size of upload queue. Defaults to no max size.

  • max_upload_interval – Automatically trigger an upload every max_upload_interval seconds when run as a thread (use the start/stop methods).

  • trigger_log_level – Log level to log upload triggers to.

  • thread_name – Thread name of uploader thread.

  • create_missing – Create missing time series if possible (i.e., if an external ID is used). Either given as a boolean (True would auto-create a time series with nothing but an external ID), or as a factory function taking an external ID and a list of datapoints about to be inserted and returning a TimeSeries object.

  • data_set_id – Data set ID passed to create_missing. Does nothing if create_missing is False. If a custom time series creation method is set in create_missing, this is used as a fallback if that method does not set a data set ID on its own.

add_to_upload_queue(*, id: int | None = None, external_id: str | None = None, datapoints: list[tuple[int | datetime, float] | tuple[int | datetime, str] | tuple[int | datetime, int] | tuple[int | datetime, float, StatusCode | int] | tuple[int | datetime, str, StatusCode | int]] | None = None) None[source]

Add data points to upload queue.

The queue will be uploaded if the queue size is larger than the threshold specified in the __init__.

Parameters:
  • id – Internal ID of time series. Either this or external_id must be set.

  • external_id – External ID of time series. Either this or id must be set.

  • datapoints – list of data points to add

start() None

Start upload thread if max_upload_interval is set.

stop(ensure_upload: bool = True) None

Stop the upload thread if it is running, and ensure that the upload queue is emptied if ensure_upload is True.

Parameters:

ensure_upload (bool) – Optional. Call upload one last time after shutting down the thread to ensure an empty upload queue.

upload() None[source]

Trigger an upload of the queue, clearing the queue afterwards.
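
Example (a minimal sketch; assumes client is an already-configured CogniteClient):

from cognite.extractorutils.uploader import TimeSeriesUploadQueue

queue = TimeSeriesUploadQueue(cdf_client=client, max_upload_interval=10, create_missing=True)
queue.start()  # upload in a background thread every 10 seconds
queue.add_to_upload_queue(
    external_id="my-timeseries",
    datapoints=[(1700000000000, 1.23), (1700000001000, 1.25)],  # (timestamp in ms, value)
)
queue.stop()   # flushes any remaining datapoints before returning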

cognite.extractorutils.uploader.default_time_series_factory(external_id: str, datapoints: list[tuple[int | datetime, float] | tuple[int | datetime, str] | tuple[int | datetime, int] | tuple[int | datetime, float, StatusCode | int] | tuple[int | datetime, str, StatusCode | int]]) TimeSeries[source]

Default time series factory used when create_missing in a TimeSeriesUploadQueue is given as a boolean.

Parameters:
  • external_id – External ID of time series to create

  • datapoints – The list of datapoints that were tried to be inserted

Returns:

A TimeSeries object with external_id set and is_string automatically detected from the given datapoints.

class cognite.extractorutils.uploader._base.AbstractUploadQueue(cdf_client: CogniteClient, post_upload_function: Callable[[list[Any]], None] | None = None, max_queue_size: int | None = None, max_upload_interval: int | None = None, trigger_log_level: str = 'DEBUG', thread_name: str | None = None, cancellation_token: CancellationToken | None = None)[source]

Bases: ABC

Abstract uploader class.

Parameters:
  • cdf_client – Cognite Data Fusion client to use

  • post_upload_function – A function that will be called after each upload. The function will be given one argument: A list of the elements that were uploaded.

  • max_queue_size – Maximum size of upload queue. Defaults to no max size.

  • max_upload_interval – Automatically trigger an upload every max_upload_interval seconds when run as a thread (use the start/stop methods).

  • trigger_log_level – Log level to log upload triggers to.

  • thread_name – Thread name of uploader thread.

start() None[source]

Start upload thread if max_upload_interval is set.

stop(ensure_upload: bool = True) None[source]

Stop the upload thread if it is running, and ensure that the upload queue is emptied if ensure_upload is True.

Parameters:

ensure_upload (bool) – Optional. Call upload one last time after shutting down the thread to ensure an empty upload queue.

abstract upload() None[source]

Uploads the queue.

class cognite.extractorutils.uploader._base.TimestampedObject(payload: Any, created: arrow.arrow.Arrow)[source]

Bases: object

created: Arrow
payload: Any

Upload queue for (legacy) assets.

class cognite.extractorutils.uploader.assets.AssetUploadQueue(cdf_client: CogniteClient, post_upload_function: Callable[[list[Any]], None] | None = None, max_queue_size: int | None = None, max_upload_interval: int | None = None, trigger_log_level: str = 'DEBUG', thread_name: str | None = None, cancellation_token: CancellationToken | None = None)[source]

Bases: AbstractUploadQueue

Upload queue for assets.

Parameters:
  • cdf_client – Cognite Data Fusion client to use

  • post_upload_function – A function that will be called after each upload. The function will be given one argument: A list of the assets that were uploaded.

  • max_queue_size – Maximum size of upload queue. Defaults to no max size.

  • max_upload_interval – Automatically trigger an upload every max_upload_interval seconds when run as a thread (use the start/stop methods).

  • trigger_log_level – Log level to log upload triggers to.

  • thread_name – Thread name of uploader thread.

  • cancellation_token – Cancellation token

add_to_upload_queue(asset: Asset) None[source]

Add asset to upload queue.

The queue will be uploaded if the queue size is larger than the threshold specified in the __init__.

Parameters:

asset – Asset to add

upload() None[source]

Trigger an upload of the queue, clearing the queue afterwards.
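
Example (a minimal sketch; assumes client is an already-configured CogniteClient):

from cognite.client.data_classes import Asset
from cognite.extractorutils.uploader.assets import AssetUploadQueue

queue = AssetUploadQueue(cdf_client=client, max_queue_size=500)
queue.add_to_upload_queue(Asset(external_id="pump-42", name="Pump 42"))
queue.upload()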

Upload queue for (legacy) events.

class cognite.extractorutils.uploader.events.EventUploadQueue(cdf_client: CogniteClient, post_upload_function: Callable[[list[Event]], None] | None = None, max_queue_size: int | None = None, max_upload_interval: int | None = None, trigger_log_level: str = 'DEBUG', thread_name: str | None = None, cancellation_token: CancellationToken | None = None)[source]

Bases: AbstractUploadQueue

Upload queue for events.

Parameters:
  • cdf_client – Cognite Data Fusion client to use

  • post_upload_function – A function that will be called after each upload. The function will be given one argument: A list of the events that were uploaded.

  • max_queue_size – Maximum size of upload queue. Defaults to no max size.

  • max_upload_interval – Automatically trigger an upload every max_upload_interval seconds when run as a thread (use the start/stop methods).

  • trigger_log_level – Log level to log upload triggers to.

  • thread_name – Thread name of uploader thread.

add_to_upload_queue(event: Event) None[source]

Add event to upload queue.

The queue will be uploaded if the queue size is larger than the threshold specified in the __init__.

Parameters:

event – Event to add

upload() None[source]

Trigger an upload of the queue, clearing the queue afterwards.
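
Example (a minimal sketch; assumes client is an already-configured CogniteClient):

from cognite.client.data_classes import Event
from cognite.extractorutils.uploader.events import EventUploadQueue

queue = EventUploadQueue(cdf_client=client, max_upload_interval=60)
queue.start()  # upload in a background thread every 60 seconds
queue.add_to_upload_queue(Event(external_id="evt-1", type="alarm", start_time=1700000000000))
queue.stop()   # flushes any remaining events before returning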

Upload queue for files.

class cognite.extractorutils.uploader.files.BytesUploadQueue(cdf_client: CogniteClient, post_upload_function: Callable[[list[FileMetadata | CogniteExtractorFileApply]], None] | None = None, max_queue_size: int | None = None, trigger_log_level: str = 'DEBUG', thread_name: str | None = None, overwrite_existing: bool = False, cancellation_token: CancellationToken | None = None, ssl_verify: bool | str = True)[source]

Bases: IOFileUploadQueue

Upload queue for bytes.

Parameters:
  • cdf_client – Cognite Data Fusion client to use

  • post_upload_function – A function that will be called after each upload. The function will be given one argument: A list of the file metadata objects that were uploaded.

  • max_queue_size – Maximum size of upload queue.

  • trigger_log_level – Log level to log upload triggers to.

  • thread_name – Thread name of uploader thread.

  • overwrite_existing – If set to true, fields of files that already exist for the given external IDs may be overwritten.

add_to_upload_queue(content: bytes, file_meta: FileMetadata | CogniteExtractorFileApply) None[source]

Add file to upload queue.

The file will start uploading immediately. If the size of the queue is larger than the specified max size, this call will block until the upload has completed.

Parameters:
  • content – bytes object to upload

  • file_meta – File metadata-object
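
Example (a minimal sketch; assumes client is an already-configured CogniteClient):

from cognite.client.data_classes import FileMetadata
from cognite.extractorutils.uploader.files import BytesUploadQueue

queue = BytesUploadQueue(cdf_client=client, overwrite_existing=True)
queue.add_to_upload_queue(
    content=b"hello, world",
    file_meta=FileMetadata(external_id="greeting-txt", name="greeting.txt"),
)
queue.upload()  # wait for all pending uploads to finish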

class cognite.extractorutils.uploader.files.ChunkedStream(inner: BinaryIO, max_chunk_size: int, stream_length: int)[source]

Bases: RawIOBase, BinaryIO

Wrapper around a read-only stream to allow treating it as a sequence of smaller streams.

next_chunk returns True if there is one more chunk. It must be called before the object is read as a stream for the first time, typically as the condition of a while loop; see the example at the end of this class entry.

Parameters:
  • inner – Stream to wrap.

  • max_chunk_size – Maximum size per stream chunk.

  • stream_length – Total (remaining) length of the inner stream. This must be accurate.

property chunk_count: int

Number of chunks in the stream.

property current_chunk: int

Current chunk number.

property len: int

Length of the current chunk, in bytes.

next_chunk() bool[source]

Step into the next chunk, letting this be read as a stream again.

Returns False if the stream is exhausted.

read(size: int = -1) bytes[source]

Read bytes from the current chunk.

readable() bool[source]

Check if the stream is readable. Always True for ChunkedStream.

tell() int[source]

Get the current position of the stream.

write(_ChunkedStream__b: Any) int[source]

Not supported for ChunkedStream.

writelines(_ChunkedStream__lines: Any) None[source]

Not supported for ChunkedStream.
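
Example (a minimal sketch of the while-loop pattern described above, using an in-memory stream):

from io import BytesIO

from cognite.extractorutils.uploader.files import ChunkedStream

data = b"x" * 1000
chunks = ChunkedStream(inner=BytesIO(data), max_chunk_size=256, stream_length=len(data))
while chunks.next_chunk():  # must be called before reading the first chunk
    part = chunks.read()    # read the current chunk
    print(chunks.current_chunk, "of", chunks.chunk_count, "->", len(part), "bytes")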

class cognite.extractorutils.uploader.files.FileUploadQueue(cdf_client: CogniteClient, post_upload_function: Callable[[list[FileMetadata | CogniteExtractorFileApply]], None] | None = None, max_queue_size: int | None = None, max_upload_interval: int | None = None, trigger_log_level: str = 'DEBUG', thread_name: str | None = None, overwrite_existing: bool = False, cancellation_token: CancellationToken | None = None, ssl_verify: bool | str = True)[source]

Bases: IOFileUploadQueue

Upload queue for files.

Parameters:
  • cdf_client – Cognite Data Fusion client to use

  • post_upload_function – A function that will be called after each upload. The function will be given one argument: A list of the file metadata objects that were uploaded.

  • max_queue_size – Maximum size of upload queue.

  • trigger_log_level – Log level to log upload triggers to.

  • thread_name – Thread name of uploader thread.

add_to_upload_queue(file_meta: FileMetadata | CogniteExtractorFileApply, file_name: str | PathLike) None[source]

Add file to upload queue.

The file will start uploading immediately. If the size of the queue is larger than the specified max size, this call will block until the upload has completed.

Parameters:
  • file_meta – File metadata-object

  • file_name – Path to the file to be uploaded. If None, the file object will still be created, but no data is uploaded.
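
Example (a minimal sketch; assumes client is an already-configured CogniteClient and "/data/report.pdf" is a hypothetical local path):

from cognite.client.data_classes import FileMetadata
from cognite.extractorutils.uploader.files import FileUploadQueue

queue = FileUploadQueue(cdf_client=client, overwrite_existing=True)
queue.add_to_upload_queue(
    file_meta=FileMetadata(external_id="report-2024", name="report.pdf"),
    file_name="/data/report.pdf",
)
queue.upload()  # wait for all pending uploads to finish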

class cognite.extractorutils.uploader.files.IOByteStream(stream: BinaryIO)[source]

Bases: SyncByteStream

Wraps a BinaryIO stream to be used as an httpx SyncByteStream.

class cognite.extractorutils.uploader.files.IOFileUploadQueue(cdf_client: CogniteClient, post_upload_function: Callable[[list[FileMetadata | CogniteExtractorFileApply]], None] | None = None, max_queue_size: int | None = None, trigger_log_level: str = 'DEBUG', thread_name: str | None = None, overwrite_existing: bool = False, cancellation_token: CancellationToken | None = None, max_parallelism: int | None = None, failure_logging_path: None | str = None, ssl_verify: bool | str = True)[source]

Bases: AbstractUploadQueue

Upload queue for files using BinaryIO.

Note that if the upload fails, the stream needs to be restarted, so the enqueued callback needs to produce a new IO object for each call.

Parameters:
  • cdf_client – Cognite Data Fusion client to use

  • post_upload_function – A function that will be called after each upload. The function will be given one argument: A list of the file metadata objects that were uploaded.

  • max_queue_size – Maximum size of upload queue.

  • trigger_log_level – Log level to log upload triggers to.

  • thread_name – Thread name of uploader thread.

  • max_parallelism – Maximum number of parallel uploads. If nothing is given, the parallelism will be capped by the max_workers of the cognite client.

  • ssl_verify – Either a string (path to a CA bundle) or a bool (false to turn off completely, true to use standard CA bundle)

add_entry_failure_logger(file_name: str, error: Exception) None[source]

Add an entry to the failure logger if it exists.

add_io_to_upload_queue(file_meta: FileMetadata | CogniteExtractorFileApply, read_file: Callable[[], BinaryIO], extra_retries: tuple[type[Exception], ...] | dict[type[Exception], Callable[[Any], bool]] | None = None) None[source]

Add file to upload queue.

The file will start uploading immediately. If the size of the queue is larger than the specified max size, this call will block until the upload has completed.

Parameters:
  • file_meta – File metadata-object

  • read_file – Callable that returns a BinaryIO stream to read the file from.

  • extra_retries – Exception types that might be raised by read_file that should be retried

flush_failure_logger() None[source]

Flush the failure logger if it exists, writing all failures to the file.

get_failure_logger() FileFailureManager | None[source]

Get the failure logger for this upload queue, if it exists.

initialize_failure_logging() None[source]

Initialize the failure logging manager if a path is provided in the constructor.

upload(fail_on_errors: bool = True, timeout: float | None = None) None[source]

Wait for all uploads to finish.

Upload queue for RAW.

class cognite.extractorutils.uploader.raw.RawUploadQueue(cdf_client: CogniteClient, post_upload_function: Callable[[list[Any]], None] | None = None, max_queue_size: int | None = None, max_upload_interval: int | None = None, trigger_log_level: str = 'DEBUG', thread_name: str | None = None, cancellation_token: CancellationToken | None = None)[source]

Bases: AbstractUploadQueue

Upload queue for RAW.

Parameters:
  • cdf_client – Cognite Data Fusion client to use

  • post_upload_function – A function that will be called after each upload. The function will be given one argument: A list of the rows that were uploaded.

  • max_queue_size – Maximum size of upload queue. Defaults to no max size.

  • max_upload_interval – Automatically trigger an upload every max_upload_interval seconds when run as a thread (use the start/stop methods).

  • trigger_log_level – Log level to log upload triggers to.

  • thread_name – Thread name of uploader thread.

add_to_upload_queue(database: str, table: str, raw_row: Row) None[source]

Adds a row to the upload queue.

The queue will be uploaded if the queue size is larger than the threshold specified in the __init__.

Parameters:
  • database – The database to upload the Raw object to

  • table – The table to upload the Raw object to

  • raw_row – The row object

upload() None[source]

Trigger an upload of the queue, clearing the queue afterwards.

Upload queue for time series and sequences.

class cognite.extractorutils.uploader.time_series.SequenceUploadQueue(cdf_client: CogniteClient, post_upload_function: Callable[[list[Any]], None] | None = None, max_queue_size: int | None = None, max_upload_interval: int | None = None, trigger_log_level: str = 'DEBUG', thread_name: str | None = None, create_missing: bool = False, cancellation_token: CancellationToken | None = None)[source]

Bases: AbstractUploadQueue

Upload queue for sequences.

Parameters:
  • cdf_client – Cognite Data Fusion client to use

  • post_upload_function – A function that will be called after each upload. The function will be given one argument: A list of the elements that were uploaded.

  • max_queue_size – Maximum size of upload queue. Defaults to no max size.

  • max_upload_interval – Automatically trigger an upload every max_upload_interval seconds when run as a thread (use the start/stop methods).

  • trigger_log_level – Log level to log upload triggers to.

  • thread_name – Thread name of uploader thread.

  • create_missing – Create missing sequences if possible (i.e., if an external ID is used).

add_to_upload_queue(rows: dict[int, list[int | float | str]] | list[tuple[int, int | float | str]] | list[dict[str, Any]] | SequenceData | SequenceRows, column_external_ids: list[dict] | None = None, id: int | None = None, external_id: str | None = None) None[source]

Add sequence rows to upload queue.

Mirrors implementation of SequenceApi.insert. Inserted rows will be cached until uploaded.

Parameters:
  • rows – The rows to be inserted. Can either be a list of tuples, a list of {"rowNumber": …, "values": …} objects, a dictionary of rowNumber: data, or a SequenceData object.

  • column_external_ids – list of external id for the columns of the sequence

  • id – Sequence internal ID. Use if external_id is None.

  • external_id – Sequence external ID. Use if id is None.

set_sequence_column_definition(col_def: list[dict[str, str]], id: int | None = None, external_id: str | None = None) None[source]

Set sequence column definition.

Parameters:
  • col_def – Sequence column definition

  • id – Sequence internal ID. Use if external_id is None.

  • external_id – Sequence external ID. Use if id is None.

set_sequence_metadata(metadata: dict[str, str | int | float], id: int | None = None, external_id: str | None = None, asset_external_id: str | None = None, dataset_external_id: str | None = None, name: str | None = None, description: str | None = None) None[source]

Set sequence metadata.

Metadata will be cached until the sequence is created. The metadata will be updated if the sequence already exists.

Parameters:
  • metadata – Sequence metadata

  • id – Sequence internal ID. Use if external_id is None.

  • external_id – Sequence external ID. Use if id is None.

  • asset_external_id – Sequence asset external ID

  • dataset_external_id – Sequence dataset external ID

  • name – Sequence name

  • description – Sequence description

upload() None[source]

Trigger an upload of the queue, clearing the queue afterwards.

class cognite.extractorutils.uploader.time_series.TimeSeriesUploadQueue(cdf_client: CogniteClient, post_upload_function: Callable[[list[dict[str, str | list[tuple[int | datetime, float] | tuple[int | datetime, str] | tuple[int | datetime, int] | tuple[int | datetime, float, StatusCode | int] | tuple[int | datetime, str, StatusCode | int]]]]], None] | None = None, max_queue_size: int | None = None, max_upload_interval: int | None = None, trigger_log_level: str = 'DEBUG', thread_name: str | None = None, create_missing: Callable[[str, list[tuple[int | datetime, float] | tuple[int | datetime, str] | tuple[int | datetime, int] | tuple[int | datetime, float, StatusCode | int] | tuple[int | datetime, str, StatusCode | int]]], TimeSeries] | bool = False, data_set_id: int | None = None, cancellation_token: CancellationToken | None = None)[source]

Bases: AbstractUploadQueue

Upload queue for time series.

Parameters:
  • cdf_client – Cognite Data Fusion client to use

  • post_upload_function – A function that will be called after each upload. The function will be given one argument: A list of dicts containing the datapoints that were uploaded (in the same format as the kwargs for datapoint insertion in the Cognite SDK).

  • max_queue_size – Maximum size of upload queue. Defaults to no max size.

  • max_upload_interval – Automatically trigger an upload every max_upload_interval seconds when run as a thread (use the start/stop methods).

  • trigger_log_level – Log level to log upload triggers to.

  • thread_name – Thread name of uploader thread.

  • create_missing – Create missing time series if possible (i.e., if an external ID is used). Either given as a boolean (True would auto-create a time series with nothing but an external ID), or as a factory function taking an external ID and a list of datapoints about to be inserted and returning a TimeSeries object.

  • data_set_id – Data set ID passed to create_missing. Does nothing if create_missing is False. If a custom time series creation method is set in create_missing, this is used as a fallback if that method does not set a data set ID on its own.

add_to_upload_queue(*, id: int | None = None, external_id: str | None = None, datapoints: list[tuple[int | datetime, float] | tuple[int | datetime, str] | tuple[int | datetime, int] | tuple[int | datetime, float, StatusCode | int] | tuple[int | datetime, str, StatusCode | int]] | None = None) None[source]

Add data points to upload queue.

The queue will be uploaded if the queue size is larger than the threshold specified in the __init__.

Parameters:
  • id – Internal ID of time series. Either this or external_id must be set.

  • external_id – External ID of time series. Either this or id must be set.

  • datapoints – list of data points to add

upload() None[source]

Trigger an upload of the queue, clearing the queue afterwards.

cognite.extractorutils.uploader.time_series.default_time_series_factory(external_id: str, datapoints: list[tuple[int | datetime, float] | tuple[int | datetime, str] | tuple[int | datetime, int] | tuple[int | datetime, float, StatusCode | int] | tuple[int | datetime, str, StatusCode | int]]) TimeSeries[source]

Default time series factory used when create_missing in a TimeSeriesUploadQueue is given as a boolean.

Parameters:
  • external_id – External ID of time series to create

  • datapoints – The list of datapoints that were tried to be inserted

Returns:

A TimeSeries object with external_id set and is_string automatically detected from the given datapoints.

util - Miscellaneous utilities

This module contains miscellaneous functions and classes.

class cognite.extractorutils.util.BufferedReadWithLength(raw: RawIOBase, buffer_size: int, len: int, on_close: Callable[[], None] | None = None)[source]

Bases: BufferedReader

A BufferedReader that also has a length attribute.

Some libraries (like requests) check streams for a len attribute to use for the content-length header when uploading files. Using this class allows these libraries to work with streams that have a known length without seeking to the end of the stream to find its length.

Parameters:
  • raw – The raw IO object to read from.

  • buffer_size – The size of the buffer to use.

  • len – The length of the stream in bytes.

  • on_close – A callable that will be called when the stream is closed. This can be used to clean up resources.

close() None[source]

Close the stream and call the on_close callback if it is set.

closed
detach()

Disconnect this buffer from its underlying raw stream and return it.

After the raw stream has been detached, the buffer is in an unusable state.

fileno()

Returns underlying file descriptor if one exists.

OSError is raised if the IO object does not use a file descriptor.

flush()

Flush write buffers, if applicable.

This is not implemented for read-only and non-blocking streams.

isatty()

Return whether this is an ‘interactive’ stream.

Return False if it can’t be determined.

mode
name
peek(size=0, /)
raw
read(size=-1, /)

Read and return up to n bytes.

If the argument is omitted, None, or negative, reads and returns all data until EOF.

If the argument is positive, and the underlying raw stream is not ‘interactive’, multiple raw reads may be issued to satisfy the byte count (unless EOF is reached first). But for interactive raw streams (as well as sockets and pipes), at most one raw read will be issued, and a short result does not imply that EOF is imminent.

Returns an empty bytes object on EOF.

Returns None if the underlying raw stream was open in non-blocking mode and no data is available at the moment.

read1(size=-1, /)

Read and return up to n bytes, with at most one read() call to the underlying raw stream. A short result does not imply that EOF is imminent.

Returns an empty bytes object on EOF.

readable()

Return whether object was opened for reading.

If False, read() will raise OSError.

readinto(buffer, /)
readinto1(buffer, /)
readline(size=-1, /)

Read and return a line from the stream.

If size is specified, at most size bytes will be read.

The line terminator is always b'\n' for binary files; for text files, the newlines argument to open can be used to select the line terminator(s) recognized.

readlines(hint=-1, /)

Return a list of lines from the stream.

hint can be specified to control the number of lines read: no more lines will be read if the total size (in bytes/characters) of all lines so far exceeds hint.

seek(target, whence=0, /)

Change the stream position to the given byte offset.

Parameters:
  • offset – The stream position, relative to whence.

  • whence – The relative position to seek from.

The offset is interpreted relative to the position indicated by whence. Values for whence are:

  • os.SEEK_SET or 0 – start of stream (the default); offset should be zero or positive

  • os.SEEK_CUR or 1 – current stream position; offset may be negative

  • os.SEEK_END or 2 – end of stream; offset is usually negative

Return the new absolute position.

seekable()

Return whether object supports random access.

If False, seek(), tell() and truncate() will raise OSError. This method may need to do a test seek().

tell()

Return current stream position.

truncate(pos=None, /)

Truncate file to size bytes.

File pointer is left unchanged. Size defaults to the current IO position as reported by tell(). Returns the new size.

writable()

Return whether object was opened for writing.

If False, write() will raise OSError.

write(buffer, /)

Write the given buffer to the IO stream.

Returns the number of bytes written, which is always the length of b in bytes.

Raises BlockingIOError if the buffer is full and the underlying raw stream cannot accept more data at the moment.

writelines(lines, /)

Write a list of lines to stream.

Line separators are not added, so it is usual for each of the lines provided to have a line separator at the end.

class cognite.extractorutils.util.EitherId(**kwargs: int | str | None)[source]

Bases: object

Class representing an ID in CDF, which can either be an external or internal ID.

An EitherId can only hold one ID type, not both.

Parameters:
  • id – Internal ID

  • external_id – External ID. The keyword may be given as either external_id or externalId.

Raises:

TypeError – If neither or both of the ID types are set.

content() int | str[source]

Get the value of the ID.

Returns:

The ID

type() str[source]

Get the type of the ID.

Returns:

‘id’ if the EitherId represents an internal ID, ‘externalId’ if the EitherId represents an external ID
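
Example (a minimal sketch):

from cognite.extractorutils.util import EitherId

an_id = EitherId(external_id="my-asset")
print(an_id.type())     # 'externalId'
print(an_id.content())  # 'my-asset'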

cognite.extractorutils.util.add_extraction_pipeline(extraction_pipeline_ext_id: str, cognite_client: CogniteClient, heartbeat_waiting_time: int = 600, added_message: str = '') Callable[[Callable[[...], _T1]], Callable[[...], _T1]][source]

This is to be used as a decorator for extractor functions to add extraction pipeline information.

Parameters:
  • extraction_pipeline_ext_id – External ID of the extraction pipeline

  • cognite_client – Client to use when communicating with CDF

  • heartbeat_waiting_time – Target interval between heartbeats, in seconds

  • added_message – Message to add to the extraction pipeline run status message.

Usage:

If you have a function named “extract_data(*args, **kwargs)” and want to connect it to an extraction pipeline, you can use this decorator function as:

@add_extraction_pipeline(
    extraction_pipeline_ext_id=<INSERT EXTERNAL ID>,
    cognite_client=<INSERT COGNITE CLIENT OBJECT>,
)
def extract_data(*args, **kwargs):
    <INSERT FUNCTION BODY>
cognite.extractorutils.util.cognite_exceptions(status_codes: list[int] | None = None) dict[type[Exception], Callable[[Any], bool]][source]

Retry exceptions from using the Cognite SDK.

This will retry all connection and HTTP errors matching the given status codes.

Example:

@retry(exceptions = cognite_exceptions())
def my_function() -> None:
    ...
cognite.extractorutils.util.datetime_to_timestamp(dt: datetime) int[source]

Convert a datetime object to a timestamp in milliseconds since 1970-01-01 00:00:00 UTC.

Parameters:

dt – The datetime object to convert. It should be timezone-aware.

Returns:

The timestamp in milliseconds.

cognite.extractorutils.util.ensure_assets(cdf_client: CogniteClient, assets: Iterable[Asset]) None[source]

Ensure that all the given assets exist in CDF.

Searches the tenant for the external IDs of the given assets, and creates those that are missing.

Parameters:
  • cdf_client – Tenant to create assets in

  • assets – Assets to create

cognite.extractorutils.util.ensure_time_series(cdf_client: CogniteClient, time_series: Iterable[TimeSeries]) None[source]

Ensure that all the given time series exist in CDF.

Searches the tenant for the external IDs of the given time series, and creates those that are missing.

Parameters:
  • cdf_client – Tenant to create time series in

  • time_series – Time series to create
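
Example (a minimal sketch covering both ensure_assets and ensure_time_series; assumes client is an already-configured CogniteClient):

from cognite.client.data_classes import Asset, TimeSeries
from cognite.extractorutils.util import ensure_assets, ensure_time_series

ensure_assets(client, [Asset(external_id="pump-42", name="Pump 42")])
ensure_time_series(client, [TimeSeries(external_id="pump-42-temp", name="Pump 42 temperature")])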

cognite.extractorutils.util.httpx_exceptions(status_codes: list[int] | None = None) dict[type[Exception], Callable[[Any], bool]][source]

Retry exceptions from using the httpx library.

This will retry all connection and HTTP errors matching the given status codes.

Example:

@retry(exceptions = httpx_exceptions())
def my_function() -> None:
    ...
cognite.extractorutils.util.iterable_to_stream(iterator: Iterable[bytes], file_size_bytes: int, buffer_size: int = 8192, on_close: Callable[[], None] | None = None) BufferedReadWithLength[source]

Convert an iterable of bytes into a stream that can be read from.

Parameters:
  • iterator – An iterable that yields bytes. This can be a generator or any other iterable.

  • file_size_bytes – The total size of the file in bytes. This is used to set the length of the stream.

  • buffer_size – The size of the buffer to use when reading from the stream.

  • on_close – A callable that will be called when the stream is closed. This can be used to clean up resources.

Returns:

A BufferedReader that can be read from, with a known length.
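
Example (a minimal sketch using an in-memory generator):

from cognite.extractorutils.util import iterable_to_stream

def produce_chunks():
    yield b"first chunk, "
    yield b"second chunk"

stream = iterable_to_stream(
    produce_chunks(),
    file_size_bytes=len(b"first chunk, ") + len(b"second chunk"),
    on_close=lambda: print("stream closed"),
)
print(stream.read())  # b'first chunk, second chunk'
stream.close()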

cognite.extractorutils.util.now() int[source]

Current time in CDF format (milliseconds since 1970-01-01 00:00:00 UTC).

cognite.extractorutils.util.requests_exceptions(status_codes: list[int] | None = None) dict[type[Exception], Callable[[Any], bool]][source]

Retry exceptions from using the requests library.

This will retry all connection and HTTP errors matching the given status codes.

Example:

@retry(exceptions = requests_exceptions())
def my_function() -> None:
    ...
cognite.extractorutils.util.retry(cancellation_token: ~cognite.extractorutils.threading.CancellationToken | None = None, exceptions: tuple[type[Exception], ...] | dict[type[Exception], ~collections.abc.Callable[[~typing.Any], bool]] = (<class 'Exception'>,), tries: int = 10, delay: float = 1, max_delay: float | None = 60, backoff: float = 2, jitter: float | tuple[float, float] = (0, 2)) Callable[[Callable[[...], _T2]], Callable[[...], _T2]][source]

Returns a retry decorator.

This is adapted from https://github.com/invl/retry

Parameters:
  • cancellation_token – a cancellation token; the delay between attempts waits on this token, so retries can be aborted early on shutdown.

  • exceptions – a tuple of exceptions to catch, or a dictionary from exception types to a callback determining whether to retry the exception or not. The callback will be given the exception object as argument. default: retry all exceptions.

  • tries – the maximum number of attempts. default: 10.

  • delay – initial delay between attempts, in seconds. default: 1.

  • max_delay – the maximum value of delay. default: 60.

  • backoff – multiplier applied to delay between attempts. default: 2.

  • jitter – extra seconds added to delay between attempts: fixed if a number, random from the range if a tuple (min, max). default: (0, 2).

  • logger – logger.warning(fmt, error, delay) will be called on failed attempts. default: retry.logging_logger. if None, logging is disabled.
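
Example (a minimal sketch; fetch_from_source is a hypothetical function that calls a source system over HTTP with requests):

from cognite.extractorutils.util import requests_exceptions, retry

@retry(exceptions=requests_exceptions([429, 500, 502, 503]), tries=5, delay=1, backoff=2)
def fetch_from_source() -> None:
    ...  # call that may fail transiently and should be retried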

cognite.extractorutils.util.throttled_loop(target_time: int, cancellation_token: CancellationToken) Generator[None, None, None][source]

A loop generator that automatically sleeps until each iteration has taken the desired amount of time.

Useful for when you want to avoid overloading a source system with requests.

Example

This example will throttle printing to only print every 10th second:

for _ in throttled_loop(10, stop_event):
    print("Hello every 10 seconds!")
Parameters:
  • target_time – How long (in seconds) an iteration should take in total

  • cancellation_token – A CancellationToken that acts as the stop event. When it is cancelled, the loop will stop.

Returns:

A generator that will only yield when the target iteration time is met

cognite.extractorutils.util.timestamp_to_datetime(ts: int) datetime[source]

Convert a timestamp in milliseconds since 1970-01-01 00:00:00 UTC to a datetime object.

Parameters:

ts – The timestamp in milliseconds.

Returns:

A datetime object representing the timestamp in UTC.
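
Example (a minimal sketch showing the round trip between datetime_to_timestamp and timestamp_to_datetime):

from datetime import datetime, timezone

from cognite.extractorutils.util import datetime_to_timestamp, timestamp_to_datetime

ts = datetime_to_timestamp(datetime(2024, 1, 1, tzinfo=timezone.utc))  # 1704067200000
dt = timestamp_to_datetime(ts)  # 2024-01-01 00:00:00+00:00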

cognite.extractorutils.util.truncate_byte_len(item: str, ln: int) str[source]

Safely truncate an arbitrary utf-8 string.

Used to sanitize metadata.

Parameters:
  • item (str) – string to be truncated

  • ln (int) – maximum length, in bytes

Returns:

truncated string

Return type:

str