0%

spark核心数据结构之RDD

RDD是Spark对数据的核心抽象,称为弹性分布式数据集(Resilient Distributed Dataset,简称RDD),即为分布式的元素集合,其表示分布在多个计算节点上可以并行操作的元素集合。在Spark中,对数据的所有操作就是创建RDD、转化已有RDD以及调用RDD操作进行求值,然后Spark自动将RDD中的数据分发到集群上,将操作并行化执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/* Internally, each RDD is characterized by five main properties:
*
* - A list of partitions
* - A function for computing each split
* - A list of dependencies on other RDDs
* - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
* - Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
*/
// 分区列表,用于执行任务时并行计算
protected def getPartitions: Array[Partition]
// 分区计算函数,使用分区函数对每一个分区进行计算
def compute(split: Partition, context: TaskContext): Iterator[T]
// RDD之间的依赖关系,多个计算模型进行组合时,需要建立多个RDD的依赖关系
protected def getDependencies: Seq[Dependency[_]] = deps
// 分区器,可以通过设定分区器自定义数据的分区(可选)
@transient val partitioner: Option[Partitioner] = None
// 首选位置,计算数据时,可根据计算节点的状态选择不同的节点位置来进行计算,可以减少一些不必要的网络传输
protected def getPreferredLocations(split: Partition): Seq[String] = Nil

spark组件说明

下图是spark执行时的基本结构

spark架构

阅读全文 »

spark历史服务器

spark-shell停掉之后页面的4040端口也就无法访问了,那么使用spark-submit执行的任务就无法查看,此时就需要配置历史服务器

首先修改spark-defaults.conf配置文件

1
2
3
# 配置日志存储
spark.eventLog.enabled true
spark.eventLog.dir hdfs://localhost:9000/directory

修改spark-env.sh

1
2
3
4
5
6
7
#配置历史服务器  spark.history.ui.port是WEBUI访问的端口号
# spark.history.fs.logDirectory 历史服务器日志存储的路径
# spark.history.retainedApplications 保留的Application历史记录的个数
export SPARK_HISTORY_OPTS="
-Dspark.history.ui.port=18080
-Dspark.history.fs.logDirectory=hdfs://localhost:9000/directory
-Dspark.history.retainedApplications=30"

spark.history.fs.logDirectory和spark.eventLog.dir要对应,且这里的目录一定要存在,如果不存在需要在hdfs上创建

之后启动历史服务器(在sbin目录下)

1
./start-history-server.sh

启动之后可以去localhost:18080页面查看

spark历史服务器页面

spring事务失效场景

首先spring的事务是使用AOP来实现的,而AOP的底层是代理(JDK代理或者CGLIB代理),所以事务失效就想什么时候不能进行代理

该描述的是使用注解@Transactional的方式来配置事务

  • 配置的方法非public修饰

    由于事务是使用的代理,而代理对于非public的方法不生效(private 不能被子类继承)

  • 配置的所在类非spring容器管理的bean

  • 注解修饰的方法被所在类使用this或默认调用

    Spring在扫描Bean的时候会自动为标注了@Transactional注解的类生成一个代理类(proxy),当有注解的方法被调用的时候,实际上是代理类调用的,代理类在调用之前会开启事务,执行事务的操作,但是同类中的方法互相调用,相当于this.B(),此时的B方法并非是代理类调用,而是直接通过原有的Bean直接调用,所以注解会失效

    可以使用

    1
    <aop:config expose-proxy="true"></aop:config>

    来暴露代理类

    然后在代码中使用((MyServiceBiz)(AopContext.currentProxy())).test()来调用对应的方法

  • 默认情况下,业务抛出异常为非RuntimeException异常

    由于默认情况下只对于RuntimeException异常回滚

  • 业务代码使用try…catch捕获异常,然后直接消化了,并未抛出异常

  • 注解中设置了错误的传播方式

这里除了使用this调用外别的都比较好理解,只有this大家其实理解不了。举个例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class Base {

public void test(){
System.out.println("Base#this执行");
}

public void method(){
System.out.println("Base#method执行");
this.test();
}
}

public class Child extends Base{

public void test(){
System.out.println("Child#test执行");
}


public void invoke(){
super.method();
}

public static void main(String[] args) {
Child child = new Child();
child.invoke();
}
}

// 执行结果为
Base#method执行
Child#test执行
阅读全文 »

spark环境配置

首先我下载的是spark-3.1.1-bin-without-hadoop,由于spark中没有带有hadoop,所以操作起来是比较麻烦的,spark先要关联我自己的hadoop环境

在conf下的spark-env.sh中添加

1
2
3
// /usr/local/myself/hadoop-3.3.0/bin/hadoop是我自己的hadoop所在位置,这样就可以使用hadoop中的jar包了
// 如果不加这句话会导致spark缺少很多jar包
export SPARK_DIST_CLASSPATH=$(/usr/local/myself/hadoop-3.3.0/bin/hadoop classpath)

注意在环境变量中配置scala、spark、hadoop等

阅读全文 »