Skip to content

Commit

Permalink
test: use proxy server instead of new process
Browse files Browse the repository at this point in the history
  • Loading branch information
hu-chia authored and hujia committed Jun 24, 2024
1 parent 44d3768 commit 0409169
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 54 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,20 @@
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetServer;
import io.vertx.core.net.NetSocket;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.List;
import java.util.Map;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;

import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -478,7 +487,7 @@ public void testCallNetworkInterrupted(TestContext should) throws InterruptedExc
StreamingGrpc.StreamingImplBase impl = new StreamingImplBase() {
@Override
public StreamObserver<Item> pipe(StreamObserver<Item> responseObserver) {
return new StreamObserver<Item>() {
return new StreamObserver<>() {
@Override
public void onNext(Item item) {
requestCount.incrementAndGet();
Expand All @@ -504,42 +513,92 @@ public void onCompleted() {
serverStub.bind(server);
startServer(server);

Process client = ProcessHelper.exec(ServerBridgeTest.class, Collections.singletonList(String.valueOf(port)));
// waiting for doing request
Thread.sleep(1_000);
client.destroy();
client.waitFor();
try (var proxyServer = new ProxyServer(vertx, port + 1, port)) {
proxyServer.start();

int proxyPort = proxyServer.proxyServer.actualPort();
Channel channel = ManagedChannelBuilder.forAddress("localhost", proxyPort).usePlaintext().build();
StreamingGrpc.StreamingStub stub = StreamingGrpc.newStub(channel);
StreamObserver<Item> requestObserver = stub.pipe(new NoopStreamObserver<>());
Item request = Item.newBuilder().setValue("item").build();
requestObserver.onNext(request);
requestObserver.onNext(request);
requestObserver.onNext(request);

// waiting for the connection to be established.
Thread.sleep(1000);
}

async.await(20_000);

should.assertEquals(requestCount.get(), 3);
should.assertTrue(completed.future().failed());
}

public static void main(String... args) throws InterruptedException {
StreamObserver<Item> noop = new StreamObserver<Item>() {
@Override public void onNext(Item item) {
static class NoopStreamObserver<T> implements StreamObserver<T> {
@Override public void onNext(T ignored) {}

}
@Override public void onError(Throwable ignored) {}

@Override public void onError(Throwable throwable) {
@Override public void onCompleted() {}
}

}
static class ProxyServer implements AutoCloseable {

@Override public void onCompleted() {
private final int listenPort;

}
};
private final int targetPort;

private final NetServer proxyServer;

Channel channel = ManagedChannelBuilder.forAddress("localhost", Integer.parseInt(args[0])).usePlaintext().build();
StreamingGrpc.StreamingStub stub = StreamingGrpc.newStub(channel);
StreamObserver<Item> requestObserver = stub.pipe(noop);
Item request = Item.newBuilder().setValue("item").build();
requestObserver.onNext(request);
requestObserver.onNext(request);
requestObserver.onNext(request);
private final NetClient proxyClient;

// waiting to be killed
Thread.currentThread().join();
// live or dead
private final List<Map.Entry<NetSocket, NetSocket>> sockets = new ArrayList<>();

ProxyServer(Vertx vertx, int listenPort, int targetPort) {
this.listenPort = listenPort;
this.targetPort = targetPort;
this.proxyServer = vertx.createNetServer().connectHandler(this::handle);
this.proxyClient = vertx.createNetClient();
}

void start() {
this.proxyServer.listen(listenPort).toCompletionStage().toCompletableFuture().join();
}

void handle(NetSocket socket) {
socket.pause();

proxyClient.connect(targetPort, "localhost")
.onComplete(ar -> {
if (ar.succeeded()) {
NetSocket proxySocket = ar.result();
proxySocket.pause();

socket.handler(proxySocket::write);
proxySocket.handler(socket::write);
socket.closeHandler(ignored -> proxySocket.close());
proxySocket.closeHandler(ignored -> socket.close());

sockets.add(Map.entry(socket, proxySocket));

proxySocket.resume();
socket.resume();
} else {
socket.close();
}
});
}

@Override
public void close() {
this.sockets.forEach(entry -> {
entry.getKey().close();
entry.getValue().close();
});
this.proxyClient.close();
this.proxyServer.close();
}
}
}

0 comments on commit 0409169

Please sign in to comment.