Loki源码浅析

Loki

1. Loki架构图

[图 1:Loki 架构图(原图文件 202401311516224)]

从上图可以看到,Loki 主要有 3 个模块:Distributor、Ingester 和 Querier;还有一些其他模块没有在图中画出,后续会陆续提到。其中,Distributor 模块负责接收客户端(如 Promtail)的写入请求,并将请求转发给 Ingester 模块;Ingester 模块负责接收 Distributor 转发过来的请求,先在内存中缓存数据,再将其写入(flush)到对象存储中;Querier 模块负责接收客户端(logcli、Grafana 等)的查询请求,并从 Ingester 模块(尚未写入存储的近期数据)以及对象存储(已持久化的数据)中查询数据。

2. Distributor Push Handler

在 Promtail 的配置文件中,clients 配置项的 url 一般配置为 http://loki:3100/loki/api/v1/push,由此可知 Loki 的数据写入入口是 /loki/api/v1/push。该接口是在 Distributor 模块的初始化函数 initDistributor 中注册的。

2.1 initDistributor

initDistributor 源码如下:
// pkg/loki/modules.go

// initDistributor builds the distributor module and wires its endpoints:
//   - the gRPC Pusher service, registered only when the target is none of
//     all, write or ingester;
//   - the /distributor/ring page on the public server (GET, POST) and, when
//     the internal server is enabled, on the internal server (GET);
//   - the HTTP push endpoints /api/prom/push and /loki/api/v1/push, wrapped in
//     panic-recovery and tenant-authentication middleware.
func (t *Loki) initDistributor() (services.Service, error) {
	var err error
	t.distributor, err = distributor.New(
		t.Cfg.Distributor,
		t.Cfg.IngesterClient,
		t.tenantConfigs,
		t.ring,
		t.Overrides,
		prometheus.DefaultRegisterer,
	)
	if err != nil {
		return nil, err
	}

	// Only expose the distributor over gRPC when no ingester runs in this
	// process (targets all/write/ingester include one). NOTE(review):
	// presumably the co-located ingester registers its own Pusher service.
	colocatedWithIngester := t.Cfg.isModuleEnabled(All) ||
		t.Cfg.isModuleEnabled(Write) ||
		t.Cfg.isModuleEnabled(Ingester)
	if !colocatedWithIngester {
		logproto.RegisterPusherServer(t.Server.GRPC, t.distributor)
	}

	// Without a local querier module, multi-tenant query resolution has to be
	// switched on here; otherwise the querier module configures it itself.
	if !t.Cfg.isModuleEnabled(Querier) && t.Cfg.Querier.MultiTenantQueriesEnabled {
		tenant.WithDefaultResolver(tenant.NewMultiResolver())
	}

	// Recovery middleware guards the handler against panics; the auth
	// middleware resolves the tenant ID before PushHandler runs.
	pushHandler := middleware.Merge(
		serverutil.RecoveryHTTPMiddleware,
		t.HTTPAuthMiddleware,
	).Wrap(http.HandlerFunc(t.distributor.PushHandler))

	t.Server.HTTP.Path("/distributor/ring").Methods("GET", "POST").Handler(t.distributor)
	if t.Cfg.InternalServer.Enable {
		t.InternalServer.HTTP.Path("/distributor/ring").Methods("GET").Handler(t.distributor)
	}

	// Legacy and v1 push endpoints share the same wrapped handler.
	for _, route := range []string{"/api/prom/push", "/loki/api/v1/push"} {
		t.Server.HTTP.Path(route).Methods("POST").Handler(pushHandler)
	}
	return t.distributor, nil
}

2.2 PushHandler

PushHandler 源码如下:
// pkg/distributor/http.go

