TakeOrderedAndProjectExec

Overview

The TakeOrderedAndProjectExec operator combines sorting, limiting, offset, and projection into a single physical operator. It takes the first limit elements as defined by the sortOrder, then drops the first offset elements (so the result holds limit - offset rows when an offset is set), and applies a projection if needed. This is logically equivalent to placing a Limit and/or Offset operator after a SortExec, optionally combined with a ProjectExec.

When Used

The query planner chooses this operator when:

  • A LIMIT clause is combined with ORDER BY in SQL queries
  • An OFFSET is specified along with ORDER BY and LIMIT
  • Projection is needed after the ordered limit operation
  • The operation is the final operator collecting results back to the driver
  • The limit is below spark.sql.execution.topKSortFallbackThreshold, making it more efficient than separate Sort → Limit → Project operators (see the example below)
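
A quick way to see the planner make this choice is to inspect the physical plan. A minimal sketch, assuming a SparkSession named spark and an illustrative people table:

// Illustrative: ORDER BY + LIMIT below the top-K threshold collapses
// into a single TakeOrderedAndProject node in the physical plan.
val df = spark.sql("SELECT name, age FROM people ORDER BY age LIMIT 10")
df.explain()
// == Physical Plan ==
// TakeOrderedAndProject(limit=10, orderBy=[age#1 ASC NULLS FIRST], output=[name#0,age#1])
// +- ...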

Input Requirements

  • Input partitioning: No specific requirement (accepts any partitioning)
  • Input ordering: Can work with any input ordering; optimizes when input already satisfies the required sort order
  • Number of children: Unary operator (extends OrderPreservingUnaryExecNode)

Output Properties

  • Output partitioning: Always SinglePartition - collects all data to a single partition
  • Output ordering: Preserves the specified sortOrder from the ordering expressions
  • Output schema: Derived from the projectList expressions via projectList.map(_.toAttribute)

Algorithm

  1. Ordering check: determines whether the child's output ordering already satisfies the required sort order using SortOrder.orderingSatisfies()
  2. Local top-K: each partition computes its local top-K elements; if the ordering is already satisfied, a simple take(limit) suffices, otherwise Utils.takeOrdered() is used
  3. Shuffle consolidation: if multiple partitions exist, the local results are shuffled to a single partition using ShuffleExchangeExec
  4. Global top-K: Utils.takeOrdered() is applied to the consolidated data to produce the final top-K elements
  5. Offset application: the first offset elements are dropped from the ordered results if offset > 0
  6. Projection: an UnsafeProjection is applied if projectList differs from the child's output schema
  7. Memory copying: rows are defensively copied via iter.map(_.copy()), since the underlying iterators may reuse row objects
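
The local/global flow can be sketched with plain RDD operations. A minimal sketch, not the operator's actual code, assuming ascending order on an RDD[Int]; here RDD.takeOrdered plays the role of the single-partition consolidation:

import org.apache.spark.rdd.RDD

// Each partition keeps at most `limit` candidates, the survivors are
// consolidated, and the offset is applied at the very end.
def takeOrderedWithOffset(rdd: RDD[Int], limit: Int, offset: Int): Array[Int] = {
  // Local top-K: per-partition candidates, at most `limit` each.
  val localTopK = rdd.mapPartitions(iter => iter.toArray.sorted.take(limit).iterator)
  // Global top-K on the consolidated candidates, then drop the offset.
  localTopK.takeOrdered(limit).drop(offset)
}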

Memory Usage

  • Spill behavior: Does not spill to disk; Utils.takeOrdered() keeps at most limit candidates per partition in a bounded in-memory structure, so memory stays proportional to the limit rather than the input size
  • Memory requirements: Needs to buffer up to limit rows per partition in memory during local TopK phase
  • Buffering: Maintains internal buffers for the top-K elements and uses UnsafeRowSerializer for shuffle operations
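
The memory bound comes from that bounded candidate buffer. A minimal sketch of the idea in plain Scala (an illustration, not Spark's Utils.takeOrdered):

import scala.collection.mutable

// Keep only the k smallest elements seen so far; memory is O(k), not O(n).
def topK[T](input: Iterator[T], k: Int)(implicit ord: Ordering[T]): Seq[T] = {
  if (k <= 0) return Seq.empty
  // Max-heap over the current candidates: the head is the worst candidate
  // and is evicted whenever a smaller element arrives.
  val heap = mutable.PriorityQueue.empty[T](ord)
  input.foreach { x =>
    if (heap.size < k) heap.enqueue(x)
    else if (ord.lt(x, heap.head)) { heap.dequeue(); heap.enqueue(x) }
  }
  heap.dequeueAll.reverse  // largest-first priority order, reversed to ascending
}

For example, topK(Iterator(5, 1, 4, 2, 3), 3) returns Seq(1, 2, 3).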

Partitioning Behavior

  • Distribution effect: Reduces data to a single partition, eliminating parallelism for downstream operators
  • Shuffle requirements: Requires shuffle when input has multiple partitions (uses ShuffleExchangeExec with SinglePartition)
  • Partition changes: Always outputs exactly 1 partition regardless of input partition count
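
Because the output collapses to one partition, heavy downstream work may need an explicit repartition to regain parallelism. An illustrative DataFrame-level sketch (df and the score column are assumed):

import org.apache.spark.sql.functions.col

// The ordered limit leaves a single partition; repartition restores
// parallelism for subsequent expensive transformations.
val top100 = df.orderBy(col("score").desc).limit(100)
val parallelAgain = top100.repartition(8)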

Supported Join/Aggregation Types

Not applicable - this is a limit/projection operator, not a join or aggregation operator.

Metrics

  • Shuffle write metrics: Bytes written, records written, write time (from SQLShuffleWriteMetricsReporter)
  • Shuffle read metrics: Bytes read, records read, fetch wait time, local blocks read (from SQLShuffleReadMetricsReporter)
  • Combined metrics: metrics = readMetrics ++ writeMetrics
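
These metrics can be read programmatically once an action has run, since they are backed by driver-side accumulators. A sketch, assuming a DataFrame df whose plan contains the operator:

import org.apache.spark.sql.execution.TakeOrderedAndProjectExec
import org.apache.spark.sql.functions.col

val query = df.orderBy(col("age")).limit(10)
query.collect()  // run an action so the metric values are populated
query.queryExecution.executedPlan.foreach {
  case t: TakeOrderedAndProjectExec =>
    t.metrics.foreach { case (name, metric) => println(s"$name: ${metric.value}") }
  case _ =>
}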

Code Generation

This operator does not support whole-stage code generation. It extends OrderPreservingUnaryExecNode but does not mix in the CodegenSupport trait, relying instead on RDD transformations.
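
This is visible in the codegen explain mode, where the operator appears outside any WholeStageCodegen stage. An illustrative check:

import org.apache.spark.sql.functions.col

// "codegen" mode prints the generated code per WholeStageCodegen subtree;
// TakeOrderedAndProject is not wrapped in one.
df.orderBy(col("age")).limit(10).explain("codegen")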

Configuration Options

  • spark.sql.execution.topKSortFallbackThreshold: If the limit reaches this threshold, the planner falls back to a full SortExec followed by a limit instead of this operator (see the snippet below)
  • spark.sql.shuffle.partitions: Does not affect the consolidation shuffle, which always targets a single partition
  • spark.serializer: Does not apply to the consolidation shuffle, which always uses UnsafeRowSerializer
  • Memory-related configs: spark.executor.memory bounds the heap available for buffering up to limit rows per partition
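
For example, lowering the fallback threshold below the limit changes the plan. An illustrative sketch:

// With the threshold below the limit, ORDER BY ... LIMIT plans as a
// full sort plus limit instead of TakeOrderedAndProject.
spark.conf.set("spark.sql.execution.topKSortFallbackThreshold", 5)
spark.sql("SELECT name FROM people ORDER BY age LIMIT 10").explain()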

Edge Cases

  • Null handling: Relies on LazilyGeneratedOrdering which handles nulls according to SQL semantics in sort expressions
  • Empty partitions: Returns ParallelCollectionRDD with empty sequence when input has 0 partitions
  • Zero limit: Handles limit = 0 by returning empty RDD
  • Offset without limit: Supports limit = -1 with offset > 0 to skip rows without limiting
  • Large offsets: May be inefficient if offset is much larger than the dataset size

Examples

TakeOrderedAndProject(limit=10, offset=5, orderBy=[age#1 ASC NULLS FIRST], output=[name#0,age#1])
+- LocalTableScan [name#0, age#1]

This corresponds to a query like SELECT name, age FROM people ORDER BY age LIMIT 5 OFFSET 5: the operator's limit includes the offset, so it takes the first 10 ordered rows and then drops the first 5.

See Also

  • CollectLimitExec: For simple limit operations without ordering
  • GlobalLimitExec: For limit operations across all partitions
  • SortExec: For sorting operations without limit
  • ProjectExec: For projection-only operations