-
Notifications
You must be signed in to change notification settings - Fork 106
/
Copy pathgates.go
128 lines (105 loc) · 2.57 KB
/
gates.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package core
import (
"errors"
"math"
"sync"
)
const maxAgentsLimit uint16 = math.MaxUint16
// Gate ...
type Gate interface {
Register(count uint16)
Reset()
SetCount(uint16) error
WalkThrough() error
AwaitGateCondition() error
CancelWithError(error)
Clear()
}
type gateImpl struct {
count uint16
arrived uint16
gateCondition *sync.Cond
canceled bool
err error
}
func (g *gateImpl) Register(count uint16) {
g.gateCondition.L.Lock()
defer g.gateCondition.L.Unlock()
g.count += count
}
// SetCount sets the expected number of arrivals on the gate
func (g *gateImpl) SetCount(count uint16) error {
g.gateCondition.L.Lock()
defer g.gateCondition.L.Unlock()
// you can't set count larger than limit if limit is max uint but leaving it here for correctness in case limit changes
if count > maxAgentsLimit || count < g.arrived {
return ErrGateIntegrity
}
g.count = count
return nil
}
func (g *gateImpl) Reset() {
g.gateCondition.L.Lock()
defer g.gateCondition.L.Unlock()
if !g.canceled {
g.arrived = 0
}
}
// ErrGateIntegrity ...
var ErrGateIntegrity = errors.New("ErrGateIntegrity")
// ErrGateCanceled ...
var ErrGateCanceled = errors.New("ErrGateCanceled")
// WalkThrough walks through this gate without awaiting others.
func (g *gateImpl) WalkThrough() error {
g.gateCondition.L.Lock()
defer g.gateCondition.L.Unlock()
if g.arrived == g.count {
return ErrGateIntegrity
}
g.arrived++
if g.arrived == g.count {
g.gateCondition.Broadcast()
}
return nil
}
// AwaitGateCondition suspends thread execution until gate condition
// is met or await is canceled via Cancel method.
func (g *gateImpl) AwaitGateCondition() error {
g.gateCondition.L.Lock()
defer g.gateCondition.L.Unlock()
for g.arrived != g.count && !g.canceled {
g.gateCondition.Wait()
}
if g.canceled {
if g.err != nil {
return g.err
}
return ErrGateCanceled
}
return nil
}
// CancelWithError cancels gate condition with error and awakes suspended threads.
func (g *gateImpl) CancelWithError(err error) {
g.gateCondition.L.Lock()
defer g.gateCondition.L.Unlock()
g.canceled = true
g.err = err
g.gateCondition.Broadcast()
}
// Clear gate state
func (g *gateImpl) Clear() {
g.gateCondition.L.Lock()
defer g.gateCondition.L.Unlock()
g.canceled = false
g.arrived = 0
g.err = nil
}
// NewGate returns new gate instance.
func NewGate(count uint16) Gate {
return &gateImpl{
count: count,
gateCondition: sync.NewCond(&sync.Mutex{}),
}
}