Преглед изворни кода

JobScheduler 中添加获取任务列表, 取消任务等方法

reghao пре 2 дана
родитељ
комит
01a20e8375

+ 10 - 17
mgr/src/main/java/cn/reghao/devops/mgr/ops/builder/controller/BuildDeployController.java

@@ -1,24 +1,17 @@
 package cn.reghao.devops.mgr.ops.builder.controller;
 
 import cn.reghao.devops.mgr.ops.builder.model.po.PipelineStep;
-import cn.reghao.devops.mgr.ops.builder.model.vo.AppBuildingVO;
-import cn.reghao.devops.mgr.ops.builder.model.vo.AppDeployingVO;
-import cn.reghao.devops.mgr.ops.builder.model.vo.BuildLogVO;
-import cn.reghao.devops.mgr.ops.builder.model.vo.BuildTask;
+import cn.reghao.devops.mgr.ops.builder.model.vo.*;
 import cn.reghao.devops.mgr.ops.app.service.BuildTimeMetrics;
-import cn.reghao.devops.mgr.ops.builder.service.BuildService;
+import cn.reghao.devops.mgr.ops.builder.service.*;
 import cn.reghao.devops.mgr.util.JsonUtils;
 import cn.reghao.jutil.jdk.serializer.JsonConverter;
 import cn.reghao.jutil.jdk.web.db.PageList;
 import cn.reghao.devops.mgr.ops.app.service.AppBuildService;
-import cn.reghao.devops.mgr.ops.builder.service.DeployApp;
-import cn.reghao.devops.mgr.ops.builder.service.GetApp;
 import cn.reghao.devops.mgr.ops.builder.service.webhook.WebhookService;
 import cn.reghao.devops.mgr.ops.app.db.query.AppBuildQuery;
 import cn.reghao.devops.mgr.ops.app.db.query.AppDeployQuery;
-import cn.reghao.devops.mgr.ops.builder.model.vo.BuildConfigSnapshot;
 import cn.reghao.devops.mgr.ops.builder.model.po.BuildLog;
-import cn.reghao.devops.mgr.ops.builder.service.BuildApp;
 import cn.reghao.jutil.web.WebResult;
 import io.swagger.v3.oas.annotations.tags.Tag;
 import io.swagger.v3.oas.annotations.Operation;
