DelayQueue实现支付系统异步通知

2018-03-10 20:54:41
959次阅读
0个评论
支付系统的异步通知实现可以参考支付宝的异步通知:每个订单的异步通知按递增的时间间隔分次发送(例如 10m、20m、30m、40m、50m、1h),具体的通知频率可根据业务需求做相应调整。本文通过 Java 的 DelayQueue 来实现支付系统的异步通知功能。
 
支付系统异步通知的需求:
1、需要按照既有频率发送异步通知给调用方;
2、回调成功则停止异步通知;
3、回调失败,先判断是否超出既定频次,超出则停止发送,否则按照既有频率继续发送异步通知;
 

下面就通过DelayQueue来实现支付系统异步通知


/**
 * Mutable holder for two related values; used here as the payload carried by a delayed item.
 *
 * @author lh
 * @version 2.0
 * @since 2017-06-23
 *
 * @param <K> type of the first value
 * @param <V> type of the second value
 */
public class Pair<K, V> {

    /** First value of the pair; may be {@code null}. */
    private K first;

    /** Second value of the pair; may be {@code null}. */
    private V second;

    /** Creates a pair whose two values are both {@code null}. */
    public Pair() {
        this(null, null);
    }

    /**
     * Creates a pair holding the given values.
     *
     * @param first  the first value
     * @param second the second value
     */
    public Pair(K first, V second) {
        this.first = first;
        this.second = second;
    }

    /** @return the first value */
    public K getFirst() {
        return this.first;
    }

    /** @return the second value */
    public V getSecond() {
        return this.second;
    }

    /** @param first the new first value */
    public void setFirst(K first) {
        this.first = first;
    }

    /** @param second the new second value */
    public void setSecond(V second) {
        this.second = second;
    }
}
import java.util.concurrent.Delayed;  
import java.util.concurrent.TimeUnit;  
import java.util.concurrent.atomic.AtomicLong;  
  
/**
 * {@link Delayed} wrapper that makes an arbitrary payload usable in a {@code DelayQueue}.
 *
 * <p>{@link #getDelay(TimeUnit)} reports how long remains until the item expires and
 * {@link #compareTo(Delayed)} orders items by expiry time, falling back to creation order
 * when two items expire at the same instant.</p>
 *
 * @author lh
 * @version 2.0
 * @since 2017-06-23
 *
 * @param <T> type of the wrapped payload
 */
public class DelayItem<T> implements Delayed {

    /** Reference point so that all expiry times are small offsets from class-load time. */
    private static final long NANO_ORIGIN = System.nanoTime();

    /** Source of the tie-breaking sequence numbers. */
    private static final AtomicLong sequencer = new AtomicLong(0);

    /** Creation order of this item; breaks ties between equal expiry times. */
    private final long sequenceNumber;

    /** Expiry instant, in nanoseconds relative to {@link #NANO_ORIGIN}. */
    private final long time;

    /** The wrapped payload. */
    private final T item;

    /**
     * Wraps {@code submit} so that it expires {@code timeout} nanoseconds from now.
     *
     * @param submit  the payload to carry
     * @param timeout delay before expiry, in nanoseconds
     */
    public DelayItem(T submit, long timeout) {
        this.time = now() + timeout;
        this.item = submit;
        this.sequenceNumber = sequencer.getAndIncrement();
    }

    /**
     * @return nanoseconds elapsed since this class was loaded
     */
    final static long now() {
        return System.nanoTime() - NANO_ORIGIN;
    }

    /** @return the wrapped payload */
    public T getItem() {
        return item;
    }

    /**
     * @param unit the unit in which to express the result
     * @return the remaining delay; zero or negative once the item has expired
     */
    @Override
    public long getDelay(TimeUnit unit) {
        long remainingNanos = time - now();
        return unit.convert(remainingNanos, TimeUnit.NANOSECONDS);
    }

