CalculateAlarm.java 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. package com.usthe.alert.calculate;
  2. import com.googlecode.aviator.AviatorEvaluator;
  3. import com.googlecode.aviator.Expression;
  4. import com.usthe.alert.AlerterProperties;
  5. import com.usthe.alert.AlerterWorkerPool;
  6. import com.usthe.alert.AlerterDataQueue;
  7. import com.usthe.alert.entrance.KafkaDataConsume;
  8. import com.usthe.alert.pojo.entity.Alert;
  9. import com.usthe.alert.pojo.entity.AlertDefine;
  10. import com.usthe.alert.service.AlertDefineService;
  11. import com.usthe.alert.util.AlertTemplateUtil;
  12. import com.usthe.common.entity.message.CollectRep;
  13. import com.usthe.common.util.CommonConstants;
  14. import com.usthe.common.util.CommonUtil;
  15. import lombok.extern.slf4j.Slf4j;
  16. import org.springframework.boot.autoconfigure.AutoConfigureAfter;
  17. import org.springframework.context.annotation.Configuration;
  18. import java.util.HashMap;
  19. import java.util.List;
  20. import java.util.Map;
  21. import java.util.concurrent.ConcurrentHashMap;
  22. /**
  23. * 根据告警定义规则和采集数据匹配计算告警
  24. * @author tom
  25. * @date 2021/12/9 14:19
  26. */
  27. @Configuration
  28. @AutoConfigureAfter(value = {KafkaDataConsume.class})
  29. @Slf4j
  30. public class CalculateAlarm {
  31. private AlerterWorkerPool workerPool;
  32. private AlerterDataQueue dataQueue;
  33. private AlertDefineService alertDefineService;
  34. private Map<String, Alert> triggeredAlertMap;
  35. public CalculateAlarm (AlerterProperties properties, AlerterWorkerPool workerPool,
  36. AlerterDataQueue dataQueue, AlertDefineService alertDefineService) {
  37. this.workerPool = workerPool;
  38. this.dataQueue = dataQueue;
  39. this.alertDefineService = alertDefineService;
  40. this.triggeredAlertMap = new ConcurrentHashMap<>(128);
  41. startCalculate();
  42. }
  43. private void startCalculate() {
  44. Runnable runnable = () -> {
  45. while (!Thread.currentThread().isInterrupted()) {
  46. try {
  47. CollectRep.MetricsData metricsData = dataQueue.pollMetricsData();
  48. if (metricsData != null) {
  49. calculate(metricsData);
  50. }
  51. } catch (InterruptedException e) {
  52. log.error(e.getMessage());
  53. }
  54. }
  55. };
  56. workerPool.executeJob(runnable);
  57. workerPool.executeJob(runnable);
  58. workerPool.executeJob(runnable);
  59. }
  60. private void calculate(CollectRep.MetricsData metricsData) {
  61. long monitorId = metricsData.getId();
  62. String app = metricsData.getApp();
  63. String metrics = metricsData.getMetrics();
  64. // 先判断采集响应数据状态 UN_REACHABLE/UN_CONNECTABLE 则需发最高级别告警
  65. if (metricsData.getCode() != CollectRep.Code.SUCCESS) {
  66. // 采集异常
  67. Alert.AlertBuilder alertBuilder = Alert.builder()
  68. .monitorId(monitorId)
  69. .priority((byte) 0)
  70. .status((byte) 0)
  71. .times(1);
  72. if (metricsData.getCode() == CollectRep.Code.UN_REACHABLE) {
  73. // UN_REACHABLE 对端不可达(网络层icmp)
  74. alertBuilder.target(CommonConstants.REACHABLE)
  75. .content("监控紧急可达性告警: " + metricsData.getCode().name());
  76. dataQueue.addAlertData(alertBuilder.build());
  77. } else if (metricsData.getCode() == CollectRep.Code.UN_CONNECTABLE) {
  78. // UN_CONNECTABLE 对端连接失败(传输层tcp,udp)
  79. alertBuilder.target(CommonConstants.AVAILABLE)
  80. .content("监控紧急可用性告警: " + metricsData.getCode().name());
  81. dataQueue.addAlertData(alertBuilder.build());
  82. } else {
  83. // todo 其它规范异常 TIMEOUT ...
  84. return;
  85. }
  86. return;
  87. }
  88. // 查出此监控类型下的此指标集合下关联配置的告警定义信息
  89. // field - define[]
  90. Map<String, List<AlertDefine>> defineMap = alertDefineService.getMonitorBindAlertDefines(monitorId, app, metrics);
  91. if (defineMap == null || defineMap.isEmpty()) {
  92. return;
  93. }
  94. List<CollectRep.Field> fields = metricsData.getFieldsList();
  95. Map<String, Object> fieldValueMap = new HashMap<>(16);
  96. fieldValueMap.put("app", app);
  97. fieldValueMap.put("metric", metrics);
  98. for (CollectRep.ValueRow valueRow : metricsData.getValuesList()) {
  99. if (!valueRow.getColumnsList().isEmpty()) {
  100. String instance = valueRow.getInstance();
  101. if (!"".equals(instance)) {
  102. fieldValueMap.put("instance", instance);
  103. } else {
  104. fieldValueMap.remove("instance");
  105. }
  106. for (int index = 0; index < valueRow.getColumnsList().size(); index++) {
  107. String valueStr = valueRow.getColumns(index);
  108. CollectRep.Field field = fields.get(index);
  109. if (field.getType() == CommonConstants.TYPE_NUMBER) {
  110. Double doubleValue = CommonUtil.parseDoubleStr(valueStr);
  111. if (doubleValue != null) {
  112. fieldValueMap.put(field.getName(), doubleValue);
  113. } else {
  114. fieldValueMap.remove(field.getName());
  115. }
  116. } else {
  117. if (!"".equals(valueStr)) {
  118. fieldValueMap.put(field.getName(), valueStr);
  119. } else {
  120. fieldValueMap.remove(field.getName());
  121. }
  122. }
  123. }
  124. for (Map.Entry<String, List<AlertDefine>> entry : defineMap.entrySet()) {
  125. List<AlertDefine> defines = entry.getValue();
  126. for (AlertDefine define : defines) {
  127. String expr = define.getExpr();
  128. try {
  129. Expression expression = AviatorEvaluator.compile(expr, true);
  130. Boolean match = (Boolean) expression.execute(fieldValueMap);
  131. if (match) {
  132. // 阈值规则匹配,判断已触发阈值次数,触发告警
  133. String monitorAlertKey = String.valueOf(monitorId) + define.getId();
  134. Alert triggeredAlert = triggeredAlertMap.get(monitorAlertKey);
  135. if (triggeredAlert != null) {
  136. int times = triggeredAlert.getTimes() + 1;
  137. triggeredAlert.setTimes(times);
  138. if (times >= define.getTimes()) {
  139. triggeredAlertMap.remove(monitorAlertKey);
  140. dataQueue.addAlertData(triggeredAlert);
  141. }
  142. } else {
  143. int times = 1;
  144. Alert alert = Alert.builder()
  145. .monitorId(monitorId)
  146. .alertDefineId(define.getId())
  147. .priority(define.getPriority())
  148. .status((byte) 0)
  149. .target(app + "." + metrics + "." + define.getField())
  150. .times(times)
  151. // 模板中关键字匹配替换
  152. .content(AlertTemplateUtil.render(define.getTemplate(), fieldValueMap))
  153. .build();
  154. if (times >= define.getTimes()) {
  155. dataQueue.addAlertData(alert);
  156. } else {
  157. triggeredAlertMap.put(monitorAlertKey, alert);
  158. }
  159. }
  160. // 此优先级以下的阈值规则则忽略
  161. break;
  162. }
  163. } catch (Exception e) {
  164. log.warn(e.getMessage());
  165. }
  166. }
  167. }
  168. }
  169. }
  170. }
  171. }