Read CSV files

In this chapter, we will see how to read and parse CSV files with Python. The csv module in the standard library makes this process straightforward.
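As a running example, imagine a small, hypothetical CSV file of pump readings (any real file with a header row works the same way):

id,name,pressure
pump-01,Main pump,3.2
pump-02,Backup pump,3.1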

In the extractor.py file generated by cogex, we start by defining a logger object. We place it globally in the module since it will come in handy in several places:

import logging

logger = logging.getLogger(__name__)

We then define a new extract_file function that takes a FileConfig and an upload queue. We will look more closely at upload queues in the Uploading data to CDF chapter. Any project initialized with cogex performs type checks before every commit, so we should include type hints in our function definition, like so:

def extract_file(file: FileConfig, queue: RawUploadQueue) -> None:
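The FileConfig class is defined elsewhere in the project. As a minimal sketch, based only on the fields we use in this chapter (the real class may hold more), it could look something like this:

from dataclasses import dataclass

@dataclass
class DestinationConfig:
    database: str
    table: str

@dataclass
class FileConfig:
    path: str
    key_column: str
    destination: DestinationConfig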

To make our extractor observable and easier to debug if something goes wrong, it is good practice to log whenever the extractor does something worth noting. We therefore begin with a log statement that includes the file we are extracting and where we are uploading it:

logger.info(f"Extracting content from {file.path} to {file.destination.database}/{file.destination.table}")

We now create a new DictReader, imported from the csv module. Note that the csv documentation recommends opening the file with newline="" so that newlines embedded in quoted fields are handled correctly:

with open(file.path, newline="") as infile:
    reader = csv.DictReader(infile, delimiter=",")

If we wanted to make our extractor even more generic, we could have made the delimiter configurable as well.
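As a sketch of that idea, assuming we added a delimiter field (defaulting to ",") to the FileConfig sketch above, the reader construction would become:

reader = csv.DictReader(infile, delimiter=file.delimiter)

This would let the same extractor handle semicolon- or tab-separated files without any code changes.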

We now have a reader object. This is an iterator that returns each row in the CSV file as a dictionary, where the column names are keys and the cell contents of that row are the values. We can now iterate over the reader and add every row to the upload queue using the add_to_upload_queue method. Each row in a CDF RAW table needs a unique key, which we take from the column named by the key_column field in our config.

for row in reader:
    queue.add_to_upload_queue(
        database=file.destination.database,
        table=file.destination.table,
        raw_row=Row(key=row[file.key_column], columns=row),
    )

Our final extract_file function looks like the following:

def extract_file(file: FileConfig, queue: RawUploadQueue) -> None:
    logger.info(f"Extracting content from {file.path} to {file.destination.database}/{file.destination.table}")

    with open(file.path, newline="") as infile:
        reader = csv.DictReader(infile, delimiter=",")

        for row in reader:
            queue.add_to_upload_queue(
                database=file.destination.database,
                table=file.destination.table,
                raw_row=Row(key=row[file.key_column], columns=row),
            )
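
For completeness, the function above relies on the following imports. The module paths for Row and RawUploadQueue are assumed to be the usual ones from the Cognite Python SDK and cognite-extractor-utils; the extractor.py generated by cogex should already contain the equivalents, and the FileConfig import depends on where your config classes live:

import csv
import logging

from cognite.client.data_classes import Row
from cognite.extractorutils.uploader import RawUploadQueue

from .config import FileConfig  # hypothetical: wherever your config classes are defined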