[manager,collector]新增mysql指标,采集器调度第0优先级失败则取消后续的优化
This commit is contained in:
@@ -1,5 +1,6 @@
|
|||||||
package com.usthe.collector.collect.database;
|
package com.usthe.collector.collect.database;
|
||||||
|
|
||||||
|
import com.mysql.cj.jdbc.exceptions.CommunicationsException;
|
||||||
import com.usthe.collector.collect.AbstractCollect;
|
import com.usthe.collector.collect.AbstractCollect;
|
||||||
import com.usthe.collector.collect.common.cache.CacheIdentifier;
|
import com.usthe.collector.collect.common.cache.CacheIdentifier;
|
||||||
import com.usthe.collector.collect.common.cache.CommonCache;
|
import com.usthe.collector.collect.common.cache.CommonCache;
|
||||||
@@ -68,9 +69,12 @@ public class JdbcCommonCollect extends AbstractCollect {
|
|||||||
builder.setMsg("Not support database query type: " + jdbcProtocol.getQueryType());
|
builder.setMsg("Not support database query type: " + jdbcProtocol.getQueryType());
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
} catch (CommunicationsException communicationsException) {
|
||||||
|
log.warn("Jdbc sql error: {}, code: {}.", communicationsException.getMessage(), communicationsException.getErrorCode());
|
||||||
|
builder.setCode(CollectRep.Code.UN_REACHABLE);
|
||||||
|
builder.setMsg("Query Error: " + communicationsException.getMessage() + " Code: " + communicationsException.getErrorCode());
|
||||||
} catch (SQLException sqlException) {
|
} catch (SQLException sqlException) {
|
||||||
log.error("Jdbc sql error: {}, code: {}.", sqlException.getMessage(),
|
log.warn("Jdbc sql error: {}, code: {}.", sqlException.getMessage(), sqlException.getErrorCode());
|
||||||
sqlException.getErrorCode(), sqlException);
|
|
||||||
builder.setCode(CollectRep.Code.FAIL);
|
builder.setCode(CollectRep.Code.FAIL);
|
||||||
builder.setMsg("Query Error: " + sqlException.getMessage() + " Code: " + sqlException.getErrorCode());
|
builder.setMsg("Query Error: " + sqlException.getMessage() + " Code: " + sqlException.getErrorCode());
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
|||||||
@@ -144,7 +144,10 @@ public class CommonDispatcher implements MetricsTaskDispatch, CollectDataDispatc
|
|||||||
if (job.isCyclic()) {
|
if (job.isCyclic()) {
|
||||||
// 若是异步的周期性循环任务,直接发送指标组的采集数据到消息中间件
|
// 若是异步的周期性循环任务,直接发送指标组的采集数据到消息中间件
|
||||||
kafkaDataExporter.send(metricsData);
|
kafkaDataExporter.send(metricsData);
|
||||||
if (metricsSet == null) {
|
// 若metricsSet为null表示执行完成
|
||||||
|
// 或判断采集指标组是否优先级为0,即为可用性采集指标组 若可用性采集失败 则取消后面的指标组调度直接进入下一轮调度
|
||||||
|
if (metricsSet == null
|
||||||
|
|| (metrics.getPriority() == (byte)0 && metricsData.getCode() != CollectRep.Code.SUCCESS)) {
|
||||||
// 此Job所有指标组采集执行完成
|
// 此Job所有指标组采集执行完成
|
||||||
// 周期性任务再次将任务push到时间轮
|
// 周期性任务再次将任务push到时间轮
|
||||||
// 先判断此次任务执行时间与任务采集间隔时间
|
// 先判断此次任务执行时间与任务采集间隔时间
|
||||||
|
|||||||
@@ -65,3 +65,76 @@ metrics:
|
|||||||
# sql
|
# sql
|
||||||
sql: show global variables where Variable_name like 'version%' or Variable_name = 'max_connections' or Variable_name = 'datadir' or Variable_name = 'port';
|
sql: show global variables where Variable_name like 'version%' or Variable_name = 'max_connections' or Variable_name = 'datadir' or Variable_name = 'port';
|
||||||
url: ^_^url^_^
|
url: ^_^url^_^
|
||||||
|
|
||||||
|
- name: status
|
||||||
|
priority: 1
|
||||||
|
fields:
|
||||||
|
# 指标信息 包括 field名称 type字段类型:0-number数字,1-string字符串 instance是否为实例主键 unit:指标单位
|
||||||
|
- field: threads_created
|
||||||
|
type: 0
|
||||||
|
- field: threads_connected
|
||||||
|
type: 0
|
||||||
|
- field: threads_cached
|
||||||
|
type: 0
|
||||||
|
- field: threads_running
|
||||||
|
type: 0
|
||||||
|
# (非必须)监控指标别名,与上面的指标名映射。用于采集接口数据字段不直接是最终指标名称,需要此别名做映射转换
|
||||||
|
aliasFields:
|
||||||
|
- threads_created
|
||||||
|
- threads_connected
|
||||||
|
- threads_cached
|
||||||
|
- threads_running
|
||||||
|
# (非必须)指标计算表达式,与上面的别名一起作用,计算出最终需要的指标值
|
||||||
|
# eg: cores=core1+core2, usage=usage, waitTime=allTime-runningTime
|
||||||
|
calculates:
|
||||||
|
- threads_created=threads_created
|
||||||
|
- threads_connected=threads_connected
|
||||||
|
- threads_cached=threads_cached
|
||||||
|
- threads_running=threads_running
|
||||||
|
protocol: jdbc
|
||||||
|
jdbc:
|
||||||
|
# 主机host: ipv4 ipv6 域名
|
||||||
|
host: ^_^host^_^
|
||||||
|
# 端口
|
||||||
|
port: ^_^port^_^
|
||||||
|
platform: mysql
|
||||||
|
username: ^_^username^_^
|
||||||
|
password: ^_^password^_^
|
||||||
|
database: ^_^database^_^
|
||||||
|
# SQL查询方式: oneRow, multiRow, columns
|
||||||
|
queryType: columns
|
||||||
|
# sql
|
||||||
|
sql: show global status where Variable_name like 'thread%' or Variable_name = 'com_commit' or Variable_name = 'com_rollback' or Variable_name = 'questions' or Variable_name = 'uptime';
|
||||||
|
url: ^_^url^_^
|
||||||
|
|
||||||
|
- name: innodb
|
||||||
|
priority: 2
|
||||||
|
fields:
|
||||||
|
# 指标信息 包括 field名称 type字段类型:0-number数字,1-string字符串 instance是否为实例主键 unit:指标单位
|
||||||
|
- field: innodb_data_reads
|
||||||
|
type: 0
|
||||||
|
unit: 次数
|
||||||
|
- field: innodb_data_writes
|
||||||
|
type: 0
|
||||||
|
unit: 次数
|
||||||
|
- field: innodb_data_read
|
||||||
|
type: 0
|
||||||
|
unit: kb
|
||||||
|
- field: innodb_data_written
|
||||||
|
type: 0
|
||||||
|
unit: kb
|
||||||
|
protocol: jdbc
|
||||||
|
jdbc:
|
||||||
|
# 主机host: ipv4 ipv6 域名
|
||||||
|
host: ^_^host^_^
|
||||||
|
# 端口
|
||||||
|
port: ^_^port^_^
|
||||||
|
platform: mysql
|
||||||
|
username: ^_^username^_^
|
||||||
|
password: ^_^password^_^
|
||||||
|
database: ^_^database^_^
|
||||||
|
# SQL查询方式: oneRow, multiRow, columns
|
||||||
|
queryType: columns
|
||||||
|
# sql
|
||||||
|
sql: show global status where Variable_name like 'innodb%';
|
||||||
|
url: ^_^url^_^
|
||||||
@@ -62,7 +62,7 @@ public class MemoryDataStorage implements DisposableBean {
|
|||||||
private void saveData(CollectRep.MetricsData metricsData) {
|
private void saveData(CollectRep.MetricsData metricsData) {
|
||||||
String hashKey = metricsData.getId() + metricsData.getMetrics();
|
String hashKey = metricsData.getId() + metricsData.getMetrics();
|
||||||
if (metricsData.getValuesList().isEmpty()) {
|
if (metricsData.getValuesList().isEmpty()) {
|
||||||
log.info("[warehouse] redis flush metrics data {} is null, ignore.", hashKey);
|
log.debug("[warehouse memory] redis flush metrics data {} is null, ignore.", hashKey);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
metricsDataMap.put(hashKey, metricsData);
|
metricsDataMap.put(hashKey, metricsData);
|
||||||
|
|||||||
@@ -70,7 +70,7 @@ public class RedisDataStorage implements DisposableBean {
|
|||||||
String key = String.valueOf(metricsData.getId());
|
String key = String.valueOf(metricsData.getId());
|
||||||
String hashKey = metricsData.getMetrics();
|
String hashKey = metricsData.getMetrics();
|
||||||
if (metricsData.getValuesList().isEmpty()) {
|
if (metricsData.getValuesList().isEmpty()) {
|
||||||
log.info("[warehouse] redis flush metrics data {}:{} is null, ignore.", key, hashKey);
|
log.info("[warehouse redis] redis flush metrics data {} - {} is null, ignore.", key, hashKey);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
RedisAsyncCommands<String, CollectRep.MetricsData> commands = connection.async();
|
RedisAsyncCommands<String, CollectRep.MetricsData> commands = connection.async();
|
||||||
|
|||||||
Reference in New Issue
Block a user