Skip to content

Commit 364deec

Browse files
committed
api: replaced Future.done with a sync.Cond
This commit reduces allocations. Future.done allocation replaced with - Future.cond (sync.Cond) - Future.finished (bool) Other code use `Future.finished` instead `Future.done == nil` check. Added Future.finish() marks Future as done. Future.WaitChan() now creates channel on demand. Closes #496
1 parent 994b372 commit 364deec

File tree

4 files changed

+70
-29
lines changed

4 files changed

+70
-29
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
1414
* Added `box.MustNew` wrapper for `box.New` without an error (#448).
1515
* Added missing IPROTO feature flags to greeting negotiation
1616
(iproto.IPROTO_FEATURE_IS_SYNC, iproto.IPROTO_FEATURE_INSERT_ARROW) (#466).
17+
* Added Future.cond (sync.Cond) and Future.finished bool. Added Future.finish() marks Future as done (#496).
1718

1819
### Changed
1920

@@ -28,6 +29,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
2829
* `LogAppendPushFailed` replaced with `LogBoxSessionPushUnsupported` (#480).
2930
* Removed deprecated `Connection` methods, related interfaces and tests are updated (#479).
3031
* Replaced the use of optional types in crud with go-option library (#492).
32+
* Future.done replaced with Future.cond (sync.Cond) + Future.finished bool (#496).
3133

3234
### Fixed
3335

MIGRATION.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ TODO
1515
* Removed `box.session.push()` support: Future.AppendPush() and Future.GetIterator()
1616
methods, ResponseIterator and TimeoutResponseIterator types.
1717
* Removed deprecated `Connection` methods, related interfaces and tests are updated.
18+
1819
*NOTE*: due to Future.GetTyped() doesn't decode SelectRequest into structure, substitute Connection.GetTyped() following the example:
1920
```Go
2021
var singleTpl = Tuple{}
@@ -30,6 +31,7 @@ TODO
3031
).GetTyped(&tpl)
3132
singleTpl := tpl[0]
3233
```
34+
* Future.done replaced with Future.cond (sync.Cond) + Future.finished bool.
3335

3436
## Migration from v1.x.x to v2.x.x
3537

connection.go

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -936,7 +936,7 @@ func (conn *Connection) newFuture(req Request) (fut *Future) {
936936
ErrRateLimited,
937937
"Request is rate limited on client",
938938
}
939-
fut.done = nil
939+
fut.finish()
940940
return
941941
}
942942
}
@@ -950,23 +950,23 @@ func (conn *Connection) newFuture(req Request) (fut *Future) {
950950
ErrConnectionClosed,
951951
"using closed connection",
952952
}
953-
fut.done = nil
953+
fut.finish()
954954
shard.rmut.Unlock()
955955
return
956956
case connDisconnected:
957957
fut.err = ClientError{
958958
ErrConnectionNotReady,
959959
"client connection is not ready",
960960
}
961-
fut.done = nil
961+
fut.finish()
962962
shard.rmut.Unlock()
963963
return
964964
case connShutdown:
965965
fut.err = ClientError{
966966
ErrConnectionShutdown,
967967
"server shutdown in progress",
968968
}
969-
fut.done = nil
969+
fut.finish()
970970
shard.rmut.Unlock()
971971
return
972972
}
@@ -995,7 +995,7 @@ func (conn *Connection) newFuture(req Request) (fut *Future) {
995995
runtime.Gosched()
996996
select {
997997
case conn.rlimit <- struct{}{}:
998-
case <-fut.done:
998+
case <-fut.WaitChan():
999999
if fut.err == nil {
10001000
panic("fut.done is closed, but err is nil")
10011001
}
@@ -1009,12 +1009,12 @@ func (conn *Connection) newFuture(req Request) (fut *Future) {
10091009
// is "done" before the response is come.
10101010
func (conn *Connection) contextWatchdog(fut *Future, ctx context.Context) {
10111011
select {
1012-
case <-fut.done:
1012+
case <-fut.WaitChan():
10131013
case <-ctx.Done():
10141014
}
10151015

10161016
select {
1017-
case <-fut.done:
1017+
case <-fut.WaitChan():
10181018
return
10191019
default:
10201020
conn.cancelFuture(fut, fmt.Errorf("context is done (request ID %d): %w",
@@ -1036,7 +1036,12 @@ func (conn *Connection) send(req Request, streamId uint64) *Future {
10361036
conn.incrementRequestCnt()
10371037

10381038
fut := conn.newFuture(req)
1039-
if fut.done == nil {
1039+
1040+
fut.mutex.Lock()
1041+
is_done := fut.finished
1042+
fut.mutex.Unlock()
1043+
1044+
if is_done {
10401045
conn.decrementRequestCnt()
10411046
return fut
10421047
}
@@ -1059,12 +1064,16 @@ func (conn *Connection) putFuture(fut *Future, req Request, streamId uint64) {
10591064
shardn := fut.requestId & (conn.opts.Concurrency - 1)
10601065
shard := &conn.shard[shardn]
10611066
shard.bufmut.Lock()
1062-
select {
1063-
case <-fut.done:
1067+
1068+
fut.mutex.Lock()
1069+
is_done := fut.finished
1070+
fut.mutex.Unlock()
1071+
1072+
if is_done {
10641073
shard.bufmut.Unlock()
10651074
return
1066-
default:
10671075
}
1076+
10681077
firstWritten := shard.buf.Len() == 0
10691078
if shard.buf.Cap() == 0 {
10701079
shard.buf.b = make([]byte, 0, 128)

future.go

Lines changed: 46 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,32 +15,39 @@ type Future struct {
1515
mutex sync.Mutex
1616
resp Response
1717
err error
18+
cond sync.Cond
19+
finished bool
1820
done chan struct{}
1921
}
2022

2123
func (fut *Future) wait() {
22-
if fut.done == nil {
23-
return
24+
fut.mutex.Lock()
25+
defer fut.mutex.Unlock()
26+
27+
for !fut.finished {
28+
fut.cond.Wait()
2429
}
25-
<-fut.done
2630
}
2731

28-
func (fut *Future) isDone() bool {
29-
if fut.done == nil {
30-
return true
31-
}
32-
select {
33-
case <-fut.done:
34-
return true
35-
default:
36-
return false
32+
func (fut *Future) finish() {
33+
fut.mutex.Lock()
34+
defer fut.mutex.Unlock()
35+
36+
fut.finished = true
37+
38+
if fut.done != nil {
39+
close(fut.done)
3740
}
41+
42+
fut.cond.Broadcast()
3843
}
3944

4045
// NewFuture creates a new empty Future for a given Request.
4146
func NewFuture(req Request) (fut *Future) {
4247
fut = &Future{}
43-
fut.done = make(chan struct{})
48+
fut.done = nil
49+
fut.finished = false
50+
fut.cond.L = &fut.mutex
4451
fut.req = req
4552
return fut
4653
}
@@ -50,7 +57,7 @@ func (fut *Future) SetResponse(header Header, body io.Reader) error {
5057
fut.mutex.Lock()
5158
defer fut.mutex.Unlock()
5259

53-
if fut.isDone() {
60+
if fut.finished {
5461
return nil
5562
}
5663

@@ -60,7 +67,14 @@ func (fut *Future) SetResponse(header Header, body io.Reader) error {
6067
}
6168
fut.resp = resp
6269

63-
close(fut.done)
70+
fut.finished = true
71+
72+
if fut.done != nil {
73+
close(fut.done)
74+
}
75+
76+
fut.cond.Broadcast()
77+
6478
return nil
6579
}
6680

@@ -69,12 +83,18 @@ func (fut *Future) SetError(err error) {
6983
fut.mutex.Lock()
7084
defer fut.mutex.Unlock()
7185

72-
if fut.isDone() {
86+
if fut.finished {
7387
return
7488
}
7589
fut.err = err
7690

77-
close(fut.done)
91+
fut.finished = true
92+
93+
if fut.done != nil {
94+
close(fut.done)
95+
}
96+
97+
fut.cond.Broadcast()
7898
}
7999

80100
// GetResponse waits for Future to be filled and returns Response and error.
@@ -122,8 +142,16 @@ func init() {
122142

123143
// WaitChan returns channel which becomes closed when response arrived or error occurred.
124144
func (fut *Future) WaitChan() <-chan struct{} {
125-
if fut.done == nil {
145+
fut.mutex.Lock()
146+
defer fut.mutex.Unlock()
147+
148+
if fut.finished {
126149
return closedChan
127150
}
151+
152+
if fut.done == nil {
153+
fut.done = make(chan struct{})
154+
}
155+
128156
return fut.done
129157
}

0 commit comments

Comments
 (0)