Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion communication/src/allocator/zero_copy/push_pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ impl<T, P: BytesPush> Pusher<T, P> {
impl<T: Bytesable, P: BytesPush> Push<T> for Pusher<T, P> {
#[inline]
fn push(&mut self, element: &mut Option<T>) {
if let Some(ref mut element) = *element {
if let Some(ref element) = *element {

// determine byte lengths and build header.
let mut header = self.header;
Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/generic/handles.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl<T: Timestamp, C: Accountable, P: Pull<Message<T, C>>> InputHandleCore<T, C,
let data = std::mem::take(data);
self.staging.push_back((cap, data));
}
self.staging.make_contiguous().sort_by(|x,y| x.0.time().cmp(&y.0.time()));
self.staging.make_contiguous().sort_unstable_by(|x,y| x.0.time().cmp(&y.0.time()));

while let Some((cap, data)) = self.staging.pop_front() {
self.staged.push(data);
Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/generic/notificator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ impl<T: Timestamp> FrontierNotificator<T> {

if !self.pending.is_empty() {

self.pending.sort_by(|x,y| x.0.time().cmp(y.0.time()));
self.pending.sort_unstable_by(|x,y| x.0.time().cmp(y.0.time()));
for i in 0 .. self.pending.len() - 1 {
if self.pending[i].0.time() == self.pending[i+1].0.time() {
self.pending[i+1].1 += self.pending[i].1;
Expand Down
4 changes: 2 additions & 2 deletions timely/src/progress/change_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ impl<T, const X: usize> ChangeBatch<T, X>
where
T: Ord,
{

/// Allocates a new `ChangeBatch` with a single entry.
///
/// # Examples
Expand Down Expand Up @@ -291,7 +291,7 @@ where
#[inline]
pub fn compact(&mut self) {
if self.clean < self.updates.len() && self.updates.len() > 1 {
self.updates.sort_by(|x,y| x.0.cmp(&y.0));
self.updates.sort_unstable_by(|x,y| x.0.cmp(&y.0));
for i in 0 .. self.updates.len() - 1 {
if self.updates[i].0 == self.updates[i+1].0 {
self.updates[i+1].1 += self.updates[i].1;
Expand Down
4 changes: 2 additions & 2 deletions timely/src/progress/subgraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@
// will depend on child summaries and capabilities, as well as edges in the subgraph.

// perhaps first check that the children are sanely identified
self.children.sort_by(|x,y| x.index.cmp(&y.index));
self.children.sort_unstable_by(|x,y| x.index.cmp(&y.index));
assert!(self.children.iter().enumerate().all(|(i,x)| i == x.index));

let inputs = self.input_messages.len();
Expand Down Expand Up @@ -487,7 +487,7 @@
}

// Consider scheduling each recipient of progress information to shut down.
self.maybe_shutdown.sort();
self.maybe_shutdown.sort_unstable();
self.maybe_shutdown.dedup();
for child_index in self.maybe_shutdown.drain(..) {
let child_state = self.pointstamp_tracker.node_state(child_index);
Expand Down Expand Up @@ -784,7 +784,7 @@
for (time, diff) in internal.iter() {
if *diff > 0 {
let consumed = shared_progress.consumeds.iter_mut().any(|x| x.iter().any(|(t,d)| *d > 0 && t.less_equal(time)));
let internal = child_state.sources[output].implications.less_equal(time);

Check warning on line 787 in timely/src/progress/subgraph.rs

View workflow job for this annotation

GitHub Actions / Cargo clippy

`internal` shadows a previous, unrelated binding
if !consumed && !internal {
println!("Increment at {:?}, not supported by\n\tconsumed: {:?}\n\tinternal: {:?}", time, shared_progress.consumeds, child_state.sources[output].implications);
panic!("Progress error; internal {:?}", self.name);
Expand Down
4 changes: 2 additions & 2 deletions timely/src/scheduling/activate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ impl Activations {
let moment = timer.elapsed() + delay;
self.queue.push(Reverse((moment, path.to_vec())));
}
}
}
else {
self.activate(path);
}
Expand Down Expand Up @@ -115,7 +115,7 @@ impl Activations {

{ // Scoped, to allow borrow to drop.
let slices = &self.slices[..];
self.bounds.sort_by_key(|x| &slices[x.0 .. (x.0 + x.1)]);
self.bounds.sort_unstable_by_key(|x| &slices[x.0 .. (x.0 + x.1)]);
self.bounds.dedup_by_key(|x| &slices[x.0 .. (x.0 + x.1)]);
}

Expand Down
2 changes: 1 addition & 1 deletion timely/src/synchronization/sequence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ impl<T: ExchangeData+Clone> Sequencer<T> {
}
});

recvd.sort_by(|x,y| x.0.cmp(&y.0));
recvd.sort_unstable_by(|x,y| x.0.cmp(&y.0));

if let Some(last) = recvd.last() {
let mut activator_borrow = activator_sink.borrow_mut();
Expand Down
Loading