Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions examples/slice_reader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
"""Example: index/slice access and bounded iteration over a ``TraceReader``."""
import logging
from itertools import islice

import libcachesim as lcs

# DEBUG level surfaces the reader's own log output while the example runs.
logging.basicConfig(level=logging.DEBUG)

URI = "s3://cache-datasets/cache_dataset_oracleGeneral/2007_msr/msr_hm_0.oracleGeneral.zst"

reader = lcs.TraceReader(
    trace=URI,
    trace_type=lcs.TraceType.ORACLE_GENERAL_TRACE,
    reader_init_params=lcs.ReaderInitParam(ignore_obj_size=False),
)

# Slicing the reader yields requests lazily: here the first three ...
for req in reader[:3]:
    print(req.obj_id, req.obj_size)

# ... and here the requests at positions 1, 2 and 3.
for req in reader[1:4]:
    print(req.obj_id, req.obj_size)

# Plain iteration continues from the reader's current position, so rewind
# first; islice stops after four requests without reading a fifth one.
reader.reset()
for req in islice(reader, 4):
    print(req.obj_id, req.obj_size)
3 changes: 3 additions & 0 deletions libcachesim/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ def get_occupied_byte(self) -> int:

def get_n_obj(self) -> int:
    """Return the value reported by the wrapped cache's ``get_n_obj``."""
    backend = self._cache
    return backend.get_n_obj()

def set_cache_size(self, new_size: int) -> None:
    """Forward ``new_size`` to the wrapped cache's ``set_cache_size``."""
    resize = self._cache.set_cache_size
    resize(new_size)

def print_cache(self) -> str:
    """Return whatever the wrapped cache's ``print_cache`` produces."""
    backend = self._cache
    return backend.print_cache()
Expand Down
56 changes: 44 additions & 12 deletions libcachesim/synthetic_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,28 @@
from .protocols import ReaderProtocol


class SyntheticReaderSliceIterator:
    """Iterator over a slice of a ``SyntheticReader``.

    ``start``, ``stop`` and ``step`` are expected in the normalised form
    produced by ``slice.indices``. Each item is fetched with
    ``reader[position]``, so any object supporting integer indexing works.
    """

    def __init__(self, reader: "SyntheticReader", start: int, stop: int, step: int):
        """Store the slice bounds and position the cursor at ``start``.

        Raises:
            ValueError: if ``step`` is zero (the iterator would never advance).
        """
        if step == 0:
            raise ValueError("slice step cannot be zero")
        self.reader = reader
        self.start = start
        self.stop = stop
        self.step = step
        self.current = start

    def __iter__(self) -> "SyntheticReaderSliceIterator":
        """Return the iterator itself."""
        return self

    def __next__(self) -> "Request":
        """Return the request at the cursor and advance it by ``step``.

        Raises:
            StopIteration: once the cursor has reached or passed ``stop``.
        """
        # The bound is approached from below for a positive step and from
        # above for a negative one; a single ``>=`` test would make reversed
        # slices empty and would never terminate for e.g. ``[0:5:-1]``.
        if self.step > 0:
            exhausted = self.current >= self.stop
        else:
            exhausted = self.current <= self.stop
        if exhausted:
            raise StopIteration

        req = self.reader[self.current]
        self.current += self.step
        return req


class SyntheticReader(ReaderProtocol):
"""Efficient synthetic request generator supporting multiple distributions"""

Expand Down Expand Up @@ -206,19 +228,29 @@ def __next__(self) -> Request:

return self.read_one_req()

def __getitem__(self, key: Union[int, slice]) -> Union[Request, SyntheticReaderSliceIterator]:
    """Support index and slice access.

    Args:
        key: an integer position (negative values count from the end) or a
            slice.

    Returns:
        A freshly built ``Request`` for an integer key, or a
        ``SyntheticReaderSliceIterator`` that yields the selected requests
        lazily for a slice.

    Raises:
        IndexError: if an integer key is outside ``[-num_of_req, num_of_req)``.
        TypeError: if ``key`` is neither an ``int`` nor a ``slice``.
    """
    if isinstance(key, slice):
        # slice.indices clamps the bounds and resolves negative values.
        start, stop, step = key.indices(self.num_of_req)
        return SyntheticReaderSliceIterator(self, start, stop, step)

    if not isinstance(key, int):
        raise TypeError("SyntheticReader indices must be integers or slices")

    index = key + self.num_of_req if key < 0 else key
    if not 0 <= index < self.num_of_req:
        raise IndexError("Index out of range")

    req = Request()
    req.obj_id = self.obj_ids[index]
    req.obj_size = self.obj_size
    # Timestamps are spread evenly over time_span according to position.
    req.clock_time = index * self.time_span // self.num_of_req
    req.op = ReqOp.OP_READ
    req.valid = True
    return req


