
Conversation

@shihaobai
Collaborator

No description provided.

@gemini-code-assist
Contributor

Summary of Changes

Hello @shihaobai, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly upgrades the LightLLM serving framework by introducing robust support for Reinforcement Learning (RL) training, particularly for VERL. The core changes revolve around advanced GPU memory management, enabling dynamic model weight updates, and refining the server's architecture for better control and communication. These enhancements pave the way for more flexible and efficient online model training and deployment scenarios.

Highlights

  • RL Training Support: Introduced comprehensive support for Reinforcement Learning (RL) training, specifically for VERL, by integrating new memory management capabilities and dynamic weight update mechanisms.
  • Memory Management: Implemented TorchMemorySaverWrapper to efficiently manage GPU memory regions (weights, KV cache, and CUDA graphs), allowing for pausing and resuming memory occupation. This includes new API endpoints for release_memory_occupation and resume_memory_occupation.
  • Dynamic Weight Updates: Added functionality for online weight updates, enabling models to be updated without restarting the server. This is facilitated by new API endpoints such as init_weights_update_group, destroy_weights_update_group, update_weights_from_distributed, and update_weights_from_tensor; a hedged client sketch follows this list.
  • API and Server Refactoring: Refactored server startup logic and API endpoints to improve modularity and support new features. This includes new endpoints for server information, weight version, request abortion, cache flushing, and generation control (pause/continue).
  • Inter-Process Communication (IPC) Enhancements: Introduced new data structures in io_struct.py for clearer and more robust communication between different server components. Updated ZMQ socket types for more efficient message passing.
  • Distributed Environment Improvements: Enhanced distributed training utilities by adding init_custom_process_group for flexible process group initialization and modifying init_process_group calls to avoid device-bound split optimizations that could hinder RL weight updates.
  • Serialization and Tensor Handling: Added serializer.py and tensor_bucket.py to provide secure and efficient serialization of tensors across multiprocessing boundaries, including support for flattened tensor buckets.
  • Kernel Autotuning Configurations: Included new Triton kernel autotuning configurations for NVIDIA H200 and H100 GPUs, optimizing performance for various grouped matrix multiplication and MoE operations.
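
For reviewers who want to exercise the new control plane, here is a minimal client sketch. It assumes the functions listed above are exposed as HTTP POST routes with the same names on a locally running server; the base URL, paths, and payload fields are illustrative assumptions, not taken from this PR.

# Hypothetical client calls against an assumed LightLLM server on
# localhost:8000; endpoint paths and JSON fields below are illustrative only.
import requests

BASE = "http://localhost:8000"

# Release GPU memory held by weights / KV cache / CUDA graphs while an RL
# trainer needs the device, then resume before serving again.
requests.post(f"{BASE}/release_memory_occupation")
requests.post(f"{BASE}/resume_memory_occupation")

# Online weight update flow: join a process group shared with the trainer,
# receive new weights through it, then tear the group down.
requests.post(f"{BASE}/init_weights_update_group", json={
    "master_address": "127.0.0.1",   # assumed field names
    "master_port": 29500,
    "rank_offset": 0,
    "world_size": 2,
    "group_name": "verl_weight_update",
})
requests.post(f"{BASE}/update_weights_from_distributed", json={
    "names": ["model.layers.0.self_attn.q_proj.weight"],  # illustrative
})
requests.post(f"{BASE}/destroy_weights_update_group")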



@gemini-code-assist bot left a comment


Code Review

This pull request introduces significant changes to support reinforcement learning (RL) training, including dynamic memory management and online weight updates. The refactoring of I/O structures and communication patterns is a positive step towards a more modular and maintainable codebase. The introduction of TorchMemorySaverWrapper provides a clean way to optionally use memory-saving features.

However, I've identified several issues that need attention, including a critical bug in the distributed handling of aborted requests, a logic error in request scheduling, and a potential race condition with asyncio.Event. There are also opportunities for code simplification and improved error handling. Please see the detailed comments for each issue.

    [req.is_aborted for req in self.running_batch.reqs], dtype=torch.bool, device="cpu"
)
if self.is_multinode_tp:
    dist.all_reduce(aborted_req_mask, op=dist.ReduceOp.MIN, group=self.mulitnode_group)

critical

The all_reduce operation on aborted_req_mask uses dist.ReduceOp.MIN. For a boolean tensor, this is equivalent to a logical AND operation across all ranks. This means a request will only be considered aborted if it is marked as aborted on all ranks. The correct behavior should be to consider a request aborted if it is aborted on any rank. This requires a logical OR, which corresponds to dist.ReduceOp.MAX.

Suggested change
dist.all_reduce(aborted_req_mask, op=dist.ReduceOp.MIN, group=self.mulitnode_group)
dist.all_reduce(aborted_req_mask, op=dist.ReduceOp.MAX, group=self.mulitnode_group)
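
To make the semantics concrete, here is a small standalone sketch (not code from this PR; it assumes two local processes and the gloo backend) that reduces a 0/1 mask standing in for the boolean aborted mask. ReduceOp.MIN behaves as a logical AND across ranks, while ReduceOp.MAX behaves as a logical OR, which is what "aborted on any rank" requires.

# Demo: MIN over a 0/1 mask == AND across ranks, MAX == OR across ranks.
import torch
import torch.distributed as dist
import torch.multiprocessing as mp


def worker(rank, world_size):
    dist.init_process_group(
        backend="gloo",
        init_method="tcp://127.0.0.1:29511",
        rank=rank,
        world_size=world_size,
    )
    # Only rank 0 marks the request as aborted (1 = aborted, 0 = not aborted).
    mask = torch.tensor([1 if rank == 0 else 0], dtype=torch.int64)

    and_mask = mask.clone()
    dist.all_reduce(and_mask, op=dist.ReduceOp.MIN)  # -> 0: "aborted on all ranks" is False

    or_mask = mask.clone()
    dist.all_reduce(or_mask, op=dist.ReduceOp.MAX)   # -> 1: "aborted on any rank" is True

    if rank == 0:
        print("MIN (AND):", and_mask.item(), "MAX (OR):", or_mask.item())
    dist.destroy_process_group()


if __name__ == "__main__":
    mp.spawn(worker, args=(2,), nprocs=2)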

aborted_req_mask = torch.tensor(
    [req.is_aborted for req in new_batch.reqs], dtype=torch.bool, device="cpu"
)
dist.all_reduce(aborted_req_mask, op=dist.ReduceOp.MIN, group=self.mulitnode_group)

critical

The all_reduce operation on aborted_req_mask uses dist.ReduceOp.MIN. For a boolean tensor, this is equivalent to a logical AND operation across all ranks. This means a request will only be considered aborted if it is marked as aborted on all ranks. The correct behavior should be to consider a request aborted if it is aborted on any rank. This requires a logical OR, which corresponds to dist.ReduceOp.MAX.

Suggested change
dist.all_reduce(aborted_req_mask, op=dist.ReduceOp.MIN, group=self.mulitnode_group)
dist.all_reduce(aborted_req_mask, op=dist.ReduceOp.MAX, group=self.mulitnode_group)

select_req_ids.append(req_id)

aborted_req_mask = torch.tensor(aborted_req_mask, dtype=torch.bool, device="cpu")
dist.all_reduce(aborted_req_mask, op=dist.ReduceOp.MIN, group=self.mulitnode_group)

critical

The all_reduce operation on aborted_req_mask uses dist.ReduceOp.MIN. For a boolean tensor, this is equivalent to a logical AND operation across all ranks. This means a request will only be considered aborted if it is marked as aborted on all ranks. The correct behavior should be to consider a request aborted if it is aborted on any rank. This requires a logical OR, which corresponds to dist.ReduceOp.MAX.

Suggested change
dist.all_reduce(aborted_req_mask, op=dist.ReduceOp.MIN, group=self.mulitnode_group)
dist.all_reduce(aborted_req_mask, op=dist.ReduceOp.MAX, group=self.mulitnode_group)

Comment on lines +103 to +104
return "unknown" # need fix
# raise RuntimeError("No GPU available")

critical

The function get_current_device_name returns "unknown" when no GPU is available. This could mask potential configuration errors. It would be better to raise a RuntimeError as suggested in the commented-out code to fail fast and provide a clear error message when a GPU is expected but not found.

Suggested change
return "unknown" # need fix
# raise RuntimeError("No GPU available")
raise RuntimeError("No GPU available")
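
A fail-fast variant could look like the following sketch (not the repository's exact implementation; it assumes the helper is only called on hosts where a CUDA device is expected):

import torch


def get_current_device_name() -> str:
    # Fail fast with a clear message instead of returning a placeholder
    # string that downstream configuration lookups would silently accept.
    if torch.cuda.is_available():
        return torch.cuda.get_device_name(torch.cuda.current_device())
    raise RuntimeError("No GPU available")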

Comment on lines +896 to +911
async def http_to_model_special_request(
    self, request: GeneralHttpToModelRpcReq, timeout: int = 300
) -> GeneralModelToHttpRpcRsp:
    event = await self.get_event_for_func(request.func_name)
    await self.transfer_to_next_module(request)
    try:
        await asyncio.wait_for(event.wait(), timeout=timeout)
        ret = event.result

    except asyncio.TimeoutError:
        ret = GeneralModelToHttpRpcRsp(success=False, msg="wait for response timeout", func_name=request.func_name)
    except Exception as e:
        ret = GeneralModelToHttpRpcRsp(
            success=False, msg="wait for response error: %s" % str(e), func_name=request.func_name
        )
    return ret

high

The asyncio.Event object is not cleared after being used. If the same special request function is called again, the old event will still be set, causing event.wait() to return immediately with potentially stale data. This can lead to incorrect behavior. The event should be cleared before it's waited on.

Suggested change
async def http_to_model_special_request(
self, request: GeneralHttpToModelRpcReq, timeout: int = 300
) -> GeneralModelToHttpRpcRsp:
event = await self.get_event_for_func(request.func_name)
await self.transfer_to_next_module(request)
try:
await asyncio.wait_for(event.wait(), timeout=timeout)
ret = event.result
except asyncio.TimeoutError:
ret = GeneralModelToHttpRpcRsp(success=False, msg="wait for response timeout", func_name=request.func_name)
except Exception as e:
ret = GeneralModelToHttpRpcRsp(
success=False, msg="wait for response error: %s" % str(e), func_name=request.func_name
)
return ret
async def http_to_model_special_request(
self, request: GeneralHttpToModelRpcReq, timeout: int = 300
) -> GeneralModelToHttpRpcRsp:
event = await self.get_event_for_func(request.func_name)
event.clear()
await self.transfer_to_next_module(request)
try:
await asyncio.wait_for(event.wait(), timeout=timeout)
ret = event.result
except asyncio.TimeoutError:
ret = GeneralModelToHttpRpcRsp(success=False, msg="wait for response timeout", func_name=request.func_name)
except Exception as e:
ret = GeneralModelToHttpRpcRsp(
success=False, msg="wait for response error: %s" % str(e), func_name=request.func_name
)
return ret
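
The hazard is easy to reproduce outside the server. The standalone snippet below (not PR code) shows that a set-but-never-cleared asyncio.Event makes every later wait() return immediately, and that clear() restores the blocking behaviour:

import asyncio


async def main():
    event = asyncio.Event()
    event.set()                      # first response arrived, the event was set

    # Second "request": without clear(), wait() returns at once, so stale
    # state attached to the event would be read as if it were a new result.
    await asyncio.wait_for(event.wait(), timeout=1.0)
    print("wait() returned immediately on the stale event")

    event.clear()                    # the fix: reset before reusing the event
    try:
        await asyncio.wait_for(event.wait(), timeout=0.1)
    except asyncio.TimeoutError:
        print("after clear(), wait() blocks until a new set()")


asyncio.run(main())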

cur_group_reqs, is_busy, new_batch_first_router_need_tokens
)
if ok_insert:
if ok_insert and False:

high

The condition if ok_insert and False: will always evaluate to false, which prevents the last group of requests (cur_group_reqs) from being added to the can_run_list. This appears to be a logic error, likely a leftover from debugging, and will cause requests to be unnecessarily delayed or starved.

Suggested change
if ok_insert and False:
if ok_insert:

Comment on lines +1077 to +1080
def release_all(self):
    self.torch_memory_saver.pause(tag=MemoryTag.WEIGHT)
    self.torch_memory_saver.pause(tag=MemoryTag.KV_CACHE)
    self.torch_memory_saver.pause(tag=MemoryTag.GRAPH)

medium

The release_all method can be made more concise and maintainable by iterating over a list of memory tags. This avoids code repetition and makes it easier to add or remove tags in the future.

Suggested change
def release_all(self):
    self.torch_memory_saver.pause(tag=MemoryTag.WEIGHT)
    self.torch_memory_saver.pause(tag=MemoryTag.KV_CACHE)
    self.torch_memory_saver.pause(tag=MemoryTag.GRAPH)
def release_all(self):
    for tag in [MemoryTag.WEIGHT, MemoryTag.KV_CACHE, MemoryTag.GRAPH]:
        self.torch_memory_saver.pause(tag=tag)

Comment on lines +1091 to +1094
def resume_all(self):
    self.torch_memory_saver.resume(tag=MemoryTag.WEIGHT)
    self.torch_memory_saver.resume(tag=MemoryTag.KV_CACHE)
    self.torch_memory_saver.resume(tag=MemoryTag.GRAPH)

medium

The resume_all method can be made more concise and maintainable by iterating over a list of memory tags. This avoids code repetition and makes it easier to add or remove tags in the future.

Suggested change
def resume_all(self):
    self.torch_memory_saver.resume(tag=MemoryTag.WEIGHT)
    self.torch_memory_saver.resume(tag=MemoryTag.KV_CACHE)
    self.torch_memory_saver.resume(tag=MemoryTag.GRAPH)
def resume_all(self):
    for tag in [MemoryTag.WEIGHT, MemoryTag.KV_CACHE, MemoryTag.GRAPH]:
        self.torch_memory_saver.resume(tag=tag)

if abort_all:
for group_req_id in list(self.req_id_to_out_inf.keys()):
await self.abort(group_req_id)
pass

medium

The pass statement at the end of this method is unnecessary and can be removed for cleaner code.
