服务端流
服务端:User.proto
syntax="proto3";
package service;
option go_package = "gaodongfei.com/service";
message UserInfo{
int32 user_id=1;
int32 user_score=2;
}
message UserScoreRequest{
repeated UserInfo users=1;
}
message UserScoreResponse{
repeated UserInfo users=1;
}
service UserService{
rpc GetUserScoreByServerStream(UserScoreRequest)returns(stream UserScoreResponse){}
}
UserService.go
package service
import (
"context"
"io"
"log"
"time"
)
type UserService struct {
}
func (this *UserService) mustEmbedUnimplementedProdServiceServer(){
}
func (this *UserService) GetUserScoreByServerStream(request *UserScoreRequest,stream UserService_GetUserScoreByServerStreamServer) error{
var score int32 = 101
users := make([]*UserInfo,0)
for index,user := range request.Users{
user.UserScore = score
score ++
users = append(users,user)
if (index+1) % 2 == 0{
err := stream.Send(&UserScoreResponse{Users:users})
if err != nil{
return err
}
users = (users)[0:0]
}
time.Sleep(time.Second)
}
if len(users) > 0{
err := stream.Send(&UserScoreResponse{Users:users})
if err != nil{
return err
}
}
return nil
}
客户端: main.go
package main
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
. "grpcclient/service"
"io"
"io/ioutil"
"log"
)
func main(){
//从证书相关文件中读取和解析信息,得到证书公钥、密钥对
cert,_ := tls.LoadX509KeyPair("cert/client.pem","cert/client.key")
// 创建一个新的、空的 CertPool
certPool := x509.NewCertPool()
ca,_ := ioutil.ReadFile("cert/ca.pem")
//尝试解析所传入的 PEM 编码的证书。如果解析成功会将其加到 CertPool 中,便于后面的使用
certPool.AppendCertsFromPEM(ca)
//构建基于 TLS 的 TransportCredentials 选项
creds := credentials.NewTLS(&tls.Config{
//设置证书链,允许包含一个或多个
Certificates: []tls.Certificate{cert},
//要求必须校验客户端的证书。可以根据实际情况选用以下参数
ServerName: "localhost",
RootCAs:certPool,
})
conn,err := grpc.Dial(":8081",grpc.WithTransportCredentials(creds))
if err != nil{
log.Fatal(err)
return
}
defer conn.Close()
// 服务端流
prodClient := NewUserServiceClient(conn)
users := []*UserInfo{}
users = append(users,&UserInfo{UserId:1})
users = append(users,&UserInfo{UserId:1})
users = append(users,&UserInfo{UserId:1})
users = append(users,&UserInfo{UserId:1})
users = append(users,&UserInfo{UserId:1})
stream,err :=prodClient.GetUserScoreByServerStream(context.Background(),&UserScoreRequest{Users:users})
if err != nil{
log.Fatal(err)
}
for {
res,err :=stream.Recv()
if err == io.EOF{
break
}
if err != nil{
log.Fatal(err)
continue
}
fmt.Println(res.Users)
}
}