gRPC服务端流

服务端流

服务端:User.proto

syntax="proto3";

package service;
option go_package = "gaodongfei.com/service";

// UserInfo pairs a user identifier with a score for that user.
message UserInfo {
    // Identifier of the user.
    int32 user_id = 1;
    // Score associated with the user.
    int32 user_score = 2;
}

// UserScoreRequest carries the list of users whose scores are requested.
message UserScoreRequest {
    // Users to look up; may contain any number of entries.
    repeated UserInfo users = 1;
}

// UserScoreResponse carries a list of users in reply to a score request.
message UserScoreResponse {
    // Users included in this response message.
    repeated UserInfo users = 1;
}

// UserService exposes user score lookups over gRPC.
service UserService {
    // GetUserScoreByServerStream is a server-streaming RPC: the client sends
    // one UserScoreRequest and receives a stream of UserScoreResponse messages.
    rpc GetUserScoreByServerStream(UserScoreRequest) returns (stream UserScoreResponse) {}
}

UserService.go

package service

import (
	"context"
	"io"
	"log"
	"time"
)

// UserService is the server-side implementation of the UserService gRPC
// service declared in User.proto. It carries no state.
type UserService struct{}

// mustEmbedUnimplementedProdServiceServer is retained unchanged so that any
// existing reference to it keeps compiling.
//
// NOTE(review): the "Prod" in the name does not match the service declared in
// User.proto (UserService); it looks like a leftover from another service.
func (*UserService) mustEmbedUnimplementedProdServiceServer() {}

// mustEmbedUnimplementedUserServiceServer is the marker method named after the
// UserService service. protoc-gen-go-grpc requires a marker of this form on
// server implementations; the generated code is not visible here, so confirm
// the exact name against the generated *_grpc.pb.go file.
//
// NOTE(review): the approach recommended by grpc-go is to embed the generated
// UnimplementedUserServiceServer in the struct instead of hand-writing this
// marker — consider switching once the generated package is confirmed.
func (*UserService) mustEmbedUnimplementedUserServiceServer() {}

// GetUserScoreByServerStream assigns consecutive scores, starting at 101, to
// the users in request (in order) and streams them back in batches of two.
// A trailing partial batch is sent after the loop. The function pauses one
// second after processing each user.
//
// The UserInfo values in request.Users are updated in place and reused in the
// responses.
//
// It returns the first error reported by stream.Send, or nil once every user
// has been sent.
func (s *UserService) GetUserScoreByServerStream(request *UserScoreRequest, stream UserService_GetUserScoreByServerStreamServer) error {
	const batchSize = 2

	score := int32(101)
	batch := make([]*UserInfo, 0, batchSize)
	for _, user := range request.Users {
		user.UserScore = score
		score++
		batch = append(batch, user)

		if len(batch) == batchSize {
			if err := stream.Send(&UserScoreResponse{Users: batch}); err != nil {
				return err
			}
			// Start a new slice rather than truncating with batch[:0]: the
			// message just handed to Send still references the old backing
			// array, and gRPC documents that a message must not be modified
			// after it has been sent.
			batch = make([]*UserInfo, 0, batchSize)
		}

		time.Sleep(time.Second)
	}

	// Flush whatever is left when the user count is not a multiple of batchSize.
	if len(batch) > 0 {
		if err := stream.Send(&UserScoreResponse{Users: batch}); err != nil {
			return err
		}
	}
	return nil
}

客户端: main.go

package main

import (
	"context"
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
	. "grpcclient/service"
	"io"
	"io/ioutil"
	"log"
)


// main runs the server-streaming demo client and terminates the process with
// a logged error if any step fails.
func main() {
	// All work happens in run so that its deferred cleanup executes before
	// log.Fatal exits the process.
	if err := run(); err != nil {
		log.Fatal(err)
	}
}

// run connects to the gRPC server at ":8081" over TLS using the client
// certificate and CA found under cert/, calls GetUserScoreByServerStream with
// five users, and prints the users of every response received until the
// server closes the stream.
//
// It returns the first error encountered, or nil when the stream ends with
// io.EOF.
func run() error {
	// Load the client certificate and private key pair.
	cert, err := tls.LoadX509KeyPair("cert/client.pem", "cert/client.key")
	if err != nil {
		return fmt.Errorf("loading client key pair: %w", err)
	}

	// Read the CA certificate and add it to a fresh pool.
	ca, err := ioutil.ReadFile("cert/ca.pem")
	if err != nil {
		return fmt.Errorf("reading CA certificate: %w", err)
	}
	certPool := x509.NewCertPool()
	// AppendCertsFromPEM reports false when no certificate could be parsed;
	// continuing with an empty pool would only fail later during the handshake.
	if !certPool.AppendCertsFromPEM(ca) {
		return fmt.Errorf("no valid certificate found in cert/ca.pem")
	}

	// Build TLS transport credentials for the connection.
	creds := credentials.NewTLS(&tls.Config{
		// Certificate chain presented by this client.
		Certificates: []tls.Certificate{cert},
		// Host name the server's certificate is verified against.
		ServerName: "localhost",
		// CAs trusted when verifying the server's certificate.
		RootCAs: certPool,
	})

	conn, err := grpc.Dial(":8081", grpc.WithTransportCredentials(creds))
	if err != nil {
		return fmt.Errorf("dialing :8081: %w", err)
	}
	defer conn.Close()

	// Server-streaming call: one request, many responses.
	client := NewUserServiceClient(conn)
	users := []*UserInfo{
		&UserInfo{UserId: 1},
		&UserInfo{UserId: 1},
		&UserInfo{UserId: 1},
		&UserInfo{UserId: 1},
		&UserInfo{UserId: 1},
	}
	stream, err := client.GetUserScoreByServerStream(context.Background(), &UserScoreRequest{Users: users})
	if err != nil {
		return fmt.Errorf("starting score stream: %w", err)
	}

	for {
		res, err := stream.Recv()
		if err == io.EOF {
			// The server finished sending.
			return nil
		}
		if err != nil {
			return fmt.Errorf("receiving score batch: %w", err)
		}
		fmt.Println(res.Users)
	}
}
# golang   gRPC  

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×