forked from rongchao/epmet-cloud-rizhao
10 changed files with 257 additions and 15 deletions
@ -0,0 +1,88 @@ |
|||||
|
package com.epmet.mq.producer; |
||||
|
|
||||
|
import com.epmet.commons.tools.exception.ExceptionUtils; |
||||
|
import com.epmet.mq.properties.RocketMQProperties; |
||||
|
import lombok.extern.slf4j.Slf4j; |
||||
|
import org.apache.rocketmq.acl.common.AclClientRPCHook; |
||||
|
import org.apache.rocketmq.acl.common.SessionCredentials; |
||||
|
import org.apache.rocketmq.client.exception.MQClientException; |
||||
|
import org.apache.rocketmq.client.producer.DefaultMQProducer; |
||||
|
import org.apache.rocketmq.client.producer.SendResult; |
||||
|
import org.apache.rocketmq.client.producer.SendStatus; |
||||
|
import org.apache.rocketmq.common.message.Message; |
||||
|
import org.apache.rocketmq.remoting.common.RemotingHelper; |
||||
|
import org.springframework.beans.factory.annotation.Autowired; |
||||
|
import org.springframework.stereotype.Component; |
||||
|
|
||||
|
import javax.annotation.PostConstruct; |
||||
|
import javax.annotation.PreDestroy; |
||||
|
|
||||
|
/** |
||||
|
* @Description 描述 |
||||
|
* @Author wangxianzhang |
||||
|
* @Date 2021/11/26 12:33 下午 |
||||
|
* @Version 1.0 |
||||
|
*/ |
||||
|
@Component |
||||
|
@Slf4j |
||||
|
public class BlockChainProducer { |
||||
|
|
||||
|
// topic
|
||||
|
public static final String TOPIC_PROJECT = "project"; |
||||
|
public static final String TOPIC_AUTH = "auth"; |
||||
|
|
||||
|
// tag
|
||||
|
public static final String TAG_SEND_PROJECT = "send_project"; |
||||
|
public static final String TAG_SEND_PROCESS = "send_process"; |
||||
|
public static final String TAG_SEND_ASSIGNED_STAFFS = "send_assigned_staffs"; |
||||
|
|
||||
|
// 组
|
||||
|
public static final String GROUP_EPMET_CLOUD_PROJECT_SENDER = "epmet_cloud_project_sender"; |
||||
|
|
||||
|
private DefaultMQProducer producer; |
||||
|
|
||||
|
@Autowired |
||||
|
private RocketMQProperties rocketMQProperties; |
||||
|
|
||||
|
@PostConstruct |
||||
|
private void postConstruct() { |
||||
|
try { |
||||
|
producer = new DefaultMQProducer(GROUP_EPMET_CLOUD_PROJECT_SENDER, |
||||
|
new AclClientRPCHook(new SessionCredentials(rocketMQProperties.getBlockChain().getAccessKey(), rocketMQProperties.getBlockChain().getSecretKey()))); |
||||
|
producer.setNamesrvAddr(rocketMQProperties.getBlockChain().getNameServer()); |
||||
|
producer.start(); |
||||
|
} catch (MQClientException e) { |
||||
|
e.printStackTrace(); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 销毁producer |
||||
|
*/ |
||||
|
@PreDestroy |
||||
|
private void preDestroy() { |
||||
|
producer.shutdown(); |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 发送消息 |
||||
|
* |
||||
|
* @param topic |
||||
|
* @param tag |
||||
|
*/ |
||||
|
public void sendMsg(String topic, String tag, String content) { |
||||
|
try { |
||||
|
Message msg = new Message(topic, tag, content.getBytes(RemotingHelper.DEFAULT_CHARSET)); |
||||
|
SendResult sendResult = producer.send(msg); |
||||
|
if (SendStatus.SEND_OK == sendResult.getSendStatus() |
||||
|
|| SendStatus.SLAVE_NOT_AVAILABLE == sendResult.getSendStatus()) { |
||||
|
log.info("消息发送区块链MQ成功, topic:{}, tag:{}, content:{}", topic, tag, content); |
||||
|
} else { |
||||
|
log.error("消息发送区块链MQ失败, topic:{}, tag:{}, content:{}, status: {}", topic, tag, content, sendResult.getSendStatus()); |
||||
|
} |
||||
|
} catch (Exception e) { |
||||
|
log.error("消息发送区块链MQ出错, topic:{}, tag:{}, content:{}, 错误信息:{]", topic, tag, content, ExceptionUtils.getErrorStackTrace(e)); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
} |
@ -0,0 +1,33 @@ |
|||||
|
package com.epmet.mq.properties; |
||||
|
|
||||
|
import lombok.Data; |
||||
|
import org.springframework.boot.context.properties.ConfigurationProperties; |
||||
|
|
||||
|
/** |
||||
|
* @Description 描述 |
||||
|
* @Author wangxianzhang |
||||
|
* @Date 2021/11/26 10:03 上午 |
||||
|
* @Version 1.0 |
||||
|
*/ |
||||
|
@ConfigurationProperties(prefix = "rocketmq") |
||||
|
@Data |
||||
|
public class RocketMQProperties { |
||||
|
|
||||
|
// 这里必须new出对象来才能成功封装
|
||||
|
private EpmetCloudInnerProperties epmetCloudInner = new EpmetCloudInnerProperties(); |
||||
|
private BlockChainProperties blockChain = new BlockChainProperties(); |
||||
|
|
||||
|
@Data |
||||
|
public class EpmetCloudInnerProperties { |
||||
|
private Boolean enable; |
||||
|
private String nameServer; |
||||
|
} |
||||
|
|
||||
|
@Data |
||||
|
public class BlockChainProperties { |
||||
|
private Boolean enable; |
||||
|
private String nameServer; |
||||
|
private String accessKey; |
||||
|
private String secretKey; |
||||
|
} |
||||
|
} |
@ -0,0 +1,14 @@ |
|||||
|
package com.epmet.service; |
||||
|
|
||||
|
import com.epmet.dto.form.BlockChainCreateProjectFormDTO; |
||||
|
import com.epmet.dto.form.BlockChainProcessProjectFormDTO; |
||||
|
|
||||
|
/** |
||||
|
* 区块链:项目service |
||||
|
*/ |
||||
|
public interface BlockChainProjectService { |
||||
|
|
||||
|
void blockChainCreateProject(BlockChainCreateProjectFormDTO input); |
||||
|
void blockChainProcessProject(BlockChainProcessProjectFormDTO input); |
||||
|
|
||||
|
} |
@ -0,0 +1,44 @@ |
|||||
|
package com.epmet.service.impl; |
||||
|
|
||||
|
import com.alibaba.fastjson.JSON; |
||||
|
import com.epmet.dto.form.BlockChainCreateProjectFormDTO; |
||||
|
import com.epmet.dto.form.BlockChainProcessProjectFormDTO; |
||||
|
import com.epmet.mq.producer.BlockChainProducer; |
||||
|
import com.epmet.mq.properties.RocketMQProperties; |
||||
|
import com.epmet.service.BlockChainProjectService; |
||||
|
import org.springframework.beans.factory.annotation.Autowired; |
||||
|
import org.springframework.stereotype.Service; |
||||
|
|
||||
|
/** |
||||
|
* @Description 描述 |
||||
|
* @Author wangxianzhang |
||||
|
* @Date 2021/11/26 9:59 上午 |
||||
|
* @Version 1.0 |
||||
|
*/ |
||||
|
@Service |
||||
|
public class BlockChainProjectServiceImpl implements BlockChainProjectService { |
||||
|
|
||||
|
@Autowired |
||||
|
private BlockChainProducer blockChainProducer; |
||||
|
|
||||
|
@Override |
||||
|
public void blockChainCreateProject(BlockChainCreateProjectFormDTO input) { |
||||
|
|
||||
|
String projectString = JSON.toJSONString(input.getProject()); |
||||
|
String processString = JSON.toJSONString(input.getProcess()); |
||||
|
String assignedStaffsString = JSON.toJSONString(input.getAssignedStaffs()); |
||||
|
|
||||
|
blockChainProducer.sendMsg(BlockChainProducer.TOPIC_PROJECT, BlockChainProducer.TAG_SEND_PROJECT, projectString); |
||||
|
blockChainProducer.sendMsg(BlockChainProducer.TOPIC_PROJECT, BlockChainProducer.TAG_SEND_PROCESS, processString); |
||||
|
blockChainProducer.sendMsg(BlockChainProducer.TOPIC_PROJECT, BlockChainProducer.TAG_SEND_ASSIGNED_STAFFS, assignedStaffsString); |
||||
|
} |
||||
|
|
||||
|
@Override |
||||
|
public void blockChainProcessProject(BlockChainProcessProjectFormDTO input) { |
||||
|
String processString = JSON.toJSONString(input.getProcess()); |
||||
|
String assignedStaffsString = JSON.toJSONString(input.getAssignedStaffs()); |
||||
|
|
||||
|
blockChainProducer.sendMsg(BlockChainProducer.TOPIC_PROJECT, BlockChainProducer.TAG_SEND_PROCESS, processString); |
||||
|
blockChainProducer.sendMsg(BlockChainProducer.TOPIC_PROJECT, BlockChainProducer.TAG_SEND_ASSIGNED_STAFFS, assignedStaffsString); |
||||
|
} |
||||
|
} |
Loading…
Reference in new issue