0%

gRPC 是什么?

参考 http://doc.oschina.net/grpc?t=58008
在 gRPC 里客户端应用可以像调用本地对象一样直接调用另一台不同的机器上服务端应用的方法,使得您能够更容易地创建分布式应用和服务。与许多 RPC 系统类似,gRPC 也是基于以下理念:定义一个服务,指定其能够被远程调用的方法(包含参数和返回类型)。在服务端实现这个接口,并运行一个 gRPC 服务器来处理客户端调用。在客户端拥有一个存根能够像服务端一样的方法。

gRPC 客户端和服务端可以在多种环境中运行和交互 - 从 google 内部的服务器到你自己的笔记本,并且可以用任何 gRPC 支持的语言来编写。所以,你可以很容易地用 Java 创建一个 gRPC 服务端,用 Go、Python、Ruby 来创建客户端。

使用 protocol buffers

gRPC 默认使用 protocol buffers,这是 Google 开源的一套成熟的结构数据序列化机制(当然也可以使用其他数据格式如 JSON)。正如你将在下方例子里所看到的,你用 proto files 创建 gRPC 服务,用 protocol buffers 消息类型来定义方法参数和返回类型。

protobuf 定义服务,JAVA 实现

protobuf 的详细用法https://blog.51cto.com/9291927/2331980

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
syntax = "proto3";

option java_package = "io.grpc.examples";

package helloworld;

// The greeter service definition.
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
string name = 1;
}

// The response message containing the greetings
message HelloReply {
string message = 1;
}

编译 protobuf 生成服务端接口和客户端存根

一旦定义好服务,我们可以使用 protocol buffer 编译器 protoc 来生成创建应用所需的特定客户端和服务端的代码 - 你可以生成任意 gRPC 支持的语言的代码,当然 PHP 和 Objective-C 仅支持创建客户端代码。生成的代码同时包括客户端的存根和服务端要实现的抽象接口,均包含 Greeter 所定义的方法。
以下类包含所有我们需要创建这个例子所有的代码:

  • HelloRequest.java, HelloResponse.java 和其他文件包含所有 protocol buffer 用来填充、序列化和提取 HelloRequest 和 HelloReply 消息类型的代码。
  • GreeterGrpc.java, 包含 (还有其他有用的代码):
    Greeter 服务端需要实现的接口
1
2
3
4
public static interface Greeter {
public void sayHello(Helloworld.HelloRequest request,
StreamObserver<HelloReply> responseObserver);
}

客户端用来与 Greeter 服务端进行对话的 存根 类。就像你所看到的,异步存根也实现了 Greeter 接口。

1
2
3
4
public static class GreeterStub extends AbstractStub<GreeterStub>
implements Greeter {
...
}

写一个服务器

现在让我们写点代码!首先我们将创建一个服务应用来实现服务

服务实现

GreeterImpl.java 准确地实现了 Greeter 服务所需要的行为。
正如你所见,GreeterImpl 类通过实现 sayHello 方法,实现了从 IDL 生成的 GreeterGrpc.Greeter 接口 。

1
2
3
4
5
6
7
8
public  static GreeterImpl impliment Greeter {
@Override
public void sayHello(HelloRequest req, StreamObserver<HelloReply> responseObserver) {
HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
}

sayHello 有两个参数:

  • HelloRequest,请求。
  • StreamObserver: 应答观察者,一个特殊的接口,服务器用应答来调用它。

为了返回给客户端应答并且完成调用:

  • 用我们的激动人心的消息构建并填充一个在我们接口定义的 HelloReply 应答对象。
  • 将 HelloReply 返回给客户端,然后表明我们已经完成了对 RPC 的处理。

服务端实现

需要提供一个 gRPC 服务的另一个主要功能是让这个服务实在在网络上可用。
HelloWorldServer.java 提供了以下代码作为 Java 的例子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/* The port on which the server should run */
private int port = 50051;
private Server server;
private void start() throws Exception {
server = ServerBuilder.forPort(port)
.addService(GreeterGrpc.bindService(new GreeterImpl()))
.build()
.start();
logger.info("Server started, listening on " + port);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
// Use stderr here since the logger may has been reset by its JVM shutdown hook.
System.err.println("*** shutting down gRPC server since JVM is shutting down");
HelloWorldServer.this.stop();
System.err.println("*** server shut down");
}
});
}

客户端实现

客户端的 gRPC 非常简单。在这一步,我们将用生成的代码写一个简单的客户程序来访问我们在上一节里创建的 Greeter 服务器。
首先我们看一下我们如何连接 Greeter 服务器。我们需要创建一个 gRPC 频道,指定我们要连接的主机名和服务器端口。然后我们用这个频道创建存根实例。

1
2
3
4
5
6
7
8
9
10
private final ManagedChannel channel;
private final GreeterGrpc.GreeterBlockingStub blockingStub;
public HelloWorldClient(String host, int port) {
channel = ManagedChannelBuilder.forAddress(host, port)
.usePlaintext(true)
.build();
blockingStub = GreeterGrpc.newBlockingStub(channel);
HelloRequest req = HelloRequest.newBuilder().setName(name).build();
HelloReply reply = blockingStub.sayHello(req);
}

在这个例子里,我们创建了一个阻塞的存根。这意味着 RPC 调用要等待服务器应答,将会返回一个应答或抛出一个异常。 gRPC Java 还可以有其他种类的存根,可以向服务器发出非阻塞的调用,这种情况下应答是异步返回的。

  • 我们创建并填充一个 HelloRequest 发送给服务。
  • 我们用请求调用存根的 SayHello(),如果 RPC 成功,会得到一个填充的 HelloReply ,从其中我们可以获得 greeting。

gRPC 进阶

在 protobuf 中定义服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
syntax = "proto3";

option java_multiple_files = true;
option java_package = "io.grpc.examples.routeguide"; //指定java文件中的package包名
option java_outer_classname = "RouteGuideProto";
option objc_class_prefix = "RTG";

package routeguide;

// Interface exported by the server.
service RouteGuide {//service用于指定服务,定义rpc方法,指定请求和相应类型
//一个 简单 RPC , 客户端使用存根发送请求到服务器并等待响应返回,就像平常的函数调用一样。
rpc GetFeature(Point) returns (Feature) {}

// 一个 服务器端流式 RPC , 客户端发送请求到服务器,拿到一个流去读取返回的消息序列。 客户端读取返回的流,直到里面没有任何消息。从例子中可以看出,通过在 响应 类型前插入 stream 关键字,可以指定一个服务器端的流方法。
rpc ListFeatures(Rectangle) returns (stream Feature) {}

// 一个 客户端流式 RPC , 客户端写入一个消息序列并将其发送到服务器,同样也是使用流。一旦 客户端完成写入消息,它等待服务器完成读取返回它的响应。通过在 请求 类型前指定 stream 关键字来指定一个客户端的流方法。
rpc RecordRoute(stream Point) returns (RouteSummary) {}

// 一个 双向流式 RPC 是双方使用读写流去发送一个消息序列。两个流独立操作,因此客户端和服务器 可以以任意喜欢的顺序读写:比如, 服务器可以在写入响应前等待接收所有的客户端消息,或者可以交替 的读取和写入消息,或者其他读写的组合。 每个流中的消息顺序被预留。你可以通过在请求和响应前加 stream 关键字去制定方法的类型。
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
}
我们的 .proto 文件也包含了所有请求的 protocol buffer 消息类型定义以及在服务方法中使用的响
应类型——比如,下面的Point消息类型:
// Points are represented as latitude-longitude pairs in the E7 representation
// (degrees multiplied by 10**7 and rounded to the nearest integer).
// Latitudes should be in the range +/- 90 degrees and longitude should be in
// the range +/- 180 degrees (inclusive).
message Point {
int32 latitude = 1;
int32 longitude = 2;
}

// A latitude-longitude rectangle, represented as two diagonally opposite
// points "lo" and "hi".
message Rectangle {
// One corner of the rectangle.
Point lo = 1;

// The other corner of the rectangle.
Point hi = 2;
}

// A feature names something at a given point.
//
// If a feature could not be named, the name is empty.
message Feature {
// The name of the feature.
string name = 1;

// The point where the feature is detected.
Point location = 2;
}

// Not used in the RPC. Instead, this is here for the form serialized to disk.
message FeatureDatabase {
repeated Feature feature = 1;
}

// A RouteNote is a message sent while at a given point.
message RouteNote {
// The location from which the message is sent.
Point location = 1;

// The message to be sent.
string message = 2;
}

// A RouteSummary is received in response to a RecordRoute rpc.
//
// It contains the number of individual points received, the number of
// detected features, and the total distance covered as the cumulative sum of
// the distance between each point.
message RouteSummary {
// The number of points received.
int32 point_count = 1;

// The number of known features passed while traversing the route.
int32 feature_count = 2;

// The distance covered in metres.
int32 distance = 3;

// The duration of the traversal in seconds.
int32 elapsed_time = 4;
}

生成客户端和服务端代码

接下来我们需要从 .proto 的服务定义中生成 gRPC 客户端和服务器端的接口。我们通过 protocol buffer 的编译器 protoc 以及一个特殊的 gRPC Java 插件来完成。为了生成 gRPC 服务,你必须使用 proto3 编译器。
下面的类都是从我们的服务定义中生成:

  • 包含了所有填充,序列化以及获取请求和应答的消息类型的 Feature.java,Point.java, Rectangle.java 以及其它类文件。
  • RouteGuideGrpc.java 文件包含(以及其它一些有用的代码):
    • RouteGuide 服务器要实现的一个接口 RouteGuideGrpc.RouteGuide,其中所有的方法都定 义在 RouteGuide 服务中。
    • 客户端可以用来和 RouteGuide 服务器交互的 存根 类。 异步的存根也实现了 RouteGuide 接口。

创建服务器

让 RouteGuide 服务工作有两个部分:

  • 实现我们服务定义的生成的服务接口:做我们的服务的实际的“工作”。
  • 运行一个 gRPC 服务器,监听来自客户端的请求并返回服务的响应。

实现 RouteGuide

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
public class RouteGuideServer {
private static final Logger logger = Logger.getLogger(RouteGuideServer.class.getName());

private final int port;
private final Server server;

public RouteGuideServer(int port) throws IOException {
this(port, RouteGuideUtil.getDefaultFeaturesFile());
}

/** Create a RouteGuide server listening on {@code port} using {@code featureFile} database. */
public RouteGuideServer(int port, URL featureFile) throws IOException {
this(ServerBuilder.forPort(port), port, RouteGuideUtil.parseFeatures(featureFile));
}

/** Create a RouteGuide server using serverBuilder as a base and features as data. */
public RouteGuideServer(ServerBuilder<?> serverBuilder, int port, Collection<Feature> features) {
this.port = port;
server = serverBuilder.addService(new RouteGuideService(features))
.build();
}

/** Start serving requests. */
public void start() throws IOException {
server.start();
logger.info("Server started, listening on " + port);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
System.err.println("*** shutting down gRPC server since JVM is shutting down");
try {
RouteGuideServer.this.stop();
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
System.err.println("*** server shut down");
}
});
}

/** Stop serving requests and shutdown resources. */
public void stop() throws InterruptedException {
if (server != null) {
server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
}
}

/**
* Await termination on the main thread since the grpc library uses daemon threads.
*/
private void blockUntilShutdown() throws InterruptedException {
if (server != null) {
server.awaitTermination();
}
}

/**
* Main method. This comment makes the linter happy.
*/
public static void main(String[] args) throws Exception {
RouteGuideServer server = new RouteGuideServer(8980);
server.start();
server.blockUntilShutdown();
}

/**
* 我们的服务器有一个实现了生成的 RouteGuideGrpc.Service 接口的 RouteGuideService 类:
*
* <p>See route_guide.proto for details of the methods.
*/
private static class RouteGuideService extends RouteGuideGrpc.RouteGuideImplBase {
private final Collection<Feature> features;
private final ConcurrentMap<Point, List<RouteNote>> routeNotes =
new ConcurrentHashMap<Point, List<RouteNote>>();

RouteGuideService(Collection<Feature> features) {
this.features = features;
}

/**
* 简单 RPC
* 1. 如在我们的服务定义中指定的那样,我们组织并填充一个 Feature 应答对象返回给客户端。在这个 例子中,我们通过一个单独的私有方法checkFeature()来实现。
* 2. 我们使用应答观察者的 onNext() 方法返回 Feature。
* 3. 我们使用应答观察者的 onCompleted() 方法来指出我们已经完成了和 RPC的交互。
* @param request the requested location for the feature. 请求
* @param responseObserver the observer that will receive the feature at the requested point.一个应答的观察者,实际上是服务器调用它应答的一个特殊接口。
*/
@Override
public void getFeature(Point request, StreamObserver<Feature> responseObserver) {
responseObserver.onNext(checkFeature(request));
responseObserver.onCompleted();
}

/**
* 服务器端流式 RPC
* 这次我们得到了需要返回给客户端的足够多的 Feature 对象(在这个场景下,我们根据他们是否在我们的 Rectangle 请求中,从服务的特性集合中选择他们),并且使用 onNext() 方法轮流往响应观察* 者写入。最后,和简单 RPC 的例子一样,我们使用响应观察者的 onCompleted() 方法去告诉 gRPC 写入应答已完成。
* @param request the bounding rectangle for the requested features.
* @param responseObserver the observer that will receive the features.
*/
@Override
public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
int left = min(request.getLo().getLongitude(), request.getHi().getLongitude());
int right = max(request.getLo().getLongitude(), request.getHi().getLongitude());
int top = max(request.getLo().getLatitude(), request.getHi().getLatitude());
int bottom = min(request.getLo().getLatitude(), request.getHi().getLatitude());

for (Feature feature : features) {
if (!RouteGuideUtil.exists(feature)) {
continue;
}

int lat = feature.getLocation().getLatitude();
int lon = feature.getLocation().getLongitude();
if (lon >= left && lon <= right && lat >= bottom && lat <= top) {
responseObserver.onNext(feature);
}
}
responseObserver.onCompleted();
}

