写在前面
关于这个数据库连接池,是一个模仿施磊老师课程的小项目。涉及到的技术有:
MySQL数据库编程、单例模式、queue队列容器、C++11多线程编程、线程互斥、线程同步通信和
unique_lock、基于CAS的原子整形、智能指针shared_ptr、lambda表达式、生产者-消费者线程模型
项目背景
- 提高MySQL数据库(基于C/S设计)的访问瓶颈
- 在高并发情况下,大量的TCP三次握手、MySQL Server连接认证、MySQL Server关闭连接回收资源和TCP四次挥手所耗费的性能时间也是很明显的
- 目前有很多基于Java的数据库连接池,例如druid、c3p0以及dbcp连接池
- 因此本项目想实现一个基于C/CPP的数据库连接池,提高访问效率
功能点介绍
一个数据库连接池应该包括:管理的数据库连接实例、连接池的相关参数(释放时间、最大连接数量等)、获取一个连接的接口、超时释放连接接口等等。
在本项目中,有以下参数:
表示连接池事先会和MySQL Server创建initSize个数的connection连接,当应用发起MySQL访问时,不用再创建和MySQL Server新的连接,直接从连接池中获取一个可用的连接就可以,使用完成后,并不去释放connection,而是把当前connection再归还到连接池当中。
当并发访问MySQL Server的请求增多时,初始连接量已经不够使用了,此时会根据新的请求数量去创建更多的连接给应用去使用,但是新创建的连接数量上限是maxSize,不能无限制的创建连接,因为每一个连接都会占用一个socket资源,一般连接池和服务器程序是部署在一台主机上的,如果连接池占用过多的socket资源,那么服务器就不能接收太多的客户端请求了。当这些连接使用完成后,再次归还到连接池当中来维护。

