[collector] 采集任务调度优化,kafka发送消息编码

This commit is contained in:
tomsun28
2021-11-17 01:30:49 +08:00
parent a081f34c1b
commit 9760472d61
15 changed files with 242 additions and 101 deletions

View File

@@ -1,7 +1,7 @@
package com.usthe.collector.dispatch;
import com.usthe.collector.dispatch.timer.WheelTimerJob;
import com.usthe.collector.dispatch.timer.Timeout;
import com.usthe.common.entity.job.Metrics;
import com.usthe.common.entity.message.CollectRep;
@@ -14,10 +14,10 @@ public interface CollectDataDispatch {
/**
* 处理分发采集结果数据
* @param timerJob 时间轮任务
* @param timeout 时间轮timeout
* @param metrics 下面的指标组采集任务
* @param metricsData 采集结果数据
*/
void dispatchCollectData(WheelTimerJob timerJob, Metrics metrics, CollectRep.MetricsData metricsData);
void dispatchCollectData(Timeout timeout, Metrics metrics, CollectRep.MetricsData metricsData);
}

View File

@@ -1,8 +1,9 @@
package com.usthe.collector.dispatch;
import com.usthe.collector.dispatch.export.KafkaDataExporter;
import com.usthe.collector.dispatch.timer.Timeout;
import com.usthe.collector.dispatch.timer.TimerDispatch;
import com.usthe.collector.dispatch.timer.WheelTimerJob;
import com.usthe.collector.dispatch.timer.WheelTimerTask;
import com.usthe.common.entity.job.Job;
import com.usthe.common.entity.job.Metrics;
import com.usthe.common.entity.message.CollectRep;
@@ -11,7 +12,6 @@ import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.EventListener;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -100,13 +100,14 @@ public class CommonDispatcher implements MetricsTaskDispatch, CollectDataDispatc
MetricsTime metricsTime = entry.getValue();
if (metricsTime.getStartTime() < deadline) {
// 指标组采集超时
WheelTimerTask timerJob = (WheelTimerTask) metricsTime.getTimeout().task();
CollectRep.MetricsData metricsData = CollectRep.MetricsData.newBuilder()
.setId(metricsTime.getTimerJob().getJob().getMonitorId())
.setApp(metricsTime.getTimerJob().getJob().getApp())
.setId(timerJob.getJob().getMonitorId())
.setApp(timerJob.getJob().getApp())
.setMetrics(metricsTime.getMetrics().getName())
.setTime(System.currentTimeMillis())
.setCode(CollectRep.Code.TIMEOUT).setMsg("collect timeout").build();
dispatchCollectData(metricsTime.getTimerJob(), metricsTime.getMetrics(), metricsData);
dispatchCollectData(metricsTime.timeout, metricsTime.getMetrics(), metricsData);
}
}
Thread.sleep(20000);
@@ -118,22 +119,24 @@ public class CommonDispatcher implements MetricsTaskDispatch, CollectDataDispatc
}
@Override
public void dispatchMetricsTask(WheelTimerJob timerJob) {
public void dispatchMetricsTask(Timeout timeout) {
// 将单个应用的采集任务根据其下的指标组拆分为对应的指标组采集任务 AbstractCollect
// 将每个指标组放入线程池进行调度
Job job = timerJob.getJob();
WheelTimerTask timerTask = (WheelTimerTask) timeout.task();
Job job = timerTask.getJob();
job.constructPriorMetrics();
Set<Metrics> metricsSet = job.getNextCollectMetrics(null, true);
metricsSet.forEach(metrics -> {
MetricsCollect metricsCollect = new MetricsCollect(metrics, timerJob, this);
MetricsCollect metricsCollect = new MetricsCollect(metrics, timeout, this);
jobRequestQueue.addJob(metricsCollect);
metricsTimeoutMonitorMap.put(job.getId() + metrics.getName(),
new MetricsTime(System.currentTimeMillis(), metrics, timerJob));
new MetricsTime(System.currentTimeMillis(), metrics, timeout));
});
}
@Override
public void dispatchCollectData(WheelTimerJob timerJob, Metrics metrics, CollectRep.MetricsData metricsData) {
public void dispatchCollectData(Timeout timeout, Metrics metrics, CollectRep.MetricsData metricsData) {
WheelTimerTask timerJob = (WheelTimerTask) timeout.task();
Job job = timerJob.getJob();
Set<Metrics> metricsSet = job.getNextCollectMetrics(metrics, false);
if (job.isCyclic()) {
@@ -143,7 +146,10 @@ public class CommonDispatcher implements MetricsTaskDispatch, CollectDataDispatc
// 此Job所有指标组采集执行完成
// 周期性任务再次将任务push到时间轮
// 先判断此次任务执行时间与任务采集间隔时间
long spendTime = System.currentTimeMillis() - job.getTimestamp();
if (timeout.isCancelled()) {
return;
}
long spendTime = System.currentTimeMillis() - job.getDispatchTime();
long interval = job.getInterval() - spendTime / 1000;
interval = interval <= 0 ? 0 : interval;
// 重置构造执行指标组视图
@@ -152,10 +158,10 @@ public class CommonDispatcher implements MetricsTaskDispatch, CollectDataDispatc
} else if (!metricsSet.isEmpty()) {
// 当前级别指标组执行完成,开始执行下一级别的指标组
metricsSet.forEach(metricItem -> {
MetricsCollect metricsCollect = new MetricsCollect(metricItem, timerJob, this);
MetricsCollect metricsCollect = new MetricsCollect(metricItem, timeout, this);
jobRequestQueue.addJob(metricsCollect);
metricsTimeoutMonitorMap.put(job.getId() + metrics.getName(),
new MetricsTime(System.currentTimeMillis(), metrics, timerJob));
new MetricsTime(System.currentTimeMillis(), metrics, timeout));
});
} else {
// 当前执行级别的指标组列表未全执行完成,
@@ -168,14 +174,14 @@ public class CommonDispatcher implements MetricsTaskDispatch, CollectDataDispatc
if (metricsSet == null) {
// 此Job所有指标组采集执行完成
// 将所有指标组数据组合一起通知结果监听器
timerDispatch.responseSyncJobData(job.getId(), job.getMetricsDataTemps());
timerDispatch.responseSyncJobData(job.getId(), job.getResponseDataTemp());
} else if (!metricsSet.isEmpty()) {
// 当前级别指标组执行完成,开始执行下一级别的指标组
metricsSet.forEach(metricItem -> {
MetricsCollect metricsCollect = new MetricsCollect(metricItem, timerJob, this);
MetricsCollect metricsCollect = new MetricsCollect(metricItem, timeout, this);
jobRequestQueue.addJob(metricsCollect);
metricsTimeoutMonitorMap.put(job.getId() + metrics.getName(),
new MetricsTime(System.currentTimeMillis(), metrics, timerJob));
new MetricsTime(System.currentTimeMillis(), metrics, timeout));
});
} else {
// 当前执行级别的指标组列表未全执行完成,
@@ -189,6 +195,6 @@ public class CommonDispatcher implements MetricsTaskDispatch, CollectDataDispatc
private static class MetricsTime {
private long startTime;
private Metrics metrics;
private WheelTimerJob timerJob;
private Timeout timeout;
}
}

View File

@@ -5,7 +5,9 @@ import com.googlecode.aviator.AviatorEvaluator;
import com.googlecode.aviator.Expression;
import com.usthe.collector.collect.AbstractCollect;
import com.usthe.collector.collect.http.HttpCollectImpl;
import com.usthe.collector.dispatch.timer.WheelTimerJob;
import com.usthe.collector.dispatch.timer.Timeout;
import com.usthe.collector.dispatch.timer.WheelTimerTask;
import com.usthe.common.entity.job.Job;
import com.usthe.common.entity.job.Metrics;
import com.usthe.common.entity.message.CollectRep;
import lombok.Data;
@@ -17,45 +19,75 @@ import java.util.Map;
import java.util.stream.Collectors;
/**
* parent job
* 指标组采集
* @author tomsun28
* @date 2021/10/10 15:35
*/
@Slf4j
@Data
public class MetricsCollect implements Runnable, Comparable<MetricsCollect> {
/**
* 监控ID
*/
protected long monitorId;
/**
* 监控类型名称
*/
protected String app;
/**
* 指标组配置
*/
protected Metrics metrics;
/**
* 时间轮timeout
*/
protected Timeout timeout;
/**
* 任务和数据调度
*/
protected CollectDataDispatch collectDataDispatch;
/**
* 任务执行优先级
*/
protected byte runPriority;
/**
* 是周期性采集还是一次性采集 true-周期性 false-一次性
*/
protected boolean isCyclic;
/**
* 指标组采集任务新建时间
*/
protected long newTime;
/**
* 指标组采集任务开始执行时间
*/
protected long startTime;
protected Metrics metrics;
protected WheelTimerJob timerJob;
protected CollectDataDispatch collectDataDispatch;
public MetricsCollect(Metrics metrics, WheelTimerJob timerJob, CollectDataDispatch collectDataDispatch) {
public MetricsCollect(Metrics metrics, Timeout timeout, CollectDataDispatch collectDataDispatch) {
this.newTime = System.currentTimeMillis();
this.timerJob = timerJob;
this.timeout = timeout;
this.metrics = metrics;
WheelTimerTask timerJob = (WheelTimerTask) timeout.task();
Job job = timerJob.getJob();
this.monitorId = job.getMonitorId();
this.app = job.getApp();
this.collectDataDispatch = collectDataDispatch;
if (DispatchConstants.AVAILABILITY.equals(metrics.getName())) {
runPriority = (byte) 1;
} else {
this.isCyclic = job.isCyclic();
// 临时一次性任务执行优先级高
if (isCyclic) {
runPriority = (byte) -1;
} else {
runPriority = (byte) 1;
}
}
@Override
public void run() {
this.startTime = System.currentTimeMillis();
setNewThreadName(timerJob, metrics);
setNewThreadName(monitorId, app, startTime, metrics);
CollectRep.MetricsData.Builder response = CollectRep.MetricsData.newBuilder();
response.setApp(timerJob.getJob().getApp());
response.setId(timerJob.getJob().getId());
response.setApp(app);
response.setId(monitorId);
response.setMetrics(metrics.getName());
// 根据指标组采集协议,应用类型等来调度到真正的应用指标组采集实现类
@@ -69,15 +101,14 @@ public class MetricsCollect implements Runnable, Comparable<MetricsCollect> {
}
if (abstractCollect == null) {
log.error("[Dispatcher] - not support this: app: {}, metrics: {}, protocol: {}.",
timerJob.getJob().getApp(), metrics.getName(), metrics.getProtocol());
app, metrics.getName(), metrics.getProtocol());
response.setCode(CollectRep.Code.FAIL);
response.setMsg("not support " + timerJob.getJob().getApp() + ", "
response.setMsg("not support " + app + ", "
+ metrics.getName() + ", " + metrics.getProtocol());
return;
} else {
try {
abstractCollect.collect(response, timerJob.getJob().getMonitorId(),
timerJob.getJob().getApp(), metrics);
abstractCollect.collect(response, monitorId, app, metrics);
} catch (Exception e) {
log.error("[Metrics Collect]: {}.", e.getMessage(), e);
response.setCode(CollectRep.Code.FAIL);
@@ -85,11 +116,15 @@ public class MetricsCollect implements Runnable, Comparable<MetricsCollect> {
}
}
// 别名属性表达式替换计算
if (fastFailed()) {
return;
}
calculateFields(metrics, response);
CollectRep.MetricsData metricsData = validateResponse(response);
collectDataDispatch.dispatchCollectData(timerJob, metrics, metricsData);
collectDataDispatch.dispatchCollectData(timeout, metrics, metricsData);
}
/**
* 根据 calculates 和 aliasFields 配置计算出真正的指标(fields)值
* @param metrics 指标组配置
@@ -142,6 +177,10 @@ public class MetricsCollect implements Runnable, Comparable<MetricsCollect> {
}
}
private boolean fastFailed() {
return this.timeout == null || this.timeout.isCancelled();
}
private CollectRep.MetricsData validateResponse(CollectRep.MetricsData.Builder builder) {
long endTime = System.currentTimeMillis();
builder.setTime(endTime);
@@ -154,10 +193,10 @@ public class MetricsCollect implements Runnable, Comparable<MetricsCollect> {
return builder.build();
}
private void setNewThreadName(WheelTimerJob timerJob, Metrics metrics) {
String currentName = timerJob.getJob().getMonitorId() + "-" + timerJob.getJob().getApp()
+ "-" + metrics.getName() + "-" + timerJob.getJob().getId();
Thread.currentThread().setName(currentName);
private void setNewThreadName(long monitorId, String app, long startTime, Metrics metrics) {
String builder = monitorId + "-" + app + "-" + metrics.getName() +
"-" + String.valueOf(startTime).substring(9);
Thread.currentThread().setName(builder);
}
@Override

View File

@@ -1,6 +1,6 @@
package com.usthe.collector.dispatch;
import com.usthe.collector.dispatch.timer.WheelTimerJob;
import com.usthe.collector.dispatch.timer.Timeout;
/**
* 指标组采集任务调度器接口
@@ -11,7 +11,7 @@ public interface MetricsTaskDispatch {
/**
* 调度
* @param timerJob timerJob
* @param timeout timeout
*/
void dispatchMetricsTask(WheelTimerJob timerJob);
void dispatchMetricsTask(Timeout timeout);
}

View File

@@ -34,7 +34,8 @@ public class KafkaDataExporter {
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getServers());
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaMetricsDataSerializer.class);
// kafkaProducer = new KafkaProducer<>(properties);
kafkaProducer = new KafkaProducer<>(properties);
kafkaProducer.send(new ProducerRecord<>(kafkaProperties.getTopic(), CollectRep.MetricsData.newBuilder().setApp("dddd").build()));
} catch (Exception e) {
log.error(e.getMessage(), e);
}

View File

@@ -18,7 +18,7 @@ public interface TimerDispatch {
/**
* 增加新的job
* @param addJob job
* @param eventListener 一次性同步任务监听器,异步任务不需要
* @param eventListener 一次性同步任务监听器,异步任务不需要listener
*/
void addJob(Job addJob, CollectResponseEventListener eventListener);
@@ -28,7 +28,7 @@ public interface TimerDispatch {
* @param interval 开始调度的间隔时间
* @param timeUnit 时间单位
*/
void cyclicJob(WheelTimerJob timerTask, long interval, TimeUnit timeUnit);
void cyclicJob(WheelTimerTask timerTask, long interval, TimeUnit timeUnit);
/**
* 删除存在的job

View File

@@ -5,7 +5,6 @@ import com.usthe.common.entity.job.Job;
import com.usthe.common.entity.message.CollectRep;
import org.springframework.stereotype.Component;
import java.util.EventListener;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -49,7 +48,7 @@ public class TimerDispatcher implements TimerDispatch {
@Override
public void addJob(Job addJob, CollectResponseEventListener eventListener) {
WheelTimerJob timerJob = new WheelTimerJob(addJob);
WheelTimerTask timerJob = new WheelTimerTask(addJob);
if (addJob.isCyclic()) {
Timeout timeout = wheelTimer.newTimeout(timerJob, addJob.getInterval(), TimeUnit.SECONDS);
currentCyclicTaskMap.put(addJob.getId(), timeout);
@@ -61,7 +60,7 @@ public class TimerDispatcher implements TimerDispatch {
}
@Override
public void cyclicJob(WheelTimerJob timerTask, long interval, TimeUnit timeUnit) {
public void cyclicJob(WheelTimerTask timerTask, long interval, TimeUnit timeUnit) {
Long jobId = timerTask.getJob().getId();
// 判断此周期性job是否已经被取消
if (currentCyclicTaskMap.containsKey(jobId)) {

View File

@@ -1,30 +0,0 @@
package com.usthe.collector.dispatch.timer;
import com.usthe.collector.dispatch.MetricsTaskDispatch;
import com.usthe.collector.util.SpringContextHolder;
import com.usthe.common.entity.job.Job;
/**
* TimerTask实现
* @author tomsun28
* @date 2021/11/1 17:18
*/
public class WheelTimerJob implements TimerTask {
private Job job;
private MetricsTaskDispatch metricsTaskDispatch;
public WheelTimerJob(Job job) {
this.job = job;
this.metricsTaskDispatch = SpringContextHolder.getBean(MetricsTaskDispatch.class);
}
@Override
public void run(Timeout timeout) throws Exception {
metricsTaskDispatch.dispatchMetricsTask(this);
}
public Job getJob() {
return job;
}
}

View File

@@ -0,0 +1,106 @@
package com.usthe.collector.dispatch.timer;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive;
import com.usthe.collector.dispatch.MetricsTaskDispatch;
import com.usthe.collector.util.SpringContextHolder;
import com.usthe.common.entity.job.Configmap;
import com.usthe.common.entity.job.Job;
import com.usthe.common.entity.job.Metrics;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* TimerTask实现
* @author tomsun28
* @date 2021/11/1 17:18
*/
public class WheelTimerTask implements TimerTask {
private Job job;
private MetricsTaskDispatch metricsTaskDispatch;
public WheelTimerTask(Job job) {
this.metricsTaskDispatch = SpringContextHolder.getBean(MetricsTaskDispatch.class);
this.job = job;
// 初始化job 将监控实际参数值对采集字段进行替换
initJobMetrics(job);
}
/**
* 初始化job 将监控实际参数值对采集字段进行替换
* @param job job
*/
private void initJobMetrics(Job job) {
List<Configmap> config = job.getConfigmap();
Map<String, Configmap> configmap = config.stream().collect(Collectors.toMap(Configmap::getKey, item -> item));
List<Metrics> metrics = job.getMetrics();
Gson gson = new Gson();
List<Metrics> metricsTmp = new ArrayList<>(metrics.size());
for (Metrics metric : metrics) {
JsonElement jsonElement = gson.toJsonTree(metric);
jsonElement = replaceSpecialValue(jsonElement, configmap);
metric = gson.fromJson(jsonElement, Metrics.class);
metricsTmp.add(metric);
}
job.setMetrics(metricsTmp);
}
private JsonElement replaceSpecialValue(JsonElement jsonElement, Map<String, Configmap> configmap) {
if (jsonElement.isJsonObject()) {
JsonObject jsonObject = jsonElement.getAsJsonObject();
jsonObject.entrySet().forEach(entry -> {
JsonElement element = entry.getValue();
if (element.isJsonPrimitive()) {
// 判断是否含有特殊字符 替换
String value = element.getAsString();
if (value.startsWith("^_^")) {
value = value.replaceAll("\\^_\\^", "");
Configmap param = configmap.get(value);
value = (String) param.getValue();
jsonObject.addProperty(entry.getKey(), value);
}
} else {
jsonObject.add(entry.getKey(), replaceSpecialValue(entry.getValue(), configmap));
}
});
} else if (jsonElement.isJsonArray()) {
JsonArray jsonArray = jsonElement.getAsJsonArray();
for (int i = 0; i < jsonArray.size(); i++) {
JsonElement element = jsonArray.get(i);
if (element.isJsonPrimitive()) {
// 判断是否含有特殊字符 替换
String value = element.getAsString();
if (value.startsWith("^_^")) {
value = value.replaceAll("\\^_\\^", "");
Configmap param = configmap.get(value);
value = (String) param.getValue();
jsonArray.set(i, new JsonPrimitive(value));
}
} else {
jsonArray.set(i, replaceSpecialValue(element, configmap));
}
}
}
return jsonElement;
}
@Override
public void run(Timeout timeout) throws Exception {
job.setDispatchTime(System.currentTimeMillis());
metricsTaskDispatch.dispatchMetricsTask(timeout);
}
public Job getJob() {
return job;
}
}

View File

@@ -15,5 +15,5 @@ collector:
export:
kafka:
enabled: true
servers: localhost:9092
servers: 139.198.109.64:9092
topic: async-collect-data

View File

@@ -69,7 +69,7 @@
<!-- 开发环境配置 -->
<springProfile name="dev">
<root level="DEBUG">
<root level="INFO">
<appender-ref ref="ConsoleAppender"/>
<appender-ref ref="SystemOutFileAppender"/>
<appender-ref ref="ErrOutFileAppender"/>

View File

@@ -66,6 +66,12 @@ public class Job {
*/
private List<Configmap> configmap;
/**
* collector使用 - 任务被时间轮开始调度的时间戳
*/
@JsonIgnore
private transient long dispatchTime;
/**
* collector使用 - 任务版本,此字段不存储于etcd
*/
@@ -88,7 +94,7 @@ public class Job {
* collector使用 - 临时存储一次性任务指标组响应数据
*/
@JsonIgnore
private transient List<CollectRep.MetricsData> metricsDataTemps;
private transient List<CollectRep.MetricsData> responseDataTemp;
/**
* collector使用 - 构造初始化指标组执行视图
@@ -100,13 +106,9 @@ public class Job {
if (metric.getAliasFields() == null || metric.getAliasFields().isEmpty()) {
metric.setAliasFields(metric.getFields().stream().map(Metrics.Field::getField).collect(Collectors.toList()));
}
// 设置默认的指标组执行优先级
// 设置默认的指标组执行优先级,不填则默认最后优先级
if (metric.getPriority() == null) {
if (AVAILABILITY.equals(metric.getName())) {
metric.setPriority((byte)0);
} else {
metric.setPriority(Byte.MAX_VALUE);
}
metric.setPriority(Byte.MAX_VALUE);
}
})
.collect(Collectors.groupingBy(Metrics::getPriority));
@@ -167,9 +169,9 @@ public class Job {
}
public void addCollectMetricsData(CollectRep.MetricsData metricsData) {
if (metricsDataTemps == null) {
metricsDataTemps = new LinkedList<>();
if (responseDataTemp == null) {
responseDataTemp = new LinkedList<>();
}
metricsDataTemps.add(metricsData);
responseDataTemp.add(metricsData);
}
}

View File

@@ -10,6 +10,7 @@ import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.List;
import java.util.Objects;
/**
* 监控采集的指标集合详情 eg: cpu | memory | health
@@ -68,6 +69,23 @@ public class Metrics {
*/
private JdbcProtocol jdbc;
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Metrics metrics = (Metrics) o;
return name.equals(metrics.name);
}
@Override
public int hashCode() {
return Objects.hash(name);
}
@Data
@AllArgsConstructor
@NoArgsConstructor

View File

@@ -73,7 +73,7 @@ public class Monitor {
/**
* 监控状态 0:未监控,1:可用,2:不可用,3:不可达,4:挂起
*/
@ApiModelProperty(value = "监控状态 0:未监控,1:可用,2:不可用,3:不可达,4:挂起", example = "1", accessMode = READ_WRITE, position = 6)
@ApiModelProperty(value = "监控状态 0:未监控,1:可用,2:不可用,3:不可达,4:挂起", accessMode = READ_WRITE, position = 6)
@Min(0)
@Max(4)
private byte status;

View File

@@ -62,7 +62,7 @@ public class Param {
/**
* 参数类型 0:数字 1:字符串 2:加密串
*/
@ApiModelProperty(value = "参数类型 0:数字 1:字符串 2:加密串", example = "0", accessMode = READ_WRITE, position = 4)
@ApiModelProperty(value = "参数类型 0:数字 1:字符串 2:加密串", accessMode = READ_WRITE, position = 4)
@Min(0)
@Max(2)
private byte type;