/* The port on which the server should run */ private int port = 50051; private Server server; private void start() throws Exception { server = ServerBuilder.forPort(port) .addService(GreeterGrpc.bindService(new GreeterImpl())) .build() .start(); logger.info("Server started, listening on " + port); Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { // Use stderr here since the logger may has been reset by its JVM shutdown hook. System.err.println("*** shutting down gRPC server since JVM is shutting down"); HelloWorldServer.this.stop(); System.err.println("*** server shut down"); } }); }
// 一个双向流式 RPC 是双方使用读写流去发送一个消息序列。两个流独立操作,因此客户端和服务器可以以任意喜欢的顺序读写:比如,服务器可以在写入响应前等待接收所有的客户端消息,或者可以交替地读取和写入消息,或者其他读写的组合。每个流中的消息顺序会被保留。你可以通过在请求和响应前加 stream 关键字去指定方法的类型。
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
}

我们的 .proto 文件也包含了所有请求的 protocol buffer 消息类型定义以及在服务方法中使用的响应类型——比如,下面的 Point 消息类型:

// Points are represented as latitude-longitude pairs in the E7 representation
// (degrees multiplied by 10**7 and rounded to the nearest integer).
// Latitudes should be in the range +/- 90 degrees and longitude should be in
// the range +/- 180 degrees (inclusive).
message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}
// A latitude-longitude rectangle, represented as two diagonally opposite
// points "lo" and "hi".
message Rectangle {
  // One corner of the rectangle.
  Point lo = 1;

  // The other corner of the rectangle, diagonally opposite "lo".
  Point hi = 2;
}
// A feature names something at a given point.
//
// If a feature could not be named, the name is empty.
message Feature {
  // The name of the feature; empty when the feature could not be named.
  string name = 1;

  // The point where the feature is detected.
  Point location = 2;
}
// Not used in the RPC. Instead, this is here for the form serialized to disk.
message FeatureDatabase {
  // All features held in the serialized database.
  repeated Feature feature = 1;
}
// A RouteNote is a message sent while at a given point.
message RouteNote {
  // The location from which the message is sent.
  Point location = 1;

  // The message to be sent.
  string message = 2;
}
// A RouteSummary is received in response to a RecordRoute rpc.
//
// It contains the number of individual points received, the number of
// detected features, and the total distance covered as the cumulative sum of
// the distance between each point.
message RouteSummary {
  // The number of points received.
  int32 point_count = 1;

  // The number of known features passed while traversing the route.
  int32 feature_count = 2;

  // The distance covered in metres.
  int32 distance = 3;

  // The duration of the traversal in seconds.
  int32 elapsed_time = 4;
}
/** Sample server that wraps a gRPC {@code Server} exposing the RouteGuide service. */
public class RouteGuideServer {
  // Logger named after this class.
  private static final Logger logger = Logger.getLogger(RouteGuideServer.class.getName());

