mysql-connection-pool

写在前面

关于这个数据库连接池,是一个模仿施磊老师课程的小项目。涉及到的技术有:

MySQL数据库编程、单例模式、queue队列容器、C++11多线程编程、线程互斥、线程同步通信和

unique_lock、基于CAS的原子整形、智能指针shared_ptr、lambda表达式、生产者-消费者线程模型

项目背景

  • 提高MySQL数据库(基于C/S设计)的访问瓶颈
  • 在高并发情况下,大量的TCP三次握手、MySQL Server连接认证、MySQL Server关闭连接回收资源和TCP四次挥手所耗费的性能时间也是很明显的
  • 目前有很多基于Java的数据库连接池,例如druid、c3p0以及dbcp连接池
  • 因此本项目想实现一个基于C/CPP的数据库连接池,提高访问效率

功能点介绍

一个数据库连接池应该包括:管理的数据库连接实例、连接池的相关参数(释放时间、最大连接数量等)、获取一个连接的接口、超时释放连接接口等等。

在本项目中,有以下参数:

  • 初始连接量(initSize):

表示连接池事先会和MySQL Server创建initSize个数的connection连接,当应用发起MySQL访问时,不用再创建和MySQL Server新的连接,直接从连接池中获取一个可用的连接就可以,使用完成后,并不去释放connection,而是把当前connection再归还到连接池当中。

  • 最大连接量(maxSize):

当并发访问MySQL Server的请求增多时,初始连接量已经不够使用了,此时会根据新的请求数量去创建更多的连接给应用去使用,但是新创建的连接数量上限是maxSize,不能无限制的创建连接,因为每一个连接都会占用一个socket资源,一般连接池和服务器程序是部署在一台主机上的,如果连接池占用过多的socket资源,那么服务器就不能接收太多的客户端请求了。当这些连接使用完成后,再次归还到连接池当中来维护。

  • 最大空闲时间(maxIdleTime):

当访问MySQL的并发请求多了以后,连接池里面的连接数量会动态增加,上限是maxSize个,当这些连接用完再次归还到连接池当中。如果在指定的maxIdleTime里面,这些新增加的连接都没有被再次使用过,那么新增加的这些连接资源就要被回收掉,只需要保持初始连接量initSize个连接就可以了。

  • 连接超时时间(connectionTimeout):

当MySQL的并发请求量过大,连接池中的连接数量已经到达maxSize了,而此时没有空闲的连接可供使用,那么此时应用从连接池获取连接无法成功,它通过阻塞的方式获取连接的时间如果超过connectionTimeout时间,那么获取连接失败,无法访问数据库。

该项目主要实现上述的连接池四大功能,其余连接池更多的扩展功能,可以自行实现。

功能实现设计

ConnectionPool.h/cpp:数据库连接池代码

Connection.h/cpp:数据库操作代码,包含增删改查的实现

设计要点:

  • 单例模式:数据库连接池的存在应该是唯一的
  • 从ConnectionPool中获取Connection连接
  • 空闲的Connection应该被维护在一个线程安全的队列中(可以考虑使用互斥锁保证队列的线程安全)
  • 空闲Connection队列如果为空,则需要动态创建连接,但是最大的连接数量不超过maxSize
  • 空闲连接超过连接超时时间之后就要对它进行释放,保留initSize个连接(可以使用单独的线程进行轮询)
  • 如果Connection队列为空,并且连接的数量达到了上限(连接用完了,也不能再动态创建了),那么需要等待connectionTimeout的时间,如果还是等不到,那么获取连接失败
  • 客户端获取Connection应该使用shared_ptr进行管理,可以通过lambda表达式来定制连接释放的功能(防止归还连接时,直接被销毁)
  • Connection的生产与消费应该采用生产者消费者线程模型来设计,需要使用线程间通信机制:条件变量、互斥锁来实现

具体实现

Connection.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#pragma once
#include <mysql/mysql.h>
#include <string>
#include <ctime>

