12/26/24

Cosmos DB for MongoDB: Tapping into Change Streams for Real-Time Integration

Overview

Azure Functions triggers for Cosmos DB enable developers to write event-driven applications that respond to changes in a collection/container. While this integration works seamlessly with the core SQL API, it doesn't directly support the MongoDB API. To achieve similar functionality with the MongoDB API, you can leverage change streams, a powerful feature that provides real-time monitoring of data modifications. This article will guide you through setting up and utilizing change streams in Cosmos DB's MongoDB API within an Azure Function.

Cosmos DB is a data base service which supports various database systems: NoSQL, MongoDB, PostgreSQL, Apache Cassandra, Apache Gremlin, and Table.

Cosmos DB for MongoDB: Tapping into Change Streams for Real-Time Integration

Understanding Change Streams

Change streams offer a continuous, ordered stream of changes occurring in a MongoDB collection (or a Cosmos DB container using the MongoDB API). They track inserts, updates, replaces, and deletes, providing your applications with real-time visibility into data modifications. This is invaluable for scenarios like:

  • Real-time Analytics and Reporting: Update dashboards and analytics systems as data changes.
  • Data Synchronization: Keep different data stores in sync by reacting to changes in real time.
  • Event-Driven Architectures: Trigger downstream processes and workflows based on data modifications.
  • Auditing and Logging: Capture a detailed history of data changes for audit trails and compliance.

Implementing Change Streams in Azure Functions

Here's how to set up a change stream within an Azure Function:

  • Prerequisites:

    • An active Azure subscription.
    • A Cosmos DB account configured with the MongoDB API.
    • An Azure Function App.
    • Install the MongoDB Driver: Use npm to install the necessary driver:
npm install mongodb

Implement the Azure Function

For this integration, we can use a Timer Trigger function. Since the MongoDB API doesn't offer a direct change feed trigger like the SQL API, the Timer Trigger provides a workaround. The function will execute at specified intervals (e.g., every 5 minutes or less) and establish a connection to MongoDB. Upon connection, it can then retrieve change stream events. This approach maintains the serverless nature of Azure Functions, as the function isn't continuously running but activates periodically to process changes.

An alternative to an Azure Function is to build a Node.js or .NET Core application and run it as a service on a VM. This provides a constantly running process for change stream monitoring, but requires managing the VM and application lifecycle.

  • Configure the Timer Trigger:

In your Azure Function's function.json, configure the timerTrigger to define the execution schedule. The schedule expression follows the NCrontab format. For example, to trigger every 5 minutes, use */5 * * * *.

{
  "bindings": [
    {
      "name": "myTimer",
      "type": "timerTrigger",
      "direction": "in",
      "schedule": "*/5 * * * *"
    }
  ]
}

The name property specifies the name of the timer object that will be passed to your function. The schedule expression determines the frequency of execution. Adjust the schedule value as needed to control the polling interval for change stream events. More frequent polling captures changes more rapidly, but consumes more resources. Less frequent polling conserves resources, but may introduce latency in processing changes.

  • Configure your app settings

    Use the local.settings.json for local development and your function settings on Azure to store the following configurations values:

{
  "IsEncrypted": false,
  "Values": {
    "FUNCTIONS_WORKER_RUNTIME": "node",
    "AzureWebJobsStorage": "<your-storage-connection-string>",
    "CosmosDBConnectionString": "mongodb://<your-cosmosdb-connection-string-from azure>",
    "CosmosDBDatabaseName": "<db-name>",
    "CosmosDBCollectionName": "<collection-name>"
  }
}

Ensure you define the AzureWebJobsStorage setting with a valid Azure Storage connection string. This is essential for the Azure Functions runtime. Furthermore, we'll use this storage account to persist the last processed change stream resume token. Each change stream record includes a token, enabling the resumption of processing from a specific point. By saving the token after each function execution, we can restart the function and continue processing new changes without duplicates. Upon restarting, the function will retrieve the stored token and resume the change stream from that point.

  • Implement the Function Code:

    Use the MongoDB Node.js driver to connect to Cosmos DB and process the change stream:

import { InvocationContext, CosmosDBTrigger } from "@azure/functions";
import { MongoClient, ChangeStreamDocument, Document } from "mongodb";


const getResumeToken = async function() : Promise<Binary | null> {

    // document = {token, updatedDt, _id}
    const lastProcessedDoc = await getLastProcessedToken() || null; // retrieve the last processed token
    let lastProcessedToken = lastProcessedDoc?.token ? Binary.createFromBase64(lastProcessedDoc.token) : null;        

    return lastProcessedToken;
}

const factoryTrigger = async function (context: Context, documents: ChangeStreamDocument<Document>[]): Promise<void> {

    // read the database env settings from local.settings.json or function configuration (Azure)
    const connectionString = process.env["CosmosDBConnection"]; 
    const databaseName = process.env["CosmosDBDatabaseName"]; 
    const collectionName = process.env["CosmosDBCollectionName"]; 
    const client = new MongoClient(connectionString);

    let currentToken = null;    

    try {
        await client.connect();

        const database = client.db(databaseName);
        const collection = database.collection(collectionName);                          
        const lastProcessedToken = await getResumeToken();

        // define the pipeline with the events and properties on the document
        const pipeline = [ 
        { $match: { "operationType": { $in: ["insert", "update", "replace"] } } }, 
        { $project: { "_id": 1, "fullDocument": 1, "ns": 1, "documentKey": 1} } ];

        // use resumeAfter when the token is found
        const changeStream = lastProcessedToken ? 
            collection.watch(pipeline, { resumeAfter: { _data: lastProcessedToken}, fullDocument: 'updateLookup'}) : collection.watch(pipeline,  { fullDocument: 'updateLookup' });                             

        // Set up event handlers for the change stream the doc is the full document with the current changes
        changeStream.on('change', async (doc: ChangeStreamDocWithToken<Document>) => {
            console.log('Data change detected', doc);

            // get the resume token from the document
            const binToken = doc._id._data;
            const token = binToken.toString('base64');

            // Save the last processed token the next run
            currentToken = { token, updated: doc.fullDocument.updatedDt, id: doc.fullDocument._id };            

            // Add your auditing logic here
            console.log(`Send to storage id: ${doc.fullDocument._id}`);

            await sendToLog(logCollection, doc.fullDocument);

            const dateString = new Date().toISOString().slice(0, 10); // "YYYY-MM-DD"
            const blobName = `log-${dateString}.json`;  // Example filename: log-2024-07-25.json
            const changeData = JSON.stringify(doc.fullDocument) + '\n'; // Newline-delimited JSON
            console.log(changeData);

            // Append the data to blob or another mongodb collection depending on your requirements            

        });

        changeStream.on('error', async (error: any) => { 
          // or more specific error type if known
            context.log("Change stream error:", error);                          
        });

        changeStream.on('close', () => {  
            // Handle the 'close' event, usually optional
            context.log('Change Stream Closed');           
        });

        changeStream.on('connect', () => {  
            console.log('Change Stream Connected');           
        });

        // max timer to allow function to stop
        const timeout = setTimeout(async () => { 
                console.log(`Closing the stream after timeout ${maxDuration}`);
                clearTimeout(timeout);            
                await changeStream.close();   
                await client.close();  
                if (currentToken)       
                    updateLastProcessedToken(currentToken);
            }, maxDuration);

        context.log('Watching for changes...');
    } catch (err) {
        context.log('Error setting up change stream:', err);

        if (client)
            await client.close();
    } 
};

