28 changed files with 668 additions and 35 deletions
@ -0,0 +1,17 @@ |
|||||
|
package com.epmet.commons.rocketmq.messages; |
||||
|
|
||||
|
import lombok.AllArgsConstructor; |
||||
|
import lombok.Data; |
||||
|
|
||||
|
import java.io.Serializable; |
||||
|
|
||||
|
/** |
||||
|
* desc:项目变动通知消息实体类 |
||||
|
* @author liujianjun |
||||
|
*/ |
||||
|
@Data |
||||
|
@AllArgsConstructor |
||||
|
public class ProjectChangedMQMsg implements Serializable { |
||||
|
|
||||
|
private String customerId; |
||||
|
} |
@ -0,0 +1,118 @@ |
|||||
|
package com.epmet.mq; |
||||
|
|
||||
|
import com.alibaba.fastjson.JSON; |
||||
|
import com.epmet.commons.rocketmq.messages.ProjectChangedMQMsg; |
||||
|
import com.epmet.commons.tools.distributedlock.DistributedLock; |
||||
|
import com.epmet.commons.tools.exception.RenException; |
||||
|
import com.epmet.commons.tools.utils.SpringContextUtils; |
||||
|
import com.epmet.dto.extract.form.ExtractOriginFormDTO; |
||||
|
import com.epmet.service.evaluationindex.extract.todata.FactOriginExtractService; |
||||
|
import com.epmet.service.evaluationindex.extract.toscreen.ScreenExtractService; |
||||
|
import com.epmet.util.DimIdGenerator; |
||||
|
import lombok.extern.slf4j.Slf4j; |
||||
|
import org.apache.commons.lang3.StringUtils; |
||||
|
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; |
||||
|
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; |
||||
|
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; |
||||
|
import org.apache.rocketmq.common.message.MessageExt; |
||||
|
import org.redisson.api.RLock; |
||||
|
import org.slf4j.Logger; |
||||
|
import org.slf4j.LoggerFactory; |
||||
|
|
||||
|
import javax.annotation.PreDestroy; |
||||
|
import java.util.Date; |
||||
|
import java.util.List; |
||||
|
import java.util.concurrent.ExecutionException; |
||||
|
import java.util.concurrent.Future; |
||||
|
import java.util.concurrent.TimeUnit; |
||||
|
import java.util.stream.Collectors; |
||||
|
|
||||
|
/** |
||||
|
* @Description 项目变动-监听器 |
||||
|
* @return |
||||
|
* @author wxz |
||||
|
* @date 2021.03.03 16:10 |
||||
|
*/ |
||||
|
@Slf4j |
||||
|
public class ProjectChangedCustomListener implements MessageListenerConcurrently { |
||||
|
|
||||
|
private Logger logger = LoggerFactory.getLogger(getClass()); |
||||
|
|
||||
|
@Override |
||||
|
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { |
||||
|
long start = System.currentTimeMillis(); |
||||
|
try { |
||||
|
List<String> customerIds = msgs.stream().map(messageExt -> new String(messageExt.getBody())).distinct().collect(Collectors.toList()); |
||||
|
customerIds.forEach(this::consumeMessage); |
||||
|
} catch (Exception e) { |
||||
|
//失败不重发
|
||||
|
logger.error("consumeMessage fail,msg:{}",e.getMessage()); |
||||
|
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; |
||||
|
} |
||||
|
log.info("consumeMessage success, cost:{} ms",System.currentTimeMillis() - start); |
||||
|
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; |
||||
|
} |
||||
|
|
||||
|
private void consumeMessage(String msg) { |
||||
|
logger.info("receive customerId:{}", JSON.toJSONString(msg)); |
||||
|
ProjectChangedMQMsg msgObj = JSON.parseObject(msg, ProjectChangedMQMsg.class); |
||||
|
|
||||
|
DistributedLock distributedLock = null; |
||||
|
RLock lock = null; |
||||
|
try { |
||||
|
String customerId = msgObj.getCustomerId(); |
||||
|
distributedLock = SpringContextUtils.getBean(DistributedLock.class); |
||||
|
lock = distributedLock.getLock(String.format("lock:project_changed:%s", customerId) |
||||
|
,30L, 30L, TimeUnit.SECONDS); |
||||
|
|
||||
|
if (StringUtils.isBlank(customerId)){ |
||||
|
logger.error("consumer project_changed fail,msg:{}",customerId); |
||||
|
return; |
||||
|
} |
||||
|
//消息被消费太快 业务数据还没有完成 歇一会先
|
||||
|
try { |
||||
|
Thread.sleep(60L); |
||||
|
} catch (InterruptedException e) { |
||||
|
logger.error("consumeMessage sleep exception",e); |
||||
|
} |
||||
|
ExtractOriginFormDTO extractOriginFormDTO = new ExtractOriginFormDTO(); |
||||
|
extractOriginFormDTO.setCustomerId(customerId); |
||||
|
|
||||
|
String dateId = DimIdGenerator.getDateDimId(new Date()); |
||||
|
extractOriginFormDTO.setDateId(dateId); |
||||
|
Future<?> aBoolean = SpringContextUtils.getBean(FactOriginExtractService.class).submitProjectRelationData(extractOriginFormDTO,null); |
||||
|
try { |
||||
|
aBoolean.get(); |
||||
|
} catch (InterruptedException e) { |
||||
|
e.printStackTrace(); |
||||
|
} catch (ExecutionException e) { |
||||
|
e.printStackTrace(); |
||||
|
} |
||||
|
if (aBoolean.isDone()){ |
||||
|
// 指标库,实时数据只更新:【screen_project_data】
|
||||
|
// 【screen_project_process_attachment】
|
||||
|
// 【screen_project_img_data】
|
||||
|
// 【screen_project_process】这四个表
|
||||
|
SpringContextUtils.getBean(ScreenExtractService.class).extractPartData(customerId,dateId); |
||||
|
} |
||||
|
logger.info("consumer projectChanged msg success,{}",aBoolean); |
||||
|
} catch (RenException e) { |
||||
|
// 如果是我们手动抛出的异常,说明在业务可控范围内。目前不需要MQ重试
|
||||
|
logger.error("【RocketMQ】消费项目变动消息失败:",e); |
||||
|
} catch (Exception e) { |
||||
|
// 不是我们自己抛出的异常,可以让MQ重试
|
||||
|
logger.error("【RocketMQ】消费项目变动消息异常:",e); |
||||
|
throw e; |
||||
|
} finally { |
||||
|
if (distributedLock != null){ |
||||
|
distributedLock.unLock(lock); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
@PreDestroy |
||||
|
public void saveCalStatus() { |
||||
|
//todo
|
||||
|
log.info("data-statical-server服务被关闭,执行未执行完的动作"); |
||||
|
|
||||
|
} |
||||
|
} |
@ -0,0 +1,68 @@ |
|||||
|
package com.epmet.mq; |
||||
|
|
||||
|
import com.epmet.commons.rocketmq.constants.ConsomerGroupConstants; |
||||
|
import com.epmet.commons.rocketmq.constants.TopicConstants; |
||||
|
import com.epmet.commons.tools.constant.NumConstant; |
||||
|
import lombok.extern.slf4j.Slf4j; |
||||
|
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; |
||||
|
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; |
||||
|
import org.apache.rocketmq.client.exception.MQClientException; |
||||
|
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; |
||||
|
import org.springframework.beans.factory.annotation.Value; |
||||
|
import org.springframework.core.annotation.Order; |
||||
|
import org.springframework.stereotype.Component; |
||||
|
|
||||
|
import javax.annotation.PostConstruct; |
||||
|
|
||||
|
@Slf4j |
||||
|
@Component |
||||
|
@Order(value = 111) |
||||
|
public class RocketMQConsumerRegister { |
||||
|
|
||||
|
@Value("${rocketmq.name-server}") |
||||
|
private String nameServer; |
||||
|
|
||||
|
/** |
||||
|
* @return |
||||
|
* @Description 注册监听器 |
||||
|
* @author wxz |
||||
|
* @date 2021.03.03 16:09 |
||||
|
*/ |
||||
|
@PostConstruct |
||||
|
public void registerAllListeners() { |
||||
|
try { |
||||
|
register(ConsomerGroupConstants.PROJECT_CHANGED_COMPONENTS_GROUP, MessageModel.CLUSTERING, TopicConstants.PROJECT_CHANGED, "*", new ProjectChangedCustomListener()); |
||||
|
} catch (MQClientException e) { |
||||
|
log.error("registerAllListeners exception", e); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
public void register(String group, MessageModel messageModel, String topic, String subException, MessageListenerConcurrently listener) throws MQClientException { |
||||
|
// 实例化消费者
|
||||
|
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group); |
||||
|
|
||||
|
// 设置NameServer的地址
|
||||
|
consumer.setNamesrvAddr(nameServer); |
||||
|
consumer.setMessageModel(messageModel); |
||||
|
consumer.setInstanceName(buildInstanceName()); |
||||
|
// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
|
||||
|
consumer.subscribe(topic, subException); |
||||
|
// 注册回调实现类来处理从broker拉取回来的消息
|
||||
|
consumer.registerMessageListener(listener); |
||||
|
//一次批量拉去10条消息
|
||||
|
consumer.setConsumeMessageBatchMaxSize(NumConstant.TEN); |
||||
|
// 启动消费者实例
|
||||
|
consumer.start(); |
||||
|
} |
||||
|
|
||||
|
private String buildInstanceName() { |
||||
|
String instanceName = ""; |
||||
|
for (int i = 0; i < 4; i++) { |
||||
|
int t = (int) (Math.random() * 10); |
||||
|
instanceName = instanceName.concat(t + ""); |
||||
|
} |
||||
|
|
||||
|
return instanceName; |
||||
|
} |
||||
|
|
||||
|
} |
@ -0,0 +1,69 @@ |
|||||
|
package com.epmet.send; |
||||
|
|
||||
|
import com.alibaba.fastjson.JSON; |
||||
|
import com.epmet.commons.rocketmq.messages.ProjectChangedMQMsg; |
||||
|
import com.epmet.commons.tools.constant.NumConstant; |
||||
|
import com.epmet.commons.tools.utils.Result; |
||||
|
import com.epmet.constant.SystemMessageType; |
||||
|
import com.epmet.dto.form.SystemMsgFormDTO; |
||||
|
import com.epmet.feign.EpmetMessageOpenFeignClient; |
||||
|
import lombok.extern.slf4j.Slf4j; |
||||
|
|
||||
|
/** |
||||
|
* desc: 发送mq消息直接到rocketMq 系统 |
||||
|
* |
||||
|
* @author: LiuJanJun |
||||
|
* @date: 2021/4/23 2:39 下午 |
||||
|
* @versio: 1.0 |
||||
|
*/ |
||||
|
@Slf4j |
||||
|
public class SendMqMsgUtil { |
||||
|
private static final SendMqMsgUtil INSTANCE = new SendMqMsgUtil(); |
||||
|
|
||||
|
private SendMqMsgUtil() { |
||||
|
|
||||
|
} |
||||
|
|
||||
|
private EpmetMessageOpenFeignClient epmetMessageOpenFeignClient; |
||||
|
|
||||
|
public static SendMqMsgUtil build() { |
||||
|
return INSTANCE; |
||||
|
} |
||||
|
|
||||
|
public SendMqMsgUtil openFeignClient(EpmetMessageOpenFeignClient epmetMessageOpenFeignClient) { |
||||
|
this.epmetMessageOpenFeignClient = epmetMessageOpenFeignClient; |
||||
|
return this; |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* desc: 发送小组成就消息,计算小组成就 |
||||
|
* |
||||
|
* @param msgContent |
||||
|
* @return boolean |
||||
|
* @author LiuJanJun |
||||
|
* @date 2021/4/23 3:01 下午 |
||||
|
* @remark 失败重试1次,调用端自行判断如果失败是否要继续执行 |
||||
|
*/ |
||||
|
public boolean sendProjectChangedMqMsg(ProjectChangedMQMsg msgContent) { |
||||
|
try { |
||||
|
SystemMsgFormDTO systemMsgFormDTO = new SystemMsgFormDTO(); |
||||
|
systemMsgFormDTO.setMessageType(SystemMessageType.PROJECT_CHANGED); |
||||
|
systemMsgFormDTO.setContent(msgContent); |
||||
|
Result sendMsgResult = null; |
||||
|
log.info("sendProjectChangedMqMsg param:{}",msgContent); |
||||
|
int retryTime = 0; |
||||
|
do { |
||||
|
sendMsgResult = epmetMessageOpenFeignClient.sendSystemMsgByMQ(systemMsgFormDTO); |
||||
|
} while ((sendMsgResult == null || !sendMsgResult.success()) && retryTime++ < NumConstant.TWO); |
||||
|
|
||||
|
if (sendMsgResult != null && sendMsgResult.success()) { |
||||
|
return true; |
||||
|
} |
||||
|
log.error("发送(项目变动)系统消息到message服务失败:{},msg:{}", JSON.toJSONString(sendMsgResult), JSON.toJSONString(systemMsgFormDTO)); |
||||
|
} catch (Exception e) { |
||||
|
log.error("sendMqMsg exception", e); |
||||
|
} |
||||
|
return false; |
||||
|
} |
||||
|
|
||||
|
} |
@ -0,0 +1,101 @@ |
|||||
|
/** |
||||
|
* Copyright 2018 人人开源 https://www.renren.io
|
||||
|
* <p> |
||||
|
* This program is free software: you can redistribute it and/or modify |
||||
|
* it under the terms of the GNU General Public License as published by |
||||
|
* the Free Software Foundation, either version 3 of the License, or |
||||
|
* (at your option) any later version. |
||||
|
* <p> |
||||
|
* This program is distributed in the hope that it will be useful, |
||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of |
||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
||||
|
* GNU General Public License for more details. |
||||
|
* <p> |
||||
|
* You should have received a copy of the GNU General Public License |
||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
*/ |
||||
|
|
||||
|
package com.epmet.modules.group.controller; |
||||
|
|
||||
|
import com.alibaba.fastjson.JSON; |
||||
|
import com.epmet.commons.tools.dto.form.mq.MqBaseMsgDTO; |
||||
|
import com.epmet.commons.tools.scan.param.TextScanParamDTO; |
||||
|
import com.epmet.commons.tools.scan.param.TextTaskDTO; |
||||
|
import com.epmet.commons.tools.scan.result.SyncScanResult; |
||||
|
import com.epmet.commons.tools.utils.Result; |
||||
|
import com.epmet.commons.tools.utils.ScanContentUtils; |
||||
|
import com.epmet.commons.tools.utils.SendMqMsgUtils; |
||||
|
import org.apache.commons.lang3.StringUtils; |
||||
|
import org.springframework.web.bind.annotation.RequestMapping; |
||||
|
import org.springframework.web.bind.annotation.RequestParam; |
||||
|
import org.springframework.web.bind.annotation.RestController; |
||||
|
|
||||
|
import java.util.ArrayList; |
||||
|
import java.util.List; |
||||
|
|
||||
|
|
||||
|
/** |
||||
|
* 群组信息表 |
||||
|
* |
||||
|
* @author generator generator@elink-cn.com |
||||
|
* @since v1.0.0 2020-03-28 |
||||
|
*/ |
||||
|
@RestController |
||||
|
public class TestPController { |
||||
|
|
||||
|
@RequestMapping("scan") |
||||
|
public Result setVisitSwitch(@RequestParam String content){ |
||||
|
String url = "https://epmet-dev.elinkservice.cn/api/epmetscan/api/textSyncScan"; |
||||
|
TextTaskDTO p = new TextTaskDTO(); |
||||
|
p.setDataId("1"); |
||||
|
String content1 = "开着房车去旅行,已经成了不少人出游的新方式。临近“五一”,房车租赁市场持续火爆,尽管价格比平时贵了不少,可仍一车难求。\n" + |
||||
|
"\n" + |
||||
|
"“临近‘五一’,想开一辆房车拉着一家人去草原旅游,没想到一周前就已经租不到车了。”读者刘先生说,最近一段时间,他总是在一些短视频平台上看到有人开着房车出门旅行的视频。眼下春暖花开,正是出游的好时节,开上一辆房车,不仅节省了住宿费,还能体验一种不同的出行方式。不过,这周起刘先生搜索相关的房车租赁信息时,却发现不少房车租赁公司已经是一车难求了。\n" + |
||||
|
"在某房车租车软件上记者发现,位于房山和顺义的两个取车还车点,五一期间的全部车型已经被租赁一空。而在另一家房车租赁公司的价格目录里,明确标注了租赁价格分为平时价和节假日价,后者是前者的1.2倍。\n" + |
||||
|
"\n" + |
||||
|
"“‘五一’期间,北京的房车租赁情况很紧张,目前只剩一台东风了,你可以考虑一下。”一家房车租赁公司的工作人员介绍,他们公司规定“五一”期间租赁时间为五天起租,仅剩的东风车型每天的租金在1278元。“‘五一’期间的价格会比平时高一些,我建议您在5月6日之后租,一方面没有租赁时长的限制,另一方面租金会便宜不少,每天也就900多元。"; |
||||
|
if (StringUtils.isBlank(content)){ |
||||
|
p.setContent(content1); |
||||
|
}else{ |
||||
|
p.setContent(content); |
||||
|
} |
||||
|
List<TextTaskDTO> list = new ArrayList<>(); |
||||
|
list.add(p); |
||||
|
TextScanParamDTO param = new TextScanParamDTO(); |
||||
|
param.setTasks(list); |
||||
|
Result<SyncScanResult> imgSyncScanResult = ScanContentUtils.textSyncScan(url, param); |
||||
|
System.out.println("==================="+ JSON.toJSONString(imgSyncScanResult)); |
||||
|
SyncScanResult result = new SyncScanResult(); |
||||
|
if (imgSyncScanResult != null){ |
||||
|
SyncScanResult imgSyncScanResultData = imgSyncScanResult.getData(); |
||||
|
if (imgSyncScanResult.success()&&imgSyncScanResultData.isAllPass()) { |
||||
|
result.setAllPass(imgSyncScanResultData.isAllPass()); |
||||
|
result.getSuccessDataIds().addAll(imgSyncScanResultData.getSuccessDataIds()); |
||||
|
result.getFailDataIds().addAll(imgSyncScanResultData.getFailDataIds()); |
||||
|
System.out.println("================"+JSON.toJSONString(result)); |
||||
|
} |
||||
|
} |
||||
|
return new Result().ok(result); |
||||
|
} |
||||
|
|
||||
|
@RequestMapping("test-mq") |
||||
|
public Result testMqMsg(@RequestParam String content){ |
||||
|
MqBaseMsgDTO msg = null; |
||||
|
if (StringUtils.isBlank(content)){ |
||||
|
content = "{\"appId\":\"202007161443499985fa2d397436d10356542134c8f008c48\",\"eventClass\":\"resi_group\",\"eventTag\":\"shift_topic_to_issue\",\"msg\":\"[{\\\"actionFlag\\\":\\\"plus\\\",\\\"agencyId\\\":\\\"e9b55911549fe7b0d0427b557f7c5efc\\\",\\\"customerId\\\":\\\"45687aa479955f9d06204d415238f7cc\\\",\\\"eventClass\\\":\\\"resi_group\\\",\\\"eventTag\\\":\\\"topic_to_issue\\\",\\\"gridId\\\":\\\"708a3567b54ced7219666489f9d838ab\\\",\\\"groupId\\\":\\\"6838a625e4ccc0dc036f9903af833ad9\\\",\\\"isCommon\\\":false,\\\"remark\\\":\\\"小组成就3小组中发布的话题\\\\\\\"还是不行这里在测试测试测试为什么发反反复复反反复复反反复复方法不够爽肤水的分身乏术分身乏术发烧反反复复方法分为玩儿玩儿玩儿啦啦啦啦\\\\\\\"被转为议题\\\",\\\"sourceId\\\":\\\"67a06a217ecf46cd92944040323e70e0\\\",\\\"sourceType\\\":\\\"issue\\\",\\\"userId\\\":\\\"74b5d6792d89fb2fb2d0363de3cbc1aa\\\"},{\\\"actionFlag\\\":\\\"plus\\\",\\\"agencyId\\\":\\\"e9b55911549fe7b0d0427b557f7c5efc\\\",\\\"customerId\\\":\\\"45687aa479955f9d06204d415238f7cc\\\",\\\"eventTag\\\":\\\"shift_topic_to_issue\\\",\\\"gridId\\\":\\\"708a3567b54ced7219666489f9d838ab\\\",\\\"groupId\\\":\\\"6838a625e4ccc0dc036f9903af833ad9\\\",\\\"isCommon\\\":false,\\\"remark\\\":\\\"将话题\\\\\\\"还是不行\\\\\\\"转为议题\\\",\\\"sourceId\\\":\\\"67a06a217ecf46cd92944040323e70e0\\\",\\\"sourceType\\\":\\\"issue\\\",\\\"userId\\\":\\\"74b5d6792d89fb2fb2d0363de3cbc1aa\\\"}]\",\"requestUrl\":\"https://epmet-dev.elinkservice.cn/estos/producerService/producer/sendMsg\",\"token\":\"52d9d9b0e7d0eb5b8b81c205b579e07c\"}"; |
||||
|
msg = JSON.parseObject(content,MqBaseMsgDTO.class); |
||||
|
}else{ |
||||
|
msg = new MqBaseMsgDTO(); |
||||
|
msg.setMsg(content); |
||||
|
msg.setEventClass("resi_group"); |
||||
|
msg.setEventTag("shift_topic_to_issue"); |
||||
|
} |
||||
|
|
||||
|
|
||||
|
Result<String> stringResult = SendMqMsgUtils.sendMsg(msg); |
||||
|
System.out.println("=========="+JSON.toJSONString(stringResult)); |
||||
|
return stringResult; |
||||
|
} |
||||
|
|
||||
|
|
||||
|
} |
Loading…
Reference in new issue