RabbitMqAdapter.kt

  1. package com.hexagontk.messaging.rabbitmq

  2. import com.hexagontk.messaging.Message
  3. import com.hexagontk.messaging.MessagingPort
  4. import com.hexagontk.serialization.SerializationFormat
  5. import com.hexagontk.serialization.serialize
  6. import java.net.URI
  7. import kotlin.reflect.KClass

  8. class RabbitMqAdapter(
  9.     url: String = "amqp://guest:guest@localhost",
  10.     private val serializationFormat: SerializationFormat
  11. ) : MessagingPort {

  12.     private companion object {
  13.         private const val EXCHANGE = "messages"
  14.     }

  15.     private val client by lazy { RabbitMqClient(URI(url), serializationFormat) }

  16.     init {
  17.         client.bindExchange(EXCHANGE, "topic", "*.*.*", "event_pool")
  18.     }

  19.     override fun <T : Message> consume(
  20.         type: KClass<T>, address: String, decoder: (Map<String, *>) -> T, consumer: (T) -> Unit
  21.     ) {
  22.         client.consume(EXCHANGE, address, type, decoder) { consumer(it) }
  23.     }

  24.     override fun publish(message: Message, address: String) {
  25.         client.publish(EXCHANGE, address, message.serialize(serializationFormat))
  26.     }
  27. }