export default cosmosDBTrigger;

The factoryTrigger function uses a timer to periodically poll a Cosmos DB change stream (using the MongoDB API) for data modifications. It retrieves the last processed change stream token from blob storage to resume processing from where it left off. The function then watches the specified Cosmos DB collection for inserts, updates, and replaces, processing each change by sending the full document to a log collection and appending it to a newline-delimited JSON file in blob storage.

A timeout is implemented to limit the function's execution time and maintain its serverless nature. The timer ensures the function doesn't run continuously, conserving resources, while still periodically checking for and processing changes. The timeout further enforces this resource constraint by closing the change stream and exiting the function after a specified duration. This prevents runaway execution and associated costs while allowing the function to pick up where it left off in the next timer-triggered execution.

  • Implement the Blob Storage API

import { fileRead, fileWrite } from "./blobStorageUtils";

export interface DocumentToken {
    token: string;
    updated: string;
    id: string;
}

const tokenKey = 'resume-token';

const MISSING_CONFIG = 'Missing configuration partition/row key for last processed token.'
const NOT_FOUND = 'No existing entity found'
const FAILED_UPDATE = 'Failed to update record';

/**
 * Read the last process token from a blob storage
 * @returns the value 
 */
export async function getLastProcessedToken(): Promise<DocumentToken | null> {

    if (!tokenKey) {
        throw new Error(MISSING_CONFIG);
        return;
    }

    try {
        const blobName = `${tokenKey}.json`;        
        const value = await fileRead(blobName);
        return JSON.parse(value) as DocumentToken;
    } catch (error) {
        console.log(NOT_FOUND);
        return null;
    }
}

export async function updateLastProcessedToken(value: DocumentToken): Promise<void> {

    if (!tokenKey || !rowKey) {
        throw new Error(MISSING_CONFIG);
        return;
    }

    try {        
        const blobName = `${tokenKey}.json`;        
        await fileWrite(blobName, JSON.stringify(value));

    } catch (error) {
        console.log(FAILED_UPDATE, error);        
    }
}

This is a simple implementation wrapper which simplifies interaction with Azure Blob Storage, abstracting away the complexities of the @azure/storage-blob package (which requires import { BlobServiceClient, ContainerClient } from "@azure/storage-blob";). It offers convenient functions for reading and writing JSON data to blobs, streamlining the process of managing the change stream resume token. Consult the Azure documentation on the @azure/storage-blob package for more detailed information and advanced usage scenarios.

The getLastProcessedToken function reads the resume token from a file named resume-token.json. The updateLastProcessedToken function then overwrites this file with the latest resume token. This mechanism allows the change stream to be restarted from a specific point, ensuring that changes are processed sequentially without gaps or duplicates.

Conclusion:

Change streams provide a powerful mechanism for reacting to data modifications in Cosmos DB's MongoDB API. While the MongoDB API doesn't directly support a change feed trigger within Azure Functions, the timer-based approach outlined here offers a near real-time solution. By periodically polling the change stream, applications can effectively capture and process data changes with minimal latency. This approach balances the need for real-time responsiveness with the efficiency and cost-effectiveness of serverless functions. Leveraging change streams in this way opens up opportunities to build dynamic, data-driven applications that react swiftly to evolving information, combining the scalability and flexibility of Cosmos DB with the familiar MongoDB development experience.

Thanks for reading.

Send question or comment at Twitter @ozkary 👍 Originally published by ozkary.com

11/26/24

Introduction to Data Lakes and Data Warehouses - Data Engineering Process Fundamentals -

Overview

In this technical presentation, we will delve into the fundamental concepts of Data Engineering, focusing on two pivotal components of modern data architecture - Data Lakes and Data Warehouses. We will explore their roles, differences, and how they collectively empower organizations to harness the true potential of their data.

Introduction to Data Lake and Data Warehouse - Data Engineering Process Fundamentals

  • Follow this GitHub repo during the presentation: (Star the project to follow and get updates)

👉 GitHub Repo

  • Data engineering Series:

👉 Blog Series

YouTube Video

Video Agenda

Agenda:

  1. Introduction to Data Engineering:

  2. Brief overview of the data engineering landscape and its critical role in modern data-driven organizations.

  3. Operational Data

  4. Understanding Data Lakes:

  5. Explanation of what a data lake is and its purpose in storing vast amounts of raw and unstructured data.

  6. Exploring Data Warehouses:

  7. Definition of data warehouses and their role in storing structured, processed, and business-ready data.

  8. Comparing Data Lakes and Data Warehouses:

  9. Comparative analysis of data lakes and data warehouses, highlighting their strengths and weaknesses.

  10. Discussing when to use each based on specific use cases and business needs.

  11. Integration and Data Pipelines:

  12. Insight into the seamless integration of data lakes and data warehouses within a data engineering pipeline.

  13. Code walkthrough showcasing data movement and transformation between these two crucial components.

  14. Real-world Use Cases:

  15. Presentation of real-world use cases where effective use of data lakes and data warehouses led to actionable insights and business success.

  16. Hands-on demonstration using Python, Jupyter Notebook and SQL to solidify the concepts discussed, providing attendees with practical insights and skills.

  17. Q&A and Hands-on Session:

  18. An interactive Q&A session to address any queries.

Conclusion:

This session aims to equip attendees with a strong foundation in data engineering, focusing on the pivotal role of data lakes and data warehouses. By the end of this presentation, participants will grasp how to effectively utilize these tools, enabling them to design efficient data solutions and drive informed business decisions.

Presentation

Data Engineering Overview

A Data Engineering Process involves executing steps to understand the problem, scope, design, and architecture for creating a solution. This enables ongoing big data analysis using analytical and visualization tools.

Topics

  • Data Lake and Data Warehouse
  • Discovery and Data Analysis
  • Design and Infrastructure Planning
  • Data Lake - Pipeline and Orchestration
  • Data Warehouse - Design and Implementation
  • Analysis and Visualization

Follow this project: Give a star

👉 Data Engineering Process Fundamentals

Operational Data

Operational data is often generated by applications, and it is stored in transactional relational databases like SQL Server, Oracle and NoSQL (JSON) databases like MongoDB, Firebase. This is the data that is created after an application saves a user transaction like contact information, a purchase or other activities that are available from the application.

