To install this package:
pip install cognite-extractor-utils
If the Cognite SDK is not already installed, the installation will automatically fetch and install it as well.
Every API request carries overhead (a network round trip, authentication, request handling), even with a well-performing backend. The solution is to batch more data together into each API request.
The uploader module contains upload queues. These will hold data until a configured condition is met, triggering an upload. For example, instead of
from cognite.client import CogniteClient

client = CogniteClient()

while not stop:
    timestamp, value = source.query()
    # One API request per data point
    client.datapoints.insert([(timestamp, value)], external_id="my-timeseries")
which would make one API request to CDF for every data point, you can do
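from cognite.client import CogniteClient
from cognite.extractorutils.uploader import TimeSeriesUploadQueue

client = CogniteClient()
upload_queue = TimeSeriesUploadQueue(cdf_client=client, max_upload_interval=1)

upload_queue.start()

while not stop:
    timestamp, value = source.query()
    # Buffered in the queue; sent to CDF in a batch at most 1 second later
    upload_queue.add_to_upload_queue(external_id="my-timeseries", datapoints=[(timestamp, value)])

# Uploads any remaining datapoints before stopping
upload_queue.stop()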
max_upload_interval specifies the maximum time (in seconds) between each API call. The upload method is also called on stop(), so no datapoints are lost. You can also use the queue as a context manager:
client = CogniteClient()

with TimeSeriesUploadQueue(cdf_client=client, max_upload_interval=1) as upload_queue:
    while not stop:
        timestamp, value = source.query()
        upload_queue.add_to_upload_queue(external_id="my-timeseries", datapoints=[(timestamp, value)])
This will call the start() and stop() methods automatically.
You can also trigger uploads once a given amount of data has been added, by using the max_queue_size keyword argument instead. If both are used, whichever condition is met first triggers the upload.
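To make the queue mechanics concrete, here is a minimal, library-independent sketch of the same idea. BatchingQueue, upload_fn and add are hypothetical names, not the cognite-extractor-utils API. Items are buffered, handed to upload_fn in one batch when max_queue_size is reached, flushed by a background thread every max_upload_interval seconds, and flushed once more on stop(), whether you call it directly or leave a with block.

```python
import threading
from typing import Any, Callable, List, Optional


class BatchingQueue:
    """Hypothetical, simplified upload queue (not the library's implementation)."""

    def __init__(
        self,
        upload_fn: Callable[[List[Any]], None],
        max_queue_size: Optional[int] = None,
        max_upload_interval: Optional[float] = None,
    ) -> None:
        self._upload_fn = upload_fn
        self._max_queue_size = max_queue_size
        self._max_upload_interval = max_upload_interval
        self._buffer: List[Any] = []
        self._lock = threading.Lock()
        self._stopping = threading.Event()
        self._thread: Optional[threading.Thread] = None

    def add(self, item: Any) -> None:
        with self._lock:
            self._buffer.append(item)
            full = self._max_queue_size is not None and len(self._buffer) >= self._max_queue_size
        if full:
            self.upload()  # size condition met: upload now

    def upload(self) -> None:
        # Swap the buffer out under the lock, then send the batch outside it
        with self._lock:
            batch, self._buffer = self._buffer, []
        if batch:
            self._upload_fn(batch)

    def _run(self) -> None:
        # Event.wait returns False on timeout and True once stop() sets the event
        while not self._stopping.wait(self._max_upload_interval):
            self.upload()  # time condition met: upload whatever is buffered

    def start(self) -> None:
        if self._max_upload_interval is not None:
            self._thread = threading.Thread(target=self._run, daemon=True)
            self._thread.start()

    def stop(self) -> None:
        self._stopping.set()
        if self._thread is not None:
            self._thread.join()
        self.upload()  # final flush so nothing is lost

    def __enter__(self) -> "BatchingQueue":
        self.start()
        return self

    def __exit__(self, *exc: object) -> None:
        self.stop()
```

With max_queue_size=3, adding seven items inside a with block produces the batches [0, 1, 2], [3, 4, 5] and, on exit, [6].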
Similar queues exist for RAW rows and Events as well.
A state store object is a dictionary mapping an external ID to a low and a high watermark, used to keep track of the progress of a front- or backfill between runs.
A state is a tuple, typically containing two timestamps (the range of data extracted so far).
You choose the back-end for your state store by picking which class to instantiate:
from cognite.client import CogniteClient
from cognite.extractorutils.statestore import LocalStateStore, RawStateStore

# A state store using a JSON file as remote storage:
states = LocalStateStore("state.json")
states.initialize()

# A state store using a RAW table as remote storage:
states = RawStateStore(
    cdf_client=CogniteClient(),
    database="extractor_states",
    table="my_extractor_deployment",
)
states.initialize()
The initialize() method loads all the states from the configured remote store.
You can now use this state store to get states:
low, high = states.get_state(external_id="my-id")
You can set states:
states.set_state(external_id="another-id", high=100)
and similar for low. The set_state(...) method will always overwrite the current state. Sometimes you might want to set the state only if it is larger than the previous state; in that case, consider expand_state(...):
# High watermark of another-id is already 100, nothing happens in this call:
states.expand_state(external_id="another-id", high=50)

# This will set high to 150 as it is larger than the previous state
states.expand_state(external_id="another-id", high=150)
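The difference between setting and expanding a state can be shown with a small in-memory sketch. InMemoryStateStore is hypothetical, and two behaviours are our own assumptions rather than documented library behaviour: set_state leaves a watermark unchanged when you pass None for it, and expand_state treats the state as a range, so it only lowers low and only raises high.

```python
from typing import Dict, Optional, Tuple

State = Tuple[Optional[int], Optional[int]]  # (low, high) watermarks


class InMemoryStateStore:
    """Hypothetical sketch contrasting set_state and expand_state."""

    def __init__(self) -> None:
        self._states: Dict[str, State] = {}

    def get_state(self, external_id: str) -> State:
        return self._states.get(external_id, (None, None))

    def set_state(self, external_id: str, low: Optional[int] = None, high: Optional[int] = None) -> None:
        # Unconditionally overwrite each watermark that is given
        old_low, old_high = self.get_state(external_id)
        self._states[external_id] = (
            low if low is not None else old_low,
            high if high is not None else old_high,
        )

    def expand_state(self, external_id: str, low: Optional[int] = None, high: Optional[int] = None) -> None:
        # Only widen the range: move low down and high up, never the reverse
        old_low, old_high = self.get_state(external_id)
        if low is not None and (old_low is None or low < old_low):
            old_low = low
        if high is not None and (old_high is None or high > old_high):
            old_high = high
        self._states[external_id] = (old_low, old_high)
```

Replaying the calls above against this sketch leaves another-id at (None, 100) after high=50 and at (None, 150) after high=150, while a later set_state(high=10) would drop the high watermark back to 10.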
To write the states back to the remote store, use the synchronize() method:
states.synchronize()
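The initialize() and synchronize() round trip can be sketched with a JSON file as the remote store. JsonFileStateStore is hypothetical, and its on-disk layout ({external_id: [low, high]}) is our own choice, not necessarily the format LocalStateStore writes.

```python
import json
import os
from typing import Dict, Optional, Tuple

State = Tuple[Optional[int], Optional[int]]  # (low, high) watermarks


class JsonFileStateStore:
    """Hypothetical sketch of a state store backed by a JSON file."""

    def __init__(self, path: str) -> None:
        self._path = path
        self._states: Dict[str, State] = {}

    def initialize(self) -> None:
        # Load all states from the remote store; start empty on the first run
        if os.path.exists(self._path):
            with open(self._path) as f:
                raw = json.load(f)
            self._states = {key: (value[0], value[1]) for key, value in raw.items()}

    def get_state(self, external_id: str) -> State:
        return self._states.get(external_id, (None, None))

    def set_state(self, external_id: str, low: Optional[int] = None, high: Optional[int] = None) -> None:
        self._states[external_id] = (low, high)

    def synchronize(self) -> None:
        # Persist the in-memory states so the next run can resume from them
        with open(self._path, "w") as f:
            json.dump({key: list(value) for key, value in self._states.items()}, f)
```

A second instance pointed at the same file picks up where the first left off once it calls initialize().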
Integrating with upload queues
You can make a state store update automatically whenever an upload queue triggers an upload, by passing the state store's post_upload_handler() as the post_upload_function of the upload queue:
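from cognite.client import CogniteClient
from cognite.extractorutils.statestore import LocalStateStore
from cognite.extractorutils.uploader import TimeSeriesUploadQueue

states = LocalStateStore("state.json")
states.initialize()

uploader = TimeSeriesUploadQueue(
    cdf_client=CogniteClient(),
    max_upload_interval=10,
    post_upload_function=states.post_upload_handler(),
)

# The state store is now updated automatically!

states.synchronize()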
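Conceptually, the post-upload handler is a callback the queue invokes after each upload, which widens each time series' watermarks to cover what was just sent. The sketch below shows that idea with a plain dict as the state store. make_post_upload_handler is hypothetical, and the payload shape (a list of dicts with "externalId" and "datapoints" keys, datapoints as (timestamp, value) tuples) is an assumption for illustration, not the library's documented contract.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

State = Tuple[Optional[int], Optional[int]]  # (low, high) watermarks


def make_post_upload_handler(states: Dict[str, State]) -> Callable[[List[Dict[str, Any]]], None]:
    """Hypothetical sketch: build a callback that expands the stored range
    for every time series to include the timestamps just uploaded."""

    def handler(uploaded: List[Dict[str, Any]]) -> None:
        for item in uploaded:
            timestamps = [ts for ts, _value in item["datapoints"]]
            if not timestamps:
                continue  # nothing uploaded for this series, keep its state
            low, high = states.get(item["externalId"], (None, None))
            new_low, new_high = min(timestamps), max(timestamps)
            states[item["externalId"]] = (
                new_low if low is None else min(low, new_low),
                new_high if high is None else max(high, new_high),
            )

    return handler
```

Because the handler only expands the range, uploads that arrive out of order (for example a backfill batch after a frontfill batch) never shrink the recorded progress.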
The metrics module contains a general, pre-built metrics collection, as well as tools to routinely push metrics to a remote server.
The BaseMetrics class forms the basis for an extractor's metrics collection, containing some general metrics that all extractors should report (such as CPU and memory usage of the extractor). To create your own set of metrics, subclass this class and populate it with extractor-specific metrics, as such:
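from prometheus_client import Counter

from cognite.extractorutils.metrics import BaseMetrics


class MyMetrics(BaseMetrics):
    def __init__(self):
        # __version__ is assumed to hold your extractor's version string
        super().__init__(extractor_name="my_extractor", extractor_version=__version__)
        self.a_counter = Counter("my_extractor_example_counter", "An example counter")
        ...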
The metrics module also contains some Pusher classes that are used to routinely send metrics to a remote server. These can be created automatically with the start_pushers method on the metrics section of the configuration (config.metrics.start_pushers).
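The "routinely send" part boils down to a background thread that pushes on a fixed interval until stopped. The sketch below is hypothetical: PeriodicPusher and push_fn are our own names, serializing the Prometheus registry and sending it to a push gateway is left to push_fn, and the final push on stop() is a design choice of this sketch rather than documented library behaviour.

```python
import threading
from typing import Callable, Optional


class PeriodicPusher:
    """Hypothetical sketch: call push_fn every `interval` seconds on a
    background thread until stopped, then push one final time."""

    def __init__(self, push_fn: Callable[[], None], interval: float) -> None:
        self._push_fn = push_fn
        self._interval = interval
        self._stopping = threading.Event()
        self._thread: Optional[threading.Thread] = None

    def _run(self) -> None:
        # Event.wait returns False on timeout and True once stop() sets it
        while not self._stopping.wait(self._interval):
            self._push_fn()

    def start(self) -> None:
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def stop(self) -> None:
        self._stopping.set()
        if self._thread is not None:
            self._thread.join()
        self._push_fn()  # final push so the latest values reach the server
```

Waiting on an Event rather than calling time.sleep lets stop() interrupt the wait immediately instead of blocking for up to a full interval.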
The configtools module contains base classes for configuration, and a YAML loader that automatically deserializes a config file into these dataclasses.
Configs are described as dataclasses, and use the BaseConfig class as a superclass to get a few things built in: config version, Cognite project and logging. Use type hints to specify types, use the Optional type to specify that a config parameter is optional, and assign the attribute a value to give it a default.
For example, a config class for an extractor may look like the following:
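from dataclasses import dataclass
from typing import Optional

from cognite.extractorutils.configtools import BaseConfig, StateStoreConfig


@dataclass
class ExtractorConfig:
    # Fields without a default must come before fields with one in a dataclass
    state_store: Optional[StateStoreConfig]
    parallelism: int = 10
    ...


@dataclass
class SourceConfig:
    host: str
    username: str
    password: str
    ...


@dataclass
class MyConfig(BaseConfig):
    extractor: ExtractorConfig
    source: SourceConfig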
You can then load a YAML file into this dataclass with the load_yaml function:
from cognite.extractorutils.configtools import load_yaml

with open("config.yaml") as infile:
    config = load_yaml(infile, MyConfig)
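To illustrate how the rules above (type hints, Optional, defaults) let a loader map parsed YAML onto nested dataclasses, here is a stdlib-only sketch. from_dict is hypothetical and is not how load_yaml is implemented; it takes an already-parsed dict, such as a YAML parser would return, and assumes the keys match the attribute names exactly (no key-style conversion).

```python
import dataclasses
from dataclasses import dataclass
from typing import Any, Dict, Optional, Type, TypeVar, Union, get_args, get_origin, get_type_hints

T = TypeVar("T")


def _unwrap_optional(tp: Any) -> Any:
    # Optional[X] is Union[X, None]; return X so nested dataclasses are found
    if get_origin(tp) is Union:
        args = [a for a in get_args(tp) if a is not type(None)]
        if len(args) == 1:
            return args[0]
    return tp


def from_dict(cls: Type[T], data: Dict[str, Any]) -> T:
    """Hypothetical sketch: build dataclass `cls` from a parsed config dict."""
    hints = get_type_hints(cls)
    kwargs: Dict[str, Any] = {}
    for f in dataclasses.fields(cls):
        tp = hints[f.name]
        if f.name not in data:
            has_default = f.default is not dataclasses.MISSING or f.default_factory is not dataclasses.MISSING
            if has_default:
                continue  # let the dataclass fill in its own default
            if _unwrap_optional(tp) is not tp:
                kwargs[f.name] = None  # Optional field without a default
                continue
            raise ValueError(f"missing required config field: {f.name}")
        value = data[f.name]
        inner = _unwrap_optional(tp)
        if dataclasses.is_dataclass(inner) and isinstance(value, dict):
            value = from_dict(inner, value)  # recurse into nested config sections
        kwargs[f.name] = value
    return cls(**kwargs)
```

A real loader would also validate value types and report errors with the path into the file; this sketch only shows how defaults and Optional decide what happens to missing keys.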
The config object
The config object can additionally do several things, such as:
Get a CogniteClient based on the config:
client = config.cognite.get_cognite_client("my-client")
Set up the logging according to the config:
Start and stop threads that automatically push all the Prometheus metrics in the default Prometheus registry to the configured push gateways:
config.metrics.start_pushers(client)

# Extractor code

config.metrics.stop_pushers()
Get a state store object as configured:
states = config.extractor.state_store.create_state_store()