0%

spark敲门砖之WordCount

spark敲门砖之WordCount

首先要确定好scala和spark的版本,可以去看一下maven中的对应关系(我原本用的是scala2.13版本的,由于spark没有与之对应的版本,降回到2.12了)

maven版本对应

maven依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
<!-- 添加scala的依赖 -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.12.13</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
<!-- 这里注意scala的版本和spark的版本,不然会出现问题 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.1.1</version>
</dependency>

原本我用的scala2.13会报Cloneable类找不到(由于SparkConf继承了Cloneable,所以报的错),这个问题告诉我,版本没有必要用最新的,因为框架没有追上来,再等等

接下来就是一个简单的wordCount,其实与scala操作差不多,只是SparkContext中对于一些方法又进行了一下封装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// spark配置
// local代表本地环境
val sparkConf = new SparkConf().setMaster("local").setAppName("WordCount")
// 建立Spark连接
val sc = new SparkContext(sparkConf)
// 读取文件,一行一行的数据
val lines:RDD[String] = sc.textFile(ClassLoader.getSystemResource("wordcount/wordcount.txt").getPath)
println("======lines=======")
for(item <- lines.collect()){
println(item)
}
// 1.将每行数据进行拆分,分为一个一个的单词
val words:RDD[String] = lines.flatMap(_.split(" "))
println("======words=======")
for(item <- words.collect()){
println(item)
}
// 2.对每个单词设置值为1
val wordToOne:RDD[(String,Int)] = words.map((_,1))
println("======wordToOne=======")
for(item <- wordToOne.collect()){
println(item)
}
// 3.使用reduceByKey可以对根据key对value进行聚合
val wordCount:RDD[(String,Int)] = wordToOne.reduceByKey(_+_)
println("======wordCount=======")
for(item <- wordCount.collect()){
println(item)
}
// 关闭spark连接
sc.stop()

结果

1
2
3
4
5
6
7
8
9
10
11
12
13
(Spark,1)
(Zookeeper,3)
(Kafka,1)
(Yarn,1)
(MapReduce,1)
(Hdfs,1)
(Flume,1)
(Hive,2)
(Agent,1)
(Scala,1)
(MySQL,1)
(Hbase,1)
(Hadoop,4)

在spark shell中,SparkContext已经被创建好了,在变量中叫做sc,可以直接使用

欢迎关注我的其它发布渠道