0%

ShardingSphere-Proxy简介

透明化的数据库代理端,提供封装了数据库二进制协议的服务端版本,用于完成对异构语言的支持。 目前提供 MySQL 和 PostgreSQL 版本,它可以使用任何兼容 MySQL/PostgreSQL 协议的访问客户端(如:MySQL Command Client, MySQL Workbench, Navicat 等)操作数据,对 DBA 更加友好。

  • 向应用程序完全透明,可直接当做 MySQL/PostgreSQL 使用
  • 适用于任何兼容 MySQL/PostgreSQL 协议的的客户端

与Sharding-JDBC不同的是,Sharding-Proxy是一个独立的应用,相当于是将Sharding-JDBC中的配置抽离到Sharding-Proxy进行配置

在conf/server.yaml配置文件中配置数据库的认证信息(用户名,密码)以及数据库的共用属性

在conf/config-sharding.yaml配置文件中配置分库分表的配置,与sharding-jdbc配置相似

在conf/config-master_slave.yaml配置文件中配置读写分离配置

ShardingSphere-JDBC简介

在 Java 的 JDBC 层提供的额外服务。 它使用客户端直连数据库,以 jar 包形式提供服务,无需额外部署和依赖,可理解为增强版的 JDBC 驱动,完全兼容 JDBC 和各种 ORM 框架。

  • 适用于任何基于 JDBC 的 ORM 框架,如:JPA, Hibernate, Mybatis, Spring JDBC Template 或直接使用 JDBC。
  • 支持任何第三方的数据库连接池,如:DBCP, C3P0, BoneCP, Druid, HikariCP 等。
  • 支持任意实现 JDBC 规范的数据库,目前支持 MySQL,Oracle,SQLServer,PostgreSQL 以及任何遵循 SQL92 标准的数据库

作用

  • 简化对分库分表之后数据的操作
  • 数据分片
  • 读写分离

ShardingSphere-JDBC读写分离

sharding-jdbc会通过sql语句进行语义分析,如果是insert、update、delete语句会路由到master库进行操作,如果是select语句会路由到slave库进行操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
spring:
application:
name: sharding-jdbc-test
shardingsphere:
datasource:
names: m0,s0 # 配置数据源,给数据源起名
m0: # 主库,配置数据源具体内容,连接池、驱动、地址、用户名、密码
type: com.alibaba.druid.pool.DruidDataSource
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://localhost:3306/mybatis?useSSL=false
username: root
password: 123456
s0: # 从库,配置数据源具体内容,连接池、驱动、地址、用户名、密码
type: com.alibaba.druid.pool.DruidDataSource
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://localhost:3306/mybatis1?useSSL=false
username: root
password: 123456
# sharding: # 方式一
# master-slave-rules:
# m0:
# master-data-source-name: m0 #配置m0数据库的主库
# slave-data-source-names: s0 #配置m0数据库的从库
masterslave: # 方式二
name: ms
master-data-source-name: m0
slave-data-source-names: s0
load-balance-algorithm-type: round_robin
props:
sql:
show: true # 输出日志

mybatis初始化

mybatis的初始化实际上是解析xml配置文件构建SqlSessionFactory的过程

1
2
InputStream is = Thread.currentThread().getContextClassLoader().getResourceAsStream("mybatis-config.xml");
return new SqlSessionFactoryBuilder().build(is);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public SqlSessionFactory build(InputStream inputStream, String environment, Properties properties) {
try {
// 生成XPathParser对象,构建dom树,用于解析配置文件
XMLConfigBuilder parser = new XMLConfigBuilder(inputStream, environment, properties);
// parser.parse()将配置文件信息解析构建Configuration对象
return build(parser.parse());
} catch (Exception e) {
throw ExceptionFactory.wrapException("Error building SqlSession.", e);
} finally {
ErrorContext.instance().reset();
try {
inputStream.close();
} catch (IOException e) {
// Intentionally ignore. Prefer previous error.
}
}
}

parser.parse()将配置文件信息解析构建Configuration对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public Configuration parse() {
if (parsed) {
throw new BuilderException("Each XMLConfigBuilder can only be used once.");
}
parsed = true;
//从XPathParser中取出 <configuration>节点对应的Node对象,然后解析此Node节点的子Node
parseConfiguration(parser.evalNode("/configuration"));
return configuration;
}


