手写数据库连接池

数据库连接池项目

一、项目意义

在设计前先了解一下数据库连接池的作用:

除了在服务器端增加缓存服务器缓存常用的数据 之外(例如redis),还可以增加连接池,来提高MySQL Server的访问效率,在高并发情况下,大量的 TCP三次握手、MySQL Server连接认证、MySQL Server关闭连接回收资源和TCP四次挥手所耗费的 性能时间也是很明显的,增加连接池就是为了减少这一部分的性能损耗

二、环境配置

MySQ数据库编程环境配置:

在win10的vs项目中用C/C++客户端开发包,vs做如下配置:

1.右键项目属性 – C/C++ – 常规 – 附加包含目录,填写mysql.h头文件的路径

2.右键项目属性 – 链接器 – 常规 – 附加库目录,填写libmysql.lib的路径

3.右键项目属性 – 链接器 – 输入 – 附加依赖项,填写libmysql.lib库的名字

4.把libmysql.dll动态链接库(Linux下后缀名是.so库)放在工程目录下;

如果运行数据库连接文件出现:《手写数据库连接池》

说明系统环境变量中没有配置MySQL环境,找不到动态链接,解决方案:在系统环境变量添加MySQL的bin文件夹路径; 也可以在 右键项目属性 – 调试 – 环境,填写 PATH=自己MySQL的bin文件夹路径;

三、项目设计

所需要实现的数据库连接池功能:连接池有连接数量的起始值和阈值;连接池可以动态自动生成和回收连接池里面的资源;连接池和数据库信息等分离,达到复用简单的效果;

①设计一个数据库的的基本操作类

  • 封装数据库的连接、增删改查操作;
  • 给该基本操作类加上存活时间相关的属性和方法;

②数据库连接池类

  • 因为是一个池对应多个资源(对象)的关系,我们也只需要一个池,设计成单例模式;

  • 保存含有基本操作类一群对象资源;

  • 有资源的初始化、增加、释放操作(释放和存活时间相关)

四、详细代码

代码结构:

《手写数据库连接池》

①public.h: 该连接池的全局日志输出,打印一些错误在log中;

#pragma once
/*
* 定义宏等全局定义
*/

#define LOG(str) \
	cout << __FILE__ << ":" << __LINE__ << " " << \
	__TIMESTAMP__ << " : " << str << endl;

②mysql.ini:mysql的详细信息

#数据库连接池的配置文件,和宏定义一样,注意行后面不要有空格
ip=127.0.0.1
port=3306
username=root
password=root
dbname=chat
initSize=10
maxSize=1024
#最大空闲时间默认秒
maxIdleTime=60
#连接超时时间单位是毫秒
connectionTimeout=100

③Connection.h 和 Connection.cpp :数据库操作类 头文件和实现;

Connection.h:

using namespace std;
#include "public.h"

// 数据库操作类
class Connection
{
public:
	// 初始化数据库连接
	Connection();
	// 释放数据库连接资源
	~Connection();
	// 连接数据库
	bool connect(string ip, unsigned short port, string user, string password, string dbname);
	// 更新操作 insert、delete、update
	bool update(string sql);
	// 查询操作:select;
	MYSQL_RES* query(string sql);

	//刷新一下连接的起始空闲时间点
	void refreshAliveTime() { _alivetime = clock(); }//clock()函数当下时间
	// 返回存活的时间
	clock_t getAliveTime() { return clock() - _alivetime; }

private:
	MYSQL* _conn; // 表示和MySQL Server的一条连接
	clock_t _alivetime;// 记录进入空闲状态后的存活时间
};

Connection.cpp:

#include "Connection.h"
#include "public.h"

