Forráskód Böngészése

[monitor-collector-scheduler] 监控探测接口,一次性临时任务调度编码

tomsun28 4 éve
szülő
commit
c0216c4551

+ 5 - 2
collector/server/src/main/java/com/usthe/collector/dispatch/CommonDispatcher.java

@@ -11,6 +11,7 @@ import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Component;
 
+import java.util.EventListener;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -162,10 +163,12 @@ public class CommonDispatcher implements MetricsTaskDispatch, CollectDataDispatc
             }
         } else {
             // 若是临时性一次任务,需等待所有指标组的采集数据统一包装返回
-            // todo 将当前指标组数据插入job里统一组装
+            // 将当前指标组数据插入job里统一组装
+            job.addCollectMetricsData(metricsData);
             if (metricsSet == null) {
                 // 此Job所有指标组采集执行完成
-                // todo 将所有指标组数据组合一起发送到回调函数
+                // 将所有指标组数据组合一起通知结果监听器
+                timerDispatch.responseSyncJobData(job.getId(), job.getMetricsDataTemps());
             } else if (!metricsSet.isEmpty()) {
                 // 当前级别指标组执行完成,开始执行下一级别的指标组
                 metricsSet.forEach(metricItem -> {

+ 35 - 3
collector/server/src/main/java/com/usthe/collector/dispatch/entrance/http/CollectJobController.java

@@ -1,10 +1,19 @@
 package com.usthe.collector.dispatch.entrance.http;
 
+import com.usthe.collector.dispatch.timer.TimerDispatch;
 import com.usthe.common.entity.job.Job;
+import com.usthe.common.entity.message.CollectRep;
+import com.usthe.common.util.ProtoJsonUtil;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.MediaType;
 import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RestController;
 import reactor.core.publisher.Mono;
 
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * 采集job管理提供api接口
  * @author tomsun28
@@ -13,13 +22,36 @@ import reactor.core.publisher.Mono;
 @RestController
 public class CollectJobController {
 
+    @Autowired
+    private TimerDispatch timerDispatch;
+
     /**
      * 执行一次性采集任务,获取采集数据响应
      * @return 采集结果
      */
-    @PostMapping("/job")
-    public Mono<Object> collectJobData(Job job) {
-        return null;
+    @PostMapping(path = "/job/sync", consumes = MediaType.APPLICATION_JSON_VALUE,
+            produces = MediaType.APPLICATION_JSON_VALUE)
+    public Mono<List<String>> collectSyncJobData(@RequestBody Job job) {
+        return Mono.create(sink -> {
+            CollectResponseEventListener listener = new CollectResponseEventListener() {
+                @Override
+                public void response(List<CollectRep.MetricsData> responseMetrics) {
+                    if (responseMetrics == null || responseMetrics.isEmpty()) {
+                        sink.success();
+                    } else {
+                        List<String> jsons = new ArrayList<>(responseMetrics.size());
+                        for (CollectRep.MetricsData metricsData : responseMetrics) {
+                            String json = ProtoJsonUtil.toJsonStr(metricsData);
+                            if (json != null) {
+                                jsons.add(json);
+                            }
+                        }
+                        sink.success(jsons);
+                    }
+                }
+            };
+            timerDispatch.addJob(job, listener);
+        });
     }
 
 }

+ 20 - 0
collector/server/src/main/java/com/usthe/collector/dispatch/entrance/http/CollectResponseEventListener.java

@@ -0,0 +1,20 @@
+package com.usthe.collector.dispatch.entrance.http;
+
+import com.usthe.common.entity.message.CollectRep;
+
+import java.util.EventListener;
+import java.util.List;
+
+/**
+ * 一次性采集任务响应结果监听器
+ * @author tomsun28
+ * @date 2021/11/16 10:09
+ */
+public interface CollectResponseEventListener extends EventListener {
+
+    /**
+     * 采集任务完成结果通知
+     * @param responseMetrics 响应数据
+     */
+    public default void response(List<CollectRep.MetricsData> responseMetrics) {}
+}

+ 12 - 1
collector/server/src/main/java/com/usthe/collector/dispatch/timer/TimerDispatch.java

@@ -1,8 +1,11 @@
 package com.usthe.collector.dispatch.timer;
 
 
+import com.usthe.collector.dispatch.entrance.http.CollectResponseEventListener;
 import com.usthe.common.entity.job.Job;
+import com.usthe.common.entity.message.CollectRep;
 
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -15,8 +18,9 @@ public interface TimerDispatch {
     /**
      * 增加新的job
      * @param addJob job
+     * @param eventListener 一次性同步任务监听器,异步任务不需要
      */
-    void addJob(Job addJob);
+    void addJob(Job addJob, CollectResponseEventListener eventListener);
 
     /**
      * 调度循环周期性job
@@ -32,4 +36,11 @@ public interface TimerDispatch {
      * @param isCyclic 是否是周期性任务,true是, false为临时性任务
      */
     void deleteJob(long jobId, boolean isCyclic);
+
+    /**
+     * 一次性同步采集任务采集结果通知监听器
+     * @param jobId  jobId
+     * @param metricsDataTemps 采集结果数据
+     */
+    void responseSyncJobData(long jobId, List<CollectRep.MetricsData> metricsDataTemps);
 }

+ 22 - 3
collector/server/src/main/java/com/usthe/collector/dispatch/timer/TimerDispatcher.java

@@ -1,8 +1,12 @@
 package com.usthe.collector.dispatch.timer;
 
+import com.usthe.collector.dispatch.entrance.http.CollectResponseEventListener;
 import com.usthe.common.entity.job.Job;
+import com.usthe.common.entity.message.CollectRep;
 import org.springframework.stereotype.Component;
 
+import java.util.EventListener;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
@@ -26,6 +30,11 @@ public class TimerDispatcher implements TimerDispatch {
      * 已存在的临时性调度任务
      */
     private Map<Long, Timeout> currentTempTaskMap;
+    /**
+     * 一次性任务响应监听器持有
+     * jobId - listener
+     */
+    private Map<Long, CollectResponseEventListener> eventListeners;
 
     public TimerDispatcher() {
         this.wheelTimer = new HashedWheelTimer(r -> {
@@ -34,17 +43,20 @@ public class TimerDispatcher implements TimerDispatch {
             return ret;
         }, 10, TimeUnit.SECONDS, 512);
         this.currentCyclicTaskMap = new ConcurrentHashMap<>(1024);
-        this.currentTempTaskMap = new ConcurrentHashMap<>(1024);
+        this.currentTempTaskMap = new ConcurrentHashMap<>(64);
+        eventListeners = new ConcurrentHashMap<>(64);
     }
 
     @Override
-    public void addJob(Job addJob) {
+    public void addJob(Job addJob, CollectResponseEventListener eventListener) {
         WheelTimerJob timerJob = new WheelTimerJob(addJob);
-        Timeout timeout = wheelTimer.newTimeout(timerJob, addJob.getInterval(), TimeUnit.SECONDS);
         if (addJob.isCyclic()) {
+            Timeout timeout = wheelTimer.newTimeout(timerJob, addJob.getInterval(), TimeUnit.SECONDS);
             currentCyclicTaskMap.put(addJob.getId(), timeout);
         } else {
+            Timeout timeout = wheelTimer.newTimeout(timerJob, 0, TimeUnit.SECONDS);
             currentTempTaskMap.put(addJob.getId(), timeout);
+            eventListeners.put(addJob.getId(), eventListener);
         }
     }
 
@@ -72,4 +84,11 @@ public class TimerDispatcher implements TimerDispatch {
             }
         }
     }
+
+    @Override
+    public void responseSyncJobData(long jobId, List<CollectRep.MetricsData> metricsDataTemps) {
+        currentTempTaskMap.remove(jobId);
+        CollectResponseEventListener eventListener = eventListeners.remove(jobId);
+        eventListener.response(metricsDataTemps);
+    }
 }

+ 1 - 1
collector/server/src/main/resources/application.yml

@@ -1,5 +1,5 @@
 server:
-  port: 8081
+  port: 1157
 spring:
   application:
     name: ${HOSTNAME:@collecor@}${PID}

+ 1 - 1
collector/server/src/main/resources/logback-spring.xml

@@ -69,7 +69,7 @@
 
     <!-- 开发环境配置 -->
     <springProfile name="dev">
-        <root level="INFO">
+        <root level="DEBUG">
             <appender-ref ref="ConsoleAppender"/>
             <appender-ref ref="SystemOutFileAppender"/>
             <appender-ref ref="ErrOutFileAppender"/>

+ 5 - 0
common/pom.xml

@@ -43,5 +43,10 @@
             <artifactId>gson</artifactId>
             <version>2.8.8</version>
         </dependency>
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java-util</artifactId>
+            <version>3.19.1</version>
+        </dependency>
     </dependencies>
 </project>

+ 18 - 1
common/src/main/java/com/usthe/common/entity/job/Job.java

@@ -1,6 +1,8 @@
 package com.usthe.common.entity.job;
 
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.usthe.common.entity.message.CollectRep;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
@@ -67,6 +69,7 @@ public class Job {
     /**
      * collector使用 - 任务版本,此字段不存储于etcd
      */
+    @JsonIgnore
     private transient long version;
     /**
      * collector使用 - 指标组任务执行优先级视图
@@ -78,10 +81,17 @@ public class Job {
      * 126 - otherMetrics
      * 127 - lastPriorMetrics
      */
+    @JsonIgnore
     private transient List<Set<Metrics>> priorMetrics;
 
     /**
-     * collector使用 - 构造初始化指标组
+     * collector使用 - 临时存储一次性任务指标组响应数据
+     */
+    @JsonIgnore
+    private transient List<CollectRep.MetricsData> metricsDataTemps;
+
+    /**
+     * collector使用 - 构造初始化指标组执行视图
      */
     public synchronized void constructPriorMetrics() {
         Map<Byte, List<Metrics>> map = metrics.stream()
@@ -155,4 +165,11 @@ public class Job {
             return Collections.emptySet();
         }
     }
+
+    public void addCollectMetricsData(CollectRep.MetricsData metricsData) {
+        if (metricsDataTemps == null) {
+            metricsDataTemps = new LinkedList<>();
+        }
+        metricsDataTemps.add(metricsData);
+    }
 }

+ 45 - 0
common/src/main/java/com/usthe/common/util/ProtoJsonUtil.java

@@ -0,0 +1,45 @@
+package com.usthe.common.util;
+
+import com.google.protobuf.Message;
+import com.google.protobuf.MessageOrBuilder;
+import com.google.protobuf.util.JsonFormat;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * protobuf json相互转换工具类
+ * @author tomsun28
+ * @date 2021/11/16 12:16
+ */
+@Slf4j
+public class ProtoJsonUtil {
+
+    /**
+     * protobuf 转 json
+     * @param proto protobuf
+     * @return json
+     */
+    public static String toJsonStr(Message proto) {
+        try {
+            return JsonFormat.printer().print(proto);
+        } catch (Exception e) {
+            log.error(e.getMessage(), e);
+            return null;
+        }
+    }
+
+    /**
+     * json转protobuf
+     * @param json json str
+     * @param builder proto instance builder
+     * @return protobuf
+     */
+    public static Message toProtobuf(String json, Message.Builder builder) {
+        try {
+            JsonFormat.parser().merge(json, builder);
+            return builder.build();
+        } catch (Exception e) {
+            log.error(e.getMessage(), e);
+            return null;
+        }
+    }
+}

+ 6 - 0
manager/pom.xml

@@ -39,6 +39,12 @@
             <artifactId>spring-boot-configuration-processor</artifactId>
             <optional>true</optional>
         </dependency>
+        <!-- feign -->
+        <dependency>
+            <groupId>org.springframework.cloud</groupId>
+            <artifactId>spring-cloud-starter-openfeign</artifactId>
+            <version>3.0.5</version>
+        </dependency>
         <!-- data jdbc -->
         <dependency>
             <groupId>org.springframework.boot</groupId>

+ 2 - 0
manager/src/main/java/com/usthe/manager/Manager.java

@@ -2,6 +2,7 @@ package com.usthe.manager;
 
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.cloud.openfeign.EnableFeignClients;
 
 /**
  * @author tomsun28
@@ -9,6 +10,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
  */
 
 @SpringBootApplication
+@EnableFeignClients(basePackages = {"com.usthe"})
 public class Manager {
 
     public static void main(String[] args) {

+ 5 - 5
manager/src/main/java/com/usthe/manager/controller/MonitorController.java

@@ -7,7 +7,6 @@ import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.http.ResponseEntity;
-import org.springframework.validation.annotation.Validated;
 import org.springframework.web.bind.annotation.DeleteMapping;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.PathVariable;
@@ -17,6 +16,8 @@ import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RestController;
 
+import javax.validation.Valid;
+
 import static com.usthe.common.util.CommonConstants.MONITOR_NOT_EXIST;
 import static org.springframework.http.MediaType.APPLICATION_JSON_VALUE;
 
@@ -28,7 +29,6 @@ import static org.springframework.http.MediaType.APPLICATION_JSON_VALUE;
 @Api(tags = "监控管理API")
 @RestController
 @RequestMapping(path = "/monitor", produces = {APPLICATION_JSON_VALUE})
-@Validated
 public class MonitorController {
 
     @Autowired
@@ -36,7 +36,7 @@ public class MonitorController {
 
     @PostMapping
     @ApiOperation(value = "新增监控", notes = "新增一个监控应用")
-    public ResponseEntity<Message<Void>> addNewMonitor(@Validated @RequestBody MonitorDto monitorDto) {
+    public ResponseEntity<Message<Void>> addNewMonitor(@Valid @RequestBody MonitorDto monitorDto) {
         // 校验请求数据
         monitorService.validate(monitorDto, false);
         if (monitorDto.getDetected()) {
@@ -49,7 +49,7 @@ public class MonitorController {
 
     @PutMapping
     @ApiOperation(value = "修改监控", notes = "修改一个已存在监控应用")
-    public ResponseEntity<Message<Void>> modifyMonitor(@Validated @RequestBody MonitorDto monitorDto) {
+    public ResponseEntity<Message<Void>> modifyMonitor(@Valid @RequestBody MonitorDto monitorDto) {
         // 校验请求数据
         monitorService.validate(monitorDto, true);
         if (monitorDto.getDetected()) {
@@ -84,7 +84,7 @@ public class MonitorController {
 
     @PostMapping(path = "/detect")
     @ApiOperation(value = "探测监控", notes = "根据监控信息去对此监控进行可用性探测")
-    public ResponseEntity<Message<Void>> detectMonitor(@Validated @RequestBody MonitorDto monitorDto) {
+    public ResponseEntity<Message<Void>> detectMonitor(@Valid @RequestBody MonitorDto monitorDto) {
         monitorService.validate(monitorDto, false);
         monitorService.detectMonitor(monitorDto.getMonitor(), monitorDto.getParams());
         return ResponseEntity.ok(new Message<>("Detect success."));

+ 4 - 0
manager/src/main/java/com/usthe/manager/pojo/dto/MonitorDto.java

@@ -5,7 +5,9 @@ import com.usthe.manager.pojo.entity.Param;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
+import org.springframework.validation.annotation.Validated;
 
+import javax.validation.Valid;
 import javax.validation.constraints.NotNull;
 import java.util.List;
 
@@ -25,6 +27,7 @@ public class MonitorDto {
      */
     @ApiModelProperty(value = "监控实体", accessMode = READ_WRITE, position = 0)
     @NotNull
+    @Valid
     private Monitor monitor;
 
     /**
@@ -32,6 +35,7 @@ public class MonitorDto {
      */
     @ApiModelProperty(value = "监控参数", accessMode = READ_WRITE, position = 1)
     @NotNull
+    @Valid
     private List<Param> params;
 
     /**

+ 4 - 1
manager/src/main/java/com/usthe/manager/service/impl/MonitorServiceImpl.java

@@ -61,8 +61,11 @@ public class MonitorServiceImpl implements MonitorService {
         List<Configmap> configmaps = params.stream().map(param ->
                 new Configmap(param.getField(), param.getValue(), param.getType())).collect(Collectors.toList());
         appDefine.setConfigmap(configmaps);
-        CollectRep collectRep = jobScheduling.addSyncCollectJob(appDefine);
+        List<CollectRep.MetricsData> collectRep = jobScheduling.addSyncCollectJob(appDefine);
         // 判断探测结果 失败则抛出探测异常
+        if (collectRep == null || collectRep.isEmpty() || collectRep.get(0).getCode() != CollectRep.Code.SUCCESS) {
+            throw new MonitorDetectException(collectRep.get(0).getMsg());
+        }
     }
 
     @Override

+ 33 - 12
manager/src/main/java/com/usthe/manager/support/GlobalExceptionHandler.java

@@ -9,12 +9,16 @@ import org.springframework.dao.DataAccessException;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.ResponseEntity;
 import org.springframework.http.converter.HttpMessageNotReadableException;
+import org.springframework.validation.BindException;
+import org.springframework.validation.FieldError;
 import org.springframework.web.HttpRequestMethodNotSupportedException;
 import org.springframework.web.bind.MethodArgumentNotValidException;
 import org.springframework.web.bind.annotation.ExceptionHandler;
 import org.springframework.web.bind.annotation.ResponseBody;
 import org.springframework.web.bind.annotation.RestControllerAdvice;
 
+import javax.validation.ConstraintViolationException;
+import javax.validation.ValidationException;
 import java.lang.reflect.Field;
 
 import static com.usthe.common.util.CommonConstants.DETECT_FAILED;
@@ -32,10 +36,14 @@ public class GlobalExceptionHandler {
 
     private static Field detailMessage;
 
+    private static Field fieldErrorField;
+
     static {
         try {
             detailMessage = Throwable.class.getDeclaredField("detailMessage");
             detailMessage.setAccessible(true);
+            fieldErrorField = FieldError.class.getDeclaredField("field");
+            fieldErrorField.setAccessible(true);
         } catch (Exception e) {}
     }
 
@@ -95,19 +103,32 @@ public class GlobalExceptionHandler {
 
     /**
      * handler the exception thrown for data input verify
-     * @param exception data input verify exception
+     * valid注解校验框架校验异常统一处理
+     * @param e data input verify exception
      * @return response
      */
-    @ExceptionHandler(MethodArgumentNotValidException.class)
+    @ExceptionHandler({MethodArgumentNotValidException.class, BindException.class})
     @ResponseBody
-    ResponseEntity<Message<Void>> handleInputValidException(MethodArgumentNotValidException exception) {
+    ResponseEntity<Message<Void>> handleInputValidException(Exception e) {
         StringBuffer errorMessage = new StringBuffer();
-        if (exception != null) {
-            exception.getBindingResult().getAllErrors().forEach(error ->
-                    errorMessage.append(error.getDefaultMessage()).append("."));
+        if (e instanceof MethodArgumentNotValidException) {
+            MethodArgumentNotValidException exception = (MethodArgumentNotValidException)e;
+            exception.getBindingResult().getAllErrors().forEach(error -> {
+                try {
+                    String field = (String) fieldErrorField.get(error);
+                    errorMessage.append(field).append(":").append(error.getDefaultMessage()).append("||");
+                } catch (Exception e1) {
+                    errorMessage.append(error.getDefaultMessage()).append("||");
+                }
+            });
+        } else if (e instanceof BindException) {
+            BindException exception = (BindException)e;
+            exception.getAllErrors().forEach(error -> {
+                errorMessage.append(error.getDefaultMessage()).append("||");
+            });
         }
         if (log.isDebugEnabled()) {
-            log.debug("[sample-tom]-[input argument not valid happen]-{}", errorMessage, exception);
+            log.debug("[input argument not valid happen]-{}", errorMessage, e);
         }
         Message<Void> message = Message.<Void>builder().msg(errorMessage.toString()).build();
         return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(message);
@@ -125,7 +146,7 @@ public class GlobalExceptionHandler {
         if (exception != null) {
             errorMessage = exception.getMessage();
         }
-        log.warn("[sample-tom]-[database error happen]-{}", errorMessage, exception);
+        log.warn("[database error happen]-{}", errorMessage, exception);
         Message<Void> message = Message.<Void>builder().msg(errorMessage).build();
         return ResponseEntity.status(HttpStatus.CONFLICT).body(message);
     }
@@ -137,13 +158,13 @@ public class GlobalExceptionHandler {
      */
     @ExceptionHandler(HttpRequestMethodNotSupportedException.class)
     @ResponseBody
-    ResponseEntity<Message> handleMethodNotSupportException(HttpRequestMethodNotSupportedException exception) {
+    ResponseEntity<Message<Void>> handleMethodNotSupportException(HttpRequestMethodNotSupportedException exception) {
         String errorMessage = "Request method not supported";
         if (exception != null && exception.getMessage() != null) {
             errorMessage = exception.getMessage();
         }
         log.info("[monitor]-[Request method not supported]-{}", errorMessage);
-        Message message = Message.builder().msg(errorMessage).build();
+        Message<Void> message = Message.<Void>builder().msg(errorMessage).build();
         return ResponseEntity.status(HttpStatus.METHOD_NOT_ALLOWED).body(message);
     }
 
@@ -154,13 +175,13 @@ public class GlobalExceptionHandler {
      */
     @ExceptionHandler(Exception.class)
     @ResponseBody
-    ResponseEntity<Message> handleUnknownException(Exception exception) {
+    ResponseEntity<Message<Void>> handleUnknownException(Exception exception) {
         String errorMessage = "unknown error happen";
         if (exception != null) {
             errorMessage = exception.getMessage();
         }
         log.error("[monitor]-[unknown error happen]-{}", errorMessage, exception);
-        Message message = Message.builder().msg(errorMessage).build();
+        Message<Void> message = Message.<Void>builder().msg(errorMessage).build();
         return ResponseEntity.status(HttpStatus.CONFLICT).body(message);
     }