The most likely scenario here is that you cache the input DataFrame. In that case, Spark won't attempt selection or projection pushdown; instead it will fetch the data to the cluster and process it locally.
It is easy to illustrate this behavior:
df = spark.read.jdbc(url, table, properties={})
df
DataFrame[id: int, UPDATE_DATE: timestamp]
If the data is not cached:
df.select("UPDATE_DATE").where(df.UPDATE_DATE > max_lookup_datetime).explain(True)
== Parsed Logical Plan ==
Filter (UPDATE_DATE#1 > 1516289713075960)
+- Project [UPDATE_DATE#1]
+- Relation[id#0,UPDATE_DATE#1] JDBCRelation(df) [numPartitions=1]
== Analyzed Logical Plan ==
UPDATE_DATE: timestamp
Filter (UPDATE_DATE#1 > 1516289713075960)
+- Project [UPDATE_DATE#1]
+- Relation[id#0,UPDATE_DATE#1] JDBCRelation(df) [numPartitions=1]
== Optimized Logical Plan ==
Project [UPDATE_DATE#1]
+- Filter (isnotnull(UPDATE_DATE#1) && (UPDATE_DATE#1 > 1516289713075960))
+- Relation[id#0,UPDATE_DATE#1] JDBCRelation(df) [numPartitions=1]
== Physical Plan ==
*Scan JDBCRelation(df) [numPartitions=1] [UPDATE_DATE#1] PushedFilters: [*IsNotNull(UPDATE_DATE), *GreaterThan(UPDATE_DATE,2018-01-18 15:35:13.07596)], ReadSchema: struct<UPDATE_DATE:timestamp>
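As a side note, the literal 1516289713075960 in the logical plans above is just the filter timestamp encoded as microseconds since the Unix epoch, which is Spark's internal representation for TimestampType. A quick sanity check in plain Python (assuming a UTC session timezone, since the human-readable string in PushedFilters is rendered in the session timezone):

```python
from datetime import datetime, timedelta, timezone

# Literal taken from the optimized plan; Spark encodes TimestampType
# values internally as microseconds since the Unix epoch.
micros = 1516289713075960
ts = datetime(1970, 1, 1, tzinfo=timezone.utc) + timedelta(microseconds=micros)
print(ts)  # 2018-01-18 15:35:13.075960+00:00
```

This matches the `GreaterThan(UPDATE_DATE,2018-01-18 15:35:13.07596)` filter shown in the physical plan.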
both selection and projection are pushed down. However, if you cache `df` and check the execution plan once again:
df.cache()
DataFrame[id: int, UPDATE_DATE: timestamp]
df.select("UPDATE_DATE").where(df.UPDATE_DATE > max_lookup_datetime).explain(True)
== Parsed Logical Plan ==
Filter (UPDATE_DATE#1 > 1516289713075960)
+- Project [UPDATE_DATE#1]
+- Relation[id#0,UPDATE_DATE#1] JDBCRelation(df) [numPartitions=1]
== Analyzed Logical Plan ==
UPDATE_DATE: timestamp
Filter (UPDATE_DATE#1 > 1516289713075960)
+- Project [UPDATE_DATE#1]
+- Relation[id#0,UPDATE_DATE#1] JDBCRelation(df) [numPartitions=1]
== Optimized Logical Plan ==
Project [UPDATE_DATE#1]
+- Filter (isnotnull(UPDATE_DATE#1) && (UPDATE_DATE#1 > 1516289713075960))
+- InMemoryRelation [id#0, UPDATE_DATE#1], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
+- *Scan JDBCRelation(df) [numPartitions=1] [id#0,UPDATE_DATE#1] ReadSchema: struct<id:int,UPDATE_DATE:timestamp>
== Physical Plan ==
*Filter (isnotnull(UPDATE_DATE#1) && (UPDATE_DATE#1 > 1516289713075960))
+- InMemoryTableScan [UPDATE_DATE#1], [isnotnull(UPDATE_DATE#1), (UPDATE_DATE#1 > 1516289713075960)]
+- InMemoryRelation [id#0, UPDATE_DATE#1], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
+- *Scan JDBCRelation(df) [numPartitions=1] [id#0,UPDATE_DATE#1] ReadSchema: struct<id:int,UPDATE_DATE:timestamp>
both projection and selection are deferred: nothing is pushed down to the JDBC source, and the filter is instead applied to the cached data via an InMemoryTableScan.