gRPC和Protobuf

gRPC 是什么?

参考 http://doc.oschina.net/grpc?t=58008
在 gRPC 里客户端应用可以像调用本地对象一样直接调用另一台不同的机器上服务端应用的方法,使得您能够更容易地创建分布式应用和服务。与许多 RPC 系统类似,gRPC 也是基于以下理念:定义一个服务,指定其能够被远程调用的方法(包含参数和返回类型)。在服务端实现这个接口,并运行一个 gRPC 服务器来处理客户端调用。在客户端拥有一个存根能够像服务端一样的方法。

gRPC 客户端和服务端可以在多种环境中运行和交互 - 从 google 内部的服务器到你自己的笔记本,并且可以用任何 gRPC 支持的语言来编写。所以,你可以很容易地用 Java 创建一个 gRPC 服务端,用 Go、Python、Ruby 来创建客户端。

使用 protocol buffers

gRPC 默认使用 protocol buffers,这是 Google 开源的一套成熟的结构数据序列化机制(当然也可以使用其他数据格式如 JSON)。正如你将在下方例子里所看到的,你用 proto files 创建 gRPC 服务,用 protocol buffers 消息类型来定义方法参数和返回类型。

protobuf 定义服务,JAVA 实现

protobuf 的详细用法https://blog.51cto.com/9291927/2331980

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
syntax = "proto3";

option java_package = "io.grpc.examples";

package helloworld;

// The greeter service definition.
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
string name = 1;
}

// The response message containing the greetings
message HelloReply {
string message = 1;
}

编译 protobuf 生成服务端接口和客户端存根

一旦定义好服务,我们可以使用 protocol buffer 编译器 protoc 来生成创建应用所需的特定客户端和服务端的代码 - 你可以生成任意 gRPC 支持的语言的代码,当然 PHP 和 Objective-C 仅支持创建客户端代码。生成的代码同时包括客户端的存根和服务端要实现的抽象接口,均包含 Greeter 所定义的方法。
以下类包含所有我们需要创建这个例子所有的代码:

  • HelloRequest.java, HelloResponse.java 和其他文件包含所有 protocol buffer 用来填充、序列化和提取 HelloRequest 和 HelloReply 消息类型的代码。
  • GreeterGrpc.java, 包含 (还有其他有用的代码):
    Greeter 服务端需要实现的接口
1
2
3
4
public static interface Greeter {
public void sayHello(Helloworld.HelloRequest request,
StreamObserver<HelloReply> responseObserver);
}

客户端用来与 Greeter 服务端进行对话的 存根 类。就像你所看到的,异步存根也实现了 Greeter 接口。

1
2
3
4
public static class GreeterStub extends AbstractStub<GreeterStub>
implements Greeter {
...
}

写一个服务器

现在让我们写点代码!首先我们将创建一个服务应用来实现服务

服务实现

GreeterImpl.java 准确地实现了 Greeter 服务所需要的行为。
正如你所见,GreeterImpl 类通过实现 sayHello 方法,实现了从 IDL 生成的 GreeterGrpc.Greeter 接口 。

1
2
3
4
5
6
7
8
public  static GreeterImpl impliment Greeter {
@Override
public void sayHello(HelloRequest req, StreamObserver<HelloReply> responseObserver) {
HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
}

sayHello 有两个参数:

  • HelloRequest,请求。
  • StreamObserver: 应答观察者,一个特殊的接口,服务器用应答来调用它。

为了返回给客户端应答并且完成调用:

  • 用我们的激动人心的消息构建并填充一个在我们接口定义的 HelloReply 应答对象。
  • 将 HelloReply 返回给客户端,然后表明我们已经完成了对 RPC 的处理。

服务端实现

需要提供一个 gRPC 服务的另一个主要功能是让这个服务实在在网络上可用。
HelloWorldServer.java 提供了以下代码作为 Java 的例子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/* The port on which the server should run */
private int port = 50051;
private Server server;
private void start() throws Exception {
server = ServerBuilder.forPort(port)
.addService(GreeterGrpc.bindService(new GreeterImpl()))
.build()
.start();
logger.info("Server started, listening on " + port);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
// Use stderr here since the logger may has been reset by its JVM shutdown hook.
System.err.println("*** shutting down gRPC server since JVM is shutting down");
HelloWorldServer.this.stop();
System.err.println("*** server shut down");
}
});
}

