Skip to main content
 首页 » 编程设计

apache-spark之Spark DataFrame:计算每列的不同值

2025年02月15日13soundcode

标题中的问题很多:是否有一种有效的方法来计算DataFrame中每一列中的不同值?

describe方法仅提供计数,但不提供非重复计数,我想知道是否存在一种方法来获取所有(或某些选定)列的非重复计数。

请您参考如下方法:

多个聚合计算起来非常昂贵。我建议您改用近似方法。在这种情况下,近似计数:

val df = Seq((1,3,4),(1,2,3),(2,3,4),(2,3,5)).toDF("col1","col2","col3") 
 
val exprs = df.columns.map((_ -> "approx_count_distinct")).toMap 
df.agg(exprs).show() 
// +---------------------------+---------------------------+---------------------------+ 
// |approx_count_distinct(col1)|approx_count_distinct(col2)|approx_count_distinct(col3)| 
// +---------------------------+---------------------------+---------------------------+ 
// |                          2|                          2|                          3| 
// +---------------------------+---------------------------+---------------------------+ 
approx_count_distinct方法依赖于底层的 HyperLogLog

HyperLogLog 算法及其变体HyperLogLog++(在Spark中实现)依赖于以下巧妙的观察。

如果数字均匀地分布在一个范围内,则可以从数字的二进制表示形式中前导零的最大数量中近似得出不同元素的数量。

例如,如果我们观察到一个数字,其二进制形式的数字为 0…(k times)…01…1,那么我们可以估计集合中的元素数量为2 ^ k。这是一个非常粗略的估计,但是可以使用草绘算法将其精炼到很高的精度。

可以在 original paper中找到对该算法背后机制的详尽解释。

注意:从Spark 1.6开始 Spark,当Spark调用SELECT SOME_AGG(DISTINCT foo)), SOME_AGG(DISTINCT bar)) FROM df时,每个子句应分别触发每个子句的聚合。而这与SELECT SOME_AGG(foo), SOME_AGG(bar) FROM df不同,后者仅聚合一次。因此,当使用count(distinct(_))approxCountDistinct(或approx_count_distinct)时,性能将不具有可比性。

自Spark 1.6以来,这是行为的变化之一:

With the improved query planner for queries having distinct aggregations (SPARK-9241), the plan of a query having a single distinct aggregation has been changed to a more robust version. To switch back to the plan generated by Spark 1.5’s planner, please set spark.sql.specializeSingleDistinctAggPlanning to true. (SPARK-12077)



引用:Approximate Algorithms in Apache Spark: HyperLogLog and Quantiles