.transform() vs .withColumns(): Right Tool, Right Grain

Tags: pyspark, Spark, best-practices

Using .transform() for column-level work hides intent and increases code volume. .withColumns() is explicit, compact, and generates a single plan node. Use .transform() for pipeline-level chaining instead.

Modified: 27/02/2026
Note: Live Execution

All code in this post is rendered against a Databricks workspace before publication.

Note: Context

Inspired by Denys K.’s LinkedIn post on using .transform() to break up long .withColumn() chains. The intent is spot on — nobody wants 500-line mega-chains. But there’s a better tool for column-level work.

Summary

  • Wrapping individual .withColumn() calls in functions and chaining with .transform() hides column-level intent — you have to read every function definition to know what columns are being added
  • .withColumns() (Spark 3.3+ / DBR 12.0+) keeps every column name and expression visible at the call site, in a single plan node
  • .transform() shines at pipeline-level chaining: deduplication, SCD logic, audit metadata — not individual column expressions
  • Both approaches are unit-testable: extract .withColumns() expressions to variables and test them against tiny DataFrames, just as you would a .transform() function

Faker-generated data matching the original post’s scenario: first, last, age, email, status.

from faker import Faker
from pyspark.sql import functions as F
from pyspark.sql.types import *
import random

fake = Faker()
Faker.seed(42)
random.seed(42)

rows = []
for _ in range(500):
    rows.append((
        fake.first_name(),
        fake.last_name(),
        random.randint(18, 75),
        f"  {fake.email()}  ",  # whitespace to clean
        random.choice(["active", "inactive", "pending", "suspended"]),
    ))

schema = StructType([
    StructField("first",  StringType()),
    StructField("last",   StringType()),
    StructField("age",    IntegerType()),
    StructField("email",  StringType()),
    StructField("status", StringType()),
])

df = spark.createDataFrame(rows, schema)
df.show(5, truncate=False)
+--------+--------+---+-----------------------------+--------+
|first   |last    |age|email                        |status  |
+--------+--------+---+-----------------------------+--------+
|Danielle|Johnson |58 |  john21@example.net         |active  |
|Joy     |Gardner |19 |  fjohnson@example.org       |pending |
|Jesse   |Guzman  |33 |  jennifermiles@example.com  |inactive|
|Jeffrey |Lawrence|26 |  blakeerik@example.com      |active  |
|Matthew |Moore   |61 |  curtis61@example.com       |active  |
+--------+--------+---+-----------------------------+--------+
only showing top 5 rows

Approach 1: .transform() with Per-Column Functions

This is the pattern from the original post — each column gets its own function:

from pyspark.sql import functions as F

def add_full_name(df):
    return df.withColumn(
        "full_name", F.concat_ws(" ", F.col("first"), F.col("last"))
    )

def categorize_age(df):
    return df.withColumn(
        "age_group",
        F.when(F.col("age") < 30, "young")
         .when(F.col("age") < 50, "mid")
         .otherwise("senior"),
    )

def normalize_email(df):
    return df.withColumn("email_clean", F.lower(F.trim(F.col("email"))))

def clean_status(df):
    return df.withColumn(
        "status_clean",
        F.when(F.col("status") == "active", "ACTIVE").otherwise("INACTIVE"),
    )

df_transform = (
    df
    .transform(add_full_name)
    .transform(categorize_age)
    .transform(normalize_email)
    .transform(clean_status)
)

df_transform.show(5, truncate=False)
+--------+--------+---+-----------------------------+--------+----------------+---------+-------------------------+------------+
|first   |last    |age|email                        |status  |full_name       |age_group|email_clean              |status_clean|
+--------+--------+---+-----------------------------+--------+----------------+---------+-------------------------+------------+
|Danielle|Johnson |58 |  john21@example.net         |active  |Danielle Johnson|senior   |john21@example.net       |ACTIVE      |
|Joy     |Gardner |19 |  fjohnson@example.org       |pending |Joy Gardner     |young    |fjohnson@example.org     |INACTIVE    |
|Jesse   |Guzman  |33 |  jennifermiles@example.com  |inactive|Jesse Guzman    |mid      |jennifermiles@example.com|INACTIVE    |
|Jeffrey |Lawrence|26 |  blakeerik@example.com      |active  |Jeffrey Lawrence|young    |blakeerik@example.com    |ACTIVE      |
|Matthew |Moore   |61 |  curtis61@example.com       |active  |Matthew Moore   |senior   |curtis61@example.com     |ACTIVE      |
+--------+--------+---+-----------------------------+--------+----------------+---------+-------------------------+------------+
only showing top 5 rows
Warning: The problem

To understand what columns this adds, you have to read four separate function definitions; the column names and expressions are scattered across the file. And each .transform() wrapping a single .withColumn() still creates a separate Project node, four in total, exactly what a plain .withColumn() chain or loop would produce. The total line count went up, not down.

Approach 2: .withColumns() — Explicit and Compact

from pyspark.sql import functions as F

df_with_columns = df.withColumns({
    "full_name":    F.concat_ws(" ", F.col("first"), F.col("last")),
    "age_group":    F.when(F.col("age") < 30, "young")
                     .when(F.col("age") < 50, "mid")
                     .otherwise("senior"),
    "email_clean":  F.lower(F.trim(F.col("email"))),
    "status_clean": F.when(F.col("status") == "active", "ACTIVE")
                     .otherwise("INACTIVE"),
})