Features:

  • Application support and transactions
  • Relational data structure and SQL or document structure NoSQL
  • Small queries for case analysis

Not Best For:

  • Reporting system
  • Large queries
  • Centralized Big Data system

Data Engineering Process Fundamentals - Operational Data

Data Lake - Analytical Data Staging

A Data Lake is an optimized storage system for Big Data scenarios. The primary function is to store the data in its raw format without any transformation. Analytical data is the transaction data that has been extracted from a source system via a data pipeline as part of the staging data process.

Features:

  • Store the data in its raw format without any transformation
  • This can include structure data like CSV files, unstructured data like JSON and XML documents, or column-base data like parquet files
  • Low Cost for massive storage power
  • Not Designed for querying or data analysis
  • It is used as external tables by most systems

Data Engineering Process Fundamentals - Analytical Data staging

Data Warehouse - Analytical Data

A Data Warehouse is a centralized storage system that stores integrated data from multiple sources. The system is designed to host and serve Big Data scenarios with lower operational cost than transaction databases, but higher costs than a Data Lake. This system host the Analytical Data that has been processed and is ready for analytical purposes.

Data Warehouse Features:

  • Stores historical data in relational tables with an optimized schema, which enables the data analysis process
  • Provides SQL support to query the data
  • It can integrate external resources like CSV and parquet files that are stored on Data Lakes as external tables
  • The system is designed to host and serve Big Data scenarios. It is not meant to be used as a transactional system
  • Storage is more expensive
  • Offloads archived data to Data Lakes

Data Engineering Process Fundamentals - Analytical Data Store

Discovery - Data Analysis

During the discovery phase of a Data Engineering Process, we look to identify and clearly document a problem statement, which helps us have an understanding of what we are trying to solve. We also look at our analytical approach to make observations about at the data, its structure and source. This leads us into defining the requirements for the project, so we can define the scope, design and architecture of the solution.

  • Download sample data files
  • Run experiments to make observations
  • Write Python scripts using VS Code or Jupyter Notebooks
  • Transform the data with Pandas
  • Make charts with Plotly
  • Document the requirements

Data Engineering Process Fundamentals - Data Analysis and discovery

Design and Planning

The design and planning phase of a data engineering project is crucial for laying out the foundation of a successful system. It involves defining the system architecture, designing data pipelines, implementing source control practices, ensuring continuous integration and deployment (CI/CD), and leveraging tools like Docker and Terraform for infrastructure automation.

  • Use GitHub for code repo and for CI/CD actions
  • Use Terraform is an Infrastructure as Code (IaC) tool that enables us to manage cloud resources across multiple cloud providers
  • Use Docker containers to run the code and manage its dependencies

Data Engineering Process Fundamentals - Design and Planning

Data Lake - Pipeline and Orchestration

A data pipeline is basically a workflow of tasks that can be executed in Docker containers. The execution, scheduling, managing and monitoring of the pipeline is referred to as orchestration. In order to support the operations of the pipeline and its orchestration, we need to provision a VM and data lake, and monitor cloud resources.

  • This can be code-centric, leveraging languages like Python
  • Or a low-code approach, utilizing tools such as Azure Data Factory, which provides a turn-key solution
  • Monitor services enable us to track telemetry data
  • Docker Hub, GitHub can be used for the CI/CD process

Data Engineering Process Fundamentals - Data Lake - Data Pipeline and Orchestration

Data Warehouse - Design and Implementation

In the design phase, we lay the groundwork by defining the database system, schema model, and technology stack required to support the data warehouse’s implementation and operations. In the implementation phase, we focus on converting conceptual data models into a functional system. By creating concrete structures like dimension and fact tables and performing data transformation tasks, including data cleansing, integration, and scheduled batch loading, we ensure that raw data is processed and unified for analysis. Create a repeatable and extendable process.

Data Engineering Process Fundamentals - Data Warehouse Design and Implementation

Data Warehouse - Data Analysis

Data analysis is the practice of exploring data and understanding its meaning. It involves activities that can help us achieve a specific goal, such as identifying data dimensions and measures, as well as data analysis to identify outliers, trends, and distributions.

  • We can accomplish these activities by writing code using Python and Pandas, SQL, Visual Studio Code or Jupyter Notebooks.
  • What's more, we can use libraries, such as Plotly, to generate some visuals to further analyze data and create prototypes.

Data Engineering Process Fundamentals - Data Analysis

Data Analysis and Visualization

Data visualization is a powerful tool that takes the insights derived from data analysis and presents them in a visual format. While tables with numbers on a report provide raw information, visualizations allow us to grasp complex relationships and trends at a glance.

  • Dashboards, in particular, bring together various visual components like charts, graphs, and scorecards into a unified interface that can help us tell a story
  • Use tools like PowerBI, Looker, Tableau to model the data and create enterprise level visualizations

Data Engineering Process Fundamentals - Data Visualization

Conclusion

Both data lakes and data warehouses are essential components of a data engineering project. The primary function of a data lake is to store large amounts of operational data in its raw format, serving as a staging area for analytical processes. In contrast, a data warehouse acts as a centralized repository for information, enabling engineers to transform, process, and store extensive data. This allows the analytical team to utilize coding languages like Python and tools such as Jupyter Notebooks, as well as low-code platforms like Looker Studio and Power BI, to create enterprise-quality dashboards for the organization.

We've covered a lot today, but this is just the beginning!

If you're interested in learning more about building cloud data pipelines, I encourage you to check out my book, 'Data Engineering Process Fundamentals,' part of the Data Engineering Process Fundamentals series. It provides in-depth explanations, code samples, and practical exercises to help in your learning.

Data Engineering Process Fundamentals Book Amazon

Thanks for reading.

Send question or comment at Twitter @ozkary 👍 Originally published by ozkary.com

10/31/24

A Hands-On Exploration into the discovery phase - Data Engineering Process Fundamentals

Overview

The discovery process involves identifying the problem, analyzing data sources, defining project requirements, establishing the project scope, and designing an effective architecture to address the identified challenges.

In this session, we will delve into the essential building blocks of data engineering, placing a spotlight on the discovery process. From framing the problem statement to navigating the intricacies of exploratory data analysis (EDA) using Python, VSCode, Jupyter Notebooks, and GitHub, you'll gain a solid understanding of the fundamental aspects that drive effective data engineering projects.

A Hands-On Exploration into the discovery phase - Data Engineering Process Fundamentals

  • Follow this GitHub repo during the presentation: (Give it a star)

👉 https://github.com/ozkary/data-engineering-mta-turnstile

Jupyter Notebook

👉 https://github.com/ozkary/data-engineering-mta-turnstile/blob/main/Step1-Discovery/mta_discovery.ipynb

  • Data engineering Series:

