
[issue] In effectively_once mode, when a single pod fails, the entire function will fail due to failure to create the producer. #711

Open
graysonzeng opened this issue Dec 25, 2023 · 15 comments

Comments

@graysonzeng

graysonzeng commented Dec 25, 2023

  1. When I enable effectively_once and deploy the function in Kubernetes (for example, with 5 pods), then when one of the pods crashes, its subscription is transferred to the other pods because of the failover subscription mode. At that point the other pods fail to create the producer, because the producer on the broker side has not been closed:
Failed to create producer: {"errorMsg":"org.apache.pulsar.broker.service.BrokerServiceException$NamingException: Producer with name 'persistent://pulsar/default/input_test-partition-5-0' is already connected to topic","reqId":1766584259806202457, "remote":"21.21.47.12/21.21.47.12:6650", "local":"/9.165.174.197:46786"}

After this, the function restarts because of the exception, failover transfers the subscription again, and producer creation fails again, which causes all of the function pods to restart constantly.
Therefore, when I need effectively_once, I have to deploy multiple functions, each consuming a separate partition of the topic, but that is hard to maintain.

  2. In addition, the function can easily run into the following error and stay stuck on it until the broker is restarted:
WARN  org.apache.pulsar.client.impl.ClientCnx - [id: 0x24fe09d6, L:/9.165.182.50:36944 ! R:21.21.134.241/21.21.134.241:6650] Received send error from server: PersistenceError : Cannot determine whether the message is a duplicate at this time

If you have any suggestions for improving this, please share them. Thank you very much.

@graysonzeng
Author

graysonzeng commented Dec 25, 2023

@freeznet
@nlu90
@jiangpengcheng
PTAL

@jiangpengcheng
Member

It looks like the broker should close the producer when the client that owned it crashes without closing it.

Which version of Pulsar are you using? @graysonzeng

@graysonzeng
Author

graysonzeng commented Jan 4, 2024

> It looks like the broker should close the producer when the client that owned it crashes without closing it.
>
> Which version of Pulsar are you using? @graysonzeng

Thanks for your reply. The Pulsar version is 3.1.1. I agree with you; it seems some mechanism is needed to properly shut down the old producer before a new one is created.

@jiangpengcheng
Member

I will check it

@jiangpengcheng
Member

I can't reproduce the error. When I manually delete one function pod, its producer for the output topic is closed by the broker as well, so the next active consumer can create the producer.

Could you share the YAML you used? @graysonzeng

@graysonzeng
Author

graysonzeng commented Jan 12, 2024

> I can't reproduce the error. When I manually delete one function pod, its producer for the output topic is closed by the broker as well, so the next active consumer can create the producer.
>
> Could you share the YAML you used? @graysonzeng

Of course @jiangpengcheng

apiVersion: compute.functionmesh.io/v1alpha1
kind: FunctionMesh
metadata:
  name: functionmesh-001
spec:
  functions:
    - name: functions-dedup-v1
      className: com.tencent.functionNoTag
      image: mirrors.tencent.com/g_k_cdp/pulsar-functions-test:v1.0.1
      replicas: 10
      processingGuarantee: "effectively_once" #effectively_once manual
      pod:
        terminationGracePeriodSeconds: 30
      input:
        topics:
          - persistent://pulsar/default2/input_test
        typeClassName: "[B"
      output:
        topic: persistent://pulsar/default2/alltables3
        typeClassName: "[B"
      pulsar:
        pulsarConfig: "pulsar-dedup-gtmz-167-sink-test02-config-v1"
        authSecret: "sink-dedup-test02-config-auth"
      java:
        extraDependenciesDir: ""
        jar: /pulsar/connectors//DynamicTopic-1.0.nar # the NAR location in image.
        jarLocation: "" # leave empty since we will not download package from Pulsar Packages
      clusterName: pulsar-gtmz-167
      forwardSourceMessageProperty: true
      resources:
        requests:
          cpu: "2"
          memory: 2G
        limits:
          cpu: "2"
          memory: 2G
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: pulsar-dedup-gtmz-167-sink-test02-config-v1
data:
  webServiceURL: http://xx.xx.47.12:8080
  brokerServiceURL: pulsar://xx.xx.47.12:6650
  authPlugin: "org.apache.pulsar.client.impl.auth.AuthenticationToken"
  authParams: "eyJhxxxx"
---
apiVersion: v1
kind: Secret
metadata:
  name: sink-dedup-test02-config-auth
stringData:
  clientAuthenticationPlugin: "org.apache.pulsar.client.impl.auth.AuthenticationToken"
  clientAuthenticationParameters: "token:eyxxxx"
  

@jiangpengcheng
Member

I see, the error is caused by the following:

When pod-1 crashes, another pod (say pod-2) takes over pod-1's subscription to the partition pod-1 was consuming (say partition-5). If a message then arrives on partition-5, pod-2 creates a new producer named persistent://pulsar/default/input_test-partition-5-0. When pod-1 restarts and receives a new message, it cannot create a producer with the name persistent://pulsar/default/input_test-partition-5-0, because that name is still held by pod-2.

This issue is caused by JavaInstanceRunnable; I will fix it in the apache/pulsar repo.

@graysonzeng
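
For illustration, here is a minimal standalone sketch of the name collision using the Pulsar Java client. This is not the function runtime's actual code; the broker address, output topic, and producer name are placeholders based on the error above. Effectively-once pins the producer name so the broker can de-duplicate, which is exactly why the second attempt is rejected while the first producer is still connected:

```java
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;

public class ProducerNameCollision {
    public static void main(String[] args) throws Exception {
        // Placeholder broker address.
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        // First producer claims the fixed name, like pod-2 after it takes over partition-5.
        Producer<byte[]> first = client.newProducer()
                .topic("persistent://pulsar/default2/alltables3")
                .producerName("persistent://pulsar/default/input_test-partition-5-0")
                .create();

        // Second producer with the same name, like restarted pod-1: the broker rejects it
        // with "Producer with name '...' is already connected to topic".
        try {
            client.newProducer()
                    .topic("persistent://pulsar/default2/alltables3")
                    .producerName("persistent://pulsar/default/input_test-partition-5-0")
                    .create();
        } catch (PulsarClientException e) {
            System.out.println("Second create failed as expected: " + e.getMessage());
        }

        first.close();
        client.close();
    }
}
```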

@graysonzeng
Author

graysonzeng commented Jan 17, 2024

> I see, the error is caused by the following:
>
> When pod-1 crashes, another pod (say pod-2) takes over pod-1's subscription to the partition pod-1 was consuming (say partition-5). If a message then arrives on partition-5, pod-2 creates a new producer named persistent://pulsar/default/input_test-partition-5-0. When pod-1 restarts and receives a new message, it cannot create a producer with the name persistent://pulsar/default/input_test-partition-5-0, because that name is still held by pod-2.
>
> This issue is caused by JavaInstanceRunnable; I will fix it in the apache/pulsar repo.
>
> @graysonzeng

thanks!

@jiangpengcheng
Member

I created a PR here: apache/pulsar#21912 and built a jar based on it. Could you add it to your runner image to check whether the issue is resolved? @graysonzeng

@jiangpengcheng
Member

This issue is hard to fix, since we cannot use a different producer name without breaking de-duplication. Below is a workaround for when the error happens (a YAML sketch follows the list):

  1. set the function's replicas to 0
  2. wait until all function pods have stopped, so that no producer exists for the output topic
  3. set the function's replicas back to the original value

cc @graysonzeng
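
As a concrete illustration of the workaround, assuming the FunctionMesh manifest shared above is managed with kubectl apply, only the replicas field changes and is later restored; this is a sketch, not an official procedure:

```yaml
# Step 1: set replicas to 0 and re-apply the manifest (kubectl apply -f functionmesh.yaml).
# Step 2: wait until kubectl shows no functions-dedup-v1 pods, so the broker has closed their producers.
# Step 3: set replicas back to 10 and re-apply.
apiVersion: compute.functionmesh.io/v1alpha1
kind: FunctionMesh
metadata:
  name: functionmesh-001
spec:
  functions:
    - name: functions-dedup-v1
      replicas: 0   # temporarily stop all instances; restore to 10 afterwards
      # ... rest of the function spec unchanged ...
```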

@graysonzeng
Author

Thanks for the suggestion. I have an idea: how about using an independent subscription name for each pod? For example, the input topic has 10 partitions and two pods consume it, and the two pods use subscription names A and B respectively. Then, even if a pod fails, its subscription is not switched to the surviving pod by failover, and it can continue consuming once Kubernetes brings the pod back up. @jiangpengcheng

@jiangpengcheng
Member

Using different subscription names will make pod A and pod B both consume all ten partitions, which leads to duplication.

@graysonzeng
Author

A and B would each consume part of the partitions. For example, A consumes partitions 0-4 and B consumes partitions 5-9; this would not lead to duplication.
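
A rough sketch of that idea with the Pulsar Java client (the broker address and topic names are placeholders): each deployment subscribes directly to its own fixed set of partition topics under its own subscription name, so the other deployment never takes them over.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;

public class PartitionSplitConsumer {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650") // placeholder broker address
                .build();

        // "Pod A": subscription "A" over partitions 0-4 of the input topic.
        // A second deployment ("pod B") would use subscription "B" over partitions 5-9.
        List<String> partitions = new ArrayList<>();
        for (int i = 0; i <= 4; i++) {
            partitions.add("persistent://pulsar/default2/input_test-partition-" + i);
        }

        Consumer<byte[]> consumer = client.newConsumer()
                .topics(partitions)
                .subscriptionName("A")
                .subscriptionType(SubscriptionType.Failover)
                .subscribe();

        // ... consume and process messages ...

        consumer.close();
        client.close();
    }
}
```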

@jiangpengcheng
Member

> A and B would each consume part of the partitions.

This needs to be specified manually, which is just like what you are doing now:

> Therefore, when I need effectively_once, I have to deploy multiple functions, each consuming a separate partition of the topic, but that is hard to maintain.

@graysonzeng
Author

graysonzeng commented Jan 30, 2024

Thanks. I originally hoped that the subscriptions would not need to be specified manually, so that a user could create a sink with multiple replicas without repeating the configuration in YAML for each one. But it seems this is the only way for now. @jiangpengcheng
