CalculateAlarm.java 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
  1. package com.usthe.alert.calculate;
  2. import com.googlecode.aviator.AviatorEvaluator;
  3. import com.googlecode.aviator.Expression;
  4. import com.usthe.alert.AlerterWorkerPool;
  5. import com.usthe.alert.AlerterDataQueue;
  6. import com.usthe.alert.dao.AlertMonitorDao;
  7. import com.usthe.common.entity.alerter.Alert;
  8. import com.usthe.common.entity.alerter.AlertDefine;
  9. import com.usthe.alert.service.AlertDefineService;
  10. import com.usthe.alert.util.AlertTemplateUtil;
  11. import com.usthe.collector.dispatch.export.MetricsDataExporter;
  12. import com.usthe.common.entity.manager.Monitor;
  13. import com.usthe.common.entity.message.CollectRep;
  14. import com.usthe.common.util.CommonConstants;
  15. import com.usthe.common.util.CommonUtil;
  16. import lombok.extern.slf4j.Slf4j;
  17. import org.springframework.context.annotation.Configuration;
  18. import java.util.Arrays;
  19. import java.util.HashMap;
  20. import java.util.List;
  21. import java.util.Map;
  22. import java.util.concurrent.ConcurrentHashMap;
  23. /**
  24. * 根据告警定义规则和采集数据匹配计算告警
  25. * @author tom
  26. * @date 2021/12/9 14:19
  27. */
  28. @Configuration
  29. @Slf4j
  30. public class CalculateAlarm {
  31. private AlerterWorkerPool workerPool;
  32. private AlerterDataQueue dataQueue;
  33. private MetricsDataExporter dataExporter;
  34. private AlertDefineService alertDefineService;
  35. private Map<String, Alert> triggeredAlertMap;
  36. private Map<Long, CollectRep.Code> triggeredMonitorStateAlertMap;
  37. public CalculateAlarm (AlerterWorkerPool workerPool, AlerterDataQueue dataQueue,
  38. AlertDefineService alertDefineService, MetricsDataExporter dataExporter,
  39. AlertMonitorDao monitorDao) {
  40. this.workerPool = workerPool;
  41. this.dataQueue = dataQueue;
  42. this.dataExporter = dataExporter;
  43. this.alertDefineService = alertDefineService;
  44. this.triggeredAlertMap = new ConcurrentHashMap<>(128);
  45. this.triggeredMonitorStateAlertMap = new ConcurrentHashMap<>(128);
  46. // 初始化stateAlertMap
  47. List<Monitor> monitors = monitorDao.findMonitorsByStatusIn(Arrays.asList((byte)2, (byte)3));
  48. if (monitors != null) {
  49. for (Monitor monitor : monitors) {
  50. this.triggeredMonitorStateAlertMap.put(monitor.getId(), CollectRep.Code.UN_AVAILABLE);
  51. }
  52. }
  53. startCalculate();
  54. }
  55. private void startCalculate() {
  56. Runnable runnable = () -> {
  57. while (!Thread.currentThread().isInterrupted()) {
  58. try {
  59. CollectRep.MetricsData metricsData = dataExporter.pollAlertMetricsData();
  60. if (metricsData != null) {
  61. calculate(metricsData);
  62. }
  63. } catch (InterruptedException e) {
  64. log.error(e.getMessage());
  65. }
  66. }
  67. };
  68. workerPool.executeJob(runnable);
  69. workerPool.executeJob(runnable);
  70. workerPool.executeJob(runnable);
  71. }
  72. private void calculate(CollectRep.MetricsData metricsData) {
  73. long monitorId = metricsData.getId();
  74. String app = metricsData.getApp();
  75. String metrics = metricsData.getMetrics();
  76. // 先判断调度优先级为0的指标组采集响应数据状态 UN_REACHABLE/UN_CONNECTABLE 则需发最高级别告警进行监控状态变更
  77. if (metricsData.getPriority() == 0) {
  78. if (metricsData.getCode() != CollectRep.Code.SUCCESS) {
  79. // 采集异常
  80. Alert.AlertBuilder alertBuilder = Alert.builder()
  81. .monitorId(monitorId)
  82. .priority(CommonConstants.ALERT_PRIORITY_CODE_EMERGENCY)
  83. .status(CommonConstants.ALERT_STATUS_CODE_PENDING)
  84. .times(1);
  85. if (metricsData.getCode() == CollectRep.Code.UN_AVAILABLE) {
  86. // 采集器不可用
  87. alertBuilder.target(CommonConstants.AVAILABLE)
  88. .content("监控紧急可用性告警: " + metricsData.getCode().name());
  89. triggeredMonitorStateAlertMap.put(monitorId, CollectRep.Code.UN_AVAILABLE);
  90. dataQueue.addAlertData(alertBuilder.build());
  91. } else if (metricsData.getCode() == CollectRep.Code.UN_REACHABLE) {
  92. // UN_REACHABLE 对端不可达(网络层icmp)
  93. alertBuilder.target(CommonConstants.REACHABLE)
  94. .content("监控紧急可达性告警: " + metricsData.getCode().name());
  95. triggeredMonitorStateAlertMap.put(monitorId, CollectRep.Code.UN_REACHABLE);
  96. dataQueue.addAlertData(alertBuilder.build());
  97. } else if (metricsData.getCode() == CollectRep.Code.UN_CONNECTABLE) {
  98. // UN_CONNECTABLE 对端连接失败(传输层tcp,udp)
  99. alertBuilder.target(CommonConstants.AVAILABLE)
  100. .content("监控紧急可用性告警: " + metricsData.getCode().name());
  101. triggeredMonitorStateAlertMap.put(monitorId, CollectRep.Code.UN_CONNECTABLE);
  102. dataQueue.addAlertData(alertBuilder.build());
  103. } else {
  104. // 其他异常
  105. alertBuilder.target(CommonConstants.AVAILABLE)
  106. .content("监控紧急可用性告警: " + metricsData.getCode().name());
  107. triggeredMonitorStateAlertMap.put(monitorId, metricsData.getCode());
  108. dataQueue.addAlertData(alertBuilder.build());
  109. }
  110. return;
  111. } else {
  112. // 判断关联监控之前是否有可用性或者不可达告警,发送恢复告警进行监控状态恢复
  113. CollectRep.Code stateCode = triggeredMonitorStateAlertMap.remove(monitorId);
  114. if (stateCode != null) {
  115. // 发送告警恢复
  116. Alert resumeAlert = Alert.builder()
  117. .monitorId(monitorId)
  118. .target(CommonConstants.AVAILABLE)
  119. .content("告警恢复通知, 此监控状态已恢复正常")
  120. .priority(CommonConstants.ALERT_PRIORITY_CODE_WARNING)
  121. .status(CommonConstants.ALERT_STATUS_CODE_RESTORED).build();
  122. dataQueue.addAlertData(resumeAlert);
  123. }
  124. }
  125. }
  126. // 查出此监控类型下的此指标集合下关联配置的告警定义信息
  127. // field - define[]
  128. Map<String, List<AlertDefine>> defineMap = alertDefineService.getMonitorBindAlertDefines(monitorId, app, metrics);
  129. if (defineMap == null || defineMap.isEmpty()) {
  130. return;
  131. }
  132. List<CollectRep.Field> fields = metricsData.getFieldsList();
  133. Map<String, Object> fieldValueMap = new HashMap<>(16);
  134. for (CollectRep.ValueRow valueRow : metricsData.getValuesList()) {
  135. if (!valueRow.getColumnsList().isEmpty()) {
  136. fieldValueMap.clear();
  137. String instance = valueRow.getInstance();
  138. if (!"".equals(instance)) {
  139. fieldValueMap.put("instance", instance);
  140. }
  141. for (int index = 0; index < valueRow.getColumnsList().size(); index++) {
  142. String valueStr = valueRow.getColumns(index);
  143. CollectRep.Field field = fields.get(index);
  144. if (field.getType() == CommonConstants.TYPE_NUMBER) {
  145. Double doubleValue = CommonUtil.parseDoubleStr(valueStr);
  146. if (doubleValue != null) {
  147. fieldValueMap.put(field.getName(), doubleValue);
  148. }
  149. } else {
  150. if (!"".equals(valueStr)) {
  151. fieldValueMap.put(field.getName(), valueStr);
  152. }
  153. }
  154. }
  155. for (Map.Entry<String, List<AlertDefine>> entry : defineMap.entrySet()) {
  156. List<AlertDefine> defines = entry.getValue();
  157. for (AlertDefine define : defines) {
  158. String expr = define.getExpr();
  159. try {
  160. Expression expression = AviatorEvaluator.compile(expr, true);
  161. Boolean match = (Boolean) expression.execute(fieldValueMap);
  162. if (match) {
  163. // 阈值规则匹配,判断已触发阈值次数,触发告警
  164. String monitorAlertKey = String.valueOf(monitorId) + define.getId();
  165. Alert triggeredAlert = triggeredAlertMap.get(monitorAlertKey);
  166. if (triggeredAlert != null) {
  167. int times = triggeredAlert.getTimes() + 1;
  168. triggeredAlert.setTimes(times);
  169. if (times >= define.getTimes()) {
  170. triggeredAlertMap.remove(monitorAlertKey);
  171. dataQueue.addAlertData(triggeredAlert);
  172. }
  173. } else {
  174. int times = 1;
  175. fieldValueMap.put("app", app);
  176. fieldValueMap.put("metrics", metrics);
  177. fieldValueMap.put("metric", define.getField());
  178. Alert alert = Alert.builder()
  179. .monitorId(monitorId)
  180. .alertDefineId(define.getId())
  181. .priority(define.getPriority())
  182. .status(CommonConstants.ALERT_STATUS_CODE_PENDING)
  183. .target(app + "." + metrics + "." + define.getField())
  184. .times(times)
  185. // 模板中关键字匹配替换
  186. .content(AlertTemplateUtil.render(define.getTemplate(), fieldValueMap))
  187. .build();
  188. if (times >= define.getTimes()) {
  189. dataQueue.addAlertData(alert);
  190. } else {
  191. triggeredAlertMap.put(monitorAlertKey, alert);
  192. }
  193. }
  194. // 此优先级以下的阈值规则则忽略
  195. break;
  196. }
  197. } catch (Exception e) {
  198. log.warn(e.getMessage());
  199. }
  200. }
  201. }
  202. }
  203. }
  204. }
  205. }