0%

netty缓冲区ByteBuf

Netty提供了一个缓冲类ByteBuf来操作字节进行数据传输,ByteBuf可以看做是一个数据容器,其提供了两个索引,一个用来读,一个用来写

NIO中ByteBuffer的缺点

其实在java原生NIO中是有缓冲区ByteBuffer的,为什么netty还要再写一套呢?是因为原生的ByteBuffer有很多缺点

  • ByteBuffer长度固定,一旦创建完成,容量就不能改变,不支持动态扩展和收缩。如果我们编码的对象大于其容量时,会出现索引越界异常
  • ByteBuffer只有一个标识位置的指针position,读写时需要手动调用flip()和rewind()来进行切换,使用不方便

netty如何解决?

  • 长度固定问题: netty实现动态扩展,如果剩余空间不足,会进行扩容,直到指定的最大容量maxCapacity(小于4M前,从64 byte开始每次扩一倍,大于4M后,每次扩4M)
  • 读写切换问题:netty提供了两个位置指针分别操作读写,读操作使用readerIndex,写操作使用writeIndex

随机访问索引

就像普通的字节数组一样,ByteBuf使用zero-based indexing,这意味着第一个字节的索引总是0,最后一个字节的索引是capacity - 1,例如,要迭代缓冲区中的所有字节,可以使用如下方式

1
2
3
4
for (int i = 0; i < buffer.capacity(); i ++) {
byte b = buffer.getByte(i);
System.out.println((char) b);
}

顺序访问索引