客户端实现

客户端的 gRPC 非常简单。在这一步,我们将用生成的代码写一个简单的客户程序来访问我们在上一节里创建的 Greeter 服务器。
首先我们看一下我们如何连接 Greeter 服务器。我们需要创建一个 gRPC 频道,指定我们要连接的主机名和服务器端口。然后我们用这个频道创建存根实例。

1
2
3
4
5
6
7
8
9
10
private final ManagedChannel channel;
private final GreeterGrpc.GreeterBlockingStub blockingStub;
public HelloWorldClient(String host, int port) {
channel = ManagedChannelBuilder.forAddress(host, port)
.usePlaintext(true)
.build();
blockingStub = GreeterGrpc.newBlockingStub(channel);
HelloRequest req = HelloRequest.newBuilder().setName(name).build();
HelloReply reply = blockingStub.sayHello(req);
}

在这个例子里,我们创建了一个阻塞的存根。这意味着 RPC 调用要等待服务器应答,将会返回一个应答或抛出一个异常。 gRPC Java 还可以有其他种类的存根,可以向服务器发出非阻塞的调用,这种情况下应答是异步返回的。

  • 我们创建并填充一个 HelloRequest 发送给服务。
  • 我们用请求调用存根的 SayHello(),如果 RPC 成功,会得到一个填充的 HelloReply ,从其中我们可以获得 greeting。

gRPC 进阶

在 protobuf 中定义服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
syntax = "proto3";

option java_multiple_files = true;
option java_package = "io.grpc.examples.routeguide"; //指定java文件中的package包名
option java_outer_classname = "RouteGuideProto";
option objc_class_prefix = "RTG";

package routeguide;

// Interface exported by the server.
service RouteGuide {//service用于指定服务,定义rpc方法,指定请求和相应类型
//一个 简单 RPC , 客户端使用存根发送请求到服务器并等待响应返回,就像平常的函数调用一样。
rpc GetFeature(Point) returns (Feature) {}

// 一个 服务器端流式 RPC , 客户端发送请求到服务器,拿到一个流去读取返回的消息序列。 客户端读取返回的流,直到里面没有任何消息。从例子中可以看出,通过在 响应 类型前插入 stream 关键字,可以指定一个服务器端的流方法。
rpc ListFeatures(Rectangle) returns (stream Feature) {}

// 一个 客户端流式 RPC , 客户端写入一个消息序列并将其发送到服务器,同样也是使用流。一旦 客户端完成写入消息,它等待服务器完成读取返回它的响应。通过在 请求 类型前指定 stream 关键字来指定一个客户端的流方法。
rpc RecordRoute(stream Point) returns (RouteSummary) {}

// 一个 双向流式 RPC 是双方使用读写流去发送一个消息序列。两个流独立操作,因此客户端和服务器 可以以任意喜欢的顺序读写:比如, 服务器可以在写入响应前等待接收所有的客户端消息,或者可以交替 的读取和写入消息,或者其他读写的组合。 每个流中的消息顺序被预留。你可以通过在请求和响应前加 stream 关键字去制定方法的类型。
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
}
我们的 .proto 文件也包含了所有请求的 protocol buffer 消息类型定义以及在服务方法中使用的响
应类型——比如,下面的Point消息类型:
// Points are represented as latitude-longitude pairs in the E7 representation
// (degrees multiplied by 10**7 and rounded to the nearest integer).
// Latitudes should be in the range +/- 90 degrees and longitude should be in
// the range +/- 180 degrees (inclusive).
message Point {
int32 latitude = 1;
int32 longitude = 2;
}

