DelayQueue实现支付系统异步通知
2018-03-10 20:54:41
959次阅读
0个评论
支付系统的异步通知实现可以参考支付宝的异步通知:每个订单的异步通知按逐步拉长的时间间隔分次发送(例如 10m、20m、30m、40m、50m、1h),具体的通知频率可根据业务需求做相应调整。本文通过 Java 的 DelayQueue 来实现支付系统的异步通知功能。
支付系统异步通知的需求:
1、需要按照既有频率发送异步通知给调用方;
2、回调成功则停止异步通知;
3、回调失败,先判断是否超出既定频次,超出则停止发送,否则按照既有频率继续发送异步通知;
支付系统异步通知的需求:
1、需要按照既有频率发送异步通知给调用方;
2、回调成功则停止异步通知;
3、回调失败,先判断是否超出既定频次,超出则停止发送,否则按照既有频率继续发送异步通知;
下面就通过DelayQueue来实现支付系统异步通知
/**
 * Mutable two-element holder; used as the payload carried by a delayed queue item.
 *
 * @author lh
 * @version 2.0
 * @since 2017-06-23
 *
 * @param <K> type of the first element
 * @param <V> type of the second element
 */
public class Pair<K, V> {

    /** First element of the pair. */
    private K first;

    /** Second element of the pair. */
    private V second;

    /** Creates a pair whose two elements are both {@code null} until set. */
    public Pair() {
    }

    /**
     * Creates a pair holding the two given elements.
     *
     * @param first  the first element
     * @param second the second element
     */
    public Pair(K first, V second) {
        this.second = second;
        this.first = first;
    }

    /** @return the first element */
    public K getFirst() {
        return this.first;
    }

    /** @return the second element */
    public V getSecond() {
        return this.second;
    }

    /** @param first the new first element */
    public void setFirst(K first) {
        this.first = first;
    }

    /** @param second the new second element */
    public void setSecond(V second) {
        this.second = second;
    }
}
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
* Delayed接口的实现类
*
* <p> 内部实现了getDelay()和compareTo()方法,分别用来获取延迟时间和按两个任务的延迟时间进行排序</p>
*
* @author lh
* @version 2.0
* @since 2017-06-23
*
* @param <T>
*/
public class DelayItem<T> implements Delayed {
private static final long NANO_ORIGIN = System.nanoTime();
final static long now() {
return System.nanoTime() - NANO_ORIGIN;
}
private static final AtomicLong sequencer = new AtomicLong(0);
private final long sequenceNumber;
private final long time;
private final T item;
public DelayItem(T submit, long timeout) {
this.time = now() + timeout;
this.item = submit;
this.sequenceNumber = sequencer.getAndIncrement();
}
public T getItem() {
return this.item;
}
public long getDelay(TimeUnit unit) {
long d = unit.convert(time - now(), TimeUnit.NANOSECONDS);
return d;
}
public int compareTo(Delayed other) {
if (other == this)
return 0;
if (other instanceof DelayItem) {
DelayItem<?> x = (DelayItem<?>) other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long d = (getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS));
return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
}
}
/**
 * Notification message that is posted back to a caller's callback URL,
 * together with its retry bookkeeping.
 *
 * @author lh
 * @version 2.0
 * @since 2017-06-23
 */
public class RetMessage {

    /** Callback URL. */
    private String url;

    /** Message body to send. */
    private String reqData;

    /** Attempt counter; a message built with the two-argument constructor starts at 1. */
    private int times;

    /** Whether the notification has succeeded. */
    private boolean success;

    /**
     * Creates a message with its attempt counter at 1 and {@code success} false.
     *
     * @param url     callback URL
     * @param reqData message body
     */
    public RetMessage(String url, String reqData) {
        this(url, reqData, 1, false);
    }

    /**
     * Creates a message with every field given explicitly.
     *
     * @param url     callback URL
     * @param reqData message body
     * @param times   attempt counter
     * @param success whether the notification has succeeded
     */
    public RetMessage(String url, String reqData, int times, boolean success) {
        this.url = url;
        this.reqData = reqData;
        this.times = times;
        this.success = success;
    }

