Main Transformations and Actions Available in Apache Spark DataFrame: An Overview with Practical Examples

Clarifying Examples of Transformations and Actions in PySpark DataFrames.

🕒 Estimated reading time: 11 minutes

Welcome back to the third part of our PySpark series. If you missed our previous articles, you can catch up by clicking here and here.

Now, let's dive into the world of PySpark with some practical examples that are also available on Google Colab here 👨‍🔬.

Apache Spark Ecosystem

The Apache Spark ecosystem is a powerful, large-scale data processing platform that enables fast, distributed processing of large volumes of data. It integrates seamlessly with various components and tools such as Hadoop, Hive, HBase and Cassandra, making it highly versatile.

Spark Core is the core engine that manages the execution of distributed jobs, while additional libraries such as Spark SQL, Spark Streaming, MLlib, and GraphX ​​provide support for structured data querying, real-time data stream processing, machine learning, data processing, and graphs, respectively.

Apache Spark Ecosystem

Fundamental abstractions in the Apache Spark ecosystem

RDD (Resilient Distributed Dataset)

It is the central data structure in Apache Spark and represents an immutable, distributed collection of objects. A RDD is divided into partitions, which can be processed in parallel across a cluster of computers.

RDDs offer fault tolerance, meaning they can be automatically rebuilt if cluster nodes fail. RDDs provide a lower programming interface compared to DataFrames and Datasets, making them better suited for developers who need granular control over data processing.

  • Once created, RDD are immutable.

  • You can persist or cache RDDs in memory or disk.

  • Spark RDDs are fault tolerant. If a particular Spark node or task fails, RDD can be automatically built on remaining nodes and the task will terminate.

Operations on RDDs can be transformations (such as map, filter, groupByKey) or actions (such as collect, count, saveAsTextFile).

  • Transformations are operations that return new RDDs and are executed in a "lazy" manner, that is, they are only applied when an action is called.

  • Actions are operations that return values ​​or save data to external storage.

Lazy evaluation

Late evaluation in Spark means that execution will not start until an action is triggered. In Spark, late evaluation arises when Spark transformations occur.

Lazy evaluation

DataFrame

A DataFrame is a data abstraction organized into named columns. It is similar to a table in a relational database or a DataFrame in Python pandas or R. DataFrames in Spark are distributed and support large-scale data processing operations.

DataFrames are designed to be more performance efficient than RDDs because they leverage Spark's Catalyst Optimizer, which optimizes queries and performs physical optimizations such as column projections and pushdown filtering.

DataFrames also offer a richer API compared to RDDs, making it easier and more intuitive for developers to work with them.

Datasets

Datasets are a newer abstraction introduced in Spark 1.6. They are similar to DataFrames in terms of tabular representation of data, but provide static typing and Scala and Java programming language support. This means that Datasets have the benefits of compile-time type inference and type error checking.

Datasets combine the efficiency of DataFrames with the object orientation of RDDs. However, Dataset functionality is primarily available for Scala and Java, while Python has limited support as of Spark 3.0.

Summary

Apache RDD (Resilient Distributed Datasets) was a fundamental abstraction in Apache Spark for distributed data processing. RDDs provided a parallel, fault-tolerant data processing framework that allowed users to perform distributed computations on large data sets.

However, there have been advances in Spark, and newer abstractions such as DataFrames and Datasets have gained prominence. DataFrames and Datasets provide a higher-level API and optimizations over RDDs, making them easier to use and efficient for many common use cases.

RDDs are still used in certain scenarios where fine-grained control over distributed data processing is required. However, for most users, especially those dealing with structured data, DataFrames and Datasets have been recommended due to their ease of use and performance benefits.

For those learning Apache Spark or distributed computing, it is beneficial to understand the historical context of RDDs, but you should focus on newer abstractions such as DataFrames and Datasets as they are more widely used in modern Spark applications.

Main Transformations

select, selects a specific set of columns.

df.select("column1", "column2").show()

filter, filters rows based on a condition.

df.filter(df["column"] > 10).show()

groupBy, groups the DataFrame by one or more columns.

df.groupBy("column1").agg({"column2": "sum"}).show()

withColumn, adds or replaces a column.

df.withColumn("new_column", df["column"] * 2).show()

join, performs a join between two DataFrames.

df1.join(df2, df1["column1"] == df2["column2"], "inner").show()

orderBy, orders the DataFrame based on one or more columns.

df.orderBy("column1", ascending=False).show()

drop, removes a column from the DataFrame.

df.drop("column").show()

distinct, returns the distinct rows of the DataFrame.

