Uploading data to CDF
In the previous chapter we introduced an upload queue; in this chapter we will look more closely at how these queues work. We will also look at using the base Extractor class, which will, among other things, automate the reporting of extractor runs to the extraction pipelines feature in CDF.
Using an upload queue
We begin by looking at the upload queue. Since this extractor will write to CDF RAW, we will use the RawUploadQueue. Similar queues exist for time series data points, events, assets, sequence rows and files.
The reason for using an upload queue is to batch data together into larger requests to CDF. This increases performance, sometimes quite dramatically, since network latency is often a bottleneck for extractors. We can add elements (in our case, RAW rows) to the queue, and when we trigger an upload, all the data in the queue is merged into as few, as large requests as possible. If there is too much data in the queue for a single request, the queue will automatically split it into several requests and perform them in parallel, further increasing performance.
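To make that concrete, here is a minimal sketch of using the queue by hand. The database, table name and row contents are made up for this example, and cognite is the CogniteClient used by our extractor:

from cognite.client.data_classes import Row
from cognite.extractorutils.uploader import RawUploadQueue

queue = RawUploadQueue(cdf_client=cognite)

# Queue up rows locally, nothing is sent to CDF yet
queue.add_to_upload_queue(database="my_db", table="my_table", raw_row=Row(key="row-1", columns={"value": 1}))
queue.add_to_upload_queue(database="my_db", table="my_table", raw_row=Row(key="row-2", columns={"value": 2}))

# Merge the queued rows into as few requests as possible and send them to CDF RAW
queue.upload()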
Instead of triggering these uploads ourselves, the recommended way to use an upload queue is as a context manager (with a with statement). The advantage is that we can then set one or more criteria for when uploads should happen (such as every x seconds, or when the queue has y elements in it) and not think about it again. If the queue is not empty when we exit the context, a final upload is made to make sure no data is left behind.
To create a RawUploadQueue, we therefore start with
with RawUploadQueue(cdf_client=cognite, max_queue_size=100_000) as queue:
This will make an upload queue that uploads every time it reaches 100 000 rows. We could make that value configurable if we were worried about memory usage. The reason we chose 100 000 in this example is that one request to RAW can handle up to 10 000 rows, so this allows us to batch up and send 10 requests in parallel. If data freshness is a concern, we could also set max_upload_interval, which defines a maximum time (in seconds) between uploads.
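For example, a queue that uploads either when it is full or at least once a minute could be created like this (the 60-second interval is just an illustrative choice):

with RawUploadQueue(
    cdf_client=cognite,
    max_queue_size=100_000,    # upload when 100 000 rows are queued...
    max_upload_interval=60,    # ...or at least every 60 seconds, whichever comes first
) as queue:
    ...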
Tying it all together
To finish our extractor, we will open the extractor.py file created by cogex and fill in the run_extractor function. This function is called from the __main__.py file, and is provided with a set of arguments:

cognite is an initiated CogniteClient that is set up to use the CDF project the user configured in their config file.
states is a state store object. We will not cover these in this tutorial, but in short, it allows us to keep track of extraction state between runs to avoid duplicate work.
config is the config file the user has provided, which has been loaded and stored as an instance of the Config class we made in the Read CSV files chapter.
stop_event is an instance of the CancellationToken class. It will be set whenever the extractor is asked to stop (for example by a user sending an interrupt signal), or you can set it yourself when a stopping condition is met. In our case, we will check the status of this stop event in our main loop before submitting a file for extraction. This way, when a user e.g. hits CTRL+C to stop the extractor, it will finish what it was doing, upload the final data to CDF, and then stop gracefully.
In the run_extractor
function we will first create an upload queue, and then loop through all the files from the
config and hand it to the extract_file
function from the Read CSV files chapter. Our function then looks like
this:
def run_extractor(cognite: CogniteClient, states: AbstractStateStore, config: Config, stop_event: Event):
    with RawUploadQueue(cdf_client=cognite, max_queue_size=100_000) as queue:
        for file in config.files:
            if stop_event.is_set():
                break
            extract_file(file, queue)
Using the queue as a context manager like this will automatically call the start() and stop() methods of the AbstractUploadQueue class: uploads are triggered whenever the queue reaches the limit defined by the max_queue_size keyword argument, and a final upload is made once all files are processed.
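For reference, a minimal sketch of an extract_file function in the spirit of the Read CSV files chapter could look like the following. Note that the fields used on the file config object here (path, key_column and the destination database/table) are assumptions for illustration, not necessarily the exact names from that chapter:

import csv

from cognite.client.data_classes import Row
from cognite.extractorutils.uploader import RawUploadQueue

def extract_file(file, queue: RawUploadQueue) -> None:
    # Read the CSV file and add each row to the upload queue.
    # The queue takes care of batching and sending the data to CDF RAW.
    with open(file.path) as infile:
        for row in csv.DictReader(infile):
            queue.add_to_upload_queue(
                database=file.destination.database,
                table=file.destination.table,
                raw_row=Row(key=row[file.key_column], columns=row),
            )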
Extraction pipeline runs
Extraction pipelines are a way of monitoring the health of our extractor from CDF itself. You can read more about them in the CDF documentation. Our extractor is already set up to use extraction pipelines, thanks to the use of the Extractor base class in __main__.py:
def main() -> None:
    with Extractor(
        name="csv_extractor",
        description="An extractor that takes CSV files and uploads their content to RAW",
        config_class=Config,
        run_handle=run_extractor,
        version=__version__,
    ) as extractor:
        extractor.run()
Since we are using the Extractor class as a context manager, it will detect if an unhandled exception is thrown in the run_extractor function. If such an unhandled exception occurs, it will report a new failed run. If the context manager exits cleanly, it will report a new successful run.
To enable reporting of runs, the user would simply have to include an extraction-pipeline field in the cognite section of the config file, containing either an external-id or an (internal) id:
cognite:
  # Read these from environment variables
  host: ${COGNITE_BASE_URL}
  project: ${COGNITE_PROJECT}

  idp-authentication:
    token-url: ${COGNITE_TOKEN_URL}
    client-id: ${COGNITE_CLIENT_ID}
    secret: ${COGNITE_CLIENT_SECRET}
    scopes:
      - ${COGNITE_BASE_URL}/.default

  extraction-pipeline:
    external-id: abc123