    /** @return the callback URL */
    public String getUrl() {
        return this.url;
    }

    /** @param url the callback URL */
    public void setUrl(String url) {
        this.url = url;
    }

    /** @return the message body */
    public String getReqData() {
        return this.reqData;
    }

    /** @param reqData the message body */
    public void setReqData(String reqData) {
        this.reqData = reqData;
    }

    /** @return the attempt counter */
    public int getTimes() {
        return this.times;
    }

    /** @param times the attempt counter */
    public void setTimes(int times) {
        this.times = times;
    }

    /** @return whether the notification has succeeded */
    public boolean isSuccess() {
        return this.success;
    }

    /** @param success whether the notification has succeeded */
    public void setSuccess(boolean success) {
        this.success = success;
    }
}
import java.util.HashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
import org.slf4j.LoggerFactory;
/**
 * Sends asynchronous notifications through a {@link DelayQueue}.
 *
 * <p>
 * Each message gets at most {@value #MAX_TIMES} attempts. A message whose
 * attempt counter is 1 is due almost immediately; after every failed attempt
 * the counter is incremented and the message is re-queued with a delay of
 * {@code (times - 1) * 10} minutes, i.e. 10, 20, 30 and 40 minutes.
 * </p>
 *
 * @author lh
 * @version 2.0
 * @since 2017-06-23
 */
public class Task {
    private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(Task.class);

    /** Base retry interval, in minutes. */
    private static final long TIME_UNIT = 10;

    /** Maximum number of delivery attempts per message. */
    private static final int MAX_TIMES = 5;

    /** Response body that the callee returns to acknowledge the notification. */
    private static final String RES_SUCCESS = "success";

    // DelayQueue is unbounded, so inserting never blocks; take() blocks until the
    // head element's delay has expired.
    // Declared before the singleton so it is initialized before the worker thread
    // (started from the constructor) is created.
    private static DelayQueue<DelayItem<Pair<String, RetMessage>>> queue = new DelayQueue<DelayItem<Pair<String, RetMessage>>>();

    private static Task instance = new Task();

    /** @return the single shared instance */
    public static Task getInstance() {
        return instance;
    }

    private Thread taskThread;

    /** Creates the singleton and starts the worker thread that drains the queue. */
    private Task() {
        taskThread = new Thread(new Runnable() {
            public void run() {
                execute();
            }
        });
        taskThread.setName("Task Thread");
        taskThread.start();
    }

    /**
     * Worker loop: takes each expired item off the queue and attempts delivery.
     * The loop ends when the thread is interrupted.
     */
    private void execute() {
        for (;;) {
            DelayItem<Pair<String, RetMessage>> delayItem;
            try {
                delayItem = queue.take();
            } catch (InterruptedException e) {
                LOGGER.warn(e.getMessage(), e);
                // Restore the interrupt flag so code further up can observe it.
                Thread.currentThread().interrupt();
                break;
            }
            if (delayItem != null) {
                deliver(delayItem.getItem());
            }
        }
    }

    /**
     * Performs one delivery attempt for an expired item and re-queues the
     * message when the attempt fails and attempts remain.
     *
     * @param pair the notification key and its message
     */
    private void deliver(Pair<String, RetMessage> pair) {
        RetMessage msg = pair.getSecond();
        if (msg.isSuccess() || msg.getTimes() > MAX_TIMES) {
            return;
        }
        HashMap<String, String> paramMap = new HashMap<String, String>();
        paramMap.put("reqData", msg.getReqData());
        boolean delivered = false;
        try {
            String httpResult = HttpsUtil.getInstance().doPostRetString(msg.getUrl(), null, paramMap);
            delivered = RES_SUCCESS.equals(httpResult);
            LOGGER.info("第{}次异步回调,返回结果{},返回参数:{},响应结果:{}", msg.getTimes(), httpResult,
                    paramMap.get("reqData"), delivered);
        } catch (Exception e) {
            // A transport failure counts as a failed attempt and is retried below;
            // previously it silently ended all further notifications.
            LOGGER.warn(e.getMessage(), e);
        }
        if (delivered) {
            msg.setSuccess(true);
            // TODO persist the final result here if it needs to be stored
            return;
        }
        msg.setTimes(msg.getTimes() + 1);
        msg.setSuccess(false);
        if (msg.getTimes() <= MAX_TIMES) {
            put(pair.getFirst(), msg);
        } else {
            // Out of attempts: drop now instead of parking the message in the queue
            // for another interval only to discard it.
            LOGGER.warn("notification {} abandoned after {} attempts", pair.getFirst(), MAX_TIMES);
            // TODO persist the final result here if it needs to be stored
        }
    }

