Source code for cognite.extractorutils.uploader.assets

#  Copyright 2023 Cognite AS
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

from collections.abc import Callable
from types import TracebackType
from typing import Any

from cognite.client import CogniteClient
from cognite.client.data_classes.assets import Asset
from cognite.client.exceptions import CogniteDuplicatedError
from cognite.extractorutils.threading import CancellationToken
from cognite.extractorutils.uploader._base import (
    RETRIES,
    RETRY_BACKOFF_FACTOR,
    RETRY_DELAY,
    RETRY_MAX_DELAY,
    AbstractUploadQueue,
)
from cognite.extractorutils.uploader._metrics import (
    ASSETS_UPLOADER_QUEUE_SIZE,
    ASSETS_UPLOADER_QUEUED,
    ASSETS_UPLOADER_WRITTEN,
)
from cognite.extractorutils.util import cognite_exceptions, retry


[docs] class AssetUploadQueue(AbstractUploadQueue): """ Upload queue for assets Args: cdf_client: Cognite Data Fusion client to use post_upload_function: A function that will be called after each upload. The function will be given one argument: A list of the assets that were uploaded. max_queue_size: Maximum size of upload queue. Defaults to no max size. max_upload_interval: Automatically trigger an upload each m seconds when run as a thread (use start/stop methods). trigger_log_level: Log level to log upload triggers to. thread_name: Thread name of uploader thread. cancellation_token: Cancellation token """ def __init__( self, cdf_client: CogniteClient, post_upload_function: Callable[[list[Any]], None] | None = None, max_queue_size: int | None = None, max_upload_interval: int | None = None, trigger_log_level: str = "DEBUG", thread_name: str | None = None, cancellation_token: CancellationToken | None = None, ): super().__init__( cdf_client, post_upload_function, max_queue_size, max_upload_interval, trigger_log_level, thread_name, cancellation_token, ) self.upload_queue: list[Asset] = [] self.assets_queued = ASSETS_UPLOADER_QUEUED self.assets_written = ASSETS_UPLOADER_WRITTEN self.queue_size = ASSETS_UPLOADER_QUEUE_SIZE
[docs] def add_to_upload_queue(self, asset: Asset) -> None: """ Add asset to upload queue. The queue will be uploaded if the queue size is larger than the threshold specified in the __init__. Args: asset: Asset to add """ with self.lock: self.upload_queue.append(asset) self.assets_queued.inc() self.upload_queue_size += 1 self.queue_size.set(self.upload_queue_size) self._check_triggers()
[docs] def upload(self) -> None: """ Trigger an upload of the queue, clears queue afterwards """ @retry( exceptions=cognite_exceptions(), cancellation_token=self.cancellation_token, tries=RETRIES, delay=RETRY_DELAY, max_delay=RETRY_MAX_DELAY, backoff=RETRY_BACKOFF_FACTOR, ) def _upload_batch() -> None: try: self.cdf_client.assets.create(self.upload_queue) except CogniteDuplicatedError as e: duplicated_ids = set([dup["externalId"] for dup in e.duplicated if "externalId" in dup]) failed: list[Asset] = [e for e in e.failed] to_create = [] to_update = [] for asset in failed: if asset.external_id is not None and asset.external_id in duplicated_ids: to_update.append(asset) else: to_create.append(asset) if to_create: self.cdf_client.assets.create(to_create) if to_update: self.cdf_client.assets.update(to_update) if len(self.upload_queue) > 0: with self.lock: _upload_batch() try: self._post_upload(self.upload_queue) except Exception as e: self.logger.error("Error in upload callback: %s", str(e)) self.assets_written.inc(self.upload_queue_size) self.logger.info(f"Uploaded {self.upload_queue_size} assets") self.upload_queue_size = 0 self.upload_queue.clear() self.queue_size.set(self.upload_queue_size)
def __enter__(self) -> "AssetUploadQueue": """ Wraps around start method, for use as context manager Returns: self """ self.start() return self def __exit__( self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None, ) -> None: """ Wraps around stop method, for use as context manager Args: exc_type: Exception type exc_val: Exception value exc_tb: Traceback """ self.stop()