当访问MySQL的并发请求多了以后,连接池里面的连接数量会动态增加,上限是maxSize个,当这些连接用完再次归还到连接池当中。如果在指定的maxIdleTime里面,这些新增加的连接都没有被再次使用过,那么新增加的这些连接资源就要被回收掉,只需要保持初始连接量initSize个连接就可以了。
- 连接超时时间(connectionTimeout):
当MySQL的并发请求量过大,连接池中的连接数量已经到达maxSize了,而此时没有空闲的连接可供使用,那么此时应用从连接池获取连接无法成功,它通过阻塞的方式获取连接的时间如果超过connectionTimeout时间,那么获取连接失败,无法访问数据库。
该项目主要实现上述的连接池四大功能,其余连接池更多的扩展功能,可以自行实现。
功能实现设计
ConnectionPool.h/cpp:数据库连接池代码
Connection.h/cpp:数据库操作代码,包含增删改查的实现
设计要点:
- 单例模式:数据库连接池的存在应该是唯一的
- 从ConnectionPool中获取Connection连接
- 空闲的Connection应该被维护在一个线程安全的队列中(可以考虑使用互斥锁保证队列的线程安全)
- 空闲Connection队列如果为空,则需要动态创建连接,但是最大的连接数量不超过maxSize
- 空闲连接超过连接超时时间之后就要对它进行释放,保留initSize个连接(可以使用单独的线程进行轮询)
- 如果Connection队列为空,并且连接的数量达到了上限(连接用完了,也不能再动态创建了),那么需要等待connectionTimeout的时间,如果还是等不到,那么获取连接失败
- 客户端获取Connection应该使用shared_ptr进行管理,可以通过lambda表达式来定制连接释放的功能(防止归还连接时,直接被销毁)
- Connection的生产与消费应该采用生产者消费者线程模型来设计,需要使用线程间通信机制:条件变量、互斥锁来实现
具体实现
Connection.h
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| #pragma once #include <mysql/mysql.h> #include <string> #include <ctime>
class Connection { public: Connection(); ~Connection();
bool connect(const std::string &ip, unsigned short port, const std::string &username, const std::string &password, const std::string &database);
bool update(const std::string &sql);
MYSQL_RES *query(const std::string &sql);
void refreshAliveTime();
clock_t getAliveTime();
private: MYSQL *_conn; clock_t _alivetime; };
|
Connection.cc
调用mysql的接口完成相关数据库操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| #include "Connection.h"
Connection::Connection() { _conn = mysql_init(nullptr); _alivetime = clock(); }
Connection::~Connection() { if (_conn != nullptr) { mysql_close(_conn); _conn = nullptr; } }
bool Connection::connect(const std::string &ip, unsigned short port, const std::string &username, const std::string &password, const std::string &database) {
MYSQL *p = mysql_real_connect(_conn, ip.c_str(), username.c_str(), password.c_str(), database.c_str(), port, nullptr, 0); return p != nullptr; }
bool Connection::update(const std::string &sql) { return mysql_query(_conn, sql.c_str()) == 0; }
MYSQL_RES *Connection::query(const std::string &sql) { if (mysql_query(_conn, sql.c_str())) { return nullptr; } return mysql_store_result(_conn); }
void Connection::refreshAliveTime() { _alivetime = clock(); }
clock_t Connection::getAliveTime() { return clock() - _alivetime; }
|
ConnectionPool.h
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| #pragma once
#include <string> #include <mutex> #include <queue> #include <atomic> #include <memory> #include <condition_variable> #include "Connection.h"
using namespace std;
class ConnectionPool { public: static ConnectionPool *getConnectionPool(); shared_ptr<Connection> getConnection();
private: ConnectionPool();
bool loadConfigFile(); void produceConnectionTask(); void scannerTask();
string _ip; unsigned short _port; string _username; string _password; string _database; int _initSize; int _maxSize; int _maxIdleTime; int _connectionTimeout;
queue<Connection *> _connectionQue; mutex _queueMutex; condition_variable _cv; atomic_int _connectionCnt; };
|
ConnectionPool.cc
这是这个项目的核心部分,需要好好解释,先show一下全部的代码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162
| #include "ConnectionPool.h" #include <yaml-cpp/yaml.h> #include <iostream> #include <thread> #include <functional>
ConnectionPool *ConnectionPool::getConnectionPool() { static ConnectionPool pool; return &pool; }
bool ConnectionPool::loadConfigFile() {
try {
YAML::Node config = YAML::LoadFile("config.yaml");
if (config["ip"]) { _ip = config["ip"].as<std::string>(); } if (config["port"]) { _port = config["port"].as<unsigned short>(); } if (config["username"]) { _username = config["username"].as<std::string>(); } if (config["password"]) { _password = config["password"].as<std::string>(); } if (config["database"]) { _database = config["database"].as<std::string>(); } if (config["initSize"]) { _initSize = config["initSize"].as<int>(); } if (config["maxSize"]) { _maxSize = config["maxSize"].as<int>(); } if (config["maxIdleTime"]) { _maxIdleTime = config["maxIdleTime"].as<int>(); } if (config["connectionTimeout"]) { _connectionTimeout = config["connectionTimeout"].as<int>(); }
return true; } catch (const YAML::Exception &e) { std::cerr << "Error loading config file: " << e.what() << std::endl; return false; } }
ConnectionPool::ConnectionPool() { if (!loadConfigFile()) { exit(1); }
for (int i = 0; i < _initSize; ++i) { Connection *conn = new Connection(); conn->connect(_ip, _port, _username, _password, _database); _connectionQue.push(conn); _connectionCnt++; }
thread producer(std::bind(&ConnectionPool::produceConnectionTask, this)); producer.detach();
thread scanner(std::bind(&ConnectionPool::scannerTask, this)); scanner.detach(); }
void ConnectionPool::produceConnectionTask() { for (;;) { unique_lock<mutex> lock(_queueMutex); while (!_connectionQue.empty()) { _cv.wait(lock); } if (_connectionCnt < _maxSize) { Connection *Conn = new Connection(); Conn->connect(_ip, _port, _username, _password, _database); Conn->refreshAliveTime(); _connectionQue.push(Conn); _connectionCnt++; } _cv.notify_all(); } }
void ConnectionPool::scannerTask() { for (;;) { this_thread::sleep_for(chrono::seconds(_maxIdleTime)); unique_lock<mutex> lock(_queueMutex); while (_connectionCnt > _initSize) { Connection *conn = _connectionQue.front(); if (conn->getAliveTime() > _maxIdleTime * 1000) { _connectionQue.pop(); _connectionCnt--; delete conn; } else { break; } } } }
shared_ptr<Connection> ConnectionPool::getConnection() {
unique_lock<mutex> lock(_queueMutex); while (_connectionQue.empty()) {
if (cv_status::timeout == _cv.wait_for(lock, chrono::milliseconds(_connectionTimeout))) { if (_connectionQue.empty()) { return nullptr; } } }
shared_ptr<Connection> sp(_connectionQue.front(), [&](Connection *pcon) { unique_lock<mutex> lock(_queueMutex); pcon->refreshAliveTime(); _connectionQue.push(pcon); });
_connectionQue.pop(); _cv.notify_all();
return sp; }
|
单例模式的实现
通过类的静态方法实现:getConnectionPool
方法通过静态局部变量 pool
(它们在函数首次被调用时初始化,并在程序结束时销毁。)来确保 ConnectionPool
类只有一个实例,并且该实例在首次调用 getConnectionPool
时创建。并且设置了私有的构造函数,防止在外部进行实例化。
- 在C++11及更高版本中,局部静态变量的初始化是线程安全的。这意味着即使多个线程同时调用
getConnectionPool
方法,也只会有一个线程执行 pool
的初始化,其他线程会等待初始化完成。
1 2 3 4 5 6 7 8
| static ConnectionPool *getConnectionPool();
ConnectionPool *ConnectionPool::getConnectionPool() { static ConnectionPool pool; return &pool; }
|
获取Connection
需要注意的是,其中对于Connection队列的线程安全,使用了互斥锁和条件变量的方式来实现;
而释放连接指针的操作,则通过lambda表达式来定义。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| shared_ptr<Connection> ConnectionPool::getConnection() {
unique_lock<mutex> lock(_queueMutex); while (_connectionQue.empty()) {
if (cv_status::timeout == _cv.wait_for(lock, chrono::milliseconds(_connectionTimeout))) { if (_connectionQue.empty()) { return nullptr; } } } shared_ptr<Connection> sp(_connectionQue.front(), [&](Connection *pcon) { unique_lock<mutex> lock(_queueMutex); pcon->refreshAliveTime(); _connectionQue.push(pcon); });
_connectionQue.pop(); _cv.notify_all();
return sp; }
|
生产新连接
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| void ConnectionPool::produceConnectionTask() { for (;;) { unique_lock<mutex> lock(_queueMutex); while (!_connectionQue.empty()) { _cv.wait(lock); } if (_connectionCnt < _maxSize) { Connection *Conn = new Connection(); Conn->connect(_ip, _port, _username, _password, _database); Conn->refreshAliveTime(); _connectionQue.push(Conn); _connectionCnt++; } _cv.notify_all(); } }
|
扫描任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| void ConnectionPool::scannerTask() { for (;;) { this_thread::sleep_for(chrono::seconds(_maxIdleTime)); unique_lock<mutex> lock(_queueMutex); while (_connectionCnt > _initSize) { Connection *conn = _connectionQue.front(); if (conn->getAliveTime() > _maxIdleTime * 1000) { _connectionQue.pop(); _connectionCnt--; delete conn; } else { break; } } } }
|
构造函数
创建并启动两个线程(生产者和扫描线程)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| ConnectionPool::ConnectionPool() { if (!loadConfigFile()) { exit(1); }
for (int i = 0; i < _initSize; ++i) { Connection *conn = new Connection(); conn->connect(_ip, _port, _username, _password, _database); _connectionQue.push(conn); _connectionCnt++; }
thread producer(std::bind(&ConnectionPool::produceConnectionTask, this)); producer.detach();
thread scanner(std::bind(&ConnectionPool::scannerTask, this)); scanner.detach(); }
|
CMakeLists
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| cmake_minimum_required(VERSION 3.10)
project(MySQLConnectionPool)
set(CMAKE_CXX_STANDARD 11) set(CMAKE_CXX_STANDARD_REQUIRED True)
include_directories(/usr/include/mysql /usr/include/yaml-cpp)
find_library(MYSQLCLIENT_LIB mysqlclient) find_library(YAMLCPP_LIB yaml-cpp)
set(SOURCES src/main.cpp src/ConnectionPool.cc src/Connection.cc )
add_executable(main ${SOURCES})
target_link_libraries(main ${MYSQLCLIENT_LIB} ${YAMLCPP_LIB})
|
功能测试
压力测试