completableFuture) {
+ flux.subscribe(
+ // next
+ event -> {
+ try {
+ producer.send(
+ buildMessageForResponse(
+ workAgentResponseTopic,
+ liteTopic,
+ RocketMQResponse.builder()
+ .responseBody(event.toString())
+ .requestId(requestId)
+ .stream(true)
+ .end(false)
+ .build()));
+ } catch (ClientException error) {
+ log.error("writeRocketMQ send stream error: {}", error.getMessage());
+ }
+ },
+ // error
+ error -> {
+ log.error("writeRocketMQ send stream error: {}", error.getMessage());
+ completableFuture.complete(false);
+ },
+ // complete
+ () -> {
+ try {
+ producer.send(
+ buildMessageForResponse(
+ workAgentResponseTopic,
+ liteTopic,
+ RocketMQResponse.builder()
+ .responseBody(null)
+ .requestId(requestId)
+ .stream(true)
+ .end(true)
+ .build()));
+ } catch (ClientException e) {
+ log.error("writeRocketMQ send stream error: {}", e.getMessage());
+ }
+ completableFuture.complete(true);
+ });
+ }
+ }
+
+ /**
+ * Validates required configuration parameters for initializing RocketMQA2aServer.
+ * All parameters are mandatory. If any is {@code null} or empty, an {@link IllegalArgumentException}
+ * is thrown with detailed information about which field(s) failed validation.
+ */
+ private void checkConfigParam() {
+ Map configParams = new LinkedHashMap<>();
+ configParams.put("rocketMQEndpoint", rocketMQEndpoint);
+ configParams.put("bizTopic", bizTopic);
+ configParams.put("bizConsumerGroup", bizConsumerGroup);
+ configParams.put("workAgentResponseTopic", workAgentResponseTopic);
+ configParams.put("workAgentResponseGroupID", workAgentResponseGroupId);
+ List missingParams = new ArrayList<>();
+ for (Map.Entry entry : configParams.entrySet()) {
+ if (StringUtils.isEmpty(entry.getValue())) {
+ String paramName = entry.getKey();
+ log.error("RocketMQA2aServer checkConfigParam [{}] is empty", paramName);
+ missingParams.add(paramName);
+ }
+ }
+ if (!missingParams.isEmpty()) {
+ throw new IllegalArgumentException(
+ "RocketMQA2aServer init failed, missing required params: " + missingParams);
+ }
+ }
+}
diff --git a/agentscope-extensions/agentscope-extensions-rocketmq/src/main/java/io/agentscope/extensions/rocketmq/a2a/wrapper/RocketMQTransportWrapperBuilder.java b/agentscope-extensions/agentscope-extensions-rocketmq/src/main/java/io/agentscope/extensions/rocketmq/a2a/wrapper/RocketMQTransportWrapperBuilder.java
new file mode 100644
index 000000000..890c9add1
--- /dev/null
+++ b/agentscope-extensions/agentscope-extensions-rocketmq/src/main/java/io/agentscope/extensions/rocketmq/a2a/wrapper/RocketMQTransportWrapperBuilder.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2024-2026 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.agentscope.extensions.rocketmq.a2a.wrapper;
+
+import io.a2a.server.requesthandlers.RequestHandler;
+import io.a2a.spec.AgentCard;
+import io.a2a.transport.jsonrpc.handler.JSONRPCHandler;
+import io.agentscope.core.a2a.server.transport.TransportWrapperBuilder;
+import io.agentscope.core.a2a.server.transport.jsonrpc.JsonRpcTransportWrapper;
+import java.util.concurrent.Executor;
+import org.apache.rocketmq.a2a.common.constant.RocketMQA2AConstant;
+
+/**
+ * Builder implementation for {@link JsonRpcTransportWrapper}.
+ */
+public class RocketMQTransportWrapperBuilder
+ implements TransportWrapperBuilder {
+
+ @Override
+ public String getTransportType() {
+ return RocketMQA2AConstant.ROCKETMQ_PROTOCOL;
+ }
+
+ @Override
+ public JsonRpcTransportWrapper build(
+ AgentCard agentCard,
+ RequestHandler requestHandler,
+ Executor executor,
+ AgentCard extendedAgentCard) {
+ // TODO: after support of extended agent card after support authenticated.
+ JSONRPCHandler jsonrpcHandler =
+ new JSONRPCHandler(agentCard, null, requestHandler, executor);
+ return new JsonRpcTransportWrapper(jsonrpcHandler);
+ }
+}
diff --git a/agentscope-extensions/agentscope-extensions-rocketmq/src/main/resources/META-INF/services/io.agentscope.core.a2a.server.transport.TransportWrapperBuilder b/agentscope-extensions/agentscope-extensions-rocketmq/src/main/resources/META-INF/services/io.agentscope.core.a2a.server.transport.TransportWrapperBuilder
new file mode 100644
index 000000000..64636cdc0
--- /dev/null
+++ b/agentscope-extensions/agentscope-extensions-rocketmq/src/main/resources/META-INF/services/io.agentscope.core.a2a.server.transport.TransportWrapperBuilder
@@ -0,0 +1,17 @@
+#
+# Copyright 2024-2026 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+io.agentscope.extensions.rocketmq.a2a.wrapper.RocketMQTransportWrapperBuilder
diff --git a/agentscope-extensions/agentscope-spring-boot-starters/agentscope-a2a-spring-boot-starter/pom.xml b/agentscope-extensions/agentscope-spring-boot-starters/agentscope-a2a-spring-boot-starter/pom.xml
index 3d6b63584..18045330d 100644
--- a/agentscope-extensions/agentscope-spring-boot-starters/agentscope-a2a-spring-boot-starter/pom.xml
+++ b/agentscope-extensions/agentscope-spring-boot-starters/agentscope-a2a-spring-boot-starter/pom.xml
@@ -85,6 +85,12 @@
spring-boot-starter-test
test