Feature Engineering with Apache Spark and Optimus

https://hioptimus.com

When we talk about Feature Engineering, we mean creating new features from your existing ones to improve model performance. Sometimes that is the goal; other times you need to do it because a given model cannot work with the data in its current form. Either way, these transformations let you run most Machine Learning and Deep Learning algorithms.

With Optimus, we have made Feature Engineering for Big Data easy.

To install Optimus you just need to do:

$ pip install optimuspyspark

The methods below are part of the DataFrameTransformer and provide a high-level abstraction over Spark's Feature Engineering methods. You’ll see how easy it is to prepare your data for Machine Learning with Optimus.

How to do Feature Engineering with Optimus?

String to index:

This method maps a string column of labels to an ML column of label indices. If the input column is numeric, we cast it to string and index the string values.

input_cols argument receives a list of columns to be indexed.

Let’s start by creating a DataFrame with Optimus.

# Importing Optimus
import optimus as op
# Instantiating utilities
tools = op.Utilities()

# Creating DF with Optimus
data = [('Japan', 'Tokyo', 37800000),('USA', 'New York', 19795791),('France', 'Paris', 12341418),
('Spain','Madrid',6489162)]
df = tools.create_data_frame(data, ["country", "city", "population"])

# Instantiating transformer
transformer = op.DataFrameTransformer(df)

# Show DF
transformer.show()
+-------+--------+----------+
|country| city|population|
+-------+--------+----------+
| Japan| Tokyo| 37800000|
+-------+--------+----------+
| USA|New York| 19795791|
+-------+--------+----------+
| France| Paris| 12341418|
+-------+--------+----------+
| Spain| Madrid| 6489162|
+-------+--------+----------+

To index the strings in the country and city columns, we just need to do:

# Indexing columns 'city' and 'country'
transformer.string_to_index(["city", "country"])

# Show indexed DF
transformer.show()
+-------+--------+----------+----------+-------------+
|country| city|population|city_index|country_index|
+-------+--------+----------+----------+-------------+
| Japan| Tokyo| 37800000| 1.0| 1.0|
+-------+--------+----------+----------+-------------+
| USA|New York| 19795791| 2.0| 3.0|
+-------+--------+----------+----------+-------------+
| France| Paris| 12341418| 3.0| 2.0|
+-------+--------+----------+----------+-------------+
| Spain| Madrid| 6489162| 0.0| 0.0|
+-------+--------+----------+----------+-------------+
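
To make the mapping itself concrete, here is a minimal plain-Python sketch of frequency-based string indexing. It is not the Optimus or Spark implementation: the function name fit_string_index is hypothetical, and the alphabetical tie-break is an assumption, so when labels are equally frequent (as in the table above) Spark may assign different indices.

```python
from collections import Counter


def fit_string_index(values):
    """Map each distinct label to a float index, most frequent label first."""
    # Numeric inputs are cast to string first, as described above.
    counts = Counter(str(v) for v in values)
    # Assumption: ties are broken alphabetically so the result is deterministic.
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


categories = ["a", "b", "c", "a", "a", "c"]
mapping = fit_string_index(categories)
indexed = [mapping[c] for c in categories]

print(mapping)  # {'a': 0.0, 'c': 1.0, 'b': 2.0}
print(indexed)  # [0.0, 2.0, 1.0, 0.0, 0.0, 1.0]
```

The most frequent label, "a", gets index 0.0, which is the convention Spark documents for its string indexer.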

Index to String:

This method maps a column of indices back to a new column of corresponding string values. The index-string mapping is either from the ML (Spark) attributes of the input column, or from user-supplied labels (which take precedence over ML attributes).

Let’s go back to strings with the DataFrame we created in the last step.

# Importing Optimus
import optimus as op
# Instantiating utilities
tools = op.Utilities()

# Instantiating transformer
transformer = op.DataFrameTransformer(df)
# Indexing columns 'city' and 'country'
transformer.string_to_index(["city", "country"])
# Going back to strings from index
transformer.index_to_string(["country_index"])

# Show DF with column "country_index" back to string
transformer.get_data_frame.select("country","country_index", "country_index_string").show()
+-------+-------------+---------------------+
|country|country_index| country_index_string|
+-------+-------------+---------------------+
| Japan| 1.0| Japan|
+-------+-------------+---------------------+
| USA| 3.0| USA|
+-------+-------------+---------------------+
| France| 2.0| France|
+-------+-------------+---------------------+
| Spain| 0.0| Spain|
+-------+-------------+---------------------+
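
Conceptually, the reverse step is a lookup in an ordered list of labels. The sketch below is plain Python, not the Optimus API; the helper name index_to_string_values is hypothetical, and the labels list is simply read off the table above (position in the list = index).

```python
def index_to_string_values(indices, labels):
    """Translate each (float) index back to the label stored at that position."""
    return [labels[int(i)] for i in indices]


# Labels ordered by index, taken from the country_index column shown above.
labels = ["Spain", "Japan", "France", "USA"]
restored = index_to_string_values([1.0, 3.0, 2.0, 0.0], labels)

print(restored)  # ['Japan', 'USA', 'France', 'Spain']
```

When you supply your own labels instead of relying on the column's ML attributes, it is this list that changes.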

One Hot Encoder:

One-hot encoding converts categorical variables into a numeric form that ML algorithms can consume, which often helps them do a better job in prediction.

This method maps a column of label indices to a column of binary vectors, with at most a single one-value.

Let’s create a sample dataframe to see what OHE does:

# Importing Optimus
import optimus as op
# Instantiating utilities
tools = op.Utilities()

# Creating DataFrame
data = [
(0, "a"),
(1, "b"),
(2, "c"),
(3, "a"),
(4, "a"),
(5, "c")
]
df = tools.create_data_frame(data,["id", "category"])

# Instantiating the transformer
transformer = op.DataFrameTransformer(df)

# One Hot Encoding
transformer.one_hot_encoder(["id"])

# Show encoded dataframe
transformer.show()
+---+--------+-------------+
| id|category| id_encoded|
+---+--------+-------------+
| 0| a|(5,[0],[1.0])|
+---+--------+-------------+
| 1| b|(5,[1],[1.0])|
+---+--------+-------------+
| 2| c|(5,[2],[1.0])|
+---+--------+-------------+
| 3| a|(5,[3],[1.0])|
+---+--------+-------------+
| 4| a|(5,[4],[1.0])|
+---+--------+-------------+
| 5| c| (5,[],[])|
+---+--------+-------------+
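
Each encoded value above is a sparse vector written as (size, [positions], [values]). The following plain-Python sketch reproduces that notation. It assumes the last category is dropped and represented by the all-zeros vector, which is consistent with the row for id 5 above; the function name one_hot_sparse is hypothetical and this is not the Optimus or Spark code.

```python
def one_hot_sparse(index, num_categories, drop_last=True):
    """Return a one-hot vector as a (size, positions, values) sparse triple."""
    if not 0 <= index < num_categories:
        raise ValueError("index out of range")
    # Assumption: with drop_last, the final category becomes the empty vector.
    size = num_categories - 1 if drop_last else num_categories
    if index < size:
        return (size, [index], [1.0])
    return (size, [], [])


# Six distinct ids (0..5), as in the sample DataFrame.
encoded = [one_hot_sparse(i, 6) for i in range(6)]

print(encoded[0])  # (5, [0], [1.0])
print(encoded[5])  # (5, [], [])
```

This is why the method description says "at most a single one-value": the dropped category has none.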

SQL Transformations:

This method implements transformations defined by a SQL statement. Spark only supports SQL syntax like “SELECT … FROM __THIS__ …”, where “__THIS__” represents the underlying table of the input dataframe. Thank Spark for this amazing function.

Let’s create a sample DataFrame to test this function.

# Importing Optimus
import optimus as op
# Instantiating utilities
tools = op.Utilities()

# Creating DataFrame
data = [
(0, 1.0, 3.0),
(2, 2.0, 5.0)
]

df = tools.create_data_frame(data,["id", "v1", "v2"])

# Instantiating the transformer
transformer = op.DataFrameTransformer(df)

This dataframe is just this:

+---+---+---+
| id| v1| v2|
+---+---+---+
| 0|1.0|3.0|
+---+---+---+
| 2|2.0|5.0|
+---+---+---+

Now let’s create two new columns from these. The first will be the sum of columns v1 and v2, and the second will be the product of these two columns. With the sql() function we just need to pass the SQL expression and end it with FROM __THIS__, which stands for the underlying table of the input dataframe.

transformer.sql("SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")

And this will output:

+---+---+---+---+----+
| id| v1| v2| v3| v4|
+---+---+---+---+----+
| 0|1.0|3.0|4.0| 3.0|
+---+---+---+---+----+
| 2|2.0|5.0|7.0|10.0|
+---+---+---+---+----+
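
The __THIS__ placeholder can be pictured as plain text substitution: the data is registered under a table name, and that name replaces the placeholder before the statement runs. The sketch below illustrates the idea with Python's built-in sqlite3 module instead of Spark; the helper run_sql_on_rows and the table name input_table are hypothetical, and this is an analogy rather than Spark's actual code path.

```python
import sqlite3


def run_sql_on_rows(rows, columns, statement, table_name="input_table"):
    """Load rows into an in-memory table and run a __THIS__ statement on it."""
    conn = sqlite3.connect(":memory:")
    try:
        # Untyped columns keep ints as ints and floats as floats.
        conn.execute(f"CREATE TABLE {table_name} ({', '.join(columns)})")
        marks = ", ".join("?" for _ in columns)
        conn.executemany(f"INSERT INTO {table_name} VALUES ({marks})", rows)
        # The placeholder is replaced by the real table name before execution.
        cursor = conn.execute(statement.replace("__THIS__", table_name))
        names = [d[0] for d in cursor.description]
        return names, cursor.fetchall()
    finally:
        conn.close()


names, result = run_sql_on_rows(
    [(0, 1.0, 3.0), (2, 2.0, 5.0)],
    ["id", "v1", "v2"],
    "SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__",
)

print(names)   # ['id', 'v1', 'v2', 'v3', 'v4']
print(result)  # [(0, 1.0, 3.0, 4.0, 3.0), (2, 2.0, 5.0, 7.0, 10.0)]
```

Running the same statement here yields the original three columns followed by v3 and v4.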

Vector Assembler:

This method combines a given list of columns into a single vector column.

This is very important because many Machine Learning algorithms in Spark expect their input features as a single vector column.

Let’s create a sample dataframe to see what vector assembler does:

# Importing Optimus
import optimus as op
# Instantiating utilities
tools = op.Utilities()
# Import Vectors
from pyspark.ml.linalg import Vectors

# Creating DataFrame
data = [(0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0)]

df = tools.create_data_frame(data,["id", "hour", "mobile", "user_features", "clicked"])

# Instantiating the transformer
transformer = op.DataFrameTransformer(df)

# Assemble features (column names must match the ones used to create the DataFrame)
transformer.vector_assembler(["hour", "mobile", "user_features"])

# Show assembled df
print("Assembled columns 'hour', 'mobile', 'user_features' to vector column 'features'")
transformer.get_data_frame.select("features", "clicked").show(truncate=False)

This will output:

+-----------------------+-------+
|features |clicked|
+-----------------------+-------+
|[18.0,1.0,0.0,10.0,0.5]|1.0 |
+-----------------------+-------+
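
What the assembler does to a single row can be sketched in plain Python: scalar columns contribute one value each, vector columns are flattened in place, and everything is cast to float. The function name assemble and the dict-based row are hypothetical illustrations, not the Optimus or Spark implementation.

```python
def assemble(row, input_cols):
    """Concatenate the selected columns of one row into a flat list of floats."""
    features = []
    for col in input_cols:
        value = row[col]
        if isinstance(value, (list, tuple)):
            # Vector-like columns are flattened into the output.
            features.extend(float(x) for x in value)
        else:
            features.append(float(value))
    return features


row = {
    "id": 0,
    "hour": 18,
    "mobile": 1.0,
    "user_features": [0.0, 10.0, 0.5],  # stands in for Vectors.dense(...)
    "clicked": 1.0,
}
features = assemble(row, ["hour", "mobile", "user_features"])

print(features)  # [18.0, 1.0, 0.0, 10.0, 0.5]
```

The order of the output follows the order of the column list, so changing that list changes the layout of the feature vector.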

You can now compare how easy Feature Engineering is with Optimus against doing it with other frameworks.

Contributors:

License:

Apache 2.0 © Iron.

Data scientist, physicist and computer engineer. Love sharing ideas, thoughts and contributing to Open Source in Machine Learning and Deep Learning ;).