异步回调的应用

异步回调的应用

场景:在配置摄像头算法中,需要修改本地数据库,修改缓存,和线程池任务,同时需要调用硬件接口修改摄像头算法配置信息,但是硬件的接口响应时间过长,我们需要更具调用算法同步的返回结果来决定本地是否需要修改数据库和缓存以及任务池任务,具体流程图如下:

这个时候可以采用异步回调+手动事物来实现。

伪代码实现

定义一个接口

/**
 * @Description
 * @Author wuhuaming
 * @Date 2023/10/23
 */
public interface Worker {

    String active(Object o);

}

监听器接口:

/**
 * @Description
 * @Author wuhuaming
 * @Date 2023/10/23
 */
public interface Listener {
    Object result(Object o);

}

定义包装器:

/**
 * @Description
 * @Author wuhuaming
 * @Date 2023/10/23
 */
@Data
public class Wrapper {

    private Object param;

    private Worker worker;

    private Listener listener;

    private final LinkedBlockingQueue<Object> result;

    public Wrapper(int size) {
        this.result = new LinkedBlockingQueue<>(size);
    }
}

工具类:

package com.example.spring.boot.test.async;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.example.spring.boot.test.async.interfaces.Worker;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * @Description
 * @Author wuhuaming
 * @Date 2023/11/1
 */
public class AsyncUtil {

    /**
     * 这里为了演示效果,通过方法实现的接口,真实场景可以替换为实际的实现类
     * @return
     */
    public static Worker newWorker(){
        return new Worker() {
            @Override
            public Object active(Object o) {
                try{
                    System.out.println(Thread.currentThread().getName() + ": " + "开始调用硬件接口");
                    Thread.sleep(5 * 1000);
                }catch (Exception e){
                    e.printStackTrace();
                }
                Map<String, Object> map = new HashMap<>();
                map.put("code", 200);
                map.put("message", "xxxx错误");
                map.put("data", false);
                return JSON.toJSONString(map);
            }
        };
    }

    public static Wrapper execute(Wrapper wrapper){
        new Thread(()->{
            Worker worker = wrapper.getWorker();
            Object workerResult = worker.active(wrapper.getParam());
            System.out.println(Thread.currentThread().getName() + ": " + "算法返回结果:" + workerResult);
            Object listenerResult = wrapper.getListener().result(workerResult);
            try {
                LinkedBlockingQueue<Object> queue = wrapper.getResult();
                if(listenerResult != null){
                    queue.put(listenerResult);
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }).start();
        return wrapper;
    }

}

测试类:

/**
 * @Description
 * @Author wuhuaming
 * @Date 2023/10/23
 */
public class CallBackTest {

    public static void main(String[] args) throws InterruptedException {
        System.out.println(Thread.currentThread().getName() + ": " + "参数校验");
        Worker worker = AsyncUtil.newWorker();
        Wrapper wrapper = new Wrapper(64);
        wrapper.setWorker(worker);
        wrapper.setParam("test param");
        AsyncUtil.execute(wrapper).setListener(new Listener() {
            @Override
            public Object result(Object o) {
                System.out.println(Thread.currentThread().getName() + ": " + "开始执行回调函数");
                JSONObject jsonObject = JSONObject.parseObject((String) o);
                if(jsonObject.containsKey("code") && jsonObject.getInteger("code") == 200){
                    if(jsonObject.containsKey("data") && jsonObject.getBoolean("data")){
                        return "算法调用成功";
                    }
                }
                // TODO 回滚事务
                System.out.println(Thread.currentThread().getName() + ": " + "事务已回滚");
                return "算法调用失败";
            }
        });
        syncSaveData();
        Object take = wrapper.getResult().take();
        System.out.println(Thread.currentThread().getName() + ": " + "算法返回结果:" + take);
    }

    /**
     * 在同一个事务中
     */
    private static void syncSaveData(){
        System.out.println(Thread.currentThread().getName() + ": " + "更新缓存");
        System.out.println(Thread.currentThread().getName() + ": " + "更新数据库");
        System.out.println(Thread.currentThread().getName() + ": " + "更新线程池任务");
    }

}

结果:

main: 参数校验
main: 更新缓存
main: 更新数据库
main: 更新线程池任务
Thread-0: 开始调用硬件接口
Thread-0: 算法返回结果:{"code":200,"data":false,"message":"xxxx错误"}
Thread-0: 开始执行回调函数
Thread-0: 事务已回滚
main: 算法返回结果:算法调用失败

CompletableFuture类实现

测试类伪代码

public class TestCompletableFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println(Thread.currentThread() +"入参校验");
        CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread() + " 调用算法接口");
            return 1;
        });

        CompletableFuture<Integer> cf2 = cf1.thenApplyAsync((result) -> {
            System.out.println(Thread.currentThread() + " cf2 do something....");
            result += 2;
            return result;
        });

        System.out.println(Thread.currentThread() +"同步缓存");
        System.out.println(Thread.currentThread() +"修改数据库数据");

        //等待回调执行完成
        Integer result = cf2.get();
        System.out.println(Thread.currentThread() +"cf2结果->" + result);
        if(result == 3){
            System.out.println(Thread.currentThread() +"算法返回失败,回滚事物");
        }else{
            System.out.println(Thread.currentThread() +"算法返回成功");
        }

    }

结果:

Thread[main,5,main]入参校验
Thread[ForkJoinPool.commonPool-worker-9,5,main] 调用算法接口
Thread[main,5,main]同步缓存
Thread[main,5,main]修改数据库数据
Thread[ForkJoinPool.commonPool-worker-9,5,main] cf2 do something....
Thread[main,5,main]cf2结果->3
Thread[main,5,main]算法返回失败,回滚事物

关于CompletableFuture 的更多用法请参考:https://blog.csdn.net/zsx_xiaoxin/article/details/123898171

0%