
scala - Why does a Spark application fail with "ClassNotFoundException: Failed to find data source: kafka" as an uber-jar built with sbt assembly?


I am trying to run an example like StructuredKafkaWordCount. I started from the Spark Structured Streaming Programming guide.

My code is:

package io.boontadata.spark.job1 
 
import org.apache.spark.sql.SparkSession 
 
object DirectKafkaAggregateEvents { 
  val FIELD_MESSAGE_ID = 0 
  val FIELD_DEVICE_ID = 1 
  val FIELD_TIMESTAMP = 2 
  val FIELD_CATEGORY = 3 
  val FIELD_MEASURE1 = 4 
  val FIELD_MEASURE2 = 5 
 
  def main(args: Array[String]) { 
    if (args.length < 3) { 
      System.err.println(s""" 
        |Usage: DirectKafkaAggregateEvents <brokers> <subscribeType> <topics> 
        |  <brokers> is a list of one or more Kafka brokers 
        |  <subscribeType> sample value: subscribe 
        |  <topics> is a list of one or more kafka topics to consume from 
        | 
        """.stripMargin) 
      System.exit(1) 
    } 
 
    val Array(bootstrapServers, subscribeType, topics) = args 
 
    val spark = SparkSession 
      .builder 
      .appName("boontadata-spark-job1") 
      .getOrCreate() 
 
    import spark.implicits._ 
 
    // Create DataSet representing the stream of input lines from kafka 
    val lines = spark 
      .readStream 
      .format("kafka") 
      .option("kafka.bootstrap.servers", bootstrapServers) 
      .option(subscribeType, topics) 
      .load() 
      .selectExpr("CAST(value AS STRING)") 
      .as[String] 
 
    // Generate running word count 
    val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count() 
 
    // Start running the query that prints the running counts to the console 
    val query = wordCounts.writeStream 
      .outputMode("complete") 
      .format("console") 
      .start() 
 
    query.awaitTermination() 
  } 
 
} 

I added the following sbt files:

build.sbt:
name := "boontadata-spark-job1" 
version := "0.1" 
scalaVersion := "2.11.7" 
 
libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.0.2" % "provided" 
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.0.2" % "provided" 
libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "2.0.2" % "provided" 
libraryDependencies += "org.apache.spark" % "spark-sql-kafka-0-10_2.11" % "2.0.2" 
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.0.2" 
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.10.1.1" 
libraryDependencies += "org.apache.kafka" % "kafka_2.11" % "0.10.1.1" 
 
// META-INF discarding 
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x => MergeStrategy.first
}
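
Note that discarding everything under META-INF is the likely culprit for the assembly case: spark-sql-kafka registers its data source through the ServiceLoader file META-INF/services/org.apache.spark.sql.sources.DataSourceRegister, and MergeStrategy.discard drops that file from the uber jar. A sketch of a merge strategy that keeps service registrations instead (assuming the same sbt-assembly 0.14.x syntax used above):

// Concatenate ServiceLoader registration files instead of discarding them,
// so Spark can still resolve the "kafka" short name from the uber jar.
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", "services", xs @ _*) => MergeStrategy.concat
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x => MergeStrategy.first
}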

I also added project/assembly.sbt:
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3") 

This creates an uber jar that includes the non-provided jars.
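
To check whether the kafka registration actually made it into the assembled jar, you can print the service file directly (a quick check; the target/scala-2.11/ path is an assumption based on the scalaVersion above):

# Print the ServiceLoader registration inside the uber jar; if unzip
# finds nothing, the "kafka" short name cannot be resolved at runtime.
unzip -p target/scala-2.11/boontadata-spark-job1-assembly-0.1.jar \
  META-INF/services/org.apache.spark.sql.sources.DataSourceRegister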

I submit with the following command:
spark-submit boontadata-spark-job1-assembly-0.1.jar ks1:9092,ks2:9092,ks3:9092 subscribe sampletopic 

But I get this runtime error:
Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at https://cwiki.apache.org/confluence/display/SPARK/Third+Party+Projects 
        at org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource(DataSource.scala:148) 
        at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:79) 
        at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:79) 
        at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:218) 
        at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:80) 
        at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:80) 
        at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30) 
        at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:124) 
        at io.boontadata.spark.job1.DirectKafkaAggregateEvents$.main(StreamingJob.scala:41) 
        at io.boontadata.spark.job1.DirectKafkaAggregateEvents.main(StreamingJob.scala) 
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
        at java.lang.reflect.Method.invoke(Method.java:498) 
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736) 
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185) 
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210) 
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124) 
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource 
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381) 
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424) 
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357) 
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5$$anonfun$apply$1.apply(DataSource.scala:132) 
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5$$anonfun$apply$1.apply(DataSource.scala:132) 
        at scala.util.Try$.apply(Try.scala:192) 
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5.apply(DataSource.scala:132) 
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5.apply(DataSource.scala:132) 
        at scala.util.Try.orElse(Try.scala:84) 
        at org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource(DataSource.scala:132) 
        ... 18 more 
16/12/23 13:32:48 INFO spark.SparkContext: Invoking stop() from shutdown hook 

Is there a way to know which class was not found, so that I can search the maven.org repository for it?
The lookupDataSource source code seems to be at line 543 of https://github.com/apache/spark/blob/83a6ace0d1be44f70e768348ae6688798c84343e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala but I couldn't find a direct link to the Kafka data source...
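
For reference: lookupDataSource resolves a short name in two steps. It first asks the ServiceLoader for DataSourceRegister implementations whose shortName() matches, and only then falls back to loading <name>.DefaultSource, which is why the trace ends in kafka.DefaultSource. The class it should have found is org.apache.spark.sql.kafka010.KafkaSourceProvider, which spark-sql-kafka-0-10 registers through a service file like this:

# META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
# (shipped inside spark-sql-kafka-0-10_2.11-2.0.2.jar; this is the entry
# that must survive the assembly merge)
org.apache.spark.sql.kafka010.KafkaSourceProvider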

The full source code is here: https://github.com/boontadata/boontadata-streams/tree/ad0d0134ddb7664d359c8dca40f1d16ddd94053f

Please refer to the following approach:

I tried it this way and it works for me. Submit like this, and let me know once you have any issues:

./spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 --class com.inndata.StructuredStreaming.Kafka --master local[*] /Users/apple/.m2/repository/com/inndata/StructuredStreaming/0.0.1SNAPSHOT/StructuredStreaming-0.0.1-SNAPSHOT.jar
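
Adapted to the job in this question, the submit would look something like this (--packages makes spark-submit resolve the connector and its kafka-clients dependency at launch, so the uber jar no longer has to carry a correctly merged registration; the connector version is matched to the Spark 2.0.2 build above):

spark-submit \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.0.2 \
  --class io.boontadata.spark.job1.DirectKafkaAggregateEvents \
  boontadata-spark-job1-assembly-0.1.jar \
  ks1:9092,ks2:9092,ks3:9092 subscribe sampletopic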