Creating a custom operator typically requires at least:
- A logical operator, which user code creates
- A physical operator, which performs the actual transformation
- A strategy class that converts the logical operator to the physical one
- Registration of the strategy class with the Spark SQL session
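The four pieces listed above can be sketched roughly as follows. This is a minimal pass-through example, not a definitive implementation: it assumes Spark 3.2 or later (where `withNewChildInternal` must be overridden), and the names `PassThrough`, `PassThroughExec`, `PassThroughStrategy`, and `register` are hypothetical. The physical operator simply forwards its child's rows; a real one would transform them.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}

// 1. Logical operator (hypothetical name): describes *what* to do.
case class PassThrough(child: LogicalPlan) extends UnaryNode {
  override def output: Seq[Attribute] = child.output
  // Assumption: Spark 3.2+, where tree nodes must implement this method.
  override protected def withNewChildInternal(newChild: LogicalPlan): PassThrough =
    copy(child = newChild)
}

// 2. Physical operator (hypothetical name): describes *how* to do it.
case class PassThroughExec(child: SparkPlan) extends UnaryExecNode {
  override def output: Seq[Attribute] = child.output
  // Forwards the child's rows unchanged; real logic would go here.
  override protected def doExecute(): RDD[InternalRow] = child.execute()
  override protected def withNewChildInternal(newChild: SparkPlan): PassThroughExec =
    copy(child = newChild)
}

// 3. Strategy: maps the logical operator to the physical one.
object PassThroughStrategy extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case PassThrough(child) => PassThroughExec(planLater(child)) :: Nil
    case _                  => Nil // let other strategies handle everything else
  }

  // 4. Registration with the session (hypothetical helper).
  def register(spark: SparkSession): Unit = {
    spark.experimental.extraStrategies =
      spark.experimental.extraStrategies :+ PassThroughStrategy
  }
}
```

After `PassThroughStrategy.register(spark)` has been called, the planner consults the strategy whenever it encounters a `PassThrough` node in a logical plan.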
https://medium.com/@vladimir.prus/advanced-custom-operators-in-spark-79b12da61ca7
Writing Spark Native Functions (Catalyst Expressions) - neapowers
Code Generation in operators
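import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, UnaryExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.types.{AbstractDataType, DataType, LongType, TimestampType}

// Converts epoch milliseconds (Long) to a Timestamp, which Spark stores as microseconds.
// On Spark 3.2+, also override withNewChildInternal(newChild) = copy(millis = newChild).
case class MillisToTs(millis: Expression)
    extends UnaryExpression with ExpectsInputTypes {
  override def child: Expression = millis
  override def inputTypes: Seq[AbstractDataType] = Seq(LongType)
  override def dataType: DataType = TimestampType
  // Interpreted fallback, used when generated code is not available.
  override protected def nullSafeEval(input: Any): Any = input.asInstanceOf[Long] * 1000L
  // Generated Java code: multiply the child's value by 1000.
  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode =
    defineCodeGen(ctx, ev, c => s"$c * 1000")
}

// Column-level wrapper so the expression can be used from the DataFrame API.
def millis_to_ts(c: Column): Column = new Column(MillisToTs(c.expr))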
// With that in place, converting milliseconds to Timestamp is as easy as
df.withColumn("event_ts", millis_to_ts($"event_ts_ms"))
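To make the role of the lambda passed to `defineCodeGen` more concrete: it maps the name of the Java variable holding the child's value to a Java expression, and Spark assigns that expression to the result variable inside a null check. The plain-Scala sketch below imitates only that string templating, with no Spark dependency. The object name `CodegenTemplateSketch` and the variable names `value_1` and `value_2` are made up for illustration, since Spark generates fresh names itself.

```scala
object CodegenTemplateSketch {
  // The same lambda that MillisToTs passes to defineCodeGen.
  val template: String => String = c => s"$c * 1000"

  // Hypothetical variable names; Spark would generate fresh ones.
  val childValue = "value_1"
  val resultValue = "value_2"

  // defineCodeGen wraps the lambda's output in an assignment to the result variable.
  val statement: String = s"$resultValue = ${template(childValue)};"

  def main(args: Array[String]): Unit =
    println(statement) // prints "value_2 = value_1 * 1000;"
}
```

The multiplication by 1000 is needed because `TimestampType` values are held internally as microseconds since the epoch, while the input column is in milliseconds.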