Skip to content

Commit

Permalink
Make the chat distributed
Browse files Browse the repository at this point in the history
  • Loading branch information
angeloskath committed Nov 6, 2024
1 parent 1c52719 commit a14db45
Showing 1 changed file with 55 additions and 9 deletions.
64 changes: 55 additions & 9 deletions llms/mlx_lm/chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,39 @@
DEFAULT_MAX_TOKENS = 256
DEFAULT_MODEL = "mlx-community/Llama-3.2-3B-Instruct-4bit"

MAX_PROMPT_CHARS = 16384

def send_chars(x, peer, group):
with mx.stream(mx.cpu):
x = mx.concatenate([x, mx.zeros(MAX_PROMPT_CHARS - len(x), dtype=x.dtype)])
mx.eval(mx.distributed.send(x, peer, group=group))


def recv_chars(peer, group):
with mx.stream(mx.cpu):
x = mx.distributed.recv((MAX_PROMPT_CHARS,), mx.uint8, peer, group=group)
mx.eval(x)
x = bytes(x)
idx = x.index(b'\x00'*4)
return x[:idx].decode()


def share_message(world, prompt):
if world.size() == 1:
return

if world.rank() == 0:
prompt_array = mx.array(prompt.encode())
for i in range(1, world.size()):
send_chars(prompt_array, i, world)
world.barrier()

else:
prompt = recv_chars(0, world)
world.barrier()

return prompt


def setup_arg_parser():
"""Set up and return the argument parser."""
Expand Down Expand Up @@ -53,6 +86,7 @@ def setup_arg_parser():


def main():
world = mx.distributed.init()
parser = setup_arg_parser()
args = parser.parse_args()

Expand All @@ -62,18 +96,27 @@ def main():
args.model,
adapter_path=args.adapter_path,
tokenizer_config={"trust_remote_code": True},
sequential_load=mx.distributed.init().size() > 1,
)

print(f"[INFO] Starting chat session with {args.model}. To exit, enter 'q'.")
print(f"Node {world.rank()} of {world.size()}", flush=True)
print(f"[INFO] Starting chat session with {args.model}. To exit, enter 'q'.", flush=True)
world.barrier()
prompt_cache = make_prompt_cache(model, args.max_kv_size)
while True:
query = input(">> ")
if query == "q":
prompt = None
if world.rank() == 0:
query = input(">> ")
if query == "q":
prompt = query
else:
messages = [{"role": "user", "content": query}]
prompt = tokenizer.apply_chat_template(
messages, tokenize=False, add_generation_prompt=True
)
prompt = share_message(world, prompt)
if prompt == "q":
break
messages = [{"role": "user", "content": query}]
prompt = tokenizer.apply_chat_template(
messages, tokenize=False, add_generation_prompt=True
)
for response in stream_generate(
model,
tokenizer,
Expand All @@ -83,9 +126,12 @@ def main():
top_p=args.top_p,
prompt_cache=prompt_cache,
):
print(response, flush=True, end="")
print()
if world.rank() == 0:
print(response, flush=True, end="")
if world.rank() == 0:
print()


if __name__ == "__main__":
main()

0 comments on commit a14db45

Please sign in to comment.