-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
1. 增加部分常量 2. 增加加密的函数 3. 修改json包的位置,并增加相应功能 4. 增加map里面的功能 5. 增加对kafka的封装
- Loading branch information
zionjxyu
committed
May 27, 2022
1 parent
2ea4545
commit a0bc577
Showing
8 changed files
with
405 additions
and
26 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,95 @@ | ||
package json | ||
|
||
import ( | ||
"bytes" | ||
"encoding/json" | ||
"fmt" | ||
|
||
"github.com/prometheus/common/log" | ||
) | ||
|
||
// String2Json 字符串转换为json | ||
func String2Json(j string, v interface{}) error { | ||
return json.Unmarshal([]byte(j), v) | ||
} | ||
|
||
// Map2JsonString map对象转化成json string | ||
func Map2JsonString(data map[string]interface{}) string { | ||
jsonData, err := json.Marshal(data) | ||
if err != nil { | ||
return "" | ||
} | ||
return string(jsonData) | ||
} | ||
|
||
// IsJSON 判断字符串是否是合法的JSON字符串 | ||
func IsJSON(str string) bool { | ||
var js json.RawMessage | ||
return json.Unmarshal([]byte(str), &js) == nil | ||
} | ||
|
||
// JSONToMap 将JSON字符串转化为map——value是任意类型 | ||
func JSONToMap(jsonStr string) (map[string]interface{}, error) { | ||
m := make(map[string]interface{}) | ||
err := json.Unmarshal([]byte(jsonStr), &m) | ||
if err != nil { | ||
log.Infof("JSON解析异常:%+v", err) | ||
return nil, err | ||
} | ||
return m, nil | ||
} | ||
|
||
// GetJSONKeyValue 解码未知结构的json并获取指定键值 | ||
func GetJSONKeyValue(unknownJSON interface{}, key string) (value interface{}) { | ||
unknownJSONMap, ok := unknownJSON.(map[string]interface{}) | ||
if ok { | ||
return unknownJSONMap[key] | ||
} | ||
return | ||
} | ||
|
||
//GetJSONKeyStringValue 获取JSON对象中指定key的字符串值 | ||
func GetJSONKeyStringValue(object interface{}, key string) string { | ||
val := GetJSONKeyValue(object, key) | ||
strVal := fmt.Sprintf("%v", val) | ||
return strVal | ||
} | ||
|
||
// DumpString 解析为JSON字符串 | ||
func DumpString(v interface{}) (str string) { | ||
bs, err := json.Marshal(v) | ||
b := bytes.Buffer{} | ||
if err != nil { | ||
b.WriteString("{err:\"JSON格式错误.") | ||
b.WriteString(err.Error()) | ||
b.WriteString("\"}") | ||
} else { | ||
b.Write(bs) | ||
} | ||
str = b.String() | ||
return str | ||
} | ||
|
||
// PrintUnknownJSON 解码未知结构的json | ||
func PrintUnknownJSON(unknownJSON interface{}) { | ||
unknownJSONMap, ok := unknownJSON.(map[string]interface{}) | ||
if ok { | ||
for k, v := range unknownJSONMap { | ||
switch kv := v.(type) { | ||
case string: | ||
fmt.Println(k, "is string", kv) | ||
case int: | ||
fmt.Println(k, "is int", kv) | ||
case bool: | ||
fmt.Println(k, "is bool", kv) | ||
case []interface{}: | ||
fmt.Println(k, "is an array:") | ||
for i, iv := range kv { | ||
fmt.Println(i, iv) | ||
} | ||
default: | ||
fmt.Println(k, "类型未知") | ||
} | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
package mq | ||
|
||
import ( | ||
"fmt" | ||
"time" | ||
|
||
"github.com/Shopify/sarama" | ||
cluster "github.com/bsm/sarama-cluster" | ||
"github.com/prometheus/common/log" | ||
gjson "github.com/shootz-developer/gtool/json" | ||
) | ||
|
||
// InitKafkaProducer Kafka生产者初始化 | ||
func InitKafkaProducer(kafkaBrokers []string) (kafkaProducer sarama.SyncProducer) { | ||
config := sarama.NewConfig() | ||
config.Producer.RequiredAcks = sarama.WaitForLocal | ||
config.Producer.Retry.Max = 5 | ||
config.Producer.Return.Successes = true | ||
var err error | ||
kafkaProducer, err = sarama.NewSyncProducer(kafkaBrokers, config) | ||
if err != nil { | ||
log.Errorf("Kafka生产者初始化失败:%+v \n", err) | ||
} | ||
log.Infof("Kafka生产者初始化成功") | ||
return | ||
} | ||
|
||
// ProduceKafkaMsg 生产Kafka消息 | ||
func ProduceKafkaMsg(kafkaProducer sarama.SyncProducer, kafkaTopic string, msg string) (retMsg string) { | ||
msgX := &sarama.ProducerMessage{ | ||
Topic: kafkaTopic, | ||
Value: sarama.StringEncoder(msg), | ||
} | ||
partition, offset, err := kafkaProducer.SendMessage(msgX) | ||
if err != nil { | ||
retMsg = fmt.Sprintf("消息发送(%+v)出错:%+v \n", gjson.DumpString(msgX), err) | ||
} else { | ||
retMsg = fmt.Sprintf("消息发送(%+v)成功并存储在topic(%s)/partition(%d)/offset(%d)\n", | ||
gjson.DumpString(msgX), kafkaTopic, partition, offset) | ||
} | ||
return | ||
} | ||
|
||
// InitKafkaConsumer Kafka消费者初始化 | ||
func InitKafkaConsumer( | ||
kafkaBrokers []string, | ||
kafkaTopic string, | ||
groupID string) ( | ||
kafkaConsumer *cluster.Consumer) { | ||
var err error | ||
config := cluster.NewConfig() | ||
config.Consumer.Return.Errors = true | ||
config.Group.Return.Notifications = true | ||
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange | ||
config.Consumer.Offsets.Initial = -2 | ||
config.Consumer.Offsets.CommitInterval = 1 * time.Second | ||
config.Group.Return.Notifications = true | ||
kafkaConsumer, err = cluster.NewConsumer(kafkaBrokers, groupID, []string{kafkaTopic}, config) | ||
if err != nil { | ||
log.Errorf(err.Error()) | ||
} | ||
if kafkaConsumer == nil { | ||
log.Errorf(fmt.Sprintf("Kafka消费者为空 {brokers:%v, topic:%v, group:%v}", kafkaBrokers, kafkaTopic, groupID)) | ||
} else { | ||
log.Infof("Kafka消费者初始化成功 {consumer:%v, topic:%v}", kafkaConsumer, kafkaTopic) | ||
} | ||
return | ||
} | ||
|
||
// KeepKafkaConsuming 保持消费 | ||
func KeepKafkaConsuming( | ||
kafkaConsumer *cluster.Consumer, | ||
handleKafkaConsumerMsg func(*cluster.Consumer, *sarama.ConsumerMessage)) { | ||
for { | ||
log.Infof("循环读取kafka") | ||
select { | ||
case msg, ok := <-kafkaConsumer.Messages(): | ||
if ok { | ||
handleKafkaConsumerMsg(kafkaConsumer, msg) | ||
} else { | ||
log.Errorf("Kafka监听服务失败") | ||
} | ||
case err, ok := <-kafkaConsumer.Errors(): | ||
if ok { | ||
log.Errorf("Kafka消费者报错: %+v", err) | ||
} | ||
case ntf, ok := <-kafkaConsumer.Notifications(): | ||
if ok { | ||
log.Infof("Kafka消费者提醒: %+v", ntf) | ||
} | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
package strings | ||
|
||
import ( | ||
"crypto/md5" | ||
"crypto/rand" | ||
"crypto/rsa" | ||
"crypto/x509" | ||
"encoding/hex" | ||
) | ||
|
||
// GenerateRSAKeyX509 生成rsa公私钥,然后用x509编码 | ||
func GenerateRSAKeyX509() (privateKey, publicKey string, err error) { | ||
privKey, err := rsa.GenerateKey(rand.Reader, 1024) | ||
if err != nil { | ||
return | ||
} | ||
privateKey = string(x509.MarshalPKCS1PrivateKey(privKey)) | ||
pubKey := privKey.PublicKey | ||
publicKeyBytes, err := x509.MarshalPKIXPublicKey(&pubKey) | ||
if err != nil { | ||
return | ||
} | ||
publicKey = string(publicKeyBytes) | ||
return | ||
} | ||
|
||
// RSAEncrypt RSA加密 publicKeyX509为x509编码后rsa公钥 | ||
func RSAEncrypt(plainText, publicKeyX509 string) (cypherText string) { | ||
//X509解码 | ||
pubKey, err := x509.ParsePKIXPublicKey([]byte(publicKeyX509)) | ||
if err != nil { | ||
return "" | ||
} | ||
publicKey, ok := pubKey.(*rsa.PublicKey) | ||
if !ok { | ||
return "" | ||
} | ||
cyphText, err := rsa.EncryptPKCS1v15(rand.Reader, publicKey, []byte(plainText)) | ||
if err != nil { | ||
return "" | ||
} | ||
cypherText = string(cyphText) | ||
return | ||
} | ||
|
||
// RSADecrypt RSA解密 privateKeyX509为x509编码后rsa私钥 | ||
func RSADecrypt(cypherText, privateKeyX509 string) (plainText string) { | ||
//X509解码 | ||
privateKey, err := x509.ParsePKCS1PrivateKey([]byte(privateKeyX509)) | ||
if err != nil { | ||
return "" | ||
} | ||
//对密文进行解密 | ||
plaText, err := rsa.DecryptPKCS1v15(rand.Reader, privateKey, []byte(cypherText)) | ||
plainText = string(plaText) | ||
//返回明文 | ||
return | ||
} | ||
|
||
// MD5Encrypt md5加密 | ||
func MD5Encrypt(plainText string) (cypherText string) { | ||
md5Ctx := md5.New() | ||
md5Ctx.Write([]byte(plainText)) | ||
data := md5Ctx.Sum(nil) | ||
cypherText = hex.EncodeToString(data) | ||
return | ||
} |
Oops, something went wrong.