You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Hi all, I need serious help with my NATS JetStream publish/subscribe implementation.
I have a NATS event handler class which looks like this:
# Name of the JetStream stream used for events. connect() binds this stream
# to the subject "test_source", so stream name and subject are the same string.
CSM_EVENT_GLOBAL_SUBJECT_NAME = "test_source"


class NatsJetstreamEventHandler(IEventHandler):
    """Event handler that publishes and consumes pickled events via NATS JetStream.

    NOTE(review): ``self._server_url``, ``self._nc`` and ``self._js`` are read
    by the methods below but are not initialised in the visible code --
    presumably ``__init__`` or the ``IEventHandler`` base sets them; confirm.
    """

    async def connect(self):
        """Connect to the NATS server and try to create the event stream.

        Stores the client in ``self._nc`` and its JetStream context in
        ``self._js``. Stream creation is best effort: a failure is logged and
        otherwise ignored, so a connection error from ``nats.connect`` is the
        only exception that propagates to the caller.
        """
        self._nc = await nats.connect(servers=[self._server_url])
        self._js = self._nc.jetstream()
        try:
            await self._js.add_stream(
                name=CSM_EVENT_GLOBAL_SUBJECT_NAME, subjects=["test_source"]
            )
        except Exception as e:
            # Keep the original best-effort behaviour (do not raise), but make
            # the failure visible: a silently ignored error here hides problems
            # such as JetStream not being enabled or a conflicting stream
            # configuration, after which publish/subscribe cannot work.
            logger.warning(
                f"Could not create JetStream stream {CSM_EVENT_GLOBAL_SUBJECT_NAME}: {e!r}"
            )

    async def broadcast(self, *, subject: str, event: Event):
        """
        Broadcasts an event to the event queue.

        The event is serialized with ``pickle`` and published to the stream
        named by ``CSM_EVENT_GLOBAL_SUBJECT_NAME``. Connects first if the
        client is not connected.

        :param subject: Subject of the message.
        :param event: Event.
        :return: None. Returns early (after logging) when no JetStream context
            is available or the event cannot be pickled.
        """
        if not self._nc or not self._nc.is_connected:
            logger.info("NATS client is not connected, attempting to connect...")
            await self.connect()
        if not self._js:
            logger.error("JetStream context is not available. Cannot publish.")
            return
        try:
            data = pickle.dumps(event)
        except pickle.PicklingError as e:
            # The event is still dropped (no exception reaches the caller),
            # but the drop is now recorded instead of passing silently.
            logger.exception(f"Failed to serialize event for {subject}: {e}")
            return
        # Errors raised by publish itself are not caught and propagate to the
        # caller, exactly as before.
        await self._js.publish(
            subject=subject, payload=data, stream=CSM_EVENT_GLOBAL_SUBJECT_NAME
        )

    async def subscribe(
        self,
        *,
        source: str,
        callback: Callable,
        durable: str = "new_durable_stream",
    ):
        """Subscribe to a source and handle messages using the provided callback.

        Connects first if the client is not connected.

        :param source: Subject to subscribe to.
        :param callback: Callable passed to JetStream as the message callback
            (``cb``).
        :param durable: Durable consumer name. Defaults to the previously
            hard-coded ``"new_durable_stream"``. NOTE(review): if several
            subscriptions are created (e.g. one per state machine) they
            probably need distinct durable names -- verify against the
            nats-py ``subscribe`` documentation.
        :return: None. Returns early (after logging) when no JetStream context
            is available.
        :raises Exception: Any error raised while subscribing is logged and
            re-raised.
        """
        if not self._nc or not self._nc.is_connected:
            logger.info("NATS client is not connected, attempting to connect...")
            await self.connect()
        if not self._js:
            logger.error("JetStream context is not available. Cannot subscribe.")
            return
        logger.info(f"Attempting to subscribe to source: {source}")
        try:
            # Create or attach to an existing durable consumer. A
            # ConsumerConfig could be supplied here if more control is needed.
            sub = await self._js.subscribe(source, durable=durable, cb=callback)
            logger.info(f"Subscription to {source} is active: {sub}")
        except Exception as e:
            logger.exception(f"Failed to subscribe to {source}: {e}")
            raise
In my main.py, I want to use it like this:
@app.get("/", response_class=HTMLResponse)
async def read_root():
    """Connect the event handler and run every state machine with its subscriber.

    Returns a 500 ``HTMLResponse`` when the event handler reports that it is
    not connected. Otherwise starts one ``run_state_machine`` task per entry
    of ``csm_class.state_machines`` and waits for all of them to finish.
    """
    eventhandler = NatsJetstreamEventHandler()
    await eventhandler.connect()
    # Ensure connection is established
    if not eventhandler.is_connected():
        logger.error("Failed to connect to the event handler.")
        return HTMLResponse(content="Failed to initialize.", status_code=500)

    async def handle_messages(instance, sub: NatsJetstreamEventHandler):
        """Subscribe to "test_source" and feed each received event to ``instance``."""

        async def process_message(msg):
            """Unpickle one message, hand it to ``instance`` and acknowledge it."""
            print("Trying to add event to instance")
            # SECURITY: pickle.loads can execute arbitrary code. This is only
            # acceptable if every publisher on this subject is trusted.
            event = pickle.loads(msg.data)  # Assuming the message is pickled.
            await instance.add_events([event])
            logger.info(f"Received Event: {event}")
            await msg.ack()  # Don't forget to acknowledge the message!

        await sub.subscribe(source="test_source", callback=process_message)

    async def run_state_machine(state_machine_class):
        """Run one state machine instance alongside its message subscription."""
        # NOTE(review): state_machine_class is not used here; the instance
        # comes from instanceOfSomething() -- confirm this is intended.
        instance = instanceOfSomething()
        async with anyio.create_task_group() as inner_tg:
            inner_tg.start_soon(handle_messages, instance, eventhandler)
            inner_tg.start_soon(instance.execute)
            # Non-blocking wait. The previous time.sleep(5) froze the whole
            # event loop, so neither the tasks started above nor the NATS
            # client's own I/O could make progress while it slept.
            # NOTE(review): the original nesting of this wait was lost in the
            # paste; it is placed inside the task group here -- confirm.
            await anyio.sleep(5)

    async with anyio.create_task_group() as tg:
        for state_machine_class in csm_class.state_machines.values():
            tg.start_soon(run_state_machine, state_machine_class)
I have tried many different implementations but just can't figure out how this works.
Neither subscribe nor publish works. When they are called, nothing happens, but no exception is raised either.
I would highly appreciate any help on how to fix this problem.
My NATS Jetstream Cluster looks like this (docker-compose):
reacted with thumbs up emoji reacted with thumbs down emoji reacted with laugh emoji reacted with hooray emoji reacted with confused emoji reacted with heart emoji reacted with rocket emoji reacted with eyes emoji
-
Hi all, I need serious help with my NATS JetStream publish/subscribe implementation.
I have a NATS event handler class which looks like this:
In my main.py, I want to use it like this:
I have tried many different implementations but just can't figure out how this works.
Neither subscribe nor publish works. When they are called, nothing happens, but no exception is raised either.
I would highly appreciate any help on how to fix this problem.
My NATS Jetstream Cluster looks like this (docker-compose):
Beta Was this translation helpful? Give feedback.
All reactions