Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 45 additions & 9 deletions cmping.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,8 @@ def main():
args.relay2 = args.relay1

pinger = perform_ping(args)
expected_total = pinger.sent * args.numrecipients
# Use actual number of receivers that joined, not args.numrecipients
expected_total = pinger.sent * len(pinger.receivers)
raise SystemExit(0 if pinger.received == expected_total else 1)


Expand Down Expand Up @@ -358,7 +359,9 @@ def wait_for_receivers_to_join(args, sender, receivers, timeout_seconds=30):
timeout_seconds: Maximum time to wait for all receivers

Returns:
int: Number of receivers that successfully joined
tuple: (joined_indices, join_errors) where:
- joined_indices: set of indices of receivers that successfully joined
- join_errors: list of (idx, error_message) tuples for failed receivers
"""
print("# Waiting for receivers to come online", end="", flush=True)
sender_addr = sender.get_config("addr")
Expand All @@ -367,6 +370,7 @@ def wait_for_receivers_to_join(args, sender, receivers, timeout_seconds=30):
# Track which receivers have joined
joined_receivers = set()
joined_addrs = [] # Track addresses in order they joined
join_errors = [] # Track errors for receivers that failed to join
receiver_threads_queue = queue.Queue()

def wait_for_receiver_join(idx, receiver, deadline):
Expand Down Expand Up @@ -445,6 +449,8 @@ def wait_for_receiver_join(idx, receiver, deadline):
flush=True,
)
elif event_type == "timeout":
receiver_addr = receivers[idx].get_config("addr")
join_errors.append((idx, f"receiver {receiver_addr} did not join group within {timeout_seconds}s"))
print(
f"\n# WARNING: receiver {idx} did not join group within {timeout_seconds}s"
)
Expand All @@ -454,6 +460,8 @@ def wait_for_receiver_join(idx, receiver, deadline):
flush=True,
)
elif event_type == "exception":
receiver_addr = receivers[idx].get_config("addr")
join_errors.append((idx, f"receiver {receiver_addr} encountered exception: {data}"))
print(f"\n# ERROR: receiver {idx} encountered exception: {data}")
print(
f"# Waiting for receivers to come online {len(joined_receivers)}/{total_receivers}",
Expand All @@ -472,6 +480,20 @@ def wait_for_receiver_join(idx, receiver, deadline):
for t in threads:
t.join(timeout=1.0)

# Drain any remaining queue events to catch timeouts/errors that came in after the loop exited
# Use a simple loop with try/except - the queue.Empty exception will break us out
while True:
try:
event_type, idx, data = receiver_threads_queue.get_nowait()
if event_type == "timeout" and idx not in joined_receivers:
receiver_addr = receivers[idx].get_config("addr")
join_errors.append((idx, f"receiver {receiver_addr} did not join group within {timeout_seconds}s"))
elif event_type == "exception" and idx not in joined_receivers:
receiver_addr = receivers[idx].get_config("addr")
join_errors.append((idx, f"receiver {receiver_addr} encountered exception: {data}"))
except queue.Empty:
break

# Final status
print(
f"\r# Waiting for receivers to come online {len(joined_receivers)}/{total_receivers} - Complete!"
Expand All @@ -492,7 +514,7 @@ def wait_for_receiver_join(idx, receiver, deadline):
f"# WARNING: Only {len(joined_receivers)}/{total_receivers} receivers joined the group"
)

return len(joined_receivers)
return joined_receivers, join_errors


def wait_profiles_online(maker):
Expand Down Expand Up @@ -652,19 +674,24 @@ def perform_ping(args):
group = create_and_promote_group(sender, receivers, verbose=args.verbose)

# Wait for all receivers to join the group
wait_for_receivers_to_join(args, sender, receivers)
joined_indices, join_errors = wait_for_receivers_to_join(args, sender, receivers)

# Filter receivers to only include those that successfully joined
joined_receivers = [receivers[idx] for idx in sorted(joined_indices)]

group_join_time = time.time() - group_join_start

# Phase 3: Message Ping/Pong (timed)
message_start = time.time()

pinger = Pinger(args, sender, group, receivers)
pinger = Pinger(args, sender, group, joined_receivers, join_errors=join_errors)
received = {}
# Track current sequence for output formatting
current_seq = None
# Track timing for each sequence: {seq: {'count': N, 'first_time': ms, 'last_time': ms, 'size': bytes}}
seq_tracking = {}
# Use the actual number of joined receivers for the total count
num_joined_receivers = len(joined_receivers)
try:
for seq, ms_duration, size, receiver_idx in pinger.receive():
if seq not in received:
Expand Down Expand Up @@ -696,7 +723,7 @@ def perform_ping(args):

# Print N/M ratio with in-place update (spinning effect)
count = seq_tracking[seq]["count"]
total = args.numrecipients
total = num_joined_receivers
# Calculate how many characters we need to overwrite from previous ratio
if count > 1:
# Backspace over previous ratio to update in-place
Expand Down Expand Up @@ -753,6 +780,12 @@ def perform_ping(args):
recv_rate = pinger.received / message_time
print(f"recv rate: {recv_rate:.2f} msg/s")

# Print join errors at the end of output if any
if join_errors:
print("--- join errors ---")
for idx, error_msg in join_errors:
print(f"✗ {error_msg}")

return pinger
finally:
# Clean up all RPC contexts
Expand All @@ -777,26 +810,29 @@ class Pinger:
sent: Number of messages sent
received: Number of messages received (across all receivers)
loss: Percentage of expected messages not received
join_errors: List of (idx, error_message) tuples for receivers that failed to join
"""

def __init__(self, args, sender, group, receivers):
def __init__(self, args, sender, group, receivers, join_errors=None):
"""Initialize Pinger and start sending messages.

Args:
args: Command line arguments
sender: Sender account object
group: Group chat object
receivers: List of receiver account objects
receivers: List of receiver account objects (only those that successfully joined)
join_errors: List of (idx, error_message) tuples for receivers that failed to join
"""
self.args = args
self.sender = sender
self.group = group
self.receivers = receivers
self.join_errors = join_errors or []
self.addr1 = sender.get_config("addr")
self.receivers_addrs = [receiver.get_config("addr") for receiver in receivers]
self.receivers_addrs_str = ", ".join(self.receivers_addrs)
self.relay1 = self.addr1.split("@")[1]
self.relay2 = self.receivers_addrs[0].split("@")[1]
self.relay2 = self.receivers_addrs[0].split("@")[1] if self.receivers_addrs else args.relay2

print(
f"CMPING {self.relay1}({self.addr1}) -> {self.relay2}(group with {len(receivers)} receivers) count={args.count} interval={args.interval}s"
Expand Down