df.distinct().show()

Main Actions

  • show, displays the first rows of the DataFrame.

  • count, returns the number of rows in the DataFrame.

  • collect, returns all rows of the DataFrame as a list in the driver program.

  • take, returns the first n rows of the DataFrame.

  • describe, calculates descriptive statistics for numeric columns.

  • printSchema, displays the DataFrame schema.

  • write, writes the DataFrame to external sources.

Project in Action: From Theory to Practice

To reinforce your knowledge, we will apply what we learned in a small practical project inspired by the articles Analyzing Excel Sales Data with Python Pandas and Seaborn. If you missed our previous articles, you can catch up by clicking here, here and here.

  1. Create a Spark session.

  2. Read two CSV files into Spark DataFrames.

  3. Perform a join between the two DataFrames.

  4. Add a new column with mathematical operations.

  5. Group the data by a column and then apply an aggregation.

  6. Ordering and display of final results.

  7. Save the resulting DataFrames to a Parquet file.

Below is a complete commented code example to guide you:

Create a Spark session

from pyspark.sql import SparkSession

# 1. Create the Spark Session
# SparkSession is the main gateway to interacting with Spark.
# Here, we are creating an instance of SparkSession, which will be used to create DataFrames.

# "builder" creates an instance of Builder that allows us to configure the Spark session.
# "appName" sets the name of the Spark application so that we can identify it in the Spark UI.
# "getOrCreate" creates a new SparkSession if there is no existing one, or returns the existing one.
spark = SparkSession.builder.appName("Practical Project").getOrCreate()

# From this point, we can use 'spark' to load data, transform it, and perform various analytical operations.

Read two CSV files into Spark DataFrames

# Read a CSV file into a DataFrame
# The read.csv() method reads the specified CSV file.
# "orders.csv" is the name of the file to read.
# header=True tells Spark that the CSV file has a header in the first row, which will be used as column names.
# inferSchema=True tells Spark to automatically infer the data type of each column (e.g. integer, string, etc.)
orders_df = spark.read.csv("orders.csv", header=True, inferSchema=True)

# Print the schema of the DataFrame, which includes the column names and the data types of each column
# printSchema() is useful for understanding the structure of the data you are manipulating.
orders_df.printSchema()

# Print the first 5 rows of the DataFrame.
# 'show' prints the data to standard output. # The parameter 5 indicates how many rows should be displayed.
# 'truncate=False' ensures that the cell contents are not truncated and are displayed in full.
orders_df.show(5, truncate=False)

Running the above code will produce the following output that displays the schema (structure) of the DataFrame and the first 5 rows of the DataFrame.

# Read a CSV file and load the data into a DataFrame.
# 'read.csv' is a method provided by SparkSession to read CSV files.
# "customers.csv" is the path to the CSV file we want to read.
# header=True indicates that the CSV file has a header row with the column names.
# inferSchema=True allows Spark to automatically infer the data types of the columns based on the analysis of the data.
customers_df = spark.read.csv("customers.csv", header=True, inferSchema=True)

# Display the schema (structure) of the DataFrame.
# 'printSchema' shows the columns and their respective inferred data types.
customers_df.printSchema()

# Display the first 5 rows of the DataFrame.
# 'show' displays the data to standard output.
# The parameter 5 indicates how many rows to display.
# 'truncate=False' ensures that the cell contents are not truncated and are displayed in full.
customers_df.show(5, truncate=False)

Perform a join between the two DataFrames

# Import the function library from the pyspark.sql module, renaming it "F"
# This makes it easier to call SQL functions provided by PySpark.
from pyspark.sql import functions as F

# Perform a join between two DataFrames: customers_df and orders_df.
# The join is made based on the "CustomerID" column that both DataFrames have.
customers_orders_df = customers_df.join(orders_df, customers_df["CustomerID"] == orders_df["CustomerID"])

# Show the first 5 records of the resulting DataFrame after the join.
customers_orders_df.show(5, truncate=False)

Running the above code will produce the following output.

Add a new column with math operations

# The withColumn function accepts two arguments: the name of the new column and the calculation/resulting expression.
# In this case, we are creating a new column called "Total".
# The resulting expression is the multiplication (using the * operator) of the value of the 'Quantity' column by the value of the 'Price' column for each row of the customers_orders_df DataFrame.
customers_orders_df = customers_orders_df.withColumn("Total", F.col('Quantity') * F.col("Price"))

# The show method displays the first n rows of the DataFrame, where n is specified by the user.
customers_orders_df.show(5, truncate=False)