    /**
     * Orders by expiry time; two distinct {@code DelayItem}s never compare as equal because
     * the sequence number decides ties.
     */
    @Override
    public int compareTo(Delayed other) {
        if (this == other) {
            return 0;
        }
        if (!(other instanceof DelayItem)) {
            // Foreign Delayed implementation: compare by remaining delay only.
            long gap = getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS);
            if (gap == 0) {
                return 0;
            }
            return gap < 0 ? -1 : 1;
        }
        DelayItem<?> that = (DelayItem<?>) other;
        long diff = this.time - that.time;
        if (diff != 0) {
            return diff < 0 ? -1 : 1;
        }
        return this.sequenceNumber < that.sequenceNumber ? -1 : 1;
    }
}
/**
 * Notification message to be delivered to a callback URL, together with its delivery state.
 *
 * @author lh
 * @version 2.0
 * @since 2017-06-23
 */
public class RetMessage {

    /** Callback address. */
    private String url;

    /** Message payload. */
    private String reqData;

    /** Attempt counter; a message created with the two-argument constructor starts at 1. */
    private int times;

    /** Whether the notification has been delivered successfully. */
    private boolean success;

    /**
     * Creates a fresh, not-yet-delivered message with its attempt counter set to 1.
     *
     * @param url     callback address
     * @param reqData message payload
     */
    public RetMessage(String url, String reqData) {
        this(url, reqData, 1, false);
    }

    /**
     * Creates a message with an explicit delivery state.
     *
     * @param url     callback address
     * @param reqData message payload
     * @param times   attempt counter
     * @param success whether the message has already been delivered
     */
    public RetMessage(String url, String reqData, int times, boolean success) {
        this.url = url;
        this.reqData = reqData;
        this.times = times;
        this.success = success;
    }

    /** @return the callback address */
    public String getUrl() {
        return this.url;
    }

    /** @param url the callback address */
    public void setUrl(String url) {
        this.url = url;
    }

    /** @return the message payload */
    public String getReqData() {
        return this.reqData;
    }

    /** @param reqData the message payload */
    public void setReqData(String reqData) {
        this.reqData = reqData;
    }

    /** @return the attempt counter */
    public int getTimes() {
        return this.times;
    }

    /** @param times the attempt counter */
    public void setTimes(int times) {
        this.times = times;
    }

    /** @return {@code true} once the notification has been delivered */
    public boolean isSuccess() {
        return this.success;
    }

    /** @param success whether the notification has been delivered */
    public void setSuccess(boolean success) {
        this.success = success;
    }
}
import java.util.HashMap;  
import java.util.concurrent.DelayQueue;  
import java.util.concurrent.TimeUnit;  
  
import org.slf4j.LoggerFactory;  
  
  
/**
 * Delivers asynchronous notifications through a {@link DelayQueue}.
 * <p>
 * A message is attempted at most five times. Attempt {@code n} is scheduled
 * {@code (n - 1) * 10} minutes after it is queued, so a message whose attempt counter
 * starts at 1 is sent immediately and, while the callee keeps failing, again after
 * 10, 20, 30 and 40 minutes. Delivery stops as soon as the callee answers
 * {@code "success"}.
 * </p>
 *
 * @author lh
 * @version 2.0
 * @since 2017-06-23
 */
public class Task {

    private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(Task.class);

    // Back-off step, in minutes.
    private static final long TIME_UNIT = 10;
    // Maximum number of delivery attempts per message.
    private static final int MAX_TIMES = 5;
    // Response body that the callee returns to acknowledge the notification.
    private static final String RES_SUCCESS = "success";

    // DelayQueue is unbounded, so inserting never blocks; take() blocks until the head
    // element's delay has expired.
    // Declared before the singleton so that the queue exists before the worker thread
    // (started from the constructor) can touch it.
    private static final DelayQueue<DelayItem<Pair<String, RetMessage>>> queue =
            new DelayQueue<DelayItem<Pair<String, RetMessage>>>();

    private static final Task instance = new Task();

    private Thread taskThread;

