Quickstart¶
Installation¶
To install this package:
pip install cognite-extractor-utils
If the Cognite SDK is not already installed, the installation will automatically fetch and install it as well.
cognite.extractorutils.uploader
¶
API requests can be expensive to perform, even with a well-performing backend. The solution is to batch toghether more data into a single API request.
The uploader module contains upload queues. These will hold data until a configured condition is met, triggering an upload. For example, instead of
client = CogniteClient()
while not stop:
timestamp, value = source.query()
client.datapoints.insert((timestamp, value), external_id="my-timeseries")
which would do very many API requests to CDF, you could do
client = CogniteClient()
upload_queue = TimeSeriesUploadQueue(cdf_client=client, max_upload_interval=1)
upload_queue.start()
while not stop:
timestamp, value = source.query()
upload_queue.add_to_upload_queue((timestamp, value), external_id="my-timeseries")
upload_queue.stop()
The max_upload_interval
specifies the maximum time (in seconds) between each API call. The upload method will be
called on stop()
as well so no datapoints are lost. You can also use the queue as a context:
client = CogniteClient()
with TimeSeriesUploadQueue(cdf_client=client, max_upload_interval=1) as upload_queue:
while not stop:
timestamp, value = source.query()
upload_queue.add_to_upload_queue((timestamp, value), external_id="my-timeseries")
This will call the start()
and stop()
methods automatically.
You can also trigger uploads after a given amount of data is added, by using the max_queue_size
keyword argument
instead. If both are used, the condition being met first will trigger the upload.
Similar queues exists for RAW rows and Events too.
cognite.extractorutils.statestore
¶
A state store object is a dictionary from an external ID to a low and high watermark, used to keep track of the progress of a front- or backfill between runs.
A state is a tuple, typically containing two timestamps (range of extracted data).
You can choose the back-end for your state store with which class you’re instatiating:
# A state store using a JSON file as remote storage:
states = LocalStateStore("state.json")
states.initialize()
# A state store using a RAW table as remote storage:
states = RawStateStore(
cdf_client = CogniteClient(),
database = "extractor_states",
table = "my_extractor_deployment"
)
states.initialize()
The initialize()
method loads all the states from the configured remote store.
You can now use this state store to get states:
low, high = states.get_state(external_id = "my-id")
You can set states:
states.set_state(external_id = "another-id", high=100)
and similar for low
. The set_state(...)
method will always overwrite the current state. Some times you might
want to only set state if larger than the previous state, in that case consider expand_state(...)
:
# High watermark of another-id is already 100, nothing happens in this call:
states.expand_state(external_id = "another-id", high=50)
# This will set high to 150 as it is larger than the previous state
states.expand_state(external_id = "another-id", high=150)
To store the state to the remote store, use the synchronize()
method:
states.synchronize()
Integrating with upload queues¶
You can set a state store to automatically update on upload triggers from an upload queue by using the
post_upload_function
in the upload queue:
states = LocalStateStore("state.json")
states.initialize()
uploader = TimeSeriesUploadQueue(
cdf_client = CogniteClient(),
max_upload_interval = 10
post_upload_function = states.post_upload_handler()
)
# The state store is now updated automatically!
states.synchronize()
cognite.extractorutils.metrics
¶
The metrics module contains a general, pre-built metrics collection, as well as tools to routinely push metrics to a remote server.
The BaseMetrics
class forms the basis for a metrics collection for an extractor, containing some general metrics
that all extractors should report (such as e.g. CPU and memory usage of the extractor). To create your own set of
metrics, subclass this class and populate it with extractor-specific metrics, as such:
class MyMetrics(BaseMetrics):
def __init__(self):
super().__init__(extractor_name="my_extractor", extractor_version=__version__)
self.a_counter = Counter("my_extractor_example_counter", "An example counter")
...
The metrics module also contains some Pusher classes that are used to routinely send metrics to a
remote server, these can be automatically created with the start_pushers
method described in
configtools
.
cognite.extractorutils.configtools
¶
The configtools contains base classes for configuration, and a YAML loader to automatically serialize these dataclasses from a config file.
Configs are described as dataclass``es, and use the ``BaseConfig
class as a superclass to get a few things built-in:
config version, Cognite project and logging. Use type hints to specify types, use the Optional
type to specify that
a config parameter is optional, and give the attribute a value to give it a default.
For example, a config class for an extractor may look like the following:
class ExtractorConfig:
parallelism: int = 10
state_store: Optional[StateStoreConfig]
...
@dataclass
class SourceConfig:
host: str
username: str
password: str
...
@dataclass
class MyConfig(BaseConfig):
extractor: ExtractorConfig
source: SourceConfig
You can then load a YAML file into this dataclass with the load_yaml function:
with open("config.yaml") as infile:
config = load_yaml(infile, MyConfig)
The config object¶
The config object can additionally do several things, such as:
Creating a CogniteClient
based on the config:
client = config.cognite.get_cognite_client("my-client")
Setup the logging according to the config:
config.logger.setup_logging()
Start and stop threads to automatically push all the prometheus metrics in the default prometheus registry to the configured push-gateways:
config.metrics.start_pushers(client)
# Extractor code
config.metrics.stop_pushers()
Get a state store object as configured:
states = config.extractor.state_store.create_state_store()