Job.java 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. package com.usthe.common.entity.job;
  2. import com.fasterxml.jackson.annotation.JsonIgnore;
  3. import com.google.gson.Gson;
  4. import com.usthe.common.entity.message.CollectRep;
  5. import lombok.AllArgsConstructor;
  6. import lombok.Builder;
  7. import lombok.Data;
  8. import lombok.NoArgsConstructor;
  9. import lombok.extern.slf4j.Slf4j;
  10. import java.util.Collections;
  11. import java.util.Comparator;
  12. import java.util.HashSet;
  13. import java.util.LinkedList;
  14. import java.util.List;
  15. import java.util.Map;
  16. import java.util.Optional;
  17. import java.util.Set;
  18. import java.util.stream.Collectors;
  19. /**
  20. * 采集任务详情
  21. * @author tomsun28
  22. * @date 2021/10/17 21:19
  23. */
  24. @Data
  25. @AllArgsConstructor
  26. @NoArgsConstructor
  27. @Builder
  28. @Slf4j
  29. public class Job {
  30. private static final String AVAILABILITY = "availability";
  31. /**
  32. * 任务ID
  33. */
  34. private long id;
  35. /**
  36. * 监控ID 应用ID
  37. */
  38. private long monitorId;
  39. /**
  40. * 监控的类型 eg: linux | mysql | jvm
  41. */
  42. private String app;
  43. /**
  44. * 监控类型的国际化名称
  45. * zh-CN: PING连通性
  46. * en-US: PING CONNECT
  47. */
  48. private Map<String, String> name;
  49. /**
  50. * 任务派发开始时间戳
  51. */
  52. private long timestamp;
  53. /**
  54. * 任务采集时间间隔(单位秒) eg: 30,60,600
  55. */
  56. private long interval = 600L;
  57. /**
  58. * 是否是循环周期性任务 true为是,false为否
  59. */
  60. private boolean isCyclic = false;
  61. /**
  62. * 指标组配置 eg: cpu memory
  63. */
  64. private List<Metrics> metrics;
  65. /**
  66. * 监控配置参数属性及值 eg: username password timeout host
  67. */
  68. private List<Configmap> configmap;
  69. /**
  70. * collector使用 - 任务被时间轮开始调度的时间戳
  71. */
  72. @JsonIgnore
  73. private transient long dispatchTime;
  74. /**
  75. * collector使用 - 任务版本,此字段不存储于etcd
  76. */
  77. @JsonIgnore
  78. private transient long version;
  79. /**
  80. * collector使用 - 指标组任务执行优先级视图
  81. * 0 - availability
  82. * 1 - cpu | memory
  83. * 2 - health
  84. * 3 - otherMetrics
  85. * ....
  86. * 126 - otherMetrics
  87. * 127 - lastPriorMetrics
  88. */
  89. @JsonIgnore
  90. private transient List<Set<Metrics>> priorMetrics;
  91. /**
  92. * collector使用 - 临时存储一次性任务指标组响应数据
  93. */
  94. @JsonIgnore
  95. private transient List<CollectRep.MetricsData> responseDataTemp;
  96. /**
  97. * collector使用 - 构造初始化指标组执行视图
  98. */
  99. public synchronized void constructPriorMetrics() {
  100. Map<Byte, List<Metrics>> map = metrics.stream()
  101. .peek(metric -> {
  102. // 判断是否配置aliasFields 没有则配置默认
  103. if (metric.getAliasFields() == null || metric.getAliasFields().isEmpty()) {
  104. metric.setAliasFields(metric.getFields().stream().map(Metrics.Field::getField).collect(Collectors.toList()));
  105. }
  106. // 设置默认的指标组执行优先级,不填则默认最后优先级
  107. if (metric.getPriority() == null) {
  108. metric.setPriority(Byte.MAX_VALUE);
  109. }
  110. })
  111. .collect(Collectors.groupingBy(Metrics::getPriority));
  112. // 构造指标组任务执行顺序链表
  113. priorMetrics = new LinkedList<>();
  114. map.values().forEach(metric -> {
  115. Set<Metrics> metricsSet = new HashSet<>(metric);
  116. priorMetrics.add(metricsSet);
  117. });
  118. priorMetrics.sort(Comparator.comparing(e -> {
  119. Optional<Metrics> metric = e.stream().findAny();
  120. if (metric.isPresent()) {
  121. return metric.get().getPriority();
  122. } else {
  123. return Byte.MAX_VALUE;
  124. }
  125. }));
  126. }
  127. /**
  128. * collector使用 - 获取下一组优先级的指标组任务
  129. * @param metrics 当前指标组
  130. * @param first 是否是第一次获取
  131. * @return 指标组任务
  132. * 返回null表示:job已完成,所有指标组采集结束
  133. * 返回empty的集合表示:当前级别下还有指标组采集任务未结束,无法进行下一级别的指标组任务采集
  134. * 返回有数据集合表示:获取到下一组优先级的指标组任务
  135. */
  136. public synchronized Set<Metrics> getNextCollectMetrics(Metrics metrics, boolean first) {
  137. if (priorMetrics == null || priorMetrics.isEmpty()) {
  138. return null;
  139. }
  140. Set<Metrics> metricsSet = priorMetrics.get(0);
  141. if (first) {
  142. if (metricsSet.isEmpty()) {
  143. log.error("metrics must has one [availability] metrics at least.");
  144. }
  145. return metricsSet;
  146. }
  147. if (metrics == null) {
  148. log.error("metrics can not null when not first get");
  149. return null;
  150. }
  151. if (!metricsSet.remove(metrics)) {
  152. log.error("Job {} appId {} app {} metrics {} remove empty error in priorMetrics.",
  153. id, monitorId, app, metrics.getName());
  154. }
  155. if (metricsSet.isEmpty()) {
  156. priorMetrics.remove(0);
  157. if (priorMetrics.size() == 0) {
  158. return null;
  159. }
  160. return priorMetrics.get(0);
  161. } else {
  162. return Collections.emptySet();
  163. }
  164. }
  165. public void addCollectMetricsData(CollectRep.MetricsData metricsData) {
  166. if (responseDataTemp == null) {
  167. responseDataTemp = new LinkedList<>();
  168. }
  169. responseDataTemp.add(metricsData);
  170. }
  171. private static final Gson GSON = new Gson();
  172. public Job clone(){
  173. // 深度克隆
  174. return GSON.fromJson(GSON.toJsonTree(this), Job.class);
  175. }
  176. }