    /**
     * @return the process-wide notifier instance
     */
    public static Task getInstance() {
        return instance;
    }

    /** Starts the single worker thread that drains the queue. */
    private Task() {
        taskThread = new Thread(new Runnable() {
            public void run() {
                execute();
            }
        });
        taskThread.setName("Task Thread");
        taskThread.start();
    }

    /**
     * Worker loop: waits for the next due notification and delivers it. Ends when the
     * thread is interrupted.
     */
    private void execute() {
        for (;;) {
            DelayItem<Pair<String, RetMessage>> delayItem;
            try {
                delayItem = queue.take();
            } catch (InterruptedException e) {
                LOGGER.warn(e.getMessage(), e);
                // Preserve the interrupt status for whoever owns this thread.
                Thread.currentThread().interrupt();
                break;
            }
            try {
                deliver(delayItem.getItem());
            } catch (RuntimeException e) {
                // One malformed entry must not kill the only worker thread.
                LOGGER.warn(e.getMessage(), e);
            }
        }
    }

    /**
     * Sends one due notification and, when the callee did not acknowledge it,
     * schedules the next attempt.
     */
    private void deliver(Pair<String, RetMessage> pair) {
        RetMessage msg = pair.getSecond();
        if (msg == null || msg.isSuccess() || msg.getTimes() > MAX_TIMES) {
            return;
        }

        HashMap<String, String> paramMap = new HashMap<String, String>();
        paramMap.put("reqData", msg.getReqData());

        boolean delivered = false;
        try {
            String httpResult = HttpsUtil.getInstance().doPostRetString(msg.getUrl(), null, paramMap);
            delivered = RES_SUCCESS.equals(httpResult);
            LOGGER.info("第{}次异步回调,返回结果{},返回参数:{},响应结果:{}", msg.getTimes(), httpResult,
                    paramMap.get("reqData"), delivered);
        } catch (Exception e) {
            // A transport error (timeout, refused connection, bad URL...) counts as a failed
            // attempt; it must fall through to the retry logic instead of dropping the message.
            LOGGER.warn(e.getMessage(), e);
        }

        // TODO persist the delivery result here if it needs to be stored

        if (delivered) {
            msg.setSuccess(true);
            return;
        }

        msg.setSuccess(false);
        msg.setTimes(msg.getTimes() + 1);
        if (msg.getTimes() <= MAX_TIMES) {
            put(pair.getFirst(), msg);
        } else {
            // Attempts exhausted: do not park a dead entry in the queue.
            LOGGER.warn("giving up notification for key {} after {} attempts", pair.getFirst(), MAX_TIMES);
        }
    }

    /**
     * Queues a notification. Any notification still pending under the same key is
     * replaced. The delay before delivery is {@code (msg.getTimes() - 1) * 10} minutes.
     *
     * @param key
     *            unique key, suggested form: merchantNo + orderNo
     * @param msg
     *            the notification to deliver
     */
    public void put(String key, RetMessage msg) {
        // The queue holds DelayItem wrappers, so duplicates have to be found by looking at
        // the key inside each wrapper. DelayQueue's iterator is weakly consistent, which
        // makes removing while iterating safe.
        for (DelayItem<Pair<String, RetMessage>> pending : queue) {
            Pair<String, RetMessage> pendingPair = pending.getItem();
            String pendingKey = pendingPair == null ? null : pendingPair.getFirst();
            boolean sameKey = key == null ? pendingKey == null : key.equals(pendingKey);
            if (sameKey) {
                queue.remove(pending);
            }
        }

        long delayNanos = TimeUnit.MINUTES.toNanos((msg.getTimes() - 1) * TIME_UNIT);
        queue.put(new DelayItem<Pair<String, RetMessage>>(new Pair<String, RetMessage>(key, msg), delayNanos));
    }

