Skip to content

Commit

Permalink
telegram: rewrite using experiment/urlgetter (#683)
Browse files Browse the repository at this point in the history
Part of #646

Passes all checks from ooni/jafar#28
  • Loading branch information
bassosimone authored Jun 9, 2020
1 parent bb83b5d commit 0a0028e
Show file tree
Hide file tree
Showing 2 changed files with 325 additions and 330 deletions.
231 changes: 66 additions & 165 deletions experiment/telegram/telegram.go
Original file line number Diff line number Diff line change
@@ -1,60 +1,37 @@
// Package telegram contains the Telegram network experiment. This file
// in particular is a pure-Go implementation of that.
// Package telegram contains the Telegram network experiment.
//
// See https://github.com/ooni/spec/blob/master/nettests/ts-020-telegram.md.
package telegram

import (
"bytes"
"context"
"errors"
"fmt"
"math/rand"
"sync"
"strings"
"time"

"github.com/ooni/probe-engine/atomicx"
"github.com/ooni/probe-engine/internal/httpheader"
"github.com/ooni/probe-engine/internal/netxlogger"
"github.com/ooni/probe-engine/internal/oonidatamodel"
"github.com/ooni/probe-engine/internal/oonitemplates"
"github.com/ooni/probe-engine/experiment/urlgetter"
"github.com/ooni/probe-engine/model"
"github.com/ooni/probe-engine/netx/modelx"
)

const (
testName = "telegram"
testVersion = "0.0.5"
testVersion = "0.1.0"
)

// Config contains the experiment config.
// Config contains the telegram experiment config.
type Config struct{}

// TestKeys contains telegram test keys.
type TestKeys struct {
Agent string `json:"agent"`
Queries oonidatamodel.DNSQueriesList `json:"queries"`
Requests oonidatamodel.RequestList `json:"requests"`
TCPConnect oonidatamodel.TCPConnectList `json:"tcp_connect"`
TelegramHTTPBlocking bool `json:"telegram_http_blocking"`
TelegramTCPBlocking bool `json:"telegram_tcp_blocking"`
TelegramWebFailure *string `json:"telegram_web_failure"`
TelegramWebStatus string `json:"telegram_web_status"`
TLSHandshakes oonidatamodel.TLSHandshakesList `json:"tls_handshakes"`
urlgetter.TestKeys
TelegramHTTPBlocking bool `json:"telegram_http_blocking"`
TelegramTCPBlocking bool `json:"telegram_tcp_blocking"`
TelegramWebFailure *string `json:"telegram_web_failure"`
TelegramWebStatus string `json:"telegram_web_status"`
}

func registerExtensions(m *model.Measurement) {
oonidatamodel.ExtHTTP.AddTo(m)
oonidatamodel.ExtDNS.AddTo(m)
oonidatamodel.ExtTCPConnect.AddTo(m)
oonidatamodel.ExtTLSHandshake.AddTo(m)
}

type urlMeasurements struct {
method string
results *oonitemplates.HTTPDoResults
}

func newTestKeys() *TestKeys {
// NewTestKeys creates new telegram TestKeys.
func NewTestKeys() *TestKeys {
return &TestKeys{
TelegramHTTPBlocking: true,
TelegramTCPBlocking: true,
Expand All @@ -63,171 +40,95 @@ func newTestKeys() *TestKeys {
}
}

func (tk *TestKeys) processone(v *urlMeasurements) error {
if v == nil {
return errors.New("passed nil data to processone")
}
r := v.results
if r == nil {
return errors.New("passed nil results")
}
tk.Agent = "redirect"
// update the requests and tcp-connect entries
tk.Queries = append(
tk.Queries, oonidatamodel.NewDNSQueriesList(r.TestKeys)...,
)
tk.Requests = append(
tk.Requests, oonidatamodel.NewRequestList(r.TestKeys)...,
)
tk.TCPConnect = append(
tk.TCPConnect,
oonidatamodel.NewTCPConnectList(r.TestKeys)...,
)
tk.TLSHandshakes = append(
tk.TLSHandshakes,
oonidatamodel.NewTLSHandshakesList(r.TestKeys)...,
)
// process access points first
if v.method != "GET" {
if r.Error == nil {
// Update updates the TestKeys using the given MultiOutput result.
func (tk *TestKeys) Update(v urlgetter.MultiOutput) {
// update the easy to update entries first
tk.NetworkEvents = append(tk.NetworkEvents, v.TestKeys.NetworkEvents...)
tk.Queries = append(tk.Queries, v.TestKeys.Queries...)
tk.Requests = append(tk.Requests, v.TestKeys.Requests...)
tk.TCPConnect = append(tk.TCPConnect, v.TestKeys.TCPConnect...)
tk.TLSHandshakes = append(tk.TLSHandshakes, v.TestKeys.TLSHandshakes...)
// then process access points
if v.Input.Config.Method != "GET" {
if v.TestKeys.Failure == nil {
tk.TelegramHTTPBlocking = false
tk.TelegramTCPBlocking = false
return nil // found successful access point connection
return // found successful access point connection
}
for _, connect := range r.TestKeys.Connects {
if connect.Error == nil {
tk.TelegramTCPBlocking = false
break // not a connect error meaning we can connect
}
if v.TestKeys.FailedOperation == nil || *v.TestKeys.FailedOperation != modelx.ConnectOperation {
tk.TelegramTCPBlocking = false
}
return nil
return
}
// now take care of web
if tk.TelegramWebStatus != "ok" {
return nil // we already flipped the state
return // we already flipped the state
}
if r.Error != nil {
failureString := r.Error.Error()
if v.TestKeys.Failure != nil {
tk.TelegramWebStatus = "blocked"
tk.TelegramWebFailure = &failureString
return nil
tk.TelegramWebFailure = v.TestKeys.Failure
return
}
if r.StatusCode != 200 {
if v.TestKeys.HTTPResponseStatus != 200 {
failureString := "http_request_failed" // MK uses it
tk.TelegramWebFailure = &failureString
tk.TelegramWebStatus = "blocked"
return nil
return
}
title := []byte(`<title>Telegram Web</title>`)
if bytes.Contains(r.BodySnap, title) == false {
title := `<title>Telegram Web</title>`
if strings.Contains(v.TestKeys.HTTPResponseBody, title) == false {
failureString := "telegram_missing_title_error"
tk.TelegramWebFailure = &failureString
tk.TelegramWebStatus = "blocked"
return nil
}
return nil
}

func (tk *TestKeys) processall(m map[string]*urlMeasurements) error {
for _, v := range m {
if err := tk.processone(v); err != nil {
return err
}
return
}
return nil
return
}

type measurer struct {
config Config
}

func (m *measurer) ExperimentName() string {
func (m measurer) ExperimentName() string {
return testName
}

func (m *measurer) ExperimentVersion() string {
func (m measurer) ExperimentVersion() string {
return testVersion
}

func (m *measurer) Run(
ctx context.Context,
sess model.ExperimentSession,
measurement *model.Measurement,
callbacks model.ExperimentCallbacks,
) error {
func (m measurer) Run(ctx context.Context, sess model.ExperimentSession,
measurement *model.Measurement, callbacks model.ExperimentCallbacks) error {
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
registerExtensions(measurement)
// setup data container
var urlmeasurements = map[string]*urlMeasurements{
"http://149.154.175.50/": &urlMeasurements{method: "POST"},
"http://149.154.167.51/": &urlMeasurements{method: "POST"},
"http://149.154.175.100/": &urlMeasurements{method: "POST"},
"http://149.154.167.91/": &urlMeasurements{method: "POST"},
"http://149.154.171.5/": &urlMeasurements{method: "POST"},

"http://149.154.175.50:443/": &urlMeasurements{method: "POST"},
"http://149.154.167.51:443/": &urlMeasurements{method: "POST"},
"http://149.154.175.100:443/": &urlMeasurements{method: "POST"},
"http://149.154.167.91:443/": &urlMeasurements{method: "POST"},
"http://149.154.171.5:443/": &urlMeasurements{method: "POST"},

"http://web.telegram.org/": &urlMeasurements{method: "GET"},
"https://web.telegram.org/": &urlMeasurements{method: "GET"},
urlgetter.RegisterExtensions(measurement)
inputs := []urlgetter.MultiInput{
{Target: "http://149.154.175.50/", Config: urlgetter.Config{Method: "POST"}},
{Target: "http://149.154.167.51/", Config: urlgetter.Config{Method: "POST"}},
{Target: "http://149.154.175.100/", Config: urlgetter.Config{Method: "POST"}},
{Target: "http://149.154.167.91/", Config: urlgetter.Config{Method: "POST"}},
{Target: "http://149.154.171.5/", Config: urlgetter.Config{Method: "POST"}},

{Target: "http://149.154.175.50:443/", Config: urlgetter.Config{Method: "POST"}},
{Target: "http://149.154.167.51:443/", Config: urlgetter.Config{Method: "POST"}},
{Target: "http://149.154.175.100:443/", Config: urlgetter.Config{Method: "POST"}},
{Target: "http://149.154.167.91:443/", Config: urlgetter.Config{Method: "POST"}},
{Target: "http://149.154.171.5:443/", Config: urlgetter.Config{Method: "POST"}},

{Target: "http://web.telegram.org/", Config: urlgetter.Config{Method: "GET"}},
{Target: "https://web.telegram.org/", Config: urlgetter.Config{Method: "GET"}},
}
// run all measurements in parallel
var (
completed = atomicx.NewInt64()
mu sync.Mutex
waitgroup sync.WaitGroup
)
waitgroup.Add(len(urlmeasurements))
for key := range urlmeasurements {
go func(key string) {
defer waitgroup.Done()
// Avoid making all requests concurrently
gen := rand.New(rand.NewSource(time.Now().UnixNano()))
sleeptime := time.Duration(gen.Intn(5000)) * time.Millisecond
select {
case <-time.After(sleeptime):
case <-ctx.Done():
return
}
mu.Lock()
entry := urlmeasurements[key]
mu.Unlock()
entry.results = oonitemplates.HTTPDo(ctx, oonitemplates.HTTPDoConfig{
Accept: httpheader.RandomAccept(),
AcceptLanguage: httpheader.RandomAcceptLanguage(),
Beginning: measurement.MeasurementStartTimeSaved,
Handler: netxlogger.NewHandler(sess.Logger()),
Method: entry.method,
URL: key,
UserAgent: httpheader.RandomUserAgent(),
})
sofar := completed.Add(1)
percentage := float64(sofar) / float64(len(urlmeasurements))
callbacks.OnProgress(percentage, fmt.Sprintf(
"telegram: access %s: %s", key, errString(entry.results.Error),
))
}(key)
multi := urlgetter.Multi{Session: sess}
testkeys := NewTestKeys()
testkeys.Agent = "redirect"
measurement.TestKeys = testkeys
for entry := range multi.Collect(ctx, inputs, "telegram", callbacks) {
testkeys.Update(entry)
}
waitgroup.Wait()
// fill the measurement entry
testkeys := newTestKeys()
measurement.TestKeys = &testkeys
return testkeys.processall(urlmeasurements)
return nil
}

// NewExperimentMeasurer creates a new ExperimentMeasurer.
func NewExperimentMeasurer(config Config) model.ExperimentMeasurer {
return &measurer{config: config}
}

func errString(err error) (s string) {
s = "success"
if err != nil {
s = err.Error()
}
return
return measurer{config: config}
}
Loading

0 comments on commit 0a0028e

Please sign in to comment.