The Computer Language
Benchmarks Game

chameneos-redux Rust #2 program

source code

// The Computer Language Benchmarks Game
// http://benchmarksgame.alioth.debian.org/
//
// contributed by the Rust Project Developers
// contributed by Joshua Landau

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;


// English names of the decimal digits, indexed by digit value.
const DIGITS: [&str; 10] = [
   "zero", "one", "two", "three", "four",
   "five", "six", "seven", "eight", "nine",
];

// Spells out `num` digit by digit, with a single space in front of
// every word (so the result always starts with a space).
// For example, 120 becomes " one two zero".
fn wordy_num(num: usize) -> String {
   let decimal = num.to_string();
   let mut words = String::new();
   // The decimal rendering of a usize is pure ASCII digits, so each
   // byte maps directly onto an index into DIGITS.
   for digit in decimal.bytes() {
      words.push(' ');
      words.push_str(DIGITS[usize::from(digit - b'0')]);
   }
   words
}


// The three colours a chameneos can have. The explicit discriminants
// are relied upon when a colour is converted to or from an integer.
#[derive(Clone, Copy)]
#[repr(u8)]
enum Color {
   Red = 0,
   Yellow = 1,
   Blue = 2,
}

impl Color {
   // Lower-case name of the colour, as printed in the program output.
   fn show(&self) -> &'static str {
      match *self {
         Color::Red => "red",
         Color::Yellow => "yellow",
         Color::Blue => "blue",
      }
   }
}

// Colour both chameneos take on after meeting.
//
// Two equal colours stay as they are; two different colours both
// turn into the third colour. The arms below are grouped by result.
fn complement_color(left: Color, right: Color) -> Color {
   use Color::*;
   match (left, right) {
      (Red, Red) | (Yellow, Blue) | (Blue, Yellow) => Red,
      (Yellow, Yellow) | (Red, Blue) | (Blue, Red) => Yellow,
      (Blue, Blue) | (Red, Yellow) | (Yellow, Red) => Blue,
   }
}


#[derive(Default)]
struct AtomicColor(AtomicUsize);

impl AtomicColor {
   fn load(&self, order: Ordering) -> Color {
      use Color::*;
      match self.0.load(order) {
         0 => Red,
         1 => Yellow,
         _ => Blue,
      }
   }

   fn store(&self, color: Color, order: Ordering) {
      self.0.store(color as usize, order)
   }
}


// Each Chameneos is atomic to allow safe, fast
// parallel thread access. Unfortunately this
// is a bit wordy, but it works out OK.
#[derive(Default)]
struct ChameneosState {
   // Identifier of this chameneos; only written during setup.
   name: u8,
   // Current colour.
   color: AtomicColor,
   // Number of meetings this chameneos took part in.
   meet_count: AtomicUsize,
   // Number of those meetings flagged as `same` by the caller.
   meet_same_count: AtomicUsize,
}

impl ChameneosState {
   // Identifier of this chameneos.
   fn name(&self) -> u8 {
      self.name
   }

   // Current colour (acquire load).
   fn color(&self) -> Color {
      self.color.load(Ordering::Acquire)
   }

   // Records one meeting: bumps `meet_count`, bumps `meet_same_count`
   // when `same` is set, and finally adopts `color`.
   //
   // The counters are advanced with a separate load and store rather
   // than a single read-modify-write instruction.
   fn meet(&self, same: bool, color: Color) {
      let bump = |counter: &AtomicUsize| {
         let next = counter.load(Ordering::Acquire) + 1;
         counter.store(next, Ordering::Release);
      };

      bump(&self.meet_count);
      if same {
         bump(&self.meet_same_count);
      }
      self.color.store(color, Ordering::Release);
   }
}


// Lightweight handle to a chameneos: an index into `Shared::states`.
// Index 0 is treated as "no chameneos".
#[derive(Copy, Clone)]
struct Chameneos {
   idx: u32,
}

impl Chameneos {
   // True unless this is the null handle (index 0).
   fn is_valid(&self) -> bool {
      let is_null = self.idx == 0;
      !is_null
   }