    /** Demo entry point: queues one notification for immediate delivery. */
    public static void main(String[] args) throws Exception {
        String orderNo = String.valueOf(System.currentTimeMillis());
        // The URL needs a scheme; without one the HTTP client cannot resolve a target host.
        RetMessage msg = new RetMessage("http://www.baidu.com", "a=1&b=2");
        Task.getInstance().put(orderNo, msg);
    }

}
里面用到了HttpsUtil工具类,需要先引入httpclient所需的jar


<dependency>  
    <groupId>org.apache.httpcomponents</groupId>  
    <artifactId>httpclient</artifactId>  
    <version>4.4.1</version>  
</dependency>  
  
<dependency>  
    <groupId>org.apache.httpcomponents</groupId>  
    <artifactId>httpmime</artifactId>  
    <version>4.4.1</version>  
</dependency>


import java.io.IOException;  
import java.io.InputStream;  
import java.io.InputStreamReader;  
import java.net.URISyntaxException;  
import java.nio.charset.Charset;  
import java.security.KeyManagementException;  
import java.security.KeyStore;  
import java.security.KeyStoreException;  
import java.security.NoSuchAlgorithmException;  
import java.security.cert.CertificateException;  
import java.security.cert.X509Certificate;  
import java.util.HashMap;  
import java.util.LinkedList;  
import java.util.List;  
import java.util.Map;  
import java.util.Map.Entry;  
import java.util.Set;  
  
import javax.net.ssl.SSLContext;  
  
import org.apache.commons.logging.Log;  
import org.apache.commons.logging.LogFactory;  
import org.apache.http.HttpException;  
import org.apache.http.HttpResponse;  
import org.apache.http.NameValuePair;  
import org.apache.http.client.ClientProtocolException;  
import org.apache.http.client.HttpClient;  
import org.apache.http.client.config.RequestConfig;  
import org.apache.http.client.entity.UrlEncodedFormEntity;  
import org.apache.http.client.methods.HttpGet;  
import org.apache.http.client.methods.HttpPost;  
import org.apache.http.client.utils.URIBuilder;  
import org.apache.http.config.ConnectionConfig;  
import org.apache.http.config.Registry;  
import org.apache.http.config.RegistryBuilder;  
import org.apache.http.config.SocketConfig;  
import org.apache.http.conn.socket.ConnectionSocketFactory;  
import org.apache.http.conn.socket.LayeredConnectionSocketFactory;  
import org.apache.http.conn.socket.PlainConnectionSocketFactory;  
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;  
import org.apache.http.conn.ssl.SSLContexts;  
import org.apache.http.conn.ssl.TrustStrategy;  
import org.apache.http.cookie.Cookie;  
import org.apache.http.cookie.CookieOrigin;  
import org.apache.http.entity.mime.FormBodyPart;  
import org.apache.http.entity.mime.HttpMultipartMode;  
import org.apache.http.entity.mime.MultipartEntityBuilder;  
import org.apache.http.impl.client.BasicCookieStore;  
import org.apache.http.impl.client.HttpClientBuilder;  
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;  
import org.apache.http.impl.cookie.BasicClientCookie;  
import org.apache.http.impl.cookie.BestMatchSpec;  
import org.apache.http.message.BasicNameValuePair;  
import org.slf4j.Logger;  
import org.slf4j.LoggerFactory;  
  
/**
 * Trust strategy that accepts every certificate chain without inspecting it.
 * <p>
 * NOTE(review): this disables server-certificate verification for any SSL context built
 * with it, which exposes connections to man-in-the-middle attacks. Confirm that this is
 * acceptable before using it outside of test environments.
 * </p>
 */
class AnyTrustStrategy implements TrustStrategy {

    /**
     * @param certificateChain the peer certificate chain (ignored)
     * @param authenticationType the key-exchange algorithm (ignored)
     * @return always {@code true}
     */
    @Override
    public boolean isTrusted(X509Certificate[] certificateChain, String authenticationType)
            throws CertificateException {
        final boolean trustEverything = true;
        return trustEverything;
    }

}
  
