diff --git a/README.md b/README.md index e0d25bb..3bc4a5c 100644 --- a/README.md +++ b/README.md @@ -69,6 +69,29 @@ func main() { } ``` +## Watchdog + +If you want a long lock, you can use watchdog to refresh in background atomatically. +Set an interval for watchdog shorter than TTL, it would refresh the lock before expiration, +therefore your lock won't be released until you release it explicitly. + +```go +lock, err := locker.Obtain(ctx, "my-key", 100*time.Millisecond, &redislock.Options{ + Watchdog: redislock.NewTickWatchdog(50*time.Millisecond), +}) +``` + +## Stats + +Sometimes you need statistics for monitoring, telemetry, debugging, etc. + +```go +stats := redislock.GetStats() +``` + +If you want prometheus metrics, see [redislock-prometheus](https://github.com/WqyJh/redislock-prometheus). + + ## Documentation Full documentation is available on [GoDoc](http://godoc.org/github.com/bsm/redislock) diff --git a/redislock.go b/redislock.go index 67561f6..50da9db 100644 --- a/redislock.go +++ b/redislock.go @@ -69,7 +69,11 @@ func (c *Client) Obtain(ctx context.Context, key string, ttl time.Duration, opt // If any of requested key are already locked, no additional keys are // locked and ErrNotObtained is returned. // May return ErrNotObtained if not successful. -func (c *Client) ObtainMulti(ctx context.Context, keys []string, ttl time.Duration, opt *Options) (*Lock, error) { +func (c *Client) ObtainMulti(ctx context.Context, keys []string, ttl time.Duration, opt *Options) (_ *Lock, err error) { + defer func() { + recordStatus(&stats.Obtain, err) + }() + token := opt.getToken() // Create a random token if token == "" { @@ -83,20 +87,18 @@ func (c *Client) ObtainMulti(ctx context.Context, keys []string, ttl time.Durati ttlVal := strconv.FormatInt(int64(ttl/time.Millisecond), 10) retry := opt.getRetryStrategy() - // make sure we don't retry forever - if _, ok := ctx.Deadline(); !ok { - var cancel context.CancelFunc - ctx, cancel = context.WithDeadline(ctx, time.Now().Add(ttl)) - defer cancel() - } - var ticker *time.Ticker for { ok, err := c.obtain(ctx, keys, value, len(token), ttlVal) if err != nil { return nil, err } else if ok { - return &Lock{Client: c, keys: keys, value: value, tokenLen: len(token)}, nil + lock := &Lock{Client: c, keys: keys, value: value, tokenLen: len(token)} + watchdog := opt.getWatchdog() + if watchdog != nil { + go watchdog.Start(ctx, lock, ttl) + } + return lock, nil } backoff := retry.NextBackoff() @@ -152,6 +154,7 @@ type Lock struct { keys []string value string tokenLen int + opt *Options } // Obtain is a short-cut for New(...).Obtain(...). @@ -206,12 +209,17 @@ func (l *Lock) TTL(ctx context.Context) (time.Duration, error) { // Refresh extends the lock with a new TTL. // May return ErrNotObtained if refresh is unsuccessful. -func (l *Lock) Refresh(ctx context.Context, ttl time.Duration, opt *Options) error { +func (l *Lock) Refresh(ctx context.Context, ttl time.Duration, opt *Options) (err error) { + defer func() { + recordStatus(&stats.Refresh, err) + }() + if l == nil { return ErrNotObtained } + ttlVal := strconv.FormatInt(int64(ttl/time.Millisecond), 10) - _, err := luaRefresh.Run(ctx, l.client, l.keys, l.value, ttlVal).Result() + _, err = luaRefresh.Run(ctx, l.client, l.keys, l.value, ttlVal).Result() if err != nil { if errors.Is(err, redis.Nil) { return ErrNotObtained @@ -223,11 +231,21 @@ func (l *Lock) Refresh(ctx context.Context, ttl time.Duration, opt *Options) err // Release manually releases the lock. // May return ErrLockNotHeld. -func (l *Lock) Release(ctx context.Context) error { +func (l *Lock) Release(ctx context.Context) (err error) { + defer func() { + recordStatus(&stats.Release, err) + }() + if l == nil { return ErrLockNotHeld } - _, err := luaRelease.Run(ctx, l.client, l.keys, l.value).Result() + + dog := l.opt.getWatchdog() + if dog != nil { + dog.Stop() + } + + _, err = luaRelease.Run(ctx, l.client, l.keys, l.value).Result() if err != nil { if errors.Is(err, redis.Nil) { return ErrLockNotHeld @@ -245,12 +263,15 @@ type Options struct { // Default: do not retry RetryStrategy RetryStrategy - // Metadata string. + // Metadata string is appended to the lock token. Metadata string // Token is a unique value that is used to identify the lock. By default, a random tokens are generated. Use this // option to provide a custom token instead. Token string + + // Watchdog allows to refresh atomatically. + Watchdog Watchdog } func (o *Options) getMetadata() string { @@ -267,6 +288,13 @@ func (o *Options) getToken() string { return "" } +func (o *Options) getWatchdog() Watchdog { + if o != nil && o.Watchdog != nil { + return o.Watchdog + } + return nil +} + func (o *Options) getRetryStrategy() RetryStrategy { if o != nil && o.RetryStrategy != nil { return o.RetryStrategy @@ -345,3 +373,80 @@ func (r *exponentialBackoff) NextBackoff() time.Duration { return d } } + +func isCanceled(err error) bool { + return errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) +} + +func recordStatus(s *Status, err error) { + if err == nil { + atomic.AddInt64(&s.Success, 1) + } + if err == ErrNotObtained { + atomic.AddInt64(&s.Failed, 1) + } else if isCanceled(err) { + atomic.AddInt64(&s.Cancel, 1) + } else { + atomic.AddInt64(&s.Error, 1) + } +} + +// -------------------------------------------------------------------- + +// Watchdog allows to refresh atomatically. +type Watchdog interface { + // Start starts the watchdog. + Start(ctx context.Context, lock *Lock, ttl time.Duration) + // Stop stops and waits the watchdog. + Stop() +} + +// TickWatchdog refreshes the lock at regular intervals. +type TickWatchdog struct { + ctx context.Context + cancel context.CancelFunc + interval time.Duration + ch chan struct{} +} + +// NewTickWatchdog creates a new watchdog that refreshes the lock at regular intervals. +func NewTickWatchdog(interval time.Duration) *TickWatchdog { + return &TickWatchdog{interval: interval, ch: make(chan struct{})} +} + +// Start starts the watchdog. +func (w *TickWatchdog) Start(ctx context.Context, lock *Lock, ttl time.Duration) { + defer close(w.ch) + atomic.AddInt64(&stats.Watchdog, 1) + + w.ctx, w.cancel = context.WithCancel(ctx) + + ticker := time.NewTicker(w.interval) + defer ticker.Stop() + + for { + select { + case <-w.ctx.Done(): + atomic.AddInt64(&stats.WatchdogDone, 1) + return + case <-ticker.C: + atomic.AddInt64(&stats.WatchdogTick, 1) + + err := lock.Refresh(w.ctx, ttl, nil) + if err != nil { + if err == ErrNotObtained { + return + } + // continue on other errors + } + } + } +} + +// Stop stops and waits the watchdog. +func (w *TickWatchdog) Stop() { + if w.cancel != nil { + w.cancel() + } + <-w.ch +} diff --git a/stats.go b/stats.go new file mode 100644 index 0000000..113f863 --- /dev/null +++ b/stats.go @@ -0,0 +1,52 @@ +package redislock + +import "sync/atomic" + +type Status struct { + Success int64 + Failed int64 + Error int64 + Cancel int64 +} + +type Stats struct { + Obtain Status + Release Status + Refresh Status + Backoff int64 + Watchdog int64 + WatchdogDone int64 + WatchdogTick int64 +} + +var ( + stats Stats +) + +func GetStats() Stats { + s := Stats{ + Obtain: Status{ + Success: atomic.LoadInt64(&stats.Obtain.Success), + Failed: atomic.LoadInt64(&stats.Obtain.Failed), + Error: atomic.LoadInt64(&stats.Obtain.Error), + Cancel: atomic.LoadInt64(&stats.Obtain.Cancel), + }, + Release: Status{ + Success: atomic.LoadInt64(&stats.Release.Success), + Failed: atomic.LoadInt64(&stats.Release.Failed), + Error: atomic.LoadInt64(&stats.Release.Error), + Cancel: atomic.LoadInt64(&stats.Release.Cancel), + }, + Refresh: Status{ + Success: atomic.LoadInt64(&stats.Refresh.Success), + Failed: atomic.LoadInt64(&stats.Refresh.Failed), + Error: atomic.LoadInt64(&stats.Refresh.Error), + Cancel: atomic.LoadInt64(&stats.Refresh.Cancel), + }, + Backoff: atomic.LoadInt64(&stats.Backoff), + Watchdog: atomic.LoadInt64(&stats.Watchdog), + WatchdogDone: atomic.LoadInt64(&stats.WatchdogDone), + WatchdogTick: atomic.LoadInt64(&stats.WatchdogTick), + } + return s +}