    /**
     * Adds a notification, replacing any pending notification with the same key.
     *
     * @param key
     *            unique key; suggested form: merchantNo + orderNo
     * @param msg
     *            the message to deliver; its attempt counter determines the delay
     */
    public void put(String key, RetMessage msg) {
        removePending(key);
        // (times - 1) * 10 minutes; the extra TIME_UNIT nanoseconds are negligible
        // and kept only to preserve the original timing.
        long nanoTime = TIME_UNIT + TimeUnit.NANOSECONDS.convert((msg.getTimes() - 1) * TIME_UNIT, TimeUnit.MINUTES);
        queue.put(new DelayItem<Pair<String, RetMessage>>(new Pair<String, RetMessage>(key, msg), nanoTime));
    }

    /**
     * Removes every queued item whose pair key equals {@code key}.
     * The queue holds {@code DelayItem}s, so the key has to be compared against
     * each item's payload rather than passed to {@code contains}/{@code remove}.
     */
    private void removePending(String key) {
        for (DelayItem<Pair<String, RetMessage>> pending : queue) {
            String pendingKey = pending.getItem().getFirst();
            boolean sameKey = (key == null) ? pendingKey == null : key.equals(pendingKey);
            if (sameKey) {
                queue.remove(pending);
            }
        }
    }

    /** Demo entry point: queues one notification keyed by the current time. */
    public static void main(String[] args) throws Exception {
        String orderNo = System.currentTimeMillis() + "";
        // The URL needs a scheme, otherwise the HTTP client has no target host.
        RetMessage msg = new RetMessage("http://www.baidu.com", "a=1&b=2");
        Task.getInstance().put(orderNo, msg);
    }
}
里面用到了HttpsUtil工具类,需要先引入httpclient所需的jar
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.4.1</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpmime</artifactId>
<version>4.4.1</version>
</dependency>
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URISyntaxException;
import java.nio.charset.Charset;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import javax.net.ssl.SSLContext;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.HttpException;
import org.apache.http.HttpResponse;
import org.apache.http.NameValuePair;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.HttpClient;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.config.ConnectionConfig;
import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.config.SocketConfig;
import org.apache.http.conn.socket.ConnectionSocketFactory;
import org.apache.http.conn.socket.LayeredConnectionSocketFactory;
import org.apache.http.conn.socket.PlainConnectionSocketFactory;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.conn.ssl.SSLContexts;
import org.apache.http.conn.ssl.TrustStrategy;
import org.apache.http.cookie.Cookie;
import org.apache.http.cookie.CookieOrigin;
import org.apache.http.entity.mime.FormBodyPart;
import org.apache.http.entity.mime.HttpMultipartMode;
import org.apache.http.entity.mime.MultipartEntityBuilder;
import org.apache.http.impl.client.BasicCookieStore;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.impl.cookie.BasicClientCookie;
import org.apache.http.impl.cookie.BestMatchSpec;
import org.apache.http.message.BasicNameValuePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * {@link TrustStrategy} that accepts every certificate chain.
 *
 * <p>NOTE(security): no certificate validation is performed, so connections
 * built with this strategy do not authenticate the remote server.</p>
 */
class AnyTrustStrategy implements TrustStrategy {