/**
 * Singleton HTTP/HTTPS helper built on Apache HttpClient with a pooled connection
 * manager and a shared cookie store.
 * <p>
 * NOTE(review): the HTTPS socket factory is configured with {@link AnyTrustStrategy} and
 * {@code ALLOW_ALL_HOSTNAME_VERIFIER}, i.e. server certificates and host names are not
 * verified. Confirm that this is intended before relying on it for production traffic.
 * </p>
 */
public class HttpsUtil {
    private static final Logger logger = LoggerFactory.getLogger(HttpsUtil.class);

    private static final Log log = LogFactory.getLog(HttpsUtil.class);

    /** Size of the character buffer used by {@link #readStream(InputStream, String)}. */
    private static int bufferSize = 1024;

    private static final int CONNECT_TIMEOUT = 6 * 1000;
    private static final int REQUEST_TIMEOUT = 3 * 1000;

    private static volatile HttpsUtil instance;

    private ConnectionConfig connConfig;

    private SocketConfig socketConfig;

    private ConnectionSocketFactory plainSF;

    private KeyStore trustStore;

    private SSLContext sslContext;

    private LayeredConnectionSocketFactory sslSF;

    private Registry<ConnectionSocketFactory> registry;

    private PoolingHttpClientConnectionManager connManager;

    private volatile HttpClient client;

    private volatile BasicCookieStore cookieStore;

    /** Charset name used when a caller does not specify one. */
    public static String defaultEncoding = "utf-8";

    /** Converts a string map into the name/value pair list expected by HttpClient. */
    private static List<NameValuePair> paramsConverter(Map<String, String> params) {
        List<NameValuePair> nvps = new LinkedList<NameValuePair>();
        Set<Entry<String, String>> paramsSet = params.entrySet();
        for (Entry<String, String> paramEntry : paramsSet) {
            nvps.add(new BasicNameValuePair(paramEntry.getKey(), paramEntry.getValue()));
        }
        return nvps;
    }

    /** Returns the response body stream, or {@code null} when there is no response or no body. */
    private static InputStream contentOf(HttpResponse response) throws IOException {
        if (response == null || response.getEntity() == null) {
            return null;
        }
        return response.getEntity().getContent();
    }

    /**
     * Reads a stream completely into a string and closes it.
     *
     * @param in
     *            the stream to read; may be {@code null}
     * @param encoding
     *            charset name, or {@code null} to use {@link #defaultEncoding}
     * @return the decoded content, or {@code null} when {@code in} is {@code null} or
     *         reading fails
     */
    public static String readStream(InputStream in, String encoding) {
        if (in == null) {
            return null;
        }
        InputStreamReader inReader = null;
        try {
            inReader = new InputStreamReader(in, encoding == null ? defaultEncoding : encoding);
            char[] buffer = new char[bufferSize];
            int readLen = 0;
            StringBuilder sb = new StringBuilder();
            while ((readLen = inReader.read(buffer)) != -1) {
                sb.append(buffer, 0, readLen);
            }
            return sb.toString();
        } catch (IOException e) {
            log.error("读取返回内容出错", e);
            return null;
        } finally {
            // Always close, including after a failed read, so the underlying stream
            // is not leaked.
            try {
                if (inReader != null) {
                    inReader.close();
                } else {
                    in.close();
                }
            } catch (IOException e) {
                log.warn("failed to close response stream", e);
            }
        }
    }

