Preamble
How easy is it to get started building a data warehouse? What are the fundamental requirements for such a system? Four components are required:
- a data source
- a query mechanism (with incremental update functionality)
- a target (typically a database)
- a scheduling mechanism
Deliverable
A simplified working data warehouse, deployed on PostgreSQL, holding hourly rainfall data from the German Weather Service (DWD).
Architecture
The application consists of several chained orchestration and execution processes:
- scheduling via the Windows Task Scheduler, calling a .bat file
- a calling process, an .R steering script
- the application logic, embedded in a .Rmd file that executes both R and bash code blocks
Querying the Data
The source data is stored on an FTP server, on a per-station basis, as zipped archives. The data is stored as .txt files - specifically the last 365 days of hourly measurements, up to the preceding measurement day.
I use the amazing map function to prepare the list of files to be downloaded.
# load libraries
library(tidyverse)
library(lubridate)
# set working directory
setwd("C:/Users/eoedd/Desktop/locstore/projects/r/rainfall/output")
# define stations of interest: Bremen, Braunschweig and Magdeburg
sNames <- c("00691","00662","03126")
# function for defining source location on ftp server
namesFunc <- function(x) {
paste0("ftp://ftp-cdc.dwd.de/climate_environment/CDC/observations_germany/climate/hourly/precipitation/recent/",
"stundenwerte_RR_",x,"_akt.zip")
}
# iterate for all instances
map_chr(sNames,namesFunc) %>% write(.,"iList.csv")
Leveraging the power of the knitr engine in R Markdown, I can call an array of other languages (awk, bash, Python, etc.) to execute code.
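As a quick aside - and only a minimal sketch, assuming the knitr package is installed - the chunk engines available to the document can be listed from R:
# list the language engines knitr exposes for code chunks (bash, python, awk, ...)
library(knitr)
names(knit_engines$get())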
Here I call bash, available on my Windows OS via the Windows Subsystem for Linux (WSL), to download the archives.
# set working directory
cd /mnt/c/Users/eoedd/Desktop/locstore/projects/r/rainfall/output
# download all files
wget -i iList.csv
Unzipping the files is achieved using dir followed by map - this code is independent of (changes in) filenames.
setwd("C:/Users/eoedd/Desktop/locstore/projects/r/rainfall/output")
zips <- dir(pattern = "*.zip")
# use the power of purrr to map over values
map(zips,unzip)
I can again rely on a bash script to clean up the working directory.
# clean up the working directory
cd /mnt/c/Users/eoedd/Desktop/locstore/projects/r/rainfall/output
# remove the extracted metadata files (Metadaten_*)
rm [M]*
# remove the downloaded archives (stundenwerte_*)
rm [s]*
Initial Insert into the Target
The individual files can be stacked into a single data frame using map_df.
setwd("C:/Users/eoedd/Desktop/locstore/projects/r/rainfall/output")
# load files of unknown name
files <- dir(pattern = "*.txt")
# map to a DF and parse appropriately; keep only the first six columns
feed <- files %>%
  map_df(read_delim, ";", escape_double = FALSE, trim_ws = TRUE,
         na = "-999", col_types = cols(MESS_DATUM = col_character())) %>%
  .[, 1:6]
# parse dates
feed$MESS_DATUM <- ymd_h(feed$MESS_DATUM)
I connect to my local PostgreSQL instance using the DBI package.
library(DBI)
# connect to Postgres database
con <- dbConnect(RPostgres::Postgres(),dbname="postgres",
port=5432,user="USER",password="PASSWORD")
The first time the pipeline is run we need to do an initial full-load of the data.
# initial load - the subset is all available data
# commented out after first load
subset <- feed
dbWriteTable(con, "rainfall", subset)
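As an alternative to commenting the block out by hand after the first run, the full load could be guarded with a table-existence check. A minimal sketch using DBI::dbExistsTable (an assumption on my part, not part of the original routine):
# run the full load only when the target table does not yet exist
if (!dbExistsTable(con, "rainfall")) {
  dbWriteTable(con, "rainfall", feed)
}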
Incremental Loads of New Data
For all subsequent iterations we can use the “high-water mark” principle to take only new records and append them to the pre-existing table.
Two vectors of values holding the station identifier and maximum date are extracted for use in the incremental routine.
# make a connection to the table
# get the high-water mark :: maximum date per station
res<-dbGetQuery(con,'
select "STATIONS_ID", max("MESS_DATUM") as max_date
from "rainfall"
group by "STATIONS_ID"
')
# extract off individual vectors
station <- res$STATIONS_ID
max_date <- res$max_date
This routine should be scalable to N sources, so I write a function to hold the incremental-load logic.
The function selects the most recent data, i.e. anything newer than what already exists in the database, on a per-station basis. A logging mechanism to a local directory is also used for data-quality control.
Execution is facilitated by pmap, the multi-parameter member of purrr's map_* family. pmap applies a function across the supplied lists of input values element-wise, referencing the parameters with dot notation (..1, ..2, ..n, etc.).
setwd("C:/Users/eoedd/Desktop/locstore/projects/r/rainfall/output/logs")
# incremental load functions
# filters the feed to the highwater mark for each station
# appends to existing database
# writes out log files of incremental loads for quality control
incFunc <- function(station,max_date) {
subset <- feed %>% filter(STATIONS_ID == station & MESS_DATUM > max_date)
dbWriteTable(con, "rainfall", subset, append = TRUE)
write.csv(subset,paste0(station,"inc_",Sys.Date(),".csv"))
}
# iterate using pmap
pmap(list(station,max_date),~incFunc(..1,..2))
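As an aside, incFunc is called purely for its side effects (the database append and the log file), so purrr's pwalk would serve equally well here - a minor variation that simply discards the unused return values:
# side-effect-only variant: pwalk behaves like pmap but returns its input invisibly
pwalk(list(station, max_date), ~incFunc(..1, ..2))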
Cleaning Up
We clean up our processing directory as a final execution step.
cd /mnt/c/Users/eoedd/Desktop/locstore/projects/r/rainfall/output
# remove the download list (iList.csv)
rm [i]*
# remove the extracted data files (produkt_*)
rm [p]*
Scheduling the Incremental Loads
Scheduling is handled by a .bat file (task.bat), which calls a steering script (task.R) that in turn calls the execution script (rainfall_datawarehouse.Rmd).
The contents of the files are as follows:
- task.bat
Calls the R CMD BATCH command, with the path pointing to the steering script.
@echo off
"C:\Program Files\Microsoft\R Open\R-3.5.3\bin\x64\R.exe" CMD BATCH C:\Users\eoedd\Desktop\locstore\projects\r\rainfall\task.R
- task.R
Initiates the execution script, referencing the required libraries. Essentially a render command from the rmarkdown library.
setwd("C:/Users/eoedd/Desktop/locstore/projects/r/rainfall")
Sys.setenv(RSTUDIO_PANDOC="C:/Program Files/RStudio/bin/pandoc")
library(rmarkdown)
rmarkdown::render("rainfall_datawarehouse.Rmd",output_format = "all")
To schedule this routine, I create a basic schedule in the Windows Task Scheduler which executes the task.bat file.
What is particularly useful is that the R CMD BATCH call generates an outfile on execution, which can be used to verify correct execution of each R chunk within the .Rmd file.
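A minimal sketch of such a check, assuming the outfile (task.Rout) lands in the project directory - the exact location depends on the working directory of the scheduled task:
# scan the R CMD BATCH outfile for error messages
out <- readLines("C:/Users/eoedd/Desktop/locstore/projects/r/rainfall/task.Rout")
grep("Error", out, value = TRUE)  # character(0) means the last run was clean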