[alerter] 告警处理指标数据,告警表达式计算,内容模版关键字替换

This commit is contained in:
tomsun28
2021-12-10 12:54:42 +08:00
parent 29b3e23d02
commit 370224f5cf
23 changed files with 693 additions and 44 deletions

View File

@@ -0,0 +1,22 @@
package com.usthe.alert;
import com.googlecode.aviator.AviatorEvaluator;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author tomsun28
* @date 2021/11/3 12:55
*/
@Configuration
public class AlerterConfiguration {
private static final int AVIATOR_LRU_CACHE_SIZE = 1024;
@Bean
public void configAviatorEvaluator() {
// 配置AviatorEvaluator使用LRU缓存编译后的表达式
AviatorEvaluator.getInstance()
.useLRUExpressionCache(AVIATOR_LRU_CACHE_SIZE);
}
}

View File

@@ -0,0 +1,44 @@
package com.usthe.alert;
import com.usthe.alert.pojo.entity.Alert;
import com.usthe.common.entity.message.CollectRep;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* 采集数据队列
* @author tom
* @date 2021/11/24 17:58
*/
@Component
@Slf4j
public class AlerterDataQueue {
private final LinkedBlockingQueue<CollectRep.MetricsData> metricsDataQueue;
private final LinkedBlockingQueue<Alert> alertDataQueue;
public AlerterDataQueue() {
metricsDataQueue = new LinkedBlockingQueue<>();
alertDataQueue = new LinkedBlockingQueue<>();
}
public void addMetricsData(CollectRep.MetricsData metricsData) {
metricsDataQueue.offer(metricsData);
}
public CollectRep.MetricsData pollMetricsData() throws InterruptedException {
return metricsDataQueue.poll(2, TimeUnit.SECONDS);
}
public void addAlertData(Alert alert) {
alertDataQueue.offer(alert);
}
public Alert pollAlertData() throws InterruptedException {
return alertDataQueue.poll(2, TimeUnit.SECONDS);
}
}

View File

