diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java index c355163c..75fcf891 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java @@ -21,6 +21,9 @@ import io.serverlessworkflow.api.types.OutputAs; import io.serverlessworkflow.api.types.SchemaUnion; import io.serverlessworkflow.api.types.SecretBasedAuthenticationPolicy; +import io.serverlessworkflow.api.types.TaskBase; +import io.serverlessworkflow.api.types.TaskTimeout; +import io.serverlessworkflow.api.types.Timeout; import io.serverlessworkflow.api.types.TimeoutAfter; import io.serverlessworkflow.api.types.UriTemplate; import io.serverlessworkflow.api.types.Workflow; @@ -34,6 +37,7 @@ import java.net.URISyntaxException; import java.time.Duration; import java.util.Map; +import java.util.Objects; import java.util.Optional; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -208,6 +212,28 @@ public static WorkflowValueResolver fromTimeoutAfter( } } + public static Optional> getTaskTimeout( + WorkflowApplication appl, Workflow workflow, TaskBase task) { + TaskTimeout timeout = task.getTimeout(); + if (timeout == null) { + return Optional.empty(); + } + Timeout timeoutDef = timeout.getTaskTimeoutDefinition(); + if (timeoutDef == null && timeout.getTaskTimeoutReference() != null) { + timeoutDef = + Objects.requireNonNull( + Objects.requireNonNull( + workflow.getUse().getTimeouts(), + "Timeout reference " + + timeout.getTaskTimeoutReference() + + " specified, but use timeout was not defined") + .getAdditionalProperties() + .get(timeout.getTaskTimeoutReference()), + "Timeout reference " + timeout.getTaskTimeoutReference() + "cannot be found"); + } + return Optional.of(WorkflowUtils.fromTimeoutAfter(appl, timeoutDef.getAfter())); + } + public static final String secretProp(WorkflowContext context, String secretName, String prop) { return (String) secret(context, secretName).get(prop); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java index 055cbba6..de7ce013 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java @@ -24,8 +24,6 @@ import io.serverlessworkflow.api.types.Input; import io.serverlessworkflow.api.types.Output; import io.serverlessworkflow.api.types.TaskBase; -import io.serverlessworkflow.api.types.TaskTimeout; -import io.serverlessworkflow.api.types.Timeout; import io.serverlessworkflow.api.types.Workflow; import io.serverlessworkflow.impl.TaskContext; import io.serverlessworkflow.impl.WorkflowApplication; @@ -52,7 +50,6 @@ import java.time.Instant; import java.util.Iterator; import java.util.Map; -import java.util.Objects; import java.util.Optional; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; @@ -124,28 +121,7 @@ protected AbstractTaskExecutorBuilder( getSchemaValidator(application.validatorFactory(), resourceLoader, export.getSchema()); } this.ifFilter = application.expressionFactory().buildIfFilter(task); - this.timeout = getTaskTimeout(); - } - - private Optional> getTaskTimeout() { - TaskTimeout timeout = task.getTimeout(); - if (timeout == null) { - return Optional.empty(); - } - Timeout timeoutDef = timeout.getTaskTimeoutDefinition(); - if (timeoutDef == null && timeout.getTaskTimeoutReference() != null) { - timeoutDef = - Objects.requireNonNull( - Objects.requireNonNull( - workflow.getUse().getTimeouts(), - "Timeout reference " - + timeout.getTaskTimeoutReference() - + " specified, but use timeout was not defined") - .getAdditionalProperties() - .get(timeout.getTaskTimeoutReference()), - "Timeout reference " + timeout.getTaskTimeoutReference() + "cannot be found"); - } - return Optional.of(WorkflowUtils.fromTimeoutAfter(application, timeoutDef.getAfter())); + this.timeout = WorkflowUtils.getTaskTimeout(application, workflow, task); } protected final TransitionInfoBuilder next( diff --git a/impl/http/src/main/java/io/serverlessworkflow/impl/executors/http/AbstractHttpExecutorBuilder.java b/impl/http/src/main/java/io/serverlessworkflow/impl/executors/http/AbstractHttpExecutorBuilder.java deleted file mode 100644 index 00f56b56..00000000 --- a/impl/http/src/main/java/io/serverlessworkflow/impl/executors/http/AbstractHttpExecutorBuilder.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Copyright 2020-Present The Serverless Workflow Specification Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.serverlessworkflow.impl.executors.http; - -import io.serverlessworkflow.impl.WorkflowApplication; -import io.serverlessworkflow.impl.WorkflowValueResolver; -import jakarta.ws.rs.HttpMethod; -import jakarta.ws.rs.client.Invocation; -import jakarta.ws.rs.client.WebTarget; -import java.net.URI; -import java.util.Map; -import java.util.Optional; - -abstract class AbstractHttpExecutorBuilder { - - protected WorkflowValueResolver targetSupplier; - protected WorkflowValueResolver> headersMap; - protected WorkflowValueResolver> queryMap; - protected Optional authProvider = Optional.empty(); - protected RequestSupplier requestFunction; - - protected static RequestSupplier buildRequestSupplier( - String method, Object body, boolean redirect, WorkflowApplication application) { - - switch (method.toUpperCase()) { - case HttpMethod.POST: - return new WithBodyRequestSupplier(Invocation.Builder::post, application, body, redirect); - case HttpMethod.PUT: - return new WithBodyRequestSupplier(Invocation.Builder::put, application, body, redirect); - case HttpMethod.DELETE: - return new WithoutBodyRequestSupplier(Invocation.Builder::delete, application, redirect); - case HttpMethod.HEAD: - return new WithoutBodyRequestSupplier(Invocation.Builder::head, application, redirect); - case HttpMethod.PATCH: - return new WithBodyRequestSupplier( - (request, entity) -> request.method("PATCH", entity), application, body, redirect); - case HttpMethod.OPTIONS: - return new WithoutBodyRequestSupplier(Invocation.Builder::options, application, redirect); - case HttpMethod.GET: - default: - return new WithoutBodyRequestSupplier(Invocation.Builder::get, application, redirect); - } - } - - protected static WorkflowValueResolver getTargetSupplier( - WorkflowValueResolver uriSupplier) { - return (w, t, n) -> HttpClientResolver.client(w, t).target(uriSupplier.apply(w, t, n)); - } - - public HttpExecutor build() { - return new HttpExecutor( - targetSupplier, - Optional.ofNullable(headersMap), - Optional.ofNullable(queryMap), - authProvider, - requestFunction); - } -} diff --git a/impl/http/src/main/java/io/serverlessworkflow/impl/executors/http/AbstractRequestSupplier.java b/impl/http/src/main/java/io/serverlessworkflow/impl/executors/http/AbstractRequestSupplier.java index c6d3854e..cbcbc867 100644 --- a/impl/http/src/main/java/io/serverlessworkflow/impl/executors/http/AbstractRequestSupplier.java +++ b/impl/http/src/main/java/io/serverlessworkflow/impl/executors/http/AbstractRequestSupplier.java @@ -52,7 +52,7 @@ public WorkflowModel apply( private void validateStatus(TaskContext task, Response response, HttpModelConverter converter) { Family statusFamily = response.getStatusInfo().getFamily(); - if (statusFamily != SUCCESSFUL || (!this.redirect && statusFamily == REDIRECTION)) { + if (statusFamily != SUCCESSFUL && (!this.redirect || statusFamily != REDIRECTION)) { throw new WorkflowException( converter .errorFromResponse(WorkflowError.communication(response.getStatus(), task), response) diff --git a/impl/http/src/main/java/io/serverlessworkflow/impl/executors/http/CallableTaskHttpExecutorBuilder.java b/impl/http/src/main/java/io/serverlessworkflow/impl/executors/http/CallableTaskHttpExecutorBuilder.java index ae57898e..848bfe93 100644 --- a/impl/http/src/main/java/io/serverlessworkflow/impl/executors/http/CallableTaskHttpExecutorBuilder.java +++ b/impl/http/src/main/java/io/serverlessworkflow/impl/executors/http/CallableTaskHttpExecutorBuilder.java @@ -23,54 +23,64 @@ import io.serverlessworkflow.api.types.TaskBase; import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.WorkflowMutablePosition; +import io.serverlessworkflow.impl.WorkflowUtils; +import io.serverlessworkflow.impl.WorkflowValueResolver; +import io.serverlessworkflow.impl.executors.CallableTask; import io.serverlessworkflow.impl.executors.CallableTaskBuilder; -import java.util.Optional; +import java.net.URI; -public class CallableTaskHttpExecutorBuilder extends AbstractHttpExecutorBuilder - implements CallableTaskBuilder { +public class CallableTaskHttpExecutorBuilder implements CallableTaskBuilder { + + private HttpExecutorBuilder builder; + private WorkflowValueResolver uriSupplier; @Override public void init(CallHTTP task, WorkflowDefinition definition, WorkflowMutablePosition position) { + + builder = HttpExecutorBuilder.builder(definition); final HTTPArguments httpArgs = task.getWith(); final Endpoint endpoint = httpArgs.getEndpoint(); - this.authProvider = - endpoint.getEndpointConfiguration() == null - ? Optional.empty() - : AuthProviderFactory.getAuth( - definition, endpoint.getEndpointConfiguration().getAuthentication()); + if (endpoint.getEndpointConfiguration() != null) { + builder.withAuth(endpoint.getEndpointConfiguration().getAuthentication()); + } - this.targetSupplier = getTargetSupplier(definition.resourceLoader().uriSupplier(endpoint)); + uriSupplier = definition.resourceLoader().uriSupplier(endpoint); if (httpArgs.getHeaders() != null) { - this.headersMap = + builder.withHeaders( buildMapResolver( definition.application(), httpArgs.getHeaders().getRuntimeExpression(), httpArgs.getHeaders().getHTTPHeaders() != null ? httpArgs.getHeaders().getHTTPHeaders().getAdditionalProperties() - : null); + : null)); } if (httpArgs.getQuery() != null) { - queryMap = + builder.withQueryMap( buildMapResolver( definition.application(), httpArgs.getQuery().getRuntimeExpression(), httpArgs.getQuery().getHTTPQuery() != null ? httpArgs.getQuery().getHTTPQuery().getAdditionalProperties() - : null); + : null)); } - this.requestFunction = - buildRequestSupplier( - httpArgs.getMethod().toUpperCase(), - httpArgs.getBody(), - httpArgs.isRedirect(), - definition.application()); + + builder.withBody(httpArgs.getBody()); + builder.withMethod(httpArgs.getMethod().toUpperCase()); + builder.redirect(httpArgs.isRedirect()); + builder.timeout( + WorkflowUtils.getTaskTimeout(definition.application(), definition.workflow(), task)); } @Override public boolean accept(Class clazz) { return clazz.equals(CallHTTP.class); } + + @Override + public CallableTask build() { + return builder.build(uriSupplier); + } } diff --git a/impl/http/src/main/java/io/serverlessworkflow/impl/executors/http/HttpClientResolver.java b/impl/http/src/main/java/io/serverlessworkflow/impl/executors/http/HttpClientResolver.java index 00c165cf..e161008d 100644 --- a/impl/http/src/main/java/io/serverlessworkflow/impl/executors/http/HttpClientResolver.java +++ b/impl/http/src/main/java/io/serverlessworkflow/impl/executors/http/HttpClientResolver.java @@ -16,9 +16,12 @@ package io.serverlessworkflow.impl.executors.http; import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowContext; import jakarta.ws.rs.client.Client; import jakarta.ws.rs.client.ClientBuilder; +import java.time.Duration; +import java.util.Optional; public class HttpClientResolver { @@ -29,10 +32,16 @@ private static class DefaultHolder { } public static Client client(WorkflowContext workflowContext, TaskContext taskContext) { - return workflowContext - .definition() - .application() - .additionalObject(HTTP_CLIENT_PROVIDER, workflowContext, taskContext) + return client(workflowContext, taskContext, false, Optional.empty()); + } + + public static Client client( + WorkflowContext workflowContext, + TaskContext taskContext, + boolean redirect, + Optional timeout) { + WorkflowApplication appl = workflowContext.definition().application(); + return appl.additionalObject(HTTP_CLIENT_PROVIDER, workflowContext, taskContext) .orElseGet(() -> DefaultHolder.client); } diff --git a/impl/http/src/main/java/io/serverlessworkflow/impl/executors/http/HttpExecutorBuilder.java b/impl/http/src/main/java/io/serverlessworkflow/impl/executors/http/HttpExecutorBuilder.java index 814bb2bb..ecebf13e 100644 --- a/impl/http/src/main/java/io/serverlessworkflow/impl/executors/http/HttpExecutorBuilder.java +++ b/impl/http/src/main/java/io/serverlessworkflow/impl/executors/http/HttpExecutorBuilder.java @@ -20,17 +20,24 @@ import io.serverlessworkflow.impl.WorkflowUtils; import io.serverlessworkflow.impl.WorkflowValueResolver; import jakarta.ws.rs.HttpMethod; +import jakarta.ws.rs.client.Invocation; import jakarta.ws.rs.client.WebTarget; import java.net.URI; +import java.time.Duration; import java.util.Map; +import java.util.Optional; -public class HttpExecutorBuilder extends AbstractHttpExecutorBuilder { +public class HttpExecutorBuilder { private final WorkflowDefinition definition; private WorkflowValueResolver pathSupplier; private Object body; private String method = HttpMethod.GET; private boolean redirect; + private Optional> timeout = Optional.empty(); + private WorkflowValueResolver> headersMap; + private WorkflowValueResolver> queryMap; + private Optional authProvider = Optional.empty(); private HttpExecutorBuilder(WorkflowDefinition definition) { this.definition = definition; @@ -79,28 +86,69 @@ public HttpExecutorBuilder redirect(boolean redirect) { return this; } + public HttpExecutorBuilder timeout(Optional> timeout) { + this.timeout = timeout; + return this; + } + public HttpExecutor build(String uri) { return build((w, f, n) -> URI.create(uri)); } public HttpExecutor build(WorkflowValueResolver uriSupplier) { - this.requestFunction = buildRequestSupplier(method, body, redirect, definition.application()); - this.targetSupplier = - pathSupplier == null - ? getTargetSupplier(uriSupplier) - : getTargetSupplier(uriSupplier, pathSupplier); - return build(); + + return new HttpExecutor( + getTargetSupplier(uriSupplier), + Optional.ofNullable(headersMap), + Optional.ofNullable(queryMap), + authProvider, + buildRequestSupplier()); } - private static WorkflowValueResolver getTargetSupplier( - WorkflowValueResolver uriSupplier, WorkflowValueResolver pathSupplier) { - return (w, t, n) -> - HttpClientResolver.client(w, t) - .target( - WorkflowUtils.concatURI(uriSupplier.apply(w, t, n), pathSupplier.apply(w, t, n))); + private WorkflowValueResolver getTargetSupplier( + WorkflowValueResolver uriSupplier) { + return pathSupplier == null + ? (w, t, n) -> + HttpClientResolver.client(w, t, redirect, timeout.map(v -> v.apply(w, t, n))) + .target(uriSupplier.apply(w, t, n)) + : (w, t, n) -> + HttpClientResolver.client(w, t, redirect, timeout.map(v -> v.apply(w, t, n))) + .target( + WorkflowUtils.concatURI( + uriSupplier.apply(w, t, n), pathSupplier.apply(w, t, n))); } public static HttpExecutorBuilder builder(WorkflowDefinition definition) { return new HttpExecutorBuilder(definition); } + + private RequestSupplier buildRequestSupplier() { + switch (method.toUpperCase()) { + case HttpMethod.POST: + return new WithBodyRequestSupplier( + Invocation.Builder::post, definition.application(), body, redirect); + case HttpMethod.PUT: + return new WithBodyRequestSupplier( + Invocation.Builder::put, definition.application(), body, redirect); + case HttpMethod.DELETE: + return new WithoutBodyRequestSupplier( + Invocation.Builder::delete, definition.application(), redirect); + case HttpMethod.HEAD: + return new WithoutBodyRequestSupplier( + Invocation.Builder::head, definition.application(), redirect); + case HttpMethod.PATCH: + return new WithBodyRequestSupplier( + (request, entity) -> request.method("PATCH", entity), + definition.application(), + body, + redirect); + case HttpMethod.OPTIONS: + return new WithoutBodyRequestSupplier( + Invocation.Builder::options, definition.application(), redirect); + case HttpMethod.GET: + default: + return new WithoutBodyRequestSupplier( + Invocation.Builder::get, definition.application(), redirect); + } + } }