Package reference¶
configtools - Utilities for reading, parsing and validating config files¶
The configtools module consists of tools for loading and verifying config files for extractors. Extractor configurations are conventionally written in hyphen-cased YAML, and are typically loaded and deserialized into dataclasses in Python.
Config loader¶
-
cognite.extractorutils.configtools.
load_yaml
(source: Union[TextIO, str], config_type: Type[T], case_style: str = 'hyphen', expand_envvars=True) → T[source]¶ Read a YAML file, and create a config object based on its contents.
Parameters: - source – Input stream (as returned by open(…)) or string containing YAML.
- config_type – Class of config type (i.e. your custom subclass of BaseConfig).
- case_style – Casing convention of config file. Valid options are ‘snake’, ‘hyphen’ or ‘camel’. Defaults to ‘hyphen’.
- expand_envvars – Substitute values matching the pattern ${VAR} with the content of the environment variable VAR
Returns: An initialized config object.
Raises: InvalidConfigError – If any config field is given as an invalid type, is missing or is unknown
Base classes¶
The configtools module contains several prebuilt config classes for many common parameters. The class BaseConfig is intended as a starting point for a custom configuration schema, containing parameters for config version, CDF connection and logging.
Example:
@dataclass
class ExtractorConfig:
    state_store: Optional[StateStoreConfig]
    ...

@dataclass
class SourceConfig:
    ...

@dataclass
class MyConfig(BaseConfig):
    extractor: ExtractorConfig
    source: SourceConfig
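As an illustration, a minimal sketch of loading such a hyphen-cased YAML file into MyConfig with load_yaml (the file name, the config values and the ${COGNITE_API_KEY} variable are assumptions made up for this example):

from cognite.extractorutils.configtools import load_yaml

# A hypothetical "config.yaml" matching MyConfig could look like:
#
#   version: 1
#   cognite:
#     project: my-project
#     api-key: ${COGNITE_API_KEY}     # expanded from the environment when expand_envvars is True
#     external-id-prefix: "myextractor:"
#   logger:
#     ...
#   extractor:
#     ...
#   source:
#     ...

with open("config.yaml") as config_file:
    config: MyConfig = load_yaml(config_file, MyConfig)

print(config.cognite.project)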
-
class
cognite.extractorutils.configtools.
BaseConfig
(version: Union[str, int, None], cognite: cognite.extractorutils.configtools.CogniteConfig, logger: cognite.extractorutils.configtools.LoggingConfig)[source]¶ Basis for an extractor config, containing config version, CogniteConfig and LoggingConfig
-
class
cognite.extractorutils.configtools.
CogniteConfig
(project: str, api_key: Optional[str], idp_authentication: Optional[cognite.extractorutils.authentication.AuthenticatorConfig], data_set_id: Optional[int], external_id_prefix: str = '', host: str = 'https://api.cognitedata.com')[source]¶ Configuration parameters for CDF connection, such as project name, host address and API key
-
class
cognite.extractorutils.configtools.
LoggingConfig
(console: Optional[cognite.extractorutils.configtools._ConsoleLoggingConfig], file: Optional[cognite.extractorutils.configtools._FileLoggingConfig])[source]¶ Logging settings, such as log levels and path to log file
-
class
cognite.extractorutils.configtools.
MetricsConfig
(push_gateways: Optional[List[cognite.extractorutils.configtools._PushGatewayConfig]], cognite: Optional[cognite.extractorutils.configtools._CogniteMetricsConfig])[source]¶ Destination(s) for metrics, including options for one or several Prometheus push gateways, and pushing as CDF Time Series.
-
class
cognite.extractorutils.configtools.
StateStoreConfig
(raw: Union[cognite.extractorutils.configtools.RawStateStoreConfig, NoneType] = None, local: Union[cognite.extractorutils.configtools.LocalStateStoreConfig, NoneType] = None)[source]¶
metrics - Automatic pushers of performance metrics¶
Module containing pushers for metric reporting. The classes in this module scrape the default Prometheus registry and upload it periodically to either a Prometheus push gateway, or to CDF as time series.
The BaseMetrics class forms the basis for a metrics collection for an extractor, containing some general metrics that all extractors should report. To create your own set of metrics, subclass this class and populate it with extractor-specific metrics, as such:
class MyMetrics(BaseMetrics):
    def __init__(self):
        super().__init__(extractor_name="my_extractor", extractor_version=__version__)
        self.a_counter = Counter("my_extractor_example_counter", "An example counter")
        ...
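For illustration, a sketch of how such a metrics collection might be instantiated (via safe_get, documented below) and used; the counter is the one defined in the example above:

from cognite.extractorutils.metrics import safe_get

# safe_get ensures only one instance of MyMetrics is ever created,
# even if this code path is reached more than once
metrics = safe_get(MyMetrics)

# Increment the example counter whenever the extractor does the thing it counts
metrics.a_counter.inc()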
-
class
cognite.extractorutils.metrics.
AbstractMetricsPusher
(push_interval: Optional[int] = None, thread_name: Optional[str] = None)[source]¶ Bases:
abc.ABC
Base class for metric pushers. Metric pushers spawn a thread that routinely pushes metrics to a configured destination.
Contains all the logic for starting and running threads.
Parameters: - push_interval – Seconds between each upload call
- thread_name – Name of thread to start. If omitted, a standard name such as Thread-4 will be generated.
-
class
cognite.extractorutils.metrics.
BaseMetrics
(extractor_name: str, extractor_version: str, process_scrape_interval: float = 15)[source]¶ Bases:
object
Base collection of extractor metrics. The class also spawns a collector thread on init that regularly fetches process information and updates the process_* gauges. To create a set of metrics for an extractor, create a subclass of this class.
Note that only one instance of this class (or any subclass) can exist simultaneously.
- The collection includes the following metrics:
- startup: Startup time (unix epoch)
- finish: Finish time (unix epoch)
- process_num_threads: Number of active threads. Set automatically.
- process_memory_bytes: Memory usage of extractor. Set automatically.
- process_cpu_percent: CPU usage of extractor. Set automatically.
Parameters: - extractor_name – Name of extractor, used to prefix metric names
- extractor_version – Version of the extractor
- process_scrape_interval – Interval (in seconds) between each fetch of data for the process_* gauges
-
class
cognite.extractorutils.metrics.
CognitePusher
(cdf_client: cognite.client._cognite_client.CogniteClient, external_id_prefix: str, push_interval: int, asset: Optional[cognite.client.data_classes.assets.Asset] = None, thread_name: Optional[str] = None)[source]¶ Bases:
cognite.extractorutils.metrics.AbstractMetricsPusher
Pusher to CDF. Creates time series in CDF for all Gauges and Counters in the default Prometheus registry.
Optionally, the time series can be contextualized with an Asset to make them observable in Asset Data Insight. The given asset will be created at root level in the tenant if it doesn’t already exist.
Parameters: - cdf_client – The CDF tenant to upload time series to
- external_id_prefix – Unique external ID prefix for this pusher.
- push_interval – Seconds between each upload call
- asset – Optional contextualization.
- thread_name – Name of thread to start. If omitted, a standard name such as Thread-4 will be generated.
-
start
() → None¶ Starts a thread that pushes the default registry to the configured gateway at certain intervals.
-
stop
() → None¶ Stop the push loop.
-
class
cognite.extractorutils.metrics.
PrometheusPusher
(job_name: str, url: str, push_interval: int, username: Optional[str] = None, password: Optional[str] = None, thread_name: Optional[str] = None)[source]¶ Bases:
cognite.extractorutils.metrics.AbstractMetricsPusher
Pusher to a Prometheus push gateway.
Parameters: - job_name – Prometheus job name
- username – Push gateway credentials
- password – Push gateway credentials
- url – URL (with port number) of the push gateway
- push_interval – Seconds between each upload call
- thread_name – Name of thread to start. If omitted, a standard name such as Thread-4 will be generated.
-
start
() → None¶ Starts a thread that pushes the default registry to the configured gateway at certain intervals.
-
stop
() → None¶ Stop the push loop.
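A minimal sketch of running a PrometheusPusher for the duration of an extraction; the gateway URL, job name and run_extractor are placeholders:

from cognite.extractorutils.metrics import PrometheusPusher

pusher = PrometheusPusher(
    job_name="my-extractor",        # hypothetical Prometheus job name
    url="http://localhost:9091",    # hypothetical push gateway address
    push_interval=30,               # push every 30 seconds
)

pusher.start()           # spawns the push thread
try:
    run_extractor()      # placeholder for the actual extraction logic
finally:
    pusher.stop()        # stop the push loop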
-
cognite.extractorutils.metrics.
safe_get
(cls: Type[T]) → T[source]¶ A factory for instances of metrics collections.
Since Prometheus doesn’t allow multiple metrics with the same name, any subclass of BaseMetrics must never be created more than once. This function creates an instance of the given class on the first call and stores it; any subsequent calls with the same class as argument will return the same instance.
>>> a = safe_get(MyMetrics)  # This will create a new instance of MyMetrics
>>> b = safe_get(MyMetrics)  # This will return the same instance
>>> a is b
True
Parameters: cls – Metrics class to either create or get a cached version of Returns: An instance of given class
statestore - Storing extractor state between runs locally or remotely¶
The statestore module contains classes for keeping track of the extraction state of individual items, facilitating incremental load and speeding up startup times.
At the beginning of a run the extractor typically calls the initialize method, which loads the states from the remote store (which can either be a local JSON file or a table in CDF RAW). During and/or at the end of a run, the synchronize method is called, which saves the current states to the remote store.
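As a sketch of that lifecycle, using a LocalStateStore (the file name, external ID and watermark value are placeholders; synchronize is assumed to behave as described above):

from cognite.extractorutils.statestore import LocalStateStore

states = LocalStateStore("states.json")
states.initialize()                        # load previously stored states from the JSON file

low, high = states.get_state("my-time-series")   # the stored (low, high) watermarks for this ID

# ... extract everything newer than `high` from the source ...

states.expand_state("my-time-series", high=1615300000000)   # only moves the watermark outwards

states.synchronize()                       # write the current states back to the JSON file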
-
class
cognite.extractorutils.statestore.
AbstractStateStore
[source]¶ Bases:
abc.ABC
Base class for a state store.
-
delete_state
(external_id: str) → None[source]¶ Delete an external ID from the state store.
Parameters: external_id – External ID to remove
-
expand_state
(external_id: str, low: Optional[Any] = None, high: Optional[Any] = None) → None[source]¶ Like set_state, but only sets the state if the proposed state is outside the stored state, e.g. if the proposed low is lower than the stored low.
Parameters: - external_id – External ID of e.g. time series to store state of
- low – Low watermark
- high – High watermark
-
get_state
(external_id: Union[str, List[str]]) → Union[Tuple[Any, Any], List[Tuple[Any, Any]]][source]¶ Get state(s) for external ID(s)
Parameters: external_id – An external ID or list of external IDs to get states for Returns: A tuple with (low, high) watermarks, or a list of tuples
-
outside_state
(external_id: str, new_state: Any) → bool[source]¶ Check if a new proposed state is outside state interval (ie, if a new datapoint should be processed).
Returns true if new_state is outside of stored state or if external_id is previously unseen.
Parameters: - external_id – External ID to test
- new_state – Proposed new state to test
Returns: True if new_state is higher than the stored high watermark or lower than the low watermark.
-
post_upload_handler
() → Callable[[List[Dict[str, Union[str, List[Dict[Union[int, float, datetime.datetime], Union[int, float, str]]], List[Tuple[Union[int, float, datetime.datetime], Union[int, float, str]]]]]]], None][source]¶ Get a callable suitable for passing to a time series upload queue as post_upload_function, that will automatically update the states in this state store when that upload queue is uploading.
Returns: A function that expands the current states with the values given
-
-
class
cognite.extractorutils.statestore.
LocalStateStore
(file_path: str)[source]¶ Bases:
cognite.extractorutils.statestore.AbstractStateStore
An extractor state store using a local JSON file as backend.
Parameters: file_path – File path to JSON file to use
-
delete_state
(external_id: str) → None¶ Delete an external ID from the state store.
Parameters: external_id – External ID to remove
-
expand_state
(external_id: str, low: Optional[Any] = None, high: Optional[Any] = None) → None¶ Like set_state, but only sets the state if the proposed state is outside the stored state, e.g. if the proposed low is lower than the stored low.
Parameters: - external_id – External ID of e.g. time series to store state of
- low – Low watermark
- high – High watermark
-
get_state
(external_id: Union[str, List[str]]) → Union[Tuple[Any, Any], List[Tuple[Any, Any]]]¶ Get state(s) for external ID(s)
Parameters: external_id – An external ID or list of external IDs to get states for Returns: A tuple with (low, high) watermarks, or a list of tuples
-
initialize
(force: bool = False) → None[source]¶ Load states from specified JSON file
Parameters: force – Enable re-initialization, ie overwrite when called multiple times
-
outside_state
(external_id: str, new_state: Any) → bool¶ Check if a new proposed state is outside state interval (ie, if a new datapoint should be processed).
Returns true if new_state is outside of stored state or if external_id is previously unseen.
Parameters: - external_id – External ID to test
- new_state – Proposed new state to test
Returns: True if new_state is higher than the stored high watermark or lower than the low watermark.
-
post_upload_handler
() → Callable[[List[Dict[str, Union[str, List[Dict[Union[int, float, datetime.datetime], Union[int, float, str]]], List[Tuple[Union[int, float, datetime.datetime], Union[int, float, str]]]]]]], None]¶ Get a callable suitable for passing to a time series upload queue as post_upload_function, that will automatically update the states in this state store when that upload queue is uploading.
Returns: A function that expands the current states with the values given
-
set_state
(external_id: str, low: Optional[Any] = None, high: Optional[Any] = None) → None¶ Set/update state of a single external ID.
Parameters: - external_id – External ID of e.g. time series to store state of
- low – Low watermark
- high – High watermark
-
-
class
cognite.extractorutils.statestore.
NoStateStore
[source]¶ Bases:
cognite.extractorutils.statestore.AbstractStateStore
A state store that only keeps states in memory and never stores or initializes from external sources.
-
delete_state
(external_id: str) → None¶ Delete an external ID from the state store.
Parameters: external_id – External ID to remove
-
expand_state
(external_id: str, low: Optional[Any] = None, high: Optional[Any] = None) → None¶ Like set_state, but only sets the state if the proposed state is outside the stored state, e.g. if the proposed low is lower than the stored low.
Parameters: - external_id – External ID of e.g. time series to store state of
- low – Low watermark
- high – High watermark
-
get_state
(external_id: Union[str, List[str]]) → Union[Tuple[Any, Any], List[Tuple[Any, Any]]]¶ Get state(s) for external ID(s)
Parameters: external_id – An external ID or list of external IDs to get states for Returns: A tuple with (low, high) watermarks, or a list of tuples
-
outside_state
(external_id: str, new_state: Any) → bool¶ Check if a new proposed state is outside state interval (ie, if a new datapoint should be processed).
Returns true if new_state is outside of stored state or if external_id is previously unseen.
Parameters: - external_id – External ID to test
- new_state – Proposed new state to test
Returns: True if new_state is higher than the stored high watermark or lower than the low watermark.
-
post_upload_handler
() → Callable[[List[Dict[str, Union[str, List[Dict[Union[int, float, datetime.datetime], Union[int, float, str]]], List[Tuple[Union[int, float, datetime.datetime], Union[int, float, str]]]]]]], None]¶ Get a callable suitable for passing to a time series upload queue as post_upload_function, that will automatically update the states in this state store when that upload queue is uploading.
Returns: A function that expands the current states with the values given
-
set_state
(external_id: str, low: Optional[Any] = None, high: Optional[Any] = None) → None¶ Set/update state of a single external ID.
Parameters: - external_id – External ID of e.g. time series to store state of
- low – Low watermark
- high – High watermark
-
-
class
cognite.extractorutils.statestore.
RawStateStore
(cdf_client: cognite.client._cognite_client.CogniteClient, database: str, table: str)[source]¶ Bases:
cognite.extractorutils.statestore.AbstractStateStore
An extractor state store based on CDF RAW.
Parameters: - cdf_client – Cognite client to use
- database – Name of CDF database
- table – Name of CDF table
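For example, a RAW-backed store could be set up like this (the client, database and table names are placeholders; initialize is assumed to load states from the RAW table as described in the module introduction):

from cognite.extractorutils.statestore import RawStateStore

states = RawStateStore(
    cdf_client=my_cognite_client,    # an already configured CogniteClient
    database="extractor_states",     # hypothetical RAW database
    table="my_extractor",            # hypothetical RAW table
)
states.initialize()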
-
delete_state
(external_id: str) → None¶ Delete an external ID from the state store.
Parameters: external_id – External ID to remove
-
expand_state
(external_id: str, low: Optional[Any] = None, high: Optional[Any] = None) → None¶ Like set_state, but only sets the state if the proposed state is outside the stored state, e.g. if the proposed low is lower than the stored low.
Parameters: - external_id – External ID of e.g. time series to store state of
- low – Low watermark
- high – High watermark
-
get_state
(external_id: Union[str, List[str]]) → Union[Tuple[Any, Any], List[Tuple[Any, Any]]]¶ Get state(s) for external ID(s)
Parameters: external_id – An external ID or list of external IDs to get states for Returns: A tuple with (low, high) watermarks, or a list of tuples
-
outside_state
(external_id: str, new_state: Any) → bool¶ Check if a new proposed state is outside state interval (ie, if a new datapoint should be processed).
Returns true if new_state is outside of stored state or if external_id is previously unseen.
Parameters: - external_id – External ID to test
- new_state – Proposed new state to test
Returns: True if new_state is higher than the stored high watermark or lower than the low watermark.
-
post_upload_handler
() → Callable[[List[Dict[str, Union[str, List[Dict[Union[int, float, datetime.datetime], Union[int, float, str]]], List[Tuple[Union[int, float, datetime.datetime], Union[int, float, str]]]]]]], None]¶ Get a callable suitable for passing to a time series upload queue as post_upload_function, that will automatically update the states in this state store when that upload queue is uploading.
Returns: A function that expands the current states with the values given
-
set_state
(external_id: str, low: Optional[Any] = None, high: Optional[Any] = None) → None¶ Set/update state of a single external ID.
Parameters: - external_id – External ID of e.g. time series to store state of
- low – Low watermark
- high – High watermark
uploader - Batching upload queues with automatic upload triggers¶
Module containing upload queue classes. The UploadQueue classes chunk items together and upload them to CDF in batches, both to minimize the load on the API and to speed up uploading, since requests can be slow.
Each upload queue comes with some configurable conditions that, when met, automatically trigger an upload.
Note: You cannot assume that an element is uploaded when it is added to the queue, since the upload may be delayed. To ensure that everything is uploaded you should set the post_upload_function callback to verify. For example, for a time series queue you might want to check the latest time stamp, as such (assuming incremental time stamps and using timestamp-value tuples as data point format):
state_store = LocalStateStore("states.json")

queue = TimeSeriesUploadQueue(
    cdf_client=my_cognite_client,
    post_upload_function=state_store.post_upload_handler(),
    max_upload_interval=1
)
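Continuing the example, a sketch of driving the queue: start the upload thread, add data points as they arrive, and stop it when extraction is done (the external ID and data points are made up; synchronize is assumed per the statestore module above):

queue.start()   # upload at least every max_upload_interval seconds
try:
    # timestamp-value tuples, matching the data point format assumed above
    queue.add_to_upload_queue(
        external_id="my-time-series",
        datapoints=[(1615300000000, 1.0), (1615300001000, 2.0)],
    )
finally:
    queue.stop()              # performs a final upload of anything left in the queue

state_store.synchronize()     # persist the states updated by post_upload_handler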
-
class
cognite.extractorutils.uploader.
AbstractUploadQueue
(cdf_client: cognite.client._cognite_client.CogniteClient, post_upload_function: Optional[Callable[[List[Any]], None]] = None, max_queue_size: Optional[int] = None, max_upload_interval: Optional[int] = None, trigger_log_level: str = 'DEBUG', thread_name: Optional[str] = None)[source]¶ Bases:
abc.ABC
Abstract uploader class.
Parameters: - cdf_client – Cognite Data Fusion client to use
- post_upload_function – A function that will be called after each upload. The function will be given one argument: A list of the elements that were uploaded.
- max_queue_size – Maximum size of upload queue. Defaults to no max size.
- max_upload_interval – Automatically trigger an upload each m seconds when run as a thread (use start/stop methods).
- trigger_log_level – Log level to log upload triggers to.
- thread_name – Thread name of uploader thread.
-
add_to_upload_queue
(*args) → None[source]¶ Adds an element to the upload queue. The queue will be uploaded if the queue byte size is larger than the threshold specified in the config.
-
start
() → None[source]¶ Start upload thread if max_upload_interval is set; this calls the upload method every max_upload_interval seconds.
-
class
cognite.extractorutils.uploader.
EventUploadQueue
(cdf_client: cognite.client._cognite_client.CogniteClient, post_upload_function: Optional[Callable[[List[cognite.client.data_classes.events.Event]], None]] = None, max_queue_size: Optional[int] = None, max_upload_interval: Optional[int] = None, trigger_log_level: str = 'DEBUG', thread_name: Optional[str] = None)[source]¶ Bases:
cognite.extractorutils.uploader.AbstractUploadQueue
Upload queue for events
Parameters: - cdf_client – Cognite Data Fusion client to use
- post_upload_function – A function that will be called after each upload. The function will be given one argument: A list of the events that were uploaded.
- max_queue_size – Maximum size of upload queue. Defaults to no max size.
- max_upload_interval – Automatically trigger an upload each m seconds when run as a thread (use start/stop methods).
- trigger_log_level – Log level to log upload triggers to.
- thread_name – Thread name of uploader thread.
-
add_to_upload_queue
(event: cognite.client.data_classes.events.Event) → None[source]¶ Add event to upload queue. The queue will be uploaded if the queue size is larger than the threshold specified in the __init__.
Parameters: event – Event to add
-
start
() → None¶ Start upload thread if max_upload_interval is set; this calls the upload method every max_upload_interval seconds.
-
stop
(ensure_upload: bool = True) → None¶ Stop upload thread if running, and ensure that the upload queue is empty if ensure_upload is True.
Parameters: ensure_upload (bool) – (Optional). Call upload one last time after shutting down thread to ensure empty upload queue.
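A short usage sketch (the client and event fields are placeholders):

from cognite.client.data_classes import Event

event_queue = EventUploadQueue(cdf_client=my_cognite_client, max_upload_interval=10)
event_queue.start()

event_queue.add_to_upload_queue(
    Event(external_id="my-event-1", type="alarm", start_time=1615300000000)
)

event_queue.stop()   # uploads whatever is left in the queue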
-
class
cognite.extractorutils.uploader.
FileUploadQueue
(cdf_client: cognite.client._cognite_client.CogniteClient, post_upload_function: Optional[Callable[[List[cognite.client.data_classes.events.Event]], None]] = None, max_queue_size: Optional[int] = None, max_upload_interval: Optional[int] = None, trigger_log_level: str = 'DEBUG', thread_name: Optional[str] = None, overwrite_existing: bool = False)[source]¶ Bases:
cognite.extractorutils.uploader.AbstractUploadQueue
Upload queue for files
Parameters: - cdf_client – Cognite Data Fusion client to use
- post_upload_function – A function that will be called after each upload. The function will be given one argument: A list of the files that were uploaded.
- max_queue_size – Maximum size of upload queue. Defaults to no max size.
- max_upload_interval – Automatically trigger an upload each m seconds when run as a thread (use start/stop methods).
- trigger_log_level – Log level to log upload triggers to.
- thread_name – Thread name of uploader thread.
-
add_to_upload_queue
(file_meta: cognite.client.data_classes.files.FileMetadata, file_name: Union[str, os.PathLike] = None) → None[source]¶ Add file to upload queue. The queue will be uploaded if the queue size is larger than the threshold specified in the __init__.
Parameters: - file_meta – File metadata-object
- file_name – Path to file to be uploaded. If none, the file object will still be created, but no data is uploaded
-
start
() → None¶ Start upload thread if max_upload_interval is set; this calls the upload method every max_upload_interval seconds.
-
stop
(ensure_upload: bool = True) → None¶ Stop upload thread if running, and ensure that the upload queue is empty if ensure_upload is True.
Parameters: ensure_upload (bool) – (Optional). Call upload one last time after shutting down thread to ensure empty upload queue.
-
class
cognite.extractorutils.uploader.
RawUploadQueue
(cdf_client: cognite.client._cognite_client.CogniteClient, post_upload_function: Optional[Callable[[List[Any]], None]] = None, max_queue_size: Optional[int] = None, max_upload_interval: Optional[int] = None, trigger_log_level: str = 'DEBUG', thread_name: Optional[str] = None)[source]¶ Bases:
cognite.extractorutils.uploader.AbstractUploadQueue
Upload queue for RAW
Parameters: - cdf_client – Cognite Data Fusion client to use
- post_upload_function – A function that will be called after each upload. The function will be given one argument: A list of the rows that were uploaded.
- max_queue_size – Maximum size of upload queue. Defaults to no max size.
- max_upload_interval – Automatically trigger an upload each m seconds when run as a thread (use start/stop methods).
- trigger_log_level – Log level to log upload triggers to.
- thread_name – Thread name of uploader thread.
-
add_to_upload_queue
(database: str, table: str, raw_row: cognite.client.data_classes.raw.Row) → None[source]¶ Adds a row to the upload queue. The queue will be uploaded if the queue size is larger than the threshold specified in the __init__.
Parameters: - database – The database to upload the Raw object to
- table – The table to upload the Raw object to
- raw_row – The row object
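For example (the client, database, table and row contents are placeholders):

from cognite.client.data_classes import Row

raw_queue = RawUploadQueue(cdf_client=my_cognite_client, max_upload_interval=30)
raw_queue.start()

raw_queue.add_to_upload_queue(
    database="my_database",
    table="my_table",
    raw_row=Row(key="row-1", columns={"some-column": "some value"}),
)

raw_queue.stop()   # uploads whatever is left in the queue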
-
start
() → None¶ Start upload thread if max_upload_interval is set; this calls the upload method every max_upload_interval seconds.
-
stop
(ensure_upload: bool = True) → None¶ Stop upload thread if running, and ensure that the upload queue is empty if ensure_upload is True.
Parameters: ensure_upload (bool) – (Optional). Call upload one last time after shutting down thread to ensure empty upload queue.
-
class
cognite.extractorutils.uploader.
SequenceUploadQueue
(cdf_client: cognite.client._cognite_client.CogniteClient, post_upload_function: Optional[Callable[[List[Any]], None]] = None, max_queue_size: Optional[int] = None, max_upload_interval: Optional[int] = None, trigger_log_level: str = 'DEBUG', thread_name: Optional[str] = None, create_missing=False)[source]¶ Bases:
cognite.extractorutils.uploader.AbstractUploadQueue
-
add_to_upload_queue
(rows: Union[Dict[int, List[Union[int, str, float]]], List[Tuple[int, Union[int, float, str]]], List[Dict[str, Any]], cognite.client.data_classes.sequences.SequenceData], column_external_ids: Optional[List[str]] = None, id: int = None, external_id: str = None) → None[source]¶ Add sequence rows to upload queue. Mirrors implementation of SequenceApi.insert. Inserted rows will be cached until uploaded
Parameters: - rows – The rows to be inserted. Can either be a list of tuples, a list of {“rowNumber”: …, “values”: …} objects, a dictionary of rowNumber: data, or a SequenceData object.
- column_external_ids – List of external id for the columns of the sequence
- id – Sequence internal ID. Use if external_id is None
- external_id – Sequence external ID. Use if id is None
-
set_sequence_column_definition
(col_def: List[Dict[str, str]], id: int = None, external_id: str = None) → None[source]¶ Set sequence column definition
Parameters: - col_def – Sequence column definition
- id – Sequence internal ID. Use if external_id is None
- external_id – Sequence external ID. Use if id is None
-
set_sequence_metadata
(metadata: Dict[str, Union[str, int, float]], id: int = None, external_id: str = None, asset_external_id: str = None, dataset_external_id: str = None) → None[source]¶ Set sequence metadata. Metadata will be cached until the sequence is created. The metadata will be updated if the sequence already exists
Parameters: - metadata – Sequence metadata
- id – Sequence internal ID. Use if external_id is None
- external_id – Sequence external ID. Use if id is None
- asset_external_id – Sequence asset external ID
- dataset_external_id – Sequence data set external ID
-
start
() → None¶ Start upload thread if max_upload_interval is set; this calls the upload method every max_upload_interval seconds.
-
stop
(ensure_upload: bool = True) → None¶ Stop upload thread if running, and ensure that the upload queue is empty if ensure_upload is True.
Parameters: ensure_upload (bool) – (Optional). Call upload one last time after shutting down thread to ensure empty upload queue.
-
-
class
cognite.extractorutils.uploader.
TimeSeriesUploadQueue
(cdf_client: cognite.client._cognite_client.CogniteClient, post_upload_function: Optional[Callable[[List[Dict[str, Union[str, List[Dict[Union[int, float, datetime.datetime], Union[int, float, str]]], List[Tuple[Union[int, float, datetime.datetime], Union[int, float, str]]]]]]], None]] = None, max_queue_size: Optional[int] = None, max_upload_interval: Optional[int] = None, trigger_log_level: str = 'DEBUG', thread_name: Optional[str] = None, create_missing: Union[Callable[[str, Union[List[Dict[Union[int, float, datetime.datetime], Union[int, float, str]]], List[Tuple[Union[int, float, datetime.datetime], Union[int, float, str]]]]], cognite.client.data_classes.time_series.TimeSeries], bool] = False)[source]¶ Bases:
cognite.extractorutils.uploader.AbstractUploadQueue
Upload queue for time series
Parameters: - cdf_client – Cognite Data Fusion client to use
- post_upload_function – A function that will be called after each upload. The function will be given one argument: A list of dicts containing the datapoints that were uploaded (on the same format as the kwargs in datapoints upload in the Cognite SDK).
- max_queue_size – Maximum size of upload queue. Defaults to no max size.
- max_upload_interval – Automatically trigger an upload each m seconds when run as a thread (use start/stop methods).
- trigger_log_level – Log level to log upload triggers to.
- thread_name – Thread name of uploader thread.
- create_missing – Create missing time series if possible (ie, if external id is used). Either given as a boolean (True would auto-create a time series with nothing but an external ID), or as a factory function taking an external ID and a list of datapoints about to be inserted and returning a TimeSeries object.
-
add_to_upload_queue
(*, id: int = None, external_id: str = None, datapoints: Union[List[Dict[Union[int, float, datetime.datetime], Union[int, float, str]]], List[Tuple[Union[int, float, datetime.datetime], Union[int, float, str]]]] = []) → None[source]¶ Add data points to upload queue. The queue will be uploaded if the queue size is larger than the threshold specified in the __init__.
Parameters: - id – Internal ID of time series. Either this or external_id must be set.
- external_id – External ID of time series. Either this or id must be set.
- datapoints – List of data points to add
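Per the type annotation above, timestamps can be given as epoch milliseconds or as datetime objects; a small sketch (ts_queue is assumed to be a TimeSeriesUploadQueue instance, and the ID and values are placeholders):

from datetime import datetime, timezone

# Epoch-millisecond timestamps ...
ts_queue.add_to_upload_queue(
    external_id="my-time-series",
    datapoints=[(1615300000000, 1.0)],
)

# ... or datetime timestamps, per the annotation above
ts_queue.add_to_upload_queue(
    external_id="my-time-series",
    datapoints=[(datetime.now(tz=timezone.utc), 2.0)],
)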
-
start
() → None¶ Start upload thread if max_upload_interval is set; this calls the upload method every max_upload_interval seconds.
-
stop
(ensure_upload: bool = True) → None¶ Stop upload thread if running, and ensure that the upload queue is empty if ensure_upload is True.
Parameters: ensure_upload (bool) – (Optional). Call upload one last time after shutting down thread to ensure empty upload queue.
-
cognite.extractorutils.uploader.
default_time_series_factory
(external_id: str, datapoints: Union[List[Dict[Union[int, float, datetime.datetime], Union[int, float, str]]], List[Tuple[Union[int, float, datetime.datetime], Union[int, float, str]]]]) → cognite.client.data_classes.time_series.TimeSeries[source]¶ Default time series factory used when create_missing in a TimeSeriesUploadQueue is given as a boolean.
Parameters: - external_id – External ID of time series to create
- datapoints – The list of datapoints that were attempted to be inserted
Returns: A TimeSeries object with external_id set, and is_string detected automatically
util - Miscellaneous utilities¶
The util package contains miscellaneous functions and classes that can sometimes be useful while developing extractors.
-
class
cognite.extractorutils.util.
EitherId
(**kwargs)[source]¶ Bases:
object
Class representing an ID in CDF, which can either be an external or internal ID. An EitherId can only hold one ID type, not both.
Parameters: - id – Internal ID
- external_id (or externalId) – External ID
Raises: TypeError – If none or both of the ID types are set.
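A small usage sketch (the external ID is a placeholder):

from cognite.extractorutils.util import EitherId

# Exactly one of the two ID types can be given
ts_id = EitherId(external_id="my-time-series")

# Giving neither, or both, raises a TypeError:
#   EitherId()
#   EitherId(id=123, external_id="my-time-series")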
-
cognite.extractorutils.util.
ensure_time_series
(cdf_client: cognite.client._cognite_client.CogniteClient, time_series: Iterable[cognite.client.data_classes.time_series.TimeSeries]) → None[source]¶ Ensure that all the given time series exist in CDF.
Searches the tenant for the external IDs of the given time series, and creates those that are missing.
Parameters: - cdf_client – Tenant to create time series in
- time_series – Time series to create
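For example, an extractor might make sure its output time series exist before starting to upload data points (the client and external IDs are placeholders):

from cognite.client.data_classes import TimeSeries
from cognite.extractorutils.util import ensure_time_series

ensure_time_series(
    my_cognite_client,
    [
        TimeSeries(external_id="my-time-series-1", name="My time series 1"),
        TimeSeries(external_id="my-time-series-2", name="My time series 2"),
    ],
)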