    /** Builds the connection manager, SSL context, cookie store and client. */
    private HttpsUtil() {
        // Connection parameters
        connConfig = ConnectionConfig.custom().setCharset(Charset.forName(defaultEncoding)).build();
        socketConfig = SocketConfig.custom().setSoTimeout(100000).build();
        RegistryBuilder<ConnectionSocketFactory> registryBuilder = RegistryBuilder.<ConnectionSocketFactory>create();
        plainSF = new PlainConnectionSocketFactory();
        registryBuilder.register("http", plainSF);
        // Trust store and SSL socket factory.
        // NOTE(review): trusts every certificate and every host name -- see class comment.
        try {
            trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
            sslContext = SSLContexts.custom().useTLS().loadTrustMaterial(trustStore, new AnyTrustStrategy()).build();
            sslSF = new SSLConnectionSocketFactory(sslContext, SSLConnectionSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER);
            registryBuilder.register("https", sslSF);
        } catch (KeyStoreException e) {
            throw new RuntimeException(e);
        } catch (KeyManagementException e) {
            throw new RuntimeException(e);
        } catch (NoSuchAlgorithmException e) {
            throw new RuntimeException(e);
        }
        registry = registryBuilder.build();
        // Connection manager
        connManager = new PoolingHttpClientConnectionManager(registry);
        connManager.setDefaultConnectionConfig(connConfig);
        connManager.setDefaultSocketConfig(socketConfig);
        // Cookie store
        cookieStore = new BasicCookieStore();
        // Client
        client = HttpClientBuilder.create().setDefaultCookieStore(cookieStore).setConnectionManager(connManager)
                .build();
    }

    /**
     * @return the shared instance, created on first use
     */
    public static HttpsUtil getInstance() {
        synchronized (HttpsUtil.class) {
            if (HttpsUtil.instance == null) {
                instance = new HttpsUtil();
            }
            return instance;
        }
    }

    /**
     * Issues a GET without query parameters.
     *
     * @return the response body stream, or {@code null} when the response has no body
     */
    public InputStream doGet(String url) throws URISyntaxException, ClientProtocolException, IOException {
        return contentOf(this.doGet(url, null));
    }

    /**
     * Issues a GET without query parameters and reads the body using
     * {@link #defaultEncoding}.
     */
    public String doGetForString(String url) throws URISyntaxException, ClientProtocolException, IOException {
        return HttpsUtil.readStream(this.doGet(url), null);
    }

    /**
     * Issues a GET with query parameters.
     *
     * @return the response body stream, or {@code null} when the response has no body
     */
    public InputStream doGetForStream(String url, Map<String, String> queryParams)
            throws URISyntaxException, ClientProtocolException, IOException {
        return contentOf(this.doGet(url, queryParams));
    }

    /**
     * Issues a GET with query parameters and reads the body using
     * {@link #defaultEncoding}.
     */
    public String doGetForString(String url, Map<String, String> queryParams)
            throws URISyntaxException, ClientProtocolException, IOException {
        return HttpsUtil.readStream(this.doGetForStream(url, queryParams), null);
    }

    /**
     * Basic GET request.
     *
     * @param url
     *            request URL
     * @param queryParams
     *            query-string parameters; may be {@code null} or empty
     * @return the raw response
     * @throws URISyntaxException
     *             if {@code url} is not a valid URI
     * @throws IOException
     *             on connection failure
     * @throws ClientProtocolException
     *             on HTTP protocol errors
     */
    public HttpResponse doGet(String url, Map<String, String> queryParams)
            throws URISyntaxException, ClientProtocolException, IOException {
        HttpGet gm = new HttpGet();
        URIBuilder builder = new URIBuilder(url);
        // Query parameters
        if (queryParams != null && !queryParams.isEmpty()) {
            builder.setParameters(HttpsUtil.paramsConverter(queryParams));
        }
        gm.setURI(builder.build());
        return client.execute(gm);
    }

    /**
     * Issues a POST with query parameters only.
     *
     * @return the response body stream, or {@code null} when the response has no body
     */
    public InputStream doPostForStream(String url, Map<String, String> queryParams)
            throws URISyntaxException, ClientProtocolException, IOException {
        return contentOf(this.doPost(url, queryParams, null));
    }

    /**
     * Issues a POST with query parameters only and reads the body using
     * {@link #defaultEncoding}.
     */
    public String doPostForString(String url, Map<String, String> queryParams)
            throws URISyntaxException, ClientProtocolException, IOException {
        return HttpsUtil.readStream(this.doPostForStream(url, queryParams), null);
    }

