Explorar o código

[manager]启动采集任务初始化

tomsun28 %!s(int64=4) %!d(string=hai) anos
pai
achega
4cfcaad7e2

+ 5 - 3
collector/src/main/java/com/usthe/collector/dispatch/entrance/internal/CollectJobService.java

@@ -57,10 +57,12 @@ public class CollectJobService {
      * @return long 任务ID
      */
     public long addAsyncCollectJob(Job job) {
-        long jobId = SnowFlakeIdGenerator.generateId();
-        job.setId(jobId);
+        if (job.getId() == 0L) {
+            long jobId = SnowFlakeIdGenerator.generateId();
+            job.setId(jobId);
+        }
         timerDispatch.addJob(job, null);
-        return jobId;
+        return job.getId();
     }
 
     /**

+ 7 - 0
manager/src/main/java/com/usthe/manager/dao/MonitorDao.java

@@ -41,6 +41,13 @@ public interface MonitorDao extends JpaRepository<Monitor, Long>, JpaSpecificati
     List<Monitor> findMonitorsByAppEquals(String app);
 
     /**
+     * 查询已下发采集任务的监控
+     * @param status 监控状态
+     * @return 监控列表
+     */
+    List<Monitor> findMonitorsByStatusNotInAndAndJobIdNotNull(List<Byte> status);
+
+    /**
      * 根据监控名称查询监控
      * @param name 监控名称
      * @return 监控列表

+ 66 - 0
manager/src/main/java/com/usthe/manager/service/JobSchedulerInit.java

@@ -0,0 +1,66 @@
+package com.usthe.manager.service;
+
+import com.usthe.collector.dispatch.entrance.internal.CollectJobService;
+import com.usthe.common.entity.job.Configmap;
+import com.usthe.common.entity.job.Job;
+import com.usthe.common.entity.manager.Monitor;
+import com.usthe.common.entity.manager.Param;
+import com.usthe.manager.dao.MonitorDao;
+import com.usthe.manager.dao.ParamDao;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.core.annotation.Order;
+import org.springframework.stereotype.Service;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * 采集任务调度初始化
+ * @author tom
+ * @date 2022/2/1 16:24
+ */
+@Service
+@Order(value = 2)
+@Slf4j
+public class JobSchedulerInit implements CommandLineRunner {
+
+    @Autowired
+    private AppService appService;
+
+    @Autowired
+    private CollectJobService collectJobService;
+
+    @Autowired
+    private MonitorDao monitorDao;
+
+    @Autowired
+    private ParamDao paramDao;
+
+    @Override
+    public void run(String... args) throws Exception {
+        // 读取数据库已经添加应用 构造采集任务
+        List<Monitor> monitors = monitorDao.findMonitorsByStatusNotInAndAndJobIdNotNull(Arrays.asList((byte)0, (byte)4));
+        for (Monitor monitor : monitors) {
+            try {
+                // 构造采集任务Job实体
+                Job appDefine = appService.getAppDefine(monitor.getApp());
+                appDefine.setId(monitor.getJobId());
+                appDefine.setMonitorId(monitor.getId());
+                appDefine.setInterval(monitor.getIntervals());
+                appDefine.setCyclic(true);
+                appDefine.setTimestamp(System.currentTimeMillis());
+                List<Param> params = paramDao.findParamsByMonitorId(monitor.getId());
+                List<Configmap> configmaps = params.stream().map(param ->
+                        new Configmap(param.getField(), param.getValue(), param.getType())).collect(Collectors.toList());
+                appDefine.setConfigmap(configmaps);
+                // 下发采集任务
+                collectJobService.addAsyncCollectJob(appDefine);
+            } catch (Exception e) {
+                log.error("init monitor job: {} error,continue next monitor", monitor, e);
+            }
+        }
+    }
+}

+ 2 - 0
manager/src/main/java/com/usthe/manager/service/impl/AppServiceImpl.java

@@ -10,6 +10,7 @@ import com.usthe.manager.service.AppService;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.CommandLineRunner;
+import org.springframework.core.annotation.Order;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 import org.yaml.snakeyaml.Yaml;
@@ -32,6 +33,7 @@ import java.util.concurrent.ConcurrentHashMap;
  * @date 2021/11/14 17:17
  */
 @Service
+@Order(value = 1)
 @Transactional(rollbackFor = Exception.class)
 @Slf4j
 public class AppServiceImpl implements AppService, CommandLineRunner {