-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsync.lua
More file actions
285 lines (246 loc) · 8.7 KB
/
sync.lua
File metadata and controls
285 lines (246 loc) · 8.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
--- The sync library exposes interfaces for various synchronization structures.
--
-- @module system.sync
local expect = require "expect"
local util = require "util"
local sync = {
mutex = {},
semaphore = {},
conditionVariable = {},
atomic = {},
barrier = {},
rwLock = {},
}
--#region Mutex
--- A mutex is an object that controls access to a variable across multiple threads.
-- It ensures only one thread accesses a resource at a time by blocking other
-- threads from locking the mutex until the current thread unlocks it.
-- @type mutex
--- Creates a new mutex.
-- @tparam[opt] boolean recursive Whether to make the mutex recursive
-- @treturn mutex The new mutex object
function sync.mutex.new(recursive)
expect(1, recursive, "boolean", "nil")
return setmetatable({recursive = recursive and 0}, {__name = "mutex", __index = sync.mutex})
end
--- Locks the mutex, waiting if it's currently owned by another thread.
function sync.mutex:lock()
expect(1, self, "mutex")
return util.syscall.lockmutex(self)
end
--- Unlocks the mutex. This is only valid from the thread that owns the lock.
function sync.mutex:unlock()
expect(1, self, "mutex")
return util.syscall.unlockmutex(self)
end
--- Tries to lock the thread, returning false if it could not be locked.
-- @treturn boolean Whether the mutex is now locked
function sync.mutex:tryLock()
expect(1, self, "mutex")
return util.syscall.trylockmutex(self)
end
--- Locks the mutex, waiting until it's unlocked or until the specified timeout.
-- @tparam number timeout The number of seconds to wait
-- @treturn boolean Whether the mutex is now locked
function sync.mutex:tryLockFor(timeout)
expect(1, self, "mutex")
expect(2, timeout, "number")
return util.syscall.timelockmutex(self, timeout)
end
--#endregion
--#region Semaphore
--- A semaphore controls access to a limited number of resources. A function may
-- acquire a resource from the semaphore, decrementing its available count. If
-- the count is zero, it waits until another function releases a resource, at
-- which point it will acquire it and return.
-- @type semaphore
--- Creates a new semaphore.
-- @tparam[opt=1] number init The initial count of the semaphore
-- @treturn semaphore The new semaphore object
function sync.semaphore.new(init)
expect(1, init, "number", "nil")
if init then expect.range(init, 0) end
return setmetatable({count = init or 1}, {__name = "semaphore", __index = sync.semaphore})
end
--- Acquires a resource from the semaphore, waiting until there is one available.
function sync.semaphore:acquire()
expect(1, self, "semaphore")
return util.syscall.acquiresemaphore(self)
end
--- Acquires a resource from the semaphore, waiting until there is one available or until a timeout.
-- @tparam number timeout The number of seconds to wait
-- @treturn boolean Whether the resource was acquired
function sync.semaphore:tryAcquireFor(timeout)
expect(1, self, "semaphore")
expect(2, timeout, "number")
return util.syscall.timeacquiresemaphore(self, timeout)
end
--- Releases a resource to the semaphore. This can be called from any thread.
function sync.semaphore:release()
expect(1, self, "semaphore")
return util.syscall.releasesemaphore(self)
end
--#endregion
--#region Condition variable
--- A condition variable allows threads to wait until another thread notifies
-- them to resume.
-- @type conditionVariable
--- Creates a new condition variable.
-- @treturn conditionVariable The new condition variable.
function sync.conditionVariable.new()
return setmetatable({
lock = sync.mutex.new(),
sem = sync.semaphore.new(0),
waiting = 0
}, {__name = "condition variable", __index = sync.conditionVariable})
end
--- Waits for a notification from another thread.
function sync.conditionVariable:wait()
expect(1, self, "condition variable")
self.lock:lock()
self.waiting = self.waiting + 1
self.lock:unlock()
self.sem:acquire()
self.lock:lock()
self.waiting = self.waiting - 1
self.lock:unlock()
end
--- Waits for a notification from another thread, or until a timeout occurs.
-- @tparam number timeout The number of seconds to wait
-- @treturn boolean Whether a notification occurred
function sync.conditionVariable:waitFor(timeout)
expect(1, self, "condition variable")
expect(2, timeout, "number")
self.lock:lock()
self.waiting = self.waiting + 1
self.lock:unlock()
local retval = self.sem:tryAcquireFor(timeout)
self.lock:lock()
self.waiting = self.waiting - 1
self.lock:unlock()
return retval
end
--- Notifies a single (unspecified) thread to continue.
function sync.conditionVariable:notifyOne()
expect(1, self, "condition variable")
self.sem:release()
end
--- Notifies all waiting threads to continue.
function sync.conditionVariable:notifyAll()
expect(1, self, "condition variable")
self.lock:lock()
self.sem.count = self.sem.count + self.waiting - 1
self.sem:release()
self.lock:unlock()
end
--#endregion
--#region Atomic variables
-- TODO
--#endregion
--#region Barrier
--- A barrier is a lock that waits for a specific number of threads to wait on
-- the object, at which point all threads will be released together.
-- @type barrier
--- Creates a new barrier object.
-- @tparam number count The number of threads to wait for
-- @treturn barrier A new barrier object
function sync.barrier.new(count)
expect(1, count, "number")
expect.range(count, 1)
return setmetatable({
cvar = sync.conditionVariable.new(),
lock = sync.mutex.new(),
left = count,
count = count,
cycles = 0
}, {__name = "barrier", __index = sync.barrier})
end
--- Adds one to the thread wait count, and waits until it meets the limit.
-- @treturn boolean Whether this call directly resulted in the barrier being met
function sync.barrier:wait()
expect(1, self, "barrier")
self.lock:lock()
self.left = self.left - 1
if self.left == 0 then
self.left = self.count
self.cycles = self.cycles + 1
self.lock:unlock()
self.cvar:notifyAll()
return true
else
self.lock:unlock()
self.cvar:wait()
return false
end
end
--#endregion
--#region Readers-writer lock
--- A readers-writer lock implements two related locks: a read lock, which can
-- be held by multiple threads, and a write lock, which can only be held by one
-- thread. Multiple threads can hold a read lock, but a write lock blocks both
-- read and write locks.
-- @type rwlock
--- Creates a new RW lock.
-- @treturn rwlock The new RW lock
function sync.rwLock.new()
return setmetatable({
count = 0,
readLock = sync.mutex.new(),
globalLock = sync.semaphore.new(1)
}, {__name = "rwlock", __index = sync.rwLock})
end
--- Acquires the lock for reading, waiting for the write lock to be released first.
function sync.rwLock:lockRead()
expect(1, self, "rwlock")
self.readLock:lock()
self.count = self.count + 1
if self.count == 1 then self.globalLock:acquire() end
self.readLock:unlock()
end
--- Releases the lock for reading.
function sync.rwLock:unlockRead()
expect(1, self, "rwlock")
self.readLock:lock()
self.count = self.count - 1
if self.count == 0 then self.globalLock:release() end
self.readLock:unlock()
end
--- Acquires the lock for writing, waiting for the read and write locks to be released.
function sync.rwLock:lockWrite()
expect(1, self, "rwlock")
self.globalLock:acquire()
end
--- Releases the lock for writing.
function sync.rwLock:unlockWrite()
expect(1, self, "rwlock")
self.globalLock:release()
end
--#endregion
--- Calls a function, ensuring that the mutex is locked before calling and unlocked
-- after calling, even if the function returns early or throws an error.
-- @tparam mutex mutex The mutex to lock
-- @tparam function fn The function to call
-- @tparam any ... Any parameters to pass
-- @treturn any... The return values from the function
function sync.lockGuard(mutex, fn, ...)
expect(1, mutex, "mutex")
expect(2, fn, "function")
mutex:lock()
local res = table.pack(pcall(fn, ...))
mutex:unlock()
if not res[1] then error(res[2], 0) end
return table.unpack(res, 2, res.n)
end
--- Creates a new synchronized table. A synchronized table is a table that's
-- protected by a mutex. The table can only be accessed by calling it as a
-- function, which will lock the mutex and calls the callback with the table.
-- @treturn function(callback:function(any):any) The accessor for the variable
function sync.synctab()
local tab = {}
local lock = sync.mutex.new()
return function(fn)
expect(1, fn, "function")
return sync.lockGuard(lock, fn, tab)
end
end
return sync