async hashing of channels and flows (big data)

Sergey Chernov 2024-08-25 02:07:10 +02:00
parent 054252a3ce
commit 95c052a22c
4 changed files with 144 additions and 10 deletions

View File

@@ -47,6 +47,8 @@ kotlin {
implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.7.0")
implementation("com.ionspin.kotlin:multiplatform-crypto-libsodium-bindings:0.9.2")
implementation(platform("org.kotlincrypto.hash:bom:0.5.1"))
implementation("org.kotlincrypto.hash:sha3")
api("com.ionspin.kotlin:bignum:0.3.9")
api("net.sergeych:mp_bintools:0.1.7")
api("net.sergeych:mp_stools:1.5.1")

View File

@@ -2,21 +2,114 @@ package net.sergeych.crypto2
import com.ionspin.kotlin.crypto.generichash.GenericHash
import com.ionspin.kotlin.crypto.util.encodeToUByteArray
import org.komputing.khash.keccak.Keccak
import org.komputing.khash.keccak.KeccakParameter
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.flow.Flow
import org.kotlincrypto.hash.sha3.SHA3_256
import org.kotlincrypto.hash.sha3.SHA3_384
private interface StreamProcessor {
fun update(data: UByteArray)
fun final(): UByteArray
}
/**
* Hash support for crypto2. We implement only functions considered secure as of the crypto2
* publication time, and do not include broken or legacy algorithms such as SHA-1, SHA-2, and MD5.
* To calculate hashes use:
*
* - [digest] to calculate the hash of a block held in memory
* - [ofChannel] to calculate the hash of long or slow data read from a channel
* - [ofFlow] to calculate the hash of the data in a [Flow<UByteArray>]
*/
@Suppress("unused")
enum class Hash(val perform: (UByteArray)->UByteArray) {
Blake2b({ GenericHash.genericHash(it) }),
Blake2b2l({ blake2b2l(it) }),
Sha3_384({ Keccak.digest(it.asByteArray(), KeccakParameter.SHA3_384).asUByteArray()}),
Sha3_256({ Keccak.digest(it.asByteArray(), KeccakParameter.SHA3_256).asUByteArray()}),
enum class Hash(private val direct: ((UByteArray) -> UByteArray)? = null, private val streamProcessor: () -> StreamProcessor) {
Blake2b(
// direct blake2 is faster than the streaming version:
{ GenericHash.genericHash(it) }, {
object : StreamProcessor {
val state = GenericHash.genericHashInit()
override fun update(data: UByteArray) {
GenericHash.genericHashUpdate(state, data)
}
override fun final(): UByteArray =
GenericHash.genericHashFinal(state)
}
}),
Sha3_384(
// direct Keccak is currently slower
null,
{
object : StreamProcessor {
val state = SHA3_384()
override fun update(data: UByteArray) {
state.update(data.asByteArray())
}
override fun final(): UByteArray = state.digest().asUByteArray()
}
}),
Sha3_256(null,
{
object : StreamProcessor {
val state = SHA3_256()
override fun update(data: UByteArray) {
state.update(data.asByteArray())
}
override fun final(): UByteArray = state.digest().asUByteArray()
}
});
@Deprecated("will be removed in favor of digest()", ReplaceWith("digest()"))
fun perform(src: UByteArray): UByteArray = digest(src)
/**
* Calculate digest for the in-memory data
* @param src data to calculate hash digest over
* @return calculated hash value
*/
fun digest(src: UByteArray): UByteArray = direct?.invoke(src) ?: streamProcessor().also { it.update(src) }.final()
/**
* Collect the flow and return the hash digest of all its data. This allows calculating hashes of data
* that is too big to fit in memory.
*/
suspend fun ofFlow(source: Flow<UByteArray>): UByteArray {
val sp = streamProcessor()
source.collect { sp.update(it) }
return sp.final()
}
/**
* Read all data from the channel and return the hash digest of it all. This allows calculating hashes of data
* that is too big to fit in memory.
*/
suspend fun ofChannel(source: ReceiveChannel<UByteArray>): UByteArray {
val sp = streamProcessor()
for (block in source) sp.update(block)
return sp.final()
}
}
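// A minimal usage sketch (illustrative only; assumes initCrypto() has completed and that the
// caller supplies the in-memory block, the Flow and the ReceiveChannel of chunks):
@Suppress("unused")
private suspend fun hashUsageSketch(data: UByteArray, chunks: Flow<UByteArray>, blocks: ReceiveChannel<UByteArray>): List<UByteArray> = listOf(
Hash.Blake2b.digest(data), // whole block already in memory
Hash.Sha3_256.ofFlow(chunks), // arbitrarily large data streamed as a Flow
Hash.Sha3_384.ofChannel(blocks) // arbitrarily large data read from a channel
)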
private val defaultSuffix1 = "All lay loads on a willing horse".encodeToUByteArray()
private val defaultSuffix2 = "A stitch in time saves nine".encodeToUByteArray()
fun blake2b(src: UByteArray): UByteArray = Hash.Blake2b.perform(src)
fun blake2b2l(src: UByteArray): UByteArray =
blake2b(blake2b(src) + defaultSuffix1 + src)
/**
* Calculate the [Hash.Blake2b] hash of [src]; a shortcut for [Hash.Blake2b.digest]
*/
fun blake2b(src: UByteArray): UByteArray = Hash.Blake2b.digest(src)
/**
* Double-linked Blake2b using the default or a specified suffix. This should be harder to
* brute-force or collision-attack than plain [blake2b]. Note that different suffixes produce different
* results.
*/
fun blake2b2l(src: UByteArray, suffix: UByteArray = defaultSuffix1): UByteArray =
blake2b(blake2b(src) + suffix + src)
/**
* Triple-linked [blake2b], even more resistant to collision attacks than [blake2b2l].
*/
fun blake2b3l(src: UByteArray): UByteArray = blake2b(blake2b2l(src) + defaultSuffix2 + src)
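// A sketch of how the chained variants expand with the default suffixes (purely illustrative;
// equivalent to calling blake2b2l / blake2b3l directly):
@Suppress("unused")
private fun blake2bChainSketch(src: UByteArray): Pair<UByteArray, UByteArray> {
val doubled = blake2b(blake2b(src) + defaultSuffix1 + src) // == blake2b2l(src)
val tripled = blake2b(doubled + defaultSuffix2 + src) // == blake2b3l(src)
return doubled to tripled
}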

View File

@@ -4,6 +4,7 @@ import com.ionspin.kotlin.bignum.integer.BigInteger
import org.komputing.khash.keccak.extensions.fillWith
import kotlin.math.min
@Deprecated("use Hash enum instead, will be removed in next major release", ReplaceWith("Hash", "net.sergeych.crypto2.Hash"))
object Keccak {
private val BIT_65 = BigInteger.ONE shl (64)

View File

@@ -0,0 +1,38 @@
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.test.runTest
import kotlinx.datetime.Clock
import net.sergeych.crypto2.Hash
import net.sergeych.crypto2.initCrypto
import kotlin.random.Random
import kotlin.random.nextUBytes
import kotlin.test.*
@Suppress("UNUSED_PARAMETER", "UNUSED_VARIABLE")
suspend fun <T> sw(label: String, f: suspend () -> T): T {
val t1 = Clock.System.now()
val result = f()
val t2 = Clock.System.now()
// println("$label: ${t2 - t1}")
return result
}
class HashTest {
@Test
fun testEqualMethods() {
fun testMethod(h: Hash) = runTest {
initCrypto()
val a = Random.Default.nextUBytes(1024)
val b = Random.Default.nextUBytes(1024)
val c = a + b
val h1 = sw("dir $h") { h.digest(c) }
val h2 = sw("ind $h") { h.ofFlow(listOf(a, b).asFlow()) }
assertContentEquals(h1, h2)
}
for (i in 0..10) {
testMethod(Hash.Blake2b)
testMethod(Hash.Sha3_256)
testMethod(Hash.Sha3_384)
}
}
}
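// An illustrative companion sketch for the channel path, analogous to testEqualMethods above;
// assumes an additional import of kotlinx.coroutines.channels.Channel:
class HashChannelSketchTest {
@Test
fun channelMatchesDigest() = runTest {
initCrypto()
val a = Random.Default.nextUBytes(1024)
val b = Random.Default.nextUBytes(1024)
val ch = Channel<UByteArray>(Channel.UNLIMITED)
ch.send(a)
ch.send(b)
ch.close()
// hashing the concatenation directly must match streaming the same data from the channel
assertContentEquals(Hash.Sha3_256.digest(a + b), Hash.Sha3_256.ofChannel(ch))
}
}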