// A latitude-longitude rectangle, represented as two diagonally opposite
// points "lo" and "hi".
message Rectangle {
// One corner of the rectangle.
Point lo = 1;

// The other corner of the rectangle.
Point hi = 2;
}

// A feature names something at a given point.
//
// If a feature could not be named, the name is empty.
message Feature {
// The name of the feature.
string name = 1;

// The point where the feature is detected.
Point location = 2;
}

// Not used in the RPC. Instead, this is here for the form serialized to disk.
message FeatureDatabase {
repeated Feature feature = 1;
}

// A RouteNote is a message sent while at a given point.
message RouteNote {
// The location from which the message is sent.
Point location = 1;

// The message to be sent.
string message = 2;
}

// A RouteSummary is received in response to a RecordRoute rpc.
//
// It contains the number of individual points received, the number of
// detected features, and the total distance covered as the cumulative sum of
// the distance between each point.
message RouteSummary {
// The number of points received.
int32 point_count = 1;

// The number of known features passed while traversing the route.
int32 feature_count = 2;

// The distance covered in metres.
int32 distance = 3;

// The duration of the traversal in seconds.
int32 elapsed_time = 4;
}

生成客户端和服务端代码

接下来我们需要从 .proto 的服务定义中生成 gRPC 客户端和服务器端的接口。我们通过 protocol buffer 的编译器 protoc 以及一个特殊的 gRPC Java 插件来完成。为了生成 gRPC 服务,你必须使用 proto3 编译器。
下面的类都是从我们的服务定义中生成:

  • 包含了所有填充,序列化以及获取请求和应答的消息类型的 Feature.java,Point.java, Rectangle.java 以及其它类文件。
  • RouteGuideGrpc.java 文件包含(以及其它一些有用的代码):
    • RouteGuide 服务器要实现的一个接口 RouteGuideGrpc.RouteGuide,其中所有的方法都定 义在 RouteGuide 服务中。
    • 客户端可以用来和 RouteGuide 服务器交互的 存根 类。 异步的存根也实现了 RouteGuide 接口。

创建服务器

让 RouteGuide 服务工作有两个部分:

  • 实现我们服务定义的生成的服务接口:做我们的服务的实际的“工作”。
  • 运行一个 gRPC 服务器,监听来自客户端的请求并返回服务的响应。

实现 RouteGuide

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
public class RouteGuideServer {
private static final Logger logger = Logger.getLogger(RouteGuideServer.class.getName());

private final int port;
private final Server server;

public RouteGuideServer(int port) throws IOException {
this(port, RouteGuideUtil.getDefaultFeaturesFile());
}

/** Create a RouteGuide server listening on {@code port} using {@code featureFile} database. */
public RouteGuideServer(int port, URL featureFile) throws IOException {
this(ServerBuilder.forPort(port), port, RouteGuideUtil.parseFeatures(featureFile));
}

/** Create a RouteGuide server using serverBuilder as a base and features as data. */
public RouteGuideServer(ServerBuilder<?> serverBuilder, int port, Collection<Feature> features) {
this.port = port;
server = serverBuilder.addService(new RouteGuideService(features))
.build();
}

/** Start serving requests. */
public void start() throws IOException {
server.start();
logger.info("Server started, listening on " + port);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
System.err.println("*** shutting down gRPC server since JVM is shutting down");
try {
RouteGuideServer.this.stop();
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
System.err.println("*** server shut down");
}
});
}

/** Stop serving requests and shutdown resources. */
public void stop() throws InterruptedException {
if (server != null) {
server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
}
}

/**
* Await termination on the main thread since the grpc library uses daemon threads.
*/
private void blockUntilShutdown() throws InterruptedException {
if (server != null) {
server.awaitTermination();
}
}

/**
* Main method. This comment makes the linter happy.
*/
public static void main(String[] args) throws Exception {
RouteGuideServer server = new RouteGuideServer(8980);
server.start();
server.blockUntilShutdown();
}

