[scheduler] 初步完成周期任务调度器编码

This commit is contained in:
tomsun28
2021-11-12 16:06:08 +08:00
parent 3a50946939
commit b1ff63883d
12 changed files with 208 additions and 25 deletions

View File

@@ -17,6 +17,7 @@ import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpUriRequest; import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.client.methods.RequestBuilder; import org.apache.http.client.methods.RequestBuilder;
import org.apache.http.client.protocol.HttpClientContext; import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.protocol.HttpContext; import org.apache.http.protocol.HttpContext;
import org.apache.http.util.EntityUtils; import org.apache.http.util.EntityUtils;
@@ -185,15 +186,16 @@ public class HttpCollectImpl extends AbstractCollect {
private HttpUriRequest createHttpRequest(HttpProtocol httpProtocol) { private HttpUriRequest createHttpRequest(HttpProtocol httpProtocol) {
RequestBuilder requestBuilder; RequestBuilder requestBuilder;
// method // method
if (HttpMethod.GET.matches(httpProtocol.getMethod())) { String httpMethod = httpProtocol.getMethod().toUpperCase();
if (HttpMethod.GET.matches(httpMethod)) {
requestBuilder = RequestBuilder.get(); requestBuilder = RequestBuilder.get();
} else if (HttpMethod.POST.matches(httpProtocol.getMethod())) { } else if (HttpMethod.POST.matches(httpMethod)) {
requestBuilder = RequestBuilder.post(); requestBuilder = RequestBuilder.post();
} else if (HttpMethod.PUT.matches(httpProtocol.getMethod())) { } else if (HttpMethod.PUT.matches(httpMethod)) {
requestBuilder = RequestBuilder.put(); requestBuilder = RequestBuilder.put();
} else if (HttpMethod.DELETE.matches(httpProtocol.getMethod())) { } else if (HttpMethod.DELETE.matches(httpMethod)) {
requestBuilder = RequestBuilder.delete(); requestBuilder = RequestBuilder.delete();
} else if (HttpMethod.PATCH.matches(httpProtocol.getMethod())) { } else if (HttpMethod.PATCH.matches(httpMethod)) {
requestBuilder = RequestBuilder.patch(); requestBuilder = RequestBuilder.patch();
} else { } else {
// not support the method // not support the method

View File

@@ -68,7 +68,9 @@ public class CommonDispatcher implements MetricsTaskDispatch, CollectDataDispatc
MetricsCollect metricsCollect = null; MetricsCollect metricsCollect = null;
try { try {
metricsCollect = jobRequestQueue.getJob(); metricsCollect = jobRequestQueue.getJob();
workerPool.executeJob(metricsCollect); if (metricsCollect != null) {
workerPool.executeJob(metricsCollect);
}
} catch (RejectedExecutionException rejected) { } catch (RejectedExecutionException rejected) {
log.info("[Dispatcher]-the worker pool is full, reject this metrics task, " + log.info("[Dispatcher]-the worker pool is full, reject this metrics task, " +
"sleep and put in queue again."); "sleep and put in queue again.");
@@ -119,7 +121,7 @@ public class CommonDispatcher implements MetricsTaskDispatch, CollectDataDispatc
// 将单个应用的采集任务根据其下的指标组拆分为对应的指标组采集任务 AbstractCollect // 将单个应用的采集任务根据其下的指标组拆分为对应的指标组采集任务 AbstractCollect
// 将每个指标组放入线程池进行调度 // 将每个指标组放入线程池进行调度
Job job = timerJob.getJob(); Job job = timerJob.getJob();
job.constructMetrics(); job.constructPriorMetrics();
Set<Metrics> metricsSet = job.getNextCollectMetrics(null, true); Set<Metrics> metricsSet = job.getNextCollectMetrics(null, true);
metricsSet.forEach(metrics -> { metricsSet.forEach(metrics -> {
MetricsCollect metricsCollect = new MetricsCollect(metrics, timerJob, this); MetricsCollect metricsCollect = new MetricsCollect(metrics, timerJob, this);
@@ -143,6 +145,8 @@ public class CommonDispatcher implements MetricsTaskDispatch, CollectDataDispatc
long spendTime = System.currentTimeMillis() - job.getTimestamp(); long spendTime = System.currentTimeMillis() - job.getTimestamp();
long interval = job.getInterval() - spendTime / 1000; long interval = job.getInterval() - spendTime / 1000;
interval = interval <= 0 ? 0 : interval; interval = interval <= 0 ? 0 : interval;
// 重置构造执行指标组视图
job.constructPriorMetrics();
timerDispatch.cyclicJob(timerJob, interval, TimeUnit.SECONDS); timerDispatch.cyclicJob(timerJob, interval, TimeUnit.SECONDS);
} else if (!metricsSet.isEmpty()) { } else if (!metricsSet.isEmpty()) {
// 当前级别指标组执行完成,开始执行下一级别的指标组 // 当前级别指标组执行完成,开始执行下一级别的指标组

View File

@@ -155,7 +155,8 @@ public class MetricsCollect implements Runnable, Comparable<MetricsCollect> {
} }
private void setNewThreadName(WheelTimerJob timerJob, Metrics metrics) { private void setNewThreadName(WheelTimerJob timerJob, Metrics metrics) {
String currentName = timerJob.getJob().getAppId() + timerJob.getJob().getApp() + metrics.getName() + timerJob.getJob().getId(); String currentName = timerJob.getJob().getAppId() + "-" + timerJob.getJob().getApp()
+ "-" + metrics.getName() + "-" + timerJob.getJob().getId();
Thread.currentThread().setName(currentName); Thread.currentThread().setName(currentName);
} }

View File

@@ -34,7 +34,7 @@ public class KafkaDataExporter {
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getServers()); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getServers());
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaMetricsDataSerializer.class); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaMetricsDataSerializer.class);
kafkaProducer = new KafkaProducer<>(properties); // kafkaProducer = new KafkaProducer<>(properties);
} catch (Exception e) { } catch (Exception e) {
log.error(e.getMessage(), e); log.error(e.getMessage(), e);
} }

View File

@@ -1,5 +1,5 @@
server: server:
port: 8080 port: 8081
spring: spring:
application: application:
name: ${HOSTNAME:@collecor@}${PID} name: ${HOSTNAME:@collecor@}${PID}

View File

@@ -69,7 +69,7 @@
<!-- 开发环境配置 --> <!-- 开发环境配置 -->
<springProfile name="dev"> <springProfile name="dev">
<root level="DEBUG"> <root level="INFO">
<appender-ref ref="ConsoleAppender"/> <appender-ref ref="ConsoleAppender"/>
<appender-ref ref="SystemOutFileAppender"/> <appender-ref ref="SystemOutFileAppender"/>
<appender-ref ref="ErrOutFileAppender"/> <appender-ref ref="ErrOutFileAppender"/>

View File

@@ -80,18 +80,10 @@ public class Job {
*/ */
private transient List<Set<Metrics>> priorMetrics; private transient List<Set<Metrics>> priorMetrics;
/**
* collector使用 - 构造初始化标志
*/
private transient boolean isConstruct = false;
/** /**
* collector使用 - 构造初始化指标组 * collector使用 - 构造初始化指标组
*/ */
public synchronized void constructMetrics() { public synchronized void constructPriorMetrics() {
if (isConstruct) {
return;
}
Map<Byte, List<Metrics>> map = metrics.stream() Map<Byte, List<Metrics>> map = metrics.stream()
.peek(metric -> { .peek(metric -> {
// 判断是否配置aliasFields 没有则配置默认 // 判断是否配置aliasFields 没有则配置默认
@@ -134,26 +126,30 @@ public class Job {
* 返回有数据集合表示:获取到下一组优先级的指标组任务 * 返回有数据集合表示:获取到下一组优先级的指标组任务
*/ */
public synchronized Set<Metrics> getNextCollectMetrics(Metrics metrics, boolean first) { public synchronized Set<Metrics> getNextCollectMetrics(Metrics metrics, boolean first) {
if (!isConstruct || priorMetrics == null || priorMetrics.isEmpty()) { if (priorMetrics == null || priorMetrics.isEmpty()) {
return null; return null;
} }
Set<Metrics> metricsSet = priorMetrics.get(0); Set<Metrics> metricsSet = priorMetrics.get(0);
if (first) { if (first) {
log.error("metrics must has one [availability] metrics at least."); if (metricsSet.isEmpty()) {
log.error("metrics must has one [availability] metrics at least.");
}
return metricsSet; return metricsSet;
} }
if (metrics == null) { if (metrics == null) {
log.error("metrics can not null when not first get"); log.error("metrics can not null when not first get");
return null;
} }
if (metrics != null && !metricsSet.remove(metrics)) { if (!metricsSet.remove(metrics)) {
log.error("Job {} appId {} app {} metrics {} remove empty error in priorMetrics.", log.error("Job {} appId {} app {} metrics {} remove empty error in priorMetrics.",
id, appId, app, metrics.getName()); id, appId, app, metrics.getName());
} }
if (metricsSet.isEmpty()) { if (metricsSet.isEmpty()) {
if (priorMetrics.size() == 1) { priorMetrics.remove(0);
if (priorMetrics.size() == 0) {
return null; return null;
} }
priorMetrics.remove(0);
return priorMetrics.get(0); return priorMetrics.get(0);
} else { } else {
return Collections.emptySet(); return Collections.emptySet();

View File

@@ -15,4 +15,34 @@
</properties> </properties>
<dependencies>
<!-- common -->
<dependency>
<groupId>com.usthe.tancloud</groupId>
<artifactId>common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<!-- scheduler -->
<dependency>
<groupId>com.usthe.tancloud</groupId>
<artifactId>scheduler</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<!-- spring -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<!-- swagger -->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-boot-starter</artifactId>
</dependency>
</dependencies>
</project> </project>

View File

@@ -0,0 +1,17 @@
package com.usthe.manager;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author tomsun28
* @date 2021/11/11 16:45
*/
@SpringBootApplication
public class Manager {
public static void main(String[] args) {
SpringApplication.run(Manager.class, args);
}
}

View File

@@ -0,0 +1,48 @@
package com.usthe.manager.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.oas.annotations.EnableOpenApi;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import java.util.Collections;
/**
* swagger config
* url: /swagger-ui/
* @author tomsun28
* @date 2021/11/11 17:01
*/
@Configuration
@EnableOpenApi
public class SwaggerConfig {
@Bean
public Docket docket(){
return new Docket(DocumentationType.OAS_30)
.apiInfo(apiInfo())
.enable(true)
.groupName("usthe.com")
.select()
.apis(RequestHandlerSelectors.any())
.paths(PathSelectors.any())
.build();
}
private ApiInfo apiInfo(){
return new ApiInfo(
"usthe api",
"monitor project",
"v1.0",
"usthe.com",
new Contact("tom", "usthe.com", "tomsun28@outlook.com"),
"Apache 2.0",
"http://www.apache.org/licenses/LICENSE-2.0",
Collections.emptyList());
}
}

View File

@@ -0,0 +1,79 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="true">
<springProperty scope="context" name="application_name" source="spring.application.name" defaultValue="collector"/>
<!-- 输出日志到控制台 ConsoleAppender -->
<appender name="ConsoleAppender" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<!--<pattern>%d %p (%file:%line\)- %m%n</pattern>-->
<!--格式化输出:%d:表示日期 %thread:表示线程名 %-5level:级别从左显示5个字符宽度 %msg:日志消息 %n:是换行符-->
<pattern>1-%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger - %msg%n</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<appender name="SystemOutFileAppender" class="ch.qos.logback.core.rolling.RollingFileAppender">
<!-- 日志记录器的滚动策略,按日期,按大小记录 -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- 归档的日志文件的路径。%d{yyyy-MM-dd}指定日期格式,%i指定索引 -->
<fileNamePattern>logs/${application_name}-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
<!-- 除按日志记录之外还配置了日志文件不能超过200M若超过200M日志文件会以索引0开始 -->
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>200MB</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
</rollingPolicy>
<!-- 追加方式记录日志 -->
<append>true</append>
<!-- 日志文件的格式 -->
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>===%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level %logger Line:%-3L - %msg%n</pattern>
<charset>utf-8</charset>
</encoder>
</appender>
<appender name="ErrOutFileAppender" class="ch.qos.logback.core.rolling.RollingFileAppender">
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>logs/${application_name}-%d{yyyy-MM-dd}-error.%i.log</fileNamePattern>
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>200MB</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
</rollingPolicy>
<!-- 追加方式记录日志 -->
<append>true</append>
<!-- 日志文件的格式 -->
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>===%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level %logger Line:%-3L - %msg%n</pattern>
<charset>utf-8</charset>
</encoder>
<!-- 此日志文件记录error及以上级别的 -->
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>ERROR</level>
</filter>
</appender>
<!--这个logger的设置是举例在org.springframework包下面的所有输出日志必须级别level在info及以上级别才会被输出-->
<!--这样可以避免输出一些spring框架的许多常见debug信息!-->
<logger name="org.springframework" level="info" />
<logger name="org.json" level="error"/>
<logger name="io.netty" level="info"/>
<logger name="org.slf4j" level="info"/>
<logger name="ch.qos.logback" level="info"/>
<!-- 生产环境配置 -->
<springProfile name="prod">
<root level="DEBUG">
<appender-ref ref="ConsoleAppender"/>
<appender-ref ref="SystemOutFileAppender"/>
<appender-ref ref="ErrOutFileAppender"/>
</root>
</springProfile>
<!-- 开发环境配置 -->
<springProfile name="dev">
<root level="INFO">
<appender-ref ref="ConsoleAppender"/>
<appender-ref ref="SystemOutFileAppender"/>
<appender-ref ref="ErrOutFileAppender"/>
</root>
</springProfile>
</configuration>

View File

@@ -24,6 +24,7 @@
<slf4j.version>1.7.21</slf4j.version> <slf4j.version>1.7.21</slf4j.version>
<xml.bind.version>2.3.0</xml.bind.version> <xml.bind.version>2.3.0</xml.bind.version>
<lombok.version>1.18.20</lombok.version> <lombok.version>1.18.20</lombok.version>
<swagger.version>3.0.0</swagger.version>
<!-- Test 3rd-party dependencies: --> <!-- Test 3rd-party dependencies: -->
<junit.version>5.7.0</junit.version> <junit.version>5.7.0</junit.version>
<easymock.version>4.0.2</easymock.version> <easymock.version>4.0.2</easymock.version>
@@ -44,6 +45,11 @@
<artifactId>jaxb-api</artifactId> <artifactId>jaxb-api</artifactId>
<version>${xml.bind.version}</version> <version>${xml.bind.version}</version>
</dependency> </dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-boot-starter</artifactId>
<version>${swagger.version}</version>
</dependency>
</dependencies> </dependencies>
</dependencyManagement> </dependencyManagement>