AWS SQS Queue Filtering #2066

Open
30 of 47 tasks
aviramha opened this issue Nov 9, 2023 · 27 comments
aviramha (Member) commented Nov 9, 2023

As part of the mirrord for Teams solution, we'd like developers to be able to "fork" a queue based on a filter.
This means the original queue would then split into sub-queues based on a message attribute, so each engineer can consume their relevant messages.
This is currently in the ideation and design stage.

Current plan:

  • Implement a new CRD - MirrordForkSpec?
  • You specify the target deployment/rollout
  • You can specify a list of MirrordForkType - the first and only one for now will be SQS
  • You specify which env/configmap is used for controlling which SQS queue to use
  • You specify how to filter - message metadata is probably best
  • The user can select fork specs in the mirrord config file and specify their filter.
  • When a user launches a session with fork specs, it checks whether a fork is already active
  • If a fork is active, it registers its filter with it
  • If it is not, it creates a modified configmap / deployment with changed envs to use a new SQS queue
  • It spawns a pod (or the operator does it? I think a separate pod would be better) that reads from the original queue, applies the filter, and writes to the different queues - "stable" and filtered.
  • When a filter is removed, its designated queue is removed.
  • When there are no active filters, restore the deployment and delete the queues.
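The splitting flow described above could be sketched roughly as follows. This is a minimal, AWS-free sketch: the in-memory queues, message shape, and regex-based attribute filters are illustrative assumptions, not the actual operator code. Queues the user didn't configure get an empty filter dict, standing in for a match-nothing filter.

```python
import re
from queue import SimpleQueue

def matches(filters, attributes):
    """Return True if every filter matches the message's attributes.
    An empty filter dict acts as a match-nothing filter (used for
    queues the user did not configure)."""
    if not filters:
        return False
    return all(
        re.search(pattern, attributes.get(name, "")) is not None
        for name, pattern in filters.items()
    )

def split(source, stable, user_queues):
    """Read messages from the original queue; deliver each message to
    the first user whose filter matches, otherwise to the "stable"
    queue that the unmodified deployment keeps consuming."""
    while not source.empty():
        msg = source.get()
        for filters, user_queue in user_queues:
            if matches(filters, msg["attributes"]):
                user_queue.put(msg)
                break
        else:
            stable.put(msg)

# Tiny demo with in-memory queues standing in for SQS:
source, stable, alice = SimpleQueue(), SimpleQueue(), SimpleQueue()
source.put({"body": "a", "attributes": {"author": "alice"}})
source.put({"body": "b", "attributes": {"author": "bob"}})
split(source, stable, [({"author": "^alice$"}, alice)])
```

In the real design the splitter would long-poll the original SQS queue and forward to real per-user SQS queues, but the routing decision is the same.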

Example user config map:

As env from configmap

  XX_SQS_FIFO_QUEUE_NAME:
    valueFrom:
      configMapKeyRef:
        key: XX_fifo
        name: sqs-queues

As mounted configmap

apiVersion: v1
data:
  config.yaml: |-
    queue_name: xx-queue-name

Issue state:

  • When a user sets a filter to any of the queues defined for their target, we'll set filters for all queues of that target, for the unspecified ones we will use a match-nothing filter.

Follow-up issues / things we're leaving for future versions:

  1. Deployment-wide incoming traffic: on the first version the local process will compete with the existing pods for traffic because we're using an altered copy-target. In the future the local process should receive deployment-wide traffic by default.

TODOs:

  • Plan complete run flow with splitters (~1 day)
  • Remaining Implementation:
    • Session start:
      • Session start when there is already a split:
        • Extracting pod spec from target in SQS controller. (~1 day)
        • Creating new config maps. (~1 day)
        • Creating a copy pod with changed spec. (~1 day)
        • Register filter.
      • Session start for first client: (~2 days)
        • Saving pre-splitting deployment spec.
        • Creating new config maps for deployment.
        • Changing and restarting deployment.
        • Start filtering task.
      • Manual testing ~ 2 days
        • Setup a cluster - AWS
        • Setup some SQS queues.
        • Run dev operator.
        • Test manually.
    • Cleanup:
      • Detect ended/failed-to-start sessions. (~2 days)
      • Delete filter resource/session resources.
      • Handle delete in controller. (~1 day)
      • On last user:
        • Restore spec from before split. (~1 day)
    • Service account - AWS auth for operator #2293
    • update helm chart (< 1 day).
    • Solve cooldown problem - main output queue currently has a deterministic name derived from the original queue's name, but it turns out after deleting a queue we can't create a new queue with the same name for a minute, so that name ought to be randomized. (almost no time, I think)
    • Change CRD status struct definitions not to contain PodSpecs so that they can be created with kubectl apply (~ 1 day). <-- WE ARE HERE
    • Merge the copy pod changes from https://github.com/metalbear-co/operator/pull/517 (hopefully ~ 1 day)
    • Add wait for target patch to happen, so that we only delete the temp main queue after the deployed app stops using it. (no time).
    • Add wait for target patch to happen also for rollouts. (~ 1 day).
    • Change HashMaps to BTreeMaps in CRDs. (no time).
    • Let users set tags in Splitter CRD.
    • Wait for main queue to be empty before returning to original queue (< 1 day).
    • Error handling and logging (~ 2 days)
    • Tests (~ 3 days)
      • Is connecting IAM to a service account possible on localstack?
      • Create localstack infra on test start?
    • Documentation
      • How to create splitter resources.
      • Configuration docs.
    • Test plan for manual team testing.
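On the cooldown problem in the list above: since SQS won't let you recreate a queue with the same name for about a minute after deletion, the temp queue name could carry a random suffix. A minimal sketch (the naming scheme is an assumption; the 80-character limit and allowed character set come from SQS's queue-name rules):

```python
import re
import uuid

def temp_queue_name(original: str, max_len: int = 80) -> str:
    """Derive a randomized temp queue name from the original queue's
    name. SQS queue names are limited to 80 chars of [a-zA-Z0-9_-],
    so sanitize and truncate before appending a random suffix."""
    suffix = uuid.uuid4().hex[:8]
    base = re.sub(r"[^a-zA-Z0-9_-]", "-", original)
    base = base[: max_len - len(suffix) - 1]
    return f"{base}-{suffix}"

name = temp_queue_name("orders-queue")
```

With a random suffix, deleting a splitter's queue and immediately starting a new split never collides with the name cooldown.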

Next Version:

  • support and test using queues from config maps.
    • implement and test config map cleanup (also in all edge cases).
  • support Argo rollout targets with workloadRefs.
  • Solve conflict when two users start at the same time (< 1 day).
  • Backup cleanup task: the operator monitors sqssessions and mirrord client sessions, and deletes sqssessions that don't have an active client session. IMO we can consider this approach: the mirrord operator manages a k8s CRD of mirrord sessions and ensures it is in sync with active sessions; the SQS session resource is a child resource that is deleted automatically by k8s when the operator deletes the mirrord session resource.
aviramha (Member Author) commented:

Thinking about the implementation - I think we should probably spawn a pod that reads from the real queue, change the current service to use the "forked" queue, and have the local service also use the forked queue (but with a filter).

@t4lz t4lz self-assigned this Jan 11, 2024
t4lz (Member) commented Jan 11, 2024

I think it might be possible to implement without requiring the user to create a fork resource ahead of time. All the necessary info can be passed in the mirrord configuration, and the queue splitter can be started with the first filtering client. What do you think, @aviramha?

aviramha (Member Author) commented:

Yup, I tried to go there in my explanation but I might have been less clear.

t4lz (Member) commented Jan 11, 2024

So we probably don't need a CRD, right?

aviramha (Member Author) commented:

So we probably don't need a CRD, right?

I thought we should have the CRD so the configuration will be centralized - I think having it in the mirrord json is complicated, and you don't want users to mistakenly cause issues on the cluster through misconfiguration.

t4lz (Member) commented Jan 11, 2024

Are region, config_map, and queue_name_key going to be appropriate for all types of queues (for when we support queues other than SQS sometime in the future), or should we just have an SQS-specific CRD?

aviramha (Member Author) commented:

Are region, config_map, and queue_name_key going to be appropriate for all types of queues (for when we support queues other than SQS sometime in the future), or should we just have an SQS-specific CRD?

from original issue:

You can specify list of MirrordForkType - first and only one for now will be SQS
btw, the source can be either a config map or an env variable - we need to support both.

I imagine having a CRD that specifies what kind of fun stuff happens when you start mirrord on a specific target - a list of options which, for now, will only support SQS forking.

t4lz (Member) commented Jan 11, 2024

Is splitting queues from multiple cloud services at the same time a common use-case? Otherwise we could make it a single enum object instead of a list of them.

aviramha (Member Author) commented:

Is splitting queues from multiple cloud services at the same time a common use-case? Otherwise we could make it a single enum object instead of a list of them.

Imagine a service using SQS + Kafka, or when we introduce DB splitting or other stuff similar - it'd be convenient to have this defined per "workflow" / deployment

t4lz (Member) commented Jan 14, 2024

I created the CRD and configuration code at #2173 according to this issue. The configuration code isn't valid yet, but it already roughly represents the structure of the configuration.

An example MirrordQueueSplitter could look something like this:

apiVersion: splitters.mirrord.metalbear.co/v1alpha
kind: MirrordQueueSplitter
metadata:
  name: whatever-q-splitter
  namespace: default
spec:
  queues:
    updates-queue:
      SQS:
        queueNameSource:
          configMap:
            name: whatever-config-map
            queueNameKey: sqsUpdatesQueueName
        region: whatever-1
    tasks-queue:
      SQS:
        queueNameSource:
          configMap:
            name: whatever-config-map
            queueNameKey: sqsTasksQueueName
        region: whatever-1
  consumer:
    deployment: my-deploy

And the mirrord configuration file of a user could look something like this:

{
  "feature": {
    "split_queues": {
      "whatever-q-splitter": {
        "updates-queue": {
          "SQS": {
            "wows": {
              "number_attribute": [
                { "greater_than_int": 2 },
                { "lesser_than_int": 6 }
              ]
            },
            "coolz": {
              "string_attribute": "^very .*"
            }
          }
        }
      }
    }
  }
}
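For reference, the filter shapes in this example could be evaluated against a message attribute roughly like this. Only the field names (`string_attribute`, `number_attribute`, `greater_than_int`, `lesser_than_int`) are taken from the JSON above; the evaluation semantics are an assumption about the intent, not implemented behavior:

```python
import re

def attribute_matches(filter_spec, value):
    """Evaluate one attribute filter from the config against a message
    attribute value. `filter_spec` is either a
    {"string_attribute": regex} object or a
    {"number_attribute": [comparisons]} object, as in the example."""
    if "string_attribute" in filter_spec:
        return re.search(filter_spec["string_attribute"], str(value)) is not None
    checks = {
        "greater_than_int": lambda v, n: v > n,
        "lesser_than_int": lambda v, n: v < n,
    }
    # All listed comparisons must hold for the attribute to match.
    return all(
        checks[op](int(value), bound)
        for comparison in filter_spec["number_attribute"]
        for op, bound in comparison.items()
    )

wows = {"number_attribute": [{"greater_than_int": 2}, {"lesser_than_int": 6}]}
coolz = {"string_attribute": "^very .*"}
```

Under this reading, `wows` matches values strictly between 2 and 6, and `coolz` matches any string attribute starting with "very ".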

In my opinion that's a bit more complicated than it has to be. I recommend that instead, we create a CRD called MirrordSQSSplitter that splits a single queue. When we add support for other queue types in the future, we add similar CRDs for those queue types.
This would obviate one level of mapping. In the current design the user gives the name of the MirrordQueueSplitter, and then, for each queue under that splitter, an identifier defined in the splitter that identifies the specific queue. In my opinion that identifier could be a bit confusing for users. If each queue gets its own MirrordSQSSplitter, the user only has to specify the one name of that resource in their mirrord config.
The user config could have a field sqs_queues which holds a resource-name -> queue-filter-definition mapping (and in the future there would be separate fields for the other types of queues).
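Under that proposal, the user config might look something like this (a sketch of the suggestion only - the `sqs_queues` field and resource name are hypothetical, not an implemented format):

```json
{
  "feature": {
    "split_queues": {
      "sqs_queues": {
        "updates-queue-splitter": {
          "wows": {
            "number_attribute": [
              { "greater_than_int": 2 },
              { "lesser_than_int": 6 }
            ]
          },
          "coolz": { "string_attribute": "^very .*" }
        }
      }
    }
  }
}
```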

aviramha (Member Author) commented:

Thinking about the long term here, I think the CRD should be MirrordSplitter or something more general, since I imagine in the end we'd have more stuff that is done per target, and having the configuration split between different objects is kind of annoying.
In the user configuration, to make it easier to follow, I'd skip the SQS definition or make it internally tagged, and remove the whatever-q-splitter - there's already config per queue name (i.e. "wows"), so we don't need users to explicitly mention the MirrordQueueSplitter they're configuring.

t4lz (Member) commented Jan 15, 2024

I see what you're saying. Just so we're on the same page:

{
  "feature": {
    "split_queues": {
      "whatever-q-splitter": {			<-- this is the name of the crd to get the ConfigMap/EnvVar from
        "first-queue": {			<-- if the crd contains many queues, we need to identify the specific one
          "SQS": {				<-- I'm not sure we can drop that if we have multiple q types in the same CRD
            "wows": {				<-- This is the name of the first attribute to filter SQS messages by.
              "number_attribute": [		<-- Attribute filter
                { "greater_than_int": 2 },
                { "lesser_than_int": 6 }
              ]
            },
            "coolz": {				<-- Another attribute
              "string_attribute": "^very .*"
            }
          }
        }
      }
    }
  }
}

aviramha (Member Author) commented:

{
  "feature": {
    "split_queues": {
      "whatever-q-splitter": {			<-- I imagine the mutation of the env var / config map would happen from the operator side - for example when user fetches env it patches it (could be a cool feature regardless tbh ;) replace env for users)
        "first-queue": {			<-- if the crd contains many queues, we need to identify the specific one - agree
          "SQS": {				<-- I'm not sure we can drop that if we have multiple q types in the same CRD - we can, just make it internally tagged enum. in any case, we can make it also not tagged so it will just see if it can parse it as the queue that exists in the remote - for example you'd send this configuration as JSON to the upstream, and operator knows it's SQS so it tries to parse this as SQS.
            "wows": {				<-- This is the name of the first attribute to filter SQS messages by. agree
              "number_attribute": [		<-- Attribute filter
                { "greater_than_int": 2 },
                { "lesser_than_int": 6 }
              ]
            },
            "coolz": {				<-- Another attribute
              "string_attribute": "^very .*"
            }
          }
        }
      }
    }
  }
}
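On the untagged-enum idea above: in the actual operator this would presumably be a serde enum in Rust; the following Python sketch only illustrates the fallback semantics (try each known queue type's parser until one accepts the raw config), and every name in it is hypothetical:

```python
def parse_sqs_filter(raw):
    """Try to interpret an untagged filter object as an SQS filter:
    a mapping of attribute name -> string/number attribute filter."""
    if not isinstance(raw, dict):
        raise ValueError("not an SQS filter")
    for spec in raw.values():
        if not ({"string_attribute", "number_attribute"} & set(spec)):
            raise ValueError("not an SQS filter")
    return ("SQS", raw)

def parse_queue_filter(raw):
    """Untagged parsing: the operator knows which queue types the
    target consumes, so it tries those parsers in turn."""
    for parser in (parse_sqs_filter,):  # future: Kafka, etc.
        try:
            return parser(raw)
        except ValueError:
            continue
    raise ValueError("unrecognized queue filter")

kind, spec = parse_queue_filter({"wows": {"string_attribute": "^very .*"}})
```

The trade-off versus an explicit "SQS" tag is the usual one: less nesting for the user, at the cost of ambiguity if two queue types ever accept the same shape.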

t4lz (Member) commented Jan 15, 2024

So you want the operator to auto-detect the splitter resource based on the target's deployment/rollout?
The operator would check which splitter resource has that deployment/rollout and use that?

aviramha (Member Author) commented:

On another note - how did you think to do the queue filtering? In the operator itself? Initially I thought we'd spawn a job/pod for it, but then realized it's better to run it in the operator - and if it were another pod, I'd reuse the operator image so that enterprise users won't need to copy many images to their internal registry.
Also, we should have a way to clean up queues - when we create them we should add a label marking that they were created by mirrord, and maybe also represent them as a CRD with a kind of keep-alive/managed-by field, so that if an operator crashes or exits in a bad way, another instance can remove the unused queues and do the cleanup.

aviramha (Member Author) commented:

So you want the operator to auto-detect the splitter resource based on the target's deployment/rollout? The operator would check which splitter resource has that deployment/rollout and use that?

Yeah, I actually think that's kind of a policy. I do think maybe later on we'd add a feature to opt out of the splitting, but by default, and initially, it would force the user's traffic to be split.

t4lz (Member) commented Jan 15, 2024

So you want the operator to auto-detect the splitter resource based on the target's deployment/rollout? The operator would check which splitter resource has that deployment/rollout and use that?

Yeah, I actually think that's kind of a policy. I do think maybe later on we'd add a feature to opt out of the splitting, but by default, and initially, it would force the user's traffic to be split.

The user has to specify filters anyways, so I don't see why/when there would be force splitting. If the user does not specify a filter for a queue, we don't split it.

aviramha (Member Author) commented:

So you want the operator to auto-detect the splitter resource based on the target's deployment/rollout? The operator would check which splitter resource has that deployment/rollout and use that?

Yeah, I actually think that's kind of a policy. I do think maybe later on we'd add a feature to opt out of the splitting, but by default, and initially, it would force the user's traffic to be split.

The user has to specify filters anyways, so I don't see why/when there would be force splitting. If the user does not specify a filter for a queue, we don't split it.

It's a UX decision that matters - what happens if a user without a filter configuration starts a session when the queue is already split by other users? You can:

  • Abort the unfiltered user session from starting
  • Provide an "empty" queue that wouldn't get any traffic
  • Let them get the fallback messages (all messages that aren't filtered)

I think our mandate should lean towards safety of shared environments by default, which would mean option 1 - that's why I'm leaning towards always forcing users to get a split queue when a split is defined.

t4lz (Member) commented Jan 15, 2024

Yeah number 1 is reasonable (imo also 3 is good) - should we do it like our stealing policy:

  • you don't have to set a filter if you're the first client to target a queue-consuming target.
  • If someone is targeting a queue-consuming deployment without a filter - you can't target it until they're done.
  • If someone is targeting it with a filter - you have to also set a filter. In case of intersecting filters - earlier client gets the message.

aviramha (Member Author) commented:

  • you don't have to set a filter if you're the first client to target a queue-consuming target.
    I disagree with that, at least for the first version, since it would cause unsafe behavior in shared environments, which are our target use case.

t4lz (Member) commented Jan 15, 2024

  • you don't have to set a filter if you're the first client to target a queue-consuming target.
    I disagree with that, at least for the first version, since it would cause unsafe behavior in shared environments, which are our target use case.

What unsafe behaviour?

aviramha (Member Author) commented:

Taking all messages

t4lz (Member) commented Jan 15, 2024

Why is that unsafe?

t4lz (Member) commented Jan 15, 2024

On another note - how did you think to do the queue filtering? In the operator itself? Initially I thought we'd spawn a job/pod for it, but then realized it's better to run it in the operator - and if it were another pod, I'd reuse the operator image so that enterprise users won't need to copy many images to their internal registry. Also, we should have a way to clean up queues - when we create them we should add a label marking that they were created by mirrord, and maybe also represent them as a CRD with a kind of keep-alive/managed-by field, so that if an operator crashes or exits in a bad way, another instance can remove the unused queues and do the cleanup.

Just throwing another idea out there - we could also go in the direction of a microservice architecture and have a splitter service that the operator uses for splitting by sending requests to start/stop splitting.
I don't have a preference, so if you say it's best to do everything in the operator I'll just go with that.

aviramha (Member Author) commented:

Why is that unsafe?

Because it means one user's service consumes all the messages, and other users can't filter.

Just throwing another idea out there - we could also go in the direction of a microservice architecture and have a splitter service that the operator uses for splitting by sending requests to start/stop splitting.
I don't have a preference, so if you say it's best to do everything in the operator I'll just go with that.

I initially had the same idea, but the problem is that it adds complexity and usually means another image, which enterprise users would then have to copy to their internal registries, etc.

t4lz (Member) commented Mar 7, 2024

apiVersion: v1
data:
  config.yaml: |-
    queue_name: xx-queue-name

In this example from the original issue comment, the queue name is defined inside a file/text field of a config map. I forgot about this example at some point. The interface we discussed for the MirrordQueueSplitter custom resource is not really sufficient for handling that. What do we want the interface to be for that case - take a ConfigMap name + key name + "path" inside the YAML file, parse the config map field as YAML, and get the queue name from the "path" in the YAML file?
Do we only support YAML? A subset of YAML? More formats, with detection by extension (e.g. .yaml) in the ConfigMap key?

It's not a big change, doesn't affect schedule much.
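If the first version were limited to a flat `key: value` subset of YAML, the extraction stays trivial and needs no YAML library. A sketch under that assumption (the function name and quote-stripping behavior are illustrative, not a spec):

```python
def lookup_flat_yaml(text: str, key: str) -> str:
    """Extract a value from a flat `key: value` YAML document, like the
    config.yaml field in the example ConfigMap. Only supports the flat
    subset - nested mappings would need a real YAML parser."""
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        name, sep, value = line.partition(":")
        if sep and name.strip() == key:
            return value.strip().strip("'\"")
    raise KeyError(key)

config_yaml = "queue_name: xx-queue-name\n"
queue = lookup_flat_yaml(config_yaml, "queue_name")
```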

aviramha (Member Author) commented Mar 7, 2024

I think we can go for the first case, and then extend it later.
