Skip to content

Commit 9d0c6f8

Browse files
committed
[CI] heart beating ongoing chunk
1 parent 65d77e8 commit 9d0c6f8

File tree

7 files changed

+318
-75
lines changed

7 files changed

+318
-75
lines changed

redis/heartbeat.lua

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,39 @@ local zset_key = KEYS[1]
22
local processed_key = KEYS[2]
33
local owners_key = KEYS[3]
44
local worker_queue_key = KEYS[4]
5+
local heartbeats_key = KEYS[5]
6+
local test_group_timeout_key = KEYS[6]
57

6-
local current_time = ARGV[1]
8+
local current_time = tonumber(ARGV[1])
79
local test = ARGV[2]
10+
local default_timeout = tonumber(ARGV[3]) or 0
811

912
-- already processed, we do not need to bump the timestamp
1013
if redis.call('sismember', processed_key, test) == 1 then
11-
return false
14+
return nil
1215
end
1316

1417
-- we're still the owner of the test, we can bump the timestamp
1518
if redis.call('hget', owners_key, test) == worker_queue_key then
16-
return redis.call('zadd', zset_key, current_time, test)
19+
-- Record last heartbeat time in a separate hash for "recent activity" tracking
20+
redis.call('hset', heartbeats_key, test, current_time)
21+
22+
-- Get the dynamic timeout for this test (if any) or use default
23+
local dynamic_timeout = redis.call('hget', test_group_timeout_key, test)
24+
local timeout_to_use
25+
if dynamic_timeout and dynamic_timeout ~= "" then
26+
timeout_to_use = tonumber(dynamic_timeout)
27+
else
28+
timeout_to_use = default_timeout
29+
end
30+
31+
local new_deadline = current_time + timeout_to_use
32+
33+
-- Extend the deadline by setting score to current_time + timeout
34+
redis.call('zadd', zset_key, new_deadline, test)
35+
36+
-- Return the new deadline and timeout used for logging
37+
return { new_deadline, timeout_to_use }
1738
end
39+
40+
return nil

redis/reserve_lost.lua

Lines changed: 42 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@ local processed_key = KEYS[2]
33
local worker_queue_key = KEYS[3]
44
local owners_key = KEYS[4]
55
local test_group_timeout_key = KEYS[5]
6+
local heartbeats_key = KEYS[6]
67

78
local current_time = tonumber(ARGV[1])
89
local timeout = tonumber(ARGV[2])
910
local use_dynamic_deadline = ARGV[3] == "true"
1011
local default_timeout = tonumber(ARGV[4]) or 0
12+
local heartbeat_grace_period = tonumber(ARGV[5]) or 30
1113

1214
local lost_tests
1315
if use_dynamic_deadline then
@@ -18,20 +20,50 @@ end
1820