df_with_columns.show(5, truncate=False)
+--------+--------+---+-----------------------------+--------+----------------+---------+-------------------------+------------+
|first   |last    |age|email                        |status  |full_name       |age_group|email_clean              |status_clean|
+--------+--------+---+-----------------------------+--------+----------------+---------+-------------------------+------------+
|Danielle|Johnson |58 |  john21@example.net         |active  |Danielle Johnson|senior   |john21@example.net       |ACTIVE      |
|Joy     |Gardner |19 |  fjohnson@example.org       |pending |Joy Gardner     |young    |fjohnson@example.org     |INACTIVE    |
|Jesse   |Guzman  |33 |  jennifermiles@example.com  |inactive|Jesse Guzman    |mid      |jennifermiles@example.com|INACTIVE    |
|Jeffrey |Lawrence|26 |  blakeerik@example.com      |active  |Jeffrey Lawrence|young    |blakeerik@example.com    |ACTIVE      |
|Matthew |Moore   |61 |  curtis61@example.com       |active  |Matthew Moore   |senior   |curtis61@example.com     |ACTIVE      |
+--------+--------+---+-----------------------------+--------+----------------+---------+-------------------------+------------+
only showing top 5 rows
Tip: Why this wins

One call. Every column name and its expression visible at the call site. No function definitions to chase. Fewer lines. More readable.

When .transform() IS the Right Tool

.transform() is excellent at the pipeline grain — composing meaningful DataFrame stages:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

def select_latest_per_email(df):
    """Keep the most recent record per email."""
    w = Window.partitionBy("email").orderBy(F.col("age").desc())
    return (df
        .withColumn("_rn", F.row_number().over(w))
        .filter("_rn = 1")
        .drop("_rn"))

def enrich_columns(df):
    """Add derived columns in one pass."""
    return df.withColumns({
        "full_name":    F.concat_ws(" ", F.col("first"), F.col("last")),
        "age_group":    F.when(F.col("age") < 30, "young")
                         .when(F.col("age") < 50, "mid")
                         .otherwise("senior"),
        "email_clean":  F.lower(F.trim(F.col("email"))),
        "status_clean": F.when(F.col("status") == "active", "ACTIVE")
                         .otherwise("INACTIVE"),
    })

def add_audit_metadata(df):
    """Standard audit columns."""
    return df.withColumns({
        "ingested_at": F.current_timestamp(),
        "source":      F.lit("CRM"),
    })

# Pipeline-level chaining — THIS is the right grain for .transform()
df_final = (
    df
    .transform(select_latest_per_email)
    .transform(enrich_columns)
    .transform(add_audit_metadata)
)

df_final.show(5, truncate=False)
+-------+---------+---+-----------------------------+---------+---------------+---------+-------------------------+------------+--------------------------+------+
|first  |last     |age|email                        |status   |full_name      |age_group|email_clean              |status_clean|ingested_at               |source|
+-------+---------+---+-----------------------------+---------+---------------+---------+-------------------------+------------+--------------------------+------+
|Clayton|Gray     |47 |  abrown@example.net         |suspended|Clayton Gray   |mid      |abrown@example.net       |INACTIVE    |2026-02-27 19:08:57.959623|CRM   |
|James  |Rodriguez|59 |  adam65@example.net         |suspended|James Rodriguez|senior   |adam65@example.net       |INACTIVE    |2026-02-27 19:08:57.959623|CRM   |
|Tanya  |Riley    |60 |  adkinsbrian@example.net    |pending  |Tanya Riley    |senior   |adkinsbrian@example.net  |INACTIVE    |2026-02-27 19:08:57.959623|CRM   |
|Larry  |Gibson   |35 |  adrianabailey@example.org  |pending  |Larry Gibson   |mid      |adrianabailey@example.org|INACTIVE    |2026-02-27 19:08:57.959623|CRM   |
|Kelly  |Raymond  |73 |  agarcia@example.net        |pending  |Kelly Raymond  |senior   |agarcia@example.net      |INACTIVE    |2026-02-27 19:08:57.959623|CRM   |
+-------+---------+---+-----------------------------+---------+---------------+---------+-------------------------+------------+--------------------------+------+
only showing top 5 rows
Tip: Best of both

Each .transform() function is a meaningful pipeline stage — deduplication, enrichment, auditing. Inside each stage, .withColumns() handles the column work. This is composable, readable, and generates an efficient plan.

What About Testing?

A fair point for .transform(): you can test each function in isolation with a tiny DataFrame. But .withColumns() is equally testable:

from pyspark.sql import functions as F

# Define expressions as reusable variables
_age_group = (
    F.when(F.col("age") < 30, "young")
     .when(F.col("age") < 50, "mid")
     .otherwise("senior")
)

# Test against a 3-row DataFrame — same as you would with a function
test_df = spark.createDataFrame([(25,), (45,), (65,)], ["age"])
test_df.select(_age_group.alias("age_group")).show()

# Or test the full enrichment dict
ENRICHMENT_COLS = {
    "age_group":    _age_group,
    "status_clean": F.when(F.col("status") == "active", "ACTIVE")
                     .otherwise("INACTIVE"),
}

test_df2 = spark.createDataFrame(
    [(25, "active"), (45, "inactive")],
    ["age", "status"],
)
test_df2.withColumns(ENRICHMENT_COLS).show()
+---------+
|age_group|
+---------+
|    young|
|      mid|
|   senior|
+---------+

+---+--------+---------+------------+
|age|  status|age_group|status_clean|
+---+--------+---------+------------+
| 25|  active|    young|      ACTIVE|
| 45|inactive|      mid|    INACTIVE|
+---+--------+---------+------------+

Extract expressions to variables or the dict to a constant — both are testable with a 3-row dummy DataFrame.