@@ -0,0 +1,101 @@
package com.usthe.alert;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
* 数据仓储配置属性
* @author tom
* @date 2021/11/24 10:38
*/
@Component
@ConfigurationProperties(prefix = "alerter")
public class AlerterProperties {
/**
* 数据入口配置属性
*/
private EntranceProperties entrance;
public EntranceProperties getEntrance() {
return entrance;
}
public void setEntrance(EntranceProperties entrance) {
this.entrance = entrance;
}
/**
* 数据入口配置属性
* 入口可以是从kafka rabbitmq rocketmq等消息中间件获取数据
*/
public static class EntranceProperties {
/**
* kafka配置信息
*/
private KafkaProperties kafka;
public KafkaProperties getKafka() {
return kafka;
}
public void setKafka(KafkaProperties kafka) {
this.kafka = kafka;
}
public static class KafkaProperties {
/**
* kafka数据入口是否启动
*/
private boolean enabled = true;
/**
* kafka的连接服务器url
*/
private String servers = "127.0.0.1:9092";
/**
* 接收数据的topic名称
*/
private String topic;
/**
* 消费者组ID
*/
private String groupId;
public boolean isEnabled() {
return enabled;
}
public void setEnabled(boolean enabled) {
this.enabled = enabled;
}
public String getServers() {
return servers;
}
public void setServers(String servers) {
this.servers = servers;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getGroupId() {
return groupId;
}
public void setGroupId(String groupId) {
this.groupId = groupId;
}
}
}
}

View File

@@ -0,0 +1,54 @@
package com.usthe.alert;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* warehouse 工作线程池
* @author tom
* @date 2021/11/24 18:09
*/
@Component
@Slf4j
public class AlerterWorkerPool {
private ThreadPoolExecutor workerExecutor;
public AlerterWorkerPool() {
initWorkExecutor();
}
private void initWorkExecutor() {
// 线程工厂
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setUncaughtExceptionHandler((thread, throwable) -> {
log.error("workerExecutor has uncaughtException.");
log.error(throwable.getMessage(), throwable); })
.setDaemon(true)
.setNameFormat("alerter-worker-%d")
.build();
workerExecutor = new ThreadPoolExecutor(6,
10,
10,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
threadFactory,
new ThreadPoolExecutor.AbortPolicy());
}
/**
* 运行alerter任务
* @param runnable 任务
* @throws RejectedExecutionException when 线程池满
*/
public void executeJob(Runnable runnable) throws RejectedExecutionException {
workerExecutor.execute(runnable);
}
}

View File

@@ -0,0 +1,153 @@
package com.usthe.alert.calculate;
import com.googlecode.aviator.AviatorEvaluator;
import com.googlecode.aviator.Expression;
import com.usthe.alert.AlerterProperties;
import com.usthe.alert.AlerterWorkerPool;
import com.usthe.alert.AlerterDataQueue;
import com.usthe.alert.entrance.KafkaDataConsume;
import com.usthe.alert.pojo.entity.Alert;
import com.usthe.alert.pojo.entity.AlertDefine;
import com.usthe.alert.service.AlertDefineService;
import com.usthe.alert.util.AlertTemplateUtil;
import com.usthe.common.entity.message.CollectRep;
import com.usthe.common.util.CommonConstants;
import com.usthe.common.util.CommonUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* 根据告警定义规则和采集数据匹配计算告警
* @author tom
* @date 2021/12/9 14:19
*/
@Configuration
@AutoConfigureAfter(value = {KafkaDataConsume.class})
@Slf4j
public class CalculateAlarm {
private AlerterWorkerPool workerPool;
private AlerterDataQueue dataQueue;
private AlertDefineService alertDefineService;
public CalculateAlarm (AlerterProperties properties, AlerterWorkerPool workerPool,
AlerterDataQueue dataQueue, AlertDefineService alertDefineService) {
this.workerPool = workerPool;
this.dataQueue = dataQueue;
this.alertDefineService = alertDefineService;
startCalculate();
}
private void startCalculate() {
Runnable runnable = () -> {
while (!Thread.currentThread().isInterrupted()) {
try {
CollectRep.MetricsData metricsData = dataQueue.pollMetricsData();
if (metricsData != null) {
calculate(metricsData);
}
} catch (InterruptedException e) {
log.error(e.getMessage());
}
}
};
workerPool.executeJob(runnable);
workerPool.executeJob(runnable);
workerPool.executeJob(runnable);
}
private void calculate(CollectRep.MetricsData metricsData) {
long monitorId = metricsData.getId();
String app = metricsData.getApp();
String metrics = metricsData.getMetrics();
// 先判断采集响应数据状态 UN_REACHABLE/UN_CONNECTABLE 则需发最高级别告警
if (metricsData.getCode() != CollectRep.Code.SUCCESS) {
// 采集异常
if (metricsData.getCode() == CollectRep.Code.UN_REACHABLE
|| metricsData.getCode() == CollectRep.Code.UN_CONNECTABLE) {
// 连接型可用性异常 UN_REACHABLE 对端不可达(网络层icmp) UN_CONNECTABLE 对端连接失败(传输层tcp,udp)
Alert alert = Alert.builder()
.monitorId(monitorId)
.priority((byte) 0)
.status((byte) 0)
.target(CommonConstants.AVAILABLE)
.duration(300)
.content("监控紧急可用性告警: " + metricsData.getCode().name())
.build();
dataQueue.addAlertData(alert);
}
return;
}
// 查出此监控类型下的此指标集合下关联配置的告警定义信息
// field - define[]
Map<String, List<AlertDefine>> defineMap = alertDefineService.getAlertDefines(monitorId, app, metrics);
if (defineMap == null || defineMap.isEmpty()) {
return;
}
List<CollectRep.Field> fields = metricsData.getFieldsList();
Map<String, Object> fieldValueMap = new HashMap<>(16);
fieldValueMap.put("app", app);
fieldValueMap.put("metric", metrics);
for (CollectRep.ValueRow valueRow : metricsData.getValuesList()) {
if (!valueRow.getColumnsList().isEmpty()) {
String instance = valueRow.getInstance();
if (!"".equals(instance)) {
fieldValueMap.put("instance", instance);
} else {
fieldValueMap.remove("instance");
}
for (int index = 0; index < valueRow.getColumnsList().size(); index++) {
String valueStr = valueRow.getColumns(index);
CollectRep.Field field = fields.get(index);
if (field.getType() == CommonConstants.TYPE_NUMBER) {
Double doubleValue = CommonUtil.parseDoubleStr(valueStr);
if (doubleValue != null) {
fieldValueMap.put(field.getName(), doubleValue);
} else {
fieldValueMap.remove(field.getName());
}
} else {
if (!"".equals(valueStr)) {
fieldValueMap.put(field.getName(), valueStr);
} else {
fieldValueMap.remove(field.getName());
}
}
}
for (Map.Entry<String, List<AlertDefine>> entry : defineMap.entrySet()) {
List<AlertDefine> defines = entry.getValue();
for (AlertDefine define : defines) {
String expr = define.getExpr();
try {
Expression expression = AviatorEvaluator.compile(expr, true);
Boolean match = (Boolean) expression.execute(fieldValueMap);
if (match) {
// 阈值规则匹配,触发告警 todo 告警延迟delay参数实现
Alert alert = Alert.builder()
.monitorId(monitorId)
.priority(define.getPriority())
.status((byte) 0)
.target(app + "." + metrics + "." + define.getField())
.duration(define.getDuration())
// 模板中关键字匹配替换
.content(AlertTemplateUtil.render(define.getTemplate(), fieldValueMap))
.build();
dataQueue.addAlertData(alert);
// 此优先级以下的阈值规则则忽略
break;
}
} catch (Exception e) {
log.warn(e.getMessage());
}
}
}
}
}
}
}

View File

@@ -21,7 +21,7 @@ import javax.validation.Valid;
import java.util.Map;
import static com.usthe.common.util.CommonConstants.MONITOR_NOT_EXIST;
import static com.usthe.common.util.CommonConstants.MONITOR_NOT_EXIST_CODE;
import static org.springframework.http.MediaType.APPLICATION_JSON_VALUE;
/**
@@ -63,7 +63,7 @@ public class AlertDefineController {
AlertDefine alertDefine = alertDefineService.getAlertDefine(id);
Message.MessageBuilder<AlertDefine> messageBuilder = Message.builder();
if (alertDefine == null) {
messageBuilder.code(MONITOR_NOT_EXIST).msg("AlertDefine not exist.");
messageBuilder.code(MONITOR_NOT_EXIST_CODE).msg("AlertDefine not exist.");
} else {
messageBuilder.data(alertDefine);
}

View File

@@ -3,7 +3,10 @@ package com.usthe.alert.dao;
import com.usthe.alert.pojo.entity.AlertDefine;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import java.util.List;
import java.util.Set;
/**
@@ -18,4 +21,15 @@ public interface AlertDefineDao extends JpaRepository<AlertDefine, Long>, JpaSpe
* @param alertDefineIds 告警定义ID列表
*/
void deleteAllByIdIn(Set<Long> alertDefineIds);
/**
* 根据监控ID查询与之关联的告警定义列表
* @param monitorId 监控ID
* @param metrics 指标组
* @return 告警定义列表
*/
@Query("select define from AlertDefine define join AlertDefineBind bind on bind.alertDefineId = define.id " +
"where bind.monitorId = :monitorId and define.metric = :metrics")
List<AlertDefine> queryAlertDefinesByMonitor(@Param(value = "monitorId") Long monitorId,
@Param(value = "metrics") String metrics);
}

