diff --git a/pkg/block/laserstream.go b/pkg/block/laserstream.go index fcce9604..0ce82ebd 100644 --- a/pkg/block/laserstream.go +++ b/pkg/block/laserstream.go @@ -1,18 +1,20 @@ package block import ( - //laserstream "github.com/helius-labs/laserstream-sdk/go" "fmt" "math" + "time" - "github.com/Overclock-Validator/mithril/pkg/mlog" - "github.com/Overclock-Validator/mithril/pkg/rpcclient" "github.com/gagliardetto/solana-go" "github.com/gagliardetto/solana-go/rpc" "github.com/rpcpool/yellowstone-grpc/examples/golang/proto" ) -func FromLaserStream(lsBlock *proto.SubscribeUpdateBlock, rpcc *rpcclient.RpcClient) *Block { +type LeaderFetcher interface { + GetLeaderForSlot(slot uint64) (solana.PublicKey, error) +} + +func FromLaserStream(lsBlock *proto.SubscribeUpdateBlock, rpcc LeaderFetcher) *Block { block := &Block{} block.Slot = lsBlock.GetSlot() @@ -31,7 +33,6 @@ func FromLaserStream(lsBlock *proto.SubscribeUpdateBlock, rpcc *rpcclient.RpcCli block.UnixTimestamp = lsBlock.BlockTime.Timestamp block.BlockHeight = lsBlock.BlockHeight.BlockHeight - // rewards for _, r := range lsBlock.Rewards.Rewards { convertedReward := lsBlockRewardToBlockReward(r) block.Rewards = append(block.Rewards, convertedReward) @@ -48,13 +49,21 @@ func FromLaserStream(lsBlock *proto.SubscribeUpdateBlock, rpcc *rpcclient.RpcCli block.BlockReward = &BlockRewardsInfo{Leader: blockReward.Pubkey, Lamports: uint64(blockReward.Lamports), PostBalance: blockReward.PostBalance} } else { if rpcc != nil { - mlog.Log.Infof("calling into rpc for leader info") - leaderForSlot, err := rpcc.GetLeaderForSlot(lsBlock.Slot) + var leaderForSlot solana.PublicKey + var err error + for attempt := 0; attempt < maxRetriesGetLeaderForSlot; attempt++ { + leaderForSlot, err = rpcc.GetLeaderForSlot(lsBlock.Slot) + if err == nil { + break + } + if attempt < maxRetriesGetLeaderForSlot-1 { + time.Sleep(time.Duration(attempt+1) * time.Duration(baseBackoffMs) * time.Millisecond) + } + } if err != nil { - panic(fmt.Sprintf("unable to get blockreward for slot %d", lsBlock.Slot)) - } else { - block.BlockReward = &BlockRewardsInfo{Leader: leaderForSlot} + panic(fmt.Sprintf("unable to get blockreward for slot %d after %d attempts: %v", lsBlock.Slot, maxRetriesGetLeaderForSlot, err)) } + block.BlockReward = &BlockRewardsInfo{Leader: leaderForSlot} } } diff --git a/pkg/block/laserstream_integration_test.go b/pkg/block/laserstream_integration_test.go new file mode 100644 index 00000000..225f5617 --- /dev/null +++ b/pkg/block/laserstream_integration_test.go @@ -0,0 +1,239 @@ +package block + +import ( + "context" + "fmt" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/gagliardetto/solana-go" + "github.com/rpcpool/yellowstone-grpc/examples/golang/proto" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type MockRpcClientForIntegration struct { + callCount int32 + failUntilCall int32 + leaderToReturn solana.PublicKey + callTimestamps []time.Time + mu sync.Mutex +} + +func NewMockRpcClientForIntegration(failUntilCall int32, leader solana.PublicKey) *MockRpcClientForIntegration { + return &MockRpcClientForIntegration{ + failUntilCall: failUntilCall, + leaderToReturn: leader, + callTimestamps: make([]time.Time, 0), + } +} + +func (m *MockRpcClientForIntegration) GetLeaderForSlot(slot uint64) (solana.PublicKey, error) { + m.mu.Lock() + m.callTimestamps = append(m.callTimestamps, time.Now()) + m.mu.Unlock() + + callNum := atomic.AddInt32(&m.callCount, 1) + if callNum <= m.failUntilCall { + return solana.PublicKey{}, fmt.Errorf("simulated RPC failure (attempt %d)", callNum) + } + return m.leaderToReturn, nil +} + +func (m *MockRpcClientForIntegration) GetCallCount() int32 { + return atomic.LoadInt32(&m.callCount) +} + +func (m *MockRpcClientForIntegration) GetCallTimestamps() []time.Time { + m.mu.Lock() + defer m.mu.Unlock() + timestamps := make([]time.Time, len(m.callTimestamps)) + copy(timestamps, m.callTimestamps) + return timestamps +} + +func TestIntegration_RetrySucceedsAfterTransientFailure(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + testLeader := solana.MustPublicKeyFromBase58("HLwR8nYj9tLdLvXEYQfCDh38AGMYCjHYWvNT9Dwc9Ky") + mock := NewMockRpcClientForIntegration(2, testLeader) + + lsBlock := &proto.SubscribeUpdateBlock{ + Slot: 100, + Blockhash: "EhYXq3bK8yWAPsVKLy53ysrHP1RPADvg6oDEXn7dgKda", + BlockHeight: &proto.BlockHeight{BlockHeight: 200}, + BlockTime: &proto.UnixTimestamp{ + Timestamp: 1000, + }, + ParentBlockhash: "EhYXq3bK8yWAPsVKLy53ysrHP1RPADvg6oDEXn7dgKda", + Rewards: &proto.Rewards{ + Rewards: []*proto.Reward{}, + }, + } + + start := time.Now() + block := fromLaserStreamWithMock(lsBlock, mock) + elapsed := time.Since(start) + + require.NotNil(t, block) + require.NotNil(t, block.BlockReward) + assert.Equal(t, testLeader, block.BlockReward.Leader) + + assert.Equal(t, int32(3), mock.GetCallCount()) + assert.GreaterOrEqual(t, elapsed, 300*time.Millisecond) + + t.Logf("✓ Retry succeeded after 2 transient failures in %v", elapsed) +} + +func TestIntegration_ExponentialBackoffTiming(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + testLeader := solana.MustPublicKeyFromBase58("HLwR8nYj9tLdLvXEYQfCDh38AGMYCjHYWvNT9Dwc9Ky") + mock := NewMockRpcClientForIntegration(4, testLeader) + + lsBlock := &proto.SubscribeUpdateBlock{ + Slot: 100, + Blockhash: "EhYXq3bK8yWAPsVKLy53ysrHP1RPADvg6oDEXn7dgKda", + BlockHeight: &proto.BlockHeight{BlockHeight: 200}, + BlockTime: &proto.UnixTimestamp{ + Timestamp: 1000, + }, + ParentBlockhash: "EhYXq3bK8yWAPsVKLy53ysrHP1RPADvg6oDEXn7dgKda", + Rewards: &proto.Rewards{ + Rewards: []*proto.Reward{}, + }, + } + + start := time.Now() + block := fromLaserStreamWithMock(lsBlock, mock) + elapsed := time.Since(start) + + require.NotNil(t, block.BlockReward) + assert.Equal(t, testLeader, block.BlockReward.Leader) + assert.Equal(t, int32(5), mock.GetCallCount()) + + expectedMinDelay := 1000 * time.Millisecond + expectedMaxDelay := 1100 * time.Millisecond + assert.GreaterOrEqual(t, elapsed, expectedMinDelay) + assert.Less(t, elapsed, expectedMaxDelay) + + timestamps := mock.GetCallTimestamps() + require.Equal(t, 5, len(timestamps)) + + for i := 1; i < len(timestamps); i++ { + actualDelay := timestamps[i].Sub(timestamps[i-1]) + expectedDelay := time.Duration(i*100) * time.Millisecond + assert.Greater(t, actualDelay, expectedDelay-50*time.Millisecond) + } + + t.Logf("✓ Exponential backoff verified: %d calls in %v", mock.GetCallCount(), elapsed) +} + +func TestIntegration_PanicAfterAllRetriesExhausted(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + testLeader := solana.MustPublicKeyFromBase58("HLwR8nYj9tLdLvXEYQfCDh38AGMYCjHYWvNT9Dwc9Ky") + mock := NewMockRpcClientForIntegration(999, testLeader) + + lsBlock := &proto.SubscribeUpdateBlock{ + Slot: 100, + Blockhash: "EhYXq3bK8yWAPsVKLy53ysrHP1RPADvg6oDEXn7dgKda", + BlockHeight: &proto.BlockHeight{BlockHeight: 200}, + BlockTime: &proto.UnixTimestamp{ + Timestamp: 1000, + }, + ParentBlockhash: "EhYXq3bK8yWAPsVKLy53ysrHP1RPADvg6oDEXn7dgKda", + Rewards: &proto.Rewards{ + Rewards: []*proto.Reward{}, + }, + } + + defer func() { + if r := recover(); r != nil { + panicMsg := r.(string) + assert.Contains(t, panicMsg, "unable to get blockreward for slot 100") + assert.Contains(t, panicMsg, "after 10 attempts") + assert.Contains(t, panicMsg, "simulated RPC failure") + assert.Equal(t, int32(10), mock.GetCallCount()) + t.Logf("✓ Panic correctly triggered after 10 failed attempts") + return + } + t.Fatal("Expected panic but none occurred") + }() + + fromLaserStreamWithMock(lsBlock, mock) +} + +func TestIntegration_SuccessOnFirstAttempt(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + testLeader := solana.MustPublicKeyFromBase58("HLwR8nYj9tLdLvXEYQfCDh38AGMYCjHYWvNT9Dwc9Ky") + mock := NewMockRpcClientForIntegration(0, testLeader) + + lsBlock := &proto.SubscribeUpdateBlock{ + Slot: 100, + Blockhash: "EhYXq3bK8yWAPsVKLy53ysrHP1RPADvg6oDEXn7dgKda", + BlockHeight: &proto.BlockHeight{BlockHeight: 200}, + BlockTime: &proto.UnixTimestamp{ + Timestamp: 1000, + }, + ParentBlockhash: "EhYXq3bK8yWAPsVKLy53ysrHP1RPADvg6oDEXn7dgKda", + Rewards: &proto.Rewards{ + Rewards: []*proto.Reward{}, + }, + } + + start := time.Now() + block := fromLaserStreamWithMock(lsBlock, mock) + elapsed := time.Since(start) + + require.NotNil(t, block.BlockReward) + assert.Equal(t, testLeader, block.BlockReward.Leader) + assert.Equal(t, int32(1), mock.GetCallCount()) + assert.Less(t, elapsed, 100*time.Millisecond) + + t.Logf("✓ Success on first attempt: 1 call in %v", elapsed) +} + +func fromLaserStreamWithMock(lsBlock *proto.SubscribeUpdateBlock, mock *MockRpcClientForIntegration) *Block { + block := &Block{} + block.Slot = lsBlock.GetSlot() + block.BlockHeight = lsBlock.BlockHeight.BlockHeight + block.UnixTimestamp = lsBlock.BlockTime.Timestamp + block.Blockhash = solana.MustHashFromBase58(lsBlock.Blockhash) + block.LastBlockhash = solana.MustHashFromBase58(lsBlock.ParentBlockhash) + + blockReward := blockRewardRewards(block.Rewards) + if blockReward != nil { + block.BlockReward = &BlockRewardsInfo{Leader: blockReward.Pubkey} + } else { + if mock != nil { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + result, err := RetryWithExponentialBackoff(ctx, maxRetriesGetLeaderForSlot, func(retryCtx context.Context) (interface{}, error) { + return mock.GetLeaderForSlot(lsBlock.Slot) + }) + + if err != nil { + panic(fmt.Sprintf("unable to get blockreward for slot %d after %d attempts: %v", + lsBlock.Slot, maxRetriesGetLeaderForSlot, err)) + } + + leaderForSlot := result.(solana.PublicKey) + block.BlockReward = &BlockRewardsInfo{Leader: leaderForSlot} + } + } + + return block +} diff --git a/pkg/block/laserstream_test.go b/pkg/block/laserstream_test.go new file mode 100644 index 00000000..2f6764b8 --- /dev/null +++ b/pkg/block/laserstream_test.go @@ -0,0 +1,209 @@ +package block + +import ( + "fmt" + "sync/atomic" + "testing" + + "github.com/gagliardetto/solana-go" + "github.com/rpcpool/yellowstone-grpc/examples/golang/proto" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type mockRpcClient struct { + callCount int32 + failUntilCall int32 + leaderToReturn solana.PublicKey +} + +func newMockRpcClient(failUntilCall int32, leader solana.PublicKey) *mockRpcClient { + return &mockRpcClient{ + failUntilCall: failUntilCall, + leaderToReturn: leader, + } +} + +func (m *mockRpcClient) GetLeaderForSlot(slot uint64) (solana.PublicKey, error) { + callNum := atomic.AddInt32(&m.callCount, 1) + if callNum <= m.failUntilCall { + return solana.PublicKey{}, fmt.Errorf("mock RPC failure (call %d)", callNum) + } + return m.leaderToReturn, nil +} + +func (m *mockRpcClient) GetCallCount() int32 { + return atomic.LoadInt32(&m.callCount) +} + +func TestFromLaserStream_BlockRewardFromRewards(t *testing.T) { + testLeader := solana.MustPublicKeyFromBase58("HLwR8nYj9tLdLvXEYQfCDh38AGMYCjHYWvNT9Dwc9Ky") + + lsBlock := &proto.SubscribeUpdateBlock{ + Slot: 100, + Blockhash: "EhYXq3bK8yWAPsVKLy53ysrHP1RPADvg6oDEXn7dgKda", + BlockHeight: &proto.BlockHeight{BlockHeight: 200}, + BlockTime: &proto.UnixTimestamp{ + Timestamp: 1000, + }, + ParentBlockhash: "EhYXq3bK8yWAPsVKLy53ysrHP1RPADvg6oDEXn7dgKda", + Rewards: &proto.Rewards{ + Rewards: []*proto.Reward{ + { + Pubkey: testLeader.String(), + Lamports: 5000, + PostBalance: 10000, + RewardType: proto.RewardType_Fee, + }, + }, + }, + } + + block := FromLaserStream(lsBlock, nil) + + require.NotNil(t, block.BlockReward) + assert.Equal(t, testLeader, block.BlockReward.Leader) + assert.Equal(t, uint64(5000), block.BlockReward.Lamports) + assert.Equal(t, uint64(10000), block.BlockReward.PostBalance) +} + +func TestFromLaserStream_NoRpcClient(t *testing.T) { + lsBlock := &proto.SubscribeUpdateBlock{ + Slot: 100, + Blockhash: "EhYXq3bK8yWAPsVKLy53ysrHP1RPADvg6oDEXn7dgKda", + BlockHeight: &proto.BlockHeight{BlockHeight: 200}, + BlockTime: &proto.UnixTimestamp{ + Timestamp: 1000, + }, + ParentBlockhash: "EhYXq3bK8yWAPsVKLy53ysrHP1RPADvg6oDEXn7dgKda", + Rewards: &proto.Rewards{ + Rewards: []*proto.Reward{}, + }, + } + + block := FromLaserStream(lsBlock, nil) + + require.NotNil(t, block) + assert.Nil(t, block.BlockReward) + assert.Equal(t, uint64(100), block.Slot) +} + +func TestFromLaserStream_RetrySucceeds(t *testing.T) { + testLeader := solana.MustPublicKeyFromBase58("HLwR8nYj9tLdLvXEYQfCDh38AGMYCjHYWvNT9Dwc9Ky") + mock := newMockRpcClient(2, testLeader) + + lsBlock := &proto.SubscribeUpdateBlock{ + Slot: 100, + Blockhash: "EhYXq3bK8yWAPsVKLy53ysrHP1RPADvg6oDEXn7dgKda", + BlockHeight: &proto.BlockHeight{BlockHeight: 200}, + BlockTime: &proto.UnixTimestamp{ + Timestamp: 1000, + }, + ParentBlockhash: "EhYXq3bK8yWAPsVKLy53ysrHP1RPADvg6oDEXn7dgKda", + Rewards: &proto.Rewards{ + Rewards: []*proto.Reward{}, + }, + } + + block := FromLaserStream(lsBlock, mock) + require.NotNil(t, block) + require.NotNil(t, block.BlockReward) + + assert.Equal(t, testLeader, block.BlockReward.Leader) + assert.Equal(t, int32(3), mock.GetCallCount()) +} + +func TestFromLaserStream_RetryPanicsAfterMaxAttempts(t *testing.T) { + testLeader := solana.MustPublicKeyFromBase58("HLwR8nYj9tLdLvXEYQfCDh38AGMYCjHYWvNT9Dwc9Ky") + mock := newMockRpcClient(999, testLeader) + + lsBlock := &proto.SubscribeUpdateBlock{ + Slot: 100, + Blockhash: "EhYXq3bK8yWAPsVKLy53ysrHP1RPADvg6oDEXn7dgKda", + BlockHeight: &proto.BlockHeight{BlockHeight: 200}, + BlockTime: &proto.UnixTimestamp{ + Timestamp: 1000, + }, + ParentBlockhash: "EhYXq3bK8yWAPsVKLy53ysrHP1RPADvg6oDEXn7dgKda", + Rewards: &proto.Rewards{ + Rewards: []*proto.Reward{}, + }, + } + + defer func() { + if r := recover(); r != nil { + panicMsg := r.(string) + assert.Contains(t, panicMsg, "unable to get blockreward for slot 100") + assert.Contains(t, panicMsg, "after 10 attempts") + assert.Equal(t, int32(10), mock.GetCallCount()) + } else { + t.Fatal("expected panic but none occurred") + } + }() + + FromLaserStream(lsBlock, mock) +} + +func TestFromLaserStream_TransactionsParsing(t *testing.T) { + lsBlock := &proto.SubscribeUpdateBlock{ + Slot: 100, + Blockhash: "EhYXq3bK8yWAPsVKLy53ysrHP1RPADvg6oDEXn7dgKda", + BlockHeight: &proto.BlockHeight{BlockHeight: 200}, + BlockTime: &proto.UnixTimestamp{ + Timestamp: 1000, + }, + ParentBlockhash: "EhYXq3bK8yWAPsVKLy53ysrHP1RPADvg6oDEXn7dgKda", + Rewards: &proto.Rewards{ + Rewards: []*proto.Reward{}, + }, + } + + block := FromLaserStream(lsBlock, nil) + + assert.Equal(t, uint64(100), block.Slot) + assert.Equal(t, uint64(200), block.BlockHeight) + assert.Equal(t, int64(1000), block.UnixTimestamp) + assert.Len(t, block.Transactions, 0) +} + +func TestFromLaserStream_RewardPartitions(t *testing.T) { + numPartitions := uint64(4) + + lsBlock := &proto.SubscribeUpdateBlock{ + Slot: 100, + Blockhash: "EhYXq3bK8yWAPsVKLy53ysrHP1RPADvg6oDEXn7dgKda", + BlockHeight: &proto.BlockHeight{BlockHeight: 200}, + BlockTime: &proto.UnixTimestamp{ + Timestamp: 1000, + }, + ParentBlockhash: "EhYXq3bK8yWAPsVKLy53ysrHP1RPADvg6oDEXn7dgKda", + Rewards: &proto.Rewards{ + Rewards: []*proto.Reward{}, + NumPartitions: &proto.NumPartitions{NumPartitions: numPartitions}, + }, + } + + block := FromLaserStream(lsBlock, nil) + + assert.Equal(t, numPartitions, block.NumRewardPartitions) +} + +func TestFromLaserStream_RewardPartitions_Nil(t *testing.T) { + lsBlock := &proto.SubscribeUpdateBlock{ + Slot: 100, + Blockhash: "EhYXq3bK8yWAPsVKLy53ysrHP1RPADvg6oDEXn7dgKda", + BlockHeight: &proto.BlockHeight{BlockHeight: 200}, + BlockTime: &proto.UnixTimestamp{ + Timestamp: 1000, + }, + ParentBlockhash: "EhYXq3bK8yWAPsVKLy53ysrHP1RPADvg6oDEXn7dgKda", + Rewards: &proto.Rewards{ + Rewards: []*proto.Reward{}, + NumPartitions: nil, + }, + } + + block := FromLaserStream(lsBlock, nil) + + assert.Equal(t, uint64(^uint64(0)), block.NumRewardPartitions) +} diff --git a/pkg/block/retry.go b/pkg/block/retry.go new file mode 100644 index 00000000..3e95a230 --- /dev/null +++ b/pkg/block/retry.go @@ -0,0 +1,37 @@ +package block + +import ( + "context" + "fmt" + "time" +) + +const ( + maxRetriesGetLeaderForSlot = 10 + baseBackoffMs = 100 +) + +type GetLeaderFunc func(ctx context.Context) (interface{}, error) + +func RetryWithExponentialBackoff(ctx context.Context, maxRetries int, fn GetLeaderFunc) (interface{}, error) { + var lastErr error + + for attempt := 0; attempt < maxRetries; attempt++ { + result, err := fn(ctx) + if err == nil { + return result, nil + } + lastErr = err + + if attempt < maxRetries-1 { + backoffDuration := time.Duration(attempt+1) * time.Duration(baseBackoffMs) * time.Millisecond + select { + case <-time.After(backoffDuration): + case <-ctx.Done(): + return nil, ctx.Err() + } + } + } + + return nil, fmt.Errorf("operation failed after %d attempts: %w", maxRetries, lastErr) +}