[转载]InnoDB行锁的实现分析
原文网址: http://www.penglixun.com/tech/database/innodb_next_key_locking.html
本文内容遵从CC版权协议, 可以随意转载, 但必须以超链接形式标明文章原始出处和作者信息及版权声明
?
感谢Fenng的提醒,已删除锁粒度和死锁的关系,专门撰文写了锁的粒度与死锁的关系。
InnoDB与MyISAM不同,它实现的是一个行级锁,而非MyISAM的表锁。锁的粒度越大,则发生死锁的概率越小、锁机制开销越小,但并发能力会越低。如果锁的粒度变细,则发生死锁的概率也会增大,锁机制的开销会更大,但是并发能力能提高。表锁是如何实现的呢,以MyISAM为例,是在每个表的结构中加入一个互斥变量记录锁状态,像:
struct Table {
Row rows[MAXROWS];
pthread_mutex_t lock;//表锁
};
这样做的好处就是锁非常简单,当操作表的时候,直接锁住整个表就行,锁机制的开销非常小。但是问题也很明显,并发量上不去,因为无论多小的操作,都必须锁整个表,这可能带来其他操作的阻塞。
行锁又是如何实现的呢,Oracle是直接在每个行的block上做标记,而InnoDB则是靠索引来做。InnoDB的主键索引跟一般的索引不太一样,Key后面还跟上了整行的数据,互斥变量也是加载主键索引上的,像
struct PK_Idx {
Row row;
pthread_mutex_t lock;//行锁
};
multimap pk_idx;
这样的形式。
这样做的好处是锁的粒度小,只锁住需要的数据不被更改,但是问题也很明显,锁的开销很大,每个主键索引上都要加上一个标记,因为锁的粒度很小,可能两个不同的操作各锁住一部分行等待对方释放形成死锁,不过这个是有办法解决的,把上锁的操作封装成原子操作就行,不过并发量会受些影响。
下面是类似InnoDB的Next-Key locking算法的演示:
编译需要加-lpthread参数,例如g++ inno.cpp -lpthread -o inno
#include <iostream>#include <cstdio>#include <cstdlib>#include <string>#include <map>#include <unistd.h>#include <time.h>#include <pthread.h>#include <windows.h>?#define LOCK pthread_mutex_lock(&lock)#define UNLOCK pthread_mutex_unlock(&lock)#define PRINT(STR, ...) LOCK;fprintf(stderr, STR, __VA_ARGS__);UNLOCK?#define MAXROWS 100?using namespace std;?/* 行结构 */struct Row { int num; string info;};?/* 主键索引结构 */struct PK_Idx { Row row; pthread_mutex_t lock;//行锁};?/* 表结构 */struct Table { multimap<int, PK_Idx> pk_idx; multimap<int, int> num_idx; multimap<string, int> info_idx; Row rows[MAXROWS]; pthread_mutex_t lock;//表锁};?Table table;int pid;//全局锁pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;?/* 随机字符 */char randChar() { return rand()%26+'A';}?/* 随机字符串 */void randString(string &col, int len) { col = ""; for(int i=0; i<len; ++i) { col += randChar(); }}?/* 初始化数据 */void init() { pid = 0; PK_Idx pk;? srand((unsigned)time(0));? //初始化表数据 for(int i=0; i<MAXROWS; ++i) { pk.row.num = rand()%MAXROWS; randString(pk.row.info, rand()%10+1); //初始化行锁 pk.lock = PTHREAD_MUTEX_INITIALIZER; //写入表数据 table.rows[i].num = pk.row.num; table.rows[i].info = pk.row.info; //写入索引 table.pk_idx.insert(pair<int, PK_Idx>(i, pk)); table.num_idx.insert(pair<int, int>(pk.row.num, i)); table.info_idx.insert(pair<string, int>(pk.row.info, i)); } //初始化表锁 table.lock = PTHREAD_MUTEX_INITIALIZER;}?/*获取范围数据*/void select_num(int begin, int end) { int id; int cur_pid; multimap<int,int>::iterator it, itlow, itup; PK_Idx *pk; /* 按字段范围查找ID */ itlow = table.num_idx.lower_bound (begin); itup = table.num_idx.upper_bound (end);? LOCK; cur_pid = pid++; UNLOCK; PRINT("%d : * Start Select:%d,%d *\n", cur_pid, begin, end); for (it=itlow; it!=itup; ++it) { id = it->second; pk = &(table.pk_idx.find(id)->second);//根据ID去查主键索引 pthread_mutex_lock(&(pk->lock));//在主键索引上加锁 PRINT("%d : LOCK Row %d: %d\t%s\n", cur_pid, id, pk->row.num, pk->row.info.c_str()); Sleep(500); } for (it=itlow; it!=itup; ++it) { id = it->second; pk = &(table.pk_idx.find(id)->second); PRINT("%d : UNLOCK Row %d\n", cur_pid, id); pthread_mutex_unlock(&(pk->lock));//使用完毕依次释放锁 } PRINT("%d : * Select Finished! *\n", cur_pid);}?/*修改范围数据*/void update_num(int begin, int end) { int id; int cur_pid; multimap<int,int>::iterator it, itlow, itup; PK_Idx *pk;? itlow = table.num_idx.lower_bound (begin); itup = table.num_idx.upper_bound (end);? LOCK; cur_pid = pid++; UNLOCK; PRINT("%d : * Start Update:%d,%d *\n", cur_pid, begin, end); for (it=itlow; it!=itup; ++it) { id = it->second; pk = &(table.pk_idx.find(id)->second); pthread_mutex_lock(&(pk->lock)); PRINT("%d : LOCK Row %d: %d\t%s\n", cur_pid, id, pk->row.num, pk->row.info.c_str()); Sleep(500); } for (it=itlow; it!=itup; ++it) { id = it->second; pk = &(table.pk_idx.find(id)->second); PRINT("%d : UNLOCK Row %d\n", cur_pid, id); pthread_mutex_unlock(&(pk->lock)); } PRINT("%d : * Update Finished! *\n", cur_pid);}??void* test_select(void *) { int begin, end; srand((unsigned)time(0)); while(1) { begin = rand()%(MAXROWS/2); end = begin+rand()%(MAXROWS/2); select_num(begin, end); Sleep(500); }}?void* test_update(void *) { int begin, end; srand((unsigned)time(0)); while(1) { begin = rand()%(MAXROWS/5); end = begin+rand()%(MAXROWS/5); update_num(begin, end); Sleep(500); }}?void test() { pthread_t id[2]; if(pthread_create(&id[0], NULL, test_select, NULL) != 0) { PRINT("%s", "Create Thread Error!\n"); }? if(pthread_create(&id[1], NULL, test_update, NULL) != 0) { PRINT("%s", "Create Thread Error!\n"); }? while(1){Sleep(500);}}?int main() { init(); test(); return 0;}