Skip to content

StateMachine没有batch执行接口,未能充分利用batch能力 #248

@dyx2025

Description

@dyx2025

StateMachine没有batch执行接口,未能充分利用batch能力

由于StateMachine没有batch执行接口,即使遇到一个batch类型的paxoslog(其中包含N个非batch类型的paxoslog),也必须逐条调用N次StateMachine::Execute或StateMachine::ExecuteForCheckpoint,无法充分利用batch能力。

通常状态机执行时会伴随写盘操作。把同属一个状态机的连续M次执行合并到1次执行通常能大大降低写盘操作的次数,能更充分利用batch能力。

phxpaxos可以为状态机提供batch执行接口,进一步提高性能。

原代码路径:
include/phxpaxos/sm.h

// ...

class StateMachine
{

// ...

    // There is no batch execution interface: each call below handles exactly
    // one non-batch paxoslog value.
   
    // Pure virtual: executes a single value (sPaxosValue) for the given group
    // and instance. poSMCtx is forwarded unchanged by SMFac and may be nullptr.
    virtual bool Execute(const int iGroupIdx, const uint64_t llInstanceID, 
            const std::string & sPaxosValue, SMCtx * poSMCtx) = 0;

    // Checkpoint counterpart of Execute; takes no SMCtx. Declared virtual but
    // not pure, so its definition lives outside this excerpt.
    virtual bool ExecuteForCheckpoint(const int iGroupIdx, const uint64_t llInstanceID, 
            const std::string & sPaxosValue);

// ...

};

// ...

src/sm-base/sm_base.cpp

// ...

// Executes a batch-type paxos value.
//
// sBodyValue is parsed as a BatchPaxosValues message and every contained value
// is passed to DoExecute in order. When poBatchSMCtx is non-null, its
// m_vecSMCtxList must contain exactly one entry per contained value; entry i is
// handed to the i-th DoExecute call. When it is null, nullptr is passed instead.
//
// Returns false if parsing fails, if the context count does not match the value
// count, or as soon as one DoExecute call returns false. Returns true otherwise.
bool SMFac :: BatchExecute(const int iGroupIdx, const uint64_t llInstanceID, const std::string & sBodyValue, BatchSMCtx * poBatchSMCtx)
{
    BatchPaxosValues oBatchValues;
    bool bSucc = oBatchValues.ParseFromArray(sBodyValue.data(), sBodyValue.size());
    if (!bSucc)
    {   
        PLG1Err("ParseFromArray fail, valuesize %zu", sBodyValue.size());
        return false;
    }   

    // A supplied context list must line up one-to-one with the batched values.
    if (poBatchSMCtx != nullptr) 
    {   
        if ((int)poBatchSMCtx->m_vecSMCtxList.size() != oBatchValues.values_size())
        {   
            PLG1Err("values size %d not equal to smctx size %zu",
                    oBatchValues.values_size(), poBatchSMCtx->m_vecSMCtxList.size());
            return false;
        }   
    }   

    for (int i = 0; i < oBatchValues.values_size(); i++)
    {   
        const PaxosValue & oValue = oBatchValues.values(i);
        SMCtx * poSMCtx = poBatchSMCtx != nullptr ? poBatchSMCtx->m_vecSMCtxList[i] : nullptr;
        // Even for one batch-type paxoslog (containing N non-batch paxoslogs),
        // StateMachine::Execute still ends up being called N times.
        bool bExecuteSucc = DoExecute(iGroupIdx, llInstanceID, oValue.value(), oValue.smid(), poSMCtx);
        if (!bExecuteSucc)
        {
            return false;
        }
    }

    return true;
}