class Connection
{
public:
Connection();
~Connection();

// 初始化数据库连接
bool connect(const std::string &ip, unsigned short port, const std::string &username, const std::string &password, const std::string &database);

// 更新操作
bool update(const std::string &sql);

// 查询操作
MYSQL_RES *query(const std::string &sql);

// 刷新连接的起始空闲时间点
void refreshAliveTime();

// 返回存活的时间
clock_t getAliveTime();

private:
MYSQL *_conn;// 表示和MySQL Server的一条连接
clock_t _alivetime;// 记录进入空闲状态的起始时间
};

Connection.cc

调用mysql的接口完成相关数据库操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
#include "Connection.h"

Connection::Connection()
{
_conn = mysql_init(nullptr);
_alivetime = clock();
}

Connection::~Connection()
{
if (_conn != nullptr)
{
mysql_close(_conn);
_conn = nullptr;
}
}

bool Connection::connect(const std::string &ip, unsigned short port, const std::string &username, const std::string &password, const std::string &database)
{
// 连接 MySQL 数据库
MYSQL *p = mysql_real_connect(_conn, ip.c_str(), username.c_str(), password.c_str(), database.c_str(), port, nullptr, 0);
return p != nullptr;
}

bool Connection::update(const std::string &sql)
{
return mysql_query(_conn, sql.c_str()) == 0;
}

MYSQL_RES *Connection::query(const std::string &sql)
{
if (mysql_query(_conn, sql.c_str()))
{
return nullptr;
}
return mysql_store_result(_conn);
}

// 刷新连接的起始空闲时间点
void Connection::refreshAliveTime()
{
_alivetime = clock();
}

// 返回存活的时间
clock_t Connection::getAliveTime()
{
return clock() - _alivetime;
}

ConnectionPool.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#pragma once

#include <string>
#include <mutex>
#include <queue>
#include <atomic>
#include <memory>
#include <condition_variable>
#include "Connection.h"

using namespace std;

class ConnectionPool
{
public:
// 获取连接池实例(单例模式)
static ConnectionPool *getConnectionPool();
// 提供接口给外部获取连接
shared_ptr<Connection> getConnection();

private:
// 构造函数设为私有,单例模式
ConnectionPool();

// 加载配置文件
bool loadConfigFile();
// 运行在独立的线程中,专门负责生产连接
void produceConnectionTask();
// 定时扫描任务,回收空闲连接
void scannerTask();

string _ip; // 数据库服务器IP地址
unsigned short _port; // 数据库服务器端口号
string _username; // 数据库用户名
string _password; // 数据库密码
string _database; // 数据库名称
int _initSize; // 初始连接数量
int _maxSize; // 连接池最大连接数量
int _maxIdleTime; // 连接的最大空闲时间
int _connectionTimeout; // 获取连接的超时时间

queue<Connection *> _connectionQue; // 存放数据库连接的队列
mutex _queueMutex; // 保护连接队列的互斥锁
condition_variable _cv; // 条件变量,用于连接生产和消费的同步
atomic_int _connectionCnt; // 记录连接数量的原子变量
};

ConnectionPool.cc

这是这个项目的核心部分,需要好好解释,先show一下全部的代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
#include "ConnectionPool.h"
#include <yaml-cpp/yaml.h>
#include <iostream>
#include <thread>
#include <functional>

ConnectionPool *ConnectionPool::getConnectionPool()
{
static ConnectionPool pool;
return &pool;
}

