我想知道如何在 scala 中执行以下操作?
- 使用 Spark scala 连接到 postgreSQL 数据库。
- 编写 SQL 查询(如 SELECT 、 UPDATE 等)来修改表 该数据库。
我知道使用scala来做到这一点,但是如何在打包时将psql scala的连接器jar导入到sbt中?
请您参考如下方法:
我们的目标是从 Spark 工作线程运行并行 SQL 查询。
build设置
将连接器和 JDBC 添加到 libraryDependencies
在build.sbt
。我只在 MySQL 上尝试过这一点,所以我将在我的示例中使用它,但 Postgres 应该大致相同。
libraryDependencies ++= Seq(
jdbc,
"mysql" % "mysql-connector-java" % "5.1.29",
"org.apache.spark" %% "spark-core" % "1.0.1",
// etc
)
代码
当您创建SparkContext
时你告诉它哪些 jar 复制到执行者。包括连接器 jar 。一个很好的方法来做到这一点:
val classes = Seq(
getClass, // To get the jar with our own code.
classOf[mysql.jdbc.Driver] // To get the connector.
)
val jars = classes.map(_.getProtectionDomain().getCodeSource().getLocation().getPath())
val conf = new SparkConf().setJars(jars)
现在 Spark 已准备好连接到数据库。每个执行器将运行部分查询,以便为分布式计算准备好结果。
有两种选择。较旧的方法是使用 org.apache.spark.rdd.JdbcRDD
:
val rdd = new org.apache.spark.rdd.JdbcRDD(
sc,
() => {
sql.DriverManager.getConnection("jdbc:mysql://mysql.example.com/?user=batman&password=alfred")
},
"SELECT * FROM BOOKS WHERE ? <= KEY AND KEY <= ?",
0, 1000, 10,
row => row.getString("BOOK_TITLE")
)
查看参数文档。简而言之:
- 您有
SparkContext
. - 然后是创建连接的函数。这将在每个工作线程上调用以连接到数据库。
- 然后是 SQL 查询。这必须与示例类似,并包含开始键和结束键的占位符。
- 然后指定键的范围(在我的示例中为 0 到 1000)和分区数。该范围将在分区之间划分。因此一个执行器线程最终将执行
SELECT * FROM FOO WHERE 0 <= KEY AND KEY <= 100
在示例中。 - 最后我们有一个函数可以转换
ResultSet
进入某事。在示例中我们将其转换为String
,所以你最终得到RDD[String]
.
自 Apache Spark 1.3.0 版起,可以通过 DataFrame API 使用另一种方法。而不是JdbcRDD
您将创建一个 org.apache.spark.sql.DataFrame
:
val df = sqlContext.load("jdbc", Map(
"url" -> "jdbc:mysql://mysql.example.com/?user=batman&password=alfred",
"dbtable" -> "BOOKS"))
参见https://spark.apache.org/docs/1.3.1/sql-programming-guide.html#jdbc-to-other-databases完整的选项列表(可以像 JdbcRDD
一样设置键范围和分区数量)。
更新
JdbcRDD
不支持更新。但你可以简单地在 foreachPartition
中完成它们.
rdd.foreachPartition { it =>
val conn = sql.DriverManager.getConnection("jdbc:mysql://mysql.example.com/?user=batman&password=alfred")
val del = conn.prepareStatement("DELETE FROM BOOKS WHERE BOOK_TITLE = ?")
for (bookTitle <- it) {
del.setString(1, bookTitle)
del.executeUpdate
}
}
(这会为每个分区创建一个连接。如果这是一个问题,请使用连接池!)
DataFrame
通过createJDBCTable
支持更新和insertIntoJDBC
方法。