Building a Data Warehouse: R, PostgreSQL, bash, and Task Scheduler

Dan Gray

2019/10/03

Preamble

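How easy is it to get started building a data warehouse, and what are the fundamental requirements of such a system? Four fundamental components are required; this post covers them with the tools named in the title: R, PostgreSQL, bash and Windows Task Scheduler.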

Deliverable

A simplified working data-warehouse deployed via PostgreSQL to hold data (“hourly rainfall”) from the German Weather Service.

Architecture

The application consists of several chained orchestration and execution processes, covered step by step in the sections below.

Querying the Data

The source data is stored on an FTP server as zipped archives, one per station. The data itself is held in .txt files covering the last 365 days of hourly measurements, up to the preceding measurement day.

I use purrr's map_chr function to prepare the list of files to download.

# load libraries
library(tidyverse)
library(lubridate)

# set working directory
setwd("C:/Users/eoedd/Desktop/locstore/projects/r/rainfall/output")

# define stations of interest: Bremen, Braunschweig and Magdeburg
sNames <- c("00691","00662","03126")

# function for defining source location on ftp server
namesFunc <- function(x) {
  
  paste0("ftp://ftp-cdc.dwd.de/climate_environment/CDC/observations_germany/climate/hourly/precipitation/recent/",
         "stundenwerte_RR_",x,"_akt.zip")
  
}

# iterate for all instances
map_chr(sNames,namesFunc) %>% write(.,"iList.csv")
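The station identifiers above are five-digit, zero-padded strings. As a minimal sketch, assuming the identifiers might also arrive as plain numbers, the padding can be done with sprintf. Because paste0 is vectorised, no explicit iteration is needed. The names base_url and station_url are hypothetical and not part of the original script.

```r
# Base URL copied from namesFunc above.
base_url <- paste0(
  "ftp://ftp-cdc.dwd.de/climate_environment/CDC/observations_germany/",
  "climate/hourly/precipitation/recent/"
)

# Hypothetical helper: accepts numeric or character station ids and
# zero-pads them to five digits before building the archive URL.
station_url <- function(ids) {
  padded <- sprintf("%05d", as.integer(ids))
  paste0(base_url, "stundenwerte_RR_", padded, "_akt.zip")
}

urls <- station_url(c(691, 662, 3126))
basename(urls)
```

Passing "00691" or 691 yields the same URL, which guards against a lost leading zero.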

Thanks to the knitr language engines behind R Markdown, I can call a range of other languages (awk, bash, Python etc.) from the same document.

Here I call bash, available on my Windows OS via the Windows Subsystem for Linux, to download the archives.

# set working directory
cd /mnt/c/Users/eoedd/Desktop/locstore/projects/r/rainfall/output

# download all files
wget -i iList.csv
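If bash is not available, the download could in principle stay inside R. The following is only a sketch under that assumption and is not what the pipeline above does: download_archives is a hypothetical helper, and the example.org URLs are placeholders. The fetch argument defaults to utils::download.file but is replaced here by a stand-in so the loop can be dry-run without touching the network.

```r
# Hypothetical helper: download each URL into dest_dir and return the local
# paths. `fetch` defaults to utils::download.file but can be swapped out,
# for example to dry-run the loop without network access.
download_archives <- function(urls, dest_dir, fetch = utils::download.file) {
  dest <- file.path(dest_dir, basename(urls))
  for (i in seq_along(urls)) {
    # mode = "wb" keeps zip archives intact on Windows
    fetch(urls[i], destfile = dest[i], mode = "wb")
  }
  dest
}

# Dry run with a stand-in fetcher that only records what would be requested.
requested <- character(0)
fake_fetch <- function(url, destfile, mode) {
  requested <<- c(requested, url)
}
paths <- download_archives(
  c("ftp://example.org/a.zip", "ftp://example.org/b.zip"),
  tempdir(),
  fetch = fake_fetch
)
basename(paths)
```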

Unzipping the files is achieved using dir followed by map, so the code does not depend on the exact filenames or on changes to them.

setwd("C:/Users/eoedd/Desktop/locstore/projects/r/rainfall/output")
# pattern is a regular expression, not a shell glob
zips <- dir(pattern = "\\.zip$")

# use the power of purrr to map over values
map(zips,unzip)
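An alternative to extracting everything and deleting the extras afterwards is to extract only the measurement files. The sketch below assumes the data files inside each archive start with "produkt_" (suggested by the later clean-up of files beginning with "p", but not confirmed here); pick_data_files and unzip_data_only are hypothetical names.

```r
# Hypothetical helper: keep only the measurement files from an archive
# listing. The "produkt_" prefix is an assumption about the archive contents.
pick_data_files <- function(member_names) {
  member_names[grepl("^produkt_.*\\.txt$", member_names)]
}

# Extract just those members. unzip(list = TRUE) returns the archive
# listing as a data frame with a Name column, without extracting anything.
unzip_data_only <- function(zipfile, exdir = ".") {
  members <- unzip(zipfile, list = TRUE)$Name
  unzip(zipfile, files = pick_data_files(members), exdir = exdir)
}

# The selection logic on two made-up member names:
pick_data_files(c("Metadaten_demo.txt", "produkt_demo.txt"))
```

With this approach it would be called as map(zips, unzip_data_only), and the metadata files would never reach the working directory.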

Again, I rely on a bash script to clean up the working directory.

# clean up directory
cd /mnt/c/Users/eoedd/Desktop/locstore/projects/r/rainfall/output
rm [M]*
rm [s]*

Initial Insert into the Target

The individual files can be stacked in a single data-frame using map_df.

setwd("C:/Users/eoedd/Desktop/locstore/projects/r/rainfall/output")

# load files of unknown name (pattern is a regular expression, not a glob)
files <- dir(pattern = "\\.txt$")

# map to a DF and parse appropriately:
# ";"-delimited, -999 marks missing values, keep the first six columns
feed <- files %>%
  map_df(read_delim, ";", escape_double = FALSE, trim_ws = TRUE, na = "-999",
         col_types = cols(MESS_DATUM = col_character())) %>%
  .[, 1:6]

# parse dates
feed$MESS_DATUM <- ymd_h(feed$MESS_DATUM)
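To make the parsing options concrete, here is a dependency-free sketch on two made-up rows. Only STATIONS_ID and MESS_DATUM are taken from the code above; the other column names and all values are assumptions for illustration. Base R's read.table and as.POSIXct stand in for read_delim and ymd_h.

```r
# Two made-up rows: semicolon-delimited, padded with spaces,
# and -999 used as the missing-value marker.
sample_txt <- "STATIONS_ID;MESS_DATUM;QN_8;R1;RS_IND;WRTR;eor
        691;2019100200;    3;   0.0;    0;-999;eor
        691;2019100201;    3;   0.4;    1;   6;eor"

demo <- read.table(
  text = sample_txt, sep = ";", header = TRUE,
  strip.white = TRUE, na.strings = "-999",
  colClasses = c(MESS_DATUM = "character")
)[, 1:6]

# Base-R counterpart of lubridate::ymd_h(): year, month, day, hour in UTC.
demo$MESS_DATUM <- as.POSIXct(demo$MESS_DATUM,
                              format = "%Y%m%d%H", tz = "UTC")
str(demo)
```

Reading MESS_DATUM as character first matters: parsed as a number, a value such as 2019100200 would no longer be recognisable as a year-month-day-hour stamp.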

I connect to my local PostgreSQL instance using the DBI package.

library(DBI)

# connect to the PostgreSQL database
con <- dbConnect(RPostgres::Postgres(),dbname="postgres",
                 port=5432,user="USER",password="PASSWORD")
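Since the script runs unattended, it may be preferable not to keep credentials in the source. A minimal sketch, assuming the settings are supplied through environment variables: PGDATABASE, PGPORT, PGUSER and PGPASSWORD are the names libpq itself recognises, while pg_settings is a hypothetical helper.

```r
# Hypothetical helper: collect connection settings from the environment.
# The fallbacks for dbname and port mirror the values used above.
pg_settings <- function() {
  list(
    dbname   = Sys.getenv("PGDATABASE", "postgres"),
    port     = as.integer(Sys.getenv("PGPORT", "5432")),
    user     = Sys.getenv("PGUSER"),
    password = Sys.getenv("PGPASSWORD")
  )
}

# Usage (needs a running PostgreSQL instance, so it is not executed here):
# con <- do.call(DBI::dbConnect, c(list(RPostgres::Postgres()), pg_settings()))
```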

The first time the pipeline is run we need to do an initial full-load of the data.

# initial load - the subset is all available data
# comment out after the first load

subset <- feed 
dbWriteTable(con, "rainfall", subset)

Incremental Loads of New Data

For all subsequent iterations we can use the “highwater-mark” principle to only take new records and append them to the pre-existing table.

Two vectors of values holding the station identifier and maximum date are extracted for use in the incremental routine.

# query the table
# get the high-water mark :: maximum date per station
res<-dbGetQuery(con,'
  select "STATIONS_ID", max("MESS_DATUM") as max_date
  from "rainfall"
  group by "STATIONS_ID"
')

# extract the individual vectors
station <- res$STATIONS_ID
max_date <- res$max_date
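The high-water-mark idea can be illustrated without a database. The sketch below uses toy data frames in place of the rainfall table and the feed; all values are made up. Treating a station with no stored rows as entirely new is an assumption added here and is not part of the routine described in this post.

```r
# Toy stand-ins for the database table and the freshly downloaded feed.
existing <- data.frame(
  STATIONS_ID = c(691, 691, 662),
  MESS_DATUM  = as.POSIXct(c("2019-10-01 22:00", "2019-10-01 23:00",
                             "2019-10-01 23:00"), tz = "UTC")
)
feed_demo <- data.frame(
  STATIONS_ID = c(691, 691, 662, 662, 3126),
  MESS_DATUM  = as.POSIXct(c("2019-10-01 23:00", "2019-10-02 00:00",
                             "2019-10-01 23:00", "2019-10-02 00:00",
                             "2019-10-02 00:00"), tz = "UTC")
)

# High-water mark: latest stored timestamp per station, as seconds since
# the epoch, named by station id.
marks <- vapply(
  split(as.numeric(existing$MESS_DATUM), existing$STATIONS_ID),
  max, numeric(1)
)

# Look up each feed row's mark; stations without a mark yield NA.
mark_for_row <- marks[as.character(feed_demo$STATIONS_ID)]

# Keep rows strictly newer than the mark, plus rows from unseen stations.
is_new   <- is.na(mark_for_row) | as.numeric(feed_demo$MESS_DATUM) > mark_for_row
new_rows <- feed_demo[is_new, ]
new_rows
```

The strict comparison matters: a row whose timestamp equals the mark is already stored, so including it would create a duplicate.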

This routine should scale to N sources, so I wrap the incremental load logic in a function.

The function selects the most recent data, i.e. anything newer than what already exists in the database for a given station. Each increment is also written to a log file in a local directory for data-quality control.

Execution is handled by pmap, the multi-parameter member of the purrr map_* family.

pmap applies a function to a list of input vectors in parallel, that is, element by element across all vectors at once rather than concurrently. Inside a formula, the parameters are referenced with dot notation (..1, ..2, ..n etc.).

setwd("C:/Users/eoedd/Desktop/locstore/projects/r/rainfall/output/logs")

# incremental load functions
# filters the feed to the highwater mark for each station
# appends to existing database
# writes out log files of incremental loads for quality control

incFunc <- function(station,max_date) {
  
  subset <- feed %>% filter(STATIONS_ID == station & MESS_DATUM > max_date)
  
  dbWriteTable(con, "rainfall", subset, append = TRUE)
  
  write.csv(subset,paste0(station,"inc_",Sys.Date(),".csv"))
  
}

# iterate using pmap
pmap(list(station,max_date),~incFunc(..1,..2))
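To make the pairing explicit, here is a small sketch of the same element-wise iteration. Base R's Map is swapped in for pmap purely to keep the example free of dependencies; the vectors and their values are made up.

```r
station_demo <- c(691, 662)
mark_demo    <- c("2019-10-01 23:00", "2019-10-02 05:00")

# Map() walks both vectors in step: call 1 gets the first element of each,
# call 2 the second, and so on. Nothing runs concurrently.
calls <- Map(
  function(station, max_date) paste0("station ", station, " > ", max_date),
  station_demo, mark_demo
)
unlist(calls, use.names = FALSE)
```

Each station is therefore filtered against its own high-water mark and never against another station's.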

Cleaning Up

We clean up our processing directory as a final execution step.

cd /mnt/c/Users/eoedd/Desktop/locstore/projects/r/rainfall/output

rm [i]*
rm [p]*
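The single-letter globs above remove every file starting with that letter. As a sketch of a narrower alternative written in R, the hypothetical helper clean_workdir deletes only files with known prefixes. "iList" comes from the script above; "produkt_" is an assumption about how the extracted data files are named.

```r
# Hypothetical helper: delete only files whose names start with one of the
# given prefixes and return the names that were removed.
clean_workdir <- function(dir, prefixes = c("iList", "produkt_")) {
  pattern <- paste0("^(", paste(prefixes, collapse = "|"), ")")
  targets <- list.files(dir, pattern = pattern, full.names = TRUE)
  file.remove(targets)
  basename(targets)
}

# Try it on a scratch directory rather than the real output folder.
scratch <- file.path(tempdir(), "cleanup_demo")
dir.create(scratch, showWarnings = FALSE)
file.create(file.path(scratch, c("iList.csv", "produkt_demo.txt", "keep.me")))
removed <- clean_workdir(scratch)
list.files(scratch)
```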

Scheduling the Incremental Loads

Scheduling is handled by a .bat file (task.bat) which calls a steering script (task.R) that in turn calls the execution script (rainfall_datawarehouse.Rmd).

The contents of each file are as follows:

  1. task.bat

Calls the R CMD BATCH command, with the path pointing to the steering script.

@echo off

"C:\Program Files\Microsoft\R Open\R-3.5.3\bin\x64\R.exe" CMD BATCH C:\Users\eoedd\Desktop\locstore\projects\r\rainfall\task.R

  2. task.R

Initiates the execution script after loading the libraries it requires. In essence, it is a render command from the rmarkdown library.

setwd("C:/Users/eoedd/Desktop/locstore/projects/r/rainfall")

Sys.setenv(RSTUDIO_PANDOC="C:/Program Files/RStudio/bin/pandoc")

library(rmarkdown)

rmarkdown::render("rainfall_datawarehouse.Rmd",output_format = "all")

To schedule this routine, I create a basic task in Windows Task Scheduler that executes the task.bat file.

A particularly useful side effect is that the R CMD BATCH call writes an output file (task.Rout by default) on each run, which can be used to verify that each R chunk within the .Rmd file executed correctly.
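Checking that output file can itself be automated. The following is a minimal sketch and not part of the original pipeline: batch_failed is a hypothetical helper, and the two files it is tried on are made up. It relies on R printing "Execution halted" when a batch script stops on an error.

```r
# Hypothetical helper: report whether an R CMD BATCH output file looks like
# a failed run.
batch_failed <- function(rout_path) {
  lines <- readLines(rout_path, warn = FALSE)
  any(grepl("^Error", lines)) ||
    any(grepl("Execution halted", lines, fixed = TRUE))
}

# Demonstration on two made-up output files.
ok_file  <- tempfile(fileext = ".Rout")
bad_file <- tempfile(fileext = ".Rout")
writeLines(c("> library(rmarkdown)", "> proc.time()"), ok_file)
writeLines(c("Error in file(con, \"r\") : cannot open the connection",
             "Execution halted"), bad_file)
c(ok = batch_failed(ok_file), bad = batch_failed(bad_file))
```

A check like this could run as a follow-up step after task.bat, so that a failed nightly load is noticed without opening the file by hand.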