TakeOrderedAndProjectExec¶
Overview¶
The TakeOrderedAndProjectExec operator combines sort, limit, offset, and projection into a single physical operator. It takes the first limit elements as defined by the sortOrder, drops the first offset elements, and applies a projection if needed. It is logically equivalent to a Limit and/or Offset operator after a SortExec, with an optional ProjectExec.
When Used¶
The query planner chooses this operator when:
- A LIMIT clause is combined with ORDER BY in SQL queries
- An OFFSET is specified along with ORDER BY and LIMIT
- Projection is needed after the ordered limit operation
- It is the final operator in the query, collecting results back to the driver
- It's more efficient than separate Sort → Limit → Project operators
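For example, an ORDER BY + LIMIT query over a DataFrame exercises this path. A minimal sketch (the dataset and column names are illustrative):

```scala
import org.apache.spark.sql.SparkSession

// Illustrative data; any ORDER BY + LIMIT query takes the same shape.
val spark = SparkSession.builder().master("local[*]").appName("demo").getOrCreate()
import spark.implicits._

val people = Seq(("alice", 29), ("bob", 31), ("carol", 25)).toDF("name", "age")

// Sort, project, limit: the planner can fold this into a single
// TakeOrderedAndProject node instead of Sort -> Project -> Limit.
people.orderBy($"age".asc).select($"name").limit(2).explain()
// The physical plan should contain something like:
//   TakeOrderedAndProject(limit=2, orderBy=[age ASC NULLS FIRST], output=[name])
```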
Input Requirements¶
- Input partitioning: No specific requirement (accepts any partitioning)
- Input ordering: Can work with any input ordering; optimizes when input already satisfies the required sort order
- Number of children: Unary operator (extends OrderPreservingUnaryExecNode)
Output Properties¶
- Output partitioning: Always SinglePartition - collects all data to a single partition
- Output ordering: Preserves the specified sortOrder from the ordering expressions
- Output schema: Derived from the projectList expressions via projectList.map(_.toAttribute)
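In code, these properties are simple derivations from the operator's constructor arguments. A toy model (not Spark's actual classes) for illustration:

```scala
// Toy model of how the output properties derive from the operator's arguments.
case class Attribute(name: String)
case class SortOrder(child: Attribute, ascending: Boolean)
sealed trait Partitioning
case object SinglePartition extends Partitioning

case class TakeOrderedAndProjectModel(
    limit: Int,
    offset: Int,
    sortOrder: Seq[SortOrder],
    projectList: Seq[Attribute]) {
  def outputPartitioning: Partitioning = SinglePartition // always one partition
  def outputOrdering: Seq[SortOrder] = sortOrder         // the requested order
  def output: Seq[Attribute] = projectList               // projectList.map(_.toAttribute) in Spark
}
```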
Algorithm¶
• Ordering Check: Determines whether the child's output ordering already satisfies the required sort order using SortOrder.orderingSatisfies()
• Local TopK: Each partition computes its local top-K elements - if the ordering is already satisfied, a simple take(limit) suffices; otherwise Utils.takeOrdered() is used
• Shuffle Consolidation: If multiple partitions exist, shuffles local results to a single partition using ShuffleExchangeExec
• Global TopK: Applies Utils.takeOrdered() on the consolidated data to get the final top-K elements
• Offset Application: Drops the first offset elements from the ordered results if offset > 0
• Projection: Applies UnsafeProjection if projectList differs from child output schema
• Memory Copying: Creates defensive copies of rows via iter.map(_.copy()) before buffering, because Spark reuses the underlying UnsafeRow instances across iterator calls
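Taken together, the flow can be sketched without Spark at all. A minimal model over plain Scala collections (all names hypothetical; partitions are Seqs and the shuffle is a flatten):

```scala
// Toy model of the execution flow: partitions are plain Seqs, the
// "shuffle" is a flatten, and rows are immutable case classes (so the
// defensive-copy step is unnecessary here and omitted).
case class Row(name: String, age: Int)

def takeOrderedAndProject(
    partitions: Seq[Seq[Row]],   // child output, one Seq per partition
    limit: Int,
    offset: Int,
    ord: Ordering[Row],
    inputAlreadySorted: Boolean): Seq[String] = {
  // Steps 1-2. Local TopK: take(limit) if the ordering is already
  // satisfied, otherwise sort each partition and keep its best limit rows.
  val localTopK = partitions.map { part =>
    if (inputAlreadySorted) part.take(limit) else part.sorted(ord).take(limit)
  }
  // Step 3. Shuffle consolidation: gather all local winners in one place.
  val consolidated = localTopK.flatten
  // Step 4. Global TopK over the consolidated rows.
  val globalTopK = consolidated.sorted(ord).take(limit)
  // Step 5. Offset: drop the first offset rows of the final ordering.
  val afterOffset = if (offset > 0) globalTopK.drop(offset) else globalTopK
  // Step 6. Projection: keep only the requested columns (here: name).
  afterOffset.map(_.name)
}

val parts = Seq(Seq(Row("bob", 31), Row("carol", 25)), Seq(Row("alice", 29)))
takeOrderedAndProject(parts, limit = 2, offset = 1,
  Ordering.by(_.age), inputAlreadySorted = false)
// => Seq("alice"): top-2 by age are carol(25) and alice(29); offset drops carol.
```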
Memory Usage¶
- Spill behavior: Does not spill to disk; Utils.takeOrdered() retains only the current top-K candidates in a bounded in-memory structure
- Memory requirements: Needs to buffer up to limit rows per partition in memory during the local TopK phase
- Buffering: Maintains internal buffers for the top-K elements and uses UnsafeRowSerializer for shuffle operations
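The per-partition bound comes from the TopK structure itself. A simplified sketch of the bounded-heap idea behind Utils.takeOrdered() (not Spark's actual code):

```scala
import scala.collection.mutable

// Keep at most k rows in memory while streaming over an arbitrarily large
// iterator: a max-heap of size k, evicting the current worst element.
def boundedTopK[T](iter: Iterator[T], k: Int)(implicit ord: Ordering[T]): Seq[T] = {
  val heap = mutable.PriorityQueue.empty[T](ord) // max-heap: head is the largest
  iter.foreach { row =>
    if (heap.size < k) heap.enqueue(row)
    else if (ord.lt(row, heap.head)) { heap.dequeue(); heap.enqueue(row) }
  }
  heap.toSeq.sorted(ord) // memory never exceeds k rows, regardless of input size
}

boundedTopK(Iterator(31, 25, 29, 40, 18), k = 2) // => Seq(18, 25)
```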
Partitioning Behavior¶
- Distribution effect: Reduces data to a single partition, eliminating parallelism for downstream operators
- Shuffle requirements: Requires a shuffle when the input has multiple partitions (uses ShuffleExchangeExec with SinglePartition)
- Partition changes: Always outputs exactly 1 partition regardless of the input partition count
Supported Join/Aggregation Types¶
Not applicable - this is a limit/projection operator, not a join or aggregation operator.
Metrics¶
- Shuffle write metrics: Bytes written, records written, write time (from SQLShuffleWriteMetricsReporter)
- Shuffle read metrics: Bytes read, records read, fetch wait time, local blocks read (from SQLShuffleReadMetricsReporter)
- Combined metrics: metrics = readMetrics ++ writeMetrics
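Inside Spark, the two maps come from the reporter companions and are merged into the operator's metrics. A sketch of that wiring, assuming the internal (private[spark]) org.apache.spark.sql.execution.metric API, shown only to illustrate the shape:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.execution.metric.{
  SQLMetric, SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter}

// Sketch only: this is internal API, not callable from user code.
def buildMetrics(sc: SparkContext): Map[String, SQLMetric] = {
  val readMetrics  = SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sc)
  val writeMetrics = SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sc)
  readMetrics ++ writeMetrics // the combined map surfaced in the SQL UI
}
```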
Code Generation¶
This operator does not support whole-stage code generation. It extends OrderPreservingUnaryExecNode but does not implement the CodegenSupport trait, relying instead on plain RDD transformations.
Configuration Options¶
- spark.sql.execution.arrow.maxRecordsPerBatch: May affect batch processing during shuffle operations
- spark.sql.shuffle.partitions: Does not change the consolidation target (always SinglePartition), but can determine how many input partitions run the local TopK phase
- spark.serializer: Affects serialization generally; the operator's own shuffle uses UnsafeRowSerializer
- Memory-related configs: spark.executor.memory and spark.sql.execution.sort.spillThreshold affect the underlying sorting operations
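For example, reusing the SparkSession from the earlier sketch (values illustrative):

```scala
// Session-level setting; illustrative value only.
spark.conf.set("spark.sql.shuffle.partitions", "64")
// Static configs such as spark.executor.memory and spark.serializer must be
// supplied at launch, e.g.:
//   spark-submit --conf spark.executor.memory=4g \
//     --conf spark.serializer=org.apache.spark.serializer.KryoSerializer ...
```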
Edge Cases¶
- Null handling: Relies on LazilyGeneratedOrdering, which handles nulls according to the SQL semantics of the sort expressions (e.g. NULLS FIRST)
- Empty partitions: Returns a ParallelCollectionRDD with an empty sequence when the input has 0 partitions
- Zero limit: Handles limit = 0 by returning an empty RDD
- Offset without limit: Supports limit = -1 with offset > 0 to skip rows without limiting
- Large offsets: May be inefficient, because every row up to the limit (which includes the offset rows) must be ordered and shuffled before the first offset rows are discarded
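The limit/offset edge cases reduce to two guards. A minimal sketch with a hypothetical helper:

```scala
// limit = -1 conventionally means "no limit" here; offset always drops
// from the front of the ordered result.
def applyLimitAndOffset[T](ordered: Seq[T], limit: Int, offset: Int): Seq[T] = {
  val limited = if (limit >= 0) ordered.take(limit) else ordered // limit = 0 -> empty
  if (offset > 0) limited.drop(offset) else limited // offset past the end -> empty
}

applyLimitAndOffset(Seq(1, 2, 3, 4), limit = 3, offset = 1)  // => Seq(2, 3)
applyLimitAndOffset(Seq(1, 2, 3, 4), limit = 0, offset = 0)  // => Seq()
applyLimitAndOffset(Seq(1, 2, 3, 4), limit = -1, offset = 2) // => Seq(3, 4)
```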
Examples¶
TakeOrderedAndProject(limit=10, offset=5, orderBy=[age#1 ASC NULLS FIRST], output=[name#0,age#1])
+- LocalTableScan [name#0, age#1]
This shows a query like SELECT name, age FROM people ORDER BY age LIMIT 10 OFFSET 5.
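A sketch of code that yields a plan of this shape, reusing the SparkSession from the earlier example (table name and data are illustrative):

```scala
// Register a tiny table and explain the ordered-limit query.
val people = Seq(("alice", 29), ("bob", 31), ("carol", 25)).toDF("name", "age")
people.createOrReplaceTempView("people")

spark.sql("SELECT name, age FROM people ORDER BY age LIMIT 10 OFFSET 5").explain()
// Expect a TakeOrderedAndProject node over the scan, as in the plan above.
```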
See Also¶
- CollectLimitExec: For simple limit operations without ordering
- GlobalLimitExec: For limit operations across all partitions
- SortExec: For sorting operations without limit
- ProjectExec: For projection-only operations