
Feature Selectors

VectorSlicer

VectorSlicer is a transformer that takes a feature vector and outputs a new feature vector containing a sub-array of the original features. It is useful for extracting features from a vector column.

VectorSlicer accepts a vector column with specified indices, then outputs a new vector column whose values are selected via those indices. There are two types of indices:

  1. setIndices(): integer indices that point into the vector.

  2. setNames(): string indices that refer to feature names in the vector. This requires the vector column to have an AttributeGroup, since the implementation matches on the name field of an Attribute (see the sketch after the example below).

Specification by integer and by string are both acceptable, and you may use integer indices and string names at the same time. At least one feature must be selected, and duplicate features are not allowed, so there can be no overlap between the selected indices and names. Note that if feature names are selected, an exception will be thrown when empty input attributes are encountered.

The output vector orders the features selected by index first (in the order given), followed by the features selected by name (in the order given).

Examples

Refer to the VectorSlicer Python docs for more details on the API.

from pyspark.ml.feature import VectorSlicer
from pyspark.ml.linalg import Vectors
from pyspark.sql.types import Row
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("VectorSlicerExample").getOrCreate()
df = spark.createDataFrame([
    Row(userFeatures=Vectors.sparse(3, {0: -2.0, 1: 2.3})),
    Row(userFeatures=Vectors.dense([-2.0, 2.3, 0.0]))])

slicer = VectorSlicer(inputCol="userFeatures", outputCol="features", indices=[1])

output = slicer.transform(df)

output.select("userFeatures", "features").show()
spark.stop()

output:

+--------------------+-------------+
|        userFeatures|     features|
+--------------------+-------------+
|(3,[0,1],[-2.0,2.3])|(1,[0],[2.3])|
|      [-2.0,2.3,0.0]|        [2.3]|
+--------------------+-------------+

Find full example code at "examples/src/main/python/ml/vector_slicer_example.py" in the Spark repo.

RFormula

RFormula selects columns specified by an R model formula. Currently we support a limited subset of R operators, including '~', '.', ':', '+' and '-'. The basic operators are:

  • '~': separate target and terms
  • '+': concatenate terms; "+ 0" means removing the intercept
  • '-': remove a term; "- 1" means removing the intercept
  • ':': interaction (multiplication for numeric values, or binarized categorical values)
  • '.': all columns except the target

Suppose a and b are both double columns; we use the following simple examples to illustrate the effect of RFormula:

  • y ~ a + b means the model y ~ w0 + w1 * a + w2 * b, where w0 is the intercept and w1, w2 are coefficients.
  • y ~ a + b + a:b - 1 means the model y ~ w1 * a + w2 * b + w3 * a * b, where w1, w2, w3 are coefficients.

RFormula produces a vector column of features and a double or string column of label. Just as when formulas are used in R for linear regression, string input columns will be one-hot encoded and numeric columns will be cast to doubles. If the label column is of type string, it will first be transformed to double with StringIndexer. If the label column does not exist in the DataFrame, the output label column will be created from the response variable specified in the formula.
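
To make the one-hot-encoding and casting behaviour concrete, here is a rough, hand-built analogue of a formula like y ~ category + x, where category is a hypothetical string column and x a hypothetical numeric column. It uses the Spark 3.x multi-column OneHotEncoder API and is only a sketch of the idea, not RFormula's actual implementation (which differs in details such as category ordering).

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

# Rough analogue of "y ~ category + x": index and one-hot encode the string column,
# then assemble it together with the numeric column into a single features vector.
manual_pipeline = Pipeline(stages=[
    StringIndexer(inputCol="category", outputCol="categoryIndex"),
    OneHotEncoder(inputCols=["categoryIndex"], outputCols=["categoryVec"]),
    VectorAssembler(inputCols=["categoryVec", "x"], outputCol="features"),
])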

Examples

Refer to the RFormula Python docs for more details on the API.

from pyspark.ml.feature import RFormula
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("RFormulaExample").getOrCreate()
dataset = spark.createDataFrame(
    [(7, "US", 18, 1.0),
     (8, "CA", 12, 0.0),
     (9, "NZ", 15, 0.0)],
    ["id", "country", "hour", "clicked"])

formula = RFormula(
    formula="clicked ~ country + hour",
    featuresCol="features",
    labelCol="label")

output = formula.fit(dataset).transform(dataset)
output.select("features", "label").show()
spark.stop()

output:

+--------------+-----+
|      features|label|
+--------------+-----+
|[0.0,0.0,18.0]|  1.0|
|[1.0,0.0,12.0]|  0.0|
|[0.0,1.0,15.0]|  0.0|
+--------------+-----+

Find full example code at "examples/src/main/python/ml/rformula_example.py" in the Spark repo.

ChiSqSelector

ChiSqSelector stands for Chi-Squared feature selection. It operates on labeled data with categorical features. ChiSqSelector uses the Chi-Squared test of independence to decide which features to choose. It supports five selection methods: numTopFeatures, percentile, fpr, fdr and fwe:

  • numTopFeatures chooses a fixed number of top features according to the chi-squared test. This is akin to keeping the features with the most predictive power.
  • percentile is similar to numTopFeatures but chooses a fraction of all features instead of a fixed number.
  • fpr chooses all features whose p-values are below a threshold, thus controlling the false positive rate of selection.
  • fdr uses the Benjamini-Hochberg procedure to choose all features whose false discovery rate is below a threshold.
  • fwe chooses all features whose p-values are below a threshold scaled by 1 / numFeatures, thus controlling the family-wise error rate of selection.

By default, the selection method is numTopFeatures, with the default number of top features set to 50. The user can choose a selection method with setSelectorType.
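
As a brief, hypothetical sketch of switching the selection method (the threshold values below are purely illustrative):

from pyspark.ml.feature import ChiSqSelector

# Default method is numTopFeatures; switch to another method with setSelectorType.
selector = ChiSqSelector(featuresCol="features", outputCol="selectedFeatures", labelCol="clicked")
selector.setSelectorType("fpr").setFpr(0.05)                # keep features with p-value below 0.05
# selector.setSelectorType("percentile").setPercentile(0.1) # or keep the top 10% of features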

Examples

Refer to the ChiSqSelector Python docs for more details on the API.

from pyspark.ml.feature import ChiSqSelector
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ChiSqSelectorExample").getOrCreate()
df = spark.createDataFrame([
    (7, Vectors.dense([0.0, 0.0, 18.0, 1.0]), 1.0,),
    (8, Vectors.dense([0.0, 1.0, 12.0, 0.0]), 0.0,),
    (9, Vectors.dense([1.0, 0.0, 15.0, 0.1]), 0.0,)], ["id", "features", "clicked"])

selector = ChiSqSelector(numTopFeatures=1, featuresCol="features",
                         outputCol="selectedFeatures", labelCol="clicked")

result = selector.fit(df).transform(df)

print("ChiSqSelector output with top %d features selected" % selector.getNumTopFeatures())
result.show()
spark.stop()

output:

ChiSqSelector output with top 1 features selected
+---+------------------+-------+----------------+
| id|          features|clicked|selectedFeatures|
+---+------------------+-------+----------------+
|  7|[0.0,0.0,18.0,1.0]|    1.0|          [18.0]|
|  8|[0.0,1.0,12.0,0.0]|    0.0|          [12.0]|
|  9|[1.0,0.0,15.0,0.1]|    0.0|          [15.0]|
+---+------------------+-------+----------------+

Find full example code at "examples/src/main/python/ml/chisq_selector_example.py" in the Spark repo.

Locality Sensitive Hashing

Locality Sensitive Hashing (LSH) is an important class of hashing techniques, commonly used in clustering, approximate nearest neighbor search and outlier detection with large datasets.

The general idea of LSH is to use a family of functions ("LSH families") to hash data points into buckets, so that data points which are close to each other land in the same bucket with high probability, while data points that are far apart are very likely to end up in different buckets. An LSH family is formally defined as follows.

In a metric space (M, d), where M is a set and d is a distance function on M, an LSH family is a family of functions h satisfying the following properties:

For all p, q ∈ M,
d(p,q) <= r1  =>  Pr(h(p) = h(q)) >= p1
d(p,q) >= r2  =>  Pr(h(p) = h(q)) <= p2
An LSH family satisfying these conditions is called (r1, r2, p1, p2)-sensitive.

In Spark, different LSH families are implemented in separate classes (e.g., MinHash), and APIs for feature transformation, approximate similarity join and approximate nearest neighbor search are provided in each class.

In LSH, we define a false positive as a pair of distant input features (with d(p,q) >= r2) that are hashed into the same bucket, and a false negative as a pair of nearby features (with d(p,q) <= r1) that are hashed into different buckets.

LSH Operations

We describe the major types of operations LSH can be used for. A fitted LSH model has methods for each of these operations.

Feature Transformation

Feature transformation is the basic functionality of adding hashed values as a new column. This can be useful for dimensionality reduction. Users can specify the input and output column names by setting inputCol and outputCol.

LSH also supports multiple LSH hash tables. Users can specify the number of hash tables by setting numHashTables. This is also used for OR-amplification in approximate similarity join and approximate nearest neighbor search. Increasing the number of hash tables increases accuracy, but also increases communication cost and running time.

The type of outputCol is Seq[Vector], where the dimension of the array equals numHashTables and the dimension of each vector is currently set to 1. In future releases, we will implement AND-amplification so that users can specify the dimension of these vectors.

Approximate Similarity Join

Approximate similarity join takes two datasets and approximately returns the pairs of rows whose distance is smaller than a user-defined threshold. Approximate similarity join supports both joining two different datasets and self-joining; self-joining will produce some duplicate pairs.

Approximate similarity join accepts both transformed and untransformed datasets as input. If an untransformed dataset is used, it will be transformed automatically; in that case, the hash signature will be created as outputCol.

In the joined dataset, the origin datasets can be queried in datasetA and datasetB. A distance column will be added to the output dataset, showing the true distance between each returned pair of rows.
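
For instance, when the hashed datasets will be reused, they can be transformed once up front and the already-transformed DataFrames passed to the join. The sketch below reuses model, dfA and dfB from the Bucketed Random Projection example further down; the 1.5 threshold is illustrative.

# Hash the inputs once, then join the already-transformed datasets directly.
transformedA = model.transform(dfA)
transformedB = model.transform(dfB)
model.approxSimilarityJoin(transformedA, transformedB, 1.5, distCol="EuclideanDistance").show()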

Approximate Nearest Neighbor Search

Approximate nearest neighbor search takes a dataset (of feature vectors) and a key (a single feature vector), and it approximately returns a specified number of rows in the dataset that are closest to that vector.

Approximate nearest neighbor search accepts both transformed and untransformed datasets as input. If an untransformed dataset is used, it will be transformed automatically; in that case, the hash signature will be created as outputCol.

A distance column will be added to the output dataset, showing the true distance between each output row and the searched key.

Note: approximate nearest neighbor search will return fewer than k rows when there are not enough candidates in the hash bucket.
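
Likewise, a pre-hashed dataset can be searched directly. This sketch reuses model, dfA and key from the Bucketed Random Projection example below, with k = 2 chosen for illustration.

# Search the already-transformed dataset for the 2 approximate nearest neighbors of the key.
transformedA = model.transform(dfA)
model.approxNearestNeighbors(transformedA, key, 2).show()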

LSH Algorithms

Bucketed Random Projection for Euclidean Distance

Bucketed Random Projection is an LSH family for Euclidean distance. The Euclidean distance is defined as follows:

d(x,y) = sqrt(sum((xi - yi)**2))

Its LSH family projects feature vectors onto a random unit vector and portions the projected results into hash buckets: h(x) = floor(x·v / r), where r is a user-defined bucket length. The bucket length can be used to control the average size of the hash buckets (and thus the number of buckets). A larger bucket length (i.e., fewer buckets) increases the probability of features being hashed into the same bucket (increasing the numbers of both true and false positives).
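
To illustrate the hash function itself, here is a toy standalone version, not Spark's implementation; in BucketedRandomProjectionLSH the random unit vector v is drawn internally and r is the bucketLength parameter.

import math

# Toy version of h(x) = floor(x . v / r) for a single projection vector v.
def toy_brp_hash(x, v, r):
    projection = sum(xi * vi for xi, vi in zip(x, v))
    return math.floor(projection / r)

print(toy_brp_hash([1.0, 1.0], [1.0, 0.0], 2.0))  # floor(1.0 / 2.0) = 0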

Bucketed Random Projection accepts arbitrary vectors as input features, and supports both sparse and dense vectors.

Refer to the BucketedRandomProjectionLSH Python docs for more details on the API.

from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("BucketedRandomProjectionLshExample").getOrCreate()

dataA = [(0, Vectors.dense([1.0, 1.0]),),
         (1, Vectors.dense([1.0, -1.0]),),
         (2, Vectors.dense([-1.0, -1.0]),),
         (3, Vectors.dense([-1.0, 1.0]),)]
dfA = spark.createDataFrame(dataA, ["id", "features"])

dataB = [(4, Vectors.dense([1.0, 0.0]),),
         (5, Vectors.dense([-1.0, 0.0]),),
         (6, Vectors.dense([0.0, 1.0]),),
         (7, Vectors.dense([0.0, -1.0]),)]
dfB = spark.createDataFrame(dataB, ["id", "features"])

key = Vectors.dense([1.0, 0.0])

brp = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes", bucketLength=2.0,
                                  numHashTables=3)
model = brp.fit(dfA)

# Feature Transformation
print("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show(truncate=False)

# Compute the locality sensitive hashes for the input rows, then perform approximate
# similarity join.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxSimilarityJoin(transformedA, transformedB, 1.5)`
print("Approximately joining dfA and dfB on Euclidean distance smaller than 1.5:")
model.approxSimilarityJoin(dfA, dfB, 1.5, distCol="EuclideanDistance")\
    .select(col("datasetA.id").alias("idA"),
            col("datasetB.id").alias("idB"),
            col("EuclideanDistance")).show(truncate=False)

# Compute the locality sensitive hashes for the input rows, then perform approximate nearest
# neighbor search.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxNearestNeighbors(transformedA, key, 2)`
print("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show(truncate=False)
spark.stop()

output:

The hashed dataset where hashed values are stored in the column 'hashes':
+---+-----------+-----------------------+
|id |features   |hashes                 |
+---+-----------+-----------------------+
|0  |[1.0,1.0]  |[[-1.0], [0.0], [0.0]] |
|1  |[1.0,-1.0] |[[0.0], [-1.0], [0.0]] |
|2  |[-1.0,-1.0]|[[0.0], [-1.0], [-1.0]]|
|3  |[-1.0,1.0] |[[-1.0], [0.0], [-1.0]]|
+---+-----------+-----------------------+

Approximately joining dfA and dfB on Euclidean distance smaller than 1.5:
+---+---+-----------------+
|idA|idB|EuclideanDistance|
+---+---+-----------------+
|0  |4  |1.0              |
|2  |7  |1.0              |
|1  |4  |1.0              |
|0  |6  |1.0              |
|3  |6  |1.0              |
|3  |5  |1.0              |
|1  |7  |1.0              |
|2  |5  |1.0              |
+---+---+-----------------+

Approximately searching dfA for 2 nearest neighbors of the key:
+---+----------+----------------------+-------+
|id |features  |hashes                |distCol|
+---+----------+----------------------+-------+
|0  |[1.0,1.0] |[[-1.0], [0.0], [0.0]]|1.0    |
|1  |[1.0,-1.0]|[[0.0], [-1.0], [0.0]]|1.0    |
+---+----------+----------------------+-------+

Find full example code at "examples/src/main/python/ml/bucketed_random_projection_lsh_example.py" in the Spark repo.

MinHash for Jaccard Distance

MinHash is an LSH family for Jaccard distance, where the input features are sets of natural numbers. The Jaccard distance of two sets is defined by the cardinality of their intersection and union:

d(A, B) = 1 - |A ∩ B| / |A ∪ B|

MinHash applies a random hash function g to each element in the set and takes the minimum of all hashed values:

h(A) = min(g(a)), a ∈ A
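
As a toy, standalone illustration of the two formulas above (not Spark's implementation; the hash function g here is an arbitrary stand-in):

# Jaccard distance of two sets: 1 - |A ∩ B| / |A ∪ B|
def jaccard_distance(a, b):
    a, b = set(a), set(b)
    return 1.0 - len(a & b) / len(a | b)

# One MinHash value: the minimum of g over the set (a real signature uses several such g's).
def min_hash(a, g):
    return min(g(x) for x in a)

print(jaccard_distance({0, 1, 2}, {0, 2, 4}))          # 0.5
print(min_hash({0, 1, 2}, lambda x: (3 * x + 1) % 7))  # 0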

The input sets for MinHash are represented as binary vectors, where the vector indices represent the elements themselves and the non-zero values in the vector represent the presence of that element in the set. Both dense and sparse vectors are supported, but sparse vectors are usually recommended for efficiency. For example, Vectors.sparse(10, Array[(2, 1.0), (3, 1.0), (5, 1.0)]) means there are 10 elements in the space; this set contains element 2, element 3 and element 5. All non-zero values are treated as binary "1" values.

Note: empty sets cannot be transformed by MinHash, which means any input vector must have at least one non-zero entry.

Refer to the MinHashLSH Python docs for more details on the API.

from pyspark.ml.feature import MinHashLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("MinHashLSHExample").getOrCreate()
dataA = [(0, Vectors.sparse(6, [0, 1, 2], [1.0, 1.0, 1.0]),),
         (1, Vectors.sparse(6, [2, 3, 4], [1.0, 1.0, 1.0]),),
         (2, Vectors.sparse(6, [0, 2, 4], [1.0, 1.0, 1.0]),)]
dfA = spark.createDataFrame(dataA, ["id", "features"])

dataB = [(3, Vectors.sparse(6, [1, 3, 5], [1.0, 1.0, 1.0]),),
         (4, Vectors.sparse(6, [2, 3, 5], [1.0, 1.0, 1.0]),),
         (5, Vectors.sparse(6, [1, 2, 4], [1.0, 1.0, 1.0]),)]
dfB = spark.createDataFrame(dataB, ["id", "features"])

key = Vectors.sparse(6, [1, 3], [1.0, 1.0])

mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=5)
model = mh.fit(dfA)

# Feature Transformation
print("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()

# Compute the locality sensitive hashes for the input rows, then perform approximate
# similarity join.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxSimilarityJoin(transformedA, transformedB, 0.6)`
print("Approximately joining dfA and dfB on distance smaller than 0.6:")
model.approxSimilarityJoin(dfA, dfB, 0.6, distCol="JaccardDistance")\
    .select(col("datasetA.id").alias("idA"),
            col("datasetB.id").alias("idB"),
            col("JaccardDistance")).show()

# Compute the locality sensitive hashes for the input rows, then perform approximate nearest
# neighbor search.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxNearestNeighbors(transformedA, key, 2)`
# It may return less than 2 rows when not enough approximate near-neighbor candidates are
# found.
print("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show()
spark.stop()

output:

The hashed dataset where hashed values are stored in the column 'hashes':
+---+--------------------+--------------------+
| id|            features|              hashes|
+---+--------------------+--------------------+
|  0|(6,[0,1,2],[1.0,1...|[[-8.91727E8], [-...|
|  1|(6,[2,3,4],[1.0,1...|[[-1.81795643E9],...|
|  2|(6,[0,2,4],[1.0,1...|[[-1.33587497E8],...|
+---+--------------------+--------------------+

Approximately joining dfA and dfB on distance smaller than 0.6:
+---+---+---------------+
|idA|idB|JaccardDistance|
+---+---+---------------+
|  1|  4|            0.5|
|  1|  5|            0.5|
|  2|  5|            0.5|
|  0|  5|            0.5|
+---+---+---------------+

Approximately searching dfA for 2 nearest neighbors of the key:
+---+--------------------+--------------------+-------+
| id|            features|              hashes|distCol|
+---+--------------------+--------------------+-------+
|  0|(6,[0,1,2],[1.0,1...|[[-8.91727E8], [-...|   0.75|
|  1|(6,[2,3,4],[1.0,1...|[[-1.81795643E9],...|   0.75|
+---+--------------------+--------------------+-------+

Find full example code at "examples/src/main/python/ml/min_hash_lsh_example.py" in the Spark repo.