Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use serde::{Deserialize, Deserializer};
use std::{collections::HashMap, fmt::Display, net::Ipv4Addr, str::FromStr};
use syscalls::Sysno;

pub mod machine;
pub mod script;
pub mod worker;

Expand Down
115 changes: 115 additions & 0 deletions src/machine.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
use crate::script::ast::MachineInstruction;

use log::{debug, trace};
use std::{
fs,
io::{BufReader, prelude::*},
net::TcpListener,
thread,
};

use crate::script::ast::Node;

#[derive(Debug)]
pub enum MachineError {
Internal,
}

/// Start a dummy listening server
fn start_server(addr: String, target_port: u16) -> Result<(), MachineError> {
debug!("Starting server at {:?}:{:?}", addr, target_port);

let listener = TcpListener::bind((addr, target_port)).unwrap();

for stream in listener.incoming() {
let mut stream = stream.unwrap();

// As a simplest solution to keep a connection open, spawn a
// thread. It's not the best one though, as we waste resources.
// For the purpose of only keeping connections open we could e.g.
// spawn only two threads, where the first one receives connections
// and adds streams into the list of active, and the second iterates
// through streams and replies. This way the connections will have
// high latency, but for the purpose of networking workload it
// doesn't matter.
thread::spawn(move || {
loop {
let mut buf_reader = BufReader::new(&stream);
let mut buffer = String::new();

match buf_reader.read_line(&mut buffer) {
Ok(0) => {
// EOF, exit
trace!("EOF");
return;
}
Ok(_n) => {
trace!("Received {:?}", buffer);

let response = "hello\n";
match stream.write_all(response.as_bytes()) {
Ok(_) => {
// Response is sent, handle the next one
}
Err(e) => {
trace!("ERROR: sending response, {}", e);
break;
}
}
}
Err(e) => {
trace!("ERROR: reading a line, {}", e)
}
}
}
});
}

Ok(())
}

/// Make sure a path exists
fn ensure_path(path: String) -> Result<(), MachineError> {
debug!("Create path {path:?}");
match fs::create_dir_all(path) {
Ok(()) => {
debug!("Success");
Ok(())
}
Err(e) => {
debug!("Failure {e:?}");
Err(MachineError::Internal)
}
}
}

fn apply_instr(instr: MachineInstruction) -> Result<(), MachineError> {
match instr {
MachineInstruction::Server { port } => {
start_server("127.0.0.1".to_string(), port)
}
MachineInstruction::Profile { target: _ } => todo!(),
MachineInstruction::Path { value } => ensure_path(value),
}
}

pub fn apply(machine: Vec<&Node>) -> Result<(), MachineError> {
for m in machine {
let Node::Machine {
ref m_instructions, ..
} = *m
else {
unreachable!()
};

for instr in m_instructions.clone() {
// TODO: Note that for some machine statements there is a race
// conditions here, since we do not wait for them to apply.
// Introduce a distinction between background statements, and those
// that have to be performed synchronously.
thread::spawn(move || apply_instr(instr));
}
}

Ok(())
}
20 changes: 15 additions & 5 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use berserker::{
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

use berserker::machine::apply;
use berserker::script::{
ast::Node, parser::parse_instructions, rules::apply_rules,
};
Expand Down Expand Up @@ -65,12 +66,14 @@ fn run_script(script_path: String) -> Vec<(i32, u64)> {
parse_instructions(&std::fs::read_to_string(script_path).unwrap())
.unwrap();

let (_machine, works): (Vec<_>, Vec<_>) =
let (machine, works): (Vec<_>, Vec<_>) =
ast.iter().partition_map(|node| match node {
Node::Work { .. } => Either::Right(node),
Node::Machine { .. } => Either::Left(node),
});

let _ = apply(machine);

apply_rules(works)
.into_iter()
.flat_map(|node| {
Expand Down Expand Up @@ -311,20 +314,27 @@ mod tests {
#[test]
fn test_file_script() {
let input = r#"
machine {
path("/tmp/data");
}

random (workers = 1, duration = 10) {
open(random_path("/tmp"));
open(random_path("/tmp/data"));
}

repeated (workers = 1, duration = 10) {
open("/tmp/test");
open("/tmp/data/test");
}
"#;

let ast: Vec<Node> = parse_instructions(input).unwrap();
assert_eq!(ast.len(), 2);
assert_eq!(ast.len(), 3);

new_script_worker(ast[0].clone()).run_payload().unwrap();
// apply machine statements
let _ = apply(vec![&ast[0]]);

// run workers
new_script_worker(ast[1].clone()).run_payload().unwrap();
new_script_worker(ast[2].clone()).run_payload().unwrap();
}
}
Loading