private void parseConfiguration(XNode root) {
try {
// issue #117 read properties first
// 解析properties节点
propertiesElement(root.evalNode("properties"));
// 解析settings节点
Properties settings = settingsAsProperties(root.evalNode("settings"));
loadCustomVfs(settings);
loadCustomLogImpl(settings);
// 解析typeAliases节点
typeAliasesElement(root.evalNode("typeAliases"));
// 解析plugins节点
pluginElement(root.evalNode("plugins"));
// 解析objectFactory节点
objectFactoryElement(root.evalNode("objectFactory"));
// 解析objectWrapperFactory节点
objectWrapperFactoryElement(root.evalNode("objectWrapperFactory"));
// 解析reflectorFactory节点
reflectorFactoryElement(root.evalNode("reflectorFactory"));
settingsElement(settings);
// read it after objectFactory and objectWrapperFactory issue #631
// 解析typeAliases节点
environmentsElement(root.evalNode("environments"));
// 解析environments节点
databaseIdProviderElement(root.evalNode("databaseIdProvider"));
// 解析typeHandlers节点
typeHandlerElement(root.evalNode("typeHandlers"));
// 解析mappers节点
mapperElement(root.evalNode("mappers"));
} catch (Exception e) {
throw new BuilderException("Error parsing SQL Mapper Configuration. Cause: " + e, e);
}
}

解析mapper.xml

1
2
// 解析mappers节点
mapperElement(root.evalNode("mappers"));

通过解析mappers来获取mapper.xml,进而进行解析mapper.xml

1
2
3
4
5
6
7
8
9
10
11
12
public void parse() {
if (!configuration.isResourceLoaded(resource)) {
// 解析mapper
configurationElement(parser.evalNode("/mapper"));
configuration.addLoadedResource(resource);
bindMapperForNamespace();
}

parsePendingResultMaps();
parsePendingCacheRefs();
parsePendingStatements();
}

解析mapper

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private void configurationElement(XNode context) {
try {
// namespace属性
String namespace = context.getStringAttribute("namespace");
if (namespace == null || namespace.isEmpty()) {
throw new BuilderException("Mapper's namespace cannot be empty");
}
builderAssistant.setCurrentNamespace(namespace);
// cache-ref节点
cacheRefElement(context.evalNode("cache-ref"));
// cache节点
cacheElement(context.evalNode("cache"));
// parameterMap节点
parameterMapElement(context.evalNodes("/mapper/parameterMap"));
// resultMap节点
resultMapElements(context.evalNodes("/mapper/resultMap"));
// sql节点
sqlElement(context.evalNodes("/mapper/sql"));
// select|insert|update|delete节点
buildStatementFromContext(context.evalNodes("select|insert|update|delete"));
} catch (Exception e) {
throw new BuilderException("Error parsing Mapper XML. The XML location is '" + resource + "'. Cause: " + e, e);
}
}
解析resultMap节点
1
2
 // resultMap节点
resultMapElements(context.evalNodes("/mapper/resultMap"));

将resultMap节点解析为ResultMap对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
private ResultMap resultMapElement(XNode resultMapNode, List<ResultMapping> additionalResultMappings, Class<?> enclosingType) {
ErrorContext.instance().activity("processing " + resultMapNode.getValueBasedIdentifier());
// 获取type属性,结果集所对应的类型名称
String type = resultMapNode.getStringAttribute("type",
resultMapNode.getStringAttribute("ofType",
resultMapNode.getStringAttribute("resultType",
resultMapNode.getStringAttribute("javaType"))));
// 通过反射获取所对应的类型
Class<?> typeClass = resolveClass(type);
if (typeClass == null) {
typeClass = inheritEnclosingType(resultMapNode, enclosingType);
}
Discriminator discriminator = null;
// 记录解析结果
List<ResultMapping> resultMappings = new ArrayList<>(additionalResultMappings);
// 处理子节点
List<XNode> resultChildren = resultMapNode.getChildren();
for (XNode resultChild : resultChildren) {
// constructor节点
if ("constructor".equals(resultChild.getName())) {
// 解析构造器元素
processConstructorElement(resultChild, typeClass, resultMappings);
} else if ("discriminator".equals(resultChild.getName())) {
discriminator = processDiscriminatorElement(resultChild, typeClass, resultMappings);
} else {
List<ResultFlag> flags = new ArrayList<>();
// 主键标识
if ("id".equals(resultChild.getName())) {
flags.add(ResultFlag.ID);
}
// 创建resultMapping对象,并加入到resultMappings中
resultMappings.add(buildResultMappingFromContext(resultChild, typeClass, flags));
}
}
// 获取resultMap的id属性,如果没有的话使用resultMapNode.getValueBasedIdentifier()设置默认值,先取id属性,id属性没有取value属性,value属性没有取property属性
String id = resultMapNode.getStringAttribute("id",
resultMapNode.getValueBasedIdentifier());
// 获取resultMap的extends属性,表示结果集的继承
String extend = resultMapNode.getStringAttribute("extends");
// 自动映射属性,将列名自动映射为属性
Boolean autoMapping = resultMapNode.getBooleanAttribute("autoMapping");
// 创建ResultMapResolver对象,用于解析resultMappings生成ResultMap对象
ResultMapResolver resultMapResolver = new ResultMapResolver(builderAssistant, id, typeClass, extend, discriminator, resultMappings, autoMapping);
try {
return resultMapResolver.resolve();
} catch (IncompleteElementException e) {
// 其内包含有还没有解析到的resultMap节点
// 如果无法创建resultMap对象,则将结果添加到incompleteResultMaps集合中,表示未完成的结果集
configuration.addIncompleteResultMap(resultMapResolver);
throw e;
}
}

