Skip to content

Commit

Permalink
fix connection leak
Browse files Browse the repository at this point in the history
  • Loading branch information
ebirukov committed Jan 27, 2024
1 parent f621efb commit be7bbc6
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 14 deletions.
3 changes: 2 additions & 1 deletion pkg/activerecord/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,8 +421,9 @@ func NewConfigCacher() *DefaultConfigCacher {
// Получение конфигурации. Если есть в кеше и он еще валидный, то конфигурация берётся из кешаб
// если в кеше нет, то достаём из конфига и кешируем.
func (cc *DefaultConfigCacher) Get(ctx context.Context, path string, globs MapGlobParam, optionCreator func(ShardInstanceConfig) (OptionInterface, error)) (*Cluster, error) {
curConf := cc.container[path]
var curConf *Cluster
if cc.lock.TryLock() {
curConf = cc.container[path]
if cc.updateTime.Sub(Config().GetLastUpdateTime()) < 0 {
// Очищаем кеш если поменялся конфиг
cc.container = make(map[string]*Cluster)
Expand Down
26 changes: 13 additions & 13 deletions pkg/octopus/box.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,18 +91,16 @@ func Box(ctx context.Context, shard int, instType activerecord.ShardInstanceType
}

func CheckShardInstance(ctx context.Context, instance activerecord.ShardInstance) (activerecord.OptionInterface, error) {
octopusOpt, ok := instance.Options.(*ConnectionOptions)
if !ok {
return nil, fmt.Errorf("invalit type of options %T, want Options", instance.Options)
}

var err error
c := activerecord.ConnectionCacher().Get(instance)
if c == nil {
c, err = GetConnection(ctx, octopusOpt)
if err != nil {
return nil, fmt.Errorf("error from connectionCacher: %w", err)
c, err := activerecord.ConnectionCacher().GetOrAdd(instance, func(options interface{}) (activerecord.ConnectionInterface, error) {
octopusOpt, ok := options.(*ConnectionOptions)
if !ok {
return nil, fmt.Errorf("invalit type of options %T, want Options", options)
}

return GetConnection(ctx, octopusOpt)
})
if err != nil {
return nil, fmt.Errorf("error from connectionCacher: %w", err)
}

conn, ok := c.(*Connection)
Expand All @@ -123,10 +121,12 @@ func CheckShardInstance(ctx context.Context, instance activerecord.ShardInstance
ret := td[0]
switch string(ret.Data[0]) {
case "primary":
return NewOptions(octopusOpt.server, ModeMaster)
instance.Config.Mode = activerecord.ServerModeType(ModeMaster)
default:
return NewOptions(octopusOpt.server, ModeReplica)
instance.Config.Mode = activerecord.ServerModeType(ModeReplica)
}

return DefaultOptionCreator(instance.Config)
}

return nil, fmt.Errorf("can't parse status: %w", err)
Expand Down

0 comments on commit be7bbc6

Please sign in to comment.