  // Port number; reported in the startup log message.
  private final int port;
  // Server built in the constructor with the RouteGuide service registered.
  private final Server server;
/**
 * Creates a RouteGuide server for {@code port} using the features file returned by
 * {@code RouteGuideUtil.getDefaultFeaturesFile()}.
 *
 * @param port the port handed on to the two-argument constructor
 * @throws IOException declared by the constructor this one delegates to
 */
public RouteGuideServer(int port) throws IOException {
  this(port, RouteGuideUtil.getDefaultFeaturesFile());
}
/**
 * Create a RouteGuide server listening on {@code port} using {@code featureFile} database.
 *
 * <p>The builder is obtained from {@code ServerBuilder.forPort(port)} and the features are read
 * with {@code RouteGuideUtil.parseFeatures(featureFile)}.
 *
 * @param port the port the server builder is created for
 * @param featureFile location of the feature database to parse
 * @throws IOException declared for the feature-file parsing step
 */
public RouteGuideServer(int port, URL featureFile) throws IOException {
  this(ServerBuilder.forPort(port), port, RouteGuideUtil.parseFeatures(featureFile));
}
/**
 * Creates a RouteGuide server from an existing builder and an in-memory feature collection.
 *
 * @param serverBuilder builder the RouteGuide service is added to before the server is built
 * @param port the port number remembered for log output
 * @param features the features served by the RouteGuide service
 */
public RouteGuideServer(ServerBuilder<?> serverBuilder, int port, Collection<Feature> features) {
  RouteGuideService routeGuideService = new RouteGuideService(features);
  this.server = serverBuilder.addService(routeGuideService).build();
  this.port = port;
}
/** Start serving requests. */ public void start() throws IOException { server.start(); logger.info("Server started, listening on " + port); Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { // Use stderr here since the logger may have been reset by its JVM shutdown hook. System.err.println("*** shutting down gRPC server since JVM is shutting down"); try { RouteGuideServer.this.stop(); } catch (InterruptedException e) { e.printStackTrace(System.err); } System.err.println("*** server shut down"); } }); }
/**
 * Initiates server shutdown and waits up to 30 seconds for termination. Does nothing when no
 * server is present.
 *
 * @throws InterruptedException if interrupted while waiting for termination
 */
public void stop() throws InterruptedException {
  if (server == null) {
    return;
  }
  server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
}
/**
 * Blocks the calling thread until the server terminates; needed on the main thread because the
 * grpc library uses daemon threads. Returns immediately when no server is present.
 *
 * @throws InterruptedException if interrupted while waiting
 */
private void blockUntilShutdown() throws InterruptedException {
  if (server == null) {
    return;
  }
  server.awaitTermination();
}
/**
 * Entry point: creates a server for port 8980, starts it, and blocks until it shuts down.
 *
 * @param args command-line arguments (unused)
 * @throws Exception propagated from construction, start-up, or the blocking wait
 */
public static void main(String[] args) throws Exception {
  RouteGuideServer routeGuideServer = new RouteGuideServer(8980);
  routeGuideServer.start();
  routeGuideServer.blockUntilShutdown();
}
/**
 * Server-side implementation of the RouteGuide service. It extends the generated
 * {@code RouteGuideGrpc.RouteGuideImplBase} base class.
 *
 * <p>See route_guide.proto for details of the methods.
 */
private static class RouteGuideService extends RouteGuideGrpc.RouteGuideImplBase {
  // Features supplied at construction; searched by the RPC handlers.
  private final Collection<Feature> features;
  // Notes received so far, grouped by the location they were sent from.
  private final ConcurrentMap<Point, List<RouteNote>> routeNotes =
      new ConcurrentHashMap<Point, List<RouteNote>>();
/**
 * Simple (unary) RPC.
 *
 * <ol>
 *   <li>Build the {@code Feature} reply for the requested point; here that is delegated to the
 *       private helper {@code checkFeature()}.
 *   <li>Hand the reply to the client through the response observer's {@code onNext()}.
 *   <li>Call the response observer's {@code onCompleted()} to signal that the RPC is finished.
 * </ol>
 *
 * @param request the requested location for the feature.
 * @param responseObserver the observer that will receive the feature at the requested point;
 *     the server replies by calling it.
 */
@Override
public void getFeature(Point request, StreamObserver<Feature> responseObserver) {
  Feature reply = checkFeature(request);
  responseObserver.onNext(reply);
  responseObserver.onCompleted();
}
/**
 * Server-side streaming RPC.
 *
 * <p>Selects every existing feature whose location lies inside the requested rectangle
 * (bounds inclusive) and writes each one to the response observer with {@code onNext()}. As in
 * the simple RPC, {@code onCompleted()} is then called to tell gRPC that all responses have
 * been written.
 *
 * @param request the bounding rectangle for the requested features.
 * @param responseObserver the observer that will receive the features.
 */
@Override
public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
  Point lo = request.getLo();
  Point hi = request.getHi();
  // The two corners may come in any order, so normalise them into min/max bounds.
  int minLon = min(lo.getLongitude(), hi.getLongitude());
  int maxLon = max(lo.getLongitude(), hi.getLongitude());
  int maxLat = max(lo.getLatitude(), hi.getLatitude());
  int minLat = min(lo.getLatitude(), hi.getLatitude());

  for (Feature candidate : features) {
    if (RouteGuideUtil.exists(candidate)) {
      Point where = candidate.getLocation();
      int lat = where.getLatitude();
      int lon = where.getLongitude();
      boolean insideLon = minLon <= lon && lon <= maxLon;
      boolean insideLat = minLat <= lat && lat <= maxLat;
      if (insideLon && insideLat) {
        responseObserver.onNext(candidate);
      }
    }
  }
  responseObserver.onCompleted();
}
/** * 客户端流式 RPC * 如你所见,我们的方法和前面的方法类型相似,拿到一个 StreamObserver 应答观察者参数,但是这次它返回一个 StreamObserver 以便客户端写入它的 Point。 * 在这个方法体中,我们返回了一个匿名 StreamObserver 实例,其中我们: * 覆写了 onNext() 方法,每次客户端写入一个 Point 到消息流时,拿到特性和其它信息。 * 覆写了 onCompleted() 方法(在 客户端 结束写入消息时调用),用来填充和构建我们的 RouteSummary。然后我们用 RouteSummary 调用方法自己的的响应观察者的 onNext(),之后调用它的 onCompleted() 方法,结束服务器端的调用。 * * @param responseObserver an observer to receive the response summary. * @return an observer to receive the requested route points. */ @Override public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) { return new StreamObserver<Point>() { int pointCount; int featureCount; int distance; Point previous; final long startTime = System.nanoTime();
@Override public void onNext(Point point) { pointCount++; if (RouteGuideUtil.exists(checkFeature(point))) { featureCount++; } // For each point after the first, add the incremental distance from the previous point to // the total distance value. if (previous != null) { distance += calcDistance(previous, point); } previous = point; }
@Override public void onError(Throwable t) { logger.log(Level.WARNING, "recordRoute cancelled"); }
@Override public void onCompleted() { long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime); responseObserver.onNext(RouteSummary.newBuilder().setPointCount(pointCount) .setFeatureCount(featureCount).setDistance(distance) .setElapsedTime((int) seconds).build()); responseObserver.onCompleted(); } }; }
/** * 双向流式RPC(与客户端流的差别主要是在responseObserver.onNext()方法的调用位置,这个调用位置位于onNext()方法中。而客户端流是在onComplete()方法中,意味着需要把客户端流处理完成后,才能调用onNext方法处理并立即结束。) * 和我们的客户端流的例子一样,我们拿到和返回一个 StreamObserver 应答观察者,除了这次我们在客户端仍然写入消息到 它们的 消息流时通过我们方法的应答观察者返回值。这里读写的语法和客户端流以及服务器流方法一样。虽然每一端都会按照它们写入的顺序拿到另一端的消息,客户端和服务器都可以任意顺序读写——流的操作是互不依赖的。 * * @param responseObserver an observer to receive the stream of previous messages. * @return an observer to handle requested message/location pairs. */ @Override public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) { return new StreamObserver<RouteNote>() { @Override public void onNext(RouteNote note) { List<RouteNote> notes = getOrCreateNotes(note.getLocation());
// Respond with all previous notes at this location. for (RouteNote prevNote : notes.toArray(new RouteNote[0])) { responseObserver.onNext(prevNote); }
// Now add the new note to the list notes.add(note); }
@Override public void onError(Throwable t) { logger.log(Level.WARNING, "routeChat cancelled"); }
@Override public void onCompleted() { responseObserver.onCompleted(); } }; }
/**
 * Get the notes list for the given location. If missing, create it.
 *
 * <p>The map is consulted first so that a new synchronized list is only allocated when the
 * location has no list yet. If another caller registers a list between the lookup and
 * {@code putIfAbsent}, that caller's list is returned.
 *
 * @param location the location whose notes are wanted
 * @return the list registered in {@code routeNotes} for {@code location}
 */
private List<RouteNote> getOrCreateNotes(Point location) {
  List<RouteNote> existing = routeNotes.get(location);
  if (existing != null) {
    return existing;
  }
  List<RouteNote> created = Collections.synchronizedList(new ArrayList<RouteNote>());
  List<RouteNote> raced = routeNotes.putIfAbsent(location, created);
  return raced != null ? raced : created;
}
/** * Gets the feature at the given point. * * @param location the location to check. * @return The feature object at the point. Note that an empty name indicates no feature. */ private Feature checkFeature(Point location) { for (Feature feature : features) { if (feature.getLocation().getLatitude() == location.getLatitude() && feature.getLocation().getLongitude() == location.getLongitude()) { return feature; } }
// No feature was found, return an unnamed feature. return Feature.newBuilder().setName("").setLocation(location).build(); }
/** * Calculate the distance between two points using the "haversine" formula. * The formula is based on http://mathforum.org/library/drmath/view/51879.html. * * @param start The starting point * @param end The end point * @return The distance between the points in meters */ private static int calcDistance(Point start, Point end) { int r = 6371000; // earth radius in meters double lat1 = toRadians(RouteGuideUtil.getLatitude(start)); double lat2 = toRadians(RouteGuideUtil.getLatitude(end)); double lon1 = toRadians(RouteGuideUtil.getLongitude(start)); double lon2 = toRadians(RouteGuideUtil.getLongitude(end)); double deltaLat = lat2 - lat1; double deltaLon = lon2 - lon1;
public void recordRoute(List<Feature> features, int numPoints) throws Exception { info("*** RecordRoute"); final SettableFuture<Void> finishFuture = SettableFuture.create(); StreamObserver<RouteSummary> responseObserver = new StreamObserver<RouteSummary>() { @Override public void onNext(RouteSummary summary) { info("Finished trip with {0} points. Passed {1} features. " + "Travelled {2} meters. It took {3} seconds.", summary.getPointCount(), summary.getFeatureCount(), summary.getDistance(), summary.getElapsedTime()); }
@Override public void onError(Throwable t) { finishFuture.setException(t); }
@Override public void onCompleted() { finishFuture.set(null); } };
StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver); try { // Send numPoints points randomly selected from the features list. StringBuilder numMsg = new StringBuilder(); Random rand = new Random(); for (int i = 0; i < numPoints; ++i) { int index = rand.nextInt(features.size()); Point point = features.get(index).getLocation(); info("Visiting point {0}, {1}", RouteGuideUtil.getLatitude(point), RouteGuideUtil.getLongitude(point)); requestObserver.onNext(point); // Sleep for a bit before sending the next one. Thread.sleep(rand.nextInt(1000) + 500); if (finishFuture.isDone()) { break; } } info(numMsg.toString()); requestObserver.onCompleted();