diff --git a/cmping.py b/cmping.py index a7e6a76..d484523 100644 --- a/cmping.py +++ b/cmping.py @@ -193,7 +193,8 @@ def main(): args.relay2 = args.relay1 pinger = perform_ping(args) - expected_total = pinger.sent * args.numrecipients + # Use actual number of receivers that joined, not args.numrecipients + expected_total = pinger.sent * len(pinger.receivers) raise SystemExit(0 if pinger.received == expected_total else 1) @@ -358,7 +359,9 @@ def wait_for_receivers_to_join(args, sender, receivers, timeout_seconds=30): timeout_seconds: Maximum time to wait for all receivers Returns: - int: Number of receivers that successfully joined + tuple: (joined_indices, join_errors) where: + - joined_indices: set of indices of receivers that successfully joined + - join_errors: list of (idx, error_message) tuples for failed receivers """ print("# Waiting for receivers to come online", end="", flush=True) sender_addr = sender.get_config("addr") @@ -367,6 +370,7 @@ def wait_for_receivers_to_join(args, sender, receivers, timeout_seconds=30): # Track which receivers have joined joined_receivers = set() joined_addrs = [] # Track addresses in order they joined + join_errors = [] # Track errors for receivers that failed to join receiver_threads_queue = queue.Queue() def wait_for_receiver_join(idx, receiver, deadline): @@ -445,6 +449,8 @@ def wait_for_receiver_join(idx, receiver, deadline): flush=True, ) elif event_type == "timeout": + receiver_addr = receivers[idx].get_config("addr") + join_errors.append((idx, f"receiver {receiver_addr} did not join group within {timeout_seconds}s")) print( f"\n# WARNING: receiver {idx} did not join group within {timeout_seconds}s" ) @@ -454,6 +460,8 @@ def wait_for_receiver_join(idx, receiver, deadline): flush=True, ) elif event_type == "exception": + receiver_addr = receivers[idx].get_config("addr") + join_errors.append((idx, f"receiver {receiver_addr} encountered exception: {data}")) print(f"\n# ERROR: receiver {idx} encountered exception: {data}") print( f"# Waiting for receivers to come online {len(joined_receivers)}/{total_receivers}", @@ -472,6 +480,20 @@ def wait_for_receiver_join(idx, receiver, deadline): for t in threads: t.join(timeout=1.0) + # Drain any remaining queue events to catch timeouts/errors that came in after the loop exited + # Use a simple loop with try/except - the queue.Empty exception will break us out + while True: + try: + event_type, idx, data = receiver_threads_queue.get_nowait() + if event_type == "timeout" and idx not in joined_receivers: + receiver_addr = receivers[idx].get_config("addr") + join_errors.append((idx, f"receiver {receiver_addr} did not join group within {timeout_seconds}s")) + elif event_type == "exception" and idx not in joined_receivers: + receiver_addr = receivers[idx].get_config("addr") + join_errors.append((idx, f"receiver {receiver_addr} encountered exception: {data}")) + except queue.Empty: + break + # Final status print( f"\r# Waiting for receivers to come online {len(joined_receivers)}/{total_receivers} - Complete!" @@ -492,7 +514,7 @@ def wait_for_receiver_join(idx, receiver, deadline): f"# WARNING: Only {len(joined_receivers)}/{total_receivers} receivers joined the group" ) - return len(joined_receivers) + return joined_receivers, join_errors def wait_profiles_online(maker): @@ -652,19 +674,24 @@ def perform_ping(args): group = create_and_promote_group(sender, receivers, verbose=args.verbose) # Wait for all receivers to join the group - wait_for_receivers_to_join(args, sender, receivers) + joined_indices, join_errors = wait_for_receivers_to_join(args, sender, receivers) + + # Filter receivers to only include those that successfully joined + joined_receivers = [receivers[idx] for idx in sorted(joined_indices)] group_join_time = time.time() - group_join_start # Phase 3: Message Ping/Pong (timed) message_start = time.time() - pinger = Pinger(args, sender, group, receivers) + pinger = Pinger(args, sender, group, joined_receivers, join_errors=join_errors) received = {} # Track current sequence for output formatting current_seq = None # Track timing for each sequence: {seq: {'count': N, 'first_time': ms, 'last_time': ms, 'size': bytes}} seq_tracking = {} + # Use the actual number of joined receivers for the total count + num_joined_receivers = len(joined_receivers) try: for seq, ms_duration, size, receiver_idx in pinger.receive(): if seq not in received: @@ -696,7 +723,7 @@ def perform_ping(args): # Print N/M ratio with in-place update (spinning effect) count = seq_tracking[seq]["count"] - total = args.numrecipients + total = num_joined_receivers # Calculate how many characters we need to overwrite from previous ratio if count > 1: # Backspace over previous ratio to update in-place @@ -753,6 +780,12 @@ def perform_ping(args): recv_rate = pinger.received / message_time print(f"recv rate: {recv_rate:.2f} msg/s") + # Print join errors at the end of output if any + if join_errors: + print("--- join errors ---") + for idx, error_msg in join_errors: + print(f"✗ {error_msg}") + return pinger finally: # Clean up all RPC contexts @@ -777,26 +810,29 @@ class Pinger: sent: Number of messages sent received: Number of messages received (across all receivers) loss: Percentage of expected messages not received + join_errors: List of (idx, error_message) tuples for receivers that failed to join """ - def __init__(self, args, sender, group, receivers): + def __init__(self, args, sender, group, receivers, join_errors=None): """Initialize Pinger and start sending messages. Args: args: Command line arguments sender: Sender account object group: Group chat object - receivers: List of receiver account objects + receivers: List of receiver account objects (only those that successfully joined) + join_errors: List of (idx, error_message) tuples for receivers that failed to join """ self.args = args self.sender = sender self.group = group self.receivers = receivers + self.join_errors = join_errors or [] self.addr1 = sender.get_config("addr") self.receivers_addrs = [receiver.get_config("addr") for receiver in receivers] self.receivers_addrs_str = ", ".join(self.receivers_addrs) self.relay1 = self.addr1.split("@")[1] - self.relay2 = self.receivers_addrs[0].split("@")[1] + self.relay2 = self.receivers_addrs[0].split("@")[1] if self.receivers_addrs else args.relay2 print( f"CMPING {self.relay1}({self.addr1}) -> {self.relay2}(group with {len(receivers)} receivers) count={args.count} interval={args.interval}s"