    /**
     * Accepts the presented chain unconditionally.
     *
     * @param certChain the peer certificate chain (not inspected)
     * @param authenticationType the authentication type (not inspected)
     * @return always {@code true}
     */
    @Override
    public boolean isTrusted(X509Certificate[] certChain, String authenticationType) throws CertificateException {
        return true;
    }
}
/**
 * Singleton HTTP/HTTPS helper built on Apache HttpClient, using a pooling
 * connection manager and a shared cookie store.
 *
 * <p>NOTE(security): the HTTPS socket factory is created with
 * {@code AnyTrustStrategy} and {@code ALLOW_ALL_HOSTNAME_VERIFIER}, so server
 * certificates and host names are not verified. Review before using this
 * class against endpoints that must be authenticated.</p>
 */
public class HttpsUtil {
    private static final Logger logger = LoggerFactory.getLogger(HttpsUtil.class);
    private static final Log log = LogFactory.getLog(HttpsUtil.class);
    /** Size, in chars, of the buffer used by {@link #readStream(InputStream, String)}. */
    private static int bufferSize = 1024;
    private static final int CONNECT_TIMEOUT = 6 * 1000;
    private static final int REQUEST_TIMEOUT = 3 * 1000;
    private static volatile HttpsUtil instance;
    private ConnectionConfig connConfig;
    private SocketConfig socketConfig;
    private ConnectionSocketFactory plainSF;
    private KeyStore trustStore;
    private SSLContext sslContext;
    private LayeredConnectionSocketFactory sslSF;
    private Registry<ConnectionSocketFactory> registry;
    private PoolingHttpClientConnectionManager connManager;
    private volatile HttpClient client;
    private volatile BasicCookieStore cookieStore;
    /** Charset name used when a caller does not supply one. */
    public static String defaultEncoding = "utf-8";

    /** Converts a string map into the name/value pair list HttpClient expects. */
    private static List<NameValuePair> paramsConverter(Map<String, String> params) {
        List<NameValuePair> nvps = new LinkedList<NameValuePair>();
        Set<Entry<String, String>> paramsSet = params.entrySet();
        for (Entry<String, String> paramEntry : paramsSet) {
            nvps.add(new BasicNameValuePair(paramEntry.getKey(), paramEntry.getValue()));
        }
        return nvps;
    }

    /** Returns the response body stream, or {@code null} when there is no response or no entity. */
    private static InputStream contentOf(HttpResponse response) throws IOException {
        if (response == null || response.getEntity() == null) {
            return null;
        }
        return response.getEntity().getContent();
    }

    /**
     * Reads a stream fully into a string and closes it.
     *
     * @param in
     *            the stream to read; may be {@code null}
     * @param encoding
     *            charset name, or {@code null} to use {@link #defaultEncoding}
     * @return the decoded content, or {@code null} when {@code in} is
     *         {@code null} or reading fails
     */
    public static String readStream(InputStream in, String encoding) {
        if (in == null) {
            return null;
        }
        InputStreamReader inReader = null;
        try {
            inReader = new InputStreamReader(in, encoding == null ? defaultEncoding : encoding);
            char[] buffer = new char[bufferSize];
            int readLen = 0;
            StringBuilder sb = new StringBuilder();
            while ((readLen = inReader.read(buffer)) != -1) {
                sb.append(buffer, 0, readLen);
            }
            return sb.toString();
        } catch (IOException e) {
            log.error("读取返回内容出错", e);
            return null;
        } finally {
            // Always release the stream, including when decoding or reading failed.
            try {
                if (inReader != null) {
                    inReader.close();
                } else {
                    in.close();
                }
            } catch (IOException e) {
                log.warn("failed to close response stream", e);
            }
        }
    }

