解决多线程安全问题-线程未及时中断,导致工作线程重复启动/突然不工作的问题

线程未及时中断,导致工作线程重复启动/突然不工作的问题

场景:

  1. 后端有一批文件,需要解析后,通过websocket向前端推送,
  2. 要求当websocket连接数为零的时候停止推送服务,
  3. 如果有人连接就一直循环推送,
  4. 每次推送需要间隔随机的时间模拟真实场景
  5. 期望多个文件解析和推送并行
  6. 每个客户端可以自定义接收哪些文件的坐标

分析:

当调用目标列表的时候就加载文件列表,并建立连接,开启文件解析推送线程。

问题:

  1. 最开始拿到文件以后,直接 new Thread(()->{}).start(); ,但是如果是多个人都连接了ws调用了解析文件接口,那么针对同一个文件就有多个线程在解析推送。解决方案:增加初始化标志位
  2. 即便增加了初始化标志位和状态标志位以后,当前端刷新页面时,连接断开后又立马连接上,这个时间差很短。按照预想的逻辑,将 status 标志位改为 false 后线程会退出,但实际上线程并没有退出,或者说有些线程并没有退出,导致同一个文件依然有多个线程在解析推送。后面详细分析。

集成websocket

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

编写 WebSocketConfig 配置类

/**
 * WebSocket wiring for the JSR-356 (@ServerEndpoint) style endpoints used by this application.
 */
@Configuration
@EnableWebSocket
@ConditionalOnWebApplication
public class WebSocketConfig {

    /**
     * Registers SpringWebSocketConfigurator as a Spring bean so that Spring calls its
     * setApplicationContext(...) and the configurator can later hand out Spring-managed
     * endpoint instances.
     *
     * @return the configurator bean
     */
    @Bean
    public SpringWebSocketConfigurator springWebSocketConfigurator() {
        return new SpringWebSocketConfigurator();
    }

    /**
     * Exporter bean that registers @ServerEndpoint-annotated endpoints with the
     * servlet container's WebSocket runtime.
     *
     * @return the exporter bean
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

类SpringWebSocketConfigurator

/**
 * Endpoint configurator that resolves WebSocket endpoint instances from the Spring context,
 * so that an endpoint declared with {@code configurator = SpringWebSocketConfigurator.class}
 * is the Spring-managed bean (with its injected dependencies) rather than a fresh instance.
 */
public class SpringWebSocketConfigurator extends ServerEndpointConfig.Configurator implements ApplicationContextAware {

    /**
     * Shared by every configurator instance. It is static because the instance referenced by
     * {@code @ServerEndpoint(configurator = ...)} is not necessarily the Spring bean that
     * received the context. Volatile so the value written at startup is visible to the
     * container threads that open connections.
     */
    private static volatile BeanFactory applicationContext;

    /**
     * Stores the Spring context for later endpoint lookups.
     *
     * @param applicationContext the context Spring injects into this bean
     * @throws BeansException declared by the Spring interface
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // Static field: assign through the class, not through 'this'.
        SpringWebSocketConfigurator.applicationContext = applicationContext;
    }

    /**
     * 重写该方法是为了将endpoint的实例从Spring的上下文中获取
     * (Returns the endpoint instance from the Spring context instead of letting the container
     * create a new one.)
     *
     * @param clazz endpoint class requested by the container
     * @param <T>   endpoint type
     * @return the Spring bean of the requested type
     * @throws InstantiationException if the Spring context has not been injected yet
     */
    @Override
    public <T> T getEndpointInstance(Class<T> clazz) throws InstantiationException {
        BeanFactory context = applicationContext;
        if (context == null) {
            // Fail with a descriptive, container-expected exception instead of a bare NPE.
            throw new InstantiationException(
                    "Spring ApplicationContext not injected; cannot create endpoint " + clazz.getName());
        }
        return context.getBean(clazz);
    }
}

WebSocketServer类

@Slf4j
@ServerEndpoint(value = "/ws/asset", configurator = SpringWebSocketConfigurator.class)
@Component
@RequiredArgsConstructor
public class WebSocketServer {

    private static final AtomicInteger ONLINE_COUNT = new AtomicInteger(0);

    // concurrent包的线程安全Set,用来存放每个客户端对应的Session对象。
    private static final CopyOnWriteArraySet<Session> SESSION_SET = new CopyOnWriteArraySet<>();

    private static final ConcurrentHashMap<String, Set<Long>> SESSION_SET_CONCURRENT_HASH_MAP = new ConcurrentHashMap<>();

    @Getter
    private final ReentrantLock lock = new ReentrantLock();

    @Getter
    private final Condition statusCondition = lock.newCondition();

    @OnOpen
    public void onOpen(Session session) {
        SESSION_SET.add(session);
        // 在线数加1
        int cnt = ONLINE_COUNT.incrementAndGet();
        log.info("有连接加入,当前连接数为:{}", cnt);

        String sessionId = session.getId();
        RestResponse<String> response = RestResponse.ok(sessionId);
        sendMessage(session, JSON.toJSONString(response));
    }

    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose(Session session) {
        SESSION_SET.remove(session);
        SESSION_SET_CONCURRENT_HASH_MAP.remove(session.getId());
        int cnt = ONLINE_COUNT.decrementAndGet();
        if (cnt == 0) {
            lock.lock();
            try {
                // 通知监听线程执行任务停止
                statusCondition.signalAll();
            } finally {
                lock.unlock();
            }
        }
        log.info("有连接关闭,当前连接数为:{}", cnt);
    }

    /**
     * 收到客户端消息后调用的方法
     *
     * @param message 客户端发送过来的消息
     */
    @OnMessage
    public void onMessage(String message, Session session) throws IOException {
        log.info("来自客户端的消息:{}", message);
//        BroadCastInfo(session.getId() + message);
    }

    /**
     * 出现错误
     *
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("发生错误:{},Session ID:{}", error.getMessage(), session.getId());
    }

    /**
     * 发送消息,实践表明,每次浏览器刷新,session会发生变化。
     *
     * @param session
     * @param message
     */
    public void sendMessage(Session session, String message) {
        try {
            if (session.isOpen()) {
                session.getBasicRemote().sendText(message);
            } else {
                log.debug("session id: {}, is not open", session.getId());
            }
        } catch (Exception e) {
            log.error("发送消息出错:{}", e.getMessage());
        }
    }

    /**
     * 群发消息
     *
     * @param message
     * @throws IOException
     */
    public void broadCastInfo(String message) throws IOException {
        for (Session session : SESSION_SET) {
            if (session.isOpen()) {
                sendMessage(session, message);
            }
        }
    }

    /**
     * 指定Session发送消息
     *
     * @param sessionId
     * @param message
     * @throws IOException
     */
    public void sendMessage(String message, String sessionId) throws IOException {
        Session session = null;
        for (Session s : SESSION_SET) {
            if (s.getId().equals(sessionId)) {
                session = s;
                break;
            }
        }
        if (session != null) {
            sendMessage(session, message);
        } else {
            log.warn("没有找到你指定ID的会话:{}", sessionId);
        }
    }

    public void sendWsMessage(String message, Long fileId) {
        Iterator<Map.Entry<String, Set<Long>>> iterator = SESSION_SET_CONCURRENT_HASH_MAP.entrySet().iterator();
        while (iterator.hasNext()) {
            Map.Entry<String, Set<Long>> entry = iterator.next();
            if (entry.getValue().contains(fileId)) {
                try {
                    sendMessage(message, entry.getKey());
                } catch (IOException e) {
                    log.error("发送websocket失败:", e);
                }
            }
        }
    }

    /**
     * 新增监听文件ID
     * 这里虽然使用了线程安全的集合,但是临界区里面涉及到了读和写的操作,所以依然需要上锁,保证线程安全
     * @param sessionId
     * @param fileIdList
     */
    public synchronized void insertFileListener(String sessionId, List<Long> fileIdList) {
        try {
            Set<Long> fileIdSet = SESSION_SET_CONCURRENT_HASH_MAP.get(sessionId);
            if (fileIdSet == null) {
                fileIdSet = new HashSet<>();
            }
            fileIdSet.addAll(fileIdList);
            SESSION_SET_CONCURRENT_HASH_MAP.put(sessionId, fileIdSet);
        } catch (Exception e) {
            log.error("insertFileListener error:", e);
        }
    }

    /**
     * 停止监听
     *
     * @param sessionId
     * @param fileIdList
     */
    public synchronized void removeFileListener(String sessionId, List<Long> fileIdList) {
        try {
            Set<Long> fileIdSet = SESSION_SET_CONCURRENT_HASH_MAP.get(sessionId);
            if (fileIdSet != null) {
                fileIdList.forEach(fileIdSet::remove);
                SESSION_SET_CONCURRENT_HASH_MAP.put(sessionId, fileIdSet);
            }
        } catch (Exception e) {
            log.error("removeFileListener error:", e);
        }
    }

}
  1. @ServerEndpoint(value = "/ws/asset", configurator = SpringWebSocketConfigurator.class):指定 configurator 主要是为了解决 WebSocketServer 默认不是单例的问题(默认情况下容器会为每个连接新建一个端点实例),否则后面在使用某些功能的时候会出错。这个也是无意间发现的,解决方案参考博客。
  2. private final Condition statusCondition = lock.newCondition();:条件变量用于通知其他线程,当连接人数为零的时候停止文件解析任务。
  3. 因为将其改为了单例模式,所有连接共享同一个实例,所以共享的成员变量需要通过 static + 原子类/并发集合来保证线程安全:
    1. private static final AtomicInteger ONLINE_COUNT = new AtomicInteger(0); :记录连接人数
    2. private static final ConcurrentHashMap<String, Set<Long>> SESSION_SET_CONCURRENT_HASH_MAP = new ConcurrentHashMap<>(); :记录每个客户端接收哪些文件的坐标
  4. insertFileListener 和 removeFileListener 两个方法为什么还要加 synchronized 关键字呢?虽然 ConcurrentHashMap 是线程安全的集合,它的 get() 方法和 put() 方法单独使用时都是线程安全的,但是组合在一起("先读后写")的时候就不一定了,所以还是需要加锁来保证线程安全。

前端控制器

@Tag(name = "轨迹控制器")
@RestController
@RequestMapping("trajectory")
@RequiredArgsConstructor
public class TrajectoryController {

    /** Service that every endpoint of this controller delegates to. */
    private final TrajectoryService trajectoryService;

    /**
     * Returns the target list produced by {@code TrajectoryService#targetList()}.
     *
     * @return the target list wrapped in a success response
     */
    @Operation(summary = "目标列表")
    @PostMapping("/targetList")
    public RestResponse<List<TargetListOutput>> targetList() {
        return RestResponse.ok(trajectoryService.targetList());
    }

    /**
     * Subscribes a WebSocket session to a file ID.
     *
     * @param input session ID and file ID
     * @return the service result wrapped in a success response
     */
    @Operation(summary = "ws新增文件ID")
    @PostMapping("/insertFileId")
    public RestResponse<Boolean> insertFileId(@RequestBody InsertFileIdInput input) {
        return RestResponse.ok(trajectoryService.insertFileId(input));
    }

    /**
     * Unsubscribes a WebSocket session from a file ID.
     *
     * @param input session ID and file ID
     * @return the service result wrapped in a success response
     */
    @Operation(summary = "ws删除文件ID")
    @PostMapping("/removeFileId")
    public RestResponse<Boolean> removeFileId(@RequestBody InsertFileIdInput input) {
        return RestResponse.ok(trajectoryService.removeFileId(input));
    }

}

TrajectoryServiceImpl

@Slf4j
@Service
@RequiredArgsConstructor
public class TrajectoryServiceImpl implements TrajectoryService {

    private final ParseFileUtil parseFileUtil;

    private final FileContainer fileContainer;

    private final WebSocketServer webSocketServer;

    private final IFileRecordConverter fileRecordConverter;

    private final RealTimeAnalysisUtil realTimeAnalysisUtil;

    private final HttpClientUtil httpClientUtil;

    @Value("${target.file.path:}")
    private String filePath;

    /**
     * Serializes the one-time scan of {@link #filePath}. Without it, concurrent first requests
     * could each scan the directory and register the same files twice in the cache.
     * (Initialized, so it is not a constructor parameter of @RequiredArgsConstructor.)
     */
    private final Object shareFileLoadLock = new Object();

    /**
     * Subscribes the input's session to the input's file ID, if a file ID is given.
     *
     * @param input session ID and file ID
     * @return always {@code true}
     */
    @Override
    public Boolean insertFileId(InsertFileIdInput input) {
        if (input.getFileId() != null) {
            webSocketServer.insertFileListener(input.getSessionId(), Collections.singletonList(input.getFileId()));
        }
        return true;
    }

    /**
     * Unsubscribes the input's session from the input's file ID, if a file ID is given.
     *
     * @param input session ID and file ID
     * @return always {@code true}
     */
    @Override
    public Boolean removeFileId(InsertFileIdInput input) {
        if (input.getFileId() != null) {
            webSocketServer.removeFileListener(input.getSessionId(), Collections.singletonList(input.getFileId()));
        }
        return true;
    }

    /**
     * Returns the target list (loading the file list into the cache on first use) and starts
     * the real-time push tasks.
     *
     * @return converted target list
     */
    @Override
    public List<TargetListOutput> targetList() {
        List<FileContainer.ShareFileDto> shareFileList = loadShareFileList();
        List<TargetListOutput> outputList = fileRecordConverter.toTargetListOutput(shareFileList);
        // 开始推送
        realTimeAnalysisUtil.start();
        return outputList;
    }

    /**
     * Returns the cached share-file list, scanning {@link #filePath} and filling the cache when
     * it is empty. Double-checked under {@link #shareFileLoadLock} so only one request scans.
     */
    private List<FileContainer.ShareFileDto> loadShareFileList() {
        // 从缓存中获取
        List<FileContainer.ShareFileDto> shareFileList = fileContainer.getShareFileList();
        if (!CollectionUtils.isEmpty(shareFileList)) {
            return shareFileList;
        }
        synchronized (shareFileLoadLock) {
            // Re-read: another request may have filled the cache while this one was waiting.
            shareFileList = fileContainer.getShareFileList();
            if (CollectionUtils.isEmpty(shareFileList)) {
                List<File> fileList = parseFileUtil.getFileList(filePath);
                for (File file : fileList) {
                    FileContainer.ShareFileDto output = new FileContainer.ShareFileDto();
                    output.setFileId(parseFileUtil.getFileId());
                    output.setFileName(file.getName());
                    output.setFilePath(file.getAbsolutePath());
                    shareFileList.add(output);
                }
                // 存入缓存中
                fileContainer.putShareFileDto(shareFileList);
            }
            return shareFileList;
        }
    }
}
  1. List<FileContainer.ShareFileDto> shareFileList = fileContainer.getShareFileList();:这里采用了本地缓存,在第一次访问时将文件列表加入缓存中。当然这里没有做线程安全的保护:如果多个请求同时第一次访问,可能会重复扫描目录、重复写入缓存。

FileContainer类

/**
 * In-memory caches shared by the trajectory services: the shared file list, parsed file
 * contents keyed by file ID, and per-session file contexts.
 */
@Component
public class FileContainer {

    /**
     * Shared file list. Read and written by concurrent HTTP request threads, so a concurrent
     * set is used instead of a plain HashSet (which is not safe to copy while being modified).
     */
    private final Set<ShareFileDto> SHARE_FILE_CACHE = java.util.concurrent.ConcurrentHashMap.newKeySet();

    /**
     * key: file ID
     * value: parsed file content
     */
    private final Cache<Long, FileParseDto> FILE_PARSE_CACHE = CacheBuilder
            .newBuilder()
            // maximum number of entries
            .maximumSize(128)
            .build();


    /**
     * key: session
     * value: file list
     */
    private final Cache<String, List<FileContext>> FILE_CACHE = CacheBuilder.newBuilder()
            // maximum number of entries
            .maximumSize(128)
            // expire 30 minutes after write
            .expireAfterWrite(30, TimeUnit.MINUTES)
            .build();


    /**
     * Returns the file list cached for a session, creating and caching an empty list on a miss.
     *
     * @param session session key
     * @return cached (possibly new, empty) list
     */
    public List<FileContext> getList(String session) {
        try {
            return FILE_CACHE.get(session, ArrayList::new);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Caches the file list for a session.
     *
     * @param session         session key
     * @param fileContextList list to cache
     */
    public void put(String session, List<FileContext> fileContextList) {
        FILE_CACHE.put(session, fileContextList);
    }


    /**
     * Returns the cached parse result for a file.
     *
     * @param fileId file ID
     * @return the cached value, or {@code null} if none is cached. The previous
     *         {@code get(fileId, () -> null)} threw Guava's InvalidCacheLoadException on every
     *         miss, because a cache loader must not return null.
     */
    public FileParseDto getFileParseDto(Long fileId) {
        return FILE_PARSE_CACHE.getIfPresent(fileId);
    }

    /**
     * Caches the parse result for a file.
     *
     * @param fileId       file ID
     * @param fileParseDto parse result
     */
    public void putFileParseDto(Long fileId, FileParseDto fileParseDto) {
        FILE_PARSE_CACHE.put(fileId, fileParseDto);
    }


    /**
     * Returns a mutable snapshot copy of the shared file list (order unspecified).
     *
     * @return a new list; changes to it do not affect the cache
     */
    public List<ShareFileDto> getShareFileList() {
        return new ArrayList<>(SHARE_FILE_CACHE);
    }

    /**
     * Adds files to the shared file list.
     *
     * @param shareFileDtos files to add
     */
    public void putShareFileDto(List<ShareFileDto> shareFileDtos) {
        SHARE_FILE_CACHE.addAll(shareFileDtos);
    }



    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class FileContext {

        private Long fileId;

        private String fileHeader;

        private List<ParseFileListOutput> dataList;

        private String fileName;

    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class FileParseDto {

        private Long fileId;

        private String fileHeader;

        private List<ParseFileListOutput> dataList;

    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class ShareFileDto {

        private Long fileId;

        private String filePath;

        private String fileName;

    }
}
  1. realTimeAnalysisUtil.start(); 这里是核心点

RealTimeAnalysisUtil 工具类

@Slf4j
@Component
@RequiredArgsConstructor
public class RealTimeAnalysisUtil {

    private final FileContainer fileContainer;

    private final ParseFileUtil parseFileUtil;

    private final WebSocketServer webSocketServer;

    private final HttpClientUtil httpClientUtil;

    /**
     * Balking flag: true while a batch of push threads is active, so start() does not launch
     * the threads a second time. Written only while holding {@link #lock}.
     */
    private volatile boolean init;


    /**
     * Loop flag read by the push threads: true - keep running; false - stop.
     */
    @Getter
    private volatile boolean status;

    /**
     * Guards the init/status transitions and every access to {@link #threadList}.
     */
    private final ReentrantLock lock = new ReentrantLock();

    /**
     * Push threads of the current batch. Only accessed while holding {@link #lock}.
     */
    private final List<Thread> threadList = new ArrayList<>();


    /**
     * Starts pushing parsed file data to the front end, one thread per shared file, unless a
     * batch is already running. Also starts the listener that stops the batch when the
     * connection count reaches zero.
     * <p>
     * NOTE(review): if the listener's stop() runs after a reconnecting client's start() has
     * already returned early (init still true), pushing stops while a client is connected.
     * Closing that gap needs the live connection count, which WebSocketServer does not expose.
     */
    public void start() {
        List<FileContainer.ShareFileDto> shareFileList = fileContainer.getShareFileList();
        lock.lock();
        try {
            // status and init change under the same lock that stop() uses, so a concurrent
            // stop() cannot interleave between the two updates.
            this.setStatus(true);
            log.debug("开始解析文件:status: {}; init: {};", this.status, this.init);
            // Balking: only the first caller of a batch starts threads.
            if (init) {
                return;
            }
            this.init = true;

            this.listenerStop();

            if (!CollectionUtils.isEmpty(shareFileList)) {
                log.debug("开始推送文件解析数据");
                for (FileContainer.ShareFileDto shareFileDto : shareFileList) {
                    RealTimePushRunnable realTimePushRunnable = new RealTimePushRunnable(shareFileDto, parseFileUtil, webSocketServer, this, httpClientUtil);
                    Thread thread = new Thread(realTimePushRunnable, shareFileDto.getFileName());
                    // Register while holding the lock. Previously this add ran unlocked while
                    // stop() iterated and cleared the list under the lock: a data race that
                    // could miss threads or throw ConcurrentModificationException.
                    threadList.add(thread);
                    thread.start();
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Starts a thread that waits on WebSocketServer's status condition (signalled when the
     * connection count drops to zero) and then calls stop().
     */
    public void listenerStop() {
        new Thread(() -> {
            ReentrantLock serverLock = webSocketServer.getLock();
            serverLock.lock();
            try {
                log.debug("监听任务停现成启动......");
                // NOTE(review): await() is not wrapped in a predicate loop, so a signal sent
                // before this thread starts waiting is lost, and a spurious wakeup would stop
                // the tasks early.
                webSocketServer.getStatusCondition().await();
                this.stop();
            } catch (InterruptedException e) {
                // Preserve the interrupt status instead of killing the thread with an
                // unchecked exception.
                Thread.currentThread().interrupt();
                log.warn("监听任务停止线程被中断", e);
            } finally {
                serverLock.unlock();
            }
        }, "listenerStop").start();
    }

    /**
     * Stops the current batch. It resets both flags and interrupts every registered push
     * thread. The interrupt is intended to wake threads sleeping between pushes, assuming
     * the sleep is interruptible.
     */
    public void stop() {
        lock.lock();
        try {
            this.init = false;
            this.setStatus(false);
            // Interrupt all push threads.
            log.debug("开始停止所有解析任务");
            for (Thread thread : threadList) {
                thread.interrupt();
            }
            // Forget the stopped batch.
            threadList.clear();
            log.debug("所有解析任务已终止");
        } finally {
            lock.unlock();
        }
    }

    /**
     * Sets the loop flag read by the push threads.
     *
     * @param status true to keep running, false to stop
     */
    public void setStatus(boolean status) {
        lock.lock();
        try {
            this.status = status;
        } finally {
            lock.unlock();
        }
    }
}

重点关注 stop() 方法

最开始的时候,我直接把两个标志位的值改为 false,但这并没有解决重复解析文件的问题:

/**
 * First attempt at stop(): only resets the two flags.
 * Historical version kept to illustrate the bug analysed below. Nothing here wakes a push
 * thread that is inside SleepUtil.sleep(). If start() sets status back to true before that
 * thread wakes up, the thread never observes status == false and keeps running.
 */
public void stop() {
    lock.lock();
    try {
        this.init = false;
        this.setStatus(false);
        log.debug("所有解析任务已终止");
    } finally {
        lock.unlock();
    }
}

分析:如果前端刷新页面,断开连接——> 建立连接 ——> 获取目标列表(启动分析线程);重复刷新两三次后。

由于线程里面调用了 SleepUtil.sleep(),每个线程每次都会睡眠以秒为单位的一段时间,所以在线程睡眠期间,还没有执行

// 监控状态是否停止
if (!realTimeAnalysisUtil.isStatus()) {
    break;
}

这行代码。此时 status 的值变化过程为:true -> false -> true。当线程睡眠结束后再执行上面的代码时,status 已经又变回 true 了,所以线程并没有退出,还在继续运行。

问题:怎样在执行stop方法的时候能保证线程的正常结束呢?

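通过改造,将启动的线程放入成员变量集合中,在 stop() 中对每个线程执行 interrupt() 方法,打断正在睡眠中的线程。需要注意:interrupt() 只会让阻塞在 Thread.sleep() 等可中断方法中的线程抛出 InterruptedException,线程能否真正退出取决于 SleepUtil.sleep() 如何处理这个异常——如果它吞掉异常且没有恢复中断标志,线程醒来后依然会继续循环。因此循环里最好同时检查 Thread.currentThread().isInterrupted()。修改后的代码: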

/**
 * Stops the push tasks after the connection count reaches zero.
 * It resets init and status and interrupts every registered push thread. The interrupt is
 * intended to wake threads sleeping in SleepUtil.sleep(), assuming that sleep is
 * interruptible. Afterwards the thread list is emptied so the next start() begins from a
 * clean state.
 */
public void stop() {
    lock.lock();
    try {
        init = false;
        setStatus(false);
        log.debug("开始停止所有解析任务");
        // Interrupt every push thread, then forget them.
        threadList.forEach(Thread::interrupt);
        threadList.clear();
        log.debug("所有解析任务已终止");
    } finally {
        lock.unlock();
    }
}

问题:为什么有些线程突然不推送了呢?

以前的代码:

/**
 * Earlier version of RealTimeAnalysisUtil, kept to illustrate the "some files suddenly stop
 * pushing" bug. init is reset only by a helper thread that join()s every push thread. If any
 * push thread keeps running, that reset never happens, and later start() calls skip launching
 * threads for the files whose threads already exited.
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class RealTimeAnalysisUtil {

    private final FileContainer fileContainer;

    private final ParseFileUtil parseFileUtil;

    private final WebSocketServer webSocketServer;

    // true while a batch of push threads is considered active; guarded by 'lock' in start().
    private volatile boolean init;

    private final Object lock = new Object();

    /**
     * Starts pushing parsed file data to the front end (one RealTimePushThread per file),
     * only when webSocketServer.getStatus() is true and no batch is active.
     */
    public void start() {
        synchronized (lock) {
            log.debug("开始解析文件:running: {}; init: {}", webSocketServer.getStatus(), this.init);
            if (webSocketServer.getStatus() && !init) {
                List<FileContainer.ShareFileDto> shareFileList = fileContainer.getShareFileList();
                // Mark the batch as initialized.
                // NOTE: if shareFileList is empty, nothing below ever resets init to false.
                this.init = true;
                if (!CollectionUtils.isEmpty(shareFileList)) {
                    log.debug("开始推送文件解析数据");
                    List<RealTimePushThread> realTimePushThreadList = new ArrayList<>();
                    for (FileContainer.ShareFileDto shareFileDto : shareFileList) {
                        RealTimePushThread realTimePushThread = new RealTimePushThread(shareFileDto, parseFileUtil, webSocketServer);
                        realTimePushThread.start();
                        realTimePushThreadList.add(realTimePushThread);
                    }
                    // Helper thread: waits for ALL push threads, then resets init.
                    // Flaw: join() blocks as long as any push thread is still running, so init
                    // stays true and start() will not restart the threads that already exited.
                    new Thread(() -> {
                        log.debug("等待所有线程任务结束");
                        for (RealTimePushThread realTimePushThread : realTimePushThreadList) {
                            try {
                                realTimePushThread.join();
                            } catch (InterruptedException e) {
                                throw new RuntimeException(e);
                            }
                        }
                        this.init = false;
                    }).start();
                }
            }
        }
    }

}

代码中的: webSocketServer.getStatus() 是在 连接数为0的时候修改为false的

可以看见,之前是通过对象锁(即 synchronized 关键字)的方式对共享变量 init 进行修改的,原本是没有问题的。但是前端刷新页面时(断开连接 ——> 建立连接 ——> 获取目标列表(启动分析线程)),根据上面的分析,有些线程可能刚好在 status 改为 false 的时候执行到了下面这个代码块,于是就结束了;还有些线程并没有结束,仍在继续运行。由于 join() 方法是阻塞的,下面 this.init = false; 这行代码就一直不会执行,init 始终为 true,后续调用 start() 也不会再为已经结束的文件启动新线程。这样就会导致前端刷新页面后,某些文件突然就推送不了了。

// 监控状态是否停止
if (!realTimeAnalysisUtil.isStatus()) {
    break;
}

RealTimePushRunnable

/**
 * Push task for a single shared file. It parses the file once, sorts the points by time,
 * then repeatedly pushes every point to the subscribed WebSocket sessions. Each push is
 * stamped with the current time, with a SleepUtil pause between pushes. The task runs until
 * RealTimeAnalysisUtil's status flag turns false or the thread is interrupted.
 */
@Slf4j
@RequiredArgsConstructor
public class RealTimePushRunnable implements Runnable {

    private final FileContainer.ShareFileDto shareFileDto;

    private final ParseFileUtil parseFileUtil;

    private final WebSocketServer webSocketServer;

    private final RealTimeAnalysisUtil realTimeAnalysisUtil;

    private final HttpClientUtil httpClientUtil;


    @Override
    public void run() {
        log.debug("开始文件分析:{}", shareFileDto.getFileName());
        MultipartFile file = parseFileUtil.fileToMultipartFile(new File(shareFileDto.getFilePath()));
        List<ParseFileListOutput> outputList = parseFileUtil.parseFileList(file, new StringBuilder());
        List<ParseFileListOutput> sortList = Optional.ofNullable(outputList).orElse(new ArrayList<>()).stream()
                .filter(item -> item.getDatePoint() != null)
                .sorted(Comparator.comparing(ParseFileListOutput::getDatePoint))
                .collect(Collectors.toList());
        if (sortList.isEmpty()) {
            // Nothing to push. Without this guard the outer while loop would spin at 100% CPU,
            // because the inner for loop would never sleep.
            log.debug("文件没有可推送的数据:{}", shareFileDto.getFileName());
            return;
        }
        // Record the file's own timestamps exactly once. Doing this inside the loop overwrote
        // originalTime with the previous push time from the second pass onward, because
        // datePoint is replaced with "now" on every push.
        for (ParseFileListOutput output : sortList) {
            output.setFileId(shareFileDto.getFileId());
            output.setOriginalTime(output.getDatePoint());
        }
        while (shouldRun()) {
            for (ParseFileListOutput output : sortList) {
                output.setDatePoint(new Date());
                WebSocketResponse<ParseFileListOutput> response = WebSocketResponse.ok(WebSocketMessageConstants.REAL_TIME_PUSH_DATE_POINT, output);
                webSocketServer.sendWsMessage(JSON.toJSONString(response), shareFileDto.getFileId());
                // TODO 推送算法识别 (push algorithm recognition result)
//                IntentionAnalysisOutput intentionAnalysis = httpClientUtil.realTimeIntentionAnalysis(shareFileDto.getFileId(), output);
                // TODO 发送结果给前端 (send the result to the front end)
//                WebSocketResponse<IntentionAnalysisOutput> result = WebSocketResponse.ok(WebSocketMessageConstants.REAL_TIME_PUSH_INTENTION_ANALYSIS, intentionAnalysis);
//                webSocketServer.sendWsMessage(JSON.toJSONString(result), shareFileDto.getFileId());

                SleepUtil.sleep();
                // Check whether this task was stopped.
                if (!shouldRun()) {
                    break;
                }
            }
        }
    }

    /**
     * Whether the task should keep pushing. It also honours the interrupt flag set by
     * RealTimeAnalysisUtil.stop(), so a stopped thread exits even if the global status flag
     * has already been switched back to true by a restart.
     */
    private boolean shouldRun() {
        return realTimeAnalysisUtil.isStatus() && !Thread.currentThread().isInterrupted();
    }
}
  1. while (realTimeAnalysisUtil.isStatus()) : 通过标志位决定是否继续运行任务

  2. // 监控状态是否停止
    if (!realTimeAnalysisUtil.isStatus()) {
        break;
    }
  3. SleepUtil.sleep(); 让线程随机等待n秒后再推送数据;

0%