Prechádzať zdrojové kódy

[collector]http公共客户端,公共缓存

tomsun28 4 rokov pred
rodič
commit
a7d932b6c9

+ 14 - 0
collector/src/main/java/com/usthe/collector/collect/common/cache/CacheCloseable.java

@@ -0,0 +1,14 @@
+package com.usthe.collector.collect.common.cache;
+
+/**
+ * 连接资源关闭回调接口
+ * @author tomsun28
+ * @date 2022/1/1 21:03
+ */
+public interface CacheCloseable {
+
+    /**
+     * 在缓存remove掉此对象前,回调接口对连接对象进行相关资源的释放
+     */
+    void close();
+}

+ 23 - 0
collector/src/main/java/com/usthe/collector/collect/common/cache/CacheIdentifier.java

@@ -0,0 +1,23 @@
+package com.usthe.collector.collect.common.cache;
+
+import lombok.Builder;
+import lombok.Data;
+
+/**
+ * 缓存key唯一标识符
+ * @author tomsun28
+ * @date 2021/12/1 21:30
+ */
+@Data
+@Builder
+public class CacheIdentifier {
+
+    private String ip;
+
+    private String port;
+
+    private String username;
+
+    private String password;
+
+}

+ 211 - 0
collector/src/main/java/com/usthe/collector/collect/common/cache/CommonCache.java

