Source code for cognite.extractorutils.uploader.raw

#  Copyright 2023 Cognite AS
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

from types import TracebackType
from typing import Any, Callable, Dict, List, Optional, Type

import arrow
from arrow import Arrow

from cognite.client import CogniteClient
from cognite.client.data_classes import Row
from cognite.extractorutils.threading import CancellationToken
from cognite.extractorutils.uploader._base import (
    RETRIES,
    RETRY_BACKOFF_FACTOR,
    RETRY_DELAY,
    RETRY_MAX_DELAY,
    AbstractUploadQueue,
    TimestampedObject,
)
from cognite.extractorutils.uploader._metrics import (
    RAW_UPLOADER_QUEUE_SIZE,
    RAW_UPLOADER_ROWS_DUPLICATES,
    RAW_UPLOADER_ROWS_QUEUED,
    RAW_UPLOADER_ROWS_WRITTEN,
)
from cognite.extractorutils.util import cognite_exceptions, retry


class RawUploadQueue(AbstractUploadQueue):
    """
    Upload queue for RAW

    Rows are buffered in memory, grouped by database and table, and written to RAW when ``upload`` is called.

    Args:
        cdf_client: Cognite Data Fusion client to use
        post_upload_function: A function that will be called after each upload. The function will be given one
            argument: A list of the rows that were uploaded.
        max_queue_size: Maximum size of upload queue. Defaults to no max size.
        max_upload_interval: Automatically trigger an upload each m seconds when run as a thread (use start/stop
            methods).
        trigger_log_level: Log level to log upload triggers to.
        thread_name: Thread name of uploader thread.
        cancellation_token: Cancellation token forwarded to the base upload queue.
    """

    def __init__(
        self,
        cdf_client: CogniteClient,
        post_upload_function: Optional[Callable[[List[Any]], None]] = None,
        max_queue_size: Optional[int] = None,
        max_upload_interval: Optional[int] = None,
        trigger_log_level: str = "DEBUG",
        thread_name: Optional[str] = None,
        cancellation_token: Optional[CancellationToken] = None,
    ):
        # Super sets post_upload and thresholds
        super().__init__(
            cdf_client,
            post_upload_function,
            max_queue_size,
            max_upload_interval,
            trigger_log_level,
            thread_name,
            cancellation_token,
        )
        # Pending rows, keyed first by database name and then by table name
        self.upload_queue: Dict[str, Dict[str, List[TimestampedObject]]] = {}

        # It is a hack since Prometheus client registers metrics on object creation, so object has to be created once
        self.rows_queued = RAW_UPLOADER_ROWS_QUEUED
        self.rows_written = RAW_UPLOADER_ROWS_WRITTEN
        self.rows_duplicates = RAW_UPLOADER_ROWS_DUPLICATES
        self.queue_size = RAW_UPLOADER_QUEUE_SIZE

    def add_to_upload_queue(self, database: str, table: str, raw_row: Row) -> None:
        """
        Adds a row to the upload queue. The queue will be uploaded if the queue size is larger than the threshold
        specified in the __init__.

        Args:
            database: The database to upload the Raw object to
            table: The table to upload the Raw object to
            raw_row: The row object
        """
        with self.lock:
            # Create the per-database / per-table buckets on first use
            table_rows = self.upload_queue.setdefault(database, {}).setdefault(table, [])

            # Append row to queue, stamped with the time it was queued
            table_rows.append(TimestampedObject(payload=raw_row, created=Arrow.utcnow()))
            self.upload_queue_size += 1
            self.rows_queued.labels(f"{database}:{table}").inc()
            self.queue_size.set(self.upload_queue_size)

            # NOTE(review): this runs while self.lock is held; if it can call upload(), which takes the same
            # lock, the lock has to be reentrant - confirm against the base class.
            self._check_triggers()

    def upload(self) -> None:
        """
        Trigger an upload of the queue, clears queue afterwards

        Each (database, table) bucket is deduplicated on row key and inserted in its own request. The queue is
        only cleared once every bucket has been processed, so an insert that still fails after all retries
        propagates out of this method and leaves the whole queue in place.
        """

        # Retried on the exceptions given by cognite_exceptions(), using the shared retry settings
        @retry(
            exceptions=cognite_exceptions(),
            cancellation_token=self.cancellation_token,
            tries=RETRIES,
            delay=RETRY_DELAY,
            max_delay=RETRY_MAX_DELAY,
            backoff=RETRY_BACKOFF_FACTOR,
        )
        def _upload_batch(database: str, table: str, patch: List[Row]) -> None:
            # Upload
            self.cdf_client.raw.rows.insert(db_name=database, table_name=table, row=patch, ensure_parent=True)

        # Nothing queued, so skip taking the lock and logging
        if not self.upload_queue:
            return

        with self.lock:
            for database, tables in self.upload_queue.items():
                for table, rows in tables.items():
                    _labels = f"{database}:{table}"

                    # Deduplicate
                    # In case of duplicate keys, the first key is preserved, and the last value is preserved.
                    patch: Dict[str, Row] = {r.payload.key: r.payload for r in rows}
                    self.rows_duplicates.labels(_labels).inc(len(rows) - len(patch))

                    _upload_batch(database=database, table=table, patch=list(patch.values()))
                    self.rows_written.labels(_labels).inc(len(patch))

                    # Perform post-upload logic if applicable
                    # NOTE(review): `rows` is the list of queued TimestampedObject wrappers (duplicates included),
                    # not the deduplicated Row payloads - confirm this is what the callback is meant to receive.
                    try:
                        self._post_upload(rows)
                    except Exception as e:
                        # A failing callback must not abort the upload of the remaining tables
                        self.logger.error("Error in upload callback: %s", str(e))

            self.upload_queue.clear()
            # Log before resetting the counter so the message reports the number of queued rows
            self.logger.info(f"Uploaded {self.upload_queue_size} raw rows")
            self.upload_queue_size = 0
            self.queue_size.set(self.upload_queue_size)

    def __enter__(self) -> "RawUploadQueue":
        """
        Wraps around start method, for use as context manager

        Returns:
            self
        """
        self.start()
        return self

    def __exit__(
        self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType]
    ) -> None:
        """
        Wraps around stop method, for use as context manager

        Args:
            exc_type: Exception type
            exc_val: Exception value
            exc_tb: Traceback
        """
        self.stop()

    def __len__(self) -> int:
        """
        The size of the upload queue

        Returns:
            Number of elements in queue
        """
        return self.upload_queue_size