0%

RDD编程

RDD的创建方式

从集合(内存)中创建RDD

使用parallelize或者makeRDD方法传入序列即可

1
2
3
4
5
6
7
8
9
// 准备集合
val list = List(1,2,3,4)
// 创建RDD
val rdd = context.parallelize(list)
// 该方法内部也是直接调用的parallelize(seq, numSlices)
context.makeRDD(list)
rdd.collect().foreach(println)

context.stop()
阅读全文 »

scala样例类

scala中的样例类是专门为模式匹配而创建的类,使用case来进行声明

1
case class HeartBeat(id:String)

样例类构造器中的参数都是默认为val修饰的,样例类会自动生成apply方法,unapply方法,toString方法,equals方法,hashCode方法,copy方法等

1
2
3
4
5
6
7
8
9
10
11
12
13
14
case class Teacher(id :Int)


val tea1 = Teacher(1)
val tea2 = Teacher(2)
val tea3 = Teacher(1)
val teaList = List(tea1,tea2,tea3)

for(tea <- teaList){
tea match {
case Teacher(1) => println(s"相同的老师${tea.id} ")
case _ => println(s"不是相同的老师${tea.id}")
}
}

scala视图

scala可以使用视图来进行懒加载操作,使得操作不会立即执行,只有使用到该结果时才会执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def even(num: Int): Boolean = {
println("开始过滤")
if (num % 2 == 0) {
true
} else {
false
}
}

val listInt: List[Int] = List(1, 2, 3)
// 使用view产生懒加载集合
val list2 = listInt.view.filter(even)
println("------")
// View(<not computed>)
println(list2)
// 只要真正使用的时候才会执行even方法
println(list2.size)

spark核心数据结构之RDD

RDD是Spark对数据的核心抽象,称为弹性分布式数据集(Resilient Distributed Dataset,简称RDD),即为分布式的元素集合,在Spark中,对数据的所有操作就是创建RDD、转化已有RDD以及调用RDD操作进行求值,然后Spark自动将RDD中的数据分发到集群上,将操作并行化执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/* Internally, each RDD is characterized by five main properties:
*
* - A list of partitions
* - A function for computing each split
* - A list of dependencies on other RDDs
* - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
* - Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
*/
// 分区列表,用于执行任务时并行计算
protected def getPartitions: Array[Partition]
// 分区计算函数,使用分区函数对每一个分区进行计算
def compute(split: Partition, context: TaskContext): Iterator[T]
// RDD之间的依赖关系,多个计算模型进行组合时,需要建立多个RDD的依赖关系
protected def getDependencies: Seq[Dependency[_]] = deps
// 分区器,可以通过设定分区器自定义数据的分区(可选)
@transient val partitioner: Option[Partitioner] = None
// 首选位置,计算数据时,可根据计算节点的状态选择不同的节点位置来进行计算,可以减少一些不必要的网络传输
protected def getPreferredLocations(split: Partition): Seq[String] = Nil

spark历史服务器

由于spark-shell停掉之后页面的4040端口也就无法访问了,那么使用spark-submit执行的任务就无法查看,此时就需要配置历史服务器

首先修改spark-defaults.conf配置文件

1
2
3
# 配置日志存储
spark.eventLog.enabled true
spark.eventLog.dir hdfs://localhost:9000/directory

修改spark-env.sh

1
2
3
4
5
#配置历史服务器
export SPARK_HISTORY_OPTS="
78 -Dspark.history.ui.port=18080
79 -Dspark.history.fs.logDirectory=hdfs://localhost:9000/directory
80 -Dspark.history.retainedApplications=30"

spark.history.fs.logDirectory和spark.eventLog.dir要对应,且这里的目录一定要存在,如果不存在需要在hdfs上创建

之后启动历史服务器(在sbin目录下)

1
./start-history-server.sh

启动之后可以去localhost:18080页面查看

spark历史服务器页面

spring事务失效场景

该描述的是使用注解@Transactional的方式来配置事务

  • 配置的方法非public修饰

    由于事务是使用的代理,而代理对于非public的方法不生效

  • 配置的所在类非spring容器管理的bean

  • 注解修饰的方法被所在类使用this或默认调用

    Spring在扫描Bean的时候会自动为标注了@Transactional注解的类生成一个代理类(proxy),当有注解的方法被调用的时候,实际上是代理类调用的,代理类在调用之前会开启事务,执行事务的操作,但是同类中的方法互相调用,相当于this.B(),此时的B方法并非是代理类调用,而是直接通过原有的Bean直接调用,所以注解会失效

  • 默认情况下,业务抛出异常为非RuntimeException异常

    由于默认情况下只对于RuntimeException异常回滚

  • 业务代码使用try…catch捕获异常,然后直接消化了,并未抛出异常

  • 注解中设置了错误的传播方式

spark环境配置

首先我下载的是spark-3.1.1-bin-without-hadoop,由于spark中没有带有hadoop,所以操作起来是比较麻烦的,spark先要关联我自己的hadoop环境

在conf下的spark-env.sh中添加

1
2
3
// /usr/local/myself/hadoop-3.3.0/bin/hadoop是我自己的hadoop所在位置,这样就可以使用hadoop中的jar包了
// 如果不加这句话会导致spark缺少很多jar包
export SPARK_DIST_CLASSPATH=$(/usr/local/myself/hadoop-3.3.0/bin/hadoop classpath)

注意在环境变量中配置scala、spark、hadoop等

阅读全文 »

spark敲门砖之WordCount

首先要确定好scala和spark的版本,可以去看一下maven中的对应关系(我原本用的是scala2.13版本的,由于spark没有与之对应的版本,降回到2.12了)

maven版本对应

maven依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
<!-- 添加scala的依赖 -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.12.13</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
<!-- 这里注意scala的版本和spark的版本,不然会出现问题 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.1.1</version>
</dependency>
阅读全文 »

spark与hadoop对比

spark就相当于是Hadoop的升级版本,对于MapReduce进行了再一次的优化

Hadoop

  • Hadoop 是由 java 语言编写的,在分布式服务器集群上存储海量数据并运行分布式分析应用的开源框架
  • 作为 Hadoop 分布式文件系统,HDFS 处于 Hadoop 生态圈的最下层,存储着所有的数据 ,支持着 Hadoop的所有 服务。它的理论基础源于Google的TheGoogleFileSystem这篇论文,它是GFS的开源实现
  • MapReduce是一种编程模型,Hadoop根据Google的MapReduce论文将其实现,作为Hadoop的分布式计算模型,是 Hadoop的核心。基于这个框架,分布式并行程序的编写变得异常简单。综合了 HDFS 的分布式存储和MapReduce 的分布式计算,Hadoop在处理海量数据时,性能横向扩展变得非常容易

Spark

  • Spark 是一种由 Scala 语言开发的快速、通用、可扩展的大数据分析引擎
  • Spark Core 中提供了 Spark 最基础与最核心的功能
  • Spark SQL 是 Spark 用来操作结构化数据的组件。通过 Spark SQL,用户可以使用SQL 或者 Apache Hive 版本的 SQL 方言(HQL)来查询数据
  • Spark Streaming 是 Spark 平台上针对实时数据进行流式计算的组件,提供了丰富的处理数据流的 API

根本区别在于:Spark多个作业之间数据通信是基于内存,而Hadoop是基于磁盘,但是由于Spark是基于内存的,由于内存的限制,有时候会由于内存资源不足导致Job执行失败,所以Spark并不能完全的替代MapReduce