View File

@@ -0,0 +1,80 @@
package com.usthe.alert.entrance;
import com.usthe.alert.AlerterProperties;
import com.usthe.alert.AlerterWorkerPool;
import com.usthe.alert.AlerterDataQueue;
import com.usthe.common.entity.message.CollectRep;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Configuration;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
/**
* 从Kafka消费指标组采集数据处理
* @author tom
* @date 2021/11/24 18:03
*/
@Configuration
@AutoConfigureAfter(value = {AlerterProperties.class})
@ConditionalOnProperty(prefix = "alerter.entrance.kafka",
name = "enabled", havingValue = "true", matchIfMissing = true)
@Slf4j
public class KafkaDataConsume implements DisposableBean {
private KafkaConsumer<Long, CollectRep.MetricsData> consumer;
private AlerterWorkerPool workerPool;
private AlerterDataQueue dataQueue;
public KafkaDataConsume(AlerterProperties properties, AlerterWorkerPool workerPool,
AlerterDataQueue dataQueue) {
this.workerPool = workerPool;
this.dataQueue = dataQueue;
initConsumer(properties);
startConsumeData();
}
private void startConsumeData() {
Runnable runnable = () -> {
Thread.currentThread().setName("warehouse-kafka-data-consumer");
while (!Thread.currentThread().isInterrupted()) {
ConsumerRecords<Long, CollectRep.MetricsData> records = consumer.poll(Duration.ofMillis(100));
records.forEach(record -> {
dataQueue.addMetricsData(record.value());
});
}
};
workerPool.executeJob(runnable);
}
private void initConsumer(AlerterProperties properties) {
if (properties == null || properties.getEntrance() == null || properties.getEntrance().getKafka() == null) {
log.error("init error, please config Warehouse kafka props in application.yml");
throw new IllegalArgumentException("please config Warehouse kafka props");
}
AlerterProperties.EntranceProperties.KafkaProperties kafkaProp = properties.getEntrance().getKafka();
Properties consumerProp = new Properties();
consumerProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProp.getServers());
consumerProp.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProp.getGroupId());
consumerProp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
consumerProp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaMetricsDataDeserializer.class);
consumerProp.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
consumerProp.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
consumer = new KafkaConsumer<>(consumerProp);
consumer.subscribe(Collections.singleton(kafkaProp.getTopic()));
}
@Override
public void destroy() throws Exception {
if (consumer != null) {
consumer.close();
}
}
}

