Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ package za.co.absa.cobrix.spark.cobol.utils
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkContext
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.{Column, DataFrame, Row, SparkSession}
import za.co.absa.cobrix.cobol.internal.Logging
import za.co.absa.cobrix.spark.cobol.parameters.MetadataFields.MAX_ELEMENTS
import za.co.absa.cobrix.spark.cobol.utils.impl.HofsWrapper.transform
Expand Down Expand Up @@ -550,5 +551,48 @@ object SparkUtils extends Logging {
fields.toList
}

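/**
* A UDF that receives an entire record as a [[Row]] and returns a
* human-readable, single-line string representation of its contents.
*
* Fields are rendered as comma-separated `name=value` pairs. Nested structs
* are wrapped in `{...}`, arrays are wrapped in `[...]`, and null values are
* rendered as the literal text `null`. If the input struct itself is null,
* the UDF returns null.
*
* Usage (after columns are combined into a struct):
* {{{
* df.withColumn("record_dump", printRowUdf(struct(df.columns.map(col): _*)))
* }}}
*/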
val printRowUdf: UserDefinedFunction = udf { row: Row =>
  // Renders one value of a Row. Handles every container shape recursively so
  // that structs nested at any depth (arrays of arrays, map values, ...) are
  // printed field by field instead of via their default toString.
  def valueToString(value: Any): String = value match {
    case null =>
      "null"
    case nestedRow: Row =>
      s"{${rowToString(nestedRow)}}"
    case bytes: Array[Byte] =>
      // Binary columns: print the content as hex. The default toString of a
      // JVM array is an identity string ("[B@...") that differs between runs.
      bytes.map(b => f"${b & 0xFF}%02X").mkString("0x", "", "")
    case seq: scala.collection.Seq[_] =>
      // Match the general collection.Seq rather than the default Seq alias:
      // on Scala 2.13 the alias means immutable.Seq, which would not match
      // mutable array wrappers.
      seq.iterator.map(valueToString).mkString("[", ", ", "]")
    case map: scala.collection.Map[_, _] =>
      map.iterator
        .map { case (k, v) => s"${valueToString(k)} -> ${valueToString(v)}" }
        .mkString("Map(", ", ", ")")
    case other =>
      String.valueOf(other)
  }

  // Renders all fields of a Row as comma-separated "name=value" pairs.
  def rowToString(r: Row): String = {
    // Row.schema is null for rows created without a schema; fall back to
    // positional names in that case instead of failing with an NPE.
    val fieldNames: Seq[String] = Option(r.schema)
      .map(_.fieldNames.toSeq)
      .getOrElse((0 until r.length).map(idx => s"_$idx"))

    fieldNames.zipWithIndex
      .map { case (name, idx) => s"$name=${valueToString(r.get(idx))}" }
      .mkString(", ")
  }

  // A null struct must stay null so that the resulting column is SQL NULL.
  if (row == null) {
    null
  } else {
    rowToString(row)
  }
}

}

This file was deleted.

Loading