菜单
登录注册
欢 迎
登录
自动登录
忘记密码?
新朋友
注册
注册
老朋友
登录
笔记内容为空!
TA的笔记树 >>
java异步http请求工具类(org.asynchttpclient)
Java工具类
[TOC] #### 简述 ``` 本工具类是对 org.asynchttpclient 异步http请求的封装,支持: 1、同步请求(阻塞接收响应数据) 2、异步请求-Future 3、异步请求-监听回调 ``` ``` 使用场景1:http请求某接口,需要等待响应数据 使用场景2:http批量请求N个接口,对响应数据集中处理(Future) 使用场景3:http请求接口,不需阻塞,只当有响应数据时自动调用自定义回调函数 使用场景4:http请求只管异步请求,不管响应数据 ``` #### maven依赖 ```
org.asynchttpclient
async-http-client
2.12.2
commons-io
commons-io
2.8.0
org.apache.commons
commons-lang3
3.11
org.slf4j
slf4j-log4j12
1.7.25
``` #### AsyncHttpClientComponent > 针对`org.asynchttpclient.AsyncHttpClient`的最基层封装(只提供client) ``` import io.netty.channel.EventLoopGroup; import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import org.asynchttpclient.AsyncHttpClient; import org.asynchttpclient.AsyncHttpClientConfig; import org.asynchttpclient.DefaultAsyncHttpClient; import org.asynchttpclient.DefaultAsyncHttpClientConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; /** * Created by vanki on 2018/7/19 18:10. */ public class AsyncHttpClientComponent { private static final Logger LOG = LoggerFactory.getLogger(AsyncHttpClientComponent.class); private static final int eventLoopGroupThreadNum = 1; private static final int connectTimeoutMilliseconds = 5000; private static final int requestTimeoutMilliseconds = 300000; private static final int readTimeoutTimeoutMilliseconds = 300000; private EventLoopGroup eventLoopGroup; private AsyncHttpClient asyncHttpClient; private AsyncHttpClientComponent() { init(); } public static AsyncHttpClientComponent getInstance() { return ClientComponentHolder.clientComponent; } public AsyncHttpClient client() { return this.asyncHttpClient; } private synchronized void init() { if (getInstance() != null) { return; } this.asyncHttpClient = new DefaultAsyncHttpClient(initConfig()); Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown)); LOG.warn("启动asyncHttpClient服务!"); } private AsyncHttpClientConfig initConfig() { String osName = System.getProperty("os.name"); LOG.warn("初始化asyncHttpClient配置,检测到环境为:" + osName); if ("Linux".equalsIgnoreCase(osName)) { eventLoopGroup = new EpollEventLoopGroup(); } else { eventLoopGroup = new NioEventLoopGroup(eventLoopGroupThreadNum); } return new DefaultAsyncHttpClientConfig .Builder() .setConnectTimeout(connectTimeoutMilliseconds) .setRequestTimeout(requestTimeoutMilliseconds) .setReadTimeout(readTimeoutTimeoutMilliseconds) .setEventLoopGroup(eventLoopGroup) // 设置信任所有ssl .setUseInsecureTrustManager(true) .build(); } public void shutdown() { if (this.asyncHttpClient != null) { try { this.asyncHttpClient.close(); eventLoopGroup.shutdownGracefully(); LOG.warn("关闭asyncHttpClient服务!"); } catch (IOException exception) { LOG.error("Ops!", exception); } } } private static class ClientComponentHolder { private static final AsyncHttpClientComponent clientComponent = new AsyncHttpClientComponent(); } } ``` #### AsyncHttpClientUtil > 依赖于`AsyncHttpClientComponent`,可以此进行扩充方法,由于本人用`POST、GET`较多,所以,该两类方法提供还是较为全面,可仿照扩充 ``` import org.apache.commons.lang3.tuple.Pair; import org.asynchttpclient.BoundRequestBuilder; import org.asynchttpclient.ListenableFuture; import org.asynchttpclient.Request; import org.asynchttpclient.Response; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.Consumer; /** * Http 请求工具类 * * Created by vanki on 2018/7/19 18:40. */ public class AsyncHttpClientUtil { private static final Logger LOG = LoggerFactory.getLogger(AsyncHttpClientUtil.class); private static final ExecutorService resultHandlerExecutor = new ThreadPoolExecutor(0, 64, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); public static Pair
doGet(String url) throws ExecutionException, InterruptedException { return doGet(url, null); } /** * GET阻塞请求,直到响应数据 * * @param url 请求地址 * @param preRequest 预请求信息,如:参数、头、请求体等 * * @return 响应数据 */ public static Pair
doGet(String url, Consumer
preRequest) throws ExecutionException, InterruptedException { Pair
> result = doGetInFuture(url, preRequest); return Pair.of(result.getKey(), futureGet(result.getValue())); } /** * GET异步请求,有响应数据时会自动调用resultHandler进入处理 * * @param url 请求地址 * @param resultHandler 响应数据回调处理函数 */ public static void doGetWithResultHandler(String url, BiConsumer
resultHandler) { doGetWithResultHandler(url, null, resultHandler); } /** * GET异步请求,有响应数据时会自动调用resultHandler进入处理 * * @param url 请求地址 * @param preRequest 预请求信息,如:参数、头、请求体等 * @param resultHandler 响应数据回调处理函数 */ public static void doGetWithResultHandler(String url, Consumer
preRequest, BiConsumer
resultHandler) { Pair
> result = doGetInFuture(url, preRequest); addListener(resultHandler, result.getKey(), result.getValue()); } /** * GET异步请求,可批量发送请求,然后对Future集中处理 * * @param url 请求地址 * * @return */ public static ListenableFuture
doGetInFuture(String url) { return doGetInFuture(url, null).getValue(); } /** * GET异步请求,可批量发送请求,然后对Future集中处理 * * @param url 请求地址 * @param preRequest 预请求信息,如:参数、头、请求体等 * * @return key: 请求参数,val:响应数据引用,可通过get()方法获取响应数据(抛异常时为null) */ public static Pair
> doGetInFuture(String url, Consumer
preRequest) { if (url == null) { return null; } BoundRequestBuilder builder = AsyncHttpClientComponent.getInstance().client().prepareGet(url); if (preRequest != null) { preRequest.accept(builder); } return Pair.of(builder.build(), builder.execute()); } /** * POST阻塞请求,直到响应数据 * * @param url 请求地址 * @param preRequest 预请求信息,如:参数、头、请求体等 * * @return 响应数据 */ public static Pair
doPost(String url, Consumer
preRequest) throws ExecutionException, InterruptedException { Pair
> result = doPostInFuture(url, preRequest); return Pair.of(result.getKey(), futureGet(result.getValue())); } /** * POST异步请求,有响应数据时会自动调用resultHandler进入处理 * * @param url 请求地址 * @param preRequest 预请求信息,如:参数、头、请求体等 * @param resultHandler 响应数据回调处理函数 */ public static void doPostWithResultHandler(String url, Consumer
preRequest, BiConsumer
resultHandler) { Pair
> result = doPostInFuture(url, preRequest); addListener(resultHandler, result.getKey(), result.getValue()); } /** * POST异步请求,可批量发送请求,然后对Future集中处理 * * @param url 请求地址 * @param preRequest 预请求信息,如:参数、头、请求体等 * * @return key: 请求参数,val:响应数据引用,可通过get()方法获取响应数据(抛异常时为null) */ public static Pair
> doPostInFuture(String url, Consumer
preRequest) { if (url == null) { return null; } BoundRequestBuilder builder = AsyncHttpClientComponent.getInstance().client().preparePost(url); if (preRequest != null) { preRequest.accept(builder); } return Pair.of(builder.build(), builder.execute()); } /** * POST阻塞请求,直到响应数据 * * @param url 请求地址 * @param preRequest 预请求信息,如:参数、头、请求体等 * * @return 响应数据 */ public static Pair
doPut(String url, Consumer
preRequest) throws ExecutionException, InterruptedException { Pair
> result = doPutInFuture(url, preRequest); return Pair.of(result.getKey(), futureGet(result.getValue())); } /** * POST异步请求,可批量发送请求,然后对Future集中处理 * * @param url 请求地址 * @param preRequest 预请求信息,如:参数、头、请求体等 * * @return key: 请求参数,val:响应数据引用,可通过get()方法获取响应数据(抛异常时为null) */ public static Pair
> doPutInFuture(String url, Consumer
preRequest) { if (url == null) { return null; } BoundRequestBuilder builder = AsyncHttpClientComponent.getInstance().client().preparePut(url); if (preRequest != null) { preRequest.accept(builder); } return Pair.of(builder.build(), builder.execute()); } private static Response futureGet(ListenableFuture
future) throws ExecutionException, InterruptedException { if (future == null) { return null; } return future.get(); } private static void addListener(BiConsumer
resultHandler, Request preRequest, ListenableFuture
future) { if (future == null || resultHandler == null) { return; } future.addListener(() -> { try { resultHandler.accept(preRequest, future.get()); } catch (Exception e) { resultHandler.accept(preRequest, null); LOG.error("", e); } }, resultHandlerExecutor); } } ``` #### 使用示例 ``` public static void main(String[] args) throws ExecutionException, InterruptedException { /* 阻塞 */ Pair
res1 = doGet("https://localhost:8080/test?a=aaa&b=bbb"); System.out.println("响应数据:" + res1.getValue().getResponseBody()); Pair
res2 = doGet("https://localhost:8080/test", builder -> { builder.setHeader("headInfo", "val"); builder.addQueryParam("param", "paramVal"); }); System.out.println("响应数据:" + res2.getValue()); /* 异步回调处理 */ doGetWithResultHandler("https://localhost:8080/test", builder -> { builder.setHeader("headInfo", "val"); builder.addQueryParam("param", "paramVal"); }, (req, res) -> { System.out.println("请求数据:" + req); System.out.println("响应数据:" + res); }); /* 异步Future */ List
> futureList = new ArrayList<>(128); for (int i = 0; i < 100; i++) { futureList.add(doGetInFuture("https://localhost:8080/test?a=aaa&b=bbb")); } // TODO 这里可以做其它事情 futureList.forEach(future -> { // 批量处理响应数据 try { System.out.println("响应数据:" + future.get()); } catch (Exception e) { e.printStackTrace(); } }); } ```
vanki
莫道儒冠误此生,从来诗书不负人。
浏览:
9368
创建:
2018-08-23 00:29:09
更新:
2022-10-25 10:11:52
TA的最新笔记
spring-boot配置redis多数据源
linux源修改(阿里)
python安装postgresql依赖
arthas使用
java基于spring的NamedParameterJdbcTemplate封装的获取sql工具类
Impala添加负载
S3常用使用
redis常用操作
hdfs相关命令
crontab使用
TA的最热笔记
java异步http请求工具类(org.asynchttpclient)
iTerm2主题配置与常用技巧
java基于spring.redisTemplate实现分布式锁工具类
Kotlin + SpringBoot + JPA(Hibernate) + Repository自定义方法
IDEA汉化
Sequel Pro连接mysql8打开数据库报错
centos-Hadoop2.7.3完全分布式搭建(HA)
SpringBoot上传文件报错(The temporary upload location [..] is not valid)
mac常用软件
kotlin对象属性值拷贝工具类