CalculateAlarm.java 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. package com.usthe.alert.calculate;
  2. import com.googlecode.aviator.AviatorEvaluator;
  3. import com.googlecode.aviator.Expression;
  4. import com.usthe.alert.AlerterWorkerPool;
  5. import com.usthe.alert.AlerterDataQueue;
  6. import com.usthe.alert.entrance.KafkaDataConsume;
  7. import com.usthe.alert.pojo.entity.Alert;
  8. import com.usthe.alert.pojo.entity.AlertDefine;
  9. import com.usthe.alert.service.AlertDefineService;
  10. import com.usthe.alert.util.AlertTemplateUtil;
  11. import com.usthe.common.entity.message.CollectRep;
  12. import com.usthe.common.util.CommonConstants;
  13. import com.usthe.common.util.CommonUtil;
  14. import lombok.extern.slf4j.Slf4j;
  15. import org.springframework.boot.autoconfigure.AutoConfigureAfter;
  16. import org.springframework.context.annotation.Configuration;
  17. import java.util.HashMap;
  18. import java.util.List;
  19. import java.util.Map;
  20. import java.util.concurrent.ConcurrentHashMap;
  21. /**
  22. * 根据告警定义规则和采集数据匹配计算告警
  23. * @author tom
  24. * @date 2021/12/9 14:19
  25. */
  26. @Configuration
  27. @AutoConfigureAfter(value = {KafkaDataConsume.class})
  28. @Slf4j
  29. public class CalculateAlarm {
  30. private AlerterWorkerPool workerPool;
  31. private AlerterDataQueue dataQueue;
  32. private AlertDefineService alertDefineService;
  33. private Map<String, Alert> triggeredAlertMap;
  34. private Map<Long, CollectRep.Code> triggeredMonitorStateAlertMap;
  35. public CalculateAlarm (AlerterWorkerPool workerPool, AlerterDataQueue dataQueue,
  36. AlertDefineService alertDefineService) {
  37. this.workerPool = workerPool;
  38. this.dataQueue = dataQueue;
  39. this.alertDefineService = alertDefineService;
  40. this.triggeredAlertMap = new ConcurrentHashMap<>(128);
  41. this.triggeredMonitorStateAlertMap = new ConcurrentHashMap<>(128);
  42. startCalculate();
  43. }
  44. private void startCalculate() {
  45. Runnable runnable = () -> {
  46. while (!Thread.currentThread().isInterrupted()) {
  47. try {
  48. CollectRep.MetricsData metricsData = dataQueue.pollMetricsData();
  49. if (metricsData != null) {
  50. calculate(metricsData);
  51. }
  52. } catch (InterruptedException e) {
  53. log.error(e.getMessage());
  54. }
  55. }
  56. };
  57. workerPool.executeJob(runnable);
  58. workerPool.executeJob(runnable);
  59. workerPool.executeJob(runnable);
  60. }
  61. private void calculate(CollectRep.MetricsData metricsData) {
  62. long monitorId = metricsData.getId();
  63. String app = metricsData.getApp();
  64. String metrics = metricsData.getMetrics();
  65. // 先判断调度优先级为0的指标组采集响应数据状态 UN_REACHABLE/UN_CONNECTABLE 则需发最高级别告警进行监控状态变更
  66. if (metricsData.getPriority() == 0) {
  67. if (metricsData.getCode() != CollectRep.Code.SUCCESS) {
  68. // 采集异常
  69. Alert.AlertBuilder alertBuilder = Alert.builder()
  70. .monitorId(monitorId)
  71. .priority((byte) 0)
  72. .status((byte) 0)
  73. .times(1);
  74. if (metricsData.getCode() == CollectRep.Code.UN_REACHABLE) {
  75. // UN_REACHABLE 对端不可达(网络层icmp)
  76. alertBuilder.target(CommonConstants.REACHABLE)
  77. .content("监控紧急可达性告警: " + metricsData.getCode().name());
  78. triggeredMonitorStateAlertMap.put(monitorId, CollectRep.Code.UN_REACHABLE);
  79. dataQueue.addAlertData(alertBuilder.build());
  80. } else if (metricsData.getCode() == CollectRep.Code.UN_CONNECTABLE) {
  81. // UN_CONNECTABLE 对端连接失败(传输层tcp,udp)
  82. alertBuilder.target(CommonConstants.AVAILABLE)
  83. .content("监控紧急可用性告警: " + metricsData.getCode().name());
  84. triggeredMonitorStateAlertMap.put(monitorId, CollectRep.Code.UN_CONNECTABLE);
  85. dataQueue.addAlertData(alertBuilder.build());
  86. } else {
  87. // todo 其它规范异常 TIMEOUT ...
  88. return;
  89. }
  90. return;
  91. } else {
  92. // 判断关联监控之前是否有可用性或者不可达告警,发送恢复告警进行监控状态恢复
  93. CollectRep.Code stateCode = triggeredMonitorStateAlertMap.remove(monitorId);
  94. if (stateCode != null) {
  95. // 发送告警恢复
  96. Alert resumeAlert = Alert.builder()
  97. .monitorId(monitorId).status((byte) 2).build();
  98. dataQueue.addAlertData(resumeAlert);
  99. }
  100. }
  101. }
  102. // 查出此监控类型下的此指标集合下关联配置的告警定义信息
  103. // field - define[]
  104. Map<String, List<AlertDefine>> defineMap = alertDefineService.getMonitorBindAlertDefines(monitorId, app, metrics);
  105. if (defineMap == null || defineMap.isEmpty()) {
  106. return;
  107. }
  108. List<CollectRep.Field> fields = metricsData.getFieldsList();
  109. Map<String, Object> fieldValueMap = new HashMap<>(16);
  110. fieldValueMap.put("app", app);
  111. fieldValueMap.put("metrics", metrics);
  112. for (CollectRep.ValueRow valueRow : metricsData.getValuesList()) {
  113. if (!valueRow.getColumnsList().isEmpty()) {
  114. String instance = valueRow.getInstance();
  115. if (!"".equals(instance)) {
  116. fieldValueMap.put("instance", instance);
  117. } else {
  118. fieldValueMap.remove("instance");
  119. }
  120. for (int index = 0; index < valueRow.getColumnsList().size(); index++) {
  121. String valueStr = valueRow.getColumns(index);
  122. CollectRep.Field field = fields.get(index);
  123. fieldValueMap.put("metric", field.getName());
  124. if (field.getType() == CommonConstants.TYPE_NUMBER) {
  125. Double doubleValue = CommonUtil.parseDoubleStr(valueStr);
  126. if (doubleValue != null) {
  127. fieldValueMap.put(field.getName(), doubleValue);
  128. } else {
  129. fieldValueMap.remove(field.getName());
  130. }
  131. } else {
  132. if (!"".equals(valueStr)) {
  133. fieldValueMap.put(field.getName(), valueStr);
  134. } else {
  135. fieldValueMap.remove(field.getName());
  136. }
  137. }
  138. }
  139. for (Map.Entry<String, List<AlertDefine>> entry : defineMap.entrySet()) {
  140. List<AlertDefine> defines = entry.getValue();
  141. for (AlertDefine define : defines) {
  142. String expr = define.getExpr();
  143. try {
  144. Expression expression = AviatorEvaluator.compile(expr, true);
  145. Boolean match = (Boolean) expression.execute(fieldValueMap);
  146. if (match) {
  147. // 阈值规则匹配,判断已触发阈值次数,触发告警
  148. String monitorAlertKey = String.valueOf(monitorId) + define.getId();
  149. Alert triggeredAlert = triggeredAlertMap.get(monitorAlertKey);
  150. if (triggeredAlert != null) {
  151. int times = triggeredAlert.getTimes() + 1;
  152. triggeredAlert.setTimes(times);
  153. if (times >= define.getTimes()) {
  154. triggeredAlertMap.remove(monitorAlertKey);
  155. dataQueue.addAlertData(triggeredAlert);
  156. }
  157. } else {
  158. int times = 1;
  159. Alert alert = Alert.builder()
  160. .monitorId(monitorId)
  161. .alertDefineId(define.getId())
  162. .priority(define.getPriority())
  163. .status((byte) 0)
  164. .target(app + "." + metrics + "." + define.getField())
  165. .times(times)
  166. // 模板中关键字匹配替换
  167. .content(AlertTemplateUtil.render(define.getTemplate(), fieldValueMap))
  168. .build();
  169. if (times >= define.getTimes()) {
  170. dataQueue.addAlertData(alert);
  171. } else {
  172. triggeredAlertMap.put(monitorAlertKey, alert);
  173. }
  174. }
  175. // 此优先级以下的阈值规则则忽略
  176. break;
  177. }
  178. } catch (Exception e) {
  179. log.warn(e.getMessage());
  180. }
  181. }
  182. }
  183. }
  184. }
  185. }
  186. }