Creating a custom operator in Spark SQL typically requires at least:

  • A logical operator, which is created by the user code
  • A physical operator, which performs the actual transformation
  • A strategy class that converts the logical operator into the physical one
  • Registration of the strategy class with the Spark SQL session (for example via `spark.experimental.extraStrategies`)

References: https://medium.com/@vladimir.prus/advanced-custom-operators-in-spark-79b12da61ca7 (Vladimir Prus) and "Writing Spark Native Functions (Catalyst Expressions)" (MrPowers).

Code generation in custom Catalyst expressions

/**
 * Catalyst expression that converts epoch milliseconds (a `LongType` input) into
 * a `TimestampType` value.
 *
 * Spark represents `TimestampType` internally as a `Long` count of microseconds
 * since the epoch, so the conversion is a multiplication by 1000.
 *
 * Null input yields null output: `UnaryExpression.eval` and `defineCodeGen`
 * short-circuit on a null child before the conversion runs.
 *
 * NOTE: inputs outside +/- (Long.MaxValue / 1000) milliseconds overflow silently,
 * as plain `Long` multiplication does.
 *
 * NOTE(review): on Spark 3.2+ Catalyst also requires
 *   `override protected def withNewChildInternal(newChild: Expression): Expression = copy(millis = newChild)`
 * It is not added here because the target Spark version is not visible, and the
 * override does not compile on older versions. Confirm and add if needed.
 *
 * @param millis child expression producing epoch milliseconds
 */
case class MillisToTs(millis: Expression)
  extends UnaryExpression with ExpectsInputTypes {

  override def child: Expression = millis

  // ExpectsInputTypes makes analysis fail unless the child is LongType.
  override def inputTypes: Seq[AbstractDataType] = Seq(LongType)
  override def dataType: DataType = TimestampType

  // Interpreted path. UnaryExpression.eval calls this only for a non-null child,
  // and its default implementation throws. Without this override, evaluation
  // without codegen (e.g. constant folding, codegen fallback) fails at runtime.
  override protected def nullSafeEval(input: Any): Any =
    input.asInstanceOf[Long] * 1000L

  // Generated-code path. It must stay arithmetically identical to nullSafeEval.
  override protected def doGenCode(ctx: CodegenContext,
                                   ev: ExprCode): ExprCode = {
    defineCodeGen(ctx, ev, c => s"$c * 1000L")
  }
}
/**
 * DataFrame-API wrapper around the [[MillisToTs]] Catalyst expression.
 *
 * @param c column holding epoch milliseconds (expected to be `LongType`)
 * @return a `TimestampType` column computed from `c`
 */
def millis_to_ts(c: Column): Column = new Column(MillisToTs(c.expr))
// Usage: with `millis_to_ts` in scope, converting an epoch-milliseconds column to a
// timestamp column is a single `withColumn` call.
// NOTE(review): `$"..."` requires `import spark.implicits._`, and `df` is assumed to have
// a LongType column named `event_ts_ms`. Confirm both in the surrounding code.
 
df.withColumn("event_ts", millis_to_ts($"event_ts_ms"))