Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/all_jobmanager_section.html
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,12 @@
<td><p>Enum</p></td>
<td>Determines which scheduler implementation is used to schedule tasks. If this option is not explicitly set, batch jobs will use the 'AdaptiveBatch' scheduler as the default, while streaming jobs will default to the 'Default' scheduler. <br /><br />Possible values:<ul><li>"Default": Default scheduler</li><li>"Adaptive": Adaptive scheduler. More details can be found <a href="{{.Site.BaseURL}}{{.Site.LanguagePrefix}}/docs/deployment/elastic_scaling#adaptive-scheduler">here</a>.</li><li>"AdaptiveBatch": Adaptive batch scheduler. More details can be found <a href="{{.Site.BaseURL}}{{.Site.LanguagePrefix}}/docs/deployment/elastic_scaling#adaptive-batch-scheduler">here</a>.</li></ul></td>
</tr>
<tr>
<td><h5>web.adaptive-scheduler.rescale-history.size</h5></td>
<td style="word-wrap: break-word;">0</td>
<td>Integer</td>
<td>The maximum number of the rescale records per job whose scheduler is <code class="highlighter-rouge">AdaptiveScheduler</code>. The feature will be disabled when the configuration value is smaller or equals to 0.</td>
</tr>
<tr>
<td><h5>web.exception-history-size</h5></td>
<td style="word-wrap: break-word;">16</td>
Expand Down
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/web_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@
<td>String</td>
<td>Access-Control-Allow-Origin header for all responses from the web-frontend.</td>
</tr>
<tr>
<td><h5>web.adaptive-scheduler.rescale-history.size</h5></td>
<td style="word-wrap: break-word;">0</td>
<td>Integer</td>
<td>The maximum number of the rescale records per job whose scheduler is <code class="highlighter-rouge">AdaptiveScheduler</code>. The feature will be disabled when the configuration value is smaller or equals to 0.</td>
</tr>
<tr>
<td><h5>web.cancel.enable</h5></td>
<td style="word-wrap: break-word;">true</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,20 @@ public class WebOptions {
.withDescription(
"The maximum number of failures collected by the exception history per job.");

/** The maximum number of the adaptive scheduler rescale history. */
@Documentation.Section(Documentation.Sections.ALL_JOB_MANAGER)
public static final ConfigOption<Integer> MAX_ADAPTIVE_SCHEDULER_RESCALE_HISTORY_SIZE =
key("web.adaptive-scheduler.rescale-history.size")
.intType()
.defaultValue(0)
.withDescription(
Description.builder()
.text(
"The maximum number of the rescale records per job whose scheduler is %s. "
+ "The feature will be disabled when the configuration value is smaller or equals to 0.",
code("AdaptiveScheduler"))
.build());

/** Timeout for asynchronous operations by the web monitor in milliseconds. */
public static final ConfigOption<Duration> TIMEOUT =
key("web.timeout")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,12 @@
import org.apache.flink.runtime.scheduler.adaptive.allocator.ReservedSlots;
import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotAllocator;
import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism;
import org.apache.flink.runtime.scheduler.adaptive.timeline.DefaultRescaleTimeline;
import org.apache.flink.runtime.scheduler.adaptive.timeline.Rescale;
import org.apache.flink.runtime.scheduler.adaptive.timeline.RescaleTimeline;
import org.apache.flink.runtime.scheduler.adaptive.timeline.TerminalState;
import org.apache.flink.runtime.scheduler.adaptive.timeline.TerminatedReason;
import org.apache.flink.runtime.scheduler.adaptive.timeline.TriggerCause;
import org.apache.flink.runtime.scheduler.adaptivebatch.NonAdaptiveExecutionPlanSchedulingContext;
import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry;
import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
Expand Down Expand Up @@ -161,6 +167,7 @@
import static org.apache.flink.configuration.TraceOptions.CHECKPOINT_SPAN_DETAIL_LEVEL;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphUtils.isAnyOutputBlocking;
import static org.apache.flink.runtime.scheduler.adaptive.ForwardEdgesAdapter.copyJobGraphWithAdaptedForwardEdges;
import static org.apache.flink.runtime.scheduler.adaptive.timeline.RescaleTimeline.NoOpRescaleTimeline;

/**
* A {@link SchedulerNG} implementation that uses the declarative resource management and
Expand Down Expand Up @@ -304,7 +311,8 @@ public static Settings of(
configuration.get(
SCHEDULER_RESCALE_TRIGGER_MAX_DELAY,
maximumDelayForRescaleTriggerDefault),
rescaleOnFailedCheckpointsCount);
rescaleOnFailedCheckpointsCount,
configuration.get(WebOptions.MAX_ADAPTIVE_SCHEDULER_RESCALE_HISTORY_SIZE));
}

private final SchedulerExecutionMode executionMode;
Expand All @@ -315,6 +323,7 @@ public static Settings of(
private final Duration executingResourceStabilizationTimeout;
private final Duration maximumDelayForTriggeringRescale;
private final int rescaleOnFailedCheckpointCount;
private final int rescaleHistoryMax;

private Settings(
SchedulerExecutionMode executionMode,
Expand All @@ -324,7 +333,8 @@ private Settings(
Duration executingCooldownTimeout,
Duration executingResourceStabilizationTimeout,
Duration maximumDelayForTriggeringRescale,
int rescaleOnFailedCheckpointCount) {
int rescaleOnFailedCheckpointCount,
int rescaleHistoryMax) {
this.executionMode = executionMode;
this.submissionResourceWaitTimeout = submissionResourceWaitTimeout;
this.submissionResourceStabilizationTimeout = submissionResourceStabilizationTimeout;
Expand All @@ -333,6 +343,7 @@ private Settings(
this.executingResourceStabilizationTimeout = executingResourceStabilizationTimeout;
this.maximumDelayForTriggeringRescale = maximumDelayForTriggeringRescale;
this.rescaleOnFailedCheckpointCount = rescaleOnFailedCheckpointCount;
this.rescaleHistoryMax = rescaleHistoryMax;
}

public SchedulerExecutionMode getExecutionMode() {
Expand Down Expand Up @@ -366,6 +377,10 @@ public Duration getMaximumDelayForTriggeringRescale() {
public int getRescaleOnFailedCheckpointCount() {
return rescaleOnFailedCheckpointCount;
}

public int getRescaleHistoryMax() {
return rescaleHistoryMax;
}
}

private final Settings settings;
Expand Down Expand Up @@ -430,6 +445,8 @@ public int getRescaleOnFailedCheckpointCount() {

private final Supplier<Temporal> clock = Instant::now;

private final RescaleTimeline rescaleTimeline;

public AdaptiveScheduler(
Settings settings,
JobGraph jobGraph,
Expand Down Expand Up @@ -523,6 +540,12 @@ public AdaptiveScheduler(
this.initialParallelismStore = vertexParallelismStore;
this.jobInformation = new JobGraphJobInformation(jobGraph, vertexParallelismStore);

int rescaleHistoryMax = settings.getRescaleHistoryMax();
this.rescaleTimeline =
rescaleHistoryMax <= 0
? NoOpRescaleTimeline.INSTANCE
: new DefaultRescaleTimeline(() -> jobInformation, rescaleHistoryMax);

this.declarativeSlotPool = declarativeSlotPool;
this.initializationTimestamp = initializationTimestamp;
this.ioExecutor = ioExecutor;
Expand Down Expand Up @@ -707,6 +730,11 @@ static VertexParallelismStore computeVertexParallelismStoreForExecution(
jobGraph.getVertices(), defaultMaxParallelismFunc);
}

@Override
public RescaleTimeline getRescaleTimeline() {
return rescaleTimeline;
}

private void newResourcesAvailable(Collection<? extends PhysicalSlot> physicalSlots) {
state.tryRun(
ResourceListener.class,
Expand Down Expand Up @@ -1093,6 +1121,7 @@ public void updateJobResourceRequirements(JobResourceRequirements jobResourceReq
if (maybeUpdateVertexParallelismStore.isPresent()) {
this.jobInformation =
new JobGraphJobInformation(jobGraph, maybeUpdateVertexParallelismStore.get());
driveRescaleTimelineByNewResourceRequirements();
declareDesiredResources();
state.tryRun(
ResourceListener.class,
Expand All @@ -1101,6 +1130,27 @@ public void updateJobResourceRequirements(JobResourceRequirements jobResourceReq
}
}

private void driveRescaleTimelineByNewResourceRequirements() {
rescaleTimeline.updateRescale(
rescale ->
rescale.addSchedulerState(state)
.setTerminatedReason(TerminatedReason.RESOURCE_REQUIREMENTS_UPDATED)
.setEndTimestamp(Instant.now().toEpochMilli())
.log());
rescaleTimeline.newRescale(true);
rescaleTimeline.updateRescale(
rescale ->
rescale.setStartTimestamp(Instant.now().toEpochMilli())
.setDesiredVertexParallelism(jobInformation)
.setTriggerCause(TriggerCause.UPDATE_REQUIREMENT)
.setDesiredSlots(jobInformation)
.setMinimalRequiredSlots(jobInformation)
.setPreRescaleSlotsAndParallelisms(
jobInformation,
rescaleTimeline.latestRescale(TerminalState.COMPLETED))
.log());
}

// ----------------------------------------------------------------

@Override
Expand Down Expand Up @@ -1258,6 +1308,15 @@ public void goToCanceling(
OperatorCoordinatorHandler operatorCoordinatorHandler,
List<ExceptionHistoryEntry> failureCollection) {

if (!rescaleTimeline.isIdling()) {
rescaleTimeline.updateRescale(
rescale ->
rescale.addSchedulerState(state)
.setEndTimestamp(Instant.now().toEpochMilli())
.setTerminatedReason(TerminatedReason.JOB_CANCELED)
.log());
}

transitionToState(
new Canceling.Factory(
this,
Expand All @@ -1278,6 +1337,8 @@ public void goToRestarting(
@Nullable VertexParallelism restartWithParallelism,
List<ExceptionHistoryEntry> failureCollection) {

driveRescaleTimelineByJobRestarting(restartWithParallelism);

for (ExecutionVertex executionVertex : executionGraph.getAllExecutionVertices()) {
final int attemptNumber =
executionVertex.getCurrentExecutionAttempt().getAttemptNumber();
Expand Down Expand Up @@ -1306,13 +1367,54 @@ public void goToRestarting(
}
}

private void driveRescaleTimelineByJobRestarting(VertexParallelism restartWithParallelism) {
if (restartWithParallelism == null) {
// For the failover restarting.
if (!rescaleTimeline.isIdling()) {
// Process by https://lists.apache.org/thread/hh7w2p6lnmbo1q6d9ngkttdyrw4lp74h.
LOG.info(
"Merge the current non-terminated rescale and the new rescale triggered by recoverable failover into the current rescale.");
rescaleTimeline.updateRescale(Rescale::clearSchedulerStates);
} else if (rescaleTimeline.isIdling()) {
rescaleTimeline.newRescale(false);
}
rescaleTimeline.updateRescale(
rescale ->
rescale.setStartTimestamp(Instant.now().toEpochMilli())
.setTriggerCause(TriggerCause.RECOVERABLE_FAILOVER)
.setMinimalRequiredSlots(jobInformation)
.setPreRescaleSlotsAndParallelisms(
jobInformation,
rescaleTimeline.latestRescale(TerminalState.COMPLETED))
.setDesiredVertexParallelism(jobInformation)
.setDesiredSlots(jobInformation)
.log());
} else {
// For the normal rescaling restarting.
rescaleTimeline.updateRescale(
rescale ->
rescale.setMinimalRequiredSlots(jobInformation)
.setDesiredVertexParallelism(jobInformation)
.setDesiredSlots(jobInformation)
.log());
}
}

@Override
public void goToFailing(
ExecutionGraph executionGraph,
ExecutionGraphHandler executionGraphHandler,
OperatorCoordinatorHandler operatorCoordinatorHandler,
Throwable failureCause,
List<ExceptionHistoryEntry> failureCollection) {
rescaleTimeline.updateRescale(
rescale ->
rescale.setEndTimestamp(Instant.now().toEpochMilli())
.addSchedulerState(state, failureCause)
.setTerminatedReason(TerminatedReason.JOB_FAILED)
.setStringifiedException(
ExceptionUtils.stringifyException(failureCause))
.log());
transitionToState(
new Failing.Factory(
this,
Expand Down Expand Up @@ -1351,6 +1453,12 @@ public CompletableFuture<String> goToStopWithSavepoint(

@Override
public void goToFinished(ArchivedExecutionGraph archivedExecutionGraph) {
rescaleTimeline.updateRescale(
rescale ->
rescale.addSchedulerState(state)
.setEndTimestamp(Instant.now().toEpochMilli())
.setTerminatedReason(TerminatedReason.JOB_FINISHED)
.log());
transitionToState(new Finished.Factory(this, archivedExecutionGraph, LOG));
}

Expand Down Expand Up @@ -1620,6 +1728,9 @@ <T extends State> T transitionToState(StateFactory<T> targetState) {
final JobStatus previousJobStatus = state.getJobStatus();

state.onLeave(targetState.getStateClass());

rescaleTimeline.updateRescale(rescale -> rescale.addSchedulerState(state));

T targetStateInstance = targetState.getState();
state = targetStateInstance;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
package org.apache.flink.runtime.scheduler.adaptive;

import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.scheduler.adaptive.allocator.JobInformation;
import org.apache.flink.runtime.scheduler.adaptive.timeline.RescaleTimeline;
import org.apache.flink.runtime.scheduler.adaptive.timeline.TriggerCause;

import org.slf4j.Logger;

Expand All @@ -39,9 +42,24 @@ public JobStatus getJobStatus() {

/** Starts the scheduling by going into the {@link WaitingForResources} state. */
void startScheduling() {
driveRescaleTimelineByInitialScheduling();
context.goToWaitingForResources(null);
}

private void driveRescaleTimelineByInitialScheduling() {
RescaleTimeline rescaleTimeline = context.getRescaleTimeline();
JobInformation jobInformation = rescaleTimeline.getJobInformation();
rescaleTimeline.newRescale(true);
rescaleTimeline.updateRescale(
rescale ->
rescale.setTriggerCause(TriggerCause.INITIAL_SCHEDULE)
.setStartTimestamp(getDurable().getEnterTimestamp())
.setDesiredVertexParallelism(jobInformation)
.setDesiredSlots(jobInformation)
.setMinimalRequiredSlots(jobInformation)
.log());
}

/** Context of the {@link Created} state. */
interface Context
extends StateWithoutExecutionGraph.Context, StateTransitions.ToWaitingForResources {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import org.apache.flink.runtime.scheduler.GlobalFailureHandler;
import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism;
import org.apache.flink.runtime.scheduler.adaptive.timeline.RescaleTimeline;
import org.apache.flink.runtime.scheduler.adaptive.timeline.TerminatedReason;
import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
Expand All @@ -42,6 +44,7 @@
import javax.annotation.Nullable;

import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -97,6 +100,9 @@ private void handleExecutionGraphCreation(
CreatingExecutionGraph.class.getSimpleName(),
Executing.class.getSimpleName(),
throwable);

driveRescaleTimelineByException(throwable);

context.goToFinished(context.getArchivedExecutionGraph(JobStatus.FAILED, throwable));
} else {
for (ExecutionVertex vertex :
Expand All @@ -108,6 +114,8 @@ private void handleExecutionGraphCreation(
context.tryToAssignSlots(executionGraphWithVertexParallelism);

if (result.isSuccess()) {
driveRescaleTimelineByGraphCreation(executionGraphWithVertexParallelism);

getLogger()
.debug(
"Successfully reserved and assigned the required slots for the ExecutionGraph.");
Expand Down Expand Up @@ -155,6 +163,31 @@ private void handleExecutionGraphCreation(
}
}

private void driveRescaleTimelineByGraphCreation(
ExecutionGraphWithVertexParallelism executionGraphWithVertexParallelism) {
RescaleTimeline rescaleTimeline = context.getRescaleTimeline();
rescaleTimeline.updateRescale(
rescale ->
rescale.setPostRescaleVertexParallelism(
rescaleTimeline.getJobInformation(),
executionGraphWithVertexParallelism.getVertexParallelism())
.setPostRescaleSlots(
executionGraphWithVertexParallelism.jobSchedulingPlan
.getSlotAssignments())
.log());
}

private void driveRescaleTimelineByException(Throwable throwable) {
final long epochMilli = Instant.now().toEpochMilli();
context.getRescaleTimeline()
.updateRescale(
rescale ->
rescale.setTerminatedReason(TerminatedReason.EXCEPTION_OCCURRED)
.setEndTimestamp(epochMilli)
.addSchedulerState(this, throwable)
.log());
}

@Override
public JobStatus getJobStatus() {
return JobStatus.CREATED;
Expand Down
Loading