1921
for _, test in ipairs(lost_tests) do
2022
if redis.call('sismember', processed_key, test) == 0 then
21-
if use_dynamic_deadline then
22-
local dynamic_timeout = redis.call('hget', test_group_timeout_key, test)
23-
if not dynamic_timeout or dynamic_timeout == "" then
24-
dynamic_timeout = default_timeout
23+
-- Check if the owner is still actively heartbeating
24+
local last_heartbeat = redis.call('hget', heartbeats_key, test)
25+
if last_heartbeat and last_heartbeat ~= "" then
26+
local heartbeat_age = current_time - tonumber(last_heartbeat)
27+
-- If heartbeated recently (within grace period), skip this test
28+
-- The owner is still actively working on it
29+
if heartbeat_age < heartbeat_grace_period then
30+
-- Skip this test, try the next one
31+
-- Don't claim it since the worker is still alive
2532
else
26-
dynamic_timeout = tonumber(dynamic_timeout)
33+
-- Heartbeat is stale, safe to claim
34+
if use_dynamic_deadline then
35+
local dynamic_timeout = redis.call('hget', test_group_timeout_key, test)
36+
if not dynamic_timeout or dynamic_timeout == "" then
37+
dynamic_timeout = default_timeout
38+
else
39+
dynamic_timeout = tonumber(dynamic_timeout)
40+
end
41+
redis.call('zadd', zset_key, current_time + dynamic_timeout, test)
42+
else
43+
redis.call('zadd', zset_key, current_time + timeout, test)
44+
end
45+
redis.call('lpush', worker_queue_key, test)
46+
redis.call('hset', owners_key, test, worker_queue_key) -- Take ownership
47+
redis.call('hdel', heartbeats_key, test) -- Clear stale heartbeat
48+
return test
2749
end
28-
redis.call('zadd', zset_key, current_time + dynamic_timeout, test)
2950
else
30-
redis.call('zadd', zset_key, current_time + timeout, test)
51+
-- No heartbeat record, proceed with claiming (legacy behavior or never heartbeated)
52+
if use_dynamic_deadline then
53+
local dynamic_timeout = redis.call('hget', test_group_timeout_key, test)
54+
if not dynamic_timeout or dynamic_timeout == "" then
55+
dynamic_timeout = default_timeout
56+
else
57+
dynamic_timeout = tonumber(dynamic_timeout)
58+
end
59+
redis.call('zadd', zset_key, current_time + dynamic_timeout, test)
60+
else
61+
redis.call('zadd', zset_key, current_time + timeout, test)
62+
end
63+
redis.call('lpush', worker_queue_key, test)
64+
redis.call('hset', owners_key, test, worker_queue_key) -- Take ownership
65+
return test
3166
end
32-
redis.call('lpush', worker_queue_key, test)
33-
redis.call('hset', owners_key, test, worker_queue_key) -- Take ownership
34-
return test
3567
end
3668
end
3769

