[collector] 采集器参数替换校验补充
This commit is contained in:
@@ -6,6 +6,7 @@ import com.usthe.collector.dispatch.DispatchConstants;
|
|||||||
import com.usthe.common.entity.job.Metrics;
|
import com.usthe.common.entity.job.Metrics;
|
||||||
import com.usthe.common.entity.job.protocol.HttpProtocol;
|
import com.usthe.common.entity.job.protocol.HttpProtocol;
|
||||||
import com.usthe.common.entity.message.CollectRep;
|
import com.usthe.common.entity.message.CollectRep;
|
||||||
|
import com.usthe.common.util.IpDomainUtil;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.apache.http.HttpHeaders;
|
import org.apache.http.HttpHeaders;
|
||||||
import org.apache.http.HttpStatus;
|
import org.apache.http.HttpStatus;
|
||||||
@@ -99,7 +100,10 @@ public class HttpCollectImpl extends AbstractCollect {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (ClientProtocolException e1) {
|
} catch (ClientProtocolException e1) {
|
||||||
log.error(e1.getMessage(), e1);
|
log.error(e1.getCause().getMessage(), e1);
|
||||||
|
builder.setCode(CollectRep.Code.UN_CONNECTABLE);
|
||||||
|
builder.setMsg(e1.getCause().getMessage());
|
||||||
|
return;
|
||||||
} catch (UnknownHostException e2) {
|
} catch (UnknownHostException e2) {
|
||||||
// 对端不可达
|
// 对端不可达
|
||||||
log.info(e2.getMessage());
|
log.info(e2.getMessage());
|
||||||
@@ -157,13 +161,15 @@ public class HttpCollectImpl extends AbstractCollect {
|
|||||||
HttpProtocol.Authorization auth = httpProtocol.getAuthorization();
|
HttpProtocol.Authorization auth = httpProtocol.getAuthorization();
|
||||||
if (auth != null && !DispatchConstants.BEARER_TOKEN.equals(auth.getType())) {
|
if (auth != null && !DispatchConstants.BEARER_TOKEN.equals(auth.getType())) {
|
||||||
HttpClientContext clientContext = new HttpClientContext();
|
HttpClientContext clientContext = new HttpClientContext();
|
||||||
if (DispatchConstants.BASIC_AUTH.equals(auth.getType())) {
|
if (DispatchConstants.BASIC_AUTH.equals(auth.getType()) && auth.getBasicAuthUsername() != null
|
||||||
|
&& auth.getBasicAuthPassword() != null) {
|
||||||
CredentialsProvider provider = new BasicCredentialsProvider();
|
CredentialsProvider provider = new BasicCredentialsProvider();
|
||||||
UsernamePasswordCredentials credentials
|
UsernamePasswordCredentials credentials
|
||||||
= new UsernamePasswordCredentials(auth.getBasicAuthUsername(), auth.getBasicAuthPassword());
|
= new UsernamePasswordCredentials(auth.getBasicAuthUsername(), auth.getBasicAuthPassword());
|
||||||
provider.setCredentials(AuthScope.ANY, credentials);
|
provider.setCredentials(AuthScope.ANY, credentials);
|
||||||
clientContext.setCredentialsProvider(provider);
|
clientContext.setCredentialsProvider(provider);
|
||||||
} else if (DispatchConstants.DIGEST_AUTH.equals(auth.getType())) {
|
} else if (DispatchConstants.DIGEST_AUTH.equals(auth.getType()) && auth.getDigestAuthUsername() != null
|
||||||
|
&& auth.getDigestAuthPassword() != null) {
|
||||||
CredentialsProvider provider = new BasicCredentialsProvider();
|
CredentialsProvider provider = new BasicCredentialsProvider();
|
||||||
UsernamePasswordCredentials credentials
|
UsernamePasswordCredentials credentials
|
||||||
= new UsernamePasswordCredentials(auth.getBasicAuthUsername(), auth.getBasicAuthPassword());
|
= new UsernamePasswordCredentials(auth.getBasicAuthUsername(), auth.getBasicAuthPassword());
|
||||||
@@ -238,7 +244,15 @@ public class HttpCollectImpl extends AbstractCollect {
|
|||||||
// todo 处理请求内容 body 暂不支持body
|
// todo 处理请求内容 body 暂不支持body
|
||||||
|
|
||||||
// uri
|
// uri
|
||||||
requestBuilder.setUri(httpProtocol.getUrl());
|
if (IpDomainUtil.isHasSchema(httpProtocol.getHost())) {
|
||||||
|
requestBuilder.setUri(httpProtocol.getHost() + ":" + httpProtocol.getPort() + httpProtocol.getUrl());
|
||||||
|
} else {
|
||||||
|
if (httpProtocol.isSsl()) {
|
||||||
|
requestBuilder.setUri("https://" + httpProtocol.getHost() + ":" + httpProtocol.getPort() + httpProtocol.getUrl());
|
||||||
|
} else {
|
||||||
|
requestBuilder.setUri("http://" + httpProtocol.getHost() + ":" + httpProtocol.getPort() + httpProtocol.getUrl());
|
||||||
|
}
|
||||||
|
}
|
||||||
return requestBuilder.build();
|
return requestBuilder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -108,6 +108,7 @@ public class CommonDispatcher implements MetricsTaskDispatch, CollectDataDispatc
|
|||||||
.setTime(System.currentTimeMillis())
|
.setTime(System.currentTimeMillis())
|
||||||
.setCode(CollectRep.Code.TIMEOUT).setMsg("collect timeout").build();
|
.setCode(CollectRep.Code.TIMEOUT).setMsg("collect timeout").build();
|
||||||
dispatchCollectData(metricsTime.timeout, metricsTime.getMetrics(), metricsData);
|
dispatchCollectData(metricsTime.timeout, metricsTime.getMetrics(), metricsData);
|
||||||
|
metricsTimeoutMonitorMap.remove(entry.getKey());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Thread.sleep(20000);
|
Thread.sleep(20000);
|
||||||
@@ -129,7 +130,7 @@ public class CommonDispatcher implements MetricsTaskDispatch, CollectDataDispatc
|
|||||||
metricsSet.forEach(metrics -> {
|
metricsSet.forEach(metrics -> {
|
||||||
MetricsCollect metricsCollect = new MetricsCollect(metrics, timeout, this);
|
MetricsCollect metricsCollect = new MetricsCollect(metrics, timeout, this);
|
||||||
jobRequestQueue.addJob(metricsCollect);
|
jobRequestQueue.addJob(metricsCollect);
|
||||||
metricsTimeoutMonitorMap.put(job.getId() + metrics.getName(),
|
metricsTimeoutMonitorMap.put(job.getId() + "-" + metrics.getName(),
|
||||||
new MetricsTime(System.currentTimeMillis(), metrics, timeout));
|
new MetricsTime(System.currentTimeMillis(), metrics, timeout));
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@@ -138,6 +139,7 @@ public class CommonDispatcher implements MetricsTaskDispatch, CollectDataDispatc
|
|||||||
public void dispatchCollectData(Timeout timeout, Metrics metrics, CollectRep.MetricsData metricsData) {
|
public void dispatchCollectData(Timeout timeout, Metrics metrics, CollectRep.MetricsData metricsData) {
|
||||||
WheelTimerTask timerJob = (WheelTimerTask) timeout.task();
|
WheelTimerTask timerJob = (WheelTimerTask) timeout.task();
|
||||||
Job job = timerJob.getJob();
|
Job job = timerJob.getJob();
|
||||||
|
metricsTimeoutMonitorMap.remove(job.getId() + "-" + metrics.getName());
|
||||||
Set<Metrics> metricsSet = job.getNextCollectMetrics(metrics, false);
|
Set<Metrics> metricsSet = job.getNextCollectMetrics(metrics, false);
|
||||||
if (job.isCyclic()) {
|
if (job.isCyclic()) {
|
||||||
// 若是异步的周期性循环任务,直接发送指标组的采集数据到消息中间件
|
// 若是异步的周期性循环任务,直接发送指标组的采集数据到消息中间件
|
||||||
|
|||||||
@@ -110,11 +110,17 @@ public class MetricsCollect implements Runnable, Comparable<MetricsCollect> {
|
|||||||
try {
|
try {
|
||||||
abstractCollect.collect(response, monitorId, app, metrics);
|
abstractCollect.collect(response, monitorId, app, metrics);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("[Metrics Collect]: {}.", e.getMessage(), e);
|
String msg = e.getMessage();
|
||||||
|
if (msg == null && e.getCause() != null) {
|
||||||
|
msg = e.getCause().getMessage();
|
||||||
|
}
|
||||||
|
log.error("[Metrics Collect]: {}.", msg, e);
|
||||||
response.setCode(CollectRep.Code.FAIL);
|
response.setCode(CollectRep.Code.FAIL);
|
||||||
|
if (msg != null) {
|
||||||
response.setMsg(e.getMessage());
|
response.setMsg(e.getMessage());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
// 别名属性表达式替换计算
|
// 别名属性表达式替换计算
|
||||||
if (fastFailed()) {
|
if (fastFailed()) {
|
||||||
return;
|
return;
|
||||||
|
|||||||
@@ -88,6 +88,8 @@ public class TimerDispatcher implements TimerDispatch {
|
|||||||
public void responseSyncJobData(long jobId, List<CollectRep.MetricsData> metricsDataTemps) {
|
public void responseSyncJobData(long jobId, List<CollectRep.MetricsData> metricsDataTemps) {
|
||||||
currentTempTaskMap.remove(jobId);
|
currentTempTaskMap.remove(jobId);
|
||||||
CollectResponseEventListener eventListener = eventListeners.remove(jobId);
|
CollectResponseEventListener eventListener = eventListeners.remove(jobId);
|
||||||
|
if (eventListener != null) {
|
||||||
eventListener.response(metricsDataTemps);
|
eventListener.response(metricsDataTemps);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -1,7 +1,6 @@
|
|||||||
package com.usthe.collector.dispatch.timer;
|
package com.usthe.collector.dispatch.timer;
|
||||||
|
|
||||||
import com.google.gson.Gson;
|
import com.google.gson.Gson;
|
||||||
import com.google.gson.GsonBuilder;
|
|
||||||
import com.google.gson.JsonArray;
|
import com.google.gson.JsonArray;
|
||||||
import com.google.gson.JsonElement;
|
import com.google.gson.JsonElement;
|
||||||
import com.google.gson.JsonObject;
|
import com.google.gson.JsonObject;
|
||||||
@@ -13,6 +12,7 @@ import com.usthe.common.entity.job.Job;
|
|||||||
import com.usthe.common.entity.job.Metrics;
|
import com.usthe.common.entity.job.Metrics;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
@@ -24,8 +24,9 @@ import java.util.stream.Collectors;
|
|||||||
*/
|
*/
|
||||||
public class WheelTimerTask implements TimerTask {
|
public class WheelTimerTask implements TimerTask {
|
||||||
|
|
||||||
private Job job;
|
private final Job job;
|
||||||
private MetricsTaskDispatch metricsTaskDispatch;
|
private final MetricsTaskDispatch metricsTaskDispatch;
|
||||||
|
private static final Gson GSON = new Gson();
|
||||||
|
|
||||||
public WheelTimerTask(Job job) {
|
public WheelTimerTask(Job job) {
|
||||||
this.metricsTaskDispatch = SpringContextHolder.getBean(MetricsTaskDispatch.class);
|
this.metricsTaskDispatch = SpringContextHolder.getBean(MetricsTaskDispatch.class);
|
||||||
@@ -35,63 +36,77 @@ public class WheelTimerTask implements TimerTask {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 初始化job 将监控实际参数值对采集字段进行替换
|
* 初始化job填充信息
|
||||||
* @param job job
|
* @param job job
|
||||||
*/
|
*/
|
||||||
private void initJobMetrics(Job job) {
|
private void initJobMetrics(Job job) {
|
||||||
|
// 将监控实际参数值对采集字段进行替换
|
||||||
List<Configmap> config = job.getConfigmap();
|
List<Configmap> config = job.getConfigmap();
|
||||||
Map<String, Configmap> configmap = config.stream().collect(Collectors.toMap(Configmap::getKey, item -> item));
|
Map<String, Configmap> configmap = config.stream().collect(Collectors.toMap(Configmap::getKey, item -> item));
|
||||||
List<Metrics> metrics = job.getMetrics();
|
List<Metrics> metrics = job.getMetrics();
|
||||||
|
|
||||||
Gson gson = new Gson();
|
|
||||||
List<Metrics> metricsTmp = new ArrayList<>(metrics.size());
|
List<Metrics> metricsTmp = new ArrayList<>(metrics.size());
|
||||||
for (Metrics metric : metrics) {
|
for (Metrics metric : metrics) {
|
||||||
JsonElement jsonElement = gson.toJsonTree(metric);
|
JsonElement jsonElement = GSON.toJsonTree(metric);
|
||||||
jsonElement = replaceSpecialValue(jsonElement, configmap);
|
jsonElement = replaceSpecialValue(jsonElement, configmap);
|
||||||
metric = gson.fromJson(jsonElement, Metrics.class);
|
metric = GSON.fromJson(jsonElement, Metrics.class);
|
||||||
metricsTmp.add(metric);
|
metricsTmp.add(metric);
|
||||||
}
|
}
|
||||||
job.setMetrics(metricsTmp);
|
job.setMetrics(metricsTmp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* json参数替换
|
||||||
|
* @param jsonElement json
|
||||||
|
* @param configmap 参数map
|
||||||
|
* @return json
|
||||||
|
*/
|
||||||
private JsonElement replaceSpecialValue(JsonElement jsonElement, Map<String, Configmap> configmap) {
|
private JsonElement replaceSpecialValue(JsonElement jsonElement, Map<String, Configmap> configmap) {
|
||||||
if (jsonElement.isJsonObject()) {
|
if (jsonElement.isJsonObject()) {
|
||||||
JsonObject jsonObject = jsonElement.getAsJsonObject();
|
JsonObject jsonObject = jsonElement.getAsJsonObject();
|
||||||
jsonObject.entrySet().forEach(entry -> {
|
Iterator<Map.Entry<String, JsonElement>> iterator = jsonObject.entrySet().iterator();
|
||||||
|
while (iterator.hasNext()) {
|
||||||
|
Map.Entry<String, JsonElement> entry = iterator.next();
|
||||||
JsonElement element = entry.getValue();
|
JsonElement element = entry.getValue();
|
||||||
if (element.isJsonPrimitive()) {
|
if (element.isJsonPrimitive()) {
|
||||||
// 判断是否含有特殊字符 替换
|
// 判断是否含有特殊字符 替换
|
||||||
String value = element.getAsString();
|
String value = element.getAsString();
|
||||||
if (value.startsWith("^_^")) {
|
if (value.startsWith("^_^") && value.endsWith("^_^")) {
|
||||||
value = value.replaceAll("\\^_\\^", "");
|
value = value.replaceAll("\\^_\\^", "");
|
||||||
Configmap param = configmap.get(value);
|
Configmap param = configmap.get(value);
|
||||||
if (param != null) {
|
if (param != null) {
|
||||||
value = (String) param.getValue();
|
value = (String) param.getValue();
|
||||||
jsonObject.addProperty(entry.getKey(), value);
|
jsonObject.addProperty(entry.getKey(), value);
|
||||||
|
} else {
|
||||||
|
iterator.remove();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
jsonObject.add(entry.getKey(), replaceSpecialValue(entry.getValue(), configmap));
|
jsonObject.add(entry.getKey(), replaceSpecialValue(entry.getValue(), configmap));
|
||||||
}
|
}
|
||||||
});
|
}
|
||||||
} else if (jsonElement.isJsonArray()) {
|
} else if (jsonElement.isJsonArray()) {
|
||||||
JsonArray jsonArray = jsonElement.getAsJsonArray();
|
JsonArray jsonArray = jsonElement.getAsJsonArray();
|
||||||
for (int i = 0; i < jsonArray.size(); i++) {
|
Iterator<JsonElement> iterator = jsonArray.iterator();
|
||||||
JsonElement element = jsonArray.get(i);
|
int index = 0;
|
||||||
|
while (iterator.hasNext()) {
|
||||||
|
JsonElement element = iterator.next();
|
||||||
if (element.isJsonPrimitive()) {
|
if (element.isJsonPrimitive()) {
|
||||||
// 判断是否含有特殊字符 替换
|
// 判断是否含有特殊字符 替换
|
||||||
String value = element.getAsString();
|
String value = element.getAsString();
|
||||||
if (value.startsWith("^_^")) {
|
if (value.startsWith("^_^") && value.endsWith("^_^")) {
|
||||||
value = value.replaceAll("\\^_\\^", "");
|
value = value.replaceAll("\\^_\\^", "");
|
||||||
Configmap param = configmap.get(value);
|
Configmap param = configmap.get(value);
|
||||||
if (param != null) {
|
if (param != null) {
|
||||||
value = (String) param.getValue();
|
value = (String) param.getValue();
|
||||||
jsonArray.set(i, new JsonPrimitive(value));
|
jsonArray.set(index, new JsonPrimitive(value));
|
||||||
|
} else {
|
||||||
|
iterator.remove();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
jsonArray.set(i, replaceSpecialValue(element, configmap));
|
jsonArray.set(index, replaceSpecialValue(element, configmap));
|
||||||
}
|
}
|
||||||
|
index++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return jsonElement;
|
return jsonElement;
|
||||||
|
|||||||
@@ -19,6 +19,8 @@ public class IpDomainUtil {
|
|||||||
|
|
||||||
private static final String LOCALHOST = "localhost";
|
private static final String LOCALHOST = "localhost";
|
||||||
|
|
||||||
|
private static final Pattern DOMAIN_SCHEMA = Pattern.compile("^([hH][tT]{2}[pP]://|[hH][tT]{2}[pP][sS]://)");
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 校验判断是否是 ip或者domain
|
* 校验判断是否是 ip或者domain
|
||||||
* @param ipDomain ip domain string
|
* @param ipDomain ip domain string
|
||||||
@@ -37,4 +39,13 @@ public class IpDomainUtil {
|
|||||||
return DOMAIN_PATTERN.matcher(ipDomain).matches();
|
return DOMAIN_PATTERN.matcher(ipDomain).matches();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 判断 domain or ip 是否存在http / https 头
|
||||||
|
* @param domainIp host
|
||||||
|
* @return 存在true
|
||||||
|
*/
|
||||||
|
public static boolean isHasSchema(String domainIp) {
|
||||||
|
return DOMAIN_SCHEMA.matcher(domainIp).matches();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user