/**
* 客户端流式 RPC
* 如你所见,我们的方法和前面的方法类型相似,拿到一个 StreamObserver 应答观察者参数,但是这次它返回一个 StreamObserver 以便客户端写入它的 Point。
* 在这个方法体中,我们返回了一个匿名 StreamObserver 实例,其中我们:
* 覆写了 onNext() 方法,每次客户端写入一个 Point 到消息流时,拿到特性和其它信息。
* 覆写了 onCompleted() 方法(在 客户端 结束写入消息时调用),用来填充和构建我们的 RouteSummary。然后我们用 RouteSummary 调用方法自己的的响应观察者的 onNext(),之后调用它的 onCompleted() 方法,结束服务器端的调用。
*
* @param responseObserver an observer to receive the response summary.
* @return an observer to receive the requested route points.
*/
@Override
public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {
return new StreamObserver<Point>() {
int pointCount;
int featureCount;
int distance;
Point previous;
final long startTime = System.nanoTime();

@Override
public void onNext(Point point) {
pointCount++;
if (RouteGuideUtil.exists(checkFeature(point))) {
featureCount++;
}
// For each point after the first, add the incremental distance from the previous point to
// the total distance value.
if (previous != null) {
distance += calcDistance(previous, point);
}
previous = point;
}

@Override
public void onError(Throwable t) {
logger.log(Level.WARNING, "recordRoute cancelled");
}

@Override
public void onCompleted() {
long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime);
responseObserver.onNext(RouteSummary.newBuilder().setPointCount(pointCount)
.setFeatureCount(featureCount).setDistance(distance)
.setElapsedTime((int) seconds).build());
responseObserver.onCompleted();
}
};
}

/**
* 双向流式RPC(与客户端流的差别主要是在responseObserver.onNext()方法的调用位置,这个调用位置位于onNext()方法中。而客户端流是在onComplete()方法中,意味着需要把客户端流处理完成后,才能调用onNext方法处理并立即结束。)
* 和我们的客户端流的例子一样,我们拿到和返回一个 StreamObserver 应答观察者,除了这次我们在客户端仍然写入消息到 它们的 消息流时通过我们方法的应答观察者返回值。这里读写的语法和客户端流以及服务器流方法一样。虽然每一端都会按照它们写入的顺序拿到另一端的消息,客户端和服务器都可以任意顺序读写——流的操作是互不依赖的。
*
* @param responseObserver an observer to receive the stream of previous messages.
* @return an observer to handle requested message/location pairs.
*/
@Override
public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
return new StreamObserver<RouteNote>() {
@Override
public void onNext(RouteNote note) {
List<RouteNote> notes = getOrCreateNotes(note.getLocation());

// Respond with all previous notes at this location.
for (RouteNote prevNote : notes.toArray(new RouteNote[0])) {
responseObserver.onNext(prevNote);
}

// Now add the new note to the list
notes.add(note);
}

@Override
public void onError(Throwable t) {
logger.log(Level.WARNING, "routeChat cancelled");
}

@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}

/**
* Get the notes list for the given location. If missing, create it.
*/
private List<RouteNote> getOrCreateNotes(Point location) {
List<RouteNote> notes = Collections.synchronizedList(new ArrayList<RouteNote>());
List<RouteNote> prevNotes = routeNotes.putIfAbsent(location, notes);
return prevNotes != null ? prevNotes : notes;
}

/**
* Gets the feature at the given point.
*
* @param location the location to check.
* @return The feature object at the point. Note that an empty name indicates no feature.
*/
private Feature checkFeature(Point location) {
for (Feature feature : features) {
if (feature.getLocation().getLatitude() == location.getLatitude()
&& feature.getLocation().getLongitude() == location.getLongitude()) {
return feature;
}
}

// No feature was found, return an unnamed feature.
return Feature.newBuilder().setName("").setLocation(location).build();
}

/**
* Calculate the distance between two points using the "haversine" formula.
* The formula is based on http://mathforum.org/library/drmath/view/51879.html.
*
* @param start The starting point
* @param end The end point
* @return The distance between the points in meters
*/
private static int calcDistance(Point start, Point end) {
int r = 6371000; // earth radius in meters
double lat1 = toRadians(RouteGuideUtil.getLatitude(start));
double lat2 = toRadians(RouteGuideUtil.getLatitude(end));
double lon1 = toRadians(RouteGuideUtil.getLongitude(start));
double lon2 = toRadians(RouteGuideUtil.getLongitude(end));
double deltaLat = lat2 - lat1;
double deltaLon = lon2 - lon1;

double a = sin(deltaLat / 2) * sin(deltaLat / 2)
+ cos(lat1) * cos(lat2) * sin(deltaLon / 2) * sin(deltaLon / 2);
double c = 2 * atan2(sqrt(a), sqrt(1 - a));

return (int) (r * c);
}
}
}

启动服务器

1
2
3
4
5
6
7
public void start() {
gRpcServer = NettyServerBuilder.forPort(port)
.addService(RouteGuideGrpc.bindService(new RouteGuideService(features)))
.build().start();
logger.info("Server started, listening on " + port);
...
}

为了做到这个,我们需要:

  • 创建我们服务实现类 RouteGuideService 的一个实例并且将其传给生成的 RouteGuideGrpc 类的静态方法 bindService() 去获得服务定义。
  • 使用生成器的 forPort() 方法指定地址以及期望客户端请求监听的端口。
  • 通过传入将 bindService() 返回的服务定义,用生成器注册我们的服务实现到生成器的 addService() 方法。
  • 调用生成器上的 build() 和 start() 方法为我们的服务创建和启动一个 RPC 服务器。

创建客户端

创建存根

为了调用服务方法,我们需要首先创建一个 存根,或者两个存根:

  • 一个 阻塞/同步 存根:这意味着 RPC 调用等待服务器响应,并且要么返回应答,要么造成异常。
  • 一个 非阻塞/异步 存根可以向服务器发起非阻塞调用,应答会异步返回。你可以使用异步存根去发起特定类型的流式调用。

我们首先为存根创建一个 gRPC channel,指明服务器地址和我们想连接的端口号:

1
2
3
channel = NettyChannelBuilder.forAddress(host, port)
.negotiationType(NegotiationType.PLAINTEXT)
.build();

如你所见,我们用一个 NettyServerBuilder 构建和启动服务器。这个服务器的生成器基于 Netty 传输框架。
我们使用 Netty 传输框架,所以我们用一个 NettyServerBuilder 启动服务器。
现在我们可以通过从 .proto 中生成的 RouteGuideGrpc 类的 newStub 和 newBlockingStub 方法,使用频道去创建我们的存根。

1
2
blockingStub = RouteGuideGrpc.newBlockingStub(channel);
asyncStub = RouteGuideGrpc.newStub(channel);

调用服务方法

简单 RPC

在阻塞存根上调用简单 RPC GetFeature 几乎是和调用一个本地方法一样直观。

1
2
Point request = Point.newBuilder().setLatitude(lat).setLongitude(lon).build();
Feature feature = blockingStub.getFeature(request);

我们创建和填充了一个请求 protocol buffer 对象(在这个场景下是 Point),在我们的阻塞存根上将其传给 getFeature() 方法,拿回一个 Feature。

服务器端流式 RPC

接下来,让我们看一个对于 ListFeatures 的服务器端流式调用,这个调用会返回一个地理性的 Feature 流:

1
2
3
4
5
Rectangle request =
Rectangle.newBuilder()
.setLo(Point.newBuilder().setLatitude(lowLat).setLongitude(lowLon).build())
.setHi(Point.newBuilder().setLatitude(hiLat).setLongitude(hiLon).build()).build();
Iterator<Feature> features = blockingStub.listFeatures(request);

如你所见,这和我们刚看过的简单 RPC 很相似,除了方法返回客户端用来读取所有返回的 Feature 的 一个 Iterator,而不是单个的 Feature。

客户端流式 RPC

现在看看稍微复杂点的东西:我们在客户端流方法 RecordRoute 中发送了一个 Point 流给服务器并且拿到一个 RouteSummary。为了这个方法,我们需要使用异步存根。如果你已经阅读了
创建服务器,一些部分看起来很相近——异步流式 RPC 是在两端通过相似的方式实现的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public void recordRoute(List<Feature> features, int numPoints) throws Exception {
info("*** RecordRoute");
final SettableFuture<Void> finishFuture = SettableFuture.create();
StreamObserver<RouteSummary> responseObserver = new StreamObserver<RouteSummary>() {
@Override
public void onNext(RouteSummary summary) {
info("Finished trip with {0} points. Passed {1} features. "
+ "Travelled {2} meters. It took {3} seconds.", summary.getPointCount(),
summary.getFeatureCount(), summary.getDistance(), summary.getElapsedTime());
}

@Override
public void onError(Throwable t) {
finishFuture.setException(t);
}

@Override
public void onCompleted() {
finishFuture.set(null);
}
};

StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);
try {
// Send numPoints points randomly selected from the features list.
StringBuilder numMsg = new StringBuilder();
Random rand = new Random();
for (int i = 0; i < numPoints; ++i) {
int index = rand.nextInt(features.size());
Point point = features.get(index).getLocation();
info("Visiting point {0}, {1}", RouteGuideUtil.getLatitude(point),
RouteGuideUtil.getLongitude(point));
requestObserver.onNext(point);
// Sleep for a bit before sending the next one.
Thread.sleep(rand.nextInt(1000) + 500);
if (finishFuture.isDone()) {
break;
}
}
info(numMsg.toString());
requestObserver.onCompleted();

finishFuture.get();
info("Finished RecordRoute");
} catch (Exception e) {
requestObserver.onError(e);
logger.log(Level.WARNING, "RecordRoute Failed", e);
throw e;
}
}

如你所见,为了调用这个方法我们需要创建一个 StreamObserver,它为了服务器用它的 RouteSummary 应答实现了一个特殊的接口。在 StreamObserver 中,我们:

覆写了 onNext() 方法,在服务器把 RouteSummary 写入到消息流时,打印出返回的信息。
覆写了 onCompleted() 方法(在 服务器 完成自己的调用时调用)去设置 SettableFuture,这样我们可以检查服务器是不是完成写入。
之后,我们将 StreamObserver 传给异步存根的 recordRoute() 方法,拿到我们自己的 StreamObserver 请求观察者将 Point 发给服务器。一旦完成点的写入,我们使用请求观察者的 onCompleted() 方法告诉 gRPC 我们已经完成了客户端的写入。一旦完成,我们就检查 SettableFuture 验证服务器是否已经完成写入。

双向流式 RPC

最后,让我们看看双向流式 RPC RouteChat()。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public void routeChat() throws Exception {
info("*** RoutChat");
final SettableFuture<Void> finishFuture = SettableFuture.create();
StreamObserver<RouteNote> requestObserver =
asyncStub.routeChat(new StreamObserver<RouteNote>() {
@Override
public void onNext(RouteNote note) {
info("Got message \"{0}\" at {1}, {2}", note.getMessage(), note.getLocation()
.getLatitude(), note.getLocation().getLongitude());
}

@Override
public void onError(Throwable t) {
finishFuture.setException(t);
}

@Override
public void onCompleted() {
finishFuture.set(null);
}
});

try {
RouteNote[] requests =
{newNote("First message", 0, 0), newNote("Second message", 0, 1),
newNote("Third message", 1, 0), newNote("Fourth message", 1, 1)};

for (RouteNote request : requests) {
info("Sending message \"{0}\" at {1}, {2}", request.getMessage(), request.getLocation()
.getLatitude(), request.getLocation().getLongitude());
requestObserver.onNext(request);
}
requestObserver.onCompleted();

finishFuture.get();
info("Finished RouteChat");
} catch (Exception t) {
requestObserver.onError(t);
logger.log(Level.WARNING, "RouteChat Failed", t);
throw t;
}
}

和我们的客户端流的例子一样,我们拿到和返回一个 StreamObserver 应答观察者,除了这次我们在客户端仍然写入消息到 它们的 消息流时通过我们方法的应答观察者返回值。这里读写的语法和客户端流以及服务器流方法一样。虽然每一端都会按照它们写入的顺序拿到另一端的消息,客户端和服务器都可以任意顺序读写——流的操作是互不依赖的。

ProtoBuf 存储原理

核心是 Google 提出了“Base 128 Varints”编码,这是一种变字节长度的编码,官方描述为:varints 是用一个或多个字节序列化整形的一种方法。

序列化方式

protobuf 把 message 通过一系列 key_value 对来表示。
Key 的算法为:(field_number << 3)| wired_type
这里 field_number 就是具体的索引,wired_type 的值按下表查询。