// PushHandler is the HTTP entry point for log pushes.
//
// It resolves the tenant from the request context, decodes the body with
// push.ParseRequest and hands the result to Distributor.Push. It responds with:
//   - 400 when the tenant ID is missing or the body cannot be parsed;
//   - 204 when the push succeeds;
//   - the status code and body carried by an httpgrpc error from Push;
//   - 500 for any other Push error.
//
// Per-tenant config can enable debug logging of push outcomes and of the
// label sets of the pushed streams.
func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()
	logger := util_log.WithContext(ctx, util_log.Logger)

	tenantID, err := tenant.TenantID(ctx)
	if err != nil {
		level.Error(logger).Log("msg", "error getting tenant id", "err", err)
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	// ParseRequest picks a decompressor from Content-Encoding and a decoder
	// from Content-Type.
	req, err := push.ParseRequest(logger, tenantID, r, d.tenantsRetention)
	if err != nil {
		if d.tenantConfigs.LogPushRequest(tenantID) {
			level.Debug(logger).Log("msg", "push request failed", "code", http.StatusBadRequest, "err", err)
		}
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	if d.tenantConfigs.LogPushRequestStreams(tenantID) {
		var labels strings.Builder
		for i := range req.Streams {
			labels.WriteString(req.Streams[i].Labels)
		}
		level.Debug(logger).Log("msg", "push request streams", "streams", labels.String())
	}

	if _, err = d.Push(ctx, req); err == nil {
		if d.tenantConfigs.LogPushRequest(tenantID) {
			level.Debug(logger).Log("msg", "push request successful")
		}
		w.WriteHeader(http.StatusNoContent)
		return
	}

	// Prefer the status and message embedded in an httpgrpc error; anything
	// else is reported as an internal server error.
	var (
		status int
		msg    string
	)
	if resp, ok := httpgrpc.HTTPResponseFromError(err); ok {
		status, msg = int(resp.Code), string(resp.Body)
	} else {
		status, msg = http.StatusInternalServerError, err.Error()
	}
	if d.tenantConfigs.LogPushRequest(tenantID) {
		level.Debug(logger).Log("msg", "push request failed", "code", status, "err", msg)
	}
	http.Error(w, msg, status)
}

2.3 ParseRequest

ParseRequest 源码如下:
// ParseRequest decodes the body of an HTTP push request into a
// logproto.PushRequest for tenant userID and updates ingestion metrics.
//
// Decompression is selected by the Content-Encoding header:
//   - "" or "snappy": the body bytes are passed through unchanged (snappy,
//     when used, is decoded by util.ParseProtoReader with util.RawSnappy);
//   - "gzip": the body is wrapped in a gzip reader;
//   - "deflate": the body is wrapped in a flate reader;
//   - any other value: an error is returned.
//
// Decoding is selected by the Content-Type header: application/json uses the
// JSON decoder matching the API version in the request URI; every other
// value, including a missing header, is decoded as snappy-compressed protobuf.
//
// When tenantsRetention is non-nil, every stream's labels are parsed so the
// bytes-ingested metric can be labelled with the stream's retention period in
// whole hours; an unparsable label set fails the request.
func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRetention TenantsRetention) (*logproto.PushRequest, error) {
	var body io.Reader
	// bodySize should always reflect the compressed size of the request body
	bodySize := loki_util.NewSizeReader(r.Body)
	contentEncoding := r.Header.Get(contentEnc)
	switch contentEncoding {
	case "", "snappy":
		// Snappy-decoding is done by `util.ParseProtoReader(..., util.RawSnappy)` below.
		// Pass on body bytes. Note: HTTP clients do not need to set this header,
		// but they sometimes do. See #3407.
		body = bodySize
	case "gzip":
		gzipReader, err := gzip.NewReader(bodySize)
		if err != nil {
			return nil, err
		}
		defer gzipReader.Close()
		body = gzipReader
	case "deflate":
		flateReader := flate.NewReader(bodySize)
		defer flateReader.Close()
		body = flateReader
	default:
		return nil, fmt.Errorf("Content-Encoding %q not supported", contentEncoding)
	}

	var (
		entriesSize      int64
		streamLabelsSize int64
		totalEntries     int64
		req              logproto.PushRequest
	)

	// mime.ParseMediaType rejects an empty string, which would make the
	// "no content-type header" case of the default branch below unreachable
	// (every header-less request would fail). Only parse a header that was sent.
	mediaType := r.Header.Get(contentType)
	if mediaType != "" {
		var err error
		if mediaType, _ /* params */, err = mime.ParseMediaType(mediaType); err != nil {
			return nil, err
		}
	}

	switch mediaType {
	case applicationJSON:
		var err error

		// todo once https://github.com/weaveworks/common/commit/73225442af7da93ec8f6a6e2f7c8aafaee3f8840 is in Loki.
		// We can try to pass the body as bytes.buffer instead to avoid reading into another buffer.
		if loghttp.GetVersion(r.RequestURI) == loghttp.VersionV1 {
			err = unmarshal.DecodePushRequest(body, &req)
		} else {
			err = unmarshal2.DecodePushRequest(body, &req)
		}

		if err != nil {
			return nil, err
		}

	default:
		// When no content-type header is set or when it is set to
		// `application/x-protobuf`: expect snappy compression.
		if err := util.ParseProtoReader(r.Context(), body, int(r.ContentLength), math.MaxInt32, &req, util.RawSnappy); err != nil {
			return nil, err
		}
	}

	mostRecentEntry := time.Unix(0, 0)

	for _, s := range req.Streams {
		streamLabelsSize += int64(len(s.Labels))
		var retentionHours string
		if tenantsRetention != nil {
			lbs, err := syntax.ParseLabels(s.Labels)
			if err != nil {
				return nil, err
			}
			retentionHours = fmt.Sprintf("%d", int64(math.Floor(tenantsRetention.RetentionPeriodFor(userID, lbs).Hours())))
		}

		var streamBytes int64
		for _, e := range s.Entries {
			lineSize := int64(len(e.Line))
			totalEntries++
			entriesSize += lineSize
			streamBytes += lineSize
			bytesReceivedStats.Inc(lineSize)
			if e.Timestamp.After(mostRecentEntry) {
				mostRecentEntry = e.Timestamp
			}
		}
		// Resolve the labelled counter once per stream rather than once per
		// entry; skip streams without entries so no empty series is created.
		if len(s.Entries) > 0 {
			bytesIngested.WithLabelValues(userID, retentionHours).Add(float64(streamBytes))
		}
	}

	// incrementing tenant metrics if we have a tenant.
	if totalEntries != 0 && userID != "" {
		linesIngested.WithLabelValues(userID).Add(float64(totalEntries))
	}
	linesReceivedStats.Inc(totalEntries)

	level.Debug(logger).Log(
		"msg", "push request parsed",
		"path", r.URL.Path,
		"contentType", mediaType,
		"contentEncoding", contentEncoding,
		"bodySize", humanize.Bytes(uint64(bodySize.Size())),
		"streams", len(req.Streams),
		"entries", totalEntries,
		"streamLabelsSize", humanize.Bytes(uint64(streamLabelsSize)),
		"entriesSize", humanize.Bytes(uint64(entriesSize)),
		"totalSize", humanize.Bytes(uint64(entriesSize+streamLabelsSize)),
		"mostRecentLagMs", time.Since(mostRecentEntry).Milliseconds(),
	)
	return &req, nil
}