Running the above code will produce the following output. In this case, we are creating a new column called "Total".

Group the data by a column and then apply an aggregation

# Group the data by product name and calculate the sum of total sales for each product
revenue_by_product_df = customers_orders_df.groupBy("Product").agg(F.sum("Total").alias("Total Revenue"))

# Display the first 5 rows of the resulting DataFrame without truncating the columns (show the entire contents of the columns)
revenue_by_product_df.show(5, truncate=False)

Running the above code will produce the following output with the total revenue per product.

Ordering and display of final results

# Sorts 'revenue_by_product_df' in descending order by the "Total Revenue" column
# F.desc("Total Revenue") specifies that the ordering should be done in descending order in relation to the total revenue
# In SQL, it would be equivalent to: ORDER BY "Total Revenue" DESC
top_10_products_df = revenue_by_product_df.orderBy(F.desc("Total Revenue")).limit(10)

# Applies a limit of 10 rows after the ordering
# That is, selects only the first 10 products with the highest total revenue
# In SQL, it would be equivalent to: LIMIT 10

# Displays the resulting DataFrame ('top_10_products_df') in the console
top_10_products_df.show(truncate=False)

Running the above code will produce the following output. Here only the first 10 products with the highest total revenue are selected.

Selecting the Top 10 Products with the Most Unit Sales

# We group the data by the "Product" field to combine all rows that have the same product.
# Then, we add up the number of units sold for each product.
# The groupBy function is used to group the data and the sum function to add up the values ​​of the "Quantity" field.

top_10_products_by_quantity_df = (
    customers_orders_df
    .groupBy("Product")               # Grouping the data by the "Product" field
    .sum("Quantity")                  # Sum of the quantities (units) for each product
    .orderBy(F.desc("sum(Quantity)")) # Sorting in descending order by the sum of the quantities
    .limit(10)                        # Limiting the result to the top 10 products with the most sales
)
# Display the results of the DataFrame
top_10_products_by_quantity_df.show(truncate=False)

Running the code above will produce the following output, with the top 10 products with the most unit sales.

Save the resulting DataFrames to a Parquet file

# The following code saves two DataFrames in Parquet format using Apache Spark.
# Each code block saves a different DataFrame and uses "snappy" compression to reduce the file size.

# Saving the DataFrame "top_10_produtos_df" in Parquet format
# Start the DataFrame writing operation using the write method
top_10_products_df.write \
    .format("parquet") \
    .mode("overwrite") \
    .option("compression", "snappy") \
    .save("top_10_produtos.parquet")

# Saving the DataFrame "top_10_produtos_por_quantidade_df" in Parquet format
# Start the DataFrame writing operation using the write method
top_10_products_by_quantity_df.write \
    .format("parquet") \
    .mode("overwrite") \
    .option("compression", "snappy") \
    .save("top_10_products_by_quantity.parquet")

# Explanation of the methods used
# .format("parquet") Sets the output file format to "parquet"
# .mode("overwrite") Sets the writing mode to "overwrite". This means that if the file already exists, it will be overwritten.
# .option("compression", "snappy") Sets the file compression option to "snappy" to reduce storage space
# .save("top_10_products.parquet") Specifies the path where the parquet file will be saved

Conclusion

In this article we cover the main transformations and actions available in the Apache Spark DataFrame, providing practical examples for efficient manipulation and processing of large volumes of data. Initially, it presents an overview of the Apache Spark ecosystem and its fundamental abstractions, such as RDD, DataFrames and Datasets, highlighting their characteristics and use cases.

The transformations discussed include operations such as select, filter, groupBy, withColumn, join, and orderBy. Among the actions, show, printSchema, and write stand out. A practical example is presented, showing the creation of a Spark session, the manipulation of DataFrames from CSV files, and the performance of complex operations, culminating in the persistence of results in Parquet files.

For Apache Spark beginners and intermediate professionals, familiarization with DataFrames should be a priority, given their optimization capabilities and the simplicity of their API. Even though RDDs still find application in specific scenarios where granularity and control are required, DataFrames and Datasets are the recommended abstractions due to their powerful combination of ease of use and performance enhancement.

We hope this article has provided you with a solid understanding of manipulating DataFrames in Spark and that the examples provided serve as a starting point for your own large-scale data analysis. We invite you to continue experimenting with Google Colab and explore the many capabilities of this robust tool to enhance your Apache Spark knowledge and skills.

To deepen your knowledge and see more practical examples, check out our previous articles mentioned at the beginning of this page, in addition to the official Apache Spark manual.