← back to gangpeng__zmq

Function bodies 324 total

All specs Real LLM only Function bodies
_recv_exact function · python · L84-L91 (8 LOC)
benchmarks/benchmark_compare.py
def _recv_exact(sock, n):
    buf = b''
    while len(buf) < n:
        chunk = sock.recv(min(65536, n - len(buf)))
        if not chunk:
            raise ConnectionError("closed")
        buf += chunk
    return buf
_recv_exact function · python · L84-L91 (8 LOC)
benchmarks/benchmark_compare.py
def _recv_exact(sock, n):
    buf = b''
    while len(buf) < n:
        chunk = sock.recv(min(65536, n - len(buf)))
        if not chunk:
            raise ConnectionError("closed")
        buf += chunk
    return buf
_recv_exact function · python · L84-L91 (8 LOC)
benchmarks/benchmark_compare.py
def _recv_exact(sock, n):
    buf = b''
    while len(buf) < n:
        chunk = sock.recv(min(65536, n - len(buf)))
        if not chunk:
            raise ConnectionError("closed")
        buf += chunk
    return buf
_recv_exact function · python · L84-L91 (8 LOC)
benchmarks/benchmark_compare.py
def _recv_exact(sock, n):
    buf = b''
    while len(buf) < n:
        chunk = sock.recv(min(65536, n - len(buf)))
        if not chunk:
            raise ConnectionError("closed")
        buf += chunk
    return buf
kafka_request_reuse function · python · L93-L100 (8 LOC)
benchmarks/benchmark_compare.py
def kafka_request_reuse(sock, api_key, api_version, corr_id, body=b''):
    client_id = b'bench'
    hdr = struct.pack('>hhih', api_key, api_version, corr_id, len(client_id)) + client_id
    frame = struct.pack('>I', len(hdr + body)) + hdr + body
    sock.sendall(frame)
    size_buf = _recv_exact(sock, 4)
    sz = struct.unpack('>I', size_buf)[0]
    return _recv_exact(sock, sz)
kafka_request_reuse function · python · L93-L100 (8 LOC)
benchmarks/benchmark_compare.py
def kafka_request_reuse(sock, api_key, api_version, corr_id, body=b''):
    client_id = b'bench'
    hdr = struct.pack('>hhih', api_key, api_version, corr_id, len(client_id)) + client_id
    frame = struct.pack('>I', len(hdr + body)) + hdr + body
    sock.sendall(frame)
    size_buf = _recv_exact(sock, 4)
    sz = struct.unpack('>I', size_buf)[0]
    return _recv_exact(sock, sz)
kafka_request_reuse function · python · L93-L100 (8 LOC)
benchmarks/benchmark_compare.py
def kafka_request_reuse(sock, api_key, api_version, corr_id, body=b''):
    client_id = b'bench'
    hdr = struct.pack('>hhih', api_key, api_version, corr_id, len(client_id)) + client_id
    frame = struct.pack('>I', len(hdr + body)) + hdr + body
    sock.sendall(frame)
    size_buf = _recv_exact(sock, 4)
    sz = struct.unpack('>I', size_buf)[0]
    return _recv_exact(sock, sz)
Generated by Repobility's multi-pass static-analysis pipeline (https://repobility.com)
kafka_request_reuse function · python · L93-L100 (8 LOC)
benchmarks/benchmark_compare.py
def kafka_request_reuse(sock, api_key, api_version, corr_id, body=b''):
    client_id = b'bench'
    hdr = struct.pack('>hhih', api_key, api_version, corr_id, len(client_id)) + client_id
    frame = struct.pack('>I', len(hdr + body)) + hdr + body
    sock.sendall(frame)
    size_buf = _recv_exact(sock, 4)
    sz = struct.unpack('>I', size_buf)[0]
    return _recv_exact(sock, sz)
kafka_request_fresh function · python · L102-L108 (7 LOC)
benchmarks/benchmark_compare.py
def kafka_request_fresh(port, api_key, api_version, corr_id, body=b''):
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.settimeout(10)
    s.connect(('127.0.0.1', port))
    resp = kafka_request_reuse(s, api_key, api_version, corr_id, body)
    s.close()
    return resp
kafka_request_fresh function · python · L102-L108 (7 LOC)
benchmarks/benchmark_compare.py
def kafka_request_fresh(port, api_key, api_version, corr_id, body=b''):
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.settimeout(10)
    s.connect(('127.0.0.1', port))
    resp = kafka_request_reuse(s, api_key, api_version, corr_id, body)
    s.close()
    return resp
