Skip to content

Commit

Permalink
fix(registration): discovery plugin registration bugfixes and refactor (
Browse files Browse the repository at this point in the history
cryostatio#193)

* fix(registration): respond 401 on plugin pings if wrong credentials supplied

* revamp registration flow logic

* more error handling, attempt to query for previously-submitted matching credentials
  • Loading branch information
andrewazores authored Sep 13, 2023
1 parent 0bd2b3e commit 8db9b4c
Show file tree
Hide file tree
Showing 5 changed files with 270 additions and 111 deletions.
172 changes: 155 additions & 17 deletions src/main/java/io/cryostat/agent/CryostatClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,13 @@
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.function.Function;

Expand All @@ -39,8 +42,10 @@
import io.cryostat.agent.model.RegistrationInfo;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.input.CountingInputStream;
import org.apache.http.HttpHeaders;
Expand Down Expand Up @@ -114,7 +119,8 @@ public CompletableFuture<Boolean> checkRegistration(PluginInfo pluginInfo) {
return supply(req, (res) -> logResponse(req, res)).thenApply(this::isOkStatus);
}

public CompletableFuture<PluginInfo> register(PluginInfo pluginInfo, URI callback) {
public CompletableFuture<PluginInfo> register(
int credentialId, PluginInfo pluginInfo, URI callback) {
try {
RegistrationInfo registrationInfo =
new RegistrationInfo(
Expand All @@ -126,7 +132,20 @@ public CompletableFuture<PluginInfo> register(PluginInfo pluginInfo, URI callbac
mapper.writeValueAsString(registrationInfo),
ContentType.APPLICATION_JSON));
return supply(req, (res) -> logResponse(req, res))
.thenApply(res -> assertOkStatus(req, res))
.handle(
(res, t) -> {
if (t != null) {
throw new CompletionException(t);
}
if (!isOkStatus(res)) {
try {
deleteCredentials(credentialId).get();
} catch (InterruptedException | ExecutionException e) {
log.error("Failed to delete previous credentials", e);
}
}
return assertOkStatus(req, res);
})
.thenApply(
res -> {
try (InputStream is = res.getEntity().getContent()) {
Expand Down Expand Up @@ -155,22 +174,82 @@ public CompletableFuture<PluginInfo> register(PluginInfo pluginInfo, URI callbac
public CompletableFuture<Integer> submitCredentialsIfRequired(
int prevId, Credentials credentials, URI callback) {
if (prevId < 0) {
return submitCredentials(credentials, callback);
return queryExistingCredentials(callback)
.thenCompose(
id -> {
if (id >= 0) {
return CompletableFuture.completedFuture(id);
}
return submitCredentials(prevId, credentials, callback);
});
}
HttpGet req = new HttpGet(baseUri.resolve(CREDENTIALS_API_PATH + "/" + prevId));
log.info("{}", req);
return supply(req, (res) -> logResponse(req, res))
.thenApply(this::isOkStatus)
.handle(
(v, t) -> {
if (t != null) {
log.error("Failed to get credentials with ID " + prevId, t);
throw new CompletionException(t);
}
return isOkStatus(v);
})
.thenCompose(
exists -> {
if (exists) {
return CompletableFuture.completedFuture(prevId);
}
return submitCredentials(credentials, callback);
return submitCredentials(prevId, credentials, callback);
});
}

private CompletableFuture<Integer> submitCredentials(Credentials credentials, URI callback) {
private CompletableFuture<Integer> queryExistingCredentials(URI callback) {
HttpGet req = new HttpGet(baseUri.resolve(CREDENTIALS_API_PATH));
log.info("{}", req);
return supply(req, (res) -> logResponse(req, res))
.handle(
(res, t) -> {
if (t != null) {
log.error("Failed to get credentials", t);
throw new CompletionException(t);
}
return assertOkStatus(req, res);
})
.thenApply(
res -> {
try (InputStream is = res.getEntity().getContent()) {
return mapper.readValue(is, ObjectNode.class);
} catch (IOException e) {
log.error("Unable to parse response as JSON", e);
throw new RegistrationException(e);
}
})
.thenApply(
node -> {
try {
return mapper.readValue(
node.get("data").get("result").toString(),
new TypeReference<List<StoredCredential>>() {});
} catch (IOException e) {
log.error("Unable to parse response as JSON", e);
throw new RegistrationException(e);
}
})
.thenApply(
l ->
l.stream()
.filter(
sc ->
Objects.equals(
sc.matchExpression,
selfMatchExpression(callback)))
.map(sc -> sc.id)
.findFirst()
.orElse(-1));
}

private CompletableFuture<Integer> submitCredentials(
int prevId, Credentials credentials, URI callback) {
HttpPost req = new HttpPost(baseUri.resolve(CREDENTIALS_API_PATH));
MultipartEntityBuilder entityBuilder =
MultipartEntityBuilder.create()
Expand Down Expand Up @@ -198,10 +277,38 @@ private CompletableFuture<Integer> submitCredentials(Credentials credentials, UR
log.info("{}", req);
req.setEntity(entityBuilder.build());
return supply(req, (res) -> logResponse(req, res))
.thenApply(res -> assertOkStatus(req, res))
.thenApply(res -> res.getFirstHeader(HttpHeaders.LOCATION).getValue())
.thenApply(res -> res.substring(res.lastIndexOf('/') + 1, res.length()))
.thenApply(Integer::valueOf);
.thenApply(
res -> {
if (!isOkStatus(res)) {
try {
if (res.getStatusLine().getStatusCode() == 409) {
int queried = queryExistingCredentials(callback).get();
if (queried >= 0) {
return queried;
}
}
} catch (InterruptedException | ExecutionException e) {
log.error("Failed to query for existing credentials", e);
}
try {
deleteCredentials(prevId).get();
} catch (InterruptedException | ExecutionException e) {
log.error(
"Failed to delete previous credentials with id "
+ prevId,
e);
throw new RegistrationException(e);
}
}
String location =
assertOkStatus(req, res)
.getFirstHeader(HttpHeaders.LOCATION)
.getValue();
String id =
location.substring(
location.lastIndexOf('/') + 1, location.length());
return Integer.valueOf(id);
});
}

public CompletableFuture<Void> deleteCredentials(int id) {
Expand All @@ -210,9 +317,7 @@ public CompletableFuture<Void> deleteCredentials(int id) {
}
HttpDelete req = new HttpDelete(baseUri.resolve(CREDENTIALS_API_PATH + "/" + id));
log.info("{}", req);
return supply(req, (res) -> logResponse(req, res))
.thenApply(res -> assertOkStatus(req, res))
.thenApply(res -> null);
return supply(req, (res) -> logResponse(req, res)).thenApply(res -> null);
}

public CompletableFuture<Void> deregister(PluginInfo pluginInfo) {
Expand Down Expand Up @@ -339,14 +444,15 @@ private CountingInputStream getRecordingInputStream(Path filePath) throws IOExce

private String selfMatchExpression(URI callback) {
return String.format(
"target.connectUrl == \"%s\" && target.jvmId == \"%s\" &&"
+ " target.annotations.platform[\"INSTANCE_ID\"] == \"%s\"",
callback, jvmId, instanceId);
"target.connectUrl == \"%s\" && target.annotations.platform[\"INSTANCE_ID\"] =="
+ " \"%s\"",
callback, instanceId);
}

private boolean isOkStatus(HttpResponse res) {
int sc = res.getStatusLine().getStatusCode();
return 200 <= sc && sc < 300;
// 2xx is OK, 3xx is redirect range so allow those too
return 200 <= sc && sc < 400;
}

private HttpResponse assertOkStatus(HttpRequestBase req, HttpResponse res) {
Expand All @@ -364,4 +470,36 @@ private HttpResponse assertOkStatus(HttpRequestBase req, HttpResponse res) {
}
return res;
}

@SuppressFBWarnings(
value = {
"URF_UNREAD_FIELD",
"UWF_UNWRITTEN_FIELD",
"UWF_UNWRITTEN_PUBLIC_OR_PROTECTED_FIELD"
})
public static class StoredCredential {

public int id;
public String matchExpression;

@Override
public int hashCode() {
return Objects.hash(id, matchExpression);
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
StoredCredential other = (StoredCredential) obj;
return id == other.id && Objects.equals(matchExpression, other.matchExpression);
}
}
}
4 changes: 4 additions & 0 deletions src/main/java/io/cryostat/agent/HttpException.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,8 @@ public class HttpException extends RuntimeException {
"Unexpected non-OK status code %d on API path %s",
statusCode, uri.toString()));
}

HttpException(int statusCode, Throwable cause) {
super(String.format("HTTP %d", statusCode), cause);
}
}
34 changes: 22 additions & 12 deletions src/main/java/io/cryostat/agent/MainModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@
import io.cryostat.core.sys.FileSystem;
import io.cryostat.core.tui.ClientWriter;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import dagger.Lazy;
import dagger.Module;
import dagger.Provides;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.http.client.HttpClient;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.HttpClientBuilder;
Expand Down Expand Up @@ -92,17 +94,9 @@ public static WebServer provideWebServer(
@Named(ConfigModule.CRYOSTAT_AGENT_WEBSERVER_HOST) String host,
@Named(ConfigModule.CRYOSTAT_AGENT_WEBSERVER_PORT) int port,
@Named(ConfigModule.CRYOSTAT_AGENT_CALLBACK) URI callback,
Lazy<Registration> registration,
@Named(ConfigModule.CRYOSTAT_AGENT_REGISTRATION_RETRY_MS) int registrationRetryMs) {
Lazy<Registration> registration) {
return new WebServer(
remoteContexts,
cryostat,
executor,
host,
port,
callback,
registration,
registrationRetryMs);
remoteContexts, cryostat, executor, host, port, callback, registration);
}

@Provides
Expand Down Expand Up @@ -170,7 +164,8 @@ public static HttpClient provideHttpClient(

@Provides
public static ObjectMapper provideObjectMapper() {
return new ObjectMapper();
return new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
}

@Provides
Expand Down Expand Up @@ -205,8 +200,23 @@ public static Registration provideRegistration(
@Named(ConfigModule.CRYOSTAT_AGENT_APP_JMX_PORT) int jmxPort,
@Named(ConfigModule.CRYOSTAT_AGENT_REGISTRATION_RETRY_MS) int registrationRetryMs,
@Named(ConfigModule.CRYOSTAT_AGENT_REGISTRATION_CHECK_MS) int registrationCheckMs) {

Logger log = LoggerFactory.getLogger(Registration.class);
return new Registration(
executor,
Executors.newSingleThreadScheduledExecutor(
r -> {
Thread t = new Thread(r);
t.setDaemon(true);
t.setName("cryostat-agent-registration");
t.setUncaughtExceptionHandler(
(thread, err) ->
log.error(
String.format(
"[%s] Uncaught exception: %s",
thread.getName(),
ExceptionUtils.getStackTrace(err))));
return t;
}),
cryostat,
callback,
webServer,
Expand Down
Loading

0 comments on commit 8db9b4c

Please sign in to comment.