Skip to main content
 首页 » 编程设计

scala之如何使用 scala 将 postgreSQL 数据库连接到 Apache Spark

2024年08月23日41freeliver54

我想知道如何在 scala 中执行以下操作?

  1. 使用 Spark scala 连接到 postgreSQL 数据库。
  2. 编写 SQL 查询(如 SELECT 、 UPDATE 等)来修改表 该数据库。

我知道使用scala来做到这一点,但是如何在打包时将psql scala的连接器jar导入到sbt中?

请您参考如下方法:

我们的目标是从 Spark 工作线程运行并行 SQL 查询。

build设置

将连接器和 JDBC 添加到 libraryDependenciesbuild.sbt 。我只在 MySQL 上尝试过这一点,所以我将在我的示例中使用它,但 Postgres 应该大致相同。

libraryDependencies ++= Seq( 
  jdbc, 
  "mysql" % "mysql-connector-java" % "5.1.29", 
  "org.apache.spark" %% "spark-core" % "1.0.1", 
  // etc 
) 

代码

当您创建SparkContext时你告诉它哪些 jar 复制到执行者。包括连接器 jar 。一个很好的方法来做到这一点:

val classes = Seq( 
  getClass,                   // To get the jar with our own code. 
  classOf[mysql.jdbc.Driver]  // To get the connector. 
) 
val jars = classes.map(_.getProtectionDomain().getCodeSource().getLocation().getPath()) 
val conf = new SparkConf().setJars(jars) 

现在 Spark 已准备好连接到数据库。每个执行器将运行部分查询,以便为分布式计算准备好结果。

有两种选择。较旧的方法是使用 org.apache.spark.rdd.JdbcRDD :

val rdd = new org.apache.spark.rdd.JdbcRDD( 
  sc, 
  () => { 
    sql.DriverManager.getConnection("jdbc:mysql://mysql.example.com/?user=batman&password=alfred") 
  }, 
  "SELECT * FROM BOOKS WHERE ? <= KEY AND KEY <= ?", 
  0, 1000, 10, 
  row => row.getString("BOOK_TITLE") 
) 

查看参数文档。简而言之:

  • 您有 SparkContext .
  • 然后是创建连接的函数。这将在每个工作线程上调用以连接到数据库。
  • 然后是 SQL 查询。这必须与示例类似,并包含开始键和结束键的占位符。
  • 然后指定键的范围(在我的示例中为 0 到 1000)和分区数。该范围将在分区之间划分。因此一个执行器线程最终将执行 SELECT * FROM FOO WHERE 0 <= KEY AND KEY <= 100在示例中。
  • 最后我们有一个函数可以转换 ResultSet进入某事。在示例中我们将其转换为 String ,所以你最终得到 RDD[String] .

自 Apache Spark 1.3.0 版起,可以通过 DataFrame API 使用另一种方法。而不是JdbcRDD您将创建一个 org.apache.spark.sql.DataFrame :

val df = sqlContext.load("jdbc", Map( 
  "url" -> "jdbc:mysql://mysql.example.com/?user=batman&password=alfred", 
  "dbtable" -> "BOOKS")) 

参见https://spark.apache.org/docs/1.3.1/sql-programming-guide.html#jdbc-to-other-databases完整的选项列表(可以像 JdbcRDD 一样设置键范围和分区数量)。

更新

JdbcRDD不支持更新。但你可以简单地在 foreachPartition 中完成它们.

rdd.foreachPartition { it => 
  val conn = sql.DriverManager.getConnection("jdbc:mysql://mysql.example.com/?user=batman&password=alfred") 
  val del = conn.prepareStatement("DELETE FROM BOOKS WHERE BOOK_TITLE = ?") 
  for (bookTitle <- it) { 
    del.setString(1, bookTitle) 
    del.executeUpdate 
  } 
} 

(这会为每个分区创建一个连接。如果这是一个问题,请使用连接池!)

DataFrame通过createJDBCTable支持更新和insertIntoJDBC方法。