# Building a Data Warehouse: R, PostgreSQL, bash, and Task Scheduler

## 2019/10/03

### Preamble

How easy is it to get started building a data warehouse? What are the fundamental requirements for this system? Four fundamental components are required;

• a data source
• a query mechanism (with incremental update functionality)
• a target (typically a database)
• a scheduling mechanism

### Deliverable

A simplified working data-warehouse deployed via PostgreSQL to hold data (“hourly rainfall”) from the German Weather Service.

### Architecture

The application consists of several chained orchestration and execution processes:

• scheduling via Windows Scheduler, calling a .bat file
• a calling process .Rscript
• the application logic embedded in a .Rmd file, executing both R and bash code blocks

### Querying the Data

The source data is stored on a [ftp server, on a per-station basis as zipped archives. The data is stored as .txt files - specifically the last 365 days of hourly measurements up until the preceding measurement day.

I use the amazing map function to prepare the listing of files to be called.

library(tidyverse)
library(lubridate)

# set working directory
setwd("C:/Users/eoedd/Desktop/locstore/projects/r/rainfall/output")

# define stations of interest: Bremen, Braunschweig and Magdeburg
sNames <- c("00691","00662","03126")

# function for defining source location on ftp server
namesFunc <- function(x) {

paste0("ftp://ftp-cdc.dwd.de/climate_environment/CDC/observations_germany/climate/hourly/precipitation/recent/",
"stundenwerte_RR_",x,"_akt.zip")

}

# iterate for all instances
map_chr(sNames,namesFunc) %>% write(.,"iList.csv")

Leveraging the power of the pandoc engine in R I can call an array of other languages (awk, bash, python etc.) to execute code.

Here I call bash available on my Windows OS via the Windows Subsystem for Linux, to download the archives.

# set working directory
cd /mnt/c/Users/eoedd/Desktop/locstore/projects/r/rainfall/output

wget -i iList.csv

Unzipping the the files is achieved using dir followed by map - this code is independent of (changes in) filenames.

setwd("C:/Users/eoedd/Desktop/locstore/projects/r/rainfall/output")
zips <- dir(pattern = "*.zip")

# use the power of purrr to map over values
map(zips,unzip)

I again can rely on a bash script to clean up working directory.

# clean up directory
cd /mnt/c/Users/eoedd/Desktop/locstore/projects/r/rainfall/output
rm [M]*
rm [s]*

### Initial Insert into the Target

The individual files can be stacked in a single data-frame using map_df.

setwd("C:/Users/eoedd/Desktop/locstore/projects/r/rainfall/output")

# load files of unknown name
files <- dir(pattern = "*.txt")

# map to a DF and parse appropriately
feed <- files %>% map_df(read_delim,";", escape_double = FALSE, trim_ws = TRUE,na = "-999",col_types = cols(MESS_DATUM = col_character())) %>% .[,1:6]

# parse dates
feed$MESS_DATUM <- ymd_h(feed$MESS_DATUM)

I connect to my local PostgreSQL instance using the DBI package.

library(DBI)

# connect to Postrgres database
con <- dbConnect(RPostgres::Postgres(),dbname="postgres",

The first time the pipeline is run we need to do an initial full-load of the data.

# initial loads - the subset is all avialable data
# commented out after first load

subset <- feed
dbWriteTable(con, "rainfall", subset)

### Incremental Loads of New Data

For all subsequent iterations we can use the “highwater-mark” principle to only take new records and append them to the pre-existing table.

Two vectors of values holding the station identifier and maximum date are extracted for use in the incremental routine.

# make a conenction to the table
# get the highwater mark :: maxixum date per station
res<-dbGetQuery(con,'
select "STATIONS_ID", max("MESS_DATUM") as max_date
from "rainfall"
group by "STATIONS_ID"
')

# extract off individual vectors
station <- res$STATIONS_ID max_date <- res$max_date

This routine should be scalable to N-sources, therefore I write a function to hold the incremental load logic.

The function selects the most recent data i.e. anything newer than the what already exists in the database on a station basis. A logging mechanism to a local directory is also used for data-quality control.

Execution is facilitated by the multi-parameter version of the purrr map_* function.

pmap applies a function to the given list of input values in parallel - referencing the parameters using the dot-notation (..1, ..2, ..n etc.)

setwd("C:/Users/eoedd/Desktop/locstore/projects/r/rainfall/output/logs")

# filters the feed to the highwater mark for each station
# appends to existing database
# writes out log files of incremental loads for quality control

incFunc <- function(station,max_date) {

subset <- feed %>% filter(STATIONS_ID == station & MESS_DATUM > max_date)

dbWriteTable(con, "rainfall", subset, append = TRUE)

write.csv(subset,paste0(station,"inc_",Sys.Date(),".csv"))

}

# iterate using pmap
pmap(list(station,max_date),~incFunc(..1,..2))

### Cleaning Up

We clean up our processing directory as a final execution step.

cd /mnt/c/Users/eoedd/Desktop/locstore/projects/r/rainfall/output

rm [i]*
rm [p]*

Scheduling is handled by a .bat file (task.bat) which calls a steering script (task.R) that in turn calls the execution script (rainfall_datawarehouse.Rmd).

The contents of the files in each case are as follows;

Calls the R CMD BATCH command, with the path pointing to the steering script.

@echo off

"C:\Program Files\Microsoft\R Open\R-3.5.3\bin\x64\R.exe" CMD BATCH C:\Users\eoedd\Desktop\locstore\projects\r\rainfall\task.R

Initiates the executions script by referencing the required libraries for execution. Essentially a render command from the rmarkdown library.

setwd("C:/Users/eoedd/Desktop/locstore/projects/r/rainfall")

Sys.setenv(RSTUDIO_PANDOC="C:/Program Files/RStudio/bin/pandoc")

library(rmarkdown)

rmarkdown::render("rainfall_datawarehouse.Rmd",output_format = "all")

To schedule this routine, I create a basic schedule in Windows Task Schedule which executes the task.bat file.

What is particularly useful is that the R CMD BATCH call generates an outfile on execution, which can be used to verify correct execution of each R chunk within the .Rmd file.