Function bodies 324 total
_recv_exact function · python · L84-L91 (8 LOC)benchmarks/benchmark_compare.py
def _recv_exact(sock, n):
buf = b''
while len(buf) < n:
chunk = sock.recv(min(65536, n - len(buf)))
if not chunk:
raise ConnectionError("closed")
buf += chunk
return buf_recv_exact function · python · L84-L91 (8 LOC)benchmarks/benchmark_compare.py
def _recv_exact(sock, n):
buf = b''
while len(buf) < n:
chunk = sock.recv(min(65536, n - len(buf)))
if not chunk:
raise ConnectionError("closed")
buf += chunk
return buf_recv_exact function · python · L84-L91 (8 LOC)benchmarks/benchmark_compare.py
def _recv_exact(sock, n):
buf = b''
while len(buf) < n:
chunk = sock.recv(min(65536, n - len(buf)))
if not chunk:
raise ConnectionError("closed")
buf += chunk
return buf_recv_exact function · python · L84-L91 (8 LOC)benchmarks/benchmark_compare.py
def _recv_exact(sock, n):
buf = b''
while len(buf) < n:
chunk = sock.recv(min(65536, n - len(buf)))
if not chunk:
raise ConnectionError("closed")
buf += chunk
return bufkafka_request_reuse function · python · L93-L100 (8 LOC)benchmarks/benchmark_compare.py
def kafka_request_reuse(sock, api_key, api_version, corr_id, body=b''):
client_id = b'bench'
hdr = struct.pack('>hhih', api_key, api_version, corr_id, len(client_id)) + client_id
frame = struct.pack('>I', len(hdr + body)) + hdr + body
sock.sendall(frame)
size_buf = _recv_exact(sock, 4)
sz = struct.unpack('>I', size_buf)[0]
return _recv_exact(sock, sz)kafka_request_reuse function · python · L93-L100 (8 LOC)benchmarks/benchmark_compare.py
def kafka_request_reuse(sock, api_key, api_version, corr_id, body=b''):
client_id = b'bench'
hdr = struct.pack('>hhih', api_key, api_version, corr_id, len(client_id)) + client_id
frame = struct.pack('>I', len(hdr + body)) + hdr + body
sock.sendall(frame)
size_buf = _recv_exact(sock, 4)
sz = struct.unpack('>I', size_buf)[0]
return _recv_exact(sock, sz)kafka_request_reuse function · python · L93-L100 (8 LOC)benchmarks/benchmark_compare.py
def kafka_request_reuse(sock, api_key, api_version, corr_id, body=b''):
client_id = b'bench'
hdr = struct.pack('>hhih', api_key, api_version, corr_id, len(client_id)) + client_id
frame = struct.pack('>I', len(hdr + body)) + hdr + body
sock.sendall(frame)
size_buf = _recv_exact(sock, 4)
sz = struct.unpack('>I', size_buf)[0]
return _recv_exact(sock, sz)Generated by Repobility's multi-pass static-analysis pipeline (https://repobility.com)
kafka_request_reuse function · python · L93-L100 (8 LOC)benchmarks/benchmark_compare.py
def kafka_request_reuse(sock, api_key, api_version, corr_id, body=b''):
client_id = b'bench'
hdr = struct.pack('>hhih', api_key, api_version, corr_id, len(client_id)) + client_id
frame = struct.pack('>I', len(hdr + body)) + hdr + body
sock.sendall(frame)
size_buf = _recv_exact(sock, 4)
sz = struct.unpack('>I', size_buf)[0]
return _recv_exact(sock, sz)kafka_request_fresh function · python · L102-L108 (7 LOC)benchmarks/benchmark_compare.py
def kafka_request_fresh(port, api_key, api_version, corr_id, body=b''):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(10)
s.connect(('127.0.0.1', port))
resp = kafka_request_reuse(s, api_key, api_version, corr_id, body)
s.close()
return respkafka_request_fresh function · python · L102-L108 (7 LOC)benchmarks/benchmark_compare.py
def kafka_request_fresh(port, api_key, api_version, corr_id, body=b''):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(10)
s.connect(('127.0.0.1', port))
resp = kafka_request_reuse(s, api_key, api_version, corr_id, body)
s.close()
return respkafka_request_fresh function · python · L102-L108 (7 LOC)benchmarks/benchmark_compare.py
def kafka_request_fresh(port, api_key, api_version, corr_id, body=b''):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(10)
s.connect(('127.0.0.1', port))
resp = kafka_request_reuse(s, api_key, api_version, corr_id, body)
s.close()
return respkafka_request_fresh function · python · L102-L108 (7 LOC)benchmarks/benchmark_compare.py
def kafka_request_fresh(port, api_key, api_version, corr_id, body=b''):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(10)
s.connect(('127.0.0.1', port))
resp = kafka_request_reuse(s, api_key, api_version, corr_id, body)
s.close()
return respcreate_topic function · python · L110-L125 (16 LOC)benchmarks/benchmark_compare.py
def create_topic(sock, corr_id, name, partitions=3):
"""CreateTopics v2 — compatible with Kafka 4.2+ (which dropped v0-v1)."""
name_b = name.encode()
# num_topics=1
body = struct.pack('>i', 1)
# topic name (string16)
body += struct.pack('>h', len(name_b)) + name_b
# num_partitions, replication_factor
body += struct.pack('>ih', partitions, 1)
# num_assignments=0 (empty array, non-nullable in v2)
body += struct.pack('>i', 0)
# num_configs=0
body += struct.pack('>i', 0)
# timeout_ms=30000, validate_only=false
body += struct.pack('>i', 30000) + struct.pack('>?', False)
kafka_request_reuse(sock, 19, 2, corr_id, body)create_topic function · python · L110-L125 (16 LOC)benchmarks/benchmark_compare.py
def create_topic(sock, corr_id, name, partitions=3):
"""CreateTopics v2 — compatible with Kafka 4.2+ (which dropped v0-v1)."""
name_b = name.encode()
# num_topics=1
body = struct.pack('>i', 1)
# topic name (string16)
body += struct.pack('>h', len(name_b)) + name_b
# num_partitions, replication_factor
body += struct.pack('>ih', partitions, 1)
# num_assignments=0 (empty array, non-nullable in v2)
body += struct.pack('>i', 0)
# num_configs=0
body += struct.pack('>i', 0)
# timeout_ms=30000, validate_only=false
body += struct.pack('>i', 30000) + struct.pack('>?', False)
kafka_request_reuse(sock, 19, 2, corr_id, body)create_topic function · python · L110-L125 (16 LOC)benchmarks/benchmark_compare.py
def create_topic(sock, corr_id, name, partitions=3):
"""CreateTopics v2 — compatible with Kafka 4.2+ (which dropped v0-v1)."""
name_b = name.encode()
# num_topics=1
body = struct.pack('>i', 1)
# topic name (string16)
body += struct.pack('>h', len(name_b)) + name_b
# num_partitions, replication_factor
body += struct.pack('>ih', partitions, 1)
# num_assignments=0 (empty array, non-nullable in v2)
body += struct.pack('>i', 0)
# num_configs=0
body += struct.pack('>i', 0)
# timeout_ms=30000, validate_only=false
body += struct.pack('>i', 30000) + struct.pack('>?', False)
kafka_request_reuse(sock, 19, 2, corr_id, body)Repobility — same analyzer, your code, free for public repos · /scan/
create_topic function · python · L110-L125 (16 LOC)benchmarks/benchmark_compare.py
def create_topic(sock, corr_id, name, partitions=3):
"""CreateTopics v2 — compatible with Kafka 4.2+ (which dropped v0-v1)."""
name_b = name.encode()
# num_topics=1
body = struct.pack('>i', 1)
# topic name (string16)
body += struct.pack('>h', len(name_b)) + name_b
# num_partitions, replication_factor
body += struct.pack('>ih', partitions, 1)
# num_assignments=0 (empty array, non-nullable in v2)
body += struct.pack('>i', 0)
# num_configs=0
body += struct.pack('>i', 0)
# timeout_ms=30000, validate_only=false
body += struct.pack('>i', 30000) + struct.pack('>?', False)
kafka_request_reuse(sock, 19, 2, corr_id, body)produce_body function · python · L127-L188 (62 LOC)benchmarks/benchmark_compare.py
def produce_body(topic, partition, msg):
"""Produce v3 body with RecordBatch v2 format.
Produce v3+ is required because Kafka 4.2 rejects v0-v2 despite reporting
them as supported in ApiVersions. v3 adds transactional_id (set to null).
The record payload uses RecordBatch (magic=2) with CRC32C.
"""
topic_b = topic.encode()
if isinstance(msg, str):
msg = msg.encode()
# ── Build a single Record (inside the batch) ──
record = bytearray()
record.append(0) # attributes
record.append(0) # timestampDelta (varint 0)
record.append(0) # offsetDelta (varint 0)
record.append(0x01) # keyLength = -1 zigzag-varint
# (no key bytes)
_encode_varint_into(record, len(msg)) # valueLength
record.extend(msg)
record.append(0) # headersCount (varint 0)
record_with_len = bytearray()
_encode_varint_into(record_with_len, len(record))
record_with_len.extend(record)
# produce_body function · python · L127-L188 (62 LOC)benchmarks/benchmark_compare.py
def produce_body(topic, partition, msg):
"""Produce v3 body with RecordBatch v2 format.
Produce v3+ is required because Kafka 4.2 rejects v0-v2 despite reporting
them as supported in ApiVersions. v3 adds transactional_id (set to null).
The record payload uses RecordBatch (magic=2) with CRC32C.
"""
topic_b = topic.encode()
if isinstance(msg, str):
msg = msg.encode()
# ── Build a single Record (inside the batch) ──
record = bytearray()
record.append(0) # attributes
record.append(0) # timestampDelta (varint 0)
record.append(0) # offsetDelta (varint 0)
record.append(0x01) # keyLength = -1 zigzag-varint
# (no key bytes)
_encode_varint_into(record, len(msg)) # valueLength
record.extend(msg)
record.append(0) # headersCount (varint 0)
record_with_len = bytearray()
_encode_varint_into(record_with_len, len(record))
record_with_len.extend(record)
# produce_body function · python · L127-L188 (62 LOC)benchmarks/benchmark_compare.py
def produce_body(topic, partition, msg):
"""Produce v3 body with RecordBatch v2 format.
Produce v3+ is required because Kafka 4.2 rejects v0-v2 despite reporting
them as supported in ApiVersions. v3 adds transactional_id (set to null).
The record payload uses RecordBatch (magic=2) with CRC32C.
"""
topic_b = topic.encode()
if isinstance(msg, str):
msg = msg.encode()
# ── Build a single Record (inside the batch) ──
record = bytearray()
record.append(0) # attributes
record.append(0) # timestampDelta (varint 0)
record.append(0) # offsetDelta (varint 0)
record.append(0x01) # keyLength = -1 zigzag-varint
# (no key bytes)
_encode_varint_into(record, len(msg)) # valueLength
record.extend(msg)
record.append(0) # headersCount (varint 0)
record_with_len = bytearray()
_encode_varint_into(record_with_len, len(record))
record_with_len.extend(record)
# produce_body function · python · L127-L188 (62 LOC)benchmarks/benchmark_compare.py
def produce_body(topic, partition, msg):
"""Produce v3 body with RecordBatch v2 format.
Produce v3+ is required because Kafka 4.2 rejects v0-v2 despite reporting
them as supported in ApiVersions. v3 adds transactional_id (set to null).
The record payload uses RecordBatch (magic=2) with CRC32C.
"""
topic_b = topic.encode()
if isinstance(msg, str):
msg = msg.encode()
# ── Build a single Record (inside the batch) ──
record = bytearray()
record.append(0) # attributes
record.append(0) # timestampDelta (varint 0)
record.append(0) # offsetDelta (varint 0)
record.append(0x01) # keyLength = -1 zigzag-varint
# (no key bytes)
_encode_varint_into(record, len(msg)) # valueLength
record.extend(msg)
record.append(0) # headersCount (varint 0)
record_with_len = bytearray()
_encode_varint_into(record_with_len, len(record))
record_with_len.extend(record)
# _encode_varint_into function · python · L193-L200 (8 LOC)benchmarks/benchmark_compare.py
def _encode_varint_into(buf, value):
"""Encode a signed int as zigzag varint, appending to buf."""
# Zigzag encode
value = (value << 1) ^ (value >> 31)
while value & ~0x7F:
buf.append((value & 0x7F) | 0x80)
value >>= 7
buf.append(value & 0x7F)_encode_varint_into function · python · L193-L200 (8 LOC)benchmarks/benchmark_compare.py
def _encode_varint_into(buf, value):
"""Encode a signed int as zigzag varint, appending to buf."""
# Zigzag encode
value = (value << 1) ^ (value >> 31)
while value & ~0x7F:
buf.append((value & 0x7F) | 0x80)
value >>= 7
buf.append(value & 0x7F)_encode_varint_into function · python · L193-L200 (8 LOC)benchmarks/benchmark_compare.py
def _encode_varint_into(buf, value):
"""Encode a signed int as zigzag varint, appending to buf."""
# Zigzag encode
value = (value << 1) ^ (value >> 31)
while value & ~0x7F:
buf.append((value & 0x7F) | 0x80)
value >>= 7
buf.append(value & 0x7F)Provenance: Repobility (https://repobility.com) — every score reproducible from /scan/
_encode_varint_into function · python · L193-L200 (8 LOC)benchmarks/benchmark_compare.py
def _encode_varint_into(buf, value):
"""Encode a signed int as zigzag varint, appending to buf."""
# Zigzag encode
value = (value << 1) ^ (value >> 31)
while value & ~0x7F:
buf.append((value & 0x7F) | 0x80)
value >>= 7
buf.append(value & 0x7F)_crc32c function · python · L202-L220 (19 LOC)benchmarks/benchmark_compare.py
def _crc32c(data):
"""Compute CRC-32C (Castagnoli). Uses crcmod if available, else pure Python."""
try:
import crcmod
fn = crcmod.predefined.mkCrcFun('crc-32c')
return fn(data) & 0xFFFFFFFF
except ImportError:
pass
# Pure-Python fallback (slow but correct)
crc = 0xFFFFFFFF
poly = 0x82F63B78
for byte in data:
crc ^= byte
for _ in range(8):
if crc & 1:
crc = (crc >> 1) ^ poly
else:
crc >>= 1
return crc ^ 0xFFFFFFFF_crc32c function · python · L202-L220 (19 LOC)benchmarks/benchmark_compare.py
def _crc32c(data):
"""Compute CRC-32C (Castagnoli). Uses crcmod if available, else pure Python."""
try:
import crcmod
fn = crcmod.predefined.mkCrcFun('crc-32c')
return fn(data) & 0xFFFFFFFF
except ImportError:
pass
# Pure-Python fallback (slow but correct)
crc = 0xFFFFFFFF
poly = 0x82F63B78
for byte in data:
crc ^= byte
for _ in range(8):
if crc & 1:
crc = (crc >> 1) ^ poly
else:
crc >>= 1
return crc ^ 0xFFFFFFFF_crc32c function · python · L202-L220 (19 LOC)benchmarks/benchmark_compare.py
def _crc32c(data):
"""Compute CRC-32C (Castagnoli). Uses crcmod if available, else pure Python."""
try:
import crcmod
fn = crcmod.predefined.mkCrcFun('crc-32c')
return fn(data) & 0xFFFFFFFF
except ImportError:
pass
# Pure-Python fallback (slow but correct)
crc = 0xFFFFFFFF
poly = 0x82F63B78
for byte in data:
crc ^= byte
for _ in range(8):
if crc & 1:
crc = (crc >> 1) ^ poly
else:
crc >>= 1
return crc ^ 0xFFFFFFFF_crc32c function · python · L202-L220 (19 LOC)benchmarks/benchmark_compare.py
def _crc32c(data):
"""Compute CRC-32C (Castagnoli). Uses crcmod if available, else pure Python."""
try:
import crcmod
fn = crcmod.predefined.mkCrcFun('crc-32c')
return fn(data) & 0xFFFFFFFF
except ImportError:
pass
# Pure-Python fallback (slow but correct)
crc = 0xFFFFFFFF
poly = 0x82F63B78
for byte in data:
crc ^= byte
for _ in range(8):
if crc & 1:
crc = (crc >> 1) ^ poly
else:
crc >>= 1
return crc ^ 0xFFFFFFFFfetch_body function · python · L222-L237 (16 LOC)benchmarks/benchmark_compare.py
def fetch_body(topic, partition, offset, max_bytes=1048576):
"""Fetch v4 body — compatible with Kafka 4.2+ (which dropped v0-v3)."""
topic_b = topic.encode()
# replica_id=-1, max_wait_ms=100, min_bytes=1, max_bytes
body = struct.pack('>iiii', -1, 100, 1, max_bytes)
# isolation_level=0 (READ_UNCOMMITTED) — added in v4
body += struct.pack('>b', 0)
# num_topics=1
body += struct.pack('>i', 1)
# topic name (string16)
body += struct.pack('>h', len(topic_b)) + topic_b
# num_partitions=1
body += struct.pack('>i', 1)
# partition, fetch_offset, partition_max_bytes
body += struct.pack('>iqi', partition, offset, max_bytes)
return bodyfetch_body function · python · L222-L237 (16 LOC)benchmarks/benchmark_compare.py
def fetch_body(topic, partition, offset, max_bytes=1048576):
"""Fetch v4 body — compatible with Kafka 4.2+ (which dropped v0-v3)."""
topic_b = topic.encode()
# replica_id=-1, max_wait_ms=100, min_bytes=1, max_bytes
body = struct.pack('>iiii', -1, 100, 1, max_bytes)
# isolation_level=0 (READ_UNCOMMITTED) — added in v4
body += struct.pack('>b', 0)
# num_topics=1
body += struct.pack('>i', 1)
# topic name (string16)
body += struct.pack('>h', len(topic_b)) + topic_b
# num_partitions=1
body += struct.pack('>i', 1)
# partition, fetch_offset, partition_max_bytes
body += struct.pack('>iqi', partition, offset, max_bytes)
return bodyfetch_body function · python · L222-L237 (16 LOC)benchmarks/benchmark_compare.py
def fetch_body(topic, partition, offset, max_bytes=1048576):
"""Fetch v4 body — compatible with Kafka 4.2+ (which dropped v0-v3)."""
topic_b = topic.encode()
# replica_id=-1, max_wait_ms=100, min_bytes=1, max_bytes
body = struct.pack('>iiii', -1, 100, 1, max_bytes)
# isolation_level=0 (READ_UNCOMMITTED) — added in v4
body += struct.pack('>b', 0)
# num_topics=1
body += struct.pack('>i', 1)
# topic name (string16)
body += struct.pack('>h', len(topic_b)) + topic_b
# num_partitions=1
body += struct.pack('>i', 1)
# partition, fetch_offset, partition_max_bytes
body += struct.pack('>iqi', partition, offset, max_bytes)
return bodyWant fix-PRs on findings? Install Repobility's GitHub App · github.com/apps/repobility-bot
fetch_body function · python · L222-L237 (16 LOC)benchmarks/benchmark_compare.py
def fetch_body(topic, partition, offset, max_bytes=1048576):
"""Fetch v4 body — compatible with Kafka 4.2+ (which dropped v0-v3)."""
topic_b = topic.encode()
# replica_id=-1, max_wait_ms=100, min_bytes=1, max_bytes
body = struct.pack('>iiii', -1, 100, 1, max_bytes)
# isolation_level=0 (READ_UNCOMMITTED) — added in v4
body += struct.pack('>b', 0)
# num_topics=1
body += struct.pack('>i', 1)
# topic name (string16)
body += struct.pack('>h', len(topic_b)) + topic_b
# num_partitions=1
body += struct.pack('>i', 1)
# partition, fetch_offset, partition_max_bytes
body += struct.pack('>iqi', partition, offset, max_bytes)
return bodybench function · python · L244-L276 (33 LOC)benchmarks/benchmark_compare.py
def bench(name, fn, iterations, warmup=100):
"""Run warmup, then measure iterations. Returns dict with throughput + latencies."""
for i in range(warmup):
try:
fn(i)
except Exception:
pass
latencies = []
errors = 0
t0 = time.monotonic()
for i in range(iterations):
start = time.monotonic()
try:
fn(warmup + i)
latencies.append(time.monotonic() - start)
except Exception:
errors += 1
elapsed = time.monotonic() - t0
if not latencies:
print(f" {name}: FAILED ({errors} errors)")
return {"throughput": 0, "p50": 0, "p99": 0, "p999": 0, "errors": errors}
throughput = len(latencies) / elapsed
latencies_ms = sorted([l * 1000 for l in latencies])
p50 = latencies_ms[len(latencies_ms) // 2]
p99 = latencies_ms[int(len(latencies_ms) * 0.99)]
p999 = latencies_ms[int(len(latencies_ms) * 0.999)]
suffix = f" ({errors} errors)" if bench function · python · L244-L276 (33 LOC)benchmarks/benchmark_compare.py
def bench(name, fn, iterations, warmup=100):
"""Run warmup, then measure iterations. Returns dict with throughput + latencies."""
for i in range(warmup):
try:
fn(i)
except Exception:
pass
latencies = []
errors = 0
t0 = time.monotonic()
for i in range(iterations):
start = time.monotonic()
try:
fn(warmup + i)
latencies.append(time.monotonic() - start)
except Exception:
errors += 1
elapsed = time.monotonic() - t0
if not latencies:
print(f" {name}: FAILED ({errors} errors)")
return {"throughput": 0, "p50": 0, "p99": 0, "p999": 0, "errors": errors}
throughput = len(latencies) / elapsed
latencies_ms = sorted([l * 1000 for l in latencies])
p50 = latencies_ms[len(latencies_ms) // 2]
p99 = latencies_ms[int(len(latencies_ms) * 0.99)]
p999 = latencies_ms[int(len(latencies_ms) * 0.999)]
suffix = f" ({errors} errors)" if bench function · python · L244-L276 (33 LOC)benchmarks/benchmark_compare.py
def bench(name, fn, iterations, warmup=100):
"""Run warmup, then measure iterations. Returns dict with throughput + latencies."""
for i in range(warmup):
try:
fn(i)
except Exception:
pass
latencies = []
errors = 0
t0 = time.monotonic()
for i in range(iterations):
start = time.monotonic()
try:
fn(warmup + i)
latencies.append(time.monotonic() - start)
except Exception:
errors += 1
elapsed = time.monotonic() - t0
if not latencies:
print(f" {name}: FAILED ({errors} errors)")
return {"throughput": 0, "p50": 0, "p99": 0, "p999": 0, "errors": errors}
throughput = len(latencies) / elapsed
latencies_ms = sorted([l * 1000 for l in latencies])
p50 = latencies_ms[len(latencies_ms) // 2]
p99 = latencies_ms[int(len(latencies_ms) * 0.99)]
p999 = latencies_ms[int(len(latencies_ms) * 0.999)]
suffix = f" ({errors} errors)" if bench function · python · L244-L276 (33 LOC)benchmarks/benchmark_compare.py
def bench(name, fn, iterations, warmup=100):
"""Run warmup, then measure iterations. Returns dict with throughput + latencies."""
for i in range(warmup):
try:
fn(i)
except Exception:
pass
latencies = []
errors = 0
t0 = time.monotonic()
for i in range(iterations):
start = time.monotonic()
try:
fn(warmup + i)
latencies.append(time.monotonic() - start)
except Exception:
errors += 1
elapsed = time.monotonic() - t0
if not latencies:
print(f" {name}: FAILED ({errors} errors)")
return {"throughput": 0, "p50": 0, "p99": 0, "p999": 0, "errors": errors}
throughput = len(latencies) / elapsed
latencies_ms = sorted([l * 1000 for l in latencies])
p50 = latencies_ms[len(latencies_ms) // 2]
p99 = latencies_ms[int(len(latencies_ms) * 0.99)]
p999 = latencies_ms[int(len(latencies_ms) * 0.999)]
suffix = f" ({errors} errors)" if wait_for_broker function · python · L278-L291 (14 LOC)benchmarks/benchmark_compare.py
def wait_for_broker(port, timeout=120):
"""Wait until the broker responds to ApiVersions."""
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
try:
s = socket.socket()
s.settimeout(3)
s.connect(('127.0.0.1', port))
kafka_request_reuse(s, 18, 0, 1)
s.close()
return True
except Exception:
time.sleep(1)
return Falsewait_for_broker function · python · L278-L291 (14 LOC)benchmarks/benchmark_compare.py
def wait_for_broker(port, timeout=120):
"""Wait until the broker responds to ApiVersions."""
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
try:
s = socket.socket()
s.settimeout(3)
s.connect(('127.0.0.1', port))
kafka_request_reuse(s, 18, 0, 1)
s.close()
return True
except Exception:
time.sleep(1)
return Falsewait_for_broker function · python · L278-L291 (14 LOC)benchmarks/benchmark_compare.py
def wait_for_broker(port, timeout=120):
"""Wait until the broker responds to ApiVersions."""
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
try:
s = socket.socket()
s.settimeout(3)
s.connect(('127.0.0.1', port))
kafka_request_reuse(s, 18, 0, 1)
s.close()
return True
except Exception:
time.sleep(1)
return FalseGenerated by Repobility's multi-pass static-analysis pipeline (https://repobility.com)
wait_for_broker function · python · L278-L291 (14 LOC)benchmarks/benchmark_compare.py
def wait_for_broker(port, timeout=120):
"""Wait until the broker responds to ApiVersions."""
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
try:
s = socket.socket()
s.settimeout(3)
s.connect(('127.0.0.1', port))
kafka_request_reuse(s, 18, 0, 1)
s.close()
return True
except Exception:
time.sleep(1)
return Falserun_benchmarks function · python · L293-L374 (82 LOC)benchmarks/benchmark_compare.py
def run_benchmarks(label):
"""Run the full benchmark suite. Returns results dict or None on failure."""
storage = "local disk" if "Kafka" in label else "MinIO S3"
print(f"\n{'=' * 60}")
print(f" {label} Benchmark — 3-Node Cluster + {storage}")
print(f"{'=' * 60}")
print(f" Waiting for broker on port {BROKER_PORT}...", end="", flush=True)
if not wait_for_broker(BROKER_PORT):
print(f" FAILED")
return None
print(f" OK")
# Create topic (ignore errors — may already exist)
sock = socket.socket()
sock.settimeout(10)
sock.connect(('127.0.0.1', BROKER_PORT))
try:
create_topic(sock, 1, "bench-topic", 3)
except Exception:
pass
sock.close()
time.sleep(1)
results = {}
# 1. ApiVersions
print(f"\n [1/5] ApiVersions (connection reuse, {ITERATIONS['api_versions']} iters)")
sock = socket.socket()
sock.settimeout(10)
sock.connect(('127.0.0.1', BROKER_PORT))
def api_versions_frun_benchmarks function · python · L293-L374 (82 LOC)benchmarks/benchmark_compare.py
def run_benchmarks(label):
"""Run the full benchmark suite. Returns results dict or None on failure."""
storage = "local disk" if "Kafka" in label else "MinIO S3"
print(f"\n{'=' * 60}")
print(f" {label} Benchmark — 3-Node Cluster + {storage}")
print(f"{'=' * 60}")
print(f" Waiting for broker on port {BROKER_PORT}...", end="", flush=True)
if not wait_for_broker(BROKER_PORT):
print(f" FAILED")
return None
print(f" OK")
# Create topic (ignore errors — may already exist)
sock = socket.socket()
sock.settimeout(10)
sock.connect(('127.0.0.1', BROKER_PORT))
try:
create_topic(sock, 1, "bench-topic", 3)
except Exception:
pass
sock.close()
time.sleep(1)
results = {}
# 1. ApiVersions
print(f"\n [1/5] ApiVersions (connection reuse, {ITERATIONS['api_versions']} iters)")
sock = socket.socket()
sock.settimeout(10)
sock.connect(('127.0.0.1', BROKER_PORT))
def api_versions_frun_benchmarks function · python · L293-L374 (82 LOC)benchmarks/benchmark_compare.py
def run_benchmarks(label):
"""Run the full benchmark suite. Returns results dict or None on failure."""
storage = "local disk" if "Kafka" in label else "MinIO S3"
print(f"\n{'=' * 60}")
print(f" {label} Benchmark — 3-Node Cluster + {storage}")
print(f"{'=' * 60}")
print(f" Waiting for broker on port {BROKER_PORT}...", end="", flush=True)
if not wait_for_broker(BROKER_PORT):
print(f" FAILED")
return None
print(f" OK")
# Create topic (ignore errors — may already exist)
sock = socket.socket()
sock.settimeout(10)
sock.connect(('127.0.0.1', BROKER_PORT))
try:
create_topic(sock, 1, "bench-topic", 3)
except Exception:
pass
sock.close()
time.sleep(1)
results = {}
# 1. ApiVersions
print(f"\n [1/5] ApiVersions (connection reuse, {ITERATIONS['api_versions']} iters)")
sock = socket.socket()
sock.settimeout(10)
sock.connect(('127.0.0.1', BROKER_PORT))
def api_versions_frun_benchmarks function · python · L293-L374 (82 LOC)benchmarks/benchmark_compare.py
def run_benchmarks(label):
"""Run the full benchmark suite. Returns results dict or None on failure."""
storage = "local disk" if "Kafka" in label else "MinIO S3"
print(f"\n{'=' * 60}")
print(f" {label} Benchmark — 3-Node Cluster + {storage}")
print(f"{'=' * 60}")
print(f" Waiting for broker on port {BROKER_PORT}...", end="", flush=True)
if not wait_for_broker(BROKER_PORT):
print(f" FAILED")
return None
print(f" OK")
# Create topic (ignore errors — may already exist)
sock = socket.socket()
sock.settimeout(10)
sock.connect(('127.0.0.1', BROKER_PORT))
try:
create_topic(sock, 1, "bench-topic", 3)
except Exception:
pass
sock.close()
time.sleep(1)
results = {}
# 1. ApiVersions
print(f"\n [1/5] ApiVersions (connection reuse, {ITERATIONS['api_versions']} iters)")
sock = socket.socket()
sock.settimeout(10)
sock.connect(('127.0.0.1', BROKER_PORT))
def api_versions_fcompose_up function · python · L376-L388 (13 LOC)benchmarks/benchmark_compare.py
def compose_up(compose_file, label, wait_secs=10):
"""Start a Docker Compose cluster."""
print(f"\n>>> Starting {label} cluster...")
r = subprocess.run(
["docker", "compose", "-f", compose_file, "up", "-d"],
capture_output=True, text=True, cwd=PROJECT_DIR
)
if r.returncode != 0:
print(f" docker compose up failed: {r.stderr[:300]}")
return False
print(f" Containers started, waiting {wait_secs}s for initialization...")
time.sleep(wait_secs)
return Truecompose_up function · python · L376-L388 (13 LOC)benchmarks/benchmark_compare.py
def compose_up(compose_file, label, wait_secs=10):
"""Start a Docker Compose cluster."""
print(f"\n>>> Starting {label} cluster...")
r = subprocess.run(
["docker", "compose", "-f", compose_file, "up", "-d"],
capture_output=True, text=True, cwd=PROJECT_DIR
)
if r.returncode != 0:
print(f" docker compose up failed: {r.stderr[:300]}")
return False
print(f" Containers started, waiting {wait_secs}s for initialization...")
time.sleep(wait_secs)
return Truecompose_up function · python · L376-L388 (13 LOC)benchmarks/benchmark_compare.py
def compose_up(compose_file, label, wait_secs=10):
"""Start a Docker Compose cluster."""
print(f"\n>>> Starting {label} cluster...")
r = subprocess.run(
["docker", "compose", "-f", compose_file, "up", "-d"],
capture_output=True, text=True, cwd=PROJECT_DIR
)
if r.returncode != 0:
print(f" docker compose up failed: {r.stderr[:300]}")
return False
print(f" Containers started, waiting {wait_secs}s for initialization...")
time.sleep(wait_secs)
return TrueRepobility — same analyzer, your code, free for public repos · /scan/
compose_up function · python · L376-L388 (13 LOC)benchmarks/benchmark_compare.py
def compose_up(compose_file, label, wait_secs=10):
"""Start a Docker Compose cluster."""
print(f"\n>>> Starting {label} cluster...")
r = subprocess.run(
["docker", "compose", "-f", compose_file, "up", "-d"],
capture_output=True, text=True, cwd=PROJECT_DIR
)
if r.returncode != 0:
print(f" docker compose up failed: {r.stderr[:300]}")
return False
print(f" Containers started, waiting {wait_secs}s for initialization...")
time.sleep(wait_secs)
return Truecompose_down function · python · L390-L396 (7 LOC)benchmarks/benchmark_compare.py
def compose_down(compose_file, label):
"""Stop and clean a Docker Compose cluster."""
print(f"\n>>> Stopping {label} cluster...")
subprocess.run(
["docker", "compose", "-f", compose_file, "down", "-v"],
capture_output=True, cwd=PROJECT_DIR
)compose_down function · python · L390-L396 (7 LOC)benchmarks/benchmark_compare.py
def compose_down(compose_file, label):
"""Stop and clean a Docker Compose cluster."""
print(f"\n>>> Stopping {label} cluster...")
subprocess.run(
["docker", "compose", "-f", compose_file, "down", "-v"],
capture_output=True, cwd=PROJECT_DIR
)page 1 / 7next ›