Suppose I have the following dataframe:
var df = Seq(
("2019-09-01", 0.1, 1, "0x0000000000000001", "0x00000001", "True"),
("2019-09-02", 0.2, 2, "0x0000000000000002", "0x00000002", "False"),
("2019-09-03", 0.3, 3, "0x0000000000000003", "0x00000003", "True")
).toDF("Timestamp", "Float", "Integer", "Hex1", "Hex2", "Bool")
I need to run a transformation on the string columns (in this example: Hex1, Hex2 and Bool) and convert them to numeric values using some custom logic.
The dataframes are generated by reading CSV files whose schema I don't know. All I know is that they contain a Timestamp column as the first column, followed by a variable number of columns which might be numeric (integers or doubles/floats) or these hex and boolean values.
I'm thinking this transformation would need to find all the string columns and, for each one, run a transformation that adds a new column to the dataframe with the numerical representation of the string. In this case, the hex values would be converted to their decimal representation, and the "True"/"False" strings would be converted to 1 and 0 respectively.
Back to the simplified example, I should get a df like this:
|Timestamp |Float|Integer|Hex1 |Hex2 |Bool |
|-----------|-----|-------|------------------|----------|-----|
|2019-09-01 |0.1 |1 |1 |1 |1 |
|2019-09-02 |0.2 |2 |2 |2 |0 |
|2019-09-03"|0.3 |3 |3 |3 |1 |
That is, all columns end up numeric (integer, float or double) except for the Timestamp.
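For what it's worth, finding the candidate columns seems straightforward; it's the per-column conversion that I'm unsure how to organize. A rough sketch of what I have in mind (stringCols is just a name I made up):
import org.apache.spark.sql.types.StringType

// Every string column except the Timestamp is a candidate for conversion.
val stringCols = df.schema.fields
  .collect { case f if f.dataType == StringType && f.name != "Timestamp" => f.name }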
As per your example, use the following functions:
Use the conv standard function to convert hex to the appropriate type. conv(num: Column, fromBase: Int, toBase: Int): Column converts a number in a string column from one base to another.
when(condition: Column, value: Any): Column evaluates a list of conditions and returns one of multiple possible result expressions.
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType
val s1 = df.
  withColumn("Hex1", conv(col("Hex1").substr(lit(3), length(col("Hex1"))), 16, 10) cast IntegerType). // drop the "0x" prefix, then convert base 16 -> 10
  withColumn("Hex2", conv(col("Hex2").substr(lit(3), length(col("Hex2"))), 16, 10) cast IntegerType).
  withColumn("Bool", when(col("Bool") === "True", 1).otherwise(0))
s1.show()
s1.printSchema()
From your problem definition, you want to do this dynamically. To do the same task dynamically you have to do a bit of extra work.
val list = List(
("Hex", "Hex1"),
("Hex", "Hex2"),
("Bool", "Bool")
)
import org.apache.spark.sql.Column

object Helper {
  def convert(columnDetail: (String, String)): Column = {
    columnDetail._1 match {
      case "Hex"  => conv(col(columnDetail._2).substr(lit(3), length(col(columnDetail._2))), 16, 10) cast IntegerType
      case "Bool" => when(col(columnDetail._2) === "True", 1).otherwise(0)
      // your other cases
    }
  }
}
You can add all the cases and their appropriate implementations there.
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType
import spark.implicits._
var df = Seq(
("2019-09-01", 0.1, 1, "0x0000000000000001", "0x00000001", "True"),
("2019-09-02", 0.2, 2, "0x0000000000000002", "0x00000002", "False"),
("2019-09-03", 0.3, 3, "0x0000000000000003", "0x00000003", "True")
).toDF("Timestamp", "Float", "Integer", "Hex1", "Hex2", "Bool")
val list = List(
("Hex", "Hex1"),
("Hex", "Hex2"),
("Bool", "Bool")
)
val temp = list.foldLeft(df) { (tempDF, listValue) =>
  tempDF.withColumn(listValue._2, Helper.convert(listValue))
}
temp.show(false)
temp.printSchema()
object Helper {
  def convert(columnDetail: (String, String)): Column = {
    columnDetail._1 match {
      case "Hex"  => conv(col(columnDetail._2).substr(lit(3), length(col(columnDetail._2))), 16, 10) cast IntegerType
      case "Bool" => when(col(columnDetail._2) === "True", 1).otherwise(0)
      // your other cases
    }
  }
}
Result:
+----------+-----+-------+----+----+----+
|Timestamp |Float|Integer|Hex1|Hex2|Bool|
+----------+-----+-------+----+----+----+
|2019-09-01|0.1 |1 |1 |1 |1 |
|2019-09-02|0.2 |2 |2 |2 |0 |
|2019-09-03|0.3 |3 |3 |3 |1 |
+----------+-----+-------+----+----+----+
root
|-- Timestamp: string (nullable = true)
|-- Float: double (nullable = false)
|-- Integer: integer (nullable = false)
|-- Hex1: integer (nullable = true)
|-- Hex2: integer (nullable = true)
|-- Bool: integer (nullable = false)
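If the column names and kinds are not known up front, the list itself can be derived at runtime. The following is only a sketch of one way to do it, with assumptions that are not part of the question: hex values always start with "0x", booleans are exactly "True"/"False", and the first row is representative of each column.
import org.apache.spark.sql.types.StringType

// Sketch: build the (kind, columnName) list by inspecting the schema and
// sampling the first row. The classification rules below are assumptions.
val firstRow = df.head()
val dynamicList = df.schema.fields
  .filter(f => f.dataType == StringType && f.name != "Timestamp")
  .map { f =>
    val sample = firstRow.getAs[String](f.name)
    val kind =
      if (sample != null && sample.startsWith("0x")) "Hex"
      else if (sample == "True" || sample == "False") "Bool"
      else "Other"
    (kind, f.name)
  }
  .filter(_._1 != "Other") // Helper.convert only knows "Hex" and "Bool"
  .toList

val dynamicResult = dynamicList.foldLeft(df) { (tempDF, listValue) =>
  tempDF.withColumn(listValue._2, Helper.convert(listValue))
}
dynamicResult.show(false)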
Below is my Spark code to do this. I have used the conv function of Spark SQL (http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.conv). Also, if you want to write logic that dynamically identifies all string columns at run time and performs the conversion, that only works if you know exactly what kind of conversion you are going to do.
var df = Seq(
("2019-09-01", 0.1, 1, "0x0000000000000001", "0x00000001", "True"),
("2019-09-02", 0.2, 2, "0x0000000000000002", "0x00000002", "False"),
("2019-09-03", 0.3, 3, "0x0000000000000003", "0x00000003", "True")
).toDF("Timestamp", "Float", "Integer", "Hex1", "Hex2", "Bool")
// df.show
df.createOrReplaceTempView("sourceTable")
val finalDF = spark.sql("""
  select Timestamp,
         Float,
         Integer,
         cast(conv(substr(Hex1, 3), 16, 10) as int) as Hex1,
         cast(conv(substr(Hex2, 3), 16, 10) as int) as Hex2,
         case when Bool = 'True' then 1
              when Bool = 'False' then 0
              else null
         end as Bool
  from sourceTable
""")
finalDF.show
Result:
+----------+-----+-------+----+----+----+
| Timestamp|Float|Integer|Hex1|Hex2|Bool|
+----------+-----+-------+----+----+----+
|2019-09-01| 0.1| 1| 1| 1| 1|
|2019-09-02| 0.2| 2| 2| 2| 0|
|2019-09-03| 0.3| 3| 3| 3| 1|
+----------+-----+-------+----+----+----+
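The same idea can be made dynamic with the SQL route as well: build the projection from the schema and pass it to selectExpr. This is only a sketch; classifying string columns by looking at the first row (hex starts with "0x", booleans are exactly "True"/"False") is an assumption, not something given in the question.
import org.apache.spark.sql.types.StringType

// Sketch: generate one select expression per column and let Spark SQL do
// the conversion. Unrecognized string columns are passed through unchanged.
val sampleRow = df.head()
val exprs = df.schema.fields.map { f =>
  if (f.dataType != StringType || f.name == "Timestamp") f.name
  else {
    val v = sampleRow.getAs[String](f.name)
    if (v != null && v.startsWith("0x"))
      s"cast(conv(substr(${f.name}, 3), 16, 10) as int) as ${f.name}"
    else if (v == "True" || v == "False")
      s"case when ${f.name} = 'True' then 1 when ${f.name} = 'False' then 0 else null end as ${f.name}"
    else f.name
  }
}

df.selectExpr(exprs: _*).show(false)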