1
2
3
4
5
* {@link ByteBuf} provides two pointer variables to support sequential
* read and write operations - {@link #readerIndex() readerIndex} for a read
* operation and {@link #writerIndex() writerIndex} for a write operation
* respectively. The following diagram shows how a buffer is segmented into
* three areas by the two pointers:

ByteBuf提供两个指针变量来支持顺序读写操作,readerIndex作为读指针,writerIndex作为写指针。0到readerIndex之间为已经度过的缓冲区,可以调用discardReadBytes来重用这部分空间,节约内存;readerIndex到writerIndex之间的空间为可读的字节缓冲区;writerIndex到capacity之间为可写的字节缓冲区

1
2
3
4
5
6
*      +-------------------+------------------+------------------+
* | discardable bytes | readable bytes | writable bytes |
* | | (CONTENT) | |
* +-------------------+------------------+------------------+
* | | | |
* 0 <= readerIndex <= writerIndex <= capacity

这个就是为什么ByteBuf不需要使用flip()方法来切换读和写模式的原因,而JDK中的ByteBuffer是只有一个方法来设置索引的

可读字节readable bytes

可读字节存储的是实际的数据,调用read..()或者skip..()方法会使得readerIndex增加

读取所有字节

1
2
3
while (buffer.isReadable()) {
System.out.println(buffer.readByte());
}

可写字节writable bytes

可写字节是需要被填充的空间

填充随机整数

1
2
3
while (buffer.maxWritableBytes() >= 4) {
buffer.writeInt(random.nextInt());
}

可丢弃字节discardable bytes

可丢弃的字节说明已经被读过了,可以使用discardReadBytes()来回收空间

调用前

1
2
3
4
5
6
7
*  BEFORE discardReadBytes()
*
* +-------------------+------------------+------------------+
* | discardable bytes | readable bytes | writable bytes |
* +-------------------+------------------+------------------+
* | | | |
* 0 <= readerIndex <= writerIndex <= capacity

调用后

1
2
3
4
5
6
7
*  AFTER discardReadBytes()
*
* +------------------+--------------------------------------+
* | readable bytes | writable bytes (got more space) |
* +------------------+--------------------------------------+
* | | |
* readerIndex (0) <= writerIndex (decreased) <= capacity

可以看到调用后由于空间被回收可用空间被增大

清除索引

可以通过调用clear()方法来设置readerIndex和writerIndex为0,该操作不会清除缓存数据,仅仅是清除了两个指针

调用前

1
2
3
4
5
6
7
8
9
*  BEFORE clear()
*
* +-------------------+------------------+------------------+
* | discardable bytes | readable bytes | writable bytes |
* +-------------------+------------------+------------------+
* | | | |
* 0 <= readerIndex <= writerIndex <= capacity
*
*

调用后

1
2
3
4
5
6
7
*  AFTER clear()
*
* +---------------------------------------------------------+
* | writable bytes (got more space) |
* +---------------------------------------------------------+
* | |
* 0 = readerIndex = writerIndex <= capacity

使用模式

netty创建缓冲区可以创建堆缓冲区、堆外缓冲区、复合缓冲区

1
2
3
4
5
6
7
// 堆缓冲区
ByteBuf byteBuf = Unpooled.buffer(8);
// 堆外缓冲区
ByteBuf directBuffer = Unpooled.directBuffer(8);
// 复合缓冲区,一部分是堆缓冲区,一部分是堆外缓冲区
CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
compositeByteBuf.addComponents(byteBuf,directBuffer);

对于后端业务消息的编解码使用堆缓冲区,而在IO通信线程的读写缓冲区使用堆外缓冲区,组合使得性能最优

堆缓冲区

ByteBuf将数据存储在JVM的堆空间,通过将数据存储在数组的实现,优点是可以快速分配,当不使用时可以被JVM自动回收;缺点是如果进行Socket的IO读写,需要额外进行一次内存复制,将堆内存对应的缓冲区复制到内核Channel中,性能会有一定下降

直接缓冲区(堆外缓冲区)

直接缓冲区中的内存不是使用的堆内存,其目的是

  • 通过免去中间交换的内存拷贝,提升IO处理速度
  • 直接缓冲区的内容可以驻留在垃圾回收扫描的堆区外
  • 其大小的限制是通过 -XX:MaxDirectMemorySize来限制的

其缺点是在内存空间的分配和释放会比在堆缓冲区更复杂

复合缓冲区CompositeByteBuf

复合缓冲区可以创建多个不同的ByteBuf,然后提供一个这些ByteBuf的视图,可以动态的添加和删除其中的ByteBuf,由于复合缓冲区是一个视图,所以其hasArray方法总是返回false

分布式锁

分布式锁的实现一般有redis或zookeeper临时顺序节点以及基于数据库行锁来实现

redis分布式锁

利用redis的setnx命令,只有在key不存在的情况下,才能set成功

1
2
3
4
5
6
7
8
9
10
11
12
13
//加锁  返回1,说明key原本不存在,线程得到了锁;返回0说明key已经存在了,获取锁失败
jedis.setnx(key,value)

// 释放锁,将key删掉,这样setnx就可以获得锁了
jedis.del(key)

// 锁超时 当超过该时间,则会自动释放锁
jedis.expire(key, 30)


if (jedis.setnx(lockKey, val) == 1) {
jedis.expire(lockKey, timeout);
}

但是sernx+expire是两个操作,不是原子性的,可能会出现setnx刚执行成功,还没来得及执行expire命令,服务挂掉了,这样就会导致该key没有设置过期时间,该key会一直存在,导致别的服务永远无法获取到锁

阅读全文 »

参数优化

  • net.ipv4.tcp_max_tw_buckets=10000 timewait的数量,默认180000
  • net.ipv4.ip_local_port_range=1024 65000 允许系统打开的端口范围
  • net.ipv4.tcp_tw_recyle=1 启用timewait快速回收
  • net.ipv4.tcp_tw_reuse=1 开启重用,允许将TIME-WAIT sockets重新用于新的TCP连接
  • net.ipv4.tcp_syncookies=1开启SYN Cookies,当出现SYN等待队列溢出时,启用cookies来处理

AOP开启注解

在使用注解@Aspect来进行AOP操作时,需要在xml中进行配置

1
2
<!-- 使@Aspect注解生效 -->
<aop:aspectj-autoproxy/>

创建BeanFactory时obtainFreshBeanFactory()在解析xml加载BeanDefinition中,执行parseBeanDefinitions方法进行解析发现其内有逻辑是

1
delegate.parseCustomElement(ele)

即进行自定义标签的解析,会去META-INF/spring.handlers中寻找对应的handler,该标签的namespace是http://www.springframework.org/schema/aop,去spring.handlers中找到对应的记录

1
NamespaceHandler handler = this.readerContext.getNamespaceHandlerResolver().resolve(namespaceUri);
1
http\://www.springframework.org/schema/aop=org.springframework.aop.config.AopNamespaceHandler

还有一个配置是spring.schemas,以找到对应的xsd文件

1
http\://www.springframework.org/schema/aop/spring-aop-3.1.xsd=org/springframework/aop/config/spring-aop-3.1.xsd

然后执行该handler中的init方法

1
2
NamespaceHandler namespaceHandler = (NamespaceHandler) BeanUtils.instantiateClass(handlerClass);
namespaceHandler.init();

即执行AopNamespaceHandler#init

阅读全文 »

消息重复消费问题

为什么会出现重复消费?

原因一:生产者消息重复发送

可能出现的情况

  • 生产者发送消息给消息中间件,消息中间件收到消息并进行存储,但是此时消息中间件出现了问题,导致生产者没有收到发送成功的返回,导致的消息重试
  • 消息中间件因为负载高响应变慢,成功把消息存储到消息存储后,返回成功超时,导致的消息重试
  • 消息中间件将消息成功写入消息村胡扯,在返回结果时网络出现问题,导致消息重试

解决方案

  • 在消息中加入一个唯一id,这样消息重试也不会造成数据重复,在进行处理时需要判断id

原因二:消息中间件投递时重复

  • 消息被投递到消费者时,处理完毕后应用出现问题,没有返回消费成功,此时消息中间件再次投递
  • 消息被投递到消费者进行处理,处理完毕后网络出现问题,导致消息中间件没有收到消息处理结果,再次投递
  • 消息被投递到消费者进行处理,处理时间超时,导致再次投递
  • 消息被投递到消费者进行处理,处理完毕后消息中间件出现问题,没有收到消息处理结果,再次投递
  • 消息被投递到消费者进行处理,处理完毕后消息中间件收到结果,但是消息存储故障,没有更新投递状态,再次投递

解决方案

  • 在消息中加入一个唯一id,这样消息重试也不会造成数据重复,在进行处理时需要判断id