    /** Builds the socket factories, connection manager, cookie store and client. */
    private HttpsUtil() {
        // Connection parameters
        connConfig = ConnectionConfig.custom().setCharset(Charset.forName(defaultEncoding)).build();
        socketConfig = SocketConfig.custom().setSoTimeout(100000).build();
        RegistryBuilder<ConnectionSocketFactory> registryBuilder = RegistryBuilder.<ConnectionSocketFactory>create();
        plainSF = new PlainConnectionSocketFactory();
        registryBuilder.register("http", plainSF);
        // Trust store and SSL socket factory.
        // NOTE(security): trusts any certificate and any host name.
        try {
            trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
            sslContext = SSLContexts.custom().useTLS().loadTrustMaterial(trustStore, new AnyTrustStrategy()).build();
            sslSF = new SSLConnectionSocketFactory(sslContext, SSLConnectionSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER);
            registryBuilder.register("https", sslSF);
        } catch (KeyStoreException e) {
            throw new RuntimeException(e);
        } catch (KeyManagementException e) {
            throw new RuntimeException(e);
        } catch (NoSuchAlgorithmException e) {
            throw new RuntimeException(e);
        }
        registry = registryBuilder.build();
        // Connection manager
        connManager = new PoolingHttpClientConnectionManager(registry);
        connManager.setDefaultConnectionConfig(connConfig);
        connManager.setDefaultSocketConfig(socketConfig);
        // Cookie store
        cookieStore = new BasicCookieStore();
        // Client
        client = HttpClientBuilder.create().setDefaultCookieStore(cookieStore).setConnectionManager(connManager)
                .build();
    }

    /**
     * Returns the shared instance, creating it on first use.
     *
     * @return the singleton
     */
    public static HttpsUtil getInstance() {
        HttpsUtil current = instance;
        if (current == null) {
            synchronized (HttpsUtil.class) {
                current = instance;
                if (current == null) {
                    current = new HttpsUtil();
                    instance = current;
                }
            }
        }
        return current;
    }

    /**
     * Issues a GET without query parameters.
     *
     * @return the response body stream, or {@code null} when there is none
     */
    public InputStream doGet(String url) throws URISyntaxException, ClientProtocolException, IOException {
        return contentOf(this.doGet(url, null));
    }

    /**
     * Issues a GET without query parameters and reads the body as a string.
     *
     * @return the body decoded with {@link #defaultEncoding}, or {@code null}
     */
    public String doGetForString(String url) throws URISyntaxException, ClientProtocolException, IOException {
        return HttpsUtil.readStream(this.doGet(url), null);
    }

    /**
     * Issues a GET with query parameters.
     *
     * @return the response body stream, or {@code null} when there is none
     */
    public InputStream doGetForStream(String url, Map<String, String> queryParams)
            throws URISyntaxException, ClientProtocolException, IOException {
        return contentOf(this.doGet(url, queryParams));
    }

    /**
     * Issues a GET with query parameters and reads the body as a string.
     *
     * @return the body decoded with {@link #defaultEncoding}, or {@code null}
     */
    public String doGetForString(String url, Map<String, String> queryParams)
            throws URISyntaxException, ClientProtocolException, IOException {
        return HttpsUtil.readStream(this.doGetForStream(url, queryParams), null);
    }

    /**
     * Basic GET request.
     *
     * @param url
     *            request URL
     * @param queryParams
     *            query-string parameters; may be {@code null} or empty
     * @return the raw response
     * @throws URISyntaxException
     *             if {@code url} is not a valid URI
     * @throws IOException
     *             on transport failure
     * @throws ClientProtocolException
     *             on HTTP protocol error
     */
    public HttpResponse doGet(String url, Map<String, String> queryParams)
            throws URISyntaxException, ClientProtocolException, IOException {
        HttpGet gm = new HttpGet();
        URIBuilder builder = new URIBuilder(url);
        // Query parameters
        if (queryParams != null && !queryParams.isEmpty()) {
            builder.setParameters(HttpsUtil.paramsConverter(queryParams));
        }
        gm.setURI(builder.build());
        return client.execute(gm);
    }

    /**
     * Issues a POST with query parameters only.
     *
     * @return the response body stream, or {@code null} when there is none
     */
    public InputStream doPostForStream(String url, Map<String, String> queryParams)
            throws URISyntaxException, ClientProtocolException, IOException {
        return contentOf(this.doPost(url, queryParams, null));
    }

