钉钉部分
钉钉消息发送部分使用DingtalkChatbot,然后需要获取三个值:AppSecret、access_token、secret
AppSecret来自钉钉开放平台
access_token、secret在将机器人添加到群组后可以看到
python部分
index.py:
import requests
import json
import time
import hmac
import hashlib
import base64
import urllib.parse
from dingtalkchatbot.chatbot import DingtalkChatbot
requests.urllib3.disable_warnings()
# Note: set the cloud function's execution timeout a little higher than default.
# Configuration
access_token = 'xxxxxx'
app_secret = 'xxxxxx'
# The value below comes from [robot security settings -> sign (加签)]
secret = 'SECxxxxxx'
def getSign(timestamp, dsecret):
    """Return the URL-quoted, Base64-encoded HMAC-SHA256 signature.

    The signed payload is ``timestamp``, a newline, then ``dsecret``;
    ``dsecret`` is also used as the HMAC key.

    Args:
        timestamp: Timestamp to sign (formatted into the payload as text).
        dsecret: Secret string used both as key and as part of the payload.

    Returns:
        The Base64 digest passed through ``urllib.parse.quote_plus``.
    """
    key = dsecret.encode('utf-8')
    payload = f'{timestamp}\n{dsecret}'.encode('utf-8')
    digest = hmac.new(key, payload, digestmod=hashlib.sha256).digest()
    return urllib.parse.quote_plus(base64.b64encode(digest))
def checkPostSign(timestamp, sign):
    """Verify the timestamp and signature of an incoming callback request.

    Args:
        timestamp: Millisecond timestamp from the request's ``timestamp``
            header (string or int).
        sign: Signature from the request's ``sign`` header.

    Returns:
        True when ``timestamp`` is less than one hour away from the current
        time and ``sign`` equals the signature computed from ``app_secret``.
        False otherwise, including when ``timestamp`` is not an integer or
        ``sign`` is not a string.
    """
    try:
        request_ms = int(timestamp)
    except (TypeError, ValueError):
        # A malformed timestamp is a failed verification, not a crash.
        return False

    now_ms = round(time.time() * 1000)
    if abs(now_ms - request_ms) >= 3600000:
        return False

    if not isinstance(sign, str):
        return False

    # getSign() returns a URL-quoted value; undo the quoting so it can be
    # compared with the raw header value.
    expected = urllib.parse.unquote_plus(getSign(timestamp, app_secret))

    # Constant-time comparison: the header is attacker-controlled, and a plain
    # ``==`` leaks how many leading characters matched. Compare bytes so a
    # non-ASCII header cannot raise TypeError.
    return hmac.compare_digest(expected.encode('utf-8'), sign.encode('utf-8'))
def getTg():
    """Fetch the text served by the remote API at the hard-coded URL.

    Returns:
        The response body decoded as text.

    Raises:
        requests.RequestException: If the request fails, times out, or the
            server answers with an HTTP error status.
    """
    url = 'https://ovooa.com/API/tgrj/api.php'
    # Without a timeout ``requests`` may wait forever on an unresponsive
    # server, which would stall the whole function invocation.
    res = requests.get(url, timeout=10)
    # Surface HTTP errors instead of forwarding an error page as the reply.
    res.raise_for_status()
    return res.text
def sendMsg(msg, userId=''):
    """Send a reply to the group through the DingTalk robot webhook.

    The reply type and text are chosen by ``setSendMsg(msg)``.

    Args:
        msg: Text of the incoming chat message.
        userId: DingTalk id of the sender; @-mentioned in the reply when the
            reply type is neither ``'img'`` nor ``'text'``.
    """
    webhook = 'https://oapi.dingtalk.com/robot/send?access_token={}'.format(access_token)
    dingtalk = DingtalkChatbot(webhook, secret=secret, pc_slide=True, fail_notice=False)
    msgtype, reply = setSendMsg(msg)
    if msgtype == 'img':
        # The previous format string was just '\n' with no placeholder, so the
        # reply was dropped. NOTE(review): the reply is presumably an image
        # URL (the title means "here comes the picture"), so it is embedded
        # as a Markdown image - confirm against the intended template.
        dingtalk.send_markdown(title='图来了', text='![]({})\n'.format(reply))
    elif msgtype == 'text':
        dingtalk.send_text(msg=reply)
    else:
        # Only @-mention when a sender id is actually known; avoid sending [''].
        at_ids = [userId] if userId else []
        dingtalk.send_text(msg=reply, at_dingtalk_ids=at_ids)
def setSendMsg(post_msg):
    """Choose the reply type and reply text for an incoming message.

    Args:
        post_msg: Text of the incoming chat message.

    Returns:
        A ``(msgtype, text)`` tuple:
        ``('text_at', <getTg() result>)`` when the message contains '舔狗',
        ``('text', '爬')`` for any other message, and
        ``('text_at', '获取失败')`` when building the reply raised an error.
    """
    try:
        if '舔狗' in post_msg:
            return 'text_at', getTg()
        return 'text', "爬"
    except Exception:
        # Best-effort fallback for ordinary failures (e.g. the remote API is
        # down). ``Exception`` rather than a bare ``except`` so that
        # SystemExit / KeyboardInterrupt still propagate.
        return 'text_at', '获取失败'