// 初始化数据库连接
Connection::Connection()
{
	_conn = mysql_init(nullptr);
}
// 释放数据库连接资源
Connection::~Connection()
{
	if (_conn != nullptr)
		mysql_close(_conn);
}
// 连接数据库
bool Connection::connect(string ip, unsigned short port, string username, string password, string dbname)
{
	MYSQL* p = mysql_real_connect(_conn, ip.c_str(), username.c_str(), password.c_str(), dbname.c_str(), port, nullptr, 0);
	return p != nullptr;
}
// 更新操作 insert、delete、update
bool Connection::update(string sql)
{
	if (mysql_query(_conn, sql.c_str()))//如果查询成功,返回0。如果出现错误,返回非0值。
	{
		LOG("更新失败:" + sql);
		return false;
	}
	return true;
}
// 查询操作:select;
MYSQL_RES* Connection::query(string sql)
{
	if (mysql_query(_conn, sql.c_str()))
	{
		LOG("查询失败:" + sql);
		return nullptr;
	}
	return mysql_use_result(_conn);
}

④MySQLConnectionPool.h和MySQLConnectionPool.cpp:连接池类头文件和实现

MySQLConnectionPool.h:

#pragma once
#include <iostream>
#include <string>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <thread>
#include <memory>
#include <functional>
#include "Connection.h"

using namespace std;
/*
* 实现连接池模块
*/
class ConnectionPool
{
public:
	//获取连接池对象实例
	static ConnectionPool* getConnectionPool();
	// 消费者线程函数:给用户连接,归还时放回连接池
	shared_ptr<Connection> getConnection();

private:
	//单例#1 构造函数私有化
	ConnectionPool(); 

	//从配置文件中加载配置
	bool loadConfigFile();

	//生产者线程函数:运行独立的线程中,负责生产新连接,放在类内方便访问成员变量
	void produceConnectionTask();

	//回收线程函数:扫描超过maxIdleTime时间的空闲连接,进行多余的连接回收
	void scannerConnectionTask();

	string _ip;// mysql ip
	unsigned short _port; //mysql 端口 默认3306
	string _username;// mysql 用户名;
	string _password;// mysql登录秘密
	string _dbname; // 数据库名称
	int _initSize;// 连接池的初始连接量
	int _maxSize;// 连接池的最大连接量
	int _maxIdleTime; //连接池最大空闲时间
	int _connectionTimeout;//连接池获取连接的超时时间

	queue<Connection*> _connectionQue; //存储mysql连接的队列,必须是线程安全的;
	mutex _queueMutex;//维护连接队列的线程安全互斥锁
	atomic_int _connectionCnt;// 记录连接所创建的connection连接的总数量,考虑了连接生产消费数量变化的线程安全问题
	condition_variable cv; //设置条件变量,用于连接 生产线程和消费线程的通信
};


MySQLConnectionPool.cpp:

// MySQlConnectionPool.cpp : 此文件包含 "main" 函数。程序执行将在此处开始并结束。
//避免重复包含头文件
#ifndef __COMPLEX__
#define __COMPLEX__

#include <iostream>
#include <string>
#include "MySQLConnectionPool.h"
#include "public.h"

#endif;
// 线程安全的懒汉单例函数接口
ConnectionPool* ConnectionPool::getConnectionPool()
{
	static ConnectionPool pool; // 静态变量实现lock和unlock,拿单例的线程池;
	return &pool;
}

// 从配置文件中加载配置项
bool ConnectionPool::loadConfigFile()
{
	FILE* pf = fopen("mysql.ini", "r");
	if (pf == nullptr)
	{
		LOG("mysql.ini file is not exist!");
		return false;
	}

	while (!feof(pf))//末尾查一下
	{
		char line[1024] = { 0 };
		fgets(line, 1024, pf);
		string str = line;
		int idx = str.find('=', 0);//找出第一个出现=号的下标
		if (idx == -1)//找不到,无效配置项
		{
			continue;
		}

		//实际中是由\n结尾的,password=root\n
		int endidx = str.find('\n', idx);
		string key = str.substr(0, idx); //参数意义:截取的起点以及截取长度
		string value = str.substr(idx + 1, endidx - idx - 1);

		//存值
		if (key == "ip") _ip = value;
		else if (key == "port") _port = atoi(value.c_str());
		else if (key == "username") _username = value;
		else if (key == "password") _password = value;
		else if (key == "dbname") _dbname = value;
		else if (key == "initSize") _initSize = atoi(value.c_str());
		else if (key == "maxSize") _maxSize = atoi(value.c_str());
		else if (key == "maxIdleTime") _maxIdleTime = atoi(value.c_str());
		else if (key == "connectionTimeout") _connectionTimeout = atoi(value.c_str());

	}
}

