并行处理之引用计数与状态的使用
--------------------------------------------
标题: 并行处理之引用计数与状态的使用
作者: 叶飞虎
日期: 2013.03.09
--------------------------------------------
在并行处理过程中多线程对同一对象进行操作,而且操作各自独立,但又需要依赖对象
本身的状态,如已打开或已登录等等。这时需要给状态进行引用计数,以保证操作的有效和
并发性。
为了便于描述, 以<远程文件系统>中的客户端连接对象类(TRFConnObj)中部分代码做为
示例:
// 对象状态enum TObjState {osInactive = 0, // 未打开 osClosing = 1, // 正在关闭 osOpening = 2, // 正在打开 osOpened = 3}; // 已经打开// 引用计数加 1bool TRFConnObj::IncRefCount(){ // 初始化 bool result = false; // 操作 Lock(); if (FState == osOpened) { FRefCount++; result = true; } Unlock(); // 返回结果 return result;}// 引用计数减 1void TRFConnObj::DecRefCount(){ Lock(); if (FRefCount > 0) FRefCount--; Unlock();}// 打开long TRFConnObj::Open(){ // 初始化 long result = crStateInvalid; bool boolNext = false; // 更改状态 Lock(); if (FState == osInactive) { FState = osOpening; boolNext = true; FRefCount= 0; } else if (FState == osOpened) result = crSuccess; Unlock(); // 判断是否继续 if (boolNext) { // 执行打开 boolNext = false; result = DoOpen(); // 更改状态 Lock(); if (FState != osOpening) boolNext = true; else if (result == crSuccess) FState = osOpened; else FState = osInactive; Unlock(); // 判断是否需要关闭 if (boolNext) { // 执行关闭 if (result == crSuccess) { DoClose(true); result = crFailure; } // 更改状态(不需要加锁) FState = osInactive; } } // 返回结果 return result;}// 关闭void TRFConnObj::Close(bool ANeedWait){ // 初始化 bool boolNext = false; bool boolWait = false; // 更改状态 Lock(); if (FState == osOpened) { FState = osClosing; boolNext = true; } else if (FState == osOpening) { FState = osClosing; boolWait = true; } else if (FState == osClosing) boolWait = ANeedWait; Unlock(); // 判断是否等待 if (boolWait) { // 等待 Close 结束 while (FState == osClosing) Sleep(15); } else if (boolNext) { // 执行关闭 DoClose(true); // 更改状态(不需要加锁) FState = osInactive; }}// 并行操作的方法long TRFConnObj::<method>(...){ // 初始化 long result = crStateInvalid; // 引用计数加 1 if (IncRefCount()) { // 操作 // ??? ... ... // 引用计数减 1 DecRefCount(); } // 返回结果 return result;}
可以把 Open 和 Close 方法理解为打开和关闭并行处理之门,只有 Open 之后才有效。
当调用 Close 方法时,若还有其它线程正在操作相关方法,则会等待操作完成后才能关闭,
若已调用 Close 方法且状态为 osClosing 且其它线程开始操作相关方法时,则会直接返回
状态无效,并拒之门外。
对象状态和引用计数配合使用会提高事件处理的并发性,且不会在边界状态时导致异常
发生,同时还可以保证系统的并发性能。在服务器程序的开发中会常用到引用计数和状态,
若完全理解 Open 和 Close 中状态处理,则有助于灵活运用到并发性高的软件开发中。
<远程文件系统>提供完整源码,下载地址如下:
远程文件系统服务器源码[http://download.csdn.net/detail/kyee/5109478]
远程文件系统客户端源码[http://download.csdn.net/detail/kyee/5109484]
服务器和客户端相关接口[http://download.csdn.net/detail/kyee/5109491]
客户端连接对象类(TRFConnObj)部分代码如下:
// 返回值及错误码enum TRFCResult {crSuccess = 1, // 成功 crFailure = 0, // 失败 crUnknown = -1, // 未知错误 crNotExist = -2, // 不存在(如: AConnObj) crNotStart = -3, // 服务器未启动 crNotConnect = -4, // 连接未打开 crNonsupport = -5, // 不支持 crVersion = -6, // 版本太低 crPassword = -7, // 密码错误 crIsExisted = -8, // 已存在 crIsIllegal = -9, // 不合法 crAttrInvalid = -10, // 属性无效 crStateInvalid = -11, // 状态无效 crHandleInvalid = -12, // 句柄无效 crAccessIllegal = -13}; // 存取非法// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~/* TRFConnObj - 客户端连接对象类 */class TRFConnObj{public: TRFConnObj(TRFConnEvent* AEventObj = NULL, const char* AHost = NULL, long APort = 0, const char* APassword = NULL); virtual ~TRFConnObj(); // 属性 void* Data() const { return FData; } TRCConnection* ConnObj() const { return FConnObj; } TRFConnEvent* EventObj() const { return FEventObj; } Longword CallTimeout() const { return FCallTimeout; } TObjState State() const { return FState; } KYString RFVersion() const { return FRFInfo.Version; } TDateTime RFStartTime() const { return FRFInfo.StartTime; } TDateTime RFDateTime(); // 设置属性 void SetData(void* AData) { FData = AData; } void SetCallTimeout(Longword ATimeout); // 打开/关闭连接 long Open(); void Close(bool ANeedWait = false); // 读取/设置文件属性 long GetFileAttr(const char* AFileName, long* Attrib); long SetFileAttr(const char* AFileName, long Attrib); // 文件存在/删除/移动文件或目录 long FileExisted(const char* AFileName); long DeleteFile(const char* AFileName); long MoveFile(const char* AOldName, const char* ANewName); // 目录存在/创建/删除 long DirExisted(const char* APathName); long CreateDir(const char* APathName, bool AForced = false); long RemoveDir(const char* APathName, bool AForced = false); // 磁盘操作相关函数 long DriveType(const char* ADrive, long* AType); long DiskSpace(const char* APath, __int64* ATotalSpace, __int64* AFreeSpace);protected: // 状态锁 void Lock() const { FLock->Enter(); } void Unlock() const { FLock->Leave(); } // 对象次数增减 long IncObjTimes() { return InterlockedIncrement(&FObjTimes); } long DecObjTimes() { return InterlockedDecrement(&FObjTimes); } // 引用计数增减 bool IncRefCount_Valid(); bool IncRefCount(); void DecRefCount(); // 读取 RF 的信息 void GetRFInfo(); void GetRFDateTime(); // 存取 RF 属性方法 long GetRFInt(long Attrib, long& AValue); long GetRFStr(long Attrib, KYString& AValue); long SetRFInt(long Attrib, long AValue); long SetRFStr(long Attrib, const char* AValue, long ALen);private: // 执行初始化/释放 void DoInit(); void DoFreeing() { FEventObj->DoFreeing(this); } // 执行打开/关闭 long DoOpen(); void DoClose(bool ANeedClose); // 执行断开连接 void DoDisconnect(); void CallOnDisconnect(); // 执行 RC 事件的回调方法 void DoRecvData(const void* AData, long ASize);private: void* FData; // 自定义数据 TKYCritSect* FLock; // 状态锁 TRCConnection* FConnObj; // RC 连接对象 TRFConnEvent* FEventObj; // 事件对象 TObjState FState; // 状态 long FObjTimes; // 对象次数 long FRefCount; // 引用计数 Longword FCallTimeout; // 调用超时时长(毫秒) TRFInfo FRFInfo; // 存储服务器信息 Longword FLastTick; // 读取最后一次 RFDateTime 的 tick 值 TDateTime FDeltaTime; // 存储服务器的当前时间差值 TRFCOnDisconnect FOnDisconnect; // OnDisconnect 回调事件函数private: // RC 连接的回调函数 static void __stdcall _DoRCCallback(long AConnID, long AEvent, long ACmdID, long AReturn); static void __stdcall _DoRCRecvData(long AConnID, long ASize, const void* AData);};
/* TRFConnObj - 客户端连接对象类 */// ---------------- 静态函数 ----------------// RC 连接的回调函数void __stdcall TRFConnObj::_DoRCCallback(long AConnID, long AEvent, long ACmdID, long AReturn){ if (AEvent == rcvConnClosed) try { TRCConnection* objConn = ConnID2Object(AConnID); if (objConn != NULL) ((TRFConnObj*)objConn->Data())->DoDisconnect(); } catch (...) {}}// RC 接收自定义数据的回调函数void __stdcall TRFConnObj::_DoRCRecvData(long AConnID, long ASize, const void* AData){ try { TRCConnection* objConn = ConnID2Object(AConnID); if (objConn != NULL) ((TRFConnObj*)objConn->Data())->DoRecvData(AData, ASize); } catch (...) {}}// ---------------- 构造函数和析构函数 ----------------// 构造函数TRFConnObj::TRFConnObj(TRFConnEvent* AEventObj, const char* AHost, long APort, const char* APassword){ // 初始化 FData = NULL; FState = osInactive; FEventObj = AEventObj; FObjTimes = 1; FRefCount = 0; FCallTimeout = Timeout_DefCall; // 初始化函数指针 FOnDisconnect = NULL; // 创建连接对象 FConnObj = new TRCConnection(AHost, RF_AppName, APassword, APort, &TRFConnObj::_DoRCCallback); // 设置连接对象属性 FConnObj->SetData(this); FConnObj->SetLinger(0); FConnObj->SetOnRecvData(&TRFConnObj::_DoRCRecvData); // 创建对象 FLock = new TKYCritSect; // 执行初始化 DoInit();}// 析构函数TRFConnObj::~TRFConnObj(){ // 关闭 Close(true); FConnObj->SetData(NULL); // 执行释放方法 DoFreeing(); // 释放对象 FreeAndNil(FConnObj); FreeAndNil(FLock);}// ---------------- 私有函数 ----------------// 执行初始化void TRFConnObj::DoInit(){ // 初始化 FLastTick = 0; FDeltaTime = 0.0; // 初始化信息 FRFInfo.StartTime = 0.0; // 清除 FRFInfo.Version.Clear();}// 执行打开long TRFConnObj::DoOpen(){ // 初始化 long result = crFailure; // 打开连接 switch (RCConnOpen(FConnObj->ConnID())) { case CRet_Success: // 加载命令 if (FConnObj->LoadCmd()) { // 取 RF 信息 GetRFInfo(); // 返回成功 result = crSuccess; } else FConnObj->Close(); break; case CRet_Failure: result = crNotConnect; break; case CRet_AppPass_Error: result = crPassword; break; case CRet_AppName_NotExist: result = crNotStart; break; case CRet_ConnID_NotExist: result = crNotExist; break; default: result = crUnknown; } // 返回结果 return result;}// 执行关闭void TRFConnObj::DoClose(bool ANeedClose){ // 判断是否需要关闭 if (ANeedClose) FConnObj->Close(); // 等待 RefCount == 0 while (FRefCount > 0) Sleep(15); // 执行初始化 DoInit();}// 执行连接的断开事件void TRFConnObj::DoDisconnect(){ // 初始化 bool boolNext = false; bool boolClose = false; // 检查状态 Lock(); if (FState >= osOpened) { boolClose = true; boolNext = true; FState = osClosing; } else if (FState == osOpening) FState = osClosing; else boolNext = (FState == osClosing); Unlock(); // 判断是否继续 if (boolNext) { // 激发 OnDisconnect 事件方法 FEventObj->DoDisconnect(this); // 激发 OnDiconnect 函数指针 CallOnDisconnect(); // 判断是否需要关闭 if (boolClose) { // 执行关闭 DoClose(false); // 更改状态(不需要加锁) FState = osInactive; } }}// 激发 OnDiconnect 函数指针void TRFConnObj::CallOnDisconnect(){ if (FOnDisconnect != NULL) __try { (*FOnDisconnect)(this); } __except (_ExceptIgnore) {}}// 执行 RC 接收自定义数据方法void TRFConnObj::DoRecvData(const void* AData, long ASize){}// ---------------- 保护函数 ----------------// 引用计数加 1bool TRFConnObj::IncRefCount_Valid(){ // 初始化 bool result = false; // 操作 Lock(); if (FState >= osOpening) { FRefCount++; result = true; } Unlock(); // 返回结果 return result;}// 引用计数加 1bool TRFConnObj::IncRefCount(){ // 初始化 bool result = false; // 操作 Lock(); if (FState == osOpened) { FRefCount++; result = true; } Unlock(); // 返回结果 return result;}// 引用计数减 1void TRFConnObj::DecRefCount(){ Lock(); if (FRefCount > 0) FRefCount--; Unlock();}// ---------------- 公有函数 ----------------// 打开long TRFConnObj::Open(){ // 初始化 long result = crStateInvalid; bool boolNext = false; // 更改状态 Lock(); if (FState == osInactive) { FState = osOpening; boolNext = true; FRefCount= 0; } else if (FState == osOpened) result = crSuccess; Unlock(); // 判断是否继续 if (boolNext) { // 执行打开 boolNext = false; result = DoOpen(); // 更改状态 Lock(); if (FState != osOpening) boolNext = true; else if (result == crSuccess) FState = osOpened; else FState = osInactive; Unlock(); // 判断是否需要关闭 if (boolNext) { // 执行关闭 if (result == crSuccess) { DoClose(true); result = crFailure; } // 更改状态(不需要加锁) FState = osInactive; } } // 返回结果 return result;}// 关闭void TRFConnObj::Close(bool ANeedWait){ // 初始化 bool boolNext = false; bool boolWait = false; // 更改状态 Lock(); if (FState == osOpened) { FState = osClosing; boolNext = true; } else if (FState == osOpening) { FState = osClosing; boolWait = true; } else if (FState == osClosing) boolWait = ANeedWait; Unlock(); // 判断是否等待 if (boolWait) { // 等待 Close 结束 while (FState == osClosing) Sleep(15); } else if (boolNext) { // 执行关闭 DoClose(true); // 更改状态(不需要加锁) FState = osInactive; }}// 文件存在long TRFConnObj::FileExisted(const char* AFileName){ // 初始化 long result = crStateInvalid; // 检查参数 if ((AFileName == NULL) || (*AFileName == '\0')) result = crIsIllegal; else if (IncRefCount()) // 引用计数加 1 { // 操作 // ??? ... ... // 引用计数减 1 DecRefCount(); } // 返回结果 return result;}// 文件删除long TRFConnObj::DeleteFile(const char* AFileName){ // 初始化 long result = crStateInvalid; // 检查参数 if ((AFileName == NULL) || (*AFileName == '\0')) result = crIsIllegal; else if (IncRefCount()) // 引用计数加 1 { // 操作 // ??? ... ... // 引用计数减 1 DecRefCount(); } // 返回结果 return result;}// 移动文件或目录long TRFConnObj::MoveFile(const char* AOldName, const char* ANewName){ // 初始化 long result = crStateInvalid; // 检查参数 if ((AOldName == NULL) || (*AOldName == '\0') || (ANewName == NULL) || (*ANewName == '\0')) result = crIsIllegal; else if (IncRefCount()) // 引用计数加 1 { // 操作 // ??? ... ... // 引用计数减 1 DecRefCount(); } // 返回结果 return result;}// 目录存在long TRFConnObj::DirExisted(const char* APathName){ // 初始化 long result = crStateInvalid; // 检查参数 if ((APathName == NULL) || (*APathName == '\0')) result = crIsIllegal; else if (IncRefCount()) // 引用计数加 1 { // 操作 // ??? ... ... // 引用计数减 1 DecRefCount(); } // 返回结果 return result;}// 目录创建long TRFConnObj::CreateDir(const char* APathName, bool AForced){ // 初始化 long result = crStateInvalid; // 检查参数 if ((APathName == NULL) || (*APathName == '\0')) result = crIsIllegal; else if (IncRefCount()) // 引用计数加 1 { // 操作 // ??? ... ... // 引用计数减 1 DecRefCount(); } // 返回结果 return result;}// 目录删除long TRFConnObj::RemoveDir(const char* APathName, bool AForced){ // 初始化 long result = crStateInvalid; // 检查参数 if ((APathName == NULL) || (*APathName == '\0')) result = crIsIllegal; else if (IncRefCount()) // 引用计数加 1 { // 操作 // ??? ... ... // 引用计数减 1 DecRefCount(); } // 返回结果 return result;}// 取驱动器类型long TRFConnObj::DriveType(const char* ADrive, long* AType){ // 初始化 long result = crStateInvalid; // 检查参数 if ((ADrive == NULL) || (*ADrive == '\0') || (AType == NULL)) result = crIsIllegal; else if (IncRefCount()) // 引用计数加 1 { // 操作 // ??? ... ... // 引用计数减 1 DecRefCount(); } // 返回结果 return result;}// 取磁盘空闲空间long TRFConnObj::DiskSpace(const char* APath, __int64* ATotalSpace, __int64* AFreeSpace){ // 初始化 long result = crStateInvalid; // 检查参数 if ((APath == NULL) || (*APath == '\0') || ((ATotalSpace == NULL) && (AFreeSpace == NULL))) result = crIsIllegal; else if (IncRefCount()) // 引用计数加 1 { // 操作 // ??? ... ... // 引用计数减 1 DecRefCount(); } // 返回结果 return result;}
--------------------------------------------