👉 https://www.ozkary.com/2023/03/data-engineering-process-fundamentals.html

Jupyter Notebook Preview

# Standard library imports
from time import time
from pathlib import Path
import requests
from io import StringIO
# Load pandas support for data analysis tasks, dataframe (two-dimensional data structure with rows and columns) management
import pandas as pd    
import numpy as np 

# URL of the file you want to download. Note: It should be a Saturday date
url = 'http://web.mta.info/developers/data/nyct/turnstile/turnstile_241026.txt'

# Download the file in memory
response = requests.get(url)
response.raise_for_status()  # Check if the request was successful

# Create a DataFrame from the downloaded content
data = StringIO(response.text)
df = pd.read_csv(data)

# Display the DataFrame first 10 rows
df.head(10)

# use info to get the column names, data type and null values
df.info()

# remove spaces and type case the columns
df.columns = [column.strip() for column in df.columns]
print(df.columns)
df["ENTRIES"] = df["ENTRIES"].astype(int)
df["EXITS"] = df["EXITS"].astype(int)

# Define the set of special characters you want to check for
special_characters_set = set('@#$%/')


def has_special_characters(col, special_characters):
    # Check if any character in the column name is not alphanumeric or in the specified set
    return any(char in special_characters for char in col)

def rename_columns(df, special_characters_set):
    # Create a mapping of old column names to new column names
    mapping = {col: ''.join(char for char in col if char.isalnum() or char not in special_characters_set) for col in df.columns}

    print(mapping)
    # Rename columns using the mapping
    df_renamed = df.rename(columns=mapping)

    return df_renamed


# Identify columns with special characters using list comprehension syntax
columns_with_special_characters = [col for col in df.columns if has_special_characters(col, special_characters_set)]

# Print the result
print("Columns with special characters:", columns_with_special_characters)

# Identify columns with special characters and rename them
df = rename_columns(df, special_characters_set)

# Display the data frame again. there should be no column name with special characters
print(df.info())

YouTube Video

Video Agenda

  1. Introduction:

    • Unveiling the importance of the discovery process in data engineering.

    • Setting the stage with a real-world problem statement that will guide our exploration.

  2. Setting the Stage:

    • Downloading and comprehending sample data to kickstart our discovery journey.

    • Configuring the development environment with VSCode and Jupyter Notebooks.

  3. Exploratory Data Analysis (EDA):

    • Delving deep into EDA techniques with a focus on the discovery phase.

    • Demonstrating practical approaches using Python to uncover insights within the data.

  4. Code-Centric Approach:

    • Advocating the significance of a code-centric approach during the discovery process.

    • Showcasing how a code-centric mindset enhances collaboration, repeatability, and efficiency.

  5. Version Control with GitHub:

    • Integrating GitHub seamlessly into our workflow for version control and collaboration.

    • Managing changes effectively to ensure a streamlined data engineering discovery process.

  6. Real-World Application:

    • Applying insights gained from EDA to address the initial problem statement.

    • Discussing practical solutions and strategies derived from the discovery process.

Key Takeaways:

  • Mastery of the foundational aspects of data engineering.

  • Hands-on experience with EDA techniques, emphasizing the discovery phase.

  • Appreciation for the value of a code-centric approach in the data engineering discovery process.

Some of the technologies that we will be covering:

  • Python
  • Data Analysis and Visualization
  • Jupyter Notebook
  • Visual Studio Code

Presentation

Data Engineering Overview

A Data Engineering Process involves executing steps to understand the problem, scope, design, and architecture for creating a solution. This enables ongoing big data analysis using analytical and visualization tools.

Topics

  • Importance of the Discovery Process
  • Setting the Stage - Technologies
  • Exploratory Data Analysis (EDA)
  • Code-Centric Approach
  • Version Control
  • Real-World Use Case

Follow this project: Give a star

👉 Data Engineering Process Fundamentals

Importance of the Discovery Process

The discovery process involves identifying the problem, analyzing data sources, defining project requirements, establishing the project scope, and designing an effective architecture to address the identified challenges.

  • Clearly document the problem statement to understand the challenges the project aims to address.
  • Make observations about the data, its structure, and sources during the discovery process.
  • Define project requirements based on the observations, enabling the team to understand the scope and goals.
  • Clearly outline the scope of the project, ensuring a focused and well-defined set of objectives.
  • Use insights from the discovery phase to inform the design of the solution, including data architecture.
  • Develop a robust project architecture that aligns with the defined requirements and scope.

Data Engineering Process Fundamentals - Discovery Process

Setting the Stage - Technologies

To set the stage, we need to identify and select the tools that can facilitate the analysis and documentation of the data. Here are key technologies that play a crucial role in this stage:

  • Python: A versatile programming language with rich libraries for data manipulation, analysis, and scripting.

Use Cases: Data download, cleaning, exploration, and scripting for automation.

  • Jupyter Notebooks: An interactive tool for creating and sharing documents containing live code, visualizations, and narrative text.

Use Cases: Exploratory data analysis, documentation, and code collaboration.

  • Visual Studio Code: A lightweight, extensible code editor with powerful features for source code editing and debugging.

Use Cases: Writing and debugging code, integrating with version control systems like GitHub.

  • SQL (Structured Query Language): A domain-specific language for managing and manipulating relational databases.

Use Cases: Querying databases, data extraction, and transformation.

Data Engineering Process Fundamentals - Discovery Tools

Exploratory Data Analysis (EDA)

EDA is our go-to method for downloading, analyzing, understanding and documenting the intricacies of the datasets. It's like peeling back the layers of information to reveal the stories hidden within the data. Here's what EDA is all about:

  • EDA is the process of analyzing data to identify patterns, relationships, and anomalies, guiding the project's direction.

  • Python and Jupyter Notebook collaboratively empower us to download, describe, and transform data through live queries.

  • Insights gained from EDA set the foundation for informed decision-making in subsequent data engineering steps.

  • Code written on Jupyter Notebook can be exported and used as the starting point for components for the data pipeline and transformation services.

Data Engineering Process Fundamentals - Discovery Pie Chart

Code-Centric Approach

A code-centric approach, using programming languages and tools in EDA, helps us understand the coding methodology for building data structures, defining schemas, and establishing relationships. This robust understanding seamlessly guides project implementation.

  • Code delves deep into data intricacies, revealing integration and transformation challenges often unclear with visual tools.

  • Using code taps into Pandas and Numpy libraries, empowering robust manipulation of data frames, establishment of loading schemas, and addressing transformation needs.

  • Code-centricity enables sophisticated analyses, covering aggregation, distribution, and in-depth examinations of the data.

  • While visual tools have their merits, a code-centric approach excels in hands-on, detailed data exploration, uncovering subtle nuances and potential challenges.

Data Engineering Process Fundamentals - Discovery Pie Chart

Version Control

