From 370224f5cf1af2fec3ad941ba941e036168c28d8 Mon Sep 17 00:00:00 2001 From: tomsun28 Date: Fri, 10 Dec 2021 12:54:42 +0800 Subject: [PATCH] =?UTF-8?q?[alerter]=20=E5=91=8A=E8=AD=A6=E5=A4=84?= =?UTF-8?q?=E7=90=86=E6=8C=87=E6=A0=87=E6=95=B0=E6=8D=AE=EF=BC=8C=E5=91=8A?= =?UTF-8?q?=E8=AD=A6=E8=A1=A8=E8=BE=BE=E5=BC=8F=E8=AE=A1=E7=AE=97=EF=BC=8C?= =?UTF-8?q?=E5=86=85=E5=AE=B9=E6=A8=A1=E7=89=88=E5=85=B3=E9=94=AE=E5=AD=97?= =?UTF-8?q?=E6=9B=BF=E6=8D=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- alerter/README.md | 3 +- .../com/usthe/alert/AlerterConfiguration.java | 22 +++ .../com/usthe/alert/AlerterDataQueue.java | 44 +++++ .../com/usthe/alert/AlerterProperties.java | 101 ++++++++++++ .../com/usthe/alert/AlerterWorkerPool.java | 54 +++++++ .../usthe/alert/calculate/CalculateAlarm.java | 153 ++++++++++++++++++ .../controller/AlertDefineController.java | 4 +- .../com/usthe/alert/dao/AlertDefineDao.java | 14 ++ .../alert/entrance/KafkaDataConsume.java | 80 +++++++++ .../KafkaMetricsDataDeserializer.java | 24 +++ .../com/usthe/alert/pojo/entity/Alert.java | 84 ++++++++++ .../usthe/alert/pojo/entity/AlertDefine.java | 4 +- .../alert/service/AlertDefineService.java | 10 ++ .../service/impl/AlertDefineServiceImpl.java | 12 ++ .../usthe/alert/util/AlertTemplateUtil.java | 39 +++++ .../main/resources/META-INF/spring.factories | 8 +- .../com/usthe/common/entity/dto/Message.java | 4 +- .../usthe/common/util/CommonConstants.java | 27 ++-- .../manager/controller/AccountController.java | 8 +- .../manager/controller/MonitorController.java | 4 +- .../service/impl/MonitorServiceImpl.java | 10 +- .../support/GlobalExceptionHandler.java | 24 +-- manager/src/main/resources/db/schema.sql | 4 +- 23 files changed, 693 insertions(+), 44 deletions(-) create mode 100644 alerter/src/main/java/com/usthe/alert/AlerterConfiguration.java create mode 100644 alerter/src/main/java/com/usthe/alert/AlerterDataQueue.java create mode 100644 alerter/src/main/java/com/usthe/alert/AlerterProperties.java create mode 100644 alerter/src/main/java/com/usthe/alert/AlerterWorkerPool.java create mode 100644 alerter/src/main/java/com/usthe/alert/calculate/CalculateAlarm.java create mode 100644 alerter/src/main/java/com/usthe/alert/entrance/KafkaDataConsume.java create mode 100644 alerter/src/main/java/com/usthe/alert/entrance/KafkaMetricsDataDeserializer.java create mode 100644 alerter/src/main/java/com/usthe/alert/pojo/entity/Alert.java create mode 100644 alerter/src/main/java/com/usthe/alert/util/AlertTemplateUtil.java diff --git a/alerter/README.md b/alerter/README.md index d1f723f..49e4a1d 100644 --- a/alerter/README.md +++ b/alerter/README.md @@ -1,3 +1,4 @@ ### 告警服务 -根据告警规则配置信息,处理指标数据判断告警,告警分发。 \ No newline at end of file +根据告警规则配置信息,处理指标数据判断告警,告警分发。 +- TODO 告警自动恢复 \ No newline at end of file diff --git a/alerter/src/main/java/com/usthe/alert/AlerterConfiguration.java b/alerter/src/main/java/com/usthe/alert/AlerterConfiguration.java new file mode 100644 index 0000000..39f449a --- /dev/null +++ b/alerter/src/main/java/com/usthe/alert/AlerterConfiguration.java @@ -0,0 +1,22 @@ +package com.usthe.alert; + +import com.googlecode.aviator.AviatorEvaluator; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @author tomsun28 + * @date 2021/11/3 12:55 + */ +@Configuration +public class AlerterConfiguration { + + private static final int AVIATOR_LRU_CACHE_SIZE = 1024; + + @Bean + public void configAviatorEvaluator() { + // 配置AviatorEvaluator使用LRU缓存编译后的表达式 + AviatorEvaluator.getInstance() + .useLRUExpressionCache(AVIATOR_LRU_CACHE_SIZE); + } +} diff --git a/alerter/src/main/java/com/usthe/alert/AlerterDataQueue.java b/alerter/src/main/java/com/usthe/alert/AlerterDataQueue.java new file mode 100644 index 0000000..6d92427 --- /dev/null +++ b/alerter/src/main/java/com/usthe/alert/AlerterDataQueue.java @@ -0,0 +1,44 @@ +package com.usthe.alert; + +import com.usthe.alert.pojo.entity.Alert; +import com.usthe.common.entity.message.CollectRep; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * 采集数据队列 + * @author tom + * @date 2021/11/24 17:58 + */ +@Component +@Slf4j +public class AlerterDataQueue { + + private final LinkedBlockingQueue metricsDataQueue; + private final LinkedBlockingQueue alertDataQueue; + + public AlerterDataQueue() { + metricsDataQueue = new LinkedBlockingQueue<>(); + alertDataQueue = new LinkedBlockingQueue<>(); + } + + public void addMetricsData(CollectRep.MetricsData metricsData) { + metricsDataQueue.offer(metricsData); + } + + public CollectRep.MetricsData pollMetricsData() throws InterruptedException { + return metricsDataQueue.poll(2, TimeUnit.SECONDS); + } + + public void addAlertData(Alert alert) { + alertDataQueue.offer(alert); + } + + public Alert pollAlertData() throws InterruptedException { + return alertDataQueue.poll(2, TimeUnit.SECONDS); + } + +} diff --git a/alerter/src/main/java/com/usthe/alert/AlerterProperties.java b/alerter/src/main/java/com/usthe/alert/AlerterProperties.java new file mode 100644 index 0000000..216eb1c --- /dev/null +++ b/alerter/src/main/java/com/usthe/alert/AlerterProperties.java @@ -0,0 +1,101 @@ +package com.usthe.alert; + +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +/** + * 数据仓储配置属性 + * @author tom + * @date 2021/11/24 10:38 + */ +@Component +@ConfigurationProperties(prefix = "alerter") +public class AlerterProperties { + + /** + * 数据入口配置属性 + */ + private EntranceProperties entrance; + + public EntranceProperties getEntrance() { + return entrance; + } + + public void setEntrance(EntranceProperties entrance) { + this.entrance = entrance; + } + + /** + * 数据入口配置属性 + * 入口可以是从kafka rabbitmq rocketmq等消息中间件获取数据 + */ + public static class EntranceProperties { + + /** + * kafka配置信息 + */ + private KafkaProperties kafka; + + public KafkaProperties getKafka() { + return kafka; + } + + public void setKafka(KafkaProperties kafka) { + this.kafka = kafka; + } + + public static class KafkaProperties { + /** + * kafka数据入口是否启动 + */ + private boolean enabled = true; + + /** + * kafka的连接服务器url + */ + private String servers = "127.0.0.1:9092"; + /** + * 接收数据的topic名称 + */ + private String topic; + /** + * 消费者组ID + */ + private String groupId; + + public boolean isEnabled() { + return enabled; + } + + public void setEnabled(boolean enabled) { + this.enabled = enabled; + } + + public String getServers() { + return servers; + } + + public void setServers(String servers) { + this.servers = servers; + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public String getGroupId() { + return groupId; + } + + public void setGroupId(String groupId) { + this.groupId = groupId; + } + } + + } + +} diff --git a/alerter/src/main/java/com/usthe/alert/AlerterWorkerPool.java b/alerter/src/main/java/com/usthe/alert/AlerterWorkerPool.java new file mode 100644 index 0000000..98a3555 --- /dev/null +++ b/alerter/src/main/java/com/usthe/alert/AlerterWorkerPool.java @@ -0,0 +1,54 @@ +package com.usthe.alert; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * warehouse 工作线程池 + * @author tom + * @date 2021/11/24 18:09 + */ +@Component +@Slf4j +public class AlerterWorkerPool { + + private ThreadPoolExecutor workerExecutor; + + public AlerterWorkerPool() { + initWorkExecutor(); + } + + private void initWorkExecutor() { + // 线程工厂 + ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setUncaughtExceptionHandler((thread, throwable) -> { + log.error("workerExecutor has uncaughtException."); + log.error(throwable.getMessage(), throwable); }) + .setDaemon(true) + .setNameFormat("alerter-worker-%d") + .build(); + workerExecutor = new ThreadPoolExecutor(6, + 10, + 10, + TimeUnit.SECONDS, + new SynchronousQueue<>(), + threadFactory, + new ThreadPoolExecutor.AbortPolicy()); + } + + /** + * 运行alerter任务 + * @param runnable 任务 + * @throws RejectedExecutionException when 线程池满 + */ + public void executeJob(Runnable runnable) throws RejectedExecutionException { + workerExecutor.execute(runnable); + } +} diff --git a/alerter/src/main/java/com/usthe/alert/calculate/CalculateAlarm.java b/alerter/src/main/java/com/usthe/alert/calculate/CalculateAlarm.java new file mode 100644 index 0000000..b81d7ea --- /dev/null +++ b/alerter/src/main/java/com/usthe/alert/calculate/CalculateAlarm.java @@ -0,0 +1,153 @@ +package com.usthe.alert.calculate; + +import com.googlecode.aviator.AviatorEvaluator; +import com.googlecode.aviator.Expression; +import com.usthe.alert.AlerterProperties; +import com.usthe.alert.AlerterWorkerPool; +import com.usthe.alert.AlerterDataQueue; +import com.usthe.alert.entrance.KafkaDataConsume; +import com.usthe.alert.pojo.entity.Alert; +import com.usthe.alert.pojo.entity.AlertDefine; +import com.usthe.alert.service.AlertDefineService; +import com.usthe.alert.util.AlertTemplateUtil; +import com.usthe.common.entity.message.CollectRep; +import com.usthe.common.util.CommonConstants; +import com.usthe.common.util.CommonUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.AutoConfigureAfter; +import org.springframework.context.annotation.Configuration; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * 根据告警定义规则和采集数据匹配计算告警 + * @author tom + * @date 2021/12/9 14:19 + */ +@Configuration +@AutoConfigureAfter(value = {KafkaDataConsume.class}) +@Slf4j +public class CalculateAlarm { + + private AlerterWorkerPool workerPool; + private AlerterDataQueue dataQueue; + private AlertDefineService alertDefineService; + + public CalculateAlarm (AlerterProperties properties, AlerterWorkerPool workerPool, + AlerterDataQueue dataQueue, AlertDefineService alertDefineService) { + this.workerPool = workerPool; + this.dataQueue = dataQueue; + this.alertDefineService = alertDefineService; + startCalculate(); + } + + private void startCalculate() { + Runnable runnable = () -> { + while (!Thread.currentThread().isInterrupted()) { + try { + CollectRep.MetricsData metricsData = dataQueue.pollMetricsData(); + if (metricsData != null) { + calculate(metricsData); + } + } catch (InterruptedException e) { + log.error(e.getMessage()); + } + } + }; + workerPool.executeJob(runnable); + workerPool.executeJob(runnable); + workerPool.executeJob(runnable); + } + + private void calculate(CollectRep.MetricsData metricsData) { + long monitorId = metricsData.getId(); + String app = metricsData.getApp(); + String metrics = metricsData.getMetrics(); + // 先判断采集响应数据状态 UN_REACHABLE/UN_CONNECTABLE 则需发最高级别告警 + if (metricsData.getCode() != CollectRep.Code.SUCCESS) { + // 采集异常 + if (metricsData.getCode() == CollectRep.Code.UN_REACHABLE + || metricsData.getCode() == CollectRep.Code.UN_CONNECTABLE) { + // 连接型可用性异常 UN_REACHABLE 对端不可达(网络层icmp) UN_CONNECTABLE 对端连接失败(传输层tcp,udp) + Alert alert = Alert.builder() + .monitorId(monitorId) + .priority((byte) 0) + .status((byte) 0) + .target(CommonConstants.AVAILABLE) + .duration(300) + .content("监控紧急可用性告警: " + metricsData.getCode().name()) + .build(); + dataQueue.addAlertData(alert); + } + return; + } + // 查出此监控类型下的此指标集合下关联配置的告警定义信息 + // field - define[] + Map> defineMap = alertDefineService.getAlertDefines(monitorId, app, metrics); + if (defineMap == null || defineMap.isEmpty()) { + return; + } + List fields = metricsData.getFieldsList(); + Map fieldValueMap = new HashMap<>(16); + fieldValueMap.put("app", app); + fieldValueMap.put("metric", metrics); + for (CollectRep.ValueRow valueRow : metricsData.getValuesList()) { + if (!valueRow.getColumnsList().isEmpty()) { + String instance = valueRow.getInstance(); + if (!"".equals(instance)) { + fieldValueMap.put("instance", instance); + } else { + fieldValueMap.remove("instance"); + } + for (int index = 0; index < valueRow.getColumnsList().size(); index++) { + String valueStr = valueRow.getColumns(index); + CollectRep.Field field = fields.get(index); + if (field.getType() == CommonConstants.TYPE_NUMBER) { + Double doubleValue = CommonUtil.parseDoubleStr(valueStr); + if (doubleValue != null) { + fieldValueMap.put(field.getName(), doubleValue); + } else { + fieldValueMap.remove(field.getName()); + } + } else { + if (!"".equals(valueStr)) { + fieldValueMap.put(field.getName(), valueStr); + } else { + fieldValueMap.remove(field.getName()); + } + } + } + for (Map.Entry> entry : defineMap.entrySet()) { + List defines = entry.getValue(); + for (AlertDefine define : defines) { + String expr = define.getExpr(); + try { + Expression expression = AviatorEvaluator.compile(expr, true); + Boolean match = (Boolean) expression.execute(fieldValueMap); + if (match) { + // 阈值规则匹配,触发告警 todo 告警延迟delay参数实现 + Alert alert = Alert.builder() + .monitorId(monitorId) + .priority(define.getPriority()) + .status((byte) 0) + .target(app + "." + metrics + "." + define.getField()) + .duration(define.getDuration()) + // 模板中关键字匹配替换 + .content(AlertTemplateUtil.render(define.getTemplate(), fieldValueMap)) + .build(); + dataQueue.addAlertData(alert); + // 此优先级以下的阈值规则则忽略 + break; + } + } catch (Exception e) { + log.warn(e.getMessage()); + } + } + } + + } + } + } +} diff --git a/alerter/src/main/java/com/usthe/alert/controller/AlertDefineController.java b/alerter/src/main/java/com/usthe/alert/controller/AlertDefineController.java index 7f613ae..e277182 100644 --- a/alerter/src/main/java/com/usthe/alert/controller/AlertDefineController.java +++ b/alerter/src/main/java/com/usthe/alert/controller/AlertDefineController.java @@ -21,7 +21,7 @@ import javax.validation.Valid; import java.util.Map; -import static com.usthe.common.util.CommonConstants.MONITOR_NOT_EXIST; +import static com.usthe.common.util.CommonConstants.MONITOR_NOT_EXIST_CODE; import static org.springframework.http.MediaType.APPLICATION_JSON_VALUE; /** @@ -63,7 +63,7 @@ public class AlertDefineController { AlertDefine alertDefine = alertDefineService.getAlertDefine(id); Message.MessageBuilder messageBuilder = Message.builder(); if (alertDefine == null) { - messageBuilder.code(MONITOR_NOT_EXIST).msg("AlertDefine not exist."); + messageBuilder.code(MONITOR_NOT_EXIST_CODE).msg("AlertDefine not exist."); } else { messageBuilder.data(alertDefine); } diff --git a/alerter/src/main/java/com/usthe/alert/dao/AlertDefineDao.java b/alerter/src/main/java/com/usthe/alert/dao/AlertDefineDao.java index 9b20ff5..8f0adf4 100644 --- a/alerter/src/main/java/com/usthe/alert/dao/AlertDefineDao.java +++ b/alerter/src/main/java/com/usthe/alert/dao/AlertDefineDao.java @@ -3,7 +3,10 @@ package com.usthe.alert.dao; import com.usthe.alert.pojo.entity.AlertDefine; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.JpaSpecificationExecutor; +import org.springframework.data.jpa.repository.Query; +import org.springframework.data.repository.query.Param; +import java.util.List; import java.util.Set; /** @@ -18,4 +21,15 @@ public interface AlertDefineDao extends JpaRepository, JpaSpe * @param alertDefineIds 告警定义ID列表 */ void deleteAllByIdIn(Set alertDefineIds); + + /** + * 根据监控ID查询与之关联的告警定义列表 + * @param monitorId 监控ID + * @param metrics 指标组 + * @return 告警定义列表 + */ + @Query("select define from AlertDefine define join AlertDefineBind bind on bind.alertDefineId = define.id " + + "where bind.monitorId = :monitorId and define.metric = :metrics") + List queryAlertDefinesByMonitor(@Param(value = "monitorId") Long monitorId, + @Param(value = "metrics") String metrics); } diff --git a/alerter/src/main/java/com/usthe/alert/entrance/KafkaDataConsume.java b/alerter/src/main/java/com/usthe/alert/entrance/KafkaDataConsume.java new file mode 100644 index 0000000..ef16143 --- /dev/null +++ b/alerter/src/main/java/com/usthe/alert/entrance/KafkaDataConsume.java @@ -0,0 +1,80 @@ +package com.usthe.alert.entrance; + +import com.usthe.alert.AlerterProperties; +import com.usthe.alert.AlerterWorkerPool; +import com.usthe.alert.AlerterDataQueue; +import com.usthe.common.entity.message.CollectRep; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.boot.autoconfigure.AutoConfigureAfter; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Configuration; + +import java.time.Duration; +import java.util.Collections; +import java.util.Properties; + +/** + * 从Kafka消费指标组采集数据处理 + * @author tom + * @date 2021/11/24 18:03 + */ +@Configuration +@AutoConfigureAfter(value = {AlerterProperties.class}) +@ConditionalOnProperty(prefix = "alerter.entrance.kafka", + name = "enabled", havingValue = "true", matchIfMissing = true) +@Slf4j +public class KafkaDataConsume implements DisposableBean { + + private KafkaConsumer consumer; + private AlerterWorkerPool workerPool; + private AlerterDataQueue dataQueue; + public KafkaDataConsume(AlerterProperties properties, AlerterWorkerPool workerPool, + AlerterDataQueue dataQueue) { + this.workerPool = workerPool; + this.dataQueue = dataQueue; + initConsumer(properties); + startConsumeData(); + } + + private void startConsumeData() { + Runnable runnable = () -> { + Thread.currentThread().setName("warehouse-kafka-data-consumer"); + while (!Thread.currentThread().isInterrupted()) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + records.forEach(record -> { + dataQueue.addMetricsData(record.value()); + }); + } + }; + workerPool.executeJob(runnable); + } + + private void initConsumer(AlerterProperties properties) { + if (properties == null || properties.getEntrance() == null || properties.getEntrance().getKafka() == null) { + log.error("init error, please config Warehouse kafka props in application.yml"); + throw new IllegalArgumentException("please config Warehouse kafka props"); + } + AlerterProperties.EntranceProperties.KafkaProperties kafkaProp = properties.getEntrance().getKafka(); + Properties consumerProp = new Properties(); + consumerProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProp.getServers()); + consumerProp.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProp.getGroupId()); + consumerProp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class); + consumerProp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaMetricsDataDeserializer.class); + consumerProp.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); + consumerProp.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000); + consumer = new KafkaConsumer<>(consumerProp); + consumer.subscribe(Collections.singleton(kafkaProp.getTopic())); + } + + @Override + public void destroy() throws Exception { + if (consumer != null) { + consumer.close(); + } + } +} diff --git a/alerter/src/main/java/com/usthe/alert/entrance/KafkaMetricsDataDeserializer.java b/alerter/src/main/java/com/usthe/alert/entrance/KafkaMetricsDataDeserializer.java new file mode 100644 index 0000000..c0ab7d6 --- /dev/null +++ b/alerter/src/main/java/com/usthe/alert/entrance/KafkaMetricsDataDeserializer.java @@ -0,0 +1,24 @@ +package com.usthe.alert.entrance; + +import com.usthe.common.entity.message.CollectRep; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.serialization.Deserializer; + +/** + * MetricsData的反序列化 + * @author tom + * @date 2021/11/24 17:29 + */ +@Slf4j +public class KafkaMetricsDataDeserializer implements Deserializer { + + @Override + public CollectRep.MetricsData deserialize(String topicName, byte[] bytes) { + try { + return CollectRep.MetricsData.parseFrom(bytes); + } catch (Exception e) { + log.error(e.getMessage(), e); + } + return null; + } +} diff --git a/alerter/src/main/java/com/usthe/alert/pojo/entity/Alert.java b/alerter/src/main/java/com/usthe/alert/pojo/entity/Alert.java new file mode 100644 index 0000000..999dc1a --- /dev/null +++ b/alerter/src/main/java/com/usthe/alert/pojo/entity/Alert.java @@ -0,0 +1,84 @@ +package com.usthe.alert.pojo.entity; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.hibernate.validator.constraints.Length; + +import javax.persistence.Column; +import javax.persistence.Entity; +import javax.persistence.GeneratedValue; +import javax.persistence.GenerationType; +import javax.persistence.Id; +import javax.persistence.Table; +import javax.validation.constraints.Max; +import javax.validation.constraints.Min; + +import java.time.LocalDateTime; + +import static io.swagger.annotations.ApiModelProperty.AccessMode.READ_ONLY; +import static io.swagger.annotations.ApiModelProperty.AccessMode.READ_WRITE; + +/** + * 告警记录 + * @author tom + * @date 2021/12/9 15:37 + */ +@Entity +@Table(name = "alert") +@Data +@Builder +@AllArgsConstructor +@NoArgsConstructor +@ApiModel(description = "告警记录实体") +public class Alert { + + @Id + @GeneratedValue(strategy = GenerationType.IDENTITY) + @ApiModelProperty(value = "告警记录实体主键索引ID", example = "87584674384", accessMode = READ_ONLY, position = 0) + private Long id; + + @ApiModelProperty(value = "告警监控对象ID", example = "87432674336", accessMode = READ_WRITE, position = 1) + private Long monitorId; + + @ApiModelProperty(value = "告警监控对象名称", example = "87432674336", accessMode = READ_WRITE, position = 2) + private String monitorName; + + @ApiModelProperty(value = "告警级别 0:高-emergency-紧急告警-红色 1:中-critical-严重告警-橙色 2:低-warning-警告告警-黄色", + example = "1", accessMode = READ_WRITE, position = 3) + @Min(0) + @Max(2) + private byte priority; + + @ApiModelProperty(value = "告警状态: 0-待发送 1-已发送 2-已过期(已经超过持续时间)", + example = "1", accessMode = READ_WRITE, position = 4) + @Min(0) + @Max(2) + private byte status; + + @ApiModelProperty(value = "告警目标对象: 监控可用性-available 指标-app.metrics.field", + example = "1", accessMode = READ_WRITE, position = 4) + @Min(0) + @Max(2) + private String target; + + @ApiModelProperty(value = "触发告警后持续时间,单位s", example = "60", accessMode = READ_WRITE, position = 7) + @Min(0) + private int duration; + + @ApiModelProperty(value = "告警通知实际内容", example = "linux_192.134.32.1: 534543534 cpu usage high", + accessMode = READ_WRITE, position = 10) + @Length(max = 1024) + private String content; + + /** + * 记录创建时间 + */ + @ApiModelProperty(value = "记录创建时间(毫秒时间戳)", example = "1612198922000", accessMode = READ_ONLY, position = 13) + @Column(insertable = false, updatable = false) + private LocalDateTime gmtCreate; + +} diff --git a/alerter/src/main/java/com/usthe/alert/pojo/entity/AlertDefine.java b/alerter/src/main/java/com/usthe/alert/pojo/entity/AlertDefine.java index 8a6f3ac..c2003ad 100644 --- a/alerter/src/main/java/com/usthe/alert/pojo/entity/AlertDefine.java +++ b/alerter/src/main/java/com/usthe/alert/pojo/entity/AlertDefine.java @@ -56,7 +56,7 @@ public class AlertDefine { @ApiModelProperty(value = "是否是默认预置告警", example = "false", accessMode = READ_WRITE, position = 4) private boolean preset; - @ApiModelProperty(value = "告警触发条件表达式", example = "usage>90", accessMode = READ_WRITE, position = 5) + @ApiModelProperty(value = "告警阈值触发条件表达式", example = "usage>90", accessMode = READ_WRITE, position = 5) @Length(max = 1024) private String expr; @@ -80,7 +80,7 @@ public class AlertDefine { @ApiModelProperty(value = "告警通知内容", example = "linux {monitor_name}: {monitor_id} cpu usage high", accessMode = READ_WRITE, position = 10) @Length(max = 1024) - private String content; + private String template; /** * 此条记录创建者 diff --git a/alerter/src/main/java/com/usthe/alert/service/AlertDefineService.java b/alerter/src/main/java/com/usthe/alert/service/AlertDefineService.java index 0ac6fab..021e51a 100644 --- a/alerter/src/main/java/com/usthe/alert/service/AlertDefineService.java +++ b/alerter/src/main/java/com/usthe/alert/service/AlertDefineService.java @@ -5,6 +5,7 @@ import org.springframework.data.domain.Page; import org.springframework.data.domain.PageRequest; import org.springframework.data.jpa.domain.Specification; +import java.util.List; import java.util.Map; import java.util.Set; @@ -74,4 +75,13 @@ public interface AlertDefineService { * @param monitorMap 监控ID-名称 MAP */ void applyBindAlertDefineMonitors(Long alertId, Map monitorMap); + + /** + * 查询与此监控ID关联的指定指标组匹配的告警定义 + * @param monitorId 监控ID + * @param app 监控类型 + * @param metrics 指标组 + * @return field - define[] + */ + Map> getAlertDefines(long monitorId, String app, String metrics); } diff --git a/alerter/src/main/java/com/usthe/alert/service/impl/AlertDefineServiceImpl.java b/alerter/src/main/java/com/usthe/alert/service/impl/AlertDefineServiceImpl.java index 20d4cce..c340805 100644 --- a/alerter/src/main/java/com/usthe/alert/service/impl/AlertDefineServiceImpl.java +++ b/alerter/src/main/java/com/usthe/alert/service/impl/AlertDefineServiceImpl.java @@ -13,6 +13,7 @@ import org.springframework.data.jpa.domain.Specification; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; +import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Optional; @@ -84,4 +85,15 @@ public class AlertDefineServiceImpl implements AlertDefineService { .collect(Collectors.toList()); alertDefineBindDao.saveAll(alertDefineBinds); } + + @Override + public Map> getAlertDefines(long monitorId, String app, String metrics) { + List defines = alertDefineDao.queryAlertDefinesByMonitor(monitorId, metrics); + if (defines == null || defines.isEmpty()) { + return null; + } + // 将告警阈值定义从告警级别0-3数字升序排序,数字越小告警基本越高,即从最高的告警阈值开始匹配计算 + return defines.stream().sorted(Comparator.comparing(AlertDefine::getPriority)) + .collect(Collectors.groupingBy(AlertDefine::getField)); + } } diff --git a/alerter/src/main/java/com/usthe/alert/util/AlertTemplateUtil.java b/alerter/src/main/java/com/usthe/alert/util/AlertTemplateUtil.java new file mode 100644 index 0000000..f18a86a --- /dev/null +++ b/alerter/src/main/java/com/usthe/alert/util/AlertTemplateUtil.java @@ -0,0 +1,39 @@ +package com.usthe.alert.util; + +import lombok.extern.slf4j.Slf4j; + +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * 告警模版关键字匹配替换引擎工具 + * @author tom + * @date 2021/12/10 11:53 + */ +@Slf4j +public class AlertTemplateUtil { + + /** + * 匹配 ${key} 的变量 + * eg: Alert, the instance: ${instance} metrics: ${metrics} is over flow. + */ + private static final Pattern PATTERN = Pattern.compile("\\$\\{(\\w+)\\}"); + + public static String render(String template, Map replaceData) { + try { + Matcher matcher = PATTERN.matcher(template); + StringBuffer buffer = new StringBuffer(); + while (matcher.find()) { + Object objectValue = replaceData.getOrDefault(matcher.group(1), "NullValue"); + String value = objectValue.toString(); + matcher.appendReplacement(buffer, value); + } + matcher.appendTail(buffer); + return buffer.toString(); + } catch (Exception e) { + log.error(e.getMessage(), e); + return template; + } + } +} diff --git a/alerter/src/main/resources/META-INF/spring.factories b/alerter/src/main/resources/META-INF/spring.factories index 750462c..c06e47e 100644 --- a/alerter/src/main/resources/META-INF/spring.factories +++ b/alerter/src/main/resources/META-INF/spring.factories @@ -1,3 +1,9 @@ org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ com.usthe.alert.service.impl.AlertDefineServiceImpl,\ -com.usthe.alert.controller.AlertDefineController \ No newline at end of file +com.usthe.alert.controller.AlertDefineController,\ +com.usthe.alert.AlerterWorkerPool,\ +com.usthe.alert.AlerterProperties,\ +com.usthe.alert.AlerterDataQueue,\ +com.usthe.alert.entrance.KafkaDataConsume,\ +com.usthe.alert.AlerterConfiguration,\ +com.usthe.alert.calculate.CalculateAlarm \ No newline at end of file diff --git a/common/src/main/java/com/usthe/common/entity/dto/Message.java b/common/src/main/java/com/usthe/common/entity/dto/Message.java index d1fb406..6738d85 100644 --- a/common/src/main/java/com/usthe/common/entity/dto/Message.java +++ b/common/src/main/java/com/usthe/common/entity/dto/Message.java @@ -7,7 +7,7 @@ import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; -import static com.usthe.common.util.CommonConstants.SUCCESS; +import static com.usthe.common.util.CommonConstants.SUCCESS_CODE; /** * Unified message structure definition for front and back ends @@ -43,7 +43,7 @@ public class Message { * response code, not http code */ @ApiModelProperty(value = "携带编码", position = 2) - private byte code = SUCCESS; + private byte code = SUCCESS_CODE; public Message(String msg) { this.msg = msg; diff --git a/common/src/main/java/com/usthe/common/util/CommonConstants.java b/common/src/main/java/com/usthe/common/util/CommonConstants.java index 4fd7b49..6356c35 100644 --- a/common/src/main/java/com/usthe/common/util/CommonConstants.java +++ b/common/src/main/java/com/usthe/common/util/CommonConstants.java @@ -10,57 +10,57 @@ public interface CommonConstants { /** * 响应状态码: 成功 */ - byte SUCCESS = 0x00; + byte SUCCESS_CODE = 0x00; /** * 响应状态码: 参数校验失败 */ - byte PARAM_INVALID = 0x01; + byte PARAM_INVALID_CODE = 0x01; /** * 响应状态码: 探测失败 */ - byte DETECT_FAILED = 0x02; + byte DETECT_FAILED_CODE = 0x02; /** * 响应状态码: 监控不存在 */ - byte MONITOR_NOT_EXIST = 0x03; + byte MONITOR_NOT_EXIST_CODE = 0x03; /** * 响应状态码: 监控服务冲突 */ - byte MONITOR_CONFLICT = 0x04; + byte MONITOR_CONFLICT_CODE = 0x04; /** * 响应状态码: 登陆账户密码错误 */ - byte MONITOR_LOGIN_FAILED = 0x05; + byte MONITOR_LOGIN_FAILED_CODE = 0x05; /** * 监控状态码: 未管理 */ - byte UN_MANAGE = 0x00; + byte UN_MANAGE_CODE = 0x00; /** * 监控状态码: 可用 */ - byte AVAILABLE = 0x01; + byte AVAILABLE_CODE = 0x01; /** * 监控状态码: 不可用 */ - byte UN_AVAILABLE = 0x02; + byte UN_AVAILABLE_CODE = 0x02; /** * 监控状态码: 不可达 */ - byte UN_REACHABLE = 0x03; + byte UN_REACHABLE_CODE = 0x03; /** * 监控状态码: 挂起 */ - byte SUSPENDING = 0x04; + byte SUSPENDING_CODE = 0x04; /** @@ -82,4 +82,9 @@ public interface CommonConstants { * 采集指标值:null空值占位符 */ String NULL_VALUE = " "; + + /** + * 可用性对象 + */ + String AVAILABLE = "available"; } diff --git a/manager/src/main/java/com/usthe/manager/controller/AccountController.java b/manager/src/main/java/com/usthe/manager/controller/AccountController.java index 03f842a..6a3578a 100644 --- a/manager/src/main/java/com/usthe/manager/controller/AccountController.java +++ b/manager/src/main/java/com/usthe/manager/controller/AccountController.java @@ -21,7 +21,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; -import static com.usthe.common.util.CommonConstants.MONITOR_LOGIN_FAILED; +import static com.usthe.common.util.CommonConstants.MONITOR_LOGIN_FAILED_CODE; import static org.springframework.http.MediaType.APPLICATION_JSON_VALUE; /** @@ -53,7 +53,7 @@ public class AccountController { SurenessAccount account = accountProvider.loadAccount(identifier); if (account == null || account.getPassword() == null) { Message message = Message.builder().msg("账户密码错误") - .code(MONITOR_LOGIN_FAILED).build(); + .code(MONITOR_LOGIN_FAILED_CODE).build(); return ResponseEntity.ok(message); } else { if (account.getSalt() != null) { @@ -61,12 +61,12 @@ public class AccountController { } if (!account.getPassword().equals(password)) { Message message = Message.builder().msg("账户密码错误") - .code(MONITOR_LOGIN_FAILED).build(); + .code(MONITOR_LOGIN_FAILED_CODE).build(); return ResponseEntity.ok(message); } if (account.isDisabledAccount() || account.isExcessiveAttempts()) { Message message = Message.builder().msg("账户过期或被锁定") - .code(MONITOR_LOGIN_FAILED).build(); + .code(MONITOR_LOGIN_FAILED_CODE).build(); return ResponseEntity.ok(message); } } diff --git a/manager/src/main/java/com/usthe/manager/controller/MonitorController.java b/manager/src/main/java/com/usthe/manager/controller/MonitorController.java index 755b2b1..9b922e7 100644 --- a/manager/src/main/java/com/usthe/manager/controller/MonitorController.java +++ b/manager/src/main/java/com/usthe/manager/controller/MonitorController.java @@ -19,7 +19,7 @@ import org.springframework.web.bind.annotation.RestController; import javax.validation.Valid; -import static com.usthe.common.util.CommonConstants.MONITOR_NOT_EXIST; +import static com.usthe.common.util.CommonConstants.MONITOR_NOT_EXIST_CODE; import static org.springframework.http.MediaType.APPLICATION_JSON_VALUE; /** @@ -69,7 +69,7 @@ public class MonitorController { MonitorDto monitorDto = monitorService.getMonitor(id); Message.MessageBuilder messageBuilder = Message.builder(); if (monitorDto == null) { - messageBuilder.code(MONITOR_NOT_EXIST).msg("Monitor not exist."); + messageBuilder.code(MONITOR_NOT_EXIST_CODE).msg("Monitor not exist."); } else { messageBuilder.data(monitorDto); } diff --git a/manager/src/main/java/com/usthe/manager/service/impl/MonitorServiceImpl.java b/manager/src/main/java/com/usthe/manager/service/impl/MonitorServiceImpl.java index a10e8c8..487b956 100644 --- a/manager/src/main/java/com/usthe/manager/service/impl/MonitorServiceImpl.java +++ b/manager/src/main/java/com/usthe/manager/service/impl/MonitorServiceImpl.java @@ -105,7 +105,7 @@ public class MonitorServiceImpl implements MonitorService { try { monitor.setId(monitorId); monitor.setJobId(jobId); - monitor.setStatus(CommonConstants.AVAILABLE); + monitor.setStatus(CommonConstants.AVAILABLE_CODE); monitorDao.save(monitor); paramDao.saveAll(params); } catch (Exception e) { @@ -292,8 +292,8 @@ public class MonitorServiceImpl implements MonitorService { // jobId不删除 待启动纳管之后再次复用jobId List managedMonitors = monitorDao.findMonitorsByIdIn(ids) .stream().filter(monitor -> - monitor.getStatus() != CommonConstants.UN_MANAGE && monitor.getJobId() != null) - .peek(monitor -> monitor.setStatus(CommonConstants.UN_MANAGE)) + monitor.getStatus() != CommonConstants.UN_MANAGE_CODE && monitor.getJobId() != null) + .peek(monitor -> monitor.setStatus(CommonConstants.UN_MANAGE_CODE)) .collect(Collectors.toList()); if (!managedMonitors.isEmpty()) { monitorDao.saveAll(managedMonitors); @@ -308,8 +308,8 @@ public class MonitorServiceImpl implements MonitorService { // 更新监控状态 新增对应的监控周期性任务 List unManagedMonitors = monitorDao.findMonitorsByIdIn(ids) .stream().filter(monitor -> - monitor.getStatus() == CommonConstants.UN_MANAGE && monitor.getJobId() != null) - .peek(monitor -> monitor.setStatus(CommonConstants.AVAILABLE)) + monitor.getStatus() == CommonConstants.UN_MANAGE_CODE && monitor.getJobId() != null) + .peek(monitor -> monitor.setStatus(CommonConstants.AVAILABLE_CODE)) .collect(Collectors.toList()); if (!unManagedMonitors.isEmpty()) { monitorDao.saveAll(unManagedMonitors); diff --git a/manager/src/main/java/com/usthe/manager/support/GlobalExceptionHandler.java b/manager/src/main/java/com/usthe/manager/support/GlobalExceptionHandler.java index 777061c..271234c 100644 --- a/manager/src/main/java/com/usthe/manager/support/GlobalExceptionHandler.java +++ b/manager/src/main/java/com/usthe/manager/support/GlobalExceptionHandler.java @@ -20,9 +20,9 @@ import org.springframework.web.bind.annotation.RestControllerAdvice; import java.lang.reflect.Field; -import static com.usthe.common.util.CommonConstants.DETECT_FAILED; -import static com.usthe.common.util.CommonConstants.MONITOR_CONFLICT; -import static com.usthe.common.util.CommonConstants.PARAM_INVALID; +import static com.usthe.common.util.CommonConstants.DETECT_FAILED_CODE; +import static com.usthe.common.util.CommonConstants.MONITOR_CONFLICT_CODE; +import static com.usthe.common.util.CommonConstants.PARAM_INVALID_CODE; /** * controller exception handler @@ -54,7 +54,7 @@ public class GlobalExceptionHandler { @ExceptionHandler(MonitorDetectException.class) @ResponseBody ResponseEntity> handleMonitorDetectException(MonitorDetectException exception) { - Message message = Message.builder().msg(exception.getMessage()).code(DETECT_FAILED).build(); + Message message = Message.builder().msg(exception.getMessage()).code(DETECT_FAILED_CODE).build(); return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(message); } @@ -66,7 +66,7 @@ public class GlobalExceptionHandler { @ExceptionHandler(MonitorDatabaseException.class) @ResponseBody ResponseEntity> handleMonitorDatabaseException(MonitorDatabaseException exception) { - Message message = Message.builder().msg(exception.getMessage()).code(MONITOR_CONFLICT).build(); + Message message = Message.builder().msg(exception.getMessage()).code(MONITOR_CONFLICT_CODE).build(); return ResponseEntity.status(HttpStatus.CONFLICT).body(message); } @@ -78,7 +78,7 @@ public class GlobalExceptionHandler { @ExceptionHandler(IllegalArgumentException.class) @ResponseBody ResponseEntity> handleIllegalArgumentException(IllegalArgumentException exception) { - Message message = Message.builder().msg(exception.getMessage()).code(PARAM_INVALID).build(); + Message message = Message.builder().msg(exception.getMessage()).code(PARAM_INVALID_CODE).build(); return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(message); } @@ -91,10 +91,10 @@ public class GlobalExceptionHandler { @ResponseBody ResponseEntity> handleHttpMessageNotReadableException(HttpMessageNotReadableException exception) { try { - Message message = Message.builder().msg((String) detailMessage.get(exception)).code(PARAM_INVALID).build(); + Message message = Message.builder().msg((String) detailMessage.get(exception)).code(PARAM_INVALID_CODE).build(); return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(message); } catch (Exception e) { - Message message = Message.builder().msg(exception.getMessage()).code(PARAM_INVALID).build(); + Message message = Message.builder().msg(exception.getMessage()).code(PARAM_INVALID_CODE).build(); return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(message); } } @@ -129,7 +129,7 @@ public class GlobalExceptionHandler { if (log.isDebugEnabled()) { log.debug("[input argument not valid happen]-{}", errorMessage, e); } - Message message = Message.builder().msg(errorMessage.toString()).code(PARAM_INVALID).build(); + Message message = Message.builder().msg(errorMessage.toString()).code(PARAM_INVALID_CODE).build(); return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(message); } @@ -146,7 +146,7 @@ public class GlobalExceptionHandler { errorMessage = exception.getMessage(); } log.warn("[scheduler warning]-{}", errorMessage); - Message message = Message.builder().msg(errorMessage).code(MONITOR_CONFLICT).build(); + Message message = Message.builder().msg(errorMessage).code(MONITOR_CONFLICT_CODE).build(); return ResponseEntity.status(HttpStatus.CONFLICT).body(message); } @@ -163,7 +163,7 @@ public class GlobalExceptionHandler { errorMessage = exception.getMessage(); } log.warn("[database error happen]-{}", errorMessage, exception); - Message message = Message.builder().msg(errorMessage).code(MONITOR_CONFLICT).build(); + Message message = Message.builder().msg(errorMessage).code(MONITOR_CONFLICT_CODE).build(); return ResponseEntity.status(HttpStatus.CONFLICT).body(message); } @@ -197,7 +197,7 @@ public class GlobalExceptionHandler { errorMessage = exception.getMessage(); } log.error("[monitor]-[unknown error happen]-{}", errorMessage, exception); - Message message = Message.builder().msg(errorMessage).code(MONITOR_CONFLICT).build(); + Message message = Message.builder().msg(errorMessage).code(MONITOR_CONFLICT_CODE).build(); return ResponseEntity.status(HttpStatus.CONFLICT).body(message); } diff --git a/manager/src/main/resources/db/schema.sql b/manager/src/main/resources/db/schema.sql index 4ce10f2..86e11fc 100644 --- a/manager/src/main/resources/db/schema.sql +++ b/manager/src/main/resources/db/schema.sql @@ -80,7 +80,7 @@ CREATE TABLE alert_define duration int not null comment '触发告警后持续时间,单位s', enable boolean not null default true comment '告警触发后是否发送', delay int not null comment '告警延迟时间,即延迟多久再发送告警,单位s', - content varchar(255) not null comment '告警通知内容', + template varchar(255) not null comment '告警通知模板内容', creator varchar(100) comment '创建者', modifier varchar(100) comment '最新修改者', gmt_create timestamp default current_timestamp comment 'create time', @@ -101,7 +101,7 @@ CREATE TABLE alert_define_monitor_bind gmt_create timestamp default current_timestamp comment 'create time', gmt_update datetime default current_timestamp on update current_timestamp comment 'update time', primary key (id), - unique key unique_bind (alert_define_id, monitor_id) + index index_bind (alert_define_id, monitor_id) ) ENGINE = InnoDB DEFAULT CHARSET=utf8mb4;