// 连接池的构造
ConnectionPool::ConnectionPool()
{
	//加载配置项
	if (!loadConfigFile())
	{
		return;
	}

	//创建初始数量的连接
	for (int i = 0; i < _initSize; ++i)
	{
		Connection* p = new Connection();
		p->connect(_ip, _port, _username, _password, _dbname);
		p->refreshAliveTime();// 刷新一下开始空闲起始时间;
		_connectionQue.push(p);
		_connectionCnt++;
	}

	//需要启动一个新线程,作为连接的生产者(生产者线程) 
	//c++的线程函数在linux里面底层也是pthread_creat,需要传入c接口,所以传入类方法需要绑定
	thread produce(std::bind(&ConnectionPool::produceConnectionTask, this));
	produce.detach();

	//启动一个新的定时线程,扫描超过maxIdleTime时间的空闲连接,进行多余的连接回收
	thread scanner(std::bind(&ConnectionPool::scannerConnectionTask, this));
	scanner.detach();
}

//生产者线程:运行独立的线程中,负责生产新连接,放在类内方便访问成员变量
void ConnectionPool::produceConnectionTask()
{
	for (;;)
	{
		unique_lock<mutex> lock(_queueMutex);
		while (!_connectionQue.empty())
		{
			cv.wait(lock); //队列不为空,此处生产线程进入等待状态,释放锁
		}

		//可以生产新连接,创建新连接
		if (_connectionCnt < _maxSize) { 
			Connection* p = new Connection();
			p->connect(_ip, _port, _username, _password, _dbname);
			p->refreshAliveTime();// 刷新一下开始空闲起始时间;		
			_connectionQue.push(p);
			_connectionCnt++;
		}

		//通知消费者线程可以消费连接了
		cv.notify_all();
	}
}

// 消费者线程:给用户连接,从连接池中获取一个可用的空闲连接
shared_ptr<Connection> ConnectionPool::getConnection()
{
	unique_lock<mutex> lock(_queueMutex);
	//if (_connectionQue.empty())//空的就让生产者生产
	//{
	//	//不可以用sleep,sleep是直接睡,而wait_for是被通知就可以马上继续走
	//	cv.wait_for(lock, chrono::milliseconds(_connectionTimeout));//毫秒,超过时间没有被唤醒的话也会出来
	//	if (_connectionQue.empty())
	//	{
	//		LOG("获取空闲连接超时了!!!获取连接失败");
	//		return nullptr;
	//	}
	//}

	//上述没有考虑好,有可能等待过程中是被唤醒的,但是拿锁慢,还是被拿走了锁
	//优化一下:
	while (_connectionQue.empty())
	{
		if (cv_status::timeout == cv.wait_for(lock, chrono::milliseconds(_connectionTimeout)))
		{
			//是真的超时了, 并且连接池为空
			if (_connectionQue.empty())
			{
				LOG("获取空闲连接超时了!!!获取连接失败");
				return nullptr;
			}			
		}
	}

	//有连接在池子里
	/*
	shared_ptr智能指针析构时,默认会调用connection析构函数,connection就会被close
	这里就需要自定义share_ptr的释放资源方式:把connection直接归还到_connectionQue中;
	*/
	shared_ptr<Connection> sp(_connectionQue.front(),
		[&](Connection* pcon) {
			//这里是在服务器(多线程)消费者线程中调用的,涉及了共享数据,所以一定要考虑队列的线程安全操作
			unique_lock<mutex> lock(_queueMutex);
			pcon->refreshAliveTime();// 刷新一下开始空闲起始时间;
			_connectionQue.push(pcon);
		});
	_connectionQue.pop();
	//if (_connectionQue.empty()) //这样写也可以
	//{
	//	cv.notify_all();//消费完连接后发现队列为空,通知生产者线程;
	//}
	cv.notify_all();//消费完连接后,通知生产者线程检查线程池是否为空;
	return sp;
}