bool ConnectionPool::loadConfigFile()
{

try
{
// 从配置文件读取配置信息
YAML::Node config = YAML::LoadFile("config.yaml");

if (config["ip"])
{
_ip = config["ip"].as<std::string>();
}
if (config["port"])
{
_port = config["port"].as<unsigned short>();
}
if (config["username"])
{
_username = config["username"].as<std::string>();
}
if (config["password"])
{
_password = config["password"].as<std::string>();
}
if (config["database"])
{
_database = config["database"].as<std::string>();
}
if (config["initSize"])
{
_initSize = config["initSize"].as<int>();
}
if (config["maxSize"])
{
_maxSize = config["maxSize"].as<int>();
}
if (config["maxIdleTime"])
{
_maxIdleTime = config["maxIdleTime"].as<int>();
}
if (config["connectionTimeout"])
{
_connectionTimeout = config["connectionTimeout"].as<int>();
}

return true;
}
catch (const YAML::Exception &e)
{
std::cerr << "Error loading config file: " << e.what() << std::endl;
return false;
}
}

ConnectionPool::ConnectionPool()
{
if (!loadConfigFile())
{
exit(1);
}
// 初始化连接池
for (int i = 0; i < _initSize; ++i)
{
Connection *conn = new Connection();
conn->connect(_ip, _port, _username, _password, _database);
_connectionQue.push(conn);
_connectionCnt++;
}

// 启动一个线程,作为生产者
thread producer(std::bind(&ConnectionPool::produceConnectionTask,
this));
producer.detach();

// 启动一个新的定时线程,扫描超过maxIdleTime时间的空闲连接,进行回收
thread scanner(std::bind(&ConnectionPool::scannerTask, this));
scanner.detach();
}

void ConnectionPool::produceConnectionTask()
{
for (;;)
{
unique_lock<mutex> lock(_queueMutex);
while (!_connectionQue.empty())
{
_cv.wait(lock);
}
if (_connectionCnt < _maxSize)
{
Connection *Conn = new Connection();
Conn->connect(_ip, _port, _username, _password, _database);
Conn->refreshAliveTime();
_connectionQue.push(Conn);
_connectionCnt++;
}
_cv.notify_all();
}
}

void ConnectionPool::scannerTask()
{
for (;;)
{
this_thread::sleep_for(chrono::seconds(_maxIdleTime));
unique_lock<mutex> lock(_queueMutex);
while (_connectionCnt > _initSize)
{
Connection *conn = _connectionQue.front();
if (conn->getAliveTime() > _maxIdleTime * 1000)
{
_connectionQue.pop();
_connectionCnt--;
delete conn;
}
else
{
break;
}
}
}
}

shared_ptr<Connection> ConnectionPool::getConnection()
{
// 加锁
unique_lock<mutex> lock(_queueMutex);
while (_connectionQue.empty())
{
// 等待
if (cv_status::timeout == _cv.wait_for(lock, chrono::milliseconds(_connectionTimeout)))
{
if (_connectionQue.empty())
{
return nullptr;
}
}
}

shared_ptr<Connection> sp(_connectionQue.front(), [&](Connection *pcon)
{
unique_lock<mutex> lock(_queueMutex);
pcon->refreshAliveTime();
_connectionQue.push(pcon); });

_connectionQue.pop();
_cv.notify_all();

return sp;
}

单例模式的实现

通过类的静态方法实现:getConnectionPool 方法通过静态局部变量 pool (它们在函数首次被调用时初始化,并在程序结束时销毁。)来确保 ConnectionPool 类只有一个实例,并且该实例在首次调用 getConnectionPool 时创建。并且设置了私有的构造函数,防止在外部进行实例化。

  • 在C++11及更高版本中,局部静态变量的初始化是线程安全的。这意味着即使多个线程同时调用 getConnectionPool 方法,也只会有一个线程执行 pool 的初始化,其他线程会等待初始化完成。
1
2
3
4
5
6
7
8
// 成员变量
static ConnectionPool *getConnectionPool();
// .cc
ConnectionPool *ConnectionPool::getConnectionPool()
{
static ConnectionPool pool;
return &pool;
}

获取Connection

需要注意的是,其中对于Connection队列的线程安全,使用了互斥锁和条件变量的方式来实现;