// Dispatches one value to the state machine whose SMID() equals iSMID.
//
// - iSMID == 0: the value is skipped; logs and returns true without calling
//   any state machine.
// - m_vecSMList empty: logs and returns false.
// - matching state machine found: returns the result of its Execute().
// - no match: logs an error and returns false.
bool SMFac :: DoExecute(const int iGroupIdx, const uint64_t llInstanceID,
        const std::string & sBodyValue, const int iSMID, SMCtx * poSMCtx)
{
    if (iSMID == 0)
    {   
        PLG1Imp("Value no need to do sm, just skip, instanceid %lu", llInstanceID);
        return true;
    }
    
    if (m_vecSMList.size() == 0)
    {   
        PLG1Imp("No any sm, need wait sm, instanceid %lu", llInstanceID);
        return false;
    }
    
    // Linear scan over the registered state machines for the matching id.
    for (auto & poSM : m_vecSMList)
    {   
        if (poSM->SMID() == iSMID)
        {   
            return poSM->Execute(iGroupIdx, llInstanceID, sBodyValue, poSMCtx);
        }
    }
    
    PLG1Err("Unknown smid %d instanceid %lu", iSMID, llInstanceID);
    return false;
}

// ...

// Checkpoint variant of batch execution.
//
// sBodyValue is parsed as a BatchPaxosValues message and every contained value
// is passed to DoExecuteForCheckpoint in order. Returns false if parsing fails
// or as soon as one DoExecuteForCheckpoint call returns false; true otherwise.
bool SMFac :: BatchExecuteForCheckpoint(const int iGroupIdx, const uint64_t llInstanceID, 
        const std::string & sBodyValue)
{
    BatchPaxosValues oBatchValues;
    bool bSucc = oBatchValues.ParseFromArray(sBodyValue.data(), sBodyValue.size());
    if (!bSucc)
    {
        PLG1Err("ParseFromArray fail, valuesize %zu", sBodyValue.size());
        return false;
    }

    for (int i = 0; i < oBatchValues.values_size(); i++)
    {
        const PaxosValue & oValue = oBatchValues.values(i);
        // Even for one batch-type paxoslog (containing N non-batch paxoslogs),
        // StateMachine::ExecuteForCheckpoint still ends up being called N times.
        bool bExecuteSucc = DoExecuteForCheckpoint(iGroupIdx, llInstanceID, oValue.value(), oValue.smid());
        if (!bExecuteSucc)
        {
            return false;
        }
    }

    return true;
}

// Dispatches one value to ExecuteForCheckpoint() of the state machine whose
// SMID() equals iSMID.
//
// - iSMID == 0: the value is skipped; logs and returns true.
// - m_vecSMList empty: logs and returns false.
// - matching state machine found: returns the result of its
//   ExecuteForCheckpoint().
// - no match: logs an error and returns false.
bool SMFac :: DoExecuteForCheckpoint(const int iGroupIdx, const uint64_t llInstanceID, 
        const std::string & sBodyValue, const int iSMID)
{
    if (iSMID == 0)
    {
        PLG1Imp("Value no need to do sm, just skip, instanceid %lu", llInstanceID);
        return true;
    }

    if (m_vecSMList.size() == 0)
    {
        PLG1Imp("No any sm, need wait sm, instanceid %lu", llInstanceID);
        return false;
    }

    // Linear scan over the registered state machines for the matching id.
    for (auto & poSM : m_vecSMList)
    {
        if (poSM->SMID() == iSMID)
        {
            return poSM->ExecuteForCheckpoint(iGroupIdx, llInstanceID, sBodyValue);
        }
    }

    PLG1Err("Unknown smid %d instanceid %lu", iSMID, llInstanceID);

    return false;
}


// ...

修改后的代码:
include/phxpaxos/sm.h

// ...

class StateMachine
{

// ...

    // Pure virtual: executes a single value (sPaxosValue) for the given group
    // and instance. poSMCtx may be nullptr.
    virtual bool Execute(const int iGroupIdx, const uint64_t llInstanceID, 
            const std::string & sPaxosValue, SMCtx * poSMCtx) = 0;

    // Checkpoint counterpart of Execute; takes no SMCtx.
    virtual bool ExecuteForCheckpoint(const int iGroupIdx, const uint64_t llInstanceID, 
            const std::string & sPaxosValue);