kafka_request_fresh function · python · L102-L108 (7 LOC)
benchmarks/benchmark_compare.py
def kafka_request_fresh(port, api_key, api_version, corr_id, body=b''):
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.settimeout(10)
    s.connect(('127.0.0.1', port))
    resp = kafka_request_reuse(s, api_key, api_version, corr_id, body)
    s.close()
    return resp
kafka_request_fresh function · python · L102-L108 (7 LOC)
benchmarks/benchmark_compare.py
def kafka_request_fresh(port, api_key, api_version, corr_id, body=b''):
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.settimeout(10)
    s.connect(('127.0.0.1', port))
    resp = kafka_request_reuse(s, api_key, api_version, corr_id, body)
    s.close()
    return resp
create_topic function · python · L110-L125 (16 LOC)
benchmarks/benchmark_compare.py
def create_topic(sock, corr_id, name, partitions=3):
    """CreateTopics v2 — compatible with Kafka 4.2+ (which dropped v0-v1)."""
    name_b = name.encode()
    # num_topics=1
    body = struct.pack('>i', 1)
    # topic name (string16)
    body += struct.pack('>h', len(name_b)) + name_b
    # num_partitions, replication_factor
    body += struct.pack('>ih', partitions, 1)
    # num_assignments=0 (empty array, non-nullable in v2)
    body += struct.pack('>i', 0)
    # num_configs=0
    body += struct.pack('>i', 0)
    # timeout_ms=30000, validate_only=false
    body += struct.pack('>i', 30000) + struct.pack('>?', False)
    kafka_request_reuse(sock, 19, 2, corr_id, body)
create_topic function · python · L110-L125 (16 LOC)
benchmarks/benchmark_compare.py
def create_topic(sock, corr_id, name, partitions=3):
    """CreateTopics v2 — compatible with Kafka 4.2+ (which dropped v0-v1)."""
    name_b = name.encode()
    # num_topics=1
    body = struct.pack('>i', 1)
    # topic name (string16)
    body += struct.pack('>h', len(name_b)) + name_b
    # num_partitions, replication_factor
    body += struct.pack('>ih', partitions, 1)
    # num_assignments=0 (empty array, non-nullable in v2)
    body += struct.pack('>i', 0)
    # num_configs=0
    body += struct.pack('>i', 0)
    # timeout_ms=30000, validate_only=false
    body += struct.pack('>i', 30000) + struct.pack('>?', False)
    kafka_request_reuse(sock, 19, 2, corr_id, body)
create_topic function · python · L110-L125 (16 LOC)
benchmarks/benchmark_compare.py
def create_topic(sock, corr_id, name, partitions=3):
    """CreateTopics v2 — compatible with Kafka 4.2+ (which dropped v0-v1)."""
    name_b = name.encode()
    # num_topics=1
    body = struct.pack('>i', 1)
    # topic name (string16)
    body += struct.pack('>h', len(name_b)) + name_b
    # num_partitions, replication_factor
    body += struct.pack('>ih', partitions, 1)
    # num_assignments=0 (empty array, non-nullable in v2)
    body += struct.pack('>i', 0)
    # num_configs=0
    body += struct.pack('>i', 0)
    # timeout_ms=30000, validate_only=false
    body += struct.pack('>i', 30000) + struct.pack('>?', False)
    kafka_request_reuse(sock, 19, 2, corr_id, body)
Repobility — same analyzer, your code, free for public repos · /scan/
create_topic function · python · L110-L125 (16 LOC)
benchmarks/benchmark_compare.py
def create_topic(sock, corr_id, name, partitions=3):
    """CreateTopics v2 — compatible with Kafka 4.2+ (which dropped v0-v1)."""
    name_b = name.encode()
    # num_topics=1
    body = struct.pack('>i', 1)
    # topic name (string16)
    body += struct.pack('>h', len(name_b)) + name_b
    # num_partitions, replication_factor
    body += struct.pack('>ih', partitions, 1)
    # num_assignments=0 (empty array, non-nullable in v2)
    body += struct.pack('>i', 0)
    # num_configs=0
    body += struct.pack('>i', 0)
    # timeout_ms=30000, validate_only=false
    body += struct.pack('>i', 30000) + struct.pack('>?', False)
    kafka_request_reuse(sock, 19, 2, corr_id, body)
