
Pub-Sub tries to deserialize messages from not subscribed topic #2113

Open
AqlaSolutions opened this issue Apr 8, 2024 · 3 comments

Comments

@AqlaSolutions
Contributor

AqlaSolutions commented Apr 8, 2024

We have a few nodes, and some of them were using RunAsClient. I have now disabled it, and things went wrong. One of the nodes (one that originally didn't use RunAsClient) uses pub-sub for internal communication between its actors. Somehow those internal messages arrive at another node and cause a deserialization exception, because that other node doesn't know anything about those internal types. This happens even though that node doesn't subscribe to the topic where the message is published (it subscribes to a different one). It didn't happen before the RunAsClient change.

```
[23:11:14.902 ERR] Proto.Remote.RemoteMessageHandler ["localhost:7752"] Unable to deserialize message with "cluster.pubsub.PubSubBatchTransport"
System.Exception: Type with the specified name [redacted] not found
   at Proto.Remote.JsonSerializer.Deserialize(ByteString bytes, String typeName)
   at Proto.Remote.Serialization.Deserialize(String typeName, ByteString bytes, Int32 serializerId)
   at Proto.Cluster.PubSub.PubSubBatchTransport.<>c__DisplayClass0_0.<Deserialize>b__0(PubSubEnvelope e)
   at System.Linq.Enumerable.SelectIListIterator`2.ToList()
   at Proto.Cluster.PubSub.PubSubBatchTransport.Deserialize(ActorSystem system)
   at Proto.Remote.RemoteMessageHandler.HandleRemoteMessage(RemoteMessage currentMessage, String remoteAddress)
```

Is this a bug or expected behavior?

@AqlaSolutions
Contributor Author

AqlaSolutions commented Apr 9, 2024

It looks like the TopicActor gets started on another, non-publisher node, so that node has to deal with (de)serializing the messages. How can we force it to start on the local node for specific topics?

@rogeralsing
Contributor

Hi,
Currently there is only one type of topic actor, and it is cluster-wide, so to speak: it can run on any cluster node. This is why you are seeing this behavior.

That being said, Pub-Sub is still experimental, and we could definitely expand the API to allow for more customization here. For example, the topic-actor kind is currently auto-registered on all cluster nodes. We could make it possible to register various topic types with different configurations, and on different nodes.

That change would be fairly easy to make; it's mostly about allowing a non-hardcoded kind for the topic actor.

Open to suggestions here.

@benbenwilde
Contributor

Under the covers, the TopicActor is registered like this: `_clusterKinds.Add(TopicActor.Kind, new ClusterKind(TopicActor.Kind, Props.FromProducer(() => new TopicActor(store))).Build(this));`. To work around the serialization issue, I simply register additional TopicActors under different kinds. This way I can choose which nodes a given kind of TopicActor can start on, and send it only messages it is able to deserialize. I then wrote some extension methods that accept a custom `topicActorKind`, so I can target the right one:

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Proto;
using Proto.Cluster;
using Proto.Cluster.PubSub;

public static class SubscriptionExtensions
{
	// Spawns a function-based subscriber actor and subscribes it to the topic
	// hosted by the topic actor registered under `topicActorKind`.
	public static async Task<IAsyncDisposable> SubscribeAsync(this Cluster cluster, string topicActorKind, string topic, Receive receive)
	{
		var subId = cluster.System.Root.Spawn(Props.FromFunc(receive));
		await cluster.RequestAsync<SubscribeResponse>(topic, topicActorKind, new SubscribeRequest
		{
			Subscriber = new SubscriberIdentity
			{
				Pid = subId
			}
		}, default);
		
		return new Subscription(cluster, topicActorKind, topic, subId);
	}
	
	public class Subscription(Cluster cluster, string topicActorKind, string topic, PID subId) : IAsyncDisposable
	{
		public async ValueTask DisposeAsync()
		{
			// Unsubscribe from the topic actor of the same kind...
			await cluster.RequestAsync<UnsubscribeResponse>(topic, topicActorKind, new UnsubscribeRequest
			{
				Subscriber = new SubscriberIdentity
				{
					Pid = subId
				}
			}, default);

			// ...then stop the subscriber actor so it is not leaked.
			await cluster.System.Root.StopAsync(subId);
		}
	}
	
	// TopicActorKindPublisher is a custom IPublisher (not shown here) that sends
	// publish requests to the topic actor registered under `topicActorKind`.
	public static BatchingProducer CreatePublisher(this IContext context, string topicActorKind, string topic, ILogger logger)
	{
		return new BatchingProducer(new TopicActorKindPublisher(context.Cluster(), topicActorKind), topic, new BatchingProducerConfig
		{
			// Retry failed batches immediately; give up once the retry count exceeds 6.
			OnPublishingError = async (retries, exception, batch) =>
			{
				if (retries > 6)
				{
					logger.LogError(exception, "Failed to publish batch, giving up.");
					return PublishingErrorDecision.FailBatchAndContinue;
				}
				
				logger.LogWarning(exception, "Failed to publish batch, will retry.");
				return PublishingErrorDecision.RetryBatchImmediately;
			}
		});
	}
}
```
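The registration side of this workaround is not shown in the comment. A minimal sketch follows, assuming `ClusterConfig.WithClusterKind(string, Props)` and that `IKeyValueStore<T>` lives in the `Proto.Utils` namespace (check both against the Proto.Actor version in use). `TopicKindRegistration`, `WithTopicActorKind`, and `InternalTopicKind` are hypothetical names. It mirrors the built-in registration quoted above: each extra kind is just another `TopicActor` registered under a different name, and it is added only on the nodes that know the payload types.

```csharp
using Proto;
using Proto.Cluster;
using Proto.Cluster.PubSub;
using Proto.Utils; // assumption: namespace of IKeyValueStore<T> in the Proto.Actor version in use

public static class TopicKindRegistration
{
    // Hypothetical kind name for topics whose payloads only some nodes understand.
    public const string InternalTopicKind = "internal-topic";

    // Registers an extra TopicActor kind, mirroring the built-in TopicActor.Kind registration.
    // Call this only on nodes that can deserialize the payload types published to these topics,
    // so the cluster can only place topic actors of this kind on those nodes.
    public static ClusterConfig WithTopicActorKind(
        this ClusterConfig config, string kind, IKeyValueStore<Subscribers> store) =>
        config.WithClusterKind(kind, Props.FromProducer(() => new TopicActor(store)));
}
```

With this in place, the node that owns the internal types would add `WithTopicActorKind(TopicKindRegistration.InternalTopicKind, store)` to its `ClusterConfig` and pass `InternalTopicKind` as `topicActorKind` to the extension methods above. Nodes that never register the kind should never be chosen to host those topic actors.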

It would be awesome to build this in at some point. However, there is another solution to the serialization problem (although having multiple topic actor kinds is still good for isolation): don't deserialize the payload at all along the way. In theory, the TopicActor and the PubSubMemberDeliveryActor should not have to deserialize the published contents at all. I'm not sure of the best way to go about this, since the messages that facilitate publishing still need to be deserialized, just not the actual payload. This would definitely be a performance bump, and it would mean that only the publisher and the subscriber need to know how to serialize the contents.
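The opaque-payload idea can be illustrated without Proto.Actor at all. The snippet below is a hypothetical, framework-free model, not how `PubSubBatchTransport` works today. An envelope carries only a type name and serialized bytes (System.Text.Json is used here as a stand-in serializer). The intermediate hop copies envelopes without touching the bytes, and only the subscriber resolves the type name. `Publish`, `Route`, and `Deliver` are illustrative names.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical model of pub-sub with an opaque payload; not Proto.Actor code.
// An envelope is (type name, serialized bytes); only the type name is read in transit.

// Publisher: serialize once, recording the runtime type name next to the bytes.
(string TypeName, byte[] Data) Publish(object message) =>
    (message.GetType().FullName!, JsonSerializer.SerializeToUtf8Bytes(message, message.GetType()));

// Intermediate hop (e.g. a topic actor on a node without the payload types):
// fans the envelope out to each subscriber without ever deserializing Data.
List<(string TypeName, byte[] Data)> Route((string TypeName, byte[] Data) envelope, int subscriberCount)
{
    var deliveries = new List<(string TypeName, byte[] Data)>();
    for (var i = 0; i < subscriberCount; i++)
    {
        deliveries.Add(envelope);
    }
    return deliveries;
}

// Subscriber: the only place that must map the type name back to a known type.
object Deliver((string TypeName, byte[] Data) envelope, IReadOnlyDictionary<string, Type> knownTypes)
{
    if (!knownTypes.TryGetValue(envelope.TypeName, out var type))
    {
        throw new InvalidOperationException($"Type {envelope.TypeName} not found");
    }
    return JsonSerializer.Deserialize(envelope.Data, type)!;
}

// Example: the router never needs string[] registered; the subscriber does.
var envelope = Publish(new[] { "order-42", "shipped" });
var routed = Route(envelope, subscriberCount: 2);
var received = (string[])Deliver(routed[0], new Dictionary<string, Type> { ["System.String[]"] = typeof(string[]) });
Console.WriteLine(received[1]); // prints "shipped"
```

In this model, an unknown payload type can only fail at a subscriber that actually receives it, never at a node that merely hosts the topic.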

Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants