CalculateAlarm.java 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. package com.usthe.alert.calculate;
  2. import com.googlecode.aviator.AviatorEvaluator;
  3. import com.googlecode.aviator.Expression;
  4. import com.usthe.alert.AlerterProperties;
  5. import com.usthe.alert.AlerterWorkerPool;
  6. import com.usthe.alert.AlerterDataQueue;
  7. import com.usthe.alert.entrance.KafkaDataConsume;
  8. import com.usthe.alert.pojo.entity.Alert;
  9. import com.usthe.alert.pojo.entity.AlertDefine;
  10. import com.usthe.alert.service.AlertDefineService;
  11. import com.usthe.alert.util.AlertTemplateUtil;
  12. import com.usthe.common.entity.message.CollectRep;
  13. import com.usthe.common.util.CommonConstants;
  14. import com.usthe.common.util.CommonUtil;
  15. import lombok.extern.slf4j.Slf4j;
  16. import org.springframework.boot.autoconfigure.AutoConfigureAfter;
  17. import org.springframework.context.annotation.Configuration;
  18. import java.util.HashMap;
  19. import java.util.List;
  20. import java.util.Map;
  21. /**
  22. * 根据告警定义规则和采集数据匹配计算告警
  23. * @author tom
  24. * @date 2021/12/9 14:19
  25. */
  26. @Configuration
  27. @AutoConfigureAfter(value = {KafkaDataConsume.class})
  28. @Slf4j
  29. public class CalculateAlarm {
  30. private AlerterWorkerPool workerPool;
  31. private AlerterDataQueue dataQueue;
  32. private AlertDefineService alertDefineService;
  33. public CalculateAlarm (AlerterProperties properties, AlerterWorkerPool workerPool,
  34. AlerterDataQueue dataQueue, AlertDefineService alertDefineService) {
  35. this.workerPool = workerPool;
  36. this.dataQueue = dataQueue;
  37. this.alertDefineService = alertDefineService;
  38. startCalculate();
  39. }
  40. private void startCalculate() {
  41. Runnable runnable = () -> {
  42. while (!Thread.currentThread().isInterrupted()) {
  43. try {
  44. CollectRep.MetricsData metricsData = dataQueue.pollMetricsData();
  45. if (metricsData != null) {
  46. calculate(metricsData);
  47. }
  48. } catch (InterruptedException e) {
  49. log.error(e.getMessage());
  50. }
  51. }
  52. };
  53. workerPool.executeJob(runnable);
  54. workerPool.executeJob(runnable);
  55. workerPool.executeJob(runnable);
  56. }
  57. private void calculate(CollectRep.MetricsData metricsData) {
  58. long monitorId = metricsData.getId();
  59. String app = metricsData.getApp();
  60. String metrics = metricsData.getMetrics();
  61. // 先判断采集响应数据状态 UN_REACHABLE/UN_CONNECTABLE 则需发最高级别告警
  62. if (metricsData.getCode() != CollectRep.Code.SUCCESS) {
  63. // 采集异常
  64. if (metricsData.getCode() == CollectRep.Code.UN_REACHABLE
  65. || metricsData.getCode() == CollectRep.Code.UN_CONNECTABLE) {
  66. // 连接型可用性异常 UN_REACHABLE 对端不可达(网络层icmp) UN_CONNECTABLE 对端连接失败(传输层tcp,udp)
  67. Alert alert = Alert.builder()
  68. .monitorId(monitorId)
  69. .priority((byte) 0)
  70. .status((byte) 0)
  71. .target(CommonConstants.AVAILABLE)
  72. .duration(300)
  73. .content("监控紧急可用性告警: " + metricsData.getCode().name())
  74. .build();
  75. dataQueue.addAlertData(alert);
  76. }
  77. return;
  78. }
  79. // 查出此监控类型下的此指标集合下关联配置的告警定义信息
  80. // field - define[]
  81. Map<String, List<AlertDefine>> defineMap = alertDefineService.getAlertDefines(monitorId, app, metrics);
  82. if (defineMap == null || defineMap.isEmpty()) {
  83. return;
  84. }
  85. List<CollectRep.Field> fields = metricsData.getFieldsList();
  86. Map<String, Object> fieldValueMap = new HashMap<>(16);
  87. fieldValueMap.put("app", app);
  88. fieldValueMap.put("metric", metrics);
  89. for (CollectRep.ValueRow valueRow : metricsData.getValuesList()) {
  90. if (!valueRow.getColumnsList().isEmpty()) {
  91. String instance = valueRow.getInstance();
  92. if (!"".equals(instance)) {
  93. fieldValueMap.put("instance", instance);
  94. } else {
  95. fieldValueMap.remove("instance");
  96. }
  97. for (int index = 0; index < valueRow.getColumnsList().size(); index++) {
  98. String valueStr = valueRow.getColumns(index);
  99. CollectRep.Field field = fields.get(index);
  100. if (field.getType() == CommonConstants.TYPE_NUMBER) {
  101. Double doubleValue = CommonUtil.parseDoubleStr(valueStr);
  102. if (doubleValue != null) {
  103. fieldValueMap.put(field.getName(), doubleValue);
  104. } else {
  105. fieldValueMap.remove(field.getName());
  106. }
  107. } else {
  108. if (!"".equals(valueStr)) {
  109. fieldValueMap.put(field.getName(), valueStr);
  110. } else {
  111. fieldValueMap.remove(field.getName());
  112. }
  113. }
  114. }
  115. for (Map.Entry<String, List<AlertDefine>> entry : defineMap.entrySet()) {
  116. List<AlertDefine> defines = entry.getValue();
  117. for (AlertDefine define : defines) {
  118. String expr = define.getExpr();
  119. try {
  120. Expression expression = AviatorEvaluator.compile(expr, true);
  121. Boolean match = (Boolean) expression.execute(fieldValueMap);
  122. if (match) {
  123. // 阈值规则匹配,触发告警 todo 告警延迟delay参数实现
  124. Alert alert = Alert.builder()
  125. .monitorId(monitorId)
  126. .priority(define.getPriority())
  127. .status((byte) 0)
  128. .target(app + "." + metrics + "." + define.getField())
  129. .duration(define.getDuration())
  130. // 模板中关键字匹配替换
  131. .content(AlertTemplateUtil.render(define.getTemplate(), fieldValueMap))
  132. .build();
  133. dataQueue.addAlertData(alert);
  134. // 此优先级以下的阈值规则则忽略
  135. break;
  136. }
  137. } catch (Exception e) {
  138. log.warn(e.getMessage());
  139. }
  140. }
  141. }
  142. }
  143. }
  144. }
  145. }