/**
* 我们的服务器有一个实现了生成的 RouteGuideGrpc.Service 接口的 RouteGuideService 类:
*
* <p>See route_guide.proto for details of the methods.
*/
private static class RouteGuideService extends RouteGuideGrpc.RouteGuideImplBase {
private final Collection<Feature> features;
private final ConcurrentMap<Point, List<RouteNote>> routeNotes =
new ConcurrentHashMap<Point, List<RouteNote>>();

RouteGuideService(Collection<Feature> features) {
this.features = features;
}

/**
* 简单 RPC
* 1. 如在我们的服务定义中指定的那样,我们组织并填充一个 Feature 应答对象返回给客户端。在这个 例子中,我们通过一个单独的私有方法checkFeature()来实现。
* 2. 我们使用应答观察者的 onNext() 方法返回 Feature。
* 3. 我们使用应答观察者的 onCompleted() 方法来指出我们已经完成了和 RPC的交互。
* @param request the requested location for the feature. 请求
* @param responseObserver the observer that will receive the feature at the requested point.一个应答的观察者,实际上是服务器调用它应答的一个特殊接口。
*/
@Override
public void getFeature(Point request, StreamObserver<Feature> responseObserver) {
responseObserver.onNext(checkFeature(request));
responseObserver.onCompleted();
}

/**
* 服务器端流式 RPC
* 这次我们得到了需要返回给客户端的足够多的 Feature 对象(在这个场景下,我们根据他们是否在我们的 Rectangle 请求中,从服务的特性集合中选择他们),并且使用 onNext() 方法轮流往响应观察* 者写入。最后,和简单 RPC 的例子一样,我们使用响应观察者的 onCompleted() 方法去告诉 gRPC 写入应答已完成。
* @param request the bounding rectangle for the requested features.
* @param responseObserver the observer that will receive the features.
*/
@Override
public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
int left = min(request.getLo().getLongitude(), request.getHi().getLongitude());
int right = max(request.getLo().getLongitude(), request.getHi().getLongitude());
int top = max(request.getLo().getLatitude(), request.getHi().getLatitude());
int bottom = min(request.getLo().getLatitude(), request.getHi().getLatitude());

for (Feature feature : features) {
if (!RouteGuideUtil.exists(feature)) {
continue;
}

int lat = feature.getLocation().getLatitude();
int lon = feature.getLocation().getLongitude();
if (lon >= left && lon <= right && lat >= bottom && lat <= top) {
responseObserver.onNext(feature);
}
}
responseObserver.onCompleted();
}

/**
* 客户端流式 RPC
* 如你所见,我们的方法和前面的方法类型相似,拿到一个 StreamObserver 应答观察者参数,但是这次它返回一个 StreamObserver 以便客户端写入它的 Point。
* 在这个方法体中,我们返回了一个匿名 StreamObserver 实例,其中我们:
* 覆写了 onNext() 方法,每次客户端写入一个 Point 到消息流时,拿到特性和其它信息。
* 覆写了 onCompleted() 方法(在 客户端 结束写入消息时调用),用来填充和构建我们的 RouteSummary。然后我们用 RouteSummary 调用方法自己的的响应观察者的 onNext(),之后调用它的 onCompleted() 方法,结束服务器端的调用。
*
* @param responseObserver an observer to receive the response summary.
* @return an observer to receive the requested route points.
*/
@Override
public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {
return new StreamObserver<Point>() {
int pointCount;
int featureCount;
int distance;
Point previous;
final long startTime = System.nanoTime();

@Override
public void onNext(Point point) {
pointCount++;
if (RouteGuideUtil.exists(checkFeature(point))) {
featureCount++;
}
// For each point after the first, add the incremental distance from the previous point to
// the total distance value.
if (previous != null) {
distance += calcDistance(previous, point);
}
previous = point;
}

@Override
public void onError(Throwable t) {
logger.log(Level.WARNING, "recordRoute cancelled");
}

@Override
public void onCompleted() {
long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime);
responseObserver.onNext(RouteSummary.newBuilder().setPointCount(pointCount)
.setFeatureCount(featureCount).setDistance(distance)
.setElapsedTime((int) seconds).build());
responseObserver.onCompleted();
}
};
}