wired_type .proto 类型
0 Varint int32, int64, uint32, uint64, sint32, sint64, bool, enum
1 64-bit fixed64, sfixed64, double
2 Length-delimited string, bytes, embedded messages, packed repeated fields
5 32-bit fixed32, sfixed32, float
对于 int,bool,enum 类型,value 就是 Varint。

而对于 string,bytes,message 等等类型,value 是长度+原始内容编码。

  • 举例 int 类型存储(Varint 存储原理)
    存储一个 int32 类型的数字,通常是 4 个字节。但是 Varints 最少只需要一个字节就可以了。
    Varints 规定小于 128 的数字都可以用一个字节来表示,比如 10, 它就会用一个字节 0000 1010 来存储。
    对于大于 128 的数字,则用更多个字节存储。
    以 150 举例:protobuf 的存储字节是 1001 0110 0000 0001。
    为什么会这样标识呢?首先我们了解一个字节共 8 位,表示的数字是 255,但是 Varints 只用一个字节表示小于 128 的数字,换句话说,就是 Varints 只用了 8 位中的 7 位来表示数字,而还有一位被用来干嘛了呢?
    Varints 在官方规定中表示,每个字节的最高位是有特殊含义,当最高位为 1 的时候,代表后续的字节也是该数字的一部分。当最高位为 0 的时候,则表示结束。
    比如过 150,二进制表示为 1001 0110。
    先取后七位 001 0110, 作为第一个字节的内容。
    再取余下 1 位,前面补 0 凑齐 7 位,就是 000 0001,作为第二字节。
    对于 intel 机器,是小端字节序,低字节位于地址低的。0010110 是低字节地址,因此排在前面,因为后面的也是数字的一部分,所以高位补 1,也就成了 10010110。 同样的,高字节 000 0001,排在后面,并且它后面没有后续字节了,所以补 0,也就成了 0000 0001。
    因此 150 在 protobuf 中的表示方式为 1001 0110 0000 0001。

  • 举例 string 类型存储

    1
    2
    3
    message Test {
    required string desc = 2;
    }

    假如把 a 设置为 “testing”的话, 那么序列化后的就是
    12 07 74 65 73 74 69 64 67
    其中 12 是 key。剩下的是 value。
    怎么算的呢?先看 12, 这里的 12,是个 16 进制数字,其二进制位表示为 0001 0010。
    0010 就是类型 string 的对应的 Type 值,根据上表,也就是 2。
    field_number (required string desc)是 2,也就是 0010,左移三位,就成了 0001 0000。
    按照 key 的计算公式,和 Type 值取并后就变成了 0001 0010,即 12。
    Value 是长度加原始内容编码。
    07 就是长度, 代表 string 总长 7 个字节。 后面 7 个数字一次代表每个字母所对应的 16 进制表示。

json 与 protobuf 的互转

1
2
3
4
5
6
<!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java-util -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
<version>3.6.1</version>
</dependency>

自定义的 bean 与 proto 是可以通过 Json 相互转换的,然而它们之间的转换需要第三方 JSON 转换工具和 protobuf util 的支持。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
//to Json
JsonFormat.Printer printer = JsonFormat.printer();
String print = "";
try {
print = printer.print(person);
System.out.println(print);
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}

//to Object
JsonFormat.Parser parser = JsonFormat.parser();
try {
PersonProto.Person.Builder newBuilder = PersonProto.Person.newBuilder();
parser.merge(print, newBuilder);
System.out.println(newBuilder.build());
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}

//添加java bean 此类对性数据库的字段,同时与proto类属性名相同
public class Person implements Serializable {
private String name;
private Integer age;
private Boolean sex;
private Date dirthday;//此处注意这里是时间类型而非proto类中的long类型
private String address;
private List<Car> cars = new ArrayList<Car>();
private Map<String, String> other = new HashMap<String, String>();
}

public class Car implements Serializable {
private String name;
private String color;
}

//在上面的转换中间添加以下代码,发现同样转换成功
Person myPerson = JsonUtil.toObject(print, Person.class);
System.out.println(myPerson);
print = JsonUtil.toJson(myPerson);

Web 基础和 tomcat

tomcat 学艺

JavaEE 规范:JDBC,JNDI,EJB,RMI,JSP,Servlets,XML,JMS,Java IDL,JTS,JTA,JavaMail,JAF。
Tomcat 只实现了俩:Servlet 和 JSP。其他服务器比如 JBoss、Weblogic 啥的都是完全支持的。所以人们往往更愿意叫 Tomcat 为轻量级的服务器,也有叫它 Servlet/JSP 容器的。所以 java 开发时不需要额外依赖 servlet 和 JSP,但是需要额外依赖 JDBC,因为 tomcat 里只实现了 Servlet 和 JSP。

服务器

服务器的主要作用:

  • 将资源对外暴露
  • 配合各种传输协议进行响应输出
    请先了解以下三个概念:
  • IP:电子设备在网络中的唯一表示,一个 IP 对应一台实体电脑。
  • 端口:应用程序在计算机中的唯一标识,一个端口只能被唯一程序占用。
  • 传输协议:数据传输的规则。
    浏览器与服务器通信流程如下:

    实际请求时,最终还是要换算成 IP 去访问。总得来说有两种换算的途径:1.本机的 hosts 文件 2.DNS 服务器
    DNS 解析域名得到对应的 IP 后,Request 请求里还是会带上 host。为什么?

因为:域名!=IP。

实际上一个 IP 可以对应多个域名。也就是说一台实体服务器(大铁柜),理论上可以有多个域名(虚拟主机)。实体服务器和网站是两个概念。IP 只是对应实体服务器,而域名对应具体的网站。

比如上面百度服务器,虽然看起来 115.239.210.27 这个 IP 完全等同于http://www.baidu.com,但也有可能这个IP对应的服务器上配置了两个虚拟主机:www.baidu.com和tieba.baidu.com。所以即使找到了IP对应的服务器实体,Request请求还是要带上host主机名,以确定是哪个虚拟主机。

另外,如果两个域名对应同一个 IP,那么必须设置其中一个域名为默认的,不然同一台服务器有两个虚拟主机,我该访问谁?
已经知道 IP,就无需 DNS 解析,可直接访问服务器。若这个 IP 对应的服务器有两个虚拟主机,而用户 Request 请求行中又没有指定 host,则会访问默认主机(因此服务器要事先指定默认主机!Tomcat 默认 localhost,localhost 这个名称是一个保留域名)

最后,再用 Tomcat 举个例子。比如,现在我有一台笔记本电脑(一个实体服务器),它的本机 IP 是 192.168.112.1,我在上面装了 Tomcat。如果 Tomcat 不改动配置,则默认只有一个虚拟主机 localhost(默认主机)。接着我开发了一个 JavaWeb 程序 demo1 部署到 Tomcat,然后我同事在浏览器输入下方地址。192.168.112.1:8080/demo1/index.html。访问我的电脑。虽然没有带 host,但是 localhost 是默认的,于是访问它。

3 个容易混淆的概念

其实,Tomcat 服务器 = Web 服务器 + Servlet/JSP 容器(Web 容器)。
Web 服务器的作用是接收客户端的请求,给客户端作出响应。但是很明显,服务器不止静态资源呀,所以客户端发起请求后,如果是动态资源,Web 服务器不可能直接把它响应回去(比如 JSP),因为浏览器只认识静态资源。所以对于 JavaWeb 程序而言,还需要 JSP/Servlet 容器,JSP/Servlet 容器的基本功能是把动态资源转换成静态资源。我们 JavaWeb 工程师需要使用 Web 服务器和 JSP/Servlet 容器,而通常这两者会集于一身,比如 Tomcat。

我们开发的 Web 应用都是半成品

我们开发 JavaWeb 时,你操心过如何接收 HTTP 请求和响应 HTTP 请求吗?显然没有嘛!因为服务器都已经完成了。所以,我们用 Java 开发的 Web 应用只是一个半成品,类似于一个插件,而服务器则像一个收发器:

什么是动态资源

其实对于何谓动态资源,我也没有很精准的概念。要讲清楚一个东西是什么,有时是比较难的事。不如先说它不是什么。

首先,动态资源不等同于动态页面。所谓动态页面,就是页面会动,而会动的页面不一定是动态资源。比如我可以用 JQuery 执行一段代码,让一个 Div 不断放大缩小,但是很显然它还是一个 HTML 页面。

所谓动态资源,其实最显著的特征就是它能动态地生成 HTML!比如 JSP。动态资源有个“特色”:它的数据是“可拼装”的、而且“可以随时间变化”。下面用号称可以抗住 8 个明星同时出轨的新浪服务器举个例子:

Tomcat 架构

目录

Tomcat 的架构

server.xml 配置



简略解释一下 xml 里的配置:

tomcat 中的 DefaultServlet

实际使用 tomcat 的时候,即使没有编写 Servlet,tomcat 依然可以处理请求返回结果,是因为 tomcat 有一个 DefaultServlet。像 Tomcat 这样的 Servlet 容器来说,任何一个请求的背后肯定有个 Servlet 在默默处理:

在 tomcat/conf/web.xml 中,声明了一个 DefaultServlet。我们每个动态 web 工程都有个 web.xml,而 conf 里的这个,是它们的“老爹”。它里面的配置,如果动态 web 工程没有覆盖,就会被“继承”下来。我们会发现,conf/web.xml 里配置了一个 DefaultServlet:

DefaultServlet 的作用:最低级匹配,当没有对应的 Servlet 处理当前请求时,才轮到它处理。要么找到并响应请求的资源,要么给出 404 页面

JSP 剖析

我们都知道 JSP 是“化了浓妆”的 Servlet,但是好不容易伪装成了一个 JSP,是谁帮它卸妆的呢?另外,大家仔细想想,一般来说 JavaWeb 阶段我们访问资源有三种“形式”:

1
2
3
localhost:8080/demo/AServlet:很明显,我们手动写了一个AServlet处理它
localhost:8080/demo/haha.html:虽然我们没写,但是Tomcat自己准备了DefaultServlet
localhost:8080/demo/index.jsp:我擦,谁来处理?

对呀,细思恐极,这*.jsp 的资源,谁来处理?其实就是 JspServlet。它的作用简而言之就是:

首先,根据请求路径找到 JSP
然后,将它“翻译成”Servlet

JSP 的 Servlet 也定义在 conf/web.xml。

tomcat 处理请求的几种方式

tomcat 处理 http 流程

1、用户点击网页内容,请求被发送到本机端口 8080,被在那里监听的 Coyote HTTP/1.1 Connector 获得。
2、Connector 把该请求交给它所在的 Service 的 Engine 来处理,并等待 Engine 的回应。
3、Engine 获得请求 localhost/servlet/helloServlet,匹配所有的虚拟主机 Host。
4、Engine 匹配到名为 localhost 的 Host(即使匹配不到也把请求交给该 Host 处理,因为该 Host 被定义为该 Engine 的默认主机),名为 localhost 的 Host 获得求/servlet/HelloServlet,匹配它所拥有的所有的 Context。Host 匹配到路径为/servlet 的 Context(如果匹配不到就把该请求交给路径名为“ ”的 Context 去处理)。
5、path=“/servlet”的 Context 获得请求/HelloServlet,在它的 mapping table 中寻找出对应的 Servlet。
6、构造 HttpServletRequest 对象和 HttpServletResponse 对象,作为参数调用 Servlet 的 doGet()或 doPost().执行业务逻辑、数据存储等程序。
7、Context 把执行完之后的 HttpServletResponse 对象返回给 Host。
8、Host 把 HttpServletResponse 对象返回给 Engine。
9、Engine 把 HttpServletResponse 对象返回 Connector。
10、Connector 把 HttpServletResponse 对象返回给客户 Browser。

Servlet

Java Servlet 技术简称 Servlet 技术,是 Java 开发 Web 应用的底层技术。由 Sun 公司于 1996 年发布,用来代替 CGU—-当时生成 Web 动态内容的主流技术。CGI 的问题是每一个 WEB 请求都需要重新启动一个进程来处理。创建进程需要消耗不少 CPU 周期,导致难以编写刻苦鏖战的 CGI 程序,而 Servlet 在创建后(处理第一个请求时)就一直保存在内存中,这就比 CGI 有着更好的性能。

Servlet 是一个 Java 程序,一个 servlet 应用有一个或多个 Servlet 程序。JSP 页面会被转换和编译成 servlet 程序。

Servlet 程序无法独立运行,必须运行在 Servlet 容器中。Servlet 容器将用户的请求床底给 servlet 应用,并将结果返回给用户。由于大部分 Servlet 用用都包含多个 JSP 页面,因此更准确地说是“Servlet/JSP 应用”。

Servlet/JSP 容器是一个可以同时处理 Servlet 和静态内容的 Web 容器。过去,由于通常认为 HTTP 服务器比 Servlet/JSP 容器更加可靠,因此人们习惯将 servlet 容器当做 HTTP 服务器的一个模块,这种模式下,HTTP 服务器用来处理静态资源,Servlet 容器则负责生成动态内容。