@@ -0,0 +1,211 @@
+package com.usthe.collector.collect.common.cache;
+
+
+import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * lru cache 对连接对象进行缓存
+ * @author tomsun28
+ * @date 2021-12-10 23:17
+ */
+@Slf4j
+public class CommonCache {
+
+    /**
+     * 默认缓存时间 30minute
+     */
+    private static final long DEFAULT_CACHE_TIMEOUT = 30 * 60 * 1000L;
+
+    /**
+     * 默认最大缓存数量
+     */
+    private static final int DEFAULT_MAX_CAPACITY = 10000;
+
+    /**
+     * cacheTime数组大小
+     */
+    private static final int CACHE_TIME_LENGTH = 2;
+
+    /**
+     * 存储对象的数据过期时间点
+     */
+    private Map<Object, Long[]> timeoutMap;
+
+    /**
+     * 存储缓存对象
+     */
+    private ConcurrentLinkedHashMap<Object, Object> cacheMap;
+
+    /**
+     * 过期数据清理线程池
+     */
+    private ThreadPoolExecutor cleanTimeoutExecutor;
+
+    private CommonCache() { init();}
+
+    /**
+     * 初始化 cache
+     */
+    private void init() {
+        // 初始化lru hashmap
+        cacheMap = new ConcurrentLinkedHashMap
+                .Builder<>()
+                .maximumWeightedCapacity(DEFAULT_MAX_CAPACITY)
+                .listener((key, value) -> {
+                    timeoutMap.remove(key);
+                    if (value instanceof CacheCloseable) {
+                        ((CacheCloseable)value).close();
+                    }
+                    log.info("lru cache discard key: {}, value: {}.", key, value);
+                }).build();
+
+        // 初始化时间纪录map
+        timeoutMap = new ConcurrentHashMap<>(DEFAULT_MAX_CAPACITY >> 6);
+
+        // 初始化过期数据清理线程池
+        cleanTimeoutExecutor = new ThreadPoolExecutor(1, 1,
+                1, TimeUnit.SECONDS,
+                new ArrayBlockingQueue<>(1), r -> new Thread("lru-cache-timeout-cleaner"),
+                new ThreadPoolExecutor.DiscardOldestPolicy());
+
+        // 初始化可用性探测定位任务,每次探测间隔时间为20分钟
+        ScheduledThreadPoolExecutor scheduledExecutor =  new ScheduledThreadPoolExecutor(1,
+                r -> new Thread(r, "lru-cache-available-detector"));
+        scheduledExecutor.scheduleWithFixedDelay(this::detectCacheAvailable,
+                2,20, TimeUnit.MINUTES);
+    }
+
+    /**
+     * 探测所有可探测的缓存对象的可用性,清除不可用和过期对象
+     */
+    private void detectCacheAvailable() {
+        try {
+            cacheMap.forEach((key, value) -> {
+                // 先判断是否过期
+                Long[] cacheTime = timeoutMap.get(key);
+                long currentTime = System.currentTimeMillis();
+                if (cacheTime == null || cacheTime.length != CACHE_TIME_LENGTH
+                        || cacheTime[0] + cacheTime[1] < currentTime) {
+                    cacheMap.remove(key);
+                    timeoutMap.remove(key);
+                    if (value instanceof CacheCloseable) {
+                        ((CacheCloseable)value).close();
+                    }
+
+                }
+            });
+        } catch (Exception e) {
+            log.error("detect cache available error: {}.", e.getMessage(), e);
+        }
+    }
+
+    /**
+     * 清理过期线程
+     */
+    private void cleanTimeoutCache() {
+        try {
+            cacheMap.forEach((key, value) -> {
+                // index 0 is startTime, 1 is timeDiff
+                Long[] cacheTime = timeoutMap.get(key);
+                long currentTime = System.currentTimeMillis();
+                if (cacheTime == null || cacheTime.length != CACHE_TIME_LENGTH) {
+                    timeoutMap.put(key, new Long[]{currentTime, DEFAULT_CACHE_TIMEOUT});
+                } else if (cacheTime[0] + cacheTime[1] < currentTime) {
+                    // 过期了 discard 关闭这个cache的资源
+                    timeoutMap.remove(key);
+                    cacheMap.remove(key);
+                    if (value instanceof CacheCloseable) {
+                        ((CacheCloseable)value).close();
+                    }
+                }
+            });
+        } catch (Exception e) {
+            log.error("clean timeout cache error: {}.", e.getMessage(), e);
+        }
+    }
+
+    /**
+     * 新增或更新cache
+     * @param key 存储对象key
+     * @param value 存储对象
+     * @param timeDiff 缓存对象保存时间 millis
+     */
+    public void addCache(Object key, Object value, Long timeDiff) {
+        if (timeDiff == null) {
+            timeDiff = DEFAULT_CACHE_TIMEOUT;
+        }
+        cacheMap.put(key, value);
+        timeoutMap.put(key, new Long[]{System.currentTimeMillis(), timeDiff});
+        cleanTimeoutExecutor.execute(() -> {
+            try {
+                cleanTimeoutCache();
+                Thread.sleep(10 * 1000);
+            } catch (InterruptedException e) {
+                log.error(e.getMessage(), e);
+            }
+        });
+    }
+
+    /**
+     * 根据缓存key获取缓存对象
+     * @param key key
+     * @param refreshCache 是否刷新命中的缓存的存活时间 true是,false否
+     * @return 缓存对象
+     */
+    public Optional<Object> getCache(Object key, boolean refreshCache) {
+        Long[] cacheTime = timeoutMap.get(key);
+        if (cacheTime == null || cacheTime.length != CACHE_TIME_LENGTH) {
+            return Optional.empty();
+        }
+        if (cacheTime[0] + cacheTime[1] < System.currentTimeMillis()) {
+            timeoutMap.remove(key);
+            cacheMap.remove(key);
+            return Optional.empty();
+        }
+        Object value = cacheMap.get(key);
+        if (value == null) {
+            cacheMap.remove(key);
+            timeoutMap.remove(key);
+        } else if (refreshCache) {
+            cacheTime[0] = System.currentTimeMillis();
+            timeoutMap.put(key, cacheTime);
+        }
+        return Optional.ofNullable(value);
+    }
+
+    /**
+     * 根据缓存key删除缓存对象
+     * @param key key
+     */
+    public void removeCache(Object key) {
+        timeoutMap.remove(key);
+        cacheMap.remove(key);
+    }
+
+    /**
+     * 获取缓存实例
+     * @return cache
+     */
+    public static CommonCache getInstance() {
+        return SingleInstance.INSTANCE;
+    }
+
+    /**
+     * 静态内部类
+     */
+    private static class SingleInstance {
+        /**
+         * 单例
+         */
+        private static final CommonCache INSTANCE= new CommonCache();
+    }
+}

+ 40 - 0
collector/src/main/java/com/usthe/collector/collect/common/cache/JdbcConnect.java

@@ -0,0 +1,40 @@
+package com.usthe.collector.collect.common.cache;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.sql.Connection;
+
+/**
+ * @author tomsun28
+ * @date 2022/1/1 21:24
+ */
+@Slf4j
+public class JdbcConnect implements CacheCloseable {
+
+    private Connection connection;
+
+    public JdbcConnect(Connection connection) {
+        this.connection = connection;
+    }
+
+    @Override
+    public void close() {
+        try {
+            if (connection != null) {
+                connection.close();
+            }
+        } catch (Exception e) {
+            log.error("close jdbc connect error: {}", e.getMessage());
+        }
+    }
+
+    @Override
+    protected void finalize() throws Throwable {
+        close();
+        super.finalize();
+    }
+
+    public Connection getConnection() {
+        return connection;
+    }
+}

+ 140 - 0
collector/src/main/java/com/usthe/collector/collect/common/http/CommonHttpClient.java