   // Looks up the state this handle refers to. The index is masked
   // with BLOCK before it is used to index `shared.states`.
   fn get<'st>(&self, shared: &'st Shared) -> &'st ChameneosState {
      let slot = (self.idx & BLOCK) as usize;
      &shared.states[slot]
   }
}


struct Shared {
   // We can only store 12 due to overflow
   // when the maximum number of pairs are
   // created, so no need for a large buffer.
   // Using 16 avoids bounds checks.
   states: [ChameneosState; 16],
   // Bottom block is mall, rest are queue slots
   atomic_queue: AtomicUsize,
   meetings_had: AtomicUsize,
   meetings_limit: usize,
}

impl Shared {
   fn null_task(&self) -> Chameneos {
      self.task_at(0)
   }

   fn task_at(&self, idx: u32) -> Chameneos {
      Chameneos { idx: idx }
   }

   fn load(&self, order: Ordering) -> u32 {
      self.atomic_queue.load(order) as u32
   }

   fn store(&self, val: u32, order: Ordering) {
      self.atomic_queue.store(val as usize, order)
   }

   fn compare_and_swap(&self, current: u32, new: u32, order: Ordering) -> u32 {
      self.atomic_queue.compare_and_swap(current as usize, new as usize, order) as u32
   }
}


// Width in bits of one slot of the packed queue word.
const BLOCK_LEN: u32 = 4;
// Mask selecting a single slot (the low BLOCK_LEN bits).
const BLOCK: u32 = (1 << BLOCK_LEN) - 1;
// Number of BLOCK_LEN-bit slots that fit in a 32-bit word.
const QUEUE_LEN: u32 = 32 / BLOCK_LEN;
// All-ones value of the queue word that tells workers to stop.
const QUEUE_STOPPED: u32 = 0xFFFF_FFFF;

// Per-thread view of the shared queue word.
struct State {
   // Last value of `Shared::atomic_queue` this thread observed.
   cache: u32,
}

impl State {
   // Starts from the current value of the shared queue word.
   fn new(shared: &Shared) -> State {
      let observed = shared.load(Ordering::SeqCst);
      State { cache: observed }
   }

   // Opens a transaction on the cached queue word.
   //
   // Returns `None` once the cached word is QUEUE_STOPPED. Otherwise
   // returns the transaction together with the mall occupant and the
   // countdown, both of which have already been taken off the
   // transaction's working copy.
   fn run(&mut self, shared: &Shared) -> Option<(TransactionalQueue, Chameneos, u8)> {
      if self.cache == QUEUE_STOPPED {
         return None;
      }

      let snapshot = self.cache;
      let mut queue = TransactionalQueue {
         set_state: snapshot,
         cache: &mut self.cache,
      };
      // Order matters: the countdown sits below the mall in the word.
      let count = queue.take_count();
      let mall = queue.take(shared);
      Some((queue, mall, count))
   }

   // Called after a meeting slot was claimed with `count` left on the
   // countdown. Returns whether the meeting may go ahead.
   //
   // A non-zero countdown needs no bookkeeping. When it hit zero, one
   // more refill is requested: if any remain, 0b11111111 is added to
   // the shared queue word; otherwise the word is set to QUEUE_STOPPED
   // and `false` is returned.
   fn register_meeting(&mut self, shared: &Shared, count: u8) -> bool {
      if count != 0 {
         return true;
      }

      let meetings_had = shared.meetings_had.fetch_add(1, Ordering::Acquire);
      if meetings_had >= shared.meetings_limit {
         // Oops, we couldn't actually do that
         shared.store(QUEUE_STOPPED, Ordering::SeqCst);
         return false;
      }

      shared.atomic_queue.fetch_add(0b11111111, Ordering::AcqRel);
      true
   }
}