    // Batch interfaces.
    //
    // Arguments are carried in request/response structs rather than in the
    // parameter list. Fields added later go into these structs with an in-class
    // default, which avoids default arguments on virtual functions (those are
    // bound statically and behave unexpectedly across overrides).

    // Input of BatchExecute: consecutive values that belong to this state
    // machine, all taken from the same paxos instance.
    struct BatchExecuteReq
    {
        int iGroupIdx{-1};
        uint64_t llInstanceID{0};
        // (pointer, length) views of the values, to avoid copying the strings.
        // The views are non-owning: they are only valid for the duration of the
        // BatchExecute call and must not be stored.
        std::vector<std::pair<const char *, size_t>> vecPaxosValue;
    };

    // Companion of BatchExecuteReq. vecSMCtx is filled by the caller with one
    // entry per value in BatchExecuteReq::vecPaxosValue (same order); an entry
    // is nullptr when no context exists for that value.
    struct BatchExecuteResp
    {
        std::vector<SMCtx *> vecSMCtx;
    };

    // Executes a run of values in one call. Only invoked when UseBatchExecute()
    // returns true. The default implementation is a placeholder that reports
    // failure; a state machine that opts in must override it.
    virtual bool BatchExecute(const BatchExecuteReq & /*oReq*/, BatchExecuteResp & /*oResp*/)
    {
        return false;
    }

    // Opt-in switch for BatchExecute. Defaults to false, so existing state
    // machines keep being driven through Execute.
    virtual bool UseBatchExecute() const
    {
        return false;
    }

    // Input of BatchExecuteForCheckpoint; same layout and lifetime rules as
    // BatchExecuteReq.
    struct BatchExecuteForCheckpointReq
    {
        int iGroupIdx{-1};
        uint64_t llInstanceID{0};
        // (pointer, length) views of the values, to avoid copying the strings.
        std::vector<std::pair<const char *, size_t>> vecPaxosValue;
    };

    // Currently empty; reserved so that outputs can be added later without
    // changing the virtual function signature.
    struct BatchExecuteForCheckpointResp
    {
    };

    // Checkpoint counterpart of BatchExecute. Only invoked when
    // UseBatchExecuteForCheckpoint() returns true. The default implementation
    // is a placeholder that reports failure.
    virtual bool BatchExecuteForCheckpoint(const BatchExecuteForCheckpointReq & /*oReq*/,
            BatchExecuteForCheckpointResp & /*oResp*/)
    {
        return false;
    }

    // Opt-in switch for BatchExecuteForCheckpoint. Defaults to false.
    virtual bool UseBatchExecuteForCheckpoint() const
    {
        return false;
    }


// ...

};

// ...

src/sm-base/sm_base.cpp