@@ -42,25 +35,25 @@ import java.util.*;
 public class BuildDeployController {
     private final AppBuildQuery appBuildQuery;
     private final AppDeployQuery appDeployQuery;
-    private final BuildApp buildApp;
     private final DeployApp deployApp;
     private final GetApp getApp;
     private final AppBuildService appBuildService;
     private final WebhookService webhookService;
     private final int pageSize = 10;
     private final BuildService buildService;
+    private final JobScheduler jobScheduler;
 
-    public BuildDeployController(AppBuildQuery appBuildQuery, AppDeployQuery appDeployQuery, BuildApp buildApp,
-                                 DeployApp deployApp, GetApp getApp, AppBuildService appBuildService,
-                                 WebhookService webhookService, BuildService buildService) {
+    public BuildDeployController(AppBuildQuery appBuildQuery, AppDeployQuery appDeployQuery, DeployApp deployApp,
+                                 GetApp getApp, AppBuildService appBuildService, WebhookService webhookService,
+                                 BuildService buildService, JobScheduler jobScheduler) {
         this.appBuildQuery = appBuildQuery;
         this.appDeployQuery = appDeployQuery;
-        this.buildApp = buildApp;
         this.deployApp = deployApp;
         this.getApp = getApp;
         this.appBuildService = appBuildService;
         this.webhookService = webhookService;
         this.buildService = buildService;
+        this.jobScheduler = jobScheduler;
     }
 
     @Operation(summary = "应用构建页面", description = "N")
@@ -99,8 +92,8 @@ public class BuildDeployController {
     @Operation(summary = "构建任务页面", description = "N")
     @GetMapping(value = "/task")
     public String buildTaskPage() {
-        List<BuildTask> list = buildApp.getBuildTasks();
-        return WebResult.success(list);
+        List<JobStatus> jobStatusList = jobScheduler.getTasks();
+        return WebResult.success(jobStatusList);
     }
 
     @Operation(summary = "重置应用构建状态", description = "N")
@@ -113,7 +106,7 @@ public class BuildDeployController {
     @Operation(summary = "取消构建任务", description = "N")
     @PostMapping(value = "/cancel_task", produces = MediaType.APPLICATION_JSON_VALUE)
     public String cancelTask(String appId) {
-        //buildApp.cancelTask(appId);
+        jobScheduler.cancelJob(appId);
         return WebResult.failWithMsg("接口未实现");
     }
 

+ 26 - 0
mgr/src/main/java/cn/reghao/devops/mgr/ops/builder/model/vo/JobHolder.java

@@ -0,0 +1,26 @@
+package cn.reghao.devops.mgr.ops.builder.model.vo;
+
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+import java.util.concurrent.Future;
+
+/**
+ * @author reghao
+ * @date 2026-03-19 13:20:48
+ */
+@NoArgsConstructor
+@Setter
+@Getter
+public class JobHolder {
+    private String jobId;
+    private Future<?> future;
+    private long submitTime;
+    private volatile boolean running = false; // 是否已进入运行阶段(获取到信号量)
+
+    public JobHolder(String jobId) {
+        this.jobId = jobId;
+        this.submitTime = System.currentTimeMillis();
+    }
+}

+ 20 - 0
mgr/src/main/java/cn/reghao/devops/mgr/ops/builder/model/vo/JobStatus.java

@@ -0,0 +1,20 @@
+package cn.reghao.devops.mgr.ops.builder.model.vo;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+/**
+ * @author reghao
+ * @date 2026-03-19 13:23:09
+ */
+@NoArgsConstructor
+@AllArgsConstructor
+@Setter
+@Getter
+public class JobStatus {
+    private String appId;
+    private String status;
+    private long waitTime;
+}

+ 53 - 7
mgr/src/main/java/cn/reghao/devops/mgr/ops/builder/service/JobScheduler.java

@@ -1,39 +1,85 @@
 package cn.reghao.devops.mgr.ops.builder.service;
 
+import cn.reghao.devops.mgr.ops.builder.model.vo.JobHolder;
+import cn.reghao.devops.mgr.ops.builder.model.vo.JobStatus;
+import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Component;
 
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Semaphore;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.*;
+import java.util.stream.Collectors;
 
 /**
  * @author reghao
  * @date 2026-02-26 13:19:26
  */
+@Slf4j
 @Component
 public class JobScheduler {
     // 限制同时运行的容器总数为 cpu 数量
     private final Semaphore semaphore = new Semaphore(Runtime.getRuntime().availableProcessors());
     // 用于异步执行任务的线程池
     private final ExecutorService executorService = Executors.newCachedThreadPool();
+    // 核心:保存 jobId -> 任务包装对象的映射
+    private final Map<String, JobHolder> jobMap = new ConcurrentHashMap<>();
 
     public void submitJob(String jobId, Runnable buildTask) {
-        executorService.submit(() -> {
+        JobHolder holder = new JobHolder(jobId);
+        Future<?> future = executorService.submit(() -> {
             try {
-                System.out.println("Job [" + jobId + "] 正在等待资源...");
+                log.info("Job [{}] 正在等待资源...", jobId);
                 // 1. 申请许可 (如果没有空位,线程会阻塞在这里)
                 semaphore.acquire();
 
-                System.out.println("Job [" + jobId + "] 获取资源,开始运行容器。");
+                // 检查是否在等待期间被取消
+                if (Thread.currentThread().isInterrupted()) return;
+                holder.setRunning(true);
+                log.info("Job [{}] 获取资源,开始运行容器...", jobId);
                 // 2. 执行真正的构建逻辑
                 buildTask.run();
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
             } finally {
                 // 3. 无论构建成功还是抛出异常,必须释放许可
-                System.out.println("Job [" + jobId + "] 运行结束,释放资源。");
                 semaphore.release();
+                jobMap.remove(jobId);
+                log.info("Job [{}] 运行结束/取消,释放资源...", jobId);
             }
         });
+
+        holder.setFuture(future);
+        jobMap.put(jobId, holder);
+    }
+
+    /**
+     * 获取当前所有任务
+     */
+    public List<JobStatus> getTasks() {
+        return jobMap.values().stream().map(holder -> {
+            JobStatus jobStatus = new JobStatus();
+            jobStatus.setAppId(holder.getJobId());
+            // 如果 running 为 true,说明已经过了 semaphore.acquire()
+            jobStatus.setStatus(holder.isRunning() ? "RUNNING" : "WAITING");
+            jobStatus.setWaitTime((System.currentTimeMillis() - holder.getSubmitTime()/1000));
+            return jobStatus;
+        }).collect(Collectors.toList());
+    }
+
+    /**
+     * 取消任务
+     */
+    public boolean cancelJob(String jobId) {
+        JobHolder holder = jobMap.get(jobId);
+        if (holder != null && !holder.isRunning()) {
+            log.info("正在尝试取消 Job [{}]", jobId);
+            // mayInterruptIfRunning = true 会向线程发送 interrupt 信号
+            // 如果线程在 semaphore.acquire() 阻塞,会抛出 InterruptedException
+            // 如果线程正在执行 buildTask,需要 buildTask 内部支持响应中断
+            boolean canceled = holder.getFuture().cancel(true);
+            jobMap.remove(jobId);
+            return canceled;
+        }
+        return false;
     }
 }