Przeglądaj źródła

1.将延时任务放到 file-service 中实现
2.file-service 提供一个 JobService rpc 接口供其他服务提交延时任务

reghao 1 rok temu
rodzic
commit
3a52bf1f6e
26 zmienionych plików z 505 dodań i 236 usunięć
  1. 11 0
      content/content-api/src/main/java/cn/reghao/tnb/content/api/iface/MallService.java
  2. 3 0
      content/content-api/src/main/java/cn/reghao/tnb/content/api/iface/UserContentService.java
  3. 25 1
      content/content-service/src/main/java/cn/reghao/tnb/content/app/mall/db/repository/MallRepository.java
  4. 0 33
      content/content-service/src/main/java/cn/reghao/tnb/content/app/mall/job/JobService.java
  5. 0 32
      content/content-service/src/main/java/cn/reghao/tnb/content/app/mall/job/task/OrderTask.java
  6. 36 0
      content/content-service/src/main/java/cn/reghao/tnb/content/app/mall/rpc/MallServiceImpl.java
  7. 14 21
      content/content-service/src/main/java/cn/reghao/tnb/content/app/mall/service/OrderService.java
  8. 4 0
      content/content-service/src/main/java/cn/reghao/tnb/content/app/vod/db/repository/VideoRepository.java
  9. 4 3
      content/content-service/src/main/java/cn/reghao/tnb/content/app/vod/model/po/VideoPost.java
  10. 13 0
      content/content-service/src/main/java/cn/reghao/tnb/content/app/vod/rpc/UserContentServiceImpl.java
  11. 24 6
      content/content-service/src/main/java/cn/reghao/tnb/content/app/vod/service/impl/VideoPostServiceImpl.java
  12. 1 1
      content/content-service/src/main/resources/application.yml
  13. 1 2
      file/file-api/src/main/java/cn/reghao/file/api/iface/FileService.java
  14. 11 0
      file/file-api/src/main/java/cn/reghao/file/api/iface/JobService.java
  15. 26 0
      file/file-service/src/main/java/cn/reghao/tnb/file/app/delay/DelayJob.java
  16. 71 0
      file/file-service/src/main/java/cn/reghao/tnb/file/app/delay/JobContext.java
  17. 15 15
      file/file-service/src/main/java/cn/reghao/tnb/file/app/delay/task/ConvertTask.java
  18. 31 0
      file/file-service/src/main/java/cn/reghao/tnb/file/app/delay/task/ConvertTaskInfo.java
  19. 22 0
      file/file-service/src/main/java/cn/reghao/tnb/file/app/delay/task/OrderTask.java
  20. 22 0
      file/file-service/src/main/java/cn/reghao/tnb/file/app/delay/task/PublishVideoTask.java
  21. 56 0
      file/file-service/src/main/java/cn/reghao/tnb/file/app/model/constant/JobStatus.java
  22. 6 4
      file/file-service/src/main/java/cn/reghao/tnb/file/app/model/po/JobDetail.java
  23. 0 28
      file/file-service/src/main/java/cn/reghao/tnb/file/app/model/vo/ConvertJob.java
  24. 3 19
      file/file-service/src/main/java/cn/reghao/tnb/file/app/rpc/FileServiceImpl.java
  25. 106 0
      file/file-service/src/main/java/cn/reghao/tnb/file/app/rpc/JobServiceImpl.java
  26. 0 71
      file/file-service/src/main/java/cn/reghao/tnb/file/app/service/ConvertService.java

+ 11 - 0
content/content-api/src/main/java/cn/reghao/tnb/content/api/iface/MallService.java

@@ -0,0 +1,11 @@
+package cn.reghao.tnb.content.api.iface;
+
+import cn.reghao.jutil.jdk.result.Result;
+
+/**
+ * @author reghao
+ * @date 2024-12-04 11:37:08
+ */
+public interface MallService {
+    Result orderTimeout(long orderId);
+}

+ 3 - 0
content/content-api/src/main/java/cn/reghao/tnb/content/api/iface/UserContentService.java

@@ -1,5 +1,7 @@
 package cn.reghao.tnb.content.api.iface;
 
+import cn.reghao.jutil.jdk.result.Result;
+
 import java.util.List;
 
 /**
@@ -15,4 +17,5 @@ public interface UserContentService {
      * @date 2024-08-03 23:08:488
      */
     List<Long> getContentUser();
+    Result publishVideoPost(String videoId);
 }

+ 25 - 1
content/content-service/src/main/java/cn/reghao/tnb/content/app/mall/db/repository/MallRepository.java

@@ -1,10 +1,16 @@
 package cn.reghao.tnb.content.app.mall.db.repository;
 
+import cn.reghao.tnb.content.app.mall.db.mapper.OrderMapper;
 import cn.reghao.tnb.content.app.mall.db.mapper.ProductMapper;
+import cn.reghao.tnb.content.app.mall.db.mapper.ProductSnapshotMapper;
+import cn.reghao.tnb.content.app.mall.model.po.Order;
 import cn.reghao.tnb.content.app.mall.model.po.Product;
+import cn.reghao.tnb.content.app.mall.model.po.ProductSnapshot;
+import cn.reghao.tnb.content.app.mall.model.vo.OrderDetail;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.cache.annotation.Cacheable;
 import org.springframework.stereotype.Repository;