// A speculative edit of the packed queue word.
//
// `set_state` is a private working copy that `take_count`, `take`
// and `put` modify. `cache` points at the owning thread's last known
// value of the shared word, which `submit` uses as the expected value
// of its compare-and-swap.
struct TransactionalQueue<'a> {
   set_state: u32,
   cache: &'a mut u32,
}

impl<'a> TransactionalQueue<'a> {
   // Re-attaches `mall` and `count` below the working copy and tries
   // to publish the result. Returns true when the shared word was
   // replaced. In both cases the cache ends up holding the value the
   // shared word is now known to have.
   fn submit(mut self, shared: &Shared, mall: Chameneos, count: u8) -> bool {
      // Layout, low to high: 8-bit count, mall slot, queue slots.
      self.set_state = (((self.set_state << BLOCK_LEN) | mall.idx) << 8) | u32::from(count);

      let expected = *self.cache;
      let wanted = self.set_state;
      let actual = shared.compare_and_swap(expected, wanted, Ordering::Release);

      let worked = actual == expected;
      *self.cache = if worked { wanted } else { actual };
      worked
   }

   // Abandons the edit and refreshes the cache from the shared word.
   fn cancel(&mut self, shared: &Shared) {
      let latest = shared.load(Ordering::Relaxed);
      *self.cache = latest;
   }

   // Removes and returns the low 8 bits (the countdown).
   fn take_count(&mut self) -> u8 {
      let low_byte = (self.set_state & 0xFF) as u8;
      self.set_state >>= 8;
      low_byte
   }

   // Removes and returns the lowest slot as a handle; an empty slot
   // yields the null handle.
   fn take(&mut self, shared: &Shared) -> Chameneos {
      let lowest = self.set_state & BLOCK;
      self.set_state >>= BLOCK_LEN;
      shared.task_at(lowest)
   }

   // Appends two handles just above the highest occupied slot, with
   // `second` in the lower of the two new slots so it is taken first.
   fn put(&mut self, first: Chameneos, second: Chameneos) {
      let empty_slots = self.set_state.leading_zeros() / BLOCK_LEN;
      let shift = (QUEUE_LEN - empty_slots) * BLOCK_LEN;
      let pair = (first.idx << BLOCK_LEN) | second.idx;
      self.set_state |= pair << shift;
   }
}


// Runs threads from the shared thread pool.
// Uses optimistic concurrency to queue and
// deque threads, as well as to take from the
// mall.
fn thread_executor(mut task: Chameneos, shared: &Shared) {
   let mut state = State::new(shared);
   let mut mall = shared.null_task();

   loop {
      let mut task_tmp = task;
      let mut mall_tmp = mall;
      let mut count;

      {
         let (mut queue, new_mall, new_count) = match state.run(shared) {
            Some(x) => x,
            None => return,
         };
         count = new_count;

         if mall_tmp.is_valid() {
            queue.put(task_tmp, mall_tmp);
            task_tmp = queue.take(shared);
         }
         else if !task_tmp.is_valid() {
            task_tmp = queue.take(shared);
            if !task_tmp.is_valid() {
               std::thread::sleep_ms(1);
               queue.cancel(shared);
               continue;
            }
         }

         mall_tmp = new_mall;
         if !mall_tmp.is_valid() {
            let replacement_task = queue.take(shared);
            if queue.submit(shared, task_tmp, count) {
               task = replacement_task;
               mall = shared.null_task();
            }
            continue;
         }

         if count == 0 {
            queue.cancel(shared);
            continue;
         }

         count -= 1;

         if !queue.submit(shared, shared.null_task(), count) {
            continue;
         }
         task = task_tmp;
         mall = mall_tmp;
      }

      if !state.register_meeting(shared, count) { return; }

      let actor_ref = task_tmp.get(shared);
      let mall_ref = mall.get(shared);

      let same = actor_ref.name() == mall_ref.name();
      let new_color = complement_color(actor_ref.color(), mall_ref.color());

      actor_ref.meet(same, new_color);
      mall_ref.meet(same, new_color);
   }
}