/**
* 双向流式RPC(与客户端流的差别主要是在responseObserver.onNext()方法的调用位置,这个调用位置位于onNext()方法中。而客户端流是在onComplete()方法中,意味着需要把客户端流处理完成后,才能调用onNext方法处理并立即结束。)
* 和我们的客户端流的例子一样,我们拿到和返回一个 StreamObserver 应答观察者,除了这次我们在客户端仍然写入消息到 它们的 消息流时通过我们方法的应答观察者返回值。这里读写的语法和客户端流以及服务器流方法一样。虽然每一端都会按照它们写入的顺序拿到另一端的消息,客户端和服务器都可以任意顺序读写——流的操作是互不依赖的。
*
* @param responseObserver an observer to receive the stream of previous messages.
* @return an observer to handle requested message/location pairs.
*/
@Override
public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
return new StreamObserver<RouteNote>() {
@Override
public void onNext(RouteNote note) {
List<RouteNote> notes = getOrCreateNotes(note.getLocation());

// Respond with all previous notes at this location.
for (RouteNote prevNote : notes.toArray(new RouteNote[0])) {
responseObserver.onNext(prevNote);
}

// Now add the new note to the list
notes.add(note);
}

@Override
public void onError(Throwable t) {
logger.log(Level.WARNING, "routeChat cancelled");
}

@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}

/**
* Get the notes list for the given location. If missing, create it.
*/
private List<RouteNote> getOrCreateNotes(Point location) {
List<RouteNote> notes = Collections.synchronizedList(new ArrayList<RouteNote>());
List<RouteNote> prevNotes = routeNotes.putIfAbsent(location, notes);
return prevNotes != null ? prevNotes : notes;
}

/**
* Gets the feature at the given point.
*
* @param location the location to check.
* @return The feature object at the point. Note that an empty name indicates no feature.
*/
private Feature checkFeature(Point location) {
for (Feature feature : features) {
if (feature.getLocation().getLatitude() == location.getLatitude()
&& feature.getLocation().getLongitude() == location.getLongitude()) {
return feature;
}
}

// No feature was found, return an unnamed feature.
return Feature.newBuilder().setName("").setLocation(location).build();
}

/**
* Calculate the distance between two points using the "haversine" formula.
* The formula is based on http://mathforum.org/library/drmath/view/51879.html.
*
* @param start The starting point
* @param end The end point
* @return The distance between the points in meters
*/
private static int calcDistance(Point start, Point end) {
int r = 6371000; // earth radius in meters
double lat1 = toRadians(RouteGuideUtil.getLatitude(start));
double lat2 = toRadians(RouteGuideUtil.getLatitude(end));
double lon1 = toRadians(RouteGuideUtil.getLongitude(start));
double lon2 = toRadians(RouteGuideUtil.getLongitude(end));
double deltaLat = lat2 - lat1;
double deltaLon = lon2 - lon1;

double a = sin(deltaLat / 2) * sin(deltaLat / 2)
+ cos(lat1) * cos(lat2) * sin(deltaLon / 2) * sin(deltaLon / 2);
double c = 2 * atan2(sqrt(a), sqrt(1 - a));

return (int) (r * c);
}
}
}

启动服务器

1
2
3
4
5
6
7
public void start() {
gRpcServer = NettyServerBuilder.forPort(port)
.addService(RouteGuideGrpc.bindService(new RouteGuideService(features)))
.build().start();
logger.info("Server started, listening on " + port);
...
}