produce_body function · python · L127-L188 (62 LOC)
benchmarks/benchmark_compare.py
def produce_body(topic, partition, msg):
    """Produce v3 body with RecordBatch v2 format.

    Produce v3+ is required because Kafka 4.2 rejects v0-v2 despite reporting
    them as supported in ApiVersions.  v3 adds transactional_id (set to null).
    The record payload uses RecordBatch (magic=2) with CRC32C.
    """
    topic_b = topic.encode()
    if isinstance(msg, str):
        msg = msg.encode()

    # ── Build a single Record (inside the batch) ──
    record = bytearray()
    record.append(0)            # attributes
    record.append(0)            # timestampDelta (varint 0)
    record.append(0)            # offsetDelta (varint 0)
    record.append(0x01)         # keyLength = -1 zigzag-varint
    # (no key bytes)
    _encode_varint_into(record, len(msg))  # valueLength
    record.extend(msg)
    record.append(0)            # headersCount (varint 0)

    record_with_len = bytearray()
    _encode_varint_into(record_with_len, len(record))
    record_with_len.extend(record)

    # 
produce_body function · python · L127-L188 (62 LOC)
benchmarks/benchmark_compare.py
def produce_body(topic, partition, msg):
    """Produce v3 body with RecordBatch v2 format.

    Produce v3+ is required because Kafka 4.2 rejects v0-v2 despite reporting
    them as supported in ApiVersions.  v3 adds transactional_id (set to null).
    The record payload uses RecordBatch (magic=2) with CRC32C.
    """
    topic_b = topic.encode()
    if isinstance(msg, str):
        msg = msg.encode()

    # ── Build a single Record (inside the batch) ──
    record = bytearray()
    record.append(0)            # attributes
    record.append(0)            # timestampDelta (varint 0)
    record.append(0)            # offsetDelta (varint 0)
    record.append(0x01)         # keyLength = -1 zigzag-varint
    # (no key bytes)
    _encode_varint_into(record, len(msg))  # valueLength
    record.extend(msg)
    record.append(0)            # headersCount (varint 0)

    record_with_len = bytearray()
    _encode_varint_into(record_with_len, len(record))
    record_with_len.extend(record)

    # 
produce_body function · python · L127-L188 (62 LOC)
benchmarks/benchmark_compare.py
def produce_body(topic, partition, msg):
    """Produce v3 body with RecordBatch v2 format.

    Produce v3+ is required because Kafka 4.2 rejects v0-v2 despite reporting
    them as supported in ApiVersions.  v3 adds transactional_id (set to null).
    The record payload uses RecordBatch (magic=2) with CRC32C.
    """
    topic_b = topic.encode()
    if isinstance(msg, str):
        msg = msg.encode()

    # ── Build a single Record (inside the batch) ──
    record = bytearray()
    record.append(0)            # attributes
    record.append(0)            # timestampDelta (varint 0)
    record.append(0)            # offsetDelta (varint 0)
    record.append(0x01)         # keyLength = -1 zigzag-varint
    # (no key bytes)
    _encode_varint_into(record, len(msg))  # valueLength
    record.extend(msg)
    record.append(0)            # headersCount (varint 0)

    record_with_len = bytearray()
    _encode_varint_into(record_with_len, len(record))
    record_with_len.extend(record)

    # 
produce_body function · python · L127-L188 (62 LOC)
benchmarks/benchmark_compare.py
def produce_body(topic, partition, msg):
    """Produce v3 body with RecordBatch v2 format.

    Produce v3+ is required because Kafka 4.2 rejects v0-v2 despite reporting
    them as supported in ApiVersions.  v3 adds transactional_id (set to null).
    The record payload uses RecordBatch (magic=2) with CRC32C.
    """
    topic_b = topic.encode()
    if isinstance(msg, str):
        msg = msg.encode()

    # ── Build a single Record (inside the batch) ──
    record = bytearray()
    record.append(0)            # attributes
    record.append(0)            # timestampDelta (varint 0)
    record.append(0)            # offsetDelta (varint 0)
    record.append(0x01)         # keyLength = -1 zigzag-varint
    # (no key bytes)
    _encode_varint_into(record, len(msg))  # valueLength
    record.extend(msg)
    record.append(0)            # headersCount (varint 0)

    record_with_len = bytearray()
    _encode_varint_into(record_with_len, len(record))
    record_with_len.extend(record)

    # 
_encode_varint_into function · python · L193-L200 (8 LOC)
benchmarks/benchmark_compare.py
def _encode_varint_into(buf, value):
    """Encode a signed int as zigzag varint, appending to buf."""
    # Zigzag encode
    value = (value << 1) ^ (value >> 31)
    while value & ~0x7F:
        buf.append((value & 0x7F) | 0x80)
        value >>= 7
    buf.append(value & 0x7F)
