MongoDB from Go

Roger Peppe

Canonical Ltd

MongoDB

Mgo driver

Insert a record

package main

import (
	"gopkg.in/mgo.v2"

	"log"
	"time"
)

// person is the document type stored in the "people" collection.
type person struct {
	// Name is the document key; the bson tag maps it to the _id field.
	Name       string `bson:"_id"`
	// Status is the person's status text (for example "bored").
	Status     string
	// StatusTime records when the status was set.
	StatusTime time.Time
}

// main connects to the local MongoDB server and inserts a single person
// document into the "people" collection of the "mongodbtalk" database.
func main() {
	session, err := mgo.Dial("localhost:27017")
	if err != nil {
		log.Fatalf("cannot dial mongoDB: %v", err)
	}
	// Close the session on normal return; the original never released it.
	// (The log.Fatalf paths exit the process without running deferred calls.)
	defer session.Close()

	db := session.DB("mongodbtalk")
	collection := db.C("people")
	p := person{
		Name:       "Bob",
		Status:     "bored",
		StatusTime: time.Now(),
	}
	// Name is the _id, so inserting the same name twice fails here.
	if err := collection.Insert(p); err != nil {
		log.Fatalf("cannot insert document: %v", err)
	}
	log.Printf("added %s", p.Name)
}

Fetch a single record

package main

import (
	"github.com/kr/pretty"
	"log"
	"time"

	"gopkg.in/mgo.v2"
)

// person is the Go view of a document in the "people" collection.
type person struct {
	// Name holds the document's _id (see the bson tag).
	Name       string `bson:"_id"`
	// Status is the person's status text.
	Status     string
	// StatusTime is the time associated with Status.
	StatusTime time.Time
}

// main connects to the local MongoDB server, fetches one document from the
// "people" collection and pretty-prints it.
func main() {
	session, err := mgo.Dial("localhost:27017")
	if err != nil {
		log.Fatalf("cannot dial mongoDB: %v", err)
	}
	// Close the session on normal return; the original never released it.
	// (The log.Fatalf paths exit the process without running deferred calls.)
	defer session.Close()

	db := session.DB("mongodbtalk")
	collection := db.C("people")

	var p person
	// A nil query places no restriction on which document is returned.
	if err := collection.Find(nil).One(&p); err != nil {
		log.Fatalf("cannot get one person: %v", err)
	}
	pretty.Println(p)
}

Fetch all at once

package main

import (
	"github.com/kr/pretty"
	"log"
	"time"

	"gopkg.in/mgo.v2"
)

// person is the Go view of a document in the "people" collection.
type person struct {
	// Name holds the document's _id (see the bson tag).
	Name       string `bson:"_id"`
	// Status is the person's status text.
	Status     string
	// StatusTime is the time associated with Status.
	StatusTime time.Time
}

// main connects to the local MongoDB server, loads every document of the
// "people" collection into a slice in one call and pretty-prints the slice.
func main() {
	session, err := mgo.Dial("localhost:27017")
	if err != nil {
		log.Fatalf("cannot dial mongoDB: %v", err)
	}
	// Close the session on normal return; the original never released it.
	// (The log.Fatalf paths exit the process without running deferred calls.)
	defer session.Close()

	db := session.DB("mongodbtalk")
	collection := db.C("people")

	var people []person
	// All decodes the whole result set into memory at once.
	if err := collection.Find(nil).All(&people); err != nil {
		log.Fatalf("cannot get all people: %v", err)
	}
	pretty.Println(people)
}

Fetch many records space-efficiently

package main

import (
	"fmt"
	"log"
	"time"

	"gopkg.in/mgo.v2"
)

// person is the Go view of a document in the "people" collection.
type person struct {
	// Name holds the document's _id (see the bson tag).
	Name       string `bson:"_id"`
	// Status is the person's status text.
	Status     string
	// StatusTime is the time associated with Status.
	StatusTime time.Time
}

// main connects to the local MongoDB server and prints every document of
// the "people" collection, decoding one document at a time through an
// iterator instead of loading the whole result set.
func main() {
	session, err := mgo.Dial("localhost:27017")
	if err != nil {
		log.Fatalf("cannot dial mongoDB: %v", err)
	}
	// Close the session on normal return; the original never released it.
	// (The log.Fatalf paths exit the process without running deferred calls.)
	defer session.Close()

	db := session.DB("mongodbtalk")
	collection := db.C("people")

	iter := collection.Find(nil).Iter()
	var p person
	for iter.Next(&p) {
		fmt.Printf("%s is %s at %v\n", p.Name, p.Status, p.StatusTime)
	}
	// Next returns false both at the end of the results and on failure;
	// Err tells the two apart.
	if err := iter.Err(); err != nil {
		log.Fatalf("iteration error: %v", err)
	}
}

Data model

package main

import (
	"fmt"
	"log"
	"time"

	"gopkg.in/mgo.v2"
	"gopkg.in/mgo.v2/bson"
)

// person is the Go view of a document in the "people" collection.
type person struct {
	// Name holds the document's _id (see the bson tag).
	Name       string `bson:"_id"`
	// Status is the person's status text.
	Status     string
	// StatusTime is the time associated with Status.
	StatusTime time.Time
}

// main connects to the local MongoDB server, fetches one document from the
// "people" collection without decoding it, and prints its raw bytes as a
// quoted string.
func main() {
	session, err := mgo.Dial("localhost:27017")
	if err != nil {
		log.Fatalf("cannot dial mongoDB: %v", err)
	}
	// Close the session on normal return; the original never released it.
	// (The log.Fatalf paths exit the process without running deferred calls.)
	defer session.Close()

	db := session.DB("mongodbtalk")
	collection := db.C("people")

	// bson.Raw keeps the document undecoded, exposing its bytes in Data.
	var p bson.Raw
	if err := collection.Find(nil).One(&p); err != nil {
		log.Fatalf("cannot get person: %v", err)
	}
	fmt.Printf("%q\n", p.Data)
}

Data model

package main

import (
	"log"
	"time"

	"github.com/kr/pretty"
	"gopkg.in/mgo.v2"
)

// person is the Go view of a document in the "people" collection.
type person struct {
	// Name holds the document's _id (see the bson tag).
	Name       string `bson:"_id"`
	// Status is the person's status text.
	Status     string
	// StatusTime is the time associated with Status.
	StatusTime time.Time
}

// main connects to the local MongoDB server, fetches one document from the
// "people" collection into an untyped interface{} value and pretty-prints
// whatever the driver decoded it to.
func main() {
	session, err := mgo.Dial("localhost:27017")
	if err != nil {
		log.Fatalf("cannot dial mongoDB: %v", err)
	}
	// Close the session on normal return; the original never released it.
	// (The log.Fatalf paths exit the process without running deferred calls.)
	defer session.Close()

	db := session.DB("mongodbtalk")
	collection := db.C("people")

	// With no concrete type given, the driver picks the representation.
	var p interface{}
	if err := collection.Find(nil).One(&p); err != nil {
		log.Fatalf("cannot get person: %v", err)
	}
	pretty.Println(p)
}

Data model

// M represents a document as a map from field name to value.
type M map[string] interface{}

// Example: an M literal with a single field.
M{"key": someValue}
// D represents a document as a slice of DocElem, so its fields keep
// the order in which they are listed.
type D []DocElem

// DocElem is a single name/value pair of a D.
type DocElem struct {
    Name string
    Value interface{}
}

// Example: a D literal with a single field.
D{{"key", someValue}}

Query by field value

package main

import (
	"fmt"
	"log"
	"time"

	"gopkg.in/mgo.v2"
	"gopkg.in/mgo.v2/bson"
)

// person is the Go view of a document in the "people" collection.
type person struct {
	// Name holds the document's _id (see the bson tag).
	Name       string `bson:"_id"`
	// Status is the person's status text.
	Status     string
	// StatusTime is the time associated with Status.
	StatusTime time.Time
}

// main connects to the local MongoDB server and prints every person whose
// _id compares greater than "Alice".
func main() {
	session, err := mgo.Dial("localhost:27017")
	if err != nil {
		log.Fatalf("cannot dial mongoDB: %v", err)
	}
	// Close the session on normal return; the original never released it.
	// (The log.Fatalf paths exit the process without running deferred calls.)
	defer session.Close()

	db := session.DB("mongodbtalk")
	collection := db.C("people")

	// Query document {_id: {$gt: "Alice"}}.
	q := bson.M{"_id": bson.M{"$gt": "Alice"}}
	iter := collection.Find(q).Iter()
	var p person
	for iter.Next(&p) {
		fmt.Printf("%s is %s at %v\n", p.Name, p.Status, p.StatusTime)
	}
	// Next returns false both at the end of the results and on failure;
	// Err tells the two apart.
	if err := iter.Err(); err != nil {
		log.Fatalf("iteration error: %v", err)
	}
}

Many equivalent encodings

package main

import (
	"fmt"
	"log"
	"time"

	"gopkg.in/mgo.v2"
	"gopkg.in/mgo.v2/bson"
)

// person is the Go view of a document in the "people" collection.
type person struct {
	// Name holds the document's _id (see the bson tag).
	Name       string `bson:"_id"`
	// Status is the person's status text.
	Status     string
	// StatusTime is the time associated with Status.
	StatusTime time.Time
}

// main runs the same "_id greater than Alice" query three times, each time
// expressing the query document with a different Go type (bson.M, bson.D
// and a tagged struct), and prints the matches of every run.
func main() {
	session, err := mgo.Dial("localhost:27017")
	if err != nil {
		log.Fatalf("cannot dial mongoDB: %v", err)
	}
	// Close the session on normal return; the original never released it.
	// (The log.Fatalf paths exit the process without running deferred calls.)
	defer session.Close()

	db := session.DB("mongodbtalk")
	collection := db.C("people")

	// The struct tags spell out the keys of the query document:
	// {_id: {$gt: <Value>}}.
	type gtCondition struct {
		Value string `bson:"$gt"`
	}
	type gtQuery struct {
		Id gtCondition `bson:"_id"`
	}
	queries := []interface{}{
		bson.M{"_id": bson.M{"$gt": "Alice"}},
		bson.D{{"_id", bson.D{{"$gt", "Alice"}}}},
		gtQuery{gtCondition{"Alice"}},
	}
	for i, q := range queries {
		var p person
		iter := collection.Find(q).Iter()
		for iter.Next(&p) {
			fmt.Printf("query %d: %s is %s at %v\n", i, p.Name, p.Status, p.StatusTime)
		}
		if err := iter.Err(); err != nil {
			log.Fatalf("iteration error: %v", err)
		}
	}
}

Let's make a web service!

GET /latest
PUT /status/:user
GET /status/:user

Make a new type to handle requests

// statusHandler serves the status web service's requests.
type statusHandler struct {
    // collection is the MongoDB collection the handler methods read and write.
    collection *mgo.Collection
}

// NewStatusHandler returns an http.Handler that routes /latest and
// /status/ requests to a statusHandler backed by the given collection.
func NewStatusHandler(collection *mgo.Collection) http.Handler {
	h := &statusHandler{collection: collection}

	router := http.NewServeMux()
	router.HandleFunc("/status/", h.serveStatus)
	router.HandleFunc("/latest", h.serveLatest)
	return router
}

Serve latest status

// serveLatest writes the most recently updated person's status to w.
//
// It responds 404 when the collection holds no documents and 500 on any
// other database error.
func (h *statusHandler) serveLatest(w http.ResponseWriter, req *http.Request) {
	var p person
	// Sorting on "-statustime" puts the newest status first.
	err := h.collection.Find(nil).Sort("-statustime").One(&p)
	switch {
	case err == mgo.ErrNotFound:
		// An empty collection is not a server fault.
		http.Error(w, err.Error(), http.StatusNotFound)
		return
	case err != nil:
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	fmt.Fprintf(w, "%s is %s at %v\n", p.Name, p.Status, p.StatusTime)
}

Serve /status/:user

// serveStatus dispatches /status/:user requests by HTTP method: PUT updates
// the user's status and GET reads it. The user name is whatever follows the
// "/status/" prefix of the request path. Any other method gets a 405
// response that lists the supported methods.
func (h *statusHandler) serveStatus(w http.ResponseWriter, req *http.Request) {
	name := strings.TrimPrefix(req.URL.Path, "/status/")
	switch req.Method {
	case "PUT":
		h.servePutStatus(w, req, name)
	case "GET":
		h.serveGetStatus(w, req, name)
	default:
		// A 405 response must carry an Allow header naming the methods
		// the resource supports.
		w.Header().Set("Allow", "GET, PUT")
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
	}
}

Serve GET /status/:user

// serveGetStatus writes the status of the person whose _id is name.
//
// It responds 404 when no such person exists and 500 on any other
// database error.
func (h *statusHandler) serveGetStatus(w http.ResponseWriter, req *http.Request, name string) {
	var p person
	err := h.collection.Find(bson.D{{"_id", name}}).One(&p)
	switch {
	case err == mgo.ErrNotFound:
		// An unknown user is a client-side problem, not a server fault.
		http.Error(w, err.Error(), http.StatusNotFound)
		return
	case err != nil:
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	fmt.Fprintf(w, "%s is %s at %v\n", p.Name, p.Status, p.StatusTime)
}

Serve PUT /status/:user

// servePutStatus stores the request body, trimmed of surrounding white
// space, as the status of the person whose _id is name, and stamps the
// update with the current time. The person is created if absent (upsert).
//
// It responds 400 when the body cannot be read or exceeds the size cap, and
// 500 when the database update fails.
func (h *statusHandler) servePutStatus(w http.ResponseWriter, req *http.Request, name string) {
	// The body comes from an untrusted client, so cap how much is buffered.
	const maxStatusBytes = 1 << 20
	status, err := ioutil.ReadAll(http.MaxBytesReader(w, req.Body, maxStatusBytes))
	if err != nil {
		// Failing to read the body (or an oversized body) is a request
		// problem rather than a server fault.
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	// $set touches only these two fields and leaves the rest of the
	// document alone.
	update := bson.D{{"$set", bson.D{
		{"status", strings.TrimSpace(string(status))},
		{"statustime", time.Now()},
	}}}
	if _, err := h.collection.Upsert(bson.D{{"_id", name}}, update); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
	}
}

Start the service

package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"strings"
	"time"

	"gopkg.in/mgo.v2"
	"gopkg.in/mgo.v2/bson"
)

// main dials the local MongoDB server and serves the status web service on
// port 55667 from the "people" collection of the "mongodbtalk" database.
func main() {
	session, dialErr := mgo.Dial("localhost:27017")
	if dialErr != nil {
		log.Fatalf("cannot dial mongoDB: %v", dialErr)
	}
	people := session.DB("mongodbtalk").C("people")

	log.Println("starting service on port 55667")
	if serveErr := http.ListenAndServe(":55667", NewStatusHandler(people)); serveErr != nil {
		log.Fatal(serveErr)
	}
}

// person is the document type the status service stores in its collection.
type person struct {
	// Name holds the document's _id (see the bson tag); the handlers use
	// the :user part of the URL for it.
	Name       string `bson:"_id"`
	// Status is the person's status text; the handlers address it by the
	// key "status".
	Status     string
	// StatusTime is when the status was last set; the handlers address it
	// by the key "statustime".
	StatusTime time.Time
}

// statusHandler serves the status web service's requests.
type statusHandler struct {
	// collection is the MongoDB collection the handler methods read and write.
	collection *mgo.Collection
}

// NewStatusHandler returns an http.Handler that routes /latest and
// /status/ requests to a statusHandler backed by the given collection.
func NewStatusHandler(collection *mgo.Collection) http.Handler {
	h := &statusHandler{collection: collection}

	router := http.NewServeMux()
	router.HandleFunc("/status/", h.serveStatus)
	router.HandleFunc("/latest", h.serveLatest)
	return router
}

// serveLatest writes the most recently updated person's status to w.
//
// It responds 404 when the collection holds no documents and 500 on any
// other database error.
func (h *statusHandler) serveLatest(w http.ResponseWriter, req *http.Request) {
	var p person
	// Sorting on "-statustime" puts the newest status first.
	err := h.collection.Find(nil).Sort("-statustime").One(&p)
	switch {
	case err == mgo.ErrNotFound:
		// An empty collection is not a server fault.
		http.Error(w, err.Error(), http.StatusNotFound)
		return
	case err != nil:
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	fmt.Fprintf(w, "%s is %s at %v\n", p.Name, p.Status, p.StatusTime)
}

// serveStatus dispatches /status/:user requests by HTTP method: PUT updates
// the user's status and GET reads it. The user name is whatever follows the
// "/status/" prefix of the request path. Any other method gets a 405
// response that lists the supported methods.
func (h *statusHandler) serveStatus(w http.ResponseWriter, req *http.Request) {
	name := strings.TrimPrefix(req.URL.Path, "/status/")
	switch req.Method {
	case "PUT":
		h.servePutStatus(w, req, name)
	case "GET":
		h.serveGetStatus(w, req, name)
	default:
		// A 405 response must carry an Allow header naming the methods
		// the resource supports.
		w.Header().Set("Allow", "GET, PUT")
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
	}
}

// servePutStatus stores the request body, trimmed of surrounding white
// space, as the status of the person whose _id is name, and stamps the
// update with the current time. The person is created if absent (upsert).
//
// It responds 400 when the body cannot be read or exceeds the size cap, and
// 500 when the database update fails.
func (h *statusHandler) servePutStatus(w http.ResponseWriter, req *http.Request, name string) {
	// The body comes from an untrusted client, so cap how much is buffered.
	const maxStatusBytes = 1 << 20
	status, err := ioutil.ReadAll(http.MaxBytesReader(w, req.Body, maxStatusBytes))
	if err != nil {
		// Failing to read the body (or an oversized body) is a request
		// problem rather than a server fault.
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	// $set touches only these two fields and leaves the rest of the
	// document alone.
	update := bson.D{{"$set", bson.D{
		{"status", strings.TrimSpace(string(status))},
		{"statustime", time.Now()},
	}}}
	if _, err := h.collection.Upsert(bson.D{{"_id", name}}, update); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
	}
}

// serveGetStatus writes the status of the person whose _id is name.
//
// It responds 404 when no such person exists and 500 on any other
// database error.
func (h *statusHandler) serveGetStatus(w http.ResponseWriter, req *http.Request, name string) {
	var p person
	err := h.collection.Find(bson.D{{"_id", name}}).One(&p)
	switch {
	case err == mgo.ErrNotFound:
		// An unknown user is a client-side problem, not a server fault.
		http.Error(w, err.Error(), http.StatusNotFound)
		return
	case err != nil:
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	fmt.Fprintf(w, "%s is %s at %v\n", p.Name, p.Status, p.StatusTime)
}

BUT!

What happens over time?

By default, mgo.Session does not cope with connections going down

Mgo implements connection pooling,
but when a connection goes down
it stays down.

Consistency

Strong: reads and writes will always be made to the primary server
over the same connection.

Monotonic: reads can go to a secondary; the first write switches to the primary.

Eventual: read from any secondary; write to the primary.

The escape hatch

Session.Copy: just like making a new connection, except very cheap.

Strategy

package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"strings"
	"time"

	"gopkg.in/mgo.v2"
	"gopkg.in/mgo.v2/bson"
)

// main dials the local MongoDB server and serves the status web service on
// port 55667 from the "people" collection of the "mongodbtalk" database.
func main() {
	session, dialErr := mgo.Dial("localhost:27017")
	if dialErr != nil {
		log.Fatalf("cannot dial mongoDB: %v", dialErr)
	}
	people := session.DB("mongodbtalk").C("people")

	log.Println("starting service on port 55667")
	if serveErr := http.ListenAndServe(":55667", NewStatusHandler(people)); serveErr != nil {
		log.Fatal(serveErr)
	}
}

// person is the document type the status service stores in its collection.
type person struct {
	// Name holds the document's _id (see the bson tag); the handlers use
	// the :user part of the URL for it.
	Name       string `bson:"_id"`
	// Status is the person's status text; the handlers address it by the
	// key "status".
	Status     string
	// StatusTime is when the status was last set; the handlers address it
	// by the key "statustime".
	StatusTime time.Time
}

// statusHandler serves a single request of the status web service.
type statusHandler struct {
	// collection is the MongoDB collection the handler methods read and
	// write; NewStatusHandler binds it to a per-request copy of the session.
	collection *mgo.Collection
}

// NewStatusHandler returns an http.Handler for the status service. Every
// request is served by a fresh statusHandler whose collection is bound to
// its own copy of the session that collection belongs to, and the copy is
// closed when the request finishes.
func NewStatusHandler(collection *mgo.Collection) http.Handler {
	handler := func(w http.ResponseWriter, req *http.Request) {
		// Copy the session.
		newSession := collection.Database.Session.Copy()
		// Every copied session must be closed, otherwise each request
		// leaks the resources held by its copy. The original never
		// closed it.
		defer newSession.Close()

		h := &statusHandler{
			collection: collection.With(newSession),
		}
		mux := http.NewServeMux()
		mux.HandleFunc("/latest", h.serveLatest)
		mux.HandleFunc("/status/", h.serveStatus)
		mux.ServeHTTP(w, req)
	}
	return http.HandlerFunc(handler)
}

// serveLatest writes the most recently updated person's status to w.
//
// It responds 404 when the collection holds no documents and 500 on any
// other database error.
func (h *statusHandler) serveLatest(w http.ResponseWriter, req *http.Request) {
	var p person
	// Sorting on "-statustime" puts the newest status first.
	err := h.collection.Find(nil).Sort("-statustime").One(&p)
	switch {
	case err == mgo.ErrNotFound:
		// An empty collection is not a server fault.
		http.Error(w, err.Error(), http.StatusNotFound)
		return
	case err != nil:
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	fmt.Fprintf(w, "%s is %s at %v\n", p.Name, p.Status, p.StatusTime)
}

// serveStatus dispatches /status/:user requests by HTTP method: PUT updates
// the user's status and GET reads it. The user name is whatever follows the
// "/status/" prefix of the request path. Any other method gets a 405
// response that lists the supported methods.
func (h *statusHandler) serveStatus(w http.ResponseWriter, req *http.Request) {
	name := strings.TrimPrefix(req.URL.Path, "/status/")
	switch req.Method {
	case "PUT":
		h.servePutStatus(w, req, name)
	case "GET":
		h.serveGetStatus(w, req, name)
	default:
		// A 405 response must carry an Allow header naming the methods
		// the resource supports.
		w.Header().Set("Allow", "GET, PUT")
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
	}
}

// servePutStatus stores the request body, trimmed of surrounding white
// space, as the status of the person whose _id is name, and stamps the
// update with the current time. The person is created if absent (upsert).
//
// It responds 400 when the body cannot be read or exceeds the size cap, and
// 500 when the database update fails.
func (h *statusHandler) servePutStatus(w http.ResponseWriter, req *http.Request, name string) {
	// The body comes from an untrusted client, so cap how much is buffered.
	const maxStatusBytes = 1 << 20
	status, err := ioutil.ReadAll(http.MaxBytesReader(w, req.Body, maxStatusBytes))
	if err != nil {
		// Failing to read the body (or an oversized body) is a request
		// problem rather than a server fault.
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	// $set touches only these two fields and leaves the rest of the
	// document alone.
	update := bson.D{{"$set", bson.D{
		{"status", strings.TrimSpace(string(status))},
		{"statustime", time.Now()},
	}}}
	if _, err := h.collection.Upsert(bson.D{{"_id", name}}, update); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
	}
}

// serveGetStatus writes the status of the person whose _id is name.
//
// It responds 404 when no such person exists and 500 on any other
// database error.
func (h *statusHandler) serveGetStatus(w http.ResponseWriter, req *http.Request, name string) {
	var p person
	err := h.collection.Find(bson.D{{"_id", name}}).One(&p)
	switch {
	case err == mgo.ErrNotFound:
		// An unknown user is a client-side problem, not a server fault.
		http.Error(w, err.Error(), http.StatusNotFound)
		return
	case err != nil:
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	fmt.Fprintf(w, "%s is %s at %v\n", p.Name, p.Status, p.StatusTime)
}

But...

The way out: sync.Pool

// newStatusHandler builds a statusHandler with its own ServeMux already
// routing /latest and /status/ to the handler's methods. The collection is
// left unset.
func newStatusHandler() *statusHandler {
	router := http.NewServeMux()
	sh := &statusHandler{mux: router}
	router.HandleFunc("/status/", sh.serveStatus)
	router.HandleFunc("/latest", sh.serveLatest)
	return sh
}

Use sync.Pool to amortise creation time

// statusHandlerPool recycles statusHandler values so that a handler (and
// its ServeMux) need not be built for every request. When the pool is
// empty it creates a handler with newStatusHandler.
var statusHandlerPool = sync.Pool{
	New: func() interface{} { return newStatusHandler() },
}

// getStatusHandler takes a statusHandler from statusHandlerPool. It panics
// if the pool yields a value of any other type.
func getStatusHandler() *statusHandler {
	pooled := statusHandlerPool.Get()
	return pooled.(*statusHandler)
}

// Close closes the session behind the handler's collection, detaches the
// collection and returns the handler to statusHandlerPool.
//
// Close does nothing when no collection is attached. That makes it safe to
// defer before the collection is assigned and safe to call twice; in
// particular a second call cannot put the same handler into the pool again.
func (h *statusHandler) Close() {
	if h.collection == nil {
		return
	}
	h.collection.Database.Session.Close()
	// Drop the reference so a pooled handler never holds a closed session.
	h.collection = nil
	statusHandlerPool.Put(h)
}

Use it

package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"strings"
	"sync"
	"time"

	"gopkg.in/mgo.v2"
	"gopkg.in/mgo.v2/bson"
)

// main dials the local MongoDB server and serves the status web service on
// port 55667 from the "people" collection of the "mongodbtalk" database.
func main() {
	session, dialErr := mgo.Dial("localhost:27017")
	if dialErr != nil {
		log.Fatalf("cannot dial mongoDB: %v", dialErr)
	}
	people := session.DB("mongodbtalk").C("people")

	log.Println("starting service on port 55667")
	if serveErr := http.ListenAndServe(":55667", NewStatusHandler(people)); serveErr != nil {
		log.Fatal(serveErr)
	}
}

// person is the document type the status service stores in its collection.
type person struct {
	// Name holds the document's _id (see the bson tag); the handlers use
	// the :user part of the URL for it.
	Name       string `bson:"_id"`
	// Status is the person's status text; the handlers address it by the
	// key "status".
	Status     string
	// StatusTime is when the status was last set; the handlers address it
	// by the key "statustime".
	StatusTime time.Time
}

// statusHandler serves the status web service's requests. Instances are
// recycled through statusHandlerPool.
type statusHandler struct {
	// mux routes a request to the handler's serve* methods; it is set up
	// once by newStatusHandler.
	mux        *http.ServeMux
	// collection is the MongoDB collection used for the current request;
	// Close resets it to nil.
	collection *mgo.Collection
}

// newStatusHandler builds a statusHandler with its own ServeMux already
// routing /latest and /status/ to the handler's methods. The collection is
// left unset.
func newStatusHandler() *statusHandler {
	router := http.NewServeMux()
	sh := &statusHandler{mux: router}
	router.HandleFunc("/status/", sh.serveStatus)
	router.HandleFunc("/latest", sh.serveLatest)
	return sh
}

// POOL OMIT
// statusHandlerPool recycles statusHandler values so that a handler (and
// its ServeMux) need not be built for every request. When the pool is
// empty it creates a handler with newStatusHandler.
var statusHandlerPool = sync.Pool{
	New: func() interface{} { return newStatusHandler() },
}

// getStatusHandler takes a statusHandler from statusHandlerPool. It panics
// if the pool yields a value of any other type.
func getStatusHandler() *statusHandler {
	pooled := statusHandlerPool.Get()
	return pooled.(*statusHandler)
}

// Close closes the session behind the handler's collection, detaches the
// collection and returns the handler to statusHandlerPool.
//
// Close does nothing when no collection is attached. That makes it safe to
// defer before the collection is assigned and safe to call twice; in
// particular a second call cannot put the same handler into the pool again.
func (h *statusHandler) Close() {
	if h.collection == nil {
		return
	}
	h.collection.Database.Session.Close()
	// Drop the reference so a pooled handler never holds a closed session.
	h.collection = nil
	statusHandlerPool.Put(h)
}

// ENDPOOL OMIT

// NewStatusHandler returns an http.Handler for the status service. Each
// request borrows a statusHandler from the pool, binds it to a copy of the
// session behind collection, serves the request through the handler's mux,
// and finally closes the handler.
func NewStatusHandler(collection *mgo.Collection) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
		pooled := getStatusHandler()
		defer pooled.Close()

		// Bind the borrowed handler to a per-request copy of the session.
		pooled.collection = collection.With(collection.Database.Session.Copy())

		pooled.mux.ServeHTTP(w, req)
	})
}

// serveLatest writes the most recently updated person's status to w.
//
// It responds 404 when the collection holds no documents and 500 on any
// other database error.
func (h *statusHandler) serveLatest(w http.ResponseWriter, req *http.Request) {
	var p person
	// Sorting on "-statustime" puts the newest status first.
	err := h.collection.Find(nil).Sort("-statustime").One(&p)
	switch {
	case err == mgo.ErrNotFound:
		// An empty collection is not a server fault.
		http.Error(w, err.Error(), http.StatusNotFound)
		return
	case err != nil:
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	fmt.Fprintf(w, "%s is %s at %v\n", p.Name, p.Status, p.StatusTime)
}

// serveStatus dispatches /status/:user requests by HTTP method: PUT updates
// the user's status and GET reads it. The user name is whatever follows the
// "/status/" prefix of the request path. Any other method gets a 405
// response that lists the supported methods.
func (h *statusHandler) serveStatus(w http.ResponseWriter, req *http.Request) {
	name := strings.TrimPrefix(req.URL.Path, "/status/")
	switch req.Method {
	case "PUT":
		h.servePutStatus(w, req, name)
	case "GET":
		h.serveGetStatus(w, req, name)
	default:
		// A 405 response must carry an Allow header naming the methods
		// the resource supports.
		w.Header().Set("Allow", "GET, PUT")
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
	}
}

// servePutStatus stores the request body, trimmed of surrounding white
// space, as the status of the person whose _id is name, and stamps the
// update with the current time. The person is created if absent (upsert).
//
// It responds 400 when the body cannot be read or exceeds the size cap, and
// 500 when the database update fails.
func (h *statusHandler) servePutStatus(w http.ResponseWriter, req *http.Request, name string) {
	// The body comes from an untrusted client, so cap how much is buffered.
	const maxStatusBytes = 1 << 20
	status, err := ioutil.ReadAll(http.MaxBytesReader(w, req.Body, maxStatusBytes))
	if err != nil {
		// Failing to read the body (or an oversized body) is a request
		// problem rather than a server fault.
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	// $set touches only these two fields and leaves the rest of the
	// document alone.
	update := bson.D{{"$set", bson.D{
		{"status", strings.TrimSpace(string(status))},
		{"statustime", time.Now()},
	}}}
	if _, err := h.collection.Upsert(bson.D{{"_id", name}}, update); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
	}
}

// serveGetStatus writes the status of the person whose _id is name.
//
// It responds 404 when no such person exists and 500 on any other
// database error.
func (h *statusHandler) serveGetStatus(w http.ResponseWriter, req *http.Request, name string) {
	var p person
	err := h.collection.Find(bson.D{{"_id", name}}).One(&p)
	switch {
	case err == mgo.ErrNotFound:
		// An unknown user is a client-side problem, not a server fault.
		http.Error(w, err.Error(), http.StatusNotFound)
		return
	case err != nil:
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	fmt.Fprintf(w, "%s is %s at %v\n", p.Name, p.Status, p.StatusTime)
}

Conclusion

Thank you

Roger Peppe

Canonical Ltd