为了做到这个,我们需要:

  • 创建我们服务实现类 RouteGuideService 的一个实例并且将其传给生成的 RouteGuideGrpc 类的静态方法 bindService() 去获得服务定义。
  • 使用生成器的 forPort() 方法指定地址以及期望客户端请求监听的端口。
  • 通过传入将 bindService() 返回的服务定义,用生成器注册我们的服务实现到生成器的 addService() 方法。
  • 调用生成器上的 build() 和 start() 方法为我们的服务创建和启动一个 RPC 服务器。

创建客户端

创建存根

为了调用服务方法,我们需要首先创建一个 存根,或者两个存根:

  • 一个 阻塞/同步 存根:这意味着 RPC 调用等待服务器响应,并且要么返回应答,要么造成异常。
  • 一个 非阻塞/异步 存根可以向服务器发起非阻塞调用,应答会异步返回。你可以使用异步存根去发起特定类型的流式调用。

我们首先为存根创建一个 gRPC channel,指明服务器地址和我们想连接的端口号:

1
2
3
channel = NettyChannelBuilder.forAddress(host, port)
.negotiationType(NegotiationType.PLAINTEXT)
.build();

如你所见,我们用一个 NettyServerBuilder 构建和启动服务器。这个服务器的生成器基于 Netty 传输框架。
我们使用 Netty 传输框架,所以我们用一个 NettyServerBuilder 启动服务器。
现在我们可以通过从 .proto 中生成的 RouteGuideGrpc 类的 newStub 和 newBlockingStub 方法,使用频道去创建我们的存根。

1
2
blockingStub = RouteGuideGrpc.newBlockingStub(channel);
asyncStub = RouteGuideGrpc.newStub(channel);

调用服务方法

简单 RPC

在阻塞存根上调用简单 RPC GetFeature 几乎是和调用一个本地方法一样直观。

1
2
Point request = Point.newBuilder().setLatitude(lat).setLongitude(lon).build();
Feature feature = blockingStub.getFeature(request);

我们创建和填充了一个请求 protocol buffer 对象(在这个场景下是 Point),在我们的阻塞存根上将其传给 getFeature() 方法,拿回一个 Feature。

服务器端流式 RPC

接下来,让我们看一个对于 ListFeatures 的服务器端流式调用,这个调用会返回一个地理性的 Feature 流:

1
2
3
4
5
Rectangle request =
Rectangle.newBuilder()
.setLo(Point.newBuilder().setLatitude(lowLat).setLongitude(lowLon).build())
.setHi(Point.newBuilder().setLatitude(hiLat).setLongitude(hiLon).build()).build();
Iterator<Feature> features = blockingStub.listFeatures(request);

如你所见,这和我们刚看过的简单 RPC 很相似,除了方法返回客户端用来读取所有返回的 Feature 的 一个 Iterator,而不是单个的 Feature。

客户端流式 RPC

现在看看稍微复杂点的东西:我们在客户端流方法 RecordRoute 中发送了一个 Point 流给服务器并且拿到一个 RouteSummary。为了这个方法,我们需要使用异步存根。如果你已经阅读了
创建服务器,一些部分看起来很相近——异步流式 RPC 是在两端通过相似的方式实现的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public void recordRoute(List<Feature> features, int numPoints) throws Exception {
info("*** RecordRoute");
final SettableFuture<Void> finishFuture = SettableFuture.create();
StreamObserver<RouteSummary> responseObserver = new StreamObserver<RouteSummary>() {
@Override
public void onNext(RouteSummary summary) {
info("Finished trip with {0} points. Passed {1} features. "
+ "Travelled {2} meters. It took {3} seconds.", summary.getPointCount(),
summary.getFeatureCount(), summary.getDistance(), summary.getElapsedTime());
}

@Override
public void onError(Throwable t) {
finishFuture.setException(t);
}

@Override
public void onCompleted() {
finishFuture.set(null);
}
};

StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);
try {
// Send numPoints points randomly selected from the features list.
StringBuilder numMsg = new StringBuilder();
Random rand = new Random();
for (int i = 0; i < numPoints; ++i) {
int index = rand.nextInt(features.size());
Point point = features.get(index).getLocation();
info("Visiting point {0}, {1}", RouteGuideUtil.getLatitude(point),
RouteGuideUtil.getLongitude(point));
requestObserver.onNext(point);
// Sleep for a bit before sending the next one.
Thread.sleep(rand.nextInt(1000) + 500);
if (finishFuture.isDone()) {
break;
}
}
info(numMsg.toString());
requestObserver.onCompleted();

finishFuture.get();
info("Finished RecordRoute");
} catch (Exception e) {
requestObserver.onError(e);
logger.log(Level.WARNING, "RecordRoute Failed", e);
throw e;
}
}

