Getting Started with Spark: WordCount
First, settle on compatible Scala and Spark versions; you can check the correspondence between the two on Maven. (I was originally on Scala 2.13, but since Spark had no build for it yet, I dropped back to 2.12.)
Maven dependencies:
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.12.13</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.1.1</version>
</dependency>
With Scala 2.13 I got an error that the Cloneable class could not be found (SparkConf extends Cloneable, which is why the error surfaced there). The lesson: there is no need to run the newest version of everything; if the framework has not caught up yet, wait a while.
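If you are ever unsure which versions actually ended up on the classpath, you can print them from code. A minimal sketch; scala.util.Properties.versionString is part of the Scala standard library and SPARK_VERSION is a constant exposed by the org.apache.spark package object:

import org.apache.spark.SPARK_VERSION

object VersionCheck {
  def main(args: Array[String]): Unit = {
    // Scala library version on the classpath, e.g. "version 2.12.13"
    println(scala.util.Properties.versionString)
    // Spark version on the classpath, e.g. "3.1.1"
    println(SPARK_VERSION)
  }
}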
Next comes a simple WordCount. It works much like the same operations on plain Scala collections; SparkContext just provides its own wrappers around some of these methods.
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

val sparkConf = new SparkConf().setMaster("local").setAppName("WordCount")
val sc = new SparkContext(sparkConf)

// Read the input file from the classpath
val lines: RDD[String] = sc.textFile(ClassLoader.getSystemResource("wordcount/wordcount.txt").getPath)
println("======lines=======")
for (item <- lines.collect()) {
  println(item)
}

// Split each line into individual words
val words: RDD[String] = lines.flatMap(_.split(" "))
println("======words=======")
for (item <- words.collect()) {
  println(item)
}

// Pair each word with an initial count of 1
val wordToOne: RDD[(String, Int)] = words.map((_, 1))
println("======wordToOne=======")
for (item <- wordToOne.collect()) {
  println(item)
}

// Sum the counts for each word
val wordCount: RDD[(String, Int)] = wordToOne.reduceByKey(_ + _)
println("======wordCount=======")
for (item <- wordCount.collect()) {
  println(item)
}

sc.stop()
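The intermediate vals above exist only so each stage can be printed. In practice the whole pipeline is usually chained into a single expression; a minimal sketch of the same job, assuming the same wordcount.txt resource path:

import org.apache.spark.{SparkConf, SparkContext}

object WordCountChained {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("WordCount"))
    sc.textFile(ClassLoader.getSystemResource("wordcount/wordcount.txt").getPath)
      .flatMap(_.split(" "))   // line -> words
      .map((_, 1))             // word -> (word, 1)
      .reduceByKey(_ + _)      // sum the counts per word
      .collect()               // pull results back to the driver
      .foreach(println)
    sc.stop()
  }
}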
Output:
(Spark,1)
(Zookeeper,3)
(Kafka,1)
(Yarn,1)
(MapReduce,1)
(Hdfs,1)
(Flume,1)
(Hive,2)
(Agent,1)
(Scala,1)
(MySQL,1)
(Hbase,1)
(Hadoop,4)
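Note that reduceByKey gives no ordering guarantee, so these tuples can come back in any order. Continuing from the wordCount RDD above, a sketch of printing them by descending count using the standard sortBy transformation:

// sort by count, highest first, before collecting and printing
wordCount.sortBy(_._2, ascending = false).collect().foreach(println)
// would print (Hadoop,4), (Zookeeper,3), (Hive,2), ... first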
In the spark-shell, a SparkContext has already been created for you and is bound to the variable sc, so it can be used directly.
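For example, the same pipeline typed straight at the prompt (the /tmp path is only a placeholder for wherever your input file actually lives):

scala> sc.textFile("/tmp/wordcount.txt").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).collect().foreach(println)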