2.4 PushRequest 数据结构

相关数据结构定义如下:
// pkg/push/push.pb.go
// PushRequest is the payload of a push call: a batch of log streams.
// NOTE(review): declared in a .pb.go file, presumably protoc-generated — edit
// the .proto definition rather than this struct.
type PushRequest struct {
 Streams []Stream `protobuf:"bytes,1,rep,name=streams,proto3,customtype=Stream" json:"streams"`
}

// pkg/push/types.go
// Stream holds one unique label set, serialized as a single string, together
// with a group of log entries.
type Stream struct {
 // Labels is the stream's label set as one string.
 Labels  string  `protobuf:"bytes,1,opt,name=labels,proto3" json:"labels"`
 // Entries are the log entries of this stream (encoded via EntryAdapter on the wire).
 Entries []Entry `protobuf:"bytes,2,rep,name=entries,proto3,customtype=EntryAdapter" json:"entries"`
 // Hash is the stream hash; the json:"-" tag keeps it out of JSON payloads.
 // In Distributor.Push it is filled in from the parsed stream labels.
 Hash    uint64  `protobuf:"varint,3,opt,name=hash,proto3" json:"-"`
}

2.5 Push

Distributor.Push 源码如下:
// Push validates the streams in req for the tenant found in ctx and fans them
// out to the ingesters that own them on the ring.
//
// Streams without entries, or whose labels fail to parse, are skipped;
// individual entries that fail validation are dropped. If nothing survives
// validation the call returns without contacting any ingester. The last
// validation error seen is returned alongside a successful response so the
// caller can report partial rejection. If the tenant's ingestion rate limit
// does not allow the validated bytes, the whole request is rejected with an
// HTTP 429 wrapped in an httpgrpc error.
func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error) {
	tenantID, err := tenant.TenantID(ctx)
	if err != nil {
		return nil, err
	}

	// Return early if request does not contain any streams
	if len(req.Streams) == 0 {
		return &logproto.PushResponse{}, nil
	}

	// First we flatten out the request into a list of samples.
	// We use the heuristic of 1 sample per TS to size the array.
	// We also work out the hash value at the same time.
	streams := make([]streamTracker, 0, len(req.Streams))
	keys := make([]uint32, 0, len(req.Streams))
	validatedLineSize := 0
	validatedLineCount := 0

	var validationErr error
	validationContext := d.validator.getValidationContextForTime(time.Now(), tenantID)

	for _, stream := range req.Streams {
		// Return early if stream does not contain any entries
		if len(stream.Entries) == 0 {
			continue
		}

		// Truncate first so subsequent steps have consistent line lengths
		d.truncateLines(validationContext, &stream)

		// Parse the labels (the returned string replaces stream.Labels) and
		// compute the stream hash. A stream with invalid labels is discarded
		// as a whole and accounted for in the discard metrics.
		stream.Labels, stream.Hash, err = d.parseStreamLabels(validationContext, stream.Labels, &stream)
		if err != nil {
			validationErr = err
			validation.DiscardedSamples.WithLabelValues(validation.InvalidLabels, tenantID).Add(float64(len(stream.Entries)))
			bytes := 0
			for _, e := range stream.Entries {
				bytes += len(e.Line)
			}
			validation.DiscardedBytes.WithLabelValues(validation.InvalidLabels, tenantID).Add(float64(bytes))
			continue
		}

		// Validate entries, compacting the accepted ones in place at the
		// front of stream.Entries.
		n := 0
		streamSize := 0
		for _, entry := range stream.Entries {
			if err := d.validator.ValidateEntry(validationContext, stream.Labels, entry); err != nil {
				validationErr = err
				continue
			}

			stream.Entries[n] = entry

			// If configured for this tenant, increment duplicate timestamps. Note, this is imperfect
			// since Loki will accept out of order writes it doesn't account for separate
			// pushes with overlapping time ranges having entries with duplicate timestamps
			if validationContext.incrementDuplicateTimestamps && n != 0 {
				// Traditional logic for Loki is that 2 lines with the same timestamp and
				// exact same content will be de-duplicated, (i.e. only one will be stored, others dropped)
				// To maintain this behavior, only increment the timestamp if the log content is different
				if stream.Entries[n-1].Line != entry.Line {
					stream.Entries[n].Timestamp = maxT(entry.Timestamp, stream.Entries[n-1].Timestamp.Add(1*time.Nanosecond))
				}
			}

			n++
			validatedLineSize += len(entry.Line)
			validatedLineCount++
			streamSize += len(entry.Line)
		}
		stream.Entries = stream.Entries[:n]

		// Every entry failed validation: there is nothing left to send, so do
		// not shard, ring-lookup or push an empty stream to the ingesters.
		if n == 0 {
			continue
		}

		shardStreamsCfg := d.validator.Limits.ShardStreams(tenantID)
		if shardStreamsCfg.Enabled {
			derivedKeys, derivedStreams := d.shardStream(stream, streamSize, tenantID)
			keys = append(keys, derivedKeys...)
			streams = append(streams, derivedStreams...)
		} else {
			keys = append(keys, util.TokenFor(tenantID, stream.Labels))
			streams = append(streams, streamTracker{stream: stream})
		}
	}

	// Return early if none of the streams contained entries
	if len(streams) == 0 {
		return &logproto.PushResponse{}, validationErr
	}

	now := time.Now()
	if !d.ingestionRateLimiter.AllowN(now, tenantID, validatedLineSize) {
		// Return a 429 to indicate to the client they are being rate limited
		validation.DiscardedSamples.WithLabelValues(validation.RateLimited, tenantID).Add(float64(validatedLineCount))
		validation.DiscardedBytes.WithLabelValues(validation.RateLimited, tenantID).Add(float64(validatedLineSize))
		return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, tenantID, int(d.ingestionRateLimiter.Limit(now, tenantID)), validatedLineCount, validatedLineSize)
	}

	const maxExpectedReplicationSet = 5 // typical replication factor 3 plus one for inactive plus one for luck
	var descs [maxExpectedReplicationSet]ring.InstanceDesc

	// Group the stream trackers by the address of every ingester in each
	// stream's replication set, recording per-stream success/failure quorums.
	streamsByIngester := map[string][]*streamTracker{}
	ingesterDescs := map[string]ring.InstanceDesc{}
	for i, key := range keys {
		replicationSet, err := d.ingestersRing.Get(key, ring.WriteNoExtend, descs[:0], nil, nil)
		if err != nil {
			return nil, err
		}

		streams[i].minSuccess = len(replicationSet.Instances) - replicationSet.MaxErrors
		streams[i].maxFailures = replicationSet.MaxErrors
		for _, ingester := range replicationSet.Instances {
			streamsByIngester[ingester.Addr] = append(streamsByIngester[ingester.Addr], &streams[i])
			ingesterDescs[ingester.Addr] = ingester
		}
	}

	tracker := pushTracker{
		done: make(chan struct{}, 1), // buffer avoids blocking if caller terminates - sendSamples() only sends once on each
		err:  make(chan error, 1),
	}
	tracker.streamsPending.Store(int32(len(streams)))
	// Send to every ingester concurrently, one goroutine per ingester.
	for ingester, streams := range streamsByIngester {
		go func(ingester ring.InstanceDesc, samples []*streamTracker) {
			// Use a background context to make sure all ingesters get samples even if we return early
			localCtx, cancel := context.WithTimeout(context.Background(), d.clientCfg.RemoteTimeout)
			defer cancel()
			localCtx = user.InjectOrgID(localCtx, tenantID)
			if sp := opentracing.SpanFromContext(ctx); sp != nil {
				localCtx = opentracing.ContextWithSpan(localCtx, sp)
			}
			d.sendStreams(localCtx, ingester, samples, &tracker)
		}(ingesterDescs[ingester], streams)
	}
	// Wait for the tracker to report an error or completion, or for the
	// caller's context to end.
	select {
	case err := <-tracker.err:
		return nil, err
	case <-tracker.done:
		return &logproto.PushResponse{}, validationErr
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

3. Ingester

Ingester 端接收写入请求的入口是 Ingester.Push,源码如下:
// pkg/ingester/ingester.go

// Push implements logproto.Pusher.
//
// It fails when the tenant ID cannot be read from ctx or when the ingester is
// read-only (ErrReadOnly). Otherwise it obtains the tenant's instance via
// GetOrCreateInstance and delegates req to that instance's Push, returning an
// empty PushResponse together with whatever error the instance reports.
func (i *Ingester) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error) {
	instanceID, err := tenant.TenantID(ctx)
	if err != nil {
		return nil, err
	}
	if i.readonly {
		return nil, ErrReadOnly
	}

	inst, err := i.GetOrCreateInstance(instanceID)
	if err != nil {
		return &logproto.PushResponse{}, err
	}
	return &logproto.PushResponse{}, inst.Push(ctx, req)
}
0%