例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
public class HelloServlet extends HttpServlet {

/**
* Constructor of the object.
*/
public HelloServlet() {
super();
}

/**
* Destruction of the servlet. <br>
*/
public void destroy() {
super.destroy(); // Just puts "destroy" string in log
// Put your code here
}

/**
* The doGet method of the servlet. <br>
*
* This method is called when a form has its tag value method equals to get.
*
* @param request the request send by the client to the server
* @param response the response send by the server to the client
* @throws ServletException if an error occurred
* @throws IOException if an error occurred
*/
public void doGet(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {

response.setContentType("text/html");
PrintWriter out = response.getWriter();
out.println("<!DOCTYPE HTML PUBLIC \"-//W3C//DTD HTML 4.01 Transitional//EN\">");
out.println("<HTML>");
out.println(" <HEAD><TITLE>A Servlet</TITLE></HEAD>");
out.println(" <BODY>");
out.print(" This is ");
out.print(this.getClass());
out.println(", using the GET method");
out.println(" </BODY>");
out.println("</HTML>");
out.flush();
out.close();
}

/**
* The doPost method of the servlet. <br>
*
* This method is called when a form has its tag value method equals to post.
*
* @param request the request send by the client to the server
* @param response the response send by the server to the client
* @throws ServletException if an error occurred
* @throws IOException if an error occurred
*/
public void doPost(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {

response.setContentType("text/html");
PrintWriter out = response.getWriter();
out.println("<!DOCTYPE HTML PUBLIC \"-//W3C//DTD HTML 4.01 Transitional//EN\">");
out.println("<HTML>");
out.println(" <HEAD><TITLE>A Servlet</TITLE></HEAD>");
out.println(" <BODY>");
out.print(" This is ");
out.print(this.getClass());
out.println(", using the POST method");
out.println(" </BODY>");
out.println("</HTML>");
out.flush();
out.close();
}

/**
* Initialization of the servlet. <br>
*
* @throws ServletException if an error occurs
*/
public void init() throws ServletException {
// Put your code here
}

}

我们可以看见,这个 HelloServlet 继承了 HTTPServlet 类,主要有 init()、doGet()、doPost()、destroy()四个方法。我们在浏览器中分别用 get 和 post 访问这个 servlet。

我们可以看到,这就是上文我们编写的 doGet 方法中的 Html 生成的页面。

Servlet 前世今生

一个 web 请求过程:

  • 接受请求
  • 处理请求
  • 相应请求
    web 服务器负责接受请求和相应请求,如果是静态资源那么 web 服务器就够用了。如果是动态资源就要处理请求,这时候就需要 servlet 了。 servlet 专门用来处理请求,编写业务逻辑。后来三层架构出现了,servlet 就把一些任务分担到 servier 和 dao,形成了 servlet(contrller)+service+dao。servlet 本身不擅长往浏览器输出 HTML 页面,所以出现了 JSP。
    等 Spring 家族出现后,Servlet 开始退居幕后,取而代之的是 SpringMVC。SpringMVC 的核心组件 DispatcherServlet 其实本质就是一个 Servlet。但它已经自立门户,在原来 HTTPServlet 的基础上,又封装了一条逻辑。
    Servlet 是 J2EE 规范中的一种,主要是为了扩展 java 作为 web 服务的功能,事实上,servlet 就是一个 Java 接口。

Servlet 与 SpringMVC

参考连接:https://zhuanlan.zhihu.com/p/65658315

ServletContext

ServletContext 对象的作用是在整个 Web 应用的动态资源(Servlet/JSP)之间共享数据。例如在 AServlet 中向 ServletContext 对象保存一个值,然后在 BServlet 中就可以获取这个值。

共享数据

这种用来装载共享数据的对象,在 JavaWeb 中共有 4 个,而且更习惯被成为“域对象”:

  • ServletContext 域(Servlet 间共享数据)
  • Session 域(一次会话间共享数据,也可以理解为多次请求间共享数据)
  • Request 域(同一次请求共享数据)
  • Page 域(JSP 页面内共享数据)
    它们都可以看做是 map,都有 getAttribute()/setAttribute()方法。

ServletContext 的方法

所以,获取 ServletContext 的方法共 5 种(page 域这里不考虑,JSP 太少用了):

  • ServletConfig#getServletContext();
  • GenericServlet#getServletContext();
  • HttpSession#getServletContext();
  • HttpServletRequest#getServletContext();
  • ServletContextEvent#getServletContext();

Filter 过滤器

Filter 更详细的拦截其实是这样:
最外层那个圈,可以理解成 ServletContext,FORWARD/INCLUDE 这些都是内部请求。如果在 web.xml 中配置 Filter 时 4 种拦截方式全配上,那么服务器内部的分发跳转都会被过滤。下图中灰色的墙就是 fliter。红色字代表各种请求方式,FORWARD 和 INCLUDE 的位置相同。

映射器

对于静态资源,Tomcat 最后会交由一个叫做 DefaultServlet 的类来处理
对于 Servlet ,Tomcat 最后会交由一个叫做 InvokerServlet 的类来处理
对于 JSP,Tomcat 最后会交由一个叫做 JspServlet 的类来处理

DispatcherServlet 与 SpringMVC

tomcat 是一个 Servlet 容器,这个容器内可以存在多个 servlet,处理不同的请求。
加入 springmvc,它提供了一个 Servlet,可以处理.do 结尾的请求,还有静态资源的请求和其它请求。如果,我们为 springmvc 在 web.xml 配置的映射 url 为/*则代表所有请求都要走 springmvc,tomcat 中的 DefaultServlet 和 JSPServlet 都无法得到请求。因为这个两个 servlet 是在 tomcat 的 conf/web.xml 中配置的,这个 web.xml 的配置会被各个应用自己的 web.xml 覆盖,最终 tomcat 按照应用定义的 web.xml 来做映射。
SpringMVC 的核心控制器叫 DispatcherServlet,映射原理和我们上面山寨版的一样,因为本质还是个 Servlet。但 SpringMVC 提供了一个标签,解决上面/无法读取静态资源的问题:

1
2
<!-- 静态资源处理  css js imgs -->
<mvc:resources location="/resources/**" mapping="/resources"/>

其他的我也不说了,一张图,大家体会一下 DispatcherServlet 与 SpringMVC 到底是什么关系:

DispatcherServlet 确实是一个 Servlet,但它只是入口,SpringMVC 要比想象的庞大。DispatcherServlet 只是 Springmvc 的一个入口,Springmvc 内部是十分复杂的。

对比 Servlt 和 CGI

java 的 servlet 实现是自己规定了一套协议,与 CGI 和 fastcgi 协议不同。
Servlet 与 CGI 的区别
对比一:当用户浏览器发出一个 Http/CGI 的请求,或者说调用一个 CGI 程序的时候,服务器端就要新启用一个进程(而且是每次都要调用),调用 CGI 程序越多(特别是访问量高的时候),就要消耗系统越多的处理时间,只剩下越来越少的系统资源,对于用户来说,只能是漫长的等待服务器端的返回页面了,这对于电子商务激烈发展的今天来说,不能不说是一种技术上的遗憾。

而 Servlet 充分发挥了服务器端的资源并高效的利用。每次调用 Servlet 时并不是新启用一个进程,而是在一个 Web 服务器的进程中共享和分离线程,而线程最大的好处在于可以共享一个数据源,使系统资源被有效利用。

对比二:传统的 CGI 程序,不具备平台无关性特征,系统环境发生变化,CGI 程序就要瘫痪,而 Servlet 具备 Java 的平台无关性,在系统开发过程中保持了系统的可扩展性、高效性。

对比三:传统技术中,一般大都为二层的系统架构,即 Web 服务器+数据库服务器,导致网站访问量大的时候,无法克服 CGI 程序与数据库建立连接时速度慢的瓶颈,从而死机、数据库死锁现象频繁发生。而 Servlet 有连接池的概念,它可以利用多线程的优点,在系统缓存中事先建立好若干与数据库的连接,到时候若想和数据库打交道可以随时跟系统”要”一个连接即可,反应速度可想而知。

Java Servlet 为什么不做成 FastCGI 模式?

1、JVM 多线程架构健壮性非常强

理论上多进程确实健壮性更强,但实际上,JVM 进程远比普通 C/C++进程坚固。

用 C/C++写程序,随便一个空指针、内存越界、内存泄漏就可能意外终止进程,更遑论很多 C/C++程序处理严重错误的方式都是很粗暴的 exit 或者 panic。

而用 Java 写代码,要想不小心终止进程很难。异常机制可以保证几乎没有任何 Java 代码能够直接干掉一个线程或进程,Java 程序的线程几乎跟普通 C/C++的进程一样脆弱/坚固。

2、Tomcat 静态文件性能并不差
别的 Servlet 容器不清楚,但是得益于独立的 connector 设计,较新版本的 Tomcat 有了 NIO、APR 等技术的加持,静态文件性能不会比 apache httpd 差到哪儿去。

“很多网站直接用 servlet 容器当 web 服务器”,因为这样很简单,而且暂时可能没遇到或不关心性能瓶颈。

Tomcat 是非常流行的 Web Server,它还是一个满足 Servlet 规范的容器。那么想一想,Tomcat 和我们的 Web 应用是什么关系?

从感性上来说,我们一般需要把 Web 应用打成 WAR 包部署到 Tomcat 中,在我们的 Web 应用中,我们要指明 URL 被哪个类的哪个方法所处理「不论是原始的 Servlet 开发,还是现在流行的 Spring MVC 都必须指明」。

由于我们的 Web 应用是运行在 Tomcat 中,请求必定是先到达 Tomcat 的。Tomcat 对于请求实际上会进行如下的处理。

第一,提供 Socket 服务

Tomcat 的启动,必然是 Socket 服务,只不过它支持 HTTP 协议而已!

这里其实可以扩展思考下,Tomcat 既然是基于 Socket,那么是基于 BIO or NIO or AIO 呢?

第二,进行请求的分发

要知道一个 Tomcat 可以为多个 Web 应用提供服务,很显然,Tomcat 可以把 URL 下发到不同的 Web 应用。

第三,需要把请求和响应封装成 request/response

我们在 Web 应用这一层,可从来没有封装过 request/response 的,我们都是直接使用的,这就是因为 Tomcat 已经为你做好了!

话不多说,先来看一眼工程截图。

一、封装请求对象


这里可以清楚的看到,通过输入流,对 HTTP 协议进行解析,拿到了 HTTP 请求头的方法以及 URL。

二、封装响应对象


基于 HTTP 协议的格式进行输出写入。

三、Servlet 请求处理基类


前文说 Tomcat 是满足 Servlet 规范的容器,那么自然 Tomcat 需要提供 API。这里看到了 Servlet 常见的 doGet/doPost/service 方法。

# 四、Servlet 实现类


提供这 2 个具体的 Servlet 实现,只是为了后续的测试!

五、Servlet 配置



你应该有些感觉了吧?在 servlet 开发中,会在 web.xml 中通过来进行指定哪个 URL 交给哪个 servlet 进行处理。

六、启动类




这里,你能够看到 Tomcat 的处理流程,即把 URL 对应处理的 Servlet 关系形成,解析 HTTP 协议,封装请求/响应对象,利用反射实例化具体的 Servlet 进行处理即可。

七、测试


实现一个简易版的 Tomcat 就这么 7 大步搞定,大家都来动手实现一下吧,代码最好不要复制,自己动手一个个敲,这样实现之后,对 Tomcat 就没那么陌生了。

Play 框架架构
当请求从浏览器发到服务端时,会有 Routes 来处理,如我们请求 /use/1 的 URL 时。这个请求将在 Routes 找到对应的函数来处理,在 Django 中处理请求的函数叫做 URL Dispatcher。


这里的路由规则分为了两个部分:请求动态数据或静态资源。
浏览器一般先请求获得 html 页面,然后 html 内容如果有 ajax 则会再发送请求过来,请求动态数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# Routes
# This file defines all application routes (Higher priority routes first)
# ~~~~

# Home page
GET / controllers.ApplicationController.index
POST /user controllers.ApplicationController.addUser
GET /delete/:id controllers.ApplicationController.deleteUser(id : Long)
GET /user/:id controllers.ApplicationController.getUser(id : Long)
GET /api/user/:id controllers.ApiController.getUser(id : Long)
POST /api/user controllers.ApiController.createUser

# Map static resources from the /public folder to the /assets URL path
GET /assets/*file controllers.Assets.versioned(path="/public", file: Asset)

随后在我们的 ApiController 中会有一个对应的 getUser 的方法来处理,如下所示:

1
2
3
4
5
6
7
8
class ApiController extends Controller {
def getUser(id: Long) = Action.async { implicit request =>
UserService.getUser(id).map {
case None => NotFound(Json.obj("error" -> "Not Found"))
case Some(user) => Ok(Json.toJson(user))
}
}
}

随后还是相应的 UserService 去取相应的用户

1
2
3
4
5
6
7
8
9
10
11
12
import models.{User, Users}
import scala.concurrent.Future

object UserService {

def addUser(user: User): Future[String] = {
Users.add(user)
}
def getUser(id: Long): Future[Option[User]] = {
Users.get(id)
}
}

最后便会执行到 model 层:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
object Users {

val dbConfig = DatabaseConfigProvider.get[JdbcProfile](Play.current)

val users = TableQuery[UserTableDef]

def add(user: User): Future[String] = {
dbConfig.db.run(users += user).map(res => user.toString()).recover {
case ex: Exception => ex.getCause.getMessage
}
}
def get(id: Long): Future[Option[User]] = {
dbConfig.db.run(users.filter(_.id === id).result.headOption)
}
}

然后我们就取到了这个用户。如果只是从这个过程上来说,我觉得和一般的 MVC 框架并没有太大的区别。

编程的本质

当写过许许多多程序后,接触了那么多编程模式、设计模式、框架、语言、算法、数据结构以后,就会发现编程的本质万变不离其宗就是,操纵一坨数据。当然操纵的方式有许多,存储的方式也五花八门,但是本质不变,就是访问数据(读取以及改变)。

下面谈一谈我对两种编程方式的理解,就是面向对象编程,以及函数式编程。我用 JavaScript 这种神奇的语言来解释,因为 JavaScript 既可以面向对象编程,也可以函数式编程。

数据存放方式

1、对于 OO(面向对象,下同),数据存放在对象的属性(成员变量)里面,以及静态成员(全局变量)

2、对于函数式,数据存放在闭包(各级作用域)里面,作用域包括全局作用域。

数据访问方式

数据存放方式决定了访问的方式。

1、对于 OO 来说,访问数据(全局变量除外)需要先获取对象的引用,然后再进行操作(直接访问——公共属性,或者调用成员函数/方法访问——私有属性)

2、对于函数式,访问数据是直接访问(通过函数入参或者作用域链查找)

下面上一段代码

1
2
3
4
5
6
7
8
//OO
class Foo {
constructor(){
this.bar = 0
}
}
let foo = new Foo()
foo.bar ++

//函数式

1
2
3
4
5
let bar = 0
function foo(){
bar ++
}
foo()

这是随便写了一些没有用代码,只是为了展示两种风格的编程方式。

从中我们可以看出一些东西,OO 是通过持有,以及传递对象的方式去让别的对象来操作数据,而对象也会是其他对象的成员,层层嵌套。当你想要访问某一个数据的时候,就需要顺着对象的引用链条去找,一步步去操作。

函数式传递的则是函数,调用函数即操作数据,传递函数的时候其实隐含着传递了函数创建的时候所附带的作用域,这个在表面上看不出来,在底层是有的。

OO 所谓的对象,本质上就是函数式中的作用域。

为了理解这句话,我们来看高阶函数的本质

1
2
3
4
5
6
7
8
function foo(x){
let bar = x
return function(){
return bar
}
}
let Bar = foo(1)
console.log(Bar())//1

当调用 foo(1)的时候返回一个函数,这个函数可以访问 foo 函数内部的 bar 变量,这就是高阶函数。如果翻译成 OO 思想大家就知道怎么回事了。

1
2
3
4
5
6
7
8
9
10
class Foo{
constructor(x){
this.bar = x
}
Bar(){
return this.bar
}
}
let foo = new Foo(1)
console.log(foo.Bar())

看上去是不是差不多。其实在 C#中对 lambda 表达式的编译就是这个原理。会内部创建一些类。

当然高阶函数的写法肯定不会这么啰嗦,会写的比较优雅。(这就是本质区别)

1
2
3
let foo = bar=>()=>bar
let Bar = foo(1)
console.log(Bar())//1

所以 OO 编程,是面向作用域编程,而函数式编程,是面向功能编程。

函数式编程有一个巨大的优势,就是作用域链,也就是说函数可以访问的变量范围要远远大于 OO,灵活性就成几何级数上升。换句话也可以说 OO 的约束性更强。

FP 适合数据处理,OOP 适合外包业务领域

函数式编程(FP)和面向对象编程(OOP)直接相比是一种常见的误解。这种比较应该是 FP 和 OOP 支持者之间互相怼的产物。

事实上,FP 和 OOP 是两种不同的看待事物的方式。FP 强调“everything is lambda”,并且强调在逻辑处理中不变性的重要性。不变到什么地步呢?原教旨主义的 FP 就连普通的循环都不可以写(因为循环都有个变化的 idx 或者条件之类的变量),必须用递归实现。这样做的结果就是把一切“状态”都消除。任何“状态”都是由确定的输入经过确定的一组函数处理得到的最终结果。 FP 适合的领域是处理数据。

OOP 强调“everything is object”,以及 object 之间的消息传递。通过消息传递改变每个 Object 的内部状态。OOP 之父 Alan Kay 表示”OOP is all about messaging”。利用 OOP 建模,都会通过某种消息机制来模拟一些场景的处理。比如交易=下单 Object,支付 Object,积分 Object 等之间进行交互当然,实际的 OOP 的程序运行时为了效率一般会用方法调用,而不是真的传递一个物理消息。OOP 适合的领域是企业 ERP,外包业务等。

如果你看懂了上面两个概念,就会发现他们说的事情压根就不在一个频道里。因此各自的好处也不能证明另一方有缺点。

现在经常看到文章表达 FP 如何如何优于 OOP,大概原因有这么两点:第一点是,OOP 早期不切实际的吹牛皮,吹爆了。很多人谈起 OOP,都会有“用了 OOP,代码耦合就小了,就容易维护了,扩展就方便了,代码就更容易复用了等等“的第一印象。但实际上这并不一定发生。软件设计并非因为 OOP 就直接自动变好了。因此很多程序员在趟坑多年后可能会感觉“我擦,学了这么多年,全是假的“。更进一步的,像 Java 这样的“纯 OOP”语言迫使程序员并不需要 OOP 的情况下也得照着 OOP 的方式去写代码,结果啰嗦又臃肿。所以很多人越来越讨厌 OOP 其实是可以理解的。(比如这篇 Goodbye, Objected Oriented Programming)。现在的 Java 程序大量使用反射、lambda 等技术,已经不是早期那个单纯 OOP 语言了。第二点是现代程序开始往并发发展。而 FP 的不可变,没有副作用等特性恰好让并发编程变得不容易出错。并且配合多种并发模型(如 CSP、Map Reduce、Fork & Join、Promise 等),可以解决很多高并发的问题,显得高、大、上、酷。

但是,我非常赞同《人月神话》的著名论断——没有银弹。不论 OOP 还是 FP,用好了都可以发挥作用,用不好一样吃瘪。

举几个例子,一个业务领域建模,其实模拟的就是现实当中的不同角色的人/机构的工作方式。因为如果是人/机构互相协作,就是通过消息来协作的。比如博士生想发文章,先得自己写,写了老板审阅,完事发给期刊编辑,编辑找同行评议,完事发表,发表的结果会收录到某个文献索引数据库。这个过程就是多个独立的“对象”在相互协作的结果。因此 OOP 在这个层面上对这个流程进行抽象是很合适的。当然你也可以说,这时我用 FP 的各种动作函数的组织来描述这个过程,也是可以的。但是如果比较一下,这个场景用 FP 和 OOP 建模,哪个更容易理解呢?

再比如,对一组数据做加工,先查询,然后聚合,聚合后排序,再 join,再排序,再聚合,再转换(map)得到最终的结果。这个过程,用 FP 的函数就很自然,因为这一看就是 result = func1(func2(func3…funcN(x))))这时用 OOP 呢?给每一个步骤建一个 class?然后把排序、聚合等操作放在 class 里?抽象个基类?或者弄个 XXXUtils 的静态方法集合类?当然都可以做,但是很明显这不是个好的设计。再再比如,一个业务流程,就是一组步骤:第一步如何如何,第二部如何如何……。这时用 FP 和 OOP 都不能很好的表达问题(可能 FP 接近点)。这其实是典型的“指令式编程“。如果业务逻辑如此,那么就照着一步一步做就是最好的,而不是抽取函数和不变状态;或者定义一些根本无意义的 class。

说了这么多,其实希望表达的意思是:到底用哪种编程模式,要看问题本身适合哪个。哪个用起来自然,和问题本身特质搭配,那就用哪个。用对了,事半功倍;用错了,就各种纠结拧巴。你希望你一个东西模拟为 Object,前提是这个东西本身容易抽象成一个 Object;你希望你一个数据可以抽象为一组函数执行的组合,前提是这样理解更自然,更舒服。此外,同一个问题可以拆解为不同的层次,不同的层次可以使用各自适合的方式。比如高层的可以 OOP,具体到某个执行逻辑里可以用 FP 或者指令编程。

对比面向过程,面向对象,函数式编程

函数式编程、面向对象编程和面向过程编程都是思维方式,具体语言的语法不构成限制。这三者可以当作是一条光谱上不同的颜色,一头是命令式风格,一头是声明式风格。

1
2
               ↓<过程式>           ↓<面向对象>       ↓<函数式>
[命令式风格] o----------------------------------------------------> [声明式风格]

箭头从左到右抽象程度逐渐上升。过程式风格有函数的概念,和纯粹的指令相比可以用简短的函数替代一段指令;面向对象风格包装了一层 class 这样的概念,允许打包的一组数据自己持有一些信息,使用者调用方法前不需要知道所有的事情;函数式风格允许把“处理过程”本身当作参数,传入的处理过程的参数就代表具体操作时遇到的实体。
函数式编程就是用函数的组合来解决问题,面向对象是建很多对象来互相交互解决问题,面向过程就是下一步下一步下一步。

例子

输入十个整数,输出大于平均值的数,然后输出排序后的十个数面向过程/C/

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
int input[10];
int sum = 0;
int tmp;
double average;
for (int i = 0; i < 10; i++)
{
scanf("%d", &input[i]);
sum += input[i];
}

average = sum / 10.0;
for (int i = 0; i < 10; i++)
{
if (input[i] > average)
{
printf("%d\n", input[i]);
}

}
/_Bubble Sort_/
for (int i = 0; i < 9; i++)
{
for (int j = 0; j < 9 - i; j++)
{
if (input[j] > input[j + 1])
{
tmp = input[j];
input[j] = input[j + 1];
input[j + 1] = tmp;
}
}
}

for (int i = 0; i < 10; i++)
{
printf("%d\n", input[i]);
}

面向对象//C++

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class solve
{
public:
solve() noexcept;
void sort();
void show();//输出大于均值的数
void print();//打印数组
private:
int input[10];
int sum;
double average;
};
//具体实现省略
int main()
{
auto test = new solve();
test->show();
test->sort();
test->print();
return 0;
}

函数式编程#Python

1
2
3
4
array = [int(input()) for i in range(10)]
average = sum(array) / 10;
print(list(filter(lambda x: x > average, array)))
print(sorted(array))

一、简单明快的早期时代


可称之为 Web 1.0 时代,非常适合创业型小项目,不分前后端,经常 3-5 人搞定所有开发。页面由 JSP、PHP 等工程师在服务端生成,浏览器负责展现。基本上是服务端给什么浏览器就展现什么,展现的控制在 Web Server 层。

这种模式的好处是:简单明快,本地起一个 Tomcat 或 Apache 就能开发,调试什么的都还好,只要业务不太复杂。

然而业务总会变复杂,这是好事情,否则很可能就意味着创业失败了。业务的复杂会让 Service 越来越多,参与开发的人员也很可能从几个人快速扩招到几十人。在这种情况下,会遇到一些典型问题:

1、Service 越来越多,调用关系变复杂,前端搭建本地环境不再是一件简单的事。考虑团队协作,往往会考虑搭建集中式的开发服务器来解决。这种解决方案对编译型的后端开发来说也许还好,但对前端开发来说并不友好。天哪,我只是想调整下按钮样式,却要本地开发、代码上传、验证生效等好几个步骤。也许习惯了也还好,但开发服务器总是不那么稳定,出问题时往往需要依赖后端开发搞定。看似仅仅是前端开发难以本地化,但这对研发效率的影响其实蛮大。

2、JSP 等代码的可维护性越来越差。JSP 非常强大,可以内嵌 Java 代码。这种强大使得前后端的职责不清晰,JSP 变成了一个灰色地带。经常为了赶项目,为了各种紧急需求,会在 JSP 里揉杂大量业务代码。积攒到一定阶段时,往往会带来大量维护成本。

这个时期,为了提高可维护性,可以通过下面的方式实现前端的组件化:


理论上,如果大家都能按照最佳实践去书写代码,那么无论是 JSP 还是 PHP,可维护性都不会差。但可维护性更多是工程含义,有时候需要通过限制带来自由,需要某种约定,使得即便是新手也不会写出太糟糕的代码。

如何让前后端分工更合理高效,如何提高代码的可维护性,在 Web 开发中很重要。下面我们继续来看,技术架构的演变如何解决这两个问题。

二、后端为主的 MVC 时代

为了降低复杂度,以后端为出发点,有了 Web Server 层的架构升级,比如 Structs、Spring MVC 等,这是后端的 MVC 时代。


代码可维护性得到明显好转,MVC 是个非常好的协作模式,从架构层面让开发者懂得什么代码应该写在什么地方。为了让 View 层更简单干脆,还可以选择 Velocity、Freemaker 等模板,使得模板里写不了 Java 代码。看起来是功能变弱了,但正是这种限制使得前后端分工更清晰。然而依旧并不是那么清晰,这个阶段的典型问题是:

1、前端开发重度依赖开发环境。这种架构下,前后端协作有两种模式:一种是前端写 demo,写好后,让后端去套模板。淘宝早期包括现在依旧有大量业务线是这种模式。好处很明显,demo 可以本地开发,很高效。不足是还需要后端套模板,有可能套错,套完后还需要前端确定,来回沟通调整的成本比较大。另一种协作模式是前端负责浏览器端的所有开发和服务器端的 View 层模板开发,支付宝是这种模式。好处是 UI 相关的代码都是前端去写就好,后端不用太关注,不足就是前端开发重度绑定后端环境,环境成为影响前端开发效率的重要因素。

2、前后端职责依旧纠缠不清。Velocity 模板还是蛮强大的,变量、逻辑、宏等特性,依旧可以通过拿到的上下文变量来实现各种业务逻辑。这样,只要前端弱势一点,往往就会被后端要求在模板层写出不少业务代码。还有一个很大的灰色地带是 Controller,页面路由等功能本应该是前端最关注的,但却是由后端来实现。Controller 本身与 Model 往往也会纠缠不清,看了让人咬牙的代码经常会出现在 Controller 层。这些问题不能全归结于程序员的素养,否则 JSP 就够了。

经常会有人吐槽 Java,但 Java 在工程化开发方面真的做了大量思考和架构尝试。Java 蛮符合马云的一句话:让平凡人做非凡事。

三、Ajax 带来的 SPA 时代

历史滚滚往前,2004 年 Gmail 像风一样的女子来到人间,很快 2005 年 Ajax 正式提出,加上 CDN 开始大量用于静态资源存储,于是出现了 JavaScript 王者归来的 SPA (Single Page Application 单页面应用)时代。


这种模式下,前后端的分工非常清晰,前后端的关键协作点是 Ajax 接口。看起来是如此美妙,但回过头来看看的话,这与 JSP 时代区别不大。复杂度从服务端的 JSP 里移到了浏览器的 JavaScript,浏览器端变得很复杂。类似 Spring MVC,这个时代开始出现浏览器端的分层架构:


对于 SPA 应用,有几个很重要的挑战:

1、前后端接口的约定。如果后端的接口一塌糊涂,如果后端的业务模型不够稳定,那么前端开发会很痛苦。这一块在业界有 API Blueprint 等方案来约定和沉淀接口,在阿里,不少团队也有类似尝试,通过接口规则、接口平台等方式来做。有了和后端一起沉淀的接口规则,还可以用来模拟数据,使得前后端可以在约定接口后实现高效并行开发。相信这一块会越做越好。

2、前端开发的复杂度控制。SPA 应用大多以功能交互型为主,JavaScript 代码过十万行很正常。大量 JS 代码的组织,与 View 层的绑定等,都不是容易的事情。典型的解决方案是业界的 Backbone,但 Backbone 做的事还很有限,依旧存在大量空白区域需要挑战。

SPA 让前端看到了一丝绿色,但依旧是在荒漠中行走。

四、前端为主的 MV* 时代

为了降低前端开发复杂度,除了 Backbone,还有大量框架涌现,比如 EmberJS、KnockoutJS、AngularJS 等等。这些框架总的原则是先按类型分层,比如 Templates、Controllers、Models,然后再在层内做切分,如下图:


好处很明显:

1、前后端职责很清晰。前端工作在浏览器端,后端工作在服务端。清晰的分工,可以让开发并行,测试数据的模拟不难,前端可以本地开发。后端则可以专注于业务逻辑的处理,输出 RESTful 等接口。

2、前端开发的复杂度可控。前端代码很重,但合理的分层,让前端代码能各司其职。这一块蛮有意思的,简单如模板特性的选择,就有很多很多讲究。并非越强大越好,限制什么,留下哪些自由,代码应该如何组织,所有这一切设计,得花一本的厚度去说明。

3、部署相对独立,产品体验可以快速改进。

但依旧有不足之处:

代码不能复用。比如后端依旧需要对数据做各种校验,校验逻辑无法复用浏览器端的代码。如果可以复用,那么后端的数据校验可以相对简单化。
全异步,对 SEO 不利。往往还需要服务端做同步渲染的降级方案。
性能并非最佳,特别是移动互联网环境下。
SPA 不能满足所有需求,依旧存在大量多页面应用。URL Design 需要后端配合,前端无法完全掌控。
五、Node 带来的全栈时代
前端为主的 MV* 模式解决了很多很多问题,但如上所述,依旧存在不少不足之处。随着 Node.js 的兴起,JavaScript 开始有能力运行在服务端。这意味着可以有一种新的研发模式:


在这种研发模式下,前后端的职责很清晰。对前端来说,两个 UI 层各司其职:

1、Front-end UI layer 处理浏览器层的展现逻辑。通过 CSS 渲染样式,通过 JavaScript 添加交互功能,HTML 的生成也可以放在这层,具体看应用场景。

2、Back-end UI layer 处理路由、模板、数据获取、cookie 等。通过路由,前端终于可以自主把控 URL Design,这样无论是单页面应用还是多页面应用,前端都可以自由调控。后端也终于可以摆脱对展现的强关注,转而可以专心于业务逻辑层的开发。

通过 Node,Web Server 层也是 JavaScript 代码,这意味着部分代码可前后复用,需要 SEO 的场景可以在服务端同步渲染,由于异步请求太多导致的性能问题也可以通过服务端来缓解。前一种模式的不足,通过这种模式几乎都能完美解决掉。

与 JSP 模式相比,全栈模式看起来是一种回归,也的确是一种向原始开发模式的回归,不过是一种螺旋上升式的回归。

基于 Node 的全栈模式,依旧面临很多挑战:

需要前端对服务端编程有更进一步的认识。比如 network/tcp、PE 等知识的掌握。
Node 层与 Java 层的高效通信。Node 模式下,都在服务器端,RESTful HTTP 通信未必高效,通过 SOAP 等方式通信更高效。一切需要在验证中前行。
对部署、运维层面的熟练了解,需要更多知识点和实操经验。
大量历史遗留问题如何过渡。这可能是最大最大的阻力。
六、小结
回顾历史总是让人感慨,展望未来则让人兴奋。上面讲到的研发模式,除了最后一种还在探索期,其他各种在各大公司都已有大量实践。几点小结:

模式没有好坏高下之分,只有合不合适。
Ajax 给前端开发带来了一次质的飞跃,Node 很可能是第二次。
SoC(关注度分离) 是一条伟大的原则。上面种种模式,都是让前后端的职责更清晰,分工更合理高效。
还有个原则,让合适的人做合适的事。比如 Web Server 层的 UI Layer 开发,前端是更合适的人选。

“被调用者保存”和“调用者保存”寄存器

编译器的设计中有个概念叫做“被调用者保存”和“调用者保存”,可以近似的按子函数保存和父函数保存对应来理解,这一概念的出现完全是由于寄存器资源个数有限造成的。当父函数在调用子函数时,由于子函数可能访问到父函数用于保存数值的寄存器,为了互不干扰和造成覆盖,编译器就制定了相应的规则,%eax、%edx 和%ecx 被划分为“调用者保存”寄存器,顾名思义,这些寄存器上面存储的值,需要调用者(父函数)自己想办法先备份好,否则过会子函数直接使用这些寄存器时将无情的覆盖。如何备份?当然是事先压入栈中,等子函数调用完,再通过出栈恢复这些寄存器原本在父函数运行时的旧值;另外三个寄存器%ebx、%esi、%edi 被划分为“被调用者保存”寄存器,同样的,这些寄存器上有值,在使用前需要被调用者(子函数)自己想办法帮调用者(父函数)进行备份,具体方法就是子函数在覆盖他们之前,先进行入栈备份,等子函数返回时,再出栈还原父函数运行时这些寄存器上的旧值。

七种寻址方式

1 立即寻址方式

操作数作为指令的一部分而直接写在指令中,这种操作数称为立即数,这种寻址方式也就称为立即数寻址方式。

立即数可以是 8 位、16 位或 32 位,该数值紧跟在操作码之后。如果立即数为 16 位或 32 位,那么,它将按“高高低低”的原则进行存储。例如:

MOV AH, 80H     ADD AX, 1234H     MOV ECX, 123456H
MOV B1, 12H     MOV W1, 3456H    ADD D1, 32123456H

其中:B1、W1 和 D1 分别是字节、字和双字单元。

以上指令中的第二操作数都是立即数,在汇编语言中,规定:立即数不能作为指令中的第二操作数。该规定与高级语言中“赋值语句的左边不能是常量”的规定相一致。

立即数寻址方式通常用于对通用寄存器或内存单元赋初值。图是指令“MOV AX, 4576H”存储形式和执行示意图。

2 寄存器寻址方式

指令所要的操作数已存储在某寄存器中,或把目标操作数存入寄存器。把在指令中指出所使用寄存器(即:寄存器的助忆符)的寻址方式称为寄存器寻址方式。
指令中可以引用的寄存器及其符号名称如下:
   8 位寄存器有:AH、AL、BH、BL、CH、CL、DH 和 DL 等;
   16 位寄存器有:AX、BX、CX、DX、SI、DI、SP、BP 和段寄存器等;
   32 位寄存器有:EAX、EBX、ECX、EDX、ESI、EDI、ESP 和 EBP 等。
寄存器寻址方式是一种简单快捷的寻址方式,源和目的操作数都可以是寄存器。

1、源操作数是寄存器寻址方式
  如:ADD VARD, EAX    ADD VARW, AX     MOV VARB, BH 等。
  其中:VARD、VARW 和 VARB 是双字,字和字节类型的内存变量。在第 4 章将会学到如何定义它们。

2、目的操作数是寄存器寻址方式
  如:ADD BH, 78h      ADD AX, 1234h     MOV EBX, 12345678H 等。

3、源和目的操作数都是寄存器寻址方式
  如:MOV EAX, EBX     MOV AX, BX      MOV DH, BL 等。

由于指令所需的操作数已存储在寄存器中,或操作的结果存入寄存器,这样,在指令执行过程中,会减少读/写存储器单元的次数,所以,使用寄存器寻址方式的指令具有较快的执行速度。通常情况下,我们提倡在编写汇编语言程序时,应尽可能地使用寄存器寻址方式,但也不要把它绝对化。

3 七种寻址方式(直接寻址方式)

指令所要的操作数存放在内存中,在指令中直接给出该操作数的有效地址,这种寻址方式为直接寻址方式。

在通常情况下,操作数存放在数据段中,所以,其物理地址将由数据段寄存器 DS 和指令中给出的有效地址直接形成,但如果使用段超越前缀,那么,操作数可存放在其它段。

例:假设有指令:MOV BX, [1234H],在执行时,(DS)=2000H,内存单元 21234H 的值为 5213H。问该指令执行后,BX 的值是什么?

解:根据直接寻址方式的寻址规则,把该指令的具体执行过程用下图来表示。

从图中,可看出执行该指令要分三部分:

由于 1234H 是一个直接地址,它紧跟在指令的操作码之后,随取指令而被读出;

访问数据段的段寄存器是 DS,所以,用 DS 的值和偏移量 1234H 相加,得存储单元的物理地址:21234H;

取单元 21234H 的值 5213H,并按“高高低低”的原则存入寄存器 BX 中。

所以,在执行该指令后,BX 的值就为 5213H。

由于数据段的段寄存器默认为 DS,如果要指定访问其它段内的数据,可在指令中用段前缀的方式显式地书写出来。

下面指令的目标操作数就是带有段前缀的直接寻址方式。

MOV   ES:[1000H], AX

直接寻址方式常用于处理内存单元的数据,其操作数是内存变量的值,该寻址方式可在 64K 字节的段内进行寻址。

注意:立即寻址方式和直接寻址方式的书写格式的不同,直接寻址的地址要写在括号“[”,“]”内。在程序中,直接地址通常用内存变量名来表示,如:MOV BX, VARW,其中,VARW 是内存字变量。

试比较下列指令中源操作数的寻址方式(VARW 是内存字变量):
   MOV   AX, 1234H MOV   AX, [1234H] ;前者是立即寻址,后者是直接寻址
   MOV   AX, VARW MOV   AX, [VARW] ;两者是等效的,均为直接寻址

4 七种寻址方式(寄存器间接寻址方式)

操作数在存储器中,操作数的有效地址用 SI、DI、BX 和 BP 等四个寄存器之一来指定,称这种寻址方式为寄存器间接寻址方式。该寻址方式物理地址的计算方法如下:

寄存器间接寻址方式读取存储单元的原理如图所示。

在不使用段超越前缀的情况下,有下列规定:

若有效地址用 SI、DI 和 BX 等之一来指定,则其缺省的段寄存器为 DS;
若有效地址用 BP 来指定,则其缺省的段寄存器为 SS(即:堆栈段)。

例:假设有指令:MOV BX,[DI],在执行时,(DS)=1000H,(DI)=2345H,存储单元 12345H 的内容是 4354H。问执行指令后,BX 的值是什么?

解:根据寄存器间接寻址方式的规则,在执行本例指令时,寄存器 DI 的值不是操作数,而是操作数的地址。该操作数的物理地址应由 DS 和 DI 的值形成,即:

PA=(DS)16+DI=1000H16+2345H=12345H。

所以,该指令的执行效果是:把从物理地址为 12345H 开始的一个字的值传送给 BX。

其执行过程如图所示。

5 七种寻址方式(寄存器相对寻址方式)

操作数在存储器中,其有效地址是一个基址寄存器(BX、BP)或变址寄存器(SI、D

I)的内容和指令中的 8 位/16 位偏移量之和。其有效地址的计算公式如公式所示。

在不使用段超越前缀的情况下,有下列规定:

若有效地址用SI、DI和BX等之一来指定,则其缺省的段寄存器为DS;

若有效地址用BP来指定,则其缺省的段寄存器为SS。

指令中给出的 8 位/16 位偏移量用补码表示。在计算有效地址时,如果偏移量是 8 位,则进行符号扩展成 16 位。当所得的有效地址超过 0FFFFH,则取其 64K 的模。

例:假设指令:MOV BX, [SI+100H],在执行它时,(DS)=1000H,(SI)=2345H,内存单元 12445H 的内容为 2715H,问该指令执行后,BX 的值是什么?

解:根据寄存器相对寻址方式的规则,在执行本例指令时,源操作数的有效地址 EA 为:

EA=(SI)+100H=2345H+100H=2445H

该操作数的物理地址应由 DS 和 EA 的值形成,即:

PA=(DS)16+EA=1000H16+2445H=12445H。

所以,该指令的执行效果是:把从物理地址为 12445H 开始的一个字的值传送给 BX。

其执行过程如图所示。

6 七种寻址方式(基址加变址寻址方式)

操作数在存储器中,其有效地址是一个基址寄存器(BX、BP)和一个变址寄存器(SI、DI)的内容之和。其有效地址的计算公式如公式所示。

在不使用段超越前缀的情况下,规定:如果有效地址中含有 BP,则缺省的段寄存器为 SS;否则,缺省的段寄存器为 DS。

例:假设指令:MOV BX, [BX+SI],在执行时,(DS)=1000H,(BX)=2100H,(SI)=0011H,内存单元 12111H 的内容为 1234H。问该指令执行后,BX 的值是什么?

解:根据基址加变址寻址方式的规则,在执行本例指令时,源操作数的有效地址 EA 为:

EA=(BX)+(SI)=2100H+0011H=2111H

该操作数的物理地址应由 DS 和 EA 的值形成,即:

PA=(DS)16+EA=1000H16+2111H=12111H

所以,该指令的执行效果是:把从物理地址为 12111H 开始的一个字的值传送给 BX。

其执行过程如图所示。

7 七种寻址方式(相对基址加变址寻址方式)

操作数在存储器中,其有效地址是一个基址寄存器(BX、BP)的值、一个变址寄存器(SI、DI)的值和指令中的 8 位/16 位偏移量之和。其有效地址的计算公式如公式所示。

在不使用段超越前缀的情况下,规定:如果有效地址中含有 BP,则其缺省的段寄存器为 SS;否则,其缺省的段寄存器为 DS。

指令中给出的 8 位/16 位偏移量用补码表示。在计算有效地址时,如果偏移量是 8 位,则进行符号扩展成 16 位。当所得的有效地址超过 0FFFFH,则取其 64K 的模。

例:假设指令:MOV AX, [BX+SI+200H],在执行时,(DS)=1000H,(BX)=2100H,(SI)=0010H,内存单元 12310H 的内容为 1234H。问该指令执行后,AX 的值是什么?

解:根据相对基址加变址寻址方式的规则,在执行本例指令时,源操作数的有效地址 EA 为:

EA=(BX)+(SI)+200H=2100H+0010H+200H=2310H

该操作数的物理地址应由 DS 和 EA 的值形成,即:

PA=(DS)16+EA=1000H16+2310H=12310H

所以,该指令的执行效果是:把从物理地址为 12310H 开始的一个字的值传送给 AX。其执行过程如图所示。

从相对基址加变址这种寻址方式来看,由于它的可变因素较多,看起来就显得复杂些,但正因为其可变因素多,它的灵活性也就很高。比如:

用 D1[i]来访问一维数组 D1 的第 i 个元素,它的寻址有一个自由度,用 D2[i][j]来访问二维数组 D2 的第 i 行、第 j 列的元素,其寻址有二个自由度。多一个可变的量,其寻址方式的灵活度也就相应提高了。

相对基址加变址寻址方式有多种等价的书写方式,下面的书写格式都是正确的,并且其寻址含义也是一致的。

MOV   AX, [BX+SI+1000H]    MOV   AX, 1000H[BX+SI]
MOV   AX, 1000H[BX][si]    MOV   AX, 1000H[SI][bx]

但书写格式 BX [1000+SI]和 SI[1000H+BX]等是错误的,即所用寄存器不能在“[“,”]”之外,该限制对寄存器相对寻址方式的书写也同样起作用。

相对基址加变址寻址方式是以上 7 种寻址方式中最复杂的一种寻址方式,它可变形为其它类型的存储器寻址方式。下表列举出该寻址方式与其它寻址方式之间的变形关系。

1.根目录下的结构

1
2
[zk: localhost:2181(CONNECTED) 72] ls /
[isr_change_notification, zookeeper, admin, consumers, cluster, config, latest_producer_id_block, controller, brokers, controller_epoch]

2. admin 的结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
[zk: localhost:2181(CONNECTED) 73] ls /admin
[delete_topics]
[zk: localhost:2181(CONNECTED) 74] ls /admin/delete_topics
[]
[zk: localhost:2181(CONNECTED) 75] get /admin/delete_topics
null
cZxid = 0xe
ctime = Sun Sep 02 20:04:12 PDT 2018
mZxid = 0xe
mtime = Sun Sep 02 20:04:12 PDT 2018
pZxid = 0xe
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 0
[zk: localhost:2181(CONNECTED) 76]

3.consumers 的结构

本环境上没有设置消费组

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
[zk: localhost:2181(CONNECTED) 78] ls /consumers
[]
[zk: localhost:2181(CONNECTED) 79] get /consumers
null
cZxid = 0x2
ctime = Sun Sep 02 20:04:12 PDT 2018
mZxid = 0x2
mtime = Sun Sep 02 20:04:12 PDT 2018
pZxid = 0x2
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 0
[zk: localhost:2181(CONNECTED) 80]
## 4.config的结构
[zk: localhost:2181(CONNECTED) 82] ls /config
[topics, clients, changes]
[zk: localhost:2181(CONNECTED) 83] ls /config/topics
[__consumer_offsets, test, test2]
[zk: localhost:2181(CONNECTED) 84] ls /config/topics/test2
[]
[zk: localhost:2181(CONNECTED) 85] get /config/topics/test2
{"version":1,"config":{}}
cZxid = 0xcc
ctime = Sun Sep 02 20:18:06 PDT 2018
mZxid = 0xcc
mtime = Sun Sep 02 20:18:06 PDT 2018
pZxid = 0xcc
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 25
numChildren = 0
[zk: localhost:2181(CONNECTED) 86]

5.controllers 的结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
[zk: localhost:2181(CONNECTED) 92] ls /controller
[]
[zk: localhost:2181(CONNECTED) 93] get /controller
{"version":1,"brokerid":0,"timestamp":"1535943857541"} //表示broker0为kafka的控制节点
cZxid = 0x15
ctime = Sun Sep 02 20:04:17 PDT 2018
mZxid = 0x15
mtime = Sun Sep 02 20:04:17 PDT 2018
pZxid = 0x15
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x1659d6268f60000
dataLength = 54
numChildren = 0
[zk: localhost:2181(CONNECTED) 94]
## 6.brokers的结构
[zk: localhost:2181(CONNECTED) 95] ls /brokers
[seqid, topics, ids]
[zk: localhost:2181(CONNECTED) 96] ls /brokers/ids
[0]
[zk: localhost:2181(CONNECTED) 97] get /brokers/ids/0
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://localhost:9092"],"jmx_port":-1,"host":"localhost","timestamp":"1535943859255","port":9092,"version":4}
cZxid = 0x1c
ctime = Sun Sep 02 20:04:19 PDT 2018
mZxid = 0x1c
mtime = Sun Sep 02 20:04:19 PDT 2018
pZxid = 0x1c
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x1659d6268f60000
dataLength = 188
numChildren = 0
[zk: localhost:2181(CONNECTED) 98]

------------------------------brokers/topics----------------------
[zk: localhost:2181(CONNECTED) 100] ls /brokers/topics
[__consumer_offsets, test, test2]
[zk: localhost:2181(CONNECTED) 101] ls /brokers/topics/test

test test2
[zk: localhost:2181(CONNECTED) 101] ls /brokers/topics/test2
[partitions]
[zk: localhost:2181(CONNECTED) 102] ls /brokers/topics/test2/partitions
[2, 1, 0]
[zk: localhost:2181(CONNECTED) 103] ls /brokers/topics/test2/partitions/2
[state]
[zk: localhost:2181(CONNECTED) 104] ls /brokers/topics/test2/partitions/2/state
[]
[zk: localhost:2181(CONNECTED) 105] get /brokers/topics/test2/partitions/2/state
{"controller_epoch":1,"leader":0,"version":1,"leader_epoch":0,"isr":[0]}
//表示partition 0 的leader是在0 broker上
cZxid = 0xd2
ctime = Sun Sep 02 20:18:06 PDT 2018
mZxid = 0xd2
mtime = Sun Sep 02 20:18:06 PDT 2018
pZxid = 0xd2
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 72
numChildren = 0
[zk: localhost:2181(CONNECTED) 106]

6.深入理解生产者和消费者

https://www.cnblogs.com/mcbye/p/kafka-producer-in-detail.html
https://www.cnblogs.com/mcbye/p/kafka-consumer-in-detail.html

7.重平衡

说完消费者组,再来说说与消费者组息息相关的重平衡机制。重平衡可以说是 kafka 为人诟病最多的一个点了。

重平衡其实就是一个协议,它规定了如何让消费者组下的所有消费者来分配 topic 中的每一个分区。比如一个 topic 有 100 个分区,一个消费者组内有 20 个消费者,在协调者的控制下让组内每一个消费者分配到 5 个分区,这个分配的过程就是重平衡。

重平衡的触发条件主要有三个:

  • 消费者组内成员发生变更,这个变更包括了增加和减少消费者。注意这里的减少有很大的可能是被动的,就是某个消费者崩溃退出了
  • 主题的分区数发生变更,kafka 目前只支持增加分区,当增加的时候就会触发重平衡
  • 订阅的主题发生变化,当消费者组使用正则表达式订阅主题,而恰好又新建了对应的主题,就会触发重平衡

为什么说重平衡为人诟病呢?因为重平衡过程中,消费者无法从 kafka 消费消息,这对 kafka 的 TPS 影响极大,而如果 kafka 集内节点较多,比如数百个,那重平衡可能会耗时极多。数分钟到数小时都有可能,而这段时间 kafka 基本处于不可用状态。所以在实际环境中,应该尽量避免重平衡发生。

了解了什么是重平衡,重平衡的缺点和触发条件后,我们先来看看重平衡的三种不同策略,然后说说应该如何避免重平衡发生。

三种重平衡策略

kafka 提供了三种重平衡分配策略,这里顺便介绍一下:

Range

具体实现位于,package org.apache.kafka.clients.consumer.RangeAssignor。

这种分配是基于每个主题的分区分配,如果主题的分区分区不能平均分配给组内每个消费者,那么对该主题,某些消费者会被分配到额外的分区。我们来看看具体的例子。

举例:目前有两个消费者 C0 和 C1,两个主题 t0 和 t1,每个主题三个分区,分别是 t0p0,t0p1,t0p2,和 t1p0,t1p1,t1p2。

那么分配情况会是:

  • C0:t0p0, t0p1, t1p0, t1p1
  • C1:t0p2, t1p2

我来大概解释一下,range 这种模式,消费者被分配的单位是基于主题的,拿上面的例子来说,是主题 t0 的三个分区分配给 2 个消费者,t1 三个分区分配给消费者。于是便会出现消费者 c0 分配到主题 t0 两个分区,以及 t1 两个分区的情况(一个主题有三个分区,三个分区无法匹配两个消费者,势必有一个消费者分到两个分区),而非每个消费者分配两个主题各三个分区。

RoundRobin

具体实现位于,package org.apache.kafka.clients.consumer.RoundRobinAssignor。

RoundRobin 是基于全部主题的分区来进行分配的,同时这种分配也是 kafka 默认的 rebalance 分区策略。还是用刚刚的例子来看,

举例:两个消费者 C0 和 C1,两个主题 t0 和 t1,每个主题三个分区,分别是 t0p0,t0p1,t0p2,和 t1p0,t1p1,t1p2。

由于是基于全部主题的分区,那么分配情况会是:

  • C0:t0p0, t0p1, t1p1
  • C1:t1p0, t0p2, t1p2
    因为是基于全部主题的分区来平均分配给消费者,所以这种分配策略能更加均衡得分配分区给每一个消费者。

上面说的都是同一消费者组内消费组都订阅相同主题的情况。更复杂的情况是,同一组内的消费者订阅不同的主题,那么任然可能会导致分区不均衡的情况。

还是举例说明,有三个消费者 C0,C1,C2 。三个主题 t0,t1,t2,分别有 1,2,3 个分区 t0p0,t1p0,t1p1,t2p0,t2p1,t2p2。

其中,C0 订阅 t0,C1 订阅 t0,t1。C2 订阅 t0,t1,t2。最终订阅情况如下:

  • C0:t0p0
  • C1:t1p0
  • C2:t1p1,t2p0,t2p1,t2p2
    这个结果乍一看有点迷,其实可以这样理解,按照序号顺序进行循环分配,t0 只有一个分区,先碰到 C0 就分配给它了。t1 有两个分区,被 C1 和 C2 订阅,那么会循环将两个分区分配出去,最后到 t2,有三个分区,却只有 C2 订阅,那么就将三个分区分配给 C2。

Sticky

Sticky 分配策略是最新的也是最复杂的策略,其具体实现位于 package org.apache.kafka.clients.consumer.StickyAssignor。

这种分配策略是在 0.11.0 才被提出来的,主要是为了一定程度解决上面提到的重平衡非要重新分配全部分区的问题。称为粘性分配策略。

听名字就知道,主要是为了让目前的分配尽可能保持不变,只挪动尽可能少的分区来实现重平衡。

还是举例说明,有三个消费者 C0,C1,C2 。三个主题 t0,t1,t2,t3。每个主题各有两个分区, t0p0,t0p1,t1p0,t1p1,t2p0,t2p1,t3p0,t3p1。

现在订阅情况如下:

  • C0:t0p0,t1p1,t3p0
  • C1:t0p1,t2p0,t3p1
  • C2:t1p0,t2p1

假设现在 C1 挂掉了,如果是 RoundRobin 分配策略,那么会变成下面这样:

  • C0:t0p0,t1p0,t2p0,t3p0
  • C2:t0p1,t1p1,t2p1,t3p1

就是说它会全部重新打乱,再分配,而如何使用 Sticky 分配策略,会变成这样:

  • C0:t0p0,t1p1,t3p0,t2p0
  • C2:t1p0,t2p1,t0p1,t3p1

也就是说,尽可能保留了原来的分区情况,不去改变它,在这个基础上进行均衡分配,不过这个策略目前似乎还有些 bug,所以实际使用也不多。

避免重平衡

要说完全避免重平衡,那是不可能滴,因为你无法完全保证消费者不会故障。而消费者故障其实也是最常见的引发重平衡的地方,所以这里主要介绍如何尽力避免消费者故障。

而其他几种触发重平衡的方式,增加分区,或是增加订阅的主题,抑或是增加消费者,更多的是主动控制,这里也不多讨论。

首先要知道,如果消费者真正挂掉了,那我们是没有什么办法的,但实际中,会有一些情况,会让 kafka 错误地认为一个正常的消费者已经挂掉了,我们要的就是避免这样的情况出现。

当然要避免,那首先要知道哪些情况会出现错误判断挂掉的情况。在分布式系统中,通常是通过心跳来维持分布式系统的,kafka 也不例外。对这部分内容有兴趣可以看看我之前的这篇分布式系统一致性问题与 Raft 算法(上)。这里要说的是,在分布式系统中,由于网络问题你不清楚没接收到心跳,是因为对方真正挂了还是只是因为负载过重没来得及发生心跳或是网络堵塞。所以一般会约定一个时间,超时即判定对方挂了。而在 kafka 消费者场景中,session.timout.ms 参数就是规定这个超时时间是多少。

还有一个参数,heartbeat.interval.ms,这个参数控制发送心跳的频率,频率越高越不容易被误判,但也会消耗更多资源。

此外,还有最后一个参数,max.poll.interval.ms,我们都知道消费者 poll 数据后,需要一些处理,再进行拉取。如果两次拉取时间间隔超过这个参数设置的值,那么消费者就会被踢出消费者组。也就是说,拉取,然后处理,这个处理的时间不能超过 max.poll.interval.ms 这个参数的值。这个参数的默认值是 5 分钟,而如果消费者接收到数据后会执行耗时的操作,则应该将其设置得大一些。

小结一下,其实主要就是三个参数,session.timout.ms 控制心跳超时时间,heartbeat.interval.ms 控制心跳发送频率,以及 max.poll.interval.ms 控制 poll 的间隔。这里给出一个相对较为合理的配置,如下:

session.timout.ms:设置为 6s
heartbeat.interval.ms:设置 2s
max.poll.interval.ms:推荐为消费者处理消息最长耗时再加 1 分钟

8.无法消费消息

kafka 消费不到数据的原因,首先检查配置之类的,如是否设置了 group.id,对应的 topic 是否正确等等,这些不多说。

下面是我遇到的几种 kafka 消费不到数据的情况:

8.1 重复消费

参考https://www.jianshu.com/p/d63c1576e6cc
下面进行详细分析:

“消费确认”是所有消息中间件都要解决的一个问题,在 kafka 中涉及到两个消费位置:

(1)当前取消息所在的 consume offset;

(2)程序处理完毕发送 ack(确认字符)后所确定的 committed offset。

很显然,在异步模式下,committed offset 要落后于 consume offset。假如 consumer 挂了重启,那么它将从 commited offset 位置处开始重新消费,而不是 consume offset 位置,这也就意味着很可能重复消费,所以会导致一条数据也抓不到。

那么怎么解决这个问题呢?

答案就是自己保存 commited offset,而不是依赖 kafka 的集群保存 commited offset,把消息的处理和保存 offset 做成一个原子操作。

如何将消息的处理和保存 offset 做成一个原子操作呢,Kafka 的官方文档列举了自己保存 offset 的两种使用场景:

要自己保存 committed offset,就要做到以下几个操作:

1
2
3
Configure enable.auto.commit=false   //禁用自动ack
Use the offset provided with each ConsumerRecord to save your position. //每次取到消息,把对应的offset存下来
On restart restore the position of the consumer using seek(TopicPartition, long).//下次重启,通过consumer.seek函数,定位到自己保存的offset,从那开始消费

Kafka 本身的机制只能保证消息不漏,即”at least once”,而通过自己来保存 committed offset,我们可以实现消费端的消息不重,即”exactly once”,达到消息不重不丢的目的。

8.2 消息被清理掉

https://www.cnblogs.com/sylvialucy/p/7827044.html

1.长时间不消费导致 log.retention.hours 或者 log.retention.minutes 超时,清除 log,Offset.Stored 失效

1
consumer.Assign(new List<TopicPartitionOffset>(){ new TopicPartitionOffset(new TopicPartition("topic", 1), new Offset(index)) });

2.我一次加数据太多导致磁盘耗尽,kafka 管理员帮我改到 20G 内存,但是仍然有一部分数据超出,分区 offset 靠前的数据被清除,导致再次消费不到。清除掉的数据无法再次被消费,但是还保存的数据可以消费到.

解决办法

1
consumer.Assign(new List<TopicPartitionOffset>(){ new TopicPartitionOffset(new TopicPartition("topic", 1), Offset.Beginning) });

或者在配置中加

1
2
auto.offset.reset=smallest //.NET 默认是largest
auto.offset.reset=earliest//Java 默认是latest

8.3 kafka 手动清除 topic

当手动删除 Kafka 某一分片上的消息日志时,如上图蓝线所示,此是只是将 Kafka Log 中的信息清 0 了,但是 Zookeeper 中的 Partition 和 Offset 数据依然会记录。当重新启动 Kafka 后,我们会发现如下二种情况:

 A、客户端无法正常用消费;

 B、在使用Kafka Consumer Offset  Monitor工具进行Kafka监控时会发现Lag(还有多少消息数未读取(Lag=logSize-Offset))为负数;其中此种情况的删除操作需要我们重点关注,后面我们也会详细介绍其对应的操作步骤。

一般正常情况,如果想让 Kafka 客户端正常消费,那么需要 Zookeeper 和 Kafka Log 中的记录保持如上图黄色所示。
二、Kafka 消息日志清除

操作步骤主要包括:

 1、停止Kafka运行;

 2、删除Kafka消息日志;

 3、修改ZK的偏移量;

 4、重启Kafka;

上述步骤重点介绍其中的关键步骤。

第 2 步:删除 Kafka 消息日志时,进入 Kafka 消息日志路径(可通过查看$KAFKA_HOME/config/server.properties 中的“log.dirs”知晓),删除相应 topic 文件夹下所有文件(如:“rm -rf ./topicA”);

第 3 步:修改 ZK 的偏移量时,进入 ZK 的安装目录下,运行./bin/zkCli.sh -server (中间以,分割),如果不带 server 默认修改的为本机。

示例如下:

 A.运行$ZOOKEEPER_HOME/bin/zkCli.sh -server Master:2181,Slave1:2181,Slave2:2181

 B.在ZK上运行ls /consumers/对应的分组/offset/对应的topic,就可以看到此topic下的所有分区了;

  通过get /consumers/对应的分组/offset/对应的topic/对应的分区号,可以查询到该分区上记录的offset;

  通过set /consumers/对应的分组/offset/对应的topic/对应的分区号 修改后的值(一般为0,重置),即可完成对offset的修改;

(注意:B 步骤中的“/consumers”由实际配置情况决定)

三、重建 Topic

操作步骤主要包括如下:

  1、删除Topic;

  2、删除log日志;

  3、删除ZK中的Topic记录

第一步:删除 Topic

运行$KAFKA_HOME/bin/kafka-topics.sh -delete -zookeeper [zookeeper server] -topic [topic name];如果 kafka 启动时加载的配置文件 server.properties 没有配置 delete.topic.enable = true,那么此时的删除并不是真正的删除。而只是把 topic 标记为:marked for deletion,此时就需要执行第 3 步的操作;

第三步:删除 ZK 中的 Topic 记录

示例如下:

 A.运行$ZOOKEEPER_HOME/bin/zkCli.sh -server Master:2181,Slave1:2181,Slave2:2181

 B.进入/admin/delete_topics目录下,找到删除的topic,删除对应的信息。

四、重新启动 Kafka 集群

控制器

Kafka 使用 Zookeeper 的临时节点来选举控制器, 并在节点加入集群或退出集群时通知控制器。控制器负责在节点加入或离开集群时进行分区首领选举。控制器使用 epoch 来避免“脑裂” 。“脑裂 ”是指两个节点同时认为自己是 当前的控制器。

脑裂问题

  • 什么是脑裂?
    kafka 中只有一个控制器 controller 负责分区的 leader 选举,同步 broker 的新增或删除消息,但有时由于网络问题,可能同时有两个 broker 认为自己是 controller,这时候其他的 broker 就会发生脑裂,不知道该听从谁的。

  • 如何解决?controller epoch
    每当新的 controller 产生的时候就会在 zk 中生成一个全新的、数值更大的 controller epoch 的标识,并同步给其他的 broker 进行保存,这样当第二个 controller 发送指令时,其他的 broker 就会自动忽略。

  • 选举问题?
    每个消息有自己的 topic 每个 topic 有多个分区 多个分区位于不同的 broker 每个分区有一个主分区和多个从分区。

每个分区都有一个主分区(leader)和多个从分区(fowller) 当一个 broker 宕机时,存在与该 broker 的主分区也会停止服务,因此要重新选举新的 leader 分区。

  • 如何选举?
    控制器会从 zk 中读取 ISR 列表 选取下一个有效的分区副本成为新的 leader

Kafka 会在 Zookeeper 上针对每个 Topic 维护一个称为 ISR(in-sync replica,已同步的副本)的集合,该集合中是一些分区的副本。如果这个集合有增减,kafka 会更新 zookeeper 上的记录。

通信模型

在 kafka 架构中,通信涉及到以下几点:

  1. producer 向 broker 集群生产数据数据 push 形式;
  2. consumer 从 broker 集群消费数据属于 pull 形式;
  3. broker 之间在 replication.factor>1 时,会存在副本之间数据同步,表现为:follower partition 从 leader partition pull 数据,来保证最大限度的拉近 partition 数据不一致。

Kafka Stream

支持的功能展示:

  • 字数统计这个例子用于演示 map 与 filter 模式以及简单的聚合
  • 另一个股票交易试产的各种统计信息,用于演示基于时间窗口的聚合
  • 最后使用填充点击事件流的例子来演示流的连接

KTable

KTable 可以存储状态,分为两种,内存和磁盘。磁盘依赖于 rockdb,内存依赖于 map。
KTable 具有容错功能,使用 kafka 存储事件流,用于恢复。
当使用基于 rockdb 的状态存储时,KTable 会优先从本地恢复,如果本地文件丢失则从 kafka 的 change log 事件流 topic 中,回放消息来恢复。
当使用基于内存的状态存储时,KTable 会直接从 kafka 的 change log 事件流 topic 中,回放消息来恢复。

Kafka Connect 有待了解???

Kafka 分区选择源码实现

消息 key 为空时,如果有缓存分区,使用缓存分区,没有缓存则随机选择
消息 key 不为空时,对 key 进行 HASH,然后对分区数取模
https://blog.csdn.net/liangwenmail/article/details/108321143#:~:text=StickyPartitionCache%20%E6%98%AF%20Kafka%20Client%20%E5%86%85%E9%83%A8%E7%9A%84%E4%B8%80%E4%B8%AA%E7%B1%BB%EF%BC%8C%E7%94%A8%E4%BA%8E%E7%AE%A1%E7%90%86%20Topic%20%E7%9A%84%E5%88%86%E5%8C%BA%E9%80%89%E6%8B%A9%E7%9A%84%E9%80%BB%E8%BE%91%E5%92%8C%E7%BC%93%E5%AD%98%E3%80%82,1%202%203%204%205%206%207%208
https://segmentfault.com/a/1190000020515457
https://www.confluent.io/blog/apache-kafka-producer-improvements-sticky-partitioner/

Kafka 消费者蛇者偏移量

https://blog.csdn.net/zzti_erlie/article/details/93637932