Read CSV files
In this chapter, we will see how to read and parse CSV files with Python. Python's standard library includes a csv module that makes this process very easy.
In the extractor.py file generated by cogex, we start by defining a logger object. We will place this globally in the module since it will come in handy in several places:
logger = logging.getLogger(__name__)
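To see what this logger produces, we can attach a handler and emit a message. This is a minimal sketch for illustration: the in-memory buffer and formatter below are our own choices, and in a real cogex project the framework is assumed to configure handlers and formatting for you.

```python
import io
import logging

logger = logging.getLogger(__name__)

# Route log records to an in-memory buffer so we can inspect the output;
# a real extractor would log to the console or a file instead.
buffer = io.StringIO()
handler = logging.StreamHandler(buffer)
handler.setFormatter(logging.Formatter("%(name)s %(levelname)s: %(message)s"))
logger.addHandler(handler)
logger.setLevel(logging.INFO)

logger.info("extractor starting")
```

Because getLogger(__name__) returns the same logger object everywhere in the module, configuration done once applies to every log statement we add later.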
We then define a new extract_file function that will take in a FileConfig and an upload queue. We will look more closely at upload queues in the Uploading data to CDF chapter. Any project initiated by cogex will perform type checks before every commit. We should therefore include type hints in our function definition, like so:
def extract_file(file: FileConfig, queue: RawUploadQueue) -> None:
To make our extractor observable and easier to debug if something should go wrong, it is good practice to emit a small log statement whenever the extractor does something worth noting. We therefore begin with a log statement naming the file we are extracting and where we are uploading it:
logger.info(f"Extracting content from {file.path} to {file.destination.database}/{file.destination.table}")
We now create a new DictReader, imported from the csv module:
with open(file.path) as infile:
    reader = csv.DictReader(infile, delimiter=",")
If we wanted to make our extractor even more generic, we could have made the delimiter configurable as well.
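To see what the DictReader produces, we can feed it a small in-memory CSV. The column names and values below are made up for illustration; io.StringIO stands in for the open file handle:

```python
import csv
import io

# A small in-memory CSV standing in for the contents of file.path.
data = "externalId,name\npump-001,Main pump\npump-002,Backup pump\n"

reader = csv.DictReader(io.StringIO(data), delimiter=",")
rows = list(reader)

# Each row is a dictionary mapping column names (from the header line)
# to that row's values.
```

The first line of the file is consumed as the header, so rows[0] is {"externalId": "pump-001", "name": "Main pump"}.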
We now have a reader object. This is an iterator that returns each row of the CSV file as a dictionary, where the column names are keys and the row's fields are values. We can now iterate over the reader and add every row to the upload queue using the add_to_upload_queue method:
for row in reader:
    queue.add_to_upload_queue(
        database=file.destination.database,
        table=file.destination.table,
        raw_row=Row(key=row[file.key_column], columns=row),
    )
Our final extract_file function looks like the following:
def extract_file(file: FileConfig, queue: RawUploadQueue) -> None:
    logger.info(f"Extracting content from {file.path} to {file.destination.database}/{file.destination.table}")

    with open(file.path) as infile:
        reader = csv.DictReader(infile, delimiter=",")

        for row in reader:
            queue.add_to_upload_queue(
                database=file.destination.database,
                table=file.destination.table,
                raw_row=Row(key=row[file.key_column], columns=row),
            )
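We can check the logic end to end without a CDF connection by running the function against stand-in classes. Everything below apart from the csv handling is a test harness: Destination, FileConfig, Row, and FakeQueue are minimal mock-ups of the real cogex and SDK classes (with made-up fields chosen to match the attributes used above), and the log statement is omitted for brevity.

```python
import csv
import tempfile
from dataclasses import dataclass


@dataclass
class Destination:  # stand-in for the destination part of FileConfig
    database: str
    table: str


@dataclass
class FileConfig:  # stand-in for the cogex-generated config class
    path: str
    key_column: str
    destination: Destination


@dataclass
class Row:  # stand-in for the SDK's Row class
    key: str
    columns: dict


class FakeQueue:
    """Collects rows in memory instead of uploading them to CDF."""

    def __init__(self):
        self.rows = []

    def add_to_upload_queue(self, database, table, raw_row):
        self.rows.append((database, table, raw_row))


def extract_file(file: FileConfig, queue: FakeQueue) -> None:
    with open(file.path) as infile:
        reader = csv.DictReader(infile, delimiter=",")
        for row in reader:
            queue.add_to_upload_queue(
                database=file.destination.database,
                table=file.destination.table,
                raw_row=Row(key=row[file.key_column], columns=row),
            )


# Exercise the function against a temporary CSV file with made-up contents.
with tempfile.NamedTemporaryFile("w", suffix=".csv", delete=False) as f:
    f.write("externalId,name\npump-001,Main pump\n")
    path = f.name

config = FileConfig(
    path=path,
    key_column="externalId",
    destination=Destination(database="db", table="assets"),
)
queue = FakeQueue()
extract_file(config, queue)
```

After the call, the queue holds one row keyed by the value in the externalId column, with the full row dictionary as its columns, which is exactly what the real RawUploadQueue would later upload to CDF RAW.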