Using a tool like GitHub is essential for effective version control and collaboration in our discovery process. GitHub enables us to track our exploratory code and Jupyter Notebooks, fostering collaboration, documentation, and comprehensive project management. Here's how GitHub enhances our process:

  • Centralized Tracking: GitHub centralizes tracking and managing our exploratory code and Jupyter Notebooks, ensuring a transparent and organized record of our data exploration.

  • Sharing: Easily share code and Notebooks with team members on GitHub, fostering seamless collaboration and knowledge sharing.

  • Documentation: GitHub supports Markdown, enabling comprehensive documentation of processes, findings, and insights within the same repository.

  • Project Management: GitHub acts as a project management hub, facilitating CI/CD pipeline integration for smooth and automated delivery of data engineering projects.

Data Engineering Process Fundamentals - Discovery Problem Statement

Summary: The Power of Discovery

By mastering the discovery phase, you lay a strong foundation for successful data engineering projects. A thorough understanding of your data is essential for extracting meaningful insights.

  • Understanding Your Data: The discovery phase is crucial for understanding your data's characteristics, quality, and potential.
  • Exploratory Data Analysis (EDA): Use techniques to uncover patterns, trends, and anomalies.
  • Data Profiling: Assess data quality, identify missing values, and understand data distributions.
  • Data Cleaning: Address data inconsistencies and errors to ensure data accuracy.
  • Domain Knowledge: Leverage domain expertise to guide data exploration and interpretation.
  • Setting the Stage: Choose the right language and tools for efficient data exploration and analysis.

The data engineering discovery process involves defining the problem statement, gathering requirements, and determining the scope of work. It also includes a data analysis exercise utilizing Python and Jupyter Notebooks or other tools to extract valuable insights from the data. These steps collectively lay the foundation for successful data engineering endeavors.

Thanks for reading.

Send question or comment at Twitter @ozkary 👍 Originally published by ozkary.com

9/25/24

Live Dashboards: Boosting App Performance with Real-Time Integration

Overview

Dive into the future of web applications. We're moving beyond traditional API polling and embracing real-time integration. Imagine your client app maintaining a persistent connection with the server, enabling bidirectional communication and live data streaming. We'll also tackle scalability challenges and integrate Redis as our in-memory data solution.

Live Dashboards: Boosting App Performance with Real-Time Integration

  • Follow this GitHub repo during the presentation: (Give it a star)

👉 https://github.com/ozkary/Realtime-Apps-with-Nodejs-Angular-Socketio-Redis

YouTube Video

Video Agenda

This presentation explores strategies for building highly responsive and interactive live dashboards. We'll delve into the challenges of traditional API polling and demonstrate how to leverage Node.js, Angular, Socket.IO, and Redis to achieve real-time updates and a seamless user experience.

  • Introduction:

    • Understanding telemetry data and the importance to monitor the data
    • Challenges of traditional API polling for real-time data.
    • Design patterns to enhance an app with minimum changes
  • Traditional Solution Architecture

    • SQL Database Integration.
    • Restful API
    • Angular and Node.js Integration
  • Real-Time Integration with Web Sockets

    • Database Optimization Challenges
    • Introduction to Web Sockets for bidirectional communication.
    • Implementing Web Sockets in a Web application.
    • Handling data synchronization and consistency.
  • Distributed Caching with Redis:

    • Benefits of in-memory caching for improving performance and scalability.
    • Integrating Redis into your Node.js application.
    • Caching strategies for distributed systems.
  • Case Study: Building a Live Telemetry Dashboard

    • Step-by-step demonstration of the implementation.
    • Performance comparison with and without optimization techniques.
    • User experience benefits of real-time updates.
  • Benefits and Considerations

    • Improved dashboard performance and responsiveness.
    • Reduced server load and costs.
    • Scalability and scalability considerations.
    • Best practices for implementing real-time updates.

Why Attend:

Gain a deep understanding of real-time data integration for your Web application.

Presentation

Telemetry Data Story

Devices send telemetry data via API integration with SQL Server. There are inherit performance problems with a disk-based database. We progressively enhance the system with minimum changes by adding real-time integration and an in-memory cache system.

Live Dashboards: Real-time dashboard

Database Integration

Solution Architecture

  • Disk-based Storage
  • Web apps and APIs query database to get the data
  • Applications can do both high reads and writes
  • Web components, charts polling back-end database for reads

Let’s Start our Journey

  • Review our API integration and talk about concerns
  • Do not refactor everything
  • Enhance to real-time integration with sockets
  • Add Redis as the distributed cache
  • Add the service broker strategy to sync the data sources
  • Centralized the real-time integration with Redis

Live Dashboards: Direct API Integration

RESTful API Integration

Applied Technologies

  • REST API Written with Node.js
  • TypeORM Library Repository
  • Angular Client Application with Plotly.js Charts
  • Disk-based storage – SQL Server
  • API Telemetry (GET, POST) route

Use Case

  • IoT devices report telemetry information via API
  • Dashboard reads that most recent data only via API calls which queries the storage service
  • Polling the database to get new records

Project Repo (Star the project and follow) https://github.com/ozkary/Realtime-Apps-with-Nodejs-Angular-Socketio-Redis

Live Dashboards: Repository Integration

Database Optimization and Challenges

Slow Queries on disk-based storage

  • Effort on index optimization
  • Database Partition strategies
  • Double-digit millisecond average speed (physics on data disks)

Simplify data access strategies

  • Relational data is not optimal for high data read systems (joins?)
  • Structure needs to be de-normalized
  • Often views are created to shape the data, date range limit

Database Contention

  • Read isolation levels (nolock)
  • Reads competing with inserts

Cost to Scale

  • Vertical and horizontal scaling up on resources
  • Database read-replicas to separate reads and writes
  • Replication workloads/tasks
  • Data lakes and data warehouse

Live Dashboards: SQL Query

Real-Time Integration

What is Socket.io, Web Sockets?

  • Enables real-time bidirectional communication.
  • Push data to clients as events take place on the server
  • Data streaming
  • Connection starts as HTTP is them promoted to Web Sockets

Additional Technologies -Socket.io (Signalr for .Net) for both client and server components

Use Case

  • IoT devices report telemetry information via sockets. All subscribed clients get the information as an event which updates the dashboard

Demo

  • Update both server and client to support Web sockets
  • Use device demo tool to connect and automate the telemetry data to the server

Live Dashboards: Web Socket Integration

Distributed Cache Strategy

Why Use a Cache?

  • Data is stored in-memory
  • Sub-millisecond average speed
  • Cache-Aside Pattern
    • Read from cache first (cache-hit) fail over to database (cache miss)
    • Update cache on cache miss
  • Write-Through
    • Write to cache and database
    • Maintain both systems updated
  • Improves app performance
  • Reduces load on Database

Application Changes

  • Changes are only done on the server
  • No changes on client-side