//回收线程函数:扫描超过maxIdleTime时间的空闲连接,进行多余的连接回收
void ConnectionPool::scannerConnectionTask()
{
	for (;;)
	{
		//通过sleep模拟定时效果
		this_thread::sleep_for(chrono::seconds(_maxIdleTime));
	
		// 扫描整个队列,释放多余连接
		unique_lock<mutex> lock(_queueMutex);
		while (_connectionCnt > _initSize)
		{
			Connection* p = _connectionQue.front();
			//这里都释放?应该释放大于initSize以上的?
			if (p->getAliveTime() > _maxIdleTime * 1000) //##队头的空闲时间是最长的,只用看队头就行
			{
				_connectionQue.pop();
				_connectionCnt--;
				delete p;//调用connectin析构函数
			}
			else
			{
				break;//队头都小于,后面肯定小;
			}
		}
	}
}

五、代码测试

进行压力测试对比一下使用连接池和不使用连接池的效果;

测试代码:main函数手动测试

#include <iostream>
#include <thread>
#include "Connection.h"
#include "MySQLConnectionPool.h"


using namespace std;


int main()
{
	/*
	 * 数据库测试
	*/
	//Connection conn;
	//char sql[1024] = { 0 };
	//sprintf(sql, "insert into user(name, age, sex) values ('%s', %d, '%s')", "zhang san", 20, "male");
	//conn.connect("127.0.0.1", 3306, "root", "root", "chat");
	//conn.update(sql);

	//压力测试:
	// 
	//①不用连接池,单线程,更改数据1000、5000、10000
	clock_t begin = clock();
	for (int i = 0; i < 1000; ++i) {
		Connection conn;
		char sql[1024] = { 0 };
		sprintf(sql, "insert into user(name, age, sex) values ('%s', %d, '%s')", "zhang san", 20, "male");
		conn.connect("127.0.0.1", 3306, "root", "root", "chat");
		conn.update(sql);
	}
	clock_t end = clock();
	cout << end - begin << "ms" << endl;

	//②用连接池单线程,更改数据1000、5000、10000
	//clock_t begin = clock();
	//ConnectionPool *cp = ConnectionPool::getConnectionPool();
	//for (int i = 0; i < 5000; ++i) {
	//	shared_ptr<Connection> sp= cp->getConnection();
	//	char sql[1024] = { 0 };
	//	sprintf(sql, "insert into user(name, age, sex) values ('%s', %d, '%s')", "zhang san", 20, "male");
	//	sp->update(sql);
	//}
	//clock_t end = clock();
	//cout << end - begin << "ms" << endl;

	//③不用连接池的4线程
	//不能在多线程中同时连接数据库,是非法的,需要先在外面声明连接
	//Connection conn;
	//conn.connect("127.0.0.1", 3306, "root", "root", "chat");
	//clock_t begin = clock();
	//thread t1([]() {
	//	for (int i = 0; i < 250; ++i) {
	//		Connection conn;
	//		char sql[1024] = { 0 };
	//		sprintf(sql, "insert into user(name, age, sex) values ('%s', %d, '%s')", "zhang san", 20, "male");
	//		conn.connect("127.0.0.1", 3306, "root", "root", "chat");
	//		conn.update(sql);
	//	}
	//	});
	//thread t2([]() {
	//	for (int i = 0; i < 250; ++i) {
	//		Connection conn;
	//		char sql[1024] = { 0 };
	//		sprintf(sql, "insert into user(name, age, sex) values ('%s', %d, '%s')", "zhang san", 20, "male");
	//		conn.connect("127.0.0.1", 3306, "root", "root", "chat");
	//		conn.update(sql);
	//	}
	//	});
	//thread t3([]() {
	//	for (int i = 0; i < 250; ++i) {
	//		Connection conn;
	//		char sql[1024] = { 0 };
	//		sprintf(sql, "insert into user(name, age, sex) values ('%s', %d, '%s')", "zhang san", 20, "male");
	//		conn.connect("127.0.0.1", 3306, "root", "root", "chat");
	//		conn.update(sql);
	//	}
	//	});
	//thread t4([]() {
	//	for (int i = 0; i < 250; ++i) {
	//		Connection conn;
	//		char sql[1024] = { 0 };
	//		sprintf(sql, "insert into user(name, age, sex) values ('%s', %d, '%s')", "zhang san", 20, "male");
	//		conn.connect("127.0.0.1", 3306, "root", "root", "chat");
	//		conn.update(sql);
	//	}
	//	});
	//t1.join();
	//t2.join();
	//t3.join();
	//t4.join();
	//clock_t end = clock();
	//cout << end - begin << "ms" << endl;

	//④用连接池的4线程
	//clock_t begin = clock();
	//thread	t1([]() {
	//	ConnectionPool* cp = ConnectionPool::getConnectionPool();
	//	for (int i = 0; i < 250; ++i) {
	//		shared_ptr<Connection> sp = cp->getConnection();
	//		char sql[1024] = { 0 };
	//		sprintf(sql, "insert into user(name, age, sex) values ('%s', %d, '%s')", "zhang san", 20, "male");
	//		sp->update(sql);
	//	}
	//	});
	//thread	t2([]() {
	//	ConnectionPool* cp = ConnectionPool::getConnectionPool();
	//	for (int i = 0; i < 250; ++i) {
	//		shared_ptr<Connection> sp = cp->getConnection();
	//		char sql[1024] = { 0 };
	//		sprintf(sql, "insert into user(name, age, sex) values ('%s', %d, '%s')", "zhang san", 20, "male");
	//		sp->update(sql);
	//	}
	//	});
	//thread	t3([]() {
	//	ConnectionPool* cp = ConnectionPool::getConnectionPool();
	//	for (int i = 0; i < 250; ++i) {
	//		shared_ptr<Connection> sp = cp->getConnection();
	//		char sql[1024] = { 0 };
	//		sprintf(sql, "insert into user(name, age, sex) values ('%s', %d, '%s')", "zhang san", 20, "male");
	//		sp->update(sql);
	//	}
	//	});
	//thread	t4([]() {
	//	ConnectionPool* cp = ConnectionPool::getConnectionPool();
	//	for (int i = 0; i < 250; ++i) {
	//		shared_ptr<Connection> sp = cp->getConnection();
	//		char sql[1024] = { 0 };
	//		sprintf(sql, "insert into user(name, age, sex) values ('%s', %d, '%s')", "zhang san", 20, "male");
	//		sp->update(sql);
	//	}
	//	});
	//t1.join();
	//t2.join();
	//t3.join();
	//t4.join();
	//clock_t end = clock();
	//cout << end - begin << "ms" << endl;

	return 0;
}

刚开始连接速度,插入速度极慢的原因:

是因为 mysql8.0 一些设置是默认开启的(5.7 是默认关闭的),而这些设置可能会严重影响数据库性能

执行以下优化:

最后我还是换成了5.7的版本进行测试:

数据量未使用连接池所耗时间使用连接池所耗时间
1000单线程:1886ms 四线程:495ms单线程:1078ms 四线程:406ms
5000单线程:10032ms 四线程:2368ms单线程:5328ms 四线程:2033ms
10000单线程:19407ms 四线程:4579ms单线程:10532ms四线程:4041ms

锻炼的技术点:MySQL数据库编程、单例模式、queue队列容器、C++11多线程编程、线程互斥、线程同步通信和 unique_lock、基于CAS的原子整形、智能指针shared_ptr、lambda表达式、生产者-消费者线程模型;

    原文作者:D-booker
    原文地址: https://www.cnblogs.com/D-booker/p/16398833.html
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