forked from ripienaar/stomp-irb
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstomp-irb
More file actions
executable file
·254 lines (192 loc) · 6.59 KB
/
stomp-irb
File metadata and controls
executable file
·254 lines (192 loc) · 6.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
#!/usr/bin/env ruby
# An interactive IRB like interface for STOMP
require 'yaml'
require 'rubygems'
require 'stomp'
require 'pp'
require 'irb'
require 'optparse'
class StompLogger
def on_connecting(params)
puts "info> Attempting to connect to %s" % stomp_url(params)
end
def on_connectfail(params)
puts "error> Connection to %s failed" % stomp_url(params)
end
def on_connectfail(params)
puts "info> Disconnected from %s" % stomp_url(params)
end
def on_connected(params)
puts "info> Connected to %s" % stomp_url(params)
end
def on_miscerr(params, errstr)
puts "error> Unknown error on stomp connection %s: %s" % [stomp_url(params), errstr]
end
def stomp_url(params)
"%s://%s@%s:%d" % [ params[:cur_ssl] ? "stomp+ssl" : "stomp", params[:cur_login], params[:cur_host], params[:cur_port]]
end
end
def consolize(&block)
yield
IRB.setup(nil)
irb = IRB::Irb.new
IRB.conf[:MAIN_CONTEXT] = irb.context
irb.context.evaluate("require 'irb/completion'", 0)
install_alias_method :help, :stomp_help, IRB::ExtendCommandBundle::OVERRIDE_ALL
trap("SIGINT") do
irb.signal_handle
end
catch(:IRB_EXIT) do
irb.eval_input
end
end
def stomp_help
puts <<EOF
Interactive Ruby Shell for Stomp
================================
subscribe :topic, "foo" - subscribes to /topic/foo
unsubscribe :topic, "foo" - unsubscribes from /topic/foo
topic "foo", "hello" - sends "hello" to /topic/foo
topic "foo", "hello" - sends "hello" to /topic/foo
queue "foo", "hello" - sends "hello" to /queue/foo
queue "foo", "hello", {:persistent => true} - sends "hello" to /queue/foo
and persist to storage
recv_callback{|f| pp f} - dumps every received message using pp
verbose - shows timestamps and source of
received messages
short_format "<format>" - sets the short display format
long_format "<format>" - sets the long display format
Display Formats:
You can set the display format used for showing incoming messages
the default long format is:
"<<%{time}:%{source}>> %{body}"
the default short format is:
"<<%{body}>>"
EOF
end
def short_format(frm)
@options[:short_display_format] = frm
end
def long_format(frm)
@options[:long_display_format] = frm
end
def show_subscriptions
puts "Current Subscriptions:"
@connection.instance_variable_get("@subscriptions").keys.each do |s|
puts "\t#{s}"
end
puts
end
def unsubscribe(type, q, headers={})
dest = "/#{type}/#{q}"
unless @subscriptions[dest]
puts "error> Not currently subscribed to #{dest}"
return false
end
@connection.unsubscribe("/#{type}/#{q}", {:id => @subscriptions[dest][:id]}.merge(headers))
@subscriptions.delete(dest)
show_subscriptions
end
def subscribe(type, q, headers={})
dest = "/#{type}/#{q}"
if @subscriptions[dest]
puts "error> Already subscribed to #{dest}"
return false
end
@connection.subscribe(dest, {:id => @next_subscription_id}.merge(headers))
@subscriptions[dest] = {:id => @next_subscription_id}
@next_subscription_id += 1
show_subscriptions
end
def verbose
@options[:verbose] ? @options[:verbose] = false : @options[:verbose] = true
end
def send_msg(dest, message, headers={})
if @connection.respond_to?(:publish)
@connection.publish(dest, message.to_s, headers)
else
@connection.send(dest, message.to_s, headers)
end
puts "Sent #{message.to_s} to #{dest} with headers #{headers.pretty_inspect.chomp}"
end
def topic(dest, message, headers={})
send_msg("/topic/#{dest}", message, headers)
end
def queue(dest, message, headers={})
send_msg("/queue/#{dest}", message, headers)
end
def recv_callback(&blk)
if block_given?
@options[:callback] = blk
else
puts "ERROR: Need to pass a block into the callback"
end
end
def receive_and_print_loop
Thread.new(@connection) do |conn|
while true
begin
msg = conn.receive
dest = msg.headers["destination"]
time = Time.now.strftime('%H:%M:%S')
if @options[:verbose]
txt = @options[:long_display_format].clone
else
txt = @options[:short_display_format].clone
end
txt.gsub!(/%\{time\}/, time)
txt.gsub!(/%\{source\}/, dest)
txt.gsub!(/%\{body\}/, msg.body.chomp)
puts "\r#{txt}\n"
@options[:callback].call(msg)
rescue Exception => e
puts "error> Failed to receive from #{options[:stompserver]}: #{e.class}: #{e}"
end
end
end
end
consolize do
@options = {}
@options[:stompserver] = ENV["STOMP_SERVER"] || "localhost"
@options[:stompport] = (ENV["STOMP_PORT"] || 61613).to_i
@options[:stompuser] = ENV["STOMP_USER"] || "guest"
@options[:stomppass] = ENV["STOMP_PASSWORD"] || "guest"
@options[:verbose] = false
@options[:long_display_format] = "<<%{time}:%{source}>> %{body}"
@options[:short_display_format] = "<<stomp>> %{body}"
@options[:callback] = Proc.new { true }
@options[:heartbeat_interval] = "30000"
@options[:vhost] = nil
@subscriptions = {}
@next_subscription_id = 0
opt = OptionParser.new
opt.on("--server [SERVER]", "-s", "Server to connect to") do |val|
@options[:stompserver] = val
end
opt.on("--port [PORT]", "-p", Integer, "Port to connect to") do |val|
@options[:stompport] = val
end
opt.on("--user [USER]", "-u", "User to connect as") do |val|
@options[:stompuser] = val
end
opt.on("--password [PASSWORD]", "-P", "Password to connect with") do |val|
@options[:stomppass] = val
end
opt.on("--heartbeat-interval [INTERVAL]", "-i", "Interval for Stomp 1.1 heartbeats") do |val|
@options[:heartbeat_interval] = val
end
opt.on("--vhost [VHOST]", "-V", "Vhost used by RabbitMQ") do |val|
@options[:vhost] = val
end
opt.parse!
connection = {:hosts => [ {:login => @options[:stompuser], :passcode => @options[:stomppass], :host => @options[:stompserver], :port => @options[:stompport]} ],
:logger => StompLogger.new,
:connect_headers => {:"heart-beat" => "#{@options[:heartbeat_interval]},#{@options[:heartbeat_interval]}", :"accept-version" => "1.1,1.0", :host => @options[:vhost] || @options[:stompserver] }}
puts "Interactive Ruby shell for STOMP"
puts
@connection = Stomp::Connection.open(connection)
puts
puts "Type 'help' for usage instructions"
puts
receive_and_print_loop
end