+import org.springframework.transaction.annotation.Transactional;
 
 /**
  * @author reghao
@@ -14,9 +20,20 @@ import org.springframework.stereotype.Repository;
 @Repository
 public class MallRepository {
     private final ProductMapper productMapper;
+    private final ProductSnapshotMapper productSnapshotMapper;
+    private final OrderMapper orderMapper;
 
-    public MallRepository(ProductMapper productMapper) {
+    public MallRepository(ProductMapper productMapper, ProductSnapshotMapper productSnapshotMapper,
+                          OrderMapper orderMapper) {
         this.productMapper = productMapper;
+        this.productSnapshotMapper = productSnapshotMapper;
+        this.orderMapper = orderMapper;
+    }
+
+    @Transactional(rollbackFor = Exception.class)
+    public void saveOrder(Order order, ProductSnapshot productSnapshot) {
+        orderMapper.save(order);
+        productSnapshotMapper.save(productSnapshot);
     }
 
     @Cacheable(cacheNames = "tnb:mall:product", key = "#itemId", unless = "#result == null")
@@ -24,4 +41,11 @@ public class MallRepository {
         log.info("miss cache");
         return productMapper.findByItemId(itemId);
     }
+
+    public OrderDetail getOrderDetail(long orderId) {
+        Order order = orderMapper.findByOrderId(orderId);
+        ProductSnapshot productSnapshot = productSnapshotMapper.findByOrderId(orderId);
+        OrderDetail orderDetail = new OrderDetail(order, productSnapshot);
+        return orderDetail;
+    }
 }

+ 0 - 33
content/content-service/src/main/java/cn/reghao/tnb/content/app/mall/job/JobService.java

@@ -1,33 +0,0 @@
-package cn.reghao.tnb.content.app.mall.job;
-
-import cn.reghao.jutil.jdk.thread.ThreadPoolWrapper;
-import cn.reghao.tnb.content.app.mall.db.mapper.OrderMapper;
-import cn.reghao.tnb.content.app.mall.job.task.OrderTask;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.stereotype.Service;
-
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-/**
- * @author reghao
- * @date 2024-09-14 16:01:30
- */
-@Slf4j
-@Service
-public class JobService {
-    private final ScheduledExecutorService scheduler;
-    private final OrderMapper orderMapper;
-
-    public JobService(OrderMapper orderMapper) {
-        this.scheduler = ThreadPoolWrapper.scheduledThreadPool("delay-job", 10);
-        this.orderMapper = orderMapper;
-    }
-
-    public void addDelayJob(long orderId) {
-        OrderTask orderTask = new OrderTask(orderId, orderMapper);
-        ScheduledFuture<?> future = scheduler.schedule(orderTask, 15, TimeUnit.MINUTES);
-        log.info("add order related delay job");
-    }
-}

+ 0 - 32
content/content-service/src/main/java/cn/reghao/tnb/content/app/mall/job/task/OrderTask.java

@@ -1,32 +0,0 @@
-package cn.reghao.tnb.content.app.mall.job.task;
-
-import cn.reghao.tnb.content.app.mall.db.mapper.OrderMapper;
-import cn.reghao.tnb.content.app.mall.model.constant.OrderStatus;
-import cn.reghao.tnb.content.app.mall.model.po.Order;
-import lombok.extern.slf4j.Slf4j;
-
-/**
- * @author reghao
- * @date 2024-09-14 16:06:01
- */
-@Slf4j
-public class OrderTask implements Runnable {
-    private final long orderId;
-    private final OrderMapper orderMapper;
-
-    public OrderTask(long orderId, OrderMapper orderMapper) {
-        this.orderId = orderId;
-        this.orderMapper = orderMapper;
-    }
-
-    @Override
-    public void run() {
-        Order order = orderMapper.findByOrderId(orderId);
-        int status = order.getStatus();
-        if (status == OrderStatus.toPay.getCode()) {
-            orderMapper.updateOrderStatus(orderId, OrderStatus.cancelled.getCode());
-            log.info("cancel order");
-        }
-        log.info("exec order task");
-    }
-}

+ 36 - 0
content/content-service/src/main/java/cn/reghao/tnb/content/app/mall/rpc/MallServiceImpl.java

@@ -0,0 +1,36 @@
+package cn.reghao.tnb.content.app.mall.rpc;
+
+import cn.reghao.jutil.jdk.result.Result;
+import cn.reghao.tnb.content.api.iface.MallService;
+import cn.reghao.tnb.content.app.mall.db.mapper.OrderMapper;
+import cn.reghao.tnb.content.app.mall.model.constant.OrderStatus;
+import cn.reghao.tnb.content.app.mall.model.po.Order;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.dubbo.config.annotation.DubboService;
+import org.springframework.stereotype.Service;
+
+/**
+ * @author reghao
+ * @date 2024-12-04 11:37:41
+ */
+@Slf4j
+@DubboService
+@Service
+public class MallServiceImpl implements MallService {
+    private final OrderMapper orderMapper;
+
+    public MallServiceImpl(OrderMapper orderMapper) {
+        this.orderMapper = orderMapper;
+    }
+
+    public Result orderTimeout(long orderId) {
+        Order order = orderMapper.findByOrderId(orderId);
+        int currentStatus = order.getStatus();
+        if (currentStatus == OrderStatus.toPay.getCode()) {
+            orderMapper.updateOrderStatus(orderId, OrderStatus.cancelled.getCode());
+            log.info("cancel order");
+        }
+
+        return Result.success();
+    }
+}

+ 14 - 21
content/content-service/src/main/java/cn/reghao/tnb/content/app/mall/service/OrderService.java

@@ -1,11 +1,10 @@
 package cn.reghao.tnb.content.app.mall.service;
 
+import cn.reghao.file.api.iface.JobService;
 import cn.reghao.jutil.tool.id.SnowFlake;
 import cn.reghao.tnb.common.auth.UserContext;
 import cn.reghao.tnb.content.app.mall.db.mapper.OrderMapper;
-import cn.reghao.tnb.content.app.mall.db.mapper.ProductMapper;
-import cn.reghao.tnb.content.app.mall.db.mapper.ProductSnapshotMapper;
-import cn.reghao.tnb.content.app.mall.job.JobService;
+import cn.reghao.tnb.content.app.mall.db.repository.MallRepository;
 import cn.reghao.tnb.content.app.mall.model.constant.OrderStatus;
 import cn.reghao.tnb.content.app.mall.model.dto.OrderDto;
 import cn.reghao.tnb.content.app.mall.model.dto.OrderItem;
