Solving a thread-safety problem: threads not interrupted in time cause worker threads to start repeatedly or suddenly stop working
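Scenario:
- The backend holds a batch of files that must be parsed and then pushed to the front end over WebSocket.
- When the number of WebSocket connections drops to zero, the push service must stop.
- As long as anyone is connected, the data is pushed in a continuous loop.
- Each push waits a random interval, to simulate a real-world feed.
- Multiple files should be parsed and pushed in parallel.
- Each client can choose which files' coordinates it receives.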
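Analysis:
When the target-list endpoint is called, load the file list, establish the connection, and start the file parsing and push threads.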
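Problems:
- At first, once I had the files, I simply called `new Thread(() -> {}).start();`. But if several people connect over WebSocket and call the file-parsing endpoint, several threads end up parsing and pushing the same file. Solution: add an initialization flag.
- Even with the initialization flag and a status flag in place, a page refresh on the front end drops the connection and re-establishes it almost immediately. The intended logic was that setting the status flag to false would make the threads exit, but in practice the threads, or some of them, did not exit, so the same file was again parsed and pushed by several threads. This is analyzed in detail later.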
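The initialization flag described above is an instance of the Balking pattern: if the work is already in progress, a second caller simply returns. A minimal sketch, assuming a hypothetical `PushStarter` class (not part of the original project) that uses `AtomicBoolean.compareAndSet` so that the check and the update happen as one atomic step:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper illustrating the Balking pattern (not from the original project)
class PushStarter {
    // false = workers not started yet, true = workers already running
    private final AtomicBoolean initialized = new AtomicBoolean(false);
    // How many times the startup body really ran (only for demonstration)
    final AtomicInteger startCount = new AtomicInteger();

    void start(Runnable startWorkers) {
        // Atomic check-and-set: exactly one concurrent caller flips false -> true
        if (!initialized.compareAndSet(false, true)) {
            return; // balk: another caller already started the workers
        }
        startCount.incrementAndGet();
        startWorkers.run();
    }

    void stop() {
        // Allow the next start() to launch a fresh set of workers
        initialized.set(false);
    }
}
```

Because `compareAndSet(false, true)` succeeds for exactly one caller, the flag needs no separate lock; `RealTimeAnalysisUtil` below reaches the same effect with a `ReentrantLock` around a volatile boolean.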
Integrating WebSocket
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
Write the WebSocketConfig configuration class
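@Configuration
@EnableWebSocket
@ConditionalOnWebApplication
public class WebSocketConfig {
    // Registers @ServerEndpoint-annotated beans with the servlet container's WebSocket runtime
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
    // Spring bean of the configurator, so that setApplicationContext() is called on it
    @Bean
    public SpringWebSocketConfigurator springWebSocketConfigurator() {
        return new SpringWebSocketConfigurator();
    }
}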
The SpringWebSocketConfigurator class
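public class SpringWebSocketConfigurator extends ServerEndpointConfig.Configurator implements ApplicationContextAware {
    // static: the WebSocket container (e.g. Tomcat) creates its own instance of the configurator
    // named in @ServerEndpoint, so the context injected into the Spring bean is shared statically
    private static volatile BeanFactory applicationContext;
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        SpringWebSocketConfigurator.applicationContext = applicationContext;
    }
    /**
     * Overridden so that endpoint instances are taken from the Spring context
     * instead of being created by the container.
     *
     * @param clazz
     * @param <T>
     * @return
     * @throws InstantiationException
     */
    @Override
    public <T> T getEndpointInstance(Class<T> clazz) throws InstantiationException {
        return applicationContext.getBean(clazz);
    }
}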
The WebSocketServer class
@Slf4j
@ServerEndpoint(value = "/ws/asset", configurator = SpringWebSocketConfigurator.class)
@Component
@RequiredArgsConstructor
public class WebSocketServer {
    // Number of open connections
    private static final AtomicInteger ONLINE_COUNT = new AtomicInteger(0);
    // Thread-safe Set from java.util.concurrent holding the Session of every client
    private static final CopyOnWriteArraySet<Session> SESSION_SET = new CopyOnWriteArraySet<>();
    // sessionId -> IDs of the files whose coordinates this client receives
    private static final ConcurrentHashMap<String, Set<Long>> SESSION_SET_CONCURRENT_HASH_MAP = new ConcurrentHashMap<>();
    @Getter
    private final ReentrantLock lock = new ReentrantLock();
    // Signalled when the connection count drops to 0
    @Getter
    private final Condition statusCondition = lock.newCondition();
    @OnOpen
    public void onOpen(Session session) {
        SESSION_SET.add(session);
        // Increment the online count
        int cnt = ONLINE_COUNT.incrementAndGet();
        log.info("Connection opened, current connection count: {}", cnt);
        String sessionId = session.getId();
        RestResponse<String> response = RestResponse.ok(sessionId);
        sendMessage(session, JSON.toJSONString(response));
    }
    /**
     * Called when a connection is closed
     */
    @OnClose
    public void onClose(Session session) {
        SESSION_SET.remove(session);
        SESSION_SET_CONCURRENT_HASH_MAP.remove(session.getId());
        int cnt = ONLINE_COUNT.decrementAndGet();
        if (cnt == 0) {
            lock.lock();
            try {
                // Tell the listener thread to stop the tasks
                statusCondition.signalAll();
            } finally {
                lock.unlock();
            }
        }
        log.info("Connection closed, current connection count: {}", cnt);
    }
    /**
     * Called when a message is received from a client
     *
     * @param message the message sent by the client
     */
    @OnMessage
    public void onMessage(String message, Session session) throws IOException {
        log.info("Message from client: {}", message);
        // BroadCastInfo(session.getId() + message);
    }
    /**
     * Called when an error occurs
     *
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("Error: {}, Session ID: {}", error.getMessage(), session.getId());
    }
    /**
     * Sends a message. In practice, the session changes every time the browser is refreshed.
     *
     * @param session
     * @param message
     */
    public void sendMessage(Session session, String message) {
        try {
            // A session's basic remote must not be used by two threads at once (Tomcat, for
            // example, throws IllegalStateException), and several file threads can push to
            // the same session, so sends are serialized per session
            synchronized (session) {
                if (session.isOpen()) {
                    session.getBasicRemote().sendText(message);
                } else {
                    log.debug("session id: {}, is not open", session.getId());
                }
            }
        } catch (Exception e) {
            log.error("Failed to send message: {}", e.getMessage());
        }
    }
    /**
     * Broadcasts a message to all sessions
     *
     * @param message
     * @throws IOException
     */
    public void broadCastInfo(String message) throws IOException {
        for (Session session : SESSION_SET) {
            if (session.isOpen()) {
                sendMessage(session, message);
            }
        }
    }
    /**
     * Sends a message to the session with the given ID
     *
     * @param sessionId
     * @param message
     * @throws IOException
     */
    public void sendMessage(String message, String sessionId) throws IOException {
        Session session = null;
        for (Session s : SESSION_SET) {
            if (s.getId().equals(sessionId)) {
                session = s;
                break;
            }
        }
        if (session != null) {
            sendMessage(session, message);
        } else {
            log.warn("No session found with ID: {}", sessionId);
        }
    }
    public void sendWsMessage(String message, Long fileId) {
        Iterator<Map.Entry<String, Set<Long>>> iterator = SESSION_SET_CONCURRENT_HASH_MAP.entrySet().iterator();
        while (iterator.hasNext()) {
            Map.Entry<String, Set<Long>> entry = iterator.next();
            if (entry.getValue().contains(fileId)) {
                try {
                    sendMessage(message, entry.getKey());
                } catch (IOException e) {
                    log.error("Failed to send WebSocket message:", e);
                }
            }
        }
    }
    /**
     * Adds file IDs for a session to listen to.
     * Although a thread-safe map is used, this critical section both reads and writes it,
     * so it still needs a lock to be thread-safe.
     * @param sessionId
     * @param fileIdList
     */
    public synchronized void insertFileListener(String sessionId, List<Long> fileIdList) {
        try {
            Set<Long> fileIdSet = SESSION_SET_CONCURRENT_HASH_MAP.get(sessionId);
            if (fileIdSet == null) {
                // Concurrent set: sendWsMessage() reads it without holding this lock
                fileIdSet = ConcurrentHashMap.newKeySet();
            }
            fileIdSet.addAll(fileIdList);
            SESSION_SET_CONCURRENT_HASH_MAP.put(sessionId, fileIdSet);
        } catch (Exception e) {
            log.error("insertFileListener error:", e);
        }
    }
    /**
     * Stops listening to the given file IDs
     *
     * @param sessionId
     * @param fileIdList
     */
    public synchronized void removeFileListener(String sessionId, List<Long> fileIdList) {
        try {
            Set<Long> fileIdSet = SESSION_SET_CONCURRENT_HASH_MAP.get(sessionId);
            if (fileIdSet != null) {
                fileIdList.forEach(fileIdSet::remove);
                SESSION_SET_CONCURRENT_HASH_MAP.put(sessionId, fileIdSet);
            }
        } catch (Exception e) {
            log.error("removeFileListener error:", e);
        }
    }
}
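- `@ServerEndpoint(value = "/ws/asset", configurator = SpringWebSocketConfigurator.class)`: this mainly fixes the fact that, by default, WebSocketServer is not a singleton (the container creates a new endpoint instance for every connection), which later caused errors when using certain features. I found this by accident; the solution is based on a blog post.
- `private final Condition statusCondition = lock.newCondition();`: a condition variable used to notify another thread to stop the parsing tasks when the number of connected clients drops to zero.
- Because the endpoint is now a singleton shared by all connections, its shared state is made thread-safe with static fields plus atomic and concurrent classes:
  - `private static final AtomicInteger ONLINE_COUNT = new AtomicInteger(0);`: records the number of connections.
  - `private static final ConcurrentHashMap<String, Set<Long>> SESSION_SET_CONCURRENT_HASH_MAP = new ConcurrentHashMap<>();`: records which files' coordinates each client receives.

Why do `insertFileListener` and `removeFileListener` still need the `synchronized` keyword? `ConcurrentHashMap` is a thread-safe collection, and its **get()** and **put()** methods are each thread-safe when used on their own, but a get() followed by a put() is not atomic as a whole. For example, two requests for the same session can both get `null`, each create a new set, and the second put() then overwrites the file IDs added by the first. That is why the lock is still needed.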
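If the lock is only there to make "get, create if missing, put" atomic, `ConcurrentHashMap.computeIfAbsent` already provides that for a single key. A sketch, assuming a hypothetical `FileListenerRegistry` class that plays the role of `SESSION_SET_CONCURRENT_HASH_MAP`; it also uses `ConcurrentHashMap.newKeySet()` for the per-session set, because the push threads may read that set while another thread modifies it:

```java
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry standing in for SESSION_SET_CONCURRENT_HASH_MAP
class FileListenerRegistry {
    // sessionId -> file IDs the session listens to
    private final ConcurrentHashMap<String, Set<Long>> listeners = new ConcurrentHashMap<>();

    void insert(String sessionId, Collection<Long> fileIds) {
        // computeIfAbsent creates and stores the set atomically for this key,
        // so two concurrent first inserts cannot overwrite each other
        listeners.computeIfAbsent(sessionId, k -> ConcurrentHashMap.newKeySet()).addAll(fileIds);
    }

    void remove(String sessionId, Collection<Long> fileIds) {
        Set<Long> set = listeners.get(sessionId);
        if (set != null) {
            set.removeAll(fileIds);
        }
    }

    boolean isListening(String sessionId, Long fileId) {
        Set<Long> set = listeners.get(sessionId);
        return set != null && set.contains(fileId);
    }

    int size(String sessionId) {
        Set<Long> set = listeners.get(sessionId);
        return set == null ? 0 : set.size();
    }
}
```

With this shape, neither method needs `synchronized`; the trade-off is that each operation is atomic only per key, which is all the listener bookkeeping requires.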
The controller
@Tag(name = "Trajectory controller")
@RestController
@RequestMapping("trajectory")
@RequiredArgsConstructor
public class TrajectoryController {
    private final TrajectoryService trajectoryService;
    @Operation(summary = "Target list")
    @PostMapping("/targetList")
    public RestResponse<List<TargetListOutput>> targetList() {
        List<TargetListOutput> targetListOutputList = trajectoryService.targetList();
        return RestResponse.ok(targetListOutputList);
    }
    @Operation(summary = "WS: add file ID")
    @PostMapping("/insertFileId")
    public RestResponse<Boolean> insertFileId(@RequestBody InsertFileIdInput input) {
        Boolean result = trajectoryService.insertFileId(input);
        return RestResponse.ok(result);
    }
    @Operation(summary = "WS: remove file ID")
    @PostMapping("/removeFileId")
    public RestResponse<Boolean> removeFileId(@RequestBody InsertFileIdInput input) {
        Boolean result = trajectoryService.removeFileId(input);
        return RestResponse.ok(result);
    }
}
TrajectoryServiceImpl
@Slf4j
@Service
@RequiredArgsConstructor
public class TrajectoryServiceImpl implements TrajectoryService {
    private final ParseFileUtil parseFileUtil;
    private final FileContainer fileContainer;
    private final WebSocketServer webSocketServer;
    private final IFileRecordConverter fileRecordConverter;
    private final RealTimeAnalysisUtil realTimeAnalysisUtil;
    private final HttpClientUtil httpClientUtil;
    @Value("${target.file.path:}")
    private String filePath;
    @Override
    public Boolean insertFileId(InsertFileIdInput input) {
        if (input.getFileId() != null) {
            webSocketServer.insertFileListener(input.getSessionId(), Collections.singletonList(input.getFileId()));
        }
        return true;
    }
    @Override
    public Boolean removeFileId(InsertFileIdInput input) {
        if (input.getFileId() != null) {
            webSocketServer.removeFileListener(input.getSessionId(), Collections.singletonList(input.getFileId()));
        }
        return true;
    }
    @Override
    public List<TargetListOutput> targetList() {
        List<TargetListOutput> outputList;
        // Read from the local cache
        List<FileContainer.ShareFileDto> shareFileList = fileContainer.getShareFileList();
        if (CollectionUtils.isEmpty(shareFileList)) {
            // First access: scan the directory and build the list
            List<File> fileList = parseFileUtil.getFileList(filePath);
            for (File file : fileList) {
                FileContainer.ShareFileDto output = new FileContainer.ShareFileDto();
                output.setFileId(parseFileUtil.getFileId());
                output.setFileName(file.getName());
                output.setFilePath(file.getAbsolutePath());
                shareFileList.add(output);
            }
            // Store in the cache
            fileContainer.putShareFileDto(shareFileList);
        }
        outputList = fileRecordConverter.toTargetListOutput(shareFileList);
        // Start pushing
        realTimeAnalysisUtil.start();
        return outputList;
    }
}
List<FileContainer.ShareFileDto> shareFileList = fileContainer.getShareFileList();
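A local cache is used here: the file list is added to the cache on first access. This step is not protected against concurrent access: two first requests arriving at the same time can both find the cache empty and both load the file list.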
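If this first-load step does need protecting, one option is double-checked locking on a volatile field, so that only the first caller runs the loader and everyone else reuses its result. A minimal sketch with a hypothetical generic `ShareFileCache` (an illustration only; the real `FileContainer` keeps a `Set<ShareFileDto>` and has no such method):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical load-once cache (illustration only, not the original FileContainer)
class ShareFileCache<T> {
    // volatile is required for double-checked locking to publish the list safely
    private volatile List<T> cached;

    List<T> getOrLoad(Supplier<List<T>> loader) {
        List<T> local = cached;              // first check, without locking
        if (local == null) {
            synchronized (this) {
                local = cached;              // second check, under the lock
                if (local == null) {
                    // Copy and freeze the loaded list so callers cannot modify the cache
                    local = Collections.unmodifiableList(new ArrayList<>(loader.get()));
                    cached = local;
                }
            }
        }
        return local;
    }
}
```

Callers get a read-only list, so `targetList()` would no longer add to the returned list directly; it would build the list inside the loader instead.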
The FileContainer class
@Component
public class FileContainer {
    // Plain HashSet: not safe for concurrent writes
    private final Set<ShareFileDto> SHARE_FILE_CACHE = new HashSet<>();
    /**
     * key: file ID
     * value: parsed file data
     */
    private final Cache<Long, FileParseDto> FILE_PARSE_CACHE = CacheBuilder
            .newBuilder()
            // Maximum number of entries
            .maximumSize(128)
            .build();
    /**
     * key: session
     * value: file list
     */
    private final Cache<String, List<FileContext>> FILE_CACHE = CacheBuilder.newBuilder()
            // Maximum number of entries
            .maximumSize(128)
            // Expiration policy: entries expire 30 minutes after being written
            .expireAfterWrite(30, TimeUnit.MINUTES)
            .build();
    public List<FileContext> getList(String session) {
        try {
            return FILE_CACHE.get(session, ArrayList::new);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }
    public void put(String session, List<FileContext> fileContextList) {
        FILE_CACHE.put(session, fileContextList);
    }
    public FileParseDto getFileParseDto(Long fileId) {
        // getIfPresent returns null on a cache miss; Cache.get(fileId, () -> null) would
        // throw InvalidCacheLoadException, because a loader must not return null
        return FILE_PARSE_CACHE.getIfPresent(fileId);
    }
    public void putFileParseDto(Long fileId, FileParseDto fileParseDto) {
        FILE_PARSE_CACHE.put(fileId, fileParseDto);
    }
    public List<ShareFileDto> getShareFileList() {
        return new ArrayList<>(SHARE_FILE_CACHE);
    }
    public void putShareFileDto(List<ShareFileDto> shareFileDtos) {
        SHARE_FILE_CACHE.addAll(shareFileDtos);
    }
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class FileContext {
        private Long fileId;
        private String fileHeader;
        private List<ParseFileListOutput> dataList;
        private String fileName;
    }
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class FileParseDto {
        private Long fileId;
        private String fileHeader;
        private List<ParseFileListOutput> dataList;
    }
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class ShareFileDto {
        private Long fileId;
        private String filePath;
        private String fileName;
    }
}
realTimeAnalysisUtil.start();
This is the core of the solution.
The RealTimeAnalysisUtil utility class
@Slf4j
@Component
@RequiredArgsConstructor
public class RealTimeAnalysisUtil {
    private final FileContainer fileContainer;
    private final ParseFileUtil parseFileUtil;
    private final WebSocketServer webSocketServer;
    private final HttpClientUtil httpClientUtil;
    /**
     * Balking-pattern flag: ensures the worker threads are not started more than once
     */
    private volatile boolean init;
    /**
     * Loop flag checked inside the worker threads; tells whether they should keep running: true = running, false = stopped
     */
    @Getter
    private volatile boolean status;
    /**
     * Guards the shared variable init (and threadList)
     */
    private final ReentrantLock lock = new ReentrantLock();
    /**
     * Holds the file-parsing threads
     */
    private final List<Thread> threadList = new ArrayList<>();
    /**
     * Starts pushing parsed file data to the front end
     */
    public void start() {
        this.setStatus(true);
        log.debug("Start parsing files: status: {}; init: {};", this.status, this.init);
        List<FileContainer.ShareFileDto> shareFileList = fileContainer.getShareFileList();
        // Balking: make sure the threads are started only once
        lock.lock();
        try {
            if (init) {
                return;
            }
            this.init = true;
        } finally {
            lock.unlock();
        }
        this.listenerStop();
        if (!CollectionUtils.isEmpty(shareFileList)) {
            log.debug("Start pushing parsed file data");
            // threadList is a plain ArrayList that stop() iterates under the same lock
            lock.lock();
            try {
                for (FileContainer.ShareFileDto shareFileDto : shareFileList) {
                    RealTimePushRunnable realTimePushRunnable = new RealTimePushRunnable(shareFileDto, parseFileUtil, webSocketServer, this, httpClientUtil);
                    Thread thread = new Thread(realTimePushRunnable, shareFileDto.getFileName());
                    threadList.add(thread);
                    thread.start();
                }
            } finally {
                lock.unlock();
            }
        }
    }
    /**
     * Starts a new thread that waits until the connection count drops to 0
     */
    public void listenerStop() {
        new Thread(() -> {
            ReentrantLock serverLock = webSocketServer.getLock();
            serverLock.lock();
            try {
                log.debug("Stop-listener thread started......");
                // Condition variable: block here until signalled, then continue
                webSocketServer.getStatusCondition().await();
                this.stop();
            } catch (InterruptedException e) {
                // Restore the interrupt flag; the listener thread then simply ends
                Thread.currentThread().interrupt();
            } finally {
                serverLock.unlock();
            }
        }, "listenerStop").start();
    }
    /**
     * On stop: reset the flags
     */
    public void stop() {
        lock.lock();
        try {
            this.init = false;
            this.setStatus(false);
            // Interrupt all worker threads
            log.debug("Stopping all parsing tasks");
            for (Thread thread : threadList) {
                thread.interrupt();
            }
            // Clear the list
            threadList.clear();
            log.debug("All parsing tasks interrupted");
        } finally {
            lock.unlock();
        }
    }
    public void setStatus(boolean status) {
        lock.lock();
        try {
            this.status = status;
        } finally {
            lock.unlock();
        }
    }
}
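One detail of `listenerStop()`: `await()` is called once, without checking any condition. A thread waiting on a `Condition` may wake up spuriously, and a `signalAll()` sent before the thread starts waiting is simply lost. The usual idiom is to wait in a loop on a predicate that is read and written under the same lock. A sketch, assuming a hypothetical `OnlineCounter` that keeps the connection count inside the lock instead of in a separate `AtomicInteger`:

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical counter (not the original WebSocketServer): the predicate "online == 0"
// and the Condition share one lock, so no wake-up can be missed or misread
class OnlineCounter {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition noClients = lock.newCondition();
    private int online; // guarded by lock

    void onOpen() {
        lock.lock();
        try {
            online++;
        } finally {
            lock.unlock();
        }
    }

    void onClose() {
        lock.lock();
        try {
            if (--online == 0) {
                noClients.signalAll();
            }
        } finally {
            lock.unlock();
        }
    }

    // Blocks until no client is connected; returns at once if that is already true
    void awaitNoClients() throws InterruptedException {
        lock.lock();
        try {
            while (online > 0) {     // re-check after every (possibly spurious) wake-up
                noClients.await();
            }
        } finally {
            lock.unlock();
        }
    }

    // Timed variant: true if the count reached 0, false if the timeout expired first
    boolean awaitNoClients(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        lock.lock();
        try {
            while (online > 0) {
                if (nanos <= 0) {
                    return false;
                }
                nanos = noClients.awaitNanos(nanos);
            }
            return true;
        } finally {
            lock.unlock();
        }
    }
}
```

A listener thread would call `awaitNoClients()` and then `stop()`; because the count is checked before waiting, it also works when the last client disconnects before the listener starts.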
Focus on the `stop()` method.
At first, I simply set both flags to false, but that did not fix the duplicate parsing. Why?
public void stop() {
    lock.lock();
    try {
        this.init = false;
        this.setStatus(false);
        log.debug("All parsing tasks stopped");
    } finally {
        lock.unlock();
    }
}
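Analysis: when the front end refreshes the page, the sequence is disconnect -> reconnect -> fetch the target list (which starts the analysis threads). After two or three refreshes in a row, the duplicate pushes appear.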
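Each worker thread calls `SleepUtil.sleep()`, and the sleep lasts on the order of seconds. So while a thread is sleeping, it has not yet reached this check:
// Check whether the service has been stopped
if (!realTimeAnalysisUtil.isStatus()) {
    break;
}
Meanwhile `status` changes true -> false -> true. When the thread wakes up and runs the check, `status` is already true again, so the thread does not exit and keeps running, alongside the new threads started after the reconnect (stop() has already reset `init`).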
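Question: how can `stop()` make sure the threads actually end?
The fix: keep the started threads in a member list and call `interrupt()` on each of them, which wakes up any thread that is sleeping. The modified code: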
/**
 * On stop: reset the flags
 */
public void stop() {
    lock.lock();
    try {
        this.init = false;
        this.setStatus(false);
        // Interrupt all worker threads
        log.debug("Stopping all parsing tasks");
        for (Thread thread : threadList) {
            thread.interrupt();
        }
        // Clear the list
        threadList.clear();
        log.debug("All parsing tasks interrupted");
    } finally {
        lock.unlock();
    }
}
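Note that `interrupt()` only ends a thread if the thread reacts to it. `Thread.sleep()` throws `InterruptedException` at once and clears the interrupt status when it does; if a helper catches that exception and carries on, the interrupt is lost and the thread falls back to checking `status`. The implementation of `SleepUtil.sleep()` is not shown, so whether it propagates the interrupt is unknown. A sketch of a worker that stops on interrupt regardless of any shared flag, assuming a hypothetical `PushWorker` with a `Consumer<String>` standing in for `sendWsMessage`:

```java
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical push worker (names are illustrative, not from the original project)
class PushWorker implements Runnable {
    private final List<String> records;
    private final Consumer<String> sender;
    private final long minPauseMillis;
    private final long maxPauseMillis;

    PushWorker(List<String> records, Consumer<String> sender, long minPauseMillis, long maxPauseMillis) {
        this.records = records;
        this.sender = sender;
        this.minPauseMillis = minPauseMillis;
        this.maxPauseMillis = maxPauseMillis;
    }

    @Override
    public void run() {
        if (records.isEmpty()) {
            return; // nothing to push; avoids a busy loop
        }
        try {
            // The interrupt flag belongs to this thread only, so a later
            // stop-then-start of the service cannot switch it back off
            while (!Thread.currentThread().isInterrupted()) {
                for (String record : records) {
                    sender.accept(record);
                    long pause = ThreadLocalRandom.current().nextLong(minPauseMillis, maxPauseMillis + 1);
                    // Throws InterruptedException immediately if interrupted before or during the sleep
                    TimeUnit.MILLISECONDS.sleep(pause);
                }
            }
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still see it
            Thread.currentThread().interrupt();
        }
    }
}
```

In this shape the `status` flag is no longer needed to stop a worker; `stop()` interrupting the thread is enough.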
Question: why do some threads suddenly stop pushing?
The earlier code:
@Slf4j
@Component
@RequiredArgsConstructor
public class RealTimeAnalysisUtil {
    private final FileContainer fileContainer;
    private final ParseFileUtil parseFileUtil;
    private final WebSocketServer webSocketServer;
    private volatile boolean init;
    private final Object lock = new Object();
    /**
     * Starts pushing parsed file data to the front end
     */
    public void start() {
        synchronized (lock) {
            log.debug("Start parsing files: running: {}; init: {}", webSocketServer.getStatus(), this.init);
            if (webSocketServer.getStatus() && !init) {
                List<FileContainer.ShareFileDto> shareFileList = fileContainer.getShareFileList();
                // Mark as initialized
                this.init = true;
                if (!CollectionUtils.isEmpty(shareFileList)) {
                    log.debug("Start pushing parsed file data");
                    List<RealTimePushThread> realTimePushThreadList = new ArrayList<>();
                    for (FileContainer.ShareFileDto shareFileDto : shareFileList) {
                        RealTimePushThread realTimePushThread = new RealTimePushThread(shareFileDto, parseFileUtil, webSocketServer);
                        realTimePushThread.start();
                        realTimePushThreadList.add(realTimePushThread);
                    }
                    new Thread(() -> {
                        log.debug("Waiting for all worker threads to finish");
                        for (RealTimePushThread realTimePushThread : realTimePushThreadList) {
                            try {
                                realTimePushThread.join();
                            } catch (InterruptedException e) {
                                throw new RuntimeException(e);
                            }
                        }
                        // Only reached after every worker thread has ended
                        this.init = false;
                    }).start();
                }
            }
        }
    }
}
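In this code, `webSocketServer.getStatus()` is set to false when the connection count drops to 0.
As you can see, the shared variable `init` used to be modified under an object lock, that is, with the `synchronized` keyword, and on its own that worked. On a page refresh the sequence is disconnect -> reconnect -> fetch the target list (which starts the analysis threads). But as analyzed above, some threads happen to reach the following block exactly while `status` is false, and they exit:
// Check whether the service has been stopped
if (!realTimeAnalysisUtil.isStatus()) {
    break;
}
Other threads miss that moment and keep running. Because `join()` blocks until every thread has finished, the `this.init = false;` that follows it is never executed. `init` therefore stays true, every later `start()` skips starting threads, and the files whose threads did exit are never pushed again. That is why some files suddenly stop being pushed after the front end refreshes the page.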
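Both failures come from workers deciding on a single shared boolean that can flip back to true before they look at it. An alternative, a different technique from the interrupt-based fix used in this article, is a generation number: each start opens a new generation, each stop closes it, and a worker keeps running only while the generation it was started in is still current. A sketch with a hypothetical `PushGeneration` class:

```java
// Hypothetical generation token (an alternative technique, not the author's code)
class PushGeneration {
    private long current;      // guarded by this
    private boolean running;   // guarded by this

    /** Returns the new generation if workers should be started, or -1 if they already run (balking). */
    synchronized long tryStart() {
        if (running) {
            return -1;
        }
        running = true;
        return ++current;
    }

    /** Ends the current generation; every worker started in it will see isActive() == false. */
    synchronized void stop() {
        running = false;
        current++;
    }

    /** A worker loops while this returns true for the generation it was started in. */
    synchronized boolean isActive(long myGeneration) {
        return running && current == myGeneration;
    }
}
```

`start()` would call `tryStart()`, skip starting threads when it returns -1, and otherwise hand the returned number to each worker, which loops on `while (generation.isActive(myGeneration))`. A stop-then-start during a worker's sleep still ends that worker, and no `join()`-based reset of `init` is needed because `stop()` itself re-enables starting.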
RealTimePushRunnable
@Slf4j
@RequiredArgsConstructor
public class RealTimePushRunnable implements Runnable {
    private final FileContainer.ShareFileDto shareFileDto;
    private final ParseFileUtil parseFileUtil;
    private final WebSocketServer webSocketServer;
    private final RealTimeAnalysisUtil realTimeAnalysisUtil;
    private final HttpClientUtil httpClientUtil;
    @Override
    public void run() {
        log.debug("Start analyzing file: {}", shareFileDto.getFileName());
        MultipartFile file = parseFileUtil.fileToMultipartFile(new File(shareFileDto.getFilePath()));
        List<ParseFileListOutput> outputList = parseFileUtil.parseFileList(file, new StringBuilder());
        List<ParseFileListOutput> sortList = Optional.ofNullable(outputList).orElse(new ArrayList<>()).stream()
                .filter(item -> item.getDatePoint() != null)
                .sorted(Comparator.comparing(ParseFileListOutput::getDatePoint))
                .collect(Collectors.toList());
        // Nothing to push: without this guard the while loop below would spin without ever sleeping
        if (sortList.isEmpty()) {
            return;
        }
        while (realTimeAnalysisUtil.isStatus()) {
            for (ParseFileListOutput output : sortList) {
                output.setFileId(shareFileDto.getFileId());
                // Keep the original time point (only on the first pass; later passes
                // would otherwise copy the previous push time)
                if (output.getOriginalTime() == null) {
                    output.setOriginalTime(output.getDatePoint());
                }
                output.setDatePoint(new Date());
                WebSocketResponse<ParseFileListOutput> response = WebSocketResponse.ok(WebSocketMessageConstants.REAL_TIME_PUSH_DATE_POINT, output);
                webSocketServer.sendWsMessage(JSON.toJSONString(response), shareFileDto.getFileId());
                // TODO: call the recognition algorithm
                // IntentionAnalysisOutput intentionAnalysis = httpClientUtil.realTimeIntentionAnalysis(shareFileDto.getFileId(), output);
                // TODO: send the result to the front end
                // WebSocketResponse<IntentionAnalysisOutput> result = WebSocketResponse.ok(WebSocketMessageConstants.REAL_TIME_PUSH_INTENTION_ANALYSIS, intentionAnalysis);
                // webSocketServer.sendWsMessage(JSON.toJSONString(result), shareFileDto.getFileId());
                SleepUtil.sleep();
                // Check whether the service has been stopped
                if (!realTimeAnalysisUtil.isStatus()) {
                    break;
                }
            }
        }
    }
}
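- `while (realTimeAnalysisUtil.isStatus())`: the status flag decides whether the task keeps running.
- `if (!realTimeAnalysisUtil.isStatus()) { break; }`: after each push and sleep, check whether the service has been stopped.
- `SleepUtil.sleep();`: makes the thread wait a random n seconds before pushing the next record.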