def _gateway_response(status_code, body):
    """Build the response dict returned to the API gateway (HTML content type)."""
    return {
        "isBase64Encoded": False,
        "statusCode": status_code,
        "headers": {"Content-Type": "text/html"},
        "body": body,
    }


def main_handler(event, context):
    """Cloud-function entry point for API-gateway triggered requests.

    For a POST request the ``sign`` and ``timestamp`` headers are verified
    with ``checkPostSign``; on success the sender id and message text are
    read from the JSON body and passed to ``sendMsg``.

    Args:
        event: Trigger event; for API-gateway requests it carries
            ``httpMethod``, ``headers`` and ``body`` (a JSON string).
        context: Runtime context (unused).

    Returns:
        A gateway response dict: 200 ``"ok"`` after a verified POST was
        handled, 503 ``"Error"`` when handling a POST raised an exception
        (including missing headers or a malformed body), and a 200
        "Hello World" page in every other case.
    """
    # API gateway trigger
    if event.get('httpMethod') == 'POST':
        try:
            headers = event['headers']
            body = json.loads(event['body'])
            # A missing header yields None here; the resulting AttributeError
            # is reported as 503 by the handler below.
            sign = headers.get('sign').strip()
            timestamp = headers.get('timestamp').strip()
            if sign and timestamp and checkPostSign(timestamp, sign):
                userId = body.get('senderId').strip()
                msg = body.get('text').get('content').strip()
                sendMsg(msg, userId)
                return _gateway_response(200, "ok")
        except Exception:
            # ``Exception`` rather than a bare ``except`` so that
            # SystemExit / KeyboardInterrupt are not swallowed.
            return _gateway_response(503, "Error")
    return _gateway_response(200, "<html><body><h1>Hello World</h1></body></html>")
然后打包上传至腾讯云函数。如果缺少依赖库,可以参考 pip install requests -t dirname 将其安装到 py 文件的同级目录下,再一起打包上传至腾讯云。上传后会获得一个 API 网关访问地址,将其添加为机器人的消息接收地址即可。
PHP部分
因为腾讯云函数开始收费了,所以打算找一台 PHP 主机挂着使用。PHP 也有现成的包 dingtalk-robot,使用起来比较简单,大致代码如下:
<?php
require __DIR__ . '/vendor/autoload.php';
use Zing\DingtalkRobot\Robot;
use Zing\DingtalkRobot\Messages\MarkdownMessage;
use Zing\DingtalkRobot\Messages\TextMessage;
/**
 * Verify the timestamp and signature headers of an incoming robot callback.
 *
 * @param float|int|string $current_timestamp Current time in milliseconds.
 * @param int|string       $http_timestamp    Value of the request's "timestamp" header (milliseconds).
 * @param string           $http_sign         Value of the request's "sign" header.
 * @param string           $app_secret        App secret used to compute the expected signature.
 *
 * @return bool True when the request timestamp is at most one hour away from
 *              the current time and the signature matches; false otherwise.
 */
function checkPostSign($current_timestamp, $http_timestamp, $http_sign, $app_secret)
{
    // Reject non-numeric timestamps up front so the subtraction below can
    // neither warn nor throw on attacker-supplied input.
    if (!is_numeric($http_timestamp)) {
        return false;
    }

    // abs() so that timestamps far in the future are rejected as well.
    if (abs($current_timestamp - $http_timestamp) > 3600000) {
        return false;
    }

    // hash_equals() compares in constant time, unlike ===, so the comparison
    // does not leak how much of the signature matched.
    return hash_equals((string)Robot::sign($http_timestamp, $app_secret), (string)$http_sign);
}
// Current time in milliseconds.
$current_timestamp = round(microtime(true) * 1000);

// Missing headers become empty strings instead of raising
// "undefined index" notices; an empty value fails the check below.
$http_sign = isset($_SERVER['HTTP_SIGN']) ? $_SERVER['HTTP_SIGN'] : '';
$http_timestamp = isset($_SERVER['HTTP_TIMESTAMP']) ? $_SERVER['HTTP_TIMESTAMP'] : '';

// Configuration
$access_token = "xxx";
$app_secret = 'xxx';
$secret = 'SECxxx';

if ($http_timestamp and $http_sign and checkPostSign($current_timestamp, $http_timestamp, $http_sign, $app_secret)) {
    try {
        $postdata = json_decode(file_get_contents("php://input"));
        // json_decode() returns null for an empty or malformed body, so read
        // the fields defensively rather than dereferencing null.
        $senderId = isset($postdata->senderId) ? $postdata->senderId : '';
        $content = isset($postdata->text->content) ? $postdata->text->content : '';
        $robot = new Robot($access_token, $secret);
        $robot->send("测试123");
    } catch (\Zing\DingtalkRobot\Exceptions\CannotSendException $e) {
        die();
    } catch (\Zing\DingtalkRobot\Exceptions\InvalidArgumentException $e) {
        die();
    } catch (Exception $e) {
        die();
    }
} else {
    // Request did not carry a valid timestamp/signature pair.
    echo "xx";
}
评论已关闭