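Background
A project at work needed the following: the front end uploads an Excel file, and the back end reads the data, processes it, and returns the rows that failed. The simplest approach is to handle everything synchronously, so the client blocks after uploading until the response arrives. That makes for a poor user experience, because processing can take a long time and nobody wants to sit and wait. The project does not yet use message-queue middleware such as ActiveMQ, and Redis's lpush and rpop work well as a lightweight message queue, so I used them to build this feature.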
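I. Topics covered
- Reading and writing Excel files: Alibaba EasyExcel SDK
- File upload and download: Tencent Cloud Object Storage (COS)
- Remote service calls: RestTemplate
- Producer and consumer: RedisTemplate leftPush and rightPop operations
- Asynchronous data processing: Executors thread pool
- Reading a file stream over the network: HttpClient
- User authentication with a custom annotation: JWT token authentication, with an interceptor that intercepts request entry points annotated with @LoginRequired
Everything is implemented in Java, of course.
That is a lot of ground to cover, and each topic deserves its own write-up. This article presents the complete implementation; I plan to split it into separate posts later.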
II. Project directory structure
Note: the database DAO layer lives in a separate module and is not the focus of this article.
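III. Main Maven dependencies
1. easyexcel
    <properties>
        <easyexcel-latestVersion>1.1.2-beta4</easyexcel-latestVersion>
    </properties>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>easyexcel</artifactId>
        <version>${easyexcel-latestVersion}</version>
    </dependency>
2. JWT
    <dependency>
        <groupId>io.jsonwebtoken</groupId>
        <artifactId>jjwt</artifactId>
        <version>0.7.0</version>
    </dependency>
3. redis
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-redis</artifactId>
        <version>1.3.5.RELEASE</version>
    </dependency>
4. Tencent COS
    <dependency>
        <groupId>com.qcloud</groupId>
        <artifactId>cos_api</artifactId>
        <version>5.4.5</version>
    </dependency>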
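IV. Flow
- The user uploads a file
- The file is stored in Tencent COS
- The id of the uploaded file and an upload record are saved to the database
- Redis produces an import message, i.e. the file id is pushed to Redis
- The request ends and returns a "processing" status
- Redis consumes the message
- The file is read from COS and the data is processed asynchronously
- The rows that failed are uploaded to COS as an Excel file for the user to download, and the status is updated to "processing complete"
- The client polls for the processing status and can download the error file
- Done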
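The steps above can be walked through without any infrastructure. The following is only a minimal in-memory sketch: a `LinkedBlockingDeque` stands in for the Redis list, a `ConcurrentHashMap` stands in for the import-record table, and the class and method names (`ImportFlowSketch`, `upload`, `consumeOne`, `status`) are made up for illustration and do not appear in the project.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

/** In-memory walk-through of the import flow; no Redis, COS or database involved. */
class ImportFlowSketch {
    enum Status { PROCESSING, DONE }

    // Stand-in for the Redis list: addFirst plays the role of LPUSH, pollLast of RPOP.
    private final BlockingDeque<String> queue = new LinkedBlockingDeque<>();
    // Stand-in for the import-record table, keyed by file id.
    private final Map<String, Status> records = new ConcurrentHashMap<>();

    /** Upload side: "store" the file, save a record, enqueue the id, return at once. */
    String upload(byte[] content) {
        String fileId = UUID.randomUUID().toString(); // pretend the storage service returned this id
        records.put(fileId, Status.PROCESSING);
        queue.addFirst(fileId);
        return fileId;
    }

    /** Consumer side: take one message and mark its record as done. Returns false if nothing arrived. */
    boolean consumeOne(long timeoutMillis) {
        try {
            String fileId = queue.pollLast(timeoutMillis, TimeUnit.MILLISECONDS);
            if (fileId == null) {
                return false;
            }
            // Real code would read the file, validate the rows and upload an error workbook here.
            records.put(fileId, Status.DONE);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** What a polling endpoint would report for the given file id. */
    Status status(String fileId) {
        return records.get(fileId);
    }

    public static void main(String[] args) {
        ImportFlowSketch flow = new ImportFlowSketch();
        String id = flow.upload(new byte[] {1, 2, 3});
        System.out.println(flow.status(id)); // PROCESSING
        flow.consumeOne(100);
        System.out.println(flow.status(id)); // DONE
    }
}
```

The point of the split is that `upload` never waits for `consumeOne`; the client only learns the outcome by asking for the status later.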
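V. Results
1. Uploading a file
2. Import records in the database
3. The imported data
4. Downloading the error file
5. Error messages for the failed rows
6. Querying import records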
VI. Code
1. Excel import controller
    @LoginRequired
    @RequestMapping(value = "doImport", method = RequestMethod.POST)
    public JsonResponse doImport(@RequestParam("file") MultipartFile file, HttpServletRequest request) {
        PLUser user = getUser(request);
        return orderImportService.doImport(file, user.getId());
    }
2. Service layer
    @Override
    public JsonResponse doImport(MultipartFile file, Integer userId) {
        if (null == file || file.isEmpty()) {
            throw new ServiceException("文件不能为空"); // "The file must not be empty"
        }
        String filename = file.getOriginalFilename();
        if (!checkFileSuffix(filename)) {
            throw new ServiceException("当前仅支持xlsx格式的excel"); // "Only xlsx Excel files are supported"
        }
        // Store the file
        String fileId = saveToOss(file);
        if (StringUtils.isBlank(fileId)) {
            throw new ServiceException("文件上传失败,请稍后重试"); // "Upload failed, please retry later"
        }
        // Save the record to the database
        saveRecordToDB(userId, fileId, filename);
        // Produce an order-import message
        redisProducer.produce(RedisKey.orderImportKey, fileId);
        return JsonResponse.ok("导入成功,处理中..."); // "Import accepted, processing..."
    }

    /**
     * Check the file format.
     * @param fileName name of the uploaded file
     * @return true if the name ends with .xlsx
     */
    private static boolean checkFileSuffix(String fileName) {
        if (StringUtils.isBlank(fileName) || fileName.lastIndexOf(".") <= 0) {
            return false;
        }
        int pointIndex = fileName.lastIndexOf(".");
        String suffix = fileName.substring(pointIndex).toLowerCase();
        return ".xlsx".equals(suffix);
    }

    /**
     * Store the file in Tencent COS through the upload endpoint.
     * @param file the uploaded file
     * @return the file id, or null if the upload failed
     */
    private String saveToOss(MultipartFile file) {
        File f = null;
        try {
            InputStream ins = file.getInputStream();
            f = new File(file.getOriginalFilename());
            inputStreamToFile(ins, f); // copies the stream to a local file and closes it
            FileSystemResource resource = new FileSystemResource(f);
            MultiValueMap<String, Object> param = new LinkedMultiValueMap<>();
            param.add("file", resource);
            ResponseResult responseResult = restTemplate.postForObject(txOssUploadUrl, param, ResponseResult.class);
            return (String) responseResult.getData();
        } catch (Exception e) {
            // Any failure is reported to the caller as a null file id
            return null;
        } finally {
            // Delete the temporary local copy
            if (f != null && f.exists()) {
                f.delete();
            }
        }
    }
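The suffix check in the service is easy to get subtly wrong, for example by accepting a file named just ".xlsx". Below is a small standalone sketch of the same rule using only the JDK; `FileSuffixSketch` and `isXlsx` are illustrative names, not part of the project.

```java
import java.util.Locale;

/** Standalone version of the "only .xlsx" rule, for experimenting with edge cases. */
class FileSuffixSketch {

    /** Returns true only for names such as "orders.xlsx": case-insensitive suffix, non-empty base name. */
    static boolean isXlsx(String fileName) {
        if (fileName == null) {
            return false;
        }
        int dot = fileName.lastIndexOf('.');
        // dot <= 0 rejects names without an extension as well as names that start with the dot
        return dot > 0 && fileName.substring(dot).toLowerCase(Locale.ROOT).equals(".xlsx");
    }

    public static void main(String[] args) {
        System.out.println(isXlsx("orders.xlsx")); // true
        System.out.println(isXlsx("orders.xls"));  // false
        System.out.println(isXlsx(".xlsx"));       // false
    }
}
```

Note that a check on the name alone says nothing about the file's actual content; it only filters out obvious mistakes before the upload is queued.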
3. Redis producer
    @Service
    public class RedisProducerImpl implements RedisProducer {

        @Autowired
        private RedisTemplate redisTemplate;

        @Override
        public JsonResponse produce(String key, String msg) {
            // Wrap the file id in a map so the consumer can read it back as JSON
            Map<String, String> map = Maps.newHashMap();
            map.put("fileId", msg);
            redisTemplate.opsForList().leftPush(key, map);
            return JsonResponse.ok();
        }
    }
4. Redis consumer
    @Service
    public class RedisConsumer {

        @Autowired
        public RedisTemplate redisTemplate;

        @Value("${txOssFileUrl}")
        private String txOssFileUrl;

        @Value("${txOssUploadUrl}")
        private String txOssUploadUrl;

        @PostConstruct
        public void init() {
            processOrderImport();
        }

        /**
         * Process order imports.
         */
        private void processOrderImport() {
            ExecutorService executorService = Executors.newCachedThreadPool();
            executorService.execute(() -> {
                while (true) {
                    // Block for up to one second waiting for a message
                    Object object = redisTemplate.opsForList().rightPop(RedisKey.orderImportKey, 1, TimeUnit.SECONDS);
                    if (null == object) {
                        continue;
                    }
                    String msg = JSON.toJSONString(object);
                    executorService.execute(new OrderImportTask(msg, txOssFileUrl, txOssUploadUrl));
                }
            });
        }
    }
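The consumer above runs its polling loop and its tasks on one cached thread pool, which places no upper bound on the number of threads and offers no way to stop the loop. One possible variation, sketched here with JDK classes only, keeps a dedicated polling thread, a fixed-size worker pool, and a stop flag. The `LinkedBlockingDeque` is a stand-in for the Redis list, the pool size of 4 is an arbitrary assumption, and all names (`ConsumerLoopSketch`, `produce`, `start`, `stopAndAwait`) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

/** Polling consumer with a bounded worker pool and a clean shutdown path. */
class ConsumerLoopSketch {
    private final BlockingDeque<String> queue = new LinkedBlockingDeque<>(); // stand-in for the Redis list
    private final ExecutorService poller = Executors.newSingleThreadExecutor();
    private final ExecutorService workers = Executors.newFixedThreadPool(4); // assumed size
    private final List<String> processed = new CopyOnWriteArrayList<>();
    private volatile boolean running = true;

    /** Producer side: push at the head, like leftPush. */
    void produce(String fileId) {
        queue.addFirst(fileId);
    }

    /** Starts the polling loop; it keeps draining the queue even after stop was requested. */
    void start() {
        poller.execute(() -> {
            while (running || !queue.isEmpty()) {
                try {
                    // Take from the tail with a timeout, like rightPop(key, timeout, unit)
                    String fileId = queue.pollLast(200, TimeUnit.MILLISECONDS);
                    if (fileId != null) {
                        workers.execute(() -> processed.add(fileId)); // real work would go here
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
    }

    /** Requests a stop, waits for the poller and then the workers, and returns the handled ids. */
    List<String> stopAndAwait() {
        running = false;
        try {
            poller.shutdown();
            poller.awaitTermination(5, TimeUnit.SECONDS);
            workers.shutdown();
            workers.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(processed);
    }

    public static void main(String[] args) {
        ConsumerLoopSketch consumer = new ConsumerLoopSketch();
        consumer.start();
        consumer.produce("file-1");
        consumer.produce("file-2");
        System.out.println(consumer.stopAndAwait().size()); // 2
    }
}
```

Shutting down the poller before the workers matters: once the poller has exited, no new tasks can be submitted, so the worker pool can finish what it has and terminate.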
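5. Task class that processes an import
    public class OrderImportTask implements Runnable {

        private final String msg;
        private final String txOssFileUrl;
        private final String txOssUploadUrl;
        private RestTemplate restTemplate;
        private TransactionTemplate transactionTemplate;
        private OrderImportService orderImportService;

        public OrderImportTask(String msg, String txOssFileUrl, String txOssUploadUrl) {
            this.msg = msg;
            this.txOssFileUrl = txOssFileUrl;
            this.txOssUploadUrl = txOssUploadUrl;
        }

        /**
         * Look up the beans manually, because this class is not managed by Spring.
         */
        private void autowireBean() {
            this.restTemplate = BeanContext.getApplicationContext().getBean(RestTemplate.class);
            this.transactionTemplate = BeanContext.getApplicationContext().getBean(TransactionTemplate.class);
            this.orderImportService = BeanContext.getApplicationContext().getBean(OrderImportService.class);
        }

        @Override
        public void run() {
            // Inject the beans
            autowireBean();
            JSONObject jsonObject = JSON.parseObject(msg);
            String fileId = jsonObject.getString("fileId");
            MultiValueMap<String, Object> param = new LinkedMultiValueMap<>();
            param.add("id", fileId);
            // Ask the storage service for the file's URL
            ResponseResult responseResult = restTemplate.postForObject(txOssFileUrl, param, ResponseResult.class);
            String fileUrl = (String) responseResult.getData();
            if (StringUtils.isBlank(fileUrl)) {
                return;
            }
            InputStream inputStream = HttpClientUtil.readFileFromURL(fileUrl);
            List
Note: the business logic that processes the data is omitted here.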
6. Uploading a file to COS
    @RequestMapping("/txOssUpload")
    @ResponseBody
    public ResponseResult txOssUpload(@RequestParam("file") MultipartFile file) throws UnsupportedEncodingException {
        if (null == file || file.isEmpty()) {
            return ResponseResult.fail("文件不能为空"); // "The file must not be empty"
        }
        String originalFilename = file.getOriginalFilename();
        originalFilename = MimeUtility.decodeText(originalFilename); // fixes garbled Chinese file names
        String contentType = getContentType(originalFilename);
        String key;
        InputStream ins = null;
        File f = null;
        try {
            ins = file.getInputStream();
            f = new File(originalFilename);
            inputStreamToFile(ins, f);
            key = iFileStorageClient.txOssUpload(new FileInputStream(f), originalFilename, contentType);
        } catch (Exception e) {
            return ResponseResult.fail(e.getMessage());
        } finally {
            if (null != ins) {
                try {
                    ins.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            // Delete the temporary file; f is still null if getInputStream() failed
            if (f != null && f.exists()) {
                f.delete();
            }
        }
        return ResponseResult.ok(key);
    }

    public static void inputStreamToFile(InputStream ins, File file) {
        // try-with-resources closes both streams even if the copy fails
        try (InputStream in = ins; OutputStream os = new FileOutputStream(file)) {
            byte[] buffer = new byte[8192];
            int bytesRead;
            while ((bytesRead = in.read(buffer, 0, 8192)) != -1) {
                os.write(buffer, 0, bytesRead);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public String txOssUpload(FileInputStream inputStream, String key, String contentType) {
        // Prefix the key with a UUID so that uploads with the same name do not collide
        key = Uuid.getUuid() + "-" + key;
        OSSUtil.txOssUpload(inputStream, key, contentType);
        try {
            if (null != inputStream) {
                inputStream.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return key;
    }

    public static void txOssUpload(FileInputStream inputStream, String key, String contentType) {
        ObjectMetadata objectMetadata = new ObjectMetadata();
        try {
            int length = inputStream.available();
            objectMetadata.setContentLength(length);
        } catch (Exception e) {
            logger.info(e.getMessage());
        }
        objectMetadata.setContentType(contentType);
        cosclient.putObject(txbucketName, key, inputStream, objectMetadata);
    }
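The upload code writes the incoming stream to a local file named after the original file and deletes it afterwards. A possible alternative, shown here only as a sketch, is to let the JDK pick a unique temporary file and guarantee its deletion in one helper. `TempFileUploadSketch` and `withTempCopy` are hypothetical names, and the `uploader` function stands in for whatever actually sends the file to the storage service.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.function.Function;

/** Spools a stream to a temporary file, runs an action on it, and always cleans up. */
class TempFileUploadSketch {

    /**
     * Copies the stream to a temp file, hands the file to the uploader, and deletes the
     * file afterwards whether or not the uploader succeeded. The input stream is closed.
     */
    static <R> R withTempCopy(InputStream in, String suffix, Function<Path, R> uploader) {
        Path tmp = null;
        try (InputStream source = in) {
            tmp = Files.createTempFile("upload-", suffix); // unique name, no clash between uploads
            Files.copy(source, tmp, StandardCopyOption.REPLACE_EXISTING);
            return uploader.apply(tmp);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            if (tmp != null) {
                try {
                    Files.deleteIfExists(tmp);
                } catch (IOException ignored) {
                    // best effort: a leftover temp file is not worth failing the request
                }
            }
        }
    }

    public static void main(String[] args) {
        InputStream in = new ByteArrayInputStream(new byte[] {1, 2, 3});
        Long size = withTempCopy(in, ".xlsx", path -> path.toFile().length());
        System.out.println(size); // 3
    }
}
```

Using a generated name also avoids two concurrent uploads of "orders.xlsx" overwriting each other's local copy.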
7. Downloading a file
    /**
     * Download a file from Tencent Cloud.
     * @param response the HTTP response to write to
     * @param id the file id (the object key)
     * @return always null, because the body is written to the response directly
     */
    @RequestMapping("/txOssDownload")
    public Object txOssDownload(HttpServletResponse response, String id) {
        COSObjectInputStream cosObjectInputStream = iFileStorageClient.txOssDownload(id, response);
        String contentType = getContentType(id);
        FileUtil.txOssDownload(response, contentType, cosObjectInputStream, id);
        return null;
    }

    public static void txOssDownload(HttpServletResponse response, String contentType, InputStream fileStream, String fileName) {
        response.reset();
        OutputStream os = null;
        try {
            response.setContentType(contentType + ";charset=utf-8");
            // Images are displayed inline; everything else is sent as an attachment
            if (!contentType.equals(PlConstans.FileContentType.image)) {
                try {
                    response.setHeader("Content-Disposition",
                            "attachment;filename=" + new String(fileName.getBytes("UTF-8"), "ISO8859-1"));
                } catch (UnsupportedEncodingException e) {
                    response.setHeader("Content-Disposition", "attachment;filename=" + fileName);
                    logger.error("encoding filename failed", e);
                }
            }
            os = response.getOutputStream();
            byte[] b = new byte[1024 * 1024];
            int len;
            while ((len = fileStream.read(b)) > 0) {
                os.write(b, 0, len);
                os.flush();
            }
        } catch (IOException e) {
            logger.error("download failed", e);
        } finally {
            IOUtils.closeQuietly(os);
            IOUtils.closeQuietly(fileStream);
        }
    }
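The download code re-encodes the file name from UTF-8 bytes into ISO-8859-1 so that non-ASCII names survive in the `Content-Disposition` header. Another option, described in RFC 6266 and RFC 5987, is the `filename*` parameter with percent-encoded UTF-8. The sketch below only illustrates that form; `ContentDispositionSketch` and `attachmentHeader` are made-up names, and whether a given client handles `filename*` well would need to be checked.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

/** Builds a Content-Disposition value using the RFC 5987 "filename*" form. */
class ContentDispositionSketch {

    static String attachmentHeader(String fileName) {
        String encoded;
        try {
            // URLEncoder produces form encoding: turn "+" into "%20" and escape "*",
            // which URLEncoder leaves alone but RFC 5987 does not allow unescaped.
            encoded = URLEncoder.encode(fileName, "UTF-8")
                    .replace("+", "%20")
                    .replace("*", "%2A");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is required to be available", e);
        }
        return "attachment; filename*=UTF-8''" + encoded;
    }

    public static void main(String[] args) {
        System.out.println(attachmentHeader("error rows.xlsx"));
        // attachment; filename*=UTF-8''error%20rows.xlsx
        System.out.println(attachmentHeader("\u8ba2\u5355.xlsx"));
        // attachment; filename*=UTF-8''%E8%AE%A2%E5%8D%95.xlsx
    }
}
```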
8. Reading a file stream over the network
    /**
     * Read a file stream from a URL.
     * @param url the file's URL
     * @return the response body as a stream, or null if the URL is blank or the request failed
     */
    public static InputStream readFileFromURL(String url) {
        if (StringUtils.isBlank(url)) {
            return null;
        }
        HttpClient httpClient = new DefaultHttpClient();
        HttpGet methodGet = new HttpGet(url);
        try {
            HttpResponse response = httpClient.execute(methodGet);
            if (response.getStatusLine().getStatusCode() == 200) {
                HttpEntity entity = response.getEntity();
                return entity.getContent();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
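9. ExcelUtil
    /**
     * Read an Excel file.
     * @param inputStream the file input stream
     * @return a list of rows
     */
    public static List
Note: that completes the whole flow. The code for the remaining topics is listed below for reference.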
VII. Miscellaneous
1. The @LoginRequired annotation
    /**
     * Put this annotation on controller methods that require a logged-in user.
     */
    @Target({ElementType.METHOD})
    @Retention(RetentionPolicy.RUNTIME)
    public @interface LoginRequired {
    }
2. MyControllerAdvice
    @ControllerAdvice
    public class MyControllerAdvice {

        @ResponseBody
        @ExceptionHandler(TokenValidationException.class)
        public JsonResponse tokenValidationExceptionHandler() {
            return JsonResponse.loginInvalid();
        }

        @ResponseBody
        @ExceptionHandler(ServiceException.class)
        public JsonResponse serviceExceptionHandler(ServiceException se) {
            return JsonResponse.fail(se.getMsg());
        }

        @ResponseBody
        @ExceptionHandler(Exception.class)
        public JsonResponse exceptionHandler(Exception e) {
            e.printStackTrace();
            return JsonResponse.fail(e.getMessage());
        }
    }
3. AuthenticationInterceptor
    public class AuthenticationInterceptor implements HandlerInterceptor {

        private static final String CURRENT_USER = "user";

        @Autowired
        private UserService userService;

        @Override
        public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
            // Let the request through if it is not mapped to a handler method
            if (!(handler instanceof HandlerMethod)) {
                return true;
            }
            HandlerMethod handlerMethod = (HandlerMethod) handler;
            Method method = handlerMethod.getMethod();
            // A method annotated with @LoginRequired needs a logged-in user
            LoginRequired methodAnnotation = method.getAnnotation(LoginRequired.class);
            if (methodAnnotation != null) {
                // Verify the token
                Integer userId = JwtUtil.verifyToken(request);
                PLUser plUser = userService.selectByPrimaryKey(userId);
                if (null == plUser) {
                    throw new RuntimeException("用户不存在,请重新登录"); // "User not found, please log in again"
                }
                request.setAttribute(CURRENT_USER, plUser);
            }
            return true;
        }

        @Override
        public void postHandle(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse,
                               Object o, ModelAndView modelAndView) throws Exception {
        }

        @Override
        public void afterCompletion(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse,
                                    Object o, Exception e) throws Exception {
        }
    }
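The interceptor's decision hinges on one reflective lookup: does the target method carry `@LoginRequired`? The sketch below isolates that lookup with plain JDK reflection, outside Spring. The annotation is redeclared locally so the example is self-contained, and `DemoController` and `needsLogin` are hypothetical names used only for illustration.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

/** Shows how a runtime-retained method annotation can be detected via reflection. */
class LoginRequiredSketch {

    // Local copy of the marker annotation; RUNTIME retention is what makes it visible to reflection.
    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    @interface LoginRequired {
    }

    /** Hypothetical controller with one protected and one open endpoint. */
    static class DemoController {
        @LoginRequired
        public String doImport() {
            return "import";
        }

        public String ping() {
            return "pong";
        }
    }

    /** Returns true if the named public no-arg method is annotated with @LoginRequired. */
    static boolean needsLogin(Class<?> type, String methodName) {
        try {
            Method method = type.getMethod(methodName);
            return method.isAnnotationPresent(LoginRequired.class);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("no such method: " + methodName, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(needsLogin(DemoController.class, "doImport")); // true
        System.out.println(needsLogin(DemoController.class, "ping"));     // false
    }
}
```

If the annotation were declared with the default `CLASS` retention instead of `RUNTIME`, the lookup would silently return false for every method, which is why the retention policy on the annotation matters.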
4. JwtUtil
    public static final long EXPIRATION_TIME = 2592_000_000L; // valid for 30 days, in milliseconds
    public static final String SECRET = "pl_token_secret";
    public static final String HEADER = "token";
    public static final String USER_ID = "userId";

    /**
     * Generate a token for the given userId.
     * @param userId the user's id
     * @return the signed JWT
     */
    public static String generateToken(String userId) {
        HashMap<String, Object> map = new HashMap<>();
        map.put(USER_ID, userId);
        String jwt = Jwts.builder()
                .setClaims(map)
                .setExpiration(new Date(System.currentTimeMillis() + EXPIRATION_TIME))
                .signWith(SignatureAlgorithm.HS512, SECRET)
                .compact();
        return jwt;
    }

    /**
     * Verify the token.
     * @param request the incoming request, carrying the token in a header
     * @return the userId if verification succeeds
     */
    public static Integer verifyToken(HttpServletRequest request) {
        String token = request.getHeader(HEADER);
        if (token != null) {
            try {
                Map<String, Object> body = Jwts.parser()
                        .setSigningKey(SECRET)
                        .parseClaimsJws(token)
                        .getBody();
                for (Map.Entry<String, Object> entry : body.entrySet()) {
                    Object key = entry.getKey();
                    Object value = entry.getValue();
                    if (key.toString().equals(USER_ID)) {
                        return Integer.valueOf(value.toString()); // userId
                    }
                }
                return null;
            } catch (Exception e) {
                logger.error(e.getMessage());
                throw new TokenValidationException("unauthorized");
            }
        } else {
            throw new TokenValidationException("missing token");
        }
    }
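To make it clearer what an HS512 signature protects, here is a JDK-only sketch of the general JWT idea: the token is `base64url(header).base64url(payload).base64url(HMAC-SHA512(header.payload))`, and verification recomputes the HMAC. This is not the jjwt API and is not meant to replace it; `Hs512Sketch` and its methods are illustrative names, the header is reduced to the `alg` field, and expiry checking is left out.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import java.util.Base64;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

/** Hand-rolled HS512 signing and verification, for illustration only. */
class Hs512Sketch {
    private static final Base64.Encoder B64 = Base64.getUrlEncoder().withoutPadding();

    static String b64(String text) {
        return B64.encodeToString(text.getBytes(StandardCharsets.UTF_8));
    }

    /** HMAC-SHA512 over the signing input, base64url-encoded without padding. */
    static String sign(String signingInput, byte[] secret) {
        try {
            Mac mac = Mac.getInstance("HmacSHA512");
            mac.init(new SecretKeySpec(secret, "HmacSHA512"));
            return B64.encodeToString(mac.doFinal(signingInput.getBytes(StandardCharsets.US_ASCII)));
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Builds header.payload.signature for the given JSON payload. */
    static String token(String payloadJson, byte[] secret) {
        String signingInput = b64("{\"alg\":\"HS512\"}") + "." + b64(payloadJson);
        return signingInput + "." + sign(signingInput, secret);
    }

    /** Recomputes the signature and compares it in constant time. */
    static boolean verify(String token, byte[] secret) {
        int lastDot = token.lastIndexOf('.');
        if (lastDot < 0) {
            return false;
        }
        String expected = sign(token.substring(0, lastDot), secret);
        String actual = token.substring(lastDot + 1);
        return MessageDigest.isEqual(
                expected.getBytes(StandardCharsets.US_ASCII),
                actual.getBytes(StandardCharsets.US_ASCII));
    }

    public static void main(String[] args) {
        byte[] secret = "demo-secret".getBytes(StandardCharsets.UTF_8); // placeholder secret
        String jwt = token("{\"userId\":\"1\"}", secret);
        System.out.println(verify(jwt, secret)); // true
    }
}
```

Because the signature covers both the header and the payload, changing the `userId` in the payload without knowing the secret makes verification fail, which is the property the interceptor relies on.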
This article is a reader submission and does not represent my own position. If you repost it, please credit the source: http://www.souzhinan.com/kj/276086.html