def _gen_zipf(m: int, alpha: float, n: int, start: int = 0) -> np.ndarray:
Expand Down
99 changes: 91 additions & 8 deletions libcachesim/trace_reader.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
"""Wrapper of Reader with S3 support."""
from __future__ import annotations

import logging
from typing import overload, Union, Optional
from typing import overload, Union, Optional, Any
from collections.abc import Iterator
from urllib.parse import urlparse

from .protocols import ReaderProtocol
from .libcachesim_python import (
TraceType,
SamplerType,
TraceFormat,
Request,
ReaderInitParam,
Reader,
Expand All @@ -21,6 +22,46 @@
logger = logging.getLogger(__name__)


class TraceReaderSliceIterator:
    """Iterator over a slice of a ``TraceReader``.

    ``start``, ``stop`` and ``step`` are expected in the normalised form
    produced by ``slice.indices``. Every item is fetched by rewinding the
    underlying reader and seeking to the wanted position, so iterating
    changes the position of the reader it was created from.
    """

    def __init__(self, reader: "TraceReader", start: int, stop: int, step: int):
        """Store the slice bounds and position the cursor at ``start``.

        Raises:
            ValueError: if ``step`` is zero (the iterator would never advance).
        """
        if step == 0:
            raise ValueError("slice step cannot be zero")
        self.reader = reader
        self.start = start
        self.stop = stop
        self.step = step
        self.current = start

    def __iter__(self) -> "TraceReaderSliceIterator":
        """Return the iterator itself."""
        return self

    def __next__(self) -> "Request":
        """Return the request at the cursor and advance it by ``step``.

        Raises:
            StopIteration: once the cursor has reached or passed ``stop``.
        """
        # The bound is approached from below for a positive step and from
        # above for a negative one; a single ``>=`` test would make reversed
        # slices empty and would never terminate for e.g. ``[0:5:-1]``.
        if self.step > 0:
            exhausted = self.current >= self.stop
        else:
            exhausted = self.current <= self.stop
        if exhausted:
            raise StopIteration

        # Reset reader and skip to current position
        self.reader.reset()

        if self.reader._reader.is_zstd_file:
            # zstd files cannot use skip_n_req, so read up to the target.
            logger.debug("Simulating skip by reading %s requests one by one", self.current)
            req = self.reader._simulate_skip_and_read_single(self.current)
        else:
            logger.debug("Skipping %s requests using skip_n_req", self.current)
            try:
                self.reader.skip_n_req(self.current)
                req = self.reader.read_one_req()
            except RuntimeError:
                logger.warning("Failed to skip %s requests, falling back to simulation", self.current)
                # The failed skip may have moved the reader; rewind so the
                # simulated skip counts from the first request again.
                self.reader.reset()
                req = self.reader._simulate_skip_and_read_single(self.current)

        self.current += self.step
        return req


class TraceReader(ReaderProtocol):
_reader: Reader

Expand Down Expand Up @@ -302,10 +343,52 @@ def __next__(self) -> Request:
raise StopIteration
return req

def __getitem__(self, key: Union[int, slice]) -> Union[Request, TraceReaderSliceIterator]:
    """Support index and slice access.

    Integer access rewinds the underlying reader and therefore changes its
    position.

    Args:
        key: an integer position (negative values count from the end) or a
            slice.

    Returns:
        The ``Request`` at the given position for an integer key, or a
        ``TraceReaderSliceIterator`` that yields the selected requests lazily
        for a slice.

    Raises:
        IndexError: if an integer key is out of range or the request cannot
            be reached.
        TypeError: if ``key`` is neither an ``int`` nor a ``slice``.
    """
    if isinstance(key, slice):
        # slice.indices clamps the bounds and resolves negative values.
        start, stop, step = key.indices(self._reader.get_num_of_req())
        return TraceReaderSliceIterator(self, start, stop, step)

    if not isinstance(key, int):
        raise TypeError("TraceReader indices must be integers or slices")

    total_len = self._reader.get_num_of_req()
    index = key + total_len if key < 0 else key
    if not 0 <= index < total_len:
        raise IndexError("Index out of range")

    self._reader.reset()

    # NOTE(review): _can_use_skip_n_req is defined outside this view;
    # presumably it is False for zstd traces - confirm.
    if not self._can_use_skip_n_req():
        # Simulate skip by reading requests one by one
        return self._simulate_skip_and_read_single(index)

    try:
        # The binding reports how many requests were actually skipped; a
        # short skip would otherwise make the read below return the wrong
        # request.
        if self._reader.skip_n_req(index) != index:
            raise RuntimeError("Failed to skip requests")
        req = Request()
        if self._reader.read_one_req(req) != 0:
            raise RuntimeError("Failed to read request")
        return req
    except RuntimeError:
        # Fallback to simulation, counting from the first request again.
        self._reader.reset()
        return self._simulate_skip_and_read_single(index)

def _simulate_skip_and_read_single(self, index: int) -> Request:
    """Simulate skip_n_req by reading requests one by one for single index access.

    Reading starts at the reader's current position, so callers rewind the
    reader first when ``index`` is meant to count from the first request.

    Args:
        index: number of requests to discard before the one that is returned.

    Returns:
        The request read after ``index`` requests have been discarded.

    Raises:
        IndexError: if ``index`` is negative or the trace ends before the
            requested position.
    """
    if index < 0:
        # range() would be empty and the first request would be returned.
        raise IndexError(f"Cannot reach index {index}")

    for _ in range(index):
        # The skipped requests are only read to advance the reader.
        if self._reader.read_one_req(Request()) != 0:
            raise IndexError(f"Cannot reach index {index}")

    # Read the target request; a non-zero status means nothing was read.
    req = Request()
    if self._reader.read_one_req(req) != 0:
        raise IndexError(f"Cannot read request at index {index}")
    return req
4 changes: 4 additions & 0 deletions src/export_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,10 @@ void export_cache(py::module& m) {
.def("get_occupied_byte",
[](cache_t& self) { return self.get_occupied_byte(&self); })
.def("get_n_obj", [](cache_t& self) { return self.get_n_obj(&self); })
.def(
"set_cache_size",
[](cache_t& self, uint64_t new_size) { self.cache_size = new_size; },
"new_size"_a)
.def("print_cache", [](cache_t& self) {
// Capture stdout to return as string
std::ostringstream captured_output;
Expand Down
15 changes: 10 additions & 5 deletions src/export_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,13 @@ void export_reader(py::module& m) {
.value("UNKNOWN_TRACE", trace_type_e::UNKNOWN_TRACE)
.export_values();

// Trace format enumeration
py::enum_<trace_format_e>(m, "TraceFormat")
.value("BINARY_TRACE_FORMAT", trace_format_e::BINARY_TRACE_FORMAT)
.value("TXT_TRACE_FORMAT", trace_format_e::TXT_TRACE_FORMAT)
.value("INVALID_TRACE_FORMAT", trace_format_e::INVALID_TRACE_FORMAT)
.export_values();

py::enum_<read_direction>(m, "ReadDirection")
.value("READ_FORWARD", read_direction::READ_FORWARD)
.value("READ_BACKWARD", read_direction::READ_BACKWARD)
Expand Down Expand Up @@ -302,11 +309,9 @@ void export_reader(py::module& m) {
.def(
"skip_n_req",
[](reader_t& self, int n) {
int ret = skip_n_req(&self, n);
if (ret != 0) {
throw std::runtime_error("Failed to skip requests");
}
return ret;
int count = skip_n_req(&self, n);
// Return the actual number of requests skipped
return count;
},
"n"_a)
.def("read_one_req_above",
Expand Down
Loading