1
0

Updated libraries

This commit is contained in:
konrad
2019-05-07 21:42:24 +02:00
parent 2b160b73c3
commit 3d7fd9ca20
313 changed files with 37947 additions and 6783 deletions

View File

@ -5,8 +5,6 @@ services:
- redis-server
go:
- 1.7.x
- 1.8.x
- 1.9.x
- 1.10.x
- 1.11.x

View File

@ -3,6 +3,8 @@ all: testdeps
go test ./... -short -race
env GOOS=linux GOARCH=386 go test ./...
go vet
go get github.com/gordonklaus/ineffassign
ineffassign .
testdeps: testdata/redis/src/redis-server
@ -13,7 +15,7 @@ bench: testdeps
testdata/redis:
mkdir -p $@
wget -qO- https://github.com/antirez/redis/archive/unstable.tar.gz | tar xvz --strip-components=1 -C $@
wget -qO- https://github.com/antirez/redis/archive/5.0.tar.gz | tar xvz --strip-components=1 -C $@
testdata/redis/src/redis-server: testdata/redis
sed -i.bak 's/libjemalloc.a/libjemalloc.a -lrt/g' $</src/Makefile

View File

@ -3,7 +3,6 @@ package redis
import (
"context"
"crypto/tls"
"errors"
"fmt"
"math"
"math/rand"
@ -18,7 +17,6 @@ import (
"github.com/go-redis/redis/internal/hashtag"
"github.com/go-redis/redis/internal/pool"
"github.com/go-redis/redis/internal/proto"
"github.com/go-redis/redis/internal/singleflight"
)
var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes")
@ -50,6 +48,9 @@ type ClusterOptions struct {
// and Cluster.ReloadState to manually trigger state reloading.
ClusterSlots func() ([]ClusterSlot, error)
// Optional hook that is called when a new node is created.
OnNewNode func(*Client)
// Following options are copied from Options struct.
OnConnect func(*Conn) error
@ -166,6 +167,10 @@ func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
go node.updateLatency()
}
if clOpt.OnNewNode != nil {
clOpt.OnNewNode(node.Client)
}
return &node
}
@ -237,8 +242,6 @@ type clusterNodes struct {
clusterAddrs []string
closed bool
nodeCreateGroup singleflight.Group
_generation uint32 // atomic
}
@ -341,11 +344,6 @@ func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) {
return node, nil
}
v, err := c.nodeCreateGroup.Do(addr, func() (interface{}, error) {
node := newClusterNode(c.opt, addr)
return node, nil
})
c.mu.Lock()
defer c.mu.Unlock()
@ -355,15 +353,13 @@ func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) {
node, ok := c.allNodes[addr]
if ok {
_ = v.(*clusterNode).Close()
return node, err
}
node = v.(*clusterNode)
node = newClusterNode(c.opt, addr)
c.allAddrs = appendIfNotExists(c.allAddrs, addr)
if err == nil {
c.clusterAddrs = append(c.clusterAddrs, addr)
}
c.clusterAddrs = append(c.clusterAddrs, addr)
c.allNodes[addr] = node
return node, err
@ -533,10 +529,12 @@ func (c *clusterState) slotSlaveNode(slot int) (*clusterNode, error) {
n := rand.Intn(len(nodes)-1) + 1
slave = nodes[n]
if !slave.Loading() {
break
return slave, nil
}
}
return slave, nil
// All slaves are loading - use master.
return nodes[0], nil
}
}
@ -580,23 +578,12 @@ func (c *clusterState) slotNodes(slot int) []*clusterNode {
return nil
}
func (c *clusterState) IsConsistent() bool {
if c.nodes.opt.ClusterSlots != nil {
return true
}
return len(c.Masters) <= len(c.Slaves)
}
//------------------------------------------------------------------------------
type clusterStateHolder struct {
load func() (*clusterState, error)
state atomic.Value
firstErrMu sync.RWMutex
firstErr error
state atomic.Value
reloading uint32 // atomic
}
@ -607,24 +594,8 @@ func newClusterStateHolder(fn func() (*clusterState, error)) *clusterStateHolder
}
func (c *clusterStateHolder) Reload() (*clusterState, error) {
state, err := c.reload()
if err != nil {
return nil, err
}
if !state.IsConsistent() {
time.AfterFunc(time.Second, c.LazyReload)
}
return state, nil
}
func (c *clusterStateHolder) reload() (*clusterState, error) {
state, err := c.load()
if err != nil {
c.firstErrMu.Lock()
if c.firstErr == nil {
c.firstErr = err
}
c.firstErrMu.Unlock()
return nil, err
}
c.state.Store(state)
@ -638,16 +609,11 @@ func (c *clusterStateHolder) LazyReload() {
go func() {
defer atomic.StoreUint32(&c.reloading, 0)
for {
state, err := c.reload()
if err != nil {
return
}
time.Sleep(100 * time.Millisecond)
if state.IsConsistent() {
return
}
_, err := c.Reload()
if err != nil {
return
}
time.Sleep(100 * time.Millisecond)
}()
}
@ -660,15 +626,7 @@ func (c *clusterStateHolder) Get() (*clusterState, error) {
}
return state, nil
}
c.firstErrMu.RLock()
err := c.firstErr
c.firstErrMu.RUnlock()
if err != nil {
return nil, err
}
return nil, errors.New("redis: cluster has no state")
return c.Reload()
}
func (c *clusterStateHolder) ReloadOrGet() (*clusterState, error) {
@ -716,10 +674,6 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
c.processTxPipeline = c.defaultProcessTxPipeline
c.init()
_, _ = c.state.Reload()
_, _ = c.cmdsInfoCache.Get()
if opt.IdleCheckFrequency > 0 {
go c.reaper(opt.IdleCheckFrequency)
}
@ -727,17 +681,17 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
return c
}
// ReloadState reloads cluster state. It calls ClusterSlots func
func (c *ClusterClient) init() {
c.cmdable.setProcessor(c.Process)
}
// ReloadState reloads cluster state. If available it calls ClusterSlots func
// to get cluster slots information.
func (c *ClusterClient) ReloadState() error {
_, err := c.state.Reload()
return err
}
func (c *ClusterClient) init() {
c.cmdable.setProcessor(c.Process)
}
func (c *ClusterClient) Context() context.Context {
if c.ctx != nil {
return c.ctx
@ -818,6 +772,11 @@ func cmdSlot(cmd Cmder, pos int) int {
}
func (c *ClusterClient) cmdSlot(cmd Cmder) int {
args := cmd.Args()
if args[0] == "cluster" && args[1] == "getkeysinslot" {
return args[2].(int)
}
cmdInfo := c.cmdInfo(cmd.Name())
return cmdSlot(cmd, cmdFirstKeyPos(cmd, cmdInfo))
}
@ -829,7 +788,7 @@ func (c *ClusterClient) cmdSlotAndNode(cmd Cmder) (int, *clusterNode, error) {
}
cmdInfo := c.cmdInfo(cmd.Name())
slot := cmdSlot(cmd, cmdFirstKeyPos(cmd, cmdInfo))
slot := c.cmdSlot(cmd)
if c.opt.ReadOnly && cmdInfo != nil && cmdInfo.ReadOnly {
if c.opt.RouteByLatency {
@ -890,15 +849,12 @@ func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error {
if err == nil {
break
}
if internal.IsRetryableError(err, true) {
if err != Nil {
c.state.LazyReload()
continue
}
moved, ask, addr := internal.IsMovedError(err)
if moved || ask {
c.state.LazyReload()
node, err = c.nodes.GetOrCreate(addr)
if err != nil {
return err
@ -906,7 +862,7 @@ func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error {
continue
}
if err == pool.ErrClosed {
if err == pool.ErrClosed || internal.IsReadOnlyError(err) {
node, err = c.slotMasterNode(slot)
if err != nil {
return err
@ -914,6 +870,10 @@ func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error {
continue
}
if internal.IsRetryableError(err, true) {
continue
}
return err
}
@ -978,16 +938,34 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error {
if err == nil {
break
}
if err != Nil {
c.state.LazyReload()
}
// If slave is loading - read from master.
// If slave is loading - pick another node.
if c.opt.ReadOnly && internal.IsLoadingError(err) {
node.MarkAsLoading()
node = nil
continue
}
var moved bool
var addr string
moved, ask, addr = internal.IsMovedError(err)
if moved || ask {
node, err = c.nodes.GetOrCreate(addr)
if err != nil {
break
}
continue
}
if err == pool.ErrClosed || internal.IsReadOnlyError(err) {
node = nil
continue
}
if internal.IsRetryableError(err, true) {
c.state.LazyReload()
// First retry the same node.
if attempt == 0 {
continue
@ -1001,24 +979,6 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error {
continue
}
var moved bool
var addr string
moved, ask, addr = internal.IsMovedError(err)
if moved || ask {
c.state.LazyReload()
node, err = c.nodes.GetOrCreate(addr)
if err != nil {
break
}
continue
}
if err == pool.ErrClosed {
node = nil
continue
}
break
}
@ -1349,14 +1309,15 @@ func (c *ClusterClient) pipelineProcessCmds(
}
err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error {
return c.pipelineReadCmds(rd, cmds, failedCmds)
return c.pipelineReadCmds(node, rd, cmds, failedCmds)
})
return err
}
func (c *ClusterClient) pipelineReadCmds(
rd *proto.Reader, cmds []Cmder, failedCmds *cmdsMap,
node *clusterNode, rd *proto.Reader, cmds []Cmder, failedCmds *cmdsMap,
) error {
var firstErr error
for _, cmd := range cmds {
err := cmd.readReply(rd)
if err == nil {
@ -1371,9 +1332,14 @@ func (c *ClusterClient) pipelineReadCmds(
continue
}
return err
failedCmds.mu.Lock()
failedCmds.m[node] = append(failedCmds.m[node], cmd)
failedCmds.mu.Unlock()
if firstErr == nil {
firstErr = err
}
}
return nil
return firstErr
}
func (c *ClusterClient) checkMovedErr(
@ -1561,40 +1527,46 @@ func (c *ClusterClient) txPipelineReadQueued(
return nil
}
func (c *ClusterClient) pubSub(channels []string) *PubSub {
func (c *ClusterClient) pubSub() *PubSub {
var node *clusterNode
pubsub := &PubSub{
opt: c.opt.clientOptions(),
newConn: func(channels []string) (*pool.Conn, error) {
if node == nil {
var slot int
if len(channels) > 0 {
slot = hashtag.Slot(channels[0])
} else {
slot = -1
}
masterNode, err := c.slotMasterNode(slot)
if err != nil {
return nil, err
}
node = masterNode
if node != nil {
panic("node != nil")
}
return node.Client.newConn()
slot := hashtag.Slot(channels[0])
var err error
node, err = c.slotMasterNode(slot)
if err != nil {
return nil, err
}
cn, err := node.Client.newConn()
if err != nil {
return nil, err
}
return cn, nil
},
closeConn: func(cn *pool.Conn) error {
return node.Client.connPool.CloseConn(cn)
err := node.Client.connPool.CloseConn(cn)
node = nil
return err
},
}
pubsub.init()
return pubsub
}
// Subscribe subscribes the client to the specified channels.
// Channels can be omitted to create empty subscription.
func (c *ClusterClient) Subscribe(channels ...string) *PubSub {
pubsub := c.pubSub(channels)
pubsub := c.pubSub()
if len(channels) > 0 {
_ = pubsub.Subscribe(channels...)
}
@ -1604,7 +1576,7 @@ func (c *ClusterClient) Subscribe(channels ...string) *PubSub {
// PSubscribe subscribes the client to the given patterns.
// Patterns can be omitted to create empty subscription.
func (c *ClusterClient) PSubscribe(channels ...string) *PubSub {
pubsub := c.pubSub(channels)
pubsub := c.pubSub()
if len(channels) > 0 {
_ = pubsub.PSubscribe(channels...)
}

View File

@ -1337,6 +1337,68 @@ func zSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
//------------------------------------------------------------------------------
type ZWithKeyCmd struct {
baseCmd
val ZWithKey
}
var _ Cmder = (*ZWithKeyCmd)(nil)
func NewZWithKeyCmd(args ...interface{}) *ZWithKeyCmd {
return &ZWithKeyCmd{
baseCmd: baseCmd{_args: args},
}
}
func (cmd *ZWithKeyCmd) Val() ZWithKey {
return cmd.val
}
func (cmd *ZWithKeyCmd) Result() (ZWithKey, error) {
return cmd.Val(), cmd.Err()
}
func (cmd *ZWithKeyCmd) String() string {
return cmdString(cmd, cmd.val)
}
func (cmd *ZWithKeyCmd) readReply(rd *proto.Reader) error {
var v interface{}
v, cmd.err = rd.ReadArrayReply(zWithKeyParser)
if cmd.err != nil {
return cmd.err
}
cmd.val = v.(ZWithKey)
return nil
}
// Implements proto.MultiBulkParse
func zWithKeyParser(rd *proto.Reader, n int64) (interface{}, error) {
if n != 3 {
return nil, fmt.Errorf("got %d elements, expected 3", n)
}
var z ZWithKey
var err error
z.Key, err = rd.ReadString()
if err != nil {
return nil, err
}
z.Member, err = rd.ReadString()
if err != nil {
return nil, err
}
z.Score, err = rd.ReadFloatReply()
if err != nil {
return nil, err
}
return z, nil
}
//------------------------------------------------------------------------------
type ScanCmd struct {
baseCmd

View File

@ -166,6 +166,7 @@ type Cmdable interface {
SUnion(keys ...string) *StringSliceCmd
SUnionStore(destination string, keys ...string) *IntCmd
XAdd(a *XAddArgs) *StringCmd
XDel(stream string, ids ...string) *IntCmd
XLen(stream string) *IntCmd
XRange(stream, start, stop string) *XMessageSliceCmd
XRangeN(stream, start, stop string, count int64) *XMessageSliceCmd
@ -174,6 +175,7 @@ type Cmdable interface {
XRead(a *XReadArgs) *XStreamSliceCmd
XReadStreams(streams ...string) *XStreamSliceCmd
XGroupCreate(stream, group, start string) *StatusCmd
XGroupCreateMkStream(stream, group, start string) *StatusCmd
XGroupSetID(stream, group, start string) *StatusCmd
XGroupDestroy(stream, group string) *IntCmd
XGroupDelConsumer(stream, group, consumer string) *IntCmd
@ -185,6 +187,8 @@ type Cmdable interface {
XClaimJustID(a *XClaimArgs) *StringSliceCmd
XTrim(key string, maxLen int64) *IntCmd
XTrimApprox(key string, maxLen int64) *IntCmd
BZPopMax(timeout time.Duration, keys ...string) *ZWithKeyCmd
BZPopMin(timeout time.Duration, keys ...string) *ZWithKeyCmd
ZAdd(key string, members ...Z) *IntCmd
ZAddNX(key string, members ...Z) *IntCmd
ZAddXX(key string, members ...Z) *IntCmd
@ -228,6 +232,7 @@ type Cmdable interface {
ClientKillByFilter(keys ...string) *IntCmd
ClientList() *StringCmd
ClientPause(dur time.Duration) *BoolCmd
ClientID() *IntCmd
ConfigGet(parameter string) *SliceCmd
ConfigResetStat() *StatusCmd
ConfigSet(parameter, value string) *StatusCmd
@ -265,6 +270,7 @@ type Cmdable interface {
ClusterResetHard() *StatusCmd
ClusterInfo() *StringCmd
ClusterKeySlot(key string) *IntCmd
ClusterGetKeysInSlot(slot int, count int) *StringSliceCmd
ClusterCountFailureReports(nodeID string) *IntCmd
ClusterCountKeysInSlot(slot int) *IntCmd
ClusterDelSlots(slots ...int) *StatusCmd
@ -1337,6 +1343,16 @@ func (c *cmdable) XAdd(a *XAddArgs) *StringCmd {
return cmd
}
func (c *cmdable) XDel(stream string, ids ...string) *IntCmd {
args := []interface{}{"xdel", stream}
for _, id := range ids {
args = append(args, id)
}
cmd := NewIntCmd(args...)
c.process(cmd)
return cmd
}
func (c *cmdable) XLen(stream string) *IntCmd {
cmd := NewIntCmd("xlen", stream)
c.process(cmd)
@ -1410,6 +1426,12 @@ func (c *cmdable) XGroupCreate(stream, group, start string) *StatusCmd {
return cmd
}
func (c *cmdable) XGroupCreateMkStream(stream, group, start string) *StatusCmd {
cmd := NewStatusCmd("xgroup", "create", stream, group, start, "mkstream")
c.process(cmd)
return cmd
}
func (c *cmdable) XGroupSetID(stream, group, start string) *StatusCmd {
cmd := NewStatusCmd("xgroup", "setid", stream, group, start)
c.process(cmd)
@ -1431,9 +1453,11 @@ func (c *cmdable) XGroupDelConsumer(stream, group, consumer string) *IntCmd {
type XReadGroupArgs struct {
Group string
Consumer string
Streams []string
Count int64
Block time.Duration
// List of streams and ids.
Streams []string
Count int64
Block time.Duration
NoAck bool
}
func (c *cmdable) XReadGroup(a *XReadGroupArgs) *XStreamSliceCmd {
@ -1445,6 +1469,9 @@ func (c *cmdable) XReadGroup(a *XReadGroupArgs) *XStreamSliceCmd {
if a.Block >= 0 {
args = append(args, "block", int64(a.Block/time.Millisecond))
}
if a.NoAck {
args = append(args, "noack")
}
args = append(args, "streams")
for _, s := range a.Streams {
args = append(args, s)
@ -1550,6 +1577,12 @@ type Z struct {
Member interface{}
}
// ZWithKey represents sorted set member including the name of the key where it was popped.
type ZWithKey struct {
Z
Key string
}
// ZStore is used as an arg to ZInterStore and ZUnionStore.
type ZStore struct {
Weights []float64
@ -1557,6 +1590,34 @@ type ZStore struct {
Aggregate string
}
// Redis `BZPOPMAX key [key ...] timeout` command.
func (c *cmdable) BZPopMax(timeout time.Duration, keys ...string) *ZWithKeyCmd {
args := make([]interface{}, 1+len(keys)+1)
args[0] = "bzpopmax"
for i, key := range keys {
args[1+i] = key
}
args[len(args)-1] = formatSec(timeout)
cmd := NewZWithKeyCmd(args...)
cmd.setReadTimeout(timeout)
c.process(cmd)
return cmd
}
// Redis `BZPOPMIN key [key ...] timeout` command.
func (c *cmdable) BZPopMin(timeout time.Duration, keys ...string) *ZWithKeyCmd {
args := make([]interface{}, 1+len(keys)+1)
args[0] = "bzpopmin"
for i, key := range keys {
args[1+i] = key
}
args[len(args)-1] = formatSec(timeout)
cmd := NewZWithKeyCmd(args...)
cmd.setReadTimeout(timeout)
c.process(cmd)
return cmd
}
func (c *cmdable) zAdd(a []interface{}, n int, members ...Z) *IntCmd {
for i, m := range members {
a[n+2*i] = m.Score
@ -2010,6 +2071,24 @@ func (c *cmdable) ClientPause(dur time.Duration) *BoolCmd {
return cmd
}
func (c *cmdable) ClientID() *IntCmd {
cmd := NewIntCmd("client", "id")
c.process(cmd)
return cmd
}
func (c *cmdable) ClientUnblock(id int64) *IntCmd {
cmd := NewIntCmd("client", "unblock", id)
c.process(cmd)
return cmd
}
func (c *cmdable) ClientUnblockWithError(id int64) *IntCmd {
cmd := NewIntCmd("client", "unblock", id, "error")
c.process(cmd)
return cmd
}
// ClientSetName assigns a name to the connection.
func (c *statefulCmdable) ClientSetName(name string) *BoolCmd {
cmd := NewBoolCmd("client", "setname", name)
@ -2325,6 +2404,12 @@ func (c *cmdable) ClusterKeySlot(key string) *IntCmd {
return cmd
}
func (c *cmdable) ClusterGetKeysInSlot(slot int, count int) *StringSliceCmd {
cmd := NewStringSliceCmd("cluster", "getkeysinslot", slot, count)
c.process(cmd)
return cmd
}
func (c *cmdable) ClusterCountFailureReports(nodeID string) *IntCmd {
cmd := NewIntCmd("cluster", "count-failure-reports", nodeID)
c.process(cmd)

View File

@ -47,7 +47,8 @@ func IsBadConn(err error, allowTimeout bool) bool {
return false
}
if IsRedisError(err) {
return strings.HasPrefix(err.Error(), "READONLY ")
// #790
return IsReadOnlyError(err)
}
if allowTimeout {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
@ -82,3 +83,7 @@ func IsMovedError(err error) (moved bool, ask bool, addr string) {
func IsLoadingError(err error) bool {
return strings.HasPrefix(err.Error(), "LOADING ")
}
func IsReadOnlyError(err error) bool {
return strings.HasPrefix(err.Error(), "READONLY ")
}

View File

@ -1,64 +0,0 @@
/*
Copyright 2013 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package singleflight provides a duplicate function call suppression
// mechanism.
package singleflight
import "sync"
// call is an in-flight or completed Do call
type call struct {
wg sync.WaitGroup
val interface{}
err error
}
// Group represents a class of work and forms a namespace in which
// units of work can be executed with duplicate suppression.
type Group struct {
mu sync.Mutex // protects m
m map[string]*call // lazily initialized
}
// Do executes and returns the results of the given function, making
// sure that only one execution is in-flight for a given key at a
// time. If a duplicate comes in, the duplicate caller waits for the
// original to complete and receives the same results.
func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
g.mu.Unlock()
c.wg.Wait()
return c.val, c.err
}
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
c.val, c.err = fn()
c.wg.Done()
g.mu.Lock()
delete(g.m, key)
g.mu.Unlock()
return c.val, c.err
}

View File

@ -14,6 +14,17 @@ import (
"github.com/go-redis/redis/internal/pool"
)
// Limiter is the interface of a rate limiter or a circuit breaker.
type Limiter interface {
// Allow returns a nil if operation is allowed or an error otherwise.
// If operation is allowed client must report the result of operation
// whether is a success or a failure.
Allow() error
// ReportResult reports the result of previously allowed operation.
// nil indicates a success, non-nil error indicates a failure.
ReportResult(result error)
}
type Options struct {
// The network type, either tcp or unix.
// Default is tcp.
@ -90,6 +101,9 @@ func (opt *Options) init() {
if opt.Network == "" {
opt.Network = "tcp"
}
if opt.Addr == "" {
opt.Addr = "localhost:6379"
}
if opt.Dialer == nil {
opt.Dialer = func() (net.Conn, error) {
netDialer := &net.Dialer{

View File

@ -10,6 +10,7 @@ type pipelineExecer func([]Cmder) error
type Pipeliner interface {
StatefulCmdable
Do(args ...interface{}) *Cmd
Process(cmd Cmder) error
Close() error
Discard() error
@ -31,6 +32,12 @@ type Pipeline struct {
closed bool
}
func (c *Pipeline) Do(args ...interface{}) *Cmd {
cmd := NewCmd(args...)
_ = c.Process(cmd)
return cmd
}
// Process queues the cmd for later execution.
func (c *Pipeline) Process(cmd Cmder) error {
c.mu.Lock()

View File

@ -26,6 +26,7 @@ func SetLogger(logger *log.Logger) {
type baseClient struct {
opt *Options
connPool pool.Pooler
limiter Limiter
process func(Cmder) error
processPipeline func([]Cmder) error
@ -61,6 +62,24 @@ func (c *baseClient) newConn() (*pool.Conn, error) {
}
func (c *baseClient) getConn() (*pool.Conn, error) {
if c.limiter != nil {
err := c.limiter.Allow()
if err != nil {
return nil, err
}
}
cn, err := c._getConn()
if err != nil {
if c.limiter != nil {
c.limiter.ReportResult(err)
}
return nil, err
}
return cn, nil
}
func (c *baseClient) _getConn() (*pool.Conn, error) {
cn, err := c.connPool.Get()
if err != nil {
return nil, err
@ -78,6 +97,10 @@ func (c *baseClient) getConn() (*pool.Conn, error) {
}
func (c *baseClient) releaseConn(cn *pool.Conn, err error) {
if c.limiter != nil {
c.limiter.ReportResult(err)
}
if internal.IsBadConn(err, false) {
c.connPool.Remove(cn)
} else {
@ -86,6 +109,10 @@ func (c *baseClient) releaseConn(cn *pool.Conn, err error) {
}
func (c *baseClient) releaseConnStrict(cn *pool.Conn, err error) {
if c.limiter != nil {
c.limiter.ReportResult(err)
}
if err == nil || internal.IsRedisError(err) {
c.connPool.Put(cn)
} else {
@ -132,7 +159,7 @@ func (c *baseClient) initConn(cn *pool.Conn) error {
// Do creates a Cmd from the args and processes the cmd.
func (c *baseClient) Do(args ...interface{}) *Cmd {
cmd := NewCmd(args...)
c.Process(cmd)
_ = c.Process(cmd)
return cmd
}
@ -396,12 +423,12 @@ func (c *Client) WithContext(ctx context.Context) *Client {
if ctx == nil {
panic("nil context")
}
c2 := c.copy()
c2 := c.clone()
c2.ctx = ctx
return c2
}
func (c *Client) copy() *Client {
func (c *Client) clone() *Client {
cp := *c
cp.init()
return &cp
@ -412,6 +439,11 @@ func (c *Client) Options() *Options {
return c.opt
}
func (c *Client) SetLimiter(l Limiter) *Client {
c.limiter = l
return c
}
type PoolStats pool.Stats
// PoolStats returns connection pool stats.
@ -460,6 +492,30 @@ func (c *Client) pubSub() *PubSub {
// Subscribe subscribes the client to the specified channels.
// Channels can be omitted to create empty subscription.
// Note that this method does not wait on a response from Redis, so the
// subscription may not be active immediately. To force the connection to wait,
// you may call the Receive() method on the returned *PubSub like so:
//
// sub := client.Subscribe(queryResp)
// iface, err := sub.Receive()
// if err != nil {
// // handle error
// }
//
// // Should be *Subscription, but others are possible if other actions have been
// // taken on sub since it was created.
// switch iface.(type) {
// case *Subscription:
// // subscribe succeeded
// case *Message:
// // received first message
// case *Pong:
// // pong received
// default:
// // handle error
// }
//
// ch := sub.Channel()
func (c *Client) Subscribe(channels ...string) *PubSub {
pubsub := c.pubSub()
if len(channels) > 0 {

View File

@ -319,12 +319,12 @@ func (c *ringShards) Close() error {
//------------------------------------------------------------------------------
// Ring is a Redis client that uses constistent hashing to distribute
// Ring is a Redis client that uses consistent hashing to distribute
// keys across multiple Redis servers (shards). It's safe for
// concurrent use by multiple goroutines.
//
// Ring monitors the state of each shard and removes dead shards from
// the ring. When shard comes online it is added back to the ring. This
// the ring. When a shard comes online it is added back to the ring. This
// gives you maximum availability and partition tolerance, but no
// consistency between different shards or even clients. Each client
// uses shards that are available to the client and does not do any

View File

@ -164,6 +164,24 @@ func (c *SentinelClient) Sentinels(name string) *SliceCmd {
return cmd
}
// Failover forces a failover as if the master was not reachable, and without
// asking for agreement to other Sentinels.
func (c *SentinelClient) Failover(name string) *StatusCmd {
cmd := NewStatusCmd("sentinel", "failover", name)
c.Process(cmd)
return cmd
}
// Reset resets all the masters with matching name. The pattern argument is a
// glob-style pattern. The reset process clears any previous state in a master
// (including a failover in progress), and removes every slave and sentinel
// already discovered and associated with the master.
func (c *SentinelClient) Reset(pattern string) *IntCmd {
cmd := NewIntCmd("sentinel", "reset", pattern)
c.Process(cmd)
return cmd
}
type sentinelFailover struct {
sentinelAddrs []string
@ -176,6 +194,7 @@ type sentinelFailover struct {
masterName string
_masterAddr string
sentinel *SentinelClient
pubsub *PubSub
}
func (c *sentinelFailover) Close() error {
@ -304,13 +323,27 @@ func (c *sentinelFailover) switchMaster(addr string) {
func (c *sentinelFailover) setSentinel(sentinel *SentinelClient) {
c.discoverSentinels(sentinel)
c.sentinel = sentinel
go c.listen(sentinel)
c.pubsub = sentinel.Subscribe("+switch-master")
go c.listen(c.pubsub)
}
func (c *sentinelFailover) closeSentinel() error {
err := c.sentinel.Close()
var firstErr error
err := c.pubsub.Close()
if err != nil && firstErr == err {
firstErr = err
}
c.pubsub = nil
err = c.sentinel.Close()
if err != nil && firstErr == err {
firstErr = err
}
c.sentinel = nil
return err
return firstErr
}
func (c *sentinelFailover) discoverSentinels(sentinel *SentinelClient) {
@ -335,10 +368,7 @@ func (c *sentinelFailover) discoverSentinels(sentinel *SentinelClient) {
}
}
func (c *sentinelFailover) listen(sentinel *SentinelClient) {
pubsub := sentinel.Subscribe("+switch-master")
defer pubsub.Close()
func (c *sentinelFailover) listen(pubsub *PubSub) {
ch := pubsub.Channel()
for {
msg, ok := <-ch

View File

@ -29,10 +29,10 @@ func (c *Client) newTx() *Tx {
return &tx
}
// Watch prepares a transcaction and marks the keys to be watched
// Watch prepares a transaction and marks the keys to be watched
// for conditional execution if there are any keys.
//
// The transaction is automatically closed when the fn exits.
// The transaction is automatically closed when fn exits.
func (c *Client) Watch(fn func(*Tx) error, keys ...string) error {
tx := c.newTx()
if len(keys) > 0 {