[collector]http database 采集修改
This commit is contained in:
@@ -0,0 +1,261 @@
|
||||
package com.usthe.collector.collect.database;
|
||||
|
||||
import com.usthe.collector.collect.AbstractCollect;
|
||||
import com.usthe.collector.common.cache.CacheIdentifier;
|
||||
import com.usthe.collector.common.cache.CommonCache;
|
||||
import com.usthe.collector.common.cache.support.CommonJdbcConnect;
|
||||
import com.usthe.collector.util.CollectorConstants;
|
||||
import com.usthe.common.entity.job.Metrics;
|
||||
import com.usthe.common.entity.job.protocol.JdbcProtocol;
|
||||
import com.usthe.common.entity.message.CollectRep;
|
||||
import com.usthe.common.util.CommonConstants;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.sql.Connection;
|
||||
import java.sql.DriverManager;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Statement;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
|
||||
/**
|
||||
* 数据库JDBC通用查询
|
||||
* @author tomsun28
|
||||
* @date 2021/12/1 21:37
|
||||
*/
|
||||
@Slf4j
|
||||
public class JdbcCommonCollect extends AbstractCollect {
|
||||
|
||||
private static final String QUERY_TYPE_ONE_ROW = "oneRow";
|
||||
private static final String QUERY_TYPE_MULTI_ROW = "multiRow";
|
||||
private static final String QUERY_TYPE_COLUMNS = "columns";
|
||||
|
||||
private JdbcCommonCollect(){}
|
||||
|
||||
public static JdbcCommonCollect getInstance() {
|
||||
return Singleton.INSTANCE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void collect(CollectRep.MetricsData.Builder builder, long appId, String app, Metrics metrics) {
|
||||
long startTime = System.currentTimeMillis();
|
||||
// 简单校验必有参数
|
||||
if (metrics == null || metrics.getJdbc() == null) {
|
||||
builder.setCode(CollectRep.Code.FAIL);
|
||||
builder.setMsg("DATABASE collect must has jdbc params");
|
||||
return;
|
||||
}
|
||||
JdbcProtocol jdbcProtocol = metrics.getJdbc();
|
||||
String databaseUrl = constructDatabaseUrl(jdbcProtocol);
|
||||
try {
|
||||
Statement statement = getConnection(jdbcProtocol.getUsername(),
|
||||
jdbcProtocol.getPassword(), databaseUrl);
|
||||
switch (jdbcProtocol.getQueryType()) {
|
||||
case QUERY_TYPE_ONE_ROW:
|
||||
queryOneRow(statement, jdbcProtocol.getSql(), metrics.getAliasFields(), builder, startTime);
|
||||
break;
|
||||
case QUERY_TYPE_MULTI_ROW:
|
||||
queryMultiRow(statement, jdbcProtocol.getSql(), metrics.getAliasFields(), builder, startTime);
|
||||
break;
|
||||
case QUERY_TYPE_COLUMNS:
|
||||
queryOneRowByMatchTwoColumns(statement, jdbcProtocol.getSql(), metrics.getAliasFields(), builder, startTime);
|
||||
break;
|
||||
default:
|
||||
builder.setCode(CollectRep.Code.FAIL);
|
||||
builder.setMsg("Not support database query type: " + jdbcProtocol.getQueryType());
|
||||
break;
|
||||
}
|
||||
} catch (SQLException sqlException) {
|
||||
log.error("Jdbc sql error: {}, code: {}.", sqlException.getMessage(),
|
||||
sqlException.getErrorCode(), sqlException);
|
||||
builder.setCode(CollectRep.Code.FAIL);
|
||||
builder.setMsg("Query Error: " + sqlException.getMessage() + " Code: " + sqlException.getErrorCode());
|
||||
} catch (Exception e) {
|
||||
log.error("Jdbc error: {}.", e.getMessage(), e);
|
||||
builder.setCode(CollectRep.Code.FAIL);
|
||||
builder.setMsg("Query Error: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private Statement getConnection(String username, String password, String url) throws Exception {
|
||||
CacheIdentifier identifier = CacheIdentifier.builder()
|
||||
.ip(url)
|
||||
.username(username).password(password).build();
|
||||
Optional<Object> cacheOption = CommonCache.getInstance().getCache(identifier, true);
|
||||
Statement statement = null;
|
||||
if (cacheOption.isPresent()) {
|
||||
CommonJdbcConnect jdbcConnect = (CommonJdbcConnect) cacheOption.get();
|
||||
try {
|
||||
statement = jdbcConnect.getConnection().createStatement();
|
||||
// 设置查询超时时间10秒
|
||||
statement.setQueryTimeout(10);
|
||||
// 设置查询最大行数1000行
|
||||
statement.setMaxRows(1000);
|
||||
} catch (Exception e) {
|
||||
log.info("The jdbc connect form cache, create statement error: {}", e.getMessage());
|
||||
try {
|
||||
if (statement != null) {
|
||||
statement.close();
|
||||
}
|
||||
jdbcConnect.close();
|
||||
statement = null;
|
||||
} catch (Exception e2) {
|
||||
log.error(e2.getMessage());
|
||||
}
|
||||
CommonCache.getInstance().removeCache(identifier);
|
||||
}
|
||||
}
|
||||
if (statement != null) {
|
||||
return statement;
|
||||
}
|
||||
// 复用失败则新建连接
|
||||
Connection connection = DriverManager.getConnection(url, username, password);
|
||||
statement = connection.createStatement();
|
||||
// 设置查询超时时间10秒
|
||||
statement.setQueryTimeout(10);
|
||||
// 设置查询最大行数1000行
|
||||
statement.setMaxRows(1000);
|
||||
CommonJdbcConnect jdbcConnect = new CommonJdbcConnect(connection);
|
||||
CommonCache.getInstance().addCache(identifier, jdbcConnect, 10000L);
|
||||
return statement;
|
||||
}
|
||||
|
||||
/**
|
||||
* 查询一行数据, 通过查询返回结果集的列名称,和查询的字段映射
|
||||
* eg:
|
||||
* 查询字段:one tow three four
|
||||
* 查询SQL:select one, tow, three, four from book limit 1;
|
||||
* @param statement 执行器
|
||||
* @param sql sql
|
||||
* @param columns 查询的列头(一般是数据库表字段,也可能包含特殊字段,eg: responseTime)
|
||||
* @throws Exception when error happen
|
||||
*/
|
||||
private void queryOneRow(Statement statement, String sql, List<String> columns,
|
||||
CollectRep.MetricsData.Builder builder, long startTime) throws Exception {
|
||||
statement.setMaxRows(1);
|
||||
ResultSet resultSet = statement.executeQuery(sql);
|
||||
try {
|
||||
if (resultSet.next()) {
|
||||
CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder();
|
||||
for (String column : columns) {
|
||||
if (CollectorConstants.RESPONSE_TIME.equals(column)) {
|
||||
long time = System.currentTimeMillis() - startTime;
|
||||
valueRowBuilder.addColumns(String.valueOf(time));
|
||||
} else {
|
||||
String value = resultSet.getString(column);
|
||||
value = value == null ? CommonConstants.NULL_VALUE : value;
|
||||
valueRowBuilder.addColumns(value);
|
||||
}
|
||||
}
|
||||
builder.addValues(valueRowBuilder.build());
|
||||
}
|
||||
} finally {
|
||||
resultSet.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 查询一行数据, 通过查询的两列数据(key-value),key和查询的字段匹配,value为查询字段的值
|
||||
* eg:
|
||||
* 查询字段:one tow three four
|
||||
* 查询SQL:select key, value from book;
|
||||
* 返回的key映射查询字段
|
||||
* @param statement 执行器
|
||||
* @param sql sql
|
||||
* @param columns 查询的列头(一般是数据库表字段,也可能包含特殊字段,eg: responseTime)
|
||||
* @throws Exception when error happen
|
||||
*/
|
||||
private void queryOneRowByMatchTwoColumns(Statement statement, String sql, List<String> columns,
|
||||
CollectRep.MetricsData.Builder builder, long startTime) throws Exception {
|
||||
ResultSet resultSet = statement.executeQuery(sql);
|
||||
try {
|
||||
HashMap<String, String> values = new HashMap<>(columns.size());
|
||||
while (resultSet.next()) {
|
||||
if (resultSet.getString(1) != null) {
|
||||
values.put(resultSet.getString(1).toLowerCase(), resultSet.getString(2));
|
||||
}
|
||||
}
|
||||
CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder();
|
||||
for (String column : columns) {
|
||||
if (CollectorConstants.RESPONSE_TIME.equals(column)) {
|
||||
long time = System.currentTimeMillis() - startTime;
|
||||
valueRowBuilder.addColumns(String.valueOf(time));
|
||||
} else {
|
||||
String value = values.get(column.toLowerCase());
|
||||
value = value == null ? CommonConstants.NULL_VALUE : value;
|
||||
valueRowBuilder.addColumns(value);
|
||||
}
|
||||
}
|
||||
builder.addValues(valueRowBuilder.build());
|
||||
} finally {
|
||||
resultSet.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 查询多行数据, 通过查询返回结果集的列名称,和查询的字段映射
|
||||
* eg:
|
||||
* 查询字段:one tow three four
|
||||
* 查询SQL:select one, tow, three, four from book limit 1;
|
||||
* @param statement 执行器
|
||||
* @param sql sql
|
||||
* @param columns 查询的列头(一般是数据库表字段,也可能包含特殊字段,eg: responseTime)
|
||||
* @throws Exception when error happen
|
||||
*/
|
||||
private void queryMultiRow(Statement statement, String sql, List<String> columns,
|
||||
CollectRep.MetricsData.Builder builder, long startTime) throws Exception {
|
||||
ResultSet resultSet = statement.executeQuery(sql);
|
||||
try {
|
||||
while (resultSet.next()) {
|
||||
CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder();
|
||||
for (String column : columns) {
|
||||
if (CollectorConstants.RESPONSE_TIME.equals(column)) {
|
||||
long time = System.currentTimeMillis() - startTime;
|
||||
valueRowBuilder.addColumns(String.valueOf(time));
|
||||
} else {
|
||||
String value = resultSet.getString(column);
|
||||
value = value == null ? CommonConstants.NULL_VALUE : value;
|
||||
valueRowBuilder.addColumns(value);
|
||||
}
|
||||
}
|
||||
builder.addValues(valueRowBuilder.build());
|
||||
}
|
||||
} finally {
|
||||
resultSet.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据jdbc入参构造数据库URL
|
||||
* @param jdbcProtocol jdbc
|
||||
* @return URL
|
||||
*/
|
||||
private String constructDatabaseUrl(JdbcProtocol jdbcProtocol) {
|
||||
if (Objects.nonNull(jdbcProtocol.getUrl())
|
||||
&& !Objects.equals("", jdbcProtocol.getUrl())
|
||||
&& jdbcProtocol.getUrl().startsWith("jdbc")) {
|
||||
// 入参数URL有效 则优先级最高返回
|
||||
return jdbcProtocol.getUrl();
|
||||
}
|
||||
String url;
|
||||
switch (jdbcProtocol.getPlatform()) {
|
||||
case "mysql":
|
||||
url = "jdbc:mysql://" + jdbcProtocol.getHost() + ":" + jdbcProtocol.getPort()
|
||||
+ "/" + (jdbcProtocol.getDatabase() == null ? "" : jdbcProtocol.getDatabase())
|
||||
+ "?useUnicode=true&characterEncoding=utf-8&useSSL=false";
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("Not support database platform: " + jdbcProtocol.getPlatform());
|
||||
|
||||
}
|
||||
return url;
|
||||
}
|
||||
|
||||
private static class Singleton {
|
||||
private static final JdbcCommonCollect INSTANCE = new JdbcCommonCollect();
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user