Skip to content

Commit

Permalink
ID-1570 - Byte copying to preserve original stream; Hash kept as stat…
Browse files Browse the repository at this point in the history
…e variable;
  • Loading branch information
snackk committed Aug 8, 2024
1 parent fb71485 commit 19a4c6c
Show file tree
Hide file tree
Showing 5 changed files with 212 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public StreamConsumer<Void> getStreamConsumer(
} catch (FileAlreadyExistsException e) {
LOG.debug("Destination directory already exists");
}
Files.copy(stream, destination, StandardCopyOption.REPLACE_EXISTING);
Files.copy(stream.getInputStream(), destination, StandardCopyOption.REPLACE_EXISTING);
if ((!noMd5Check) && (!originalFileMd5.matches(stream.md5()))) {
LOG.error("MD5 mismatch. Deleting destination file");
Files.delete(destination);
Expand Down Expand Up @@ -72,9 +72,6 @@ public Optional<MD5Checksum> existingFileMd5() {
try (InputStream is = Files.newInputStream(this.destination);
BufferedInputStream bis = new BufferedInputStream(is);
StreamWithMD5Decorator md5Is = StreamWithMD5Decorator.of(bis)) {
byte[] buffer = new byte[BUFFER];
while ((md5Is.read(buffer)) != -1) {
}
return Optional.of(md5Is.md5());
} catch (IOException | NoSuchAlgorithmException e) {
throw new RuntimeException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public StreamConsumer<Void> getStreamConsumer(
.contentLength(size)
.storageClass(StorageClass.INTELLIGENT_TIERING)
.build();
client.putObject(putObjectRequest, RequestBody.fromInputStream(stream, size));
client.putObject(putObjectRequest, RequestBody.fromInputStream(stream.getInputStream(), size));
Latest latest = new Latest(clock.instant(), key);
String latestContent = mapper.writeValueAsString(latest);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,43 +1,115 @@
package technology.dice.dicewhere.downloader.stream;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.DigestInputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Optional;
import javax.xml.bind.annotation.adapters.HexBinaryAdapter;
import technology.dice.dicewhere.downloader.md5.MD5Checksum;

public class StreamWithMD5Decorator extends InputStream {

private final DigestInputStream inputStream;
private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
private final MessageDigest md5;
DigestInputStream inputStream;
private boolean consumed = false;
private Optional<MD5Checksum> md5Checksum = Optional.empty();

private StreamWithMD5Decorator(DigestInputStream inputStream, MessageDigest md5) {
private StreamWithMD5Decorator(DigestInputStream inputStream, MessageDigest md5)
throws IOException {
this.inputStream = inputStream;
this.md5 = md5;
inputStream.on(false);
consumeStream();
}


public static StreamWithMD5Decorator of(InputStream inputStream) throws NoSuchAlgorithmException {
public static StreamWithMD5Decorator of(InputStream inputStream)
throws NoSuchAlgorithmException, IOException {
MessageDigest md5 = MessageDigest.getInstance("MD5");
DigestInputStream dis = new DigestInputStream(inputStream, md5);
return new StreamWithMD5Decorator(dis, md5);
}

private void consumeStream() throws IOException {
byte[] data = new byte[8192];
int bytesRead;
while ((bytesRead = inputStream.read(data)) != -1) {
buffer.write(data, 0, bytesRead);
}
consumed = true;
}

public MD5Checksum md5() {
String hex = (new HexBinaryAdapter()).marshal(md5.digest());
return MD5Checksum.of(hex);
if (!consumed) {
throw new IllegalStateException("Stream not fully consumed yet.");

Check warning on line 47 in dice-where-downloader-lib/src/main/java/technology/dice/dicewhere/downloader/stream/StreamWithMD5Decorator.java

View check run for this annotation

Codecov / codecov/patch

dice-where-downloader-lib/src/main/java/technology/dice/dicewhere/downloader/stream/StreamWithMD5Decorator.java#L47

Added line #L47 was not covered by tests
}
return md5Checksum.orElseGet(
() -> {
String hex = (new HexBinaryAdapter()).marshal(md5.digest());
MD5Checksum checksum = MD5Checksum.of(hex);
md5Checksum = Optional.of(checksum);
return checksum;
});
}

public InputStream getInputStream() {
return new ByteArrayInputStream(buffer.toByteArray());
}

@Override
public int read() throws IOException {
return inputStream.read();
if (!consumed) {
throw new IllegalStateException("Stream not fully consumed yet.");

Check warning on line 65 in dice-where-downloader-lib/src/main/java/technology/dice/dicewhere/downloader/stream/StreamWithMD5Decorator.java

View check run for this annotation

Codecov / codecov/patch

dice-where-downloader-lib/src/main/java/technology/dice/dicewhere/downloader/stream/StreamWithMD5Decorator.java#L65

Added line #L65 was not covered by tests
}
return getInputStream().read();

Check warning on line 67 in dice-where-downloader-lib/src/main/java/technology/dice/dicewhere/downloader/stream/StreamWithMD5Decorator.java

View check run for this annotation

Codecov / codecov/patch

dice-where-downloader-lib/src/main/java/technology/dice/dicewhere/downloader/stream/StreamWithMD5Decorator.java#L67

Added line #L67 was not covered by tests
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
if (!consumed) {
throw new IllegalStateException("Stream not fully consumed yet.");

Check warning on line 73 in dice-where-downloader-lib/src/main/java/technology/dice/dicewhere/downloader/stream/StreamWithMD5Decorator.java

View check run for this annotation

Codecov / codecov/patch

dice-where-downloader-lib/src/main/java/technology/dice/dicewhere/downloader/stream/StreamWithMD5Decorator.java#L73

Added line #L73 was not covered by tests
}
return getInputStream().read(b, off, len);

Check warning on line 75 in dice-where-downloader-lib/src/main/java/technology/dice/dicewhere/downloader/stream/StreamWithMD5Decorator.java

View check run for this annotation

Codecov / codecov/patch

dice-where-downloader-lib/src/main/java/technology/dice/dicewhere/downloader/stream/StreamWithMD5Decorator.java#L75

Added line #L75 was not covered by tests
}

@Override
public void close() throws IOException {
getInputStream().close();
inputStream.close();
}
/*
public static String of1(InputStream inputStream) throws NoSuchAlgorithmException {
return bytesToHex(checksum(inputStream));
}
private static byte[] checksum(InputStream is) {
MessageDigest md;
try {
md = MessageDigest.getInstance("MD5");
} catch (NoSuchAlgorithmException e) {
throw new IllegalArgumentException(e);
}
try (DigestInputStream dis = new DigestInputStream(is, md)) {
while (dis.read() != -1)
; // empty loop to clear the data
md = dis.getMessageDigest();
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
return md.digest();
}
private static String bytesToHex(byte[] bytes) {
StringBuilder sb = new StringBuilder();
for (byte b : bytes) {
sb.append(String.format("%02x", b));
}
return sb.toString();
}
*/
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@
public class LocalFileAcceptorTest extends TestCase {

private static final int TEST_FILE_SIZE = 1024 * 1024;
@ClassRule
static WireMockRule wireMockRule = new WireMockRule(wireMockConfig().dynamicPort());
@ClassRule static WireMockRule wireMockRule = new WireMockRule(wireMockConfig().dynamicPort());

@BeforeClass
public static void beforeClass() {
Expand All @@ -45,16 +44,22 @@ public static void beforeClass() {
public void corruptedFileEmptyPreexistingSet() throws IOException, NoSuchAlgorithmException {
Pair<Path, String> tempFile = generateTempFile();
Path destinationDir = Files.createTempDirectory("dice-where");
IpInfoSiteSource ipInfoSiteSource = new IpInfoSiteSource(
new URL("http://localhost:" + wireMockRule.port() + "/data/file.mdb"));
wireMockRule.stubFor(WireMock.head(UrlPattern.ANY).willReturn(
aResponse().withStatus(HttpStatus.SC_OK)
.withHeader("Etag", "aaa")
.withHeader("Content-Length", Long.toString(TEST_FILE_SIZE))
.withHeader("Last-Modified", "Thu, 01 Dec 1994 16:00:00 GMT")));
wireMockRule.stubFor(WireMock.get(UrlPattern.ANY)
.willReturn(aResponse().withBody(
IOUtils.toByteArray(new FileInputStream(tempFile.getLeft().toFile())))));
IpInfoSiteSource ipInfoSiteSource =
new IpInfoSiteSource(new URL("http://localhost:" + wireMockRule.port() + "/data/file.mdb"));
wireMockRule.stubFor(
WireMock.head(UrlPattern.ANY)
.willReturn(
aResponse()
.withStatus(HttpStatus.SC_OK)
.withHeader("Etag", "aaa")
.withHeader("Content-Length", Long.toString(TEST_FILE_SIZE))
.withHeader("Last-Modified", "Thu, 01 Dec 1994 16:00:00 GMT")));
wireMockRule.stubFor(
WireMock.get(UrlPattern.ANY)
.willReturn(
aResponse()
.withBody(
IOUtils.toByteArray(new FileInputStream(tempFile.getLeft().toFile())))));

FileInfo fileInfo = ipInfoSiteSource.fileInfo();
ipInfoSiteSource.produce(new LocalFileAcceptor(destinationDir.resolve("file.mdb")), false);
Expand All @@ -68,72 +73,100 @@ public void corruptedFilePreexistingSet() throws IOException, NoSuchAlgorithmExc
Pair<Path, String> existingFile = generateTempFile();
Path destinationDir = Files.createTempDirectory("dice-where");
Files.copy(existingFile.getLeft(), destinationDir.resolve("existingFile.mdb"));
IpInfoSiteSource ipInfoSiteSource = new IpInfoSiteSource(
new URL("http://localhost:" + wireMockRule.port() + "/data/file.mdb"));
wireMockRule.stubFor(WireMock.head(UrlPattern.ANY).willReturn(
aResponse().withStatus(HttpStatus.SC_OK)
.withHeader("Etag", "aaa")
.withHeader("Content-Length", Long.toString(TEST_FILE_SIZE))
.withHeader("Last-Modified", "Thu, 01 Dec 1994 16:00:00 GMT")));
wireMockRule.stubFor(WireMock.get(UrlPattern.ANY)
.willReturn(aResponse().withBody(
IOUtils.toByteArray(new FileInputStream(tempFile.getLeft().toFile())))));
IpInfoSiteSource ipInfoSiteSource =
new IpInfoSiteSource(new URL("http://localhost:" + wireMockRule.port() + "/data/file.mdb"));
wireMockRule.stubFor(
WireMock.head(UrlPattern.ANY)
.willReturn(
aResponse()
.withStatus(HttpStatus.SC_OK)
.withHeader("Etag", "aaa")
.withHeader("Content-Length", Long.toString(TEST_FILE_SIZE))
.withHeader("Last-Modified", "Thu, 01 Dec 1994 16:00:00 GMT")));
wireMockRule.stubFor(
WireMock.get(UrlPattern.ANY)
.willReturn(
aResponse()
.withBody(
IOUtils.toByteArray(new FileInputStream(tempFile.getLeft().toFile())))));

FileInfo fileInfo = ipInfoSiteSource.fileInfo();
ipInfoSiteSource.produce(new LocalFileAcceptor(destinationDir.resolve("file.mdb")), false);
assertNotEquals(tempFile.getRight().toLowerCase(), fileInfo.getMd5Checksum().stringFormat());
assertEquals(Files.list(destinationDir).count(), 1);
assertFalse(Files.exists(destinationDir.resolve("file.mdb")));
assertTrue(Arrays.equals(Files.readAllBytes(existingFile.getLeft()),
Files.readAllBytes(Files.list(destinationDir).findFirst().get())));
assertTrue(
Arrays.equals(
Files.readAllBytes(existingFile.getLeft()),
Files.readAllBytes(Files.list(destinationDir).findFirst().get())));
}

@Test
public void goodFileEmptyPreexistingSet() throws IOException, NoSuchAlgorithmException {
Pair<Path, String> tempFile = generateTempFile();
Path destinationDir = Files.createTempDirectory("dice-where");
IpInfoSiteSource ipInfoSiteSource = new IpInfoSiteSource(
new URL("http://localhost:" + wireMockRule.port() + "/data/file.mdb"));
wireMockRule.stubFor(WireMock.head(UrlPattern.ANY).willReturn(
aResponse().withStatus(HttpStatus.SC_OK)
.withHeader("Etag", tempFile.getRight())
.withHeader("Content-Length", Long.toString(TEST_FILE_SIZE))
.withHeader("Last-Modified", "Thu, 01 Dec 1994 16:00:00 GMT")));
wireMockRule.stubFor(WireMock.get(UrlPattern.ANY)
.willReturn(aResponse().withBody(
IOUtils.toByteArray(new FileInputStream(tempFile.getLeft().toFile())))));
IpInfoSiteSource ipInfoSiteSource =
new IpInfoSiteSource(new URL("http://localhost:" + wireMockRule.port() + "/data/file.mdb"));
wireMockRule.stubFor(
WireMock.head(UrlPattern.ANY)
.willReturn(
aResponse()
.withStatus(HttpStatus.SC_OK)
.withHeader("Etag", tempFile.getRight())
.withHeader("Content-Length", Long.toString(TEST_FILE_SIZE))
.withHeader("Last-Modified", "Thu, 01 Dec 1994 16:00:00 GMT")));
wireMockRule.stubFor(
WireMock.get(UrlPattern.ANY)
.willReturn(
aResponse()
.withBody(
IOUtils.toByteArray(new FileInputStream(tempFile.getLeft().toFile())))));

FileInfo fileInfo = ipInfoSiteSource.fileInfo();
ipInfoSiteSource.produce(new LocalFileAcceptor(destinationDir.resolve("file.mdb")), false);
assertEquals(tempFile.getRight().toLowerCase(), fileInfo.getMd5Checksum().stringFormat());
assertEquals(1, Files.list(destinationDir).count());
assertTrue(Arrays.equals(Files.readAllBytes(tempFile.getLeft()),
Files.readAllBytes(destinationDir.resolve("file.mdb"))));
assertTrue(
Arrays.equals(
Files.readAllBytes(tempFile.getLeft()),
Files.readAllBytes(destinationDir.resolve("file.mdb"))));
}

@Test
public void goodFilePreexistingSet() throws IOException, NoSuchAlgorithmException {
Pair<Path, String> tempFile = generateTempFile();
Pair<Path, String> existingFile = generateTempFile();

Path destinationDir = Files.createTempDirectory("dice-where");
Files.copy(existingFile.getLeft(), destinationDir.resolve("existingFile.mdb"));
IpInfoSiteSource ipInfoSiteSource = new IpInfoSiteSource(
new URL("http://localhost:" + wireMockRule.port() + "/data/file.mdb"));
wireMockRule.stubFor(WireMock.head(UrlPattern.ANY).willReturn(
aResponse().withStatus(HttpStatus.SC_OK)
.withHeader("Etag", tempFile.getRight())
.withHeader("Content-Length", Long.toString(TEST_FILE_SIZE))
.withHeader("Last-Modified", "Thu, 01 Dec 1994 16:00:00 GMT")));
wireMockRule.stubFor(WireMock.get(UrlPattern.ANY)
.willReturn(aResponse().withBody(
IOUtils.toByteArray(new FileInputStream(tempFile.getLeft().toFile())))));

wireMockRule.stubFor(
WireMock.head(UrlPattern.ANY)
.willReturn(
aResponse()
.withStatus(HttpStatus.SC_OK)
.withHeader("Etag", tempFile.getRight())
.withHeader("Content-Length", Long.toString(TEST_FILE_SIZE))
.withHeader("Last-Modified", "Thu, 01 Dec 1994 16:00:00 GMT")));

wireMockRule.stubFor(
WireMock.get(UrlPattern.ANY)
.willReturn(
aResponse()
.withBody(
IOUtils.toByteArray(new FileInputStream(tempFile.getLeft().toFile())))));

IpInfoSiteSource ipInfoSiteSource =
new IpInfoSiteSource(new URL("http://localhost:" + wireMockRule.port() + "/data/file.mdb"));
FileInfo fileInfo = ipInfoSiteSource.fileInfo();

ipInfoSiteSource.produce(new LocalFileAcceptor(destinationDir.resolve("file.mdb")), false);
assertEquals(tempFile.getRight().toLowerCase(), fileInfo.getMd5Checksum().stringFormat());
assertEquals(2, Files.list(destinationDir).count());
assertTrue(Arrays.equals(Files.readAllBytes(tempFile.getLeft()),
Files.readAllBytes(destinationDir.resolve("file.mdb"))));
assertTrue(
Arrays.equals(
Files.readAllBytes(tempFile.getLeft()),
Files.readAllBytes(destinationDir.resolve("file.mdb"))));
}

private Pair<Path, String> generateTempFile() throws IOException, NoSuchAlgorithmException {
Expand All @@ -145,4 +178,4 @@ private Pair<Path, String> generateTempFile() throws IOException, NoSuchAlgorith
String hex = (new HexBinaryAdapter()).marshal(md.digest(contents));
return Pair.of(tempFile, hex);
}
}
}
Loading

0 comments on commit 19a4c6c

Please sign in to comment.