随着复赛今天截止,为期两个月的挑战赛也终于结束了.这两个月里很大一部分时间花在这上面,有过欢乐,也有为分数刷不上去而发愁.作为第一次参加比赛,对比赛结果还算是满意吧.而在这个过程中,对多线程知识,netty,nio等知识的深入认识.
下面是对比赛的总结和思考.排名如下
初赛:《Service Mesh Agent for Apache Dubbo (Incubating) 》
- 赛题的思考
题目看起来是让我们实现一个rpc agent.因为官方已经给出了consumer和provider,选手就是要实现两个代理,第一个代理是consumer-agent,负责把consumer的调用通过自定义协议发动给Provider-agent.第二个代理就是provider-agent.他的任务就是接收Consumer-agent通过网络发动过来的消息,然后通过dubbo调用provider.最后把结果返回给consumer-agent.整个系统的调用图如下:
- 设计和实现.
整个调用过程如下所示:
- ①处这里采用Netty Http应用作为服务端,处理Consumer发送过来的http请求.
- ②处这里就是在Consumer-agent开启Netty Client,Provider-agent端开启Netty Server进行请求和响应.
- ③ Provider-agent通Netty Client去调用Provider的服务.
- ④ Provider把结果返回给Consumer-agent.
- ⑤ Consumer-agent把结果封装成HttpResponse返回给客户端.
Provider提供的服务如下:
整个代码我放在github中,这里不对整个代码做分析,只分析出关键的点.
负载均衡
如下图,3个provider的负载能力如下,那么我们可以选择负载均衡算法的时候,把这个考虑进去.我选择是随机加权算法.根据大家的一致认同,small:meddium:large = 1:2:2.
所有的服务都运行在docker环境中,而用的etcd作为服务发现的组件.事先并不知道那台机器是small,large,meddium.那么我们可以考虑把参数加上启动参数.一旦服务启动,这些信息,都会注册到etcd中.然后取出来,做相应的判断就行.
在etcd做服务发现的时候,把型号信息转换成比例注册上去
Consumer在选择那个Provider的时候就可以根据以上的信息,轮询选择一个.
这样一个随机加权的算法就实现了.
EventLoop复用
当我们创建Provident-agent的时候,我们是否可以考虑Eventloop的复用,这样每个请求从接收到发动都是用同一个线程处理的,没有上下文切换.另外一个,这样做好处,把channel和Eventloop绑定起来,也就限定了channel的个数,相当于做了一个channel的缓存(因为channel的数量得控制).一举两得.
回调的设计
当Provider返回给结果后,那我们应该如何把结果返回给Consumer-agent呢,也就是它如何记住之前的通道.这里采用的是一个回调的设计.这样就能够记住上下文,也就是记住过来时候的
当结果返回后,通过回调调用回调函数的逻辑
Netty 提供了一个方便的解码工具类 ByteToMessageDecoder ,如图上半部分所示,这个类具备 accumulate 批量解包能力,可以尽可能的从 socket 里读取字节,然后同步调用 decode 方法,解码出业务对象,并组成一个 List 。最后再循环遍历该 List ,依次提交到 ChannelPipeline 进行处理。此处我们做了一个细小的改动,如图下半部分所示,即将提交的内容从单个 command ,改为整个 List 一起提交,如此能减少 pipeline 的执行次数,同时提升吞吐量。这个模式在低并发场景,并没有什么优势,而在高并发场景下对提升吞吐量有不小的性能提升。
负载均衡
上面我的做法有点硬编码的意思,而且随机的话,而且不确定性有点大.那是是否可以考虑根据调用的次数来做负载均衡,也就是说,给句每个Provider请求的次数,尽量把请求分给请求量少的Provider,当然这个量还是得加权.实现的复杂性有点高.
限流
经过朋友提醒,是否可以尝试下,限流,也就是说不放那么多请求进取,只通过一部分来请求,待完成之后,再放另外一部分,这个可以尝试用令牌桶来实现.处于理论阶段,没实际尝试过.
编码
我做的处理里面都是采用的jdk自带的编码方式.如果采用kryo,protobuf的方式,性能上也会有一定的提升.
我的代码:https://github.com/maskwang520/springforall.git
复赛:实现一个进程内的队列引擎,单机可支持100万队列以上,能够承受2亿消息的存取.
- 赛题的思考
题目要求有5个:
1.各个阶段线程数在20~30左右
2.发送阶段:消息大小在50字节左右,消息条数在20亿条左右,也即发送总数据在100G左右
3.索引校验阶段:会对所有队列的索引进行随机校验;平均每个队列会校验1~2次;
4.顺序消费阶段:挑选20%的队列进行全部读取和校验;
5.发送阶段最大耗时不能超过1800s;索引校验阶段和顺序消费阶段加在一起,最大耗时也不能超过1800s;超时会被判断为评测失败。
100万个queue,20亿消息,如果放内存是完全不现实的,内存肯定会爆.接下来自然想到把消息存放到文件中,内存中只放索引就行.但是内存存放索引,是20亿消息的消息,索引自然是由(消息起始位置+长度)构成.但是这样的Map
Block的设计
|
|
因为一个queue中可能有多个Block,在消息检索的时候给出的是在队列中的偏移量,那么size这个域方便后面消息检索的时候判断在哪个block中.
消息缓存的设计
因为每当来一个消息都要flush到文件中去,这样Io的时间就太多了,题目的关键点在于如何减少Io的时间.所以可以采用消息的缓存来处理.每当来一个消息,就放入缓存中,当缓存中超过10次消息的时候,就同步写入到文件中去.这样的话,相当于每10次写,才做一次Io.12345public class DataCache {//消息缓存public ByteBuffer dataBuffer = ByteBuffer.allocate(1024);public int count;}
这里将缓存的大小设置为1024Byte,当然你也可以设置成更大.这里有个小Tips.缓存的消息最好设置成Block的大小.这样当缓存满了之后,就可以直接写入到一个Block块中,而不用接着上一个Block写(上面一个Block写),这样设计,写入更简单,每次flush到文件的时候,只要新开辟一个新的Block,而不用管之前的Block.
消息的存储
因为不可能每个队列的消息都用一个文件来存放,所以这里用hash来把文件限定在32个.一个queue的Block必须在一个文件里面.不同queue的Block可以在一个文件里面.
还存在一个问题就是,往一个文件中写入消息的时候,什么位置写,因为按块写.所以已经写过的块不能用.只能从新开辟一个块,块与块之间尽可能紧凑.
消息存放
这里采用的是原生的
消息获取
消息获取的思路是根据队列名,找到该队列对应的List
思考
- 将所有的ByteBuf池化,包括缓存的那部分ByteBuf.通过ThreadLocal,将ByteBuf与线程绑定起来,后面申请Buffer,直接从对应的线程里面去申请即可.
- 在写入的时候,可以不同步写,实现异步写.由一个线程去异步flush到文件里面
- 当读取消息块达到临界点的时候,由单线程申请buffer资源来预读后面的消息块存入,并缓存.
我的代码:https://github.com/maskwang520/messagequeue.git