From a59d738091aa19c3d94948ab02a34d2e89dc634a Mon Sep 17 00:00:00 2001 From: och5351 Date: Sat, 11 Apr 2026 15:25:24 +0900 Subject: [PATCH 1/2] [FLINK-38899][runtime-web] Introduce the Rescales/History sub-page for streaming jobs with the adaptive scheduler enabled Co-authored-by: Matthias Pohl Co-authored-by: Yuepeng Pan --- .../src/app/interfaces/job-rescales.ts | 59 ++- .../job/rescales/job-rescales.component.html | 363 ++++++++++++++++++ .../job/rescales/job-rescales.component.less | 78 ++++ .../job/rescales/job-rescales.component.ts | 79 +++- .../src/app/services/job.service.ts | 18 +- 5 files changed, 589 insertions(+), 8 deletions(-) diff --git a/flink-runtime-web/web-dashboard/src/app/interfaces/job-rescales.ts b/flink-runtime-web/web-dashboard/src/app/interfaces/job-rescales.ts index b5db266ddb6a3..85e90c98428b5 100644 --- a/flink-runtime-web/web-dashboard/src/app/interfaces/job-rescales.ts +++ b/flink-runtime-web/web-dashboard/src/app/interfaces/job-rescales.ts @@ -16,7 +16,64 @@ * limitations under the License. */ -export interface RescalesConfig { +export type RescalesHistory = BriefJobRescaleDetails[]; + +export interface BriefJobRescaleDetails { + rescaleUuid: string; + resourceRequirementsUuid: string; + rescaleAttemptId: number; + vertices: { [jobVertexId: string]: VertexParallelismRescaleInfo }; + slots: { [slotSharingGroupId: string]: SlotSharingGroupRescaleInfo }; + schedulerStates: SchedulerState[]; + startTimestampInMillis: number; + endTimestampInMillis: number; + terminalState: string; + triggerCause: string; + terminatedReason: string; +} + +export interface JobRescaleDetails extends BriefJobRescaleDetails {} + +export interface VertexParallelismRescaleInfo { + jobVertexId: string; + jobVertexName: string; + slotSharingGroupId: string; + slotSharingGroupName: string; + desiredParallelism: number; + sufficientParallelism: number; + preRescaleParallelism: number; + postRescaleParallelism: number; +} + +export interface SlotSharingGroupRescaleInfo { + slotSharingGroupId: string; + slotSharingGroupName: string; + requestResourceProfile: ResourceProfileInfo; + desiredSlots: number; + minimalRequiredSlots: number; + preRescaleSlots: number; + postRescaleSlots: number; + acquiredResourceProfile: ResourceProfileInfo; +} + +export interface ResourceProfileInfo { + cpuCores: number; + taskHeapMemory: number; + taskOffHeapMemory: number; + managedMemory: number; + networkMemory: number; + extendedResources: { [key: string]: unknown }; +} + +export interface SchedulerState { + state: string; + enterTimestampInMillis: number; + leaveTimestampInMillis: number; + durationInMillis: number; + stringifiedException: string; +} + +export interface JobRescaleConfigInfo { rescaleHistoryMax: number; schedulerExecutionMode: string; submissionResourceWaitTimeoutInMillis: number; diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/rescales/job-rescales.component.html b/flink-runtime-web/web-dashboard/src/app/pages/job/rescales/job-rescales.component.html index 2e90f2fbb7f61..c282aa8c94545 100644 --- a/flink-runtime-web/web-dashboard/src/app/pages/job/rescales/job-rescales.component.html +++ b/flink-runtime-web/web-dashboard/src/app/pages/job/rescales/job-rescales.component.html @@ -21,7 +21,358 @@ [nzSize]="'small'" [nzAnimated]="{ inkBar: true, tabPane: false }" [nzTabBarExtraContent]="extraTemplate" + (nzSelectedIndexChange)="refresh()" > + + + + + + + Rescale UUID + + + Requirements ID + + + Attempt ID + + + Trigger Cause + + + Terminal State + + + Terminated Reason + + + Start Time + + + Duration + + + End Time + + + + + + + + + {{ truncateUuid(jobRescaleDetails.rescaleUuid) }} + + + {{ truncateUuid(jobRescaleDetails.resourceRequirementsUuid) }} + + {{ jobRescaleDetails.rescaleAttemptId }} + {{ jobRescaleDetails.triggerCause }} + + + - + + {{ jobRescaleDetails.terminatedReason || '-' }} + + {{ jobRescaleDetails.startTimestampInMillis | date: 'yyyy-MM-dd HH:mm:ss.SSS' }} + + + {{ + (jobRescaleDetails.endTimestampInMillis || Date.now()) - + jobRescaleDetails.startTimestampInMillis | humanizeDuration + }} + + + {{ + jobRescaleDetails.endTimestampInMillis + ? (jobRescaleDetails.endTimestampInMillis | date: 'yyyy-MM-dd HH:mm:ss.SSS') + : '-' + }} + + + + +
+
+

Rescale Details

+ + + + Rescale UUID: + {{ truncateUuid(jobRescaleDetails.rescaleUuid) }} + + + + Requirements ID: + {{ truncateUuid(jobRescaleDetails.resourceRequirementsUuid) }} + + + + Attempt ID: + {{ jobRescaleDetails.rescaleAttemptId }} + + + + Trigger Cause: + {{ jobRescaleDetails.triggerCause }} + + + + Terminal State: + {{ jobRescaleDetails?.terminalState || '-' }} + + + + Terminal Reason: + {{ jobRescaleDetails?.terminatedReason || '-' }} + + + + + + + + + ID + + + Name + + + Previous Parallelism + + + Acquired Parallelism + + + Desired Parallelism + + + Sufficient Parallelism + + + Slot Sharing Group ID + + + + + + + {{ truncateUuid(vertex.value.jobVertexId) }} + + + {{ truncateName(vertex.value.jobVertexName) }} + + {{ vertex.value.preRescaleParallelism }} + {{ vertex.value.postRescaleParallelism || '-' }} + {{ vertex.value.desiredParallelism }} + {{ vertex.value.sufficientParallelism }} + + {{ truncateUuid(vertex.value.slotSharingGroupId) }} + + + + + + + + + + Slot Sharing Group ID + + + Slot Sharing Group Name + + + Previous Slots + + + Acquired Slots + + + Desired Slots + + + Sufficient Slots + + + Required Profile + + + Acquired Profile + + + + + + + {{ truncateUuid(slot.value.slotSharingGroupId) }} + + {{ slot.value.slotSharingGroupName }} + {{ slot.value.preRescaleSlots }} + {{ slot.value.postRescaleSlots || '-' }} + {{ slot.value.desiredSlots }} + {{ slot.value.minimalRequiredSlots }} + +
{{
+                              slot.value.requestResourceProfile | json
+                            }}
+ + +
{{
+                              slot.value.acquiredResourceProfile | json
+                            }}
+ + + +
+ + + + + + State + + + Enter Time + + + Leave Time + + + Duration + + + Exception + + + + + + {{ state.state }} + + {{ state.enterTimestampInMillis | date: 'yyyy-MM-dd HH:mm:ss.SSS' }} + + + {{ state.leaveTimestampInMillis | date: 'yyyy-MM-dd HH:mm:ss.SSS' }} + + {{ state.durationInMillis | humanizeDuration }} + {{ state.stringifiedException }} + + + +
+ +
Loading...
+
+
+ + +
+ +
+
+ + Vertices + + + + Slots + + + + Scheduler State History + + diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/rescales/job-rescales.component.less b/flink-runtime-web/web-dashboard/src/app/pages/job/rescales/job-rescales.component.less index 6b78072948107..bbca1186be45c 100644 --- a/flink-runtime-web/web-dashboard/src/app/pages/job/rescales/job-rescales.component.less +++ b/flink-runtime-web/web-dashboard/src/app/pages/job/rescales/job-rescales.component.less @@ -27,6 +27,14 @@ .ant-tabs-nav-list { padding: 4px 16px; } + + > .ant-table-wrapper { + .ant-table { + > tbody > tr.ant-table-expanded-row > td { + padding: 0 !important; + } + } + } } } @@ -37,3 +45,73 @@ nz-empty { padding: 24px; } + +::ng-deep { + .nz-disable-td { + width: 100% !important; + padding: 0 !important; + } + + .collapse-td { + width: 100% !important; + padding: 0 !important; + } + + tr.ant-table-expanded-row { + width: 100%; + + > td { + width: 100% !important; + } + } +} + +.rescale-details-wrapper { + display: block; + box-sizing: border-box; + width: 100%; + padding: 0 24px; +} + +.rescale-details-box { + margin: 16px 0; + padding: 16px; + border: 1px solid #f0f0f0; + background-color: #fff; + + ::ng-deep { + nz-table.ant-table-wrapper { + display: block !important; + width: 100% !important; + margin-bottom: 16px; + + &:last-child { + margin-bottom: 0; + } + } + + .ant-spin-nested-loading { + display: block; + width: 100%; + } + + .ant-spin-container { + display: block; + width: 100%; + } + + .ant-table { + width: 100% !important; + table-layout: fixed !important; + + colgroup { + display: none; + } + } + + table { + width: 100% !important; + table-layout: fixed !important; + } + } +} diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/rescales/job-rescales.component.ts b/flink-runtime-web/web-dashboard/src/app/pages/job/rescales/job-rescales.component.ts index 4b80254bedfc7..6f1a1ecc1c527 100644 --- a/flink-runtime-web/web-dashboard/src/app/pages/job/rescales/job-rescales.component.ts +++ b/flink-runtime-web/web-dashboard/src/app/pages/job/rescales/job-rescales.component.ts @@ -16,13 +16,20 @@ * limitations under the License. */ -import { NgIf } from '@angular/common'; +import { NgFor, NgIf, JsonPipe, DatePipe, KeyValuePipe } from '@angular/common'; import { ChangeDetectionStrategy, ChangeDetectorRef, Component, OnDestroy, OnInit } from '@angular/core'; import { forkJoin, of, Subject } from 'rxjs'; import { catchError, distinctUntilChanged, switchMap, takeUntil } from 'rxjs/operators'; import { HumanizeDurationPipe } from '@flink-runtime-web/components/humanize-duration.pipe'; -import { RescalesConfig, JobDetail } from '@flink-runtime-web/interfaces'; +import { JobBadgeComponent } from '@flink-runtime-web/components/job-badge/job-badge.component'; +import { + BriefJobRescaleDetails, + JobRescaleDetails, + JobRescaleConfigInfo, + JobDetail, + RescalesHistory +} from '@flink-runtime-web/interfaces'; import { JobService } from '@flink-runtime-web/services'; import { NzButtonModule } from 'ng-zorro-antd/button'; import { NzCollapseModule } from 'ng-zorro-antd/collapse'; @@ -42,8 +49,13 @@ import { JobLocalService } from '../job-local.service'; changeDetection: ChangeDetectionStrategy.OnPush, imports: [ NgIf, + NgFor, + JsonPipe, + DatePipe, + KeyValuePipe, NzTabsModule, NzDividerModule, + JobBadgeComponent, HumanizeDurationPipe, NzTableModule, NzIconModule, @@ -54,8 +66,12 @@ import { JobLocalService } from '../job-local.service'; ] }) export class JobRescalesComponent implements OnInit, OnDestroy { - public rescalesConfig?: RescalesConfig; + public rescalesHistory?: RescalesHistory; + public rescaleDetailsMap = new Map(); + public rescalesConfig?: JobRescaleConfigInfo; public jobDetail: JobDetail; + public expandedRowsSet = new Set(); + public readonly Date = Date; private refresh$ = new Subject(); private destroy$ = new Subject(); @@ -71,6 +87,11 @@ export class JobRescalesComponent implements OnInit, OnDestroy { .pipe( switchMap(() => forkJoin([ + this.jobService.loadRescalesHistory(this.jobDetail.jid).pipe( + catchError(() => { + return of(undefined); + }) + ), this.jobService.loadRescalesConfig(this.jobDetail.jid).pipe( catchError(() => { return of(undefined); @@ -80,7 +101,8 @@ export class JobRescalesComponent implements OnInit, OnDestroy { ), takeUntil(this.destroy$) ) - .subscribe(([config]) => { + .subscribe(([history, config]) => { + this.rescalesHistory = history; this.rescalesConfig = config; this.cdr.markForCheck(); }); @@ -105,6 +127,55 @@ export class JobRescalesComponent implements OnInit, OnDestroy { } public refresh(): void { + const expandedUuids = Array.from(this.expandedRowsSet); + + expandedUuids.forEach(uuid => { + this.jobService + .loadRescaleDetail(this.jobDetail.jid, uuid) + .pipe(takeUntil(this.destroy$)) + .subscribe(detail => { + this.rescaleDetailsMap.set(uuid, detail); + this.cdr.markForCheck(); + }); + }); + this.refresh$.next(); } + + public trackById(item: BriefJobRescaleDetails): string { + return item.rescaleUuid; + } + + public onExpandChange(jobRescaleDetails: BriefJobRescaleDetails, expanded: boolean): void { + if (expanded) { + this.expandedRowsSet.add(jobRescaleDetails.rescaleUuid); + if (!this.rescaleDetailsMap.has(jobRescaleDetails.rescaleUuid)) { + this.jobService + .loadRescaleDetail(this.jobDetail.jid, jobRescaleDetails.rescaleUuid) + .pipe(takeUntil(this.destroy$)) + .subscribe(detail => { + this.rescaleDetailsMap.set(jobRescaleDetails.rescaleUuid, detail); + this.cdr.markForCheck(); + }); + } + } else { + this.expandedRowsSet.delete(jobRescaleDetails.rescaleUuid); + } + } + + public isExpanded(rescaleUuid: string): boolean { + return this.expandedRowsSet.has(rescaleUuid); + } + + public getDetail(rescaleUuid: string): JobRescaleDetails | undefined { + return this.rescaleDetailsMap.get(rescaleUuid); + } + + public truncateUuid(uuid: string): string { + return uuid ? uuid.substring(0, 8) : ''; + } + + public truncateName(name: string, maxLength: number = 32): string { + return name && name.length > maxLength ? `${name.substring(0, maxLength)}...` : name; + } } diff --git a/flink-runtime-web/web-dashboard/src/app/services/job.service.ts b/flink-runtime-web/web-dashboard/src/app/services/job.service.ts index 313a3e9308ea4..aff1e903b6d79 100644 --- a/flink-runtime-web/web-dashboard/src/app/services/job.service.ts +++ b/flink-runtime-web/web-dashboard/src/app/services/job.service.ts @@ -26,7 +26,9 @@ import { CheckpointConfig, CheckpointDetail, CheckpointSubTask, - RescalesConfig, + RescalesHistory, + JobRescaleDetails, + JobRescaleConfigInfo, JobAccumulators, JobBackpressure, JobConfig, @@ -179,8 +181,18 @@ export class JobService { ); } - public loadRescalesConfig(jobId: string): Observable { - return this.httpClient.get(`${this.configService.BASE_URL}/jobs/${jobId}/rescales/config`); + public loadRescalesHistory(jobId: string): Observable { + return this.httpClient.get(`${this.configService.BASE_URL}/jobs/${jobId}/rescales/history`); + } + + public loadRescaleDetail(jobId: string, rescaleUuid: string): Observable { + return this.httpClient.get( + `${this.configService.BASE_URL}/jobs/${jobId}/rescales/details/${rescaleUuid}` + ); + } + + public loadRescalesConfig(jobId: string): Observable { + return this.httpClient.get(`${this.configService.BASE_URL}/jobs/${jobId}/rescales/config`); } public loadJobResourceRequirements(jobId: string): Observable { From 5c9edc703bc7277978b3aa844d8b8ad1ea5795eb Mon Sep 17 00:00:00 2001 From: och5351 Date: Sat, 11 Apr 2026 15:27:08 +0900 Subject: [PATCH 2/2] [hotfix][runtime-web] Handle hyphen in the rescales/configuration tab when the schedulerExecutionMode variable value is null Co-authored-by: Matthias Pohl Co-authored-by: Yuepeng Pan --- .../src/app/pages/job/rescales/job-rescales.component.html | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/rescales/job-rescales.component.html b/flink-runtime-web/web-dashboard/src/app/pages/job/rescales/job-rescales.component.html index c282aa8c94545..08eb8f69a691d 100644 --- a/flink-runtime-web/web-dashboard/src/app/pages/job/rescales/job-rescales.component.html +++ b/flink-runtime-web/web-dashboard/src/app/pages/job/rescales/job-rescales.component.html @@ -391,8 +391,7 @@

Rescale Details

Scheduler Execution Mode - REACTIVE - + {{ rescalesConfig['schedulerExecutionMode'] || '-' }} Submission Resource Wait Timeout