diff --git a/docs/layouts/shortcodes/generated/all_jobmanager_section.html b/docs/layouts/shortcodes/generated/all_jobmanager_section.html
index d16e2058cc6ec..eab9ebcc37275 100644
--- a/docs/layouts/shortcodes/generated/all_jobmanager_section.html
+++ b/docs/layouts/shortcodes/generated/all_jobmanager_section.html
@@ -158,6 +158,12 @@
Enum |
Determines which scheduler implementation is used to schedule tasks. If this option is not explicitly set, batch jobs will use the 'AdaptiveBatch' scheduler as the default, while streaming jobs will default to the 'Default' scheduler.
Possible values:- "Default": Default scheduler
- "Adaptive": Adaptive scheduler. More details can be found here.
- "AdaptiveBatch": Adaptive batch scheduler. More details can be found here.
|
+
+ web.adaptive-scheduler.rescale-history.size |
+ 0 |
+ Integer |
+ The maximum number of the rescale records per job whose scheduler is AdaptiveScheduler. The feature will be disabled when the configuration value is smaller or equals to 0. |
+
web.exception-history-size |
16 |
diff --git a/docs/layouts/shortcodes/generated/web_configuration.html b/docs/layouts/shortcodes/generated/web_configuration.html
index faeb45e392d7c..3c9f1809c2f75 100644
--- a/docs/layouts/shortcodes/generated/web_configuration.html
+++ b/docs/layouts/shortcodes/generated/web_configuration.html
@@ -14,6 +14,12 @@
String |
Access-Control-Allow-Origin header for all responses from the web-frontend. |
+
+ web.adaptive-scheduler.rescale-history.size |
+ 0 |
+ Integer |
+ The maximum number of the rescale records per job whose scheduler is AdaptiveScheduler. The feature will be disabled when the configuration value is smaller or equals to 0. |
+
web.cancel.enable |
true |
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java
index e53de835f1baa..c20330534f8ce 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java
@@ -141,6 +141,20 @@ public class WebOptions {
.withDescription(
"The maximum number of failures collected by the exception history per job.");
+ /** The maximum number of the adaptive scheduler rescale history. */
+ @Documentation.Section(Documentation.Sections.ALL_JOB_MANAGER)
+ public static final ConfigOption MAX_ADAPTIVE_SCHEDULER_RESCALE_HISTORY_SIZE =
+ key("web.adaptive-scheduler.rescale-history.size")
+ .intType()
+ .defaultValue(0)
+ .withDescription(
+ Description.builder()
+ .text(
+ "The maximum number of the rescale records per job whose scheduler is %s. "
+ + "The feature will be disabled when the configuration value is smaller or equals to 0.",
+ code("AdaptiveScheduler"))
+ .build());
+
/** Timeout for asynchronous operations by the web monitor in milliseconds. */
public static final ConfigOption TIMEOUT =
key("web.timeout")
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index 52f88b1b66aa4..4ac41ec5d8fc2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -114,6 +114,12 @@
import org.apache.flink.runtime.scheduler.adaptive.allocator.ReservedSlots;
import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotAllocator;
import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.DefaultRescaleTimeline;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.Rescale;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.RescaleTimeline;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.TerminalState;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.TerminatedReason;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.TriggerCause;
import org.apache.flink.runtime.scheduler.adaptivebatch.NonAdaptiveExecutionPlanSchedulingContext;
import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry;
import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
@@ -161,6 +167,7 @@
import static org.apache.flink.configuration.TraceOptions.CHECKPOINT_SPAN_DETAIL_LEVEL;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphUtils.isAnyOutputBlocking;
import static org.apache.flink.runtime.scheduler.adaptive.ForwardEdgesAdapter.copyJobGraphWithAdaptedForwardEdges;
+import static org.apache.flink.runtime.scheduler.adaptive.timeline.RescaleTimeline.NoOpRescaleTimeline;
/**
* A {@link SchedulerNG} implementation that uses the declarative resource management and
@@ -304,7 +311,8 @@ public static Settings of(
configuration.get(
SCHEDULER_RESCALE_TRIGGER_MAX_DELAY,
maximumDelayForRescaleTriggerDefault),
- rescaleOnFailedCheckpointsCount);
+ rescaleOnFailedCheckpointsCount,
+ configuration.get(WebOptions.MAX_ADAPTIVE_SCHEDULER_RESCALE_HISTORY_SIZE));
}
private final SchedulerExecutionMode executionMode;
@@ -315,6 +323,7 @@ public static Settings of(
private final Duration executingResourceStabilizationTimeout;
private final Duration maximumDelayForTriggeringRescale;
private final int rescaleOnFailedCheckpointCount;
+ private final int rescaleHistoryMax;
private Settings(
SchedulerExecutionMode executionMode,
@@ -324,7 +333,8 @@ private Settings(
Duration executingCooldownTimeout,
Duration executingResourceStabilizationTimeout,
Duration maximumDelayForTriggeringRescale,
- int rescaleOnFailedCheckpointCount) {
+ int rescaleOnFailedCheckpointCount,
+ int rescaleHistoryMax) {
this.executionMode = executionMode;
this.submissionResourceWaitTimeout = submissionResourceWaitTimeout;
this.submissionResourceStabilizationTimeout = submissionResourceStabilizationTimeout;
@@ -333,6 +343,7 @@ private Settings(
this.executingResourceStabilizationTimeout = executingResourceStabilizationTimeout;
this.maximumDelayForTriggeringRescale = maximumDelayForTriggeringRescale;
this.rescaleOnFailedCheckpointCount = rescaleOnFailedCheckpointCount;
+ this.rescaleHistoryMax = rescaleHistoryMax;
}
public SchedulerExecutionMode getExecutionMode() {
@@ -366,6 +377,10 @@ public Duration getMaximumDelayForTriggeringRescale() {
public int getRescaleOnFailedCheckpointCount() {
return rescaleOnFailedCheckpointCount;
}
+
+ public int getRescaleHistoryMax() {
+ return rescaleHistoryMax;
+ }
}
private final Settings settings;
@@ -430,6 +445,8 @@ public int getRescaleOnFailedCheckpointCount() {
private final Supplier clock = Instant::now;
+ private final RescaleTimeline rescaleTimeline;
+
public AdaptiveScheduler(
Settings settings,
JobGraph jobGraph,
@@ -523,6 +540,12 @@ public AdaptiveScheduler(
this.initialParallelismStore = vertexParallelismStore;
this.jobInformation = new JobGraphJobInformation(jobGraph, vertexParallelismStore);
+ int rescaleHistoryMax = settings.getRescaleHistoryMax();
+ this.rescaleTimeline =
+ rescaleHistoryMax <= 0
+ ? NoOpRescaleTimeline.INSTANCE
+ : new DefaultRescaleTimeline(() -> jobInformation, rescaleHistoryMax);
+
this.declarativeSlotPool = declarativeSlotPool;
this.initializationTimestamp = initializationTimestamp;
this.ioExecutor = ioExecutor;
@@ -707,6 +730,11 @@ static VertexParallelismStore computeVertexParallelismStoreForExecution(
jobGraph.getVertices(), defaultMaxParallelismFunc);
}
+ @Override
+ public RescaleTimeline getRescaleTimeline() {
+ return rescaleTimeline;
+ }
+
private void newResourcesAvailable(Collection extends PhysicalSlot> physicalSlots) {
state.tryRun(
ResourceListener.class,
@@ -1093,6 +1121,7 @@ public void updateJobResourceRequirements(JobResourceRequirements jobResourceReq
if (maybeUpdateVertexParallelismStore.isPresent()) {
this.jobInformation =
new JobGraphJobInformation(jobGraph, maybeUpdateVertexParallelismStore.get());
+ driveRescaleTimelineByNewResourceRequirements();
declareDesiredResources();
state.tryRun(
ResourceListener.class,
@@ -1101,6 +1130,27 @@ public void updateJobResourceRequirements(JobResourceRequirements jobResourceReq
}
}
+ private void driveRescaleTimelineByNewResourceRequirements() {
+ rescaleTimeline.updateRescale(
+ rescale ->
+ rescale.addSchedulerState(state)
+ .setTerminatedReason(TerminatedReason.RESOURCE_REQUIREMENTS_UPDATED)
+ .setEndTimestamp(Instant.now().toEpochMilli())
+ .log());
+ rescaleTimeline.newRescale(true);
+ rescaleTimeline.updateRescale(
+ rescale ->
+ rescale.setStartTimestamp(Instant.now().toEpochMilli())
+ .setDesiredVertexParallelism(jobInformation)
+ .setTriggerCause(TriggerCause.UPDATE_REQUIREMENT)
+ .setDesiredSlots(jobInformation)
+ .setMinimalRequiredSlots(jobInformation)
+ .setPreRescaleSlotsAndParallelisms(
+ jobInformation,
+ rescaleTimeline.latestRescale(TerminalState.COMPLETED))
+ .log());
+ }
+
// ----------------------------------------------------------------
@Override
@@ -1258,6 +1308,15 @@ public void goToCanceling(
OperatorCoordinatorHandler operatorCoordinatorHandler,
List failureCollection) {
+ if (!rescaleTimeline.isIdling()) {
+ rescaleTimeline.updateRescale(
+ rescale ->
+ rescale.addSchedulerState(state)
+ .setEndTimestamp(Instant.now().toEpochMilli())
+ .setTerminatedReason(TerminatedReason.JOB_CANCELED)
+ .log());
+ }
+
transitionToState(
new Canceling.Factory(
this,
@@ -1278,6 +1337,8 @@ public void goToRestarting(
@Nullable VertexParallelism restartWithParallelism,
List failureCollection) {
+ driveRescaleTimelineByJobRestarting(restartWithParallelism);
+
for (ExecutionVertex executionVertex : executionGraph.getAllExecutionVertices()) {
final int attemptNumber =
executionVertex.getCurrentExecutionAttempt().getAttemptNumber();
@@ -1306,6 +1367,39 @@ public void goToRestarting(
}
}
+ private void driveRescaleTimelineByJobRestarting(VertexParallelism restartWithParallelism) {
+ if (restartWithParallelism == null) {
+ // For the failover restarting.
+ if (!rescaleTimeline.isIdling()) {
+ // Process by https://lists.apache.org/thread/hh7w2p6lnmbo1q6d9ngkttdyrw4lp74h.
+ LOG.info(
+ "Merge the current non-terminated rescale and the new rescale triggered by recoverable failover into the current rescale.");
+ rescaleTimeline.updateRescale(Rescale::clearSchedulerStates);
+ } else if (rescaleTimeline.isIdling()) {
+ rescaleTimeline.newRescale(false);
+ }
+ rescaleTimeline.updateRescale(
+ rescale ->
+ rescale.setStartTimestamp(Instant.now().toEpochMilli())
+ .setTriggerCause(TriggerCause.RECOVERABLE_FAILOVER)
+ .setMinimalRequiredSlots(jobInformation)
+ .setPreRescaleSlotsAndParallelisms(
+ jobInformation,
+ rescaleTimeline.latestRescale(TerminalState.COMPLETED))
+ .setDesiredVertexParallelism(jobInformation)
+ .setDesiredSlots(jobInformation)
+ .log());
+ } else {
+ // For the normal rescaling restarting.
+ rescaleTimeline.updateRescale(
+ rescale ->
+ rescale.setMinimalRequiredSlots(jobInformation)
+ .setDesiredVertexParallelism(jobInformation)
+ .setDesiredSlots(jobInformation)
+ .log());
+ }
+ }
+
@Override
public void goToFailing(
ExecutionGraph executionGraph,
@@ -1313,6 +1407,14 @@ public void goToFailing(
OperatorCoordinatorHandler operatorCoordinatorHandler,
Throwable failureCause,
List failureCollection) {
+ rescaleTimeline.updateRescale(
+ rescale ->
+ rescale.setEndTimestamp(Instant.now().toEpochMilli())
+ .addSchedulerState(state, failureCause)
+ .setTerminatedReason(TerminatedReason.JOB_FAILED)
+ .setStringifiedException(
+ ExceptionUtils.stringifyException(failureCause))
+ .log());
transitionToState(
new Failing.Factory(
this,
@@ -1351,6 +1453,12 @@ public CompletableFuture goToStopWithSavepoint(
@Override
public void goToFinished(ArchivedExecutionGraph archivedExecutionGraph) {
+ rescaleTimeline.updateRescale(
+ rescale ->
+ rescale.addSchedulerState(state)
+ .setEndTimestamp(Instant.now().toEpochMilli())
+ .setTerminatedReason(TerminatedReason.JOB_FINISHED)
+ .log());
transitionToState(new Finished.Factory(this, archivedExecutionGraph, LOG));
}
@@ -1620,6 +1728,9 @@ T transitionToState(StateFactory targetState) {
final JobStatus previousJobStatus = state.getJobStatus();
state.onLeave(targetState.getStateClass());
+
+ rescaleTimeline.updateRescale(rescale -> rescale.addSchedulerState(state));
+
T targetStateInstance = targetState.getState();
state = targetStateInstance;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Created.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Created.java
index 041477740746f..a42baf32d6abb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Created.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Created.java
@@ -19,6 +19,9 @@
package org.apache.flink.runtime.scheduler.adaptive;
import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.scheduler.adaptive.allocator.JobInformation;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.RescaleTimeline;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.TriggerCause;
import org.slf4j.Logger;
@@ -39,9 +42,24 @@ public JobStatus getJobStatus() {
/** Starts the scheduling by going into the {@link WaitingForResources} state. */
void startScheduling() {
+ driveRescaleTimelineByInitialScheduling();
context.goToWaitingForResources(null);
}
+ private void driveRescaleTimelineByInitialScheduling() {
+ RescaleTimeline rescaleTimeline = context.getRescaleTimeline();
+ JobInformation jobInformation = rescaleTimeline.getJobInformation();
+ rescaleTimeline.newRescale(true);
+ rescaleTimeline.updateRescale(
+ rescale ->
+ rescale.setTriggerCause(TriggerCause.INITIAL_SCHEDULE)
+ .setStartTimestamp(getDurable().getEnterTimestamp())
+ .setDesiredVertexParallelism(jobInformation)
+ .setDesiredSlots(jobInformation)
+ .setMinimalRequiredSlots(jobInformation)
+ .log());
+ }
+
/** Context of the {@link Created} state. */
interface Context
extends StateWithoutExecutionGraph.Context, StateTransitions.ToWaitingForResources {}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java
index bfdc6d8704a29..16ab459f34e0f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java
@@ -33,6 +33,8 @@
import org.apache.flink.runtime.scheduler.GlobalFailureHandler;
import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.RescaleTimeline;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.TerminatedReason;
import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
@@ -42,6 +44,7 @@
import javax.annotation.Nullable;
import java.time.Duration;
+import java.time.Instant;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
@@ -97,6 +100,9 @@ private void handleExecutionGraphCreation(
CreatingExecutionGraph.class.getSimpleName(),
Executing.class.getSimpleName(),
throwable);
+
+ driveRescaleTimelineByException(throwable);
+
context.goToFinished(context.getArchivedExecutionGraph(JobStatus.FAILED, throwable));
} else {
for (ExecutionVertex vertex :
@@ -108,6 +114,8 @@ private void handleExecutionGraphCreation(
context.tryToAssignSlots(executionGraphWithVertexParallelism);
if (result.isSuccess()) {
+ driveRescaleTimelineByGraphCreation(executionGraphWithVertexParallelism);
+
getLogger()
.debug(
"Successfully reserved and assigned the required slots for the ExecutionGraph.");
@@ -155,6 +163,31 @@ private void handleExecutionGraphCreation(
}
}
+ private void driveRescaleTimelineByGraphCreation(
+ ExecutionGraphWithVertexParallelism executionGraphWithVertexParallelism) {
+ RescaleTimeline rescaleTimeline = context.getRescaleTimeline();
+ rescaleTimeline.updateRescale(
+ rescale ->
+ rescale.setPostRescaleVertexParallelism(
+ rescaleTimeline.getJobInformation(),
+ executionGraphWithVertexParallelism.getVertexParallelism())
+ .setPostRescaleSlots(
+ executionGraphWithVertexParallelism.jobSchedulingPlan
+ .getSlotAssignments())
+ .log());
+ }
+
+ private void driveRescaleTimelineByException(Throwable throwable) {
+ final long epochMilli = Instant.now().toEpochMilli();
+ context.getRescaleTimeline()
+ .updateRescale(
+ rescale ->
+ rescale.setTerminatedReason(TerminatedReason.EXCEPTION_OCCURRED)
+ .setEndTimestamp(epochMilli)
+ .addSchedulerState(this, throwable)
+ .log());
+ }
+
@Override
public JobStatus getJobStatus() {
return JobStatus.CREATED;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager.java
index 5435873be4e62..98800858fd506 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager.java
@@ -20,6 +20,11 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.scheduler.adaptive.allocator.JobInformation;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.RescaleTimeline;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.TerminalState;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.TerminatedReason;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.TriggerCause;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
@@ -29,6 +34,7 @@
import javax.annotation.concurrent.NotThreadSafe;
import java.time.Duration;
+import java.time.Instant;
import java.time.temporal.Temporal;
import java.util.ArrayList;
import java.util.List;
@@ -108,10 +114,18 @@ public class DefaultStateTransitionManager implements StateTransitionManager {
Preconditions.checkNotNull(cooldownTimeout));
}
+ State schedulerState() {
+ return transitionContext.schedulerstate();
+ }
+
+ RescaleTimeline getRescaleTimeline() {
+ return transitionContext.getRescaleTimeline();
+ }
+
@Override
- public void onChange() {
+ public void onChange(boolean newResourceDriven) {
LOG.debug("OnChange event received in phase {} for job {}.", getPhase(), getJobId());
- phase.onChange();
+ phase.onChange(newResourceDriven);
}
@Override
@@ -231,7 +245,7 @@ boolean hasSufficientResources() {
return context.transitionContext.hasSufficientResources();
}
- void onChange() {}
+ void onChange(boolean newResourceDriven) {}
void onTrigger() {}
@@ -266,9 +280,10 @@ private Cooldown(
}
@Override
- void onChange() {
+ void onChange(boolean newResourceDriven) {
if (hasSufficientResources() && firstChangeEventTimestamp == null) {
firstChangeEventTimestamp = now();
+ driveRescaleTimelineByNewResourcesAvailable(context(), newResourceDriven);
}
}
@@ -292,14 +307,30 @@ static final class Idling extends Phase {
private Idling(Supplier clock, DefaultStateTransitionManager context) {
super(clock, context);
+ driveRescaleTimelineByNoResourcesOrParallelismChange();
}
@Override
- void onChange() {
+ void onChange(boolean newResourceDriven) {
if (hasSufficientResources()) {
+ driveRescaleTimelineByNewResourcesAvailable(context(), newResourceDriven);
context().progressToStabilizing(now());
}
}
+
+ private void driveRescaleTimelineByNoResourcesOrParallelismChange() {
+ if (context().transitionContext.schedulerstate() instanceof Executing) {
+ context()
+ .getRescaleTimeline()
+ .updateRescale(
+ rescale ->
+ rescale.setTerminatedReason(
+ TerminatedReason
+ .NO_RESOURCES_OR_PARALLELISMS_CHANGE)
+ .setEndTimestamp(Instant.now().toEpochMilli())
+ .addSchedulerState(context().schedulerState()));
+ }
+ }
}
/**
@@ -333,7 +364,7 @@ private Stabilizing(
}
@Override
- void onChange() {
+ void onChange(boolean newResourceDriven) {
// schedule another desired-resource evaluation in scenarios where the previous change
// event was already handled by a onTrigger callback with a no-op
onChangeEventTimestamp = now();
@@ -424,4 +455,27 @@ private Transitioning(Supplier clock, DefaultStateTransitionManager co
super(clock, context);
}
}
+
+ static void driveRescaleTimelineByNewResourcesAvailable(
+ DefaultStateTransitionManager context, boolean newResourceDriven) {
+ if (context.transitionContext.schedulerstate() instanceof Executing && newResourceDriven) {
+ RescaleTimeline rescaleTimeline = context.getRescaleTimeline();
+ if (rescaleTimeline.isIdling()) {
+ JobInformation jobInformation = rescaleTimeline.getJobInformation();
+ rescaleTimeline.newRescale(false);
+ rescaleTimeline.updateRescale(
+ rescale ->
+ rescale.setDesiredSlots(jobInformation)
+ .setDesiredVertexParallelism(jobInformation)
+ .setMinimalRequiredSlots(jobInformation)
+ .setTriggerCause(TriggerCause.NEW_RESOURCE_AVAILABLE)
+ .setStartTimestamp(Instant.now().toEpochMilli())
+ .setPreRescaleSlotsAndParallelisms(
+ jobInformation,
+ rescaleTimeline.latestRescale(
+ TerminalState.COMPLETED))
+ .log());
+ }
+ }
+ }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
index 13848e7aba3e0..a50b19d696ee5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
@@ -34,8 +34,11 @@
import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.RescaleTimeline;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.TerminatedReason;
import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry;
import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointTerminationManager;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
@@ -43,6 +46,7 @@
import javax.annotation.Nullable;
import java.time.Duration;
+import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -95,18 +99,40 @@ class Executing extends StateWithExecutionGraph
this.rescaleOnFailedCheckpointCount = rescaleOnFailedCheckpointCount;
this.failedCheckpointCountdown = null;
+ driveRescaleTimelineByJobExecuting(context);
+
deploy();
// check if new resources have come available in the meantime
context.runIfState(
this,
() -> {
- stateTransitionManager.onChange();
+ stateTransitionManager.onChange(true);
stateTransitionManager.onTrigger();
},
Duration.ZERO);
}
+ private void driveRescaleTimelineByJobExecuting(Context context) {
+ context.getRescaleTimeline()
+ .updateRescale(
+ rescale ->
+ rescale.addSchedulerState(this)
+ .setTerminatedReason(TerminatedReason.SUCCEEDED)
+ .setEndTimestamp(Instant.now().toEpochMilli())
+ .log());
+ }
+
+ @Override
+ public State schedulerstate() {
+ return this;
+ }
+
+ @Override
+ public RescaleTimeline getRescaleTimeline() {
+ return context.getRescaleTimeline();
+ }
+
@Override
public boolean hasSufficientResources() {
return parallelismChanged() && context.hasSufficientResources();
@@ -149,19 +175,31 @@ public ScheduledFuture> scheduleOperation(Runnable callback, Duration delay) {
@Override
public void transitionToSubsequentState() {
+ Optional availableVertexParallelism =
+ context.getAvailableVertexParallelism();
+ if (!availableVertexParallelism.isPresent()) {
+ IllegalStateException exception =
+ new IllegalStateException("Resources must be available when rescaling.");
+ driveRescaleTimelineByNoResourcesEnough(exception);
+ throw exception;
+ }
context.goToRestarting(
getExecutionGraph(),
getExecutionGraphHandler(),
getOperatorCoordinatorHandler(),
Duration.ofMillis(0L),
- context.getAvailableVertexParallelism()
- .orElseThrow(
- () ->
- new IllegalStateException(
- "Resources must be available when rescaling.")),
+ availableVertexParallelism.get(),
getFailures());
}
+ private void driveRescaleTimelineByNoResourcesEnough(IllegalStateException exception) {
+ context.getRescaleTimeline()
+ .updateRescale(
+ rescale ->
+ rescale.setStringifiedException(
+ ExceptionUtils.stringifyException(exception)));
+ }
+
@Override
public JobStatus getJobStatus() {
return JobStatus.RUNNING;
@@ -219,13 +257,13 @@ private void handleDeploymentFailure(ExecutionVertex executionVertex, JobExcepti
@Override
public void onNewResourcesAvailable() {
- stateTransitionManager.onChange();
+ stateTransitionManager.onChange(true);
initializeFailedCheckpointCountdownIfUnset();
}
@Override
public void onNewResourceRequirements() {
- stateTransitionManager.onChange();
+ stateTransitionManager.onChange(false);
initializeFailedCheckpointCountdownIfUnset();
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateTransitionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateTransitionManager.java
index 5b6f78ab82ca9..a6e96752656cf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateTransitionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateTransitionManager.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.scheduler.adaptive;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.RescaleTimeline;
import java.time.Duration;
import java.util.concurrent.ScheduledFuture;
@@ -31,8 +32,17 @@ public interface StateTransitionManager {
/**
* Is called if the environment changed in a way that a state transition could be considered.
+ *
+ * @param newResourceDriven Whether the onchange is driven by new resources.
*/
- void onChange();
+ void onChange(boolean newResourceDriven);
+
+ /**
+ * Is called if the environment changed in a way that a state transition could be considered.
+ */
+ default void onChange() {
+ onChange(false);
+ }
/**
* Is called when any previous observed environment changes shall be verified possibly
@@ -49,6 +59,10 @@ default void close() {}
*/
interface Context {
+ State schedulerstate();
+
+ RescaleTimeline getRescaleTimeline();
+
/**
* Returns {@code true} if the available resources are sufficient enough for a state
* transition; otherwise {@code false}.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
index e5cc1e7cdd8d9..03f37dfb98df8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
@@ -56,6 +56,7 @@
import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
import org.apache.flink.runtime.scheduler.VertexEndOfDataListener;
import org.apache.flink.runtime.scheduler.adaptive.timeline.Durable;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.RescaleTimeline;
import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry;
import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointTerminationManager;
@@ -487,5 +488,14 @@ interface Context extends StateTransitions.ToFinished {
/** Archive failure. */
void archiveFailure(RootExceptionHistoryEntry failure);
+
+ /**
+ * Get the rescale timeline.
+ *
+ * @return The rescale timeline handler.
+ */
+ default RescaleTimeline getRescaleTimeline() {
+ throw new UnsupportedOperationException();
+ }
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithoutExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithoutExecutionGraph.java
index bb3b8381d77b5..ebfd13a82e477 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithoutExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithoutExecutionGraph.java
@@ -22,6 +22,7 @@
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.scheduler.adaptive.timeline.Durable;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.RescaleTimeline;
import org.slf4j.Logger;
@@ -104,5 +105,14 @@ interface Context extends StateTransitions.ToFinished {
*/
ArchivedExecutionGraph getArchivedExecutionGraph(
JobStatus jobStatus, @Nullable Throwable cause);
+
+ /**
+ * Get the rescale timeline.
+ *
+ * @return The rescale timeline handler.
+ */
+ default RescaleTimeline getRescaleTimeline() {
+ throw new UnsupportedOperationException();
+ }
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java
index c5bea458f8a2c..6a11518a31018 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java
@@ -21,6 +21,7 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.RescaleTimeline;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
@@ -76,6 +77,16 @@ class WaitingForResources extends StateWithoutExecutionGraph
context.runIfState(this, this::checkPotentialStateTransition, Duration.ZERO);
}
+ @Override
+ public State schedulerstate() {
+ return this;
+ }
+
+ @Override
+ public RescaleTimeline getRescaleTimeline() {
+ return context.getRescaleTimeline();
+ }
+
@Override
public void onLeave(Class extends State> newState) {
if (resourceTimeoutFuture != null) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/DefaultRescaleTimeline.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/DefaultRescaleTimeline.java
new file mode 100644
index 0000000000000..0a3b8c202d5de
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/DefaultRescaleTimeline.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.timeline;
+
+import org.apache.flink.runtime.scheduler.adaptive.allocator.JobInformation;
+import org.apache.flink.runtime.util.BoundedFIFOQueue;
+import org.apache.flink.util.AbstractID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Supplier;
+
+/** Default implementation of {@link RescaleTimeline}. */
+public class DefaultRescaleTimeline implements RescaleTimeline {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DefaultRescaleTimeline.class);
+
+ private final Supplier jobInformationGetter;
+
+ private RescaleIdInfo rescaleIdInfo;
+
+ @Nullable private Rescale currentRescale;
+
+ private final BoundedFIFOQueue rescaleHistory;
+
+ private final Map latestRescales;
+
+ private final RescalesSummary rescalesSummary;
+
+ public DefaultRescaleTimeline(
+ Supplier jobInformationGetter, int maxHistorySize) {
+ this.jobInformationGetter = jobInformationGetter;
+ this.rescaleIdInfo = new RescaleIdInfo(new AbstractID(), 0L);
+ this.latestRescales = new ConcurrentHashMap<>(TerminalState.values().length);
+ this.rescaleHistory = new BoundedFIFOQueue<>(maxHistorySize);
+ this.rescalesSummary = new RescalesSummary(maxHistorySize);
+ }
+
+ @Nullable
+ @Override
+ public Rescale latestRescale(TerminalState terminalState) {
+ return latestRescales.get(terminalState);
+ }
+
+ @Nullable
+ @Override
+ public JobInformation getJobInformation() {
+ return jobInformationGetter.get();
+ }
+
+ @Override
+ public boolean isIdling() {
+ return currentRescale == null || currentRescale.isTerminated();
+ }
+
+ @Override
+ public boolean newRescale(boolean newRescaleEpoch) {
+ rollingLatestRescale();
+ if (!isIdling()) {
+ String hintMsg =
+ String.format("Rescale %s with unexpected terminal state.", currentRescale);
+ LOG.warn(hintMsg);
+ throw new IllegalStateException(hintMsg);
+ }
+ currentRescale = new Rescale(nextRescaleId(newRescaleEpoch));
+ rescaleHistory.add(currentRescale);
+ rescalesSummary.addInProgress(currentRescale);
+ return true;
+ }
+
+ @Override
+ public boolean updateRescale(RescaleUpdater rescaleUpdater) {
+ if (!isIdling() && Objects.nonNull(rescaleUpdater)) {
+ rescaleUpdater.update(currentRescale);
+ rollingLatestRescale();
+ if (Rescale.isTerminated(currentRescale)) {
+ rescalesSummary.addTerminated(currentRescale);
+ }
+ return true;
+ } else {
+ if (isIdling()) {
+ LOG.warn(
+ "Current rescale {} is null or terminated, so the update action is ignored.",
+ currentRescale);
+ return false;
+ }
+ if (rescaleUpdater == null) {
+ LOG.warn(
+ "The rescale updater is null for {}, so the null update action is ignored.",
+ currentRescale);
+ return false;
+ }
+ }
+ return false;
+ }
+
+ @Nullable
+ Rescale currentRescale() {
+ return currentRescale;
+ }
+
+ private RescaleIdInfo nextRescaleId(boolean newRescaleEpoch) {
+ if (newRescaleEpoch) {
+ rescaleIdInfo = new RescaleIdInfo(new AbstractID(), 1L);
+ } else {
+ rescaleIdInfo =
+ new RescaleIdInfo(
+ rescaleIdInfo.getResourceRequirementsId(),
+ rescaleIdInfo.getRescaleAttemptId() + 1L);
+ }
+ return rescaleIdInfo;
+ }
+
+ /** Rolling the last rescale for the specified status. */
+ private void rollingLatestRescale() {
+ if (Rescale.isTerminated(currentRescale)) {
+ latestRescales.put(currentRescale.getTerminalState(), currentRescale);
+ } else {
+ LOG.warn(
+ "Rescale {} is not terminated now, the rolling action is ignored.",
+ currentRescale);
+ }
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/Rescale.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/Rescale.java
index 2fc3ef21b486a..6bb4f3ae1028e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/Rescale.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/Rescale.java
@@ -195,12 +195,21 @@ public TerminalState getTerminalState() {
return terminalState;
}
+ @Nullable
+ public TerminatedReason getTerminatedReason() {
+ return terminatedReason;
+ }
+
+ public RescaleIdInfo getRescaleIdInfo() {
+ return rescaleIdInfo;
+ }
+
@Nullable
public String getStringifiedException() {
return stringifiedException;
}
- private boolean isTerminated() {
+ boolean isTerminated() {
return terminalState != null;
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleTimeline.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleTimeline.java
new file mode 100644
index 0000000000000..78147ed3a7df0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleTimeline.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.timeline;
+
+import org.apache.flink.runtime.scheduler.adaptive.allocator.JobInformation;
+
+import javax.annotation.Nullable;
+
+/**
+ * The rescale timeline information updating interface. When rescale history is enabled for a job,
+ * this class is used to perform fast operations on rescaling and to keep historical records. That
+ * is, a container used to record changes in rescaling.
+ */
+public interface RescaleTimeline {
+
+ /** Get the job information of current job. */
+ @Nullable
+ JobInformation getJobInformation();
+
+ /**
+ * Judge whether the current rescale in timeline is terminated or null, which represent no
+ * rescales in the current phase.
+ *
+ * @return ture if there are no rescales to perform. false else.
+ */
+ boolean isIdling();
+
+ /**
+ * Create a new rescale and assign it as current rescale. Note, the {@link #isIdling()} must be
+ * true when creating a new current rescale. When there's a rescale in transition phase, we must
+ * seal the current resale before creating a new rescale.
+ *
+ * @param newRescaleEpoch It represents whether the rescale resource requirements is in the new
+ * epoch.
+ * @return ture if create successfully false else.
+ */
+ boolean newRescale(boolean newRescaleEpoch);
+
+ /** Get the latest rescale for the specified terminal state. */
+ @Nullable
+ Rescale latestRescale(TerminalState terminalState);
+
+ /**
+ * Update the current rescale. It only makes sense to update a rescale when there is an ongoing
+ * rescale that is in the process of transition states.
+ *
+ * @param rescaleUpdater The action to update the current rescale.
+ * @return ture if update successfully false else.
+ */
+ boolean updateRescale(RescaleUpdater rescaleUpdater);
+
+ /** Rescale operation interface. */
+ interface RescaleUpdater {
+ void update(Rescale rescaleToUpdate);
+ }
+
+ /** Default implementation of {@link RescaleTimeline} without any actions. */
+ enum NoOpRescaleTimeline implements RescaleTimeline {
+ INSTANCE;
+
+ @Override
+ public boolean newRescale(boolean newRescaleEpoch) {
+ return false;
+ }
+
+ @Override
+ public boolean updateRescale(RescaleUpdater rescaleUpdater) {
+ return false;
+ }
+
+ @Nullable
+ @Override
+ public Rescale latestRescale(TerminalState terminalState) {
+ return null;
+ }
+
+ @Nullable
+ @Override
+ public JobInformation getJobInformation() {
+ return null;
+ }
+
+ @Override
+ public boolean isIdling() {
+ return false;
+ }
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesSummary.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesSummary.java
new file mode 100644
index 0000000000000..6d1e2a567cb67
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesSummary.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.timeline;
+
+import org.apache.flink.runtime.util.stats.StatsSummary;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+/** Statistics summary of rescales. */
+public class RescalesSummary implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(RescalesSummary.class);
+
+ private final StatsSummary allTerminatedSummary;
+ private final StatsSummary completedRescalesSummary;
+ private final StatsSummary ignoredRescalesSummary;
+ private final StatsSummary failedRescalesSummary;
+
+ // Total terminated count
+ private long totalRescalesCount = 0L;
+ private long inProgressRescalesCount = 0L;
+
+ public RescalesSummary(int maxHistorySize) {
+ this.allTerminatedSummary = new StatsSummary(maxHistorySize);
+ this.completedRescalesSummary = new StatsSummary(maxHistorySize);
+ this.ignoredRescalesSummary = new StatsSummary(maxHistorySize);
+ this.failedRescalesSummary = new StatsSummary(maxHistorySize);
+ }
+
+ /**
+ * Add a terminated rescale in. Note, The method could be called after calling {@link
+ * #addInProgress(Rescale)}.
+ *
+ * @param rescale the target terminated rescale.
+ */
+ public void addTerminated(Rescale rescale) {
+ if (!Rescale.isTerminated(rescale)) {
+ LOG.warn(
+ "Unexpected rescale: {}, which will be ignored when computing statistics.",
+ rescale);
+ return;
+ }
+
+ this.allTerminatedSummary.add(rescale.getDuration().toMillis());
+ this.inProgressRescalesCount = 0;
+
+ if (rescale.getTerminalState() == null) {
+ return;
+ }
+
+ switch (rescale.getTerminalState()) {
+ case FAILED:
+ failedRescalesSummary.add(rescale.getDuration().toMillis());
+ break;
+ case COMPLETED:
+ completedRescalesSummary.add(rescale.getDuration().toMillis());
+ break;
+ case IGNORED:
+ ignoredRescalesSummary.add(rescale.getDuration().toMillis());
+ break;
+ default:
+ break;
+ }
+ }
+
+ /**
+ * Add an in-progress rescale in. Note, The method could be called before {@link
+ * #addTerminated(Rescale)}.
+ *
+ * @param rescale the target non-terminated rescale.
+ */
+ public void addInProgress(Rescale rescale) {
+ if (Rescale.isTerminated(rescale)) {
+ LOG.warn("Unexpected rescale: {}, which will be ignored.", rescale);
+ } else {
+ inProgressRescalesCount++;
+ totalRescalesCount++;
+ }
+ }
+
+ public long getTotalRescalesCount() {
+ return totalRescalesCount;
+ }
+
+ public long getInProgressRescalesCount() {
+ return inProgressRescalesCount;
+ }
+
+ public long getCompletedRescalesCount() {
+ return completedRescalesSummary.getCount();
+ }
+
+ public long getIgnoredRescalesCount() {
+ return ignoredRescalesSummary.getCount();
+ }
+
+ public long getFailedRescalesCount() {
+ return failedRescalesSummary.getCount();
+ }
+
+ public StatsSummary getAllTerminatedSummary() {
+ return allTerminatedSummary;
+ }
+
+ public StatsSummary getCompletedRescalesSummary() {
+ return completedRescalesSummary;
+ }
+
+ public StatsSummary getIgnoredRescalesSummary() {
+ return ignoredRescalesSummary;
+ }
+
+ public StatsSummary getFailedRescalesSummary() {
+ return failedRescalesSummary;
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
index d5c312c4dfe07..95815a112933c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
@@ -100,6 +100,7 @@
import org.apache.flink.runtime.scheduler.adaptive.allocator.TestingSlot;
import org.apache.flink.runtime.scheduler.adaptive.allocator.TestingSlotAllocator;
import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.Durable;
import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry;
import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
import org.apache.flink.runtime.slots.ResourceRequirement;
@@ -2699,6 +2700,22 @@ public DummyState(StateWithoutExecutionGraph.Context context, JobStatus jobStatu
this.jobStatus = jobStatus;
}
+ @Override
+ public Durable getDurable() {
+ return null;
+ }
+
+ @Override
+ public void cancel() {}
+
+ @Override
+ public void suspend(Throwable cause) {}
+
+ @Override
+ public JobID getJobId() {
+ return null;
+ }
+
@Override
public JobStatus getJobStatus() {
return jobStatus;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManagerTest.java
index 885850f9b74dd..c05a441b7ab67 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManagerTest.java
@@ -21,6 +21,7 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.testutils.ScheduledTask;
import org.apache.flink.runtime.scheduler.adaptive.DefaultStateTransitionManager.Idling;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.RescaleTimeline;
import org.apache.flink.util.Preconditions;
import org.junit.jupiter.api.Test;
@@ -511,6 +512,16 @@ public TestingStateTransitionManagerContext withSufficientResources() {
// StateTransitionManager.Context interface methods
// ///////////////////////////////////////////////
+ @Override
+ public State schedulerstate() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public RescaleTimeline getRescaleTimeline() {
+ return RescaleTimeline.NoOpRescaleTimeline.INSTANCE;
+ }
+
@Override
public boolean hasSufficientResources() {
return this.hasSufficientResources;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/TestingStateTransitionManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/TestingStateTransitionManager.java
index ee3b35dda2bc4..f0316401ad0de 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/TestingStateTransitionManager.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/TestingStateTransitionManager.java
@@ -41,6 +41,9 @@ public TestingStateTransitionManager(Runnable onChangeRunnable, Runnable onTrigg
this.onTriggerRunnable = onTriggerRunnable;
}
+ @Override
+ public void onChange(boolean newResourceDriven) {}
+
@Override
public void onChange() {
this.onChangeRunnable.run();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java
index 0b74ecf2523b3..19f45c91a3286 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java
@@ -28,7 +28,6 @@
import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan;
import org.apache.flink.runtime.scheduler.adaptive.allocator.JobAllocationsInformation.VertexAllocationInformation;
-import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.topology.VertexID;
@@ -47,6 +46,7 @@
import java.util.Optional;
import java.util.Set;
+import static org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
import static org.apache.flink.runtime.scheduler.adaptive.allocator.TestingSlot.getSlots;
import static org.assertj.core.api.Assertions.assertThat;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/DefaultRescaleTimelineTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/DefaultRescaleTimelineTest.java
new file mode 100644
index 0000000000000..a264948a3e6a7
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/DefaultRescaleTimelineTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.timeline;
+
+import org.apache.flink.runtime.scheduler.DefaultVertexParallelismStore;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link DefaultRescaleTimeline}. */
+class DefaultRescaleTimelineTest {
+
+ private RescaleTimeline rescaleTimeline;
+ private DefaultRescaleTimeline defaultRescaleTimeline;
+
+ @BeforeEach
+ void setUp() {
+ this.rescaleTimeline =
+ new DefaultRescaleTimeline(
+ () ->
+ new TestingJobInformation(
+ Collections.emptySet(),
+ Collections.emptyList(),
+ new DefaultVertexParallelismStore()),
+ 3);
+ this.defaultRescaleTimeline = (DefaultRescaleTimeline) rescaleTimeline;
+ }
+
+ @Test
+ void testIsIdling() {
+ assertThat(rescaleTimeline.isIdling()).isTrue();
+ rescaleTimeline.newRescale(true);
+ assertThat(rescaleTimeline.isIdling()).isFalse();
+
+ rescaleTimeline.updateRescale(
+ rescaleToUpdate -> rescaleToUpdate.setTerminatedReason(TerminatedReason.SUCCEEDED));
+ assertThat(rescaleTimeline.isIdling()).isTrue();
+ }
+
+ @Test
+ void testNewRescaleWithIdInfoGenerationLogic() {
+ assertThat(defaultRescaleTimeline.currentRescale()).isNull();
+ rescaleTimeline.newRescale(true);
+ Rescale rescale1 = defaultRescaleTimeline.currentRescale();
+ assertThat(rescale1).isNotNull();
+ RescaleIdInfo rescaleIdInfo1 = rescale1.getRescaleIdInfo();
+ assertThat(rescaleIdInfo1.getRescaleAttemptId()).isOne();
+
+ rescaleTimeline.updateRescale(r -> r.setTerminatedReason(TerminatedReason.SUCCEEDED));
+ rescaleTimeline.newRescale(false);
+ Rescale rescale2 = defaultRescaleTimeline.currentRescale();
+ RescaleIdInfo rescaleIdInfo2 = rescale2.getRescaleIdInfo();
+ assertThat(rescale2).isNotNull();
+ assertThat(rescaleIdInfo2.getRescaleAttemptId()).isEqualTo(2L);
+ assertThat(rescaleIdInfo2.getResourceRequirementsId())
+ .isEqualTo(rescaleIdInfo1.getResourceRequirementsId());
+ assertThat(rescaleIdInfo2.getRescaleUuid()).isNotEqualTo(rescaleIdInfo1.getRescaleUuid());
+
+ rescaleTimeline.updateRescale(r -> r.setTerminatedReason(TerminatedReason.SUCCEEDED));
+ rescaleTimeline.newRescale(true);
+ Rescale rescale3 = defaultRescaleTimeline.currentRescale();
+ RescaleIdInfo rescaleIdInfo3 = rescale3.getRescaleIdInfo();
+ assertThat(rescale3).isNotNull();
+ assertThat(rescaleIdInfo3.getRescaleAttemptId()).isOne();
+ assertThat(rescaleIdInfo3.getResourceRequirementsId())
+ .isNotEqualTo(rescaleIdInfo1.getResourceRequirementsId())
+ .isNotEqualTo(rescaleIdInfo2.getResourceRequirementsId());
+ assertThat(rescaleIdInfo3.getRescaleUuid())
+ .isNotEqualTo(rescaleIdInfo1.getRescaleUuid())
+ .isNotEqualTo(rescaleIdInfo2.getRescaleUuid());
+ }
+
+ @Test
+ void testLatestRescale() {
+ assertThat(rescaleTimeline.latestRescale(TerminalState.FAILED)).isNull();
+ assertThat(rescaleTimeline.latestRescale(TerminalState.COMPLETED)).isNull();
+ assertThat(rescaleTimeline.latestRescale(TerminalState.IGNORED)).isNull();
+
+ rescaleTimeline.newRescale(true);
+ rescaleTimeline.updateRescale(
+ rescaleToUpdate -> rescaleToUpdate.setTerminatedReason(TerminatedReason.SUCCEEDED));
+ Rescale firstLatestCompletedRescale = defaultRescaleTimeline.currentRescale();
+ assertThat(rescaleTimeline.latestRescale(TerminalState.COMPLETED))
+ .isNotNull()
+ .isEqualTo(defaultRescaleTimeline.currentRescale());
+
+ rescaleTimeline.newRescale(true);
+ rescaleTimeline.updateRescale(
+ rescaleToUpdate ->
+ rescaleToUpdate.setTerminatedReason(TerminatedReason.EXCEPTION_OCCURRED));
+ assertThat(rescaleTimeline.latestRescale(TerminalState.FAILED))
+ .isNotNull()
+ .isEqualTo(defaultRescaleTimeline.currentRescale());
+
+ rescaleTimeline.newRescale(true);
+ rescaleTimeline.updateRescale(
+ rescaleToUpdate ->
+ rescaleToUpdate.setTerminatedReason(TerminatedReason.JOB_FINISHED));
+ assertThat(rescaleTimeline.latestRescale(TerminalState.IGNORED))
+ .isNotNull()
+ .isEqualTo(defaultRescaleTimeline.currentRescale());
+
+ rescaleTimeline.newRescale(true);
+ rescaleTimeline.updateRescale(
+ rescaleToUpdate -> rescaleToUpdate.setTerminatedReason(TerminatedReason.SUCCEEDED));
+ assertThat(rescaleTimeline.latestRescale(TerminalState.COMPLETED))
+ .isNotNull()
+ .isEqualTo(defaultRescaleTimeline.currentRescale())
+ .isNotEqualTo(firstLatestCompletedRescale);
+ }
+
+ @Test
+ void testUpdateRescale() {
+ assertThat(defaultRescaleTimeline.currentRescale()).isNull();
+ rescaleTimeline.newRescale(true);
+ rescaleTimeline.updateRescale(
+ rescaleToUpdate -> rescaleToUpdate.setTerminatedReason(TerminatedReason.SUCCEEDED));
+ assertThat(defaultRescaleTimeline.currentRescale().getTerminalState())
+ .isNotNull()
+ .isEqualTo(TerminalState.COMPLETED);
+ assertThat(defaultRescaleTimeline.currentRescale().getTerminatedReason())
+ .isNotNull()
+ .isEqualTo(TerminatedReason.SUCCEEDED);
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleRecordingLogicITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleRecordingLogicITCase.java
new file mode 100644
index 0000000000000..4b0fcad14b867
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleRecordingLogicITCase.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.timeline;
+
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Test for recording rescale history by {@link DefaultRescaleTimeline} or {@link
+ * RescaleTimeline.NoOpRescaleTimeline}.
+ */
+class RescaleRecordingLogicITCase {
+ static final String DISABLED_DESCRIPTION =
+ "TODO: Blocked by FLINK-38343, the ITCases need the SchedulerNG#requstJob() to get the rescale history.";
+
+ // Tests for rescale trigger causes.
+ @Disabled(DISABLED_DESCRIPTION)
+ @Test
+ void testRecordingRescaleTriggerredByInitialSchedule() {}
+
+ @Disabled(DISABLED_DESCRIPTION)
+ @Test
+ void testRecordingRescaleTriggerredByUpdateRequirement() {}
+
+ @Disabled(DISABLED_DESCRIPTION)
+ @Test
+ void testRecordingRescaleTriggerredByNewResourceAvailable() {}
+
+ @Disabled(DISABLED_DESCRIPTION)
+ @Test
+ void testRecordingRescaleTriggerredByRecoverableFailover() {}
+
+ // End of tests for rescale trigger causes.
+
+ // Start of tests for rescale terminated reasons and terminal state.
+
+ @Disabled(DISABLED_DESCRIPTION)
+ @Test
+ void testRecordingRescaleTerminatedBySucceeded() {}
+
+ @Disabled(DISABLED_DESCRIPTION)
+ @Test
+ void testRecordingRescaleTerminatedByJobFinished() {}
+
+ @Disabled(DISABLED_DESCRIPTION)
+ @Test
+ void testRecordingRescaleTerminatedByJobCancelled() {}
+
+ @Disabled(DISABLED_DESCRIPTION)
+ @Test
+ void testRecordingRescaleTerminatedByJobFailed() {}
+
+ @Disabled(DISABLED_DESCRIPTION)
+ @Test
+ void testRecordingRescaleTerminatedByNoResourcesOrParallelismsChange() {}
+
+ @Disabled(DISABLED_DESCRIPTION)
+ @Test
+ void testRecordingRescaleTerminatedByResourcesNotEnoughException() {}
+
+ @Disabled(DISABLED_DESCRIPTION)
+ @Test
+ void testRecordingRescaleTerminatedByResourceRequirementsUpdated() {}
+
+ @Disabled(DISABLED_DESCRIPTION)
+ @Test
+ void testRecordingRescaleTerminatedByJobRestarting() {}
+
+ // End of tests for rescale terminated reasons and terminal state.
+
+ @Disabled(DISABLED_DESCRIPTION)
+ @Test
+ void testRecordingRescaleWithoutPreRescaleInfo() {}
+
+ @Disabled(DISABLED_DESCRIPTION)
+ @Test
+ void testRecordingRescaleWithPreRescaleInfo() {}
+
+ @Disabled(DISABLED_DESCRIPTION)
+ @Test
+ void testUseNonTerminatedRescaleToRecordMergingWithNewRecoverableFailureTriggerCause() {}
+
+ @Disabled(DISABLED_DESCRIPTION)
+ @Test
+ void testRecordingInProgressRescale() {}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesSummaryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesSummaryTest.java
new file mode 100644
index 0000000000000..4e717aee344a4
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesSummaryTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.timeline;
+
+import org.apache.flink.runtime.util.stats.StatsSummary;
+import org.apache.flink.util.AbstractID;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link RescalesSummary}. */
+class RescalesSummaryTest {
+
+ private Rescale getRescale() {
+ Rescale rescale = new Rescale(new RescaleIdInfo(new AbstractID(), 1L));
+ rescale.setStartTimestamp(1L);
+ rescale.setEndTimestamp(2L);
+ return rescale;
+ }
+
+ private void assertSummary(
+ StatsSummary summary,
+ long expectedCount,
+ long expectedSum,
+ long expectedAvg,
+ long expectedMax,
+ long expectedMin) {
+ assertThat(summary.getMinimum()).isEqualTo(expectedMin);
+ assertThat(summary.getMaximum()).isEqualTo(expectedMax);
+ assertThat(summary.getAverage()).isEqualTo(expectedAvg);
+ assertThat(summary.getSum()).isEqualTo(expectedSum);
+ assertThat(summary.getCount()).isEqualTo(expectedCount);
+ }
+
+ @Test
+ void testAddInProgressAndTerminated() {
+ RescalesSummary rescalesSummary = new RescalesSummary(5);
+ Rescale rescale = getRescale();
+ rescale.setStartTimestamp(1L);
+ rescale.setEndTimestamp(2L);
+
+ // Test adding unexpected non-terminated rescale.
+ rescalesSummary.addTerminated(rescale);
+ assertThat(rescalesSummary.getTotalRescalesCount()).isZero();
+ assertSummary(rescalesSummary.getAllTerminatedSummary(), 0L, 0L, 0L, 0L, 0L);
+
+ // Test adding unexpected terminated rescale.
+ rescale.setTerminatedReason(TerminatedReason.SUCCEEDED);
+ rescalesSummary.addInProgress(rescale);
+ assertThat(rescalesSummary.getTotalRescalesCount()).isZero();
+ assertSummary(rescalesSummary.getAllTerminatedSummary(), 0L, 0L, 0L, 0L, 0L);
+
+ // Test add in-progress rescale.
+ rescale = getRescale();
+ rescalesSummary.addInProgress(rescale);
+ assertThat(rescalesSummary.getTotalRescalesCount()).isOne();
+ assertThat(rescalesSummary.getCompletedRescalesCount()).isZero();
+ assertThat(rescalesSummary.getIgnoredRescalesCount()).isZero();
+ assertThat(rescalesSummary.getFailedRescalesCount()).isZero();
+ assertThat(rescalesSummary.getInProgressRescalesCount()).isOne();
+
+ // Test add a completed rescale after adding a in-progress rescale.
+ rescale.setTerminatedReason(TerminatedReason.SUCCEEDED);
+ rescalesSummary.addTerminated(rescale);
+ assertThat(rescalesSummary.getTotalRescalesCount()).isOne();
+ assertThat(rescalesSummary.getCompletedRescalesCount()).isOne();
+ assertThat(rescalesSummary.getIgnoredRescalesCount()).isZero();
+ assertThat(rescalesSummary.getInProgressRescalesCount()).isZero();
+ assertThat(rescalesSummary.getFailedRescalesCount()).isZero();
+ assertSummary(rescalesSummary.getCompletedRescalesSummary(), 1L, 1L, 1L, 1L, 1L);
+ }
+}