而释放连接指针的操作,则通过lambda表达式来定义。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
shared_ptr<Connection> ConnectionPool::getConnection()
{
// 加锁
unique_lock<mutex> lock(_queueMutex);
// 当队列为空的时候,等待新连接加入队列
while (_connectionQue.empty())
{
// 等待_connectionTimeout时间,等待其他线程释放_queueMutex上的互斥锁(等待的期间不持有锁)
if (cv_status::timeout == _cv.wait_for(lock, chrono::milliseconds(_connectionTimeout)))
{
// 一直没有等到
if (_connectionQue.empty())
{
return nullptr;
}
}
}
// 创建连接指针,并定义释放时候的操作
shared_ptr<Connection> sp(_connectionQue.front(), [&](Connection *pcon)
{
// 获取互斥锁
unique_lock<mutex> lock(_queueMutex);
// 重置存活时间
pcon->refreshAliveTime();
// 重新入队
_connectionQue.push(pcon); });

// 从连接队列中弹出指针
_connectionQue.pop();
// 使用条件变量通知生产者和其他线程
_cv.notify_all();

return sp;
}

生产新连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
void ConnectionPool::produceConnectionTask()
{
// 开启无限循环
for (;;)
{
unique_lock<mutex> lock(_queueMutex);
// 当队列不空的时候,不需要进行生产,等待被唤醒
while (!_connectionQue.empty())
{
// 需要等待条件变量被唤醒并且满足lock的条件(能够获取到锁)
_cv.wait(lock);
}
// 当现在的所有线程数量小于最大数量时,可以进行生产
if (_connectionCnt < _maxSize)
{
// 创建新连接
Connection *Conn = new Connection();
Conn->connect(_ip, _port, _username, _password, _database);
Conn->refreshAliveTime();
_connectionQue.push(Conn);
_connectionCnt++;
}
// 通知消费者线程
_cv.notify_all();
}
}

扫描任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
void ConnectionPool::scannerTask()
{
for (;;)
{
this_thread::sleep_for(chrono::seconds(_maxIdleTime));
unique_lock<mutex> lock(_queueMutex);
while (_connectionCnt > _initSize)
{
Connection *conn = _connectionQue.front();
if (conn->getAliveTime() > _maxIdleTime * 1000)
{
_connectionQue.pop();
_connectionCnt--;
delete conn;
}
else
{
break;
}
}
}
}

构造函数

创建并启动两个线程(生产者和扫描线程)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
ConnectionPool::ConnectionPool()
{
if (!loadConfigFile())
{
exit(1);
}
// 初始化连接池
for (int i = 0; i < _initSize; ++i)
{
Connection *conn = new Connection();
conn->connect(_ip, _port, _username, _password, _database);
_connectionQue.push(conn);
_connectionCnt++;
}

// 启动一个线程,作为生产者
thread producer(std::bind(&ConnectionPool::produceConnectionTask,
this));
producer.detach();

// 启动一个新的定时线程,扫描超过maxIdleTime时间的空闲连接,进行回收
thread scanner(std::bind(&ConnectionPool::scannerTask, this));
scanner.detach();
}

CMakeLists

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
cmake_minimum_required(VERSION 3.10)

# 项目信息
project(MySQLConnectionPool)

# 设置 C++ 标准
set(CMAKE_CXX_STANDARD 11)
set(CMAKE_CXX_STANDARD_REQUIRED True)

# 包含头文件目录
include_directories(/usr/include/mysql /usr/include/yaml-cpp)

# 查找库
find_library(MYSQLCLIENT_LIB mysqlclient)
find_library(YAMLCPP_LIB yaml-cpp)

# 添加源文件
set(SOURCES
src/main.cpp
src/ConnectionPool.cc
src/Connection.cc
)

# 添加可执行文件
add_executable(main ${SOURCES})

# 链接库
target_link_libraries(main ${MYSQLCLIENT_LIB} ${YAMLCPP_LIB})

功能测试

压力测试


mysql-connection-pool
https://yintel12138.github.io/2024/12/31/mysql-connection-pool/
作者
Yintel
发布于
2024年12月31日
许可协议