    /**
     * Issues a POST with query and form parameters and logs the response entity and
     * status line.
     *
     * @return the response body stream, or {@code null} when the response has no body
     */
    public InputStream doPostForStream(String url, Map<String, String> queryParams, Map<String, String> formParams)
            throws URISyntaxException, ClientProtocolException, IOException {
        HttpResponse response = this.doPost(url, queryParams, formParams);
        if (response != null) {
            // Log the entity description rather than the stream object, and only after
            // the null check.
            logger.info("异步响应:{},{}", response.getEntity(), response.getStatusLine());
        }
        return contentOf(response);
    }

    /**
     * Issues a POST with query and form parameters and reads the body using
     * {@link #defaultEncoding}.
     */
    public String doPostRetString(String url, Map<String, String> queryParams, Map<String, String> formParams)
            throws URISyntaxException, ClientProtocolException, IOException {
        return HttpsUtil.readStream(this.doPostForStream(url, queryParams, formParams), null);
    }

    /**
     * Basic POST request.
     *
     * @param url
     *            request URL
     * @param queryParams
     *            query-string parameters; may be {@code null} or empty
     * @param formParams
     *            URL-encoded form parameters; may be {@code null} or empty
     * @return the raw response
     * @throws URISyntaxException
     *             if {@code url} is not a valid URI
     * @throws IOException
     *             on connection failure
     * @throws ClientProtocolException
     *             on HTTP protocol errors
     */
    public HttpResponse doPost(String url, Map<String, String> queryParams, Map<String, String> formParams)
            throws URISyntaxException, ClientProtocolException, IOException {
        HttpPost pm = new HttpPost();
        URIBuilder builder = new URIBuilder(url);
        // Query parameters
        if (queryParams != null && !queryParams.isEmpty()) {
            builder.setParameters(HttpsUtil.paramsConverter(queryParams));
        }
        pm.setURI(builder.build());
        // Form parameters
        if (formParams != null && !formParams.isEmpty()) {
            pm.setEntity(new UrlEncodedFormEntity(HttpsUtil.paramsConverter(formParams), defaultEncoding));
        }

        // NOTE(review): the timeouts from getRequestConfig() are NOT applied here; the
        // original call was left disabled:
        // pm.setConfig(getRequestConfig());

        return client.execute(pm);
    }

    /**
     * Multipart POST request.
     *
     * @param url
     *            request URL
     * @param queryParams
     *            query-string parameters; may be {@code null} or empty
     * @param formParts
     *            multipart body parts (file or string parts); may be {@code null} or empty
     * @return the raw response
     * @throws URISyntaxException
     *             if {@code url} is not a valid URI
     * @throws ClientProtocolException
     *             on HTTP protocol errors
     * @throws IOException
     *             on connection failure
     */
    public HttpResponse multipartPost(String url, Map<String, String> queryParams, List<FormBodyPart> formParts)
            throws URISyntaxException, ClientProtocolException, IOException {
        HttpPost pm = new HttpPost();
        URIBuilder builder = new URIBuilder(url);
        // Query parameters
        if (queryParams != null && !queryParams.isEmpty()) {
            builder.setParameters(HttpsUtil.paramsConverter(queryParams));
        }
        pm.setURI(builder.build());
        // Multipart body
        if (formParts != null && !formParts.isEmpty()) {
            MultipartEntityBuilder entityBuilder = MultipartEntityBuilder.create();
            entityBuilder = entityBuilder.setMode(HttpMultipartMode.BROWSER_COMPATIBLE);
            for (FormBodyPart formPart : formParts) {
                entityBuilder = entityBuilder.addPart(formPart.getName(), formPart.getBody());
            }
            pm.setEntity(entityBuilder.build());
        }
        return client.execute(pm);
    }

