[manager]启动采集任务初始化

This commit is contained in:
tomsun28
2022-02-01 16:57:20 +08:00
parent c65b620a5a
commit 9347097905
4 changed files with 80 additions and 3 deletions

View File

@@ -57,10 +57,12 @@ public class CollectJobService {
* @return long 任务ID
*/
public long addAsyncCollectJob(Job job) {
long jobId = SnowFlakeIdGenerator.generateId();
job.setId(jobId);
if (job.getId() == 0L) {
long jobId = SnowFlakeIdGenerator.generateId();
job.setId(jobId);
}
timerDispatch.addJob(job, null);
return jobId;
return job.getId();
}
/**

View File

@@ -40,6 +40,13 @@ public interface MonitorDao extends JpaRepository<Monitor, Long>, JpaSpecificati
*/
List<Monitor> findMonitorsByAppEquals(String app);
/**
* 查询已下发采集任务的监控
* @param status 监控状态
* @return 监控列表
*/
List<Monitor> findMonitorsByStatusNotInAndAndJobIdNotNull(List<Byte> status);
/**
* 根据监控名称查询监控
* @param name 监控名称

View File

@@ -0,0 +1,66 @@
package com.usthe.manager.service;
import com.usthe.collector.dispatch.entrance.internal.CollectJobService;
import com.usthe.common.entity.job.Configmap;
import com.usthe.common.entity.job.Job;
import com.usthe.common.entity.manager.Monitor;
import com.usthe.common.entity.manager.Param;
import com.usthe.manager.dao.MonitorDao;
import com.usthe.manager.dao.ParamDao;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Service;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
/**
* 采集任务调度初始化
* @author tom
* @date 2022/2/1 16:24
*/
@Service
@Order(value = 2)
@Slf4j
public class JobSchedulerInit implements CommandLineRunner {
@Autowired
private AppService appService;
@Autowired
private CollectJobService collectJobService;
@Autowired
private MonitorDao monitorDao;
@Autowired
private ParamDao paramDao;
@Override
public void run(String... args) throws Exception {
// 读取数据库已经添加应用 构造采集任务
List<Monitor> monitors = monitorDao.findMonitorsByStatusNotInAndAndJobIdNotNull(Arrays.asList((byte)0, (byte)4));
for (Monitor monitor : monitors) {
try {
// 构造采集任务Job实体
Job appDefine = appService.getAppDefine(monitor.getApp());
appDefine.setId(monitor.getJobId());
appDefine.setMonitorId(monitor.getId());
appDefine.setInterval(monitor.getIntervals());
appDefine.setCyclic(true);
appDefine.setTimestamp(System.currentTimeMillis());
List<Param> params = paramDao.findParamsByMonitorId(monitor.getId());
List<Configmap> configmaps = params.stream().map(param ->
new Configmap(param.getField(), param.getValue(), param.getType())).collect(Collectors.toList());
appDefine.setConfigmap(configmaps);
// 下发采集任务
collectJobService.addAsyncCollectJob(appDefine);
} catch (Exception e) {
log.error("init monitor job: {} error,continue next monitor", monitor, e);
}
}
}
}

View File

@@ -10,6 +10,7 @@ import com.usthe.manager.service.AppService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.yaml.snakeyaml.Yaml;
@@ -32,6 +33,7 @@ import java.util.concurrent.ConcurrentHashMap;
* @date 2021/11/14 17:17
*/
@Service
@Order(value = 1)
@Transactional(rollbackFor = Exception.class)
@Slf4j
public class AppServiceImpl implements AppService, CommandLineRunner {