    /**
     * Issues a POST with query parameters only and reads the body as a string.
     *
     * @return the body decoded with {@link #defaultEncoding}, or {@code null}
     */
    public String doPostForString(String url, Map<String, String> queryParams)
            throws URISyntaxException, ClientProtocolException, IOException {
        return HttpsUtil.readStream(this.doPostForStream(url, queryParams), null);
    }

    /**
     * Issues a POST with query and form parameters and logs the response.
     *
     * @return the response body stream, or {@code null} when there is none
     */
    public InputStream doPostForStream(String url, Map<String, String> queryParams, Map<String, String> formParams)
            throws URISyntaxException, ClientProtocolException, IOException {
        HttpResponse response = this.doPost(url, queryParams, formParams);
        // Resolve the content once, null-safely, before logging; the log call used
        // to dereference the response ahead of its own null check.
        InputStream content = contentOf(response);
        logger.info("异步响应:{},{}", content, response != null ? response.getStatusLine() : null);
        return content;
    }

    /**
     * Issues a POST with query and form parameters and reads the body as a string.
     *
     * @return the body decoded with {@link #defaultEncoding}, or {@code null}
     */
    public String doPostRetString(String url, Map<String, String> queryParams, Map<String, String> formParams)
            throws URISyntaxException, ClientProtocolException, IOException {
        return HttpsUtil.readStream(this.doPostForStream(url, queryParams, formParams), null);
    }

    /**
     * Basic POST request.
     *
     * @param url
     *            request URL
     * @param queryParams
     *            query-string parameters; may be {@code null} or empty
     * @param formParams
     *            form parameters, sent URL-encoded; may be {@code null} or empty
     * @return the raw response
     * @throws URISyntaxException
     *             if {@code url} is not a valid URI
     * @throws IOException
     *             on transport failure
     * @throws ClientProtocolException
     *             on HTTP protocol error
     */
    public HttpResponse doPost(String url, Map<String, String> queryParams, Map<String, String> formParams)
            throws URISyntaxException, ClientProtocolException, IOException {
        HttpPost pm = new HttpPost();
        URIBuilder builder = new URIBuilder(url);
        // Query parameters
        if (queryParams != null && !queryParams.isEmpty()) {
            builder.setParameters(HttpsUtil.paramsConverter(queryParams));
        }
        pm.setURI(builder.build());
        // Form parameters
        if (formParams != null && !formParams.isEmpty()) {
            pm.setEntity(new UrlEncodedFormEntity(HttpsUtil.paramsConverter(formParams), defaultEncoding));
        }
        // NOTE(review): getRequestConfig() is not applied to the request, so only
        // the socket timeout configured in the constructor is in effect.
        return client.execute(pm);
    }

    /**
     * Multipart POST request.
     *
     * @param url
     *            request URL
     * @param queryParams
     *            query-string parameters; may be {@code null} or empty
     * @param formParts
     *            body parts to send; may be {@code null} or empty
     * @return the raw response
     * @throws URISyntaxException
     *             if {@code url} is not a valid URI
     * @throws ClientProtocolException
     *             on HTTP protocol error
     * @throws IOException
     *             on transport failure
     */
    public HttpResponse multipartPost(String url, Map<String, String> queryParams, List<FormBodyPart> formParts)
            throws URISyntaxException, ClientProtocolException, IOException {
        HttpPost pm = new HttpPost();
        URIBuilder builder = new URIBuilder(url);
        // Query parameters
        if (queryParams != null && !queryParams.isEmpty()) {
            builder.setParameters(HttpsUtil.paramsConverter(queryParams));
        }
        pm.setURI(builder.build());
        // Body parts
        if (formParts != null && !formParts.isEmpty()) {
            MultipartEntityBuilder entityBuilder = MultipartEntityBuilder.create();
            entityBuilder = entityBuilder.setMode(HttpMultipartMode.BROWSER_COMPATIBLE);
            for (FormBodyPart formPart : formParts) {
                entityBuilder = entityBuilder.addPart(formPart.getName(), formPart.getBody());
            }
            pm.setEntity(entityBuilder.build());
        }
        return client.execute(pm);
    }