Live Dashboards: Cache Architecture

Redis and Socket.io Integration

What is Redis?

  • Key-value store, keys can contain strings (JSON), hashes, lists, sets, & sorted sets
  • Redis supports a set of atomic operations on these data types (available until committed)
  • Other features include transactions, publish/subscribe, limited time to live -TTL
  • You can use Redis from most of today's programming languages using libraries

Use Case

  • As application load and data frequency increases, we need to use a cache for performance. We also need to centralize the events, so all the socket servers behind a load balancer can notify the clients. Update both storage and cache

Demo

  • Start Redis-cli on Ubuntu and show some inserts, reads and sync events.
    • sudo service redis-server restart
    • redis-cli -c -p 6379 -h localhost
    • zadd table:data 100 "{data:'100'}“
    • zrangebycore table:data 100 200
    • subscribe telemetry:data

Live Dashboards: Load Balanced Architecture

Summary: Boosting Your App Performance

When your application starts to slow down due to heavy read and writes on your database, consider moving the read operations to a cache solution and broadcasting the data to your application via a real-time integration using Web Sockets. This approach can significantly enhance performance and user experience.

Key Benefits

  • Improved Performance: Offloading reads to a cache system like Redis reduces load on the database.
  • Real-Time Updates: Using Web Sockets ensures that your application receives updates in real-time, with no need for manual refreshes.
  • Scalability: By reducing the database load, your application can handle more concurrent users.
  • Efficient Resource Utilization: Leveraging caching and real-time technologies optimizes the user of server resources, leading to savings and better performance.

Live Dashboards: Load Balanced Architecture

We've covered a lot today, but this is just the beginning!

If you're interested in learning more about building cloud data pipelines, I encourage you to check out my book, 'Data Engineering Process Fundamentals,' part of the Data Engineering Process Fundamentals series. It provides in-depth explanations, code samples, and practical exercises to help in your learning.

Data Engineering Process Fundamentals - Book by Oscar Garcia Data Engineering Process Fundamentals - Book by Oscar Garcia

Thanks for reading.

Send question or comment at Twitter @ozkary 👍 Originally published by ozkary.com

8/21/24

Medallion Architecture: A Blueprint for Data Insights and Governance - Data Engineering Process Fundamentals

Overview

Gain understanding of Medallion Architecture and its application in modern data engineering. Learn how to optimize data pipelines, improve data quality, and unlock valuable insights. Discover practical steps to implement Medallion principles in your organization and drive data-driven decision-making.

Data Engineering Process Fundamentals - Medallion Architecture

  • Follow this GitHub repo during the presentation: (Give it a star)

👉 https://github.com/ozkary/data-engineering-mta-turnstile

  • Read more information on my blog at:

👉 https://www.ozkary.com/2023/03/data-engineering-process-fundamentals.html

YouTube Video

Video Agenda

  • Introduction to Medallion Architecture

    • Defining Medallion Architecture
    • Core Principles
    • Benefits of Medallion Architecture
  • The Raw Zone

    • Understanding the purpose of the Raw Zone
    • Best practices for data ingestion and storage
  • The Bronze Zone

    • Data transformation and cleansing
    • Creating a foundation for analysis
  • The Silver Zone

    • Data optimization and summarization
    • Preparing data for consumption
  • The Gold Zone

    • Curated data for insights and action
    • Enabling self-service analytics
  • Empowering Insights

    • Data-driven decision-making
    • Accelerated Insights
  • Data Governance

    • Importance of data governance in Medallion Architecture
    • Implementing data ownership and stewardship
    • Ensuring data quality and security

Why Attend:

Gain a deep understanding of Medallion Architecture and its application in modern data engineering. Learn how to optimize data pipelines, improve data quality, and unlock valuable insights. Discover practical steps to implement Medallion principles in your organization and drive data-driven decision-making.

Presentation

Introducing Medallion Architecture

Medallion architecture is a data management approach that organizes data into distinct layers based on its quality and processing level.

  • Improved Data Quality: By separating data into different zones, you can focus on data quality at each stage.
  • Enhanced Data Governance: Clear data ownership and lineage improve data trustworthiness.
  • Accelerated Insights: Optimized data in the Silver and Gold zones enables faster query performance.
  • Scalability: The layered approach can accommodate growing data volumes and complexity.
  • Cost Efficiency: Optimized data storage and processing can reduce costs.

Data Engineering Process Fundamentals - Medallion Architecture Design Diagram

The Raw Zone: Foundation of Your Data Lake

The Raw Zone is the initial landing place for raw, unprocessed data. It serves as a historical archive of your data sources.

  • Key Characteristics:
    • Unstructured or semi-structured format (e.g., CSV, JSON, Parquet)
    • Data is ingested as-is, without any cleaning or transformation
    • High volume and velocity
    • Data retention policies are crucial
  • Benefits:
    • Preserves original data for potential future analysis
    • Enables data reprocessing
    • Supports data lineage and auditability

Data Engineering Process Fundamentals - Medallion Architecture Raw Zone Diagram

Use case Background

The Metropolitan Transportation Authority (MTA) subway system in New York has stations around the city. All the stations are equipped with turnstiles or gates which tracks as each person enters (departure) or exits (arrival) the station.

  • The MTA subway system has stations around the city.
  • All the stations are equipped with turnstiles or gates which tracks as each person enters or leaves the station.
  • CSV files provide information about the amount of commuters per stations at different time slots.

Data Engineering Process Fundamentals - Data streaming MTA Gates

Problem Statement

In the city of New York, commuters use the Metropolitan Transportation Authority (MTA) subway system for transportation. There are millions of people that use this system every day; therefore, businesses around the subway stations would like to be able to use Geofencing advertisement to target those commuters or possible consumers and attract them to their business locations at peak hours of the day.

  • Geofencing is a location based technology service in which mobile devices’ electronic signal is tracked as it enters or leaves a virtual boundary (geo-fence) on a geographical location. Businesses around those locations would like to use this technology to increase their sales.
  • Businesses around those locations would like to use this technology to increase their sales by pushing ads to potential customers at specific times.

ozkary-data-engineering-mta-geo-fence

The Bronze Zone: Transforming Raw Data

The Bronze Zone is where raw data undergoes initial cleaning, structuring, and transformation. It serves as a staging area for data before moving to the Silver Zone.

  • Key Characteristics:
    • Data is cleansed and standardized
    • Basic transformations are applied (e.g., data type conversions, null handling)
    • Data is structured into tables or views
    • Data quality checks are implemented
    • Data retention policies may be shorter than the Raw Zone
  • Benefits:
    • Improves data quality and consistency
    • Provides a foundation for further analysis
    • Enables data exploration and discovery

Data Engineering Process Fundamentals - Medallion Architecture Bronze Zone Diagram

The Silver Zone: A Foundation for Insights

