Hi, I'm trying to use Lettuce in a Kotlin environment, and I want to read stream data as a Kotlin Flow. This is my sample code:

import io.lettuce.core.Consumer
import io.lettuce.core.ExperimentalLettuceCoroutinesApi
import io.lettuce.core.RedisClient
import io.lettuce.core.StreamMessage
import io.lettuce.core.XReadArgs
import io.lettuce.core.api.coroutines
import kotlinx.coroutines.flow.Flow
import org.slf4j.LoggerFactory

class RedisHandler {
    private val client = RedisClient.create("redis://127.0.0.1")

    // The connection and its command interfaces are created lazily on first use.
    private val connection by lazy { client.connect() }
    private val sync by lazy { connection.sync() }
    private val async by lazy { connection.async() }

    @OptIn(ExperimentalLettuceCoroutinesApi::class)
    private val coroutine by lazy { connection.coroutines() }

    fun getTestValue(key: String): String {
        return sync.get(key)
    }

    fun setValue(key: String, value: String) {
        sync.set(key, value)
    }

    @OptIn(ExperimentalLettuceCoroutinesApi::class)
    fun createConsumerGroup(groupName: String): Flow<StreamMessage<String, String>> {
        // Create the group and a consumer named "test-stream", then read as that consumer.
        val result = sync.xgroupCreate(XReadArgs.StreamOffset.latest("test-stream"), groupName)
        sync.xgroupCreateconsumer("test-stream", Consumer.from(groupName, "test-stream"))
        println(result)
        return coroutine.xreadgroup(Consumer.from(groupName, "test-stream"), XReadArgs.StreamOffset.latest("test-stream"))
    }

    companion object {
        private val log = LoggerFactory.getLogger(RedisHandler::class.java)
    }
}

and this is my testing code:

@Test
fun `Test stream data with Flow`() = runBlocking {
    val handler = RedisHandler()
    val result = handler.createConsumerGroup("Group1")
    result.collect {
        println(it.id)
        println(it.stream)
    }
    sleep(10000)
}

I then tried adding entries to the stream on a local Redis using redis-cli, but the test fails with an error.
When I run XREADGROUP directly in redis-cli, it succeeds, but I'm not sure what the equivalent of that command is in Lettuce. Could you share the proper way to get stream data as a Flow from Lettuce?
Replies: 1 comment
Oh, I realized that XReadArgs.StreamOffset.from("key", ">") works:

@OptIn(ExperimentalLettuceCoroutinesApi::class)
fun getStream(groupName: String): Flow<StreamMessage<String, String>> {
    // ">" asks for entries that have not yet been delivered to any consumer in the group.
    return coroutine.xreadgroup(Consumer.from(groupName, "test-stream"), XReadArgs.StreamOffset.from("test-stream", ">"))
}

This function correctly returns a Flow of stream messages.
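
For anyone hitting the same error: XReadArgs.StreamOffset.latest(...) sends "$" as the ID. That is valid for XGROUP CREATE and plain XREAD, but XREADGROUP expects ">" (or an explicit ID) instead. Below is a minimal sketch of the same fix using StreamOffset.lastConsumed, which as far as I know is Lettuce's named shortcut for ">". The helper names newMessagesOffset and readNewMessages, the 5-second block timeout, and the separate consumerName parameter are my own assumptions for illustration, not something from the original post.

```kotlin
import io.lettuce.core.Consumer
import io.lettuce.core.ExperimentalLettuceCoroutinesApi
import io.lettuce.core.StreamMessage
import io.lettuce.core.XReadArgs
import io.lettuce.core.api.coroutines.RedisCoroutinesCommands
import kotlinx.coroutines.flow.Flow

// Hypothetical helper: builds the offset that XREADGROUP needs for
// "entries never delivered to this group", i.e. the special ID ">".
fun newMessagesOffset(stream: String): XReadArgs.StreamOffset<String> =
    XReadArgs.StreamOffset.lastConsumed(stream)

// Hypothetical helper: issues one XREADGROUP call and exposes its result as a Flow.
// BLOCK 5000 (an assumed timeout) makes the call wait up to 5 s for new entries
// instead of returning immediately when the stream has nothing new.
@OptIn(ExperimentalLettuceCoroutinesApi::class)
fun readNewMessages(
    commands: RedisCoroutinesCommands<String, String>,
    stream: String,
    group: String,
    consumerName: String,
): Flow<StreamMessage<String, String>> =
    commands.xreadgroup(
        Consumer.from(group, consumerName),
        XReadArgs.Builder.block(5000L),
        newMessagesOffset(stream),
    )
```

With the handler from the question, this would be called as readNewMessages(coroutine, "test-stream", "Group1", "consumer-1"). As far as I can tell, the returned Flow covers a single XREADGROUP call rather than an open-ended subscription, so a long-running consumer would likely need to call it again in a loop.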