1
0

Fix dependencies

This commit is contained in:
kolaente
2020-04-09 23:06:57 +02:00
parent 8d1a3f4fd7
commit 0e2449482f
27 changed files with 1117 additions and 548 deletions

View File

@ -9,6 +9,9 @@ import (
)
func IsRetryableError(err error, retryTimeout bool) bool {
if err == nil {
return false
}
if err == io.EOF {
return true
}
@ -44,7 +47,8 @@ func IsBadConn(err error, allowTimeout bool) bool {
return false
}
if IsRedisError(err) {
return strings.HasPrefix(err.Error(), "READONLY ")
// #790
return IsReadOnlyError(err)
}
if allowTimeout {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
@ -79,3 +83,7 @@ func IsMovedError(err error) (moved bool, ask bool, addr string) {
func IsLoadingError(err error) bool {
return strings.HasPrefix(err.Error(), "LOADING ")
}
func IsReadOnlyError(err error) bool {
return strings.HasPrefix(err.Error(), "READONLY ")
}

View File

@ -17,14 +17,16 @@ type Conn struct {
rdLocked bool
wr *proto.Writer
InitedAt time.Time
pooled bool
usedAt atomic.Value
Inited bool
pooled bool
createdAt time.Time
usedAt atomic.Value
}
func NewConn(netConn net.Conn) *Conn {
cn := &Conn{
netConn: netConn,
netConn: netConn,
createdAt: time.Now(),
}
cn.rd = proto.NewReader(netConn)
cn.wr = proto.NewWriter(netConn)

View File

@ -38,7 +38,7 @@ type Pooler interface {
Get() (*Conn, error)
Put(*Conn)
Remove(*Conn)
Remove(*Conn, error)
Len() int
IdleLen() int
@ -289,7 +289,7 @@ func (p *ConnPool) popIdle() *Conn {
func (p *ConnPool) Put(cn *Conn) {
if !cn.pooled {
p.Remove(cn)
p.Remove(cn, nil)
return
}
@ -300,7 +300,7 @@ func (p *ConnPool) Put(cn *Conn) {
p.freeTurn()
}
func (p *ConnPool) Remove(cn *Conn) {
func (p *ConnPool) Remove(cn *Conn, reason error) {
p.removeConn(cn)
p.freeTurn()
_ = p.closeConn(cn)
@ -468,7 +468,7 @@ func (p *ConnPool) isStaleConn(cn *Conn) bool {
if p.opt.IdleTimeout > 0 && now.Sub(cn.UsedAt()) >= p.opt.IdleTimeout {
return true
}
if p.opt.MaxConnAge > 0 && now.Sub(cn.InitedAt) >= p.opt.MaxConnAge {
if p.opt.MaxConnAge > 0 && now.Sub(cn.createdAt) >= p.opt.MaxConnAge {
return true
}

View File

@ -1,53 +1,203 @@
package pool
import (
"fmt"
"sync/atomic"
)
const (
stateDefault = 0
stateInited = 1
stateClosed = 2
)
type BadConnError struct {
wrapped error
}
var _ error = (*BadConnError)(nil)
func (e BadConnError) Error() string {
return "pg: Conn is in a bad state"
}
func (e BadConnError) Unwrap() error {
return e.wrapped
}
type SingleConnPool struct {
cn *Conn
pool Pooler
level int32 // atomic
state uint32 // atomic
ch chan *Conn
_badConnError atomic.Value
}
var _ Pooler = (*SingleConnPool)(nil)
func NewSingleConnPool(cn *Conn) *SingleConnPool {
return &SingleConnPool{
cn: cn,
func NewSingleConnPool(pool Pooler) *SingleConnPool {
p, ok := pool.(*SingleConnPool)
if !ok {
p = &SingleConnPool{
pool: pool,
ch: make(chan *Conn, 1),
}
}
atomic.AddInt32(&p.level, 1)
return p
}
func (p *SingleConnPool) SetConn(cn *Conn) {
if atomic.CompareAndSwapUint32(&p.state, stateDefault, stateInited) {
p.ch <- cn
} else {
panic("not reached")
}
}
func (p *SingleConnPool) NewConn() (*Conn, error) {
panic("not implemented")
return p.pool.NewConn()
}
func (p *SingleConnPool) CloseConn(*Conn) error {
panic("not implemented")
func (p *SingleConnPool) CloseConn(cn *Conn) error {
return p.pool.CloseConn(cn)
}
func (p *SingleConnPool) Get() (*Conn, error) {
return p.cn, nil
// In worst case this races with Close which is not a very common operation.
for i := 0; i < 1000; i++ {
switch atomic.LoadUint32(&p.state) {
case stateDefault:
cn, err := p.pool.Get()
if err != nil {
return nil, err
}
if atomic.CompareAndSwapUint32(&p.state, stateDefault, stateInited) {
return cn, nil
}
p.pool.Remove(cn, ErrClosed)
case stateInited:
if err := p.badConnError(); err != nil {
return nil, err
}
cn, ok := <-p.ch
if !ok {
return nil, ErrClosed
}
return cn, nil
case stateClosed:
return nil, ErrClosed
default:
panic("not reached")
}
}
return nil, fmt.Errorf("pg: SingleConnPool.Get: infinite loop")
}
func (p *SingleConnPool) Put(cn *Conn) {
if p.cn != cn {
panic("p.cn != cn")
defer func() {
if recover() != nil {
p.freeConn(cn)
}
}()
p.ch <- cn
}
func (p *SingleConnPool) freeConn(cn *Conn) {
if err := p.badConnError(); err != nil {
p.pool.Remove(cn, err)
} else {
p.pool.Put(cn)
}
}
func (p *SingleConnPool) Remove(cn *Conn) {
if p.cn != cn {
panic("p.cn != cn")
}
func (p *SingleConnPool) Remove(cn *Conn, reason error) {
defer func() {
if recover() != nil {
p.pool.Remove(cn, ErrClosed)
}
}()
p._badConnError.Store(BadConnError{wrapped: reason})
p.ch <- cn
}
func (p *SingleConnPool) Len() int {
return 1
switch atomic.LoadUint32(&p.state) {
case stateDefault:
return 0
case stateInited:
return 1
case stateClosed:
return 0
default:
panic("not reached")
}
}
func (p *SingleConnPool) IdleLen() int {
return 0
return len(p.ch)
}
func (p *SingleConnPool) Stats() *Stats {
return nil
return &Stats{}
}
func (p *SingleConnPool) Close() error {
level := atomic.AddInt32(&p.level, -1)
if level > 0 {
return nil
}
for i := 0; i < 1000; i++ {
state := atomic.LoadUint32(&p.state)
if state == stateClosed {
return ErrClosed
}
if atomic.CompareAndSwapUint32(&p.state, state, stateClosed) {
close(p.ch)
cn, ok := <-p.ch
if ok {
p.freeConn(cn)
}
return nil
}
}
return fmt.Errorf("pg: SingleConnPool.Close: infinite loop")
}
func (p *SingleConnPool) Reset() error {
if p.badConnError() == nil {
return nil
}
select {
case cn, ok := <-p.ch:
if !ok {
return ErrClosed
}
p.pool.Remove(cn, ErrClosed)
p._badConnError.Store(BadConnError{wrapped: nil})
default:
return fmt.Errorf("pg: SingleConnPool does not have a Conn")
}
if !atomic.CompareAndSwapUint32(&p.state, stateInited, stateDefault) {
state := atomic.LoadUint32(&p.state)
return fmt.Errorf("pg: invalid SingleConnPool state: %d", state)
}
return nil
}
func (p *SingleConnPool) badConnError() error {
if v := p._badConnError.Load(); v != nil {
err := v.(BadConnError)
if err.wrapped != nil {
return err
}
}
return nil
}

View File

@ -55,13 +55,13 @@ func (p *StickyConnPool) putUpstream() {
func (p *StickyConnPool) Put(cn *Conn) {}
func (p *StickyConnPool) removeUpstream() {
p.pool.Remove(p.cn)
func (p *StickyConnPool) removeUpstream(reason error) {
p.pool.Remove(p.cn, reason)
p.cn = nil
}
func (p *StickyConnPool) Remove(cn *Conn) {
p.removeUpstream()
func (p *StickyConnPool) Remove(cn *Conn, reason error) {
p.removeUpstream(reason)
}
func (p *StickyConnPool) Len() int {
@ -101,7 +101,7 @@ func (p *StickyConnPool) Close() error {
if p.reusable {
p.putUpstream()
} else {
p.removeUpstream()
p.removeUpstream(ErrClosed)
}
}

View File

@ -1,64 +0,0 @@
/*
Copyright 2013 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package singleflight provides a duplicate function call suppression
// mechanism.
package singleflight
import "sync"
// call is an in-flight or completed Do call
type call struct {
wg sync.WaitGroup
val interface{}
err error
}
// Group represents a class of work and forms a namespace in which
// units of work can be executed with duplicate suppression.
type Group struct {
mu sync.Mutex // protects m
m map[string]*call // lazily initialized
}
// Do executes and returns the results of the given function, making
// sure that only one execution is in-flight for a given key at a
// time. If a duplicate comes in, the duplicate caller waits for the
// original to complete and receives the same results.
func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
g.mu.Unlock()
c.wg.Wait()
return c.val, c.err
}
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
c.val, c.err = fn()
c.wg.Done()
g.mu.Lock()
delete(g.m, key)
g.mu.Unlock()
return c.val, c.err
}

View File

@ -27,3 +27,13 @@ func isLower(s string) bool {
}
return true
}
func Unwrap(err error) error {
u, ok := err.(interface {
Unwrap() error
})
if !ok {
return nil
}
return u.Unwrap()
}