@@ -13,6 +12,7 @@ import cn.reghao.tnb.content.app.mall.model.po.Order;
 import cn.reghao.tnb.content.app.mall.model.po.Product;
 import cn.reghao.tnb.content.app.mall.model.po.ProductSnapshot;
 import cn.reghao.tnb.content.app.mall.model.vo.OrderDetail;
+import org.apache.dubbo.config.annotation.DubboReference;
 import org.springframework.stereotype.Service;
 
 import java.util.ArrayList;
@@ -26,25 +26,22 @@ import java.util.stream.Collectors;
  */
 @Service
 public class OrderService {
+    @DubboReference(check = false)
+    private JobService jobService;
     private final OrderMapper orderMapper;
     private final SnowFlake idGenerator;
     private final CartService cartService;
-    private final ProductMapper productMapper;
-    private final ProductSnapshotMapper productSnapshotMapper;
-    private final JobService jobService;
+    private final MallRepository mallRepository;
 
     public OrderService(OrderMapper orderMapper, CartService cartService,
-                        ProductMapper productMapper, ProductSnapshotMapper productSnapshotMapper,
-                        JobService jobService) {
+                        MallRepository mallRepository) {
         this.orderMapper = orderMapper;
         this.idGenerator = new SnowFlake(1L, 1L);
         this.cartService = cartService;
-        this.productMapper = productMapper;
-        this.productSnapshotMapper = productSnapshotMapper;
-        this.jobService = jobService;
+        this.mallRepository = mallRepository;
     }
 
-    public long submit(OrderDto orderDto) {
+    public long submitCart(OrderDto orderDto) {
         long deliveryId = orderDto.getDeliveryId();
         // shop -> products
         Map<Long, List<OrderItem>> map = orderDto.getItems().stream().collect(Collectors.groupingBy(OrderItem::getShopId));
@@ -52,7 +49,7 @@ public class OrderService {
         map.forEach((shopId, items) -> {
             items.forEach(item -> {
                 long orderId = idGenerator.nextId();
-                Product product = productMapper.findByItemId(item.getItemId());
+                Product product = mallRepository.getProduct(item.getItemId());
                 Order order = new Order(deliveryId, product.getPrice(), orderId, item);
                 orders.add(order);
             });
@@ -77,13 +74,11 @@ public class OrderService {
         long deliveryId = orderDto.getDeliveryId();
         OrderItem orderItem = orderDto.getItems().get(0);
         long orderId = idGenerator.nextId();
-        Product product = productMapper.findByItemId(orderItem.getItemId());
+        Product product = mallRepository.getProduct(orderItem.getItemId());
         Order order = new Order(orderId, product.getPrice(), deliveryId, orderItem);
-        orderMapper.save(order);
-
         ProductSnapshot productSnapshot = new ProductSnapshot(orderId, product);
-        productSnapshotMapper.save(productSnapshot);
-        jobService.addDelayJob(orderId);
+        mallRepository.saveOrder(order, productSnapshot);
+        long jobId = jobService.addOrderTimeoutJob(orderId);
         return orderId;
     }
 
@@ -117,9 +112,7 @@ public class OrderService {
     }
 
     public OrderDetail getOrderDetail(long orderId) {
-        Order order = orderMapper.findByOrderId(orderId);
-        ProductSnapshot productSnapshot = productSnapshotMapper.findByOrderId(orderId);
-        OrderDetail orderDetail = new OrderDetail(order, productSnapshot);
+        OrderDetail orderDetail = mallRepository.getOrderDetail(orderId);
         return orderDetail;
     }
 }

+ 4 - 0
content/content-service/src/main/java/cn/reghao/tnb/content/app/vod/db/repository/VideoRepository.java

@@ -56,6 +56,10 @@ public class VideoRepository {
         videoPostMapper.updateVideoPublish(videoFileId, VideoStatus.publish.getValue());
     }
 
+    public void updateVideoPublish(String videoId) {
+        videoPostMapper.updateVideoPublish(videoId, VideoStatus.publish.getValue());
+    }
+
     public List<String> deleteVideoFile(String videoFileId) {
         List<String> objectIds = videoFileMapper.findByVideoFileId(videoFileId).stream()
                 .map(VideoFile::getObjectId)

+ 4 - 3
content/content-service/src/main/java/cn/reghao/tnb/content/app/vod/model/po/VideoPost.java

@@ -42,7 +42,8 @@ public class VideoPost extends BaseObject<Integer> {
     private LocalDateTime publishAt;
     private Long publishBy;
 
-    public VideoPost(String videoId, VideoPublishSbt video, String coverUrl, VideoFile videoInfo, String codec) {
+    public VideoPost(String videoId, VideoPublishSbt video, String coverUrl,
+                     VideoFile videoInfo, int status, long publishAt) {
         this.videoId = videoId;
         this.videoFileId = video.getVideoFileId();
         this.channelId = video.getChannelCode();
@@ -56,8 +57,8 @@ public class VideoPost extends BaseObject<Integer> {
         this.horizontal = videoInfo.getHorizontal();
         this.coverUrl = coverUrl;
         this.scope = video.getScope();
-        this.status = VideoStatus.publish.getCode();
-        this.publishAt = video.getScheduledTime() != null ? DateTimeConverter.localDateTime(video.getScheduledTime()) : LocalDateTime.now();
+        this.status = status;
+        this.publishAt = DateTimeConverter.localDateTime(publishAt);
         this.publishBy = UserContext.getUser();
     }
 

+ 13 - 0
content/content-service/src/main/java/cn/reghao/tnb/content/app/vod/rpc/UserContentServiceImpl.java

@@ -1,8 +1,11 @@
 package cn.reghao.tnb.content.app.vod.rpc;
 
+import cn.reghao.jutil.jdk.result.Result;
+import cn.reghao.tnb.content.api.constant.VideoStatus;
 import cn.reghao.tnb.content.api.iface.UserContentService;
 import cn.reghao.tnb.content.app.vod.db.repository.VideoRepository;
 import cn.reghao.tnb.common.db.GroupCount;
+import cn.reghao.tnb.content.app.vod.model.po.VideoPost;
 import org.apache.dubbo.config.annotation.DubboService;
 import org.springframework.stereotype.Service;
 
@@ -29,4 +32,14 @@ public class UserContentServiceImpl implements UserContentService {
                 .map(groupCount -> Long.valueOf(groupCount.getId()))
                 .collect(Collectors.toList());
     }
+
+    @Override
+    public Result publishVideoPost(String videoId) {
+        VideoPost videoPost = videoRepository.getVideoPost(videoId);
+        if (videoPost.getStatus() != VideoStatus.publish.getValue()) {
+            videoRepository.updateVideoPublish(videoId);
+        }
+
+        return Result.success();
+    }
 }

+ 24 - 6
content/content-service/src/main/java/cn/reghao/tnb/content/app/vod/service/impl/VideoPostServiceImpl.java

@@ -1,9 +1,8 @@
 package cn.reghao.tnb.content.app.vod.service.impl;
 
-import cn.reghao.file.api.iface.FileService;
+import cn.reghao.file.api.iface.JobService;
 import cn.reghao.file.api.iface.OssService;
 import cn.reghao.jutil.jdk.result.Result;
-import cn.reghao.jutil.jdk.result.ResultStatus;
 import cn.reghao.jutil.tool.id.IdGenerator;
 import cn.reghao.oss.sdk.model.dto.media.ImageInfo;
 import cn.reghao.oss.sdk.model.dto.media.VideoInfo;
@@ -37,7 +36,7 @@ public class VideoPostServiceImpl implements VideoPostService {
     @DubboReference(check = false)
     private OssService ossService;
     @DubboReference(check = false)
-    private FileService fileService;
+    private JobService jobService;
 
     private int pageSize = 12;
     private final VideoPostMapper videoPostMapper;
@@ -100,18 +99,37 @@ public class VideoPostServiceImpl implements VideoPostService {
             String videoId = idGenerator.getUuid();
             Set<String> tags = videoPublishSbt.getTags();
             String tags1 = tags.toString().replace("[", "").replace("]", "");
-            VideoPost videoPost = new VideoPost(videoId, videoPublishSbt, coverUrl, videoFile, codec);
+
+            VideoPost videoPost;
+            Long publishAt = videoPublishSbt.getScheduledTime();
+            if (publishAt == null) {
+                int postStatus = VideoStatus.publish.getValue();
+                long publishAt1 = System.currentTimeMillis();
+                videoPost = new VideoPost(videoId, videoPublishSbt, coverUrl, videoFile, postStatus, publishAt1);
+            } else {
+                int postStatus = VideoStatus.censor.getValue();
+                videoPost = new VideoPost(videoId, videoPublishSbt, coverUrl, videoFile, postStatus, publishAt);
+
+                long duration = publishAt-System.currentTimeMillis();
+                if (duration < 600_000) {
+                    return Result.fail("定时发布的时间至少应在 10 分钟后");
+                }
+            }
 
             if (!videoInfo.getFormatName().contains("mov,mp4")) {
-                Result result1 = fileService.convertVideo(videoFileId, channelCode);
+                long jobId = jobService.addConvertVideoJob(videoFileId, channelCode);
+                /*Result result1 = fileService.convertVideo(videoFileId, channelCode);
                 if (result1.getCode() == ResultStatus.SUCCESS.getCode()) {
                     videoPost.setStatus(VideoStatus.converted.getValue());
                 } else {
                     log.error(result1.getMsg());
-                }
+                }*/
             }
 
             saveVideo(videoFile, videoPost, tags1);
+            if (publishAt != null) {
+                long jobId = jobService.addPublishVideoJob(videoId, videoPublishSbt.getScheduledTime());
+            }
             result = Result.success();
         } catch (Exception e) {
             result = Result.fail(e.getMessage());

+ 1 - 1
content/content-service/src/main/resources/application.yml

@@ -1,6 +1,6 @@
 dubbo:
   scan:
-    base-packages: cn.reghao.tnb.content.app.vod.rpc
+    base-packages: cn.reghao.tnb.content.app.vod.rpc,cn.reghao.tnb.content.app.mall.rpc
   protocol:
     name: dubbo
     port: 6105

+ 1 - 2
file/file-api/src/main/java/cn/reghao/file/api/iface/FileService.java

@@ -10,6 +10,5 @@ import cn.reghao.jutil.jdk.result.Result;
  */
 public interface FileService {
     String getAccountAvatar(long userId);
-    Result convertVideo(String videoFileId, int channelCode);
-    PageList<JobInfo> getConvertJobs(int pn, int ps);
+    PageList<JobInfo> getJobs(int pn, int ps);
 }

+ 11 - 0
file/file-api/src/main/java/cn/reghao/file/api/iface/JobService.java

@@ -0,0 +1,11 @@
+package cn.reghao.file.api.iface;
+
+/**
+ * @author reghao
+ * @date 2024-12-04 11:46:04
+ */
+public interface JobService {
+    long addOrderTimeoutJob(long orderId);
+    long addPublishVideoJob(String videoId, long publishAt);
+    long addConvertVideoJob(String videoFileId, int channelCode);
+}

+ 26 - 0
file/file-service/src/main/java/cn/reghao/tnb/file/app/delay/DelayJob.java

@@ -0,0 +1,26 @@
+package cn.reghao.tnb.file.app.delay;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+/**
+ * @author reghao
+ * @date 2024-12-04 10:49:06
+ */
+@AllArgsConstructor
+@Getter
+public class DelayJob {
+    private long jobId;
+    private String jobName;
+    private Runnable job;
+    private long delaySecond;
+    private long execTime;
+
+    public DelayJob(long jobId, Runnable job, long delaySecond) {
+        this.jobId = jobId;
+        this.jobName = job.getClass().getSimpleName();
+        this.job = job;
+        this.delaySecond = delaySecond;
+        this.execTime = System.currentTimeMillis() + delaySecond*1000;
+    }
+}

+ 71 - 0
file/file-service/src/main/java/cn/reghao/tnb/file/app/delay/JobContext.java

@@ -0,0 +1,71 @@
+package cn.reghao.tnb.file.app.delay;
+
+import cn.reghao.jutil.jdk.thread.ThreadPoolWrapper;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.PostConstruct;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author reghao
+ * @date 2024-09-14 16:01:30
+ */
+@Slf4j
+@Service
+public class JobContext {
+    private final ScheduledExecutorService scheduler;
+    private final Map<Long, Future<?>> jobMap = new HashMap<>();
+
+    public JobContext() {
+        this.scheduler = ThreadPoolWrapper.scheduledThreadPool("delay-job", 10);
+    }
+
+    public void addJob(DelayJob delayJob) {
+        ScheduledFuture<?> future = scheduler.schedule(delayJob.getJob(), delayJob.getDelaySecond(), TimeUnit.SECONDS);
+        jobMap.put(delayJob.getJobId(), future);
+    }
+
+    public void cancelJob(long jobId) {
+        Future<?> future = jobMap.get(jobId);
+        if (future != null && !future.isCancelled()) {
+            future.cancel(true);
+        }
+    }
+
+    public int getJobCount() {
+        List<Long> jobIds = new ArrayList<>();
+        jobMap.forEach((jobId, future) -> {
+            if (!future.isDone()) {
+                jobIds.add(jobId);
+            }
+        });
+        return jobIds.size();
+    }
+
+    @PostConstruct
+    public void sync() {
+        scheduler.scheduleAtFixedRate(new SyncTask(), 0, 10, TimeUnit.SECONDS);
+    }
+
+    class SyncTask implements Runnable {
+        @Override
+        public void run() {
+            List<Long> jobCompleted = new ArrayList<>();
+            jobMap.forEach((jobId, future) -> {
+                if (future.isDone()) {
+                    log.info("延时任务 {} 已完成", jobId);
+                    jobCompleted.add(jobId);
+                }
+            });
+            jobCompleted.forEach(jobMap::remove);
+        }
+    }
+}

+ 15 - 15
file/file-service/src/main/java/cn/reghao/tnb/file/app/task/ConvertTask.java → file/file-service/src/main/java/cn/reghao/tnb/file/app/delay/task/ConvertTask.java

@@ -1,4 +1,4 @@
-package cn.reghao.tnb.file.app.task;
+package cn.reghao.tnb.file.app.delay.task;
 
 import cn.reghao.jutil.media.FFmpegWrapper;
 import cn.reghao.oss.sdk.OssConsoleClient;
@@ -6,7 +6,7 @@ import cn.reghao.oss.sdk.model.rest.UploadFileRet;
 import cn.reghao.tnb.content.api.dto.VideoConvertedDto;
 import cn.reghao.tnb.content.api.iface.AdminVideoService;
 import cn.reghao.tnb.file.app.db.mapper.JobDetailMapper;
-import cn.reghao.tnb.file.app.model.vo.ConvertJob;
+import cn.reghao.tnb.file.app.model.constant.JobStatus;
 import org.apache.commons.io.FileUtils;
 
 import java.io.File;
@@ -17,30 +17,31 @@ import java.time.LocalDateTime;
  * @date 2024-11-25 09:01:04
  */
 public class ConvertTask implements Runnable {
-    private final ConvertJob convertJob;
+    private final long jobId;
     private final JobDetailMapper jobDetailMapper;
+    private final ConvertTaskInfo convertTaskInfo;
     private final OssConsoleClient ossConsoleClient;
     private final AdminVideoService adminVideoService;
 
-    public ConvertTask(ConvertJob convertJob, JobDetailMapper jobDetailMapper,
-                       OssConsoleClient ossConsoleClient, AdminVideoService adminVideoService) {
-        this.convertJob = convertJob;
+    public ConvertTask(long jobId, JobDetailMapper jobDetailMapper, ConvertTaskInfo convertTaskInfo) {
+        this.jobId = jobId;
         this.jobDetailMapper = jobDetailMapper;
-        this.ossConsoleClient = ossConsoleClient;
-        this.adminVideoService = adminVideoService;
+        this.convertTaskInfo = convertTaskInfo;
+        this.ossConsoleClient = convertTaskInfo.getOssConsoleClient();
+        this.adminVideoService = convertTaskInfo.getAdminVideoService();
     }
 
     @Override
     public void run() {
-        String videoFileId = convertJob.getVideoFileId();
-        String srcPath = convertJob.getSrcPath();
-        String destPath = convertJob.getDestPath();
+        String videoFileId = convertTaskInfo.getVideoFileId();
+        String srcPath = convertTaskInfo.getSrcPath();
+        String destPath = convertTaskInfo.getDestPath();
         String format = "mp4";
         try {
-            String status = "terminated";
+            String status = JobStatus.Success.getDesc();
             int ret = FFmpegWrapper.formatCovert(srcPath, destPath, format);
             if (ret == 0) {
-                int channelCode = convertJob.getChannelCode();
+                int channelCode = convertTaskInfo.getChannelCode();
                 File destFile = new File(destPath);
                 UploadFileRet uploadFileRet = ossConsoleClient.postObject(destFile, channelCode);
                 if (uploadFileRet != null) {
@@ -49,13 +50,12 @@ public class ConvertTask implements Runnable {
                     adminVideoService.addConvertedVideo(videoConvertedDto);
                 }
             } else {
-                status = "failed";
+                status = JobStatus.Fail.getDesc();
             }
 
             FileUtils.deleteQuietly(new File(srcPath));
             FileUtils.deleteQuietly(new File(destPath));
 
-            long jobId = convertJob.getJobId();
             LocalDateTime current = LocalDateTime.now();
             jobDetailMapper.updateSetEnd(jobId, status, current);
         } catch (Exception e) {

+ 31 - 0
file/file-service/src/main/java/cn/reghao/tnb/file/app/delay/task/ConvertTaskInfo.java

@@ -0,0 +1,31 @@
+package cn.reghao.tnb.file.app.delay.task;
+
+import cn.reghao.oss.sdk.OssConsoleClient;
+import cn.reghao.tnb.content.api.iface.AdminVideoService;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+
+/**
+ * @author reghao
+ * @date 2024-11-25 09:21:25
+ */
+@NoArgsConstructor
+@Getter
+public class ConvertTaskInfo {
+    private String videoFileId;
+    private int channelCode;
+    private String srcPath;
+    private String destPath;
+    private OssConsoleClient ossConsoleClient;
+    private AdminVideoService adminVideoService;
+
+    public ConvertTaskInfo(String videoFileId, int channelCode, String srcPath, String destPath,
+                           OssConsoleClient ossConsoleClient, AdminVideoService adminVideoService) {
+        this.videoFileId = videoFileId;
+        this.channelCode = channelCode;
+        this.srcPath = srcPath;
+        this.destPath = destPath;
+        this.ossConsoleClient = ossConsoleClient;
+        this.adminVideoService = adminVideoService;
+    }
+}

+ 22 - 0
file/file-service/src/main/java/cn/reghao/tnb/file/app/delay/task/OrderTask.java

@@ -0,0 +1,22 @@
+package cn.reghao.tnb.file.app.delay.task;
+
+import cn.reghao.tnb.content.api.iface.MallService;
+
+/**
+ * @author reghao
+ * @date 2024-12-04 13:35:31
+ */
+public class OrderTask implements Runnable {
+    private final MallService mallService;
+    private final long orderId;
+
+    public OrderTask(MallService mallService, long orderId) {
+        this.mallService =mallService;
+        this.orderId = orderId;
+    }
+
+    @Override
+    public void run() {
+        mallService.orderTimeout(orderId);
+    }
+}

+ 22 - 0
file/file-service/src/main/java/cn/reghao/tnb/file/app/delay/task/PublishVideoTask.java

@@ -0,0 +1,22 @@
+package cn.reghao.tnb.file.app.delay.task;
+
+import cn.reghao.tnb.content.api.iface.UserContentService;
+
+/**
+ * @author reghao
+ * @date 2024-12-04 13:46:25
+ */
+public class PublishVideoTask implements Runnable {
+    private final UserContentService userContentService;
+    private final String videoId;
+
+    public PublishVideoTask(UserContentService userContentService, String videoId) {
+        this.userContentService = userContentService;
+        this.videoId = videoId;
+    }
+
+    @Override
+    public void run() {
+        userContentService.publishVideoPost(videoId);
+    }
+}

+ 56 - 0
file/file-service/src/main/java/cn/reghao/tnb/file/app/model/constant/JobStatus.java

@@ -0,0 +1,56 @@
+package cn.reghao.tnb.file.app.model.constant;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author reghao
+ * @date 2024-12-04 14:07:27
+ */
+public enum JobStatus {
+    Prepare(1, "准备执行"),
+    Running(2, "正在执行"),
+    Success(3, "执行成功"),
+    Fail(4, "执行失败");
+
+    private final int code;
+    private final String desc;
+    private static Map<Integer, String> descMap = new HashMap<>();
+    static {
+        for (JobStatus type : JobStatus.values()) {
+            descMap.put(type.code, type.desc);
+        }
+    }
+
+    JobStatus(Integer code, String desc) {
+        this.code = code;
+        this.desc = desc;
+    }
+
+    public String getName() {
+        return this.name();
+    }
+
+    /**
+     * 提供给 @ValidEnum 调用
+     *
+     * @param
+     * @return
+     * @date 2023-10-11 14:44:42
+     */
+    public int getValue() {
+        return this.code;
+    }
+
+    public Integer getCode() {
+        return code;
+    }
+
+    public String getDesc() {
+        return desc;
+    }
+
+    public static String getDescByCode(int code) {
+        return descMap.get(code);
+    }
+}

+ 6 - 4
file/file-service/src/main/java/cn/reghao/tnb/file/app/model/po/JobDetail.java

@@ -1,7 +1,9 @@
 package cn.reghao.tnb.file.app.model.po;
 
 import cn.reghao.jutil.jdk.db.BaseObject;
-import cn.reghao.tnb.file.app.model.vo.ConvertJob;
+import cn.reghao.tnb.file.app.delay.DelayJob;
+import cn.reghao.tnb.file.app.delay.task.ConvertTaskInfo;
+import cn.reghao.tnb.file.app.model.constant.JobStatus;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
 import lombok.Setter;
@@ -22,10 +24,10 @@ public class JobDetail extends BaseObject<Integer> {
     private LocalDateTime startAt;
     private LocalDateTime endAt;
 
-    public JobDetail(long jobId, ConvertJob convertJob) {
+    public JobDetail(long jobId, DelayJob delayJob, String status) {
         this.jobId = jobId;
-        this.jobName = convertJob.getJobName();
-        this.status = "running";
+        this.jobName = delayJob.getJobName();
+        this.status = status;
         this.startAt = LocalDateTime.now();
     }
 }

+ 0 - 28
file/file-service/src/main/java/cn/reghao/tnb/file/app/model/vo/ConvertJob.java

@@ -1,28 +0,0 @@
-package cn.reghao.tnb.file.app.model.vo;
-
-import lombok.Getter;
-import lombok.NoArgsConstructor;
-
-/**
- * @author reghao
- * @date 2024-11-25 09:21:25
- */
-@NoArgsConstructor
-@Getter
-public class ConvertJob {
-    private long jobId;
-    private String jobName;
-    private String videoFileId;
-    private int channelCode;
-    private String srcPath;
-    private String destPath;
-
-    public ConvertJob(long jobId, String videoFileId, int channelCode, String srcPath, String destPath) {
-        this.jobId = jobId;
-        this.jobName = "视频转码任务";
-        this.videoFileId = videoFileId;
-        this.channelCode = channelCode;
-        this.srcPath = srcPath;
-        this.destPath = destPath;
-    }
-}

+ 3 - 19
file/file-service/src/main/java/cn/reghao/tnb/file/app/rpc/FileServiceImpl.java

@@ -3,7 +3,6 @@ package cn.reghao.tnb.file.app.rpc;
 import cn.reghao.file.api.dto.JobInfo;
 import cn.reghao.file.api.iface.FileService;
 import cn.reghao.jutil.jdk.db.PageList;
-import cn.reghao.jutil.jdk.result.Result;
 import cn.reghao.oss.sdk.model.dto.ServerInfo;
 import cn.reghao.oss.sdk.model.dto.media.ImageInfo;
 import cn.reghao.oss.sdk.model.rest.UploadFileRet;
@@ -13,7 +12,6 @@ import cn.reghao.tnb.file.app.config.AppProperties;
 import cn.reghao.tnb.file.app.config.OssConsoleClientFactory;
 import cn.reghao.tnb.file.app.db.mapper.JobDetailMapper;
 import cn.reghao.tnb.file.app.model.po.JobDetail;
-import cn.reghao.tnb.file.app.service.ConvertService;
 import cn.reghao.tnb.file.app.util.ImageUtil;
 import cn.reghao.tnb.file.app.model.constant.OssType;
 import org.apache.commons.io.FileUtils;
@@ -36,17 +34,14 @@ import java.util.stream.Collectors;
 public class FileServiceImpl implements FileService {
     private final OssConsoleClientFactory ossConsoleClientFactory;
     private final int ossType;
-    private final ConvertService convertService;
     private final JobDetailMapper jobDetailMapper;
     private final String baseDir;
     private final AppProperties appProperties;
 
-    private FileServiceImpl(OssConsoleClientFactory ossConsoleClientFactory, ConvertService convertService,
-                            JobDetailMapper jobDetailMapper, ServerProperties serverProperties,
-                            AppProperties appProperties) {
+    private FileServiceImpl(OssConsoleClientFactory ossConsoleClientFactory, JobDetailMapper jobDetailMapper,
+                            ServerProperties serverProperties, AppProperties appProperties) {
         this.ossConsoleClientFactory = ossConsoleClientFactory;
         this.ossType = appProperties.getOssType();
-        this.convertService = convertService;
         this.jobDetailMapper = jobDetailMapper;
         this.baseDir = serverProperties.getTomcat().getBasedir().getAbsolutePath();
         this.appProperties = appProperties;
@@ -98,17 +93,7 @@ public class FileServiceImpl implements FileService {
     }
 
     @Override
-    public Result convertVideo(String videoFileId, int channelCode) {
-        try {
-            convertService.convert(videoFileId, channelCode);
-            return Result.success();
-        } catch (Exception e) {
-            return Result.fail(e.getMessage());
-        }
-    }
-
-    @Override
-    public PageList<JobInfo> getConvertJobs(int pn, int ps) {
+    public PageList<JobInfo> getJobs(int pn, int ps) {
         List<JobDetail> list = jobDetailMapper.findAll();
         List<JobInfo> jobList = list.stream().map(jobDetail -> {
             long jobId = jobDetail.getJobId();
@@ -118,7 +103,6 @@ public class FileServiceImpl implements FileService {
             LocalDateTime endAt = jobDetail.getEndAt();
             return new JobInfo(jobId, jobName, status, startAt, endAt);
         }).collect(Collectors.toList());
-
         return PageList.pageList(pn, ps, jobList.size(), jobList);
     }
 }

+ 106 - 0
file/file-service/src/main/java/cn/reghao/tnb/file/app/rpc/JobServiceImpl.java

@@ -0,0 +1,106 @@
+package cn.reghao.tnb.file.app.rpc;
+
+import cn.reghao.file.api.iface.JobService;
+import cn.reghao.jutil.tool.id.SnowFlake;
+import cn.reghao.oss.sdk.OssConsoleClient;
+import cn.reghao.oss.sdk.model.dto.ObjectInfo;
+import cn.reghao.tnb.content.api.iface.AdminVideoService;
+import cn.reghao.tnb.content.api.iface.MallService;
+import cn.reghao.tnb.content.api.iface.UserContentService;
+import cn.reghao.tnb.file.app.config.OssConsoleClientFactory;
+import cn.reghao.tnb.file.app.db.mapper.JobDetailMapper;
+import cn.reghao.tnb.file.app.delay.JobContext;
+import cn.reghao.tnb.file.app.delay.DelayJob;
+import cn.reghao.tnb.file.app.delay.task.ConvertTask;
+import cn.reghao.tnb.file.app.delay.task.ConvertTaskInfo;
+import cn.reghao.tnb.file.app.delay.task.OrderTask;
+import cn.reghao.tnb.file.app.delay.task.PublishVideoTask;
+import cn.reghao.tnb.file.app.model.constant.JobStatus;
+import cn.reghao.tnb.file.app.model.po.JobDetail;
+import org.apache.dubbo.config.annotation.DubboReference;
+import org.apache.dubbo.config.annotation.DubboService;
+import org.springframework.boot.autoconfigure.web.ServerProperties;
+import org.springframework.stereotype.Service;
+
+import java.util.UUID;
+
+/**
+ * @author reghao
+ * @date 2024-12-04 13:31:48
+ */
+@DubboService
+@Service
+public class JobServiceImpl implements JobService {
+    @DubboReference(check = false, timeout = 60_000)
+    private UserContentService userContentService;
+    @DubboReference(check = false, timeout = 60_000)
+    private MallService mallService;
+    @DubboReference(check = false, timeout = 60_000)
+    private AdminVideoService adminVideoService;
+
+    private final SnowFlake idGenerator;
+    private final JobContext jobContext;
+    private final OssConsoleClientFactory ossConsoleClientFactory;
+    private final JobDetailMapper jobDetailMapper;
+    private final String baseDir;
+
+    public JobServiceImpl(JobContext jobContext, OssConsoleClientFactory ossConsoleClientFactory,
+                          JobDetailMapper jobDetailMapper, ServerProperties serverProperties) {
+        this.idGenerator = new SnowFlake(1L, 1L);
+        this.jobContext = jobContext;
+        this.ossConsoleClientFactory = ossConsoleClientFactory;
+        this.jobDetailMapper = jobDetailMapper;
+        this.baseDir = serverProperties.getTomcat().getBasedir().getAbsolutePath();
+    }
+
+    @Override
+    public long addOrderTimeoutJob(long orderId) {
+        long jobId = idGenerator.nextId();
+        OrderTask orderTask = new OrderTask(mallService, orderId);
+        long delaySecond = 15;
+        DelayJob delayJob = new DelayJob(jobId, orderTask, delaySecond);
+        jobContext.addJob(delayJob);
+
+        return jobId;
+    }
+
+    @Override
+    public long addPublishVideoJob(String videoId, long publishAt) {
+        long jobId = idGenerator.nextId();
+        PublishVideoTask publishVideoTask = new PublishVideoTask(userContentService, videoId);
+        long delaySecond = (publishAt - System.currentTimeMillis())/1000;
+        DelayJob delayJob = new DelayJob(jobId, publishVideoTask, delaySecond);
+        jobContext.addJob(delayJob);
+        return jobId;
+    }
+
+    @Override
+    public long addConvertVideoJob(String videoFileId, int channelCode) {
+        long jobId = idGenerator.nextId();
+        try {
+            OssConsoleClient ossConsoleClient = ossConsoleClientFactory.getOssConsoleClient();
+            ObjectInfo objectInfo = ossConsoleClient.getObjectInfo(channelCode, videoFileId);
+            if (objectInfo == null) {
+                throw new Exception("not found ObjectInfo by videoFileId " + videoFileId);
+            }
+
+            String objectName = objectInfo.getObjectName();
+            String localPath = ossConsoleClient.getObject(objectName, channelCode, baseDir);
+            String destPath = String.format("%s/%s", baseDir, UUID.randomUUID());
+            ConvertTaskInfo convertTaskInfo =
+                    new ConvertTaskInfo(videoFileId, channelCode, localPath, destPath, ossConsoleClient, adminVideoService);
+            ConvertTask convertTask = new ConvertTask(jobId, jobDetailMapper, convertTaskInfo);
+            long delaySecond = 5;
+            DelayJob delayJob = new DelayJob(jobId, convertTask, delaySecond);
+            jobContext.addJob(delayJob);
+
+            String status = JobStatus.Running.getDesc();
+            JobDetail jobDetail = new JobDetail(jobId, delayJob, status);
+            jobDetailMapper.save(jobDetail);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+
+        return jobId;
+    }
+}

+ 0 - 71
file/file-service/src/main/java/cn/reghao/tnb/file/app/service/ConvertService.java

@@ -1,71 +0,0 @@
-package cn.reghao.tnb.file.app.service;
-
-import cn.reghao.jutil.jdk.thread.ThreadPoolWrapper;
-import cn.reghao.jutil.tool.id.SnowFlake;
-import cn.reghao.oss.sdk.OssConsoleClient;
-import cn.reghao.oss.sdk.model.dto.ObjectInfo;
-import cn.reghao.tnb.content.api.iface.AdminVideoService;
-import cn.reghao.tnb.file.app.config.OssConsoleClientFactory;
-import cn.reghao.tnb.file.app.db.mapper.JobDetailMapper;
-import cn.reghao.tnb.file.app.model.po.JobDetail;
-import cn.reghao.tnb.file.app.model.vo.ConvertJob;
-import cn.reghao.tnb.file.app.task.ConvertTask;
-import org.apache.dubbo.config.annotation.DubboReference;
-import org.springframework.boot.autoconfigure.web.ServerProperties;
-import org.springframework.stereotype.Service;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-
-/**
- * @author reghao
- * @date 2024-11-25 08:52:52
- */
-@Service
-public class ConvertService {
-    @DubboReference(check = false, timeout = 60_000)
-    private AdminVideoService adminVideoService;
-    private final ExecutorService threadPool = ThreadPoolWrapper.threadPool("convert-pool", 20);
-    private final OssConsoleClientFactory ossConsoleClientFactory;
-    private final JobDetailMapper jobDetailMapper;
-    private final String baseDir;
-    private final Map<Long, Future<?>> futureMap = new HashMap<>();
-    private final SnowFlake idGenerator;
-
-    public ConvertService(OssConsoleClientFactory ossConsoleClientFactory, JobDetailMapper jobDetailMapper,
-                          ServerProperties serverProperties) {
-        this.ossConsoleClientFactory = ossConsoleClientFactory;
-        this.jobDetailMapper = jobDetailMapper;
-        this.baseDir = serverProperties.getTomcat().getBasedir().getAbsolutePath();
-        this.idGenerator = new SnowFlake(1, 1);
-    }
-
-    public void convert(String videoFileId, int channelCode) throws Exception {
-        OssConsoleClient ossConsoleClient = ossConsoleClientFactory.getOssConsoleClient();
-        ObjectInfo objectInfo = ossConsoleClient.getObjectInfo(channelCode, videoFileId);
-        if (objectInfo == null) {
-            throw new Exception("not found ObjectInfo by videoFileId " + videoFileId);
-        }
-
-        String objectName = objectInfo.getObjectName();
-        String localPath = ossConsoleClient.getObject(objectName, channelCode, baseDir);
-        String destPath = String.format("%s/%s", baseDir, UUID.randomUUID());
-
-        long jobId = idGenerator.nextId();
-        ConvertJob convertJob = new ConvertJob(jobId, videoFileId, channelCode, localPath, destPath);
-        JobDetail jobDetail = new JobDetail(jobId, convertJob);
-        jobDetailMapper.save(jobDetail);
-        Future<?> future = threadPool.submit(new ConvertTask(convertJob, jobDetailMapper, ossConsoleClient, adminVideoService));
-        futureMap.put(jobId, future);
-    }
-
-    public void cancel(long jobId) {
-        Future<?> future = futureMap.get(jobId);
-        if (future != null && !future.isCancelled()) {
-            future.cancel(true);
-        }
-    }
-}