
Feature Transformers

Tokenizer

Tokenization is the process of taking text (such as a sentence) and breaking it into individual terms (usually words). A simple Tokenizer class provides this functionality. The example below shows how to split sentences into sequences of words.

RegexTokenizer allows more advanced tokenization based on regular expression (regex) matching. By default, the parameter "pattern" (regex, default: "\s+") is used as the delimiter to split the input text. Alternatively, users can set the parameter "gaps" to false, indicating that the regex "pattern" denotes "tokens" rather than splitting gaps, and find all matching occurrences as the tokenization result.

Examples

Refer to the Tokenizer Python docs and the RegexTokenizer Python docs for more details on the API.

from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("TokenizerExample").getOrCreate()
sentenceDataFrame = spark.createDataFrame([
    (0, "Hi I heard about Spark"),
    (1, "I wish Java could use case classes"),
    (2, "Logistic,regression,models,are,neat")
], ["id", "sentence"])

tokenizer = Tokenizer(inputCol="sentence", outputCol="words")

regexTokenizer = RegexTokenizer(inputCol="sentence", outputCol="words", pattern="\\W")
# alternatively, pattern="\\w+", gaps=False

countTokens = udf(lambda words: len(words), IntegerType())

tokenized = tokenizer.transform(sentenceDataFrame)
tokenized.select("sentence", "words")\
    .withColumn("tokens", countTokens(col("words"))).show(truncate=False)

regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("sentence", "words") \
    .withColumn("tokens", countTokens(col("words"))).show(truncate=False)
spark.stop()

output:

+-----------------------------------+------------------------------------------+------+
|sentence                           |words                                     |tokens|
+-----------------------------------+------------------------------------------+------+
|Hi I heard about Spark             |[hi, i, heard, about, spark]              |5     |
|I wish Java could use case classes |[i, wish, java, could, use, case, classes]|7     |
|Logistic,regression,models,are,neat|[logistic,regression,models,are,neat]     |1     |
+-----------------------------------+------------------------------------------+------+

+-----------------------------------+------------------------------------------+------+
|sentence                           |words                                     |tokens|
+-----------------------------------+------------------------------------------+------+
|Hi I heard about Spark             |[hi, i, heard, about, spark]              |5     |
|I wish Java could use case classes |[i, wish, java, could, use, case, classes]|7     |
|Logistic,regression,models,are,neat|[logistic, regression, models, are, neat] |5     |
+-----------------------------------+------------------------------------------+------+

Find full example code at "examples/src/main/python/ml/tokenizer_example.py" in the Spark repo.
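
The comment in the example above mentions the token-matching alternative; here is a minimal sketch of that mode, using the same RegexTokenizer API with gaps=False so that pattern="\\w+" matches the tokens directly instead of acting as a delimiter:

from pyspark.ml.feature import RegexTokenizer
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("RegexTokenizerGapsExample").getOrCreate()
df = spark.createDataFrame([(0, "Logistic,regression,models,are,neat")], ["id", "sentence"])

# With gaps=False, the pattern describes the tokens themselves rather than the gaps
# between them, so "\\w+" yields the same tokens as splitting on "\\W" above.
regexTokenizer = RegexTokenizer(inputCol="sentence", outputCol="words",
                                pattern="\\w+", gaps=False)
regexTokenizer.transform(df).select("words").show(truncate=False)
spark.stop()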

StopWordsRemover

Stop words are words which should be excluded from the input, typically because they appear frequently and do not carry much meaning.

StopWordsRemover takes as input a sequence of strings (e.g. the output of a Tokenizer) and drops all the stop words from the input sequences. The list of stop words is specified by the stopWords parameter. Default stop words for some languages are accessible by calling StopWordsRemover.loadDefaultStopWords(language), for which the available options are "danish", "dutch", "english", "finnish", "french", "german", "hungarian", "italian", "norwegian", "portuguese", "russian", "spanish", "swedish" and "turkish". A boolean parameter caseSensitive indicates whether the matches should be case sensitive (false by default).

Examples

Assume that we have the following DataFrame with columns id and raw:

 id | raw
----|----------
 0  | [I, saw, the, red, baloon]
 1  | [Mary, had, a, little, lamb]

Applying StopWordsRemover with raw as the input column and filtered as the output column, we should get the following:

 id | raw                         | filtered
----|-----------------------------|--------------------
 0  | [I, saw, the, red, baloon]  |  [saw, red, baloon]
 1  | [Mary, had, a, little, lamb]|[Mary, little, lamb]

In filtered, the stop words "I", "the", "had" and "a" have been filtered out. Refer to the StopWordsRemover Python docs for more details on the API.

from pyspark.ml.feature import StopWordsRemover
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("StopWordsRemoverExample").getOrCreate()
sentenceData = spark.createDataFrame([
    (0, ["I", "saw", "the", "red", "balloon"]),
    (1, ["Mary", "had", "a", "little", "lamb"])
], ["id", "raw"])

remover = StopWordsRemover(inputCol="raw", outputCol="filtered")
remover.transform(sentenceData).show(truncate=False)
spark.stop()

Find full example code at "examples/src/main/python/ml/stopwords_remover_example.py" in the Spark repo.
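
The default English stop word list is used above. Below is a minimal sketch of the loadDefaultStopWords and caseSensitive options described earlier; the choice of "french" is just an illustration:

from pyspark.ml.feature import StopWordsRemover
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("StopWordsRemoverOptionsExample").getOrCreate()
sentenceData = spark.createDataFrame([
    (0, ["Je", "suis", "un", "homme"])
], ["id", "raw"])

# Load the default stop words for another language and make matching case sensitive,
# so only exact lower-case matches are removed.
frenchStopWords = StopWordsRemover.loadDefaultStopWords("french")
remover = StopWordsRemover(inputCol="raw", outputCol="filtered",
                           stopWords=frenchStopWords, caseSensitive=True)
remover.transform(sentenceData).show(truncate=False)
spark.stop()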

n-gram

An n-gram is a sequence of n tokens (typically words) for some integer n. The NGram class can be used to transform input features into n-grams.

NGram takes as input a sequence of strings (e.g. the output of a Tokenizer). The parameter n is used to determine the number of terms in each n-gram. The output consists of a sequence of n-grams, where each n-gram is represented by a space-delimited string of n consecutive words. If the input sequence contains fewer than n strings, no output is produced.

Examples

Refer to the NGram Python docs for more details on the API.

from pyspark.ml.feature import NGram
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("n_gramExample").getOrCreate()
wordDataFrame = spark.createDataFrame([
    (0, ["Hi", "I", "heard", "about", "Spark"]),
    (1, ["I", "wish", "Java", "could", "use", "case", "classes"]),
    (2, ["Logistic", "regression", "models", "are", "neat"])
], ["id", "words"])

ngram = NGram(n=2, inputCol="words", outputCol="ngrams")

ngramDataFrame = ngram.transform(wordDataFrame)
ngramDataFrame.select("ngrams").show(truncate=False)
spark.stop()

output:

+------------------------------------------------------------------+
|ngrams                                                            |
+------------------------------------------------------------------+
|[Hi I, I heard, heard about, about Spark]                         |
|[I wish, wish Java, Java could, could use, use case, case classes]|
|[Logistic regression, regression models, models are, are neat]    |
+------------------------------------------------------------------+

Find full example code at "examples/src/main/python/ml/n_gram_example.py" in the Spark repo.

Binarizer

Binarization is the process of thresholding numerical features to binary (0/1) features.

Binarizer takes the common parameters inputCol and outputCol, as well as the threshold used for binarization. Feature values greater than the threshold are binarized to 1.0; values equal to or less than the threshold are binarized to 0.0. Both Vector and Double types are supported for inputCol.

Examples

Refer to the Binarizer Python docs for more details on the API.

from pyspark.ml.feature import Binarizer
from pyspark.sql import SparkSession  

spark = SparkSession.builder.appName("BinarizerExample").getOrCreate()
continuousDataFrame = spark.createDataFrame([
    (0, 0.1),
    (1, 0.8),
    (2, 0.2)
], ["id", "feature"])

binarizer = Binarizer(threshold=0.5, inputCol="feature", outputCol="binarized_feature")

binarizedDataFrame = binarizer.transform(continuousDataFrame)

print("Binarizer output with Threshold = %f" % binarizer.getThreshold())
binarizedDataFrame.show()
spark.stop()

output:

Binarizer output with Threshold = 0.500000
+---+-------+-----------------+
| id|feature|binarized_feature|
+---+-------+-----------------+
|  0|    0.1|              0.0|
|  1|    0.8|              1.0|
|  2|    0.2|              0.0|
+---+-------+-----------------+

Find full example code at "examples/src/main/python/ml/binarizer_example.py" in the Spark repo.
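
As noted above, inputCol may also be a Vector column, in which case each component is thresholded individually. A minimal sketch under that assumption:

from pyspark.ml.feature import Binarizer
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("BinarizerVectorExample").getOrCreate()
# Each component of the vector is compared against the same threshold.
df = spark.createDataFrame([(0, Vectors.dense([0.1, 0.8, 0.2]))], ["id", "features"])

binarizer = Binarizer(threshold=0.5, inputCol="features", outputCol="binarized_features")
binarizer.transform(df).show(truncate=False)
spark.stop()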

PCA

PCA is a statistical procedure that uses an orthogonal transformation to convert a set of observations of possibly correlated variables into a set of values of linearly uncorrelated variables called principal components. A PCA class trains a model to project vectors to a low-dimensional space using PCA. The example below shows how to project 5-dimensional feature vectors into 3-dimensional principal components.

Examples

Refer to the PCA Python docs for more details on the API.

from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession  
spark = SparkSession.builder.appName("PCA_Example").getOrCreate()
data = [(Vectors.sparse(5, [(1, 1.0), (3, 7.0)]),),
        (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
        (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
df = spark.createDataFrame(data, ["features"])

pca = PCA(k=3, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(df)

result = model.transform(df).select("pcaFeatures")
result.show(truncate=False)
spark.stop()

output:

+-----------------------------------------------------------+
|pcaFeatures                                                |
+-----------------------------------------------------------+
|[1.6485728230883807,-4.013282700516296,-5.524543751369388] |
|[-4.645104331781534,-1.1167972663619026,-5.524543751369387]|
|[-6.428880535676489,-5.337951427775355,-5.524543751369389] |
+-----------------------------------------------------------+

Find full example code at "examples/src/main/python/ml/pca_example.py" in the Spark repo.

PolynomialExpansion

Polynomial expansion is the process of expanding your features into a polynomial space, which is formulated by an n-degree combination of the original dimensions. A PolynomialExpansion class provides this functionality. The example below shows how to expand your features into a 3-degree polynomial space; for a 2-dimensional input (x, y) this yields the 9 features x, x², x³, y, xy, x²y, y², xy², y³, which is exactly the ordering visible in the output below.

Examples

Refer to the PolynomialExpansion Python docs for more details on the API.

from pyspark.ml.feature import PolynomialExpansion
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession  

spark = SparkSession.builder.appName("PolynormialExpansionExample").getOrCreate()
df = spark.createDataFrame([
    (Vectors.dense([2.0, 1.0]),),
    (Vectors.dense([0.0, 0.0]),),
    (Vectors.dense([3.0, -1.0]),)
], ["features"])

polyExpansion = PolynomialExpansion(degree=3, inputCol="features", outputCol="polyFeatures")
polyDF = polyExpansion.transform(df)

polyDF.show(truncate=False)
spark.stop()

output:

+----------+------------------------------------------+
|features  |polyFeatures                              |
+----------+------------------------------------------+
|[2.0,1.0] |[2.0,4.0,8.0,1.0,2.0,4.0,1.0,2.0,1.0]     |
|[0.0,0.0] |[0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0]     |
|[3.0,-1.0]|[3.0,9.0,27.0,-1.0,-3.0,-9.0,1.0,3.0,-1.0]|
+----------+------------------------------------------+

Find full example code at "examples/src/main/python/ml/polynomial_expansion_example.py" in the Spark repo.

Discrete Cosine Transform(DCT)

The Discrete Cosine Transform transforms a length-N real-valued sequence in the time domain into another length-N real-valued sequence in the frequency domain. A DCT class provides this functionality, implementing the DCT-II and scaling the result by 1/√2 such that the representing matrix for the transform is unitary. No shift is applied to the transformed sequence (e.g. the 0th element of the transformed sequence is the 0th DCT coefficient and not the N/2th).
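
For reference, the unitary DCT-II described above can be written as follows (a standard statement of the orthonormal DCT-II, included here as an illustration; for the first input row in the example below, X_0 = (0 + 1 - 2 + 3)/2 = 1.0, which matches the first output coefficient):

X_0 = \sqrt{1/N} \sum_{n=0}^{N-1} x_n, \qquad
X_k = \sqrt{2/N} \sum_{n=0}^{N-1} x_n \cos\!\left(\frac{\pi}{N}\left(n + \tfrac{1}{2}\right)k\right), \quad k = 1, \dots, N-1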

Examples

Refer to the DCT Python docs for more details on the API.

from pyspark.ml.feature import DCT
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("DCT_Example").getOrCreate()
df = spark.createDataFrame([
    (Vectors.dense([0.0, 1.0, -2.0, 3.0]),),
    (Vectors.dense([-1.0, 2.0, 4.0, -7.0]),),
    (Vectors.dense([14.0, -2.0, -5.0, 1.0]),)], ["features"])

dct = DCT(inverse=False, inputCol="features", outputCol="featuresDCT")

dctDf = dct.transform(df)

dctDf.select("featuresDCT").show(truncate=False)
spark.stop()

output:

+----------------------------------------------------------------+
|featuresDCT                                                     |
+----------------------------------------------------------------+
|[1.0,-1.1480502970952693,2.0000000000000004,-2.7716385975338604]|
|[-1.0,3.378492794482933,-7.000000000000001,2.9301512653149677]  |
|[4.0,9.304453421915744,11.000000000000002,1.5579302036357163]   |
+----------------------------------------------------------------+

Find full example code at "examples/src/main/python/ml/dct_example.py" in the Spark repo.

StringIndexer

StringIndexer encodes a string column of labels to a column of label indices. The indices are in [0, numLabels), ordered by label frequencies, so the most frequent label gets index 0. Unseen labels will be put at index numLabels if the user chooses to keep them. If the input column is numeric, it is cast to string and the string values are indexed. When downstream pipeline components such as an Estimator or a Transformer make use of this string-indexed label, you must set the input column of the component to this string-indexed column name. In many cases, you can set the input column with setInputCol.

from pyspark.ml.feature import StringIndexer
from pyspark.sql import SparkSession  
spark = SparkSession.builder.appName("StringIndexerExample").getOrCreate()
df = spark.createDataFrame(
    [(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
    ["id", "category"])

indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
indexed = indexer.fit(df).transform(df)
indexed.show()
spark.stop()

output:

+---+--------+-------------+
| id|category|categoryIndex|
+---+--------+-------------+
|  0|       a|          0.0|
|  1|       b|          2.0|
|  2|       c|          1.0|
|  3|       a|          0.0|
|  4|       a|          0.0|
|  5|       c|          1.0|
+---+--------+-------------+

In addition, there are three strategies for how StringIndexer handles unseen labels:

  1. throw an exception (the default)
  2. skip the row containing the unseen label entirely
  3. put the unseen label in a special additional bucket, at index numLabels

Let's go back to our previous example, but this time reuse our previously fitted StringIndexer on the following dataset:

 id | category
----|----------
 0  | a
 1  | b
 2  | c
 3  | d
 4  | e

If you have not set how StringIndexer handles unseen labels, or set the option to "error", an exception will be thrown. However, if you had called setHandleInvalid("skip"), the following dataset would be generated:

 id | category | categoryIndex
----|----------|---------------
 0  | a        | 0.0
 1  | b        | 2.0
 2  | c        | 1.0

Note that the rows containing "d" or "e" do not appear.

如果你调用setHandleInvalid("keep"),将生成以下数据集:

 id | category | categoryIndex
----|----------|---------------
 0  | a        | 0.0
 1  | b        | 2.0
 2  | c        | 1.0
 3  | d        | 3.0
 4  | e        | 3.0
 # (rows containing "d" or "e" are mapped to index 3.0)
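
A minimal sketch of the "keep" strategy in code, assuming a Spark version whose StringIndexer accepts handleInvalid="keep" as described above:

from pyspark.ml.feature import StringIndexer
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("StringIndexerHandleInvalidExample").getOrCreate()
train = spark.createDataFrame([(0, "a"), (1, "b"), (2, "c")], ["id", "category"])
test = spark.createDataFrame([(3, "d"), (4, "e")], ["id", "category"])

# Fit on data containing only "a", "b" and "c"; "d" and "e" are unseen at transform time.
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex",
                        handleInvalid="keep")
model = indexer.fit(train)

# With "keep", both unseen labels share the extra index numLabels (3.0 here).
model.transform(test).show()
spark.stop()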

Find full example code at "examples/src/main/python/ml/string_indexer_example.py" in the Spark repo.

IndexToString

Symmetrically to StringIndexer, IndexToString maps a column of label indices back to a column containing the original labels as strings. A common use case is to produce indices from labels with StringIndexer, train a model with those indices, and retrieve the original labels from the column of predicted indices with IndexToString. However, you are also free to supply your own labels.

Examples

Building on the StringIndexer example, let's assume we have the following DataFrame with columns id and categoryIndex:

 id | categoryIndex
----|---------------
 0  | 0.0
 1  | 2.0
 2  | 1.0
 3  | 0.0
 4  | 0.0
 5  | 1.0

Applying IndexToString with categoryIndex as the input column and originalCategory as the output column, we are able to retrieve our original labels (they will be inferred from the column's metadata):

Refer to the IndexToString Python docs for more details on the API.

from pyspark.ml.feature import IndexToString, StringIndexer
from pyspark.sql import SparkSession  
spark = SparkSession.builder.appName("IndexToStringExample").getOrCreate()

df = spark.createDataFrame(
    [(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
    ["id", "category"])

indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = indexer.fit(df)
indexed = model.transform(df)

print("Transformed string column '%s' to indexed column '%s'"
      % (indexer.getInputCol(), indexer.getOutputCol()))
indexed.show()

print("StringIndexer will store labels in output column metadata\n")

converter = IndexToString(inputCol="categoryIndex", outputCol="originalCategory")
converted = converter.transform(indexed)

print("Transformed indexed column '%s' back to original string column '%s' using "
      "labels in metadata" % (converter.getInputCol(), converter.getOutputCol()))
converted.select("id", "categoryIndex", "originalCategory").show()
spark.stop()

output:

Transformed string column 'category' to indexed column 'categoryIndex'
+---+--------+-------------+
| id|category|categoryIndex|
+---+--------+-------------+
|  0|       a|          0.0|
|  1|       b|          2.0|
|  2|       c|          1.0|
|  3|       a|          0.0|
|  4|       a|          0.0|
|  5|       c|          1.0|
+---+--------+-------------+

StringIndexer will store labels in output column metadata

Transformed indexed column 'categoryIndex' back to original string column 'originalCategory' using labels in metadata
+---+-------------+----------------+
| id|categoryIndex|originalCategory|
+---+-------------+----------------+
|  0|          0.0|               a|
|  1|          2.0|               b|
|  2|          1.0|               c|
|  3|          0.0|               a|
|  4|          0.0|               a|
|  5|          1.0|               c|
+---+-------------+----------------+

Find full example code at "examples/src/main/python/ml/index_to_string_example.py" in the Spark repo.

OneHotEncoder

One-hot encoding maps a column of label indices to a column of binary vectors, with at most a single one-value. This encoding allows algorithms which expect continuous features, such as Logistic Regression, to use categorical features.

Examples

Refer to the OneHotEncoder Python docs for more details on the API.

from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.sql import SparkSession  
spark = SparkSession.builder.appName("OneHotEncoderExample").getOrCreate()
df = spark.createDataFrame([
    (0, "a"),
    (1, "b"),
    (2, "c"),
    (3, "a"),
    (4, "a"),
    (5, "c")
], ["id", "category"])

stringIndexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = stringIndexer.fit(df)
indexed = model.transform(df)

encoder = OneHotEncoder(inputCol="categoryIndex", outputCol="categoryVec")
encoded = encoder.transform(indexed)
encoded.show()
spark.stop()

output:

+---+--------+-------------+-------------+
| id|category|categoryIndex|  categoryVec|
+---+--------+-------------+-------------+
|  0|       a|          0.0|(2,[0],[1.0])|
|  1|       b|          2.0|    (2,[],[])|
|  2|       c|          1.0|(2,[1],[1.0])|
|  3|       a|          0.0|(2,[0],[1.0])|
|  4|       a|          0.0|(2,[0],[1.0])|
|  5|       c|          1.0|(2,[1],[1.0])|
+---+--------+-------------+-------------+

Find full example code at "examples/src/main/python/ml/onehot_encoder_example.py" in the Spark repo.
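
In the output above, the label with index 2.0 maps to an all-zero vector because the encoder drops the last category by default. Here is a minimal sketch that keeps it, assuming the dropLast parameter of OneHotEncoder:

from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("OneHotEncoderDropLastExample").getOrCreate()
df = spark.createDataFrame([(0, "a"), (1, "b"), (2, "c")], ["id", "category"])

indexed = StringIndexer(inputCol="category", outputCol="categoryIndex") \
    .fit(df).transform(df)

# dropLast=False keeps a vector slot for every category, so the highest index also
# gets a one-value instead of an all-zero vector.
encoder = OneHotEncoder(inputCol="categoryIndex", outputCol="categoryVec",
                        dropLast=False)
encoder.transform(indexed).show()
spark.stop()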

VectorIndexer

VectorIndexer helps index categorical features in datasets of Vectors. It can both automatically decide which features are categorical and convert original values to category indices. Specifically, it does the following:

  1. Take an input column of type Vector and a parameter maxCategories.
  2. Decide which features should be categorical based on the number of distinct values, where features with at most maxCategories distinct values are declared categorical.
  3. Compute 0-based category indices for each categorical feature.
  4. Index categorical features and transform original feature values to indices.

Indexing categorical features allows algorithms such as Decision Trees and Tree Ensembles to treat categorical features appropriately, improving performance.

Examples

In the example below, we read in a dataset of labeled points and then use VectorIndexer to decide which features should be treated as categorical. We transform the categorical feature values to their indices. This transformed data could then be passed to algorithms such as DecisionTreeRegressor that handle categorical features.

Refer to the VectorIndexer Python docs for more details on the API.

from pyspark.ml.feature import VectorIndexer
from pyspark.sql import SparkSession  

spark = SparkSession.builder.appName("VectorIndexerExample").getOrCreate()
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

indexer = VectorIndexer(inputCol="features", outputCol="indexed", maxCategories=10)
indexerModel = indexer.fit(data)

categoricalFeatures = indexerModel.categoryMaps
print("Chose %d categorical features: %s" %
      (len(categoricalFeatures), ", ".join(str(k) for k in categoricalFeatures.keys())))

# Create new column "indexed" with categorical values transformed to indices
indexedData = indexerModel.transform(data)
indexedData.show()
spark.stop()

output:

Chose 351 categorical features: 645, 69, 365, 138, 101, 479, 333, 249, 0, 555, 666, 88, 170, 115, 276, 308, 5, 449, 120, 247, 614, 677, 202, 10, 56, 533, 142, 500, 340, 670, 174, 42, 417, 24, 37, 25, 257, 389, 52, 14, 504, 110, 587, 619, 196, 559, 638, 20, 421, 46, 93, 284, 228, 448, 57, 78, 29, 475, 164, 591, 646, 253, 106, 121, 84, 480, 147, 280, 61, 221, 396, 89, 133, 116, 1, 507, 312, 74, 307, 452, 6, 248, 60, 117, 678, 529, 85, 201, 220, 366, 534, 102, 334, 28, 38, 561, 392, 70, 424, 192, 21, 137, 165, 33, 92, 229, 252, 197, 361, 65, 97, 665, 583, 285, 224, 650, 615, 9, 53, 169, 593, 141, 610, 420, 109, 256, 225, 339, 77, 193, 669, 476, 642, 637, 590, 679, 96, 393, 647, 173, 13, 41, 503, 134, 73, 105, 2, 508, 311, 558, 674, 530, 586, 618, 166, 32, 34, 148, 45, 161, 279, 64, 689, 17, 149, 584, 562, 176, 423, 191, 22, 44, 59, 118, 281, 27, 641, 71, 391, 12, 445, 54, 313, 611, 144, 49, 335, 86, 672, 172, 113, 681, 219, 419, 81, 230, 362, 451, 76, 7, 39, 649, 98, 616, 477, 367, 535, 103, 140, 621, 91, 66, 251, 668, 198, 108, 278, 223, 394, 306, 135, 563, 226, 3, 505, 80, 167, 35, 473, 675, 589, 162, 531, 680, 255, 648, 112, 617, 194, 145, 48, 557, 690, 63, 640, 18, 282, 95, 310, 50, 67, 199, 673, 16, 585, 502, 338, 643, 31, 336, 613, 11, 72, 175, 446, 612, 143, 43, 250, 231, 450, 99, 363, 556, 87, 203, 671, 688, 104, 368, 588, 40, 304, 26, 258, 390, 55, 114, 171, 139, 418, 23, 8, 75, 119, 58, 667, 478, 536, 82, 620, 447, 36, 168, 146, 30, 51, 190, 19, 422, 564, 305, 107, 4, 136, 506, 79, 195, 474, 664, 532, 94, 283, 395, 332, 528, 644, 47, 15, 163, 200, 68, 62, 277, 691, 501, 90, 111, 254, 227, 337, 122, 83, 309, 560, 639, 676, 222, 592, 364, 100
+-----+--------------------+--------------------+
|label|            features|             indexed|
+-----+--------------------+--------------------+
|  0.0|(692,[127,128,129...|(692,[127,128,129...|
|  1.0|(692,[158,159,160...|(692,[158,159,160...|
|  1.0|(692,[124,125,126...|(692,[124,125,126...|
|  1.0|(692,[152,153,154...|(692,[152,153,154...|
|  1.0|(692,[151,152,153...|(692,[151,152,153...|
|  0.0|(692,[129,130,131...|(692,[129,130,131...|
|  1.0|(692,[158,159,160...|(692,[158,159,160...|
|  1.0|(692,[99,100,101,...|(692,[99,100,101,...|
|  0.0|(692,[154,155,156...|(692,[154,155,156...|
|  0.0|(692,[127,128,129...|(692,[127,128,129...|
|  1.0|(692,[154,155,156...|(692,[154,155,156...|
|  0.0|(692,[153,154,155...|(692,[153,154,155...|
|  0.0|(692,[151,152,153...|(692,[151,152,153...|
|  1.0|(692,[129,130,131...|(692,[129,130,131...|
|  0.0|(692,[154,155,156...|(692,[154,155,156...|
|  1.0|(692,[150,151,152...|(692,[150,151,152...|
|  0.0|(692,[124,125,126...|(692,[124,125,126...|
|  0.0|(692,[152,153,154...|(692,[152,153,154...|
|  1.0|(692,[97,98,99,12...|(692,[97,98,99,12...|
|  1.0|(692,[124,125,126...|(692,[124,125,126...|
+-----+--------------------+--------------------+
only showing top 20 rows

Find full example code at "examples/src/main/python/ml/vector_indexer_example.py" in the Spark repo.

Interaction

Interaction is a Transformer which takes vector or double-valued columns and generates a single vector column that contains the product of all combinations of one value from each input column.

For example, if you have two vector-type columns each of which has three dimensions as input columns, then you will get a 9-dimensional vector as the output column.

Examples

Assume that we have the following DataFrame with the columns "id1", "vec1" and "vec2":

  id1|vec1          |vec2          
  ---|--------------|--------------
  1  |[1.0,2.0,3.0] |[8.0,4.0,5.0] 
  2  |[4.0,3.0,8.0] |[7.0,9.0,8.0] 
  3  |[6.0,1.0,9.0] |[2.0,3.0,6.0] 
  4  |[10.0,8.0,6.0]|[9.0,4.0,5.0] 
  5  |[9.0,2.0,7.0] |[10.0,7.0,3.0]
  6  |[1.0,1.0,4.0] |[2.0,8.0,4.0]   

Applying Interaction with those input columns, the output column interactedCol contains:

  id1|vec1          |vec2          |interactedCol                                         
  ---|--------------|--------------|------------------------------------------------------
  1  |[1.0,2.0,3.0] |[8.0,4.0,5.0] |[8.0,4.0,5.0,16.0,8.0,10.0,24.0,12.0,15.0]            
  2  |[4.0,3.0,8.0] |[7.0,9.0,8.0] |[56.0,72.0,64.0,42.0,54.0,48.0,112.0,144.0,128.0]     
  3  |[6.0,1.0,9.0] |[2.0,3.0,6.0] |[36.0,54.0,108.0,6.0,9.0,18.0,54.0,81.0,162.0]        
  4  |[10.0,8.0,6.0]|[9.0,4.0,5.0] |[360.0,160.0,200.0,288.0,128.0,160.0,216.0,96.0,120.0]
  5  |[9.0,2.0,7.0] |[10.0,7.0,3.0]|[450.0,315.0,135.0,100.0,70.0,30.0,350.0,245.0,105.0] 
  6  |[1.0,1.0,4.0] |[2.0,8.0,4.0] |[12.0,48.0,24.0,12.0,48.0,24.0,48.0,192.0,96.0] 

Note: this transformer does not have a Python implementation at the moment; only Scala and Java examples are available.

Normalizer

Normalizer is a Transformer which transforms a dataset of Vector rows, normalizing each Vector to have unit norm. It takes the parameter p, which specifies the p-norm used for normalization (p = 2 by default). This normalization can help standardize your input data and improve the behavior of learning algorithms.

Examples

The following example demonstrates how to create a dataset of Vectors and then normalize each Vector to have unit L^1 norm and unit L^∞ norm.

Refer to the Normalizer Python docs for more details on the API.

from pyspark.ml.feature import Normalizer
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession  

spark = SparkSession.builder.appName("NormalizerExample").getOrCreate()
dataFrame = spark.createDataFrame([
    (0, Vectors.dense([1.0, 0.5, -1.0]),),
    (1, Vectors.dense([2.0, 1.0, 1.0]),),
    (2, Vectors.dense([4.0, 10.0, 2.0]),)
], ["id", "features"])

# Normalize each Vector using $L^1$ norm.
normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=1.0)
l1NormData = normalizer.transform(dataFrame)
print("Normalized using L^1 norm")
l1NormData.show()

# Normalize each Vector using $L^\infty$ norm.
lInfNormData = normalizer.transform(dataFrame, {normalizer.p: float("inf")})
print("Normalized using L^inf norm")
lInfNormData.show()
spark.stop()

output:

Normalized using L^1 norm
+---+--------------+------------------+
| id|      features|      normFeatures|
+---+--------------+------------------+
|  0|[1.0,0.5,-1.0]|    [0.4,0.2,-0.4]|
|  1| [2.0,1.0,1.0]|   [0.5,0.25,0.25]|
|  2|[4.0,10.0,2.0]|[0.25,0.625,0.125]|
+---+--------------+------------------+

Normalized using L^inf norm
+---+--------------+--------------+
| id|      features|  normFeatures|
+---+--------------+--------------+
|  0|[1.0,0.5,-1.0]|[1.0,0.5,-1.0]|
|  1| [2.0,1.0,1.0]| [1.0,0.5,0.5]|
|  2|[4.0,10.0,2.0]| [0.4,1.0,0.2]|
+---+--------------+--------------+

Find full example code at "examples/src/main/python/ml/normalizer_example.py" in the Spark repo.

StandardScaler

StandardScaler transforms a dataset of Vector rows, normalizing each feature to have unit standard deviation and/or zero mean. It takes the parameters:

  • withStd: True by default. Scales the data to unit standard deviation.
  • withMean: False by default. Centers the data with the mean before scaling. It will build a dense output, so take care when applying to sparse input.

StandardScaler is an Estimator which can be fit on a dataset to produce a StandardScalerModel; this amounts to computing summary statistics. The model can then transform a Vector column in a dataset to have unit-standard-deviation and/or zero-mean features.

Note that if the standard deviation of a feature is zero, the default value 0.0 will be returned in the Vector for that feature.

Examples

The following example demonstrates how to create a dataset of Vectors and then normalize each feature to have unit standard deviation.

Refer to the StandardScaler Python docs for more details on the API.

from pyspark.ml.feature import StandardScaler
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("StandardScalerExample").getOrCreate()
dataFrame = spark.createDataFrame([
    (0, Vectors.dense([1.0, 0.5, -1.0]),),
    (1, Vectors.dense([2.0, 1.0, 1.0]),),
    (2, Vectors.dense([4.0, 10.0, 2.0]),)
], ["id", "features"])
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                        withStd=True, withMean=False)

# Compute summary statistics by fitting the StandardScaler
scalerModel = scaler.fit(dataFrame)

# Normalize each feature to have unit standard deviation.
scaledData = scalerModel.transform(dataFrame)
scaledData.show(truncate=False)
spark.stop()

output:

+---+--------------+------------------------------------------------------------+
|id |features      |scaledFeatures                                              |
+---+--------------+------------------------------------------------------------+
|0  |[1.0,0.5,-1.0]|[0.6546536707079772,0.09352195295828244,-0.6546536707079771]|
|1  |[2.0,1.0,1.0] |[1.3093073414159544,0.1870439059165649,0.6546536707079771]  |
|2  |[4.0,10.0,2.0]|[2.618614682831909,1.870439059165649,1.3093073414159542]    |
+---+--------------+------------------------------------------------------------+

Find full example code at "examples/src/main/python/ml/standard_scaler_example.py" in the Spark repo.
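
The example above only scales to unit standard deviation. Below is a minimal sketch of centering as well, using the withMean parameter described earlier (note the dense-output caveat):

from pyspark.ml.feature import StandardScaler
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("StandardScalerWithMeanExample").getOrCreate()
dataFrame = spark.createDataFrame([
    (0, Vectors.dense([1.0, 0.5, -1.0]),),
    (1, Vectors.dense([2.0, 1.0, 1.0]),),
    (2, Vectors.dense([4.0, 10.0, 2.0]),)
], ["id", "features"])

# withMean=True centers each feature at zero before scaling to unit standard
# deviation; this densifies the output, so use it with care on sparse vectors.
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                        withStd=True, withMean=True)
scaler.fit(dataFrame).transform(dataFrame).show(truncate=False)
spark.stop()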

MinMaxScaler

MinMaxScaler transforms a dataset of Vector rows, rescaling each feature to a specific range (often [0, 1]). It takes the parameters:

  • min: 0.0 by default. Lower bound after transformation, shared by all features.
  • max: 1.0 by default. Upper bound after transformation, shared by all features.

MinMaxScaler computes summary statistics on a dataset and produces a MinMaxScalerModel. The model can then transform each feature individually such that it is in the given range.

The rescaled value for a feature E is computed as Rescaled(e_i) = (e_i - E_min) / (E_max - E_min) * (max - min) + min. For the case E_max == E_min, Rescaled(e_i) = 0.5 * (max + min). For example, for the first feature in the data below, E_min = 1.0 and E_max = 3.0, so the value 2.0 is rescaled to (2.0 - 1.0) / (3.0 - 1.0) * (1 - 0) + 0 = 0.5.

Note that since zero values will possibly be transformed to non-zero values, the output of the transformer will be a DenseVector even for sparse input.

Examples

Refer to the MinMaxScaler Python docs and the MinMaxScalerModel Python docs for more details on the API.

from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession  
spark = SparkSession.builder.appName("MinMaxScalerExample").getOrCreate()
dataFrame = spark.createDataFrame([
    (0, Vectors.dense([1.0, 0.1, -1.0]),),
    (1, Vectors.dense([2.0, 1.1, 1.0]),),
    (2, Vectors.dense([3.0, 10.1, 3.0]),)
], ["id", "features"])

scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")

# Compute summary statistics and generate MinMaxScalerModel
scalerModel = scaler.fit(dataFrame)

# rescale each feature to range [min, max].
scaledData = scalerModel.transform(dataFrame)
print("Features scaled to range: [%f, %f]" % (scaler.getMin(), scaler.getMax()))
scaledData.select("features", "scaledFeatures").show()
spark.stop()

output:

Features scaled to range: [0.000000, 1.000000]
+--------------+--------------+
|      features|scaledFeatures|
+--------------+--------------+
|[1.0,0.1,-1.0]| [0.0,0.0,0.0]|
| [2.0,1.1,1.0]| [0.5,0.1,0.5]|
|[3.0,10.1,3.0]| [1.0,1.0,1.0]|
+--------------+--------------+

Find full example code at "examples/src/main/python/ml/min_max_scaler_example.py" in the Spark repo.
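
The min and max parameters listed above default to 0.0 and 1.0; here is a minimal sketch of rescaling to a different range, for example [-1, 1]:

from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("MinMaxScalerRangeExample").getOrCreate()
dataFrame = spark.createDataFrame([
    (0, Vectors.dense([1.0, 0.1, -1.0]),),
    (1, Vectors.dense([2.0, 1.1, 1.0]),),
    (2, Vectors.dense([3.0, 10.1, 3.0]),)
], ["id", "features"])

# Rescale every feature to [-1, 1] instead of the default [0, 1].
scaler = MinMaxScaler(min=-1.0, max=1.0, inputCol="features", outputCol="scaledFeatures")
scaler.fit(dataFrame).transform(dataFrame).select("features", "scaledFeatures").show()
spark.stop()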

MaxAbsScaler

MaxAbsScaler transforms a dataset of Vector rows, rescaling each feature to the range [-1, 1] by dividing through the maximum absolute value of each feature. It does not shift/center the data, and thus does not destroy any sparsity.

MaxAbsScaler computes summary statistics on a dataset and produces a MaxAbsScalerModel. The model can then transform each feature individually to the range [-1, 1].

Examples

Refer to the MaxAbsScaler Python docs and the MaxAbsScalerModel Python docs for more details on the API.

from pyspark.ml.feature import MaxAbsScaler
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession  

spark = SparkSession.builder.appName("MaxAbsScalerExample").getOrCreate()
dataFrame = spark.createDataFrame([
    (0, Vectors.dense([1.0, 0.1, -8.0]),),
    (1, Vectors.dense([2.0, 1.0, -4.0]),),
    (2, Vectors.dense([4.0, 10.0, 8.0]),)
], ["id", "features"])

scaler = MaxAbsScaler(inputCol="features", outputCol="scaledFeatures")

# Compute summary statistics and generate MaxAbsScalerModel
scalerModel = scaler.fit(dataFrame)

# rescale each feature to range [-1, 1].
scaledData = scalerModel.transform(dataFrame)

scaledData.select("features", "scaledFeatures").show()
spark.stop()

output:

+--------------+----------------+
|      features|  scaledFeatures|
+--------------+----------------+
|[1.0,0.1,-8.0]|[0.25,0.01,-1.0]|
|[2.0,1.0,-4.0]|  [0.5,0.1,-0.5]|
|[4.0,10.0,8.0]|   [1.0,1.0,1.0]|
+--------------+----------------+

Find full example code at "examples/src/main/python/ml/max_abs_scaler_example.py" in the Spark repo.

Bucketizer

Bucketizer transforms a column of continuous features into a column of feature buckets, where the buckets are specified by users. It takes one parameter:

  • splits: the parameter for mapping continuous features into buckets. With n+1 splits, there are n buckets. A bucket defined by splits x, y holds values in the range [x, y), except for the last bucket, which also includes y. Splits should be strictly increasing. Values at -inf and +inf must be explicitly provided to cover all Double values; otherwise, values outside the specified splits will be treated as errors. Two examples of splits are Array(Double.NegativeInfinity, 0.0, 1.0, Double.PositiveInfinity) and Array(0.0, 1.0, 2.0).

Note that if you have no idea of the upper and lower bounds of the targeted column, you should add Double.NegativeInfinity and Double.PositiveInfinity as the bounds of your splits to prevent a potential out-of-Bucketizer-bounds exception.

Note also that the splits you provide have to be in strictly increasing order, i.e. s0 < s1 < s2 < ... < sn.

More details can be found in the API docs for Bucketizer.

Examples

The following example demonstrates how to bucketize a column of Doubles into another index-wise column.

from pyspark.ml.feature import Bucketizer
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("BucketizerExample").getOrCreate()
splits = [-float("inf"), -0.5, 0.0, 0.5, float("inf")]

data = [(-999.9,), (-0.5,), (-0.3,), (0.0,), (0.2,), (999.9,)]
dataFrame = spark.createDataFrame(data, ["features"])

bucketizer = Bucketizer(splits=splits, inputCol="features", outputCol="bucketedFeatures")

# Transform original data into its bucket index.
bucketedData = bucketizer.transform(dataFrame)

print("Bucketizer output with %d buckets" % (len(bucketizer.getSplits())-1))
bucketedData.show()
spark.stop()

output:

Bucketizer output with 4 buckets
+--------+----------------+
|features|bucketedFeatures|
+--------+----------------+
|  -999.9|             0.0|
|    -0.5|             1.0|
|    -0.3|             1.0|
|     0.0|             2.0|
|     0.2|             2.0|
|   999.9|             3.0|
+--------+----------------+

Find full example code at "examples/src/main/python/ml/bucketizer_example.py" in the Spark repo.

ElementwiseProduct

ElementwiseProduct multiplies each input vector by a provided "weight" vector, using element-wise multiplication. In other words, it scales each column of the dataset by a scalar multiplier. This represents the Hadamard product between the input vector v and the transforming vector w, yielding a result vector:

(v1, ..., vN)^T ∘ (w1, ..., wN)^T = (v1·w1, ..., vN·wN)^T

Examples

The example below demonstrates how to transform vectors using a transforming vector value. Refer to the ElementwiseProduct Python docs for more details on the API.

from pyspark.ml.feature import ElementwiseProduct
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ElementwiseProductExample").getOrCreate()
# Create some vector data; also works for sparse vectors
data = [(Vectors.dense([1.0, 2.0, 3.0]),), (Vectors.dense([4.0, 5.0, 6.0]),)]
df = spark.createDataFrame(data, ["vector"])
transformer = ElementwiseProduct(scalingVec=Vectors.dense([0.0, 1.0, 2.0]),
                                 inputCol="vector", outputCol="transformedVector")
# Batch transform the vectors to create new column:
transformer.transform(df).show()
spark.stop()

output:

+-------------+-----------------+
|       vector|transformedVector|
+-------------+-----------------+
|[1.0,2.0,3.0]|    [0.0,2.0,6.0]|
|[4.0,5.0,6.0]|   [0.0,5.0,12.0]|
+-------------+-----------------+

Find full example code at "examples/src/main/python/ml/elementwise_product_example.py" in the Spark repo.

SQLTransformer

SQLTransformer implements the transformations which are defined by SQL statements. Currently we only support SQL syntax like "SELECT ... FROM __THIS__ ...", where "__THIS__" represents the underlying table of the input dataset. The select clause specifies the fields, constants, and expressions to display in the output, and can be any select clause that Spark SQL supports. Users can also use Spark SQL built-in functions and UDFs to operate on these selected columns. For example, SQLTransformer supports statements like:

  • SELECT a, a + b AS a_b FROM __THIS__
  • SELECT a, SQRT(b) AS b_sqrt FROM __THIS__ where a > 5
  • SELECT a, b, SUM(c) AS c_sum FROM __THIS__ GROUP BY a, b

Examples

Refer to the SQLTransformer Python docs for more details on the API.

from pyspark.ml.feature import SQLTransformer
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SQLTransformerExample").getOrCreate()
df = spark.createDataFrame([
    (0, 1.0, 3.0),
    (2, 2.0, 5.0)
], ["id", "v1", "v2"])
sqlTrans = SQLTransformer(
    statement="SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
sqlTrans.transform(df).show()
spark.stop()

output:

+---+---+---+---+----+
| id| v1| v2| v3|  v4|
+---+---+---+---+----+
|  0|1.0|3.0|4.0| 3.0|
|  2|2.0|5.0|7.0|10.0|
+---+---+---+---+----+

Find full example code at "examples/src/main/python/ml/sql_transformer.py" in the Spark repo.

VectorAssembler

VectorAssembler is a transformer that combines a given list of columns into a single vector column. It is useful for combining raw features and features generated by different feature transformers into a single feature vector, in order to train ML models like logistic regression and decision trees. VectorAssembler accepts the following input column types: all numeric types, boolean type, and vector type. In each row, the values of the input columns will be concatenated into a vector in the specified order.

Examples

Refer to the VectorAssembler Python docs for more details on the API.

from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("VectorAssemblerExample").getOrCreate()
dataset = spark.createDataFrame(
    [(0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0)],
    ["id", "hour", "mobile", "userFeatures", "clicked"])

assembler = VectorAssembler(
    inputCols=["hour", "mobile", "userFeatures"],
    outputCol="features")

output = assembler.transform(dataset)
print("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show(truncate=False)
spark.stop()

output:

Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'
+-----------------------+-------+
|features               |clicked|
+-----------------------+-------+
|[18.0,1.0,0.0,10.0,0.5]|1.0    |
+-----------------------+-------+

Find full example code at "examples/src/main/python/ml/vector_assembler_example.py" in the Spark repo.

QuantileDiscretizer

QuantileDiscretizer takes a column with continuous features and outputs a column with binned categorical features. The number of bins is set by the numBuckets parameter. It is possible that the number of buckets used will be smaller than this value, for example, if there are too few distinct values of the input to create enough distinct quantiles.

NaN values: NaN values will be removed from the column during QuantileDiscretizer fitting. This will produce a Bucketizer model for making predictions. During the transformation, Bucketizer will raise an error when it finds NaN values in the dataset, but the user can also choose to either keep or remove NaN values within the dataset by setting handleInvalid. If the user chooses to keep NaN values, they will be handled specially and placed into their own bucket; for example, if 4 buckets are used, then non-NaN data will be put into buckets[0-3], but NaNs will be counted in a special bucket[4] (a short sketch of this option follows the example below).

Algorithm: the bin ranges are chosen using an approximate algorithm (see the documentation for approxQuantile for a detailed description). The precision of the approximation can be controlled with the relativeError parameter. When set to zero, exact quantiles are calculated (note: computing exact quantiles is an expensive operation). The lower and upper bin bounds will be -Infinity and +Infinity, covering all real values.

Examples

Refer to the QuantileDiscretizer Python docs for more details on the API.

from pyspark.ml.feature import QuantileDiscretizer
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("QuantileDiscretizerExample").getOrCreate()
data = [(0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2)]
df = spark.createDataFrame(data, ["id", "hour"])

discretizer = QuantileDiscretizer(numBuckets=3, inputCol="hour", outputCol="result")

result = discretizer.fit(df).transform(df)
result.show()
spark.stop()

output:

+---+----+------+
| id|hour|result|
+---+----+------+
|  0|18.0|   2.0|
|  1|19.0|   2.0|
|  2| 8.0|   1.0|
|  3| 5.0|   1.0|
|  4| 2.2|   0.0|
+---+----+------+

Find full example code at "examples/src/main/python/ml/quantile_discretizer_example.py" in the Spark repo.
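
The NaN handling described earlier is not exercised by the example above; here is a minimal sketch, assuming the handleInvalid parameter is available as documented:

from pyspark.ml.feature import QuantileDiscretizer
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("QuantileDiscretizerNaNExample").getOrCreate()
data = [(0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, float("nan"))]
df = spark.createDataFrame(data, ["id", "hour"])

# handleInvalid="keep" places NaN values into their own extra bucket instead of
# raising an error or dropping the rows.
discretizer = QuantileDiscretizer(numBuckets=3, inputCol="hour", outputCol="result",
                                  handleInvalid="keep")
discretizer.fit(df).transform(df).show()
spark.stop()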

Imputer

The Imputer transformer completes missing values in a dataset, using either the mean or the median of the column in which the missing values are located. The input columns should be of DoubleType or FloatType. Currently Imputer does not support categorical features and may create incorrect values for columns containing categorical features.

Note: all null values in the input columns are treated as missing, and so are also imputed.

Examples

Refer to the Imputer Python docs for more details on the API.

from pyspark.ml.feature import Imputer
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ImputerExample").getOrCreate()
df = spark.createDataFrame([
    (1.0, float("nan")),
    (2.0, float("nan")),
    (float("nan"), 3.0),
    (4.0, 4.0),
    (5.0, 5.0)
], ["a", "b"])

imputer = Imputer(inputCols=["a", "b"], outputCols=["out_a", "out_b"])
model = imputer.fit(df)

model.transform(df).show()
spark.stop()

output:

+---+---+-----+-----+
|  a|  b|out_a|out_b|
+---+---+-----+-----+
|1.0|NaN|  1.0|  4.0|
|2.0|NaN|  2.0|  4.0|
|NaN|3.0|  3.0|  3.0|
|4.0|4.0|  4.0|  4.0|
|5.0|5.0|  5.0|  5.0|
+---+---+-----+-----+

Find full example code at "examples/src/main/python/ml/imputer_example.py" in the Spark repo.
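
The example above uses the default mean strategy. Below is a minimal sketch that fills with the median instead, assuming the strategy parameter of Imputer:

from pyspark.ml.feature import Imputer
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ImputerMedianExample").getOrCreate()
df = spark.createDataFrame([
    (1.0, float("nan")),
    (2.0, float("nan")),
    (float("nan"), 3.0),
    (4.0, 4.0),
    (5.0, 5.0)
], ["a", "b"])

# strategy="median" fills missing values with the per-column median instead of the mean.
imputer = Imputer(inputCols=["a", "b"], outputCols=["out_a", "out_b"],
                  strategy="median")
imputer.fit(df).transform(df).show()
spark.stop()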