diff --git a/vertx-grpc-server/src/test/java/io/vertx/grpc/server/ProcessHelper.java b/vertx-grpc-server/src/test/java/io/vertx/grpc/server/ProcessHelper.java deleted file mode 100644 index 03525dc3..00000000 --- a/vertx-grpc-server/src/test/java/io/vertx/grpc/server/ProcessHelper.java +++ /dev/null @@ -1,29 +0,0 @@ -package io.vertx.grpc.server; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Objects; - -public class ProcessHelper { - - private ProcessHelper() {} - - public static Process exec(Class main, List args) throws IOException { - - List command = new ArrayList<>(); - // java binary executable - command.add(System.getProperty("java.home") + File.separator + "bin" + File.separator + "java"); - command.add("-cp"); - // inherit classpath - command.add(System.getProperty("java.class.path")); - // main class name - command.add(main.getName()); - // args - command.addAll(Objects.requireNonNullElse(args, Collections.emptyList())); - - return new ProcessBuilder(command).inheritIO().start(); - } -} diff --git a/vertx-grpc-server/src/test/java/io/vertx/grpc/server/ServerBridgeTest.java b/vertx-grpc-server/src/test/java/io/vertx/grpc/server/ServerBridgeTest.java index bb3bc794..6463215f 100644 --- a/vertx-grpc-server/src/test/java/io/vertx/grpc/server/ServerBridgeTest.java +++ b/vertx-grpc-server/src/test/java/io/vertx/grpc/server/ServerBridgeTest.java @@ -23,11 +23,20 @@ import io.grpc.stub.ServerCallStreamObserver; import io.grpc.stub.StreamObserver; import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import io.vertx.core.net.NetClient; +import io.vertx.core.net.NetServer; +import io.vertx.core.net.NetSocket; import io.vertx.ext.unit.Async; import io.vertx.ext.unit.TestContext; import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.ArrayList; import java.util.Collections; -import java.util.concurrent.TimeUnit; +import java.util.List; +import java.util.Map; +import org.jetbrains.annotations.Nullable; import org.junit.Test; import java.util.concurrent.atomic.AtomicInteger; @@ -478,7 +487,7 @@ public void testCallNetworkInterrupted(TestContext should) throws InterruptedExc StreamingGrpc.StreamingImplBase impl = new StreamingImplBase() { @Override public StreamObserver pipe(StreamObserver responseObserver) { - return new StreamObserver() { + return new StreamObserver<>() { @Override public void onNext(Item item) { requestCount.incrementAndGet(); @@ -504,11 +513,21 @@ public void onCompleted() { serverStub.bind(server); startServer(server); - Process client = ProcessHelper.exec(ServerBridgeTest.class, Collections.singletonList(String.valueOf(port))); - // waiting for doing request - Thread.sleep(1_000); - client.destroy(); - client.waitFor(); + try (var proxyServer = new ProxyServer(vertx, port + 1, port)) { + proxyServer.start(); + + int proxyPort = proxyServer.proxyServer.actualPort(); + Channel channel = ManagedChannelBuilder.forAddress("localhost", proxyPort).usePlaintext().build(); + StreamingGrpc.StreamingStub stub = StreamingGrpc.newStub(channel); + StreamObserver requestObserver = stub.pipe(new NoopStreamObserver<>()); + Item request = Item.newBuilder().setValue("item").build(); + requestObserver.onNext(request); + requestObserver.onNext(request); + requestObserver.onNext(request); + + // waiting for the connection to be established. + Thread.sleep(1000); + } async.await(20_000); @@ -516,30 +535,70 @@ public void onCompleted() { should.assertTrue(completed.future().failed()); } - public static void main(String... args) throws InterruptedException { - StreamObserver noop = new StreamObserver() { - @Override public void onNext(Item item) { + static class NoopStreamObserver implements StreamObserver { + @Override public void onNext(T ignored) {} - } + @Override public void onError(Throwable ignored) {} - @Override public void onError(Throwable throwable) { + @Override public void onCompleted() {} + } - } + static class ProxyServer implements AutoCloseable { - @Override public void onCompleted() { + private final int listenPort; - } - }; + private final int targetPort; + + private final NetServer proxyServer; - Channel channel = ManagedChannelBuilder.forAddress("localhost", Integer.parseInt(args[0])).usePlaintext().build(); - StreamingGrpc.StreamingStub stub = StreamingGrpc.newStub(channel); - StreamObserver requestObserver = stub.pipe(noop); - Item request = Item.newBuilder().setValue("item").build(); - requestObserver.onNext(request); - requestObserver.onNext(request); - requestObserver.onNext(request); + private final NetClient proxyClient; - // waiting to be killed - Thread.currentThread().join(); + // live or dead + private final List> sockets = new ArrayList<>(); + + ProxyServer(Vertx vertx, int listenPort, int targetPort) { + this.listenPort = listenPort; + this.targetPort = targetPort; + this.proxyServer = vertx.createNetServer().connectHandler(this::handle); + this.proxyClient = vertx.createNetClient(); + } + + void start() { + this.proxyServer.listen(listenPort).toCompletionStage().toCompletableFuture().join(); + } + + void handle(NetSocket socket) { + socket.pause(); + + proxyClient.connect(targetPort, "localhost") + .onComplete(ar -> { + if (ar.succeeded()) { + NetSocket proxySocket = ar.result(); + proxySocket.pause(); + + socket.handler(proxySocket::write); + proxySocket.handler(socket::write); + socket.closeHandler(ignored -> proxySocket.close()); + proxySocket.closeHandler(ignored -> socket.close()); + + sockets.add(Map.entry(socket, proxySocket)); + + proxySocket.resume(); + socket.resume(); + } else { + socket.close(); + } + }); + } + + @Override + public void close() { + this.sockets.forEach(entry -> { + entry.getKey().close(); + entry.getValue().close(); + }); + this.proxyClient.close(); + this.proxyServer.close(); + } } }