Skip to content

Commit

Permalink
#52 Add ability to mark a run as failed if data cannot be loaded
Browse files Browse the repository at this point in the history
Failures in most cases will be returned to the user as failed runs rather than 500 errors; and also able to be returned via the API later on.

Still need to return the error message/failure cause, as well as persist the reason for the failure to the database
  • Loading branch information
chadlwilson committed Dec 13, 2021
1 parent d3e109e commit d419924
Show file tree
Hide file tree
Showing 6 changed files with 171 additions and 29 deletions.
3 changes: 2 additions & 1 deletion src/main/kotlin/recce/server/dataset/DatasetRecService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ open class DatasetRecService(
return loadFrom(datasetConfig.source, datasetConfig.resolvedHashingStrategy, recRun, this::saveSourceBatch)
.zipWhen { loadFrom(datasetConfig.target, datasetConfig.resolvedHashingStrategy, recRun, this::saveTargetBatch) }
.flatMap { (source, target) -> recRun.map { it.withMetaData(source, target) } }
.flatMap { run -> runService.complete(run) }
.flatMap { run -> runService.successful(run) }
.onErrorResume { error -> recRun.flatMap { run -> runService.failed(run, error) } }
}

private fun loadFrom(
Expand Down
16 changes: 16 additions & 0 deletions src/main/kotlin/recce/server/recrun/RecRunRepository.kt
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,27 @@ data class RecRun(
@Embedded var sourceMeta: DatasetMeta = DatasetMeta()
@Embedded var targetMeta: DatasetMeta = DatasetMeta()

@Transient var failureCause: Throwable? = null

fun withMetaData(source: DatasetMeta, target: DatasetMeta): RecRun {
sourceMeta = source
targetMeta = target
return this
}

fun asSuccessful(summary: MatchStatus): RecRun {
this.summary = summary
completedTime = Instant.now()
status = RunStatus.Successful
return this
}

fun asFailed(cause: Throwable): RecRun {
this.failureCause = cause
completedTime = Instant.now()
status = RunStatus.Failed
return this
}
}

enum class RunStatus {
Expand Down
15 changes: 7 additions & 8 deletions src/main/kotlin/recce/server/recrun/RecRunService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package recce.server.recrun
import jakarta.inject.Singleton
import mu.KotlinLogging
import reactor.core.publisher.Mono
import java.time.Instant

private val logger = KotlinLogging.logger {}

Expand All @@ -17,16 +16,16 @@ open class RecRunService(
.doOnNext { logger.info { "Starting reconciliation run for $it}..." } }
.cache()

fun complete(run: RecRun): Mono<RecRun> {
fun successful(run: RecRun): Mono<RecRun> {
logger.info { "Summarising results for $run" }
return recordRepository.countMatchedByKeyRecRunId(run.id!!)
.map {
run.apply {
completedTime = Instant.now(); summary = it
status = RunStatus.Successful
}
}
.map { run.asSuccessful(it) }
.flatMap(runRepository::update)
.doOnNext { logger.info { "Run completed for $it" } }
}

fun failed(run: RecRun, cause: Throwable): Mono<RecRun> {
logger.info(cause) { "Recording failure for $run" }
return runRepository.update(run.asFailed(cause))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ class DatasetRecServiceIntegrationTest {
fun `can run a simple reconciliation`() {
StepVerifier.create(service.runFor("test-dataset"))
.assertNext { run ->
checkPersistentFieldsFor(run)
checkCompleted(run)
checkSuccessful(run)
}
.verifyComplete()

Expand All @@ -74,7 +75,8 @@ class DatasetRecServiceIntegrationTest {
}
)
.assertNext { (run, record) ->
checkPersistentFieldsFor(run)
checkCompleted(run)
checkSuccessful(run)
assertThat(record.key.recRunId).isEqualTo(run.id)
assertThat(record.key.migrationKey).isEqualTo("Test0")
assertThat(record.sourceData).isEqualTo(record.targetData).`is`(hexSha256Hash)
Expand Down Expand Up @@ -109,12 +111,15 @@ class DatasetRecServiceIntegrationTest {
.verifyComplete()
}

private fun checkPersistentFieldsFor(run: RecRun) {
private fun checkCompleted(run: RecRun) {
assertThat(run.id).isNotNull
assertThat(run.datasetId).isEqualTo("test-dataset")
assertThat(run.createdTime).isNotNull
assertThat(run.updatedTime).isAfterOrEqualTo(run.createdTime)
assertThat(run.completedTime).isAfterOrEqualTo(run.createdTime)
}

private fun checkSuccessful(run: RecRun) {
assertThat(run.status).isEqualTo(RunStatus.Successful)
assertThat(run.summary).isEqualTo(MatchStatus(1, 2, 2, 0))
val expectedMeta = DatasetMeta(
Expand All @@ -128,33 +133,65 @@ class DatasetRecServiceIntegrationTest {
assertThat(run.targetMeta).usingRecursiveComparison().isEqualTo(expectedMeta)
}

private fun checkFailed(run: RecRun) {
assertThat(run.status).isEqualTo(RunStatus.Failed)
assertThat(run.summary).satisfiesAnyOf(
{ st -> assertThat(st).isNull() },
{ st -> assertThat(st).isEqualTo(MatchStatus()) }
)
assertThat(run.sourceMeta).isEqualTo(DatasetMeta())
assertThat(run.targetMeta).isEqualTo(DatasetMeta())
}

@Test
fun `should emit error on bad source query`() {
fun `should fail run on bad source query`() {
// Wipe the source DB
flywayCleanMigrate(tempDir, "SELECT 1", sourceDataSource)

StepVerifier.create(service.runFor("test-dataset"))
.consumeErrorWith {
assertThat(it)
.assertNext { run ->
checkCompleted(run)
checkFailed(run)
assertThat(run.failureCause)
.isExactlyInstanceOf(DataLoadException::class.java)
.hasMessageContaining("Failed to load data from source")
.hasMessageContaining("\"TESTDATA\" not found")
.hasCauseExactlyInstanceOf(R2dbcBadGrammarException::class.java)
}
.verify()
.verifyComplete()

// Check persisted representation
StepVerifier.create(runRepository.findAll())
.assertNext { run ->
checkCompleted(run)
checkFailed(run)
}
.verifyComplete()
}

@Test
fun `should emit error on bad target query`() {
fun `should fail run on bad target query`() {
// Wipe the target DB
flywayCleanMigrate(tempDir, "SELECT 1", targetDataSource)

StepVerifier.create(service.runFor("test-dataset"))
.consumeErrorWith {
assertThat(it)
.assertNext { run ->
checkCompleted(run)
checkFailed(run)
assertThat(run.failureCause)
.isExactlyInstanceOf(DataLoadException::class.java)
.hasMessageContaining("Failed to load data from target")
.hasMessageContaining("\"TESTDATA\" not found")
.hasCauseExactlyInstanceOf(R2dbcBadGrammarException::class.java)
}
.verify()
.verifyComplete()

// Check persisted representation
StepVerifier.create(runRepository.findAll())
.assertNext { run ->
checkCompleted(run)
checkFailed(run)
}
.verifyComplete()
}
}
87 changes: 80 additions & 7 deletions src/test/kotlin/recce/server/dataset/DatasetRecServiceTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ package recce.server.dataset

import io.r2dbc.spi.Row
import io.r2dbc.spi.RowMetadata
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.assertj.core.api.Assertions.*
import org.junit.jupiter.api.Test
import org.mockito.ArgumentMatchers.anyList
import org.mockito.kotlin.*
Expand All @@ -20,7 +19,8 @@ internal class DatasetRecServiceTest {

private val runService = mock<RecRunService> {
on { start(testDataset) } doReturn Mono.just(recRun)
on { complete(recRun) } doReturn Mono.just(recRun)
on { successful(recRun) } doReturn Mono.just(recRun).map { recRun.asSuccessful(MatchStatus()) }
on { failed(eq(recRun), any()) } doReturn Mono.just(recRun).map { recRun.asFailed(IllegalArgumentException()) }
}

private val emptyDataLoad = mock<DataLoadDefinition> {
Expand Down Expand Up @@ -53,6 +53,74 @@ internal class DatasetRecServiceTest {
.hasMessageContaining(testDataset)
}

@Test
fun `mono should return failed run on failed data load`() {
val service = DatasetRecService(
RecConfiguration(mapOf(testDataset to DatasetConfiguration(emptyDataLoad, emptyDataLoad))),
runService,
mock()
)

val rootCause = IllegalArgumentException("Could not connect to database")
whenever(emptyDataLoad.runQuery()).thenReturn(Flux.error(rootCause))

StepVerifier.create(service.runFor(testDataset))
.assertNext {
assertThat(it.status).isEqualTo(RunStatus.Failed)
}
.verifyComplete()

val errorCaptor = argumentCaptor<Throwable>()
verify(runService).failed(eq(recRun), errorCaptor.capture())

assertThat(errorCaptor.firstValue)
.isInstanceOf(DataLoadException::class.java)
.hasCause(rootCause)
.hasMessageContaining("Failed to load data")
.hasMessageContaining(rootCause.message)
}

@Test
fun `mono should error on failed initial save`() {
val service = DatasetRecService(
RecConfiguration(mapOf(testDataset to DatasetConfiguration(emptyDataLoad, emptyDataLoad))),
runService,
mock()
)

whenever(runService.start(any())).thenReturn(Mono.error(IllegalArgumentException("failed!")))

StepVerifier.create(service.runFor(testDataset))
.expectErrorSatisfies {
assertThat(it)
.isExactlyInstanceOf(IllegalArgumentException::class.java)
.hasMessageContaining("failed!")
}
.verify()
}

@Test
fun `mono should error on failed save of failed run`() {
val service = DatasetRecService(
RecConfiguration(mapOf(testDataset to DatasetConfiguration(emptyDataLoad, emptyDataLoad))),
runService,
mock()
)

val rootCause = IllegalArgumentException("Could not connect to database")
whenever(emptyDataLoad.runQuery()).thenReturn(Flux.error(rootCause))

val failSaveCause = IllegalArgumentException("Could not save failure status")
whenever(runService.failed(any(), any())).thenReturn(Mono.error(failSaveCause))

StepVerifier.create(service.runFor(testDataset))
.expectErrorSatisfies {
assertThat(it)
.isEqualTo(failSaveCause)
}
.verify()
}

@Test
fun `should reconcile empty datasets without error`() {
val service = DatasetRecService(
Expand All @@ -62,6 +130,7 @@ internal class DatasetRecServiceTest {
)
StepVerifier.create(service.runFor(testDataset))
.assertNext {
assertThat(it.status).isEqualTo(RunStatus.Successful)
assertThat(it.sourceMeta.cols).isEmpty()
assertThat(it.targetMeta.cols).isEmpty()
}
Expand All @@ -80,14 +149,15 @@ internal class DatasetRecServiceTest {

StepVerifier.create(service.runFor(testDataset))
.assertNext {
assertThat(it.status).isEqualTo(RunStatus.Successful)
assertThat(it.sourceMeta.cols).isNotEmpty
assertThat(it.targetMeta.cols).isEmpty()
}
.verifyComplete()

verify(recordRepository).saveAll(listOf(RecRecord(key = testRecordKey, sourceData = "def")))
verifyNoMoreInteractions(recordRepository)
verify(runService).complete(recRun)
verify(runService).successful(recRun)
}

@Test
Expand All @@ -103,6 +173,7 @@ internal class DatasetRecServiceTest {

StepVerifier.create(service.runFor(testDataset))
.assertNext {
assertThat(it.status).isEqualTo(RunStatus.Successful)
assertThat(it.sourceMeta.cols).isEmpty()
assertThat(it.targetMeta.cols).isNotEmpty
}
Expand All @@ -111,7 +182,7 @@ internal class DatasetRecServiceTest {
verify(recordRepository).findByRecRunIdAndMigrationKeyIn(testRecordKey.recRunId, listOf(testRecordKey.migrationKey))
verify(recordRepository).saveAll(listOf(RecRecord(key = testRecordKey, targetData = "def")))
verifyNoMoreInteractions(recordRepository)
verify(runService).complete(recRun)
verify(runService).successful(recRun)
}

@Test
Expand All @@ -129,6 +200,7 @@ internal class DatasetRecServiceTest {

StepVerifier.create(service.runFor(testDataset))
.assertNext {
assertThat(it.status).isEqualTo(RunStatus.Successful)
assertThat(it.sourceMeta.cols).isNotEmpty
assertThat(it.targetMeta.cols).isNotEmpty
}
Expand All @@ -138,7 +210,7 @@ internal class DatasetRecServiceTest {
verify(recordRepository).findByRecRunIdAndMigrationKeyIn(testRecordKey.recRunId, listOf(testRecordKey.migrationKey))
verify(recordRepository).saveAll(listOf(RecRecord(key = testRecordKey, targetData = "def")))
verifyNoMoreInteractions(recordRepository)
verify(runService).complete(recRun)
verify(runService).successful(recRun)
}

@Test
Expand All @@ -154,6 +226,7 @@ internal class DatasetRecServiceTest {

StepVerifier.create(service.runFor(testDataset))
.assertNext {
assertThat(it.status).isEqualTo(RunStatus.Successful)
assertThat(it.sourceMeta.cols).isNotEmpty
assertThat(it.targetMeta.cols).isNotEmpty
}
Expand All @@ -163,7 +236,7 @@ internal class DatasetRecServiceTest {
verify(recordRepository).findByRecRunIdAndMigrationKeyIn(testRecordKey.recRunId, listOf(testRecordKey.migrationKey))
verify(recordRepository).updateAll(listOf(testRecord))
verifyNoMoreInteractions(recordRepository)
verify(runService).complete(recRun)
verify(runService).successful(recRun)
}

@Test
Expand Down
20 changes: 18 additions & 2 deletions src/test/kotlin/recce/server/recrun/RecRunServiceTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import org.mockito.kotlin.doReturn
import org.mockito.kotlin.mock
import reactor.core.publisher.Mono
import reactor.test.StepVerifier
import java.lang.IllegalArgumentException
import java.time.Instant

internal class RecRunServiceTest {
Expand All @@ -29,7 +30,7 @@ internal class RecRunServiceTest {
}

@Test
fun `complete should set completed time`() {
fun `should set successful run with match status`() {
val expectedMatchStatus = MatchStatus(1, 1, 1, 1)
val recordRepository = mock<RecRecordRepository> {
on { countMatchedByKeyRecRunId(any()) } doReturn Mono.just(expectedMatchStatus)
Expand All @@ -44,12 +45,27 @@ internal class RecRunServiceTest {
)
}

StepVerifier.create(RecRunService(runRepository, recordRepository).complete(startedRun))
StepVerifier.create(RecRunService(runRepository, recordRepository).successful(startedRun))
.assertNext {
assertThat(it.completedTime).isAfterOrEqualTo(it.createdTime)
assertThat(it.status).isEqualTo(RunStatus.Successful)
assertThat(it.summary).isEqualTo(expectedMatchStatus)
}
.verifyComplete()
}

@Test
fun `should set failed run`() {
val runRepository = mock<RecRunRepository> {
on { update(any()) } doReturn Mono.just(startedRun)
}

StepVerifier.create(RecRunService(runRepository, mock()).failed(startedRun, IllegalArgumentException("failed run!")))
.assertNext {
assertThat(it.completedTime).isAfterOrEqualTo(it.createdTime)
assertThat(it.status).isEqualTo(RunStatus.Failed)
assertThat(it.summary).isNull()
}
.verifyComplete()
}
}

0 comments on commit d419924

Please sign in to comment.