bool SMFac :: BatchExecute(const int iGroupIdx, const uint64_t llInstanceID, const std::string & sBodyValue, BatchSMCtx * poBatchSMCtx)
{
    BatchPaxosValues oBatchValues;
    bool bSucc = oBatchValues.ParseFromArray(sBodyValue.data(), sBodyValue.size());
    if (!bSucc)
    {   
        PLG1Err("ParseFromArray fail, valuesize %zu", sBodyValue.size());
        return false;
    }
    
    if (poBatchSMCtx != nullptr)
    {   
        if ((int)poBatchSMCtx->m_vecSMCtxList.size() != oBatchValues.values_size())
        {   
            PLG1Err("values size %d not equal to smctx size %zu",
                    oBatchValues.values_size(), poBatchSMCtx->m_vecSMCtxList.size());
            return false;
        }
    }

    /* 
    for (int i = 0; i < oBatchValues.values_size(); i++)
    {   
        const PaxosValue & oValue = oBatchValues.values(i);
        SMCtx * poSMCtx = poBatchSMCtx != nullptr ? poBatchSMCtx->m_vecSMCtxList[i] : nullptr;
        bool bExecuteSucc = DoExecute(iGroupIdx, llInstanceID, oValue.value(), oValue.smid(), poSMCtx);
        if (!bExecuteSucc)
        {   
            return false;
        }
    }
    */

    // 修改代码
    std::vector<std::pair<int, statemachine*>> begin_index_and_sm;
    int pre_smid = -1;
    for (int i = 0; i < oBatchValues.values_size(); i++)
    {
        const PaxosValue & oValue = oBatchValues.values(i);      
        int iSMID = oValue.smid();
        statemachine* sm = nullptr;
        for (auto & poSM : m_vecSMList)
        {
            if (poSM->SMID() == iSMID)
            {
                sm = poSM;
                break;
            }

            PLG1Err("Unknown smid %d instanceid %lu", iSMID, llInstanceID);
            return false;
        }

        if (iSMID != pre_smid) {
            begin_index_and_sm.emplace_back(i, sm);
        }

        pre_smid = iSMID;
    }

    int j = 0;
    int i = 0;
    while (i < oBatchValues.values_size()) {
        statemachine* sm = begin_index_and_sm[j];
        bool use_batch = sm->UseBatchExecute(); 
 
        int k = -1;
        if (j != begin_index_and_sm.size() -1) {
            k = begin_index_and_sm[j + 1];
        } else {
            k = oBatchValues.values_size();
        }
   
        if (use_batch) {
             BatchExecuteReq req;
             req.iGroupIdx = iGroupIdx;
             req.llInstanceID = llInstanceID;
             req.vecPaxosValue.reserve(k - i);

             BatchExecutetResp resp;
             resp.vecSMCtx.reserve(k - i);

             while (i < k) {
                 req.vecPaxosValue.emplace_back(oBatchValues.values(i).data(), oBatchValues.values(i).size());

                 SMCtx * poSMCtx = poBatchSMCtx != nullptr ? poBatchSMCtx->m_vecSMCtxList[i] : nullptr;
                 resp.vecSMCtx.emplace_back(poSMCtx);

                 ++i;
             }
              bool succ =  sm->BatchExecute(req, resp);
              if (!succ ) {
                  return false;
              }     
 
        } else {
             while (i < k) {
                  SMCtx * poSMCtx = poBatchSMCtx != nullptr ? poBatchSMCtx->m_vecSMCtxList[i] : nullptr;
                  bool succ = DoExecute(iGroupIdx, llInstanceID, oBatchValues.values(i).value(), oBatchValues.values(i).smid(), poSMCtx);
                  if (!succ) {
                      return false;
                  }  
                 ++i;
             }
        }

        ++j;
    }
   
    
    return true;
}

bool SMFac :: DoExecute(const int iGroupIdx, const uint64_t llInstanceID, 
        const std::string & sBodyValue, const int iSMID, SMCtx * poSMCtx)
{
    if (iSMID == 0)
    {
        PLG1Imp("Value no need to do sm, just skip, instanceid %lu", llInstanceID);
        return true;
    }

    if (m_vecSMList.size() == 0)
    {
        PLG1Imp("No any sm, need wait sm, instanceid %lu", llInstanceID);
        return false;
    }

    for (auto & poSM : m_vecSMList)
    {
        if (poSM->SMID() == iSMID)
        {
            // 新增代码
            // 处理单个非batch类型paxoslog可以视为BatchExecute的特例
            if (poSM->UseBatchExecute()) {
                BatchExecuteReq req;
                req.iGroupIdx = iGroupIdx;
                req.llInstanceID = llInstanceID;
                req.vecPaxosValue.emplace_back(sBodyValue.data(), sBodyValue.size());

                BatchExecuteResp resp;
                resp.vecSMCtx.emplace_back(poSMCtx);

                return poSM->BatchExecute(req, resp);
            }            

            return poSM->Execute(iGroupIdx, llInstanceID, sBodyValue, poSMCtx);
        }
    }

    PLG1Err("Unknown smid %d instanceid %lu", iSMID, llInstanceID);
    return false;
}

// ...