fn run_for(meetings_limit: usize, colors: &[Color]) -> Vec<(usize, usize)> {
   let num_threads = colors.len();

   let x = || Default::default();
   let mut states: [ChameneosState; 16] = [
      x(), x(), x(), x(), x(), x(), x(), x(),
      x(), x(), x(), x(), x(), x(), x(), x(),
   ];

   for (i, &color) in colors.iter().enumerate() {
      let idx = i + 1;
      let chameneos_state = &mut states[idx];
      chameneos_state.name = idx as u8;
      chameneos_state.color.store(color, Ordering::Release);
   }

   let shared = Arc::new(Shared {
      atomic_queue: AtomicUsize::new((meetings_limit % 255) + 1),
      meetings_had: AtomicUsize::new(0),
      meetings_limit: meetings_limit / 255,
      states: states,
   });

   let threads: Vec<_> = (0..num_threads).map(|i| {
      let task = shared.task_at((i + 1) as u32);
      let shared = shared.clone();
      thread::spawn(move || thread_executor(task, &shared))
   }).collect();

   for thread in threads {
      thread.join().unwrap();
   }

   let output = &shared.states[..];

   output[1..colors.len() + 1].iter().map(|ch| (
      ch.meet_count.load(Ordering::SeqCst),
      ch.meet_same_count.load(Ordering::SeqCst),
   )).collect()
}


fn main() {
   use Color::*;
   let small = [Blue, Red, Yellow];
   let large = [Blue, Red, Yellow, Red, Yellow, Blue, Red, Yellow, Red, Blue];

   let num_meetings = std::env::args_os().nth(1)
      .and_then(|s| s.into_string().ok())
      .and_then(|n| n.parse().ok())
      .unwrap_or(600);

   let colors = [Blue, Red, Yellow];
   for &left in &colors {
      for &right in &colors {
         let complement = complement_color(left, right);
         println!("{} + {} -> {}", left.show(), right.show(), complement.show());
      }
   }

   let threads: Vec<(&[_], _)> = vec![
      (&small, thread::spawn(move || run_for(num_meetings, &small))),
      (&large, thread::spawn(move || run_for(num_meetings, &large))),
   ];

   for (colors, thread) in threads {
      println!("");

      for color in colors { print!(" {}", color.show()); }
      println!("");

      let mut total_count = 0;
      for (meet_count, meet_same_count) in thread.join().unwrap() {
         println!("{}{}", meet_count, wordy_num(meet_same_count));
         total_count += meet_count;
      }

      println!("{}", wordy_num(total_count));
   }

   println!("");
}
    

notes, command-line, and program output

NOTES:
64-bit Ubuntu quad core
rustc 1.25.0 (84203cac6 2018-03-25)


Thu, 29 Mar 2018 16:42:40 GMT

MAKE:
/opt/src/rust-1.25.0/bin/rustc -C opt-level=3 -C target-cpu=core2 -C lto -C codegen-units=1  chameneosredux.rs -o chameneosredux.rust-2.rust_run
warning: use of deprecated item 'std::thread::sleep_ms': replaced by `std::thread::sleep`
   --> chameneosredux.rs:285:16
    |
285 |                std::thread::sleep_ms(1);
    |                ^^^^^^^^^^^^^^^^^^^^^
    |
    = note: #[warn(deprecated)] on by default


6.28s to complete and log all make actions

COMMAND LINE:
./chameneosredux.rust-2.rust_run 6000000

PROGRAM OUTPUT:
blue + blue -> blue
blue + red -> yellow
blue + yellow -> red
red + blue -> yellow
red + red -> red
red + yellow -> blue
yellow + blue -> red
yellow + red -> blue
yellow + yellow -> yellow

 blue red yellow
3495974 zero
4557100 zero
3946926 zero
 one two zero zero zero zero zero zero

 blue red yellow red yellow blue red yellow red blue
1067771 zero
1250874 zero
1200757 zero
1229321 zero
1128570 zero
1245064 zero
1159177 zero
1252430 zero
1178023 zero
1288013 zero
 one two zero zero zero zero zero zero