Building your first end-to-end data orchestration pipeline can be overwhelming. There are numerous tech stacks and open-source tools to choose from, so it can be hard to decide on, learn, and build one fully functional production pipeline.
Fortunately, AWS provides a set of serverless services that we can use to load data from an on-prem or cloud database, transform it, and load it into another service, all with a very simple cloud-based serverless architecture.
A serverless architecture enables agile principles and continuous delivery to customers: it accommodates changing requirements without the overhead of managing infrastructure, and it empowers developers to own the platform.
Below is the data architecture diagram for the different AWS services we used to design, build, and implement data pipelines in production.
Here are the step-by-step details, with code, for each of these steps -
You can have multiple source systems producing data in different file formats: plain CSV, zipped files (Gzip or Bzip2), or XML. You can use a push method from the source system, which simply means sending the files to S3 from any external or internal system that can communicate securely with your AWS environment.
This pipeline also connects to an Oracle database as one of the source systems. Using a Glue connection with JDBC drivers, you can connect directly to the database and pull the data, so the extra work of creating files and sending them to S3 is taken care of by Glue within AWS. If your database can connect directly to your AWS environment, this is the best approach: you pull the data using Spark's parallel processing capabilities.
To create a Glue connection to an Oracle database, you need to build the JDBC URL in the pattern below.
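For Oracle, the Glue JDBC URL typically follows this pattern (the host, port, and service name below are placeholders; substitute your own instance details):

```
jdbc:oracle:thin://@<host>:<port>/<service_name>
e.g. jdbc:oracle:thin://@myoracle.example.com:1521/ORCL
```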
Refer to this for more details about the Glue JDBC connection and its properties -
You can use the AWS Glue UI to create a Glue connection as shown below and fill in all the required details.
Alternatively, you can use the CloudFormation template below to create the connection.
I like to have everything in the form of code so it can be saved in a code repository and deployed to multiple environments, so you will find a CloudFormation template for almost every artifact.
```yaml
Resources:
  CFNConnectionOracle:   # logical resource name here is illustrative
    Type: AWS::Glue::Connection
    Properties:
      CatalogId: !Ref AWS::AccountId
      ConnectionInput:
        Description: "Connect to Oracle via JDBC"
        ConnectionType: JDBC
        ConnectionProperties:
          "JDBC_CONNECTION_URL": !Ref CFNJDBCString
          "USERNAME": !Ref CFNJDBCUser
          "PASSWORD": !Ref CFNJDBCPassword
        # The name of the connection to be created
        Name: !Ref CFNConnectionName
```
Once you have established the connection, you are ready to connect to the Oracle instance.
All the data lands in an S3 bucket, also called the Raw layer; as the name implies, data is stored here in its original format with no transformations.
This keeps the source files intact and helps with debugging at a later point in time.
In the S3 Raw layer, data is saved in multiple formats: CSV, XML, JSON, or any other format.
If we are pulling data directly from the database, you could also land it in Parquet format via Spark.
A Lambda function with an event-based trigger is used to kick off the stage Glue job as soon as a file lands in the Raw layer. This means you process the file and create its staging table as soon as it arrives, with very low latency.
Create the function as below -

```python
import boto3

s3_resource = boto3.resource('s3')
glue = boto3.client('glue')

def lambda_handler(event, context):
    # Key of the S3 object that triggered this invocation
    key = event['Records'][0]['s3']['object']['key']
    # The Glue job name is derived from the folder containing the file
    gluejobname = key.split('/')[-2]
    jobrun = glue.start_job_run(JobName=gluejobname)
    return jobrun
```
Note: gluejobname is derived from the S3 object key using Python's split function, so make sure the name of the folder holding the source file and the name of the Glue job are in sync. This enables running multiple jobs as soon as each individual file lands in the raw-layer-2021 bucket.
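As a quick illustration of that naming convention (the object key below is a hypothetical example), `split('/')[-2]` picks the folder immediately above the file, which is expected to match a Glue job name:

```python
# Hypothetical key: the file sits in a folder named after its Glue job
key = 'source1/customer-stage-job/customers_20210101.csv'

gluejobname = key.split('/')[-2]
print(gluejobname)  # customer-stage-job
```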
Now that the Lambda function is ready, let's create the buckets where our source data will be saved.
Create three S3 buckets as below -
Event Trigger on S3
We have created the Lambda function and the buckets; now we need to invoke the function for every push/put operation on the raw-layer-2021 S3 bucket.
S3 bucket → Properties → Event Notifications
Create Event Notification -
This will create an event for any object-create operation in the source1 folder of the Raw layer bucket.
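If you prefer to keep this step in code as well, the same notification can be applied with boto3. This sketch only builds the configuration dictionary; the bucket name, Lambda ARN, and prefix are assumptions for illustration:

```python
def build_notification_config(lambda_arn, prefix):
    """Build an S3 event-notification configuration that invokes a Lambda
    function for every object created under the given prefix."""
    return {
        'LambdaFunctionConfigurations': [{
            'LambdaFunctionArn': lambda_arn,
            'Events': ['s3:ObjectCreated:*'],
            'Filter': {'Key': {'FilterRules': [
                {'Name': 'prefix', 'Value': prefix},
            ]}},
        }]
    }

# To apply it (requires boto3 and permission for S3 to invoke the Lambda):
# import boto3
# s3 = boto3.client('s3')
# s3.put_bucket_notification_configuration(
#     Bucket='raw-layer-2021',
#     NotificationConfiguration=build_notification_config(
#         'arn:aws:lambda:us-east-1:123456789012:function:start-stage-job',
#         'source1/'))
```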
Our Raw layer is ready with data from the source systems. Let's get started with Staging.
Staging sits between the Raw layer and the Mart layer and contains all the data from the Raw layer with light transformations.
You can create one Glue job per table and use Spark functions to build the transformations. You could also write all the shared functions in a Python file and call the same library file from all the jobs, as below -
Refer to my other blog on how to create Glue jobs using a CloudFormation template and how to deploy the jobs to multiple environments.
You can do any number of transformations here to clean and validate the data; below are some of them.
1. CDC (Change Data Capture): You could write a simple algorithm to capture changed data, comparing what was already loaded yesterday with today's extract and loading only the modified/updated and new records.
2. Primary Key: You can add all the logical keys to your stage here; this helps while creating the Mart and makes SQL joins simpler, especially when you have data from multiple source systems.
3. Audit Columns: You should add audit columns at each layer to monitor data loading and processing.
4. Schema Validation: You could either use a Glue Crawler to update the schema, or keep pre-defined JSON schema files, load the data against them, and raise exceptions on any schema mismatch.
I would recommend a pre-defined schema only if you don't completely trust the source system's schema or if the source is a file system; if you are pulling directly from a database, using the Glue Crawler is a wise option.
5. Partitioning: You can partition the data on any column/key, but the best practice is to use a date column or the column used most often in your WHERE clauses to filter the data.
By partitioning your data, you restrict the amount of data scanned by each query, improving performance and reducing cost.
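To make the CDC step above concrete, here is a minimal sketch in plain Python; in the real Glue jobs this comparison would be done with Spark DataFrames, and the key and record shapes here are hypothetical:

```python
def capture_changes(previous_load, current_load, key='customer_id'):
    """Return only the new and modified records, comparing today's
    extract against what was loaded previously."""
    previous_by_key = {row[key]: row for row in previous_load}
    return [row for row in current_load
            if row[key] not in previous_by_key
            or row != previous_by_key[row[key]]]

yesterday = [{'customer_id': 1, 'city': 'Austin'},
             {'customer_id': 2, 'city': 'Denver'}]
today = [{'customer_id': 1, 'city': 'Austin'},   # unchanged, skipped
         {'customer_id': 2, 'city': 'Boston'},   # modified, loaded
         {'customer_id': 3, 'city': 'Seattle'}]  # new, loaded
changes = capture_changes(yesterday, today)
```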
Once staging is complete, you can schedule the Mart jobs to run after the staging jobs. You can use Glue Workflow triggers to create the dependency between Mart and Stage jobs.
Refer to the link below to create a Glue Trigger and Workflow.
The Mart layer is the most important layer, as it is the customer-facing layer.
Your staging will have all the source data, but the Mart can be built per requirement and can join data from multiple source systems to build one common platform.
Since we are working with a serverless architecture and not using Redshift or any other database, there is no way to update data in S3 in place yet, so every time we want to show the latest data to the customer, we have to reload the Mart.
You can customise the Mart as per business requirements. Here are a few examples -
1. As-of-date Data Mart: Create tables in Athena using Spark SQL in Glue jobs, picking only the latest row for every record from the Staging layer.
This is one of the simplest Marts to build and maintain: you truncate and reload it every day, and it shows only as-of-date data.
2. SCD2 Data Mart: You could keep your dimensions in SCD Type 2; here too you would build CDC to truncate and load the Mart daily.
3. Historical Data Mart: If the customer wants to build ML models on historical data, you would save all the data in the Mart with historical changes over a certain period of time.
4. Star Schema: You can build facts and dimensions, with "logical" foreign-key references from dimensions to fact tables.
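The as-of-date dedup in example 1 can be sketched as follows; in a Glue job this would typically be a Spark SQL window function, and the column names here are hypothetical:

```python
def latest_rows(rows, key='order_id', ts='updated_at'):
    """Keep only the most recent row per logical key."""
    latest = {}
    for row in rows:
        if row[key] not in latest or row[ts] > latest[row[key]][ts]:
            latest[row[key]] = row
    return sorted(latest.values(), key=lambda r: r[key])

staging = [
    {'order_id': 101, 'status': 'NEW',     'updated_at': '2021-01-01'},
    {'order_id': 101, 'status': 'SHIPPED', 'updated_at': '2021-01-03'},
    {'order_id': 102, 'status': 'NEW',     'updated_at': '2021-01-02'},
]
mart = latest_rows(staging)  # one row per order, latest status only
```

In Spark SQL, the equivalent is typically `ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY updated_at DESC)` filtered to 1.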
Final Layer — Data View Layer
AWS QuickSight — Well, this is where the customer plays a vital role: if you are building the Data Mart for reporting, you may want to use AWS QuickSight to build dashboards and reports.
AWS SageMaker — If the Data Mart is built for analytics, you would write Athena queries in SageMaker to build the ML models.
AWS Athena — When you create the Mart Glue jobs, build your Glue Catalog, and update the crawler, tables are created in Athena for querying and visualisation.
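For instance, a SageMaker notebook can submit queries against those Athena tables with boto3. This helper just assembles the call parameters; the query, database, and output bucket are placeholders:

```python
def athena_query_params(sql, database, output_location):
    """Arguments for boto3's athena.start_query_execution call."""
    return {
        'QueryString': sql,
        'QueryExecutionContext': {'Database': database},
        'ResultConfiguration': {'OutputLocation': output_location},
    }

# In the notebook (requires boto3 and Athena/S3 permissions):
# import boto3
# athena = boto3.client('athena')
# athena.start_query_execution(**athena_query_params(
#     'SELECT * FROM orders_mart LIMIT 10', 'mart_db',
#     's3://athena-results-2021/'))
```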
This is the high-level architecture of a serverless ETL pipeline in AWS. You could use this architecture to build a production, enterprise-level ETL orchestration.
Please feel free to reach out in the comments section if you have any questions or want to build the pipeline. I would be happy to help!