View File

@@ -0,0 +1,24 @@
package com.usthe.alert.entrance;
import com.usthe.common.entity.message.CollectRep;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Deserializer;
/**
* MetricsData的反序列化
* @author tom
* @date 2021/11/24 17:29
*/
@Slf4j
public class KafkaMetricsDataDeserializer implements Deserializer<CollectRep.MetricsData> {
@Override
public CollectRep.MetricsData deserialize(String topicName, byte[] bytes) {
try {
return CollectRep.MetricsData.parseFrom(bytes);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
return null;
}
}

View File

@@ -0,0 +1,84 @@
package com.usthe.alert.pojo.entity;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.hibernate.validator.constraints.Length;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Table;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import java.time.LocalDateTime;
import static io.swagger.annotations.ApiModelProperty.AccessMode.READ_ONLY;
import static io.swagger.annotations.ApiModelProperty.AccessMode.READ_WRITE;
/**
* 告警记录
* @author tom
* @date 2021/12/9 15:37
*/
@Entity
@Table(name = "alert")
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
@ApiModel(description = "告警记录实体")
public class Alert {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
@ApiModelProperty(value = "告警记录实体主键索引ID", example = "87584674384", accessMode = READ_ONLY, position = 0)
private Long id;
@ApiModelProperty(value = "告警监控对象ID", example = "87432674336", accessMode = READ_WRITE, position = 1)
private Long monitorId;
@ApiModelProperty(value = "告警监控对象名称", example = "87432674336", accessMode = READ_WRITE, position = 2)
private String monitorName;
@ApiModelProperty(value = "告警级别 0:高-emergency-紧急告警-红色 1:中-critical-严重告警-橙色 2:低-warning-警告告警-黄色",
example = "1", accessMode = READ_WRITE, position = 3)
@Min(0)
@Max(2)
private byte priority;
@ApiModelProperty(value = "告警状态: 0-待发送 1-已发送 2-已过期(已经超过持续时间)",
example = "1", accessMode = READ_WRITE, position = 4)
@Min(0)
@Max(2)
private byte status;
@ApiModelProperty(value = "告警目标对象: 监控可用性-available 指标-app.metrics.field",
example = "1", accessMode = READ_WRITE, position = 4)
@Min(0)
@Max(2)
private String target;
@ApiModelProperty(value = "触发告警后持续时间,单位s", example = "60", accessMode = READ_WRITE, position = 7)
@Min(0)
private int duration;
@ApiModelProperty(value = "告警通知实际内容", example = "linux_192.134.32.1: 534543534 cpu usage high",
accessMode = READ_WRITE, position = 10)
@Length(max = 1024)
private String content;
/**
* 记录创建时间
*/
@ApiModelProperty(value = "记录创建时间(毫秒时间戳)", example = "1612198922000", accessMode = READ_ONLY, position = 13)
@Column(insertable = false, updatable = false)
private LocalDateTime gmtCreate;
}

View File

@@ -56,7 +56,7 @@ public class AlertDefine {
@ApiModelProperty(value = "是否是默认预置告警", example = "false", accessMode = READ_WRITE, position = 4)
private boolean preset;
@ApiModelProperty(value = "告警触发条件表达式", example = "usage>90", accessMode = READ_WRITE, position = 5)
@ApiModelProperty(value = "告警阈值触发条件表达式", example = "usage>90", accessMode = READ_WRITE, position = 5)
@Length(max = 1024)
private String expr;
@@ -80,7 +80,7 @@ public class AlertDefine {
@ApiModelProperty(value = "告警通知内容", example = "linux {monitor_name}: {monitor_id} cpu usage high",
accessMode = READ_WRITE, position = 10)
@Length(max = 1024)
private String content;
private String template;
/**
* 此条记录创建者

View File

@@ -5,6 +5,7 @@ import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.jpa.domain.Specification;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -74,4 +75,13 @@ public interface AlertDefineService {
* @param monitorMap 监控ID-名称 MAP
*/
void applyBindAlertDefineMonitors(Long alertId, Map<Long, String> monitorMap);
/**
* 查询与此监控ID关联的指定指标组匹配的告警定义
* @param monitorId 监控ID
* @param app 监控类型
* @param metrics 指标组
* @return field - define[]
*/
Map<String, List<AlertDefine>> getAlertDefines(long monitorId, String app, String metrics);
}

View File

@@ -13,6 +13,7 @@ import org.springframework.data.jpa.domain.Specification;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -84,4 +85,15 @@ public class AlertDefineServiceImpl implements AlertDefineService {
.collect(Collectors.toList());
alertDefineBindDao.saveAll(alertDefineBinds);
}
@Override
public Map<String, List<AlertDefine>> getAlertDefines(long monitorId, String app, String metrics) {
List<AlertDefine> defines = alertDefineDao.queryAlertDefinesByMonitor(monitorId, metrics);
if (defines == null || defines.isEmpty()) {
return null;
}
// 将告警阈值定义从告警级别0-3数字升序排序数字越小告警基本越高即从最高的告警阈值开始匹配计算
return defines.stream().sorted(Comparator.comparing(AlertDefine::getPriority))
.collect(Collectors.groupingBy(AlertDefine::getField));
}
}

View File

@@ -0,0 +1,39 @@
package com.usthe.alert.util;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* 告警模版关键字匹配替换引擎工具
* @author tom
* @date 2021/12/10 11:53
*/
@Slf4j
public class AlertTemplateUtil {
/**
* 匹配 ${key} 的变量
* eg: Alert, the instance: ${instance} metrics: ${metrics} is over flow.
*/
private static final Pattern PATTERN = Pattern.compile("\\$\\{(\\w+)\\}");
public static String render(String template, Map<String, Object> replaceData) {
try {
Matcher matcher = PATTERN.matcher(template);
StringBuffer buffer = new StringBuffer();
while (matcher.find()) {
Object objectValue = replaceData.getOrDefault(matcher.group(1), "NullValue");
String value = objectValue.toString();
matcher.appendReplacement(buffer, value);
}
matcher.appendTail(buffer);
return buffer.toString();
} catch (Exception e) {
log.error(e.getMessage(), e);
return template;
}
}
}

View File

@@ -1,3 +1,9 @@
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.usthe.alert.service.impl.AlertDefineServiceImpl,\
com.usthe.alert.controller.AlertDefineController
com.usthe.alert.controller.AlertDefineController,\
com.usthe.alert.AlerterWorkerPool,\
com.usthe.alert.AlerterProperties,\
com.usthe.alert.AlerterDataQueue,\
com.usthe.alert.entrance.KafkaDataConsume,\
com.usthe.alert.AlerterConfiguration,\
com.usthe.alert.calculate.CalculateAlarm