Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,20 +54,21 @@

public class OpcUaSubscriptionLifecycleHandler implements OpcUaSubscription.SubscriptionListener {

private static final Logger log = LoggerFactory.getLogger(OpcUaSubscriptionLifecycleHandler.class);
private static final long KEEP_ALIVE_TIMEOUT_MS = 30_000; // 30 seconds
public static final long KEEP_ALIVE_SAFETY_MARGIN_MS = 5_000L;

private static final @NotNull Logger log = LoggerFactory.getLogger(OpcUaSubscriptionLifecycleHandler.class);
private static final int MAX_MONITORED_ITEM_COUNT = 5;

private final @NotNull ProtocolAdapterMetricsService protocolAdapterMetricsService;
private final @NotNull ProtocolAdapterTagStreamingService tagStreamingService;
private final @NotNull EventService eventService;
private final @NotNull String adapterId;
final Map<OpcuaTag, Boolean> tagToFirstSeen = new ConcurrentHashMap<>();
private final @NotNull Map<OpcuaTag, Boolean> tagToFirstSeen;
private final @NotNull Map<NodeId, OpcuaTag> nodeIdToTag;
private final @NotNull List<OpcuaTag> tags;
private final @NotNull OpcUaClient client;
private final @NotNull DataPointFactory dataPointFactory;
final @NotNull OpcUaSpecificAdapterConfig config;
private final @NotNull OpcUaSpecificAdapterConfig config;

// Track last keep-alive timestamp for health monitoring
private volatile long lastKeepAliveTimestamp;
Expand All @@ -89,33 +90,12 @@ public OpcUaSubscriptionLifecycleHandler(
this.client = client;
this.dataPointFactory = dataPointFactory;
this.tags = tags;
this.tagToFirstSeen = new ConcurrentHashMap<>();
this.lastKeepAliveTimestamp = System.currentTimeMillis();
nodeIdToTag = tags.stream()
this.nodeIdToTag = tags.stream()
.collect(Collectors.toMap(tag -> NodeId.parse(tag.getDefinition().getNode()), Function.identity()));
}

/**
* Subscribes to the OPC UA client.
* If a subscription ID is provided, it attempts to transfer the subscription.
* If the transfer fails or no ID is provided, it creates a new subscription.
* It then synchronizes the tags and monitored items.
*
* @param client the OPC UA client
* @return an Optional containing the created or transferred subscription, or empty if failed
*/
public @NotNull Optional<OpcUaSubscription> subscribe(final @NotNull OpcUaClient client) {
return createNewSubscription(client)
.map(subscription -> {
subscription.setPublishingInterval((double) config.getOpcuaToMqttConfig().publishingInterval());
subscription.setSubscriptionListener(this);
if(syncTagsAndMonitoredItems(subscription, tags, config)) {
return subscription;
} else {
return null;
}
});
}

/**
* Creates a new OPC UA subscription.
* If the subscription is created successfully, it returns an Optional containing the subscription.
Expand All @@ -129,22 +109,52 @@ public OpcUaSubscriptionLifecycleHandler(
final OpcUaSubscription subscription = new OpcUaSubscription(client);
try {
subscription.create();
return subscription
.getSubscriptionId()
.map(subscriptionId -> {
log.trace("New subscription ID: {}", subscriptionId);
return subscription;
})
.or(() -> {
log.error("Subscription not created on the server");
return Optional.empty();
});
return subscription.getSubscriptionId().map(subscriptionId -> {
log.trace("New subscription ID: {}", subscriptionId);
return subscription;
}).or(() -> {
log.error("Subscription not created on the server");
return Optional.empty();
});
} catch (final UaException e) {
log.error("Failed to create subscription", e);
}
return Optional.empty();
}

private static @NotNull String extractPayload(final @NotNull OpcUaClient client, final @NotNull DataValue value)
throws UaException {
if (value.getValue().getValue() == null) {
return "";
}

final ByteBuffer byteBuffer = OpcUaToJsonConverter.convertPayload(client.getDynamicEncodingContext(), value);
final byte[] buffer = new byte[byteBuffer.remaining()];
byteBuffer.get(buffer);
return new String(buffer, StandardCharsets.UTF_8);
}

/**
* Subscribes to the OPC UA client.
* If a subscription ID is provided, it attempts to transfer the subscription.
* If the transfer fails or no ID is provided, it creates a new subscription.
* It then synchronizes the tags and monitored items.
*
* @param client the OPC UA client
* @return an Optional containing the created or transferred subscription, or empty if failed
*/
public @NotNull Optional<OpcUaSubscription> subscribe(final @NotNull OpcUaClient client) {
return createNewSubscription(client).map(subscription -> {
subscription.setPublishingInterval((double) config.getOpcuaToMqttConfig().publishingInterval());
subscription.setSubscriptionListener(this);
if (syncTagsAndMonitoredItems(subscription, tags, config)) {
return subscription;
} else {
return null;
}
});
}

/**
* Synchronizes the tags and monitored items in the subscription.
* It removes monitored items that are not in the tags list and adds new monitored items from the tags list.
Expand All @@ -155,18 +165,34 @@ public OpcUaSubscriptionLifecycleHandler(
* @param config the configuration for the OPC UA adapter
* @return true if synchronization was successful, false otherwise
*/
private boolean syncTagsAndMonitoredItems(final @NotNull OpcUaSubscription subscription, final @NotNull List<OpcuaTag> tags, final @NotNull OpcUaSpecificAdapterConfig config) {
private boolean syncTagsAndMonitoredItems(
final @NotNull OpcUaSubscription subscription,
final @NotNull List<OpcuaTag> tags,
final @NotNull OpcUaSpecificAdapterConfig config) {

final var nodeIdToTag = tags.stream().collect(Collectors.toMap(tag -> NodeId.parse(tag.getDefinition().getNode()), Function.identity()));
final var nodeIdToMonitoredItem = subscription.getMonitoredItems().stream().collect(Collectors.toMap(monitoredItem -> monitoredItem.getReadValueId().getNodeId(), Function.identity()));
final var nodeIdToTag = tags.stream()
.collect(Collectors.toMap(tag -> NodeId.parse(tag.getDefinition().getNode()), Function.identity()));
final var nodeIdToMonitoredItem = subscription.getMonitoredItems()
.stream()
.collect(Collectors.toMap(monitoredItem -> monitoredItem.getReadValueId().getNodeId(),
Function.identity()));

final var monitoredItemsToRemove = nodeIdToMonitoredItem.entrySet().stream().filter(entry -> !nodeIdToTag.containsKey(entry.getKey())).map(Map.Entry::getValue).toList();
final var monitoredItemsToAdd = nodeIdToTag.entrySet().stream().filter(entry -> !nodeIdToMonitoredItem.containsKey(entry.getKey())).map(Map.Entry::getValue).toList();
final var monitoredItemsToRemove = nodeIdToMonitoredItem.entrySet()
.stream()
.filter(entry -> !nodeIdToTag.containsKey(entry.getKey()))
.map(Map.Entry::getValue)
.toList();
final var monitoredItemsToAdd = nodeIdToTag.entrySet()
.stream()
.filter(entry -> !nodeIdToMonitoredItem.containsKey(entry.getKey()))
.map(Map.Entry::getValue)
.toList();

//clear deleted monitored items
if(!monitoredItemsToRemove.isEmpty()) {
if (!monitoredItemsToRemove.isEmpty()) {
subscription.removeMonitoredItems(monitoredItemsToRemove);
log.debug("Removed monitored items: {}", monitoredItemsToRemove.stream().map(item -> item.getReadValueId().getNodeId()));
log.debug("Removed monitored items: {}",
monitoredItemsToRemove.stream().map(item -> item.getReadValueId().getNodeId()));
}

//update existing monitored items
Expand All @@ -177,15 +203,16 @@ private boolean syncTagsAndMonitoredItems(final @NotNull OpcUaSubscription subsc
});

//add new monitored items
if(!monitoredItemsToAdd.isEmpty()) {
if (!monitoredItemsToAdd.isEmpty()) {
monitoredItemsToAdd.forEach(opcuaTag -> {
final String nodeId = opcuaTag.getDefinition().getNode();
final var monitoredItem = OpcUaMonitoredItem.newDataItem(NodeId.parse(nodeId));
monitoredItem.setQueueSize(uint(config.getOpcuaToMqttConfig().serverQueueSize()));
monitoredItem.setSamplingInterval(config.getOpcuaToMqttConfig().publishingInterval());
subscription.addMonitoredItem(monitoredItem);
});
log.debug("Added monitored items: {}", monitoredItemsToAdd.stream().map(item -> item.getDefinition().getNode()).toList());
log.debug("Added monitored items: {}",
monitoredItemsToAdd.stream().map(item -> item.getDefinition().getNode()).toList());
}

try {
Expand Down Expand Up @@ -225,21 +252,32 @@ private boolean syncTagsAndMonitoredItems(final @NotNull OpcUaSubscription subsc
public void onKeepAliveReceived(final @NotNull OpcUaSubscription subscription) {
lastKeepAliveTimestamp = System.currentTimeMillis();
protocolAdapterMetricsService.increment(Constants.METRIC_SUBSCRIPTION_KEEPALIVE_COUNT);

subscription.getSubscriptionId().ifPresent(subscriptionId -> {
log.debug("Keep-alive received for subscription {} of adapter '{}'", subscriptionId, adapterId);
});
subscription.getSubscriptionId()
.ifPresent(sid -> log.debug("Keep-alive received for subscription {} of adapter '{}'", sid, adapterId));
}

/**
* Checks if keep-alive messages are being received within the expected timeout.
* The timeout is computed dynamically from ConnectionOptions.
* Can be used for health monitoring to detect subscription issues.
*
* @return true if last keep-alive was received within KEEP_ALIVE_TIMEOUT_MS, false otherwise
* @return true if last keep-alive was received within the computed timeout, false otherwise
*/
public boolean isKeepAliveHealthy() {
final long timeSinceLastKeepAlive = System.currentTimeMillis() - lastKeepAliveTimestamp;
return timeSinceLastKeepAlive < KEEP_ALIVE_TIMEOUT_MS;
return (System.currentTimeMillis() - lastKeepAliveTimestamp) < getKeepAliveTimeoutMs();
}

/**
* Computes the keep-alive timeout based on ConnectionOptions.
* The timeout allows for the configured number of missed keep-alives plus one
* before considering the connection unhealthy, plus a safety margin.
* Formula: keepAliveIntervalMs × (keepAliveFailuresAllowed + 1) + KEEP_ALIVE_SAFETY_MARGIN_MS
*
* @return the computed keep-alive timeout in milliseconds
*/
public long getKeepAliveTimeoutMs() {
final var opts = config.getConnectionOptions();
return opts.keepAliveIntervalMs() * (opts.keepAliveFailuresAllowed() + 1) + KEEP_ALIVE_SAFETY_MARGIN_MS;
}

@Override
Expand All @@ -252,17 +290,11 @@ public void onTransferFailed(
protocolAdapterMetricsService.increment(Constants.METRIC_SUBSCRIPTION_TRANSFER_FAILED_COUNT);

log.error("Subscription Transfer failed, recreating subscription for adapter '{}'", adapterId);
createNewSubscription(client)
.ifPresentOrElse(
replacementSubscription -> {
// reconnect the listener with the new subscription
replacementSubscription.setSubscriptionListener(this);
syncTagsAndMonitoredItems(replacementSubscription, tags, config);
},
() -> {
log.error("Subscription Transfer failed, unable to create new subscription '{}'", adapterId);
}
);
createNewSubscription(client).ifPresentOrElse(replacementSubscription -> {
// reconnect the listener with the new subscription
replacementSubscription.setSubscriptionListener(this);
syncTagsAndMonitoredItems(replacementSubscription, tags, config);
}, () -> log.error("Subscription Transfer failed, unable to create new subscription '{}'", adapterId));
}

@Override
Expand All @@ -277,9 +309,7 @@ public void onDataReceived(
if (null == tagToFirstSeen.putIfAbsent(tag, true)) {
eventService.createAdapterEvent(adapterId, PROTOCOL_ID_OPCUA)
.withSeverity(Event.SEVERITY.INFO)
.withMessage(String.format("Adapter '%s' took first sample for tag '%s'",
adapterId,
tn))
.withMessage(String.format("Adapter '%s' took first sample for tag '%s'", adapterId, tn))
.fire();
}
try {
Expand All @@ -292,16 +322,4 @@ public void onDataReceived(
}
}
}

private static @NotNull String extractPayload(final @NotNull OpcUaClient client, final @NotNull DataValue value)
throws UaException {
if (value.getValue().getValue() == null) {
return "";
}

final ByteBuffer byteBuffer = OpcUaToJsonConverter.convertPayload(client.getDynamicEncodingContext(), value);
final byte[] buffer = new byte[byteBuffer.remaining()];
byteBuffer.get(buffer);
return new String(buffer, StandardCharsets.UTF_8);
}
}
Loading
Loading