我命由我,不由天!


  • 搜索
prometheus docker golang linux kubernetes

gRPC双向流

发表于 2021-01-11 | 分类于 golang | 0 | 阅读次数 842

双向流

服务端:User.proto

syntax="proto3";

package service;
option go_package = "gaodongfei.com/service";

import "google/api/annotations.proto";

message UserInfo{
    int32 user_id=1;
    int32 user_score=2;
}

message UserScoreRequest{
    repeated UserInfo users=1;
}

message UserScoreResponse{
    repeated UserInfo users=1;
}

service UserService{
    rpc GetUserScoreByTWS(stream UserScoreRequest)returns(stream UserScoreResponse){}
}

UserService.go

package service

import (
	"context"
	"io"
	"log"
	"time"
)

type UserService struct {


}


// 双向流
func (this *UserService) GetUserScoreByTWS(stream UserService_GetUserScoreByTWSServer) error{

	var score int32 = 101
	users := make([]*UserInfo,0)
	for {
		req,err := stream.Recv()
		if err == io.EOF{
			return nil
		}
		if err != nil{
			return err
		}
		for _,user := range req.Users{
			user.UserScore = score
			score ++
			users = append(users,user)
		}
		err = stream.Send(&UserScoreResponse{Users:users})
		if err != nil{
			log.Fatal(err)
		}
		users = (users)[0:0]
		time.Sleep(time.Second)
	}
}

客户端:main.go

package main

import (
	"context"
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
	"io"
	"io/ioutil"
	"log"
	."grpcclient/service"
)
func main(){
	//从证书相关文件中读取和解析信息,得到证书公钥、密钥对
	cert,_ := tls.LoadX509KeyPair("cert/client.pem","cert/client.key")
	// 创建一个新的、空的 CertPool
	certPool  := x509.NewCertPool()
	ca,_ := ioutil.ReadFile("cert/ca.pem")
	//尝试解析所传入的 PEM 编码的证书。如果解析成功会将其加到 CertPool 中,便于后面的使用
	certPool.AppendCertsFromPEM(ca)
	//构建基于 TLS 的 TransportCredentials 选项
	creds := credentials.NewTLS(&tls.Config{
		//设置证书链,允许包含一个或多个
		Certificates: []tls.Certificate{cert},
		//要求必须校验客户端的证书。可以根据实际情况选用以下参数
		ServerName: "localhost",
		RootCAs:certPool,
	})
	conn,err := grpc.Dial(":8081",grpc.WithTransportCredentials(creds))
	if err != nil{
		log.Fatal(err)
		return
	}
	defer conn.Close()

	// 双向流
	prodClient := NewUserServiceClient(conn)
	users := []*UserInfo{}
	stream,err :=prodClient.GetUserScoreByTWS(context.Background())
	if err != nil{
		log.Fatal(err)
	}
	
	for j:=0;j<3;j++{
		for i:=0;i<2;i++{
			users = append(users,&UserInfo{UserId:1})
		}
		err := stream.Send(&UserScoreRequest{Users:users})
		if err != nil{
			fmt.Println(err)
		}
		res,err :=stream.Recv()
		if err == io.EOF{
			break
		}
		if err != nil{
			log.Fatal(err)
			continue
		}
		fmt.Println(res.Users)
		users = (users)[0:0]
	}
}

  • 本文作者: Dante
  • 本文链接: https://gaodongfei.com/archives/grpc-client-server-stream
  • 版权声明: 本博客所有文章除特别声明外,均采用CC BY-NC-SA 3.0 许可协议。转载请注明出处!
# golang
gRPC 客户端流
gRPC 使用grpc-gateway 生成http服务
  • 文章目录
  • 站点概览
Dante

Dante

119 日志
5 分类
5 标签
RSS
Creative Commons
0%
© 2023 Dante
由 Halo 强力驱动
|
主题 - NexT.Pisces v5.1.4
沪ICP备2020033702号