Apache Spark™ is a powerful engine for large-scale data processing, in batch as well as (near) real-time.
It offers APIs in Java, Scala, Python and R, and can run on Hadoop (YARN), on Mesos or in standalone mode.
It can access several data sources such as HDFS, Cassandra, HBase and S3.
Let’s take a look at the competition offered by MAIF on datasciences.net: Décodage d’une formule de pricing (“decoding a pricing formula”).
We’ll tackle the problem in Scala, directly in the Spark shell.
We won’t cover setting up Spark itself, but we suggest using a Docker image to launch a cluster easily.
The code snippets also require a few imports that we won’t discuss in detail.
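For readers following along, the snippets below roughly assume the following imports (a sketch; package paths are those of Spark 1.x with the ML pipeline API):
import org.apache.spark.sql.types._ // StructType, StructField, IntegerType, StringType, DoubleType
import org.apache.spark.sql.functions.expr // used below to create the placeholder column
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.feature.StringIndexer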
Let’s start by defining the structure of the dataset. For each column, we give the field name, the data type and whether the field can be null:
val customSchema = StructType(Array(
  StructField("id", IntegerType, true),
  StructField("annee_naissance", IntegerType, true),
  StructField("annee_permis", IntegerType, true),
  StructField("marque", StringType, true),
  StructField("puis_fiscale", IntegerType, true),
  StructField("anc_veh", IntegerType, true),
  StructField("codepostal", StringType, true),
  StructField("energie_veh", StringType, true),
  StructField("kmage_annuel", IntegerType, true),
  StructField("crm", IntegerType, true),
  StructField("profession", StringType, true),
  StructField("var1", IntegerType, true),
  ...
  StructField("prime_tot_ttc", DoubleType, true)))
We can then import the data according to this schema. To use these commands, Spark must be launched with the option --packages com.databricks:spark-csv_2.10:1.3.0.
var df = sqlContext.read.format("com.databricks.spark.csv").
  option("header", "true").option("delimiter", ";").schema(customSchema).
  load("/path/to/data/ech_apprentissage.csv")
var test = sqlContext.read.format("com.databricks.spark.csv").
  option("header", "true").option("delimiter", ";").schema(customSchema).
  load("/path/to/data/ech_test.csv")
We then fill the column to be predicted with a placeholder value on the test sample, and fill the missing values (na) in both datasets:
test = test.drop("prime_tot_ttc").withColumn("prime_tot_ttc",expr("-999"))
df = df.na.fill(-999).na.fill("EMPTY")
test = test.na.fill(-999).na.fill("EMPTY")
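As a quick check (using one column from the schema as an example), the fill should leave no nulls behind:
df.filter(df("crm").isNull).count() // expected: 0 after na.fill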
We then drop the columns we don’t consider necessary for our study:
df = df.drop("anc_veh").drop("codepostal")
test = test.drop("anc_veh").drop("codepostal")
Most algorithms don’t handle string variables well, so we build a DataFrame containing only the string columns so that we can process them separately.
var dfString = df
val exprs = dfString.dtypes.filter(_._2 != "StringType").map(ct => ct._1).toList
exprs.foreach(cname => dfString = dfString.drop(cname))
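We can check that only the string fields remain in dfString:
dfString.dtypes.foreach(println) // e.g. (marque,StringType), (energie_veh,StringType), ...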
Then we index all these variables, replacing each string value with a numeric index that the algorithms can handle.
val stringIndexer: Array[org.apache.spark.ml.PipelineStage] = dfString.columns.map(cname =>
  new StringIndexer().setInputCol(cname).setOutputCol(s"${cname}_index"))
var stages: Array[org.apache.spark.ml.PipelineStage] = stringIndexer
val pipelinePrepare = new Pipeline().setStages(stages)
val all = df.unionAll(test)
val preparation = pipelinePrepare.fit(all)
We add the new columns containing the indexes. Note that the pipeline was fitted on the union of the training and test sets, so the indexers have seen every category present in either dataset.
df = preparation.transform(df)
test = preparation.transform(test)
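To see what the indexing produced, we can compare an original string column with its index (taking "marque" as an example):
df.select("marque", "marque_index").distinct.show(10)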
We then delete the string fields.
val stringColsDf = df.dtypes.filter(_._2 == "StringType").map(ct => ct._1).toList
stringColsDf.foreach(cname => df = df.drop(cname))
val stringColsTest = test.dtypes.filter(_._2 == "StringType").map(ct => ct._1).toList
stringColsTest.foreach(cname => test = test.drop(cname))
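At this point both DataFrames should contain only numeric columns, which we can verify:
df.dtypes.filter(_._2 == "StringType") // expected to be empty now
test.dtypes.filter(_._2 == "StringType") // expected to be empty now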