brpc-client 男娘i 2022-10-11 06:30 211阅读 0赞 **ChannelOptions:** connect\_timeout\_ms:连接超时时间; timeout\_ms:rpc超时时间,会起个定时器,通过controler::Failed方法获知; max\_retry:重试次数;(最终会结束RPC,数据应该会丢失) retry\_policy: connection\_type:支持链接方式,单/池/短链接; **channel:** 核心内容: * protocol; * 从集群宕机后恢复时的客户端限流; * 同步访问、异步访问; * 异步发起多个请求配合Join函数; * parallelchannel并发操作:同时访问其包含的sub channel,并合并它们的结果(可以重写CallMapper,可以根据index修改对应sub channel发送的请求,可以重写ResponseMerger,修改返回结果的合并规则); **发送数据:** 1. CallMethod-》controler设置一些参数,创建callid,启动timer(设置backup\_request\_ms,就用这个,否则用这个); 2. IssueRPC-》设置\_current\_call相关数据,包括获取的socket;如果是pool/short,则额外获取sending\_sock; 3. 非阻塞,nodelay socket,设置缓冲区大小,tos/ip优先级,FD\_CLOEXEC,边缘触发; 4. 去全局派发器池,通过fd哈希找到派发器,添加读事件;(全局派发器池中,每个派发器就是epoll bthread,wait; 5. epoll处理事件顺序,遍历读处理,再遍历写处理; 6. 打包请求\_pack\_request,由protocol实现,封装RpcMeta; 7. write,如果没有链接,调用connect 1. 创建EpollOutRequest,创建临时socket,添加写事件; 2. 创建定时器,时间是**connect\_timeout\_ms,HandleEpollOutTimeout,**EpollOutRequest析构则定时器被删除; 3. 事件回调,KeepWriteIfConnected-》CheckConnectedAndKeepWrite-》AfterAppConnected,创建KeepWrite bthread;遍历req指针,直到链表结束,持续向fd写入req数据; 8. 向fd写数据; 9. 去全局派发器池,通过fd哈希找到派发器,添加事件; 10. 没有done的话,Join,初始创建的call id; 11. write逻辑还是比较复杂的,配合KeepWrite bthread,包括错误重试机制; **controler:** \_single\_server\_id,决定是否是链接单个server,如果负载均衡是空的话; 1. **该情况,channel init时候调用SocketMapInsert**,创建全局SocketMap,GlobalSocketCreator 2. 初始化FLAGS\_idle\_timeout\_second,FLAGS\_defer\_close\_second 3. insert 到butil::FlatMap<SocketMapKey, SingleConnection,SocketMapKeyHasher> Map; 4. insert-》InputMessenger提供参数健康检查,回调函数等-》Socket::Create创建socket,通过每个线程维护一个ResourcePool,创建资源,利用\_\_builtin\_expect做条件语句优化; 5. 对\_single\_server\_id进行赋值; \_correlation\_id,分配的call id; **结构图:** ![watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L25hd2VucWlhbmc_size_16_color_FFFFFF_t_70][] **消息顺序:** 如果线程保证了顺序,那么发送数据,返回错误码,会重发,这时候,也是顺序发送; 只有发送后,rpc无响应后: * 要么不发,允许消息丢失 * 要么重发,但brpc不支持幂等,可能重复消费,还可能出现乱序的情况 同时,更重要的是,消费的时候,业务层如何处理: 丢失:A、B发送,A丢失,B如何如理; 乱序:A、B发送,B-》A如何和处理; **负载均衡:** 算法基本就是轮询,权重轮询,随机,延迟低到高,一致性哈希(对于cache访问比较好用); 支持name service策略:bns,file,list,dns 流程: * 通过name service策略获取server,和上一次的数据做差集,对load balancer做增加删除; * 通过load balancer获取server(两种状态,可用、健康检查,健康检查会尝试恢复) 结构图: ![watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L25hd2VucWlhbmc_size_16_color_FFFFFF_t_70 1][] 加入zk nameservice: 库内不做具体实现,通过外部函数指针自己实现; 继承periodic naming service实现相关方法,global.cpp新添加类型并初始化; 麻烦的地方是global初始化全局,并且是额外的线程,注意new方法的实现,naming service thread会额外new,没有使用global初始化的; note: src/butil/config.h文件,会定义BRPC\_WITH\_GLOG。。。 [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L25hd2VucWlhbmc_size_16_color_FFFFFF_t_70]: /images/20221005/477241a46a03406589c1721191b87dfc.png [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L25hd2VucWlhbmc_size_16_color_FFFFFF_t_70 1]: /images/20221005/3bbe4d8ec44a421e95f2250f602448cc.png
还没有评论,来说两句吧...