The Silver Zone houses data that has been further refined, aggregated, and optimized for specific use cases. It serves as a bridge between the raw data and the final curated datasets.

  • Key Characteristics:
    • Data is cleansed, standardized, and enriched
    • Data is structured for analytical purposes (e.g., normalized, de-normalized)
    • Data is optimized for query performance (e.g., partitioning, indexing)
    • Data is aggregated and summarized for specific use cases
  • Benefits:
    • Improved query performance
    • Supports self-service analytics
    • Enables advanced analytics and machine learning
    • Reduces query costs

Data Engineering Process Fundamentals - Medallion Architecture Silver Zone Diagram

The Gold Zone: Your Data's Final Destination

  • Definition: The Gold Zone contains the final, curated datasets ready for consumption by business users and applications. It is the pinnacle of data transformation and optimization.
  • Key Characteristics:
    • Data is highly refined, aggregated, and optimized for specific use cases
    • Data is often materialized for performance
    • Data is subject to rigorous quality checks and validation
    • Data is secured and governed
  • Benefits:
    • Enables rapid insights and decision-making
    • Supports self-service analytics and reporting
    • Provides a foundation for advanced analytics and machine learning
    • Reduces query latency

Data Engineering Process Fundamentals - Medallion Architecture Gold Zone Diagram

The Gold Zone: Empowering Insights and Actions

The Gold Zone is the final destination for data, providing a foundation for insights, analysis, and action. It houses curated, optimized datasets ready for consumption.

  • Key Characteristics:
    • Data is accessible and easily consumable
    • Supports various analytical tools and platforms (BI, ML, data science)
    • Enables self-service analytics
    • Drives business decisions and actions
  • Examples of Consumption Tools:
    • Business Intelligence (BI) tools (Looker, Tableau, Power BI)
    • Data science platforms (Python, R, SQL)
    • Machine learning platforms (TensorFlow, PyTorch)
    • Advanced analytics tools

Data Engineering Process Fundamentals - Medallion Architecture Analysis Diagram

Data Governance: The Cornerstone of Data Management

Data governance is the framework that defines how data is managed within an organization, while data management is the operational execution of those policies. Data Governance is essential for ensuring data quality, consistency, and security.

Key components of data governance include:

  • Data Lineage: Tracking data's journey from source to consumption.
  • Data Ownership: Defining who is responsible for data accuracy and usage.
  • Data Stewardship: Managing data on a day-to-day basis, ensuring quality and compliance.
  • Data Security: Protecting data from unauthorized access, use, disclosure, disruption, modification, or destruction.
  • Compliance: Adhering to industry regulations (e.g., GDPR, CCPA, HIPAA) and internal policies.

By establishing clear roles, responsibilities, and data lineage, organizations can build trust in their data, improve decision-making, and mitigate risks.

Data Engineering Process Fundamentals - Medallion Architecture Data Governance

Data Transformation and Incremental Strategy

The data transformation phase is a critical stage in a data warehouse project. This phase involves several key steps, including data extraction, cleaning, loading, data type casting, use of naming conventions, and implementing incremental loads to continuously insert the new information since the last update via batch processes.

Data Engineering Process Fundamentals - Data transformation lineage

Data Lineage: Tracks the flow of data from its origin to its destination, including all the intermediate processes and transformations that it undergoes.

Data Governance : Metadata

Assigns the owner, steward and responsibilities of the data.

Data Engineering Process Fundamentals - Medallion Architecture Governance Metadata

Summary: Leverage Medallion Architecture for Success

  • Key Benefits:
    • Improved data quality
    • Enhanced governance
    • Accelerated insights
    • Scalability
    • Cost Efficiency.

Data Engineering Process Fundamentals - Medallion Architecture Diagram

We've covered a lot today, but this is just the beginning!

If you're interested in learning more about building cloud data pipelines, I encourage you to check out my book, 'Data Engineering Process Fundamentals,' part of the Data Engineering Process Fundamentals series. It provides in-depth explanations, code samples, and practical exercises to help in your learning.

Data Engineering Process Fundamentals - Book by Oscar Garcia Data Engineering Process Fundamentals - Book by Oscar Garcia

Thanks for reading.

Send question or comment at Twitter @ozkary 👍 Originally published by ozkary.com

7/24/24

Building Real-Time Data Pipelines: A Practical Guide - Data Engineering Process Fundamentals

Overview

In modern data engineering solutions, handling streaming data is very important. Businesses often need real-time insights to promptly monitor and respond to operational changes and performance trends. A data streaming pipeline facilitates the integration of real-time data into data warehouses and visualization dashboards.

Data Engineering Process Fundamentals - Building Real-Time Data Pipelines: A Practical Guide

  • Follow this GitHub repo during the presentation: (Give it a star)

👉 https://github.com/ozkary/data-engineering-mta-turnstile

  • Read more information on my blog at:

👉 https://www.ozkary.com/2023/03/data-engineering-process-fundamentals.html

YouTube Video

Video Agenda

  1. What is Data Streaming?

    • Understanding the concept of continuous data flow.

    • Real-time vs. batch processing.

    • Benefits and use cases of data streaming.

  2. Data Streaming Channels

    • APIs (Application Programming Interfaces)

    • Events (system-generated signals)

    • Webhooks (HTTP callbacks triggered by events)

  3. Data Streaming Components

    • Message Broker (Apache Kafka)

    • Producers and consumers

    • Topics for data categorization

    • Stream Processing Engine (Apache Spark Structured Streaming)

  4. Solution Design and Architecture

    • Real-time data source integration

    • Leveraging Kafka for reliable message delivery

    • Spark Structured Streaming for real-time processing

    • Writing processed data to the data lake

  5. Q&A Session

    • Get your questions answered by the presenters.

Why Join This Session?

  • Stay Ahead of the Curve: Gain a comprehensive understanding of data streaming, a crucial aspect of modern data engineering.
  • Unlock Real-Time Insights: Learn how to leverage data streaming for immediate processing and analysis, enabling faster decision-making.
  • Learn Kafka and Spark: Explore the power of Apache Kafka as a message broker and Apache Spark Structured Streaming for real-time data processing.
  • Build a Robust Data Lake: Discover how to integrate real-time data into your data lake for a unified data repository.

Presentation

Introduction - What is Data Streaming?

Data streaming enables us to build data integration in real-time. Unlike traditional batch processing, where data is collected and processed periodically, streaming data arrives continuously, and it is processed on-the-fly.

  • Understanding the concept of continuous data flow
    • Real-time, uninterrupted transfer of data from various channels.
    • Allows for immediate processing and analysis of data as it is generated.
  • Real-time vs. batch processing
    • Data is collected and process in chunks at certain times
    • The data can take hours and even days depending on the source
  • Benefits and use cases of data streaming
    • React instantly to events
    • Predict trends with real-time updates
    • Update dashboard with up to the minute/seconds data

