# Bloom (Bud) program for a key-value store replica.
#
# What the rules below do:
# * Every (node, ins_log) and (node, del_log) pair that has not yet been
#   acknowledged is sent over `ins_chn` / `del_chn`.
# * Receivers acknowledge each message on `ins_chn_ack` / `del_chn_ack`; the
#   acknowledgement payloads accumulate in `ins_chn_approx` / `del_chn_approx`.
# * `view` holds the `ins_log` tuples whose `id` is not the `del_id` of any
#   `del_log` tuple.
# * The `del_*` scratches compute which `ins_log`, `del_log` and `node`
#   tuples are removed (via `<-`).
#
# NOTE(review): the class name, the `del_*_rN` / `rN_*_joinbuf` / `seal_*`
# naming and the `range` / `sealed` collection kinds suggest this is the
# output of an automatic rewrite of a hand-written `KvsReplica`; the
# `range` and `sealed` declarations are not stock Bud collection types --
# confirm which Bud branch this targets.
class KvsReplica_Rewrite
  include Bud

  state do
    # Declarations are kept in their original (alphabetical) order.
    channel :del_chn, [:@addr, :id] => [:del_id]
    channel :del_chn_ack, [:@rce_sender, :addr, :id]
    range :del_chn_approx, [:addr, :id]
    scratch :del_del_log_r1, [:id] => [:del_id]
    scratch :del_del_log_r4, [:id] => [:del_id]
    scratch :del_ins_log_r0, [:id] => [:key, :val]
    scratch :del_ins_log_r4, [:id] => [:key, :val]
    table :del_log, [:id] => [:del_id]
    scratch :del_node_r0, [:addr]
    scratch :del_node_r1, [:addr]
    channel :ins_chn, [:@addr, :id] => [:key, :val]
    channel :ins_chn_ack, [:@rce_sender, :addr, :id]
    range :ins_chn_approx, [:addr, :id]
    table :ins_log, [:id] => [:key, :val]
    range :ins_log_keys, [:id]
    sealed :node, [:addr]
    scratch :r0_node_ins_log_joinbuf,
            [:node_addr, :ins_log_id, :ins_log_key, :ins_log_val]
    scratch :r0_node_ins_log_missing,
            [:node_addr, :ins_log_id, :ins_log_key, :ins_log_val]
    scratch :r1_node_del_log_joinbuf,
            [:node_addr, :del_log_id, :del_log_del_id]
    scratch :r1_node_del_log_missing,
            [:node_addr, :del_log_id, :del_log_del_id]
    range :seal_del_log, [:ignored]
    range :seal_ins_log, [:ignored]
    range :seal_node, [:ignored]
    scratch :view, [:id] => [:key, :val]
  end

  bloom do
    # Send every (node, del_log) pair whose (addr, id) has no entry in
    # del_chn_approx yet.
    del_chn <~ (node * del_log)
      .pairs { |peer, entry| peer + entry }
      .notin(del_chn_approx, 0 => :addr, 1 => :id)

    # Remove del_log tuples that satisfy both the r1 and the r4 condition.
    del_log <- (del_del_log_r1 * del_del_log_r4).matches { |doomed, _twin| doomed }

    # Send every (node, ins_log) pair whose (addr, id) has no entry in
    # ins_chn_approx yet.
    ins_chn <~ (node * ins_log)
      .pairs { |peer, entry| peer + entry }
      .notin(ins_chn_approx, 0 => :addr, 1 => :id)

    # Remove ins_log tuples that satisfy both the r0 and the r4 condition.
    ins_log <- (del_ins_log_r0 * del_ins_log_r4).matches { |doomed, _twin| doomed }

    # Accept an incoming insert only if its id is not already recorded in
    # ins_log_keys.
    ins_log <= ins_chn.payloads.notin(ins_log_keys, 0 => :id)

    # Remove node tuples that satisfy both the r0 and the r1 condition.
    node <- (del_node_r0 * del_node_r1).matches { |doomed, _twin| doomed }

    # Acknowledge each received delete back to its sender, then record the
    # acknowledgement payloads.
    del_chn_ack <~ del_chn { |msg| [msg.source_addr, msg.addr, msg.id] }
    del_chn_approx <= del_chn_ack.payloads

    # del_log tuples with no row in r1_node_del_log_missing. The join with
    # seal_node has no predicate, so it only yields tuples while seal_node
    # is non-empty.
    del_del_log_r1 <= (del_log * seal_node).lefts.notin(
      r1_node_del_log_missing,
      :id => :del_log_id, :del_id => :del_log_del_id
    )

    # del_log tuples whose del_id appears in ins_log_keys but no longer
    # matches the id of any ins_log tuple.
    del_del_log_r4 <= (del_log * ins_log_keys)
      .lefts(:del_id => :id)
      .notin(ins_log, :del_id => :id)

    # ins_log tuples with no row in r0_node_ins_log_missing (again gated on
    # seal_node being non-empty).
    del_ins_log_r0 <= (ins_log * seal_node).lefts.notin(
      r0_node_ins_log_missing,
      :id => :ins_log_id, :key => :ins_log_key, :val => :ins_log_val
    )

    # ins_log tuples whose id is the del_id of some del_log tuple.
    del_ins_log_r4 <= (ins_log * del_log).lefts(:id => :del_id)

    # Store every delete received over the channel.
    del_log <= del_chn.payloads

    # node tuples with nothing outstanding in the ins_log / del_log
    # "missing" sets, gated on seal_ins_log / seal_del_log being non-empty.
    del_node_r0 <= (node * seal_ins_log).lefts.notin(
      r0_node_ins_log_missing, :addr => :node_addr
    )
    del_node_r1 <= (node * seal_del_log).lefts.notin(
      r1_node_del_log_missing, :addr => :node_addr
    )

    # Acknowledge each received insert back to its sender, then record the
    # acknowledgement payloads.
    ins_chn_ack <~ ins_chn { |msg| [msg.source_addr, msg.addr, msg.id] }
    ins_chn_approx <= ins_chn_ack.payloads

    # Record the id of every ins_log tuple (deferred merge, `<+`).
    ins_log_keys <+ ins_log { |row| [row.id] }

    # joinbuf: (node, ins_log) pairs that have a matching acknowledgement;
    # missing: all remaining (node, ins_log) pairs.
    r0_node_ins_log_joinbuf <= (node * ins_log * ins_chn_approx).combos(
      ins_chn_approx.addr => node.addr,
      ins_chn_approx.id => ins_log.id
    ) { |peer, entry, _ack| peer + entry }
    r0_node_ins_log_missing <= (node * ins_log)
      .pairs { |peer, entry| peer + entry }
      .notin(r0_node_ins_log_joinbuf)

    # Same pair of relations for del_log.
    r1_node_del_log_joinbuf <= (node * del_log * del_chn_approx).combos(
      del_chn_approx.addr => node.addr,
      del_chn_approx.id => del_log.id
    ) { |peer, entry, _ack| peer + entry }
    r1_node_del_log_missing <= (node * del_log)
      .pairs { |peer, entry| peer + entry }
      .notin(r1_node_del_log_joinbuf)

    # Visible state: inserts that no delete refers to.
    view <= ins_log.notin(del_log, :id => :del_id)
  end
end