Seata使用

使用seata进行分布式事务非常的简单,只是依赖于一个seata server服务,首先将服务启动,然后对微服务进行配置

依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<!-- seata 依赖-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
<exclusions>
<exclusion>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- 排除掉原本的依赖,选择和使用的seata server版本相同的 -->
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
<version>1.1.0</version>
</dependency>
阅读全文 »

Seata Server配置

修改conf中的file.conf文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140

transport {
# tcp udt unix-domain-socket
type = "TCP"
#NIO NATIVE
server = "NIO"
#enable heartbeat
heartbeat = true
# the client batch send request enable
enableClientBatchSendRequest = false
#thread factory for netty
threadFactory {
bossThreadPrefix = "NettyBoss"
workerThreadPrefix = "NettyServerNIOWorker"
serverExecutorThreadPrefix = "NettyServerBizHandler"
shareBossWorker = false
clientSelectorThreadPrefix = "NettyClientSelector"
clientSelectorThreadSize = 1
clientWorkerThreadPrefix = "NettyClientWorkerThread"
# netty boss thread size,will not be used for UDT
bossThreadSize = 1
#auto default pin or 8
workerThreadSize = "default"
}
shutdown {
# when destroy server, wait seconds
wait = 3
}
serialization = "seata"
compressor = "none"
}
# service configuration, only used in client side
service {
#transaction service group mapping
vgroupMapping.my_test_tx_group = "default"
#only support when registry.type=file, please don't set multiple addresses
default.grouplist = "127.0.0.1:8091"
#degrade, current not support
enableDegrade = false
#disable seata
disableGlobalTransaction = false
}
#client transaction configuration, only used in client side
client {
rm {
asyncCommitBufferLimit = 10000
lock {
retryInterval = 10
retryTimes = 30
retryPolicyBranchRollbackOnConflict = true
}
reportRetryCount = 5
tableMetaCheckEnable = false
reportSuccessEnable = false
sqlParserType = druid
}
tm {
commitRetryCount = 5
rollbackRetryCount = 5
}
undo {
dataValidation = true
logSerialization = "jackson"
logTable = "undo_log"
}
log {
exceptionRate = 100
}
}

## transaction log store, only used in server side
store {
## store mode: file、db
## 存储模式,默认使用的是file,最好改为存储到数据库db中
mode = "file"
## file store property
file {
## store location dir
dir = "sessionStore"
# branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
maxBranchSessionSize = 16384
# globe session size , if exceeded throws exceptions
maxGlobalSessionSize = 512
# file buffer size , if exceeded allocate new buffer
fileWriteBufferCacheSize = 16384
# when recover batch read size
sessionReloadReadSize = 100
# async, sync
flushDiskMode = async
}

## database store property
db {
## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc.
datasource = "dbcp"
## mysql/oracle/h2/oceanbase etc.
dbType = "mysql"
driverClassName = "com.mysql.jdbc.Driver"
url = "jdbc:mysql://127.0.0.1:3306/seata"
user = "mysql"
password = "mysql"
minConn = 1
maxConn = 10
globalTable = "global_table"
branchTable = "branch_table"
lockTable = "lock_table"
queryLimit = 100
}
}
## server configuration, only used in server side
server {
recovery {
#schedule committing retry period in milliseconds
committingRetryPeriod = 1000
#schedule asyn committing retry period in milliseconds
asynCommittingRetryPeriod = 1000
#schedule rollbacking retry period in milliseconds
rollbackingRetryPeriod = 1000
#schedule timeout retry period in milliseconds
timeoutRetryPeriod = 1000
}
undo {
logSaveDays = 7
#schedule delete expired undo_log in milliseconds
logDeletePeriod = 86400000
}
#unit ms,s,m,h,d represents milliseconds, seconds, minutes, hours, days, default permanent
maxCommitRetryTimeout = "-1"
maxRollbackRetryTimeout = "-1"
rollbackRetryTimeoutUnlockEnable = false
}

