Predicting the Life Journey of Consignments through Advanced Data Continuums (Part 2/3)

Introduction

In the first part of this series, we covered the overall architecture and the details of the data ingestion mechanism that helps us accurately predict every event in the life of a consignment.

In this second part of the series, we will give an overview of the storage layer for persisting the audit trail of events corresponding to a consignment.

Article2 Image1

High level architecture of the persistence layer

Following are the key process flows based on the above diagram:

  • Streaming the ingested data to Amazon S3 for long term storage
  • Data persistence design
  • Preparing the data for analytics and audit
  • Processing the data to generate analytics
  • Maintaining dependent ETL jobs' queries graph using Apache Airflow

Streaming the ingested data to Amazon S3 for long term storage

As mentioned previously, upon capturing the event changelogs, we persist the events in an Amazon S3 bucket. This bucket is the starting point for data crunching for all analytics jobs. For streaming the ingested events from Apache Kafka to Amazon S3, we again use the S3 sink connector plugin on the Kafka Connect cluster.
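As an illustration (not our exact setup), such a sink can be registered on a Kafka Connect cluster through its REST API. The sketch below assumes hypothetical topic, bucket, host and flush settings; only the connector class names and configuration keys come from the standard Confluent S3 sink connector.

```python
import json

import requests

# Hypothetical S3 sink connector registration against the Kafka Connect REST API.
connector = {
    "name": "changelog-s3-sink",
    "config": {
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "topics": "consignment-changelogs",
        "tasks.max": "2",
        "s3.bucket.name": "example-changelog-bucket",
        "s3.region": "ap-south-1",
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
        "flush.size": "10000",
        # Illustrative time-based partitioning into hourly folders,
        # matching the layout described in the next section.
        "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
        "partition.duration.ms": "3600000",
        "path.format": "'yyyy'=YYYY/'mm'=MM/'dd'=dd/'hh'=HH",
        "locale": "en",
        "timezone": "UTC",
        "timestamp.extractor": "Record",
    },
}

resp = requests.post(
    "http://kafka-connect.example.internal:8083/connectors",
    headers={"Content-Type": "application/json"},
    data=json.dumps(connector),
)
resp.raise_for_status()
print(resp.json())
```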

Data persistence design

Data partitioning

As we stream the data into S3, the changelog data is stored in partitions. In our case, we partition the data on the ingestion timestamp by creating one folder per hour of ingestion. Our directory layout in S3 looks like the following:

yyyy=2018:mm=04:dd=16:hh=21,
yyyy=2018:mm=04:dd=16:hh=22,
yyyy=2018:mm=04:dd=16:hh=23

The advantage of this structure is that data can be loaded incrementally: since the data is partitioned on the ingestion timestamp, each partition contains only the data ingested during that one hour. It also makes transactional data for very large tables easier to manage, which matters for both storage and query performance because queries read only the relevant partitions instead of the whole table.
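To illustrate the partition-pruning benefit, here is a minimal PySpark sketch (not our production code) that registers such an S3 dump as a partitioned external Hive table and reads a single hour. The bucket, table and column names are hypothetical, and the sketch assumes Hive-style key=value folder names.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("changelog-partitions").enableHiveSupport().getOrCreate()

# Register the S3 changelog dump as an external table partitioned on ingestion time.
spark.sql("""
    CREATE EXTERNAL TABLE IF NOT EXISTS changelog_raw (
        payload STRING
    )
    PARTITIONED BY (yyyy STRING, mm STRING, dd STRING, hh STRING)
    STORED AS TEXTFILE
    LOCATION 's3a://example-changelog-bucket/'
""")

# Discover the hourly folders already written by the S3 sink connector.
spark.sql("MSCK REPAIR TABLE changelog_raw")

# Because the filter is on partition columns, only one hourly folder is scanned.
one_hour = spark.sql("""
    SELECT payload
    FROM changelog_raw
    WHERE yyyy = '2018' AND mm = '04' AND dd = '16' AND hh = '21'
""")
print(one_hour.count())
```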

Snapshot and metadata tables

For data warehousing, we have deployed a star schema, which lets us modularize ETL tasks into small units. We use fact and dimension tables for data enrichment; we call these snapshot and metadata tables respectively, and in this post we will use the terms interchangeably.

Essentially, each row in a snapshot table (fact table) represents a transaction, while metadata tables (dimension tables) store the less frequently changing static master data. Provided a foreign key relationship exists between the two, the metadata can be used to enrich the snapshot data and derive business metrics, giving us the option of producing denormalized data when required. In our context, examples of fact tables are trips and shipment bookings, whereas examples of metadata tables include users, clients and organizations.

The schema can be represented by the diagram below.

Article2 Image2
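As a sketch of how a snapshot table is enriched through a foreign key join to a metadata table, consider the hedged PySpark example below; the table and column names (shipment_bookings, clients, client_id and the selected fields) are hypothetical.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("star-schema-join").enableHiveSupport().getOrCreate()

# Snapshot (fact) table: one row per booking transaction.
bookings = spark.table("shipment_bookings")

# Metadata (dimension) table: slowly changing client master data.
clients = spark.table("clients")

# Enrich the fact rows with client attributes through the foreign key,
# producing a denormalized view suitable for deriving business metrics.
enriched = bookings.join(clients, on="client_id", how="left")
enriched.select("booking_id", "client_name", "origin", "destination").show(10)
```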

Preparing the data for analytics and audit

To capture the business intent behind transactional changelogs, the raw changelog should ideally be in the format given below.

Article2 Image3

Such an envelope ensures that one knows exactly which fields of a table changed as part of a transaction, and what their new values are. This piece of information is very valuable. Let's consider an example from the logistics domain to understand why. Suppose a client books a shipment from Delhi to Chennai but then calls the support team to change the destination to Bangalore. The support team obliges and updates the destination location of the shipment. In the message envelope above, the data is received as given below.

Article2 Image4

This information can be used to generate an SMS or email alert confirming the change in destination and the corresponding change in charges. For audit purposes, only the "after" part of the message needs to be considered. This is shown below.

Article2 Image5

If the payloads are ordered chronologically, we get the audit trail of all the changes that have happened to the record. This is why we create audit tables in Hive: they maintain the entire history of the data. Furthermore, the "before" and "after" parts of the messages can be used to validate data consistency, since the "before" part of each message should match the "after" part of the previous one. They also tell us whether the operation was a "create", "update" or "delete": for "create", "before" is null; for "update", both "before" and "after" are present; and for "delete", "after" is null.
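A minimal Python sketch of these two checks, assuming a simplified message shape with "before" and "after" keys (the field names and values in the sample trail are purely illustrative):

```python
def classify_operation(message):
    """Infer the operation type from a changelog message with 'before'/'after' parts."""
    if message.get("before") is None:
        return "create"
    if message.get("after") is None:
        return "delete"
    return "update"


def validate_chain(messages):
    """Check that each message's 'before' matches the previous message's 'after'."""
    for prev, curr in zip(messages, messages[1:]):
        if prev.get("after") != curr.get("before"):
            return False
    return True


# Illustrative audit trail for a shipment whose destination was changed.
trail = [
    {"before": None, "after": {"destination": "Chennai", "charge": 100}},
    {"before": {"destination": "Chennai", "charge": 100},
     "after": {"destination": "Bangalore", "charge": 120}},
]
print([classify_operation(m) for m in trail])  # ['create', 'update']
print(validate_chain(trail))                   # True
```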

Once this data is persisted in the audit table, we de-duplicate it and periodically create point-in-time snapshots, with a frequency that depends on how much staleness is tolerable in the snapshots. The point-in-time snapshots are required for enriching event data with the metadata that was valid at that time. For example, if a shipment is damaged, we would have captured a changelog stating this fact; we can then enrich it with other data points such as the user who handled the shipment, the location where the damage happened and the time at which the damage alert was raised. The fact tables, which generally carry the identifiers of such events, are thus enriched with point-in-time snapshots of the dimensions, whose columns roll up to provide the required aggregates.
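One common way to build such a snapshot is to keep only the latest version of each key from the audit table. The PySpark sketch below shows this de-duplication with a window function; the table and column names (shipments_audit, shipment_id, ingested_at, shipments_snapshot) are hypothetical, not our actual schema.

```python
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("snapshot-build").enableHiveSupport().getOrCreate()

# Audit table holding every changelog row for shipments.
audit = spark.table("shipments_audit")

# Rank rows per shipment with the most recently ingested row first.
latest_first = Window.partitionBy("shipment_id").orderBy(F.col("ingested_at").desc())

# Keep only the latest version of each shipment as of the snapshot time.
snapshot = (
    audit
    .withColumn("rn", F.row_number().over(latest_first))
    .filter(F.col("rn") == 1)
    .drop("rn")
)

# Persist the point-in-time snapshot for downstream enrichment jobs.
snapshot.write.mode("overwrite").saveAsTable("shipments_snapshot")
```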

Processing the data to generate analytics

As an example of the kind of queries powered by this platform, let's consider a case where we want to generate daily metrics for a client summarizing weekly bookings and deliveries. If we simply had to count the number of shipments booked in each cluster in the past week, the Hive query would be as given below.

Article2 Image6

Sample MIS shared with clients
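Since the original query appears only as an image, here is a hedged sketch of what such a weekly aggregation might look like when run through Spark SQL; the table and column names (shipment_bookings, cluster, booking_date) are hypothetical.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("weekly-bookings").enableHiveSupport().getOrCreate()

# Count shipments booked per cluster over the past 7 days (names are illustrative).
weekly_bookings = spark.sql("""
    SELECT cluster, COUNT(*) AS bookings
    FROM shipment_bookings
    WHERE booking_date >= DATE_SUB(CURRENT_DATE, 7)
    GROUP BY cluster
    ORDER BY bookings DESC
""")
weekly_bookings.show()
```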

Maintaining dependent ETL jobs' queries graph using Apache Airflow

The analytical dashboards described above require a pipeline of sequential and parallel jobs, consisting of Hive queries, Python scripts (for mathematical modelling) and Spark jobs (for ETL). Any ETL job, at its core, is built on top of three building blocks: Extract, Transform and Load. ETL jobs that derive business metrics are conceptually complex, consisting of many combinations of E, T and L tasks, both in sequence and in parallel. As a result, it is very desirable to visualize such data flows as a graph.

Visually, a node in the graph represents a task while an arrow represents the dependency of one task on another. Given that the data for a task only needs to be computed once and then carries forward, the graph is directed and acyclic. An example Airflow DAG can be visualized as below.

Article2 Image8
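A minimal sketch of such a DAG in Airflow, with one extract step feeding two parallel transforms that must both finish before the load. The DAG name, schedule, owner and bash commands are hypothetical placeholders, and the imports follow the Airflow 1.x-era module layout.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {"owner": "data-platform", "start_date": datetime(2018, 4, 16)}

# Hypothetical hourly pipeline over the ingested changelogs.
dag = DAG("consignment_metrics", default_args=default_args, schedule_interval="@hourly")

extract = BashOperator(task_id="extract_changelogs", bash_command="echo extract", dag=dag)
transform_bookings = BashOperator(task_id="transform_bookings", bash_command="echo t1", dag=dag)
transform_deliveries = BashOperator(task_id="transform_deliveries", bash_command="echo t2", dag=dag)
load = BashOperator(task_id="load_metrics", bash_command="echo load", dag=dag)

# The arrows in the rendered graph correspond to these dependencies.
extract >> [transform_bookings, transform_deliveries]
[transform_bookings, transform_deliveries] >> load
```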

While Airflow DAGs describe how to run a data pipeline, Airflow operators describe what to do in a data pipeline. Typically, there are three broad categories of operators, illustrated in the sketch after this list:

  • Sensors: They unblock the data flow after a certain time has passed or when data from an upstream data source becomes available
  • Operators: They trigger a certain action in a graph node (for example, run a bash command, execute a Hive query, execute a spark job etc.)
  • Transfers: They move the data from one location to another
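A hedged sketch of the three categories, again with Airflow 1.x-style imports; the S3 key, Hive query, table names and connection IDs are hypothetical, not our actual configuration.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.hive_operator import HiveOperator
from airflow.operators.generic_transfer import GenericTransfer
from airflow.sensors.s3_key_sensor import S3KeySensor

dag = DAG("operator_categories", start_date=datetime(2018, 4, 16), schedule_interval="@daily")

# Sensor: block downstream tasks until the changelog dump lands in S3.
wait_for_dump = S3KeySensor(
    task_id="wait_for_changelog_dump",
    bucket_key="s3://example-changelog-bucket/yyyy=2018/mm=04/dd=16/hh=21/_SUCCESS",
    dag=dag,
)

# Operator: trigger an action, here a Hive query over the new data.
aggregate = HiveOperator(
    task_id="aggregate_bookings",
    hql="SELECT cluster, COUNT(*) FROM shipment_bookings GROUP BY cluster",
    dag=dag,
)

# Transfer: move the aggregated rows from one store to another.
publish = GenericTransfer(
    task_id="publish_to_reporting_db",
    sql="SELECT * FROM weekly_bookings",
    destination_table="reporting.weekly_bookings",
    source_conn_id="hive_default",
    destination_conn_id="reporting_mysql",
    dag=dag,
)

wait_for_dump >> aggregate >> publish
```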

You can read more about Apache Airflow here.


Conclusion

In this post we covered the data warehousing techniques we use to persist valuable data, orchestrate ETL workflows and power analytical dashboards. This lays the foundation for predicting events in the life of a consignment. In the next post of the series, we will take a step further and cover real-time stream processing for decision making in the consignment lifecycle.