SparkSession的功能
Spark2.0中引入了SparkSession的概念,它为用户提供了一个统一的切入点来使用Spark的各项功能,用户不但可以使用DataFrame和Dataset的各种API,学习Spark的难度也会大大降低。
Spark REPL和Databricks Notebook中的SparkSession对象
在之前的Spark版本中,Spark shell会自动创建一个SparkContext对象sc。2.0中Spark shell则会自动创建一个SparkSession对象(spark),在输入spark时就会发现它已经存在了。
在Databricks notebook中创建集群时也会自动生成一个SparkSession,这里用的名字也是spark。
SparkContext
SparkContext起到的是一个中介的作用,通过它来使用Spark其他的功能。每一个JVM都有一个对应的SparkContext,driver program通过SparkContext连接到集群管理器来实现对集群中任务的控制。Spark配置参数的设置以及对SQLContext、HiveContext和StreamingContext的控制也要通过SparkContext进行。
不过在Spark2.0中上述的一切功能都是通过SparkSession来完成的,同时SparkSession也简化了DataFrame/Dataset API的使用和对数据的操作。
pyspark.sql.SparkSession
https://blog.csdn.net/cjhnbls/article/details/79254188
1 | ctx = SparkSession.builder.appName("ApplicationName").config("spark.driver.memory", "6G").master('local[7]').getOrCreate() |
Scala API
创建SparkSession
在2.0版本之前,使用Spark必须先创建SparkConf和SparkContext,代码如下:
1 | //set up the spark configuration and create contexts |
不过在Spark2.0中只要创建一个SparkSession就够了,SparkConf、SparkContext和SQLContext都已经被封装在SparkSession当中。下面的代码创建了一个SparkSession对象并设置了一些参数。这里使用了生成器模式,只有此“spark”对象不存在时才会创建一个新对象。
1 | // Create a SparkSession. No need to create SparkContext |
执行完上面的代码就可以使用spark对象了。
设置运行参数
创建SparkSession之后可以设置运行参数,代码如下, 也可以使用Scala的迭代器来读取configMap中的数据。
1 | //set new runtime options |
读取元数据
如果需要读取元数据(catalog),可以通过SparkSession来获取。1
2
3//fetch metadata data from the catalog
spark.catalog.listDatabases.show(false)
spark.catalog.listTables.show(false)
这里返回的都是Dataset,所以可以根据需要再使用Dataset API来读取。 ???
创建Dataset和Dataframe
通过SparkSession来创建Dataset和Dataframe有多种方法。其中最简单的就是使用spark.range方法来生成Dataset,在摸索Dataset API的时候这个办法尤其有用。
1 | // create a Dataset using spark.range starting from 5 to 100, with increments of 5 |
读取JSON数据
此外,还可以用SparkSession读取JSON、CSV、TXT和parquet表。下面的代码中读取了一个JSON文件,返回的是一个DataFrame。
1 | // read the json file and create the dataframe |
使用SparkSQL
借助SparkSession用户可以像SQLContext一样使用Spark SQL的全部功能。下面的代码中先创建了一个表然后对此表进行查询。
1 | // Now create an SQL table and issue SQL queries against it without |
存储/读取Hive表
下面的代码演示了通过SparkSession来创建Hive表并进行查询的方法。
1 | //drop the table if exists to get around existing table error |
这里可以看到从DataFrame API、Spark SQL和Hive语句返回的结果是完全相同的。