C/C++编程:连接池服务管理类connect_manager用来管理 connectpool
版本1:每个连接管理类都必须创建一个连接池对象
这时可以通过指定子类必须实现 create_pool
,用来创建子类所要管理的连接池对象
#ifndef OCEANSTAR_HTTP_CONNECT_MANAGER_H
#define OCEANSTAR_HTTP_CONNECT_MANAGER_H
#include <noncopyable.h>
#include "connect_pool.h"
namespace oceanstar{
/**
* connect pool 服务管理器,有获取连接池等功能
*/
class connect_manager : public noncopyable{
protected:
/**
* 纯虚函数,子类必须实现此函数用来创建连接池对象
* @param addr {const char*} 服务器监听地址,格式:ip:port
* @param count {size_t} 连接池的大小限制,为 0 时,则连接池没有限制
* @return {connect_pool*} 返回创建的连接池对象
*/
virtual connect_pool* create_pool(const char* addr, size_t count) = 0;
};
}
#endif //OCEANSTAR_HTTP_CONNECT_MANAGER_H
class connect_manager : public oceanstar::connect_manager
{
public:
connect_manager(){
}
virtual ~connect_manager(){
}
protected:
// 基类纯虚函数的实现
oceanstar::connect_pool* create_pool(const char* addr,size_t count){
return new connect_pool(addr, count);
}
};
功能:根据服务端地址获得或者创建该服务器的连接池
/**
* 根据服务端地址获得该服务器的连接池
* @param addr {const char*} redis 服务器地址(ip:port)
* @param exclusive {bool} 是否需要互斥访问连接池数组,当需要动态
* 管理连接池集群时,该值应为 true
* @param restore {bool} 当该服务结点被置为不可用时,该参数决定是否
* 自动将之恢复为可用状态
* @return {connect_pool*} 返回空表示没有此服务
*/
connect_pool* connect_manager::get(const char* addr, bool exclusive /* = true */, bool restore /* = false */){
std::string key;
get_key(addr, key);
unsigned long id = get_id();
if (exclusive) {
lock_.lock();
}
conns_pools& pools = get_pools_by_id(id); // 去manager_中找,如果conns_pools已经创建就直接返回,如果没有创建就新创建一个(connect_manager中可以管理多个conns_pools)
// 如果当前conns_pools中某个connect_pool的addr与当前addr相同,就返回
pools_t::iterator it = pools.pools.begin();
for (; it != pools.pools.end(); ++it) {
if (key == (*it)->get_key()) {
if (restore && (*it)->aliving() == false) {
(*it)->set_alive(true);
}
if (exclusive) {
lock_.unlock();
}
return *it;
}
}
std::map<string, conn_config>::const_iterator cit = addrs_.find(key);
if (cit == addrs_.end()) {
if (exclusive) {
lock_.unlock();
}
logger_error( "no connect pool for addr %s", addr);
return NULL;
}
conn_config config = cit->second;
connect_pool* pool = create_pool(config); // 根据当前addr的配置来创建一个连接池
pools.pools.push_back(pool); //将当前连接池压入被管理的数组中
if (exclusive) {
lock_.unlock();
}
return pool;
}
功能:初始化服务器集群(所有服务器的连接池)
/**
* 添加服务器的客户端连接池,该函数可以在程序运行时被调用,内部自动加锁
* @param addr {const char*} 服务器地址,格式:ip:port
* 注意:调用本函数时每次仅能添加一个服务器地址,可以循环调用本方法
* @param count {size_t} 连接池数量限制, 如果该值设为 0,则不设置
* 连接池的连接上限
* @param conn_timeout {int} 网络连接时间(秒)
* @param rw_timeout {int} 网络 IO 超时时间(秒)
*/
void connect_manager::set(const char *addr, size_t count, int conn_timeout /* = 30 */, int rw_timeout /* = 30 */) {
std::string buf(addr);
boost::to_lower(buf);
thread_mutex_guard guard(lock_);
std::map<std::string, conn_config>::iterator it = addrs_.find(buf);
if (it == addrs_.end()) {
conn_config config;
config.addr = addr;
config.count = count;
config.conn_timeout = conn_timeout;
config.rw_timeout = rw_timeout;
addrs_[buf] = config;
} else {
it->second.count = count;
it->second.conn_timeout = conn_timeout;
it->second.rw_timeout = rw_timeout;
}
}
// 分析一个服务器地址,格式:IP:PORT[:MAX_CONN]
// 返回值 < 0 表示非法的地址
static int check_addr(const char* addr, std::string& buf, size_t default_count){
// 数据格式:IP:PORT[:CONNECT_COUNT]
ACL_ARGV* tokens = acl_argv_split(addr, ":");
if (tokens->argc < 2) {
logger_error("invalid addr: %s", addr);
acl_argv_free(tokens);
return -1;
}
int port = atoi(tokens->argv[1].c_str());
if (port <= 0 || port >= 65535) {
logger_error("invalid addr: %s, port: %d", addr, port);
acl_argv_free(tokens);
return -1;
}
buf = format("%s:%d", tokens->argv[0].c_str(), port);
int conn_max;
if (tokens->argc >= 3) {
conn_max = atoi(tokens->argv[2].c_str());
} else {
conn_max = (int) default_count;
}
if (conn_max < 0) {
conn_max = (int) default_count;
}
acl_argv_free(tokens);
return conn_max;
}
// 设置除缺省服务之外的服务器集群
void connect_manager::set_service_list(const char* addr_list, int count, int conn_timeout, int rw_timeout){
if (addr_list == NULL || *addr_list == 0) {
logger_warn("addr_list null");
return;
}
// 创建连接池服务集群
char *buf = strdup(addr_list);
char* addrs = acl_mystr_trim(buf);
ACL_ARGV* tokens = acl_argv_split(addrs, ";");
std::string addr;
for(const auto& iter : tokens->argv){
const char* ptr = iter.c_str();
int max = check_addr(ptr, addr, count); // addr = [ptr列表中的第一个 (ip:port)]
if (max < 0) {
logger_error("invalid server addr: %s", addr.c_str());
continue;
}
set(addr.c_str(), max, conn_timeout, rw_timeout); // 设置addrs
logger_info("add one service: %s, max connect: %d", addr.c_str(), max);
}
acl_argv_free(tokens);
free(buf);
}
/**
* 初始化所有服务器的连接池,该函数调用 set 过程添加每个服务的连接池
* @param default_addr {const char*} 缺省的服务器地址,如果非空,
* 则在查询时优先使用此服务器
* @param addr_list {const char*} 所有服务器列表,可以为空
* 格式: IP:PORT:COUNT;IP:PORT:COUNT;IP:PORT;IP:PORT ...
* 或 IP:PORT:COUNT,IP:PORT:COUNT,IP:PORT;IP:PORT ...
* 如:127.0.0.1:7777:50;192.168.1.1:7777:10;127.0.0.1:7778
* @param count {size_t} 当 addr_list 中分隔的某个服务没有
* COUNT 信息时便用此值,当此值为 0 时,则不限制连接数上限
* @param conn_timeout {int} 网络连接时间(秒)
* @param rw_timeout {int} 网络 IO 超时时间(秒)
* 注:default_addr 和 addr_list 不能同时为空
*/
void connect_manager::init(const char* default_addr, const char* addr_list,
size_t count, int conn_timeout /* = 30 */, int rw_timeout /* = 30 */ ){
if (addr_list != NULL && *addr_list != 0) {
set_service_list(addr_list, (int) count, conn_timeout, rw_timeout); // 设置 addrs_: 如果在addrs找不到addr就插入并配置配置,如果找得到就更新配置
}
// 创建缺省服务连接池对象,该对象一同放入总的连接池集群中
if (default_addr != NULL && *default_addr != 0) {
logger_info("default_pool: %s", default_addr);
int max = check_addr(default_addr, default_addr_, count); // 设置default_addr_ : 设置default_addr_ = [default_addr列表中的第一个 (ip:port)]
if (max < 0) {
logger_info("no default connection set");
} else {
set(default_addr_.c_str(), max, conn_timeout, rw_timeout); //设置default_addr_的配置
default_pool_ = get(default_addr_.c_str()); //将当前default_addr_绑定到一个连接池上
}
} else {
logger_info("no default connection set");
}
}
还没有评论,来说两句吧...