Streaming services are the new norm for data ingestion, so we use Apache Kafka to ingest data into S3 in the columnar Parquet format. The data in S3 is partitioned by year and month. On top of it we have a schema crawler running, which generates a schema automatically and makes it accessible from AWS Athena.
Our use case for the DWH is to push our data into AWS Redshift, a columnar MPP database that we use for data warehousing. To generate a schema in AWS Redshift, Redshift needs to know the metadata of the Parquet files. The good news is that we can retrieve this metadata via AWS Athena and Redshift Spectrum: we create an external schema, as in the example below, and all the tables that Athena lists become accessible through that external schema in Redshift.
create external schema schema_name_of_your_choice
from data catalog
database 'your_external_schema'
iam_role 'arn:aws:iam::your_arn_number:role/your_redshift_role';
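To verify that the tables from the data catalog are actually visible, one quick check (just a sketch; the schema name is the one chosen above) is to query Redshift's SVV_EXTERNAL_TABLES system view:
-- List the external tables that Redshift Spectrum now sees through the
-- external schema created above.
SELECT schemaname, tablename, location
FROM svv_external_tables
WHERE schemaname = 'schema_name_of_your_choice';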
Alright – so the first challenge is solved!
Now the next question is how to create the tables so that we can use the Redshift COPY command to load the data. The good news is that Redshift has its own internal metadata tables where information about table definitions is stored. We can therefore use the SVV_COLUMNS view to get the columns and their positions, and filter out columns that are not part of the table itself, for example a partition column. Some data types differ between Athena and Redshift – for example, struct and array are stored as the super data type in Redshift. The query below returns many rows per table, together with a somewhat odd concatenation in a CASE statement – but do not worry 🙂 it will make sense shortly.
SELECT
    TABLE_NAME,
    CASE
        WHEN ordinal_position = 1
        THEN 'create table ' || TABLE_NAME || ' ( '
        ELSE ','
    END AS comma,
    COLUMN_NAME,
    CASE
        WHEN data_type = 'string'
        THEN 'varchar' || '(' || character_maximum_length::varchar || ')'
        WHEN LEFT(data_type, 6) = 'struct'
          OR LEFT(data_type, 5) = 'array'
        THEN 'super'
        ELSE data_type
    END AS data_type,
    ordinal_position
FROM
    SVV_COLUMNS
WHERE
    table_schema IN ('your_external_schema')
    AND COLUMN_NAME NOT IN ('your_partition_name')
ORDER BY
    TABLE_NAME,
    ordinal_position;
The query above gives us an unusual data structure with many rows per table, which on its own does not make much sense. Luckily, Redshift has a great function to help us here, called "listagg". It concatenates the values of multiple rows into a single string, and we can group by table name (a sketch of this step follows below). Pffffff – finally we have one row per table with its table definition generated dynamically. Now we have schemas generated automatically (YAY!! :-)).
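As a sketch of just this step: the table name column_rows below is a placeholder for the output of the query above stored in a temporary table – which is exactly what we do in the next step with table_definition_temp.
-- Sketch only: "column_rows" stands for the result of the previous query,
-- materialized as a temp table. listagg glues the per-column pieces together
-- into one CREATE TABLE statement per table.
SELECT
    TABLE_NAME,
    listagg(comma || ' ' || COLUMN_NAME || ' ' || data_type)
        WITHIN GROUP (ORDER BY ordinal_position) || ' ) ;' AS create_table_ddl
FROM column_rows
GROUP BY TABLE_NAME;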
But how do we generate the COPY command dynamically as well, so that we can load the data from S3 into Redshift? It turns out that we can quite easily add one more column with a few modifications.
DROP TABLE IF EXISTS table_definition_temp;

CREATE TEMP TABLE table_definition_temp AS
WITH temp_table AS (
    SELECT
        TABLE_NAME,
        CASE
            WHEN ordinal_position = 1
            THEN 'create table ' || TABLE_NAME || ' ( '
            ELSE ','
        END AS comma,
        COLUMN_NAME,
        CASE
            WHEN data_type = 'string'
            THEN 'varchar' || '(' || character_maximum_length::varchar || ')'
            WHEN LEFT(data_type, 6) = 'struct'
              OR LEFT(data_type, 5) = 'array'
            THEN 'super'
            ELSE data_type
        END AS data_type,
        ordinal_position
    FROM
        SVV_COLUMNS
    WHERE
        table_schema IN ('your_external_schema')
        AND COLUMN_NAME NOT IN ('_date')
)
SELECT
    TABLE_NAME,
    (comma || ' ' || COLUMN_NAME || ' ' || data_type)::VARCHAR AS table_definition,
    ordinal_position,
    ltrim(TO_CHAR(GETDATE()::date - 1, 'YYYYMM'), '0') AS partition_year_month
FROM
    temp_table;

SELECT
    TABLE_NAME,
    listagg(table_definition) WITHIN GROUP (ORDER BY ordinal_position) || ' ) ;' AS table_definition,
    '
truncate your_schema_name.' || TABLE_NAME || ' ;
copy your_schema_name.' || TABLE_NAME || '
from ''s3://path/to/data/' || TABLE_NAME || '''
iam_role ''replace_with_real_iam_role''
FORMAT AS PARQUET SERIALIZETOJSON ;
' AS copy_command
FROM
    table_definition_temp
GROUP BY
    TABLE_NAME;
Finally, we have every ingredient for our recipe! But how do we execute it? There are many ways to do it. The way we use is Apache Airflow: each row returned by the query above becomes its own task and is executed independently.
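The Airflow setup itself is beyond the scope of a SQL snippet, but purely as an illustration – and explicitly not the Airflow approach described above – the generated statements could also be executed inside Redshift with a stored procedure and dynamic SQL. The procedure name here is made up, and it assumes table_definition_temp from the previous step still exists in the session:
-- Illustrative sketch only: loop over the generated CREATE TABLE statements
-- and run them with dynamic SQL. The generated COPY strings could be executed
-- the same way. Assumes table_definition_temp exists in the current session.
CREATE OR REPLACE PROCEDURE run_generated_statements()
AS $$
DECLARE
    rec RECORD;
BEGIN
    FOR rec IN
        SELECT
            TABLE_NAME,
            listagg(table_definition)
                WITHIN GROUP (ORDER BY ordinal_position) || ' ) ;' AS ddl_stmt
        FROM table_definition_temp
        GROUP BY TABLE_NAME
    LOOP
        EXECUTE rec.ddl_stmt;  -- creates the target table for this row
    END LOOP;
END;
$$ LANGUAGE plpgsql;

CALL run_generated_statements();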
Schema changes are always a topic for a DWH team. Those changes must be communicated in time by the other teams and adopted by the DWH team in a timely manner to avoid data load failures in the nightly ETL process. By generating the schemas and load commands automatically we solve exactly these issues, so the DWH team can focus on more elaborate tasks and support the company in making data-driven decisions.
This process is the start of a bigger project that will help Data Engineers do data-related work faster and better, and help Data Analysts provide quicker insights to the business.