feat: implemented a circuit breaker wrapper around hystrix to allow

multiple fallbacks
This commit is contained in:
Ivan Belyakov 2024-03-11 12:09:50 +01:00 committed by IvanBelyakoff
parent 224103d036
commit 7d7fa07754
2 changed files with 234 additions and 0 deletions

View File

@ -0,0 +1,135 @@
package circuitbreaker
import (
"fmt"
"strings"
"github.com/afex/hystrix-go/hystrix"
)
type FallbackFunc func() ([]any, error)
type CommandResult struct {
res []any
err error
}
func (cr CommandResult) Result() []any {
return cr.res
}
func (cr CommandResult) Error() error {
return cr.err
}
type Command struct {
functors []*Functor
}
func NewCommand(functors []*Functor) *Command {
return &Command{
functors: functors,
}
}
func (cmd *Command) Add(ftor *Functor) {
cmd.functors = append(cmd.functors, ftor)
}
func (cmd *Command) IsEmpty() bool {
return len(cmd.functors) == 0
}
type Config struct {
CommandName string
Timeout int
MaxConcurrentRequests int
RequestVolumeThreshold int
SleepWindow int
ErrorPercentThreshold int
}
type CircuitBreaker struct {
config Config
}
func NewCircuitBreaker(config Config) *CircuitBreaker {
return &CircuitBreaker{
config: config,
}
}
type Functor struct {
Exec FallbackFunc
}
func NewFunctor(exec FallbackFunc) *Functor {
return &Functor{
Exec: exec,
}
}
// This a blocking function
func (eh *CircuitBreaker) Execute(cmd Command) CommandResult {
resultChan := make(chan CommandResult, 1)
var result CommandResult
for i := 0; i < len(cmd.functors); i += 2 {
f1 := cmd.functors[i]
var f2 *Functor
if i+1 < len(cmd.functors) {
f2 = cmd.functors[i+1]
}
circuitName := fmt.Sprintf("%s_%d", eh.config.CommandName, i)
if hystrix.GetCircuitSettings()[circuitName] == nil {
hystrix.ConfigureCommand(circuitName, hystrix.CommandConfig{
Timeout: eh.config.Timeout,
MaxConcurrentRequests: eh.config.MaxConcurrentRequests,
RequestVolumeThreshold: eh.config.RequestVolumeThreshold,
SleepWindow: eh.config.SleepWindow,
ErrorPercentThreshold: eh.config.ErrorPercentThreshold,
})
}
// If circuit is the same for all functions, in case of len(cmd.functors) > 2,
// main and fallback providers are different next run if first two fail,
// which causes health issues for both main and fallback and ErrorPercentThreshold
// is reached faster than it should be.
errChan := hystrix.Go(circuitName, func() error {
res, err := f1.Exec()
// Write to resultChan only if success
if err == nil {
resultChan <- CommandResult{res: res, err: err}
}
return err
}, func(err error) error {
// In case of concurrency, we should not execute the fallback
if f2 == nil || err == hystrix.ErrMaxConcurrency {
return err
}
res, err := f2.Exec()
if err == nil {
resultChan <- CommandResult{res: res, err: err}
}
return err
})
select {
case result = <-resultChan:
if result.err == nil {
return result
}
case err := <-errChan:
result = CommandResult{err: err}
// In case of max concurrency, we should delay the execution and stop iterating over fallbacks
// No error unwrapping here, so use strings.Contains
if strings.Contains(err.Error(), hystrix.ErrMaxConcurrency.Error()) {
return result
}
}
}
return result
}

View File

@ -0,0 +1,99 @@
package circuitbreaker
import (
"errors"
"testing"
"time"
"github.com/stretchr/testify/require"
)
const success = "Success"
func TestCircuitBreaker_ExecuteSuccessSingle(t *testing.T) {
cb := NewCircuitBreaker(Config{
CommandName: "SuccessSingle", // unique name to avoid conflicts with go tests `-count` option
Timeout: 1000,
MaxConcurrentRequests: 100,
RequestVolumeThreshold: 10,
SleepWindow: 10,
ErrorPercentThreshold: 10,
})
expectedResult := success
cmd := Command{
functors: []*Functor{
NewFunctor(func() ([]interface{}, error) {
return []any{expectedResult}, nil
}),
},
}
result := cb.Execute(cmd)
require.NoError(t, result.Error())
require.Equal(t, expectedResult, result.Result()[0].(string))
}
func TestCircuitBreaker_ExecuteMultipleFallbacksFail(t *testing.T) {
cb := NewCircuitBreaker(Config{
CommandName: "MultipleFail", // unique name to avoid conflicts with go tests `-count` option
Timeout: 10,
MaxConcurrentRequests: 100,
RequestVolumeThreshold: 10,
SleepWindow: 10,
ErrorPercentThreshold: 10,
})
cmd := Command{
functors: []*Functor{
NewFunctor(func() ([]interface{}, error) {
time.Sleep(100 * time.Millisecond) // will cause hystrix: timeout
return []any{success}, nil
}),
NewFunctor(func() ([]interface{}, error) {
return nil, errors.New("provider 2 failed")
}),
NewFunctor(func() ([]interface{}, error) {
return nil, errors.New("provider 3 failed")
}),
},
}
result := cb.Execute(cmd)
require.Error(t, result.Error())
}
func TestCircuitBreaker_ExecuteMultipleFallbacksFailButLastSuccessStress(t *testing.T) {
cb := NewCircuitBreaker(Config{
CommandName: "LastSuccessStress", // unique name to avoid conflicts with go tests `-count` option
Timeout: 10,
MaxConcurrentRequests: 100,
RequestVolumeThreshold: 10,
SleepWindow: 10,
ErrorPercentThreshold: 10,
})
expectedResult := success
// These are executed sequentially, but I had an issue with the test failing
// because of the open circuit
for i := 0; i < 1000; i++ {
cmd := Command{
functors: []*Functor{
NewFunctor(func() ([]interface{}, error) {
return nil, errors.New("provider 1 failed")
}),
NewFunctor(func() ([]interface{}, error) {
return nil, errors.New("provider 2 failed")
}),
NewFunctor(func() ([]interface{}, error) {
return []any{expectedResult}, nil
}),
},
}
result := cb.Execute(cmd)
require.NoError(t, result.Error())
require.Equal(t, expectedResult, result.Result()[0].(string))
}
}