ruby/lib/ci/queue/configuration.rb

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ class Configuration
1313
attr_accessor :branch
1414
attr_accessor :timing_redis_url
1515
attr_accessor :write_duration_averages
16+
attr_accessor :heartbeat_grace_period, :heartbeat_interval
1617
attr_reader :circuit_breakers
1718
attr_writer :seed, :build_id
1819
attr_writer :queue_init_timeout, :report_timeout, :inactive_workers_timeout
@@ -61,7 +62,9 @@ def initialize(
6162
strategy: :random, timing_file: nil, timing_fallback_duration: 100.0, export_timing_file: nil,
6263
suite_max_duration: 120_000, suite_buffer_percent: 10,
6364
branch: nil,
64-
timing_redis_url: nil
65+
timing_redis_url: nil,
66+
heartbeat_grace_period: 30,
67+
heartbeat_interval: 10
6568
)
6669
@build_id = build_id
6770
@circuit_breakers = [CircuitBreaker::Disabled]
@@ -96,6 +99,8 @@ def initialize(
9699
@branch = branch
97100
@timing_redis_url = timing_redis_url
98101
@write_duration_averages = false
102+
@heartbeat_grace_period = heartbeat_grace_period
103+
@heartbeat_interval = heartbeat_interval
99104
end
100105

101106
def queue_init_timeout

ruby/lib/ci/queue/redis/worker.rb

Lines changed: 84 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,9 @@ def poll
8787
executable = resolve_executable(id)
8888

8989
if executable
90-
yield executable
90+
with_heartbeat(id) do
91+
yield executable
92+
end
9193
else
9294
warn("Warning: Could not resolve executable for ID #{id.inspect}. Acknowledging to remove from queue.")
9395
acknowledge(id)
@@ -209,10 +211,88 @@ def release!
209211
nil
210212
end
211213

214+
# Send a heartbeat for the currently reserved test to indicate the worker is still active.
215+
# This extends the deadline and prevents other workers from stealing the test.
216+
# Returns true if heartbeat was successful, false if test was already processed or
217+
# we're no longer the owner.
218+
def heartbeat(test_or_id = nil)
219+
test_key = if test_or_id
220+
test_or_id.respond_to?(:id) ? test_or_id.id : test_or_id
221+
else
222+
@reserved_test
223+
end
224+
return false unless test_key
225+
226+
current_time = CI::Queue.time_now.to_f
227+
result = eval_script(
228+
:heartbeat,
229+
keys: [
230+
key('running'),
231+
key('processed'),
232+
key('owners'),
233+
key('worker', worker_id, 'queue'),
234+
key('heartbeats'),
235+
key('test-group-timeout')
236+
],
237+
argv: [current_time, test_key, config.timeout]
238+
)
239+
240+
if result.is_a?(Array) && result.length == 2
241+
new_deadline, timeout_used = result
242+
current_time_readable = Time.at(current_time).strftime('%Y-%m-%d %H:%M:%S')
243+
deadline_readable = Time.at(new_deadline).strftime('%Y-%m-%d %H:%M:%S')
244+
warn "[heartbeat] test=#{test_key} current_time=#{current_time_readable} extended_deadline=#{deadline_readable} timeout=#{timeout_used}s"
245+
true
246+
else
247+
false
248+
end
249+
rescue *CONNECTION_ERRORS
250+
false
251+
end
252+
212253
private
213254

214255
attr_reader :index
215256

257+
# Runs a block while sending periodic heartbeats in a background thread.
258+
# This prevents other workers from stealing the test while it's being executed.
259+
def with_heartbeat(test_id)
260+
return yield unless config.heartbeat_interval&.positive?
261+
262+
# Pre-initialize Redis connection and script in current thread context
263+
# This ensures background threads use the same initialized connection
264+
ensure_connection_and_script(:heartbeat)
265+
puts "[heartbeat] Will send heartbeat every #{config.heartbeat_interval} seconds while running test #{test_id}."
266+
267+
stop_heartbeat = false
268+
heartbeat_thread = Thread.new do
269+
until stop_heartbeat
270+
sleep(config.heartbeat_interval)
271+
break if stop_heartbeat
272+
273+
begin
274+
heartbeat(test_id)
275+
rescue StandardError => e
276+
warn("[heartbeat] Failed to send heartbeat for #{test_id}: #{e.message}")
277+
end
278+
end
279+
end
280+
281+
yield
282+
ensure
283+
stop_heartbeat = true
284+
heartbeat_thread&.kill
285+
heartbeat_thread&.join(1) # Wait up to 1 second for thread to finish
286+
end
287+
288+
def ensure_connection_and_script(script)
289+
# Pre-initialize Redis connection and script in current thread context
290+
# This ensures background threads use the same initialized connection
291+
load_script(script)
292+
# Ping Redis to ensure connection is established
293+
redis.ping
294+
end
295+
216296
def worker_id
217297
config.worker_id
218298
end
@@ -295,9 +375,10 @@ def try_to_reserve_lost_test
295375
key('completed'),
296376
key('worker', worker_id, 'queue'),
297377
key('owners'),
298-
key('test-group-timeout')
378+
key('test-group-timeout'),
379+
key('heartbeats')
299380
],
300-
argv: [current_time, timeout, 'true', config.timeout]
381+
argv: [current_time, timeout, 'true', config.timeout, config.heartbeat_grace_period]
301382
)
302383

303384
if lost_test

ruby/test/ci/queue/.DS_Store

8 KB
Binary file not shown.

ruby/test/ci/queue/redis/dynamic_timeout_test.rb

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -194,17 +194,19 @@ def test_reserve_lost_test_passes_dynamic_deadline_flag
194194
'build:42:completed',
195195
'build:42:worker:1:queue',
196196
'build:42:owners',
197-
'build:42:test-group-timeout' # 5th key for dynamic deadline
197+
'build:42:test-group-timeout', # 5th key for dynamic deadline
198+
'build:42:heartbeats' # 6th key for heartbeat tracking
198199
]
199200

200201
@worker.stub(:eval_script, proc { |script, keys:, argv:|
201202
assert_equal :reserve_lost, script
202203
assert_equal expected_keys, keys
203-
assert_equal 4, argv.length
204+
assert_equal 5, argv.length
204205
assert_instance_of Float, argv[0]
205206
assert_equal @config.timeout, argv[1]
206207
assert_equal 'true', argv[2]
207208
assert_equal @config.timeout, argv[3]
209+
assert_equal @config.heartbeat_grace_period, argv[4]
208210
nil # Return nil (no lost test)
209211
}) do
210212
@worker.send(:try_to_reserve_lost_test)

0 commit comments

Comments
 (0)