Skip to content

UpdatingSessionsExec

Overview

UpdatingSessionsExec is a physical operator that updates session window specifications for input rows by analyzing neighboring rows to determine which rows belong to the same session window. Unlike MergingSessionsExec, it preserves the number of input rows (1:1 mapping) and only updates session window metadata without performing aggregation.

When Used

The query planner chooses this operator when session window processing is required but aggregation cannot be performed simultaneously with session window determination. This is typically used as a preprocessing step when the query structure prevents combining session window calculation with aggregation in a single MergingSessionsExec operator.

Input Requirements

  • Expected input partitioning: Either AllTuples (for streaming without grouping keys) or ClusteredDistribution/StatefulOperatorPartitioning based on grouping expressions
  • Expected input ordering: Ascending sort on grouping keys (excluding session expression) followed by session expression
  • Number of children: Unary (single child operator)

Output Properties

  • Output partitioning: Preserves child's output partitioning unchanged
  • Output ordering: No specific ordering guarantees beyond input ordering preservation
  • Output schema: Identical to child's output schema - no schema transformation occurs

Algorithm

  • Partition input data using mapPartitions transformation on child RDD
  • Create UpdatingSessionsIterator for each partition with buffering thresholds from configuration
  • Iterator processes sorted rows within each partition to identify session boundaries
  • Analyze neighboring rows based on grouping expressions to determine session membership
  • Update session window metadata for each row while preserving original row structure
  • Apply memory management with configurable in-memory and spill thresholds
  • Output updated rows maintaining 1:1 correspondence with input

Memory Usage

  • Spill to disk: Yes, supports spilling based on configurable thresholds
  • Memory requirements: Controlled by three configuration parameters for buffer management
  • Buffering behavior: Uses sessionWindowBufferInMemoryThreshold, sessionWindowBufferSpillThreshold, and sessionWindowBufferSpillSizeThreshold for memory management

Partitioning Behavior

  • Data distribution: Preserves input partitioning without redistribution
  • Shuffle requirements: May require shuffle if child doesn't meet required distribution (ClusteredDistribution for batch, StatefulOperatorPartitioning for streaming)
  • Partition count changes: No change in partition count

Supported Join/Aggregation Types

Not applicable - this operator performs session window processing, not joins or aggregations.

Metrics

The specific metrics are not visible in this source code as they would be implemented in the UpdatingSessionsIterator class. Typical session window metrics would include rows processed, session windows created, and spill-related metrics.

Code Generation

No explicit whole-stage code generation support is evident in this operator. Processing is handled through iterator-based execution via UpdatingSessionsIterator.

Configuration Options

  • spark.sql.streaming.sessionWindow.buffer.inMemoryThreshold - Controls in-memory buffer threshold
  • spark.sql.streaming.sessionWindow.buffer.spillThreshold - Controls when to trigger spilling
  • spark.sql.streaming.sessionWindow.buffer.spillSizeThreshold - Controls spill size threshold
  • Number of shuffle partitions (for streaming mode via numShufflePartitions parameter)

Edge Cases

  • Null handling: Specific null handling behavior would be implemented in UpdatingSessionsIterator
  • Empty partition handling: Empty partitions are processed through mapPartitions with empty iterators
  • Skew handling: Relies on proper partitioning of grouping keys; no specific skew mitigation beyond standard Spark mechanisms

Examples

== Physical Plan ==
UpdatingSessionsExec(isStreaming=false, numShufflePartitions=None, 
  groupingExpression=[user_id#1], sessionExpression=session_window#5)
+- Sort [user_id#1 ASC NULLS FIRST, session_window#5 ASC NULLS FIRST]
   +- Exchange hashpartitioning(user_id#1, 200)
      +- LocalTableScan [user_id#1, event_time#2, session_window#5]

See Also

  • MergingSessionsExec: Preferred alternative that combines session window determination with aggregation
  • UpdatingSessionsIterator: The actual iterator implementation that performs the session window logic
  • StatefulOperatorPartitioning: Used for streaming session window partitioning requirements