@@ -0,0 +1,140 @@
+package com.usthe.collector.collect.common.http;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.config.Registry;
+import org.apache.http.config.RegistryBuilder;
+import org.apache.http.conn.socket.ConnectionSocketFactory;
+import org.apache.http.conn.socket.PlainConnectionSocketFactory;
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+import org.apache.http.ssl.SSLContexts;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * 统一的http客户端连接池
+ * @author tomsun28
+ * @date 2021/12/30 21:23
+ */
+@Slf4j
+public class CommonHttpClient {
+
+    private static CloseableHttpClient httpClient;
+
+    private static PoolingHttpClientConnectionManager connectionManager;
+
+    /**
+     * 此连接池所能提供的最大连接数
+     */
+    private final static int MAX_TOTAL_CONNECTIONS = 50000;
+
+    /**
+     * 每个路由所能分配的最大连接数
+     */
+    private final static int MAX_PER_ROUTE_CONNECTIONS = 80;
+
+    /**
+     * 从连接池中获取连接的默认超时时间 4秒
+     */
+    private final static int REQUIRE_CONNECT_TIMEOUT = 4000;
+
+    /**
+     * 双端建立连接超时时间 4秒
+     */
+    private final static int CONNECT_TIMEOUT = 4000;
+
+    /**
+     * socketReadTimeout 响应tcp报文的最大间隔超时时间
+     */
+    private final static int SOCKET_TIMEOUT = 60000;
+
+    /**
+     * 空闲连接免检的有效时间,被重用的空闲连接若超过此时间,需检查此连接的可用性
+     */
+    private final static int INACTIVITY_VALIDATED_TIME = 10000;
+
+    /**
+     * ssl版本
+     */
+    private final static String[] SUPPORTED_SSL = {"TLSv1","TLSv1.1","TLSv1.2","SSLv3"};
+
+    static {
+        try {
+            // 初始化ssl上下文
+            SSLContext sslContext = SSLContexts.createDefault();
+            X509TrustManager x509TrustManager = new X509TrustManager() {
+                @Override
+                public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException { }
+                @Override
+                public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException { }
+                @Override
+                public X509Certificate[] getAcceptedIssuers() { return null; }
+            };
+            sslContext.init(null, new TrustManager[]{x509TrustManager}, null);
+            // 设置支持的ssl版本
+            SSLConnectionSocketFactory sslFactory = new SSLConnectionSocketFactory(sslContext, SUPPORTED_SSL, null, new NoopHostnameVerifier());
+            // 注册 http https
+            Registry<ConnectionSocketFactory> registry = RegistryBuilder.<ConnectionSocketFactory>create()
+                    .register("http", PlainConnectionSocketFactory.INSTANCE)
+                    .register("https", sslFactory)
+                    .build();
+            // 网络请求默认配置
+            RequestConfig requestConfig = RequestConfig.custom()
+                    // 从连接池获取连接超时时间
+                    .setConnectionRequestTimeout(REQUIRE_CONNECT_TIMEOUT)
+                    // 和对端新连接建立时间,三次握手时间
+                    .setConnectTimeout(CONNECT_TIMEOUT)
+                    // 数据传输最大响应间隔时间
+                    .setSocketTimeout(SOCKET_TIMEOUT)
+                    // 遇到301 302不自动重定向跳转
+                    .setRedirectsEnabled(false)
+                    .build();
+            // 连接池
+            connectionManager = new PoolingHttpClientConnectionManager(registry);
+            connectionManager.setMaxTotal(MAX_TOTAL_CONNECTIONS);
+            connectionManager.setDefaultMaxPerRoute(MAX_PER_ROUTE_CONNECTIONS);
+            connectionManager.setValidateAfterInactivity(INACTIVITY_VALIDATED_TIME);
+            // 构造单例 httpClient
+            httpClient = HttpClients.custom()
+                    .setConnectionManager(connectionManager)
+                    .setDefaultRequestConfig(requestConfig)
+                    // 定期清理不可用过期连接
+                    .evictExpiredConnections()
+                    // 定期清理可用但空闲的连接
+                    .evictIdleConnections(100, TimeUnit.SECONDS)
+                    .build();
+            // 构造连接清理器
+            Thread connectCleaner = new Thread(() -> {
+                while (Thread.currentThread().isInterrupted()) {
+                    try {
+                        Thread.sleep(30000);
+                        connectionManager.closeExpiredConnections();
+                        connectionManager.closeIdleConnections(100, TimeUnit.SECONDS);
+                    } catch (InterruptedException e) {
+                    }
+                }
+            });
+            connectCleaner.setName("HttpConnectCleaner");
+            connectCleaner.setDaemon(true);
+            connectCleaner.start();
+        } catch (Exception e) {
+        }
+    }
+
+    public static CloseableHttpClient getHttpClient() {
+        return httpClient;
+    }
+
+    public static PoolingHttpClientConnectionManager getConnectionManager() {
+        return connectionManager;
+    }
+}