StateMachine没有batch执行接口,未能充分利用batch能力
由于StateMachine没有batch执行接口,即使遇到一个batch类型的paxoslog(其中包含N个非batch类型的paxoslog),也必须调用N次StateMachine::Execute或StateMachine::ExecuteForCheckpoint,未能充分利用batch能力。
状态机执行时通常伴随写盘操作。把属于同一个状态机的连续M次执行合并为1次执行,通常可以大幅减少写盘次数,从而更充分地利用batch能力。
建议phxpaxos为状态机提供batch执行接口,以进一步提高性能。
原代码路径:
include/phxpaxos/sm.h
// ...
// Excerpt of the StateMachine interface before the proposed change
// (members not relevant to the discussion are elided with "// ...").
class StateMachine
{
// ...
// There is no batch execution interface: each call executes exactly one
// non-batch paxos log.
// Pure virtual. Receives the group index, the instance id, a single paxos
// value and an SMCtx pointer; reports the outcome as a bool.
virtual bool Execute(const int iGroupIdx, const uint64_t llInstanceID,
const std::string & sPaxosValue, SMCtx * poSMCtx) = 0;
// Checkpoint counterpart of Execute: same arguments except that no SMCtx is
// passed. Declared here without a body and not pure virtual.
virtual bool ExecuteForCheckpoint(const int iGroupIdx, const uint64_t llInstanceID,
const std::string & sPaxosValue);
// ...
};
// ...
src/sm-base/sm_base.cpp
// ...
bool SMFac :: BatchExecute(const int iGroupIdx, const uint64_t llInstanceID, const std::string & sBodyValue, BatchSMCtx * poBatchSMCtx)
{
BatchPaxosValues oBatchValues;
bool bSucc = oBatchValues.ParseFromArray(sBodyValue.data(), sBodyValue.size());
if (!bSucc)
{
PLG1Err("ParseFromArray fail, valuesize %zu", sBodyValue.size());
return false;
}
if (poBatchSMCtx != nullptr)
{
if ((int)poBatchSMCtx->m_vecSMCtxList.size() != oBatchValues.values_size())
{
PLG1Err("values size %d not equal to smctx size %zu",
oBatchValues.values_size(), poBatchSMCtx->m_vecSMCtxList.size());
return false;
}
}
for (int i = 0; i < oBatchValues.values_size(); i++)
{
const PaxosValue & oValue = oBatchValues.values(i);
SMCtx * poSMCtx = poBatchSMCtx != nullptr ? poBatchSMCtx->m_vecSMCtxList[i] : nullptr;
bool bExecuteSucc = DoExecute(iGroupIdx, llInstanceID, oValue.value(), oValue.smid(), poSMCtx); // 即使遇到一个batch类型的paxoslog(包含N个非batch类型的paxoslog),也得执行N次StateMachine::Execute
if (!bExecuteSucc)
{
return false;
}
}
return true;
}
// Routes a single paxos value to the registered state machine whose SMID()
// equals iSMID and returns that state machine's Execute result.
//
// iSMID == 0 means the value targets no state machine: it is skipped and
// reported as success. With no state machine registered, or with an smid that
// matches none of them, the call fails.
bool SMFac :: DoExecute(const int iGroupIdx, const uint64_t llInstanceID,
        const std::string & sBodyValue, const int iSMID, SMCtx * poSMCtx)
{
    if (0 == iSMID)
    {
        PLG1Imp("Value no need to do sm, just skip, instanceid %lu", llInstanceID);
        return true;
    }

    if (m_vecSMList.empty())
    {
        PLG1Imp("No any sm, need wait sm, instanceid %lu", llInstanceID);
        return false;
    }

    for (auto it = m_vecSMList.begin(); it != m_vecSMList.end(); ++it)
    {
        if ((*it)->SMID() != iSMID)
        {
            continue;
        }
        return (*it)->Execute(iGroupIdx, llInstanceID, sBodyValue, poSMCtx);
    }

    PLG1Err("Unknown smid %d instanceid %lu", iSMID, llInstanceID);
    return false;
}
// ...
bool SMFac :: BatchExecuteForCheckpoint(const int iGroupIdx, const uint64_t llInstanceID,
const std::string & sBodyValue)
{
BatchPaxosValues oBatchValues;
bool bSucc = oBatchValues.ParseFromArray(sBodyValue.data(), sBodyValue.size());
if (!bSucc)
{
PLG1Err("ParseFromArray fail, valuesize %zu", sBodyValue.size());
return false;
}
for (int i = 0; i < oBatchValues.values_size(); i++)
{
const PaxosValue & oValue = oBatchValues.values(i);
bool bExecuteSucc = DoExecuteForCheckpoint(iGroupIdx, llInstanceID, oValue.value(), oValue.smid()); // 即使遇到一个batch类型的paxoslog(包含N个非batch类型的paxoslog),也得执行N次StateMachine::ExecuteForCheckpoint
if (!bExecuteSucc)
{
return false;
}
}
return true;
}
// Routes a single paxos value to the registered state machine whose SMID()
// equals iSMID and returns that state machine's ExecuteForCheckpoint result.
//
// iSMID == 0 is skipped and reported as success. With no state machine
// registered, or with an smid that matches none of them, the call fails.
bool SMFac :: DoExecuteForCheckpoint(const int iGroupIdx, const uint64_t llInstanceID,
        const std::string & sBodyValue, const int iSMID)
{
    if (0 == iSMID)
    {
        PLG1Imp("Value no need to do sm, just skip, instanceid %lu", llInstanceID);
        return true;
    }

    if (m_vecSMList.empty())
    {
        PLG1Imp("No any sm, need wait sm, instanceid %lu", llInstanceID);
        return false;
    }

    for (auto it = m_vecSMList.begin(); it != m_vecSMList.end(); ++it)
    {
        if ((*it)->SMID() != iSMID)
        {
            continue;
        }
        return (*it)->ExecuteForCheckpoint(iGroupIdx, llInstanceID, sBodyValue);
    }

    PLG1Err("Unknown smid %d instanceid %lu", iSMID, llInstanceID);
    return false;
}
// ...
修改后的代码:
include/phxpaxos/sm.h
// ...
// Excerpt of the StateMachine interface with the proposed batch entry points
// added (members not relevant to the proposal are elided with "// ...").
class StateMachine
{
// ...
// Single-value entry points, unchanged by the proposal.
virtual bool Execute(const int iGroupIdx, const uint64_t llInstanceID,
const std::string & sPaxosValue, SMCtx * poSMCtx) = 0;
virtual bool ExecuteForCheckpoint(const int iGroupIdx, const uint64_t llInstanceID,
const std::string & sPaxosValue);
// Batch interface. Parameters added later should become new fields of
// BatchXXXReq / BatchXXXResp with a default value, which avoids the surprising
// behavior of default arguments on C++ virtual functions.
// Input of BatchExecute: the values of one run handed over by SMFac.
struct BatchExecuteReq {
int iGroupIdx{-1};
uint64_t llInstanceID{0};
// One (pointer, length) pair per value; the bytes are not copied.
// NOTE(review): the pointers refer to buffers owned by the caller, so they are
// presumably only valid for the duration of the call; confirm before storing them.
std::vector<std::pair<const char*, size_t>> vecPaxosValue; // pointer + length are passed to avoid unnecessary string copies
};
// Companion of BatchExecuteReq. SMFac fills vecSMCtx with one entry per value
// in vecPaxosValue (same order); an entry may be nullptr.
struct BatchExecuteResp {
std::vector<SMCtx *> vecSMCtx;
};
// Executes all values of req in one call. The default implementation does
// nothing and returns false, which SMFac treats as an execution failure, so a
// state machine that opts in via UseBatchExecute() must override it.
virtual bool BatchExecute(const BatchExecuteReq& req, BatchExecuteResp& resp) {
// need to implement
return false;
}
// Opt-in switch queried by SMFac: when it returns true, SMFac calls
// BatchExecute instead of Execute. Defaults to false.
virtual bool UseBatchExecute() const {
return false;
}
// Input of BatchExecuteForCheckpoint; same layout as BatchExecuteReq.
struct BatchExecuteForCheckpointReq {
int iGroupIdx{-1};
uint64_t llInstanceID{0};
std::vector<std::pair<const char*, size_t>> vecPaxosValue; // pointer + length are passed to avoid unnecessary string copies
};
// Currently carries no data; it exists so that outputs can be added later
// without changing the virtual function's signature.
struct BatchExecuteForCheckpointResp {
BatchExecuteForCheckpointResp() {}
};
// Checkpoint counterpart of BatchExecute. The default implementation returns
// false, so it must be overridden when UseBatchExecuteForCheckpoint() is true.
virtual bool BatchExecuteForCheckpoint(const BatchExecuteForCheckpointReq& req, BatchExecuteForCheckpointResp& resp) {
// need to implement
return false;
}
// Opt-in switch queried by SMFac for the checkpoint path. Defaults to false.
virtual bool UseBatchExecuteForCheckpoint() const {
return false;
}
// ...
};
// ...
src/sm-base/sm_base.cpp
bool SMFac :: BatchExecute(const int iGroupIdx, const uint64_t llInstanceID, const std::string & sBodyValue, BatchSMCtx * poBatchSMCtx)
{
BatchPaxosValues oBatchValues;
bool bSucc = oBatchValues.ParseFromArray(sBodyValue.data(), sBodyValue.size());
if (!bSucc)
{
PLG1Err("ParseFromArray fail, valuesize %zu", sBodyValue.size());
return false;
}
if (poBatchSMCtx != nullptr)
{
if ((int)poBatchSMCtx->m_vecSMCtxList.size() != oBatchValues.values_size())
{
PLG1Err("values size %d not equal to smctx size %zu",
oBatchValues.values_size(), poBatchSMCtx->m_vecSMCtxList.size());
return false;
}
}
/*
for (int i = 0; i < oBatchValues.values_size(); i++)
{
const PaxosValue & oValue = oBatchValues.values(i);
SMCtx * poSMCtx = poBatchSMCtx != nullptr ? poBatchSMCtx->m_vecSMCtxList[i] : nullptr;
bool bExecuteSucc = DoExecute(iGroupIdx, llInstanceID, oValue.value(), oValue.smid(), poSMCtx);
if (!bExecuteSucc)
{
return false;
}
}
*/
// 修改代码
std::vector<std::pair<int, statemachine*>> begin_index_and_sm;
int pre_smid = -1;
for (int i = 0; i < oBatchValues.values_size(); i++)
{
const PaxosValue & oValue = oBatchValues.values(i);
int iSMID = oValue.smid();
statemachine* sm = nullptr;
for (auto & poSM : m_vecSMList)
{
if (poSM->SMID() == iSMID)
{
sm = poSM;
break;
}
PLG1Err("Unknown smid %d instanceid %lu", iSMID, llInstanceID);
return false;
}
if (iSMID != pre_smid) {
begin_index_and_sm.emplace_back(i, sm);
}
pre_smid = iSMID;
}
int j = 0;
int i = 0;
while (i < oBatchValues.values_size()) {
statemachine* sm = begin_index_and_sm[j];
bool use_batch = sm->UseBatchExecute();
int k = -1;
if (j != begin_index_and_sm.size() -1) {
k = begin_index_and_sm[j + 1];
} else {
k = oBatchValues.values_size();
}
if (use_batch) {
BatchExecuteReq req;
req.iGroupIdx = iGroupIdx;
req.llInstanceID = llInstanceID;
req.vecPaxosValue.reserve(k - i);
BatchExecutetResp resp;
resp.vecSMCtx.reserve(k - i);
while (i < k) {
req.vecPaxosValue.emplace_back(oBatchValues.values(i).data(), oBatchValues.values(i).size());
SMCtx * poSMCtx = poBatchSMCtx != nullptr ? poBatchSMCtx->m_vecSMCtxList[i] : nullptr;
resp.vecSMCtx.emplace_back(poSMCtx);
++i;
}
bool succ = sm->BatchExecute(req, resp);
if (!succ ) {
return false;
}
} else {
while (i < k) {
SMCtx * poSMCtx = poBatchSMCtx != nullptr ? poBatchSMCtx->m_vecSMCtxList[i] : nullptr;
bool succ = DoExecute(iGroupIdx, llInstanceID, oBatchValues.values(i).value(), oBatchValues.values(i).smid(), poSMCtx);
if (!succ) {
return false;
}
++i;
}
}
++j;
}
return true;
}
bool SMFac :: DoExecute(const int iGroupIdx, const uint64_t llInstanceID,
const std::string & sBodyValue, const int iSMID, SMCtx * poSMCtx)
{
if (iSMID == 0)
{
PLG1Imp("Value no need to do sm, just skip, instanceid %lu", llInstanceID);
return true;
}
if (m_vecSMList.size() == 0)
{
PLG1Imp("No any sm, need wait sm, instanceid %lu", llInstanceID);
return false;
}
for (auto & poSM : m_vecSMList)
{
if (poSM->SMID() == iSMID)
{
// 新增代码
// 处理单个非batch类型paxoslog可以视为BatchExecute的特例
if (poSM->UseBatchExecute()) {
BatchExecuteReq req;
req.iGroupIdx = iGroupIdx;
req.llInstanceID = llInstanceID;
req.vecPaxosValue.emplace_back(sBodyValue.data(), sBodyValue.size());
BatchExecuteResp resp;
resp.vecSMCtx.emplace_back(poSMCtx);
return poSM->BatchExecute(req, resp);
}
return poSM->Execute(iGroupIdx, llInstanceID, sBodyValue, poSMCtx);
}
}
PLG1Err("Unknown smid %d instanceid %lu", iSMID, llInstanceID);
return false;
}
// ...
bool SMFac :: BatchExecuteForCheckpoint(const int iGroupIdx, const uint64_t llInstanceID,
const std::string & sBodyValue)
{
BatchPaxosValues oBatchValues;
bool bSucc = oBatchValues.ParseFromArray(sBodyValue.data(), sBodyValue.size());
if (!bSucc)
{
PLG1Err("ParseFromArray fail, valuesize %zu", sBodyValue.size());
return false;
}
/*
for (int i = 0; i < oBatchValues.values_size(); i++)
{
const PaxosValue & oValue = oBatchValues.values(i);
bool bExecuteSucc = DoExecuteForCheckpoint(iGroupIdx, llInstanceID, oValue.value(), oValue.smid());
if (!bExecuteSucc)
{
return false;
}
}
*/
// 修改代码
std::vector<std::pair<int, statemachine*>> begin_index_and_sm;
int pre_smid = -1;
for (int i = 0; i < oBatchValues.values_size(); i++)
{
const PaxosValue & oValue = oBatchValues.values(i);
int iSMID = oValue.smid();
statemachine* sm = nullptr;
for (auto & poSM : m_vecSMList)
{
if (poSM->SMID() == iSMID)
{
sm = poSM;
break;
}
PLG1Err("Unknown smid %d instanceid %lu", iSMID, llInstanceID);
return false;
}
if (iSMID != pre_smid) {
begin_index_and_sm.emplace_back(i, sm);
}
pre_smid = iSMID;
}
int j = 0;
int i = 0;
while (i < oBatchValues.values_size()) {
statemachine* sm = begin_index_and_sm[j];
bool use_batch = sm->UseBatchExecuteForCheckpoint();
int k = -1;
if (j != begin_index_and_sm.size() -1) {
k = begin_index_and_sm[j + 1];
} else {
k = oBatchValues.values_size();
}
if (use_batch) {
BatchExecuteForCheckpointReq req;
req.iGroupIdx = iGroupIdx;
req.llInstanceID = llInstanceID;
req.vecPaxosValue.reserve(k - i);
while (i < k) {
req.vecPaxosValue.emplace_back(oBatchValues.values(i).data(), oBatchValues.values(i).size());
++i;
}
BatchExecuteForCheckpointResp resp;
bool succ = sm->BatchExecuteForCheckpoint(req, resp);
if (!succ ) {
return false;
}
} else {
while (i < k) {
bool succ = DoExecuteForCheckpoint(iGroupIdx, llInstanceID, oBatchValues.values(i), oBatchValues.values(i).smid());
if (!succ) {
return false;
}
++i;
}
}
++j;
}
return true;
}
bool SMFac :: DoExecuteForCheckpoint(const int iGroupIdx, const uint64_t llInstanceID,
const std::string & sBodyValue, const int iSMID)
{
if (iSMID == 0)
{
PLG1Imp("Value no need to do sm, just skip, instanceid %lu", llInstanceID);
return true;
}
if (m_vecSMList.size() == 0)
{
PLG1Imp("No any sm, need wait sm, instanceid %lu", llInstanceID);
return false;
}
for (auto & poSM : m_vecSMList)
{
if (poSM->SMID() == iSMID)
{
// 新增代码
// 处理单个非batch类型paxoslog可以视为BatchExecuteForCheckpoint的特例
if (poSM->UseBatchExecuteForCheckpoint()) {
BatchExecuteForCheckpointReq req;
req.iGroupIdx = iGroupIdx;
req.llInstanceID = llInstanceID;
req.vecPaxosValue.emplace_back(sBodyValue.data(), sBodyValue.size());
BatchExecuteForCheckpointResp resp;
return poSM->BatchExecuteForCheckpoint(req, resp);
}
return poSM->ExecuteForCheckpoint(iGroupIdx, llInstanceID, sBodyValue);
}
}
PLG1Err("Unknown smid %d instanceid %lu", iSMID, llInstanceID);
return false;
}
// ...
StateMachine没有batch执行接口,未能充分利用batch能力
由于StateMachine没有batch执行接口,即使遇到一个batch类型的paxoslog(其中包含N个非batch类型的paxoslog),也必须调用N次StateMachine::Execute或StateMachine::ExecuteForCheckpoint,未能充分利用batch能力。
状态机执行时通常伴随写盘操作。把属于同一个状态机的连续M次执行合并为1次执行,通常可以大幅减少写盘次数,从而更充分地利用batch能力。
建议phxpaxos为状态机提供batch执行接口,以进一步提高性能。
原代码路径:
include/phxpaxos/sm.h
src/sm-base/sm_base.cpp
修改后的代码:
include/phxpaxos/sm.h
src/sm-base/sm_base.cpp