    /**
     * Returns the cookies in the client's cookie store that match an origin.
     *
     * @param domain
     *            cookie domain; {@code null} yields {@code null}
     * @param port
     *            port; {@code null} defaults to 80
     * @param path
     *            cookie path; {@code null} defaults to "/"
     * @param useSecure
     *            whether the origin is secure; {@code null} defaults to false
     * @return matching cookies keyed by name, or {@code null} when
     *         {@code domain} is {@code null} or the store is empty
     */
    public Map<String, Cookie> getCookie(String domain, Integer port, String path, Boolean useSecure) {
        if (domain == null) {
            return null;
        }
        if (port == null) {
            port = 80;
        }
        if (path == null) {
            path = "/";
        }
        if (useSecure == null) {
            useSecure = false;
        }
        List<Cookie> cookies = cookieStore.getCookies();
        if (cookies == null || cookies.isEmpty()) {
            return null;
        }
        CookieOrigin origin = new CookieOrigin(domain, port, path, useSecure);
        BestMatchSpec cookieSpec = new BestMatchSpec();
        Map<String, Cookie> retVal = new HashMap<String, Cookie>();
        for (Cookie cookie : cookies) {
            if (cookieSpec.match(cookie, origin)) {
                retVal.put(cookie.getName(), cookie);
            }
        }
        return retVal;
    }

    /**
     * Adds several cookies to the client's cookie store.
     *
     * @param cookies
     *            cookie name/value map
     * @param domain
     *            cookie domain; must not be {@code null}
     * @param path
     *            cookie path; {@code null} defaults to "/"
     * @param useSecure
     *            secure flag; {@code null} defaults to false
     * @return {@code false} when {@code domain} is {@code null}, otherwise
     *         {@code true}
     * @throws IllegalArgumentException
     *             if any cookie name or value is {@code null} or empty
     */
    public boolean setCookie(Map<String, String> cookies, String domain, String path, Boolean useSecure) {
        synchronized (cookieStore) {
            if (domain == null) {
                return false;
            }
            if (path == null) {
                path = "/";
            }
            if (useSecure == null) {
                useSecure = false;
            }
            if (cookies == null || cookies.isEmpty()) {
                return true;
            }
            for (Entry<String, String> entry : cookies.entrySet()) {
                String key = entry.getKey();
                // The value was never read from the entry before, so every call
                // with a non-empty map failed the check below.
                String value = entry.getValue();
                if (key == null || key.isEmpty() || value == null || value.isEmpty()) {
                    throw new IllegalArgumentException("cookies key and value both can not be empty");
                }
                BasicClientCookie cookie = new BasicClientCookie(key, value);
                cookie.setDomain(domain);
                cookie.setPath(path);
                cookie.setSecure(useSecure);
                cookieStore.addCookie(cookie);
            }
            return true;
        }
    }

    /**
     * Adds a single cookie to the client's cookie store.
     *
     * @param key
     *            cookie name
     * @param value
     *            cookie value
     * @param domain
     *            cookie domain; must not be {@code null}
     * @param path
     *            cookie path; {@code null} defaults to "/"
     * @param useSecure
     *            secure flag; {@code null} defaults to false
     * @return {@code false} when {@code domain} is {@code null}, otherwise
     *         {@code true}
     */
    public boolean setCookie(String key, String value, String domain, String path, Boolean useSecure) {
        Map<String, String> cookies = new HashMap<String, String>();
        cookies.put(key, value);
        return setCookie(cookies, domain, path, useSecure);
    }

    /**
     * Builds a request configuration from this class's timeout constants.
     *
     * @return a new {@link RequestConfig}
     */
    public RequestConfig getRequestConfig() {
        return RequestConfig.custom()
                // timeout for obtaining a connection from the connection manager, in ms
                .setConnectionRequestTimeout(REQUEST_TIMEOUT)
                // connect timeout, in ms
                .setConnectTimeout(CONNECT_TIMEOUT)
                // socket (read) timeout, in ms
                .setSocketTimeout(CONNECT_TIMEOUT)
                .build();
    }
}
00