5 changed files with 308 additions and 79 deletions
@ -0,0 +1,185 @@ |
|||
package com.epmet.commons.tools.utils; |
|||
|
|||
import com.alibaba.fastjson.JSON; |
|||
import com.epmet.commons.tools.dto.form.DingTalkTextMsg; |
|||
import com.google.common.cache.Cache; |
|||
import com.google.common.cache.CacheBuilder; |
|||
import com.google.common.collect.Lists; |
|||
import org.apache.commons.codec.binary.Base64; |
|||
import org.apache.commons.lang3.StringUtils; |
|||
import org.slf4j.Logger; |
|||
import org.slf4j.LoggerFactory; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
import javax.annotation.PreDestroy; |
|||
import javax.crypto.Mac; |
|||
import javax.crypto.spec.SecretKeySpec; |
|||
import java.io.IOException; |
|||
import java.net.URLEncoder; |
|||
import java.util.concurrent.ArrayBlockingQueue; |
|||
import java.util.concurrent.TimeUnit; |
|||
import java.util.concurrent.atomic.AtomicInteger; |
|||
|
|||
/** |
|||
* desc: 发送消息工具类 |
|||
* |
|||
* @date: 2020/6/29 8:43 |
|||
* @author: jianjun liu |
|||
*/ |
|||
@Component |
|||
public class DingdingMsgSender { |
|||
|
|||
private final Logger logger = LoggerFactory.getLogger(DingdingMsgSender.class); |
|||
//如果不设置则为 开发环境机器人地址
|
|||
private static final String webHook = "https://oapi.dingtalk.com/robot/send?access_token=90782b119f82a5b6bb8e0f819b6a77bbc2102b53aa2d7d2e24fa10b66d580b1c"; |
|||
private static final String secret = "SEC080aac67ff78e79fdaba132aa51e3fb3f6060dec99492feaac82cabf9f8b6a19"; |
|||
/** |
|||
* 默认10 |
|||
*/ |
|||
private static Integer maxQueueSize = 10; |
|||
|
|||
/** |
|||
* 有序队列 |
|||
*/ |
|||
private ArrayBlockingQueue<DingTalkTextMsg> msgQueue = new ArrayBlockingQueue<>(maxQueueSize); |
|||
|
|||
|
|||
private volatile boolean running = false; |
|||
|
|||
private Cache<String, AtomicInteger> limitCache = CacheBuilder.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES).maximumSize(1000).build(); |
|||
|
|||
public DingdingMsgSender() { |
|||
|
|||
} |
|||
|
|||
|
|||
private void handleMsg() { |
|||
DingTalkTextMsg msg = null; |
|||
try { |
|||
//阻塞取元素
|
|||
msg = msgQueue.take(); |
|||
if (msg != null) { |
|||
AtomicInteger limitCount = limitCache.getIfPresent(msg.getWebHook()); |
|||
if (limitCount == null) { |
|||
limitCount = new AtomicInteger(1); |
|||
limitCache.put(msg.getWebHook(), limitCount); |
|||
} |
|||
if (limitCount.intValue() > maxQueueSize) { |
|||
msgQueue.offer(msg); |
|||
Thread.sleep(1000); |
|||
} else { |
|||
sendPostByJSON(msg); |
|||
limitCount.addAndGet(1); |
|||
} |
|||
} else { |
|||
Thread.sleep(1000); |
|||
} |
|||
} catch (Exception e) { |
|||
logger.warn("handleMsg exception,serverUrl:" + msg.getWebHook() + ",msg:" + JSON.toJSONString(msg), e); |
|||
} |
|||
} |
|||
|
|||
|
|||
/** |
|||
* desc:异步发送消息 |
|||
* |
|||
* @param messageParam |
|||
* @return |
|||
* @throws IOException |
|||
*/ |
|||
public boolean sendMsgAsync(DingTalkTextMsg messageParam) { |
|||
if (!running) { |
|||
running = true; |
|||
getThread().start(); |
|||
} |
|||
boolean flag = false; |
|||
int currentQueueSize = msgQueue.size(); |
|||
//非阻塞 添加/删除元素
|
|||
if (currentQueueSize < maxQueueSize) { |
|||
flag = msgQueue.offer(messageParam); |
|||
} else { |
|||
msgQueue.poll(); |
|||
|
|||
DingTalkTextMsg param = new DingTalkTextMsg(); |
|||
param.setContent("待发送消息队列已满,当前队列个数" + msgQueue.size() + "\n" + "最新消息内容:" + JSON.toJSONString(messageParam)); |
|||
param.setWebHook(messageParam.getWebHook()); |
|||
sendPostByJSON(param); |
|||
} |
|||
return flag; |
|||
} |
|||
|
|||
/** |
|||
* 同步发送报警 |
|||
* |
|||
* @param messageParam |
|||
* @return |
|||
* @throws IOException |
|||
*/ |
|||
public Result<String> sendMsgSync(DingTalkTextMsg messageParam) { |
|||
return sendPostByJSON(messageParam); |
|||
} |
|||
|
|||
private Thread getThread() { |
|||
Thread sendMsgThread = new Thread("MsgSender-Thread") { |
|||
@Override |
|||
public void run() { |
|||
while (running) { |
|||
handleMsg(); |
|||
} |
|||
} |
|||
}; |
|||
return sendMsgThread; |
|||
} |
|||
|
|||
|
|||
@PreDestroy |
|||
public void destroy() { |
|||
running = false; |
|||
} |
|||
|
|||
/** |
|||
* 发送POST 请求 |
|||
* |
|||
* @param param 请求参数,JSON格式 |
|||
* @return |
|||
*/ |
|||
private Result<String> sendPostByJSON(DingTalkTextMsg param) { |
|||
if (StringUtils.isBlank(param.getWebHook())) { |
|||
param.setWebHook(webHook); |
|||
} |
|||
if (StringUtils.isBlank(param.getSecret())) { |
|||
param.setSecret(secret); |
|||
} |
|||
Result<String> result = new Result<String>().error(); |
|||
Long timestamp = System.currentTimeMillis(); |
|||
try { |
|||
String stringToSign = timestamp + "\n" + param.getSecret(); |
|||
Mac mac = Mac.getInstance("HmacSHA256"); |
|||
mac.init(new SecretKeySpec(param.getSecret().getBytes("UTF-8"), "HmacSHA256")); |
|||
byte[] signData = mac.doFinal(stringToSign.getBytes("UTF-8")); |
|||
String sign = URLEncoder.encode(new String(Base64.encodeBase64(signData)), "UTF-8"); |
|||
String url = param.getWebHook(); |
|||
url = url.concat("×tamp=" + timestamp + "&sign=" + sign); |
|||
String jsonStrParam = param.getMsgContent(); |
|||
result = HttpClientManager.getInstance().sendPostByJSON(url, jsonStrParam); |
|||
} catch (Exception e) { |
|||
logger.warn("sendPostByJSON error", e); |
|||
} |
|||
return result; |
|||
} |
|||
|
|||
|
|||
public static void main(String[] args) { |
|||
for (int i = 0; i < 50; i++) { |
|||
|
|||
DingTalkTextMsg dingTalkTextMsg = new DingTalkTextMsg(); |
|||
dingTalkTextMsg.setWebHook(""); |
|||
dingTalkTextMsg.setContent("测试消息" + i); |
|||
dingTalkTextMsg.setAtMobiles(Lists.newArrayList()); |
|||
dingTalkTextMsg.setAtAll(false); |
|||
dingTalkTextMsg.setSecret(""); |
|||
DingdingMsgSender msgSender = new DingdingMsgSender(); |
|||
msgSender.sendMsgAsync(dingTalkTextMsg); |
|||
} |
|||
} |
|||
} |
Loading…
Reference in new issue