Run a custom transformation on string columns


Suppose I have the following dataframe:

var df = Seq(
  ("2019-09-01", 0.1, 1, "0x0000000000000001", "0x00000001", "True"),
  ("2019-09-02", 0.2, 2, "0x0000000000000002", "0x00000002", "False"),
  ("2019-09-03", 0.3, 3, "0x0000000000000003", "0x00000003", "True")
).toDF("Timestamp", "Float", "Integer", "Hex1", "Hex2", "Bool")

I need to run a transformation on the string columns (in this example: Hex1, Hex2 and Bool) and convert them to numeric values using some custom logic.

The dataframes are generated by reading CSV files whose schema I don't know. All I know is that they contain a Timestamp column as the first column, followed by a variable number of columns which might be numeric (integers or doubles/floats) or these hex and boolean values.

I'm thinking this transformation would need to find all the string columns and for each one, run the transformation that will add a new column to the dataframe with the numerical representation of the string. In this case, the hex values would be converted to their decimal representation. And the "True", "False" strings would be converted to 1 and 0 respectively.

Back to the simplified example, I should get a df like this:

|Timestamp  |Float|Integer|Hex1              |Hex2      |Bool |
|2019-09-01 |0.1  |1      |1                 |1         |1    |
|2019-09-02 |0.2  |2      |2                 |2         |0    |
|2019-09-03 |0.3  |3      |3                 |3         |1    |

All columns should be numeric (integer, float or double) except for Timestamp.

asked on Stack Overflow Oct 4, 2019 by empz • edited Oct 4, 2019 by thebluephantom

For your example, the following two standard functions are enough:

conv(num: Column, fromBase: Int, toBase: Int): Column converts a number in a string column from one base to another. Use it to turn the hex digits (after the "0x" prefix) into their decimal representation.

when(condition: Column, value: Any): Column evaluates a list of conditions and returns one of multiple possible result expressions. Use it to map "True" to 1 and everything else to 0.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType

// substr(lit(3), ...) drops the leading "0x" before the base-16 to base-10 conversion.
val s1 = df.
      withColumn("Hex1", conv(col("Hex1").substr(lit(3), length(col("Hex1"))), 16, 10) cast IntegerType).
      withColumn("Hex2", conv(col("Hex2").substr(lit(3), length(col("Hex2"))), 16, 10) cast IntegerType).
      withColumn("Bool", when(col("Bool") === "True", 1).otherwise(0))
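One thing to watch in the snippet above is the cast to IntegerType: any hex value above 0x7FFFFFFF exceeds the Int range. The plain-Scala sketch below involves no Spark at all; HexSketch and its methods are hypothetical names used only for illustration. It shows the same "strip the prefix, then parse as base 16" logic together with a range check.

```scala
// Plain-Scala illustration of the hex-to-decimal step (no Spark required).
// HexSketch, hexToDecimal and fitsInInt are hypothetical names for this sketch.
object HexSketch {
  // Drop an optional "0x" prefix and parse the remaining digits as base 16.
  def hexToDecimal(s: String): BigInt = BigInt(s.stripPrefix("0x"), 16)

  // True when the value can be stored in a 32-bit signed Int without loss.
  def fitsInInt(v: BigInt): Boolean = v.isValidInt
}
```

If the CSV files may contain values for which fitsInInt is false, casting to LongType instead of IntegerType is probably the safer choice.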

Your problem definition asks for this to happen dynamically. To do the same task dynamically, some extra work is needed.

  1. Create a mapping from each column to its data type. This can be abstracted out: the mapping can live in an external file and be generated dynamically by reading that file.
val list = List(
          ("Hex", "Hex1"),
          ("Hex", "Hex2"),
          ("Bool", "Bool")
)
  2. Create a converter using pattern matching:
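import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType
object Helper {
    // columnDetail is (type label, column name), e.g. ("Hex", "Hex1").
    def convert(columnDetail: (String, String)): Column = {
      columnDetail._1 match {
        case "Hex" => conv(col(columnDetail._2).substr(lit(3), length(col(columnDetail._2))), 16, 10) cast IntegerType
        case "Bool" => when(col(columnDetail._2) === "True", 1).otherwise(0)
        // your other cases
      }
    }
}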

You can add further cases, each with its appropriate implementation.

  3. Final solution:
    import org.apache.spark.sql.Column
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types.IntegerType
    import spark.implicits._

    object Helper {
      // columnDetail is (type label, column name), e.g. ("Hex", "Hex1").
      def convert(columnDetail: (String, String)): Column = {
        columnDetail._1 match {
          case "Hex" => conv(col(columnDetail._2).substr(lit(3), length(col(columnDetail._2))), 16, 10) cast IntegerType
          case "Bool" => when(col(columnDetail._2) === "True", 1).otherwise(0)
          // your other cases
        }
      }
    }

    val df = Seq(
      ("2019-09-01", 0.1, 1, "0x0000000000000001", "0x00000001", "True"),
      ("2019-09-02", 0.2, 2, "0x0000000000000002", "0x00000002", "False"),
      ("2019-09-03", 0.3, 3, "0x0000000000000003", "0x00000003", "True")
    ).toDF("Timestamp", "Float", "Integer", "Hex1", "Hex2", "Bool")

    val list = List(
      ("Hex", "Hex1"),
      ("Hex", "Hex2"),
      ("Bool", "Bool")
    )

    // Apply the matching conversion to each mapped column in turn.
    val temp = list.foldLeft(df) { (tempDF, listValue) =>
      tempDF.withColumn(listValue._2, Helper.convert(listValue))
    }

    temp.show(false)
    temp.printSchema()


|Timestamp |Float|Integer|Hex1|Hex2|Bool|
|2019-09-01|0.1  |1      |1   |1   |1   |
|2019-09-02|0.2  |2      |2   |2   |0   |
|2019-09-03|0.3  |3      |3   |3   |1   |

 |-- Timestamp: string (nullable = true)
 |-- Float: double (nullable = false)
 |-- Integer: integer (nullable = false)
 |-- Hex1: integer (nullable = true)
 |-- Hex2: integer (nullable = true)
 |-- Bool: integer (nullable = false)
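
The mapping list used above does not have to be written by hand. As a rough sketch, it could be derived from one sample value per column, assuming that hex values always start with "0x" and booleans are always spelled "True" or "False". MappingSketch, classify and buildMapping are hypothetical names, and the code is plain Scala so that it stays independent of how the sample row is obtained.

```scala
// Hypothetical helper that derives the (type label, column name) pairs
// expected by Helper.convert from one sample value per column.
object MappingSketch {
  // Classify a sample cell value; None means "leave this column alone".
  def classify(sample: String): Option[String] = sample match {
    case s if s.startsWith("0x") => Some("Hex")
    case "True" | "False"        => Some("Bool")
    case _                       => None
  }

  // samples holds (column name, sample value) pairs, e.g. taken from the first row.
  def buildMapping(samples: Seq[(String, String)]): List[(String, String)] =
    samples.toList.flatMap { case (name, value) =>
      classify(value).map(label => (label, name))
    }
}
```

For the example dataframe, buildMapping(Seq("Hex1" -> "0x0000000000000001", "Hex2" -> "0x00000001", "Bool" -> "True")) yields the same three pairs as the hand-written list.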
answered on Stack Overflow Oct 4, 2019 by maogautam

Below is my Spark code to do this, using the conv function of Spark SQL. If you also want to identify all string columns dynamically at run time and convert them, that is only possible if you know exactly which kind of conversion each column needs.

val df = Seq(
  ("2019-09-01", 0.1, 1, "0x0000000000000001", "0x00000001", "True"),
  ("2019-09-02", 0.2, 2, "0x0000000000000002", "0x00000002", "False"),
  ("2019-09-03", 0.3, 3, "0x0000000000000003", "0x00000003", "True")
).toDF("Timestamp", "Float", "Integer", "Hex1", "Hex2", "Bool")

// Register the dataframe so it can be queried by name from SQL.
df.createOrReplaceTempView("sourceTable")

val finalDF = spark.sql("""
select  `Timestamp`,
        `Float`,
        `Integer`,
        conv(substr(Hex1,3),16,10) as Hex1,
        conv(substr(Hex2,3),16,10) as Hex2,
        case when Bool = 'True' then 1
             when Bool = 'False' then 0
             else NULL
        end as Bool
from sourceTable
""")

finalDF.show()

Result:

| Timestamp|Float|Integer|Hex1|Hex2|Bool|
|2019-09-01|  0.1|      1|   1|   1|   1|
|2019-09-02|  0.2|      2|   2|   2|   0|
|2019-09-03|  0.3|      3|   3|   3|   1|
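
The dynamic variant mentioned at the start of this answer could be sketched with the DataFrame API roughly as follows. This assumes the only string formats are "0x..." hex values and "True"/"False", and that Timestamp must stay untouched. StringColumnSketch and its methods are hypothetical names. The cast to LongType is also an assumption: hex values above 0x7FFFFFFFFFFFFFFF would not fit and would need a wider type.

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{LongType, StringType}

// Hypothetical helper; the names below are illustrative only.
object StringColumnSketch {

  // Names of all string-typed columns, minus the ones to leave untouched.
  def stringColumns(df: DataFrame, exclude: Set[String] = Set("Timestamp")): Seq[String] =
    df.schema.fields.toSeq
      .filter(f => f.dataType == StringType && !exclude.contains(f.name))
      .map(_.name)

  // Assumed rule: "0x..." is hex, "True"/"False" map to 1/0, anything else becomes null.
  def toNumeric(name: String): Column = {
    val c = col(name)
    when(c.startsWith("0x"), conv(c.substr(lit(3), length(c)), 16, 10).cast(LongType))
      .when(c === "True", lit(1L))
      .when(c === "False", lit(0L))
      .otherwise(lit(null).cast(LongType))
  }

  // Replace every detected string column with its numeric form.
  def convertAll(df: DataFrame): DataFrame =
    stringColumns(df).foldLeft(df)((acc, name) => acc.withColumn(name, toNumeric(name)))
}
```

With the dataframe above, StringColumnSketch.convertAll(df) would leave Timestamp, Float and Integer as they are and turn Hex1, Hex2 and Bool into long columns.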
answered on Stack Overflow Oct 4, 2019 by Manish • edited Oct 4, 2019 by Manish