_encode_varint_into function · python · L193-L200 (8 LOC)
benchmarks/benchmark_compare.py
def _encode_varint_into(buf, value):
    """Encode a signed int as zigzag varint, appending to buf."""
    # Zigzag encode
    value = (value << 1) ^ (value >> 31)
    while value & ~0x7F:
        buf.append((value & 0x7F) | 0x80)
        value >>= 7
    buf.append(value & 0x7F)
_encode_varint_into function · python · L193-L200 (8 LOC)
benchmarks/benchmark_compare.py
def _encode_varint_into(buf, value):
    """Encode a signed int as zigzag varint, appending to buf."""
    # Zigzag encode
    value = (value << 1) ^ (value >> 31)
    while value & ~0x7F:
        buf.append((value & 0x7F) | 0x80)
        value >>= 7
    buf.append(value & 0x7F)
Provenance: Repobility (https://repobility.com) — every score reproducible from /scan/
_encode_varint_into function · python · L193-L200 (8 LOC)
benchmarks/benchmark_compare.py
def _encode_varint_into(buf, value):
    """Encode a signed int as zigzag varint, appending to buf."""
    # Zigzag encode
    value = (value << 1) ^ (value >> 31)
    while value & ~0x7F:
        buf.append((value & 0x7F) | 0x80)
        value >>= 7
    buf.append(value & 0x7F)
_crc32c function · python · L202-L220 (19 LOC)
benchmarks/benchmark_compare.py
def _crc32c(data):
    """Compute CRC-32C (Castagnoli). Uses crcmod if available, else pure Python."""
    try:
        import crcmod
        fn = crcmod.predefined.mkCrcFun('crc-32c')
        return fn(data) & 0xFFFFFFFF
    except ImportError:
        pass
    # Pure-Python fallback (slow but correct)
    crc = 0xFFFFFFFF
    poly = 0x82F63B78
    for byte in data:
        crc ^= byte
        for _ in range(8):
            if crc & 1:
                crc = (crc >> 1) ^ poly
            else:
                crc >>= 1
    return crc ^ 0xFFFFFFFF
_crc32c function · python · L202-L220 (19 LOC)
benchmarks/benchmark_compare.py
def _crc32c(data):
    """Compute CRC-32C (Castagnoli). Uses crcmod if available, else pure Python."""
    try:
        import crcmod
        fn = crcmod.predefined.mkCrcFun('crc-32c')
        return fn(data) & 0xFFFFFFFF
    except ImportError:
        pass
    # Pure-Python fallback (slow but correct)
    crc = 0xFFFFFFFF
    poly = 0x82F63B78
    for byte in data:
        crc ^= byte
        for _ in range(8):
            if crc & 1:
                crc = (crc >> 1) ^ poly
            else:
                crc >>= 1
    return crc ^ 0xFFFFFFFF
_crc32c function · python · L202-L220 (19 LOC)
benchmarks/benchmark_compare.py
def _crc32c(data):
    """Compute CRC-32C (Castagnoli). Uses crcmod if available, else pure Python."""
    try:
        import crcmod
        fn = crcmod.predefined.mkCrcFun('crc-32c')
        return fn(data) & 0xFFFFFFFF
    except ImportError:
        pass
    # Pure-Python fallback (slow but correct)
    crc = 0xFFFFFFFF
    poly = 0x82F63B78
    for byte in data:
        crc ^= byte
        for _ in range(8):
            if crc & 1:
                crc = (crc >> 1) ^ poly
            else:
                crc >>= 1
    return crc ^ 0xFFFFFFFF
_crc32c function · python · L202-L220 (19 LOC)
benchmarks/benchmark_compare.py
def _crc32c(data):
    """Compute CRC-32C (Castagnoli). Uses crcmod if available, else pure Python."""
    try:
        import crcmod
        fn = crcmod.predefined.mkCrcFun('crc-32c')
        return fn(data) & 0xFFFFFFFF
    except ImportError:
        pass
    # Pure-Python fallback (slow but correct)
    crc = 0xFFFFFFFF
    poly = 0x82F63B78
    for byte in data:
        crc ^= byte
        for _ in range(8):
            if crc & 1:
                crc = (crc >> 1) ^ poly
            else:
                crc >>= 1
    return crc ^ 0xFFFFFFFF
fetch_body function · python · L222-L237 (16 LOC)
benchmarks/benchmark_compare.py
def fetch_body(topic, partition, offset, max_bytes=1048576):
    """Fetch v4 body — compatible with Kafka 4.2+ (which dropped v0-v3)."""
    topic_b = topic.encode()
    # replica_id=-1, max_wait_ms=100, min_bytes=1, max_bytes
    body = struct.pack('>iiii', -1, 100, 1, max_bytes)
    # isolation_level=0 (READ_UNCOMMITTED) — added in v4
    body += struct.pack('>b', 0)
    # num_topics=1
    body += struct.pack('>i', 1)
    # topic name (string16)
    body += struct.pack('>h', len(topic_b)) + topic_b
    # num_partitions=1
    body += struct.pack('>i', 1)
    # partition, fetch_offset, partition_max_bytes
    body += struct.pack('>iqi', partition, offset, max_bytes)
    return body
fetch_body function · python · L222-L237 (16 LOC)
benchmarks/benchmark_compare.py
def fetch_body(topic, partition, offset, max_bytes=1048576):
    """Fetch v4 body — compatible with Kafka 4.2+ (which dropped v0-v3)."""
    topic_b = topic.encode()
    # replica_id=-1, max_wait_ms=100, min_bytes=1, max_bytes
    body = struct.pack('>iiii', -1, 100, 1, max_bytes)
    # isolation_level=0 (READ_UNCOMMITTED) — added in v4
    body += struct.pack('>b', 0)
    # num_topics=1
    body += struct.pack('>i', 1)
    # topic name (string16)
    body += struct.pack('>h', len(topic_b)) + topic_b
    # num_partitions=1
    body += struct.pack('>i', 1)
    # partition, fetch_offset, partition_max_bytes
    body += struct.pack('>iqi', partition, offset, max_bytes)
    return body
fetch_body function · python · L222-L237 (16 LOC)
benchmarks/benchmark_compare.py
def fetch_body(topic, partition, offset, max_bytes=1048576):
    """Fetch v4 body — compatible with Kafka 4.2+ (which dropped v0-v3)."""
    topic_b = topic.encode()
    # replica_id=-1, max_wait_ms=100, min_bytes=1, max_bytes
    body = struct.pack('>iiii', -1, 100, 1, max_bytes)
    # isolation_level=0 (READ_UNCOMMITTED) — added in v4
    body += struct.pack('>b', 0)
    # num_topics=1
    body += struct.pack('>i', 1)
    # topic name (string16)
    body += struct.pack('>h', len(topic_b)) + topic_b
    # num_partitions=1
    body += struct.pack('>i', 1)
    # partition, fetch_offset, partition_max_bytes
    body += struct.pack('>iqi', partition, offset, max_bytes)
    return body
Want fix-PRs on findings? Install Repobility's GitHub App · github.com/apps/repobility-bot
fetch_body function · python · L222-L237 (16 LOC)
benchmarks/benchmark_compare.py
def fetch_body(topic, partition, offset, max_bytes=1048576):
    """Fetch v4 body — compatible with Kafka 4.2+ (which dropped v0-v3)."""
    topic_b = topic.encode()
    # replica_id=-1, max_wait_ms=100, min_bytes=1, max_bytes
    body = struct.pack('>iiii', -1, 100, 1, max_bytes)
    # isolation_level=0 (READ_UNCOMMITTED) — added in v4
    body += struct.pack('>b', 0)
    # num_topics=1
    body += struct.pack('>i', 1)
    # topic name (string16)
    body += struct.pack('>h', len(topic_b)) + topic_b
    # num_partitions=1
    body += struct.pack('>i', 1)
    # partition, fetch_offset, partition_max_bytes
    body += struct.pack('>iqi', partition, offset, max_bytes)
    return body
bench function · python · L244-L276 (33 LOC)
benchmarks/benchmark_compare.py
def bench(name, fn, iterations, warmup=100):
    """Run warmup, then measure iterations. Returns dict with throughput + latencies."""
    for i in range(warmup):
        try:
            fn(i)
        except Exception:
            pass

    latencies = []
    errors = 0
    t0 = time.monotonic()
    for i in range(iterations):
        start = time.monotonic()
        try:
            fn(warmup + i)
            latencies.append(time.monotonic() - start)
        except Exception:
            errors += 1
    elapsed = time.monotonic() - t0

    if not latencies:
        print(f"    {name}: FAILED ({errors} errors)")
        return {"throughput": 0, "p50": 0, "p99": 0, "p999": 0, "errors": errors}

    throughput = len(latencies) / elapsed
    latencies_ms = sorted([l * 1000 for l in latencies])
    p50 = latencies_ms[len(latencies_ms) // 2]
    p99 = latencies_ms[int(len(latencies_ms) * 0.99)]
    p999 = latencies_ms[int(len(latencies_ms) * 0.999)]

    suffix = f"  ({errors} errors)" if 
bench function · python · L244-L276 (33 LOC)
benchmarks/benchmark_compare.py
def bench(name, fn, iterations, warmup=100):
    """Run warmup, then measure iterations. Returns dict with throughput + latencies."""
    for i in range(warmup):
        try:
            fn(i)
        except Exception:
            pass

    latencies = []
    errors = 0
    t0 = time.monotonic()
    for i in range(iterations):
        start = time.monotonic()
        try:
            fn(warmup + i)
            latencies.append(time.monotonic() - start)
        except Exception:
            errors += 1
    elapsed = time.monotonic() - t0

    if not latencies:
        print(f"    {name}: FAILED ({errors} errors)")
        return {"throughput": 0, "p50": 0, "p99": 0, "p999": 0, "errors": errors}

    throughput = len(latencies) / elapsed
    latencies_ms = sorted([l * 1000 for l in latencies])
    p50 = latencies_ms[len(latencies_ms) // 2]
    p99 = latencies_ms[int(len(latencies_ms) * 0.99)]
    p999 = latencies_ms[int(len(latencies_ms) * 0.999)]

    suffix = f"  ({errors} errors)" if 
bench function · python · L244-L276 (33 LOC)
benchmarks/benchmark_compare.py
def bench(name, fn, iterations, warmup=100):
    """Run warmup, then measure iterations. Returns dict with throughput + latencies."""
    for i in range(warmup):
        try:
            fn(i)
        except Exception:
            pass

    latencies = []
    errors = 0
    t0 = time.monotonic()
    for i in range(iterations):
        start = time.monotonic()
        try:
            fn(warmup + i)
            latencies.append(time.monotonic() - start)
        except Exception:
            errors += 1
    elapsed = time.monotonic() - t0

    if not latencies:
        print(f"    {name}: FAILED ({errors} errors)")
        return {"throughput": 0, "p50": 0, "p99": 0, "p999": 0, "errors": errors}

    throughput = len(latencies) / elapsed
    latencies_ms = sorted([l * 1000 for l in latencies])
    p50 = latencies_ms[len(latencies_ms) // 2]
    p99 = latencies_ms[int(len(latencies_ms) * 0.99)]
    p999 = latencies_ms[int(len(latencies_ms) * 0.999)]

    suffix = f"  ({errors} errors)" if 
bench function · python · L244-L276 (33 LOC)
benchmarks/benchmark_compare.py
def bench(name, fn, iterations, warmup=100):
    """Run warmup, then measure iterations. Returns dict with throughput + latencies."""
    for i in range(warmup):
        try:
            fn(i)
        except Exception:
            pass

    latencies = []
    errors = 0
    t0 = time.monotonic()
    for i in range(iterations):
        start = time.monotonic()
        try:
            fn(warmup + i)
            latencies.append(time.monotonic() - start)
        except Exception:
            errors += 1
    elapsed = time.monotonic() - t0

    if not latencies:
        print(f"    {name}: FAILED ({errors} errors)")
        return {"throughput": 0, "p50": 0, "p99": 0, "p999": 0, "errors": errors}

    throughput = len(latencies) / elapsed
    latencies_ms = sorted([l * 1000 for l in latencies])
    p50 = latencies_ms[len(latencies_ms) // 2]
    p99 = latencies_ms[int(len(latencies_ms) * 0.99)]
    p999 = latencies_ms[int(len(latencies_ms) * 0.999)]

    suffix = f"  ({errors} errors)" if 
wait_for_broker function · python · L278-L291 (14 LOC)
benchmarks/benchmark_compare.py
def wait_for_broker(port, timeout=120):
    """Wait until the broker responds to ApiVersions."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            s = socket.socket()
            s.settimeout(3)
            s.connect(('127.0.0.1', port))
            kafka_request_reuse(s, 18, 0, 1)
            s.close()
            return True
        except Exception:
            time.sleep(1)
    return False
wait_for_broker function · python · L278-L291 (14 LOC)
benchmarks/benchmark_compare.py
def wait_for_broker(port, timeout=120):
    """Wait until the broker responds to ApiVersions."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            s = socket.socket()
            s.settimeout(3)
            s.connect(('127.0.0.1', port))
            kafka_request_reuse(s, 18, 0, 1)
            s.close()
            return True
        except Exception:
            time.sleep(1)
    return False
wait_for_broker function · python · L278-L291 (14 LOC)
benchmarks/benchmark_compare.py
def wait_for_broker(port, timeout=120):
    """Wait until the broker responds to ApiVersions."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            s = socket.socket()
            s.settimeout(3)
            s.connect(('127.0.0.1', port))
            kafka_request_reuse(s, 18, 0, 1)
            s.close()
            return True
        except Exception:
            time.sleep(1)
    return False
Generated by Repobility's multi-pass static-analysis pipeline (https://repobility.com)
wait_for_broker function · python · L278-L291 (14 LOC)
benchmarks/benchmark_compare.py
def wait_for_broker(port, timeout=120):
    """Wait until the broker responds to ApiVersions."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            s = socket.socket()
            s.settimeout(3)
            s.connect(('127.0.0.1', port))
            kafka_request_reuse(s, 18, 0, 1)
            s.close()
            return True
        except Exception:
            time.sleep(1)
    return False
run_benchmarks function · python · L293-L374 (82 LOC)
benchmarks/benchmark_compare.py
def run_benchmarks(label):
    """Run the full benchmark suite. Returns results dict or None on failure."""
    storage = "local disk" if "Kafka" in label else "MinIO S3"
    print(f"\n{'=' * 60}")
    print(f"  {label} Benchmark — 3-Node Cluster + {storage}")
    print(f"{'=' * 60}")

    print(f"  Waiting for broker on port {BROKER_PORT}...", end="", flush=True)
    if not wait_for_broker(BROKER_PORT):
        print(f" FAILED")
        return None
    print(f" OK")

    # Create topic (ignore errors — may already exist)
    sock = socket.socket()
    sock.settimeout(10)
    sock.connect(('127.0.0.1', BROKER_PORT))
    try:
        create_topic(sock, 1, "bench-topic", 3)
    except Exception:
        pass
    sock.close()
    time.sleep(1)

    results = {}

    # 1. ApiVersions
    print(f"\n  [1/5] ApiVersions (connection reuse, {ITERATIONS['api_versions']} iters)")
    sock = socket.socket()
    sock.settimeout(10)
    sock.connect(('127.0.0.1', BROKER_PORT))
    def api_versions_f
run_benchmarks function · python · L293-L374 (82 LOC)
benchmarks/benchmark_compare.py
def run_benchmarks(label):
    """Run the full benchmark suite. Returns results dict or None on failure."""
    storage = "local disk" if "Kafka" in label else "MinIO S3"
    print(f"\n{'=' * 60}")
    print(f"  {label} Benchmark — 3-Node Cluster + {storage}")
    print(f"{'=' * 60}")

    print(f"  Waiting for broker on port {BROKER_PORT}...", end="", flush=True)
    if not wait_for_broker(BROKER_PORT):
        print(f" FAILED")
        return None
    print(f" OK")

    # Create topic (ignore errors — may already exist)
    sock = socket.socket()
    sock.settimeout(10)
    sock.connect(('127.0.0.1', BROKER_PORT))
    try:
        create_topic(sock, 1, "bench-topic", 3)
    except Exception:
        pass
    sock.close()
    time.sleep(1)

    results = {}

    # 1. ApiVersions
    print(f"\n  [1/5] ApiVersions (connection reuse, {ITERATIONS['api_versions']} iters)")
    sock = socket.socket()
    sock.settimeout(10)
    sock.connect(('127.0.0.1', BROKER_PORT))
    def api_versions_f
run_benchmarks function · python · L293-L374 (82 LOC)
benchmarks/benchmark_compare.py
def run_benchmarks(label):
    """Run the full benchmark suite. Returns results dict or None on failure."""
    storage = "local disk" if "Kafka" in label else "MinIO S3"
    print(f"\n{'=' * 60}")
    print(f"  {label} Benchmark — 3-Node Cluster + {storage}")
    print(f"{'=' * 60}")

    print(f"  Waiting for broker on port {BROKER_PORT}...", end="", flush=True)
    if not wait_for_broker(BROKER_PORT):
        print(f" FAILED")
        return None
    print(f" OK")

    # Create topic (ignore errors — may already exist)
    sock = socket.socket()
    sock.settimeout(10)
    sock.connect(('127.0.0.1', BROKER_PORT))
    try:
        create_topic(sock, 1, "bench-topic", 3)
    except Exception:
        pass
    sock.close()
    time.sleep(1)

    results = {}

    # 1. ApiVersions
    print(f"\n  [1/5] ApiVersions (connection reuse, {ITERATIONS['api_versions']} iters)")
    sock = socket.socket()
    sock.settimeout(10)
    sock.connect(('127.0.0.1', BROKER_PORT))
    def api_versions_f
run_benchmarks function · python · L293-L374 (82 LOC)
benchmarks/benchmark_compare.py
def run_benchmarks(label):
    """Run the full benchmark suite. Returns results dict or None on failure."""
    storage = "local disk" if "Kafka" in label else "MinIO S3"
    print(f"\n{'=' * 60}")
    print(f"  {label} Benchmark — 3-Node Cluster + {storage}")
    print(f"{'=' * 60}")

    print(f"  Waiting for broker on port {BROKER_PORT}...", end="", flush=True)
    if not wait_for_broker(BROKER_PORT):
        print(f" FAILED")
        return None
    print(f" OK")

    # Create topic (ignore errors — may already exist)
    sock = socket.socket()
    sock.settimeout(10)
    sock.connect(('127.0.0.1', BROKER_PORT))
    try:
        create_topic(sock, 1, "bench-topic", 3)
    except Exception:
        pass
    sock.close()
    time.sleep(1)

    results = {}

    # 1. ApiVersions
    print(f"\n  [1/5] ApiVersions (connection reuse, {ITERATIONS['api_versions']} iters)")
    sock = socket.socket()
    sock.settimeout(10)
    sock.connect(('127.0.0.1', BROKER_PORT))
    def api_versions_f
compose_up function · python · L376-L388 (13 LOC)
benchmarks/benchmark_compare.py
def compose_up(compose_file, label, wait_secs=10):
    """Start a Docker Compose cluster."""
    print(f"\n>>> Starting {label} cluster...")
    r = subprocess.run(
        ["docker", "compose", "-f", compose_file, "up", "-d"],
        capture_output=True, text=True, cwd=PROJECT_DIR
    )
    if r.returncode != 0:
        print(f"  docker compose up failed: {r.stderr[:300]}")
        return False
    print(f"  Containers started, waiting {wait_secs}s for initialization...")
    time.sleep(wait_secs)
    return True
compose_up function · python · L376-L388 (13 LOC)
benchmarks/benchmark_compare.py
def compose_up(compose_file, label, wait_secs=10):
    """Start a Docker Compose cluster."""
    print(f"\n>>> Starting {label} cluster...")
    r = subprocess.run(
        ["docker", "compose", "-f", compose_file, "up", "-d"],
        capture_output=True, text=True, cwd=PROJECT_DIR
    )
    if r.returncode != 0:
        print(f"  docker compose up failed: {r.stderr[:300]}")
        return False
    print(f"  Containers started, waiting {wait_secs}s for initialization...")
    time.sleep(wait_secs)
    return True
compose_up function · python · L376-L388 (13 LOC)
benchmarks/benchmark_compare.py
def compose_up(compose_file, label, wait_secs=10):
    """Start a Docker Compose cluster."""
    print(f"\n>>> Starting {label} cluster...")
    r = subprocess.run(
        ["docker", "compose", "-f", compose_file, "up", "-d"],
        capture_output=True, text=True, cwd=PROJECT_DIR
    )
    if r.returncode != 0:
        print(f"  docker compose up failed: {r.stderr[:300]}")
        return False
    print(f"  Containers started, waiting {wait_secs}s for initialization...")
    time.sleep(wait_secs)
    return True
Repobility — same analyzer, your code, free for public repos · /scan/
compose_up function · python · L376-L388 (13 LOC)
benchmarks/benchmark_compare.py
def compose_up(compose_file, label, wait_secs=10):
    """Start a Docker Compose cluster."""
    print(f"\n>>> Starting {label} cluster...")
    r = subprocess.run(
        ["docker", "compose", "-f", compose_file, "up", "-d"],
        capture_output=True, text=True, cwd=PROJECT_DIR
    )
    if r.returncode != 0:
        print(f"  docker compose up failed: {r.stderr[:300]}")
        return False
    print(f"  Containers started, waiting {wait_secs}s for initialization...")
    time.sleep(wait_secs)
    return True
compose_down function · python · L390-L396 (7 LOC)
benchmarks/benchmark_compare.py
def compose_down(compose_file, label):
    """Stop and clean a Docker Compose cluster."""
    print(f"\n>>> Stopping {label} cluster...")
    subprocess.run(
        ["docker", "compose", "-f", compose_file, "down", "-v"],
        capture_output=True, cwd=PROJECT_DIR
    )
compose_down function · python · L390-L396 (7 LOC)
benchmarks/benchmark_compare.py
def compose_down(compose_file, label):
    """Stop and clean a Docker Compose cluster."""
    print(f"\n>>> Stopping {label} cluster...")
    subprocess.run(
        ["docker", "compose", "-f", compose_file, "down", "-v"],
        capture_output=True, cwd=PROJECT_DIR
    )
page 1 / 7next ›