双向流
服务端:User.proto
syntax="proto3";
package service;
option go_package = "gaodongfei.com/service";
import "google/api/annotations.proto";
message UserInfo{
int32 user_id=1;
int32 user_score=2;
}
message UserScoreRequest{
repeated UserInfo users=1;
}
message UserScoreResponse{
repeated UserInfo users=1;
}
service UserService{
rpc GetUserScoreByTWS(stream UserScoreRequest)returns(stream UserScoreResponse){}
}
UserService.go
package service
import (
"context"
"io"
"log"
"time"
)
type UserService struct {
}
// 双向流
func (this *UserService) GetUserScoreByTWS(stream UserService_GetUserScoreByTWSServer) error{
var score int32 = 101
users := make([]*UserInfo,0)
for {
req,err := stream.Recv()
if err == io.EOF{
return nil
}
if err != nil{
return err
}
for _,user := range req.Users{
user.UserScore = score
score ++
users = append(users,user)
}
err = stream.Send(&UserScoreResponse{Users:users})
if err != nil{
log.Fatal(err)
}
users = (users)[0:0]
time.Sleep(time.Second)
}
}
客户端:main.go
package main
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"io"
"io/ioutil"
"log"
."grpcclient/service"
)
func main(){
//从证书相关文件中读取和解析信息,得到证书公钥、密钥对
cert,_ := tls.LoadX509KeyPair("cert/client.pem","cert/client.key")
// 创建一个新的、空的 CertPool
certPool := x509.NewCertPool()
ca,_ := ioutil.ReadFile("cert/ca.pem")
//尝试解析所传入的 PEM 编码的证书。如果解析成功会将其加到 CertPool 中,便于后面的使用
certPool.AppendCertsFromPEM(ca)
//构建基于 TLS 的 TransportCredentials 选项
creds := credentials.NewTLS(&tls.Config{
//设置证书链,允许包含一个或多个
Certificates: []tls.Certificate{cert},
//要求必须校验客户端的证书。可以根据实际情况选用以下参数
ServerName: "localhost",
RootCAs:certPool,
})
conn,err := grpc.Dial(":8081",grpc.WithTransportCredentials(creds))
if err != nil{
log.Fatal(err)
return
}
defer conn.Close()
// 双向流
prodClient := NewUserServiceClient(conn)
users := []*UserInfo{}
stream,err :=prodClient.GetUserScoreByTWS(context.Background())
if err != nil{
log.Fatal(err)
}
for j:=0;j<3;j++{
for i:=0;i<2;i++{
users = append(users,&UserInfo{UserId:1})
}
err := stream.Send(&UserScoreRequest{Users:users})
if err != nil{
fmt.Println(err)
}
res,err :=stream.Recv()
if err == io.EOF{
break
}
if err != nil{
log.Fatal(err)
continue
}
fmt.Println(res.Users)
users = (users)[0:0]
}
}