bool SMFac :: BatchExecuteForCheckpoint(const int iGroupIdx, const uint64_t llInstanceID, 
        const std::string & sBodyValue)
{
    BatchPaxosValues oBatchValues;
    bool bSucc = oBatchValues.ParseFromArray(sBodyValue.data(), sBodyValue.size());
    if (!bSucc)
    {
        PLG1Err("ParseFromArray fail, valuesize %zu", sBodyValue.size());
        return false;
    }

    /*
    for (int i = 0; i < oBatchValues.values_size(); i++)
    {
        const PaxosValue & oValue = oBatchValues.values(i);
        bool bExecuteSucc = DoExecuteForCheckpoint(iGroupIdx, llInstanceID, oValue.value(), oValue.smid());
        if (!bExecuteSucc)
        {
            return false;
        }
    }
    */

    // 修改代码
    std::vector<std::pair<int, statemachine*>> begin_index_and_sm;
    int pre_smid = -1;
    for (int i = 0; i < oBatchValues.values_size(); i++)
    {
        const PaxosValue & oValue = oBatchValues.values(i);      
        int iSMID = oValue.smid();
        statemachine* sm = nullptr;
        for (auto & poSM : m_vecSMList)
        {
            if (poSM->SMID() == iSMID)
            {
                sm = poSM;
                break;
            }

            PLG1Err("Unknown smid %d instanceid %lu", iSMID, llInstanceID);
            return false;
        }

        if (iSMID != pre_smid) {
            begin_index_and_sm.emplace_back(i, sm);
        }

        pre_smid = iSMID;
    }

    int j = 0;
    int i = 0;
    while (i < oBatchValues.values_size()) {
        statemachine* sm = begin_index_and_sm[j];
        bool use_batch = sm->UseBatchExecuteForCheckpoint(); 
 
        int k = -1;
        if (j != begin_index_and_sm.size() -1) {
            k = begin_index_and_sm[j + 1];
        } else {
            k = oBatchValues.values_size();
        }
   
        if (use_batch) {
             BatchExecuteForCheckpointReq req;
             req.iGroupIdx = iGroupIdx;
             req.llInstanceID = llInstanceID;
             req.vecPaxosValue.reserve(k - i);
             while (i < k) {
                 req.vecPaxosValue.emplace_back(oBatchValues.values(i).data(), oBatchValues.values(i).size());
                 ++i;
             }

             BatchExecuteForCheckpointResp resp;

              bool succ =  sm->BatchExecuteForCheckpoint(req, resp);
              if (!succ ) {
                  return false;
              }     
 
        } else {
             while (i < k) {
                  bool succ = DoExecuteForCheckpoint(iGroupIdx, llInstanceID, oBatchValues.values(i), oBatchValues.values(i).smid());
                  if (!succ) {
                      return false;
                  }  
                 ++i;
             }
        }

        ++j;
    }

    return true;
}

bool SMFac :: DoExecuteForCheckpoint(const int iGroupIdx, const uint64_t llInstanceID, 
        const std::string & sBodyValue, const int iSMID)
{
    if (iSMID == 0)
    {
        PLG1Imp("Value no need to do sm, just skip, instanceid %lu", llInstanceID);
        return true;
    }

    if (m_vecSMList.size() == 0)
    {
        PLG1Imp("No any sm, need wait sm, instanceid %lu", llInstanceID);
        return false;
    }

    for (auto & poSM : m_vecSMList)
    {
        if (poSM->SMID() == iSMID)
        {
           // 新增代码
           // 处理单个非batch类型paxoslog可以视为BatchExecuteForCheckpoint的特例
           if (poSM->UseBatchExecuteForCheckpoint()) {
                BatchExecuteForCheckpointReq req;
                req.iGroupIdx = iGroupIdx;
                req.llInstanceID = llInstanceID;
                req.vecPaxosValue.emplace_back(sBodyValue.data(), sBodyValue.size());

                BatchExecuteForCheckpointResp resp;

                return poSM->BatchExecuteForCheckpoint(req, resp);               
           }

            return poSM->ExecuteForCheckpoint(iGroupIdx, llInstanceID, sBodyValue);
        }
    }

    PLG1Err("Unknown smid %d instanceid %lu", iSMID, llInstanceID);

    return false;
}

// ...

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions