lockfree data structure - qless / slimqless

Coordinator
Mar 12, 2014 at 8:37 AM
There are several lock-free queue implementations out there, and most of them come with limitations, so I set out to build an implementation without those limitations to back a faster thread pool.
To support the thread pool, I need at least two operations:
void push(T); adds new data to the queue. It never fails; I do not consider memory limits for it yet.
bool pop(T&); removes and returns the first item in the queue. It returns true if there is one (i.e. the queue is not empty), otherwise false.
For a common queue I also support long size(); which returns the number of items in the queue, bool empty(); which tells whether the queue is empty, and void clear(); which clears the queue, but this implementation is not optimized for these three functions.
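For orientation, here is a minimal sketch of what the iqless(Of T) interface might look like, inferred from the Implements clauses in the code below; the actual declaration lives in the osi.root libraries, so treat the exact signatures as an assumption.

'sketch only: inferred from the Implements clauses below, not the actual osi.root declaration
Public Interface iqless(Of T)
    'never fails; memory limits are not considered yet
    Sub push(ByVal v As T)
    'returns True and sets o when the queue was not empty
    Function pop(ByRef o As T) As Boolean
    'returns Nothing when the queue is empty
    Function pop() As T
    Function size() As Int64
    Function empty() As Boolean
    Sub clear()
End Interface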

After several days of work, I have finished one.
Briefly, there are two dummy nodes: the head and the tail.
The head node itself never changes; only its next pointer changes.
The tail node is where new data is appended.
When pushing, threads race to mark the tail node A as 'writing'. The thread that wins the race then updates the tail reference to a new node B, which releases the other pushing threads, writes the data into node A, and finally marks A as 'written'.
When popping, threads race to swing the head node's next pointer from its current target to the node after that target. The thread that wins the race then spin-waits until the node reaches 'written' status and finishes the pop. If the head node's next pointer actually points at the tail node, the pop fails.
For example:
head -> tail: return false.
head -> A -> tail: if a thread succeeds in swapping the head's next pointer from A to tail, it returns true (with the value in A), otherwise false.
head -> A -> B -> tail: if a thread succeeds in swapping the head's next pointer from A to B, it returns true with the value in A. Another thread may fail to swap from A to B but succeed in swapping from B to tail; it then returns true with the value in B.

The code with comments follows. For the functionality of Interlocked.CompareExchange, Interlocked.Increment, and Thread.VolatileRead / Thread.VolatileWrite, please refer to MSDN.
atomic.eva == Thread.VolatileWrite on amd64, or Interlocked.Exchange on x86
_istype.is_cloneable decides whether the type implements ICloneable
copy_clone calls ICloneable.Clone to store a clone of the data instead of a reference
k_assert is for assertions
loops_per_yield depends on single-core processor performance; it is usually 6000 - 8000 for a common processor
force_yield() == Sleep(0), but it returns true only if the current thread actually slept for more than 1/8 of a timeslice
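The real helpers live in the osi.root libraries; as a rough orientation, functional equivalents might look something like the sketch below. It is based only on the descriptions above, so treat the names and exact behavior as assumptions.

Imports System.Threading

'sketch only: rough functional equivalents of the osi.root helpers described above
Public Module helper_sketch
    'atomic.eva for Int32 fields: a volatile store
    Public Sub eva(ByRef location As Int32, ByVal value As Int32)
        Thread.VolatileWrite(location, value)
    End Sub

    'atomic.eva for reference fields: Interlocked.Exchange also acts as a full barrier
    Public Sub eva(Of T As Class)(ByRef location As T, ByVal value As T)
        Interlocked.Exchange(location, value)
    End Sub

    'k_assert: fail loudly when the condition does not hold; returning the condition lets it sit in a Return statement
    Public Function k_assert(ByVal condition As Boolean) As Boolean
        If Not condition Then
            Throw New InvalidOperationException("assertion failed")
        End If
        Return condition
    End Function

    'loops_per_yield: spin budget before yielding; 7000 is an assumed value in the 6000 - 8000 range
    Public ReadOnly loops_per_yield As Int32 = 7000

    'force_yield: Sleep(0), returning True only if the thread was actually descheduled for a while
    'the 2 ms threshold assumes a typical ~15.6 ms timeslice, i.e. roughly 1/8 of it
    Public Function force_yield() As Boolean
        Dim before As Int32 = Environment.TickCount
        Thread.Sleep(0)
        Return Environment.TickCount - before >= 2
    End Function
End Module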

The source code is at
https://geminibranch.codeplex.com/SourceControl/latest#osi/root/formation/threadsafe/slimqless2.vb
and
https://geminibranch.codeplex.com/SourceControl/latest#osi/root/formation/threadsafe/qless2.vb
Coordinator
Mar 12, 2014 at 8:38 AM
Edited Mar 12, 2014 at 8:39 AM
Source code sample with comments:
Imports System.Threading
Imports osi.root.lock
Imports osi.root.connector
Imports osi.root.envs

Public Class qless2(Of T)
    Implements iqless(Of T)

    Private Shared ReadOnly cloneable As Boolean

    Shared Sub New()
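'reading loops_per_yield into an unused local here presumably just forces that constant to be initialized before the first push / pop <an assumption; the original post does not explain this line>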
        Dim i As Int64 = loops_per_yield
        cloneable = _istype.is_cloneable(Of T)()
    End Sub

'each node has a value_status that decides whether it can be written / is being written / has been written == can be read.
    Private Structure value_status
'nv == no value, means it can be written
        Private Const nv As Int32 = 0
'bw == before write, means it cannot be written by another thread, but the data is not ready to be read either
        Private Const bw As Int32 = 1
'aw == after write, the data is ready to be read
        Private Const aw As Int32 = 2

'the actual value of the value_status
        Private v As Int32

'make sure the default value of Int32 is nv / no value
        Shared Sub New()
            k_assert([default](Of Int32)() = nv)
        End Sub

'use a CAS operation to decide whether the current thread wins the race to write into the node
        Public Function mark_value_writting() As Boolean
            Return Interlocked.CompareExchange(v, bw, nv) = nv
        End Function

'after calling this function, the data is ready to read
        Public Sub mark_value_written()
#If DEBUG Then
            k_assert(v = bw)
#End If
            atomic.eva(v, aw)
        End Sub

'whether the data is ready to read, for pop function
        Public Function value_written() As Boolean
#If DEBUG Then
            k_assert(not_no_value())
#End If
            Return v = aw
        End Function

'just for debug assert, whether the node is not in no value status
        Public Function not_no_value() As Boolean
            Return v <> nv
        End Function
    End Structure

'a node contains a value field <v>, a next pointer <[next], null as default, but null is only for the tail node>, a value status field <vs> as described above
    Private Class node
        Public [next] As node
        Public v As T
        Public vs As value_status
    End Class

'the head node <f> would never change, so it's readonly
    Private ReadOnly f As node
'the tail node <e>: any new data will be written into this node, and then the e reference is changed, which means after each push operation e always points to the tail node of the queue.
    Private e As node
'to support long size(); function
    Private _size As Int64 = 0

'initialization: the next of head is tail, so the queue is empty, and the initial _size is 0
    Public Sub New()
        f = New node()
        e = New node()
        f.next = e
        _size = 0
    End Sub

    Public Function size() As Int64 Implements iqless(Of T).size
        Return _size
    End Function

    Public Function empty() As Boolean Implements iqless(Of T).empty
        Return size() = 0
    End Function

'i have not figured out a clean solution for the void clear(); function, so just pop everything out to avoid breaking the internal state.
    Public Sub clear() Implements iqless(Of T).clear
        While Not pop() Is Nothing
        End While
    End Sub

'the push function
    Public Sub push(ByVal v As T) Implements iqless(Of T).push
'create a new node, which will be the new tail node.
        Dim ne As node = Nothing
        ne = New node()
        Dim i As Int32 = 0
'if the current thread succeeds in marking the current tail node as 'writing', it goes on with the following logic.
        While Not e.vs.mark_value_writting()
'here is an optimization: we cannot tell whether other services / threads are eating processor resources and starving the thread that actually marked the tail node as 'writing' <i.e. that thread marked 'writing' but has no processor time to go on and set the new tail node>. continually looping helps nothing in that state, so just yield to release the processor.
'but briefly, this logic can be removed with only
'while not e.vs.mark_value_writting()
'end while
            i += 1
            If i >= loops_per_yield Then
                If force_yield() Then
                    i = 0
                Else
                    i = loops_per_yield
                End If
            End If
        End While
'record the current tail node, since we should update the e reference as soon as possible.
        Dim n As node = Nothing
'the ordering of the following three lines matters <n = e & n.next = ne can be swapped to e.next = ne & n = e, but e must be updated last>
        n = e
        n.next = ne
'update e as soon as possible; atomic.eva / Thread.VolatileWrite or Interlocked.Exchange provides the memory barrier, so other threads do not keep looping in the 'while' above with a stale e reference
        atomic.eva(e, ne)
'we can afford to spend a little time now, since the other threads are already released; to keep this generic container general, making a clone for ICloneable types is better
        If cloneable Then
            copy_clone(n.v, v)
        Else
            n.v = v
        End If
'update the _size with atomic operation
        Interlocked.Increment(_size)
'all the value writing operations are finished, we can mark the node to 'written' status and release pop function
        n.vs.mark_value_written()
    End Sub

    Public Function pop(ByRef o As T) As Boolean Implements iqless(Of T).pop
'nf is not really a 'new front'; it is the node after the head node
        Dim nf As node = Nothing
'it should always be the next node of head node
        nf = f.next
        While True
#If DEBUG Then
            k_assert(Not nf Is Nothing)
#End If
'the head links directly to the tail node, so there is no node to pop; just return false
            If nf Is e Then
                Return False
'if the next of the head node is still what we got before or in the last loop, try to swap it to the next of nf
'since nf is always a valid node, i.e. not the tail node
            ElseIf Interlocked.CompareExchange(f.next, nf.next, nf) Is nf Then
'just make sure we did not make any mistake
#If DEBUG Then
                k_assert(nf.vs.not_no_value())
#End If
'same optimization as in the push function, but since the data-writing logic in push is heavy, we should probably consider reducing loops_per_yield for the following loop
                Dim i As Int32 = 0
                While Not nf.vs.value_written()
                    i += 1
                    If i >= loops_per_yield Then
                        If force_yield() Then
                            i = 0
                        Else
                            i = loops_per_yield
                        End If
                    End If
                End While
#If DEBUG Then
'this is safe, since _size is increased before nf is marked as 'written'
                k_assert(Interlocked.Decrement(_size) >= 0)
#Else
'but in release build, do not waste any time
                Interlocked.Decrement(_size)
#End If
'succeeded
                o = nf.v
                Return True
            Else
'nf is not the tail, but this thread failed the race to pop it, so just try again with the new node after the head
                nf = f.next
            End If
        End While
'there is no situation in which this line can be reached
        Return k_assert(False)
    End Function

'another overload of the pop function, which returns Nothing if the queue is already empty
    Public Function pop() As T Implements iqless(Of T).pop
        Dim o As T = Nothing
        If pop(o) Then
            Return o
        Else
            Return Nothing
        End If
    End Function
End Class
Coordinator
Mar 12, 2014 at 8:40 AM
The source code above is slightly different from the current implementation, since I have split qless2 and slimqless2; slimqless2 has fewer functions but better performance.

For the performance:
There is another simple thread-safe queue, called qless, which uses a lock to update the pointers; I compare it with this implementation.
I have tried several different concurrency widths:
processor count — this is the typical usage for the implementation, since for the best performance we should have only as many threads as processors,
processor count * 2,
processor count * 4,
50,
128,
and I have tried two different machines:
a ThinkPad T60, with an Intel Core Duo T2600, two cores, 2.16 GHz, 3 GB of memory <the impact from memory is pretty low, but just for reference>, Windows 2003 Enterprise, .NET 3.5
a virtual machine in Windows Azure, with AMD Opteron 4171 HE * 2 = 8 cores and 14 GB of memory, Windows 2008 R2 64-bit, .NET 3.5.1

The test is pretty straightforward: 60% of the operations are 'pop', the other 40% are 'push'. The selection is based on a random number, so I cannot reproduce any particular operation sequence.
I have also reduced the number of rounds as the concurrency width increased, so the overall count of operations <16,000,000> is the same in all of the following tests; a rough sketch of the benchmark is shown below.
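The actual test harness is not part of the post; the following is only a rough sketch of a benchmark matching that description. The 16,000,000 total operations and the 40% push / 60% pop split come from the text above; everything else, including the class and member names, is an assumption.

Imports System.Diagnostics
Imports System.Threading

'sketch only: a benchmark shaped like the one described above
Public Class qless2_benchmark
    Private ReadOnly q As New qless2(Of Int32)()
    Private ops_per_thread As Int64

    Public Sub run(ByVal thread_count As Int32)
        'shrink the per-thread work as the thread count grows, so the total stays at 16,000,000 operations
        ops_per_thread = 16000000L \ thread_count
        Dim threads(thread_count - 1) As Thread
        Dim watch As Stopwatch = Stopwatch.StartNew()
        For i As Int32 = 0 To thread_count - 1
            threads(i) = New Thread(AddressOf worker)
            threads(i).Start()
        Next
        For Each t As Thread In threads
            t.Join()
        Next
        Console.WriteLine("{0} threads: {1} ms", thread_count, watch.ElapsedMilliseconds)
    End Sub

    Private Sub worker()
        Dim r As New Random(Thread.CurrentThread.ManagedThreadId)
        Dim v As Int32 = 0
        For i As Int64 = 0 To ops_per_thread - 1
            If r.Next(10) < 4 Then
                'roughly 40% pushes
                q.push(r.Next())
            Else
                'roughly 60% pops; pop returns False when the queue happens to be empty
                q.pop(v)
            End If
        Next
    End Sub
End Class

The 'processor count' case would then be something like Call New qless2_benchmark().run(Environment.ProcessorCount).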

On the T60, the performance of this lock-free queue <qless2> and the lock-based queue <qless> is pretty similar <the second field is in milliseconds>:
qless_perf_processor_count 6422
qless_perf_two_processor_count 6438
qless_perf_four_processor_count 6453
qless_perf_fifty 9938
qless_perf_128 10844
qless2_perf_processor_count 6625
qless2_perf_two_processor_count 6922
qless2_perf_four_processor_count 7547
qless2_perf_fifty 9219
qless2_perf_128 11609
Because of the lack of processor resources, the perf of qless2 is slightly lower than that of qless <changing loops_per_yield might also help, but I have not verified that>,
but overall the perf is on par.

On the 8-core machine, the lock-free implementation has a significant advantage <it takes at most half the time>:
qless_perf_processor_count 27067
qless_perf_two_processor_count 28085
qless_perf_four_processor_count 30502
qless_perf_fifty 32779
qless_perf_128 44533
qless2_perf_processor_count 9862
qless2_perf_two_processor_count 8737
qless2_perf_four_processor_count 11622
qless2_perf_fifty 13201
qless2_perf_128 13411

And compared with the results from the T60, the lock-free implementation is only slightly impacted as the concurrency width increases.