## metrics configuration, only used in server side
metrics {
enabled = false
registryType = "compact"
# multi exporters use comma divided
exporterList = "prometheus"
exporterPrometheusPort = 9898
}

registry.conf配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
# 默认是file
type = "file"

nacos {
serverAddr = "localhost"
namespace = ""
cluster = "default"
}
eureka {
serviceUrl = "http://localhost:8761/eureka"
application = "default"
weight = "1"
}
redis {
serverAddr = "localhost:6379"
db = "0"
}
zk {
cluster = "default"
serverAddr = "127.0.0.1:2181"
session.timeout = 6000
connect.timeout = 2000
}
consul {
cluster = "default"
serverAddr = "127.0.0.1:8500"
}
etcd3 {
cluster = "default"
serverAddr = "http://localhost:2379"
}
sofa {
serverAddr = "127.0.0.1:9603"
application = "default"
region = "DEFAULT_ZONE"
datacenter = "DefaultDataCenter"
cluster = "default"
group = "SEATA_GROUP"
addressWaitTime = "3000"
}
file {
name = "file.conf"
}
}

config {
# file、nacos 、apollo、zk、consul、etcd3
type = "file"

nacos {
serverAddr = "localhost"
namespace = ""
group = "SEATA_GROUP"
}
consul {
serverAddr = "127.0.0.1:8500"
}
apollo {
app.id = "seata-server"
apollo.meta = "http://192.168.1.204:8801"
namespace = "application"
}
zk {
serverAddr = "127.0.0.1:2181"
session.timeout = 6000
connect.timeout = 2000
}
etcd3 {
serverAddr = "http://localhost:2379"
}
file {
name = "file.conf"
}
}

启动seata server

1
2
# 在seata的bin目录下
sh seata-server.sh

Seata简介

Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务

组成

Seata是由一ID+三组件组成的

Transaction ID

全局唯一的事务ID

TC (Transaction Coordinator) - 事务协调者

维护全局和分支事务的状态,驱动全局事务提交或回滚。

seata server就是TC

TM (Transaction Manager) - 事务管理器

定义全局事务的范围:开始全局事务、提交或回滚全局事务。

事务的发起方,使用注解@GlobalTransactional

RM (Resource Manager) - 资源管理器

管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。

事务的参与者

阅读全文 »

Sentinel规则持久化

在Sentinel中配置了规则之后,如果微服务重启之后,Sentinel中的规则就会消失,每次重启之后还需要重新进行配置

那么Sentinel可否进行持久化呢?答案当然是可以了,下面就测试一下将规则持久化到nacos中

依赖

1
2
3
4
5
<!-- sentinel持久化到nacos -->
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-datasource-nacos</artifactId>
</dependency>
阅读全文 »

Sentinel热点

热点即经常访问的数据。很多时候我们希望统计某个热点数据中访问频次最高的 Top K 数据,并对其访问进行限制,热点参数限流会统计传入参数中的热点参数,并根据配置的限流阈值与模式,对包含热点参数的资源调用进行限流。热点参数限流可以看做是一种特殊的流量控制,仅对包含热点参数的资源调用生效,Sentinel 利用 LRU 策略统计最近最常访问的热点参数,结合令牌桶算法来进行参数级别的流控。

热点规则
阅读全文 »

Sentinel系统规则

Sentinel 系统规则是从整体维度对应用入口流量进行控制,结合应用的 Load、总体平均 RT、入口 QPS 和线程数等几个维度的监控指标,让系统的入口流量和系统的负载达到一个平衡,让系统尽可能跑在最大吞吐量的同时保证系统整体的稳定性

系统规则支持以下的阈值类型:

  • Load(仅对 Linux/Unix-like 机器生效):当系统 load1 超过阈值,且系统当前的并发线程数超过系统容量时才会触发系统保护。系统容量由系统的 maxQps * minRt 计算得出。设定参考值一般是 CPU cores * 2.5

  • CPU usage(1.5.0+ 版本):当系统 CPU 使用率超过阈值即触发系统保护(取值范围 0.0-1.0)。

  • RT:当单台机器上所有入口流量的平均 RT 达到阈值即触发系统保护,单位是毫秒。

  • 线程数:当单台机器上所有入口流量的并发线程数达到阈值即触发系统保护。

  • 入口 QPS:当单台机器上所有入口流量的 QPS 达到阈值即触发系统保护

阅读全文 »