Skip to content

Commit 20034c5

Browse files
committed
sam's feedback
1 parent f4f9aeb commit 20034c5

File tree

2 files changed

+248
-101
lines changed

2 files changed

+248
-101
lines changed

modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/listeners/OpcUaSubscriptionLifecycleHandler.java

Lines changed: 80 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@
5454

5555
public class OpcUaSubscriptionLifecycleHandler implements OpcUaSubscription.SubscriptionListener {
5656

57+
public static final long KEEP_ALIVE_SAFETY_MARGIN_MS = 5_000L;
58+
5759
private static final @NotNull Logger log = LoggerFactory.getLogger(OpcUaSubscriptionLifecycleHandler.class);
5860
private static final int MAX_MONITORED_ITEM_COUNT = 5;
5961

@@ -94,28 +96,6 @@ public OpcUaSubscriptionLifecycleHandler(
9496
.collect(Collectors.toMap(tag -> NodeId.parse(tag.getDefinition().getNode()), Function.identity()));
9597
}
9698

97-
/**
98-
* Subscribes to the OPC UA client.
99-
* If a subscription ID is provided, it attempts to transfer the subscription.
100-
* If the transfer fails or no ID is provided, it creates a new subscription.
101-
* It then synchronizes the tags and monitored items.
102-
*
103-
* @param client the OPC UA client
104-
* @return an Optional containing the created or transferred subscription, or empty if failed
105-
*/
106-
public @NotNull Optional<OpcUaSubscription> subscribe(final @NotNull OpcUaClient client) {
107-
return createNewSubscription(client)
108-
.map(subscription -> {
109-
subscription.setPublishingInterval((double) config.getOpcuaToMqttConfig().publishingInterval());
110-
subscription.setSubscriptionListener(this);
111-
if(syncTagsAndMonitoredItems(subscription, tags, config)) {
112-
return subscription;
113-
} else {
114-
return null;
115-
}
116-
});
117-
}
118-
11999
/**
120100
* Creates a new OPC UA subscription.
121101
* If the subscription is created successfully, it returns an Optional containing the subscription.
@@ -129,22 +109,52 @@ public OpcUaSubscriptionLifecycleHandler(
129109
final OpcUaSubscription subscription = new OpcUaSubscription(client);
130110
try {
131111
subscription.create();
132-
return subscription
133-
.getSubscriptionId()
134-
.map(subscriptionId -> {
135-
log.trace("New subscription ID: {}", subscriptionId);
136-
return subscription;
137-
})
138-
.or(() -> {
139-
log.error("Subscription not created on the server");
140-
return Optional.empty();
141-
});
112+
return subscription.getSubscriptionId().map(subscriptionId -> {
113+
log.trace("New subscription ID: {}", subscriptionId);
114+
return subscription;
115+
}).or(() -> {
116+
log.error("Subscription not created on the server");
117+
return Optional.empty();
118+
});
142119
} catch (final UaException e) {
143120
log.error("Failed to create subscription", e);
144121
}
145122
return Optional.empty();
146123
}
147124

125+
private static @NotNull String extractPayload(final @NotNull OpcUaClient client, final @NotNull DataValue value)
126+
throws UaException {
127+
if (value.getValue().getValue() == null) {
128+
return "";
129+
}
130+
131+
final ByteBuffer byteBuffer = OpcUaToJsonConverter.convertPayload(client.getDynamicEncodingContext(), value);
132+
final byte[] buffer = new byte[byteBuffer.remaining()];
133+
byteBuffer.get(buffer);
134+
return new String(buffer, StandardCharsets.UTF_8);
135+
}
136+
137+
/**
138+
* Subscribes to the OPC UA client.
139+
* If a subscription ID is provided, it attempts to transfer the subscription.
140+
* If the transfer fails or no ID is provided, it creates a new subscription.
141+
* It then synchronizes the tags and monitored items.
142+
*
143+
* @param client the OPC UA client
144+
* @return an Optional containing the created or transferred subscription, or empty if failed
145+
*/
146+
public @NotNull Optional<OpcUaSubscription> subscribe(final @NotNull OpcUaClient client) {
147+
return createNewSubscription(client).map(subscription -> {
148+
subscription.setPublishingInterval((double) config.getOpcuaToMqttConfig().publishingInterval());
149+
subscription.setSubscriptionListener(this);
150+
if (syncTagsAndMonitoredItems(subscription, tags, config)) {
151+
return subscription;
152+
} else {
153+
return null;
154+
}
155+
});
156+
}
157+
148158
/**
149159
* Synchronizes the tags and monitored items in the subscription.
150160
* It removes monitored items that are not in the tags list and adds new monitored items from the tags list.
@@ -155,18 +165,34 @@ public OpcUaSubscriptionLifecycleHandler(
155165
* @param config the configuration for the OPC UA adapter
156166
* @return true if synchronization was successful, false otherwise
157167
*/
158-
private boolean syncTagsAndMonitoredItems(final @NotNull OpcUaSubscription subscription, final @NotNull List<OpcuaTag> tags, final @NotNull OpcUaSpecificAdapterConfig config) {
168+
private boolean syncTagsAndMonitoredItems(
169+
final @NotNull OpcUaSubscription subscription,
170+
final @NotNull List<OpcuaTag> tags,
171+
final @NotNull OpcUaSpecificAdapterConfig config) {
159172

160-
final var nodeIdToTag = tags.stream().collect(Collectors.toMap(tag -> NodeId.parse(tag.getDefinition().getNode()), Function.identity()));
161-
final var nodeIdToMonitoredItem = subscription.getMonitoredItems().stream().collect(Collectors.toMap(monitoredItem -> monitoredItem.getReadValueId().getNodeId(), Function.identity()));
173+
final var nodeIdToTag = tags.stream()
174+
.collect(Collectors.toMap(tag -> NodeId.parse(tag.getDefinition().getNode()), Function.identity()));
175+
final var nodeIdToMonitoredItem = subscription.getMonitoredItems()
176+
.stream()
177+
.collect(Collectors.toMap(monitoredItem -> monitoredItem.getReadValueId().getNodeId(),
178+
Function.identity()));
162179

163-
final var monitoredItemsToRemove = nodeIdToMonitoredItem.entrySet().stream().filter(entry -> !nodeIdToTag.containsKey(entry.getKey())).map(Map.Entry::getValue).toList();
164-
final var monitoredItemsToAdd = nodeIdToTag.entrySet().stream().filter(entry -> !nodeIdToMonitoredItem.containsKey(entry.getKey())).map(Map.Entry::getValue).toList();
180+
final var monitoredItemsToRemove = nodeIdToMonitoredItem.entrySet()
181+
.stream()
182+
.filter(entry -> !nodeIdToTag.containsKey(entry.getKey()))
183+
.map(Map.Entry::getValue)
184+
.toList();
185+
final var monitoredItemsToAdd = nodeIdToTag.entrySet()
186+
.stream()
187+
.filter(entry -> !nodeIdToMonitoredItem.containsKey(entry.getKey()))
188+
.map(Map.Entry::getValue)
189+
.toList();
165190

166191
//clear deleted monitored items
167-
if(!monitoredItemsToRemove.isEmpty()) {
192+
if (!monitoredItemsToRemove.isEmpty()) {
168193
subscription.removeMonitoredItems(monitoredItemsToRemove);
169-
log.debug("Removed monitored items: {}", monitoredItemsToRemove.stream().map(item -> item.getReadValueId().getNodeId()));
194+
log.debug("Removed monitored items: {}",
195+
monitoredItemsToRemove.stream().map(item -> item.getReadValueId().getNodeId()));
170196
}
171197

172198
//update existing monitored items
@@ -177,15 +203,16 @@ private boolean syncTagsAndMonitoredItems(final @NotNull OpcUaSubscription subsc
177203
});
178204

179205
//add new monitored items
180-
if(!monitoredItemsToAdd.isEmpty()) {
206+
if (!monitoredItemsToAdd.isEmpty()) {
181207
monitoredItemsToAdd.forEach(opcuaTag -> {
182208
final String nodeId = opcuaTag.getDefinition().getNode();
183209
final var monitoredItem = OpcUaMonitoredItem.newDataItem(NodeId.parse(nodeId));
184210
monitoredItem.setQueueSize(uint(config.getOpcuaToMqttConfig().serverQueueSize()));
185211
monitoredItem.setSamplingInterval(config.getOpcuaToMqttConfig().publishingInterval());
186212
subscription.addMonitoredItem(monitoredItem);
187213
});
188-
log.debug("Added monitored items: {}", monitoredItemsToAdd.stream().map(item -> item.getDefinition().getNode()).toList());
214+
log.debug("Added monitored items: {}",
215+
monitoredItemsToAdd.stream().map(item -> item.getDefinition().getNode()).toList());
189216
}
190217

191218
try {
@@ -225,9 +252,8 @@ private boolean syncTagsAndMonitoredItems(final @NotNull OpcUaSubscription subsc
225252
public void onKeepAliveReceived(final @NotNull OpcUaSubscription subscription) {
226253
lastKeepAliveTimestamp = System.currentTimeMillis();
227254
protocolAdapterMetricsService.increment(Constants.METRIC_SUBSCRIPTION_KEEPALIVE_COUNT);
228-
229-
subscription.getSubscriptionId().ifPresent(subscriptionId ->
230-
log.debug("Keep-alive received for subscription {} of adapter '{}'", subscriptionId, adapterId));
255+
subscription.getSubscriptionId()
256+
.ifPresent(sid -> log.debug("Keep-alive received for subscription {} of adapter '{}'", sid, adapterId));
231257
}
232258

233259
/**
@@ -238,21 +264,20 @@ public void onKeepAliveReceived(final @NotNull OpcUaSubscription subscription) {
238264
* @return true if last keep-alive was received within the computed timeout, false otherwise
239265
*/
240266
public boolean isKeepAliveHealthy() {
241-
final long timeSinceLastKeepAlive = System.currentTimeMillis() - lastKeepAliveTimestamp;
242-
return timeSinceLastKeepAlive < getKeepAliveTimeoutMs();
267+
return (System.currentTimeMillis() - lastKeepAliveTimestamp) < getKeepAliveTimeoutMs();
243268
}
244269

245270
/**
246271
* Computes the keep-alive timeout based on ConnectionOptions.
247272
* The timeout allows for the configured number of missed keep-alives plus one
248273
* before considering the connection unhealthy, plus a safety margin.
249-
* Formula: keepAliveIntervalMs × (keepAliveFailuresAllowed + 1) + SAFETY_MARGIN_MS
274+
* Formula: keepAliveIntervalMs × (keepAliveFailuresAllowed + 1) + KEEP_ALIVE_SAFETY_MARGIN_MS
250275
*
251276
* @return the computed keep-alive timeout in milliseconds
252277
*/
253278
public long getKeepAliveTimeoutMs() {
254-
final var connOpts = config.getConnectionOptions();
255-
return connOpts.keepAliveIntervalMs() * (connOpts.keepAliveFailuresAllowed() + 1) + 5_000L;
279+
final var opts = config.getConnectionOptions();
280+
return opts.keepAliveIntervalMs() * (opts.keepAliveFailuresAllowed() + 1) + KEEP_ALIVE_SAFETY_MARGIN_MS;
256281
}
257282

258283
@Override
@@ -265,15 +290,11 @@ public void onTransferFailed(
265290
protocolAdapterMetricsService.increment(Constants.METRIC_SUBSCRIPTION_TRANSFER_FAILED_COUNT);
266291

267292
log.error("Subscription Transfer failed, recreating subscription for adapter '{}'", adapterId);
268-
createNewSubscription(client)
269-
.ifPresentOrElse(
270-
replacementSubscription -> {
271-
// reconnect the listener with the new subscription
272-
replacementSubscription.setSubscriptionListener(this);
273-
syncTagsAndMonitoredItems(replacementSubscription, tags, config);
274-
},
275-
() -> log.error("Subscription Transfer failed, unable to create new subscription '{}'", adapterId)
276-
);
293+
createNewSubscription(client).ifPresentOrElse(replacementSubscription -> {
294+
// reconnect the listener with the new subscription
295+
replacementSubscription.setSubscriptionListener(this);
296+
syncTagsAndMonitoredItems(replacementSubscription, tags, config);
297+
}, () -> log.error("Subscription Transfer failed, unable to create new subscription '{}'", adapterId));
277298
}
278299

279300
@Override
@@ -288,9 +309,7 @@ public void onDataReceived(
288309
if (null == tagToFirstSeen.putIfAbsent(tag, true)) {
289310
eventService.createAdapterEvent(adapterId, PROTOCOL_ID_OPCUA)
290311
.withSeverity(Event.SEVERITY.INFO)
291-
.withMessage(String.format("Adapter '%s' took first sample for tag '%s'",
292-
adapterId,
293-
tn))
312+
.withMessage(String.format("Adapter '%s' took first sample for tag '%s'", adapterId, tn))
294313
.fire();
295314
}
296315
try {
@@ -303,16 +322,4 @@ public void onDataReceived(
303322
}
304323
}
305324
}
306-
307-
private static @NotNull String extractPayload(final @NotNull OpcUaClient client, final @NotNull DataValue value)
308-
throws UaException {
309-
if (value.getValue().getValue() == null) {
310-
return "";
311-
}
312-
313-
final ByteBuffer byteBuffer = OpcUaToJsonConverter.convertPayload(client.getDynamicEncodingContext(), value);
314-
final byte[] buffer = new byte[byteBuffer.remaining()];
315-
byteBuffer.get(buffer);
316-
return new String(buffer, StandardCharsets.UTF_8);
317-
}
318325
}

0 commit comments

Comments
 (0)