    /**
     * Returns the cookies in the client's cookie store that match the given origin.
     *
     * @param domain
     *            cookie domain; {@code null} yields {@code null}
     * @param port
     *            port; {@code null} defaults to 80
     * @param path
     *            cookie path; {@code null} defaults to "/"
     * @param useSecure
     *            whether the origin is secure; {@code null} defaults to false
     * @return matching cookies keyed by name, or {@code null} when {@code domain} is
     *         {@code null} or the store is empty
     */
    public Map<String, Cookie> getCookie(String domain, Integer port, String path, Boolean useSecure) {
        if (domain == null) {
            return null;
        }
        if (port == null) {
            port = 80;
        }
        if (path == null) {
            path = "/";
        }
        if (useSecure == null) {
            useSecure = false;
        }
        List<Cookie> cookies = cookieStore.getCookies();
        if (cookies == null || cookies.isEmpty()) {
            return null;
        }

        CookieOrigin origin = new CookieOrigin(domain, port, path, useSecure);
        BestMatchSpec cookieSpec = new BestMatchSpec();
        Map<String, Cookie> retVal = new HashMap<String, Cookie>();
        for (Cookie cookie : cookies) {
            if (cookieSpec.match(cookie, origin)) {
                retVal.put(cookie.getName(), cookie);
            }
        }
        return retVal;
    }

    /**
     * Adds several cookies to the client's cookie store.
     *
     * @param cookies
     *            cookie name/value map
     * @param domain
     *            cookie domain; must not be {@code null}
     * @param path
     *            cookie path; {@code null} defaults to "/"
     * @param useSecure
     *            secure flag; {@code null} defaults to false
     * @return {@code false} when {@code domain} is {@code null}, otherwise {@code true}
     * @throws IllegalArgumentException
     *             if any cookie name or value is {@code null} or empty
     */
    public boolean setCookie(Map<String, String> cookies, String domain, String path, Boolean useSecure) {
        synchronized (cookieStore) {
            if (domain == null) {
                return false;
            }
            if (path == null) {
                path = "/";
            }
            if (useSecure == null) {
                useSecure = false;
            }
            if (cookies == null || cookies.isEmpty()) {
                return true;
            }
            for (Entry<String, String> entry : cookies.entrySet()) {
                String key = entry.getKey();
                // The value must be read from the entry; it was previously never assigned,
                // which made every call with a non-empty map fail the check below.
                String value = entry.getValue();
                if (key == null || key.isEmpty() || value == null || value.isEmpty()) {
                    throw new IllegalArgumentException("cookies key and value both can not be empty");
                }
                BasicClientCookie cookie = new BasicClientCookie(key, value);
                cookie.setDomain(domain);
                cookie.setPath(path);
                cookie.setSecure(useSecure);
                cookieStore.addCookie(cookie);
            }
            return true;
        }
    }

    /**
     * Adds a single cookie to the client's cookie store.
     *
     * @param key
     *            cookie name
     * @param value
     *            cookie value
     * @param domain
     *            cookie domain; must not be {@code null}
     * @param path
     *            cookie path; {@code null} defaults to "/"
     * @param useSecure
     *            secure flag; {@code null} defaults to false
     * @return {@code false} when {@code domain} is {@code null}, otherwise {@code true}
     */
    public boolean setCookie(String key, String value, String domain, String path, Boolean useSecure) {
        Map<String, String> cookies = new HashMap<String, String>();
        cookies.put(key, value);
        return setCookie(cookies, domain, path, useSecure);
    }

    /**
     * @return a request configuration with the connection-request, connect and socket
     *         timeouts defined by this class, all in milliseconds
     */
    public RequestConfig getRequestConfig() {
        return RequestConfig.custom()
                // timeout for obtaining a connection from the connection manager
                .setConnectionRequestTimeout(REQUEST_TIMEOUT)
                // timeout for establishing the connection
                .setConnectTimeout(CONNECT_TIMEOUT)
                // timeout while waiting for response data
                .setSocketTimeout(CONNECT_TIMEOUT)
                .build();
    }

}








收藏00

登录 后评论。没有帐号? 注册 一个。