Devnotes > Channel with infinite buffer in go
In Golang we have buffered and unbuffered channels. Buffered channels can hold an x amount of values "in transit" before the channel is blocked from writing. It is necessary to read a value from the other end first, to "make room", before more can be put in. That amount x has to be specified and is limited.
Sometimes one wants the readers and the writers of the cannel to be completely independent from each other, so it is always possible to write, regardless of the speed of reading. For that to happen with a buffered channel, x would have to be very, very large. This is undesirabele.
Jon Bodner described a much better solution in this article on Medium, that uses two channels with a slice in between. There are some tricky details, so go read it before you copy and paste the final result shown below. I save it here to make sure it stays up and that I can find it again in a few years from now. Also, the text below is actual text that you can copy and paste.
One does not need to use an empty interface like he did, of course. In general channels have a type as you know what will flow through it beforehand. Here we will pretend that the channel is of type *Thing
and that Thing
has an id of type string
that we can get by calling Thing.Id()
. Because that happens to be the usecase I have in front of me right now.
Also, there needs to be some locking to make it thread safe.
package thing
type Queue struct {
inQueue []*Thing
in chan *Thing
out chan *Thing
}
func NewQueue() *Queue {
q := &Queue{
inQueue: []*Thing{},
in: make(chan *Thing),
out: make(chan *Thing),
}
go func() {
outCh := func() chan *Thing {
if q.Count() == 0 {
return nil
}
return q.out
}
cur := func() *Thing {
if q.Count() == 0 {
return nil
}
return q.inQueue[0]
}
for len(q.inQueue) > 0 || q.in != nil {
select {
case oc, ok := <-q.in:
if !ok {
q.in = nil
} else {
q.Append(oc)
}
case outCh() <- cur():
if q.out != nil {
q.Unshift()
}
}
}
close(q.out)
}()
return q
}
func (q *Queue) In() chan *Thing {
return q.in
}
func (q *Queue) Out() chan *Thing {
return q.out
}
func (q *Queue) Close() {
close(q.in)
}
func (q *Queue) Append(oc *Thing) {
lock.Lock()
defer lock.Unlock()
q.inQueue = append(q.inQueue, oc)
}
func (q *Queue) Unshift() {
lock.Lock()
defer lock.Unlock()
q.inQueue = q.inQueue[1:]
}
func (q *Queue) Count() int {
lock.RLock()
defer lock.RUnlock()
return len(q.inQueue)
}
And the corresponding tests:
package thing_test
func TestNewQueue(t *testing.T) {
count := 10
idFormat := "id-%d"
for _, tc := range []struct {
name string
writeDelay bool
readDelay bool
}{
{
name: "no delay",
},
{
name: "slow write",
writeDelay: true,
},
{
name: "slow read",
readDelay: true,
},
{
name: "slow read and write",
writeDelay: true,
readDelay: true,
},
} {
t.Run(tc.name, func(t *testing.T) {
queue := thing.NewQueue()
var lastIntId int
lastId := fmt.Sprintf(idFormat, lastIntId)
var wg sync.WaitGroup
wg.Add(1)
go func() {
for o := range queue.Out() {
if tc.readDelay {
time.Sleep(50 * time.Millisecond)
}
lastIntId += 1
lastId = fmt.Sprintf(idFormat, lastIntId)
test.Equals(t, lastId, o.CaseId())
}
wg.Done()
}()
for i := 1; i <= count; i++ {
if tc.writeDelay {
time.Sleep(50 * time.Millisecond)
}
queue.In() <- newThing(fmt.Sprintf(idFormat, i))
}
queue.Close()
wg.Wait()
test.Equals(t, fmt.Sprintf(idFormat, count), lastId)
})
}
}
func TestQueueCount(t *testing.T) {
for _, tc := range []struct {
name string
count int
}{
{
name: "empty",
},
{
name: "one",
count: 1,
},
{
name: "two",
count: 2,
},
{
name: "many",
count: 50,
},
} {
t.Run(tc.name, func(t *testing.T) {
queue := thing.NewQueue()
for i := 0; i < tc.count; i++ {
queue.In() <- newThing(fmt.Sprintf("id-%d", i))
}
queue.Close()
time.Sleep(50 * time.Millisecond)
test.Equals(t, tc.count, queue.Count())
})
}
}