如你所见,为了调用这个方法我们需要创建一个 StreamObserver,它为了服务器用它的 RouteSummary 应答实现了一个特殊的接口。在 StreamObserver 中,我们:

覆写了 onNext() 方法,在服务器把 RouteSummary 写入到消息流时,打印出返回的信息。
覆写了 onCompleted() 方法(在 服务器 完成自己的调用时调用)去设置 SettableFuture,这样我们可以检查服务器是不是完成写入。
之后,我们将 StreamObserver 传给异步存根的 recordRoute() 方法,拿到我们自己的 StreamObserver 请求观察者将 Point 发给服务器。一旦完成点的写入,我们使用请求观察者的 onCompleted() 方法告诉 gRPC 我们已经完成了客户端的写入。一旦完成,我们就检查 SettableFuture 验证服务器是否已经完成写入。

双向流式 RPC

最后,让我们看看双向流式 RPC RouteChat()。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public void routeChat() throws Exception {
info("*** RoutChat");
final SettableFuture<Void> finishFuture = SettableFuture.create();
StreamObserver<RouteNote> requestObserver =
asyncStub.routeChat(new StreamObserver<RouteNote>() {
@Override
public void onNext(RouteNote note) {
info("Got message \"{0}\" at {1}, {2}", note.getMessage(), note.getLocation()
.getLatitude(), note.getLocation().getLongitude());
}

@Override
public void onError(Throwable t) {
finishFuture.setException(t);
}

@Override
public void onCompleted() {
finishFuture.set(null);
}
});

try {
RouteNote[] requests =
{newNote("First message", 0, 0), newNote("Second message", 0, 1),
newNote("Third message", 1, 0), newNote("Fourth message", 1, 1)};

for (RouteNote request : requests) {
info("Sending message \"{0}\" at {1}, {2}", request.getMessage(), request.getLocation()
.getLatitude(), request.getLocation().getLongitude());
requestObserver.onNext(request);
}
requestObserver.onCompleted();

finishFuture.get();
info("Finished RouteChat");
} catch (Exception t) {
requestObserver.onError(t);
logger.log(Level.WARNING, "RouteChat Failed", t);
throw t;
}
}

和我们的客户端流的例子一样,我们拿到和返回一个 StreamObserver 应答观察者,除了这次我们在客户端仍然写入消息到 它们的 消息流时通过我们方法的应答观察者返回值。这里读写的语法和客户端流以及服务器流方法一样。虽然每一端都会按照它们写入的顺序拿到另一端的消息,客户端和服务器都可以任意顺序读写——流的操作是互不依赖的。

ProtoBuf 存储原理

核心是 Google 提出了“Base 128 Varints”编码,这是一种变字节长度的编码,官方描述为:varints 是用一个或多个字节序列化整形的一种方法。

序列化方式

protobuf 把 message 通过一系列 key_value 对来表示。
Key 的算法为:(field_number << 3)| wired_type
这里 field_number 就是具体的索引,wired_type 的值按下表查询。

wired_type .proto 类型
0 Varint int32, int64, uint32, uint64, sint32, sint64, bool, enum
1 64-bit fixed64, sfixed64, double
2 Length-delimited string, bytes, embedded messages, packed repeated fields
5 32-bit fixed32, sfixed32, float
对于 int,bool,enum 类型,value 就是 Varint。

