
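Spring Boot Project: A Lightweight Message Queue with RedisTemplate

Background

Our project needed a feature where the front end uploads an Excel file and the back end reads the data, processes it, and returns the rows that failed. The simplest approach is synchronous: the client uploads the file and blocks until the response arrives. That makes for a poor user experience, because processing can take a long time and nobody wants to sit and wait. The project does not yet use a message queue middleware such as ActiveMQ, and Redis's LPUSH and RPOP commands work well as a lightweight message queue, so I used them to build this feature.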

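I. Topics covered

  • Reading and writing Excel files: Alibaba's easyexcel SDK
  • File upload and download: Tencent Cloud Object Storage (COS)
  • Remote service calls: RestTemplate
  • Producer and consumer: RedisTemplate leftPush and rightPop operations
  • Asynchronous data processing: an Executors thread pool
  • Reading a file stream over the network: HttpClient
  • User authentication with a custom annotation: JWT token authentication, with an interceptor that intercepts request entry points annotated with @LoginRequired

All implemented in Java, of course.

That is a lot of topics, and each could be studied as a subject in its own right. This article presents the complete implementation; I plan to split the topics into separate posts later.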

II. Project directory structure

Note: the database DAO layer lives in a separate module and is not the focus of this article.

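III. Main Maven dependencies

1. easyexcel

    <properties>
        <easyexcel-latestVersion>1.1.2-beta4</easyexcel-latestVersion>
    </properties>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>easyexcel</artifactId>
        <version>${easyexcel-latestVersion}</version>
    </dependency>

2. JWT

    <dependency>
        <groupId>io.jsonwebtoken</groupId>
        <artifactId>jjwt</artifactId>
        <version>0.7.0</version>
    </dependency>

3. Redis

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-redis</artifactId>
        <version>1.3.5.RELEASE</version>
    </dependency>

4. Tencent COS

    <dependency>
        <groupId>com.qcloud</groupId>
        <artifactId>cos_api</artifactId>
        <version>5.4.5</version>
    </dependency>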

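IV. Workflow

  • The user uploads a file
  • The file is stored in Tencent COS
  • The uploaded file's id and an upload record are saved to the database
  • An import message is produced, i.e. the file id is pushed onto a Redis list
  • The request ends and returns a "processing" status
  • The consumer pops the message from Redis
  • The file is read from COS and the data is processed asynchronously
  • The failed rows are uploaded to COS as an Excel file for the user to download, and the status is updated to "completed"
  • The client polls for the processing status and can download the error file
  • Done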
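The status lifecycle in the steps above can be sketched in plain Java. This is only an illustration under assumptions: the class name ImportStatusSketch, the Status enum, and the in-memory map standing in for the import-record table are hypothetical and do not come from the project.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch of the "processing" -> "completed" lifecycle; not the project's code. */
class ImportStatusSketch {
    enum Status { PROCESSING, DONE }

    // Stand-in for the import-record table: fileId -> status
    private final Map<String, Status> records = new ConcurrentHashMap<>();

    /** Request thread: record the upload and return immediately. */
    Status accept(String fileId) {
        records.put(fileId, Status.PROCESSING);
        return Status.PROCESSING;
    }

    /** Worker thread: called once the error report has been uploaded. */
    void markDone(String fileId) {
        // Only moves an existing PROCESSING record forward; unknown ids are ignored
        records.replace(fileId, Status.PROCESSING, Status.DONE);
    }

    /** Polling endpoint: returns null for an unknown file id. */
    Status query(String fileId) {
        return records.get(fileId);
    }
}
```

The request thread only ever calls accept, so the HTTP response never waits on the import itself; the client learns the outcome through repeated query calls.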

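V. Results

1. Uploading a file

2. Import records in the database

3. The imported data

4. Downloading the error file

5. Error data messages

6. Querying import records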

VI. Code

1. Excel import controller

    @LoginRequired
    @RequestMapping(value = "doImport", method = RequestMethod.POST)
    public JsonResponse doImport(@RequestParam("file") MultipartFile file, HttpServletRequest request) {
        // Current user, resolved from the request
        PLUser user = getUser(request);
        return orderImportService.doImport(file, user.getId());
    }

2. Service layer

    @Override
    public JsonResponse doImport(MultipartFile file, Integer userId) {
        if (null == file || file.isEmpty()) {
            throw new ServiceException("File must not be empty");
        }
        String filename = file.getOriginalFilename();
        if (!checkFileSuffix(filename)) {
            throw new ServiceException("Only .xlsx Excel files are currently supported");
        }
        // Store the file
        String fileId = saveToOss(file);
        if (StringUtils.isBlank(fileId)) {
            throw new ServiceException("File upload failed, please try again later");
        }
        // Save the upload record to the database
        saveRecordToDB(userId, fileId, filename);
        // Produce an order-import message
        redisProducer.produce(RedisKey.orderImportKey, fileId);
        return JsonResponse.ok("Import accepted, processing...");
    }

    /**
     * Validates the file extension.
     *
     * @param fileName original file name
     * @return true only for .xlsx files
     */
    private static boolean checkFileSuffix(String fileName) {
        if (StringUtils.isBlank(fileName) || fileName.lastIndexOf(".") <= 0) {
            return false;
        }
        String suffix = fileName.substring(fileName.lastIndexOf(".")).toLowerCase();
        return ".xlsx".equals(suffix);
    }

    /**
     * Forwards the file to the upload endpoint backed by Tencent COS.
     *
     * @param file uploaded file
     * @return the stored file id, or null if the upload failed
     */
    private String saveToOss(MultipartFile file) {
        File f = null;
        try {
            // Buffer the upload in a local file so it can be re-posted as a multipart resource
            f = new File(file.getOriginalFilename());
            inputStreamToFile(file.getInputStream(), f);
            MultiValueMap<String, Object> param = new LinkedMultiValueMap<>();
            param.add("file", new FileSystemResource(f));
            ResponseResult responseResult = restTemplate.postForObject(txOssUploadUrl, param, ResponseResult.class);
            return (String) responseResult.getData();
        } catch (Exception e) {
            return null;
        } finally {
            // Remove the local copy once the upload has been attempted
            if (f != null && f.exists()) {
                f.delete();
            }
        }
    }
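saveToOss buffers the upload in a file named after the client-supplied original file name, created in the working directory, so two uploads with the same name can collide. One possible alternative, sketched here with JDK classes only, is to let Files.createTempFile choose a unique name. TempFileSketch, bufferToTempFile, deleteQuietly, and the "order-import-" prefix are hypothetical names used for illustration; they are not part of the project.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Hypothetical helper: buffers an upload in a uniquely named temp file. */
class TempFileSketch {

    /** Copies the stream into a new temp file, closes the stream, and returns the file's path. */
    static Path bufferToTempFile(InputStream in) {
        Path tmp = null;
        try (InputStream source = in) {
            // The JDK generates a unique name, so the client-supplied name is never used as a path
            tmp = Files.createTempFile("order-import-", ".xlsx");
            // createTempFile already created the file, hence REPLACE_EXISTING
            Files.copy(source, tmp, StandardCopyOption.REPLACE_EXISTING);
            return tmp;
        } catch (IOException e) {
            deleteQuietly(tmp);
            throw new UncheckedIOException(e);
        }
    }

    /** Deletes the file if it exists, ignoring I/O errors; null is allowed. */
    static void deleteQuietly(Path path) {
        if (path == null) {
            return;
        }
        try {
            Files.deleteIfExists(path);
        } catch (IOException ignored) {
            // best-effort cleanup
        }
    }
}
```

A caller would wrap the returned path in a FileSystemResource as before and call deleteQuietly in a finally block. The original file name can still be sent as a separate form field if the storage side needs it.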

3. Redis producer

    @Service
    public class RedisProducerImpl implements RedisProducer {

        @Autowired
        private RedisTemplate redisTemplate;

        @Override
        public JsonResponse produce(String key, String msg) {
            Map<String, String> map = Maps.newHashMap();
            map.put("fileId", msg);
            // Push onto the left end of the list; the consumer pops from the right end
            redisTemplate.opsForList().leftPush(key, map);
            return JsonResponse.ok();
        }
    }
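leftPush adds to the head of the Redis list and rightPop removes from the tail, so messages come out in the order they went in. The sketch below mimics that ordering with an in-memory ArrayDeque. It does not talk to Redis, and ListQueueSketch is a hypothetical name used only for this illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** In-memory stand-in for a Redis list, used only to illustrate LPUSH/RPOP ordering. */
class ListQueueSketch {
    private final Deque<String> list = new ArrayDeque<>();

    /** Like LPUSH: insert at the head (left end) of the list. */
    void leftPush(String value) {
        list.addFirst(value);
    }

    /** Like RPOP: remove from the tail (right end); returns null when the list is empty. */
    String rightPop() {
        return list.pollLast();
    }

    public static void main(String[] args) {
        ListQueueSketch queue = new ListQueueSketch();
        queue.leftPush("file-1");
        queue.leftPush("file-2");
        System.out.println(queue.rightPop()); // the oldest message comes out first
        System.out.println(queue.rightPop());
        System.out.println(queue.rightPop()); // the list is empty at this point
    }
}
```

Pushing and popping at opposite ends is what gives first-in, first-out behavior; pushing and popping at the same end would turn the list into a stack.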

4. Redis consumer

    @Service
    public class RedisConsumer {

        @Autowired
        public RedisTemplate redisTemplate;

        @Value("${txOssFileUrl}")
        private String txOssFileUrl;

        @Value("${txOssUploadUrl}")
        private String txOssUploadUrl;

        @PostConstruct
        public void init() {
            processOrderImport();
        }

        /**
         * Processes order imports.
         */
        private void processOrderImport() {
            ExecutorService executorService = Executors.newCachedThreadPool();
            executorService.execute(() -> {
                while (true) {
                    // Blocking pop: waits up to 1 second for a message
                    Object object = redisTemplate.opsForList().rightPop(RedisKey.orderImportKey, 1, TimeUnit.SECONDS);
                    if (null == object) {
                        continue;
                    }
                    String msg = JSON.toJSONString(object);
                    executorService.execute(new OrderImportTask(msg, txOssFileUrl, txOssUploadUrl));
                }
            });
        }
    }
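The consumer above hands every message to an unbounded cached thread pool, and its polling loop has no stop condition and ends for good if the pop call throws. One possible variation, sketched with JDK classes only, uses a dedicated polling thread, a fixed-size worker pool, and a stop flag. Everything here is an assumption made for illustration: a LinkedBlockingQueue stands in for the Redis list, with poll(timeout) playing the role of rightPop(key, timeout, unit), and ConsumerLoopSketch, the pool size of 4, and the 100 ms timeout are hypothetical choices.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical consumer loop with a bounded worker pool and a clean shutdown. */
class ConsumerLoopSketch {
    // Stand-in for the Redis list
    final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    // Records handled messages so the sketch has something observable
    final List<String> processed = new CopyOnWriteArrayList<>();

    private final ExecutorService poller = Executors.newSingleThreadExecutor();
    private final ExecutorService workers = Executors.newFixedThreadPool(4);
    private volatile boolean running = true;

    void start() {
        poller.execute(() -> {
            while (running) {
                try {
                    String msg = queue.poll(100, TimeUnit.MILLISECONDS);
                    if (msg == null) {
                        continue; // timed out; re-check the running flag and poll again
                    }
                    workers.execute(() -> handle(msg));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                } catch (RuntimeException e) {
                    // A transient failure must not end the polling loop
                    e.printStackTrace();
                }
            }
        });
    }

    private void handle(String msg) {
        processed.add(msg); // placeholder for the real import task
    }

    /** Stops polling, then waits for in-flight tasks; returns true if both pools finished in time. */
    boolean stop(long timeoutMillis) {
        running = false;
        poller.shutdown();
        try {
            boolean pollerDone = poller.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
            workers.shutdown();
            boolean workersDone = workers.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
            return pollerDone && workersDone;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

In a Spring service, start would be called from the @PostConstruct method and stop from a @PreDestroy method. Messages that are still in the list when stop is called are simply left there for the next run.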

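5. Import task (Runnable)

    public class OrderImportTask implements Runnable {

        private final String msg;
        private final String txOssFileUrl;
        private final String txOssUploadUrl;

        private RestTemplate restTemplate;
        private TransactionTemplate transactionTemplate;
        private OrderImportService orderImportService;

        public OrderImportTask(String msg, String txOssFileUrl, String txOssUploadUrl) {
            this.msg = msg;
            this.txOssFileUrl = txOssFileUrl;
            this.txOssUploadUrl = txOssUploadUrl;
        }

        /**
         * Looks up the required beans. The task is created with new rather than by Spring,
         * so the beans are fetched from the application context by hand.
         */
        private void autowireBean() {
            this.restTemplate = BeanContext.getApplicationContext().getBean(RestTemplate.class);
            this.transactionTemplate = BeanContext.getApplicationContext().getBean(TransactionTemplate.class);
            this.orderImportService = BeanContext.getApplicationContext().getBean(OrderImportService.class);
        }

        @Override
        public void run() {
            // Look up beans
            autowireBean();

            JSONObject jsonObject = JSON.parseObject(msg);
            String fileId = jsonObject.getString("fileId");

            // Ask the file service for the URL that belongs to this file id
            MultiValueMap<String, Object> param = new LinkedMultiValueMap<>();
            param.add("id", fileId);
            ResponseResult responseResult = restTemplate.postForObject(txOssFileUrl, param, ResponseResult.class);
            String fileUrl = (String) responseResult.getData();
            if (StringUtils.isBlank(fileUrl)) {
                return;
            }

            InputStream inputStream = HttpClientUtil.readFileFromURL(fileUrl);
            List list = ExcelUtil.read(inputStream);
            process(list, fileId);
        }

        /**
         * Uploads a file to the storage service.
         *
         * @param file local file
         * @return the stored file id, or null if the upload failed
         */
        private String saveToOss(File file) {
            String fileId;
            try {
                FileSystemResource resource = new FileSystemResource(file);
                MultiValueMap<String, Object> param = new LinkedMultiValueMap<>();
                param.add("file", resource);
                ResponseResult responseResult = restTemplate.postForObject(txOssUploadUrl, param, ResponseResult.class);
                fileId = (String) responseResult.getData();
            } catch (Exception e) {
                fileId = null;
            }
            return fileId;
        }
    }

Note: the business logic that processes the data (the process method) is omitted here.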

6. Uploading a file to COS

    @RequestMapping("/txOssUpload")
    @ResponseBody
    public ResponseResult txOssUpload(@RequestParam("file") MultipartFile file) throws UnsupportedEncodingException {
        if (null == file || file.isEmpty()) {
            return ResponseResult.fail("File must not be empty");
        }
        String originalFilename = file.getOriginalFilename();
        // Decode MIME-encoded names so that Chinese file names are not garbled
        originalFilename = MimeUtility.decodeText(originalFilename);
        String contentType = getContentType(originalFilename);
        String key;
        InputStream ins = null;
        File f = null;
        try {
            ins = file.getInputStream();
            f = new File(originalFilename);
            inputStreamToFile(ins, f);
            key = iFileStorageClient.txOssUpload(new FileInputStream(f), originalFilename, contentType);
        } catch (Exception e) {
            return ResponseResult.fail(e.getMessage());
        } finally {
            if (null != ins) {
                try {
                    ins.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            // Delete the temp file; f is still null if getInputStream() failed
            if (null != f && f.exists()) {
                f.delete();
            }
        }
        return ResponseResult.ok(key);
    }

    public static void inputStreamToFile(InputStream ins, File file) {
        // try-with-resources closes both streams even if the copy fails
        try (InputStream in = ins; OutputStream os = new FileOutputStream(file)) {
            byte[] buffer = new byte[8192];
            int bytesRead;
            while ((bytesRead = in.read(buffer, 0, 8192)) != -1) {
                os.write(buffer, 0, bytesRead);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public String txOssUpload(FileInputStream inputStream, String key, String contentType) {
        // Prefix a UUID so that files with the same name do not overwrite each other
        key = Uuid.getUuid() + "-" + key;
        OSSUtil.txOssUpload(inputStream, key, contentType);
        try {
            if (null != inputStream) {
                inputStream.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return key;
    }

    // OSSUtil
    public static void txOssUpload(FileInputStream inputStream, String key, String contentType) {
        ObjectMetadata objectMetadata = new ObjectMetadata();
        try {
            int length = inputStream.available();
            objectMetadata.setContentLength(length);
        } catch (Exception e) {
            logger.info(e.getMessage());
        }
        objectMetadata.setContentType(contentType);
        cosclient.putObject(txbucketName, key, inputStream, objectMetadata);
    }

7. Downloading a file

    /**
     * Tencent Cloud file download.
     *
     * @param response HTTP response the file is written to
     * @param id       file id (the COS object key)
     * @return always null; the body is written directly to the response
     */
    @RequestMapping("/txOssDownload")
    public Object txOssDownload(HttpServletResponse response, String id) {
        COSObjectInputStream cosObjectInputStream = iFileStorageClient.txOssDownload(id, response);
        String contentType = getContentType(id);
        FileUtil.txOssDownload(response, contentType, cosObjectInputStream, id);
        return null;
    }

    // FileUtil
    public static void txOssDownload(HttpServletResponse response, String contentType, InputStream fileStream, String fileName) {
        response.reset();
        OutputStream os = null;
        try {
            response.setContentType(contentType + ";charset=utf-8");
            if (!contentType.equals(PlConstans.FileContentType.image)) {
                try {
                    // Re-interpret the UTF-8 bytes as ISO-8859-1 so non-ASCII names survive the header
                    response.setHeader("Content-Disposition", "attachment;filename=" + new String(fileName.getBytes("UTF-8"), "ISO8859-1"));
                } catch (UnsupportedEncodingException e) {
                    response.setHeader("Content-Disposition", "attachment;filename=" + fileName);
                    logger.error("encoding filename failed", e);
                }
            }
            os = response.getOutputStream();
            byte[] b = new byte[1024 * 1024];
            int len;
            while ((len = fileStream.read(b)) > 0) {
                os.write(b, 0, len);
                os.flush();
            }
        } catch (IOException e) {
            logger.error(e.getMessage());
        } finally {
            IOUtils.closeQuietly(os);
            IOUtils.closeQuietly(fileStream);
        }
    }
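The download code passes non-ASCII file names through the Content-Disposition header by re-interpreting their UTF-8 bytes as ISO-8859-1. Another option is the filename* parameter defined in RFC 5987 and RFC 6266, which carries a percent-encoded UTF-8 name. The following is a minimal sketch of that alternative, not the approach used in the project; ContentDispositionSketch and attachmentHeader are hypothetical names.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

/** Hypothetical helper that builds an RFC 5987 style Content-Disposition value. */
class ContentDispositionSketch {

    static String attachmentHeader(String fileName) {
        try {
            // URLEncoder does form encoding: a space becomes "+" and "*" is left as-is,
            // so both are fixed up to get plain percent-encoding
            String encoded = URLEncoder.encode(fileName, "UTF-8")
                    .replace("+", "%20")
                    .replace("*", "%2A");
            return "attachment; filename*=UTF-8''" + encoded;
        } catch (UnsupportedEncodingException e) {
            // Every Java platform is required to support UTF-8
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(attachmentHeader("error report.xlsx"));
    }
}
```

The returned string would be passed to response.setHeader("Content-Disposition", ...). Some applications send a plain ASCII filename parameter alongside filename* as a fallback for old clients.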

8. Reading a file stream over the network

    /**
     * Reads a file stream over the network.
     *
     * @param url file URL
     * @return the response body stream, or null if the URL is blank or the request fails
     */
    public static InputStream readFileFromURL(String url) {
        if (StringUtils.isBlank(url)) {
            return null;
        }
        HttpClient httpClient = new DefaultHttpClient();
        HttpGet methodGet = new HttpGet(url);
        try {
            HttpResponse response = httpClient.execute(methodGet);
            if (response.getStatusLine().getStatusCode() == 200) {
                HttpEntity entity = response.getEntity();
                return entity.getContent();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

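9. ExcelUtil

    /**
     * Reads an Excel file.
     *
     * @param inputStream file input stream
     * @return list of rows
     */
    public static List read(InputStream inputStream) {
        return EasyExcelFactory.read(inputStream, new Sheet(1, 1));
    }

    /**
     * Writes an Excel file.
     *
     * @param data         rows to write
     * @param clazz        row model class
     * @param saveFilePath path the file is saved to
     * @throws IOException if the file cannot be written
     */
    public static void write(List data, Class clazz, String saveFilePath) throws IOException {
        File tempFile = new File(saveFilePath);
        // try-with-resources closes the stream even if writing fails
        try (OutputStream out = new FileOutputStream(tempFile)) {
            ExcelWriter writer = EasyExcelFactory.getWriter(out);
            Sheet sheet = new Sheet(1, 3, clazz, "Sheet1", null);
            writer.write(data, sheet);
            writer.finish();
        }
    }

Note: at this point the whole flow is complete. The code for the remaining topics follows for reference.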



VII. Miscellaneous

1. The @LoginRequired annotation

    /**
     * Put this annotation on controller methods that require a logged-in user.
     */
    @Target({ElementType.METHOD})
    @Retention(RetentionPolicy.RUNTIME)
    public @interface LoginRequired {
    }
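The annotation is only a marker: it does nothing until some code looks for it through reflection, which works because the retention policy is RUNTIME. The sketch below shows that lookup in isolation, without Spring. LoginRequiredSketch, DemoController, and requiresLogin are hypothetical names, and the annotation is redeclared inside the class only to keep the example self-contained.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

/** Hypothetical, Spring-free illustration of detecting a marker annotation at runtime. */
class LoginRequiredSketch {

    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME) // without RUNTIME retention, reflection would not see it
    @interface LoginRequired {
    }

    static class DemoController {
        @LoginRequired
        public void doImport() {
        }

        public void health() {
        }
    }

    /** Returns true if the named public no-arg method on the type carries @LoginRequired. */
    static boolean requiresLogin(Class<?> type, String methodName) {
        try {
            Method method = type.getMethod(methodName);
            return method.isAnnotationPresent(LoginRequired.class);
        } catch (NoSuchMethodException e) {
            return false;
        }
    }
}
```

A Spring interceptor does the same check on the Method it obtains from the HandlerMethod, so no lookup by name is needed there.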

2. MyControllerAdvice

    @ControllerAdvice
    public class MyControllerAdvice {

        @ResponseBody
        @ExceptionHandler(TokenValidationException.class)
        public JsonResponse tokenValidationExceptionHandler() {
            return JsonResponse.loginInvalid();
        }

        @ResponseBody
        @ExceptionHandler(ServiceException.class)
        public JsonResponse serviceExceptionHandler(ServiceException se) {
            return JsonResponse.fail(se.getMsg());
        }

        @ResponseBody
        @ExceptionHandler(Exception.class)
        public JsonResponse exceptionHandler(Exception e) {
            e.printStackTrace();
            return JsonResponse.fail(e.getMessage());
        }
    }

3. AuthenticationInterceptor

    public class AuthenticationInterceptor implements HandlerInterceptor {

        private static final String CURRENT_USER = "user";

        @Autowired
        private UserService userService;

        @Override
        public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
            // Let the request through if it is not mapped to a handler method
            if (!(handler instanceof HandlerMethod)) {
                return true;
            }
            HandlerMethod handlerMethod = (HandlerMethod) handler;
            Method method = handlerMethod.getMethod();

            // A method annotated with @LoginRequired requires a logged-in user
            LoginRequired methodAnnotation = method.getAnnotation(LoginRequired.class);
            if (methodAnnotation != null) {
                // Verify the token
                Integer userId = JwtUtil.verifyToken(request);
                PLUser plUser = userService.selectByPrimaryKey(userId);
                if (null == plUser) {
                    throw new RuntimeException("User does not exist, please log in again");
                }
                request.setAttribute(CURRENT_USER, plUser);
                return true;
            }
            return true;
        }

        @Override
        public void postHandle(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, Object o, ModelAndView modelAndView) throws Exception {
        }

        @Override
        public void afterCompletion(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, Object o, Exception e) throws Exception {
        }
    }

4. JwtUtil

    public static final long EXPIRATION_TIME = 2_592_000_000L; // valid for 30 days
    public static final String SECRET = "pl_token_secret";
    public static final String HEADER = "token";
    public static final String USER_ID = "userId";

    /**
     * Generates a token for a user id.
     *
     * @param userId user id to embed in the token
     * @return the signed JWT
     */
    public static String generateToken(String userId) {
        HashMap<String, Object> map = new HashMap<>();
        map.put(USER_ID, userId);
        String jwt = Jwts.builder()
                .setClaims(map)
                .setExpiration(new Date(System.currentTimeMillis() + EXPIRATION_TIME))
                .signWith(SignatureAlgorithm.HS512, SECRET)
                .compact();
        return jwt;
    }

    /**
     * Verifies the token.
     *
     * @param request current request; the token is read from the "token" header
     * @return the userId if verification succeeds
     */
    public static Integer verifyToken(HttpServletRequest request) {
        String token = request.getHeader(HEADER);
        if (token != null) {
            try {
                Map<String, Object> body = Jwts.parser()
                        .setSigningKey(SECRET)
                        .parseClaimsJws(token)
                        .getBody();
                for (Map.Entry<String, Object> entry : body.entrySet()) {
                    Object key = entry.getKey();
                    Object value = entry.getValue();
                    if (key.toString().equals(USER_ID)) {
                        return Integer.valueOf(value.toString()); // userId
                    }
                }
                return null;
            } catch (Exception e) {
                logger.error(e.getMessage());
                throw new TokenValidationException("unauthorized");
            }
        } else {
            throw new TokenValidationException("missing token");
        }
    }
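Signing with HS512 means the token carries an HMAC-SHA512 of its content, computed with a secret that only the server knows; verification recomputes the HMAC and compares. The sketch below shows that idea with JDK classes only. It is deliberately not a JWT implementation (there is no header and no expiry check) and it is not how jjwt works internally; HmacTokenSketch, sign, and verify are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import java.util.Base64;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

/** Hypothetical illustration of HMAC-signed tokens; not a JWT library. */
class HmacTokenSketch {
    private static final Base64.Encoder B64 = Base64.getUrlEncoder().withoutPadding();

    /** Returns base64url(payload) + "." + base64url(HMAC-SHA512 of the encoded payload). */
    static String sign(String payload, String secret) {
        String body = B64.encodeToString(payload.getBytes(StandardCharsets.UTF_8));
        return body + "." + B64.encodeToString(hmac(body, secret));
    }

    /** Recomputes the HMAC over the body and compares it with the signature part. */
    static boolean verify(String token, String secret) {
        int dot = token.lastIndexOf('.');
        if (dot <= 0) {
            return false;
        }
        byte[] expected = hmac(token.substring(0, dot), secret);
        byte[] actual;
        try {
            actual = Base64.getUrlDecoder().decode(token.substring(dot + 1));
        } catch (IllegalArgumentException e) {
            return false; // the signature part is not valid base64url
        }
        // MessageDigest.isEqual compares in constant time
        return MessageDigest.isEqual(expected, actual);
    }

    private static byte[] hmac(String data, String secret) {
        try {
            Mac mac = Mac.getInstance("HmacSHA512");
            mac.init(new SecretKeySpec(secret.getBytes(StandardCharsets.UTF_8), "HmacSHA512"));
            return mac.doFinal(data.getBytes(StandardCharsets.UTF_8));
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Because anyone holding the secret can mint valid tokens, a hard-coded constant like SECRET is usually moved to configuration outside the source tree.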

This article is a contributed submission and does not represent my own views. If you repost it, please credit the source: http://www.souzhinan.com/kj/276086.html