Data Engineering Process Fundamentals - What is data streaming

Data Streaming Channels

Data streams can arrive from various channels, often hosted on HTTP endpoints. The specific channel technology depends on the provider. Generally, the integration involves either a push or a pull connection.

  • Events (Push Model): These can be delivered using a subscription model like Pub/Sub, where your system subscribes to relevant topics and receives data "pushed" to it whenever events occur. Examples include user clicks, sensor readings, or train arrivals.

  • Webhooks (Push-Based on Events): These are HTTP callbacks triggered by specific events on external platforms. You set up endpoints that listen for these notifications to capture the data stream.

  • APIs (Pull Model): Application Programming Interfaces are used to actively fetch data from external services, like social media platforms. Scheduled calls are made to the API at specific intervals to retrieve the data.

Data Engineering Process Fundamentals - Data streaming channels

Data Streaming System

Powering real-time data pipelines, Apache Kafka efficiently ingests data streams, while Apache Spark analyzes and transforms it, enabling large-scale insights.

Apache Kafka:

Apache Kafka: The heart of the data stream. It's a high-performance platform that acts as a message broker, reliably ingesting data (events) from various sources like applications, sensors, and webhooks. These events are published to categorized channels (topics) within Kafka for further processing.

Spark Structured Streaming:

Built on Spark, it processes Kafka data streams in real-time. Unlike simple ingestion, it allows for transformations, filtering, and aggregations on the fly, enabling real-time analysis of streaming data.

Data Engineering Process Fundamentals - Data streaming Systems

Data Streaming Components

Apache Kafka acts as the central message broker, facilitating real-time data flow. Producers, like applications or sensors, publish data (events) to categorized channels (topics) within Kafka. Spark then subscribes as a consumer, continuously ingesting and processing these data streams in real-time.

  • Message Broker (Kafka): Routes real-time data streams.
  • Producers & Consumers: Producers send data to topics, Consumers receive and process it.
  • Topics (Categories): Organize data streams by category.
  • Stream Processing Engine (Spark Structured Streaming):
    • Reads data from Kafka.
    • Extracts information.
    • Transforms & summarizes data (aggregations).
    • Writes to a data lake.

Data Engineering Process Fundamentals - Data streaming Components

Use case Background

The Metropolitan Transportation Authority (MTA) subway system in New York has stations around the city. All the stations are equipped with turnstiles or gates which tracks as each person enters (departure) or exits (arrival) the station.

  • The MTA subway system has stations around the city.
  • All the stations are equipped with turnstiles or gates which tracks as each person enters or leaves the station.
  • CSV files provide information about the amount of commuters per stations at different time slots.

Data Engineering Process Fundamentals - Data streaming MTA Gates

Data Specifications

Since we already have a data transformation layer that incrementally updates the data warehouse, our real-time integration will focus on leveraging this existing pipeline. We'll achieve this by aggregating data from the stream and writing the results directly to the data lake.

  • Group by these categorical fields: "AC", "UNIT","SCP","STATION","LINENAME","DIVISION", "DATE", "DESC"
  • Aggregate these measures: "ENTRIES", "EXITS"
  • Sample result: "A001,R001,02-00-00,Test-Station,456NQR,BMT,09-23-23,REGULAR,16:54:00,140,153"

# Define the schema for the incoming data
turnstiles_schema = StructType([
    StructField("AC", StringType()),
    StructField("UNIT", StringType()),
    StructField("SCP", StringType()),
    StructField("STATION", StringType()),
    StructField("LINENAME", StringType()),
    StructField("DIVISION", StringType()),
    StructField("DATE", StringType()),
    StructField("TIME", StringType()),
    StructField("DESC", StringType()),
    StructField("ENTRIES", IntegerType()),
    StructField("EXITS", IntegerType()),
    StructField("ID", StringType()),
    StructField("TIMESTAMP", StringType())
])

Solution Architecture for Real-time Data Integration

Data streams are captured by the Kafka producer and sent to Kafka topics. The Spark-based stream consumer retrieves and processes the data in real-time, aggregating it for storage in the data lake.

Components:

  • Real-Time Data Source: Continuously emits data streams (events or messages).
  • Message Broker Layer:
    • Kafka Broker Instance: Acts as a central hub, efficiently collecting and organizing data into topics.
    • Kafka Producer (Python): Bridges the gap between the source and Kafka.
  • Stream Processing Layer:
    • Spark Instance: Processes and transforms data in real-time using Apache Spark.
    • Stream Consumer (Python): Consumes messages from Kafka and acts as both a Kafka consumer and Spark application:
      • Retrieves data as soon as it arrives.
      • Processes and aggregates data.
      • Saves results to a data lake.
  • Data Storage: Data transformation for visualization tools (Looker, Power BI) to access.
  • Docker Containers: Use containers for deployments

Data Engineering Process Fundamentals - Data streaming MTA Gates

Data Transformation and Incremental Strategy

The data transformation phase is a critical stage in a data warehouse project. This phase involves several key steps, including data extraction, cleaning, loading, data type casting, use of naming conventions, and implementing incremental loads to continuously insert the new information since the last update via batch processes.

Data Engineering Process Fundamentals - Data transformation lineage

Data Lineage: Tracks the flow of data from its origin to its destination, including all the intermediate processes and transformations that it undergoes.

Impact on Data Visualization

  • Our architecture efficiently processes real-time data by leveraging our existing data transformation layer.
  • This optimized flow enables significantly faster data visualization.
  • The dashboard refresh time can increase their frequency to load the new data.

For real-time updates directly on the dashboard, a socket-based integration would be necessary.

Data Engineering Process Fundamentals - Data transformation lineage

Key Takeaways: Real-Time Integration

Data streaming solutions are an absolute necessity, enabling the rapid processing and analysis of vast amounts of real-time data. Technologies like Kafka and Spark play a pivotal role in empowering organizations to harness real-time insights from their data streams.

  • Real-time Power: Kafka handles various data streams, feeding them to data topics.
  • Spark Processing Power: Spark reads from these topics, analyzes messages in real-time, and aggregates the data to our specifications.
  • Existing Pipeline Integration: Leverages existing pipelines to write data to data lakes for transformation.
  • Faster Insights: Delivers near real-time information for quicker data analysis and visualization.

We've covered a lot today, but this is just the beginning!

If you're interested in learning more about building cloud data pipelines, I encourage you to check out my book, 'Data Engineering Process Fundamentals,' part of the Data Engineering Process Fundamentals series. It provides in-depth explanations, code samples, and practical exercises to help in your learning.

Data Engineering Process Fundamentals - Book by Oscar Garcia Data Engineering Process Fundamentals - Book by Oscar Garcia

Thanks for reading.

Send question or comment at Twitter @ozkary 👍 Originally published by ozkary.com