而对于 string,bytes,message 等等类型,value 是长度+原始内容编码。

  • 举例 int 类型存储(Varint 存储原理)
    存储一个 int32 类型的数字,通常是 4 个字节。但是 Varints 最少只需要一个字节就可以了。
    Varints 规定小于 128 的数字都可以用一个字节来表示,比如 10, 它就会用一个字节 0000 1010 来存储。
    对于大于 128 的数字,则用更多个字节存储。
    以 150 举例:protobuf 的存储字节是 1001 0110 0000 0001。
    为什么会这样标识呢?首先我们了解一个字节共 8 位,表示的数字是 255,但是 Varints 只用一个字节表示小于 128 的数字,换句话说,就是 Varints 只用了 8 位中的 7 位来表示数字,而还有一位被用来干嘛了呢?
    Varints 在官方规定中表示,每个字节的最高位是有特殊含义,当最高位为 1 的时候,代表后续的字节也是该数字的一部分。当最高位为 0 的时候,则表示结束。
    比如过 150,二进制表示为 1001 0110。
    先取后七位 001 0110, 作为第一个字节的内容。
    再取余下 1 位,前面补 0 凑齐 7 位,就是 000 0001,作为第二字节。
    对于 intel 机器,是小端字节序,低字节位于地址低的。0010110 是低字节地址,因此排在前面,因为后面的也是数字的一部分,所以高位补 1,也就成了 10010110。 同样的,高字节 000 0001,排在后面,并且它后面没有后续字节了,所以补 0,也就成了 0000 0001。
    因此 150 在 protobuf 中的表示方式为 1001 0110 0000 0001。

  • 举例 string 类型存储

    1
    2
    3
    message Test {
    required string desc = 2;
    }

    假如把 a 设置为 “testing”的话, 那么序列化后的就是
    12 07 74 65 73 74 69 64 67
    其中 12 是 key。剩下的是 value。
    怎么算的呢?先看 12, 这里的 12,是个 16 进制数字,其二进制位表示为 0001 0010。
    0010 就是类型 string 的对应的 Type 值,根据上表,也就是 2。
    field_number (required string desc)是 2,也就是 0010,左移三位,就成了 0001 0000。
    按照 key 的计算公式,和 Type 值取并后就变成了 0001 0010,即 12。
    Value 是长度加原始内容编码。
    07 就是长度, 代表 string 总长 7 个字节。 后面 7 个数字一次代表每个字母所对应的 16 进制表示。

json 与 protobuf 的互转

1
2
3
4
5
6
<!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java-util -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
<version>3.6.1</version>
</dependency>

自定义的 bean 与 proto 是可以通过 Json 相互转换的,然而它们之间的转换需要第三方 JSON 转换工具和 protobuf util 的支持。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
//to Json
JsonFormat.Printer printer = JsonFormat.printer();
String print = "";
try {
print = printer.print(person);
System.out.println(print);
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}

//to Object
JsonFormat.Parser parser = JsonFormat.parser();
try {
PersonProto.Person.Builder newBuilder = PersonProto.Person.newBuilder();
parser.merge(print, newBuilder);
System.out.println(newBuilder.build());
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}

//添加java bean 此类对性数据库的字段,同时与proto类属性名相同
public class Person implements Serializable {
private String name;
private Integer age;
private Boolean sex;
private Date dirthday;//此处注意这里是时间类型而非proto类中的long类型
private String address;
private List<Car> cars = new ArrayList<Car>();
private Map<String, String> other = new HashMap<String, String>();
}

public class Car implements Serializable {
private String name;
private String color;
}

//在上面的转换中间添加以下代码,发现同样转换成功
Person myPerson = JsonUtil.toObject(print, Person.class);
System.out.println(myPerson);
print = JsonUtil.toJson(myPerson);