forked from temporalio/samples-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
hello_mtls.py
97 lines (83 loc) · 2.87 KB
/
hello_mtls.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import argparse
import asyncio
from dataclasses import dataclass
from datetime import timedelta
from typing import Optional
from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.service import TLSConfig
from temporalio.worker import Worker
@dataclass
class ComposeGreetingInput:
    """Argument bundle passed to the ``compose_greeting`` activity."""

    # Salutation placed in front of the name (the workflow passes "Hello").
    greeting: str
    # Name of whoever is being greeted.
    name: str
# Basic activity that builds the greeting text with string formatting
@activity.defn
async def compose_greeting(input: ComposeGreetingInput) -> str:
    """Return the greeting text built from *input*.

    The result has the form ``"<greeting>, <name>!"``, using the ``greeting``
    and ``name`` fields of *input*.
    """
    message = f"{input.greeting}, {input.name}!"
    return message
# Basic workflow that invokes a single activity
@workflow.defn
class GreetingWorkflow:
    """Workflow that delegates building the greeting to ``compose_greeting``."""

    @workflow.run
    async def run(self, name: str) -> str:
        """Execute the greeting activity for *name* and return its result.

        The activity receives the fixed salutation "Hello" together with
        *name*, and is started with a 10-second start-to-close timeout.
        """
        activity_input = ComposeGreetingInput("Hello", name)
        greeting = await workflow.execute_activity(
            compose_greeting,
            activity_input,
            start_to_close_timeout=timedelta(seconds=10),
        )
        return greeting
async def main():
    """Connect to the server with mTLS, then run the greeting workflow once.

    Host, namespace, and certificate/key file paths are taken from the
    command line. The client certificate and key are mandatory; the server
    root CA certificate is optional. The workflow result is printed to
    standard output.
    """
    # Load certs from CLI args. Options are registered in a fixed order so the
    # generated --help output lists them in that same order.
    option_table = (
        (
            "--target-host",
            {"help": "Host:port for the server", "default": "localhost:7233"},
        ),
        (
            "--namespace",
            {"help": "Namespace for the server", "default": "default"},
        ),
        (
            "--server-root-ca-cert",
            {"help": "Optional path to root server CA cert"},
        ),
        (
            "--client-cert",
            {"help": "Required path to client cert", "required": True},
        ),
        (
            "--client-key",
            {"help": "Required path to client key", "required": True},
        ),
    )
    cli = argparse.ArgumentParser(description="Use mTLS with server")
    for flag, settings in option_table:
        cli.add_argument(flag, **settings)
    opts = cli.parse_args()

    def read_bytes(path):
        # Certificates and keys are handed to the TLS config as raw bytes.
        with open(path, "rb") as handle:
            return handle.read()

    # Read order: optional root CA first, then client cert, then client key.
    root_ca_bytes: Optional[bytes] = None
    if opts.server_root_ca_cert:
        root_ca_bytes = read_bytes(opts.server_root_ca_cert)
    cert_bytes = read_bytes(opts.client_cert)
    key_bytes = read_bytes(opts.client_key)

    # Start client with TLS configured
    tls_settings = TLSConfig(
        server_root_ca_cert=root_ca_bytes,
        client_cert=cert_bytes,
        client_private_key=key_bytes,
    )
    client = await Client.connect(
        opts.target_host,
        namespace=opts.namespace,
        tls=tls_settings,
    )

    # The worker and the workflow start request must name the same task queue.
    queue_name = "hello-mtls-task-queue"

    # Run a worker for the workflow
    worker = Worker(
        client,
        task_queue=queue_name,
        workflows=[GreetingWorkflow],
        activities=[compose_greeting],
    )
    async with worker:
        # While the worker is running, use the client to run the workflow and
        # print out its result. Note, in many production setups, the client
        # would be in a completely separate process from the worker.
        result = await client.execute_workflow(
            GreetingWorkflow.run,
            "World",
            id="hello-mtls-workflow-id",
            task_queue=queue_name,
        )
        print(f"Result: {result}")
# Only start the sample when this file is executed as a script, not on import.
if __name__ == "__main__":
    # asyncio.run drives the main() coroutine to completion.
    asyncio.run(main())